def test_root_topic(self):
logger, handler, sub = self.connect_handler()
handler.socket.bind(self.iface)
sub2 = sub.context.socket(zmq.SUB)
self.sockets.append(sub2)
sub2.connect(self.iface)
sub2.setsockopt(zmq.SUBSCRIBE, b'')
handler.root_topic = b'twoonly'
msg1 = 'ignored'
logger.info(msg1)
self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
topic,msg2 = sub2.recv_multipart()
self.assertEqual(topic, b'twoonly.INFO')
self.assertEqual(msg2, b(msg1)+b'\n')
logger.removeHandler(handler)
python类EAGAIN的实例源码
def _handle_recv(self):
"""Handle a recv event."""
if self._flushed:
return
try:
msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# state changed since poll event
pass
else:
gen_log.error("RECV Error: %s"%zmq.strerror(e.errno))
else:
if self._recv_callback:
callback = self._recv_callback
# self._recv_callback = None
self._run_callback(callback, msg)
# self.update_state()
def _check_rc(rc, errno=None):
"""internal utility for checking zmq return condition
and raising the appropriate Exception class
"""
if rc == -1:
if errno is None:
from zmq.backend import zmq_errno
errno = zmq_errno()
from zmq import EAGAIN, ETERM
if errno == EINTR:
raise InterruptedSystemCall(errno)
elif errno == EAGAIN:
raise Again(errno)
elif errno == ETERM:
raise ContextTerminated(errno)
else:
raise ZMQError(errno)
def get_command(self):
"""Attempt to return a unicode object from the command socket
If no message is available without blocking (as opposed to a blank
message), return None
"""
try:
message_bytes = self.socket.recv(zmq.NOBLOCK)
log.debug("Received message: %r", message_bytes)
except zmq.ZMQError as exc:
if exc.errno == zmq.EAGAIN:
return None
else:
raise
else:
return message_bytes.decode(config.CODEC)
def poll_command_request(self):
"""If the command RPC socket has an incoming request,
separate it into its action and its params and put it
on the command request queue.
"""
try:
message = self.rpc.recv(zmq.NOBLOCK)
except zmq.ZMQError as exc:
if exc.errno == zmq.EAGAIN:
return
else:
raise
_logger.debug("Received command %s", message)
segments = _unpack(message)
action, params = segments[0], segments[1:]
_logger.debug("Adding %s, %s to the request queue", action, params)
self._command = _Command(action, params)
def test_root_topic(self):
logger, handler, sub = self.connect_handler()
handler.socket.bind(self.iface)
sub2 = sub.context.socket(zmq.SUB)
self.sockets.append(sub2)
sub2.connect(self.iface)
sub2.setsockopt(zmq.SUBSCRIBE, b'')
handler.root_topic = b'twoonly'
msg1 = 'ignored'
logger.info(msg1)
self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
topic,msg2 = sub2.recv_multipart()
self.assertEqual(topic, b'twoonly.INFO')
self.assertEqual(msg2, b(msg1)+b'\n')
logger.removeHandler(handler)
def _handle_recv(self):
"""Handle a recv event."""
if self._flushed:
return
try:
msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# state changed since poll event
pass
else:
gen_log.error("RECV Error: %s"%zmq.strerror(e.errno))
else:
if self._recv_callback:
callback = self._recv_callback
# self._recv_callback = None
self._run_callback(callback, msg)
# self.update_state()
def _check_rc(rc, errno=None):
"""internal utility for checking zmq return condition
and raising the appropriate Exception class
"""
if rc < 0:
from zmq.backend import zmq_errno
if errno is None:
errno = zmq_errno()
from zmq import EAGAIN, ETERM
if errno == EAGAIN:
raise Again(errno)
elif errno == ETERM:
raise ContextTerminated(errno)
else:
raise ZMQError(errno)
def test_root_topic(self):
logger, handler, sub = self.connect_handler()
handler.socket.bind(self.iface)
sub2 = sub.context.socket(zmq.SUB)
self.sockets.append(sub2)
sub2.connect(self.iface)
sub2.setsockopt(zmq.SUBSCRIBE, b'')
handler.root_topic = b'twoonly'
msg1 = 'ignored'
logger.info(msg1)
self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
topic,msg2 = sub2.recv_multipart()
self.assertEqual(topic, b'twoonly.INFO')
self.assertEqual(msg2, b(msg1)+b'\n')
logger.removeHandler(handler)
def skip_plain_inauth(self):
"""test PLAIN failed authentication"""
server = self.socket(zmq.DEALER)
server.identity = b'IDENT'
client = self.socket(zmq.DEALER)
self.sockets.extend([server, client])
client.plain_username = USER
client.plain_password = b'incorrect'
server.plain_server = True
self.assertEqual(server.mechanism, zmq.PLAIN)
self.assertEqual(client.mechanism, zmq.PLAIN)
self.start_zap()
iface = 'tcp://127.0.0.1'
port = server.bind_to_random_port(iface)
client.connect("%s:%i" % (iface, port))
client.send(b'ping')
server.rcvtimeo = 250
self.assertRaisesErrno(zmq.EAGAIN, server.recv)
self.stop_zap()
def _handle_recv(self):
"""Handle a recv event."""
if self._flushed:
return
try:
msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# state changed since poll event
pass
else:
gen_log.error("RECV Error: %s"%zmq.strerror(e.errno))
else:
if self._recv_callback:
callback = self._recv_callback
# self._recv_callback = None
self._run_callback(callback, msg)
# self.update_state()
def test_root_topic(self):
logger, handler, sub = self.connect_handler()
handler.socket.bind(self.iface)
sub2 = sub.context.socket(zmq.SUB)
self.sockets.append(sub2)
sub2.connect(self.iface)
sub2.setsockopt(zmq.SUBSCRIBE, b'')
handler.root_topic = b'twoonly'
msg1 = 'ignored'
logger.info(msg1)
self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
topic,msg2 = sub2.recv_multipart()
self.assertEqual(topic, b'twoonly.INFO')
self.assertEqual(msg2, b(msg1)+b'\n')
logger.removeHandler(handler)
def _handle_recv(self):
"""Handle a recv event."""
if self._flushed:
return
try:
msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# state changed since poll event
pass
else:
gen_log.error("RECV Error: %s"%zmq.strerror(e.errno))
else:
if self._recv_callback:
callback = self._recv_callback
# self._recv_callback = None
self._run_callback(callback, msg)
# self.update_state()
def _check_rc(rc, errno=None):
"""internal utility for checking zmq return condition
and raising the appropriate Exception class
"""
if rc < 0:
from zmq.backend import zmq_errno
if errno is None:
errno = zmq_errno()
from zmq import EAGAIN, ETERM
if errno == EAGAIN:
raise Again(errno)
elif errno == ETERM:
raise ContextTerminated(errno)
else:
raise ZMQError(errno)
def test_root_topic(self):
logger, handler, sub = self.connect_handler()
handler.socket.bind(self.iface)
sub2 = sub.context.socket(zmq.SUB)
self.sockets.append(sub2)
sub2.connect(self.iface)
sub2.setsockopt(zmq.SUBSCRIBE, b'')
handler.root_topic = b'twoonly'
msg1 = 'ignored'
logger.info(msg1)
self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
topic,msg2 = sub2.recv_multipart()
self.assertEqual(topic, b'twoonly.INFO')
self.assertEqual(msg2, b(msg1)+b'\n')
logger.removeHandler(handler)
def skip_plain_inauth(self):
"""test PLAIN failed authentication"""
server = self.socket(zmq.DEALER)
server.identity = b'IDENT'
client = self.socket(zmq.DEALER)
self.sockets.extend([server, client])
client.plain_username = USER
client.plain_password = b'incorrect'
server.plain_server = True
self.assertEqual(server.mechanism, zmq.PLAIN)
self.assertEqual(client.mechanism, zmq.PLAIN)
self.start_zap()
iface = 'tcp://127.0.0.1'
port = server.bind_to_random_port(iface)
client.connect("%s:%i" % (iface, port))
client.send(b'ping')
server.rcvtimeo = 250
self.assertRaisesErrno(zmq.EAGAIN, server.recv)
self.stop_zap()
def _handle_recv(self):
"""Handle a recv event."""
if self._flushed:
return
try:
msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# state changed since poll event
pass
else:
gen_log.error("RECV Error: %s"%zmq.strerror(e.errno))
else:
if self._recv_callback:
callback = self._recv_callback
# self._recv_callback = None
self._run_callback(callback, msg)
# self.update_state()
def test_root_topic(self):
logger, handler, sub = self.connect_handler()
handler.socket.bind(self.iface)
sub2 = sub.context.socket(zmq.SUB)
self.sockets.append(sub2)
sub2.connect(self.iface)
sub2.setsockopt(zmq.SUBSCRIBE, b'')
handler.root_topic = b'twoonly'
msg1 = 'ignored'
logger.info(msg1)
self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
topic,msg2 = sub2.recv_multipart()
self.assertEqual(topic, b'twoonly.INFO')
self.assertEqual(msg2, b(msg1)+b'\n')
logger.removeHandler(handler)
def _handle_recv(self):
"""Handle a recv event."""
if self._flushed:
return
try:
msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# state changed since poll event
pass
else:
gen_log.error("RECV Error: %s"%zmq.strerror(e.errno))
else:
if self._recv_callback:
callback = self._recv_callback
# self._recv_callback = None
self._run_callback(callback, msg)
# self.update_state()
def _check_rc(rc, errno=None):
"""internal utility for checking zmq return condition
and raising the appropriate Exception class
"""
if rc < 0:
from zmq.backend import zmq_errno
if errno is None:
errno = zmq_errno()
from zmq import EAGAIN, ETERM
if errno == EAGAIN:
raise Again(errno)
elif errno == ETERM:
raise ContextTerminated(errno)
else:
raise ZMQError(errno)
def test_root_topic(self):
logger, handler, sub = self.connect_handler()
handler.socket.bind(self.iface)
sub2 = sub.context.socket(zmq.SUB)
self.sockets.append(sub2)
sub2.connect(self.iface)
sub2.setsockopt(zmq.SUBSCRIBE, b'')
handler.root_topic = b'twoonly'
msg1 = 'ignored'
logger.info(msg1)
self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
topic,msg2 = sub2.recv_multipart()
self.assertEqual(topic, b'twoonly.INFO')
self.assertEqual(msg2, b(msg1)+b'\n')
logger.removeHandler(handler)
def skip_plain_inauth(self):
"""test PLAIN failed authentication"""
server = self.socket(zmq.DEALER)
server.identity = b'IDENT'
client = self.socket(zmq.DEALER)
self.sockets.extend([server, client])
client.plain_username = USER
client.plain_password = b'incorrect'
server.plain_server = True
self.assertEqual(server.mechanism, zmq.PLAIN)
self.assertEqual(client.mechanism, zmq.PLAIN)
self.start_zap()
iface = 'tcp://127.0.0.1'
port = server.bind_to_random_port(iface)
client.connect("%s:%i" % (iface, port))
client.send(b'ping')
server.rcvtimeo = 250
self.assertRaisesErrno(zmq.EAGAIN, server.recv)
self.stop_zap()
def _handle_recv(self):
"""Handle a recv event."""
if self._flushed:
return
try:
msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# state changed since poll event
pass
else:
gen_log.error("RECV Error: %s"%zmq.strerror(e.errno))
else:
if self._recv_callback:
callback = self._recv_callback
# self._recv_callback = None
self._run_callback(callback, msg)
# self.update_state()
def run(self):
while not self._terminate:
connection_message = None
try:
connection_message = self._socket.recv_multipart(zmq.NOBLOCK)
[tag, json_message] = connection_message
message = json.loads(json_message.decode('utf8'))
if tag == b"queryResponse":
self._bus.resolve_response(message)
else:
handler_thread = ZMQHandlerThread(self._bus, tag.decode('utf-8'), message)
handler_thread.start()
#time.sleep(0.001)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
time.sleep(.001)
pass
elif e.errno == zmq.ETERM:
#print("terminate", self._address)
self._terminate = True
else:
print("message zmq exception:", self._address, e, e.errno)
except Exception as e:
print("message exception:", self._address, e, connection_message)
#print("message thread terminated:", self._address)
def send(self, data, flags=0, copy=True, track=False):
"""send, which will only block current greenlet
state_changed always fires exactly once (success or fail) at the
end of this method.
"""
# if we're given the NOBLOCK flag act as normal and let the EAGAIN get raised
if flags & zmq.NOBLOCK:
try:
msg = super(_Socket, self).send(data, flags, copy, track)
finally:
if not self.__in_send_multipart:
self.__state_changed()
return msg
# ensure the zmq.NOBLOCK flag is part of flags
flags |= zmq.NOBLOCK
while True: # Attempt to complete this operation indefinitely, blocking the current greenlet
try:
# attempt the actual call
msg = super(_Socket, self).send(data, flags, copy, track)
except zmq.ZMQError as e:
# if the raised ZMQError is not EAGAIN, reraise
if e.errno != zmq.EAGAIN:
if not self.__in_send_multipart:
self.__state_changed()
raise
else:
if not self.__in_send_multipart:
self.__state_changed()
return msg
# defer to the event loop until we're notified the socket is writable
self._wait_write()
def recv(self, flags=0, copy=True, track=False):
"""recv, which will only block current greenlet
state_changed always fires exactly once (success or fail) at the
end of this method.
"""
if flags & zmq.NOBLOCK:
try:
msg = super(_Socket, self).recv(flags, copy, track)
finally:
if not self.__in_recv_multipart:
self.__state_changed()
return msg
flags |= zmq.NOBLOCK
while True:
try:
msg = super(_Socket, self).recv(flags, copy, track)
except zmq.ZMQError as e:
if e.errno != zmq.EAGAIN:
if not self.__in_recv_multipart:
self.__state_changed()
raise
else:
if not self.__in_recv_multipart:
self.__state_changed()
return msg
self._wait_read()
def test_topic(self):
s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
s2.setsockopt(zmq.SUBSCRIBE, b'x')
time.sleep(0.1)
msg1 = b'message'
s1.send(msg1)
self.assertRaisesErrno(zmq.EAGAIN, s2.recv, zmq.NOBLOCK)
msg1 = b'xmessage'
s1.send(msg1)
msg2 = s2.recv()
self.assertEqual(msg1, msg2)
def test_again(self):
s = self.context.socket(zmq.REP)
self.assertRaises(Again, s.recv, zmq.NOBLOCK)
self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
s.close()
def assertRaisesErrno(self, errno, func, *args, **kwargs):
if errno == zmq.EAGAIN:
raise SkipTest("Skipping because we're green.")
try:
func(*args, **kwargs)
except zmq.ZMQError:
e = sys.exc_info()[1]
self.assertEqual(e.errno, errno, "wrong error raised, expected '%s' \
got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
else:
self.fail("Function did not raise any error")
test_zmq_pub_sub.py 文件源码
项目:integration-prototype
作者: SKA-ScienceDataProcessor
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def recv_messages(zmq_subscriber, timeout_count, message_count):
"""Test utility function.
Subscriber thread that receives and counts ZMQ messages.
Args:
zmq_subscriber (zmq.Socket): ZMQ subscriber socket.
timeout_count (int): No. of failed receives until exit.
message_count (int): No. of messages expected to be received.
Returns:
(int) Number of messages received.
"""
# pylint: disable=E1101
fails = 0 # No. of receives that didn't return a message.
receive_count = 0 # Total number of messages received.
while fails < timeout_count:
try:
_ = zmq_subscriber.recv_string(flags=zmq.NOBLOCK)
fails = 0
receive_count += 1
if receive_count == message_count:
break
except zmq.ZMQError as error:
if error.errno == zmq.EAGAIN:
pass
else:
raise
fails += 1
time.sleep(1e-6)
return receive_count