def setUp(self):
    """Enable httpretty and stub the JSON endpoints under ``http://test.gf``.

    Three GET endpoints are registered: the user list and one blog feed
    for each of the two users.
    """
    httpretty.enable()

    # (url, payload) pairs, registered in this order.
    canned_responses = (
        ("http://test.gf/users", [sam_profile, jack_profile]),
        ("http://test.gf/blog/sam", sams_articles),
        ("http://test.gf/blog/jack", jacks_articles),
    )
    for url, payload in canned_responses:
        httpretty.register_uri(
            httpretty.GET, url,
            body=ujson.dumps(payload),
            content_type="application/json"
        )
# Example source code using Python's dumps()
def process_response(self, req, resp, resource, req_succeeded):
    """
    Serialize the response body to a JSON string.

    The body is left untouched when the request did not succeed.

    :param req: Falcon request
    :type req: falcon.request.Request
    :param resp: Falcon response; its ``body`` attribute is replaced in place
    :type resp: falcon.response.Response
    :param resource: resource that handled the request (not used here)
    :type resource: falcon_dbapi.resources.base.BaseResource
    :param req_succeeded: whether the request was processed successfully
    :type req_succeeded: bool
    """
    if not req_succeeded:
        return
    resp.body = json.dumps(resp.body)
async def _exec_hmset_dict(self, insts):
    """Write the given instances to Redis hashes and drop their stale keys.

    Declared ``async`` because the body awaits; a plain ``def`` containing
    ``await`` is a syntax error.

    For every instance whose model has ``__use_redis__`` set, and for every
    filter-names entry returned by ``self._get_filters_names_set(inst)``,
    the instance is serialized with ``ujson.dumps(inst.todict())`` and
    queued under ``model.get_key(...)`` / ``model.get_instance_key(inst)``.
    If the instance carries an ``old_redis_key`` that differs from its
    current key, the old key is queued for deletion from the same hash.

    :param insts: iterable of model instances
    """
    # hash key -> {instance key -> serialized instance}
    models_keys_insts_keys_insts_map = defaultdict(dict)
    # hash key -> set of outdated instance keys to delete
    models_keys_insts_keys_map = defaultdict(set)

    for inst in insts:
        model = type(inst)
        if not model.__use_redis__:
            continue

        filters_names_set = await self._get_filters_names_set(inst)
        for filters_names in filters_names_set:
            # entries are decoded before use, so they are expected to be bytes
            model_redis_key = model.get_key(filters_names.decode())
            inst_redis_key = model.get_instance_key(inst)

            inst_old_redis_key = getattr(inst, 'old_redis_key', None)
            if inst_old_redis_key is not None and inst_old_redis_key != inst_redis_key:
                models_keys_insts_keys_map[model_redis_key].add(inst_old_redis_key)

            models_keys_insts_keys_insts_map[model_redis_key][inst_redis_key] = ujson.dumps(inst.todict())

    # All writes are issued first, then all deletions.
    for model_key, insts_keys_insts_map in models_keys_insts_keys_insts_map.items():
        await self.redis_bind.hmset_dict(model_key, insts_keys_insts_map)

    for model_key, insts_keys in models_keys_insts_keys_map.items():
        await self.redis_bind.hdel(model_key, *insts_keys)
async def test_integrity_error_handling_with_foreign_key(self, post_method, stream, session):
    """POSTing a row with ``m2_id`` 1 must produce a 400 with the database error.

    Declared ``async`` because the body awaits ``post_method``. The
    expected response body echoes the submitted params and carries the
    foreign-key failure message together with error code 1452.
    """
    stream.feed_data(ujson.dumps([{'m2_id': 1}]).encode())
    stream.feed_eof()
    request = SwaggerRequest('/model1/', 'post', body=stream, headers={'content-type': 'application/json'})
    resp = await post_method(request, session)

    assert resp.status_code == 400
    assert ujson.loads(resp.body) == {
        'params': {'m2_id': 1},
        'database message': {
            'message': 'Cannot add or update a child row: '
                       'a foreign key constraint fails '
                       '(`swaggerit_test`.`model1_swagger`, '
                       'CONSTRAINT `model1_swagger_ibfk_1` FOREIGN '
                       'KEY (`m2_id`) REFERENCES `model2_swagger` '
                       '(`id`))',
            'code': 1452
        }
    }
