def test_dir(self):
    """Check that ``dir()`` on a socket lists methods and option names.

    A PUB socket is created on a fresh context and ``send``, ``IDENTITY``,
    ``AFFINITY`` and ``FD`` are all expected to appear in ``dir(socket)``.
    The socket and the context are released even when an assertion fails,
    so a failure here does not leave an unterminated context behind.
    """
    ctx = self.Context()
    try:
        s = ctx.socket(zmq.PUB)
        try:
            # dir() is computed once; every expected name is checked against it
            names = dir(s)
            for expected in ('send', 'IDENTITY', 'AFFINITY', 'FD'):
                self.assertIn(expected, names)
        finally:
            s.close()
    finally:
        ctx.term()
python类IDENTITY的实例源码
def test_identity(self):
    # A value assigned through the ``identity`` attribute must be readable
    # back via ``get(zmq.IDENTITY)``, trailing NUL bytes included.
    expected = b'identity\0\0'
    sock = self.context.socket(zmq.PULL)
    # tracked in self.sockets (presumably closed by the test case teardown)
    self.sockets.append(sock)
    sock.identity = expected
    actual = sock.get(zmq.IDENTITY)
    self.assertEqual(actual, expected)
def test_dir(self):
    """Check that ``dir()`` on a socket lists methods and option names.

    A PUB socket is created on a fresh context and ``send``, ``IDENTITY``,
    ``AFFINITY`` and ``FD`` are all expected to appear in ``dir(socket)``.
    The socket and the context are released even when an assertion fails,
    so a failure here does not leave an unterminated context behind.
    """
    ctx = self.Context()
    try:
        s = ctx.socket(zmq.PUB)
        try:
            # dir() is computed once; every expected name is checked against it
            names = dir(s)
            for expected in ('send', 'IDENTITY', 'AFFINITY', 'FD'):
                self.assertIn(expected, names)
        finally:
            s.close()
    finally:
        ctx.term()
def test_identity(self):
    # A value assigned through the ``identity`` attribute must be readable
    # back via ``get(zmq.IDENTITY)``, trailing NUL bytes included.
    expected = b'identity\0\0'
    sock = self.context.socket(zmq.PULL)
    # tracked in self.sockets (presumably closed by the test case teardown)
    self.sockets.append(sock)
    sock.identity = expected
    actual = sock.get(zmq.IDENTITY)
    self.assertEqual(actual, expected)
def test_init(self, mock_context):
    # ``mock_context`` is injected from outside this block (presumably a
    # patch of the zmq context factory applied by a decorator -- confirm).
    log_stub = MagicMock()
    router = MagicMock()
    context_instance = MagicMock()

    # Wire the chain: factory() -> context_instance, .socket() -> router
    context_instance.socket.return_value = router
    mock_context.return_value = context_instance

    ServerConnection("ip_address", 1025, log_stub)

    # The constructor is expected to create one ROUTER socket, name it and
    # bind it to the requested address/port.
    mock_context.assert_called_once_with()
    context_instance.socket.assert_called_once_with(zmq.ROUTER)
    router.setsockopt.assert_called_once_with(zmq.IDENTITY, b"recodex-monitor")
    router.bind.assert_called_once_with("tcp://ip_address:1025")
def initialize(self):
    # Create a private context holding a PUSH socket (``_tosock``) and a
    # DEALER socket (``_frsock``).
    ctx = zmq.Context()
    self._context = ctx
    self._tosock = ctx.socket(zmq.PUSH)
    self._frsock = ctx.socket(zmq.DEALER)

    # Both sockets carry the same identity; it is set before connecting.
    for sock in (self._tosock, self._frsock):
        sock.setsockopt(zmq.IDENTITY, self.identity)

    # Only the PUSH side gets a high water mark of 2
    self._tosock.set_hwm(2)

    # First endpoint is for the PUSH socket, second for the DEALER socket.
    self._tosock.connect(self._conn_info[0])
    self._frsock.connect(self._conn_info[1])
