def __init__(self):
    """Set up the per-request state from the ``config`` module.

    The 'dss' service URL is read from ``config`` and stored without one
    trailing slash; the access key, secret key and secure-request flag are
    read from ``config`` as well. The HTTP method, operation path and both
    query strings start out as ``None``.
    """
    service_url = config.get_service_url('dss')
    # Only a single trailing slash is removed.
    if service_url.endswith('/'):
        service_url = service_url[:-1]
    self.dss_url = service_url

    self.access_key = config.get_access_key()
    self.secret_key = config.get_secret_key()
    self.is_secure_request = config.check_secure()

    self.http_method = None
    # Mandatory headers for each request; values are filled in later.
    self.http_headers = {'Authorization': None, 'Date': None}

    self.dss_op_path = None
    self.dss_query_str = None
    self.dss_query_str_for_signature = None
# Example source code for the Python request() function
def __request(self, method, url, params=None):
    """Send a JSON request and return the decoded response.

    The call is logged, then sent with the ``JSESSIONID`` cookie taken from
    ``self.auth_cookie``, JSON ``Accept``/``Content-Type`` headers and the
    ``X-Rundeck-Auth-Token`` header taken from ``self.token``. ``params`` is
    sent as the JSON body and ``self.verify`` is passed as ``verify``.

    Returns:
        The parsed JSON body, or the raw response content when the body
        cannot be decoded as JSON.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` on an error status.
    """
    logger.info('{} {} Params: {}'.format(method, url, params))
    cookies = {'JSESSIONID': self.auth_cookie}
    headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'X-Rundeck-Auth-Token': self.token
    }
    r = requests.request(
        method, url, cookies=cookies, headers=headers, json=params,
        verify=self.verify
    )
    logger.debug(r.content)
    r.raise_for_status()
    try:
        return r.json()
    except ValueError as e:
        # ``e.message`` is not available on Python 3 exceptions; logging the
        # exception object renders the same text on both major versions.
        logger.error(e)
        return r.content
def create_request(self, method, path, options):
    """Create and send a request with the given arguments.

    If ``api_version`` is present in ``options`` it is placed immediately
    after the host. ``response_type`` (default ``'json'``) is appended to
    the path as a suffix such as ".html" or ".json". Every other entry of
    ``options`` is forwarded to ``requests.request`` as a keyword argument.

    The caller's ``options`` dict is left unmodified.

    Returns:
        Whatever ``requests.request`` returns for the built URL.
    """
    # Work on a copy so the two routing keys can be removed without
    # altering the dict the caller passed in.
    request_kwargs = dict(options)

    if 'api_version' in request_kwargs:
        version = '/' + request_kwargs.pop('api_version')
    else:
        version = ''

    # Adds a suffix (ex: ".html", ".json") to url
    suffix = request_kwargs.pop('response_type', 'json')
    path = path + '.' + suffix
    path = urlparse.urljoin(self.base, version + path)

    return requests.request(method, path, **request_kwargs)
def fetch(self):
    """Fetch the Imgur gallery and return the first ``FETCH_LIMIT`` entries.

    Issues a GET against the gallery endpoint for the fixed section/sort
    below, logs the response status code at debug level and returns a slice
    of the ``'data'`` list from the decoded JSON body.
    """
    section = 'hot'  # hot | top | user
    sort = 'viral'  # viral | top | time | rising (only available with user section)
    show_viral = 'true'
    show_mature = 'true'
    album_previews = 'false'

    reply = requests.request(
        "GET",
        f'https://api.imgur.com/3/gallery/{section}/{sort}',
        headers={'authorization': f'Client-ID {IMGUR_CLIENT_ID}'},
        params={
            "showViral": f"{show_viral}",
            "mature": f"{show_mature}",
            "album_previews": f"{album_previews}",
        },
    )
    # Decode before logging, matching the order of the calls on the response.
    payload = reply.json()
    self.logger.debug(f'Fetched. Code: {reply.status_code}')
    return payload['data'][:FETCH_LIMIT]
