def msgpack_encode(data_type, obj):
return msgpack.dumps(
msgpack_compat_obj_encode(data_type, obj), encoding='utf-8')
python类dumps()的实例源码
def __setitem__(self, key, value):
with self.db.begin(write=True) as tx:
tx.put(key, msgpack.dumps(value))
def rekey(self, priv1, pub2):
priv_to = api.secure_random(self.KEY_SIZE)
rk = super(PRE, self).rekey(
convert_priv(priv1), convert_priv(priv_to), dtype=bytes)
epriv_to = self.encrypt(pub2, priv_to)
return msgpack.dumps([rk, epriv_to])
def reencrypt(self, rekey, emsg):
rk, epriv = msgpack.loads(rekey)
remsg = super(PRE, self).reencrypt(rk, emsg)
return msgpack.dumps([2, epriv, remsg]) # type 2 emsg
def interface_dht_value(self):
signature = self.seal(self.interface_hrac())
return b"uaddr" + signature + self.seal + self.interface_hrac() + msgpack.dumps(self.dht_interface_info())
def _write_header(self, header_path):
"""
Writes the msgpack dumped self.header dict to the file located at
`header_path`.
:param string/bytes header_path: The path to write the msgpack dumped
header to
"""
with open(header_path, mode='wb') as f:
try:
f.write(msgpack.dumps(self.header))
except ValueError as e:
raise e
def publish_treasure_map(self):
encrypted_treasure_map, signature_for_bob = self.alice.encrypt_for(self.bob,
self.treasure_map.packed_payload())
signature_for_ursula = self.alice.seal(self.hrac()) # TODO: Great use-case for Ciphertext class
# In order to know this is safe to propagate, Ursula needs to see a signature, our public key,
# and, reasons explained in treasure_map_dht_key above, the uri_hash.
dht_value = signature_for_ursula + self.alice.seal + self.hrac() + msgpack.dumps(
encrypted_treasure_map) # TODO: Ideally, this is a Ciphertext object instead of msgpack (see #112)
dht_key = self.treasure_map_dht_key()
setter = self.alice.server.set(dht_key, b"trmap" + dht_value)
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(setter)
return encrypted_treasure_map, dht_value, signature_for_bob, signature_for_ursula
def enact(self, networky_stuff):
for contract in self._accepted_contracts.values():
policy_payload = contract.encrypt_payload_for_ursula()
full_payload = self.alice.seal + msgpack.dumps(policy_payload)
response = networky_stuff.enact_policy(contract.ursula,
self.hrac(),
full_payload) # TODO: Parse response for confirmation.
# Assuming response is what we hope for
self.treasure_map.add_ursula(contract.ursula)
def packed_payload(self):
return msgpack.dumps(self.ids)
def payload(self):
pfrags_as_bytes = [bytes(p) for p in self.pfrags]
packed_receipt_and_pfrags = msgpack.dumps((self.receipt_bytes, msgpack.dumps(pfrags_as_bytes)))
return bytes(self.receipt_signature) + self.bob.seal + packed_receipt_and_pfrags
def __init__(self, data, safe=True,
json_dumps_params=None, **kwargs):
if safe and not isinstance(data, dict):
raise TypeError(
'In order to allow non-dict objects to be serialized set the '
'safe parameter to False.'
)
if json_dumps_params is None:
json_dumps_params = {}
kwargs.setdefault('content_type', 'application/json')
data = json.dumps(data, **json_dumps_params)
super().__init__(content=data, **kwargs)
def build_test_data(destdir):
l = get_test_data_list()
for i in range(len(l)):
# packer = msgpack.Packer()
serialized = msgpack.dumps(l[i])
f = open(os.path.join(destdir, str(i) + '.msgpack.golden'), 'wb')
f.write(serialized)
f.close()
serialized = cbor.dumps(l[i])
f = open(os.path.join(destdir, str(i) + '.cbor.golden'), 'wb')
f.write(serialized)
f.close()
def put(self, sample):
packet = msgpack.dumps(sample)
self._socket1.send(packet)
self._conn1_send_count += 1
def run(self):
def _run():
for sample in graph_run(self._node.graph, self._node, False):
self._num_send += 1
sample['__send_count__'] = self._num_send
packet = msgpack.dumps(sample)
self._socket.send(packet, copy=False)
processes = [mp.Process(target=_run) for _ in range(self._nproc)]
for p in processes:
p.start()
def build_test_data(destdir):
l = get_test_data_list()
for i in range(len(l)):
# packer = msgpack.Packer()
serialized = msgpack.dumps(l[i])
f = open(os.path.join(destdir, str(i) + '.msgpack.golden'), 'wb')
f.write(serialized)
f.close()
serialized = cbor.dumps(l[i])
f = open(os.path.join(destdir, str(i) + '.cbor.golden'), 'wb')
f.write(serialized)
f.close()
def build_test_data(destdir):
l = get_test_data_list()
for i in range(len(l)):
# packer = msgpack.Packer()
serialized = msgpack.dumps(l[i])
f = open(os.path.join(destdir, str(i) + '.msgpack.golden'), 'wb')
f.write(serialized)
f.close()
serialized = cbor.dumps(l[i])
f = open(os.path.join(destdir, str(i) + '.cbor.golden'), 'wb')
f.write(serialized)
f.close()
def msgpack_request(http_client):
default_client = http_client
async def call(url, data=None, client=None, accept='*/*'):
if not client:
client = default_client
method = client.post if data is not None else client.get
resp = await method(
url,
data=msgpack.dumps(data, use_bin_type=True),
headers={'content-type': 'application/msgpack', 'accept': accept})
return await http_response_decode(resp)
return call
def dumps(obj):
return msgpack.dumps(obj,use_bin_type=True)
def serialize(
data,
):
serialized_object = msgpack.dumps(data)
return serialized_object
def dumps(obj):
"""
Serialize an object.
Returns:
str
"""
return msgpack.dumps(obj, use_bin_type=True)