def fetch(self):
    """Fetch the current top posts from Reddit's public JSON listing.

    Sends a GET request to ``https://www.reddit.com/top/.json`` and logs the
    HTTP status code at debug level.

    Returns:
        list: ``data.children`` of the decoded listing when the server
        answers with status 200, otherwise an empty list.
    """
    url = "https://www.reddit.com/top/.json"
    headers = {
        "User-Agent": "ChangeMeClient/0.1 by YOUR_USERNAME_ON_REDDIT"
    }
    response = requests.request(method='GET', url=url, headers=headers)
    self.logger.debug(f'Fetched. Code: {response.status_code}')
    if response.status_code != 200:
        return []
    # Decode only successful replies: an error response is not guaranteed to
    # carry JSON, and decoding it eagerly would raise before the status check
    # (and before the debug log above is written).
    return response.json()['data']['children']
# get necessary data
python类request()的实例源码
cisco_meraki_fw_ap_rules_api.py 文件源码
项目:cisco-meraki-fw-ap-rules-api
作者: robertcsapo
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def meraki_requests(url, meraki_api_key, error_handle):
    """Issue a GET request against the Meraki Dashboard API (v0).

    ``url`` is the path appended to ``https://dashboard.meraki.com/api/v0``.
    The decoded JSON body is returned on HTTP 200. For any other status the
    response text is printed when the module-level ``meraki_debug`` equals
    ``"1"``, the process exits through ``sys.exit`` when ``error_handle``
    equals ``"enable"``, and otherwise the function falls through and
    returns ``None``.
    """
    endpoint = "https://dashboard.meraki.com/api/v0%s" % url
    if meraki_debug == "1":
        print("GET: %s" % endpoint)
    request_headers = {
        'x-cisco-meraki-api-key': meraki_api_key,
        'content-type': "application/json",
        'cache-control': "no-cache",
    }
    reply = requests.request("GET", endpoint, headers=request_headers, params={})
    if reply.status_code == 200:
        return json.loads(reply.text)
    # Anything below handles the non-200 case only.
    if meraki_debug == "1":
        print(reply.text)
    if error_handle == "enable":
        sys.exit("Failed: Code %s" % reply.status_code)
# Meraki REST API Call - PUT
def get_user_api_token(logger):
    """
    Generate iAuditor API Token from interactively entered credentials.

    Prompts for the username on stdin and for the password via ``getpass``,
    then POSTs them to the SafetyCulture auth endpoint as a form.
    :param logger: the logger
    :return: API Token if authenticated else None
    """
    username = input("iAuditor username: ")
    password = getpass()
    generate_token_url = "https://api.safetyculture.io/auth"
    # Hand requests a mapping so it URL-encodes each field. Concatenating the
    # raw values would corrupt the form whenever the username or password
    # contains characters such as '&', '=', '+' or '%'.
    payload = {
        'username': username,
        'password': password,
        'grant_type': 'password',
    }
    headers = {
        'content-type': "application/x-www-form-urlencoded",
        'cache-control': "no-cache",
    }
    response = requests.request("POST", generate_token_url, data=payload, headers=headers)
    if response.status_code == requests.codes.ok:
        return response.json()['access_token']
    # An error reply is not guaranteed to be JSON; fall back to the raw text
    # so that logging the failure cannot itself raise.
    try:
        details = response.json()
    except ValueError:
        details = response.text
    logger.error('An error occurred calling ' + generate_token_url + ': ' + str(details))
    return None
def _req(self, method: str='GET', endpoint: str=None, params: dict=None,
         data: dict=None, uses_oauth: bool=False):
    """Send a request to ``<base_url>/<endpoint>`` and return the response.

    The query string always carries ``format=xml`` and the developer key;
    entries in ``params`` are merged on top and may override them. The call
    goes through ``self.session.request`` when ``uses_oauth`` is true and
    through ``requests.request`` otherwise. ``raise_for_status`` is invoked
    on the response before it is returned.
    """
    send = self.session.request if uses_oauth else requests.request
    target = '{}/{}'.format(self.base_url, endpoint)
    query = {'format': 'xml', 'key': self._developer_key}
    if params is not None:
        query.update(params)
    response = send(method=method, url=target, params=query, data=data)
    response.raise_for_status()
    return response
