def get_response_and_time(self, key, default=(None, None)):
    """Look up the cached response and its timestamp for ``key``.

    ``key`` is first tried directly against ``self.responses``; when it is
    not present there it is translated through ``self.keys_map`` and the
    translated key is looked up instead.

    :param key: key of the resource (or an alias stored in ``keys_map``)
    :param default: value returned when ``key`` cannot be resolved
    :returns: tuple ``(response, timestamp)``, or ``default`` on a miss

    .. note:: The stored response is passed through :meth:`restore_response`
              before it is returned.
    """
    try:
        store = self.responses
        resolved = key if key in store else self.keys_map[key]
        cached, stamp = store[resolved]
    except KeyError:
        # Neither the key itself nor its alias is known to the cache.
        return default
    return self.restore_response(cached), stamp
python类Response()的实例源码
def put(self, url, data, headers=None, ok_status_code=requests.codes.ok, auto_renew=True):
    """
    Perform a HTTP PUT request by delegating to ``self.request``.

    :param str url: URL of the HTTP request.
    :param bytes | None data: Binary data to send in the request body.
    :param dict | None headers: Additional headers for the HTTP request.
    :param int ok_status_code: (Optional) Expected status code for the HTTP response.
    :param True | False auto_renew: (Optional) If True, auto recover from expired token error or Internet failure.
    :rtype: requests.Response
    """
    request_kwargs = {'proxies': self.proxies, 'data': data}
    # Only forward headers when the caller actually supplied some.
    if headers is not None:
        request_kwargs['headers'] = headers
    return self.request(
        'put', url, params=request_kwargs,
        ok_status_code=ok_status_code, auto_renew=auto_renew)
def save_response(self, key, response):
    """Store ``response`` in the cache under ``key``.

    :param key: key for this response
    :param response: response to save; must expose ``request.headers``

    .. note:: An ``X-Auth-Token`` request header, if present, is removed from
              ``response.request.headers`` in place before the response is
              stored, so the token does not end up in the cache entry.
    """
    # Drop the auth token (no-op when the header is absent).
    response.request.headers.pop('X-Auth-Token', None)
    self.responses[key] = response
def get_response(self, key, default=None):
    """Return the cached response for ``key``, or ``default`` on a miss.

    ``key`` is first tried directly against ``self.responses``; when it is
    not present there it is translated through ``self.keys_map`` and the
    translated key is looked up instead.

    :param key: key of the resource (or an alias stored in ``keys_map``)
    :param default: value returned when ``key`` cannot be resolved
    :returns: the stored response object exactly as it was saved
    """
    try:
        lookup_key = key if key in self.responses else self.keys_map[key]
        return self.responses[lookup_key]
    except KeyError:
        # Neither the key itself nor its alias is known to the cache.
        return default
def restore_response(self, response, seen=None):
    """Rebuild a ``requests.Response`` from a response object after unpickling.

    Every attribute named in ``self._response_attrs`` is copied onto a fresh
    ``requests.Response`` (missing attributes become ``None``), and the
    entries of ``response.history`` are restored recursively.

    :param response: the stored response object to restore
    :param seen: memo mapping ``id(stored_object)`` to its restored
                 counterpart; shared with the recursive calls so the same
                 stored object is only restored once
    :returns: the restored ``requests.Response``
    """
    memo = {} if seen is None else seen
    marker = id(response)
    if marker in memo:
        return memo[marker]

    restored = requests.Response()
    for attr in self._response_attrs:
        setattr(restored, attr, getattr(response, attr, None))
    restored.raw._cached_content_ = restored.content
    # Register before walking the history so that a history entry which is
    # this very object resolves to the instance being built.
    memo[marker] = restored
    restored.history = tuple(
        self.restore_response(previous, memo) for previous in response.history)
    return restored