async def test_model_base_error_handling_with_patch_and_with_nested_delete(self, patch_method, post_method, stream, session):
    """A PATCH carrying a nested ``delete`` operation must be rejected with 400.

    Declared ``async`` because the body awaits the request handlers.
    """
    # First call: send ``[{}]`` through post_method
    # (presumably creates the row patched below - confirm against fixtures).
    stream.feed_data(b'[{}]')
    stream.feed_eof()
    request = SwaggerRequest('/model1/1/', 'patch', body=stream, headers={'content-type': 'application/json'})
    await post_method(request, session)

    # The first stream has been fed EOF, so a fresh one is built for the PATCH body.
    stream = asyncio.StreamReader(loop=session.loop)
    body = {'model2_': {'id': 1, '_operation': 'delete'}}
    stream.feed_data(ujson.dumps(body).encode())
    stream.feed_eof()
    request = SwaggerRequest('/model1/1/', 'patch', path_params={'id': 1}, body=stream, headers={'content-type': 'application/json'})
    resp = await patch_method(request, session)

    assert resp.status_code == 400
    assert ujson.loads(resp.body) == {
        'instance': [body],
        'message': "Can't execute nested 'delete' operation"
    }
async def test_model_base_error_handling_with_patch_and_with_nested_remove(self, patch_method, post_method, stream, session):
    """A PATCH carrying a nested ``remove`` operation must be rejected with 400.

    Declared ``async`` because the body awaits the request handlers.
    """
    # First call: send ``[{}]`` through post_method
    # (presumably creates the row patched below - confirm against fixtures).
    stream.feed_data(b'[{}]')
    stream.feed_eof()
    request = SwaggerRequest('/model1/1/', 'patch', body=stream, headers={'content-type': 'application/json'})
    await post_method(request, session)

    # The first stream has been fed EOF, so a fresh one is built for the PATCH body.
    stream = asyncio.StreamReader(loop=session.loop)
    body = {'model2_': {'id': 1, '_operation': 'remove'}}
    stream.feed_data(ujson.dumps(body).encode())
    stream.feed_eof()
    request = SwaggerRequest('/model1/1/', 'patch', path_params={'id': 1}, body=stream, headers={'content-type': 'application/json'})
    resp = await patch_method(request, session)

    assert resp.status_code == 400
    assert ujson.loads(resp.body) == {
        'instance': [body],
        'message': "Can't execute nested 'remove' operation"
    }
async def test_model_base_error_handling_with_patch_and_with_nested_update(self, patch_method, post_method, stream, session):
    """A PATCH carrying a nested ``update`` operation must be rejected with 400.

    Declared ``async`` because the body awaits the request handlers.
    """
    # First call: send ``[{}]`` through post_method
    # (presumably creates the row patched below - confirm against fixtures).
    stream.feed_data(b'[{}]')
    stream.feed_eof()
    request = SwaggerRequest('/model1/1/', 'patch', body=stream, headers={'content-type': 'application/json'})
    await post_method(request, session)

    # The first stream has been fed EOF, so a fresh one is built for the PATCH body.
    stream = asyncio.StreamReader(loop=session.loop)
    body = {'model2_': {'id': 1, '_operation': 'update'}}
    stream.feed_data(ujson.dumps(body).encode())
    stream.feed_eof()
    request = SwaggerRequest('/model1/1/', 'patch', path_params={'id': 1}, body=stream, headers={'content-type': 'application/json'})
    resp = await patch_method(request, session)

    assert resp.status_code == 400
    assert ujson.loads(resp.body) == {
        'instance': [body],
        'message': "Can't execute nested 'update' operation"
    }
async def test_if_istances_are_seted_on_redis_with_two_models_correctly(
        self, session, redis):
    """One commit of two models stores every instance in its model's Redis hash.

    Declared ``async`` because the body awaits the session and Redis calls.
    """
    session.add(await Model10.new(session, id=1))
    session.add(await Model11.new(session, id=1))
    session.add(await Model10.new(session, id=2))
    session.add(await Model11.new(session, id=2))
    await session.commit()

    assert await redis.hgetall(Model10.__key__) == {
        b'1': ujson.dumps({b'id': 1}).encode(),
        b'2': ujson.dumps({b'id': 2}).encode()
    }
    assert await redis.hgetall(Model11.__key__) == {
        b'1': ujson.dumps({b'id': 1}).encode(),
        b'2': ujson.dumps({b'id': 2}).encode()
    }