def _request(self, endpoint, method='GET', files=None, headers=None, **kwargs):
    """Send an authenticated request to ``endpoint`` and return its JSON.

    For ``GET`` the extra keyword arguments become the query string and a
    ``Content-Length: 0`` header is set; for any other method they are sent
    as the JSON body. ``self.auth`` is used for authentication.

    Args:
        endpoint: URL to call.
        method: HTTP method name.
        files: Passed through to ``requests.request`` as ``files``.
        headers: Optional extra headers. They are copied, never modified,
            and are now also sent on ``GET`` (previously they were dropped).
        **kwargs: Query parameters (GET) or JSON body fields (otherwise).

    Raises:
        The error built by ``WpApiError.factory(response)`` when the status
        code is not in the 2xx range.
    """
    # A fresh dict per call: avoids a shared mutable default and keeps the
    # caller's mapping untouched.
    request_headers = dict(headers) if headers else {}

    if method == 'GET':
        params = kwargs
        data = None
        request_headers['Content-Length'] = '0'
    else:
        params = {}
        data = kwargs

    response = requests.request(method,
                                endpoint,
                                auth=self.auth,
                                params=params,
                                json=data,
                                headers=request_headers,
                                files=files)
    if response.status_code // 100 != 2:
        raise WpApiError.factory(response)
    return response.json()
def request(self, method, url, **kwargs):
    """Perform an HTTP request and describe any failure as a string.

    A 5 second timeout and the ``healthchecks.io`` User-Agent are always
    applied on top of the keyword arguments given by the caller.

    Returns:
        ``None`` when the response status is 200, 201 or 204; otherwise a
        short human-readable error message (unexpected status code,
        timeout, or connection failure).
    """
    options = dict(kwargs)
    # ``dict(kwargs)`` is shallow, so copy the headers too: the caller's
    # dict must not gain a User-Agent entry. ``headers=None`` is tolerated.
    options["headers"] = dict(options.get("headers") or {})
    options["timeout"] = 5
    options["headers"]["User-Agent"] = "healthchecks.io"

    try:
        r = requests.request(method, url, **options)
    except requests.exceptions.Timeout:
        # Well, we tried
        return "Connection timed out"
    except requests.exceptions.ConnectionError:
        return "Connection failed"

    if r.status_code not in (200, 201, 204):
        return "Received status code %d" % r.status_code
    return None
def find_finger():
    """Wait for a fingerprint match on the serial device and check a password.

    Writes ``'5'`` to ``/dev/ttyACM0`` (9600 baud), then reads lines until
    one contains ``FINGERFOUND``. The second comma-separated field of that
    line is used as the customer id for a lookup against the wallet API.

    Returns:
        The raw response text when ``request.vars.ps`` equals the
        ``'password'`` field of the returned JSON, otherwise ``'error'``.
    """
    import serial

    ps = request.vars.ps
    ser = serial.Serial('/dev/ttyACM0', 9600)
    try:
        ser.write('5')
        # ser.write(b'5')  # the b prefix is required when using Python 3.x
        while True:
            line = ser.readline()
            print(line)
            if "FINGERFOUND" in line:
                # readline() keeps the line terminator; strip it so it can
                # never end up inside the URL built below.
                finger_id = line.split(",")[1].strip()
                break
    finally:
        # Release the port even if reading or parsing fails.
        ser.close()

    r = requests.get("http://174.138.34.125:8081/walletapi/customer/%s/" % finger_id)
    obj = json.loads(r.text)
    if ps == obj['password']:
        return r.text
    return 'error'
def _request(self, action='GET', url='/', data=None, query_params=None):
    """Call the API with a Bearer token and return the decoded JSON.

    ``url`` is prefixed with ``self.api_endpoint`` unless it already starts
    with it. ``data`` is JSON-encoded into the body and ``query_params`` is
    sent as the query string. ``DELETE`` calls return an empty string
    instead of parsing the body.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` on an error status.
    """
    body = {} if data is None else data
    params = {} if query_params is None else query_params

    headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'Authorization': 'Bearer {0}'.format(self.options.get('auth_token'))
    }

    target = url if url.startswith(self.api_endpoint) else self.api_endpoint + url
    response = requests.request(action, target,
                                params=params,
                                data=json.dumps(body),
                                headers=headers)
    # Any failed request surfaces as an exception here.
    response.raise_for_status()
    return '' if action == 'DELETE' else response.json()
