def response(cls, r):
    """Unwrap the payload carried by the API response *r*.

    The envelope around the actual data is discarded. An
    :class:`~errors.ApiError` is raised when the body cannot be decoded as
    JSON, or when the envelope's ``meta`` section carries a truthy
    ``errors`` entry.

    :param r: the HTTP response from an API call
    :type r: :class:`requests.Response`
    :returns: the ``response`` member of the decoded envelope
    :rtype: json
    """
    try:
        envelope = r.json()
    except ValueError:
        # Body was not JSON at all: report the raw response object.
        raise errors.ApiError(r)
    if envelope['meta'].get("errors"):
        raise errors.ApiError(envelope['meta'])
    payload = envelope["response"]
    return payload
python类Response()的实例源码
def post(self, endpoint, query=None, body=None, data=None):
    """
    Send a POST request to armada.

    When both *body* and *data* are given, *body* wins and *data* is ignored.

    :param string endpoint: URL string following hostname and API prefix
    :param dict query: dict of k, v parameters to add to the query string
    :param string body: string to use as the request body.
    :param data: Something json.dumps(s) can serialize.
    :return: A requests.Response object
    """
    target = '{}{}'.format(self.base_url, endpoint)
    self.logger.debug("Sending POST with armada_client session")

    if body is None:
        # No explicit body: hand *data* to the session as a JSON payload.
        self.logger.debug("Sending POST with JSON body: \n%s" % str(data))
        return self._session.post(
            target, params=query, json=data, timeout=3600)

    self.logger.debug("Sending POST with explicit body: \n%s" % body)
    return self._session.post(
        target, params=query, data=body, timeout=3600)
def post(self, endpoint, query=None, body=None, data=None):
    """
    Send a POST request to Drydock.

    When both *body* and *data* are given, *body* wins and *data* is ignored.

    :param string endpoint: The URL string following the hostname and API prefix
    :param dict query: A dict of k, v parameters to add to the query string
    :param string body: A string to use as the request body. Will be treated as raw
    :param data: Something json.dumps(s) can serialize. Result will be used as the request body
    :return: A requests.Response object
    """
    self.logger.debug("Sending POST with drydock_client session")
    target = self.base_url + endpoint

    if body is None:
        # No explicit body: hand *data* to the session as a JSON payload.
        self.logger.debug("Sending POST with JSON body: \n%s" % str(data))
        return self.__session.post(
            target, params=query, json=data, timeout=10)

    self.logger.debug("Sending POST with explicit body: \n%s" % body)
    return self.__session.post(
        target, params=query, data=body, timeout=10)
def test_store_companies_house_profile_in_session_saves_in_session(
    mock_retrieve_profile, client
):
    """The retrieved profile JSON is stored in the session and marks it modified."""
    profile = dict(
        date_of_creation='2000-10-10',
        company_name='Example corp',
        company_status='active',
        company_number='01234567',
        registered_office_address={'foo': 'bar'},
    )
    # Hand-built response whose json() yields the profile above.
    stub_response = Response()
    stub_response.status_code = http.client.OK
    stub_response.json = lambda: profile

    session = client.session
    mock_retrieve_profile.return_value = stub_response

    helpers.store_companies_house_profile_in_session(session, '01234567')

    mock_retrieve_profile.assert_called_once_with(number='01234567')
    stored = session[helpers.COMPANIES_HOUSE_PROFILE_SESSION_KEY]
    assert stored == profile
    assert session.modified is True
async def telljoke(self, ctx: commands.Context) -> None:
    """
    Responds with a random joke from theoatmeal.com

    Declared ``async`` because the body awaits the bot's coroutines; a plain
    ``def`` containing ``await`` does not compile.

    :param ctx: invocation context; its message channel receives the typing
        indicator while the page is fetched.
    """
    # TODO: get more joke formats (or a better source)
    # Send typing as the request can take some time.
    await self.bot.send_typing(ctx.message.channel)
    # Build a new request object
    req: grequests.AsyncRequest = grequests.get('http://theoatmeal.com/djtaf/', timeout=1)
    # Send new request
    res: List[requests.Response] = grequests.map([req], exception_handler=self.request_exception_handler)
    # Since we only have one request we can just fetch the first one.
    page = res[0] if res else None
    if page is None:
        # NOTE(review): no response object means the request failed;
        # presumably request_exception_handler already dealt with it -- confirm.
        return
    # page.content is the html in bytes. Response.encoding may be None when
    # the server declares no charset, so fall back to UTF-8 for decoding.
    soup = BeautifulSoup(page.content.decode(page.encoding or 'utf-8'), 'html.parser')
    joke_ul = soup.find(id='joke_0')
    # The joke is split over two elements: the setup and the punchline.
    joke = joke_ul.find('h2', {'class': 'part1'}).text.lstrip() + '\n' + joke_ul.find(id='part2_0').text
    await self.bot.say(joke)
