def posturl(self, scanUrl):
    """
    Submit a URL to Cuckoo and report the outcome on stdout.

    The request is POSTed to ``tasks/create/url`` joined onto
    ``self.url_base``, authenticated with HTTP basic auth built from
    ``self.api_user`` / ``self.api_passwd``. Nothing is returned; both
    success and failure are only printed.

    :param scanUrl: the URL to hand over for scanning
    """
    form = {"url": scanUrl}
    try:
        # Everything that can fail stays inside the try so that any error is
        # reported through the same "failed" message.
        endpoint = urljoin(self.url_base, "tasks/create/url").encode("utf-8")
        credentials = HTTPBasicAuth(self.api_user, self.api_passwd)
        # NOTE: certificate verification is disabled for this call.
        res = requests.post(endpoint, data=form, auth=credentials, verify=False)
        if not (res and res.ok):
            print("Cuckoo Request failed: {}".format(res.status_code))
        else:
            task_id = res.json()["task_id"]
            print("Cuckoo Request: {}, Task created with ID: {}".format(res.status_code, task_id))
    except Exception as e:
        print("Cuckoo Request failed: {}".format(e))
python类HTTPBasicAuth()的实例源码
def run(self, args, logger):
    """Request a client-credentials token and write it to ``args.filename``.

    A POST is sent to the battle.net EU token endpoint, authenticated with
    ``config.API_KEY`` / ``config.API_SECRET`` via HTTP basic auth and with
    redirects disabled.

    :param args: object exposing ``filename``, the path the token is written to
    :param logger: logger used for the error / info messages
    :returns: ``0`` after the token has been written, ``1`` when the endpoint
        answered with a status other than 200
    """
    credentials = HTTPBasicAuth(config.API_KEY, config.API_SECRET)
    response = request(
        'POST',
        'https://eu.battle.net/oauth/token',
        auth=credentials,
        params={'grant_type': 'client_credentials'},
        allow_redirects=False
    )

    if response.status_code != 200:
        failure = "failed to get access token got %s: %s" % (response.status_code, response.content)
        logger.error(failure)
        return 1

    token_info = response.json()
    access_token = token_info['access_token']
    notice = "writing access_token to %s, expires in %s" % (args.filename, token_info['expires_in'])
    logger.info(notice)
    with open(args.filename, 'w') as out:
        out.write(access_token)
    return 0
def ch138():
    """Run 1001 solve rounds against the captcha service and print each reply.

    Each round fetches ``form1.php``, pulls the expected captcha value out of
    the page with a regular expression, requests ``captchabroken.php?new``,
    posts the value to ``captcha1.php`` and prints the text of the first
    element whose class contains ``alert``.

    This is Python 2 code (it relies on the ``xrange`` builtin).
    """
    ek = 'RZ_CH138_PW'  # not read anywhere inside this function
    s = requests.Session()
    auth = HTTPBasicAuth('captcha', 'QJc9U6wxD4SFT0u')
    url = 'http://captcha.ringzer0team.com:7421'

    # The three endpoints and the pattern never change between rounds.
    form_url = '{0}/form1.php'.format(url)
    refresh_url = '{0}/captcha/captchabroken.php?new'.format(url)
    submit_url = '{0}/captcha1.php'.format(url)
    captcha_re = re.compile(r'if \(A == "([a-z0-9]*)"\)')

    for _ in xrange(1001):
        time.sleep(0.10)
        page = s.get(form_url, auth=auth)
        captcha = captcha_re.search(page.text).group(1)
        s.get(refresh_url, auth=auth)
        reply = s.post(submit_url, auth=auth, data={'captcha': captcha})
        tree = lxml.html.document_fromstring(reply.text)
        first_alert = tree.xpath('//div[contains(@class, "alert")]')[0]
        msg = first_alert.text_content().strip()
        print(msg)
def __init__(self, host, port, username, password, verify):
    """
    Set up the epoRemote for one target ePO instance.

    Stores the remote base URL (``https://<host>:<port>/remote``), an HTTP
    basic auth object, a fresh ``requests`` session and the ``verify`` flag
    on the instance. No request is made here.

    :param host: the hostname of the ePO to run remote commands on
    :param port: the port of the desired ePO
    :param username: the username to run the remote commands as
    :param password: the password for the ePO user
    :param verify: whether to verify the ePO server's certificate
    """
    logger.debug(
        'Initializing epoRemote for ePO {} on port {} with user {}'.format(host, port, username)
    )
    self._verify = verify
    self._auth = HTTPBasicAuth(username, password)
    self._baseurl = 'https://{}:{}/remote'.format(host, port)
    self._session = requests.Session()
