def test_poll(self):
    """Exercise ``Socket.poll`` on a bound pair of sockets.

    Checks, in order, that:

    * polling ``a`` without explicit flags reports no events (``0``);
    * polling ``a`` for ``zmq.POLLOUT`` reports ``zmq.POLLOUT``;
    * after ``a`` sends a message, polling ``b`` reports ``zmq.POLLIN``;
    * once that message has been received, polling ``b`` reports ``0``;
    * the received message equals the one that was sent.
    """
    a, b = self.create_bound_pair()
    # No message has been sent to ``a`` at this point.
    evt = a.poll(50)
    self.assertEqual(evt, 0)
    evt = a.poll(50, zmq.POLLOUT)
    self.assertEqual(evt, zmq.POLLOUT)
    msg = b'hi'
    a.send(msg)
    evt = b.poll(50)
    self.assertEqual(evt, zmq.POLLIN)
    msg2 = self.recv(b)
    # The only message was consumed above, so nothing should be pending.
    evt = b.poll(50)
    self.assertEqual(evt, 0)
    self.assertEqual(msg2, msg)
# Example source code for Python's POLLOUT
def test_no_events(self):
    """Check poller membership for sockets registered with and without flags.

    A socket registered with ``zmq.POLLIN | zmq.POLLOUT`` must be reported
    as contained in the poller, while a socket registered with flags ``0``
    must not.  Registering an already-registered socket again with flags
    ``0`` must make it no longer contained.
    """
    first, second = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    both_directions = zmq.POLLIN | zmq.POLLOUT
    poller.register(first, both_directions)
    poller.register(second, 0)
    self.assertTrue(first in poller)
    self.assertFalse(second in poller)
    # Second registration of the same socket, this time with an empty mask.
    poller.register(first, 0)
    self.assertFalse(first in poller)
def test_pubsub(self):
    """Check poll results for a PUB/SUB pair before and after a message.

    ``s1`` (PUB) is registered for ``zmq.POLLIN | zmq.POLLOUT`` and ``s2``
    (SUB, subscribed to everything) for ``zmq.POLLIN``.  The test asserts
    that ``s1`` polls as ``zmq.POLLOUT`` before and after sending, that
    ``s2`` is absent from the poll result until a message has been sent and
    ``wait()`` has been called, and that ``s2`` is absent again after the
    message has been received.
    """
    s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
    s2.setsockopt(zmq.SUBSCRIBE, b'')

    # Sleep to allow sockets to connect.
    wait()

    poller = self.Poller()
    poller.register(s1, zmq.POLLIN | zmq.POLLOUT)
    poller.register(s2, zmq.POLLIN)

    # The publisher should be send-ready; the subscriber has nothing to read.
    socks = dict(poller.poll())
    self.assertEqual(socks[s1], zmq.POLLOUT)
    self.assertNotIn(s2, socks)

    # Make sure that s1 stays in POLLOUT after a send.
    s1.send(b'msg1')
    socks = dict(poller.poll())
    self.assertEqual(socks[s1], zmq.POLLOUT)

    # Make sure that s2 is POLLIN after waiting.
    wait()
    socks = dict(poller.poll())
    self.assertEqual(socks[s2], zmq.POLLIN)

    # Make sure that s2 drops out of the poll result after recv.
    s2.recv()
    socks = dict(poller.poll())
    self.assertNotIn(s2, socks)

    poller.unregister(s1)
    poller.unregister(s2)

    # Wait for everything to finish.
    wait()