test_drv_opencontrail.py 文件源码
项目:networking-opencontrail
作者: openstack
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_request_api_server_auth_recover(self, options, config, request,
                                         text):
    """Check that the last request after an unauthorized reply carries the token."""
    token = "xyztoken"

    unauthorized = requests.Response()
    unauthorized.status_code = requests.codes.unauthorized
    authorized = requests.Response()
    authorized.status_code = requests.codes.ok

    driver = self._get_driver(options, config)
    driver._apiinsecure = False
    driver._use_api_certs = False

    # The mocked text payload carries the token under access.token.id.
    text.return_value = json.dumps({'access': {'token': {'id': token}}})
    request.side_effect = [unauthorized, authorized, authorized]

    driver._request_api_server("/URL")

    request.assert_called_with('/URL', data=None,
                               headers={'X-AUTH-TOKEN': token})
test_drv_opencontrail.py 文件源码
项目:networking-opencontrail
作者: openstack
项目源码
文件源码
阅读 32
收藏 0
点赞 0
评论 0
def test_request_backend(self, options, config):
    """Check ``_request_backend`` returns the relayed status code and a message dict."""
    ok_response = requests.Response()
    ok_response.status_code = requests.codes.ok

    driver = self._get_driver(options, config)
    driver._relay_request = mock.MagicMock(return_value=ok_response)

    code, message = driver._request_backend(
        mock.MagicMock(), {}, 'object', 'TEST')

    self.assertEqual(requests.codes.ok, code)
    self.assertEqual({'message': None}, message)
    driver._relay_request.assert_called()
async def test_asend_returns_appropriate_sorna_response(mocker, req_params,
                                                        mock_sorna_aresp):
    """Check ``Request.asend`` yields a ``Response`` mirroring the mocked reply.

    For every method in ``Request._allowed_methods`` the matching
    ``aiohttp.ClientSession`` method is patched with a coroutine mock and the
    resulting ``Response`` is compared with the mocked configuration.
    """
    # NOTE(review): declared ``async def`` because the body uses ``await``;
    # with a plain ``def`` this is a SyntaxError. An asyncio test marker or
    # plugin is presumably applied outside this view -- confirm.
    req = Request(**req_params)
    for method in Request._allowed_methods:
        req.method = method
        mock_reqfunc = mocker.patch.object(
            aiohttp.ClientSession, method.lower(),
            new_callable=asynctest.CoroutineMock
        )
        mock_reqfunc.return_value, conf = mock_sorna_aresp

        resp = await req.asend()

        assert isinstance(resp, Response)
        assert resp.status == conf['status']
        assert resp.reason == conf['reason']
        assert resp.content_type == conf['content_type']
        body = await conf['read']()
        assert resp.content_length == len(body)
        assert resp.text() == body.decode()
        assert resp.json() == json.loads(body.decode())
def _parse_response_content_for_predictions(self, question_number, response):
    """
    Parse the JSON docs of the HTTP response into a list of predictions.

    Each doc under ``response.docs`` becomes one ``Prediction`` carrying its
    score and, when the doc has a ``ranker.confidence`` entry, its confidence
    (``None`` otherwise). When ``response.numFound`` is not positive a warning
    is logged and an empty list is returned.

    :param str question_number: used as qid
    :param requests.Response response: response whose JSON body is parsed
    :return: one ``Prediction`` per returned doc (possibly empty)
    :rtype: list
    """
    payload = response.json()
    if payload['response']['numFound'] > 0:
        return [
            Prediction(qid=question_number, aid=doc[self.DOC_ID_FIELD_NAME],
                       rank_score=doc['score'],
                       conf_score=doc.get("ranker.confidence"))
            for doc in payload['response']['docs']
        ]
    # ``warn`` is a deprecated alias of ``warning``; the argument is passed
    # separately so formatting only happens if the record is emitted.
    self.logger.warning('Empty response: %s', vars(response))
    return []