def _setup_ipc(self):
    '''
    Create the IPC sockets used by this process.

    A DEALER socket (``self.sub``), identified by ``self._name``, is
    connected to ``DEV_IPC_URL``; a PUSH socket (``self.pub``) is
    connected to ``PUB_IPC_URL``.

    The high water mark from ``self.opts['hwm']`` is applied to each
    socket *before* it connects: libzmq only honours HWM options for
    connections made after the option is set, so setting it after
    ``connect`` (as was previously done for the PUSH socket) had no
    effect on that connection.
    '''
    self.ctx = zmq.Context()

    # subscribe to device IPC
    log.debug('Creating the dealer IPC for %s', self._name)
    self.sub = self.ctx.socket(zmq.DEALER)
    # the identity must be a byte string; on Python 3 encode the name
    if six.PY2:
        self.sub.setsockopt(zmq.IDENTITY, self._name)
    elif six.PY3:
        self.sub.setsockopt(zmq.IDENTITY, bytes(self._name, 'utf-8'))
    try:
        # zmq 2
        self.sub.setsockopt(zmq.HWM, self.opts['hwm'])
    except AttributeError:
        # zmq 3
        self.sub.setsockopt(zmq.RCVHWM, self.opts['hwm'])
    # connect to the corresponding IPC pipe
    self.sub.connect(DEV_IPC_URL)

    # publish to the publisher IPC
    self.pub = self.ctx.socket(zmq.PUSH)
    try:
        # zmq 2
        self.pub.setsockopt(zmq.HWM, self.opts['hwm'])
    except AttributeError:
        # zmq 3
        self.pub.setsockopt(zmq.SNDHWM, self.opts['hwm'])
    # connect only after the HWM is in place so it applies to this pipe
    self.pub.connect(PUB_IPC_URL)
def set_socket_option(self, name, option, value):
    """Apply a socket option to a named socket via ``setsockopt``.

    The socket is looked up with ``self.get_by_name(name)`` and
    ``setsockopt(option, value)`` is called on it.

    :param name: the name of the socket where data will pass through
    :param option: the option from the ``zmq`` module
    :param value: the value handed to ``setsockopt`` for that option

    Here are some examples of options:

    * ``zmq.HWM``: Set high water mark
    * ``zmq.AFFINITY``: Set I/O thread affinity
    * ``zmq.IDENTITY``: Set socket identity
    * ``zmq.SUBSCRIBE``: Establish message filter
    * ``zmq.UNSUBSCRIBE``: Remove message filter
    * ``zmq.SNDBUF``: Set kernel transmit buffer size
    * ``zmq.RCVBUF``: Set kernel receive buffer size
    * ``zmq.LINGER``: Set linger period for socket shutdown
    * ``zmq.BACKLOG``: Set maximum length of the queue of outstanding connections
    * for the full list go to ``http://api.zeromq.org/4-0:zmq-setsockopt``

    **Example:**

    ::

      >>> import zmq
      >>> from agentzero.core import SocketManager
      >>>
      >>> sockets = SocketManager()
      >>> sockets.create('pipe-in', zmq.PULL)
      >>>
      >>> # block after 10 messages are queued
      >>> sockets.set_socket_option('pipe-in', zmq.HWM, 10)
    """
    self.get_by_name(name).setsockopt(option, value)
def create(self, name, socket_type):
    """Creates a named socket by type. Can raise a SocketAlreadyExists.

    The new socket is stored in ``self.sockets`` under ``name`` and is
    given a freshly generated UUID as its ``zmq.IDENTITY``.

    Returns the socket itself

    :param name: the socket name
    :param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...)
    :raises SocketAlreadyExists: when ``name`` is already registered
    """
    if name in self.sockets:
        raise SocketAlreadyExists(self, name)

    self.sockets[name] = self.context.socket(socket_type)
    # Socket identities are byte strings: pyzmq on Python 3 rejects text
    # passed to setsockopt, so the UUID is encoded explicitly. On Python 2
    # the encode leaves the (already byte) string unchanged.
    identity = str(uuid4()).encode('ascii')
    self.set_socket_option(name, zmq.IDENTITY, identity)
    return self.get_by_name(name)
