def test_endpoint_links(self):
    """Check that a ``Link`` response header is exposed via ``response.links``.

    Page 2 of ``/users/`` is mocked with a ``Link`` header carrying ``next``
    and ``previous`` relations; the filtered result's ``response.links`` must
    equal the parsed form of that header.
    """
    page_two_users = [
        {
            'id': 3,
            'username': 'user1',
            'group': 'watchers',
        },
        {
            'id': 4,
            'username': 'user2',
            'group': 'watchers',
        },
    ]
    link_header = (
        '<http://example.com/users/?page=3>; rel=next,'
        '<http://example.com/users/?page=1>; rel=previous'
    )
    expected_links = {
        'next': {'url': 'http://example.com/users/?page=3', 'rel': 'next'},
        'previous': {'url': 'http://example.com/users/?page=1', 'rel': 'previous'},
    }

    with responses.RequestsMock() as rsps:
        rsps.add(
            'GET',
            MOCK_API_URL + '/users/?page=2',
            json=page_two_users,
            match_querystring=True,
            headers={'Link': link_header},
        )
        users = generic_client.users.filter(page=2)
        self.assertEqual(users.response.links, expected_links)
python类RequestsMock()的实例源码
test_endpoint_trailing_slash.py 文件源码
项目:genericclient-requests
作者: genericclient
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
test_functional.py 文件源码
项目:money-to-prisoners-send-money
作者: ministryofjustice
项目源码
文件源码
阅读 33
收藏 0
点赞 0
评论 0
def test_failure_page(self):
    """Load the debit-card confirmation page for a payment GOV.UK reports as failed.

    The payments API is mocked to return one pending payment and the GOV.UK
    endpoint for its processor id reports ``status: failed``; the page source
    must then contain the apology text and the payment reference.
    """
    processor_id = '3'
    pending_payment = {
        'uuid': 'wargle-blargle',
        'processor_id': processor_id,
        'recipient_name': 'James Bond',
        'amount': 2000,
        'status': 'pending',
        # NOTE(review): naive local time is suffixed with 'Z' (the UTC
        # designator) — confirm this is intentional for the fixture.
        'created': datetime.datetime.now().isoformat() + 'Z',
        'prisoner_number': 'A5544CD',
        'prisoner_dob': '1992-12-05'
    }
    failed_govuk_payment = {
        'state': {'status': 'failed'}
    }
    confirmation_path = '/en-gb/debit-card/confirmation/?payment_ref=REF12345'

    with responses.RequestsMock() as rsps:
        rsps.add(rsps.GET, api_url('/payments/'), json=pending_payment)
        rsps.add(
            rsps.GET,
            govuk_url('/payments/%s' % processor_id),
            json=failed_govuk_payment,
        )
        self.driver.get(self.live_server_url + confirmation_path)
        self.assertInSource('We’re sorry, your payment could not be processed on this occasion')
        self.assertInSource('Your reference number is <strong>REF12345</strong>')
test_forms.py 文件源码
项目:money-to-prisoners-send-money
作者: ministryofjustice
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def assertFormValid(self, form):  # noqa
    """Assert that ``form.is_valid()`` is true while the API is mocked.

    ``PrisonerDetailsForm.get_api_session`` is patched to delegate to
    ``get_api_session``, and the prisoner validity endpoint is mocked to
    return a single matching prisoner. On failure the form's errors are
    included in the assertion message.
    """
    validity_payload = {
        'count': 1,
        'results': [{
            'prisoner_number': 'A1234AB',
            'prisoner_dob': '1980-10-05',
        }],
    }
    session_patcher = mock.patch('send_money.forms.PrisonerDetailsForm.get_api_session')

    with responses.RequestsMock() as rsps, session_patcher as mocked_api_session:
        mocked_api_session.side_effect = get_api_session
        mock_auth(rsps)
        rsps.add(rsps.GET, api_url('/prisoner_validity/'), json=validity_payload, status=200)
        form_is_valid = form.is_valid()
        failure_message = '\n\n%s' % form.errors.as_text()
        self.assertTrue(form_is_valid, msg=failure_message)