def _get_descriptors(self):
    """Return ``(rlist, wlist, xlist)`` descriptor lists for ``gevent.select.select``.

    Iterates over the ``(socket, flags)`` pairs in ``self.sockets``:

    * a ``zmq.Socket`` always contributes its ``zmq.FD`` descriptor to the
      read list, whatever its flags are;
    * an ``int`` is used directly as the descriptor;
    * any other object must provide ``fileno()`` whose result is
      convertible with ``int()``.

    For the last two kinds, the descriptor is appended to the read, write
    and error lists according to the ``zmq.POLLIN``, ``zmq.POLLOUT`` and
    ``zmq.POLLERR`` bits of ``flags``.

    :raises ValueError: if ``fileno()`` fails or does not yield an integer
    :raises TypeError: if an entry is neither a 0MQ socket, an ``int`` nor
        an object with a ``fileno()`` method
    """
    rlist = []
    wlist = []
    xlist = []

    for sock, flags in self.sockets:
        if isinstance(sock, zmq.Socket):
            rlist.append(sock.getsockopt(zmq.FD))
            continue
        elif isinstance(sock, int):
            fd = sock
        elif hasattr(sock, 'fileno'):
            try:
                fd = int(sock.fileno())
            except Exception:
                # Only ordinary errors are translated; KeyboardInterrupt,
                # SystemExit and greenlet termination must propagate.
                raise ValueError('fileno() must return an valid integer fd')
        else:
            # Wrap in a 1-tuple so that tuple arguments format correctly.
            raise TypeError('Socket must be a 0MQ socket, an integer fd '
                            'or have a fileno() method: %r' % (sock,))

        if flags & zmq.POLLIN:
            rlist.append(fd)
        if flags & zmq.POLLOUT:
            wlist.append(fd)
        if flags & zmq.POLLERR:
            xlist.append(fd)

    return (rlist, wlist, xlist)
def __state_changed(self, event=None, _evtype=None):
    """Refresh the readable/writable events from the socket's ``zmq.EVENTS``.

    If the socket is closed, only the event cleanup is run.  Otherwise the
    current event mask is read; a ``zmq.ZMQError`` raised while reading it
    is stored on both the writable and the readable event, and on success
    each event is set when its corresponding bit is present in the mask.

    ``event`` and ``_evtype`` are accepted but not used by this body.
    """
    if self.closed:
        self.__cleanup_events()
        return

    try:
        # avoid triggering __state_changed from inside __state_changed
        pending = super(_Socket, self).getsockopt(zmq.EVENTS)
    except zmq.ZMQError as exc:
        self.__writable.set_exception(exc)
        self.__readable.set_exception(exc)
        return

    if pending & zmq.POLLOUT:
        self.__writable.set()
    if pending & zmq.POLLIN:
        self.__readable.set()
def test_poll(self):
    """Exercise ``Socket.poll`` on a bound pair of sockets.

    Checks, in order, that:

    * polling ``a`` without explicit flags reports no events (``0``);
    * polling ``a`` for ``zmq.POLLOUT`` reports ``zmq.POLLOUT``;
    * after ``a`` sends a message, polling ``b`` reports ``zmq.POLLIN``;
    * once that message has been received, polling ``b`` reports ``0``;
    * the received message equals the one that was sent.
    """
    a, b = self.create_bound_pair()
    # No message has been sent to ``a`` at this point.
    evt = a.poll(50)
    self.assertEqual(evt, 0)
    evt = a.poll(50, zmq.POLLOUT)
    self.assertEqual(evt, zmq.POLLOUT)
    msg = b'hi'
    a.send(msg)
    evt = b.poll(50)
    self.assertEqual(evt, zmq.POLLIN)
    msg2 = self.recv(b)
    # The only message was consumed above, so nothing should be pending.
    evt = b.poll(50)
    self.assertEqual(evt, 0)
    self.assertEqual(msg2, msg)
