def json_compat_obj_encode(data_type, obj, caller_permissions=None, alias_validators=None,
                           old_style=False, for_msgpack=False, should_redact=False):
    """Convert ``obj`` into primitives that ``json.dumps()`` accepts.

    A ``StoneToPythonPrimitiveSerializer`` is built from the options and
    asked to encode ``obj`` according to ``data_type``.

    Args:
        data_type (Validator): Validator describing the type of ``obj``.
        obj (object): The value to serialize.
        caller_permissions (list): Raw-string caller permissions with which
            to serialize.
        alias_validators: Handed to the serializer unchanged.
        old_style (bool): Handed to the serializer unchanged.
        for_msgpack (bool): Handed to the serializer unchanged.
        should_redact (bool): Handed to the serializer unchanged.

    Returns:
        An object that, when given to json.dumps(), produces the
        JSON-encoded form of ``obj``.

    See json_encode() for additional information about validation.
    """
    # The serializer receives its options positionally, in an order that
    # differs from this function's own signature (for_msgpack before
    # old_style).
    encoder = StoneToPythonPrimitiveSerializer(
        caller_permissions, alias_validators, for_msgpack, old_style, should_redact)
    encoded = encoder.encode(data_type, obj)
    return encoded
# --------------------------------------------------------------
# JSON Decoder
# Example source code for Python's dumps()
def json_compat_obj_encode(
        data_type, obj, alias_validators=None, old_style=False,
        for_msgpack=False):
    """Convert ``obj`` into primitives that ``json.dumps()`` accepts.

    A ``StoneToPythonPrimitiveSerializer`` is built from the options and
    asked to encode ``obj`` according to ``data_type``.

    Args:
        data_type (Validator): Validator describing the type of ``obj``.
        obj (object): The value to serialize.
        alias_validators: Handed to the serializer unchanged.
        old_style (bool): Handed to the serializer unchanged.
        for_msgpack (bool): Handed to the serializer unchanged.

    Returns:
        An object that, when given to json.dumps(), produces the
        JSON-encoded form of ``obj``.

    See json_encode() for additional information about validation.
    """
    # Positional order expected by the serializer: for_msgpack comes
    # before old_style.
    encoder = StoneToPythonPrimitiveSerializer(alias_validators, for_msgpack, old_style)
    encoded = encoder.encode(data_type, obj)
    return encoded