def run(self):
    # Build the player, then open a PUSH socket and stream every datapoint
    # produced by ``get_data()`` through it in serialized form.
    self.player = self._build_player()
    self.ctx = zmq.Context()

    push = self.ctx.socket(zmq.PUSH)
    self.c2s_socket = push
    push.setsockopt(zmq.IDENTITY, self.identity)
    push.set_hwm(5)
    push.connect(self.pipe_c2s)

    self._prepare()
    for datapoint in self.get_data():
        payload = dumps(datapoint)
        self.c2s_socket.send(payload, copy=False)
def start_dispatch_thread():
    # Start the module-wide XSUB -> XPUB forwarder device once; calls made
    # after ``INITED`` is set do nothing.
    global INITED, DISPATCHER
    if not INITED:
        DISPATCHER = zmq.devices.ThreadDevice(zmq.FORWARDER, zmq.XSUB, zmq.XPUB)
        device = DISPATCHER
        device.bind_in(INTERNAL_SOCKET)
        device.connect_out(CHANGES_SOCKET)
        device.setsockopt_in(zmq.IDENTITY, b'XSUB')
        device.setsockopt_out(zmq.IDENTITY, b'XPUB')
        device.start()
        #Fix weird nosetests problems. TODO: find and fix underlying problem
        sleep(0.01)
        INITED = True
def test_tracker(self):
    "test the MessageTracker object for tracking when zmq is done with a buffer"

    def wait_done(tracker):
        # Poll up to 10 times, 0.1s apart, until the tracker reports done.
        for _ in range(10):
            if tracker.done:
                break
            time.sleep(0.1)

    addr = 'tcp://127.0.0.1'
    # Pick a port by binding a throwaway socket to a random one, then
    # closing it so the PAIR sockets below can use the same address.
    a = self.context.socket(zmq.PUB)
    port = a.bind_to_random_port(addr)
    a.close()
    iface = "%s:%i" % (addr, port)
    a = self.context.socket(zmq.PAIR)
    b = self.context.socket(zmq.PAIR)
    self.sockets.extend([a, b])
    a.connect(iface)
    time.sleep(0.1)

    # Tracked sends issued before ``b`` binds: trackers must not be done yet.
    p1 = a.send(b'something', copy=False, track=True)
    self.assertTrue(isinstance(p1, zmq.MessageTracker))
    self.assertFalse(p1.done)
    p2 = a.send_multipart([b'something', b'else'], copy=False, track=True)
    # assertTrue replaces the deprecated ``assert_`` alias
    self.assertTrue(isinstance(p2, zmq.MessageTracker))
    self.assertEqual(p2.done, False)
    self.assertEqual(p1.done, False)

    # Once the peer binds and receives, each tracker should complete.
    b.bind(iface)
    msg = b.recv_multipart()
    wait_done(p1)
    self.assertEqual(p1.done, True)
    self.assertEqual(msg, [b'something'])
    msg = b.recv_multipart()
    wait_done(p2)
    self.assertEqual(p2.done, True)
    self.assertEqual(msg, [b'something', b'else'])

    # A tracked Frame sent twice stays "not done" while the Frame is alive.
    m = zmq.Frame(b"again", track=True)
    self.assertEqual(m.tracker.done, False)
    p1 = a.send(m, copy=False)
    p2 = a.send(m, copy=False)
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(p1.done, False)
    self.assertEqual(p2.done, False)
    msg = b.recv_multipart()
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(msg, [b'again'])
    msg = b.recv_multipart()
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(msg, [b'again'])
    self.assertEqual(p1.done, False)
    self.assertEqual(p2.done, False)

    # Keep a reference to the tracker, drop the Frame itself, then both
    # send results are expected to report done.
    pm = m.tracker
    del m
    wait_done(p1)
    self.assertEqual(p1.done, True)
    self.assertEqual(p2.done, True)

    # Asking to track a Frame created with track=False must raise.
    m = zmq.Frame(b'something', track=False)
    self.assertRaises(ValueError, a.send, m, copy=False, track=True)