async def randomfact(self, ctx: commands.Context) -> None:
    """
    Responds with a random fact scraped from unkno.com

    Declared ``async`` because the body awaits the bot's coroutines; a plain
    ``def`` containing ``await`` does not compile.

    :param ctx: invocation context; its message channel receives the typing
        indicator while the page is fetched.
    """
    # Send typing as the request can take some time.
    await self.bot.send_typing(ctx.message.channel)
    # Build a new request object
    req: grequests.AsyncRequest = grequests.get('http://www.unkno.com/', timeout=1)
    # Send new request
    res: List[requests.Response] = grequests.map([req], exception_handler=self.request_exception_handler)
    # Since we only have one request we can just fetch the first one.
    page = res[0] if res else None
    if page is None:
        # NOTE(review): no response object means the request failed;
        # presumably request_exception_handler already dealt with it -- confirm.
        return
    # page.content is the html in bytes. Response.encoding may be None when
    # the server declares no charset, so fall back to UTF-8 for decoding.
    soup = BeautifulSoup(page.content.decode(page.encoding or 'utf-8'), 'html.parser')
    await self.bot.say(soup.find(id='content').text)
def test_push_pact(mock_put):
    """push_pact issues exactly one call to mock_put carrying the pact file's JSON."""
    created = Response()
    created.status_code = CREATED
    mock_put.return_value = created
    client = BrokerClient(broker_url=settings.PACT_BROKER_URL)

    # Load the expected payload from the same file that gets pushed.
    with open(PACT_FILE_PATH) as pact_file:
        expected_pact = json.load(pact_file)

    push_kwargs = dict(
        provider=PROVIDER,
        consumer=CONSUMER,
        pact_file=PACT_FILE_PATH,
        consumer_version=CONSUMER_VERSION,
    )
    # Indexing is kept: the result must be subscriptable.
    client.push_pact(**push_kwargs)[0]

    mock_put.assert_called_once_with(
        EXPECTED_PUSH_PACT_URL, json=expected_pact, auth=None)
def test_tag_consumer(mock_put):
    """tag_consumer issues exactly one call to mock_put with a JSON content-type header."""
    created = Response()
    created.status_code = CREATED
    mock_put.return_value = created
    client = BrokerClient(broker_url=settings.PACT_BROKER_URL)

    tag_kwargs = dict(
        provider=PROVIDER,
        consumer=CONSUMER,
        consumer_version=CONSUMER_VERSION,
        tag=TAG,
    )
    # Indexing is kept: the result must be subscriptable.
    client.tag_consumer(**tag_kwargs)[0]

    expected_headers = {'Content-Type': 'application/json'}
    mock_put.assert_called_once_with(
        EXPECTED_TAG_CONSUMER_URL, headers=expected_headers, auth=None)
def send(self, req, **kwargs):
    """Open ``req.url`` with ``urlopen`` and wrap the outcome in a ``requests.Response``.

    On success the opened stream becomes ``resp.raw``, any ``content-length``
    header is copied over, and the status is ``ok``. An ``IOError`` is mapped
    to a status by its ``errno`` (EACCES -> forbidden, ENOENT -> not_found,
    anything else -> bad_request); an ``OSError`` yields bad_request.

    :param req: request object; only its ``url`` attribute is read
    :param kwargs: accepted and ignored
    :return: the populated ``requests.Response``
    """
    codes = requests.codes
    reply = requests.Response()
    try:
        reply.raw = urlopen(req.url)
        length = reply.raw.headers.get('content-length')
        if length is not None:
            reply.headers['Content-Length'] = length
    except IOError as err:
        # Translate well-known errno values; everything else is a bad request.
        status_by_errno = {
            errno.EACCES: codes.forbidden,
            errno.ENOENT: codes.not_found,
        }
        reply.status_code = status_by_errno.get(err.errno, codes.bad_request)
    except OSError:
        reply.status_code = codes.bad_request
    else:
        reply.status_code = codes.ok
    return reply