def fetch(self):
    """Fetch the Imgur "hot" gallery sorted by "viral".

    Logs the HTTP status code at debug level and returns the first
    ``FETCH_LIMIT`` entries of the ``data`` field of the decoded reply.
    """
    gallery_section = 'hot'  # hot | top | user
    ordering = 'viral'  # viral | top | time | rising (only available with user section)
    endpoint = f'https://api.imgur.com/3/gallery/{gallery_section}/{ordering}'
    query = {
        "showViral": 'true',
        "mature": 'true',
        "album_previews": 'false',
    }
    auth_header = {'authorization': f'Client-ID {client_id}'}
    reply = requests.request("GET", endpoint, headers=auth_header, params=query)
    payload = reply.json()
    self.logger.debug(f'Fetched. Code: {reply.status_code}')
    return payload['data'][:FETCH_LIMIT]
def create_logset(logset_name=None, params=None):
    """
    Add a new logset to the current account.

    If ``params`` is given it is sent unchanged as the JSON request body.
    Otherwise a body of the form ``{'logset': {'name': logset_name}}`` is
    built. The response is passed to ``handle_response`` with an expected
    status of 201. A ``requests`` transport error is written to stderr and
    the process exits with status 1.
    """
    if params is not None:
        request_params = params
    else:
        request_params = {
            'logset': {
                'name': logset_name
            }
        }
    headers = api_utils.generate_headers('rw')
    try:
        response = requests.post(_url()[1], json=request_params, headers=headers)
        handle_response(response, 'Creating logset failed.\n', 201)
    except requests.exceptions.RequestException as error:
        # write() only accepts text; passing the exception object itself
        # raises TypeError and hides the real failure.
        sys.stderr.write(str(error))
        sys.exit(1)
def add_new_user(first_name, last_name, email):
    """
    Add a new user to the current account.

    Builds a ``{"user": {...}}`` payload from the stringified arguments,
    generates 'owner' headers over its JSON serialization, POSTs it and hands
    the response to ``handle_create_user_response``. A ``requests`` transport
    error is written to stderr and the process exits with status 1.
    """
    action, url = _url(('users',))
    json_content = {
        "user":
            {
                "email": str(email),
                "first_name": str(first_name),
                "last_name": str(last_name)
            }
    }
    # The serialized body is given to the header generator (presumably for
    # request signing -- confirm in api_utils); the request itself sends the
    # same mapping via ``json=``.
    body = json.dumps(json_content)
    headers = api_utils.generate_headers('owner', method='POST', action=action, body=body)
    try:
        response = requests.request('POST', url, json=json_content, headers=headers)
        handle_create_user_response(response)
    except requests.exceptions.RequestException as error:
        # write() only accepts text; passing the exception object itself
        # raises TypeError and hides the real failure.
        sys.stderr.write(str(error))
        sys.exit(1)
def delete_user(user_key):
    """
    Delete a user from the current account.

    Sends a DELETE for ``user_key`` with 'owner' headers. When
    ``response_utils.response_error`` reports an error the status code is
    written to stderr and the process exits with status 1; a 204 reply prints
    a confirmation. A ``requests`` transport error is written to stderr and
    the process exits with status 1.
    """
    action, url = _url(('users', user_key))
    headers = api_utils.generate_headers('owner', method='DELETE', action=action, body='')
    try:
        response = requests.request('DELETE', url, data='', headers=headers)
        if response_utils.response_error(response) is True:  # Check response has no errors
            sys.stderr.write('Delete user failed, status code: %s' % response.status_code)
            sys.exit(1)
        elif response.status_code == 204:
            # Parenthesized single-argument form prints the same text on
            # Python 2 and is valid syntax on Python 3.
            print('Deleted user')
    except requests.exceptions.RequestException as error:
        # write() only accepts text; passing the exception object itself
        # raises TypeError and hides the real failure.
        sys.stderr.write(str(error))
        sys.exit(1)