async def test_if_two_commits_sets_redis_with_two_models_correctly(
        self, session, redis):
    """Two successive commits accumulate entries in both models' Redis hashes.

    Declared ``async`` because the body awaits the session and Redis calls.
    """
    # First commit: one instance per model.
    session.add(await Model10.new(session, id=1))
    session.add(await Model11.new(session, id=1))
    await session.commit()

    assert await redis.hgetall(Model10.__key__) == {
        b'1': ujson.dumps({b'id': 1}).encode()
    }
    assert await redis.hgetall(Model11.__key__) == {
        b'1': ujson.dumps({b'id': 1}).encode()
    }

    # Second commit: the earlier entries must still be present.
    session.add(await Model10.new(session, id=2))
    session.add(await Model11.new(session, id=2))
    await session.commit()

    assert await redis.hgetall(Model10.__key__) == {
        b'1': ujson.dumps({b'id': 1}).encode(),
        b'2': ujson.dumps({b'id': 2}).encode()
    }
    assert await redis.hgetall(Model11.__key__) == {
        b'1': ujson.dumps({b'id': 1}).encode(),
        b'2': ujson.dumps({b'id': 2}).encode()
    }
async def test_if_two_instance_are_deleted_from_redis(self, session, redis):
    """Deleting both committed instances leaves the model's Redis hash empty.

    Declared ``async`` because the body awaits the session and Redis calls.
    """
    inst1 = await Model10.new(session, id=1)
    inst2 = await Model10.new(session, id=2)
    session.add_all([inst1, inst2])
    await session.commit()

    assert await redis.hgetall(Model10.__key__) == {
        b'1': ujson.dumps({b'id': 1}).encode(),
        b'2': ujson.dumps({b'id': 2}).encode()
    }

    session.delete(inst1)
    session.delete(inst2)
    await session.commit()

    assert await redis.hgetall(Model10.__key__) == {}
def _post(self, uri, data):
    """POST one or more dictionaries to ``uri`` as a JSON array.

    :param uri: target URL; ``?nowait=1`` is appended when ``self.nowait``
        is truthy
    :param data: a dict (wrapped into a one-element list) or a sequence
        whose first element is a dict
    :return: the decoded JSON response body, or ``None`` when the response
        status code is above 299
    :raises RuntimeError: if ``data`` is empty or its first element is not
        a dict
    """
    # isinstance rather than ``type(x) == dict`` so that dict subclasses
    # (e.g. OrderedDict) are wrapped too instead of failing on ``data[0]``.
    if isinstance(data, dict):
        data = [data]

    # Empty input is reported with the same error as a wrong element type
    # instead of leaking an IndexError/TypeError from ``data[0]``.
    if not data or not isinstance(data[0], dict):
        raise RuntimeError('submitted data must be a dictionary')

    payload = json.dumps(data)

    if self.nowait:
        uri = "{0}?nowait=1".format(uri)

    logger.debug('uri: %s', uri)

    response = self.session.post(uri, data=payload, verify=self.verify_ssl)
    logger.debug('status code: %s', response.status_code)

    if response.status_code > 299:
        logger.error('request failed: %s', response.status_code)
        try:
            message = json.loads(response.text).get('message')
        except (ValueError, AttributeError):
            # The error body is not a JSON object; log it verbatim.
            message = response.text
        logger.error(message)
        return None

    return json.loads(response.text)
def test_can_access_resource_params(self):
    """Check that route params are visible to the middleware's process_resource.

    A request to ``/path/22`` must leave ``id == '22'`` in the shared
    ``context`` and the JSON response must contain both the route param and
    an ``added`` flag.
    """
    global context

    class Resource:
        def on_get(self, req, resp, **params):
            # Echo whatever params reached the responder.
            resp.body = json.dumps(params)

    api = falcon.API(middleware=AccessParamsMiddleware())
    api.add_route('/path/{id}', Resource())
    result = testing.TestClient(api).simulate_request(path='/path/22')

    assert 'params' in context
    captured = context['params']
    assert captured
    assert captured['id'] == '22'
    assert result.json == {'added': True, 'id': '22'}