def test_it(self):
    """The function under test prints the rate-limit summary and the env-var hint to stderr."""
    import sys
    import mock
    import requests
    from ci_diff_helper import _github

    remaining, rate_limit, rate_reset = '17', '60', '1475953149'
    response = requests.Response()
    header_values = (
        (_github._RATE_REMAINING_HEADER, remaining),
        (_github._RATE_LIMIT_HEADER, rate_limit),
        (_github._RATE_RESET_HEADER, rate_reset),
    )
    for header_name, header_value in header_values:
        response.headers[header_name] = header_value

    with mock.patch('six.print_') as mocked:
        self._call_function_under_test(response)
        expected_msg = _github._RATE_LIMIT_TEMPLATE.format(
            remaining, rate_limit, rate_reset)
        # Two prints are expected: the formatted limits and the env-var message.
        self.assertEqual(mocked.call_count, 2)
        mocked.assert_any_call(expected_msg, file=sys.stderr)
        mocked.assert_any_call(_github._GH_ENV_VAR_MSG, file=sys.stderr)
def test_match_not_found(self):
    """Check that the server sends an empty match when none is found."""
    # Cache miss, and the API handler raises a "404" LoLException.
    self.service.cache_manager.find_match.return_value = None
    not_found = LoLException("404", requests.Response())
    self.service.riot_api_handler.get_match.side_effect = not_found

    match_request = service_pb2.MatchRequest(id=4242, region=constants_pb2.EUW)
    response = self.stub.Match(match_request)

    self.assertEqual(response, match_pb2.MatchReference())
test_drv_opencontrail.py 文件源码
项目:networking-opencontrail
作者: openstack
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def test_request_api_server_authorized(self, options, config, request):
    """An OK response is returned as-is and the request is made with verify=False."""
    ok_response = requests.Response()
    ok_response.status_code = requests.codes.ok
    path = "/URL"

    driver = self._get_driver(options, config)
    driver._apiinsecure = True
    request.return_value = ok_response

    returned = driver._request_api_server(path)

    self.assertEqual(ok_response, returned)
    request.assert_called_with(
        path, data=None, headers=mock.ANY, verify=False)
test_drv_opencontrail.py 文件源码
项目:networking-opencontrail
作者: openstack
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_request_api_server_auth_failed(self, options, config, request):
    """An unauthorized response makes _request_api_server raise RuntimeError."""
    denied = requests.Response()
    denied.status_code = requests.codes.unauthorized

    driver = self._get_driver(options, config)
    driver._apiinsecure = False
    driver._use_api_certs = False
    request.return_value = denied

    with self.assertRaises(RuntimeError):
        driver._request_api_server("/URL")
test_drv_opencontrail.py 文件源码
项目:networking-opencontrail
作者: openstack
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_request_api_server_authn(self, options, config, request):
    """_request_api_server_authn returns the OK response and requests with verify=False."""
    ok_response = requests.Response()
    ok_response.status_code = requests.codes.ok
    path = "/URL"

    driver = self._get_driver(options, config)
    driver._apiinsecure = True
    request.return_value = ok_response

    returned = driver._request_api_server_authn(path)

    self.assertEqual(ok_response, returned)
    request.assert_called_with(
        path, data=None, headers=mock.ANY, verify=False)
def test_vnc_api_is_authenticated_unauthorized(self, config, request):
    """An unauthorized reply makes vnc_api_is_authenticated() return True."""
    api_server = mock.MagicMock()
    config.APISERVER = api_server
    api_server.use_ssl = False
    api_server.api_server_ip = "localhost"
    api_server.api_server_port = "8082"

    unauthorized = requests.Response()
    unauthorized.status_code = requests.codes.unauthorized
    request.return_value = unauthorized

    outcome = utils.vnc_api_is_authenticated()

    request.assert_called_with(mock.ANY)
    self.assertEqual(True, outcome)
