def run(self):
    """Relay messages between the XPUB and XSUB sockets until stopped.

    Both sockets are registered for ``zmq.POLLIN`` on a poller local to
    this call.  ``self.running`` is set to ``True`` and the loop keeps
    going while it stays truthy; every poll waits at most 1000 ms, so a
    change to the flag is noticed on the next pass.
    """
    self.log.debug("Broker starts XPUB:{}, XSUB:{}"
                   .format(self.xpub_url, self.xsub_url))
    # self.proxy.start()
    broker_poller = zmq.Poller()
    for endpoint in (self.xpub, self.xsub):
        broker_poller.register(endpoint, zmq.POLLIN)

    self.running = True
    while self.running:
        ready = dict(broker_poller.poll(1000))

        # Whatever arrives on the XPUB side is handed to the XSUB side.
        if self.xpub in ready:
            frames = self.xpub.recv_multipart()
            self.log.debug("subscription message: {}".format(frames[0]))
            self.xsub.send_multipart(frames)

        # Whatever arrives on the XSUB side is handed to the XPUB side.
        if self.xsub in ready:
            frames = self.xsub.recv_multipart()
            self.log.debug("publishing message: {}".format(frames))
            self.xpub.send_multipart(frames)
# Example source code using the Python zmq POLLIN constant
def __init__(self, port, pipeline=100, host='localhost', log_file=None):
    """Create a new ZMQDealer object.

    A DEALER socket taken from the shared context is connected to
    ``tcp://<host>:<port>`` with its ``hwm`` set to *pipeline*, and is
    registered for ``zmq.POLLIN`` on ``self.poller``.

    :param port: TCP port to connect to (formatted with ``%d``).
    :param pipeline: value assigned to the socket's ``hwm`` attribute.
    :param host: host name or address to connect to.
    :param log_file: optional path of a log file.  When given, the path is
        made absolute, its directory is created if it does not exist, and
        any file already present at that path is deleted.
    """
    ctx = zmq.Context.instance()
    # noinspection PyUnresolvedReferences
    dealer = ctx.socket(zmq.DEALER)
    self.socket = dealer
    dealer.hwm = pipeline
    dealer.connect('tcp://%s:%d' % (host, port))

    self._log_file = log_file

    self.poller = zmq.Poller()
    # noinspection PyUnresolvedReferences
    self.poller.register(self.socket, zmq.POLLIN)

    # Nothing else to prepare when no log file was requested.
    if self._log_file is None:
        return

    self._log_file = os.path.abspath(self._log_file)

    # Make sure the directory that will hold the log exists.
    parent_dir = os.path.dirname(self._log_file)
    if not os.path.exists(parent_dir):
        os.makedirs(parent_dir)

    # Start from a clean slate: drop a log left over from an earlier run.
    if os.path.exists(self._log_file):
        os.remove(self._log_file)
def __init__(self, bind_address, linger=-1, poll_timeout=2, loop=None):
    """Create a ROUTER socket, register it for polling and bind it.

    :param bind_address: address passed to ``socket.bind``.
    :param linger: value set for the ``zmq.LINGER`` socket option.
    :param poll_timeout: stored on ``self.poll_timeout`` for later use.
    :param loop: stored on ``self.loop``; not used by this method.
    """
    self.bind_address = bind_address
    self.loop = loop
    self.context = zmq.asyncio.Context()
    self.poll_timeout = poll_timeout

    self.socket = self.context.socket(zmq.ROUTER)
    self.socket.setsockopt(zmq.LINGER, linger)

    self.in_poller = zmq.asyncio.Poller()
    self.in_poller.register(self.socket, zmq.POLLIN)

    # Bind before logging so that "Bound to" is only reported once the
    # bind has actually succeeded.
    self.socket.bind(self.bind_address)
    log.info('Bound to: ' + self.bind_address)

    self._kill = False
def full_req_transceiver(zmq_url, data):
    """Used to send data and close connection.

    The socket and context obtained from
    ``get_ctx_and_connect_req_socket`` are always released, including
    when sending, receiving or unpacking raises.

    :param zmq_url: URL for the socket to connect to.
    :param data: The data to send.
    :returns: The unpacked response.
    """
    # TODO: Harden this
    # TODO: Add linger and POLLIN support : https://github.com/zeromq/pyzmq/issues/132
    ctx, socket = get_ctx_and_connect_req_socket(zmq_url)
    try:
        packed = msgpack.packb(data)
        socket.send_multipart([packed])
        rep = socket.recv()
        return msgpack.unpackb(rep, encoding='utf-8')
    finally:
        # linger=0 discards anything still queued so that ctx.term()
        # cannot block forever on the error path.
        socket.close(linger=0)
        ctx.term()