# --------------------------------------------------------------
# JSON Decoder
def encrypt(self, data, key, path=None, algorithm=None):
"""
Encrypts data in a form ready to ship to the storage layer.

NOTE(review): as written, this block reads ``data_key`` and returns
``storage_data`` without ever assigning either name, and the ``key`` and
``algorithm`` parameters are never read. Lines of the implementation
appear to be missing here -- confirm against the upstream source before
relying on this function.

:param bytes data: Data to encrypt
:param bytes key: Data encryption key to use when encrypting
:param tuple(str) path: Path to the data (to be able to share
sub-paths). If None, encrypted with just our pubkey.
If contains only 1 element or is a string, this is just used as a
unique identifier w/o granular encryption.
:param dict algorithm: Algorithm parameters (name, curve, re-encryption
type, m/n etc). None if default
:return: Encrypted data
:rtype: bytes
"""
# Encrypt the payload through the keyring and msgpack-serialize the result.
# NOTE(review): `data_key` is undefined in this block; presumably it should
# be the `key` parameter or a freshly generated key -- TODO confirm.
ciphertext = msgpack.dumps(self.keyring.encrypt(data, data_key))
# Derive keys and encrypt them
# TODO: https://github.com/nucypher/nucypher-kms/issues/33
if path is not None:
# With a path, the result of encrypt_key is used as-is (presumably it is
# already a collection of encrypted keys -- verify against encrypt_key).
enc_keys = self.encrypt_key(data_key, path=path)
else:
# Without a path, the single result is wrapped in a one-element list.
enc_keys = [self.encrypt_key(data_key, path=path)]
# NOTE(review): `storage_data` is never built in this block; `ciphertext`
# and `enc_keys` are computed above but not used.
return storage_data
def send(self, topic, payload):
    '''Send a message with topic, payload

    Topic is a unicode string. It will be sent as utf-8 encoded byte array.
    Payload is a python dict. It will be sent as a msgpack serialized dict.

    If payload has the key '__raw_data__' we pop it off the payload (the
    caller's dict is modified) and send its raw contents as extra frames.
    Everything else needs to be serializable. The contents of the iterable
    in '__raw_data__' require exposing the python memoryview interface.

    The payload is serialized before the first frame is sent, so a
    serialization error cannot leave a half-sent multipart message on the
    socket. Both code paths serialize with ``use_bin_type=True`` so the
    receiver sees the same encoding with or without extra frames.
    '''
    if '__raw_data__' not in payload:
        packed = serializer.dumps(payload, use_bin_type=True)
        self.socket.send_string(topic, flags=zmq.SNDMORE)
        self.socket.send(packed)
        return

    extra_frames = payload.pop('__raw_data__')
    assert(isinstance(extra_frames, (list, tuple)))
    packed = serializer.dumps(payload, use_bin_type=True)
    self.socket.send_string(topic, flags=zmq.SNDMORE)

    if not extra_frames:
        # Nothing follows the payload, so the payload is the final frame.
        # Indexing extra_frames[-1] here would raise IndexError after the
        # multipart message had already been started.
        self.socket.send(packed)
        return

    self.socket.send(packed, flags=zmq.SNDMORE)
    for frame in extra_frames[:-1]:
        self.socket.send(frame, flags=zmq.SNDMORE, copy=True)
    # The last frame is sent without SNDMORE to terminate the message.
    self.socket.send(extra_frames[-1], copy=True)
def update(self):
    """Refresh the two drained flags and derive ``self.status`` from them.

    ``_prev_drained`` becomes True once every predecessor's runner reports
    DRAINED. ``_sub_drained`` becomes True once both connection counters
    match and at least three retries were recorded; at that moment one
    b'END' packet per sub-process is sent on ``_socket1``. The status is
    DRAINED only when both flags are set, READY otherwise.
    """
    if not self._prev_drained:
        current = self._node
        upstream_states = [pred.runner.status
                           for pred in current.graph.predecessors(current)]
        if all(state == DRAINED for state in upstream_states):
            self._prev_drained = True

    if not self._sub_drained:
        sent_all = self._conn1_send_count == sum(self._conn1_recv_count.values())
        received_all = self._conn2_recv_count == sum(self._conn2_send_count.values())
        retried_enough = self._retry_count >= 3
        if sent_all and received_all and retried_enough:
            self._sub_drained = True
            # One END marker for each of the _nproc receivers.
            end_packet_count = self._nproc
            for _ in range(end_packet_count):
                self._socket1.send(msgpack.dumps(b'END'))

    self.status = DRAINED if (self._prev_drained and self._sub_drained) else READY
def reset(self):
    """Return to READY, reconnect the PULL socket and send a SYNC packet.

    Waits (polling every 10 ms) until both the 'conn1' and 'sync_conn1'
    ports are known, connects the receive socket and a temporary PUSH
    socket, sends a msgpack-encoded b'SYNC' over the latter, closes it and
    finally clears the receive counter and the drained flag.
    """
    self.status = READY

    ctx = zmq.Context()
    self._socket = ctx.socket(zmq.PULL)
    self._socket.RCVTIMEO = 1
    handshake = ctx.socket(zmq.PUSH)

    # Block until both port numbers have been filled in.
    while any(self._ports[name] is None for name in ('conn1', 'sync_conn1')):
        sleep(0.01)

    # Handshake with main process
    data_endpoint = ':'.join((self._address, str(self._ports['conn1'])))
    self._socket.connect(data_endpoint)
    sync_endpoint = ':'.join((self._address, str(self._ports['sync_conn1'])))
    handshake.connect(sync_endpoint)
    handshake.send(msgpack.dumps(b'SYNC'))
    handshake.close()

    self._num_recv = 0
    self._drained = False