def test_pair(self):
    """Check poll results for two PAIR sockets around a message exchange.

    Both sockets are registered for ``zmq.POLLIN | zmq.POLLOUT``.  The test
    asserts that each polls as ``zmq.POLLOUT`` initially, as
    ``zmq.POLLOUT | zmq.POLLIN`` after each side has sent one message and
    ``wait()`` has been called, and as ``zmq.POLLOUT`` again once both
    messages have been received.
    """
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    pair = (s1, s2)

    # Sleep to allow sockets to connect.
    wait()

    poller = self.Poller()
    for sock in pair:
        poller.register(sock, zmq.POLLIN | zmq.POLLOUT)

    # Poll result should contain both sockets, each of them send-ready.
    socks = dict(poller.poll())
    for sock in pair:
        self.assertEqual(socks[sock], zmq.POLLOUT)

    # Send from both sides, wait, then expect zmq.POLLOUT|zmq.POLLIN.
    for sock, payload in zip(pair, (b'msg1', b'msg2')):
        sock.send(payload)
    wait()
    socks = dict(poller.poll())
    for sock in pair:
        self.assertEqual(socks[sock], zmq.POLLOUT | zmq.POLLIN)

    # After draining both sockets only POLLOUT should remain.
    for sock in pair:
        sock.recv()
    socks = dict(poller.poll())
    for sock in pair:
        self.assertEqual(socks[sock], zmq.POLLOUT)

    for sock in pair:
        poller.unregister(sock)

    # Wait for everything to finish.
    wait()
def test_no_events(self):
    """Check poller membership for sockets registered with and without flags.

    A socket registered with ``zmq.POLLIN | zmq.POLLOUT`` must be reported
    as contained in the poller, while a socket registered with flags ``0``
    must not.  Registering an already-registered socket again with flags
    ``0`` must make it no longer contained.
    """
    first, second = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    both_directions = zmq.POLLIN | zmq.POLLOUT
    poller.register(first, both_directions)
    poller.register(second, 0)
    self.assertTrue(first in poller)
    self.assertFalse(second in poller)
    # Second registration of the same socket, this time with an empty mask.
    poller.register(first, 0)
    self.assertFalse(first in poller)
def _get_descriptors(self):
    """Return ``(rlist, wlist, xlist)`` descriptor lists for ``gevent.select.select``.

    Iterates over the ``(socket, flags)`` pairs in ``self.sockets``:

    * a ``zmq.Socket`` always contributes its ``zmq.FD`` descriptor to the
      read list, whatever its flags are;
    * an ``int`` is used directly as the descriptor;
    * any other object must provide ``fileno()`` whose result is
      convertible with ``int()``.

    For the last two kinds, the descriptor is appended to the read, write
    and error lists according to the ``zmq.POLLIN``, ``zmq.POLLOUT`` and
    ``zmq.POLLERR`` bits of ``flags``.

    :raises ValueError: if ``fileno()`` fails or does not yield an integer
    :raises TypeError: if an entry is neither a 0MQ socket, an ``int`` nor
        an object with a ``fileno()`` method
    """
    rlist = []
    wlist = []
    xlist = []

    for sock, flags in self.sockets:
        if isinstance(sock, zmq.Socket):
            rlist.append(sock.getsockopt(zmq.FD))
            continue
        elif isinstance(sock, int):
            fd = sock
        elif hasattr(sock, 'fileno'):
            try:
                fd = int(sock.fileno())
            except Exception:
                # Only ordinary errors are translated; KeyboardInterrupt,
                # SystemExit and greenlet termination must propagate.
                raise ValueError('fileno() must return an valid integer fd')
        else:
            # Wrap in a 1-tuple so that tuple arguments format correctly.
            raise TypeError('Socket must be a 0MQ socket, an integer fd '
                            'or have a fileno() method: %r' % (sock,))

        if flags & zmq.POLLIN:
            rlist.append(fd)
        if flags & zmq.POLLOUT:
            wlist.append(fd)
        if flags & zmq.POLLERR:
            xlist.append(fd)

    return (rlist, wlist, xlist)
def __state_changed(self, event=None, _evtype=None):
    """Refresh the readable/writable events from the socket's ``zmq.EVENTS``.

    If the socket is closed, only the event cleanup is run.  Otherwise the
    current event mask is read; a ``zmq.ZMQError`` raised while reading it
    is stored on both the writable and the readable event, and on success
    each event is set when its corresponding bit is present in the mask.

    ``event`` and ``_evtype`` are accepted but not used by this body.
    """
    if self.closed:
        self.__cleanup_events()
        return

    try:
        # avoid triggering __state_changed from inside __state_changed
        pending = super(_Socket, self).getsockopt(zmq.EVENTS)
    except zmq.ZMQError as exc:
        self.__writable.set_exception(exc)
        self.__readable.set_exception(exc)
        return

    if pending & zmq.POLLOUT:
        self.__writable.set()
    if pending & zmq.POLLIN:
        self.__readable.set()