def _parse_response_content_for_predictions(self, question_number, response):
    """
    Parse the JSON docs of the HTTP response into a list of predictions.

    Each entry under ``results`` becomes one ``Prediction`` carrying its score;
    no confidence is available here, so ``conf_score`` is always ``None``.
    When ``matching_results`` is not positive a warning is logged and an empty
    list is returned.

    :param str question_number: used as qid
    :param requests.Response response: response whose JSON body is parsed
    :return: one ``Prediction`` per returned doc (possibly empty)
    :rtype: list
    """
    payload = response.json()
    if payload["matching_results"] > 0:
        return [
            Prediction(qid=question_number, aid=doc[DOC_ID_FIELD_NAME],
                       rank_score=doc['score'], conf_score=None)
            for doc in payload['results']
        ]
    # ``warn`` is a deprecated alias of ``warning``; the argument is passed
    # separately so formatting only happens if the record is emitted.
    self.logger.warning('Empty response: %s', vars(response))
    return []
def get(self, url, params=None, headers=None, ok_status_code=requests.codes.ok, auto_renew=True):
    """
    Perform a HTTP GET request by delegating to ``self.request``.

    :param str url: URL of the HTTP request.
    :param dict[str, T] | None params: (Optional) Dictionary to construct query string.
    :param dict | None headers: (Optional) Additional headers for the HTTP request.
    :param int ok_status_code: (Optional) Expected status code for the HTTP response.
    :param True | False auto_renew: (Optional) If True, auto recover from expired token error or Internet failure.
    :rtype: requests.Response
    """
    request_kwargs = {'proxies': self.proxies}
    # Optional arguments are forwarded only when the caller provided them.
    for name, value in (('params', params), ('headers', headers)):
        if value is not None:
            request_kwargs[name] = value
    return self.request(
        'get', url, request_kwargs,
        ok_status_code=ok_status_code, auto_renew=auto_renew)
def post(self, url, data=None, json=None, headers=None, ok_status_code=requests.codes.ok, auto_renew=True):
    """
    Perform a HTTP POST request by delegating to ``self.request``.

    ``json`` takes precedence: ``data`` is only sent when ``json`` is ``None``.

    :param str url: URL of the HTTP request.
    :param dict | None data: (Optional) Data in POST body of the request.
    :param dict | None json: (Optional) Send the dictionary as JSON content in POST body and set proper headers.
    :param dict | None headers: (Optional) Additional headers for the HTTP request.
    :param int ok_status_code: (Optional) Expected status code for the HTTP response.
    :param True | False auto_renew: (Optional) If True, auto recover from expired token error or Internet failure.
    :rtype: requests.Response
    """
    request_kwargs = {'proxies': self.proxies}
    if json is None:
        request_kwargs['data'] = data
    else:
        request_kwargs['json'] = json
    if headers is not None:
        request_kwargs['headers'] = headers
    return self.request(
        'post', url, request_kwargs,
        ok_status_code=ok_status_code, auto_renew=auto_renew)
def mixin_request_id(self, resp):
    """Record the request id taken from ``resp`` on ``self.request_id``.

    :param resp: either a ``Response`` object, whose headers are searched for
                 a request id, or a value (e.g. a string or ``None``) that is
                 stored as the request id unchanged
    """
    if not isinstance(resp, Response):
        # Anything that is not a Response is taken to be the id itself.
        self.request_id = resp
        return

    # First non-empty header wins, checked in this order.
    self.request_id = (resp.headers.get('openstack-request-id') or
                       resp.headers.get('x-openstack-request-id') or
                       resp.headers.get('x-compute-request-id'))
def __init__(self, data):
    """Build a fake response from ``data``.

    :param data: either a dict with optional ``status_code`` (default 200),
                 ``text`` and ``headers`` entries, or any other value, which
                 is then used directly as the status code
    """
    super(TestResponse, self).__init__()
    self._content_consumed = True

    if not isinstance(data, dict):
        self.status_code = data
        return

    self.status_code = data.get('status_code', 200)
    # Fake the text attribute to streamline Response creation
    body = data.get('text', "")
    if isinstance(body, (dict, list)):
        # Structured bodies are serialized and advertised as JSON.
        body = json.dumps(body)
        fallback_headers = {"Content-Type": "application/json"}
    else:
        fallback_headers = {}
    if six.PY3 and isinstance(body, six.string_types):
        body = body.encode('utf-8', 'strict')
    self._content = body
    self.headers = data.get('headers') or fallback_headers