def test_tracker(self):
    "test the MessageTracker object for tracking when zmq is done with a buffer"

    def wait_done(tracker):
        # Poll up to 10 times, 0.1s apart, until the tracker reports done.
        for _ in range(10):
            if tracker.done:
                break
            time.sleep(0.1)

    addr = 'tcp://127.0.0.1'
    # Pick a port by binding a throwaway socket to a random one, then
    # closing it so the PAIR sockets below can use the same address.
    a = self.context.socket(zmq.PUB)
    port = a.bind_to_random_port(addr)
    a.close()
    iface = "%s:%i" % (addr, port)
    a = self.context.socket(zmq.PAIR)
    b = self.context.socket(zmq.PAIR)
    self.sockets.extend([a, b])
    a.connect(iface)
    time.sleep(0.1)

    # Tracked sends issued before ``b`` binds: trackers must not be done yet.
    p1 = a.send(b'something', copy=False, track=True)
    self.assertTrue(isinstance(p1, zmq.MessageTracker))
    self.assertFalse(p1.done)
    p2 = a.send_multipart([b'something', b'else'], copy=False, track=True)
    # assertTrue replaces the deprecated ``assert_`` alias
    self.assertTrue(isinstance(p2, zmq.MessageTracker))
    self.assertEqual(p2.done, False)
    self.assertEqual(p1.done, False)

    # Once the peer binds and receives, each tracker should complete.
    b.bind(iface)
    msg = b.recv_multipart()
    wait_done(p1)
    self.assertEqual(p1.done, True)
    self.assertEqual(msg, [b'something'])
    msg = b.recv_multipart()
    wait_done(p2)
    self.assertEqual(p2.done, True)
    self.assertEqual(msg, [b'something', b'else'])

    # A tracked Frame sent twice stays "not done" while the Frame is alive.
    m = zmq.Frame(b"again", track=True)
    self.assertEqual(m.tracker.done, False)
    p1 = a.send(m, copy=False)
    p2 = a.send(m, copy=False)
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(p1.done, False)
    self.assertEqual(p2.done, False)
    msg = b.recv_multipart()
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(msg, [b'again'])
    msg = b.recv_multipart()
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(msg, [b'again'])
    self.assertEqual(p1.done, False)
    self.assertEqual(p2.done, False)

    # Keep a reference to the tracker, drop the Frame itself, then both
    # send results are expected to report done.
    pm = m.tracker
    del m
    wait_done(p1)
    self.assertEqual(p1.done, True)
    self.assertEqual(p2.done, True)

    # Asking to track a Frame created with track=False must raise.
    m = zmq.Frame(b'something', track=False)
    self.assertRaises(ValueError, a.send, m, copy=False, track=True)