def read(self, buf_len):
    """
    Return the next chunk of the HTTP response body, or ``b''`` once the
    body is exhausted.

    Implementation note: due to a constraint of the requests library, the
    ``buf_len`` passed on the first call fixes the chunk size for every
    later call, even if a different ``buf_len`` is passed subsequently.
    """
    if self._iter is None:
        # First call: open the streaming request and keep its chunk iterator.
        spec = self.input_spec
        verb = spec.get('method', 'GET').upper()
        request_headers = spec.get('headers', {})
        query = spec.get('params', {})
        response = requests.request(
            verb, spec['url'], headers=request_headers, params=query,
            stream=True, allow_redirects=True)
        # Headers are already available here, so a bad status fails early.
        response.raise_for_status()
        self._iter = response.iter_content(buf_len, decode_unicode=False)
    try:
        chunk = six.next(self._iter)
    except StopIteration:
        chunk = b''
    return chunk
def push(data, spec, **kwargs):
    """Send ``data`` over HTTP to the URL described by ``spec``.

    ``spec`` must contain ``url`` and may contain ``method`` (default
    ``POST``), ``headers`` and ``params``. ``kwargs['task_output']['target']``
    selects how ``data`` is interpreted: ``'filepath'`` opens ``data`` as a
    file and sends its contents, ``'memory'`` (the default) sends ``data``
    itself as the body. Any other target raises ``Exception``.

    When the server answers with an error status, the URL and response text
    are printed and the exception from ``raise_for_status`` is re-raised.
    """
    task_output = kwargs.get('task_output', {})
    target = task_output.get('target', 'memory')
    url = spec['url']
    method = spec.get('method', 'POST').upper()
    headers = spec.get('headers', {})
    params = spec.get('params', {})
    if target == 'filepath':
        # The request is made inside the ``with`` so the file stays open
        # while its contents are sent.
        with open(data, 'rb') as fd:
            response = requests.request(
                method, url, headers=headers, data=fd,
                params=params, allow_redirects=True)
    elif target == 'memory':
        response = requests.request(
            method, url, headers=headers, data=data,
            params=params, allow_redirects=True)
    else:
        raise Exception('Invalid HTTP fetch target: ' + target)
    try:
        response.raise_for_status()
    except Exception:
        # Single pre-formatted argument: identical output on Python 2 and
        # valid syntax on Python 3.
        print('HTTP push failed (%s). Response: %s' % (url, response.text))
        raise
def _func(self, conv):
    """Flag a registration request that asks for ``offline_access`` with a
    ``response_type`` other than ``'code'``.

    Sets ``self._status`` to ``ERROR`` when no registration request can be
    obtained from ``conv``, or when the scope contains ``offline_access``
    and ``response_type`` differs from ``'code'`` (a message is set in the
    latter case). A request without ``scope`` is accepted silently. Always
    returns an empty dict.
    """
    try:
        request = registration_request(conv)
    except NoSuchEvent:
        self._status = ERROR
        return {}

    try:
        requested_scope = request['scope']
    except KeyError:
        return {}

    # 'offline_access' is only allowed if response_type == 'code'
    # NOTE(review): the comparison is against the plain string 'code'; if
    # response_type is stored as a list this never matches -- confirm.
    if 'offline_access' in requested_scope and request['response_type'] != 'code':
        self._status = ERROR
        self._message = 'Offline access not allowed for anything but code flow'
    return {}
def _func(self, conv):
    """Warn when the software statement of a registration request lacks
    expected claims.

    Sets ``self._status`` to ``ERROR`` when no registration request can be
    obtained from ``conv``. When the request carries a
    ``software_statement``, every absent claim among ``redirect_uris``,
    ``grant_types``, ``client_name`` and ``client_uri`` is collected, plus
    ``jwks_uri/jwks`` when neither of those two is present; a non-empty
    result sets ``WARNING`` and a message. Always returns an empty dict.
    """
    try:
        request = registration_request(conv)
    except NoSuchEvent:
        self._status = ERROR
        return {}

    try:
        statement = request['software_statement']
    except KeyError:
        return {}

    expected = ('redirect_uris', 'grant_types', 'client_name', 'client_uri')
    absent = [name for name in expected if name not in statement]
    # Either form of key material satisfies the check.
    if not ('jwks_uri' in statement or 'jwks' in statement):
        absent.append('jwks_uri/jwks')
    if absent:
        self._status = WARNING
        self._message = 'Missing "{}" claims from Software Statement'.format(absent)
    return {}