def test_request_retry(self, mock_request):
    """Tests that two connection errors will be handled"""

    class FlakyRequest(object):
        """Raise ConnectionError on the first two calls, then answer 204."""
        calls = 0

        def connection_error(self, *args, **kwargs):
            self.calls += 1
            if self.calls >= 3:
                reply = requests.Response()
                reply.status_code = 204
                return reply
            raise requests.exceptions.ConnectionError

    mock_request.side_effect = FlakyRequest().connection_error

    client = InfluxDBClient(database='db')
    client.write_points(self.dummy_points)
def test_request_retry_raises(self, mock_request):
    """Tests that three connection errors will not be handled"""

    class FlakyRequest(object):
        """Raise ConnectionError on the first three calls, then answer 200."""
        calls = 0

        def connection_error(self, *args, **kwargs):
            self.calls += 1
            if self.calls >= 4:
                reply = requests.Response()
                reply.status_code = 200
                return reply
            raise requests.exceptions.ConnectionError

    mock_request.side_effect = FlakyRequest().connection_error

    client = InfluxDBClient(database='db')
    with self.assertRaises(requests.exceptions.ConnectionError):
        client.write_points(self.dummy_points)
def test_request_retry_raises(self, mock_request):
    """Tests that three connection errors will not be handled"""

    class FlakyRequest(object):
        """Raise ConnectionError on the first three calls, then answer 200."""
        calls = 0

        def connection_error(self, *args, **kwargs):
            self.calls += 1
            if self.calls >= 4:
                reply = requests.Response()
                reply.status_code = 200
                return reply
            raise requests.exceptions.ConnectionError

    mock_request.side_effect = FlakyRequest().connection_error

    client = InfluxDBClient(database='db')
    with self.assertRaises(requests.exceptions.ConnectionError):
        client.write_points(self.dummy_points)
def test_add_channel_failed_create_channel(mock_staff_client, mocker):
    """If client.channels.create fails an exception should be raised"""
    server_error = Response()
    server_error.status_code = statuses.HTTP_500_INTERNAL_SERVER_ERROR

    create = mock_staff_client.channels.create
    raise_for_status = create.return_value.raise_for_status
    raise_for_status.side_effect = HTTPError(response=server_error)

    with pytest.raises(ChannelCreationException) as ex:
        api.add_channel(
            Search.from_dict({}),
            "title",
            "name",
            "public_description",
            "channel_type",
            123,
            456,
        )

    assert ex.value.args[0] == "Error creating channel name"
    raise_for_status.assert_called_with()
    assert create.call_count == 1
    # Nothing may be persisted when channel creation fails.
    assert PercolateQuery.objects.count() == 0
    assert Channel.objects.count() == 0
def test_send_batch_400_no_raise(self, mock_post):
    """
    Test that if raise_for_status is False we don't raise an exception for a 400 response
    """
    bad_request = Mock(
        spec=Response,
        status_code=HTTP_400_BAD_REQUEST,
        json=mocked_json()
    )
    mock_post.return_value = bad_request

    recipient_tuples = [
        ("{0}@example.com".format(letter), None)
        for letter in string.ascii_letters
    ]
    assert len(recipient_tuples) == 52

    with override_settings(MAILGUN_RECIPIENT_OVERRIDE=None):
        resp_list = MailgunClient.send_batch(
            'email subject', 'email body', recipient_tuples,
            chunk_size=10, raise_for_status=False
        )

    # 52 recipients in chunks of 10 -> 6 requests, each answered with a 400.
    assert len(resp_list) == 6
    for resp in resp_list:
        assert resp.status_code == HTTP_400_BAD_REQUEST
    assert mock_post.call_count == 6
    assert mock_post.return_value.raise_for_status.called is False