def _request(self, action='GET', url='/', data=None, query_params=None):
    """Call the API using the ``API-Key`` header.

    The body is passed to ``requests`` as-is (not JSON-encoded) and no
    ``Content-Type`` header is set. ``DELETE``, ``PUT`` and ``POST`` return
    the response text; every other action returns the decoded JSON.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` on an error status.
    """
    form = data if data is not None else {}
    query = query_params if query_params is not None else {}

    reply = requests.request(
        action,
        self.api_endpoint + url,
        params=query,
        data=form,
        headers={
            'Accept': 'application/json',
            'API-Key': self.options['auth_token'],
        },
    )
    # Any failed request surfaces as an exception here.
    reply.raise_for_status()

    # Success/failure is signalled through HTTP status codes for these
    # actions; only the remaining actions are parsed as JSON.
    if action in ('DELETE', 'PUT', 'POST'):
        return reply.text
    return reply.json()
def _request(self, action='GET', url='/', data=None, query_params=None):
    """Call the API using the ``X-NSONE-Key`` header and return the JSON.

    ``data`` is JSON-encoded into the body, ``query_params`` is sent as the
    query string, and ``auth`` is explicitly passed as ``None``.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` on an error status.
    """
    payload = {} if data is None else data
    query = {} if query_params is None else query_params

    headers = dict([
        ('Accept', 'application/json'),
        ('Content-Type', 'application/json'),
        ('X-NSONE-Key', self.options['auth_token']),
    ])

    endpoint = self.api_endpoint + url
    encoded = json.dumps(payload)
    resp = requests.request(action, endpoint, params=query, data=encoded,
                            headers=headers, auth=None)
    # Any failed request surfaces as an exception here.
    resp.raise_for_status()
    return resp.json()
def _request(self, action='GET', url='/', data=None, query_params=None):
    """Call the API with credentials in the query string; return the JSON.

    ``format=json``, ``_user`` (``auth_username``) and ``_key``
    (``auth_token``) are added to the query string. ``data`` is
    JSON-encoded into the body.

    The ``query_params`` dict passed by the caller is not modified.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` on an error status.
    """
    if data is None:
        data = {}
    # Copy before adding credentials so the token never leaks into a dict
    # the caller still holds.
    query_params = dict(query_params) if query_params is not None else {}
    query_params['format'] = 'json'
    query_params['_user'] = self.options['auth_username']
    query_params['_key'] = self.options['auth_token']

    default_headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json'
    }
    r = requests.request(action, self.api_endpoint + url, params=query_params,
                         data=json.dumps(data),
                         headers=default_headers)
    r.raise_for_status()  # if the request fails for any reason, throw an error.
    return r.json()
def _request(self, action='GET', url='/', data=None, query_params=None):
    """Call the API using an ``sso-key`` Authorization header.

    Falsy ``data`` / ``query_params`` are replaced by empty dicts. ``data``
    is JSON-encoded into the body and the decoded JSON response is returned.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` on an error status.
    """
    body = data or {}
    query = query_params or {}

    endpoint = self.api_endpoint + url
    encoded_body = json.dumps(body)

    # A key/secret pair is used to authenticate.
    credentials = 'sso-key {0}:{1}'.format(
        self.options.get('auth_key'),
        self.options.get('auth_secret'))
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Authorization': credentials,
    }

    response = requests.request(action, endpoint, params=query,
                                data=encoded_body, headers=headers)
    response.raise_for_status()
    return response.json()
def _request(self, action='GET', url='/', data=None, query_params=None):
    """Call the API with basic-auth credentials and return the JSON.

    ``format=json`` is added to the query string, ``data`` is JSON-encoded
    into the body, and ``(auth_username, auth_token)`` is passed as
    ``auth``.

    The ``query_params`` dict passed by the caller is not modified.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` on an error status.
    """
    if data is None:
        data = {}
    # Copy so that adding ``format`` does not alter the caller's dict.
    query_params = dict(query_params) if query_params is not None else {}
    query_params['format'] = 'json'

    default_headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json'
    }
    credentials = (self.options['auth_username'], self.options['auth_token'])
    response = requests.request(action,
                                self.api_endpoint + url,
                                params=query_params,
                                data=json.dumps(data),
                                headers=default_headers,
                                auth=credentials)
    # if the request fails for any reason, throw an error.
    response.raise_for_status()
    return response.json()