test_commands.py 文件源码
项目:money-to-prisoners-send-money
作者: ministryofjustice
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def test_update_incomplete_payments_no_govuk_payment_found(self):
    """Run ``update_incomplete_payments`` when GOV.UK answers 404 for the payment.

    The fourth recorded call (index 3) is expected to carry the body
    ``{"status": "failed"}``.
    """
    with responses.RequestsMock() as rsps:
        mock_auth(rsps)
        # (method, url, extra keyword arguments), registered in this order.
        registrations = (
            (rsps.GET, api_url('/payments/'), {'json': self.payment_data, 'status': 200}),
            (rsps.GET, govuk_url('/payments/%s/' % self.processor_id), {'status': 404}),
            (rsps.PATCH, api_url('/payments/%s/' % self.ref), {'status': 200}),
        )
        for method, url, options in registrations:
            rsps.add(method, url, **options)

        call_command('update_incomplete_payments')

        # The recorded calls are read while the mock is still active.
        sent_body = rsps.calls[3].request.body.decode()
        self.assertEqual(sent_body, '{"status": "failed"}')
test_views.py 文件源码
项目:money-to-prisoners-send-money
作者: ministryofjustice
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_search_not_limited_to_specific_prisons(self, mocked_api_session):
    """Post prisoner details on the bank-transfer flow with no prison filter.

    The validity endpoint is mocked with ``match_querystring=True`` for a
    query string holding only the prisoner number and date of birth.
    """
    mocked_api_session.side_effect = get_api_session
    self.choose_bank_transfer_payment_method()

    expected_query = '?prisoner_number=A1231DE&prisoner_dob=1980-10-04'
    no_matches = {
        'count': 0,
        'results': []
    }
    form_data = {
        'prisoner_number': 'A1231DE',
        'prisoner_dob_0': '4',
        'prisoner_dob_1': '10',
        'prisoner_dob_2': '1980',
    }

    with responses.RequestsMock() as rsps:
        mock_auth(rsps)
        validity_url = api_url('/prisoner_validity/') + expected_query
        rsps.add(rsps.GET, validity_url, match_querystring=True, json=no_matches)
        self.client.post(self.url, data=form_data, follow=True)
test_views.py 文件源码
项目:money-to-prisoners-send-money
作者: ministryofjustice
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def test_can_limit_search_to_specific_prisons(self, mocked_api_session):
    """Post prisoner details on the bank-transfer flow with a prison filter.

    The validity endpoint is mocked with ``match_querystring=True`` for a
    query string that also carries ``prisons=ABC,DEF``.
    """
    mocked_api_session.side_effect = get_api_session
    self.choose_bank_transfer_payment_method()

    expected_query = '?prisoner_number=A1231DE&prisoner_dob=1980-10-04&prisons=ABC,DEF'
    no_matches = {
        'count': 0,
        'results': []
    }
    form_data = {
        'prisoner_number': 'A1231DE',
        'prisoner_dob_0': '4',
        'prisoner_dob_1': '10',
        'prisoner_dob_2': '1980',
    }

    with responses.RequestsMock() as rsps:
        mock_auth(rsps)
        validity_url = api_url('/prisoner_validity/') + expected_query
        rsps.add(rsps.GET, validity_url, match_querystring=True, json=no_matches)
        self.client.post(self.url, data=form_data, follow=True)
test_views.py 文件源码
项目:money-to-prisoners-send-money
作者: ministryofjustice
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_search_not_limited_to_specific_prisons(self, mocked_api_session):
    """Post prisoner details on the debit-card flow with no prison filter.

    The validity endpoint is mocked with ``match_querystring=True`` for a
    query string holding only the prisoner number and date of birth; the
    posted form additionally includes the prisoner's name.
    """
    mocked_api_session.side_effect = get_api_session
    self.choose_debit_card_payment_method()

    expected_query = '?prisoner_number=A1231DE&prisoner_dob=1980-10-04'
    no_matches = {
        'count': 0,
        'results': []
    }
    form_data = {
        'prisoner_name': 'john smith',
        'prisoner_number': 'A1231DE',
        'prisoner_dob_0': '4',
        'prisoner_dob_1': '10',
        'prisoner_dob_2': '1980',
    }

    with responses.RequestsMock() as rsps:
        mock_auth(rsps)
        validity_url = api_url('/prisoner_validity/') + expected_query
        rsps.add(rsps.GET, validity_url, match_querystring=True, json=no_matches)
        self.client.post(self.url, data=form_data, follow=True)