def test_vnc_api_is_authenticated_invalid(self, config, request):
    """A server-error reply makes vnc_api_is_authenticated() raise HTTPError."""
    api_server = mock.MagicMock()
    config.APISERVER = api_server
    api_server.use_ssl = True
    api_server.api_server_ip = "localhost"
    api_server.api_server_port = "8082"

    failure = requests.Response()
    failure.status_code = requests.codes.server_error
    request.return_value = failure

    with self.assertRaises(requests.exceptions.HTTPError):
        utils.vnc_api_is_authenticated()
    request.assert_called_with(mock.ANY)
def process(self, http_response):
    # type: (requests.Response) -> PrestoStatus
    """Turn an HTTP response into a ``PrestoStatus``.

    Non-OK responses are passed to ``self.raise_response_error``. The body is
    decoded as UTF-8 JSON; an ``error`` member is converted with
    ``self._process_error`` and raised. Session headers on the HTTP response
    update ``self._client_session.properties``, and ``nextUri`` from the body
    is remembered in ``self._next_uri``.

    :param http_response: response received from the server
    :return: status built from the ``id``, ``stats``, ``infoUri``,
        ``nextUri``, ``data`` and ``columns`` members of the JSON body
    :raises KeyError: if ``id``, ``stats`` or ``infoUri`` is missing
    """
    if not http_response.ok:
        self.raise_response_error(http_response)

    http_response.encoding = 'utf-8'
    response = http_response.json()
    logger.debug('HTTP {}: {}'.format(http_response.status_code, response))
    if 'error' in response:
        raise self._process_error(response['error'])

    # Headers live on the HTTP response object; ``response`` is the decoded
    # JSON body and has no ``headers`` attribute.
    if constants.HEADER_CLEAR_SESSION in http_response.headers:
        for prop in get_header_values(
            http_response.headers,
            constants.HEADER_CLEAR_SESSION,
        ):
            self._client_session.properties.pop(prop, None)

    if constants.HEADER_SET_SESSION in http_response.headers:
        for key, value in get_session_property_values(
            http_response.headers,
            constants.HEADER_SET_SESSION,
        ):
            self._client_session.properties[key] = value

    self._next_uri = response.get('nextUri')

    return PrestoStatus(
        id=response['id'],
        stats=response['stats'],
        info_uri=response['infoUri'],
        next_uri=self._next_uri,
        rows=response.get('data', []),
        columns=response.get('columns'),
    )
def request(self, method, *resources, **kwargs):
    """Build the URL for *resources*, send the request and hand the reply to ``handle_response``.

    Keyword arguments consumed here (everything else is forwarded to
    ``requests.request``):

    :param response_key: key passed on to ``handle_response``
    :param key: alias for ``response_key``; wins when not None
    :param query: forwarded to ``url_for``
    :param data: request payload; sent as ``json=`` when ``content_type``
        contains ``'json'``, as ``data=`` when ``content_type`` is ``'body'``
    :param fragment: forwarded to ``url_for`` (default ``''``)
    :param params: forwarded to ``url_for`` (default ``''``)
    :param keep_blank_values: forwarded to ``url_for``
    :param timeout: request timeout (default 60)
    :param content_type: payload mode (default ``'json'``)
    :return: whatever ``handle_response`` returns
    """
    method = method.upper()

    response_key = kwargs.pop('response_key', None)
    key = kwargs.pop('key', None)
    response_key = response_key if key is None else key

    query = kwargs.pop('query', None)
    payload = kwargs.pop('data', None)
    fragment = kwargs.pop('fragment', '')
    params = kwargs.pop('params', '')
    keep_blank_values = kwargs.pop('keep_blank_values', None)
    timeout = kwargs.pop('timeout', 60)

    resource = self.build_resource(resources)

    content_type = kwargs.pop('content_type', 'json')
    if payload is not None:
        # Both checks run independently, as in the original flow.
        if 'json' in content_type:
            kwargs['json'] = payload
        if content_type == 'body':
            kwargs['data'] = payload

    url = self.url_for(
        resource,
        query,
        params=params,
        fragment=fragment,
        keep_blank_values=keep_blank_values,
    )
    self.logger.info('Request %s for %s', method, url)

    reply = requests.request(method, url, timeout=timeout, **kwargs)
    return self.handle_response(reply, response_key=response_key)