# Adds TTL parameter if passed as argument to lexicon.
def _request(self, action='GET', url='/', data=None, query_params=None):
    """Send an authenticated request to the ClouDNS API; return the JSON.

    The result of ``self._build_authentication_data()`` is merged into the
    query string for ``GET`` and into the request body for every other
    action. The dicts passed by the caller are not modified.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` on an error status.
        Exception: when the payload carries both ``status`` and
            ``statusDescription`` and ``status`` is not ``'Success'``.
    """
    # Set default values for missing arguments
    data = data if data else {}
    query_params = query_params if query_params else {}

    # Merge authentication data into a copy of the relevant mapping so the
    # credentials do not end up in a dict the caller still holds.
    if action == 'GET':
        query_params = dict(query_params)
        query_params.update(self._build_authentication_data())
    else:
        data = dict(data)
        data.update(self._build_authentication_data())

    # Fire request against ClouDNS API and parse result as JSON
    r = requests.request(action, self.api_endpoint + url, params=query_params, data=data)
    r.raise_for_status()
    payload = r.json()

    # Check ClouDNS specific status code and description
    if 'status' in payload and 'statusDescription' in payload and payload['status'] != 'Success':
        raise Exception('ClouDNS API request has failed: ' + payload['statusDescription'])

    return payload
def _request(self, action='GET', url='/', data=None, query_params=None):
    """Call the API using the ``PddToken`` header and return the JSON.

    ``url`` is prefixed with ``self.api_endpoint`` unless it already starts
    with it. ``data`` is JSON-encoded into the body. ``DELETE`` calls return
    an empty string instead of parsing the body.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` on an error status.
    """
    if data is None:
        data = {}
    if query_params is None:
        query_params = {}

    request_headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'PddToken': self.options.get('auth_token')
    }

    full_url = url
    if not full_url.startswith(self.api_endpoint):
        full_url = self.api_endpoint + full_url

    answer = requests.request(action, full_url,
                              params=query_params,
                              data=json.dumps(data),
                              headers=request_headers)
    # Any failed request surfaces as an exception here.
    answer.raise_for_status()

    if action != 'DELETE':
        return answer.json()
    return ''
def _request(self, action='GET', url='/', data=None, query_params=None):
    """Call the API with ``(auth_username, auth_token)`` as ``auth``.

    ``data`` is JSON-encoded into the body, ``query_params`` is sent as the
    query string, and the decoded JSON response is returned.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` on an error status.
    """
    json_body = json.dumps({} if data is None else data)
    query = {} if query_params is None else query_params

    call_kwargs = {
        'params': query,
        'data': json_body,
        'headers': {
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        },
        'auth': (self.options['auth_username'], self.options['auth_token']),
    }
    result = requests.request(action, self.api_endpoint + url, **call_kwargs)
    # Any failed request surfaces as an exception here.
    result.raise_for_status()
    return result.json()
def _request(self, action='GET', url='', data=None, query_params=None):
    """Call the Linode API; ``url`` is sent as the ``api_action`` parameter.

    The request always goes to ``self.api_endpoint``. ``api_key``
    (``auth_token``), ``resultFormat=JSON`` and ``api_action`` are added to
    the query string; ``data`` is JSON-encoded into the body.

    The ``query_params`` dict passed by the caller is not modified.

    Returns:
        ``''`` for ``DELETE``; otherwise the decoded JSON payload.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` on an error status.
        Exception: when the payload's ``ERRORARRAY`` is not empty.
    """
    if data is None:
        data = {}
    # Copy before adding the API key so it never leaks into a dict the
    # caller still holds.
    query_params = dict(query_params) if query_params is not None else {}

    default_headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json'
    }
    query_params['api_key'] = self.options.get('auth_token')
    query_params['resultFormat'] = 'JSON'
    query_params['api_action'] = url

    r = requests.request(action, self.api_endpoint, params=query_params,
                         data=json.dumps(data),
                         headers=default_headers)
    r.raise_for_status()  # if the request fails for any reason, throw an error.
    if action == 'DELETE':
        return ''

    result = r.json()
    if len(result['ERRORARRAY']) > 0:
        raise Exception('Linode api error: {0}'.format(result['ERRORARRAY']))
    return result