test_views.py 文件源码
项目:money-to-prisoners-send-money
作者: ministryofjustice
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def test_debit_card_payment_handles_govuk_errors(self):
    """Request the payment page when the GOV.UK payment creation returns 500.

    The payments API accepts the new payment (201) but the GOV.UK POST is
    mocked to fail; the response must contain the apology text.
    """
    self.choose_debit_card_payment_method()
    self.fill_in_prisoner_details()
    self.fill_in_amount()

    with responses.RequestsMock() as rsps:
        mock_auth(rsps)
        created_payment = {'uuid': 'wargle-blargle'}
        rsps.add(rsps.POST, api_url('/payments/'), json=created_payment, status=201)
        rsps.add(rsps.POST, govuk_url('/payments/'), status=500)

        with self.patch_prisoner_details_check(), silence_logger():
            response = self.client.get(self.url, follow=False)

        self.assertContains(response, 'We’re sorry, your payment could not be processed on this occasion')
test_views.py 文件源码
项目:money-to-prisoners-send-money
作者: ministryofjustice
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def test_confirmation_handles_api_update_errors(self):
    """Request the confirmation page when the payment lookup returns 500.

    The response must contain the failure text and the first eight characters
    of the reference in upper case, and none of ``complete_session_keys`` may
    remain in the client session afterwards.
    """
    self.choose_debit_card_payment_method()
    self.fill_in_prisoner_details()
    self.fill_in_amount()

    with responses.RequestsMock() as rsps:
        mock_auth(rsps)
        payment_url = api_url('/payments/%s/' % self.ref)
        rsps.add(rsps.GET, payment_url, status=500)

        query = {'payment_ref': self.ref}
        with self.patch_prisoner_details_check(), silence_logger():
            response = self.client.get(self.url, query, follow=False)

        self.assertContains(response, 'your payment could not be processed')
        short_reference = self.ref[:8].upper()
        self.assertContains(response, short_reference)

        # check session is cleared
        for session_key in self.complete_session_keys:
            self.assertNotIn(session_key, self.client.session)
def response_mocker(kwargs, base_url, endpoint_url, status=200,
                    content_type='application/json', post=False, data=None):
    """
    Generates a mocked requests response for a given set of
    kwargs, base url and endpoint url

    ``{{name}}`` placeholders in ``base_url + endpoint_url`` are replaced with
    ``kwargs.get(name)``; a name missing from ``kwargs`` is rendered as the
    text ``None``.

    :param kwargs: mapping used to fill the URL placeholders.
    :param base_url: first part of the URL template.
    :param endpoint_url: second part of the URL template.
    :param status: HTTP status code of the mocked response.
    :param content_type: selects the mocked GET body: ``application/json``,
        ``text/plain``, or anything else for a binary payload. Ignored when
        ``post`` is true.
    :param post: when true, mock and issue a POST (always a JSON body).
    :param data: payload passed to ``requests.post``; unused for GET.
    :return: the response object returned by ``requests``.
    """
    # Raw string: '\{' and '\}' are invalid escape sequences in a plain
    # literal and trigger a warning on current Python versions.
    url = re.sub(r'\{\{(?P<m>[a-zA-Z_]+)\}\}',
                 lambda m: "%s" % kwargs.get(m.group(1)),
                 base_url + endpoint_url)

    # Pick the canned body and the content type actually served by the mock.
    if post or content_type == 'application/json':
        body = b'{"data": "some json formatted output"}'
        served_type = 'application/json'
    elif content_type == 'text/plain':
        body = "Some text-based content\n spanning multiple lines"
        served_type = 'text/plain'
    else:
        body = b"Some other binary stuff..."
        served_type = 'application/octet-stream'

    with responses.RequestsMock() as rsps:
        if post:
            rsps.add(responses.POST, url, body=body,
                     status=status, content_type=served_type)
            response = requests.post(url, data=data)
        else:
            rsps.add(responses.GET, url, body=body,
                     status=status, content_type=served_type)
            response = requests.get(url)
    return response
def test_downloader_http_errors(self):
    """Run the downloader against an MPD URL that answers 500 and then 404.

    After running and stitching, no output file may exist.
    """
    output_file = 'output_httperrors.mp4'
    downloader_options = {
        'mpd': self.TEST_MPD_URL,
        'output_dir': 'output_httperrors',
        'duplicate_etag_retry': 2,
        'singlethreaded': True,
    }

    with responses.RequestsMock(assert_all_requests_are_fired=True) as rsps:
        # Two registrations for the same URL: a 500 followed by a 404.
        for error_status in (500, 404):
            rsps.add(responses.GET, self.TEST_MPD_URL, status=error_status)

        dl = live.Downloader(**downloader_options)
        dl.run()
        dl.stream_id = '17875351285037717'
        dl.stitch(output_file, cleartempfiles=True)

        file_exists = os.path.isfile(output_file)
        self.assertFalse(file_exists, '{0!s} is generated'.format(output_file))