def add_strategy(self, domain, strategy):
    """Register an authentication strategy for a domain.

    :param str domain: The domain you wish to match against. For example:
        ``'https://api.github.com'``
    :param strategy: The authentication strategy you wish to use for
        that domain. For example: ``('username', 'password')`` or
        ``requests.HTTPDigestAuth('username', 'password')``. A tuple is
        converted to an ``HTTPBasicAuth`` object; anything else is stored
        unchanged.

    .. code-block:: python

        a = AuthHandler({})
        a.add_strategy('https://api.github.com', ('username', 'password'))
    """
    # Tuples are shorthand for basic authentication credentials.
    resolved = HTTPBasicAuth(*strategy) if isinstance(strategy, tuple) else strategy
    key = self._key_from_url(domain)
    self.strategies[key] = resolved
def get_strategy_for(self, url):
    """Look up the authentication strategy registered for a URL.

    :param str url: The full URL you will be making a request against. For
        example, ``'https://api.github.com/user'``
    :returns: The strategy stored under the key derived from ``url``, or a
        new ``NullAuthStrategy`` instance when none is stored.

    .. code-block:: python

        import requests
        a = AuthHandler({'example.com': ('foo', 'bar')})
        strategy = a.get_strategy_for('http://example.com/example')
        assert isinstance(strategy, requests.auth.HTTPBasicAuth)
    """
    lookup_key = self._key_from_url(url)
    fallback = NullAuthStrategy()
    return self.strategies.get(lookup_key, fallback)
artifactory_repo.py 文件源码
项目:ansible-module-artifactory
作者: William1444
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def artifactory_repo_present(data):
    """Create the repository described by ``data`` through the Artifactory API.

    ``data`` must contain ``state``, ``user``, ``password``, ``artifactory``
    (the base URL) and ``key`` (the repository key). ``state``, ``user`` and
    ``password`` are stripped before the remaining entries are sent as the
    JSON body of a PUT to ``<artifactory>/api/repositories/<key>``.

    The caller's dict is not modified; a shallow copy is used instead.

    :param data: dict of settings as described above
    :returns: a 3-tuple (presumably ``(is_error, has_changed, meta)`` --
        confirm against the caller):

        * ``(False, True, {"status": 200})`` when the server answers 200
        * ``(False, False, <response body>)`` when the server answers 400
          with an error message containing ``key already exists``
        * ``(True, False, {"status": ..., "response": ...})`` otherwise
    :raises KeyError: when one of the required entries is missing
    """
    # Work on a copy so stripping the credentials does not leak to the caller.
    payload = dict(data)
    del payload['state']
    user = payload.pop('user')
    password = payload.pop('password')

    url = "{}/{}/{}".format(payload['artifactory'], 'api/repositories', payload['key'])
    headers = {"Content-Type": "application/json"}
    result = requests.put(url, json.dumps(payload), headers=headers, auth=HTTPBasicAuth(user, password))

    if result.status_code == 200:
        return False, True, {"status": result.status_code}

    # Parse the body once; an error page that is not JSON must not turn the
    # failure report itself into an exception.
    try:
        body = result.json()
    except ValueError:
        body = result.text

    if result.status_code == 400 and isinstance(body, dict) and 'errors' in body:
        for error in body['errors']:
            if 'key already exists' in error['message']:
                return False, False, body

    # default: something went wrong
    return True, False, {"status": result.status_code, 'response': body}
artifactory_license.py 文件源码
项目:ansible-module-artifactory
作者: William1444
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def artifactory_license(data):
    """Post the license information in ``data`` to the Artifactory API.

    ``data`` must contain ``user``, ``password`` and ``artifactory`` (the base
    URL). The credentials are stripped and every remaining entry is sent as
    the JSON body of a POST to ``<artifactory>/api/system/license``.

    The caller's dict is not modified; a shallow copy is used instead.

    :param data: dict of settings as described above
    :returns: a 3-tuple (presumably ``(is_error, has_changed, meta)`` --
        confirm against the caller): ``(False, True, <response body>)`` on a
        200 answer, ``(True, False, {"status": ..., "response": ...})``
        otherwise. The response body is the decoded JSON, or the raw text
        when the server did not send JSON.
    :raises KeyError: when one of the required entries is missing
    """
    # Work on a copy so stripping the credentials does not leak to the caller.
    payload = dict(data)
    user = payload.pop('user')
    password = payload.pop('password')

    url = "{}/{}".format(payload['artifactory'], 'api/system/license')
    headers = {"Content-Type": "application/json"}
    result = requests.post(url, json.dumps(payload), headers=headers, auth=HTTPBasicAuth(user, password))

    # Parse the body once and tolerate non-JSON answers (e.g. an HTML error
    # page) instead of raising while building the result.
    try:
        body = result.json()
    except ValueError:
        body = result.text

    if result.status_code == 200:
        return False, True, body

    # default: something went wrong
    return True, False, {"status": result.status_code, 'response': body}
