def send_message_to_facebook(access_token, user_id, message_data):
    """Deliver a message to a user through the Facebook Send API.

    Reference:
    https://developers.facebook.com/docs/messenger-platform/send-api-reference

    :param access_token: sent as the ``access_token`` query parameter.
    :param user_id: placed in ``recipient.id`` of the request body.
    :param message_data: placed under ``message`` in the request body.
    :return: the decoded JSON body of the response.
    :raises requests.HTTPError: when the response has an error status.
    """
    body = json.dumps({
        'recipient': {'id': user_id},
        'message': message_data,
    })
    reply = requests.post(
        'https://graph.facebook.com/v2.6/me/messages',
        headers={'Content-Type': 'application/json'},
        params={'access_token': access_token},
        data=body,
    )
    # Surface 4xx/5xx responses as exceptions before decoding the body.
    reply.raise_for_status()
    return reply.json()
# Example source code for Python's post()
def api_call(self, url, http_method, data):
    """Send one HTTP request carrying the client's API key and bearer token.

    :param url: target URL.
    :param http_method: a ``PyCurlVerbs`` member (POST, DELETE, PUT or GET).
    :param data: serialised to JSON as the body of POST and PUT requests;
        not used for DELETE and GET.
    :return: the ``requests`` response, or ``None`` when ``http_method``
        is none of the four verbs above.
    """
    request_headers = {
        "Content-Type": "application/json",
        "x-api-key": self.api_key,
        "authorization": "Bearer " + self.access_token,
    }
    if http_method is PyCurlVerbs.POST:
        return requests.post(url, data=json.dumps(data),
                             headers=request_headers)
    if http_method is PyCurlVerbs.DELETE:
        return requests.delete(url, headers=request_headers)
    if http_method is PyCurlVerbs.PUT:
        return requests.put(url, data=json.dumps(data),
                            headers=request_headers)
    if http_method is PyCurlVerbs.GET:
        return requests.get(url, headers=request_headers)
    # Unrecognised verb: no request is made.
    return None
def primary_auth(self):
    """Perform primary authentication against Okta.

    Posts the username and password as JSON to ``/api/v1/authn`` under
    ``self.base_url`` and inspects the ``status`` field of the reply:

    * ``MFA_REQUIRED`` - the factor list and state token are handed to
      ``self.verify_mfa`` and its result is returned.
    * ``SUCCESS`` - the ``sessionToken`` from the reply is returned.

    Any other outcome prints the reply (or its ``errorSummary`` for a
    non-200 response without a status) and terminates with exit code 1.

    :return: the session token.
    """
    auth_data = {
        "username": self.username,
        "password": self.password
    }
    resp = requests.post(self.base_url + '/api/v1/authn', json=auth_data)
    resp_json = resp.json()
    if 'status' in resp_json:
        status = resp_json['status']
        if status == 'MFA_REQUIRED':
            factors_list = resp_json['_embedded']['factors']
            state_token = resp_json['stateToken']
            return self.verify_mfa(factors_list, state_token)
        if status == 'SUCCESS':
            return resp_json['sessionToken']
        # Any other status used to fall through to ``return session_token``
        # with the name unbound (UnboundLocalError). Treat it like the
        # other failure paths instead.
        print(resp_json)
        exit(1)
    elif resp.status_code != 200:
        print(resp_json['errorSummary'])
        exit(1)
    else:
        print(resp_json)
        exit(1)
def upload_prediction(self, file_path):
    """Upload a prediction file and register it as a submission.

    Steps, each aborting with the offending status code if it is not 200:

    1. ``self.authorize(file_path)`` yields the filename, a signed request
       URL, headers and a status code.
    2. ``self.get_current_competition()`` yields the dataset id,
       competition id and a status code.
    3. The file content is sent with a PUT to the signed request URL.
    4. The submission is posted to ``self._submissions_url``.

    :param file_path: path of the file to upload.
    :return: the status code of the first failing step, otherwise the
        status code of the final submission POST.
    """
    filename, signedRequest, headers, status_code = self.authorize(file_path)
    if status_code != 200:
        return status_code

    dataset_id, comp_id, status_code = self.get_current_competition()
    if status_code != 200:
        return status_code

    with open(file_path, 'rb') as fp:
        prepped = requests.Request('PUT', signedRequest,
                                   data=fp.read()).prepare()

    # Use the session as a context manager so its pooled connections are
    # released; the original created a Session and never closed it.
    with requests.Session() as s:
        resp = s.send(prepped)
    if resp.status_code != 200:
        return resp.status_code

    r = requests.post(self._submissions_url,
                      data={'competition_id': comp_id,
                            'dataset_id': dataset_id,
                            'filename': filename},
                      headers=headers)
    return r.status_code