def test_downloader_resp_headers(self):
    """Run the downloader against MPD responses carrying different headers.

    Two independent scenarios are played, each with a fresh mock that requires
    every registered response to be requested:

    1. no extra headers, then ``Cache-Control: max-age=1``, then
       ``X-FB-Video-Broadcast-Ended: 1``;
    2. ``Cache-Control: max-age=1``, then ``Cache-Control: max-age=1000``.
    """
    with open('mpdstub/mpd/17875351285037717.mpd', 'r') as mpd_file:
        mpd_content = mpd_file.read()

    # One tuple per scenario; None means "register without a headers argument".
    scenarios = (
        (
            None,
            {'Cache-Control': 'max-age=1'},
            {'X-FB-Video-Broadcast-Ended': '1'},
        ),
        (
            {'Cache-Control': 'max-age=1'},
            {'Cache-Control': 'max-age=1000'},
        ),
    )

    for header_sets in scenarios:
        with responses.RequestsMock(assert_all_requests_are_fired=True) as rsps:
            for response_headers in header_sets:
                extra = {} if response_headers is None else {'headers': response_headers}
                rsps.add(responses.GET, self.TEST_MPD_URL, body=mpd_content, **extra)

            dl = live.Downloader(
                mpd=self.TEST_MPD_URL,
                output_dir='output_respheaders')
            dl.run()

    # Can't stitch and check for output because responses does not support
    # url pass through, so the segments cannot be downloaded.
def api_responses():
    """Yield an active ``responses.RequestsMock``.

    The mock's context is entered before the value is yielded and exited when
    the generator is resumed or closed. Presumably used as a fixture; the
    decorator, if any, is not visible here.
    """
    with responses.RequestsMock() as requests_mock:
        yield requests_mock
def test_generate_metadata(self):
    """Generate metadata from mocked InfluxDB queries and compare it to a fixture.

    The generated document is also written to ``test_data/tmp_metadata.xml``
    before the comparison, so it is on disk even when the assertion fails.
    """
    # Raw strings: '\+' is an invalid escape sequence in a plain literal.
    keys_queries = [
        (r'.*SHOW\+FIELD\+KEYS.*', json_field_keys),
        (r'.*SHOW\+TAG\+KEYS.*', json_tag_keys),
    ]
    # Same responses, in the same order and number, as the original
    # hand-written registrations: one database listing, two measurement
    # listings, then four field-keys/tag-keys pairs.
    mocked_queries = [
        (r'.*SHOW\+DATABASES.*', json_database_list),
        (r'.*SHOW\+MEASUREMENTS.*', json_measurement_list),
        (r'.*SHOW\+MEASUREMENTS.*', json_measurement_list),
    ] + keys_queries * 4

    with RequestsMock() as rsp:
        for pattern, payload in mocked_queries:
            rsp.add(rsp.GET, re.compile(pattern),
                    json=payload, match_querystring=True)

        metadata = generate_metadata('influxdb://localhost:8086')

        # Context managers close both files deterministically.
        with open(os.path.join('test_data', 'test_metadata.xml'), 'r') as expected_file:
            file1 = expected_file.read()
        # NOTE(review): binary mode assumes ``metadata`` is a byte string
        # (true for Python 2 ``str``) — confirm before running on Python 3.
        with open(os.path.join('test_data', 'tmp_metadata.xml'), 'wb') as tmp_file:
            tmp_file.write(metadata)

        # assertEqual replaces the deprecated ``assert_`` alias and reports
        # the differing values on failure.
        self.assertEqual(metadata, file1)
def test_len_collection(self):
    """Check that ``len()`` of the first feed's collection equals ``NUM_TEST_POINTS``.

    The ``SELECT COUNT`` query is mocked with ``json_count`` for the
    collection's name.
    """
    # NOTE(review): ``itervalues`` is the Python 2 dict API — presumably
    # ``_container`` is a dict-like object; confirm before porting.
    first_feed = next(self._container.itervalues())
    collection = first_feed.OpenCollection()
    with RequestsMock() as rsp:
        # Raw string: '\+' is an invalid escape sequence in a plain literal.
        count_query = re.compile(r'.*SELECT\+COUNT.*')
        rsp.add(rsp.GET, count_query,
                json=json_count(collection.name), match_querystring=True)
        len_collection = len(collection)
        self.assertEqual(len_collection, NUM_TEST_POINTS)