def getESCXBalance(address):
try:
payload = {
"method": "get_balances",
"params": {
"filters":[{"field": "address", "op": "==", "value": address},
{"field": "asset", "op": "==", "value": "ESCX"}],
"filterop": "and"
},
"jsonrpc":"2.0",
"id":0
}
response = requests.post(url, data=json.dumps(payload), headers=headers, auth=auth)
json_data = response.json()
#quantity = json_data.quantity
return (json_data['result'].pop()['quantity']) / 100000000
except:
return 0;
python类auth()的实例源码
def auth():
# Basic Authentication
requests.get('https://api.github.com/user', auth=HTTPBasicAuth('user', 'pass'))
requests.get('https://api.github.com/user', auth=('user', 'pass'))
# Digest Authentication
url = 'http://httpbin.org/digest-auth/auth/user/pass'
requests.get(url, auth=HTTPDigestAuth('user', 'pass'))
# OAuth2 Authentication????requests-oauthlib
url = 'https://api.twitter.com/1.1/account/verify_credentials.json'
auth = OAuth2('YOUR_APP_KEY', 'YOUR_APP_SECRET', 'USER_OAUTH_TOKEN')
requests.get(url, auth=auth)
pass
def getESCXBalance(address):
try:
payload = {
"method": "get_balances",
"params": {
"filters":[{"field": "address", "op": "==", "value": address},
{"field": "asset", "op": "==", "value": "ESCX"}],
"filterop": "and"
},
"jsonrpc":"2.0",
"id":0
}
response = requests.post(url, data=json.dumps(payload), headers=headers, auth=auth)
json_data = response.json()
#quantity = json_data.quantity
return (json_data['result'].pop()['quantity']) / 100000000
except:
return 0;
def retrieve_access_token(self):
""" once you have the authorization code, you can call this function to get
the access_token. The access_token gives you full access to the API and is
valid throughout the day
"""
if self.api_key is None:
raise (TypeError, 'Value api_key cannot be None. Please go to the Developer Console to get this value')
if self.redirect_uri is None:
raise (TypeError, 'Value redirect_uri cannot be None. Please go to the Developer Console to get this value')
if self.api_secret is None:
raise (TypeError, 'Value api_secret cannot be None. Please go to the Developer Console to get this value')
if self.code is None:
raise (TypeError, 'Value code cannot be None. Please visit the login URL to generate a code')
params = {'code': self.code, 'redirect_uri': self.redirect_uri, 'grant_type': 'authorization_code'}
url = self.config['host'] + self.config['routes']['accessToken']
headers = {"Content-Type" : "application/json", "x-api-key" : self.api_key}
r = requests.post(url, auth=(self.api_key, self.api_secret), data=json.dumps(params), headers=headers)
body = json.loads(r.text)
if 'access_token' not in body:
raise SystemError(body);
return body['access_token']
def get_state(self) -> str:
"""Get device state.
Returns:
"on", "off", or "unknown"
"""
if self.state_cmd is None:
return "unknown"
resp = requests.request(self.state_method, self.state_cmd,
data=self.state_data, json=self.state_json,
headers=self.headers, auth=self.auth)
if self.state_response_off in resp.text:
return "off"
elif self.state_response_on in resp.text:
return "on"
return "unknown"
ghost2loggerloopback.py 文件源码
项目:stackstorm-ghost2logger
作者: StackStorm-Exchange
项目源码
文件源码
阅读 31
收藏 0
点赞 0
评论 0
def _ghost2loggergetGUID(self):
# Let's deal with the JSON API calls here
HEADERS = {"Content-Type": 'application/json'}
_URL = self._ghost_url + "/v1/guid"
try:
r = requests.get(_URL, headers=HEADERS, verify=False,
auth=(self._username, self._password))
_data = r.json()
if _data['host'] == "GUID":
for _item in _data['pattern']:
_guid = str(_item)
return str(_guid)
else:
return ""
except Exception as e:
self._logger.info('[Ghost2logger]: Cannot get GUID: ' +
str(e))
ghost2loggerloopback.py 文件源码
项目:stackstorm-ghost2logger
作者: StackStorm-Exchange
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def _ghost2loggergpurge(self):
# Let's deal with the JSON API calls here
self._logger.info('[Ghost2logger]: Purge Ghost2logger')
HEADERS = {"Content-Type": 'application/json'}
_URL = self._ghost_url + "/v1/all"
try:
r = requests.delete(_URL, headers=HEADERS, verify=False,
auth=(self._username, self._password))
_data = r.json()
self._logger.info('[Ghost2logger]: Purged ghost2logger' +
str(_data))
except Exception as e:
self._logger.info('[Ghost2logger]: Cannot purge Ghost2logger: ' +
str(e))
def __getAppSpaces(self):
"""Returns a list with the name of availables appspaces on FileNet,
to apps variable.
"""
try:
appspaces = requests.get(self.baseurl+'appspacenames',
auth=self.cred)
appspaces.raise_for_status()
self.appspaces = appspaces.json()
self.apps = appspaces.json().keys()
except Exception as e:
print (str(e)+':\n'+str(appspaces.json()['UserMessage']['Text']))
print (appspaces.json())
self.apps = appspaces.json()
def __getQueues(self):
"""Creates a list with URL adresses from workbaskets. Also creates a
dictionary with workbasket name as key and it's URL as value.
"""
for apps in self.roles.keys():
for roles in self.roles.values():
if roles:
for role in roles:
my_role = requests.get(self.baseurl
+'appspaces/'
+apps+'/roles/'
+role,
auth = self.cred)
if my_role.ok:
for uri in my_role.json()['workbaskets'].values():
self.queue_urls.append(uri['URI'])
self.workbaskets[uri['URI'].split(
'/')[-1]] = uri['URI']
def getQueue(self, work_basket):
"""Returns a Queue for a given Workbasket.
Usage:
>>> my_queue = pe.getQueue('workbasket_name')
>>> my_queue.get('count')->Variable with the total tasks in this Queue.
"""
queue = requests.get(self.client.baseurl
+ self.client.workbaskets.get(work_basket),
auth = self.client.cred)
count = requests.get(queue.url + '/queueelements/count',
auth = self.client.cred).json()['count']
queue = queue.json()
queue['count'] = count
return queue
def getAllTasks(self):
"""Returns all tasks from all Queues.
Usage:
>>> tasks = pe.getAllTasks()
When a Queue has no tasks, a message informing which Queues are empty,
will be printed.
"""
tasks = []
for uri in self.client.queue_urls:
queue = requests.get(self.client.baseurl + uri,
auth = self.client.cred)
found_tasks = self.getTasks(queue.json())
if found_tasks:
tasks.append(found_tasks)
return [tsk for task in tasks for tsk in task]
def lockTask(self, task):
"""Receives a task dictionary, obtainned with getTasks() method,
and locks the task so other users can't access this task at same time.
Usage:
>>> pe.lockTask(task)
"""
locked = requests.get(self.client.baseurl
+task['stepElement'],
auth = self.client.cred)
eTag = locked.headers['ETag']
locked = requests.put(self.client.baseurl
+ task['stepElement'],
auth = self.client.cred,
params={'action':'lock',
'If-Match':eTag}
)
def endTask(self, task, comment=None):
"""Receives a task and finishes it, finishing the workflow itself or
moving to the next step in the task. Is also possible to create a
comment before ending the task.
Usage:
>>> pe.endTask(task) #or
>>> pe.endTask(task, u'Completed the task!')
"""
params = {'action':'dispatch'}
step = self.getStep(task)
if step.get('systemProperties').get('responses')\
and not step.get('systemProperties').get('selectedResponse'):
return "This task needs to be updated. Check the updateTask method."
else:
params['selectedResponse'] = 1
if comment:
task = self.saveAndUnlockTask(task, comment)
lock = self.lockTask(task)
params['If-Match'] = task['ETag']
dispatched = requests.put(self.client.baseurl + task['stepElement'],
auth = self.client.cred,
params=params)
def getUser(self, search_string):
"""Receives a string and looks for it in directory service. If the
string search isn't found, the message "User not Found" will be
returned, otherwise a list with all matching cases, limited to 50
results will be returned.
Usage:
>>> users = pe.getUser('user_name')
"""
users = []
user = requests.get(self.client.baseurl+'users',
auth=self.client.cred,
params={'searchPattern':search_string,
'searchType':4, 'limit':50})
if user.json().get('users'):
for usr in user.json()['users']:
users.append(usr['displayName'])
else:
return "User not Found"
return users
def getGroup(self, search_string):
"""Receives a string and looks for it in directory service. If the
string search isn't found, the message "Group not Found" will be
returned, otherwise a list with all matching cases, limited to 50
results will be returned.
Usage:
>>> users = pe.getGroup('group_name')
"""
groups = []
group = requests.get(self.client.baseurl+'groups',
auth=self.client.cred,
params={'searchPattern':search_string,
'searchType':4, 'limit':3000})
if group.json().get('groups'):
for grp in group.json()['groups']:
groups.append(grp['displayName'])
else:
return "Group not Found"
return groups
def sendMessage() -> str:
""" Send a message (internal or external) to the HackerOne report identified by the given ID"""
data = request.get_json(force=True)
message = data['message']
internal = data['internal']
id = data['id']
if config.DEBUG:
print("/v1/sendMessage: id=%s, internal=%s" % (id, internal))
if config.DEBUGVERBOSE:
print("message=%s" % message)
h1Data = {'data': {'type': 'activity-comment',
'attributes': {'message': message,
'internal': internal}}}
headers = {'Content-Type': 'application/json'}
resp = requests.post('https://api.hackerone.com/v1/reports/%s/activities' % id,
headers=headers,
data=json.dumps(h1Data).encode('utf-8'),
auth=(config.apiName, secrets.apiToken))
return json.dumps(resp.json())
def changeStatus() -> str:
""" Change the status of the report at the given ID to the given status """
data = request.get_json(force=True)
status = data['status']
message = data['message']
id = data['id']
if config.DEBUG:
print("/v1/changeStatus: id=%s, status=%s" % (id, status))
if config.DEBUGVERBOSE:
print("message=%s" % message)
h1Data = {'data': {'type': 'state-change',
'attributes': {'message': message,
'state': status}}}
headers = {'Content-Type': 'application/json'}
resp = requests.post('https://api.hackerone.com/v1/reports/%s/state_changes' % id,
headers=headers,
data=json.dumps(h1Data).encode('utf-8'),
auth=(config.apiName, secrets.apiToken))
return json.dumps(resp.json())
def postComment(id: str, vti: VulnTestInfo, internal=False, addStopMessage=False) -> Mapping:
""" Post a comment to the report with the given ID using the information in the given VulnTestInfo
- Set internal=True in order to post an internal comment
- Set addStopMessage=True in order to add the stop message """
if config.DEBUG:
print("Posting comment: internal=%s, reproduced=%s, id=%s" % (str(internal), str(vti.reproduced), id))
if addStopMessage:
message = vti.message + '\n\n' + constants.disableMessage
else:
message = vti.message
postMessage("Posting Message: \n\n%s" % message) # TODO: Delete this
resp = requests.post('http://api:8080/v1/sendMessage',
json={'message': message, 'internal': internal, 'id': id},
auth=HTTPBasicAuth('AutoTriageBot', secrets.apiBoxToken))
if config.triageOnReproduce and vti.reproduced:
changeStatus(id, 'triaged')
return resp.json()
def __init__(self, api_url, auth_custom=None, auth_user=None, auth_pass=None, login=None):
"""
:param api_url: str Moira API URL
:param auth_custom: dict auth custom headers
:param auth_user: str auth user
:param auth_pass: str auth password
:param login: str auth login
"""
if not api_url.endswith('/'):
self.api_url = api_url + '/'
else:
self.api_url = api_url
self.auth = None
self.auth_custom = {'X-Webauth-User': login}
if auth_user and auth_pass:
self.auth = HTTPBasicAuth(auth_user, auth_pass)
if auth_custom:
self.auth_custom.update(auth_custom)
def get(self, path='', **kwargs):
"""
:param path: str api path
:param kwargs: additional parameters for request
:return: dict response
:raises: HTTPError
:raises: InvalidJSONError
"""
r = requests.get(self._path_join(path), headers=self.auth_custom, auth=self.auth, **kwargs)
r.raise_for_status()
try:
return r.json()
except ValueError:
raise InvalidJSONError(r.content)
def delete(self, path='', **kwargs):
"""
:param path: str api path
:param kwargs: additional parameters for request
:return: dict response
:raises: HTTPError
:raises: InvalidJSONError
"""
r = requests.delete(self._path_join(path), headers=self.auth_custom, auth=self.auth, **kwargs)
r.raise_for_status()
try:
return r.json()
except ValueError:
raise InvalidJSONError(r.content)
def put(self, path='', **kwargs):
"""
:param path: str api path
:param kwargs: additional parameters for request
:return: dict response
:raises: HTTPError
:raises: InvalidJSONError
"""
r = requests.put(self._path_join(path), headers=self.auth_custom, auth=self.auth, **kwargs)
r.raise_for_status()
try:
return r.json()
except ValueError:
raise InvalidJSONError(r.content)
def add_torrent(self, torrent_data: Union[str, bytes], download_dir: str=None) -> bool:
self.total_size = 0
self.expected_torrent_name = ''
lf = NamedTemporaryFile()
lf.write(torrent_data)
params = {'action': 'add-file', 'token': self.token}
files = {'torrent_file': open(lf.name, 'rb')}
try:
response = requests.post(
self.UTORRENT_URL,
auth=self.auth,
params=params,
files=files,
timeout=25).json()
lf.close()
if 'error' in response:
return False
else:
return True
except RequestException:
lf.close()
return False
def add_url(self, url: str, download_dir: str=None) -> bool:
self.total_size = 0
self.expected_torrent_name = ''
params = {'action': 'add-url', 'token': self.token, 's': url}
try:
response = requests.get(
self.UTORRENT_URL,
auth=self.auth,
# cookies=self.cookies,
params=params,
timeout=25).json()
if 'error' in response:
return False
else:
return True
except RequestException:
return False
def push_pact(self, *, pact_file, provider, consumer, consumer_version):
request_url = urls.PUSH_PACT_URL.format(
broker_url=self.broker_url,
provider=provider,
consumer=consumer,
consumer_version=consumer_version
)
with open(pact_file) as data_file:
pact_json = json.load(data_file)
response = requests.put(
request_url,
auth=self._auth,
json=pact_json
)
response.raise_for_status()
return response, (
f'Pact between {consumer} and {provider} pushed.'
)
def tag_consumer(self, *, provider, consumer, consumer_version, tag):
request_url = urls.TAG_CONSUMER_URL.format(
broker_url=self.broker_url,
consumer=consumer,
consumer_version=consumer_version,
tag=tag
)
response = requests.put(
request_url,
headers={'Content-Type': 'application/json'},
auth=self._auth
)
response.raise_for_status()
return response, (
f'{consumer} version {consumer_version} tagged as {tag}'
)
def __init__(self, base_url, project, user, password, verify=False, timeout=None, auth_type=None):
if not base_url.endswith('/'):
base_url += '/'
collection, project = self.get_collection_and_project(project)
# Remove part after / in project-name, like DefaultCollection/MyProject => DefaultCollection
# API responce only in Project, without subproject
self._url = base_url + '%s/_apis/' % collection
if project:
self._url_prj = base_url + '%s/%s/_apis/' % (collection, project)
else:
self._url_prj = self._url
self.http_session = requests.Session()
auth = auth_type(user, password)
self.http_session.auth = auth
self.timeout = timeout
self._verify = verify
if not self._verify:
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
def _compose_args(self, method, data=None):
args = {}
if data is None:
data = {}
query = 'params' if method == 'get' else 'data'
args[query] = self._structure.get('common-params', {})
args[query].update(data)
args['headers'] = self._structure.get('common-headers', {})
auth = self._structure.get('auth')
if auth == 'params':
args[query].update({'username': self.billing_username,
'password': self.billing_password})
elif auth == 'headers':
args['auth'] = HTTPBasicAuth(
self.billing_username, self.billing_password)
if self._structure.get('verify') == False:
args['verify'] = False
return args
def get_url(url):
'''request url'''
headers = {'Content-type': 'application/json'}
auth = (controller['any_user'], controller['any_pass'])
logging.info(url)
try:
response = requests.get(url, headers=headers, auth=auth, verify=False)
logging.info("Url GET Status: %s" % response.status_code)
if response.status_code in [200]:
return True, response.json()
else:
return False, str(response.text)
except requests.exceptions.ConnectionError, e:
logging.error('Connection Error: %s' % e.message)
return False, str(e.message)
def post_xml(url, data):
'''request post'''
headers = {'Content-type': 'application/xml'}
auth = (controller['any_user'], controller['any_pass'])
try:
response = requests.post(url, data=data, auth=auth, headers=headers, verify=False)
logging.info("Url POST Status: %s" % response.status_code)
if response.status_code in [200, 204]:
if len(response.text) > 0:
return True, response.json()
else:
return True, {}
else:
return False, str(response.text)
except requests.exceptions.ConnectionError, e:
logging.error('Connection Error: %s' % e.message)
return False, str(e.message)