def __init__(self, socket, sflags=0):
    """Store the socket and pick the serializer pair selected by ``sflags``.

    pickle is the default pair. SFLAG_MSGPACK swaps in msgpack wrappers and
    SFLAG_JSON swaps in the JSON pair; the flags are tested one after the
    other, so when both are set the JSON pair is the one left in place.

    Raises:
        Exception: SFLAG_MSGPACK was requested but ``msgpack`` is falsy.
    """
    self.sflags = sflags
    self.socket = socket

    # Default codec.
    # NOTE(review): pickle.loads is unsafe on untrusted input -- confirm
    # that this codec is only used with trusted peers.
    self.dumps, self.loads = pickledumps, pickle.loads

    if sflags & SFLAG_MSGPACK:
        if not msgpack:
            raise Exception('Missing "msgpack" python module ( http://visi.kenshoto.com/viki/Msgpack )')

        # Thin wrappers that apply the module-level msgpack options.
        def msgpackdumps(obj):
            return msgpack.dumps(obj, **dumpargs)

        def msgpackloads(raw):
            return msgpack.loads(raw, **loadargs)

        self.dumps, self.loads = msgpackdumps, msgpackloads

    if sflags & SFLAG_JSON:
        self.dumps, self.loads = jsondumps, jsonloads
def sendMessage(self, mtype, objname, data):
    """
    Serialize ``data`` and transmit one cobra message.

    The wire format is a little-endian header of three unsigned 32-bit
    integers (message type, object-name length, body length) followed by
    the UTF-8 object name and the serialized body, all handed to
    ``sendExact`` in a single call.

    Raises CobraPickleException when ``data`` cannot be serialized.
    """
    # NOTE: for errors while using msgpack, we must send only the str
    if mtype == COBRA_ERROR and self.sflags & (SFLAG_MSGPACK | SFLAG_JSON):
        data = str(data)

    try:
        body = self.dumps(data)
    except Exception as e:
        raise CobraPickleException("The arguments/attributes must be serializable: %s" % e)

    objname = toUtf8(objname)
    header = struct.pack("<III", mtype, len(objname), len(body))
    self.sendExact(header + objname + body)
def json_compat_obj_encode(
        data_type, obj, alias_validators=None, old_style=False,
        for_msgpack=False):
    """Convert ``obj`` into primitives that ``json.dumps()`` accepts.

    A ``StoneToPythonPrimitiveSerializer`` is built from the options and
    asked to encode ``obj`` according to ``data_type``.

    Args:
        data_type (Validator): Validator describing the type of ``obj``.
        obj (object): The value to serialize.
        alias_validators: Handed to the serializer unchanged.
        old_style (bool): Handed to the serializer unchanged.
        for_msgpack (bool): Handed to the serializer unchanged.

    Returns:
        An object that, when given to json.dumps(), produces the
        JSON-encoded form of ``obj``.

    See json_encode() for additional information about validation.
    """
    # Positional order expected by the serializer: for_msgpack comes
    # before old_style.
    encoder = StoneToPythonPrimitiveSerializer(alias_validators, for_msgpack, old_style)
    encoded = encoder.encode(data_type, obj)
    return encoded