def test_tracker(self):
    "test the MessageTracker object for tracking when zmq is done with a buffer"

    def wait_done(tracker):
        # Poll up to 10 times, 0.1s apart, until the tracker reports done.
        for _ in range(10):
            if tracker.done:
                break
            time.sleep(0.1)

    addr = 'tcp://127.0.0.1'
    # Pick a port by binding a throwaway socket to a random one, then
    # closing it so the PAIR sockets below can use the same address.
    a = self.context.socket(zmq.PUB)
    port = a.bind_to_random_port(addr)
    a.close()
    iface = "%s:%i" % (addr, port)
    a = self.context.socket(zmq.PAIR)
    b = self.context.socket(zmq.PAIR)
    self.sockets.extend([a, b])
    a.connect(iface)
    time.sleep(0.1)

    # Tracked sends issued before ``b`` binds: trackers must not be done yet.
    p1 = a.send(b'something', copy=False, track=True)
    self.assertTrue(isinstance(p1, zmq.MessageTracker))
    self.assertFalse(p1.done)
    p2 = a.send_multipart([b'something', b'else'], copy=False, track=True)
    # assertTrue replaces the deprecated ``assert_`` alias
    self.assertTrue(isinstance(p2, zmq.MessageTracker))
    self.assertEqual(p2.done, False)
    self.assertEqual(p1.done, False)

    # Once the peer binds and receives, each tracker should complete.
    b.bind(iface)
    msg = b.recv_multipart()
    wait_done(p1)
    self.assertEqual(p1.done, True)
    self.assertEqual(msg, [b'something'])
    msg = b.recv_multipart()
    wait_done(p2)
    self.assertEqual(p2.done, True)
    self.assertEqual(msg, [b'something', b'else'])

    # A tracked Frame sent twice stays "not done" while the Frame is alive.
    m = zmq.Frame(b"again", track=True)
    self.assertEqual(m.tracker.done, False)
    p1 = a.send(m, copy=False)
    p2 = a.send(m, copy=False)
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(p1.done, False)
    self.assertEqual(p2.done, False)
    msg = b.recv_multipart()
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(msg, [b'again'])
    msg = b.recv_multipart()
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(msg, [b'again'])
    self.assertEqual(p1.done, False)
    self.assertEqual(p2.done, False)

    # Keep a reference to the tracker, drop the Frame itself, then both
    # send results are expected to report done.
    pm = m.tracker
    del m
    wait_done(p1)
    self.assertEqual(p1.done, True)
    self.assertEqual(p2.done, True)

    # Asking to track a Frame created with track=False must raise.
    m = zmq.Frame(b'something', track=False)
    self.assertRaises(ValueError, a.send, m, copy=False, track=True)
def test_tracker(self):
    "test the MessageTracker object for tracking when zmq is done with a buffer"

    def wait_done(tracker):
        # Poll up to 10 times, 0.1s apart, until the tracker reports done.
        for _ in range(10):
            if tracker.done:
                break
            time.sleep(0.1)

    addr = 'tcp://127.0.0.1'
    # Pick a port by binding a throwaway socket to a random one, then
    # closing it so the PAIR sockets below can use the same address.
    a = self.context.socket(zmq.PUB)
    port = a.bind_to_random_port(addr)
    a.close()
    iface = "%s:%i" % (addr, port)
    a = self.context.socket(zmq.PAIR)
    b = self.context.socket(zmq.PAIR)
    self.sockets.extend([a, b])
    a.connect(iface)
    time.sleep(0.1)

    # Tracked sends issued before ``b`` binds: trackers must not be done yet.
    p1 = a.send(b'something', copy=False, track=True)
    self.assertTrue(isinstance(p1, zmq.MessageTracker))
    self.assertFalse(p1.done)
    p2 = a.send_multipart([b'something', b'else'], copy=False, track=True)
    # assertTrue replaces the deprecated ``assert_`` alias
    self.assertTrue(isinstance(p2, zmq.MessageTracker))
    self.assertEqual(p2.done, False)
    self.assertEqual(p1.done, False)

    # Once the peer binds and receives, each tracker should complete.
    b.bind(iface)
    msg = b.recv_multipart()
    wait_done(p1)
    self.assertEqual(p1.done, True)
    self.assertEqual(msg, [b'something'])
    msg = b.recv_multipart()
    wait_done(p2)
    self.assertEqual(p2.done, True)
    self.assertEqual(msg, [b'something', b'else'])

    # A tracked Frame sent twice stays "not done" while the Frame is alive.
    m = zmq.Frame(b"again", track=True)
    self.assertEqual(m.tracker.done, False)
    p1 = a.send(m, copy=False)
    p2 = a.send(m, copy=False)
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(p1.done, False)
    self.assertEqual(p2.done, False)
    msg = b.recv_multipart()
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(msg, [b'again'])
    msg = b.recv_multipart()
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(msg, [b'again'])
    self.assertEqual(p1.done, False)
    self.assertEqual(p2.done, False)

    # Keep a reference to the tracker, drop the Frame itself, then both
    # send results are expected to report done.
    pm = m.tracker
    del m
    wait_done(p1)
    self.assertEqual(p1.done, True)
    self.assertEqual(p2.done, True)

    # Asking to track a Frame created with track=False must raise.
    m = zmq.Frame(b'something', track=False)
    self.assertRaises(ValueError, a.send, m, copy=False, track=True)