def _func(self, conv):
    """Flag a registration request that asks for ``offline_access`` while
    its ``response_type`` is not exactly ``['code']``.

    Sets ``self._status`` to ``ERROR`` when no registration request can be
    obtained from ``conv``, or when the scope contains ``offline_access``
    and ``response_type`` differs from ``['code']`` (a message is set in the
    latter case). A request without ``scope`` is accepted silently. Always
    returns an empty dict.
    """
    try:
        request = registration_request(conv)
    except NoSuchEvent:
        self._status = ERROR
        return {}

    try:
        scopes = request['scope']
    except KeyError:
        return {}

    wants_offline = 'offline_access' in scopes
    if wants_offline and request['response_type'] != ['code']:
        self._status = ERROR
        self._message = 'Offline access only when using "code" flow'
    return {}
def _func(self, conv):
    """Check the parsed client assertion of an access token request.

    The assertion must contain the claims ``iss``, ``sub``, ``aud``, ``iat``,
    ``exp`` and ``jti``. Any missing claim sets ``self._status`` to ``ERROR``
    with a message naming the missing claims. Otherwise the ``jti`` value is
    passed to ``calculate`` and a result below 128 sets ``WARNING``.
    Always returns an empty dict.
    """
    request = access_token_request(conv)
    ca = request['parsed_client_assertion']
    required = ("iss", "sub", "aud", "iat", "exp", "jti")
    missing = [claim for claim in required if claim not in ca]
    if missing:
        self._status = ERROR
        # The previous text ('Redirect_uri not registered') described an
        # unrelated check; report what is actually wrong.
        self._message = 'Missing "{}" claims from client assertion'.format(
            missing)
        # Stop here: the entropy check below would raise KeyError when 'jti'
        # itself is missing, and must not downgrade this ERROR to a WARNING.
        return {}
    # verify jti entropy
    bits = calculate(ca['jti'])
    if bits < 128:
        self._status = WARNING
        self._message = 'Not enough entropy in string: {} < 128'.format(
            bits)
    return {}
def search(api_key, term, location):
    """Query the Search API by a search term and location.

    Spaces in ``term`` and ``location`` are replaced with ``+`` before the
    request is made, and the result count is capped by ``SEARCH_LIMIT``.

    Args:
        api_key: Passed through to ``request`` unchanged.
        term (str): The search term passed to the API.
        location (str): The search location passed to the API.
    Returns:
        Whatever ``request`` returns for the search path (documented
        upstream as the JSON response dict).
    """
    plus_term = term.replace(' ', '+')
    plus_location = location.replace(' ', '+')
    query = dict(term=plus_term, location=plus_location, limit=SEARCH_LIMIT)
    return request(API_HOST, SEARCH_PATH, api_key, url_params=query)
def get_state(self) -> str:
    """Query the device and report its state.

    No request is made when ``self.state_cmd`` is ``None``. Otherwise the
    configured state request is sent and its response text is searched for
    the "off" marker first, then the "on" marker.

    Returns:
        "on", "off", or "unknown"
    """
    if self.state_cmd is None:
        return "unknown"
    reply = requests.request(
        self.state_method,
        self.state_cmd,
        data=self.state_data,
        json=self.state_json,
        headers=self.headers,
        auth=self.auth,
    )
    # The "off" marker takes precedence when both markers occur in the text.
    if self.state_response_off in reply.text:
        return "off"
    return "on" if self.state_response_on in reply.text else "unknown"
def stt(self, audio, language, limit):
    """ Web API wrapper for performing Speech to Text (STT)

    Builds a POST request description with an ``audio/x-flac`` content type
    and hands it to ``self.request``.

    Args:
        audio (bytes): The recorded audio, as in a FLAC file
        language (str): A BCP-47 language code, e.g. 'en-US'
        limit (int): Maximum minutes to transcribe(?)
    Returns:
        Whatever ``self.request`` returns (documented upstream as a JSON
        structure with transcription results)
    """
    query = dict(lang=language, limit=limit)
    flac_headers = {'Content-Type': 'audio/x-flac'}
    request_spec = dict(method='POST', headers=flac_headers, query=query, data=audio)
    return self.request(request_spec)