def __init__(self, status=None, body=None, json=None, headers=None):
    """Store the default response values and reset the captured state.

    :param status: default status, stored as given
    :param body: default body, used when ``json`` is not supplied
    :param json: object serialized (without ASCII escaping) into the
        default body; mutually exclusive with ``body``
    :param headers: default headers, stored as given
    :raises ValueError: if both ``json`` and ``body`` are supplied
    """
    self._default_status = status
    self._default_headers = headers

    if json is None:
        self._default_body = body
    elif body is None:
        self._default_body = json_dumps(json, ensure_ascii=False)
    else:
        raise ValueError('Either json or body may be specified, but not both')

    # Nothing has been captured yet.
    self.captured_req = None
    self.captured_resp = None
    self.captured_kwargs = None
def send_message(self, method, body, headers):
    """Wrap ``body`` in a message envelope and send it as UTF-8 encoded JSON.

    The envelope headers start with a fresh ``message_id`` (taken from
    ``self._message_id``, which is then incremented) and a ``sent_at``
    timestamp; entries in ``headers`` are merged on top of them.

    :param method: message method name
    :param body: message payload
    :param headers: optional extra headers; may be empty or ``None``
    """
    message_id = self._message_id
    self._message_id += 1

    merged_headers = {'message_id': message_id, 'sent_at': time.time()}
    if headers:
        merged_headers.update(headers)

    payload = {'method': method, 'body': body, 'headers': merged_headers}

    # This is a bit ugly, but decide how much we care
    is_reply = method != 'v0.reply.control.ping' and 'parent_message_id' in merged_headers
    log = logger.info if is_reply or method == 'v0.connection.close' else logger.debug
    log('Sending rewarder message: %s', payload)

    self.sendMessage(ujson.dumps(payload).encode('utf-8'), False)
def test_double_performance_float_precision(benchmark):
    """Benchmark rapidjson on the ``doubles`` data set and print the timings.

    ``run_client_test`` is executed through the ``benchmark`` callable with
    rapidjson's ``dumps``/``loads``; it is expected to return the
    serialization and deserialization timings as a pair.
    """
    print("\nArray with 256 doubles:")
    label = 'rapidjson (precise)'
    ser_data, des_data = benchmark(
        run_client_test,
        label, rapidjson.dumps, rapidjson.loads,
        data=doubles,
        iterations=50000,
    )
    total = ser_data + des_data
    print("%-11s serialize: %0.3f deserialize: %0.3f total: %0.3f" % (
        label, ser_data, des_data, total
    ))
def from_tweets(cls, tweets, metadata=None, **kwargs):
    """
    Build and save a new instance holding a JSON snapshot of ``tweets``.

    :param tweets: an iterable of tweets
    :param metadata: stored as-is under the ``metadata`` key of the snapshot
    :param kwargs: extra per-tweet attributes; each value must be a sequence
        with one JSON-serializable entry per tweet. Entries whose length
        does not match the number of tweets are silently ignored.
    :return: the saved instance
    """
    tl = cls()
    json_tweets = json.loads(serializers.serialize("json", tweets))

    # Attach every well-sized extra attribute to the matching tweet's fields.
    for key, values in kwargs.items():
        if len(values) == len(json_tweets):
            for tweet, value in zip(json_tweets, values):
                tweet['fields'][key] = value

    # Saved once before the snapshot is built, which reads ``tl.pk`` and
    # ``tl.datetime`` (presumably populated by this save - confirm on the model).
    tl.save()

    tl.json = json.dumps({
        'metadata': metadata,
        'tweets': json_tweets,
        'pk': tl.pk,
        'created_at': tl.datetime.isoformat(),
    })
    tl.save()
    return tl
def test_should_fail_missing_timestamp_in_body(self, bulk_processor):
    """POSTing the wrapped simple-event template must be answered with HTTP 422.

    The ``events`` entry of ``req_simple_event.json`` is loaded and sent
    as the single element of the ``events`` list, without any other
    top-level key.
    """
    events_resource = _init_resource(self)
    events_resource._processor = bulk_processor

    template_path = os.path.join(
        os.path.dirname(__file__),
        'event_template_json/req_simple_event.json',
    )
    with open(template_path, 'r') as template_file:
        events = json.load(template_file)['events']

    self.simulate_request(
        path=ENDPOINT,
        method='POST',
        headers={
            'Content-Type': 'application/json',
            'X_ROLES': 'monasca'
        },
        body=json.dumps({'events': [events]})
    )
    self.assertEqual(falcon.HTTP_422, self.srmock.status)