# --------------------------------------------------------------
# JSON Decoder
def test_mainline(self, m_select):
    """Two consecutive reads each yield the same decoded status message.

    ``select`` reports the socket as readable twice and ``recv`` always
    returns one packed status message; each call to ``new_messages`` must
    yield it, and ``recv`` must have been called twice with a 16384-byte
    buffer size.
    """
    m_select.side_effect = iter([
        ([self.sck], [], []),
        ([self.sck], [], []),
    ])
    exp_msg = {MSG_KEY_TYPE: MSG_TYPE_STATUS,
               MSG_KEY_STATUS: STATUS_RESYNC}
    self.sck.recv.return_value = msgpack.dumps(exp_msg)
    # range() instead of the Python 2-only xrange(): identical for two
    # iterations and also valid on Python 3.
    for _ in range(2):
        msg_gen = self.reader.new_messages(timeout=1)
        msg_type, msg = next(msg_gen)
        self.assertEqual(msg_type, MSG_TYPE_STATUS)
        self.assertEqual(msg, exp_msg)
    self.assertEqual(
        self.sck.recv.mock_calls,
        [
            call(16384),
            call(16384),
        ]
    )
def test_retryable_error(self, m_select):
    """EAGAIN, EWOULDBLOCK and EINTR end the read quietly; the next read works.

    ``recv`` raises one socket error per retryable errno and then returns
    a packed status message. Each failing read must simply stop the
    generator, and the read after them must yield the message.
    """
    def make_error(code):
        # Build the error first and set errno afterwards.
        failure = socket.error()
        failure.errno = code
        return failure

    readable = ([self.sck], [], [])
    m_select.side_effect = iter([readable, readable, readable, readable])

    errors = [make_error(code)
              for code in (errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR)]
    exp_msg = {MSG_KEY_TYPE: MSG_TYPE_STATUS,
               MSG_KEY_STATUS: STATUS_RESYNC}
    self.sck.recv.side_effect = iter(errors + [msgpack.dumps(exp_msg)])

    # One read attempt per injected error; each must end without a message.
    for _ in errors:
        failed_read = self.reader.new_messages(timeout=1)
        self.assertRaises(StopIteration, next, failed_read)

    good_read = self.reader.new_messages(timeout=1)
    msg_type, msg = next(good_read)
    self.assertEqual(msg_type, MSG_TYPE_STATUS)
    self.assertEqual(msg, exp_msg)
def send_message(self, msg_type, fields=None, flush=True):
    """
    Pack a message of the given type and append it to the write buffer.

    The message is a dict holding the type under MSG_KEY_TYPE, updated
    with ``fields`` when any are given. After buffering, the data is
    flushed right away when ``flush`` is true; otherwise the decision is
    left to ``_maybe_flush``.

    :param msg_type: one of the MSG_TYPE_* constants.
    :param dict fields: dict mapping MSG_KEY_* constants to values.
    :param flush: True to force the data to be written immediately.
    """
    message = {MSG_KEY_TYPE: msg_type}
    # An empty or missing ``fields`` contributes nothing.
    message.update(fields or {})
    packed = msgpack.dumps(message)
    self._buf.write(packed)
    if not flush:
        self._maybe_flush()
        return
    self.flush()
def json_compat_obj_encode(data_type, obj, caller_permissions=None, alias_validators=None,
                           old_style=False, for_msgpack=False, should_redact=False):
    """Convert ``obj`` into primitives that ``json.dumps()`` accepts.

    A ``StoneToPythonPrimitiveSerializer`` is built from the options and
    asked to encode ``obj`` according to ``data_type``.

    Args:
        data_type (Validator): Validator describing the type of ``obj``.
        obj (object): The value to serialize.
        caller_permissions (list): Raw-string caller permissions with which
            to serialize.
        alias_validators: Handed to the serializer unchanged.
        old_style (bool): Handed to the serializer unchanged.
        for_msgpack (bool): Handed to the serializer unchanged.
        should_redact (bool): Handed to the serializer unchanged.

    Returns:
        An object that, when given to json.dumps(), produces the
        JSON-encoded form of ``obj``.

    See json_encode() for additional information about validation.
    """
    # The serializer receives its options positionally, in an order that
    # differs from this function's own signature (for_msgpack before
    # old_style).
    encoder = StoneToPythonPrimitiveSerializer(
        caller_permissions, alias_validators, for_msgpack, old_style, should_redact)
    encoded = encoder.encode(data_type, obj)
    return encoded