def request_jira(client, url, method='GET', **kwargs):
    """Call a Jira endpoint on behalf of ``client`` using JWT authorization.

    The token is produced by ``encode_token`` from the method, the URL, the
    app's ``ADDON_KEY`` setting and the client's shared secret. ``url`` is
    appended to ``client.baseUrl`` (trailing slashes removed) and extra
    keyword arguments are forwarded to ``requests.request``.

    Returns the response. An HTTP error status is re-raised as a
    ``requests.HTTPError`` whose message is the response body text.
    """
    token = encode_token(
        method, url, app.config.get('ADDON_KEY'), client.sharedSecret)
    auth_headers = {
        "Authorization": 'JWT %s' % token,
        "Content-Type": "application/json",
    }
    target = client.baseUrl.rstrip('/') + url
    result = requests.request(method, target, headers=auth_headers, **kwargs)
    try:
        result.raise_for_status()
    except requests.HTTPError as e:
        failed = e.response
        raise requests.HTTPError(failed.text, response=failed)
    return result
def copy(self, destination):
    '''
    Method to copy resource to another location

    Args:
        destination (rdflib.term.URIRef, str): URI location to copy resource to

    Returns:
        The parsed destination URI (result of self.repo.parse_uri) when the
        server answers 201

    Raises:
        Exception: when the server answers with any status other than 201
    '''
    # resolve the destination used for the Destination header
    destination_uri = self.repo.parse_uri(destination)
    # http request
    response = self.repo.api.http_request('COPY', self.uri, data=None, headers={'Destination':destination_uri.toPython()})
    # handle response
    if response.status_code == 201:
        return destination_uri
    # this is a COPY, not a MOVE: say so in the error
    raise Exception('HTTP %s, could not copy resource %s to %s' % (response.status_code, self.uri, destination_uri))
def _build_rdf(self, data=None):
    '''
    Rebuild self.rdf from an incoming payload and parse it into a graph.

    A fresh namespace is created holding the raw data, one rdflib.Namespace
    per prefix in self.repo.context, and an empty uris namespace; graph
    parsing is then delegated to self._parse_graph().

    Args:
        data (): payload from GET request, expected RDF content in various serialization formats

    Returns:
        None
    '''
    # start from a clean container on every call
    container = SimpleNamespace(
        data=data,
        prefixes=SimpleNamespace(),
        uris=SimpleNamespace(),
    )
    self.rdf = container
    # expose each configured prefix as an attribute
    for name, base_uri in self.repo.context.items():
        setattr(container.prefixes, name, rdflib.Namespace(base_uri))
    # graph
    self._parse_graph()
def revert_to(self):
    '''
    Revert the resource to this version by issuing a PATCH against the
    version URI.

    Args:
        None

    Returns:
        None: on a 204 reply the current resource handle is refreshed

    Raises:
        Exception: when the server answers with any status other than 204
    '''
    api = self.resource.repo.api
    reply = api.http_request('PATCH', self.uri)
    # anything but 204 means the revert did not happen
    if reply.status_code != 204:
        raise Exception('HTTP %s, could not revert to resource version, %s' % (reply.status_code, self.uri))
    logger.debug('reverting to previous version of resource, %s' % self.uri)
    # refresh current resource handle
    self._current_resource.refresh()
def test_add_two_simple_handlers_with_same_route(server: FakeServer,
                                                 methods: List[str],
                                                 routes: List[str],
                                                 statuses: List[int]):
    """Register two handlers on the fake server, call both, and check that
    each reply carries the status configured for its handler."""
    pair = (0, 1)
    # Both handlers are registered before any request is sent.
    for index in pair:
        server.on_(methods[index], routes[index]).response(status=statuses[index])
    replies = [
        requests.request(methods[index], server.base_uri + routes[index])
        for index in pair
    ]
    assert replies[0].status_code == statuses[0]
    assert replies[1].status_code == statuses[1]