def _request(self, action='GET', url='/', data=None, query_params=None):
    """Call the API with HTTP basic authentication and return the JSON.

    An ``HTTPBasicAuth`` object is built from ``auth_username`` and
    ``auth_token``. ``data`` is JSON-encoded into the body and
    ``query_params`` is sent as the query string.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` on an error status.
    """
    body = data if data is not None else {}
    query = query_params if query_params is not None else {}

    json_headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
    }
    basic_auth = requests.auth.HTTPBasicAuth(
        self.options['auth_username'],
        self.options['auth_token'],
    )

    reply = requests.request(
        action,
        self.api_endpoint + url,
        params=query,
        data=json.dumps(body),
        headers=json_headers,
        auth=basic_auth,
    )
    # Any failed request surfaces as an exception here.
    reply.raise_for_status()
    return reply.json()
def create_record(self, type, name, content):
    """Create a record through a ``SET`` call to ``/dns/dyndns.jsp``.

    The record name defaults to the configured domain and is replaced by
    ``self._full_name(name)`` when ``name`` is given. ``ttl`` and
    ``priority`` options are forwarded (as ``ttl`` / ``prio``) when truthy.

    Returns:
        ``True`` on success.

    Raises:
        Exception: when the ``is_ok`` element of the reply is not ``'OK:'``.
    """
    params = dict(action='SET', type=type,
                  name=self.options['domain'], value=content)
    if name is not None:
        params['name'] = self._full_name(name)

    ttl = self.options.get('ttl')
    if ttl:
        params['ttl'] = ttl
    priority = self.options.get('priority')
    if priority:
        params['prio'] = priority

    reply = self._get('/dns/dyndns.jsp', params)
    status = reply.find('is_ok').text
    if status != 'OK:':
        raise Exception('An error occurred: {0}'.format(status))

    logger.debug('create_record: %s', True)
    return True
def delete_record(self, identifier=None, type=None, name=None, content=None):
    """Delete records through a ``DELETE`` call to ``/dns/dyndns.jsp``.

    When ``identifier`` is given it is parsed with
    ``self._parse_identifier`` and overrides ``type``, ``name`` and
    ``content``. Only the filters that are not ``None`` are sent; the name
    defaults to the configured domain.

    Returns:
        ``True`` on success.

    Raises:
        Exception: when the ``is_ok`` element of the reply is not ``'OK:'``.
    """
    if identifier is not None:
        type, name, content = self._parse_identifier(identifier)

    params = dict(action='DELETE', name=self.options['domain'])
    if type is not None:
        params['type'] = type
    if name is not None:
        params['name'] = self._full_name(name)
    if content is not None:
        params['value'] = content

    reply = self._get('/dns/dyndns.jsp', params)
    status = reply.find('is_ok').text
    if status != 'OK:':
        raise Exception('An error occurred: {0}'.format(status))

    logger.debug('delete_record: %s', True)
    return True
def upload_prediction(self, file_path):
    """Upload a prediction file and register it as a submission.

    Steps: authorize the upload, look up the current competition, PUT the
    file contents to the signed URL, then POST the submission metadata to
    ``self._submissions_url``.

    Returns:
        The HTTP status code of the first step that did not return 200, or
        the status code of the final submission POST.
    """
    filename, signedRequest, headers, status_code = self.authorize(file_path)
    if status_code != 200:
        return status_code

    dataset_id, comp_id, status_code = self.get_current_competition()
    if status_code != 200:
        return status_code

    with open(file_path, 'rb') as fp:
        prepped = requests.Request('PUT', signedRequest, data=fp.read()).prepare()

    # Use the session as a context manager so its connections are released
    # once the upload has been sent.
    with requests.Session() as session:
        resp = session.send(prepped)
    if resp.status_code != 200:
        return resp.status_code

    r = requests.post(self._submissions_url,
                      data={'competition_id': comp_id, 'dataset_id': dataset_id, 'filename': filename},
                      headers=headers)
    return r.status_code
def extract_url_path_and_query(full_url=None, no_query=False):
    """
    Reduce a full URL to its path plus query string,
    e.g. http://foo.bar.com/aaa/p.html?x=y becomes /aaa/p.html?x=y

    :param full_url: full url; ``request.url`` is used when it is None
    :type full_url: str
    :param no_query: when true, the query string is left out
    :return: str
    """
    parts = urlsplit(request.url if full_url is None else full_url)
    # An empty path is reported as the root path.
    path = parts.path or "/"
    if no_query or not parts.query:
        return path
    return path + '?' + parts.query