def _wait_write(self):
    """Block until the writable event fires or the guard timeout expires.

    Asserts that the current writable event is already ready (i.e. no
    other waiter is pending), replaces it with a fresh ``AsyncResult`` and
    waits on it.  When ``self._gevent_bug_timeout`` is truthy, a
    ``gevent.Timeout`` of that many seconds guards the wait; expiry of that
    specific timeout is swallowed, while any other ``gevent.Timeout`` is
    re-raised.  On every exit path the timeout (if any) is cancelled and
    the writable event is set.
    """
    assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
    self.__writable = AsyncResult()
    # The timeout exists because libzmq cannot be trusted to properly signal
    # a new send event: it effectively acts as a maximum poll interval
    # (the interval itself comes from self._gevent_bug_timeout).
    started = time.time()
    interval = self._gevent_bug_timeout
    timeout = gevent.Timeout(seconds=interval) if interval else None
    try:
        if timeout:
            timeout.start()
        self.__writable.get(block=True)
    except gevent.Timeout as raised:
        if raised is not timeout:
            raise
        finished = time.time()
        # gevent bug: get can raise timeout even on clean return
        # don't display zmq bug warning for gevent bug (this is getting ridiculous)
        if self._debug_gevent and timeout:
            if finished - started > interval and self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
                print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
    finally:
        if timeout:
            timeout.cancel()
        self.__writable.set()
def test_poll(self):
    """Exercise ``Socket.poll`` on a bound pair of sockets.

    Checks, in order, that:

    * polling ``a`` without explicit flags reports no events (``0``);
    * polling ``a`` for ``zmq.POLLOUT`` reports ``zmq.POLLOUT``;
    * after ``a`` sends a message, polling ``b`` reports ``zmq.POLLIN``;
    * once that message has been received, polling ``b`` reports ``0``;
    * the received message equals the one that was sent.
    """
    a, b = self.create_bound_pair()
    # No message has been sent to ``a`` at this point.
    evt = a.poll(50)
    self.assertEqual(evt, 0)
    evt = a.poll(50, zmq.POLLOUT)
    self.assertEqual(evt, zmq.POLLOUT)
    msg = b'hi'
    a.send(msg)
    evt = b.poll(50)
    self.assertEqual(evt, zmq.POLLIN)
    msg2 = self.recv(b)
    # The only message was consumed above, so nothing should be pending.
    evt = b.poll(50)
    self.assertEqual(evt, 0)
    self.assertEqual(msg2, msg)
def test_no_events(self):
    """Check poller membership for sockets registered with and without flags.

    A socket registered with ``zmq.POLLIN | zmq.POLLOUT`` must be reported
    as contained in the poller, while a socket registered with flags ``0``
    must not.  Registering an already-registered socket again with flags
    ``0`` must make it no longer contained.
    """
    first, second = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    both_directions = zmq.POLLIN | zmq.POLLOUT
    poller.register(first, both_directions)
    poller.register(second, 0)
    self.assertTrue(first in poller)
    self.assertFalse(second in poller)
    # Second registration of the same socket, this time with an empty mask.
    poller.register(first, 0)
    self.assertFalse(first in poller)