def _request(self, method, path, **kwargs):
    """Call the versioned API endpoint at ``path`` and return the decoded
    JSON body.

    ``kwargs`` are forwarded to ``requests.request``. A ``headers`` entry is
    merged over ``self.headers`` for this call only.

    Raises:
        VMFarmsAPIError: when the connection fails or the reply is not valid
            JSON. HTTP error statuses are delegated to
            ``self._raise_http_error_as_api_error``.
    """
    url = self.url_for('api', 'v{self.version}'.format(self=self), *path.split('/'))
    # Merge into a copy. dict.update() returns None and would also mutate the
    # shared default headers; popping the key keeps ``headers`` from being
    # passed twice to requests.request().
    headers = dict(self.headers)
    headers.update(kwargs.pop('headers', None) or {})
    # Bound up front so the ValueError handler can reference it even when the
    # error comes from requests.request() itself (e.g. an invalid URL).
    response = None
    try:
        response = requests.request(method, url, headers=headers, **kwargs)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.ConnectionError:
        raise VMFarmsAPIError('Cannot connect to VM Farms API at {}.'.format(self.url))
    except requests.exceptions.HTTPError as error:
        self._raise_http_error_as_api_error(error)
    except ValueError:
        raise VMFarmsAPIError('Unexpected response from server.', response)
def _req(self, method, frag, data=None):
    """Send ``data`` as a JSON body to ``self.url + frag`` and return the
    response.

    ``data`` is serialized with ``json.dumps`` using ``_json_serial`` as the
    fallback encoder. For 4xx/5xx replies whose body is a JSON object with a
    ``Message`` field, that message replaces ``resp.reason`` so it shows up
    in the error raised by ``raise_for_status``.

    Raises:
        requests.HTTPError: when the reply has an error status.
    """
    resp = requests.request(method, self.url + frag,
                            data=json.dumps(data, default=_json_serial),
                            verify=self.verify,
                            auth=self.credentials,
                            headers={'Content-Type': 'application/json'})
    # try to extract reason from response when request returns error
    if 400 <= resp.status_code < 600:
        try:
            resp.reason = json.loads(resp.text)['Message']
        except (ValueError, KeyError, TypeError):
            # Best effort only: the body is not JSON, has no 'Message', or is
            # not an object. Keep the server's own reason in those cases.
            pass
    resp.raise_for_status()
    return resp
def sendjson(self, method, urlpath, obj=None):
    """Send json to the OpenDaylight controller.

    ``obj`` is serialized only when truthy; otherwise no body is sent.
    On an HTTP error status the failure is logged and the raw response
    object is returned. On success the decoded JSON body is returned, or
    ``None`` when the body cannot be decoded.
    """
    payload = None
    if obj:
        payload = jsonutils.dumps(obj, indent=2)
    target = '/'.join((self.url, urlpath))
    LOG.debug("Sending METHOD (%(method)s) URL (%(url)s) JSON (%(obj)s)" %
              {'method': method, 'url': target, 'obj': obj})
    reply = requests.request(
        method,
        url=target,
        headers={'Content-Type': 'application/json'},
        data=payload,
        auth=self.auth,
        timeout=self.timeout,
    )
    try:
        reply.raise_for_status()
    except Exception as ex:
        details = {'method': method, 'url': target, 'obj': obj, 'r': reply,
                   'ex': ex, 'rtext': reply.text}
        LOG.error("Error Sending METHOD (%(method)s) URL (%(url)s)"
                  "JSON (%(obj)s) return: %(r)s ex: %(ex)s rtext: "
                  "%(rtext)s" % details)
        return reply
    try:
        decoded = json.loads(reply.content)
    except Exception:
        LOG.debug("%s" % reply)
        return
    return decoded
def execute_as_string(self, request):
    """Execute a given HttpRequest to get a string response back

    The request is sent with basic-auth credentials taken from
    ``request.username`` / ``request.password`` and the reply is passed to
    ``self.convert_response`` with ``False`` as the second argument.

    Args:
        request (HttpRequest): The given HttpRequest to execute.
    Returns:
        HttpResponse: The response of the HttpRequest.
    """
    verb = HttpMethodEnum.to_string(request.http_method)
    options = dict(
        headers=request.headers,
        params=request.query_parameters,
        data=request.parameters,
        files=request.files,
        auth=(request.username, request.password),
    )
    raw_reply = requests.request(verb, request.query_url, **options)
    return self.convert_response(raw_reply, False)