def transceiver(self, payload):
    """Send *payload* and return the checked, unpacked reply.

    :param payload: the message to send; it is packed with msgpack.
    :returns: whatever ``self.check_and_return`` returns for the
        unpacked reply.
    :raises IOError: when ``self.response_timeout`` is truthy and the
        poller reports nothing within that many seconds.
    """
    # TODO: Harden this
    # TODO: Add linger and POLLIN support :
    # https://github.com/zeromq/pyzmq/issues/132
    request = msgpack.packb(payload)
    # blocks
    self.socket.send_multipart([request])

    # The timeout is held in seconds; the poller expects milliseconds.
    if self.response_timeout and not self.poller.poll(self.response_timeout * 1000):
        raise IOError('Timeout while waiting for server response')

    # blocks
    raw_reply = self.socket.recv()
    reply = msgpack.unpackb(raw_reply, encoding='utf-8')
    return self.check_and_return(reply)
def __init__(self, targname, cfg, isServer=False):
    """Set up the ZeroMQ socket for either the client or the server role.

    A client connects a DEALER socket to ``cfg['server']``; a server binds
    a ROUTER socket on all interfaces.  Both use ``cfg.get('port', 7677)``
    and register the socket for ``zmq.POLLIN`` on ``self.poller``.

    NOTE(review): the source this was recovered from had lost its
    indentation.  The backend, ``inTime`` and ``inactiveLimit`` set-up is
    assumed to belong to the server branch only -- confirm.

    :param targname: stored on ``self.targname`` and passed to the backend.
    :param cfg: mapping read for ``'server'``, ``'port'``, ``'backend'``
        and ``'inactivelimit'``.
    :param isServer: selects the server (ROUTER) role when true.
    """
    self.targname = targname
    self.cfg = cfg
    self.isServer = isServer
    self.fnCallName = ''
    self.ctx = zmq.Context()
    self.ctx.linger = 100

    port = self.cfg.get('port', 7677)
    if not self.isServer:
        self.sock = self.ctx.socket(zmq.DEALER)
        self.sock.linger = 100
        # this times out with EINVAL when no internet
        self.sock.connect('tcp://%s:%s' % (self.cfg['server'], port))
    else:
        self.sock = self.ctx.socket(zmq.ROUTER)
        self.sock.linger = 100
        self.sock.bind('tcp://*:%s' % (port))

    # Both roles poll their single socket for incoming messages.
    self.poller = zmq.Poller()
    self.poller.register(self.sock, zmq.POLLIN)

    if self.isServer:
        self.be = GetBackend(self.cfg['backend'])(self.targname, self.cfg)
        self.inTime = time.time()
        self.inactiveLimit = int(self.cfg.get('inactivelimit', 0))
        # Same output as the former Python 2 statement
        # ``print 'inactivelimit ', value`` while also valid on Python 3.
        print('inactivelimit  %s' % self.inactiveLimit)
def spin_once(self, polling_sec=0.010):
    """Read the queued data and call the callback for them.

    Messages are received and passed to ``self._callback`` one at a time
    until a poll of *polling_sec* seconds does not report the socket as
    readable; then the method returns.

    You have to handle KeyboardInterrupt (Ctrl-C) manually.

    :param polling_sec: poll timeout in seconds (converted to
        milliseconds for the poller).

    Example:

    >>> def callback(msg):
    ...     print(msg)
    >>> sub = jps.Subscriber('topic_name', callback)
    >>> try:
    ...     while True:
    ...         sub.spin_once()
    ...         time.sleep(0.1)
    ... except KeyboardInterrupt:
    ...     pass
    """
    # parse all data
    while True:
        socks = dict(self._poller.poll(polling_sec * 1000))
        if socks.get(self._socket) != zmq.POLLIN:
            return
        msg = self._socket.recv()
        self._callback(msg)