def post(self, data):
    """
    Send ``data`` to ``self._endpoint`` with HTTP basic auth and hand back
    the server's answer.

    :param data: the data to be posted.
    :type data: str or json
    :returns: the response text, or -- when the response text is empty -- a
        dict with ``code``, ``name`` and an empty ``message``
    :raises ValueError: when the username or the password is ``None``
    """
    # Guard clause: credentials are mandatory for this endpoint.
    if self._username is None or self._password is None:
        raise ValueError("Username or Password is not set")

    credentials = HTTPBasicAuth(self._username, self._password)
    resp = requests.post(
        self._endpoint,
        data=data,
        json=None,
        verify=self._verify_ssl,
        timeout=self._timeout,
        auth=credentials
    )
    if resp.text != '':
        return resp.text
    return {"code": resp.status_code, "name": resp.reason, "message": ""}
def overwrite_comment(self, url, body):
    """Replace the comment at the given API url with ``body``.

    Sends a PATCH carrying ``{"body": body}`` as JSON, authenticated with
    ``self.name`` / ``self.token``, and raises if the response reports an
    HTTP error status.
    """
    encoded = json.dumps({"body": body})
    credentials = HTTPBasicAuth(self.name, self.token)
    res = self.requests.patch(url, data=encoded, auth=credentials)
    res.raise_for_status()
def post_comment(self, url, body):
    """Create a github comment through the given url endpoint.

    Sends a POST carrying ``{"body": body}`` as JSON, authenticated with
    ``self.name`` / ``self.token``, and raises if the response reports an
    HTTP error status.
    """
    encoded = json.dumps({"body": body})
    credentials = HTTPBasicAuth(self.name, self.token)
    res = self.requests.post(url, data=encoded, auth=credentials)
    res.raise_for_status()
def _get_access_token(self):
    """Obtain an access token from mailup and remember the refresh token.

    Posts a ``password`` grant built from ``self.username`` /
    ``self.password`` to ``ENDPOINT["token"]``, authenticated with the client
    id and secret. The parsed response's ``refresh_token`` is stored on the
    instance.

    :returns: the ``access_token`` entry of the parsed response
    """
    grant = {
        "grant_type": "password",
        "username": self.username,
        "password": self.password,
    }
    client_auth = HTTPBasicAuth(self.client_id, self.client_secret)
    response = self.post(ENDPOINT["token"], data=grant, auth=client_auth)
    tokens = response_parser(response)
    self.refresh_token = tokens["refresh_token"]
    return tokens["access_token"]
tower-server-registration.py 文件源码
项目:cloudcenter-content
作者: datacenter
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def get_host_id(name):
    """Return the id of the single host called ``name``.

    Queries ``<tower_base_url>hosts/`` filtered by ``name``, authenticating
    with the credentials in the module-level ``args``. Certificate
    verification is disabled for the request.

    The process exits with status 1 (after printing a message) when the
    query matches no host or more than one host.

    :param name: host name to look up
    :returns: the ``id`` field of the only matching result
    """
    url = tower_base_url + "hosts/"
    headers = {
        'content-type': "application/json"
    }
    querystring = {"name": name}

    # The session is used as a context manager so its pooled connections are
    # released on every exit path, including sys.exit().
    with requests.Session() as s:
        response = s.request("GET", url, headers=headers, params=querystring, verify=False,
                             auth=HTTPBasicAuth(args.tower_username, args.tower_password))
        results = response.json()['results']

    if not results:
        print("No host found with that name.")
        sys.exit(1)
    if len(results) > 1:
        print("Multiple hosts found with that name, so I won't remove any of them.")
        sys.exit(1)
    return results[0]['id']
tower-server-registration.py 文件源码
项目:cloudcenter-content
作者: datacenter
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def add_host(host_name, inventory):
    """Create an enabled host entry in the given inventory.

    POSTs a JSON document with ``name``, ``enabled`` (always ``True``) and
    ``inventory`` to ``<tower_base_url>hosts/``, authenticating with the
    credentials in the module-level ``args``. Certificate verification is
    disabled for the request. The response is not inspected and nothing is
    returned.

    :param host_name: value sent as the host's ``name``
    :param inventory: value sent as the host's ``inventory``
    """
    url = tower_base_url + "hosts/"
    headers = {
        'content-type': "application/json"
    }
    payload = {
        "name": host_name,
        "enabled": True,
        "inventory": inventory
    }

    # The session is used as a context manager so its pooled connections are
    # released once the request is done.
    with requests.Session() as s:
        s.request("POST", url, headers=headers, data=json.dumps(payload), verify=False,
                  auth=HTTPBasicAuth(args.tower_username, args.tower_password))