# --------------------------------------------------------------
# JSON Decoder
def test_hmset_with_objects_and_without_ids_with_set_map_len_greater_than_chunks(self, model):
    """With CHUNKS == 1, updating two objects issues one hmset per object.

    The two hmset calls may arrive in either order.
    """
    model.CHUNKS = 1
    session = mock.MagicMock()
    session.redis_bind.hkeys.return_value = [b'2', b'1']

    first_chunk = {b'1': msgpack.dumps({'id': 1})}
    second_chunk = {b'2': msgpack.dumps({'id': 2})}

    assert model.update(session, [{'id': 1}, {'id': 2}]) == [{'id': 1}, {'id': 2}]

    hmset_calls = session.redis_bind.hmset.call_args_list
    in_order = [mock.call('test', first_chunk), mock.call('test', second_chunk)]
    swapped = [mock.call('test', second_chunk), mock.call('test', first_chunk)]
    assert (hmset_calls == in_order or hmset_calls == swapped)
def test_hmset_with_objects_and_with_ids_len_greater_than_chunks(self, model):
    """With CHUNKS == 1 and explicit ids, one hmset is issued per object.

    The two hmset calls may arrive in either order.
    """
    model.CHUNKS = 1
    session = mock.MagicMock()
    session.redis_bind.hkeys.return_value = [b'2', b'1']

    first_chunk = {b'1': msgpack.dumps({'id': 1})}
    second_chunk = {b'2': msgpack.dumps({'id': 2})}

    updated = model.update(session, [{'id': 1}, {'id': 2}], [{'id': 1}, {'id': 2}])
    assert updated == [
        {'id': 1}, {'id': 2}
    ]

    hmset_calls = session.redis_bind.hmset.call_args_list
    in_order = [mock.call('test', first_chunk), mock.call('test', second_chunk)]
    swapped = [mock.call('test', second_chunk), mock.call('test', first_chunk)]
    assert (hmset_calls == in_order or hmset_calls == swapped)
def test_if_istances_are_seted_on_redis_with_two_models_correctly(
        self, session, model1, model2, redis):
    """A single commit writes each model's instances with one hmset per model."""
    # Same insertion order as before: model1/1, model2/1, model1/2, model2/2.
    for model, id_ in ((model1, 1), (model2, 1), (model1, 2), (model2, 2)):
        session.add(model(session, id=id_))
    session.commit()

    expected = [
        mock.call(key, {b'1': msgpack.dumps({'id': 1}), b'2': msgpack.dumps({'id': 2})})
        for key in ('test1', 'test2')
    ]
    recorded = redis.hmset.call_args_list
    assert len(expected) == len(recorded)
    for call_ in recorded:
        assert call_ in expected
def test_if_two_commits_sets_redis_with_two_models_correctly(
        self, session, model1, model2, redis):
    """Two commits produce one hmset per model per commit."""
    # Each round adds one instance of each model and commits.
    for id_ in (1, 2):
        session.add(model1(session, id=id_))
        session.add(model2(session, id=id_))
        session.commit()

    expected = [
        mock.call(key, {str(id_).encode(): msgpack.dumps({'id': id_})})
        for id_ in (1, 2)
        for key in ('test1', 'test2')
    ]
    recorded = redis.hmset.call_args_list
    assert len(expected) == len(recorded)
    for call_ in recorded:
        assert call_ in expected
