def _setup_mocking(self):
def _mock_callback(request):
method = request.method.lower()
url = request.path_url
handler = getattr(self.app, method)
r = handler(
url,
data=request.body,
headers=dict(request.headers)
)
return (r.status_code, r.headers, r.data)
pattern = re.compile("{}/(.*)".format(self.host))
methods = [
responses.GET,
responses.POST,
responses.PUT,
responses.DELETE,
responses.PATCH,
]
for method in methods:
responses.add_callback(
method, pattern,
callback=_mock_callback
)
python类PUT的实例源码
def set_put_object_response(self, id, object_, user=1, code=200):
"""Sets mock PUT /api/odlcs/<id> response.
Args:
id (int): Target ID.
object_ (dict): Target data to update with.
user (int): User number to respond with.
code (int): Status code to respond with.
"""
object_.update({"id": id, "user": user})
content = json.dumps(object_)
self.rsps.add(
responses.PUT,
"{}/api/odlcs/{:d}".format(self.url, id),
status=code,
body=content if code == 200 else "",
content_type="application/json")
def test_resource_save(self):
with responses.RequestsMock() as rsps:
rsps.add(responses.GET, MOCK_API_URL + '/users/1', json={
'id': 1,
'username': 'user1',
'group': 'watchers',
})
user1 = generic_client.users.get(id=1)
self.assertEqual(user1.username, 'user1')
with responses.RequestsMock() as rsps:
rsps.add(responses.PUT, MOCK_API_URL + '/users/1', json={
'id': 1,
'username': 'user1',
'group': 'admins',
})
user1.group = 'admins'
user1.save()
def test_resource_save_uuid(self):
with responses.RequestsMock() as rsps:
rsps.add(responses.GET, MOCK_API_URL + '/users/1', json={
'uuid': 1,
'username': 'user1',
'group': 'watchers',
})
user1 = generic_client.users.get(uuid=1)
self.assertEqual(user1.username, 'user1')
with responses.RequestsMock() as rsps:
rsps.add(responses.PUT, MOCK_API_URL + '/users/1', json={
'uuid': 1,
'username': 'user1',
'group': 'admins',
})
user1.group = 'admins'
user1.save()
test_resource_trailing_slash.py 文件源码
项目:genericclient-requests
作者: genericclient
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def test_resource_save(self):
with responses.RequestsMock() as rsps:
rsps.add(responses.GET, MOCK_API_URL + '/users/1/', json={
'id': 1,
'username': 'user1',
'group': 'watchers',
})
user1 = generic_client.users.get(id=1)
self.assertEqual(user1.username, 'user1')
with responses.RequestsMock() as rsps:
rsps.add(responses.PUT, MOCK_API_URL + '/users/1/', json={
'id': 1,
'username': 'user1',
'group': 'admins',
})
user1.group = 'admins'
user1.save()
def test_publish_invalid(self):
# GIVEN datapackage that can be treated as valid by the dpm
self.valid_dp = datapackage.DataPackage({
"name": "some-datapackage",
"resources": [
{ "name": "some-resource", "path": "./data/some_data.csv", }
]
},
default_base_path='.')
patch('dpm.client.DataPackage', lambda *a: self.valid_dp).start()
patch('dpm.client.exists', lambda *a: True).start()
# AND the server that accepts any user
responses.add(
responses.POST, 'http://127.0.0.1:5000/api/auth/token',
json={'token': 'blabla'},
status=200)
# AND server rejects any datapackage as invalid
responses.add(
responses.PUT, 'http://127.0.0.1:5000/api/package/user/some-datapackage',
json={'message': 'invalid datapackage json'},
status=400)
# AND the client
client = Client(dp1_path, self.config)
# WHEN publish() is invoked
try:
result = client.publish()
except Exception as e:
result = e
# THEN HTTPStatusError should be raised
assert isinstance(result, HTTPStatusError)
# AND 'invalid datapackage json' should be printed to stdout
self.assertRegexpMatches(str(result), 'invalid datapackage json')
def test_add(self):
slims = Slims("testSlims", "http://localhost:9999", "admin", "admin")
responses.add(
responses.PUT,
'http://localhost:9999/rest/Content',
json={"entities": [{
"pk": 1,
"tableName": "Content",
"columns": []
}]},
content_type='application/json',
)
added = slims.add("Content", {"test": "foo"})
self.assertIsInstance(added, Record)
def test_heal(self):
val_response = {
'courses': [{u'WestonHS/PFLC1x/3T2015': None}],
'encoded_videos': [{
'url': 'https://testurl.mp4',
'file_size': 8499040,
'bitrate': 131,
'profile': 'mobile_low',
}]
}
responses.add(
responses.POST,
CONFIG_DATA['val_token_url'],
'{"access_token": "1234567890"}',
status=200
)
responses.add(
responses.GET,
build_url(CONFIG_DATA['val_api_url'], self.video_id),
body=json.dumps(val_response),
content_type='application/json',
status=200
)
responses.add(
responses.PUT,
build_url(CONFIG_DATA['val_api_url'], self.video_id),
status=200
)
heal = VedaHeal()
heal.discovery()
def test_push(self):
mydir = os.path.dirname(__file__)
build_path = os.path.join(mydir, './build_simple.yml')
command.build('foo/bar', build_path)
pkg_obj = store.PackageStore.find_package('foo', 'bar')
pkg_hash = pkg_obj.get_hash()
assert pkg_hash
contents = pkg_obj.get_contents()
all_hashes = set(find_object_hashes(contents))
upload_urls = {
blob_hash: dict(
head="https://example.com/head/{owner}/{hash}".format(owner='foo', hash=blob_hash),
put="https://example.com/put/{owner}/{hash}".format(owner='foo', hash=blob_hash)
) for blob_hash in all_hashes
}
# We will push the package twice, so we're mocking all responses twice.
for blob_hash in all_hashes:
urls = upload_urls[blob_hash]
# First time the package is pushed, s3 HEAD 404s, and we get a PUT.
self.requests_mock.add(responses.HEAD, urls['head'], status=404)
self.requests_mock.add(responses.PUT, urls['put'])
# Second time, s3 HEAD succeeds, and we're not expecting a PUT.
self.requests_mock.add(responses.HEAD, urls['head'])
self._mock_put_package('foo/bar', pkg_hash, upload_urls)
self._mock_put_tag('foo/bar', 'latest')
self._mock_put_package('foo/bar', pkg_hash, upload_urls)
self._mock_put_tag('foo/bar', 'latest')
# Push a new package.
command.push('foo/bar')
# Push it again; this time, we're verifying that there are no s3 uploads.
command.push('foo/bar')
def _mock_put_package(self, package, pkg_hash, upload_urls):
pkg_url = '%s/api/package/%s/%s' % (command.get_registry_url(), package, pkg_hash)
# Dry run, then the real thing.
self.requests_mock.add(responses.PUT, pkg_url, json.dumps(dict(upload_urls=upload_urls)))
self.requests_mock.add(responses.PUT, pkg_url, json.dumps(dict()))
def _mock_put_tag(self, package, tag):
tag_url = '%s/api/tag/%s/%s' % (command.get_registry_url(), package, tag)
self.requests_mock.add(responses.PUT, tag_url, json.dumps(dict()))
def test_version_add_confirmed(self, mock_input, mock_match_hash):
registry_url = command.get_registry_url()
mock_input.return_value = 'y'
mock_match_hash.return_value = 'fabc123'
# Response content is not checked by version_add, so
# status ok and URL verification are enough
self.requests_mock.add(
responses.PUT,
registry_url + "/api/version/user/test/2.9.12",
status=200,
)
command.version_add('user/test', '2.9.12', 'fabc123')
def test_short_hashes(self):
"""
Test various functions that use short hashes
"""
table_data, table_hash = self.make_table_data()
file_data, file_hash = self.make_file_data()
contents, contents_hash = self.make_contents(table=table_hash, file=file_hash)
self._mock_log('foo/bar', contents_hash)
self._mock_tag('foo/bar', 'mytag', contents_hash[0:6], cmd=responses.PUT)
command.tag_add('foo/bar', 'mytag', contents_hash[0:6])
self._mock_version('foo/bar', '1.0', contents_hash[0:6], cmd=responses.PUT)
command.version_add('foo/bar', '1.0', contents_hash[0:6], force=True)
def setup_recording(self, **kwargs):
_logger.info("recording ...")
self.responses.reset()
all_requests_re = re.compile("http.*")
methods = (responses.GET, responses.POST, responses.PUT,
responses.PATCH, responses.DELETE, responses.HEAD,
responses.OPTIONS)
for http_method in methods:
self.responses.add_callback(
http_method, all_requests_re,
match_querystring=False,
callback=self.record())