def test_tracker(self):
    "test the MessageTracker object for tracking when zmq is done with a buffer"

    def wait_done(tracker):
        # Poll up to 10 times, 0.1s apart, until the tracker reports done.
        for _ in range(10):
            if tracker.done:
                break
            time.sleep(0.1)

    addr = 'tcp://127.0.0.1'
    # Pick a port by binding a throwaway socket to a random one, then
    # closing it so the PAIR sockets below can use the same address.
    a = self.context.socket(zmq.PUB)
    port = a.bind_to_random_port(addr)
    a.close()
    iface = "%s:%i" % (addr, port)
    a = self.context.socket(zmq.PAIR)
    b = self.context.socket(zmq.PAIR)
    self.sockets.extend([a, b])
    a.connect(iface)
    time.sleep(0.1)

    # Tracked sends issued before ``b`` binds: trackers must not be done yet.
    p1 = a.send(b'something', copy=False, track=True)
    self.assertTrue(isinstance(p1, zmq.MessageTracker))
    self.assertFalse(p1.done)
    p2 = a.send_multipart([b'something', b'else'], copy=False, track=True)
    # assertTrue replaces the deprecated ``assert_`` alias
    self.assertTrue(isinstance(p2, zmq.MessageTracker))
    self.assertEqual(p2.done, False)
    self.assertEqual(p1.done, False)

    # Once the peer binds and receives, each tracker should complete.
    b.bind(iface)
    msg = b.recv_multipart()
    wait_done(p1)
    self.assertEqual(p1.done, True)
    self.assertEqual(msg, [b'something'])
    msg = b.recv_multipart()
    wait_done(p2)
    self.assertEqual(p2.done, True)
    self.assertEqual(msg, [b'something', b'else'])

    # A tracked Frame sent twice stays "not done" while the Frame is alive.
    m = zmq.Frame(b"again", track=True)
    self.assertEqual(m.tracker.done, False)
    p1 = a.send(m, copy=False)
    p2 = a.send(m, copy=False)
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(p1.done, False)
    self.assertEqual(p2.done, False)
    msg = b.recv_multipart()
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(msg, [b'again'])
    msg = b.recv_multipart()
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(msg, [b'again'])
    self.assertEqual(p1.done, False)
    self.assertEqual(p2.done, False)

    # Keep a reference to the tracker, drop the Frame itself, then both
    # send results are expected to report done.
    pm = m.tracker
    del m
    wait_done(p1)
    self.assertEqual(p1.done, True)
    self.assertEqual(p2.done, True)

    # Asking to track a Frame created with track=False must raise.
    m = zmq.Frame(b'something', track=False)
    self.assertRaises(ValueError, a.send, m, copy=False, track=True)