# ################# End Client Request Handler #################
# ################# Begin Middle Functions #################
def index(self):
    """Redirect the browser to the Amazon authorization page.

    Builds the ``alexa:all`` scope data from ``ProductID`` with the fixed
    device serial number "001", uses the current CherryPy URL plus "code"
    as the redirect URI, and lets ``requests`` encode the query string.

    Raises:
        cherrypy.HTTPRedirect: always, pointing at the prepared URL.
    """
    # The former ``scope`` local was never read and its value ("alexa_all")
    # differed from the scope actually sent ("alexa:all"); it was removed.
    sd = json.dumps({
        "alexa:all": {
            "productID": ProductID,
            "productInstanceAttributes": {
                "deviceSerialNumber": "001"
            }
        }
    })
    url = "https://www.amazon.com/ap/oa"
    callback = cherrypy.url() + "code"
    payload = {
        "client_id": Client_ID,
        "scope": "alexa:all",
        "scope_data": sd,
        "response_type": "code",
        "redirect_uri": callback,
    }
    # The request is only prepared, never sent: it is used to build the URL.
    req = requests.Request('GET', url, params=payload)
    p = req.prepare()
    raise cherrypy.HTTPRedirect(p.url)
def request(url, params=None):
    """GET ``url`` through the shared session, retrying on ``Retry-After``.

    The API key from ``CONFIG`` is sent as the basic-auth user name with an
    empty password, and a ``User-Agent`` header is added when ``CONFIG``
    has a ``user_agent`` entry. While the response carries a
    ``Retry-After`` header, the function sleeps for that many seconds and
    repeats the request.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` on an error status.
    """
    query = params or {}
    while True:
        extra_headers = {}
        if 'user_agent' in CONFIG:
            extra_headers['User-Agent'] = CONFIG['user_agent']

        prepared = requests.Request(
            'GET', url,
            params=query,
            auth=(CONFIG['api_key'], ""),
            headers=extra_headers,
        ).prepare()
        logger.info("GET {}".format(prepared.url))
        resp = session.send(prepared)

        if 'Retry-After' not in resp.headers:
            break

        retry_after = int(resp.headers['Retry-After'])
        logger.info("Rate limit reached. Sleeping for {} seconds".format(retry_after))
        time.sleep(retry_after)

    resp.raise_for_status()
    return resp
def index(self):
    """Redirect the browser to the Amazon authorization page.

    The scope data carries the configured device type id and a SHA-256 hex
    digest of ``uuid.getnode()`` as the device serial number. The current
    CherryPy URL plus "code" is used as the redirect URI.

    Raises:
        cherrypy.HTTPRedirect: always, pointing at the prepared URL.
    """
    alexa_cfg = config['alexa']
    serial_number = hashlib.sha256(str(uuid.getnode()).encode()).hexdigest()
    scope_data = json.dumps({
        "alexa:all": {
            "productID": alexa_cfg['Device_Type_ID'],
            "productInstanceAttributes": {
                "deviceSerialNumber": serial_number
            }
        }
    })

    redirect_uri = cherrypy.url() + "code"
    query = {
        "client_id": config['alexa']['Client_ID'],
        "scope": "alexa:all",
        "scope_data": scope_data,
        "response_type": "code",
        "redirect_uri": redirect_uri
    }
    # The request is only prepared, never sent: it is used to build the URL.
    target = requests.Request('GET', "https://www.amazon.com/ap/oa", params=query).prepare().url
    raise cherrypy.HTTPRedirect(target)
def index(self):
    """Redirect the browser to the Amazon authorization page.

    Builds the ``alexa:all`` scope data from ``ProductID`` with the fixed
    device serial number "001", uses the current CherryPy URL plus "code"
    as the redirect URI, and lets ``requests`` encode the query string.

    Raises:
        cherrypy.HTTPRedirect: always, pointing at the prepared URL.
    """
    # The former ``scope`` local was never read and its value ("alexa_all")
    # differed from the scope actually sent ("alexa:all"); it was removed.
    sd = json.dumps({
        "alexa:all": {
            "productID": ProductID,
            "productInstanceAttributes": {
                "deviceSerialNumber": "001"
            }
        }
    })
    url = "https://www.amazon.com/ap/oa"
    callback = cherrypy.url() + "code"
    payload = {
        "client_id": Client_ID,
        "scope": "alexa:all",
        "scope_data": sd,
        "response_type": "code",
        "redirect_uri": callback,
    }
    # The request is only prepared, never sent: it is used to build the URL.
    req = requests.Request('GET', url, params=payload)
    p = req.prepare()
    raise cherrypy.HTTPRedirect(p.url)