def doAudit(user, path, kwargs, responsetime, statuscode, result, tags):
    """Record one audit entry for a call through the ``system`` client.

    :param user: stored as ``audit.user``
    :param path: stored as ``audit.call``
    :param kwargs: call keyword arguments; stored as JSON without the
        ``ctx`` entry (the caller's dict is not modified)
    :param responsetime: stored as ``audit.responsetime``
    :param statuscode: stored as ``audit.statuscode``
    :param result: call result; stored as JSON when serializable, otherwise
        replaced by a placeholder string
    :param tags: stored as ``audit.tags``
    """
    client = getClient('system')
    audit = client.audit.new()
    audit.user = user
    audit.call = path
    audit.statuscode = statuscode
    audit.tags = tags
    audit.args = json.dumps([])  # we don't want to log self

    # Work on a copy so the caller's kwargs keep their ``ctx`` entry.
    auditkwargs = kwargs.copy()
    auditkwargs.pop('ctx', None)
    audit.kwargs = json.dumps(auditkwargs)

    try:
        if isinstance(result, types.GeneratorType):
            # Generators are never serialized; only a marker is stored.
            audit.result = json.dumps('Result of type generator')
        else:
            audit.result = json.dumps(result)
    except Exception:
        # Best effort: anything that cannot be serialized is recorded as a
        # placeholder. ``Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt/SystemExit still propagate.
        audit.result = json.dumps('binary data')

    audit.responsetime = responsetime
    client.audit.set(audit)
def authenticate(self, type='', **kwargs):
    """Start an OAuth login by redirecting the caller to the provider URL.

    :param type: OAuth client instance name; when empty, the portal's
        ``force_oauth_instance`` is used if set, otherwise ``'github'``
    :param kwargs: must contain ``ctx``; may contain ``redirect`` (defaults
        to the request's ``HTTP_REFERER``, or ``'/'`` when absent)
    :return: the string ``'OK'``
    """
    cache = j.clients.redis.getByInstance('system')

    # Explicit type wins, then the forced instance, then the github default.
    forced_instance = j.core.portal.active.force_oauth_instance
    type = type or forced_instance or 'github'

    ctx = kwargs['ctx']
    redirect = kwargs.get('redirect', ctx.env.get('HTTP_REFERER', '/'))

    client = j.clients.oauth.get(instance=type)

    # Stored under the client's state with a 600 second expiry.
    cache.set(
        client.state,
        json.dumps({'type': type, 'redirect': redirect}),
        ex=600,
    )

    ctx.start_response('302 Found', [('Location', client.url)])
    return 'OK'
async def test_delete_valid(self, init_db, client, headers, headers_without_content_type):
    """A created placement is readable (200), deletable (204) and then gone (404).

    Declared ``async`` because the body awaits the client calls.
    """
    client = await client
    body = [{
        'store_id': 1,
        'name': 'Placement Test',
        'variations': [{
            'name': 'Var 1',
            '_operation': 'insert',
            'slots': [{'id': 1}]
        }]
    }]
    resp = await client.post('/placements/', headers=headers, data=ujson.dumps(body))
    obj = (await resp.json())[0]

    # The placement is addressed by the ``small_hash`` returned on creation.
    resp = await client.get('/placements/{}/'.format(obj['small_hash']), headers=headers_without_content_type)
    assert resp.status == 200

    resp = await client.delete('/placements/{}/'.format(obj['small_hash']), headers=headers_without_content_type)
    assert resp.status == 204

    resp = await client.get('/placements/{}/'.format(obj['small_hash']), headers=headers_without_content_type)
    assert resp.status == 404
async def test_get_items_not_found(self, init_db, client, headers, headers_without_content_type):
    """Requesting the items of a freshly created placement must return 404.

    Declared ``async`` because the body awaits the client calls.
    """
    body = [{
        'store_id': 1,
        'name': 'Placement Test',
        'variations': [{
            '_operation': 'insert',
            'name': 'Var 1',
            'slots': [{'id': 2}]
        }]
    }]
    client = await client
    resp = await client.post('/placements/', headers=headers, data=ujson.dumps(body))
    obj = (await resp.json())[0]

    resp = await client.get('/placements/{}/items'.format(obj['small_hash']), headers=headers_without_content_type)
    assert resp.status == 404