def test_financial_aid_email_error(self, raise_for_status, mock_post):
    """
    Test that send_financial_aid_email handles errors correctly
    """
    bad_request = Mock(
        spec=Response,
        status_code=HTTP_400_BAD_REQUEST,
        json=mocked_json(),
    )
    mock_post.return_value = bad_request

    response = MailgunClient.send_financial_aid_email(
        self.staff_user_profile.user,
        self.financial_aid,
        'email subject',
        'email body',
        raise_for_status=raise_for_status,
    )

    # raise_for_status() must have been invoked exactly when it was requested.
    assert response.raise_for_status.called is raise_for_status
    assert response.status_code == HTTP_400_BAD_REQUEST
    assert response.json() == {}
def __init__(self, data):
    """Build a fake response from ``data``.

    :param data: either a dict with optional ``status_code`` (default 200),
                 ``text`` and ``headers`` entries, or any other value, which
                 is then used directly as the status code
    """
    super(TestResponse, self).__init__()
    self._content_consumed = True

    if not isinstance(data, dict):
        self.status_code = data
        return

    self.status_code = data.get('status_code', 200)
    # Fake the text attribute to streamline Response creation
    body = data.get('text', "")
    if isinstance(body, (dict, list)):
        # Structured bodies are serialized and advertised as JSON.
        body = json.dumps(body)
        fallback_headers = {"Content-Type": "application/json"}
    else:
        fallback_headers = {}
    if six.PY3 and isinstance(body, six.string_types):
        body = body.encode('utf-8', 'strict')
    self._content = body
    self.headers = data.get('headers') or fallback_headers
def post_settings(greeting_text):
    """ Sets the START Button and also the Greeting Text.

    Two POST requests are sent to the same URL: first the greeting text, then
    the start button, whose postback payload is 'USER_START'.

    :param str greeting_text: Desired Greeting Text (160 chars).
    :return: `Response object <http://docs.python-requests.org/en/master/api/#requests.Response>`_
        of the second (start button) request.
    """
    url = TD_STS_URL + PAGE_ACCESS_TOKEN

    # Set the greeting text.
    greeting_body = json.dumps({
        'setting_type': 'greeting',
        'greeting': {'text': greeting_text},
    })
    # NOTE(review): the response to this first request is not returned or
    # inspected, so a failure to set the greeting is invisible to callers.
    requests.post(url, headers=HEADER, data=greeting_body)

    # Set the start button.
    button_body = json.dumps({
        'setting_type': 'call_to_actions',
        'thread_state': 'new_thread',
        'call_to_actions': [{'payload': 'USER_START'}],
    })
    return requests.post(url, headers=HEADER, data=button_body)
def post_start_button(payload='START'):
    """ Sets the Thread Settings start button for new threads
    (/docs/messenger-platform/thread-settings/get-started-button).

    :param str payload: Desired postback payload (Default START).
    :return: `Response object <http://docs.python-requests.org/en/master/api/#requests.Response>`_
    """
    body = json.dumps({
        "setting_type": "call_to_actions",
        "thread_state": "new_thread",
        "call_to_actions": [{"payload": payload}],
    })
    return requests.post(
        TD_STS_URL + PAGE_ACCESS_TOKEN, headers=HEADER, data=body)
def post_domain_whitelisting(whitelisted_domains, domain_action_type='add'):
    """ Sets the whitelisted domains for the Messenger Extension
    (/docs/messenger-platform/thread-settings/domain-whitelisting).

    :param list whitelisted_domains: Domains to be whitelisted.
    :param str domain_action_type: Action to run `add/remove` (Default add).
    :return: `Response object <http://docs.python-requests.org/en/master/api/#requests.Response>`_
    """
    body = json.dumps({
        'setting_type': 'domain_whitelisting',
        'whitelisted_domains': whitelisted_domains,
        'domain_action_type': domain_action_type,
    })
    return requests.post(
        TD_STS_URL + PAGE_ACCESS_TOKEN, headers=HEADER, data=body)