def execute_as_binary(self, request):
    """Execute a given HttpRequest to get a binary response back

    The request is sent with basic-auth credentials taken from
    ``request.username`` / ``request.password`` and the reply is passed to
    ``self.convert_response`` with ``True`` as the second argument.

    Args:
        request (HttpRequest): The given HttpRequest to execute.
    Returns:
        HttpResponse: The response of the HttpRequest.
    """
    verb = HttpMethodEnum.to_string(request.http_method)
    options = dict(
        headers=request.headers,
        params=request.query_parameters,
        data=request.parameters,
        files=request.files,
        auth=(request.username, request.password),
    )
    raw_reply = requests.request(verb, request.query_url, **options)
    return self.convert_response(raw_reply, True)
NewsArticleClass.py 文件源码
项目:Python-Scripts-Repo-on-Data-Science
作者: qalhata
项目源码
文件源码
阅读 31
收藏 0
点赞 0
评论 0
def scrapeSource(url, magicFrag='2017',
                 scraperFunction=getNYTText, token='None'):
    """Fetch ``url`` with urllib3, parse it with BeautifulSoup and walk every
    ``<a>`` element of the page.

    NOTE(review): this body looks truncated. ``body`` is never assigned in
    the function, so unless a module-level ``body`` exists the ``if body``
    test raises ``NameError``, which the bare ``except`` swallows on every
    iteration. ``magicFrag``, ``scraperFunction`` and ``token`` are unused,
    and ``urlBodies`` is filled but never returned (the function returns
    ``None``). Presumably the lines that read the link target, filtered it
    and called ``scraperFunction`` are missing -- confirm against the
    upstream source before relying on this function.
    """
    urlBodies = {}
    # NOTE: this local name hides any module-level ``requests`` inside the
    # function; here it is a urllib3 connection pool manager.
    requests = urllib3.PoolManager()
    response = requests.request('GET', url)
    soup = BeautifulSoup(response.data)
    # the lines above set up the BeautifulSoup page
    # now we find links
    # links are always of the form <a href='url'> link-text </a>
    for a in soup.findAll('a'):
        try:
            # the original comment here refers to the individual
            # scraperFunction for NYT & washPost (the call itself is absent)
            if body and len(body) > 0:
                urlBodies[url] = body
            print(url)
        except:
            # NOTE(review): the counter is reset to 0 and then incremented on
            # every failure, so it is always 1 and never read.
            numErrors = 0
            numErrors += 1
def get_version(self):
    """Fetches the current version number of the Graph API being used.

    Requests ``/me`` for the configured API version and reads the
    ``facebook-api-version`` response header.

    Returns:
        float: the version from the header with every "v" removed.

    Raises:
        GraphAPIError: when the request raises ``requests.HTTPError`` or the
            version header is missing or not numeric.
    """
    args = {"access_token": self.access_token}
    try:
        response = requests.request("GET",
                                    "https://graph.facebook.com/" +
                                    self.version + "/me",
                                    params=args,
                                    timeout=self.timeout,
                                    proxies=self.proxies)
    except requests.HTTPError as e:
        # requests exceptions have no read(); the server reply, when there
        # is one, is available as ``e.response``. Fall back to the exception
        # text when there is no reply or its body is not JSON.
        try:
            error_body = e.response.json()
        except (AttributeError, ValueError):
            error_body = str(e)
        raise GraphAPIError(error_body)
    try:
        headers = response.headers
        version = headers["facebook-api-version"].replace("v", "")
        return float(version)
    except Exception:
        raise GraphAPIError("API version number not available")
def debug_access_token(self, token, app_id, app_secret):
    """
    Gets information about a user access token issued by an app. See
    <https://developers.facebook.com/docs/facebook-login/access-tokens
    #debug>

    The app access token sent with the call is the app id and the app
    secret joined by "|": <https://developers.facebook.com/docs/
    facebook-login/access-tokens#apptokens>

    Returns whatever ``self.request`` returns for ``/debug_token``.
    """
    app_access_token = "%s|%s" % (app_id, app_secret)
    query = dict(input_token=token, access_token=app_access_token)
    return self.request("/debug_token", args=query)