def publish_payload(self, payload, topic=''):
"""
This method will publish a python_banyan payload and its associated topic
:param payload: Protocol message to be published
:param topic: A string value
"""
if not type(topic) is str:
if sys.version_info[0] < 3:
raise AttributeError('Publish topic must be python_banyan string', 'topic')
else:
raise TypeError('Publish topic must be python_banyan string', 'topic')
# create python_banyan message pack payload
if self.numpy:
message = msgpack.packb(payload, default=m.encode)
else:
message = umsgpack.packb(payload)
pub_envelope = topic.encode()
self.publisher.send_multipart([pub_envelope, message])
python类packb()的实例源码
def _request(self, message):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(self.server_address)
sock.send(msgpack.packb(message, use_bin_type=True))
buff = bytes()
while True:
block = sock.recv(128)
if not block:
break
buff += block
resp = msgpack.unpackb(buff, encoding='utf-8')
sock.close()
if 'type' in resp and resp['type'] == 'redirect':
self.server_address = tuple(resp['leader'])
resp = self._request(message)
return resp
def test_simple_send_and_receive(self):
client = self._set_up_client()
payload = {'test': 'test_simple_send_receive'}
client.send_message_to_queue(
queue_key='test_simple_send_receive',
message=msgpack.packb(payload),
expiry=10,
capacity=10,
connection=client.get_connection('test_simple_send_receive'),
)
message = None
for i in range(3):
# Message will be on random server
message = message or client.get_connection('test_simple_send_receive').lpop('test_simple_send_receive')
self.assertIsNotNone(message)
self.assertEqual(payload, msgpack.unpackb(message, encoding='utf-8'))
def test_services_send_receive(self):
client = self._set_up_client(sentinel_services=['service1', 'service2', 'service3'])
payload = {'test': 'test_services_send_receive'}
client.send_message_to_queue(
queue_key='test_services_send_receive',
message=msgpack.packb(payload),
expiry=10,
capacity=10,
connection=client.get_connection('test_services_send_receive'),
)
message = None
for i in range(3):
# Message will be on random server
message = message or client.get_connection('test_services_send_receive').lpop('test_services_send_receive')
self.assertIsNotNone(message)
self.assertEqual(payload, msgpack.unpackb(message, encoding='utf-8'))
def test_no_hosts_send_receive(self):
client = SentinelRedisClient()
payload = {'test': 'test_no_hosts_send_receive'}
client.send_message_to_queue(
queue_key='test_no_hosts_send_receive',
message=msgpack.packb(payload),
expiry=10,
capacity=10,
connection=client.get_connection('test_no_hosts_send_receive'),
)
message = None
for i in range(3):
# Message will be on random server
message = message or client.get_connection('test_no_hosts_send_receive').lpop('test_no_hosts_send_receive')
self.assertIsNotNone(message)
self.assertEqual(payload, msgpack.unpackb(message, encoding='utf-8'))
def test_no_hosts_yields_single_default_host(self):
client = StandardRedisClient()
payload = {'test': 'test_no_hosts_yields_single_default_host'}
client.send_message_to_queue(
queue_key='test_no_hosts_yields_single_default_host',
message=msgpack.packb(payload),
expiry=10,
capacity=10,
connection=client.get_connection('test_no_hosts_yields_single_default_host'),
)
message = client.get_connection(
'test_no_hosts_yields_single_default_host',
).lpop('test_no_hosts_yields_single_default_host')
self.assertIsNotNone(message)
self.assertEqual(payload, msgpack.unpackb(message, encoding='utf-8'))
def test_string_host_yields_single_host(self):
client = StandardRedisClient(hosts=['redis://localhost:1234/0'])
payload = {'test': 'test_string_host_yields_single_host'}
client.send_message_to_queue(
queue_key='test_string_host_yields_single_host',
message=msgpack.packb(payload),
expiry=10,
capacity=10,
connection=client.get_connection('test_string_host_yields_single_host'),
)
message = client.get_connection(
'test_string_host_yields_single_host',
).lpop('test_string_host_yields_single_host')
self.assertIsNotNone(message)
self.assertEqual(payload, msgpack.unpackb(message, encoding='utf-8'))
def transceiver(self, payload):
"""Sends and receives messages.
:param payload: A dict representing the message to send.
:returns: A string representing the unpacked response.
"""
packed = msgpack.packb(payload)
await self.socket.send_multipart([packed])
if self.response_timeout:
if not await self.poller.poll(self.response_timeout * 1000):
raise IOError('Timeout while waiting for server response')
rep = await self.socket.recv()
return self.check_and_return(msgpack.unpackb(rep, encoding='utf-8'))
def full_req_transceiver(zmq_url, data):
"""Used to send data and close connection.
:param zmq_url: URL for the socket to connect to.
:param data: The data to send.
:returns: The unpacked response.
"""
# TODO: Harden this
# TODO: Add linger and POLLIN support : https://github.com/zeromq/pyzmq/issues/132
ctx, socket = get_ctx_and_connect_req_socket(zmq_url)
packed = msgpack.packb(data)
socket.send_multipart([packed])
rep = socket.recv()
unpacked_rep = msgpack.unpackb(rep, encoding='utf-8')
socket.close()
ctx.term()
return unpacked_rep
def transceiver(self, payload):
"""Sends and receives messages.
:param payload: A dict representing the message to send.
:returns: A string representing the unpacked response.
"""
# TODO: Harden this
# TODO: Add linger and POLLIN support :
# https://github.com/zeromq/pyzmq/issues/132
packed = msgpack.packb(payload)
# blocks
self.socket.send_multipart([packed])
if self.response_timeout:
if not self.poller.poll(self.response_timeout * 1000):
raise IOError('Timeout while waiting for server response')
# blocks
rep = self.socket.recv()
return self.check_and_return(msgpack.unpackb(rep, encoding='utf-8'))
def test_unpacker_hook_refcnt():
result = []
def hook(x):
result.append(x)
return x
basecnt = sys.getrefcount(hook)
up = Unpacker(object_hook=hook, list_hook=hook)
assert sys.getrefcount(hook) >= basecnt + 2
up.feed(packb([{}]))
up.feed(packb([{}]))
assert up.unpack() == [{}]
assert up.unpack() == [{}]
assert result == [{}, [{}], {}, [{}]]
del up
assert sys.getrefcount(hook) == basecnt
def test_unpacker_ext_hook():
class MyUnpacker(Unpacker):
def __init__(self):
super(MyUnpacker, self).__init__(ext_hook=self._hook,
encoding='utf-8')
def _hook(self, code, data):
if code == 1:
return int(data)
else:
return ExtType(code, data)
unpacker = MyUnpacker()
unpacker.feed(packb({'a': 1}, encoding='utf-8'))
assert unpacker.unpack() == {'a': 1}
unpacker.feed(packb({'a': ExtType(1, b'123')}, encoding='utf-8'))
assert unpacker.unpack() == {'a': 123}
unpacker.feed(packb({'a': ExtType(2, b'321')}, encoding='utf-8'))
assert unpacker.unpack() == {'a': ExtType(2, b'321')}
def test_ext():
def check(ext, packed):
assert packb(ext) == packed
assert unpackb(packed) == ext
check(ExtType(0x42, b'Z'), b'\xd4\x42Z') # fixext 1
check(ExtType(0x42, b'ZZ'), b'\xd5\x42ZZ') # fixext 2
check(ExtType(0x42, b'Z'*4), b'\xd6\x42' + b'Z'*4) # fixext 4
check(ExtType(0x42, b'Z'*8), b'\xd7\x42' + b'Z'*8) # fixext 8
check(ExtType(0x42, b'Z'*16), b'\xd8\x42' + b'Z'*16) # fixext 16
# ext 8
check(ExtType(0x42, b''), b'\xc7\x00\x42')
check(ExtType(0x42, b'Z'*255), b'\xc7\xff\x42' + b'Z'*255)
# ext 16
check(ExtType(0x42, b'Z'*256), b'\xc8\x01\x00\x42' + b'Z'*256)
check(ExtType(0x42, b'Z'*0xffff), b'\xc8\xff\xff\x42' + b'Z'*0xffff)
# ext 32
check(ExtType(0x42, b'Z'*0x10000), b'\xc9\x00\x01\x00\x00\x42' + b'Z'*0x10000)
# needs large memory
#check(ExtType(0x42, b'Z'*0xffffffff),
# b'\xc9\xff\xff\xff\xff\x42' + b'Z'*0xffffffff)
def test_extension_type():
def default(obj):
print('default called', obj)
if isinstance(obj, array.array):
typecode = 123 # application specific typecode
data = obj.tostring()
return ExtType(typecode, data)
raise TypeError("Unknown type object %r" % (obj,))
def ext_hook(code, data):
print('ext_hook called', code, data)
assert code == 123
obj = array.array('d')
obj.fromstring(data)
return obj
obj = [42, b'hello', array.array('d', [1.1, 2.2, 3.3])]
s = msgpack.packb(obj, default=default)
obj2 = msgpack.unpackb(s, ext_hook=ext_hook)
assert obj == obj2
def _runtest(format, nbytes, expected_header, expected_prefix, use_bin_type):
# create a new array
original_array = array(format)
original_array.fromlist([255] * (nbytes // original_array.itemsize))
original_data = get_data(original_array)
view = make_memoryview(original_array)
# pack, unpack, and reconstruct array
packed = packb(view, use_bin_type=use_bin_type)
unpacked = unpackb(packed)
reconstructed_array = make_array(format, unpacked)
# check that we got the right amount of data
assert len(original_data) == nbytes
# check packed header
assert packed[:1] == expected_header
# check packed length prefix, if any
assert packed[1:1+len(expected_prefix)] == expected_prefix
# check packed data
assert packed[1+len(expected_prefix):] == original_data
# check array unpacked correctly
assert original_array == reconstructed_array
def handle_slave_send(socket, address, req):
message = req['data']
message_id = message.get('message_id', '?')
message['to_slave'] = True
try:
runtime = send_funcs['message_send_enqueue'](message)
response = 'OK'
access_logger.info('Message (ID %s) from master %s queued successfully', message_id, address)
except Exception:
response = 'FAIL'
logger.exception('Queueing message (ID %s) from master %s failed.')
access_logger.error('Failed queueing message (ID %s) from master %s: %s', message_id, address, runtime)
metrics.incr('slave_message_send_fail_cnt')
socket.sendall(msgpack.packb(response))
def detect( self, msg ):
event_ids = msg.data[ 'msg_ids' ]
category = msg.data[ 'cat' ]
source = msg.data[ 'source' ]
why = msg.data[ 'summary' ]
detect = base64.b64encode( msgpack.packb( msg.data[ 'detect' ] ) )
detect_id = msg.data[ 'detect_id' ].upper()
oid = AgentId( source.split( ' / ' )[ 0 ] ).org_id
try:
self.db.execute_async( self.report_stmt_rep.bind( ( detect_id, source, category, ' / '.join( event_ids ), detect, why, self.getOrgTtl( oid ) ) ) )
for s in source.split( ' / ' ):
self.db.execute_async( self.report_stmt_tl.bind( ( AgentId( s ).org_id, detect_id, self.getOrgTtl( oid ) ) ) )
except:
import traceback
self.logCritical( 'Exc storing detect %s / %s' % ( str( msg.data ), traceback.format_exc() ) )
self.outputs.shoot( 'report_detect', msg.data )
if 0 != len( self.pageDest ):
self.paging.shoot( 'page', { 'to' : self.pageDest,
'msg' : json.dumps( msg.data[ 'detect' ], indent = 2 ),
'subject' : 'Detect: %s/%s' % ( category, source ) } )
return ( True, )
def __send_rcv(self, cmd, args, data):
"""
helper for ``send_rcv()``
"""
msg = msgpack.packb({
CMD_KW_CMD: cmd,
CMD_KW_ARGS: args,
CMD_KW_DATA: data
}, default=encode, use_bin_type=True)
log.debug("Sending %d bytes...", len(msg))
# Prefix message with protocol version
rsp = struct.pack('>I', PROTOCOL_VER)
# Prefix each message with a 4-byte length (network byte order)
rsp += struct.pack('>I', len(msg))
rsp += msg
self.__buffer.write(rsp)
# receive answer from server
return self._recv()
def start(self):
printD("streamserver: start")
self.running = True
while self.running:
frame = self.videostream.read()
serialized_data = msgpack.packb(frame, default=msgpack_numpy.encode)
# Write the length of the capture to the stream and flush to ensure it actually gets sent
data_len = len(serialized_data)
printD("data_len: %d" % data_len)
self.connection.write(struct.pack('<L', data_len))
self.connection.flush()
# Send the image data over the wire
self.connection.write(serialized_data)
self.connection.flush()
printD("send.")
sleep(0.001)
def call(self, request):
m = await self.request_data(EmailSendModel)
async with await self.sender.get_redis_conn() as redis:
group_key = f'group:{m.uid}'
v = await redis.incr(group_key)
if v > 1:
raise HTTPConflict(text=f'Send group with id "{m.uid}" already exists\n')
recipients_key = f'recipients:{m.uid}'
data = m.values(exclude={'recipients', 'from_address'})
data.update(
from_email=m.from_address.email,
from_name=m.from_address.name,
)
pipe = redis.pipeline()
pipe.lpush(recipients_key, *[msgpack.packb(r.values(), use_bin_type=True) for r in m.recipients])
pipe.expire(group_key, 86400)
pipe.expire(recipients_key, 86400)
await pipe.execute()
await self.sender.send_emails(recipients_key, **data)
logger.info('%s sending %d emails', m.company_code, len(m.recipients))
return Response(text='201 job enqueued\n', status=201)
def send_notification(self, name, change_type, change_info, directed_client=None):
"""Send an unsolicited notification to someone."""
# If the notification is directed, make sure it is directed at us
if directed_client is not None and self.client_id != directed_client:
return
notif_object = {'type': 'notification', 'operation': change_type, 'name': name}
if change_info is not None:
notif_object['payload'] = change_info
msg = msgpack.packb(notif_object)
try:
self.write_message(msg, binary=True)
except tornado.websocket.WebSocketClosedError:
pass
def test_msgs_recv():
def _recv_multipart():
m = Msg(id=msgpack.packb(1234), mtype=Msg.PING, token='token1234', data=[]).to_list()
return m
ctx = zmq.Context()
s = ctx.socket(zmq.REQ)
s.recv_multipart = _recv_multipart
m = Msg().recv(s)
assert msgpack.unpackb(m[0]) == 1234
assert m[1] == 'token1234'
assert m[2] == 'ping'
assert m[3] == '[]'
def render(self, data, media_type=None, renderer_context=None):
if not msgpack:
raise _import_error
return msgpack.packb(data, use_bin_type=True)
def test_redis_delay_task_decorator_invalid_function(event_loop, redis_instance):
import logging
logger = logging.getLogger("aiotasks")
class CustomLogger(logging.StreamHandler):
def __init__(self):
super(CustomLogger, self).__init__()
self.content = []
def emit(self, record):
self.content.append(record.msg)
custom = CustomLogger()
logger.addHandler(custom)
manager = build_manager(dsn=redis_instance, loop=event_loop)
async def run():
# Send an invalid task name
task_id = uuid.uuid4().hex
await manager._redis_poller.lpush(manager.task_list_name,
msgpack.packb(dict(task_id=task_id,
function="non_exist",
args=(),
kwargs={})))
manager.run()
await manager.wait(timeout=0.2, exit_on_finish=False, wait_timeout=0.1)
event_loop.run_until_complete(run())
manager.stop()
assert "No local task with name 'non_exist'" in custom.content
def test_memory_delay_task_decorator_invalid_function(event_loop):
import logging
logger = logging.getLogger("aiotasks")
class CustomLogger(logging.StreamHandler):
def __init__(self):
super(CustomLogger, self).__init__()
self.content = []
def emit(self, record):
self.content.append(record.msg)
custom = CustomLogger()
logger.addHandler(custom)
manager = build_manager(dsn="memory://", loop=event_loop)
async def run():
# Send an invalid task name
task_id = uuid.uuid4().hex
await manager._task_queue.put((manager.task_list_name,
msgpack.packb(dict(task_id=task_id,
function="non_exist",
args=[],
kwargs={}),
use_bin_type=True)))
manager.run()
await manager.wait(timeout=0.2, exit_on_finish=False, wait_timeout=0.1)
event_loop.run_until_complete(run())
manager.stop()
assert "No local task with name 'non_exist'" in custom.content
def build_subscribe_message(self, **kwargs) -> str:
return msgpack.packb(kwargs,
use_bin_type=True)
def response_msgpack(handler, response):
"""
Optional MSGPACK response.
Sets MSGPACK content type to given handler.
Packs response with MSGPACK.
:return: Bytes of MSGPACK packed response
:rtype: bytes
"""
handler.set_header('Content-Type', 'application/x-msgpack')
return msgpack.packb(response, default=json_serial)
def send(self, transport, message):
transport.sendto(msgpack.packb(message, use_bin_type=True,
default=extended_msgpack_serializer))
def send_peer(self, recipient, message):
if recipient != self.state.volatile['address']:
self.peer_transport.sendto(
msgpack.packb(message, use_bin_type=True), tuple(recipient))
def connection_made(self, transport):
self.transport = transport
if self.first_message:
transport.sendto(
msgpack.packb(self.first_message, use_bin_type=True))