def insert(cls, session, objs, **kwargs):
    """Write ``objs`` to the redis hash ``cls.__key__`` in chunks.

    Each object is wrapped in ``cls``, keyed by its ``get_key()`` and
    stored msgpack-serialized. A batch is flushed with ``hmset`` every
    ``cls.CHUNKS`` objects, and any remainder is flushed at the end.

    The previous implementation deep-copied the whole input into a local
    that was never read; that wasted copy has been removed.

    Args:
        session: Object exposing ``redis_bind.hmset``.
        objs: Object(s) to insert; normalized through ``cls._to_list``.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        The value returned by ``cls._to_list(objs)``.
    """
    objs = cls._to_list(objs)
    ids_objs_map = {}
    # Counts processed objects rather than distinct keys, so chunk
    # boundaries stay where they were even when two objects share a key.
    counter = 0
    for obj in objs:
        obj = cls(obj)
        ids_objs_map[obj.get_key()] = msgpack.dumps(obj)
        counter += 1
        if counter == cls.CHUNKS:
            session.redis_bind.hmset(cls.__key__, ids_objs_map)
            ids_objs_map = {}
            counter = 0
    # Flush whatever did not fill a complete chunk.
    if ids_objs_map:
        session.redis_bind.hmset(cls.__key__, ids_objs_map)
    return objs
def _exec_hmset(self, insts):
    """Write the given instances to redis and drop their stale keys.

    For every instance whose model has ``__use_redis__`` set, and for each
    of its filter-name sets, the msgpack-serialized ``todict()`` is queued
    under the model's redis key. When the instance carries an
    ``old_redis_key`` different from its current key, the old key is
    queued for deletion. All ``hmset`` calls are issued first, then all
    ``hdel`` calls.
    """
    pending_writes = defaultdict(dict)   # model key -> {instance key: packed}
    pending_deletes = defaultdict(set)   # model key -> {stale instance keys}

    for inst in insts:
        model = type(inst)
        if not model.__use_redis__:
            continue

        for filters_names in self._get_filters_names_set(inst):
            model_key = type(model).get_key(model, filters_names.decode())
            current_key = inst.get_key()
            previous_key = getattr(inst, 'old_redis_key', None)

            key_changed = previous_key is not None and previous_key != current_key
            if key_changed:
                pending_deletes[model_key].add(previous_key)

            pending_writes[model_key][current_key] = msgpack.dumps(inst.todict())

    for model_key, packed_by_inst_key in pending_writes.items():
        self.redis_bind.hmset(model_key, packed_by_inst_key)

    for model_key, stale_keys in pending_deletes.items():
        self.redis_bind.hdel(model_key, *stale_keys)
def test_json_push(app):
    """POSTing a JSON task to /push returns 200 and fills the 'normal' queue."""
    task = {'queue': 'normal', 'name': 'boo', 'args': [1, 2, 3]}

    request = Request.blank('/push')
    request.method = 'POST'
    request.content_type = 'application/json'
    request.body = bytestr(json.dumps(task))

    response = request.get_response(app)
    assert response.status_code == 200
    assert app.manager.queue.get_queue('normal')
def test_msgpack_push(app):
    """POSTing a msgpack task to /push succeeds and fills the 'normal' queue.

    The response status is asserted, as the JSON variant of this test
    does; previously the response was captured but never checked.
    """
    req = Request.blank('/push')
    req.method = 'POST'
    req.content_type = 'application/x-msgpack'
    req.body = msgpack.dumps({'queue': 'normal', 'name': 'boo', 'args': [1, 2, 3]})
    res = req.get_response(app)
    assert res.status_code == 200
    assert app.manager.queue.get_queue('normal')
def test_task_without_queue(app):
    """A pushed task lacking 'queue' is rejected with 400 / 'queue required'."""
    task = {'name': 'boo', 'args': [1, 2, 3]}

    request = Request.blank('/push')
    request.method = 'POST'
    request.content_type = 'application/json'
    request.body = bytestr(json.dumps(task))

    response = request.get_response(app)
    assert response.status_code == 400
    expected_error = {'message': 'queue required', 'error': 'bad-params'}
    assert response.json == expected_error