def test_tracker(self):
    "test the MessageTracker object for tracking when zmq is done with a buffer"

    def wait_done(tracker):
        # Poll up to 10 times, 0.1s apart, until the tracker reports done.
        for _ in range(10):
            if tracker.done:
                break
            time.sleep(0.1)

    addr = 'tcp://127.0.0.1'
    # Pick a port by binding a throwaway socket to a random one, then
    # closing it so the PAIR sockets below can use the same address.
    a = self.context.socket(zmq.PUB)
    port = a.bind_to_random_port(addr)
    a.close()
    iface = "%s:%i" % (addr, port)
    a = self.context.socket(zmq.PAIR)
    b = self.context.socket(zmq.PAIR)
    self.sockets.extend([a, b])
    a.connect(iface)
    time.sleep(0.1)

    # Tracked sends issued before ``b`` binds: trackers must not be done yet.
    p1 = a.send(b'something', copy=False, track=True)
    self.assertTrue(isinstance(p1, zmq.MessageTracker))
    self.assertFalse(p1.done)
    p2 = a.send_multipart([b'something', b'else'], copy=False, track=True)
    # assertTrue replaces the deprecated ``assert_`` alias
    self.assertTrue(isinstance(p2, zmq.MessageTracker))
    self.assertEqual(p2.done, False)
    self.assertEqual(p1.done, False)

    # Once the peer binds and receives, each tracker should complete.
    b.bind(iface)
    msg = b.recv_multipart()
    wait_done(p1)
    self.assertEqual(p1.done, True)
    self.assertEqual(msg, [b'something'])
    msg = b.recv_multipart()
    wait_done(p2)
    self.assertEqual(p2.done, True)
    self.assertEqual(msg, [b'something', b'else'])

    # A tracked Frame sent twice stays "not done" while the Frame is alive.
    m = zmq.Frame(b"again", track=True)
    self.assertEqual(m.tracker.done, False)
    p1 = a.send(m, copy=False)
    p2 = a.send(m, copy=False)
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(p1.done, False)
    self.assertEqual(p2.done, False)
    msg = b.recv_multipart()
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(msg, [b'again'])
    msg = b.recv_multipart()
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(msg, [b'again'])
    self.assertEqual(p1.done, False)
    self.assertEqual(p2.done, False)

    # Keep a reference to the tracker, drop the Frame itself, then both
    # send results are expected to report done.
    pm = m.tracker
    del m
    wait_done(p1)
    self.assertEqual(p1.done, True)
    self.assertEqual(p2.done, True)

    # Asking to track a Frame created with track=False must raise.
    m = zmq.Frame(b'something', track=False)
    self.assertRaises(ValueError, a.send, m, copy=False, track=True)
def test_tracker(self):
    "test the MessageTracker object for tracking when zmq is done with a buffer"

    def wait_done(tracker):
        # Poll up to 10 times, 0.1s apart, until the tracker reports done.
        for _ in range(10):
            if tracker.done:
                break
            time.sleep(0.1)

    addr = 'tcp://127.0.0.1'
    # Pick a port by binding a throwaway socket to a random one, then
    # closing it so the PAIR sockets below can use the same address.
    a = self.context.socket(zmq.PUB)
    port = a.bind_to_random_port(addr)
    a.close()
    iface = "%s:%i" % (addr, port)
    a = self.context.socket(zmq.PAIR)
    b = self.context.socket(zmq.PAIR)
    self.sockets.extend([a, b])
    a.connect(iface)
    time.sleep(0.1)

    # Tracked sends issued before ``b`` binds: trackers must not be done yet.
    p1 = a.send(b'something', copy=False, track=True)
    self.assertTrue(isinstance(p1, zmq.MessageTracker))
    self.assertFalse(p1.done)
    p2 = a.send_multipart([b'something', b'else'], copy=False, track=True)
    # assertTrue replaces the deprecated ``assert_`` alias
    self.assertTrue(isinstance(p2, zmq.MessageTracker))
    self.assertEqual(p2.done, False)
    self.assertEqual(p1.done, False)

    # Once the peer binds and receives, each tracker should complete.
    b.bind(iface)
    msg = b.recv_multipart()
    wait_done(p1)
    self.assertEqual(p1.done, True)
    self.assertEqual(msg, [b'something'])
    msg = b.recv_multipart()
    wait_done(p2)
    self.assertEqual(p2.done, True)
    self.assertEqual(msg, [b'something', b'else'])

    # A tracked Frame sent twice stays "not done" while the Frame is alive.
    m = zmq.Frame(b"again", track=True)
    self.assertEqual(m.tracker.done, False)
    p1 = a.send(m, copy=False)
    p2 = a.send(m, copy=False)
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(p1.done, False)
    self.assertEqual(p2.done, False)
    msg = b.recv_multipart()
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(msg, [b'again'])
    msg = b.recv_multipart()
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(msg, [b'again'])
    self.assertEqual(p1.done, False)
    self.assertEqual(p2.done, False)

    # Keep a reference to the tracker, drop the Frame itself, then both
    # send results are expected to report done.
    pm = m.tracker
    del m
    wait_done(p1)
    self.assertEqual(p1.done, True)
    self.assertEqual(p2.done, True)

    # Asking to track a Frame created with track=False must raise.
    m = zmq.Frame(b'something', track=False)
    self.assertRaises(ValueError, a.send, m, copy=False, track=True)