def set_topic(self, name, topic):
    """shortcut to :py:meth:`SocketManager.set_socket_option` with
    ``zmq.SUBSCRIBE`` as the option.

    :param name: the name of the socket where data will pass through
    :param topic: the topic prefix to subscribe to.  ``bytes`` are used
        as they are; text is encoded as UTF-8; anything else is passed
        through ``bytes()``.

    **Example:**

    ::

      >>> import zmq
      >>> from agentzero.core import SocketManager
      >>>
      >>> sockets = SocketManager()
      >>> sockets.ensure_and_bind('events', zmq.SUB, 'tcp://*:6000', zmq.POLLIN)
      >>>
      >>> # subscribe only to topics beginning with "logs"
      >>> sockets.set_topic('events', 'logs')
      >>> event = sockets.recv_event_safe('events')
      >>> event.topic, event.data
      'logs:2016-06-20', {'stdout': 'hello world'}
    """
    if isinstance(topic, bytes):
        safe_topic = topic
    elif hasattr(topic, 'encode'):
        # bytes(str) raises TypeError on Python 3, so text has to be
        # encoded explicitly.
        safe_topic = topic.encode('utf-8')
    else:
        safe_topic = bytes(topic)
    self.set_socket_option(name, self.zmq.SUBSCRIBE, safe_topic)
def ensure_and_bind(self, socket_name, socket_type, address, polling_mechanism):
    """Make sure a socket exists, bind it to *address* and poll once.

    Convenience wrapper that calls ``.get_or_create()``, ``.bind()`` and
    ``.engage()`` in that order.

    :param socket_name: the socket name
    :param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...)
    :param address: a valid zeromq address (i.e: inproc://whatevs)
    :param polling_mechanism: ``zmq.POLLIN``, ``zmq.POLLOUT`` or ``zmq.POLLIN | zmq.POLLOUT``
    :returns: the value returned by ``.bind()``.
    """
    self.get_or_create(socket_name, socket_type, polling_mechanism)
    bound_socket = self.bind(socket_name, address, polling_mechanism)
    self.engage()
    return bound_socket
def ready(self, name, polling_mechanism, timeout=None):
    """Polls all sockets and checks if the socket with the given name is ready for either ``zmq.POLLIN`` or ``zmq.POLLOUT``.

    returns the socket if available, or ``None``

    :param name: the socket name
    :param polling_mechanism: either ``zmq.POLLIN`` or ``zmq.POLLOUT``
    :param timeout: the polling timeout in milliseconds that is passed to
        ``self.engage()`` (optional, defaults to ``self.timeout``)
    """
    socket = self.get_by_name(name)
    # An explicit conditional keeps a configured default of 0 intact; the
    # old ``x and a or b`` form turned it into None.
    effective_timeout = self.timeout if timeout is None else timeout
    available_mechanism = self.engage(effective_timeout).pop(socket, None)
    if polling_mechanism == available_mechanism:
        return socket
    return None
def get_or_create(self, name, socket_type, polling_mechanism):
    """Return the socket called *name*, creating it first if necessary.

    The socket is registered with *polling_mechanism* on every call,
    whether or not it had to be created.

    :param name: the socket name
    :param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...)
    :param polling_mechanism: one of (``zmq.POLLIN``, ``zmq.POLLOUT``, ``zmq.POLLIN | zmq.POLLOUT``)
    :returns: the socket itself.
    """
    already_known = name in self.sockets
    if not already_known:
        self.create(name, socket_type)

    found = self.get_by_name(name)
    self.register_socket(found, polling_mechanism)
    return found
def _send_raw(self, serialized):
    """Send *serialized* to the peer and wait for a single reply.

    A socket is created for the exchange and is always cleaned up
    afterwards, including when sending, receiving or ``on_message``
    raises.  When no reply arrives within ``self._timeout`` seconds the
    timeout is logged and the peer is removed from the transport.

    :param serialized: the string to send with ``send_string``.
    """
    self.create_socket()
    timed_out = False
    try:
        self._socket.send_string(serialized, zmq.NOBLOCK)
        poller = zmq.Poller()
        poller.register(self._socket, zmq.POLLIN)
        # The timeout is held in seconds; the poller expects milliseconds.
        if poller.poll(self._timeout * 1000):
            msg = self._socket.recv()
            self.on_message(msg)
        else:
            timed_out = True
            self._transport.log("Peer " + self._address + " timed out.")
    finally:
        self.cleanup_socket()
    # The peer is dropped only after its socket has been released, as before.
    if timed_out:
        self._transport.remove_peer(self._address)