def sendRequest(ip, port, route, data=None, protocol="http"):
    """Send a request to ``{protocol}://{ip}:{port}{route}``.

    A POST carrying ``data`` is issued when ``data`` is not ``None``;
    otherwise a plain GET is issued.

    :return: the ``requests`` response object.
    :raises PipelineServiceError: when ``requests.HTTPError`` is raised by
        the call.
    """
    url = "{protocol}://{ip}:{port}{route}".format(
        protocol=protocol, ip=ip, port=port, route=route)
    # NOTE(review): only requests.HTTPError is translated here; other
    # requests exceptions (e.g. connection failures) propagate unchanged.
    # Confirm whether that is intended.
    try:
        if data is None:
            resp = requests.get(url)
        else:
            resp = requests.post(url, data=data)
    except requests.HTTPError as e:
        raise PipelineServiceError("{reason}".format(reason=e))
    return resp
def put_comments(self, resource, comment):
    """ Post a comment on a file or URL.

    The initial idea of VirusTotal Community was that users should be able to make comments on files and URLs,
    the comments may be malware analyses, false positive flags, disinfection instructions, etc.

    Imagine you have some automatic setup that can produce interesting results related to a given sample or URL
    that you submit to VirusTotal for antivirus characterization, you might want to give visibility to your setup
    by automatically reviewing samples and URLs with the output of your automation.

    :param resource: either a md5/sha1/sha256 hash of the file you want to review or the URL itself that you want
                     to comment on.
    :param comment: the actual review, you can tag it using the "#" twitter-like syntax (e.g. #disinfection #zbot)
                    and reference users using the "@" syntax (e.g. @VirusTotalTeam).
    :return: If the comment was successfully posted the response code will be 1, 0 otherwise. When the request
             itself fails, a dict with a single ``error`` key holding the exception text is returned instead.
    """
    params = {'apikey': self.api_key, 'resource': resource, 'comment': comment}

    try:
        response = requests.post(self.base + 'comments/put', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # str(e) instead of e.message: exceptions have no ``message``
        # attribute on Python 3, which turned this handler into a crash.
        return dict(error=str(e))

    return _return_response_and_status_code(response)
def getESCXBalance(address):
    """Return the ESCX balance of ``address``, or 0 when it cannot be read.

    Sends a JSON-RPC ``get_balances`` request, filtered on the address and
    on the ``ESCX`` asset, using the module-level ``url``, ``headers`` and
    ``auth``. The ``quantity`` of the last entry in ``result`` is divided
    by 100000000 before being returned.

    This is a best-effort lookup: any failure (request error, unexpected
    reply shape, empty result list) yields 0.

    :param address: address whose balance is queried.
    :return: the scaled quantity, or 0 on any error.
    """
    payload = {
        "method": "get_balances",
        "params": {
            "filters": [{"field": "address", "op": "==", "value": address},
                        {"field": "asset", "op": "==", "value": "ESCX"}],
            "filterop": "and"
        },
        "jsonrpc": "2.0",
        "id": 0
    }
    try:
        response = requests.post(url, data=json.dumps(payload), headers=headers, auth=auth)
        json_data = response.json()
        return (json_data['result'].pop()['quantity']) / 100000000
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt
        # and SystemExit. The best-effort fallback itself is kept.
        return 0
def add_transaction(tx):
    """
    POST transaction to etherscan.io.

    The transaction is RLP-encoded, hex-encoded and posted as the ``hex``
    form field to the ``eth_sendRawTransaction`` proxy action. When
    ``ETHERSCAN_API_KEY`` is set it is appended to the URL as ``apikey``.

    :param tx: the transaction object passed to ``rlp.encode``.
    :return: the ``result`` field of the JSON reply (the transaction hash).
    """
    # NOTE(review): ``.encode("hex")`` relies on the Python 2 "hex" str
    # codec; bytes objects have no ``encode`` on Python 3 - confirm the
    # target interpreter.
    tx_hex = rlp.encode(tx).encode("hex")
    # use https://etherscan.io/pushTx to debug
    print("tx_hex:", tx_hex)
    url = 'https://api.etherscan.io/api'
    url += '?module=proxy&action=eth_sendRawTransaction'
    if ETHERSCAN_API_KEY:
        # The original line was a bare expression with an incomplete
        # format specifier ('%'), so it raised ValueError and never
        # appended the key.
        url += '&apikey=%s' % ETHERSCAN_API_KEY
    # TODO: handle 504 timeout, 403 and other errors from etherscan
    response = requests.post(url, data={'hex': tx_hex})
    # response is like:
    # {'jsonrpc': '2.0', 'result': '0x24a8...14ea', 'id': 1}
    # or on error like this:
    # {'jsonrpc': '2.0', 'id': 1, 'error': {
    #   'message': 'Insufficient funds...', 'code': -32010, 'data': None}}
    response_json = response.json()
    print("response_json:", response_json)
    PyWalib.handle_etherscan_tx_error(response_json)
    tx_hash = response_json['result']
    # the response differs from the other responses
    return tx_hash
def register(name):
    """Register ``name`` with the API and store a new RSA key pair on disk.

    If ``check_name`` reports an ``error`` status the name is treated as
    already registered and nothing else happens. Otherwise a 512-bit key
    pair is generated, pickled to ``<KEY_LOCATION>/.key`` (creating the
    directory if needed), and the name plus the public key numbers are
    posted to ``<API_LOCATION>/names``. The outcome is printed.
    """
    # Ask the API whether the name is already taken.
    if check_name(name)['status'] == 'error':
        print('{} already registered.'.format(name))
        return

    keypair = rsa.newkeys(512)
    pub = keypair[0]

    if not os.path.exists(KEY_LOCATION):
        os.mkdir(KEY_LOCATION)

    # Persist both halves of the key pair.
    key_file = '{}/.key'.format(KEY_LOCATION)
    with open(key_file, 'wb') as f:
        pickle.dump(tuple(keypair), f, pickle.HIGHEST_PROTOCOL)

    form = {'name': name, 'n': pub.n, 'e': pub.e}
    r = requests.post('{}/names'.format(API_LOCATION), data=form)
    if r.json()['status'] == 'success':
        print('Successfully registered new name: {}'.format(name))
    else:
        print('Error registering name: {}'.format(name))
def _make_request(self, path, cni_envs, expected_status=None):
    """POST ``cni_envs`` as JSON to the CNI daemon and return the response.

    The daemon address is read from ``config.CONF.cni_daemon.bind_address``.

    :param path: path appended to ``http://<address>/``.
    :param cni_envs: object sent as the JSON body.
    :param expected_status: when truthy, the status code the response must
        have.
    :return: the ``requests`` response.
    :raises requests.ConnectionError: re-raised after logging when the
        daemon cannot be reached.
    :raises k_exc.CNIError: when ``expected_status`` is given and the
        response status differs from it.
    """
    daemon_address = config.CONF.cni_daemon.bind_address
    target = 'http://%s/%s' % (daemon_address, path)
    try:
        LOG.debug('Making request to CNI Daemon. %(method)s %(path)s\n'
                  '%(body)s',
                  {'method': 'POST', 'path': target, 'body': cni_envs})
        # 'Connection: close' is sent with every request.
        resp = requests.post(target, json=cni_envs,
                             headers={'Connection': 'close'})
    except requests.ConnectionError:
        LOG.exception('Looks like %s cannot be reached. Is kuryr-daemon '
                      'running?', daemon_address)
        raise

    outcome = {'status': resp.status_code, 'reason': resp.reason}
    LOG.debug('CNI Daemon returned "%(status)d %(reason)s".', outcome)

    status_mismatch = resp.status_code != expected_status
    if expected_status and status_mismatch:
        LOG.error('CNI daemon returned error "%(status)d %(reason)s".',
                  outcome)
        raise k_exc.CNIError('Got invalid status code from CNI daemon.')
    return resp
def scan_file(self, this_file):
    """ Submit a file to be scanned by VirusTotal

    ``this_file`` may be a path to an existing file (opened in binary mode and sent under its path as the
    filename), a ``StringIO.StringIO`` instance (its content is read and sent), or any other object, which is
    passed to ``requests`` unchanged.

    :param this_file: File to be scanned (32MB file size limit)
    :return: JSON response that contains scan_id and permalink. When building the upload raises TypeError or the
             request itself fails, a dict with a single ``error`` key holding the exception text is returned.
    """
    params = {'apikey': self.api_key}
    opened = None
    try:
        if isinstance(this_file, str) and os.path.isfile(this_file):
            # Keep a reference so the handle can be closed after the upload.
            opened = open(this_file, 'rb')
            files = {'file': (this_file, opened)}
        elif isinstance(this_file, StringIO.StringIO):
            files = {'file': this_file.read()}
        else:
            files = {'file': this_file}
    except TypeError as e:
        return dict(error=str(e))

    try:
        response = requests.post(self.base + 'file/scan', files=files, params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # str(e) instead of e.message, which does not exist on Python 3.
        return dict(error=str(e))
    finally:
        # The original never closed the file it opened.
        if opened is not None:
            opened.close()

    return _return_response_and_status_code(response)
def rescan_file(self, this_hash):
    """ Rescan a previously submitted filed or schedule an scan to be performed in the future.

    :param this_hash: a md5/sha1/sha256 hash. You can also specify a CSV list made up of a combination of any of
                      the three allowed hashes (up to 25 items), this allows you to perform a batch request with
                      one single call. Note that the file must already be present in our file store.
    :return: JSON response that contains scan_id and permalink. When the request itself fails, a dict with a
             single ``error`` key holding the exception text is returned instead.
    """
    params = {'apikey': self.api_key, 'resource': this_hash}

    try:
        response = requests.post(self.base + 'file/rescan', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # str(e) instead of e.message: exceptions have no ``message``
        # attribute on Python 3, which turned this handler into a crash.
        return dict(error=str(e))

    return _return_response_and_status_code(response)
def scan_url(self, this_url):
    """ Submit a URL to be scanned by VirusTotal.

    :param this_url: The URL that should be scanned. This parameter accepts a list of URLs (up to 4 with the
                     standard request rate) so as to perform a batch scanning request with one single call. The
                     URLs must be separated by a new line character.
    :return: JSON response that contains scan_id and permalink. When the request itself fails, a dict with a
             single ``error`` key holding the exception text is returned instead.
    """
    params = {'apikey': self.api_key, 'url': this_url}

    try:
        response = requests.post(self.base + 'url/scan', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # str(e) instead of e.message: exceptions have no ``message``
        # attribute on Python 3, which turned this handler into a crash.
        return dict(error=str(e))

    return _return_response_and_status_code(response)
def scan_file(self, this_file, notify_url=None, notify_changes_only=None):
    """ Submit a file to be scanned by VirusTotal.

    Allows you to send a file for scanning with VirusTotal. Before performing your submissions we encourage you to
    retrieve the latest report on the files, if it is recent enough you might want to save time and bandwidth by
    making use of it. File size limit is 32MB, in order to submit files up to 200MB in size you must request a
    special upload URL.

    :param this_file: The file to be uploaded.
    :param notify_url: A URL to which a POST notification should be sent when the scan finishes.
    :param notify_changes_only: Used in conjunction with notify_url. Indicates if POST notifications should be
                                sent only if the scan results differ from the previous analysis.
    :return: JSON response that contains scan_id and permalink. When the request itself fails, a dict with a
             single ``error`` key holding the exception text is returned instead.
    """
    params = {'apikey': self.api_key}
    # These two arguments were documented but never sent; forward them
    # only when the caller supplied a value.
    if notify_url:
        params['notify_url'] = notify_url
    if notify_changes_only:
        params['notify_changes_only'] = notify_changes_only

    try:
        # ``with`` closes the handle after the upload; it was leaked before.
        with open(this_file, 'rb') as handle:
            files = {'file': (this_file, handle)}
            response = requests.post(self.base + 'file/scan', files=files, params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # str(e) instead of e.message, which does not exist on Python 3.
        return dict(error=str(e))

    return _return_response_and_status_code(response)
def cancel_rescan_file(self, resource):
    """ Delete a previously scheduled scan.

    Deletes a scheduled file rescan task. The file rescan api allows you to schedule periodic scans of a file,
    this API call tells VirusTotal to stop rescanning a file that you have previously enqueued for recurrent
    scanning.

    :param resource: The md5/sha1/sha256 hash of the file whose scheduled rescan should be deleted.
    :return: JSON acknowledgement. In the event that the scheduled scan deletion fails for whatever reason, the
             response code will be -1. When the request itself fails, a dict with a single ``error`` key holding
             the exception text is returned instead.
    """
    params = {'apikey': self.api_key, 'resource': resource}

    try:
        response = requests.post(self.base + 'rescan/delete', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # str(e) instead of e.message: exceptions have no ``message``
        # attribute on Python 3, which turned this handler into a crash.
        return dict(error=str(e))

    return _return_response_and_status_code(response)
def put_comments(self, resource, comment):
    """ Post a comment on a file or URL.

    Allows you to place comments on URLs and files, these comments will be publicly visible in VirusTotal
    Community, under the corresponding tab in the reports for each particular item.

    Comments can range from URLs and locations where a given file was found in the wild to full reverse
    engineering reports on a given malware specimen, anything that may help other analysts in extending their
    knowledge about a particular file or URL.

    :param resource: Either an md5/sha1/sha256 hash of the file you want to review or the URL itself that you want
                     to comment on.
    :param comment: The actual review, you can tag it using the "#" twitter-like syntax (e.g. #disinfection #zbot)
                    and reference users using the "@" syntax (e.g. @VirusTotalTeam).
    :return: JSON response. When the request itself fails, a dict with a single ``error`` key holding the
             exception text is returned instead.
    """
    params = {'apikey': self.api_key, 'resource': resource, 'comment': comment}

    try:
        response = requests.post(self.base + 'comments/put', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # str(e) instead of e.message: exceptions have no ``message``
        # attribute on Python 3, which turned this handler into a crash.
        return dict(error=str(e))

    return _return_response_and_status_code(response)
def get_auth_code(self):
    """Exchange the stored refresh token for an access token.

    Posts the refresh token, client id and client secret as a form to
    Google's OAuth2 token endpoint.

    :return: the ``access_token`` field of the JSON reply, or ``None``
        when the reply has no such field.
    """
    form = {
        'refresh_token': self.refresh_token,
        'client_id': self.client_id,
        'client_secret': self.client_secret,
        'grant_type': 'refresh_token',
    }
    reply = requests.post(
        'https://accounts.google.com/o/oauth2/token',
        data=form,
        headers={
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': 'application/json',
        },
    )
    token_info = reply.json()
    return token_info.get('access_token')
def getESCXBalance(address):
    """Return the ESCX balance of ``address``, or 0 when it cannot be read.

    Sends a JSON-RPC ``get_balances`` request, filtered on the address and
    on the ``ESCX`` asset, using the module-level ``url``, ``headers`` and
    ``auth``. The ``quantity`` of the last entry in ``result`` is divided
    by 100000000 before being returned.

    This is a best-effort lookup: any failure (request error, unexpected
    reply shape, empty result list) yields 0.

    :param address: address whose balance is queried.
    :return: the scaled quantity, or 0 on any error.
    """
    payload = {
        "method": "get_balances",
        "params": {
            "filters": [{"field": "address", "op": "==", "value": address},
                        {"field": "asset", "op": "==", "value": "ESCX"}],
            "filterop": "and"
        },
        "jsonrpc": "2.0",
        "id": 0
    }
    try:
        response = requests.post(url, data=json.dumps(payload), headers=headers, auth=auth)
        json_data = response.json()
        return (json_data['result'].pop()['quantity']) / 100000000
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt
        # and SystemExit. The best-effort fallback itself is kept.
        return 0
def create_cd(col_name, type, display_name):
    """Create a Kentik custom dimension and return its id.

    Posts ``name``, ``type`` and ``display_name`` to the custom dimension
    endpoint using the module-level ``headers``. On any status other than
    201 the status code and the ``error`` field of the reply are printed
    and the process exits.

    :param col_name: sent as ``name``.
    :param type: sent as ``type`` (parameter name kept for callers even
        though it shadows the builtin).
    :param display_name: sent as ``display_name``.
    :return: ``customDimension.id`` from the JSON reply.
    """
    url = 'https://api.kentik.com/api/v5/customdimension'
    # Build the payload directly. Rendering a JSON text template and
    # parsing it back failed whenever a value contained a quote or
    # backslash.
    data = {
        "name": col_name,
        "type": type,
        "display_name": display_name,
    }
    response = requests.post(url, headers=headers, data=data)
    if response.status_code != 201:
        print("Unable to create custom dimension column. Exiting.")
        # ``.format`` must be applied to the string, not to the result of
        # print(), which is None on Python 3.
        print("Status code: {}".format(response.status_code))
        print("Error message: {}".format(response.json()['error']))
        exit()

    dimension_id = response.json()['customDimension']['id']
    print("Custom dimension \"{}\" created as id: {}".format(display_name, dimension_id))
    return dimension_id
def post_to_all(self, url='/', data=None):
    """POST to every master listed in ``masterips``.

    With the default ``url`` of ``'/'`` this acts as a liveness probe:
    each master's ``/isalive/`` endpoint is posted to and the masters that
    did not raise are returned.

    For any other ``url`` the session token is added to a copy of
    ``data``, the request is sent to each master and the decoded JSON
    replies are collected. Masters whose request or decoding raises are
    logged at debug level and skipped.

    :param url: path to post to on each master.
    :param data: form fields to send; ``None`` means no fields.
    :return: a list of reachable master entries when ``url == '/'``,
        otherwise a dict mapping each master entry to its JSON reply.
    """
    # ``None`` replaces the former mutable ``{}`` default.
    if data is None:
        data = {}

    if url == '/':
        alive = []
        for masterip in masterips:
            try:
                requests.post("http://" + getip(masterip) + ":" + master_port + "/isalive/", data=data)
            except Exception as e:
                logger.debug(e)
                continue
            alive.append(masterip)
        return alive

    # Copy so the caller's dict does not receive the token.
    data = dict(data)
    data['token'] = session['token']
    logger.info("Docklet Request: user = %s data = %s, url = %s" % (session['username'], data, url))
    result = {}
    for masterip in masterips:
        try:
            res = requests.post("http://" + getip(masterip) + ":" + master_port + url, data=data).json()
        except Exception as e:
            logger.debug(e)
            continue
        result[masterip] = res
        logger.debug("get result from " + getip(masterip))
    return result
def get_token():
    """Request an access token from the Shanghai Disney Resort auth service.

    :return: the ``access_token`` field of the JSON reply.
    :raises requests.HTTPError: when the response has an error status.
    """
    token_url = 'https://authorization.shanghaidisneyresort.com/curoauth/v1/token'
    form_body = 'assertion_type=public&client_id=DPRD-SHDR.MOBILE.IOS-PROD&client_secret=&grant_type=assertion'
    request_headers = {
        'Host': 'authorization.shanghaidisneyresort.com',
        'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8',
        'X-NewRelic-ID': 'Uw4BWVZSGwICU1VRAgkH',
        'X-Conversation-Id': 'shdrA5320488-E03F-4795-A818-286C658EEBB6',
        'Accept': 'application/json',
        'User-Agent': 'SHDR/4.1.1 (iPhone; iOS 9.3.5; Scale/2.00)',
        'Accept-Language': 'zh-Hans-CN;q=1',
    }
    reply = requests.post(token_url, headers=request_headers, data=form_body)
    # Raise on 4xx/5xx before reading the token out of the body.
    reply.raise_for_status()
    return reply.json()['access_token']
def login(self):
    """Log in to the MyQ service and remember the security token.

    Posts the username and password as JSON to the login endpoint and
    stores the ``SecurityToken`` field of the reply on
    ``self.security_token``.

    :return: ``True`` once the token has been stored.
    """
    credentials = {
        'username': self.username,
        'password': self.password,
    }
    endpoint = 'https://{host_uri}/{login_endpoint}'.format(
        host_uri=HOST_URI,
        login_endpoint=self.LOGIN_ENDPOINT)
    request_headers = {
        'MyQApplicationId': self.brand[APP_ID],
        'User-Agent': self.USERAGENT,
    }
    reply = requests.post(endpoint, json=credentials, headers=request_headers)
    self.security_token = reply.json()['SecurityToken']
    self._logger.debug('Logged in to MyQ API')
    return True
def test_last_request(self, mock):
    """The mock records the URL, body and headers of the last request."""
    sent_headers = {'custom-header': 'huseyin'}
    sent_form = {'name': 'huseyin'}
    response = requests.post(GetMock.url, headers=sent_headers,
                             data=sent_form)
    self.assertEqual(response.status_code, 200)

    recorded = mock.last_request

    # Test if last request has expected values.
    self.assertEqual(recorded.url, GetMock.url)

    # In python 3 httpretty backend returns binary string for body.
    # So we are decoding it back to unicode to test.
    raw_body = recorded.body
    body = (raw_body.decode('utf-8')
            if isinstance(raw_body, six.binary_type) else raw_body)
    self.assertEqual(body, 'name=huseyin')
    self.assertEqual(recorded.headers.get('custom-header'), 'huseyin')

    # Make sure that class's last_request is same as instances.
    self.assertIs(GetMock.last_request, mock.last_request)
def test_fetch_from_url_or_retry_post_json(self):
    """A mocked POST fetch returns the expected decoded JSON."""
    identifier = "1csb, 2pah"
    base_url = c.http_pdbe
    endpoint_url = "api/pdb/entry/summary/"

    # Replace the fetcher with a mock that returns a canned response.
    canned = response_mocker(kwargs={}, base_url=base_url,
                             endpoint_url=endpoint_url,
                             content_type='application/octet-stream',
                             post=True, data=identifier)
    self.fetch_from_url_or_retry = MagicMock(return_value=canned)

    url = base_url + endpoint_url + identifier
    fetched = self.fetch_from_url_or_retry(
        url, json=True, post=True, data=identifier,
        header={'application/octet-stream'},
        retry_in=None, wait=0,
        n_retries=10, stream=False)
    r = fetched.json()

    expected = json.loads('{"data": "some json formatted output"}')
    self.assertEqual(r, expected)
def call_api(self, method, endpoint, payload):
    """Call an API endpoint and return the decoded JSON reply.

    :param method: HTTP method name, matched case-insensitively. ``POST``
        and ``PUT`` send ``payload`` as the request body, ``DELETE`` sends
        no body, and anything else is issued as a GET with ``payload`` as
        query parameters.
    :param endpoint: joined onto ``self.api_base_url``.
    :param payload: request data; it is not modified.
    :return: the response content parsed as JSON.
    """
    url = urlparse.urljoin(self.api_base_url, endpoint)
    # The original compared against 'POST' in upper case but 'delete' and
    # 'put' in lower case, so e.g. 'DELETE' or 'post' silently became a
    # GET. Normalise once so every spelling is accepted.
    verb = method.upper()
    if verb == 'POST':
        response = requests.post(url, data=payload)
    elif verb == 'DELETE':
        response = requests.delete(url)
    elif verb == 'PUT':
        response = requests.put(url, data=payload)
    else:
        query = payload
        if self.api_key:
            # Add the token to a copy so the caller's dict is untouched
            # (and a missing payload no longer crashes on ``.update``).
            query = dict(payload or {})
            query['api_token'] = self.api_key
        response = requests.get(url, params=query)
    content = json.loads(response.content)
    return content
def verify_single_factor(self, factor_id, state_token):
    """Verify a single MFA factor and return the session token.

    Prompts for the MFA answer, posts it with the state token to the
    factor's ``verify`` endpoint under ``self.base_url`` and returns the
    ``sessionToken`` when the reply status is ``SUCCESS``. Every other
    outcome prints the reply (or its ``errorSummary`` for a non-200
    response without a status) and terminates with exit code 1.

    :param factor_id: id of the factor to verify, interpolated in the URL.
    :param state_token: state token of the authentication transaction.
    :return: the session token.
    """
    factor_answer = raw_input('Enter MFA token: ')
    req_data = {
        "stateToken": state_token,
        "answer": factor_answer
    }
    post_url = "%s/api/v1/authn/factors/%s/verify" % (self.base_url, factor_id)
    resp = requests.post(post_url, json=req_data)
    resp_json = resp.json()
    if 'status' in resp_json:
        if resp_json['status'] == "SUCCESS":
            return resp_json['sessionToken']
        # A status other than SUCCESS used to fall through and return
        # None, which callers would then use as a session token. Fail
        # like the other error paths instead.
        print(resp_json)
        exit(1)
    elif resp.status_code != 200:
        print(resp_json['errorSummary'])
        exit(1)
    else:
        print(resp_json)
        exit(1)
def read_and_report(force_alert=False):
    """Read the DHT22 sensor once and post the reading to the server.

    The reading (humidity, temperature and the module-level ``mac`` as the
    sensor id) is sent as JSON to ``http://<SERVER_DOMAIN>/readings`` with
    the module-level ``headers``. Request failures are printed, not raised.

    :param force_alert: when truthy, ``force_alert: True`` is added to the
        reading.
    """
    humidity, temperature = dht.read_retry(dht.DHT22, TEMP_SENSOR_PIN)
    reading = {
        "humidity": humidity,
        "temperature": temperature,
        "sensor_id": mac,
    }
    if force_alert:
        reading['force_alert'] = True
    data = {"reading": reading}

    print("Sending reading: {}".format(data))
    endpoint = "http://{DOMAIN}/readings".format(DOMAIN=SERVER_DOMAIN)
    try:
        requests.post(endpoint, data=json.dumps(data), headers=headers)
        print("Reading Sent")
    except requests.exceptions.RequestException as e:
        print("Error sending reading: {}".format(e))
def api_request(self, call, params, kind='auth', http_call='get'):
    """
    General API request. Generally, use the convenience functions below

    :param call: the API call to make
    :param params: a dict of query parameters (sent as form data for 'post')
    :param kind: the type of request to make. 'auth' makes an authenticated call; 'basic' is unauthenticated
    :param http_call: 'get' or 'post'
    :return: a json response, a BitXAPIError is thrown if the api returns with an error
    :raises ValueError: if http_call is neither 'get' nor 'post'
    """
    url = self.construct_url(call)
    credentials = None
    if kind == 'auth':
        credentials = self.auth

    if http_call not in ('get', 'post'):
        raise ValueError('Invalid http_call parameter')
    if http_call == 'get':
        response = requests.get(url, params, headers=self.headers, auth=credentials)
    else:
        response = requests.post(url, data=params, headers=self.headers, auth=credentials)

    try:
        result = response.json()
    except ValueError:
        # Body was not JSON; the synthetic 'error' key makes the check
        # below raise.
        result = {'error': 'No JSON content returned'}

    succeeded = response.status_code == 200 and 'error' not in result
    if not succeeded:
        raise BitXAPIError(response)
    return result
def create_limit_order(self, order_type, volume, price):
    """
    Create a new limit order

    :param order_type: 'buy' or 'sell'; 'buy' is sent as a BID, anything else as an ASK
    :param volume: the volume, in BTC (sent as a string)
    :param price: the ZAR price per bitcoin (sent as a string)
    :return: the result of the 'postorder' API request
    """
    if order_type == 'buy':
        side = 'BID'
    else:
        side = 'ASK'
    order = {
        'pair': self.pair,
        'type': side,
        'volume': str(volume),
        'price': str(price),
    }
    return self.api_request('postorder', params=order, http_call='post')
def validate_captcha(captcha_post):
    """
    Validates the Google re-captcha data.

    Posts the site secret (``settings.GOOGLE_RECAPTCHA_SECRET_KEY``) and
    the client's response token to Google's ``siteverify`` endpoint.

    :param str captcha_post: The re-captcha post data
    :return bool: True when the reply's ``success`` field is truthy,
        False otherwise (including when the field is missing).
    """
    data = {
        'secret': settings.GOOGLE_RECAPTCHA_SECRET_KEY,
        'response': captcha_post
    }
    # TLS certificate verification is left at the requests default (on).
    # The original passed verify=False, which let anyone able to intercept
    # the connection forge a "success" reply and read the secret.
    response = requests.post('https://www.google.com/recaptcha/api/siteverify', data=data).json()
    # .get(): an error reply without 'success' counts as not validated
    # instead of raising KeyError.
    return bool(response.get('success'))