async def test_patch(self, init_db, client, headers):
    """PATCHing an engine strategy's name returns 200 and the updated object.

    Declared ``async`` because the body awaits the client calls.
    """
    client = await client
    body = [{
        'name': 'test',
        'class_module': 'tests.integration.fixtures',
        'class_name': 'EngineStrategyTest'
    }]
    resp = await client.post('/engine_strategies/', headers=headers, data=ujson.dumps(body))
    obj = (await resp.json())[0]

    body = {
        'name': 'test2'
    }
    resp = await client.patch('/engine_strategies/1/', headers=headers, data=ujson.dumps(body))

    # Expected result: the created object with only the name changed.
    obj['name'] = 'test2'

    assert resp.status == 200
    assert (await resp.json()) == obj
async def test_delete_valid(self, init_db, client, headers, headers_without_content_type):
    """A created engine strategy is readable (200), deletable (204) and then gone (404).

    Declared ``async`` because the body awaits the client calls.
    """
    client = await client
    body = [{
        'name': 'test',
        'class_module': 'tests.integration.fixtures',
        'class_name': 'EngineStrategyTest'
    }]
    # The creation response is not inspected, so it is not kept.
    await client.post('/engine_strategies/', headers=headers, data=ujson.dumps(body))

    resp = await client.get('/engine_strategies/1/', headers=headers_without_content_type)
    assert resp.status == 200

    resp = await client.delete('/engine_strategies/1/', headers=headers_without_content_type)
    assert resp.status == 204

    resp = await client.get('/engine_strategies/1/', headers=headers_without_content_type)
    assert resp.status == 404
async def test_get(self, init_db, headers, headers_without_content_type, client):
    """Listing item types filtered by store returns the created, expanded item type.

    Declared ``async`` because the body awaits the client calls.
    """
    client = await client
    body = [{
        'name': 'test',
        'stores': [{'id': 1}],
        'schema': {'properties': {'test': {'type': 'string'}}, 'type': 'object', 'id_names': ['test']}
    }]
    await client.post('/item_types/', headers=headers, data=ujson.dumps(body))

    # Turn the request payload into the expected response: the API is
    # expected to add the id, the derived filters and the full store record.
    body[0]['id'] = 1
    body[0]['available_filters'] = [{'name': 'test', 'schema': {'type': 'string'}}]
    body[0]['stores'] = [{
        'configuration': {'data_path': '/test'},
        'country': 'test',
        'id': 1,
        'name': 'test'
    }]
    body[0]['store_items_class'] = None

    resp = await client.get('/item_types/?stores=id:1', headers=headers_without_content_type)
    assert resp.status == 200
    assert await resp.json() == body
async def test_delete(self, init_db, headers, headers_without_content_type, client):
    """A created item type is readable (200), deletable (204) and then gone (404).

    Declared ``async`` because the body awaits the client calls.
    """
    client = await client
    body = [{
        'name': 'test',
        'stores': [{'id': 1}],
        'schema': {'properties': {'test': {'type': 'string'}}, 'type': 'object', 'id_names': ['test']}
    }]
    # The creation response is not inspected, so it is not kept.
    await client.post('/item_types/', headers=headers, data=ujson.dumps(body))

    resp = await client.get('/item_types/1/', headers=headers_without_content_type)
    assert resp.status == 200

    resp = await client.delete('/item_types/1/', headers=headers_without_content_type)
    assert resp.status == 204

    resp = await client.get('/item_types/1/', headers=headers_without_content_type)
    assert resp.status == 404
async def test_get(self, init_db, headers, headers_without_content_type, client):
    """Fetching a single item type by id returns the created, expanded object.

    Declared ``async`` because the body awaits the client calls.
    """
    client = await client
    body = [{
        'name': 'test',
        'stores': [{'id': 1}],
        'schema': {'properties': {'test': {'type': 'string'}}, 'type': 'object', 'id_names': ['test']}
    }]
    await client.post('/item_types/', headers=headers, data=ujson.dumps(body))

    resp = await client.get('/item_types/1/', headers=headers_without_content_type)

    # Turn the request payload into the expected response: the API is
    # expected to add the id, the derived filters and the full store record.
    body[0]['id'] = 1
    body[0]['available_filters'] = [{'name': 'test', 'schema': {'type': 'string'}}]
    body[0]['stores'] = [{
        'configuration': {'data_path': '/test'},
        'country': 'test',
        'id': 1,
        'name': 'test'
    }]
    body[0]['store_items_class'] = None

    assert resp.status == 200
    assert await resp.json() == body[0]