def typing(fbid, sender_action):
    """ Displays/Hides the typing gif/mark seen on facebook chat
    (/docs/messenger-platform/send-api-reference/sender-actions)

    :param str fbid: User id to display action.
    :param str sender_action: `typing_off/typing_on/mark_seen`.
    :return: `Response object <http://docs.python-requests.org/en/master/api/#requests.Response>`_
    """
    body = json.dumps({
        'recipient': {'id': fbid},
        'sender_action': sender_action,
    })
    return requests.post(
        MSG_URL + PAGE_ACCESS_TOKEN, headers=HEADER, data=body)
# Send API Content Type
def post_text_message(fbid, message):
    """ Sends a common text message
    (/docs/messenger-platform/send-api-reference/text-message)

    :param str fbid: User id to send the text.
    :param str message: Text to be displayed for the user.
    :return: `Response object <http://docs.python-requests.org/en/master/api/#requests.Response>`_
    """
    # NOTE(review): earlier documentation of this function cited both 230
    # and 320 characters as the message length limit; no limit is enforced
    # here -- confirm the current value against the Messenger Platform docs.
    body = json.dumps({
        'recipient': {'id': fbid},
        'message': {'text': message},
    })
    return requests.post(
        MSG_URL + PAGE_ACCESS_TOKEN, headers=HEADER, data=body)
def post_attachment(fbid, media_url, file_type):
    """ Sends a media attachment
    (/docs/messenger-platform/send-api-reference/contenttypes)

    :param str fbid: User id to send the attachment to.
    :param str media_url: Url of a hosted media.
    :param str file_type: image/audio/video/file.
    :return: `Response object <http://docs.python-requests.org/en/master/api/#requests.Response>`_
    """
    body = json.dumps({
        'recipient': {'id': fbid},
        'message': {
            "attachment": {"type": file_type, "payload": {"url": media_url}},
        },
    })
    return requests.post(
        MSG_URL + PAGE_ACCESS_TOKEN, headers=HEADER, data=body)
def post_start_button(payload='START'):
    """ Sets the **Get Started button**.

    :usage:

        >>> payload = 'GET_STARTED'
        >>> response = fbbotw.post_start_button(payload=payload)

    :param str payload: Desired postback payload (Default 'START').
    :return: `Response object <http://docs.python-requests.org/en/master/api/#requests.Response>`_
    :facebook docs: `/get-started-button <https://developers.facebook.com/docs/messenger-platform/messenger-profile/get-started-button>`_
    """
    body = json.dumps({'get_started': {'payload': payload}})
    return requests.post(
        MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN),
        headers=HEADER, data=body)
def post_domain_whitelist(whitelisted_domains):
    """ Sets the whitelisted domains for the Messenger Extension

    :usage:

        >>> # Define the array of domains to be whitelisted (Max 10)
        >>> whitelisted_domains = [
        ...     "https://myfirst_domain.com",
        ...     "https://another_domain.com"
        ... ]
        >>> fbbotw.post_domain_whitelist(
        ...     whitelisted_domains=whitelisted_domains
        ... )

    :param list whitelisted_domains: Domains to be whitelisted (Max 10).
    :return: `Response object <http://docs.python-requests.org/en/master/api/#requests.Response>`_
    :facebook docs: `/domain-whitelisting <https://developers.facebook.com/docs/messenger-platform/messenger-profile/domain-whitelisting>`_
    """
    body = json.dumps({'whitelisted_domains': whitelisted_domains})
    return requests.post(
        MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN),
        headers=HEADER, data=body)
def post_account_linking_url(account_linking_url):
    """ Sets the **linking_url** to connect the user with your business login

    :usage:

        >>> my_callback_linking_url = "https://my_business_callback.com"
        >>> response = fbbotw.post_account_linking_url(
        ...     account_linking_url=my_callback_linking_url
        ... )

    :param str account_linking_url: URL to the account linking OAuth flow.
    :return: `Response object <http://docs.python-requests.org/en/master/api/#requests.Response>`_
    :facebook docs: `/account-linking-url <https://developers.facebook.com/docs/messenger-platform/messenger-profile/account-linking-url>`_
    """
    body = json.dumps({'account_linking_url': account_linking_url})
    return requests.post(
        MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN),
        headers=HEADER, data=body)