def test_pubsub(self):
    """Check poll results for a PUB/SUB pair before and after a message.

    ``s1`` (PUB) is registered for ``zmq.POLLIN | zmq.POLLOUT`` and ``s2``
    (SUB, subscribed to everything) for ``zmq.POLLIN``.  The test asserts
    that ``s1`` polls as ``zmq.POLLOUT`` before and after sending, that
    ``s2`` is absent from the poll result until a message has been sent and
    ``wait()`` has been called, and that ``s2`` is absent again after the
    message has been received.
    """
    s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
    s2.setsockopt(zmq.SUBSCRIBE, b'')

    # Sleep to allow sockets to connect.
    wait()

    poller = self.Poller()
    poller.register(s1, zmq.POLLIN | zmq.POLLOUT)
    poller.register(s2, zmq.POLLIN)

    # The publisher should be send-ready; the subscriber has nothing to read.
    socks = dict(poller.poll())
    self.assertEqual(socks[s1], zmq.POLLOUT)
    self.assertNotIn(s2, socks)

    # Make sure that s1 stays in POLLOUT after a send.
    s1.send(b'msg1')
    socks = dict(poller.poll())
    self.assertEqual(socks[s1], zmq.POLLOUT)

    # Make sure that s2 is POLLIN after waiting.
    wait()
    socks = dict(poller.poll())
    self.assertEqual(socks[s2], zmq.POLLIN)

    # Make sure that s2 drops out of the poll result after recv.
    s2.recv()
    socks = dict(poller.poll())
    self.assertNotIn(s2, socks)

    poller.unregister(s1)
    poller.unregister(s2)

    # Wait for everything to finish.
    wait()
def _get_descriptors(self):
    """Return ``(rlist, wlist, xlist)`` descriptor lists for ``gevent.select.select``.

    Iterates over the ``(socket, flags)`` pairs in ``self.sockets``:

    * a ``zmq.Socket`` always contributes its ``zmq.FD`` descriptor to the
      read list, whatever its flags are;
    * an ``int`` is used directly as the descriptor;
    * any other object must provide ``fileno()`` whose result is
      convertible with ``int()``.

    For the last two kinds, the descriptor is appended to the read, write
    and error lists according to the ``zmq.POLLIN``, ``zmq.POLLOUT`` and
    ``zmq.POLLERR`` bits of ``flags``.

    :raises ValueError: if ``fileno()`` fails or does not yield an integer
    :raises TypeError: if an entry is neither a 0MQ socket, an ``int`` nor
        an object with a ``fileno()`` method
    """
    rlist = []
    wlist = []
    xlist = []

    for sock, flags in self.sockets:
        if isinstance(sock, zmq.Socket):
            rlist.append(sock.getsockopt(zmq.FD))
            continue
        elif isinstance(sock, int):
            fd = sock
        elif hasattr(sock, 'fileno'):
            try:
                fd = int(sock.fileno())
            except Exception:
                # Only ordinary errors are translated; KeyboardInterrupt,
                # SystemExit and greenlet termination must propagate.
                raise ValueError('fileno() must return an valid integer fd')
        else:
            # Wrap in a 1-tuple so that tuple arguments format correctly.
            raise TypeError('Socket must be a 0MQ socket, an integer fd '
                            'or have a fileno() method: %r' % (sock,))

        if flags & zmq.POLLIN:
            rlist.append(fd)
        if flags & zmq.POLLOUT:
            wlist.append(fd)
        if flags & zmq.POLLERR:
            xlist.append(fd)

    return (rlist, wlist, xlist)
def __state_changed(self, event=None, _evtype=None):
    """Refresh the readable/writable events from the socket's ``zmq.EVENTS``.

    If the socket is closed, only the event cleanup is run.  Otherwise the
    current event mask is read; a ``zmq.ZMQError`` raised while reading it
    is stored on both the writable and the readable event, and on success
    each event is set when its corresponding bit is present in the mask.

    ``event`` and ``_evtype`` are accepted but not used by this body.
    """
    if self.closed:
        self.__cleanup_events()
        return

    try:
        # avoid triggering __state_changed from inside __state_changed
        pending = super(_Socket, self).getsockopt(zmq.EVENTS)
    except zmq.ZMQError as exc:
        self.__writable.set_exception(exc)
        self.__readable.set_exception(exc)
        return

    if pending & zmq.POLLOUT:
        self.__writable.set()
    if pending & zmq.POLLIN:
        self.__readable.set()