async def test_items_post_invalid(self, init_db, headers, client):
    """POSTing an item whose ``id`` is not a string must fail validation with 400.

    Declared ``async`` because the body awaits the client calls.
    """
    client = await client
    body = [{
        'name': 'test',
        'stores': [{'id': 1}],
        'schema': {'properties': {'id': {'type': 'string'}}, 'type': 'object', 'id_names': ['id']}
    }]
    await client.post('/item_types/', headers=headers, data=ujson.dumps(body))

    # The schema declares ``id`` as a string, so an integer id is invalid.
    body = [{'id': 1}]
    resp = await client.post('/item_types/1/items?store_id=1', headers=headers, data=ujson.dumps(body))

    assert resp.status == 400
    assert await resp.json() == {
        'instance': 1,
        'message': "1 is not of type 'string'. "
                   "Failed validating instance['0']['id'] "
                   "for schema['items']['properties']['id']['type']",
        'schema': {'type': 'string'}
    }
async def test_items_patch_invalid(self, init_db, headers, client):
    """Sending an item whose ``id`` is not a string must fail validation with 400.

    Declared ``async`` because the body awaits the client calls.
    """
    client = await client
    body = [{
        'name': 'test',
        'stores': [{'id': 1}],
        'schema': {
            'properties': {
                'id': {'type': 'string'}
            },
            'type': 'object',
            'id_names': ['id']
        }
    }]
    await client.post('/item_types/', headers=headers, data=ujson.dumps(body))

    # The schema declares ``id`` as a string, so an integer id is invalid.
    body = [{'id': 1}]
    # NOTE(review): the test name says "patch" but the request is sent with
    # client.post - confirm whether client.patch was intended.
    resp = await client.post('/item_types/1/items?store_id=1', headers=headers, data=ujson.dumps(body))

    assert resp.status == 400
    assert await resp.json() == {
        'instance': 1,
        'message': "1 is not of type 'string'. "
                   "Failed validating instance['0']['id'] "
                   "for schema['items']['properties']['id']['type']",
        'schema': {'type': 'string'}
    }
async def test_if_items_patch_updates_stock_filter(self, init_db, headers, redis, session, client, api):
    """Deleting an item through PATCH must flip its stock-filter entry to False.

    Declared ``async`` because the body awaits the client, index and Redis
    calls.
    """
    body = [{
        'name': 'test',
        'stores': [{'id': 1}],
        'schema': {'properties': {'id': {'type': 'string'}}, 'type': 'object', 'id_names': ['id']}
    }]
    client = await client
    await client.post('/item_types/', headers=headers, data=ujson.dumps(body))

    body = [{'id': 'test'}]
    resp = await client.post('/item_types/1/items?store_id=1', headers=headers, data=ujson.dumps(body))
    assert resp.status == 201

    test_model = _all_models['store_items_test_1']
    await ItemsIndicesMap(test_model).update(session)

    body = [{'id': 'test', '_operation': 'delete'}]
    # The PATCH response is not inspected, so it is not kept.
    await client.patch('/item_types/1/items?store_id=1', headers=headers, data=ujson.dumps(body))

    # frombuffer/bool replace the deprecated binary-mode np.fromstring and
    # the np.bool alias (removed in NumPy 1.24); the decoded values are the same.
    stock_filter = np.frombuffer(await redis.get('store_items_test_1_stock_filter'), dtype=bool).tolist()
    assert stock_filter == [False]
async def test_post(self, init_db, client, headers, headers_without_content_type):
    """POSTing an engine object returns 201 and the created object.

    Declared ``async`` because the body awaits the client calls.
    """
    client = await client
    body = [{
        'name': 'Top Seller Object Test',
        'type': 'top_seller_array',
        'configuration': {'days_interval': 7},
        'store_id': 1,
        'item_type_id': 1,
        'strategy_id': 1
    }]
    resp = await client.post('/engine_objects/', headers=headers, data=ujson.dumps(body))
    resp_json = (await resp.json())

    # The new object is expected to get id 2; the nested store, strategy and
    # item_type records are copied from the response rather than checked.
    body[0]['id'] = 2
    body[0]['store'] = resp_json[0]['store']
    body[0]['strategy'] = resp_json[0]['strategy']
    body[0]['item_type'] = resp_json[0]['item_type']

    assert resp.status == 201
    assert resp_json == body