def test_task_without_name(app):
    """A pushed task lacking 'name' is rejected with 400 / 'name required'."""
    task = {'queue': 'boo'}

    request = Request.blank('/push')
    request.method = 'POST'
    request.content_type = 'application/json'
    request.body = bytestr(json.dumps(task))

    response = request.get_response(app)
    assert response.status_code == 400
    expected_error = {'message': 'name required', 'error': 'bad-params'}
    assert response.json == expected_error
def test_result_get(app):
    """/result returns null before the task runs and its result afterwards.

    A task ``add`` is registered, pushed with ``keep_result`` set, and the
    result endpoint is queried before and after the task is processed.
    """
    @app.manager.task
    def add(a, b):
        return a + b

    req = Request.blank('/push')
    req.method = 'POST'
    req.content_type = 'application/json'
    req.body = bytestr(json.dumps({'queue': 'boo', 'name': 'add',
                                   'args': (1, 2), 'keep_result': 100}))
    res = req.get_response(app)
    tid = res.json['id']
    # Identity comparison is the correct check for None.
    assert Request.blank('/result?id={}'.format(tid)).get_response(app).json is None
    app.manager.process(app.manager.pop(['boo'], 1))
    assert Request.blank('/result?id={}'.format(tid)).get_response(app).json == {'result': 3}
def push(self, queue, task, eta=None):
    """Serialize ``task`` and enqueue it, immediately or on the schedule.

    With a truthy ``eta`` the item is added to the sorted set
    SCHEDULE_KEY scored by ``eta``; otherwise it is appended to the
    queue's list. Queue names containing a colon are rejected by an
    assertion.
    """
    assert ':' not in queue, 'Queue name must not contain colon: "{}"'.format(queue)
    packed_task = dumps(task, use_bin_type=True)  # TODO: may be better to move task packing to manager
    if not eta:
        self.client.rpush(rqname(queue), packed_task)
        return
    self.client.zadd(SCHEDULE_KEY, eta, sitem(queue, packed_task))
def set(self, id, value, ttl):
    """Serialize ``value`` and hand it to ``self.client.set`` under ``id`` with ``ttl``."""
    packed_value = dumps(value, use_bin_type=True)
    self.client.set(id, packed_value, ttl)
def encode(self, validator, value):
    """Encode ``value`` with the parent serializer and dump the result as a JSON string."""
    primitives = super(StoneToJsonSerializer, self).encode(validator, value)
    return json.dumps(primitives)
# --------------------------------------------------------------
# JSON Encoder
#
# These interfaces are preserved for backward compatibility and symmetry with deserialization
# functions.
def msgpack_encode(data_type, obj):
    """Encode ``obj`` to msgpack bytes according to ``data_type``.

    The object is first converted by ``msgpack_compat_obj_encode`` and the
    result is packed with ``msgpack.dumps``.

    The ``encoding='utf-8'`` keyword is no longer passed: the packer
    removed that argument in msgpack 1.0, where supplying it raises
    TypeError. UTF-8 was already the packer's default, so output on older
    msgpack versions is unchanged.
    """
    return msgpack.dumps(msgpack_compat_obj_encode(data_type, obj))
def pack(tag, msg):
    '''
    Wrap a message with its tag and serialize the pair with msgpack.

    tags look like '/namespace/type'
    msg is a dictionary containing the message to be processed
    The packed structure is a dict with the keys 'tag' and 'data'.
    '''
    envelope = {'tag': tag, 'data': msg}
    return msgpack.dumps(envelope)
def encode(self, validator, value):
    """Encode ``value`` with the parent serializer and dump the result as a JSON string."""
    primitives = super(StoneToJsonSerializer, self).encode(validator, value)
    return json.dumps(primitives)
# --------------------------------------------------------------
# JSON Encoder
#
# These interfaces are preserved for backward compatibility and symmetry with deserialization
# functions.