def emaillogin(email, otp):
    """Raw Cloud API call, request cloud token with email address & OTP.

    Args:
        email(str): Email address connected to Cozify account.
        otp(int): One time passcode.

    Returns:
        str: cloud token

    Raises:
        APIError: when the reply status is not 200.
    """
    credentials = dict(email=email, password=otp)
    reply = requests.post(cloudBase + 'user/emaillogin', params=credentials)
    if reply.status_code != 200:
        raise APIError(reply.status_code, reply.text)
    return reply.text
def hubkeys(cloud_token):
    """1:1 implementation of user/hubkeys

    Args:
        cloud_token(str) Cloud remote authentication token.

    Returns:
        dict: Map of hub_id: hub_token pairs.

    Raises:
        APIError: when the reply status is not 200.
    """
    reply = requests.get(
        cloudBase + 'user/hubkeys',
        headers={'Authorization': cloud_token},
    )
    if reply.status_code != 200:
        raise APIError(reply.status_code, reply.text)
    return json.loads(reply.text)
def refreshsession(cloud_token):
    """1:1 implementation of user/refreshsession

    Args:
        cloud_token(str) Cloud remote authentication token.

    Returns:
        str: New cloud remote authentication token. Not automatically stored into state.

    Raises:
        APIError: when the reply status is not 200.
    """
    reply = requests.get(
        cloudBase + 'user/refreshsession',
        headers={'Authorization': cloud_token},
    )
    if reply.status_code != 200:
        raise APIError(reply.status_code, reply.text)
    return reply.text
def cone_search(ra, dec, radius, table="gaiadr1.gaia_source", **kwargs):
    """
    Perform a cone search against the ESA Gaia database using the TAP.

    :param ra:
        Right ascension (degrees).

    :param dec:
        Declination (degrees).

    :param radius:
        Cone search radius (degrees).

    :param table: [optional]
        The table name to perform the cone search on. Some examples are:
        gaiadr1.gaia_source
        gaiadr1.tgas_source

    :param kwargs:
        Keyword arguments are passed directly to the `query` method.

    :returns:
        Whatever `query` returns for the generated statement.
    """
    # Pieces are joined with explicit newlines so the statement text does not
    # depend on how this source file is indented.
    template = (
        " SELECT *\n"
        "FROM {table}\n"
        "WHERE CONTAINS(\n"
        "POINT('ICRS',{table}.ra,{table}.dec),\n"
        "CIRCLE('ICRS',{ra:.10f},{dec:.10f},{radius:.10f})) = 1;"
    )
    statement = template.format(table=table, ra=ra, dec=dec, radius=radius)
    return query(statement, **kwargs)
def __init__(self, response, description, *args, **kwargs):
    """Build the exception from the failing response.

    The instance exposes ``response``, ``description``, ``status_code`` and
    ``content``; the last two are copied from *response*.

    :param response: requests.response
    :param description: str - description for error
    """
    self.response, self.description = response, description
    self.status_code = response.status_code
    self.content = response.content
    # Remaining positional/keyword arguments go to the parent initializer.
    super(ResponseError, self).__init__(*args, **kwargs)
def __repr__(self):  # pragma: no cover
    """Return a one-line summary with the response status code and the description."""
    template = 'Error status code: {}. Description: {}'
    return template.format(self.response.status_code, self.description)
def handle_response(self, response, response_key=None):
    """Handler for response object

    Decodes the JSON body and, depending on the status code, returns it,
    returns the value stored under *response_key*, returns ``None``, or
    raises ``ResponseError``.

    :param response: requests.response obj
    :param response_key: key for dict in response obj
    :return object, result for response, python obj
    :raises ResponseError: body is not JSON, *response_key* is missing from
        an OK reply, or the status is in neither ``ok_statuses`` nor
        ``to_none_statuses``
    """
    code = response.status_code
    try:
        payload = response.json()
    except Exception as exc:
        self.logger.exception(exc)
        raise ResponseError(response, exc)

    wants_key = response_key is not None
    if payload:
        if wants_key and code in self.ok_statuses:
            if response_key not in payload:
                raise ResponseError(response, 'Response key not found!')
            payload = payload[response_key]
        elif wants_key and code in self.to_none_statuses:
            payload = None
        elif not (code in self.ok_statuses or code in self.to_none_statuses):
            raise ResponseError(response,
                                'Status code {} not in ok_statuses {}'.format(
                                    code, self.ok_statuses))

    # An empty (but not None) keyed result collapses to None when configured.
    if wants_key and self.empty_to_none and payload is not None and not payload:
        payload = None
    return payload
