def __init__(self, opts=None):
if opts is None:
self.opts = self.process_config(CONFIG_LOCATION)
else:
self.opts = opts
self.ctx = zmq.Context()
self.pub_socket = self.ctx.socket(zmq.PUB)
self.pub_socket.bind('tcp://127.0.0.1:2000')
self.loop = zmq.eventloop.IOLoop.instance()
self.pub_stream = zmq.eventloop.zmqstream.ZMQStream(self.pub_socket, self.loop)
# Now create PULL socket over IPC to listen to reactor
self.pull_socket = self.ctx.socket(zmq.PULL)
self.pull_socket.bind('ipc:///tmp/reactor.ipc')
self.pull_stream = zmq.eventloop.zmqstream.ZMQStream(self.pull_socket, self.loop)
self.pull_stream.on_recv(self.republish)
python类PUB的实例源码
def test_unicode_sockopts(self):
"""test setting/getting sockopts with unicode strings"""
topic = "tést"
if str is not unicode:
topic = topic.decode('utf8')
p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
self.assertEqual(s.send_unicode, s.send_unicode)
self.assertEqual(p.recv_unicode, p.recv_unicode)
self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)
identb = s.getsockopt(zmq.IDENTITY)
identu = identb.decode('utf16')
identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
self.assertEqual(identu, identu2)
time.sleep(0.1) # wait for connection/subscription
p.send_unicode(topic,zmq.SNDMORE)
p.send_unicode(topic*2, encoding='latin-1')
self.assertEqual(topic, s.recv_unicode())
self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_shadow(self):
ctx = self.Context()
ctx2 = self.Context.shadow(ctx.underlying)
self.assertEqual(ctx.underlying, ctx2.underlying)
s = ctx.socket(zmq.PUB)
s.close()
del ctx2
self.assertFalse(ctx.closed)
s = ctx.socket(zmq.PUB)
ctx2 = self.Context.shadow(ctx.underlying)
s2 = ctx2.socket(zmq.PUB)
s.close()
s2.close()
ctx.term()
self.assertRaisesErrno(zmq.EFAULT, ctx2.socket, zmq.PUB)
del ctx2
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
in_prefix, out_prefix)
alice = self.context.socket(zmq.PAIR)
bob = self.context.socket(zmq.PAIR)
mon = self.context.socket(zmq.SUB)
aport = alice.bind_to_random_port('tcp://127.0.0.1')
bport = bob.bind_to_random_port('tcp://127.0.0.1')
mport = mon.bind_to_random_port('tcp://127.0.0.1')
mon.setsockopt(zmq.SUBSCRIBE, mon_sub)
self.device.connect_in("tcp://127.0.0.1:%i"%aport)
self.device.connect_out("tcp://127.0.0.1:%i"%bport)
self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
self.device.start()
time.sleep(.2)
try:
# this is currenlty necessary to ensure no dropped monitor messages
# see LIBZMQ-248 for more info
mon.recv_multipart(zmq.NOBLOCK)
except zmq.ZMQError:
pass
self.sockets.extend([alice, bob, mon])
return alice, bob, mon
def compose_message(message: bytes, topic: bytes,
serializer: AgentAddressSerializer) -> bytes:
"""
Compose a message and leave it ready to be sent through a socket.
This is used in PUB-SUB patterns to combine the topic and the message
in a single bytes buffer.
Parameters
----------
message
Message to be composed.
topic
Topic to combine the message with.
serializer
Serialization for the message part.
Returns
-------
The bytes representation of the final message to be sent.
"""
if serializer.requires_separator:
return topic + TOPIC_SEPARATOR + message
return topic + message
def test_agentchannel_sync_pub():
"""
Test basic SYNC_PUB AgentChannel operations: initialization, equivalence
and basic methods.
"""
sender = AgentAddress('ipc', 'addr0', 'PUB', 'server', 'pickle')
receiver = AgentAddress('ipc', 'addr0', 'PULL', 'server', 'pickle')
channel = AgentChannel('SYNC_PUB', sender=sender, receiver=receiver)
# Equivalence
assert channel == AgentChannel('SYNC_PUB', sender=sender,
receiver=receiver)
assert not channel == 'foo'
assert channel != 'foo'
# Basic methods
assert channel.twin() == AgentChannel('SYNC_SUB', sender=receiver.twin(),
receiver=sender.twin())
# Other attributes
assert hasattr(channel, 'uuid')
assert channel.transport == 'ipc'
assert channel.serializer == 'pickle'
def indicators_create(self, data):
if isinstance(data, dict):
data = Indicator(**data)
if isinstance(data, Indicator):
data = [data]
if self.socket_type == "PUB":
for i in data:
self.socket.send_multipart([self.topic, str(i)])
return
if self.socket_type == 'PUSH_ZYRE_GATEWAY':
for i in data:
self.socket.send_multipart(['PUB', self.topic, str(i)])
return
for i in data:
self.socket.send(str(i))
def __init__(self, repAddress, pubAddress):
"""Constructor"""
super(RpcServer, self).__init__()
# ??????????key?????value?????
self.__functions = {}
# zmq????
self.__context = zmq.Context()
self.__socketREP = self.__context.socket(zmq.REP) # ????socket
self.__socketREP.bind(repAddress)
self.__socketPUB = self.__context.socket(zmq.PUB) # ????socket
self.__socketPUB.bind(pubAddress)
# ??????
self.__active = False # ????????
self.__thread = threading.Thread(target=self.run) # ????????
#----------------------------------------------------------------------
def run(self):
self._loop = zmq.asyncio.ZMQEventLoop()
asyncio.set_event_loop(self._loop)
self.context = zmq.asyncio.Context()
self.status_sock = self.context.socket(zmq.ROUTER)
self.data_sock = self.context.socket(zmq.PUB)
self.status_sock.bind("tcp://*:%s" % self.status_port)
self.data_sock.bind("tcp://*:%s" % self.data_port)
self.poller = zmq.asyncio.Poller()
self.poller.register(self.status_sock, zmq.POLLIN)
self._loop.create_task(self.poll_sockets())
try:
self._loop.run_forever()
finally:
self.status_sock.close()
self.data_sock.close()
self.context.destroy()
test_zmq_pub_sub.py 文件源码
项目:integration-prototype
作者: SKA-ScienceDataProcessor
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_pub(self):
"""Publish log messages. bind() to PUB socket."""
# pylint: disable=E1101
context = zmq.Context()
pub = context.socket(zmq.PUB)
try:
pub.bind('tcp://*:{}'.format(self.sub_port))
except zmq.ZMQError as error:
print(error)
time.sleep(0.1)
send_count = self.send_count
for i in range(send_count):
pub.send_string('hi there {}'.format(i))
time.sleep(1e-5)
sys.stdout.flush()
# Wait for the watcher thread to exit.
while self.watcher.isAlive():
self.watcher.join(timeout=1e-5)
pub.close()
context.term()
test_zmq_pub_sub.py 文件源码
项目:integration-prototype
作者: SKA-ScienceDataProcessor
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_pub(self):
"""Publish log messages. connect() to PUB socket."""
# pylint: disable=E1101
context = zmq.Context()
pub = context.socket(zmq.PUB)
try:
_address = 'tcp://{}:{}'.format(self.sub_host, self.sub_port)
pub.connect(_address)
except zmq.ZMQError as error:
print('ERROR:', error)
time.sleep(0.1)
send_count = self.send_count
for i in range(send_count):
pub.send_string('hi there {}'.format(i))
time.sleep(1e-5)
# Wait for the watcher thread to exit
while self.watcher.isAlive():
self.watcher.join(timeout=1e-5)
pub.close()
context.term()
logging_handlers.py 文件源码
项目:integration-prototype
作者: SKA-ScienceDataProcessor
项目源码
文件源码
阅读 34
收藏 0
点赞 0
评论 0
def to(cls, channel, host='127.0.0.1',
port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
level=logging.NOTSET):
"""Convenience class method to create a ZmqLoghandler and
connect to a ZMQ subscriber.
Args:
channel (string): Logging channel name. This is used to build a
ZMQ topic.
host (string): Hostname / ip address of the subscriber to publish
to.
port (int, string): Port on which to publish messages.
level (int): Logging level
"""
context = zmq.Context()
publisher = context.socket(zmq.PUB)
address = 'tcp://{}:{}'.format(host, port)
publisher.connect(address)
time.sleep(0.1) # This sleep hopefully fixes the silent joiner problem.
return cls(channel, publisher, level=level)
def test_unicode_sockopts(self):
"""test setting/getting sockopts with unicode strings"""
topic = "tést"
if str is not unicode:
topic = topic.decode('utf8')
p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
self.assertEqual(s.send_unicode, s.send_unicode)
self.assertEqual(p.recv_unicode, p.recv_unicode)
self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)
identb = s.getsockopt(zmq.IDENTITY)
identu = identb.decode('utf16')
identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
self.assertEqual(identu, identu2)
time.sleep(0.1) # wait for connection/subscription
p.send_unicode(topic,zmq.SNDMORE)
p.send_unicode(topic*2, encoding='latin-1')
self.assertEqual(topic, s.recv_unicode())
self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_init_socket(self):
pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
logger = self.logger
handler = handlers.PUBHandler(pub)
handler.setLevel(logging.DEBUG)
handler.root_topic = self.topic
logger.addHandler(handler)
self.assertTrue(handler.socket is pub)
self.assertTrue(handler.ctx is pub.context)
self.assertTrue(handler.ctx is self.context)
sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
import time; time.sleep(0.1)
msg1 = 'message'
logger.info(msg1)
(topic, msg2) = sub.recv_multipart()
self.assertEqual(topic, b'zmq.INFO')
self.assertEqual(msg2, b(msg1)+b'\n')
logger.removeHandler(handler)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
in_prefix, out_prefix)
alice = self.context.socket(zmq.PAIR)
bob = self.context.socket(zmq.PAIR)
mon = self.context.socket(zmq.SUB)
aport = alice.bind_to_random_port('tcp://127.0.0.1')
bport = bob.bind_to_random_port('tcp://127.0.0.1')
mport = mon.bind_to_random_port('tcp://127.0.0.1')
mon.setsockopt(zmq.SUBSCRIBE, mon_sub)
self.device.connect_in("tcp://127.0.0.1:%i"%aport)
self.device.connect_out("tcp://127.0.0.1:%i"%bport)
self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
self.device.start()
time.sleep(.2)
try:
# this is currenlty necessary to ensure no dropped monitor messages
# see LIBZMQ-248 for more info
mon.recv_multipart(zmq.NOBLOCK)
except zmq.ZMQError:
pass
self.sockets.extend([alice, bob, mon])
return alice, bob, mon
def test_unicode_sockopts(self):
"""test setting/getting sockopts with unicode strings"""
topic = "tést"
if str is not unicode:
topic = topic.decode('utf8')
p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
self.assertEqual(s.send_unicode, s.send_unicode)
self.assertEqual(p.recv_unicode, p.recv_unicode)
self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)
identb = s.getsockopt(zmq.IDENTITY)
identu = identb.decode('utf16')
identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
self.assertEqual(identu, identu2)
time.sleep(0.1) # wait for connection/subscription
p.send_unicode(topic,zmq.SNDMORE)
p.send_unicode(topic*2, encoding='latin-1')
self.assertEqual(topic, s.recv_unicode())
self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_shadow(self):
ctx = self.Context()
ctx2 = self.Context.shadow(ctx.underlying)
self.assertEqual(ctx.underlying, ctx2.underlying)
s = ctx.socket(zmq.PUB)
s.close()
del ctx2
self.assertFalse(ctx.closed)
s = ctx.socket(zmq.PUB)
ctx2 = self.Context.shadow(ctx.underlying)
s2 = ctx2.socket(zmq.PUB)
s.close()
s2.close()
ctx.term()
self.assertRaisesErrno(zmq.EFAULT, ctx2.socket, zmq.PUB)
del ctx2
def test_init_socket(self):
pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
logger = self.logger
handler = handlers.PUBHandler(pub)
handler.setLevel(logging.DEBUG)
handler.root_topic = self.topic
logger.addHandler(handler)
self.assertTrue(handler.socket is pub)
self.assertTrue(handler.ctx is pub.context)
self.assertTrue(handler.ctx is self.context)
sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
import time; time.sleep(0.1)
msg1 = 'message'
logger.info(msg1)
(topic, msg2) = sub.recv_multipart()
self.assertEqual(topic, b'zmq.INFO')
self.assertEqual(msg2, b(msg1)+b'\n')
logger.removeHandler(handler)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
in_prefix, out_prefix)
alice = self.context.socket(zmq.PAIR)
bob = self.context.socket(zmq.PAIR)
mon = self.context.socket(zmq.SUB)
aport = alice.bind_to_random_port('tcp://127.0.0.1')
bport = bob.bind_to_random_port('tcp://127.0.0.1')
mport = mon.bind_to_random_port('tcp://127.0.0.1')
mon.setsockopt(zmq.SUBSCRIBE, mon_sub)
self.device.connect_in("tcp://127.0.0.1:%i"%aport)
self.device.connect_out("tcp://127.0.0.1:%i"%bport)
self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
self.device.start()
time.sleep(.2)
try:
# this is currenlty necessary to ensure no dropped monitor messages
# see LIBZMQ-248 for more info
mon.recv_multipart(zmq.NOBLOCK)
except zmq.ZMQError:
pass
self.sockets.extend([alice, bob, mon])
return alice, bob, mon
def test_unicode_sockopts(self):
"""test setting/getting sockopts with unicode strings"""
topic = "tést"
if str is not unicode:
topic = topic.decode('utf8')
p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
self.assertEqual(s.send_unicode, s.send_unicode)
self.assertEqual(p.recv_unicode, p.recv_unicode)
self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)
identb = s.getsockopt(zmq.IDENTITY)
identu = identb.decode('utf16')
identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
self.assertEqual(identu, identu2)
time.sleep(0.1) # wait for connection/subscription
p.send_unicode(topic,zmq.SNDMORE)
p.send_unicode(topic*2, encoding='latin-1')
self.assertEqual(topic, s.recv_unicode())
self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_shadow(self):
ctx = self.Context()
ctx2 = self.Context.shadow(ctx.underlying)
self.assertEqual(ctx.underlying, ctx2.underlying)
s = ctx.socket(zmq.PUB)
s.close()
del ctx2
self.assertFalse(ctx.closed)
s = ctx.socket(zmq.PUB)
ctx2 = self.Context.shadow(ctx.underlying)
s2 = ctx2.socket(zmq.PUB)
s.close()
s2.close()
ctx.term()
self.assertRaisesErrno(zmq.EFAULT, ctx2.socket, zmq.PUB)
del ctx2
def test_init_socket(self):
pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
logger = self.logger
handler = handlers.PUBHandler(pub)
handler.setLevel(logging.DEBUG)
handler.root_topic = self.topic
logger.addHandler(handler)
self.assertTrue(handler.socket is pub)
self.assertTrue(handler.ctx is pub.context)
self.assertTrue(handler.ctx is self.context)
sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
import time; time.sleep(0.1)
msg1 = 'message'
logger.info(msg1)
(topic, msg2) = sub.recv_multipart()
self.assertEqual(topic, b'zmq.INFO')
self.assertEqual(msg2, b(msg1)+b'\n')
logger.removeHandler(handler)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
in_prefix, out_prefix)
alice = self.context.socket(zmq.PAIR)
bob = self.context.socket(zmq.PAIR)
mon = self.context.socket(zmq.SUB)
aport = alice.bind_to_random_port('tcp://127.0.0.1')
bport = bob.bind_to_random_port('tcp://127.0.0.1')
mport = mon.bind_to_random_port('tcp://127.0.0.1')
mon.setsockopt(zmq.SUBSCRIBE, mon_sub)
self.device.connect_in("tcp://127.0.0.1:%i"%aport)
self.device.connect_out("tcp://127.0.0.1:%i"%bport)
self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
self.device.start()
time.sleep(.2)
try:
# this is currenlty necessary to ensure no dropped monitor messages
# see LIBZMQ-248 for more info
mon.recv_multipart(zmq.NOBLOCK)
except zmq.ZMQError:
pass
self.sockets.extend([alice, bob, mon])
return alice, bob, mon
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_shadow(self):
ctx = self.Context()
ctx2 = self.Context.shadow(ctx.underlying)
self.assertEqual(ctx.underlying, ctx2.underlying)
s = ctx.socket(zmq.PUB)
s.close()
del ctx2
self.assertFalse(ctx.closed)
s = ctx.socket(zmq.PUB)
ctx2 = self.Context.shadow(ctx.underlying)
s2 = ctx2.socket(zmq.PUB)
s.close()
s2.close()
ctx.term()
self.assertRaisesErrno(zmq.EFAULT, ctx2.socket, zmq.PUB)
del ctx2
def test_init_socket(self):
pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
logger = self.logger
handler = handlers.PUBHandler(pub)
handler.setLevel(logging.DEBUG)
handler.root_topic = self.topic
logger.addHandler(handler)
self.assertTrue(handler.socket is pub)
self.assertTrue(handler.ctx is pub.context)
self.assertTrue(handler.ctx is self.context)
sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
import time; time.sleep(0.1)
msg1 = 'message'
logger.info(msg1)
(topic, msg2) = sub.recv_multipart()
self.assertEqual(topic, b'zmq.INFO')
self.assertEqual(msg2, b(msg1)+b'\n')
logger.removeHandler(handler)