def test_downloader_conn_error(self):
exception = ConnectionError()
with responses.RequestsMock(assert_all_requests_are_fired=True) as rsps:
max_retry = 3
for _ in range(max_retry + 1):
rsps.add(responses.GET, self.TEST_MPD_URL, body=exception)
dl = live.Downloader(
mpd=self.TEST_MPD_URL,
output_dir='output_connerror',
duplicate_etag_retry=2,
singlethreaded=True,
max_connection_error_retry=max_retry)
dl.run()
dl.stream_id = '17875351285037717'
output_file = 'output_connerror.mp4'
dl.stitch(output_file, cleartempfiles=True)
self.assertFalse(os.path.isfile(output_file), '{0!s} not generated'.format(output_file))
python类RequestsMock()的实例源码
def setup_versions(self, stable, devel, nightly, esr, esr_next=None):
"""
Add a mock response for official versions
"""
self.cleanup()
body = {
"FIREFOX_NIGHTLY": nightly,
"FIREFOX_ESR": esr,
"FIREFOX_ESR_NEXT": esr_next,
"LATEST_FIREFOX_DEVEL_VERSION": devel,
"LATEST_FIREFOX_OLDER_VERSION": "3.6.28",
"LATEST_FIREFOX_RELEASED_DEVEL_VERSION": devel,
"LATEST_FIREFOX_VERSION": stable,
}
local_mock = responses.RequestsMock()
local_mock.add(responses.GET, versions.URL_VERSIONS, json=body)
local_mock.start()
yield local_mock
local_mock.stop()
def test_create_new_user(self):
with responses.RequestsMock() as rsps:
rsps.add(
self.deform_client.user.create.method.upper(),
self.deform_client.user.create.get_context({})['url'],
json={
'message': 'Check your email for confirmation code'
},
status=201
)
response = self.deform_client.user.create(
email='a' + self.CONFIG['DEFORM']['EMAIL'],
password=self.CONFIG['DEFORM']['PASSWORD']
)
assert_that(
response,
has_entry('message', 'Check your email for confirmation code')
)
def test_confirm(self):
session_id = 'Qivjy3PW3dqxAOY9i4MeT1H0FHyGkw'
with responses.RequestsMock() as rsps:
rsps.add(
self.deform_client.user.confirm.method.upper(),
self.deform_client.user.confirm.get_context({})['url'],
json={
'sessionId': session_id
},
status=200
)
response = self.deform_client.user.confirm(
code='12345'
)
assert_that(
response,
has_entry('sessionId', session_id)
)
def test_confirm_already_confirmed_code(self):
with responses.RequestsMock() as rsps:
rsps.add(
self.deform_client.user.confirm.method.upper(),
self.deform_client.user.confirm.get_context({})['url'],
json={
'message': 'User already exists.'
},
status=409
)
assert_that(
calling(self.deform_client.user.confirm).with_args(
code='12345'
),
raises(ConflictError, '^User already exists\.$')
)
def test_all_the_things(self, core_client, mgmt_server_base_uri, resource, params):
endpoint = mgmt_server_base_uri + resource
with responses.RequestsMock() as rsps:
resp_body = {'foo': 'bar', 'message': 'OK'}
rsps.add(responses.POST, endpoint,
json=resp_body, status=200,
content_type='application/json')
lm = cpauto.LoginMessage(core_client)
if resource == "show-login-message":
r = lm.show()
assert r.status_code == 200
assert r.json() == resp_body
if resource == "set-login-message":
r = lm.set(params=params)
assert r.status_code == 200
assert r.json() == resp_body
def test_show_all_access_layers(core_client, mgmt_server_base_uri,
limit, offset, order, details_level):
endpoint = mgmt_server_base_uri + 'show-access-layers'
with responses.RequestsMock() as rsps:
resp_body = {'foo': 'bar', 'message': 'OK'}
rsps.add(responses.POST, endpoint,
json=resp_body, status=200,
content_type='application/json')
c = cpauto.AccessLayer(core_client)
r = c.show_all(limit=limit, offset=offset,
order=order, details_level=details_level)
assert r.status_code == 200
assert r.json() == resp_body
# NATRule
def test_preferred_object(self):
with open('tests/rets_responses/GetObject_multipart.byte', 'rb') as f:
multiple = f.read()
multi_headers = {
'Content-Type': 'multipart/parallel; boundary="24cbd0e0afd2589bb9dcb1f34cf19862"; charset=utf-8',
'Connection': 'keep-alive', 'RETS-Version': 'RETS/1.7.2', 'MIME-Version': '1.0, 1.0'}
with responses.RequestsMock() as resps:
resps.add(resps.POST, 'http://server.rets.com/rets/GetObject.ashx',
body=multiple, status=200, headers=multi_headers)
obj = self.session.get_preferred_object(resource='Property', object_type='Photo', content_id=1)
self.assertTrue(obj)
resps.add(resps.POST, 'http://server.rets.com/rets/GetObject.ashx',
body=multiple, status=200)
resource = dict()
resource['ResourceID'] = 'Agent'
obj1 = self.session.get_preferred_object(resource=resource, object_type='Photo', content_id=1)
self.assertTrue(obj1)
def test_class_metadata(self):
with open('tests/rets_responses/COMPACT-DECODED/GetMetadata_classes.xml') as f:
contents = ''.join(f.readlines())
with open('tests/rets_responses/COMPACT-DECODED/GetMetadata_classes_single.xml') as f:
single_contents = ''.join(f.readlines())
with responses.RequestsMock() as resps:
resps.add(resps.POST, 'http://server.rets.com/rets/GetMetadata.ashx',
body=contents, status=200)
resource_classes = self.session.get_class_metadata(resource='Agent')
self.assertEqual(len(resource_classes), 6)
resps.add(resps.POST, 'http://server.rets.com/rets/GetMetadata.ashx',
body=single_contents, status=200)
resource_classes_single = self.session.get_class_metadata(resource='Property')
self.assertEqual(len(resource_classes_single), 1)
def test_auto_offset(self):
with open('tests/rets_responses/COMPACT-DECODED/Search_1of2.xml') as f:
search1_contents = ''.join(f.readlines())
with open('tests/rets_responses/COMPACT-DECODED/Search_2of2.xml') as f:
search2_contents = ''.join(f.readlines())
with responses.RequestsMock() as resps:
resps.add(resps.POST, 'http://server.rets.com/rets/Search.ashx',
body=search1_contents, status=200, stream=True)
resps.add(resps.POST, 'http://server.rets.com/rets/Search.ashx',
body=search2_contents, status=200, stream=True)
results = self.session.search(resource='Property',
resource_class='RES',
search_filter={'ListingPrice': 200000})
self.assertEqual(len(results), 6)
def test_change_parser_automatically(self):
self.assertEqual(self.session.metadata_format, 'COMPACT-DECODED')
with open('tests/rets_responses/Errors/20514.xml') as f:
dtd_error = ''.join(f.readlines())
with open('tests/rets_responses/STANDARD-XML/GetMetadata_system.xml') as f:
content = ''.join(f.readlines())
with responses.RequestsMock() as resps:
resps.add(resps.POST, 'http://server.rets.com/rets/GetMetadata.ashx',
body=dtd_error, status=200)
resps.add(resps.POST, 'http://server.rets.com/rets/GetMetadata.ashx',
body=content, status=200)
self.session.get_system_metadata()
self.assertEqual(self.session.metadata_format, 'STANDARD-XML')
def test_request_renew_auth_token_success(self, auth_mock):
auth_mock.request_new_access_token.return_value = "access_token"
with responses.RequestsMock(
assert_all_requests_are_fired=True) as rsps:
rsps.add(
responses.GET,
'https://hostname.com/v1/this/1',
body="foo", status=401
)
rsps.add(
responses.GET,
'https://hostname.com/v1/this/1',
body="bar", status=200
)
session = self.session
response = session.get(self.request_url)
headers = dict(response.request.headers)
assert response.text == "bar"
assert all(value == headers[key] for key, value in self.exp_headers.items())
def test_request_renew_auth_token_fail(self, auth_mock):
auth_mock.request_new_access_token.return_value = "access_token"
with responses.RequestsMock(
assert_all_requests_are_fired=True) as rsps:
rsps.add(
responses.GET,
'https://hostname.com/v1/this/1',
body="bar",
status=401
)
rsps.add(
responses.GET,
'https://hostname.com/v1/this/1',
body="foo",
status=401
)
session = self.session
response = session.get(self.request_url)
assert response.text == "foo"
def test_no_hooks(self, auth_mock):
auth_mock.request_new_access_token.return_value = "access_token"
with responses.RequestsMock(
assert_all_requests_are_fired=True) as rsps:
rsps.add(
responses.GET,
'https://hostname.com/v1/this/1',
body="foo", status=401
)
rsps.add(
responses.GET,
'https://hostname.com/v1/this/1',
body="bar", status=200
)
session = self.session
hook_mock = mock.Mock()
# no hooks
session.get(self.request_url)
assert_not_called(hook_mock.pre_hook)
assert_not_called(hook_mock.post_hook)
def test_pre_hook(self, auth_mock):
auth_mock.request_new_access_token.return_value = "access_token"
with responses.RequestsMock(
assert_all_requests_are_fired=True) as rsps:
rsps.add(
responses.GET,
'https://hostname.com/v1/this/1',
body="foo", status=401
)
rsps.add(
responses.GET,
'https://hostname.com/v1/this/1',
body="bar", status=200
)
session = self.session
hook_mock = mock.Mock()
session.register_pre_hook(hook_mock.pre_hook)
session.get(self.request_url)
assert_called_once(hook_mock.pre_hook)
assert_not_called(hook_mock.post_hook)
def test_pre_and_post_hooks(self, auth_mock):
auth_mock.request_new_access_token.return_value = "access_token"
with responses.RequestsMock(
assert_all_requests_are_fired=True) as rsps:
rsps.add(
responses.GET,
'https://hostname.com/v1/this/1',
body="foo", status=401
)
rsps.add(
responses.GET,
'https://hostname.com/v1/this/1',
body="bar", status=200
)
session = self.session
hook_mock = mock.Mock()
session.register_pre_hook(hook_mock.pre_hook)
session.register_post_hook(hook_mock.post_hook)
session.get(self.request_url)
assert_called_once(hook_mock.pre_hook)
assert_called_once(hook_mock.post_hook)
def with_mock_responses(req_resp=None):
def decorator(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
class_name, func_name = re.sub(r'([A-Z]+)', r'_\1', args[0].__class__.__name__).lower().strip('_'), f.__name__
fixtures = load_reqresp_fixture(req_resp or "{}/{}".format(class_name, func_name))
with responses.RequestsMock(assert_all_requests_are_fired=False) as rsps:
for fixture in fixtures:
if 'url_re' in fixture:
url_re = fixture.pop('url_re')
fixture['url'] = re.compile(Client.build_url(url_re))
else:
fixture['url'] = Client.build_url(fixture['url'])
if "content_type" in fixture and fixture['content_type'] == "application/json":
fixture['body'] = json.dumps(fixture['body'])
rsps.add(**fixture)
return f(responses=rsps, *args, **kwargs)
return wrapper
return decorator
def test_modpack_fetch(minimal_pack, tinkers_update):
"""Does the File.fetch fetches the file correctly?"""
minimal_pack.path /= 'files'
session = requests.Session()
file = tinkers_update
with betamax.Betamax(session).use_cassette('file-fetch'):
with pytest.raises(OSError):
minimal_pack.fetch(file, session=session)
minimal_pack.path.mkdir()
minimal_pack.fetch(file, session=session)
filepath = minimal_pack.path / file.name
assert filepath.is_file()
assert filepath.stat().st_mtime == file.date.timestamp()
with responses.RequestsMock() as rsps:
minimal_pack.fetch(file, session=session)
assert len(rsps.calls) == 0
def setup(self, rsps: RequestsMock):
for package_name in self.package_names:
response = [
{"ref": "refs/tags/{}".format(package['version'])}
for package in mock_package_list
if package['name'] == package_name
]
github_name = package_name
if '/' not in github_name:
github_name = package_name + "/" + package_name
rsps.add(rsps.GET, Github._REFS_URL.format(github_name), json=response)
re_notfound = re.compile(escape_regex_format_str(Github._REFS_URL).format("([a-zA-Z-_.]+)"))
for _ in range(self._expected_missing):
rsps.add(rsps.GET, re_notfound, status=404)
def test_parameterized_queries(self, type, query, parameters, expected,
dw, dataset_key, query_response_json):
with responses.RequestsMock() as rsps:
def request_callback(request):
for value in expected:
assert_that(request.url, contains_string(value),
reason="Expected [[\n{}\n]] to contain "
"[[\n{}\n]]".format(request.url, expected))
return(200, {}, json.dumps(query_response_json))
rsps.add_callback(rsps.GET, re.compile(
r'https?://query\.data\.world/.*'),
callback=request_callback,
content_type="application/json",
match_querystring=True)
dw.query(dataset_key, query, query_type=type,
parameters=parameters)
def test_write_basic(self):
with responses.RequestsMock() as resp:
def upload_endpoint(request):
assert "test" == ''.join([chunk.decode('utf-8')
for chunk in request.body])
assert request.headers.get('User-Agent') == _user_agent()
return 200, {}, json.dumps({})
resp.add_callback(resp.PUT,
'{}/uploads/{}/{}/files/{}'
.format('https://api.data.world/v0',
'user', 'dataset', 'file.txt'),
callback=upload_endpoint)
with RemoteFile(DefaultConfig(),
"user/dataset", "file.txt") as writer:
writer.write("test")
def test_write_csv(self):
with responses.RequestsMock() as resp:
def upload_endpoint(request):
assert "a,b\r\n42,17\r\n420,178\r\n" == \
''.join([chunk.decode('utf-8')
for chunk in request.body])
assert request.headers.get('User-Agent') == _user_agent()
return 200, {}, json.dumps({})
resp.add_callback(resp.PUT, '{}/uploads/{}/{}/files/{}'
.format('https://api.data.world/v0',
'user', 'dataset', 'file.csv'),
callback=upload_endpoint)
with RemoteFile(DefaultConfig(),
"user/dataset", "file.csv") as writer:
csvw = csv.DictWriter(writer, fieldnames=['a', 'b'])
csvw.writeheader()
csvw.writerow({'a': 42, 'b': 17})
csvw.writerow({'a': 420, 'b': 178})
def test_read_jsonl(self):
with responses.RequestsMock() as resp:
def download_endpoint(request):
assert request.headers.get('User-Agent') == _user_agent()
return 200, {}, '{"A":"1", "B":"2", "C":"3"}\n' \
'{"A":"4", "B":"5", "C":"6"}\n'
resp.add_callback(resp.GET, '{}/file_download/{}/{}/{}'
.format('https://query.data.world',
'user', 'dataset', 'file.csv'),
callback=download_endpoint)
with RemoteFile(DefaultConfig(), "user/dataset", "file.csv",
mode="r") as reader:
rows = [json.loads(line) for line in reader if line.strip()]
assert rows[0] == {'A': '1', 'B': '2', 'C': '3'}
assert rows[1] == {'A': '4', 'B': '5', 'C': '6'}
def test_endpoint_all(self):
with responses.RequestsMock() as rsps:
rsps.add(responses.GET, MOCK_API_URL + '/users', json=[
{
'id': 1,
'username': 'user1',
'group': 'watchers',
},
{
'id': 2,
'username': 'user2',
'group': 'watchers',
},
{
'id': 3,
'username': 'user3',
'group': 'admin',
},
])
users = generic_client.users.all()
self.assertEqual(len(users), 3)
def test_endpoint_links(self):
with responses.RequestsMock() as rsps:
rsps.add('GET', MOCK_API_URL + '/users?page=2', json=[
{
'id': 3,
'username': 'user1',
'group': 'watchers',
},
{
'id': 4,
'username': 'user2',
'group': 'watchers',
},
], match_querystring=True, headers={
'Link': '<http://example.com/users?page=3>; rel=next,<http://example.com/users?page=1>; rel=previous'
})
users = generic_client.users.filter(page=2)
self.assertEqual(users.response.links, {
'next': {'url': 'http://example.com/users?page=3', 'rel': 'next'},
'previous': {'url': 'http://example.com/users?page=1', 'rel': 'previous'}
})
def test_endpoint_delete(self):
with responses.RequestsMock() as rsps:
rsps.add(responses.DELETE, MOCK_API_URL + '/users/1', status=204)
generic_client.users.delete(1)
with responses.RequestsMock() as rsps:
rsps.add(responses.DELETE, MOCK_API_URL + '/users/1', status=404)
with self.assertRaises(generic_client.ResourceNotFound):
generic_client.users.delete(1)
with responses.RequestsMock() as rsps:
rsps.add(responses.DELETE, MOCK_API_URL + '/users/1', status=500)
with self.assertRaises(generic_client.HTTPError):
generic_client.users.delete(1)
def test_resource_save_uuid(self):
with responses.RequestsMock() as rsps:
rsps.add(responses.GET, MOCK_API_URL + '/users/1', json={
'uuid': 1,
'username': 'user1',
'group': 'watchers',
})
user1 = generic_client.users.get(uuid=1)
self.assertEqual(user1.username, 'user1')
with responses.RequestsMock() as rsps:
rsps.add(responses.PUT, MOCK_API_URL + '/users/1', json={
'uuid': 1,
'username': 'user1',
'group': 'admins',
})
user1.group = 'admins'
user1.save()
test_resource_trailing_slash.py 文件源码
项目:genericclient-requests
作者: genericclient
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def test_resource_save(self):
with responses.RequestsMock() as rsps:
rsps.add(responses.GET, MOCK_API_URL + '/users/1/', json={
'id': 1,
'username': 'user1',
'group': 'watchers',
})
user1 = generic_client.users.get(id=1)
self.assertEqual(user1.username, 'user1')
with responses.RequestsMock() as rsps:
rsps.add(responses.PUT, MOCK_API_URL + '/users/1/', json={
'id': 1,
'username': 'user1',
'group': 'admins',
})
user1.group = 'admins'
user1.save()
def test_endpoint_detail_route(self):
with responses.RequestsMock() as rsps:
rsps.add_callback(
responses.POST, MOCK_API_URL + '/users/2/notify',
callback=request_callback,
content_type='application/json',
)
response = generic_client.users(id=2).notify(unread=3)
self.assertEqual(response.json(), {'unread': 3})
with responses.RequestsMock() as rsps:
rsps.add_callback(
responses.GET, MOCK_API_URL + '/users/2/notify',
callback=request_callback,
content_type='application/json',
)
response = generic_client.users(_method='get', id=2).notify(unread=3)
self.assertEqual(response.json(), {'unread': 3})
def test_endpoint_list_route(self):
with responses.RequestsMock() as rsps:
rsps.add_callback(
responses.POST, MOCK_API_URL + '/users/notify',
callback=request_callback,
content_type='application/json',
)
response = generic_client.users().notify(unread=3)
self.assertEqual(response.json(), {'unread': 3})
with responses.RequestsMock() as rsps:
rsps.add_callback(
responses.GET, MOCK_API_URL + '/users/notify',
callback=request_callback,
content_type='application/json',
)
response = generic_client.users(_method='get').notify(unread=3)
self.assertEqual(response.json(), {'unread': 3})