def make_402_payment(self, response, max_price):
    """Payment hook that every BitRequests subclass has to provide.

    This base version always raises ``NotImplementedError``.

    Args:
        response (requests.response): 402 response from the API server.
        max_price (int): maximum allowed price for a request (in satoshi).

    Returns:
        headers (dict):
            dict of headers with payment data to send to the
            API server to inform payment status for the resource.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError()
def make_402_payment(self, response, max_price):
    """Make an on-chain payment.

    Reads the price and payee address from the response headers, signs a
    transaction for that amount with ``self.wallet`` and returns the headers
    describing the payment.

    Raises:
        UnsupportedPaymentMethodError: price or address header is missing.
        ResourcePriceGreaterThanMaxPriceError: a truthy *max_price* is
            exceeded by the advertised price.
    """
    advertised = response.headers
    raw_price = advertised.get(OnChainRequests.HTTP_BITCOIN_PRICE)
    payee_address = advertised.get(OnChainRequests.HTTP_BITCOIN_ADDRESS)

    # Both headers must be present for this payment method.
    if raw_price is None or payee_address is None:
        raise UnsupportedPaymentMethodError(
            'Resource does not support that payment method.')

    # Header values arrive as strings.
    amount = int(raw_price)

    # A falsy max_price disables the budget check.
    if max_price and amount > max_price:
        raise ResourcePriceGreaterThanMaxPriceError(
            'Resource price ({}) exceeds max price ({}).'.format(amount, max_price))

    signed = self.wallet.make_signed_transaction_for(
        payee_address, amount, use_unconfirmed=True)
    txn_hex = signed[0].get('txn').to_hex()
    return_address = self.wallet.current_address
    logger.debug('[OnChainRequests] Signed transaction: {}'.format(txn_hex))

    payment_headers = {
        'Bitcoin-Transaction': txn_hex,
        'Return-Wallet-Address': return_address,
        OnChainRequests.HTTP_BITCOIN_PRICE: str(amount),
    }
    if self.username:
        payer = urllib.parse.quote(self.username)
    else:
        payer = None
    payment_headers[OnChainRequests.HTTP_PAYER_21USERNAME] = payer
    return payment_headers
def get_402_info(self, url):
    """Get channel payment information about the resource.

    Issues a GET to *url* and returns the price and payment-channel-server
    header values, keyed by their header names (``None`` for absent headers).
    """
    reply_headers = requests.get(url).headers
    wanted = (
        ChannelRequests.HTTP_BITCOIN_PRICE,
        ChannelRequests.HTTP_BITCOIN_PAYMENT_CHANNEL_SERVER,
    )
    return {name: reply_headers.get(name) for name in wanted}
def requestlogin(email):
    """Raw Cloud API call, request OTP to be sent to account email address.

    Args:
        email(str): Email address connected to Cozify account.

    Raises:
        APIError: when the reply status is not 200.
    """
    payload = {'email': email}
    response = requests.post(cloudBase + 'user/requestlogin', params=payload)
    # Compare by value: ``is not`` tests object identity, which for ints only
    # works by interpreter accident and triggers a SyntaxWarning on 3.8+.
    if response.status_code != 200:
        raise APIError(response.status_code, response.text)
def lan_ip():
    """1:1 implementation of hub/lan_ip

    This call will fail with an APIError if the requesting source address is not the same as that of the hub, i.e. if they're not in the same NAT network.
    The above is based on observation and may only be partially true.

    Returns:
        list: List of Hub ip addresses.

    Raises:
        APIError: when the reply status is not 200.
    """
    reply = requests.get(cloudBase + 'hub/lan_ip')
    if reply.status_code != 200:
        raise APIError(reply.status_code, reply.text)
    return json.loads(reply.text)