def zmq_request(self, msg_type, msg_content, timeout=__DEFAULT_REQUEST_TIMEOUT):
    """Send one request over a fresh REQ socket and wait for the reply.

    The socket and its context exist only for the duration of the call
    and are released on every exit path (reply, timeout or exception).

    :param msg_type: passed through to ``send_flatbuf_msg``.
    :param msg_content: passed through to ``send_flatbuf_msg``.
    :param timeout: polling continues while the number of polls times
        ``self.__POLL_INTERVAL`` is not greater than this value.
    :returns: ``Content()`` of the received ``TransMsg``, or ``False``
        when no reply arrived in time.
    """
    # new socket to talk to server
    context = zmq.Context()
    self.__socket = context.socket(zmq.REQ)
    try:
        self.__socket.connect("tcp://localhost:" + ZMQPort.RQ)

        # init poller and register the socket so it can be checked for
        # incoming messages
        poller = zmq.Poller()
        poller.register(self.__socket, zmq.POLLIN)

        send_flatbuf_msg(self.__socket, msg_type, msg_content)

        reqs = 0
        while reqs * self.__POLL_INTERVAL <= timeout:
            socks = dict(poller.poll(self.__POLL_INTERVAL))
            if socks.get(self.__socket) == zmq.POLLIN:
                msg = self.__socket.recv()
                msgObj = TransMsg.GetRootAsTransMsg(msg, 0)
                return msgObj.Content()
            reqs = reqs + 1
        return False
    finally:
        # linger=0 drops an unanswered request so that terminating the
        # context cannot block.
        self.__socket.close(linger=0)
        context.term()
def register(self, queue, handler, flags=zmq.POLLIN):
    """
    Add *queue* to the task's poller and remember *handler* for it.

    The queue is registered on ``self.poller`` with *flags*, and *handler*
    is stored in ``self.handlers`` under *queue*.  The handler is expected
    to take a single argument, which will be *queue*.

    :param zmq.Socket queue:
        The queue to poll.

    :param handler:
        The function or method to call when a message with matching *flags*
        arrives in *queue*.

    :param int flags:
        The flags to match in the queue poller (defaults to ``POLLIN``).
    """
    # Register first: if the poller rejects the queue, no handler is kept.
    self.poller.register(queue, flags)
    self.handlers[queue] = handler
def watch_queue(self, queue, callback, flags=zmq.POLLIN):
    """
    Start polling zmq *queue* and remember *callback* for it.

    With the default *flags* (``POLLIN``) the queue is watched for
    something to read; with ``POLLOUT`` it is watched for being available
    to write.  No parameters are passed to the callback.

    :param queue:
        The zmq queue to poll.

    :param callback:
        The function to call when the poll is successful.

    :param int flags:
        The condition to monitor on the queue (defaults to ``POLLIN``).

    :returns: *queue*, unchanged.
    :raises ValueError: if *queue* already has a callback.
    """
    already_watched = queue in self._queue_callbacks
    if already_watched:
        raise ValueError('already watching %r' % queue)

    self._poller.register(queue, flags)
    self._queue_callbacks[queue] = callback
    return queue
def watch_file(self, fd, callback, flags=zmq.POLLIN):
    """
    Start polling *fd* and remember *callback* for it.

    An integer *fd* is first wrapped with :func:`os.fdopen`.  The
    callback is stored under the file's ``fileno()``; no parameters are
    passed to it.  The *flags* are as for :meth:`watch_queue`.

    :param fd:
        The file-like object, or fileno to monitor.

    :param callback:
        The function to call when the file has data available.

    :param int flags:
        The condition to monitor on the file (defaults to ``POLLIN``).

    :returns: the file object that was registered.
    """
    handle = os.fdopen(fd) if isinstance(fd, int) else fd
    self._poller.register(handle, flags)
    self._queue_callbacks[handle.fileno()] = callback
    return handle
def test_getsockopt_events(self):
    """``zmq.EVENTS`` reports POLLOUT on the sender and POLLIN on the receiver.

    A bound DEALER/DEALER pair is created.  The first socket must poll as
    writable, and after it sends an empty message the second socket must
    poll as readable; each time the ``zmq.EVENTS`` socket option is
    checked for the matching bit.
    """
    sender, receiver, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER)
    eventlet.sleep()

    # The sending side should be writable.
    out_poller = zmq.Poller()
    out_poller.register(sender, zmq.POLLOUT)
    writable = out_poller.poll(100)
    self.assertEqual(len(writable), 1)
    sender_events = sender.getsockopt(zmq.EVENTS)
    self.assertEqual(sender_events & zmq.POLLOUT, zmq.POLLOUT)

    sender.send(b'')

    # Once a message was sent, the receiving side should be readable.
    in_poller = zmq.Poller()
    in_poller.register(receiver, zmq.POLLIN)
    readable = in_poller.poll(100)
    self.assertEqual(len(readable), 1)
    receiver_events = receiver.getsockopt(zmq.EVENTS)
    self.assertEqual(receiver_events & zmq.POLLIN, zmq.POLLIN)