def hard_requests():
    """Demonstrate the low-level ``Request`` / ``Session`` workflow.

    Prepares a GET for ``build_uri('user/emails')`` with basic auth and a
    custom User-Agent, prints the prepared body and headers, sends it with
    a 1 second timeout and prints either the error or the response details.
    """
    from requests import Request, Session

    s = Session()
    headers = {'User-Agent': 'fake1.3.4'}
    req = Request('GET', build_uri('user/emails'), auth=('shfanzie', '**********'), headers=headers)
    prepped = req.prepare()

    # Single-argument print(...) behaves the same on Python 2 and 3.
    print(prepped.body)
    print(prepped.headers)

    try:
        resp = s.send(prepped, timeout=1)
        resp.raise_for_status()
    except exceptions.Timeout as e:
        # ``e.message`` is not available on Python 3; print the exception.
        print(e)
    except exceptions.HTTPError as e:
        print(e)
    else:
        print(resp.status_code)
        print(resp.reason)
        print(resp.request.headers)
        print(resp.text)
def _prepare_request(self, **kwargs):
    """Build a prepared HTTP request.

    Args:
        kwargs (dict): keyword arguments consumed by the authentication
            helper (``_add_ecdsa_signature()`` or ``_add_basic_auth()``)
            and by the :py:class:`requests.Request` constructor. ``path``
            is required and is turned into ``url``.

    Returns:
        The result of ``requests.Request(**kwargs).prepare()``.

    Raises:
        AssertionError: in case ``kwargs['path']`` doesn't start with ``/``.
    """
    if 'headers' not in kwargs:
        kwargs['headers'] = {}

    # Pick the authentication scheme: a signing key takes precedence over
    # email/password credentials; with neither, nothing is added.
    has_signing_key = isinstance(self.private_key, SigningKey)
    if has_signing_key:
        self._add_ecdsa_signature(kwargs)
    elif self.email and self.password:
        self._add_basic_auth(kwargs)

    # Replace the relative path with an absolute URL.
    relative_path = kwargs.pop('path')
    assert relative_path.startswith('/')
    kwargs['url'] = urljoin(self.api_url, relative_path)

    prepared = requests.Request(**kwargs).prepare()
    return prepared
def get_unfiltered_page(sdk_client, fields, start_index, stream):
    """Fetch one page of ``stream`` without any filtering predicate.

    The service is looked up by the name mapped to ``stream`` in
    ``GENERIC_ENDPOINT_MAPPINGS``. The selector asks for ``fields`` and a
    page of ``PAGE_SIZE`` results beginning at ``start_index``; both paging
    values are sent as strings. The call is wrapped in an HTTP request
    timer for the stream.

    Returns:
        The page returned by ``attempt_get_from_service``.
    """
    endpoint = GENERIC_ENDPOINT_MAPPINGS[stream]
    service = sdk_client.GetService(endpoint['service_name'], version=VERSION)

    paging = {'startIndex': str(start_index), 'numberResults': str(PAGE_SIZE)}
    selector = {'fields': fields, 'paging': paging}

    with metrics.http_request_timer(stream):
        LOGGER.info("Request %s %s from start_index %s for customer %s",
                    PAGE_SIZE, stream, start_index,
                    sdk_client.client_customer_id)
        return attempt_get_from_service(service, selector)
def get_field_list(stream_schema, stream, stream_metadata):
    """Return the API field names to request for ``stream``.

    The fields selected for syncing are filtered by stream name and then
    rewritten so that each starts with an upper-case letter. The list is
    logged after every step.
    """
    # NOTE(review): the source carried an "add synthetic keys" remark here;
    # nothing in this function adds them -- confirm whether that is pending.
    selected = get_fields_to_sync(stream_schema, stream_metadata)
    LOGGER.info("Request fields: %s", selected)

    filtered = filter_fields_by_stream_name(stream, selected)
    LOGGER.info("Filtered fields: %s", filtered)

    # The API expects the first letter of every field name in upper case.
    #
    # See, for instance:
    # https://developers.google.com/adwords/api/docs/reference/v201708/AdGroupAdService.AdGroupAd
    munged = []
    for field_name in filtered:
        munged.append(field_name[0].upper() + field_name[1:])
    LOGGER.info("Munged fields: %s", munged)
    return munged