def test_poll(self):
    """Exercise ``Socket.poll`` on a bound pair of sockets.

    Checks, in order, that:

    * polling ``a`` without explicit flags reports no events (``0``);
    * polling ``a`` for ``zmq.POLLOUT`` reports ``zmq.POLLOUT``;
    * after ``a`` sends a message, polling ``b`` reports ``zmq.POLLIN``;
    * once that message has been received, polling ``b`` reports ``0``;
    * the received message equals the one that was sent.
    """
    a, b = self.create_bound_pair()
    # No message has been sent to ``a`` at this point.
    evt = a.poll(50)
    self.assertEqual(evt, 0)
    evt = a.poll(50, zmq.POLLOUT)
    self.assertEqual(evt, zmq.POLLOUT)
    msg = b'hi'
    a.send(msg)
    evt = b.poll(50)
    self.assertEqual(evt, zmq.POLLIN)
    msg2 = self.recv(b)
    # The only message was consumed above, so nothing should be pending.
    evt = b.poll(50)
    self.assertEqual(evt, 0)
    self.assertEqual(msg2, msg)
def test_pair(self):
    """Check poll results for two PAIR sockets around a message exchange.

    Both sockets are registered for ``zmq.POLLIN | zmq.POLLOUT``.  The test
    asserts that each polls as ``zmq.POLLOUT`` initially, as
    ``zmq.POLLOUT | zmq.POLLIN`` after each side has sent one message and
    ``wait()`` has been called, and as ``zmq.POLLOUT`` again once both
    messages have been received.
    """
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    pair = (s1, s2)

    # Sleep to allow sockets to connect.
    wait()

    poller = self.Poller()
    for sock in pair:
        poller.register(sock, zmq.POLLIN | zmq.POLLOUT)

    # Poll result should contain both sockets, each of them send-ready.
    socks = dict(poller.poll())
    for sock in pair:
        self.assertEqual(socks[sock], zmq.POLLOUT)

    # Send from both sides, wait, then expect zmq.POLLOUT|zmq.POLLIN.
    for sock, payload in zip(pair, (b'msg1', b'msg2')):
        sock.send(payload)
    wait()
    socks = dict(poller.poll())
    for sock in pair:
        self.assertEqual(socks[sock], zmq.POLLOUT | zmq.POLLIN)

    # After draining both sockets only POLLOUT should remain.
    for sock in pair:
        sock.recv()
    socks = dict(poller.poll())
    for sock in pair:
        self.assertEqual(socks[sock], zmq.POLLOUT)

    for sock in pair:
        poller.unregister(sock)

    # Wait for everything to finish.
    wait()
def test_no_events(self):
    """Check poller membership for sockets registered with and without flags.

    A socket registered with ``zmq.POLLIN | zmq.POLLOUT`` must be reported
    as contained in the poller, while a socket registered with flags ``0``
    must not.  Registering an already-registered socket again with flags
    ``0`` must make it no longer contained.
    """
    first, second = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    both_directions = zmq.POLLIN | zmq.POLLOUT
    poller.register(first, both_directions)
    poller.register(second, 0)
    self.assertTrue(first in poller)
    self.assertFalse(second in poller)
    # Second registration of the same socket, this time with an empty mask.
    poller.register(first, 0)
    self.assertFalse(first in poller)
def send_safe(self, name, data, *args, **kw):
    """Wait for the named socket to be writable, then send ``data`` on it.

    The socket is obtained through ``wait_until_ready`` using the
    ``POLLOUT`` flag, so callers do not have to wait for readiness
    themselves.  ``data`` is packed with the configured
    ``serialization_backend`` only once a socket has been obtained.

    :param name: the name of the socket the data should be sent through
    :param data: the data to be serialized and sent
    :param ``*args``: positional arguments forwarded to ``wait_until_ready``
    :param ``**kw``: keyword arguments forwarded to ``wait_until_ready``
    :returns: ``True`` if the message was sent, ``False`` if
        ``wait_until_ready`` returned a falsy value (socket never ready)

    .. note::
        per the original documentation, ``SocketNotFound`` is raised when
        the socket name is wrong; that is not raised in this body itself.
    """
    ready_socket = self.wait_until_ready(name, self.zmq.POLLOUT, *args, **kw)
    if ready_socket:
        ready_socket.send(self.serialization_backend.pack(data))
        return True

    return False