def get_jwt_for_registry(auth_url, registry, appname):
    """Fetch a push/pull token for ``appname`` from the auth service.

    The username and password are read from the resolved docker auth config
    for ``registry`` (either lower-case or capitalised keys are accepted) and
    used for HTTP basic auth against ``auth_url``.

    :param auth_url: URL of the token endpoint
    :param registry: registry whose stored credentials are used
    :param appname: repository name placed in the requested scope
    :returns: the ``token`` from the response, or ``''`` when the status is
        400 or above, the token is empty, or any exception occurs (a warning
        is emitted in the exception case)
    """
    try:
        # get auth username and password from dockercfg
        cfg = auth.resolve_authconfig(auth.load_config(), registry=registry)
        user_key = 'username' if 'username' in cfg else 'Username'
        username = cfg[user_key]
        password_key = 'password' if 'password' in cfg else 'Password'
        password = cfg[password_key]

        # only use `lain.local` as service
        query_values = (auth_url, "lain.local", appname, username)
        url = "%s?service=%s&scope=repository:%s:push,pull&account=%s" % query_values

        response = requests.get(url, auth=HTTPBasicAuth(username, password))
        if response.status_code < 400:
            token = response.json()['token']
            if token:
                return token
    except Exception as e:
        warn("can not load registry auth config : %s, need lain login first." % e)
    return ''
def test_oauth_token(self):
    """oauth_token() makes a POST to /oauth/token with the appropriate headers and query params"""
    request_spy = Mock()
    uaac = UAAClient('http://example.com', 'foo', False)
    # Swap the low-level request method for a spy so no HTTP traffic happens.
    uaac._request = request_spy

    uaac.oauth_token('foo', 'bar', 'baz')

    call_args, call_kwargs = request_spy.call_args
    expected_params = {
        'code': 'foo',
        'grant_type': 'authorization_code',
        'response_type': 'token'
    }
    assert call_args == ('/oauth/token', 'POST')
    assert call_kwargs['params'] == expected_params

    basic_auth = call_kwargs['auth']
    assert isinstance(basic_auth, HTTPBasicAuth)
    assert basic_auth.username == 'bar'
    assert basic_auth.password == 'baz'
def test_get_client_token(self):
    """_get_client_token() makes a POST to /oauth/token with the appropriate headers and query params"""
    request_spy = Mock()
    uaac = UAAClient('http://example.com', 'foo', False)
    # Swap the low-level request method for a spy so no HTTP traffic happens.
    uaac._request = request_spy

    uaac._get_client_token('bar', 'baz')

    call_args, call_kwargs = request_spy.call_args
    expected_params = {
        'grant_type': 'client_credentials',
        'response_type': 'token'
    }
    assert call_args == ('/oauth/token', 'POST')
    assert call_kwargs['params'] == expected_params

    basic_auth = call_kwargs['auth']
    assert isinstance(basic_auth, HTTPBasicAuth)
    assert basic_auth.username == 'bar'
    assert basic_auth.password == 'baz'
def _get_client_token(self, client_id, client_secret):
    """ Request a client-credentials token from ``/oauth/token``

    Args:
        client_id: The oauth client id to authenticate as
        client_secret: The secret for the client_id above

    Raises:
        UAAError: there was an error getting the token

    Returns:
        The ``access_token`` entry of the response, or None when the
        response has no such entry
    """
    grant_params = {
        'grant_type': 'client_credentials',
        'response_type': 'token'
    }
    client_auth = HTTPBasicAuth(client_id, client_secret)
    response = self._request('/oauth/token', 'POST', params=grant_params, auth=client_auth)
    return response.get('access_token', None)
def complete_pair(self, pin):
    """Finish pairing using the PIN that the device is showing.

    Posts an ``actRegister`` payload to ``/sony/accessControl`` with basic
    auth made of an empty username and ``pin``. On a 200 answer the instance
    is marked as paired and ``connect()`` is called again.

    :param pin: the PIN currently displayed to the user
    :returns: ``(response, True)`` when the POST answered 200 and the second
        value returned by ``connect()`` is ``True``; ``(response, False)``
        when the POST answered 200 but ``connect()`` did not report ``True``;
        ``(None, False)`` for any other status code
    """
    registration = [
        {"clientid": self.device_id, "nickname": self.nickname},
        [{"value": "no", "function": "WOL"}],
    ]
    payload = self._build_json_payload("actRegister", registration)
    # Kept on the object in case it is needed again later.
    self.auth = HTTPBasicAuth('', pin)
    r = self.do_POST(url='/sony/accessControl', payload=payload, auth=self.auth)

    if r.status_code != 200:
        return None, False

    print("have paired")
    self.paired = True
    # Call connect again so the cookies get set up properly.
    _, connected = self.connect()
    return r, connected is True