def update_view(self):
    """Drain pending sensor messages and refresh the visualisations.

    ``self.poller`` is polled with a 5 ms timeout until it reports no
    events.  Each readable socket yields one pickled 4-item message
    ``(timestamp, angles, accel, <unused>)`` whose values are pushed to
    the sensor plot, the 3D view, the beeper and the instrument view.

    NOTE(review): the source this was recovered from had lost its
    indentation; the per-message updates are assumed to sit inside the
    ``for`` loop -- confirm against the original project.
    """
    while True:
        ready = dict(self.poller.poll(5))
        if not ready:
            break

        for sock, state in ready.items():
            if state != zmq.POLLIN:
                continue

            timestamp, angles, accel, _unused = sock.recv_pyobj()
            x_angle, y_angle, z_angle = angles[0], angles[1], angles[2]
            x_accel, y_accel, z_accel = accel[0], accel[1], accel[2]

            self.vis_sensors.push_data(timestamp, angles)
            self.vis_3d.update_view(x_angle, y_angle, z_angle)
            self.beep.beep(x_angle)
            self.vis_instrument.update_view(x_accel, y_accel, z_accel)
            self.vis_sensors.update_view()
def socket_fitness(self, chrom):
    """Ask the remote evaluator for the fitness of *chrom*.

    The request is a ``;``-separated string built from the values
    returned by ``self.func``'s getters, the target path as ``x:y`` pairs
    and the chromosome values.  If a reply is readable within 100 ms it is
    decoded as UTF-8 and returned as a float.  Otherwise the socket is
    closed and unregistered, and ``self.func(chrom)`` is returned instead.

    :param chrom: iterable of chromosome values.
    :returns: the fitness, from the socket or from ``self.func``.
    """
    # A socket closed by an earlier timeout has to be recreated first.
    if self.socket.closed:
        self.socket = self.context.socket(zmq.REQ)
        self.socket.bind(self.socket_port)
        self.poll.register(self.socket, zmq.POLLIN)

    fields = [
        self.func.get_Driving(),
        self.func.get_Follower(),
        self.func.get_Link(),
        self.func.get_Target(),
        self.func.get_ExpressionName(),
        self.func.get_Expression(),
        ','.join("{}:{}".format(point[0], point[1]) for point in self.targetPath),
        ','.join(str(gene) for gene in chrom),
    ]
    self.socket.send_string(';'.join(fields))

    ready = dict(self.poll.poll(100))
    if ready.get(self.socket) == zmq.POLLIN:
        return float(self.socket.recv().decode('utf-8'))

    # No answer in time: drop the socket and evaluate locally.
    self.socket.setsockopt(zmq.LINGER, 0)
    self.socket.close()
    self.poll.unregister(self.socket)
    return self.func(chrom)
def socket_fitness(self, chrom):
    """Ask the remote evaluator for the fitness of *chrom*.

    The request is a ``;``-separated string built from the values
    returned by ``self.func``'s getters, the target path as ``x:y`` pairs
    and the chromosome values.  If a reply is readable within 100 ms it is
    decoded as UTF-8 and returned as a float.  Otherwise the socket is
    closed and unregistered, and ``self.func(chrom)`` is returned instead.

    :param chrom: iterable of chromosome values.
    :returns: the fitness, from the socket or from ``self.func``.
    """
    # A socket closed by an earlier timeout has to be recreated first.
    if self.socket.closed:
        self.socket = self.context.socket(zmq.REQ)
        self.socket.bind(self.socket_port)
        self.poll.register(self.socket, zmq.POLLIN)

    fields = [
        self.func.get_Driving(),
        self.func.get_Follower(),
        self.func.get_Link(),
        self.func.get_Target(),
        self.func.get_ExpressionName(),
        self.func.get_Expression(),
        ','.join("{}:{}".format(point[0], point[1]) for point in self.targetPath),
        ','.join(str(gene) for gene in chrom),
    ]
    self.socket.send_string(';'.join(fields))

    ready = dict(self.poll.poll(100))
    if ready.get(self.socket) == zmq.POLLIN:
        return float(self.socket.recv().decode('utf-8'))

    # No answer in time: drop the socket and evaluate locally.
    self.socket.setsockopt(zmq.LINGER, 0)
    self.socket.close()
    self.poll.unregister(self.socket)
    return self.func(chrom)