def test_root_topic(self):
logger, handler, sub = self.connect_handler()
handler.socket.bind(self.iface)
sub2 = sub.context.socket(zmq.SUB)
self.sockets.append(sub2)
sub2.connect(self.iface)
sub2.setsockopt(zmq.SUBSCRIBE, b'')
handler.root_topic = b'twoonly'
msg1 = 'ignored'
logger.info(msg1)
self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
topic,msg2 = sub2.recv_multipart()
self.assertEqual(topic, b'twoonly.INFO')
self.assertEqual(msg2, b(msg1)+b'\n')
logger.removeHandler(handler)
python类NOBLOCK的实例源码
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
in_prefix, out_prefix)
alice = self.context.socket(zmq.PAIR)
bob = self.context.socket(zmq.PAIR)
mon = self.context.socket(zmq.SUB)
aport = alice.bind_to_random_port('tcp://127.0.0.1')
bport = bob.bind_to_random_port('tcp://127.0.0.1')
mport = mon.bind_to_random_port('tcp://127.0.0.1')
mon.setsockopt(zmq.SUBSCRIBE, mon_sub)
self.device.connect_in("tcp://127.0.0.1:%i"%aport)
self.device.connect_out("tcp://127.0.0.1:%i"%bport)
self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
self.device.start()
time.sleep(.2)
try:
# this is currenlty necessary to ensure no dropped monitor messages
# see LIBZMQ-248 for more info
mon.recv_multipart(zmq.NOBLOCK)
except zmq.ZMQError:
pass
self.sockets.extend([alice, bob, mon])
return alice, bob, mon
def _handle_recv(self):
"""Handle a recv event."""
if self._flushed:
return
try:
msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# state changed since poll event
pass
else:
gen_log.error("RECV Error: %s"%zmq.strerror(e.errno))
else:
if self._recv_callback:
callback = self._recv_callback
# self._recv_callback = None
self._run_callback(callback, msg)
# self.update_state()
def shutdown(self):
"""Shut down paired listener with <END> signal."""
if hasattr(self, 'socket'):
try:
self.socket.send_unicode('<END>', zmq.NOBLOCK)
except zmq.error.ZMQError:
# may need to listen first
try:
self.socket.recv_unicode(zmq.NOBLOCK)
self.socket.send_unicode('<END>', zmq.NOBLOCK)
except zmq.error.ZMQError:
# paired process is probably dead already
pass
if hasattr(self, 'process'):
# try to let the subprocess clean up, but don't wait too long
try:
self.process.communicate(timeout=1)
except subprocess.TimeoutExpired:
self.process.kill()
def send(self, message, send_more=False, block=True, as_json=False):
flags = 0
if send_more:
flags = zmq.SNDMORE
if not block:
flags = flags | zmq.NOBLOCK
try:
if as_json:
self.socket.send_json(message, flags)
else:
self.socket.send(message, flags, copy=self.zmq_copy, track=self.zmq_track)
except zmq.Again as e:
if not block:
pass
else:
raise e
except zmq.ZMQError as e:
logger.error(sys.exc_info()[1])
raise e
def get_command(self):
"""Attempt to return a unicode object from the command socket
If no message is available without blocking (as opposed to a blank
message), return None
"""
try:
message_bytes = self.socket.recv(zmq.NOBLOCK)
log.debug("Received message: %r", message_bytes)
except zmq.ZMQError as exc:
if exc.errno == zmq.EAGAIN:
return None
else:
raise
else:
return message_bytes.decode(config.CODEC)
def poll_command_request(self):
"""If the command RPC socket has an incoming request,
separate it into its action and its params and put it
on the command request queue.
"""
try:
message = self.rpc.recv(zmq.NOBLOCK)
except zmq.ZMQError as exc:
if exc.errno == zmq.EAGAIN:
return
else:
raise
_logger.debug("Received command %s", message)
segments = _unpack(message)
action, params = segments[0], segments[1:]
_logger.debug("Adding %s, %s to the request queue", action, params)
self._command = _Command(action, params)
def generator_from_zmq_pull(context, host):
socket = context.socket(zmq.PULL)
# TODO: Configure socket with clean properties to avoid message overload.
if host.endswith('/'):
host = host[:-1]
print_item("+", "Binding ZMQ pull socket : " + colorama.Fore.CYAN + "{0}".format(host) + colorama.Style.RESET_ALL)
socket.bind(host)
while True:
try:
message = socket.recv(flags=zmq.NOBLOCK)
except zmq.Again as e:
message = None
if message is None:
yield None # NOTE: We have to make the generator non blocking.
else:
task = json.loads(message)
yield task
def test_root_topic(self):
logger, handler, sub = self.connect_handler()
handler.socket.bind(self.iface)
sub2 = sub.context.socket(zmq.SUB)
self.sockets.append(sub2)
sub2.connect(self.iface)
sub2.setsockopt(zmq.SUBSCRIBE, b'')
handler.root_topic = b'twoonly'
msg1 = 'ignored'
logger.info(msg1)
self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
topic,msg2 = sub2.recv_multipart()
self.assertEqual(topic, b'twoonly.INFO')
self.assertEqual(msg2, b(msg1)+b'\n')
logger.removeHandler(handler)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
in_prefix, out_prefix)
alice = self.context.socket(zmq.PAIR)
bob = self.context.socket(zmq.PAIR)
mon = self.context.socket(zmq.SUB)
aport = alice.bind_to_random_port('tcp://127.0.0.1')
bport = bob.bind_to_random_port('tcp://127.0.0.1')
mport = mon.bind_to_random_port('tcp://127.0.0.1')
mon.setsockopt(zmq.SUBSCRIBE, mon_sub)
self.device.connect_in("tcp://127.0.0.1:%i"%aport)
self.device.connect_out("tcp://127.0.0.1:%i"%bport)
self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
self.device.start()
time.sleep(.2)
try:
# this is currenlty necessary to ensure no dropped monitor messages
# see LIBZMQ-248 for more info
mon.recv_multipart(zmq.NOBLOCK)
except zmq.ZMQError:
pass
self.sockets.extend([alice, bob, mon])
return alice, bob, mon
def _handle_recv(self):
"""Handle a recv event."""
if self._flushed:
return
try:
msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# state changed since poll event
pass
else:
gen_log.error("RECV Error: %s"%zmq.strerror(e.errno))
else:
if self._recv_callback:
callback = self._recv_callback
# self._recv_callback = None
self._run_callback(callback, msg)
# self.update_state()
def test_root_topic(self):
logger, handler, sub = self.connect_handler()
handler.socket.bind(self.iface)
sub2 = sub.context.socket(zmq.SUB)
self.sockets.append(sub2)
sub2.connect(self.iface)
sub2.setsockopt(zmq.SUBSCRIBE, b'')
handler.root_topic = b'twoonly'
msg1 = 'ignored'
logger.info(msg1)
self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
topic,msg2 = sub2.recv_multipart()
self.assertEqual(topic, b'twoonly.INFO')
self.assertEqual(msg2, b(msg1)+b'\n')
logger.removeHandler(handler)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
in_prefix, out_prefix)
alice = self.context.socket(zmq.PAIR)
bob = self.context.socket(zmq.PAIR)
mon = self.context.socket(zmq.SUB)
aport = alice.bind_to_random_port('tcp://127.0.0.1')
bport = bob.bind_to_random_port('tcp://127.0.0.1')
mport = mon.bind_to_random_port('tcp://127.0.0.1')
mon.setsockopt(zmq.SUBSCRIBE, mon_sub)
self.device.connect_in("tcp://127.0.0.1:%i"%aport)
self.device.connect_out("tcp://127.0.0.1:%i"%bport)
self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
self.device.start()
time.sleep(.2)
try:
# this is currenlty necessary to ensure no dropped monitor messages
# see LIBZMQ-248 for more info
mon.recv_multipart(zmq.NOBLOCK)
except zmq.ZMQError:
pass
self.sockets.extend([alice, bob, mon])
return alice, bob, mon
def test_root_topic(self):
logger, handler, sub = self.connect_handler()
handler.socket.bind(self.iface)
sub2 = sub.context.socket(zmq.SUB)
self.sockets.append(sub2)
sub2.connect(self.iface)
sub2.setsockopt(zmq.SUBSCRIBE, b'')
handler.root_topic = b'twoonly'
msg1 = 'ignored'
logger.info(msg1)
self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
topic,msg2 = sub2.recv_multipart()
self.assertEqual(topic, b'twoonly.INFO')
self.assertEqual(msg2, b(msg1)+b'\n')
logger.removeHandler(handler)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
in_prefix, out_prefix)
alice = self.context.socket(zmq.PAIR)
bob = self.context.socket(zmq.PAIR)
mon = self.context.socket(zmq.SUB)
aport = alice.bind_to_random_port('tcp://127.0.0.1')
bport = bob.bind_to_random_port('tcp://127.0.0.1')
mport = mon.bind_to_random_port('tcp://127.0.0.1')
mon.setsockopt(zmq.SUBSCRIBE, mon_sub)
self.device.connect_in("tcp://127.0.0.1:%i"%aport)
self.device.connect_out("tcp://127.0.0.1:%i"%bport)
self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
self.device.start()
time.sleep(.2)
try:
# this is currenlty necessary to ensure no dropped monitor messages
# see LIBZMQ-248 for more info
mon.recv_multipart(zmq.NOBLOCK)
except zmq.ZMQError:
pass
self.sockets.extend([alice, bob, mon])
return alice, bob, mon
def _handle_recv(self):
"""Handle a recv event."""
if self._flushed:
return
try:
msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# state changed since poll event
pass
else:
gen_log.error("RECV Error: %s"%zmq.strerror(e.errno))
else:
if self._recv_callback:
callback = self._recv_callback
# self._recv_callback = None
self._run_callback(callback, msg)
# self.update_state()
def test_root_topic(self):
logger, handler, sub = self.connect_handler()
handler.socket.bind(self.iface)
sub2 = sub.context.socket(zmq.SUB)
self.sockets.append(sub2)
sub2.connect(self.iface)
sub2.setsockopt(zmq.SUBSCRIBE, b'')
handler.root_topic = b'twoonly'
msg1 = 'ignored'
logger.info(msg1)
self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
topic,msg2 = sub2.recv_multipart()
self.assertEqual(topic, b'twoonly.INFO')
self.assertEqual(msg2, b(msg1)+b'\n')
logger.removeHandler(handler)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
in_prefix, out_prefix)
alice = self.context.socket(zmq.PAIR)
bob = self.context.socket(zmq.PAIR)
mon = self.context.socket(zmq.SUB)
aport = alice.bind_to_random_port('tcp://127.0.0.1')
bport = bob.bind_to_random_port('tcp://127.0.0.1')
mport = mon.bind_to_random_port('tcp://127.0.0.1')
mon.setsockopt(zmq.SUBSCRIBE, mon_sub)
self.device.connect_in("tcp://127.0.0.1:%i"%aport)
self.device.connect_out("tcp://127.0.0.1:%i"%bport)
self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
self.device.start()
time.sleep(.2)
try:
# this is currenlty necessary to ensure no dropped monitor messages
# see LIBZMQ-248 for more info
mon.recv_multipart(zmq.NOBLOCK)
except zmq.ZMQError:
pass
self.sockets.extend([alice, bob, mon])
return alice, bob, mon
def test_root_topic(self):
logger, handler, sub = self.connect_handler()
handler.socket.bind(self.iface)
sub2 = sub.context.socket(zmq.SUB)
self.sockets.append(sub2)
sub2.connect(self.iface)
sub2.setsockopt(zmq.SUBSCRIBE, b'')
handler.root_topic = b'twoonly'
msg1 = 'ignored'
logger.info(msg1)
self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
topic,msg2 = sub2.recv_multipart()
self.assertEqual(topic, b'twoonly.INFO')
self.assertEqual(msg2, b(msg1)+b'\n')
logger.removeHandler(handler)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
in_prefix, out_prefix)
alice = self.context.socket(zmq.PAIR)
bob = self.context.socket(zmq.PAIR)
mon = self.context.socket(zmq.SUB)
aport = alice.bind_to_random_port('tcp://127.0.0.1')
bport = bob.bind_to_random_port('tcp://127.0.0.1')
mport = mon.bind_to_random_port('tcp://127.0.0.1')
mon.setsockopt(zmq.SUBSCRIBE, mon_sub)
self.device.connect_in("tcp://127.0.0.1:%i"%aport)
self.device.connect_out("tcp://127.0.0.1:%i"%bport)
self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
self.device.start()
time.sleep(.2)
try:
# this is currenlty necessary to ensure no dropped monitor messages
# see LIBZMQ-248 for more info
mon.recv_multipart(zmq.NOBLOCK)
except zmq.ZMQError:
pass
self.sockets.extend([alice, bob, mon])
return alice, bob, mon
def _handle_recv(self):
"""Handle a recv event."""
if self._flushed:
return
try:
msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# state changed since poll event
pass
else:
gen_log.error("RECV Error: %s"%zmq.strerror(e.errno))
else:
if self._recv_callback:
callback = self._recv_callback
# self._recv_callback = None
self._run_callback(callback, msg)
# self.update_state()
def test_root_topic(self):
logger, handler, sub = self.connect_handler()
handler.socket.bind(self.iface)
sub2 = sub.context.socket(zmq.SUB)
self.sockets.append(sub2)
sub2.connect(self.iface)
sub2.setsockopt(zmq.SUBSCRIBE, b'')
handler.root_topic = b'twoonly'
msg1 = 'ignored'
logger.info(msg1)
self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
topic,msg2 = sub2.recv_multipart()
self.assertEqual(topic, b'twoonly.INFO')
self.assertEqual(msg2, b(msg1)+b'\n')
logger.removeHandler(handler)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
in_prefix, out_prefix)
alice = self.context.socket(zmq.PAIR)
bob = self.context.socket(zmq.PAIR)
mon = self.context.socket(zmq.SUB)
aport = alice.bind_to_random_port('tcp://127.0.0.1')
bport = bob.bind_to_random_port('tcp://127.0.0.1')
mport = mon.bind_to_random_port('tcp://127.0.0.1')
mon.setsockopt(zmq.SUBSCRIBE, mon_sub)
self.device.connect_in("tcp://127.0.0.1:%i"%aport)
self.device.connect_out("tcp://127.0.0.1:%i"%bport)
self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
self.device.start()
time.sleep(.2)
try:
# this is currenlty necessary to ensure no dropped monitor messages
# see LIBZMQ-248 for more info
mon.recv_multipart(zmq.NOBLOCK)
except zmq.ZMQError:
pass
self.sockets.extend([alice, bob, mon])
return alice, bob, mon
def run_arduino_bridge(self):
"""
start the bridge
:return:
"""
while True:
if self.last_problem:
self.report_problem()
# noinspection PyBroadException
try:
z = self.subscriber.recv_multipart(zmq.NOBLOCK)
self.payload = umsgpack.unpackb(z[1])
# print("[%s] %s" % (z[0], self.payload))
command = self.payload['command']
if command in self.command_dict:
self.command_dict[command]()
else:
print("can't execute unknown command'")
self.board.sleep(.001)
except zmq.error.Again:
self.board.sleep(.001)
# return
def receive_loop(self):
"""
This is the receive loop for zmq messages
It is assumed that this method will be overwritten to meet the needs of the application and to handle
received messages.
:return:
"""
while True:
try:
data = self.subscriber.recv_multipart(zmq.NOBLOCK)
self.incoming_message_processing(data[0].decode(), umsgpack.unpackb(data[1]))
self.board.sleep(.01)
except zmq.error.Again:
try:
self.board.sleep(.01)
except:
self.clean_up()
except KeyboardInterrupt:
self.clean_up()
# noinspection PyMethodMayBeStatic
def receive_loop(self):
"""
This is the receive loop for zmq messages.
It is assumed that this method will be overwritten to meet the needs of the application and to handle
received messages.
:return:
"""
while True:
try:
data = self.subscriber.recv_multipart(zmq.NOBLOCK)
self.incoming_message_processing(data[0].decode(), umsgpack.unpackb(data[1]))
time.sleep(.001)
except zmq.error.Again:
time.sleep(.001)
except KeyboardInterrupt:
self.clean_up()
# noinspection PyMethodMayBeStatic
def run_raspberry_bridge(self):
# self.pi.set_mode(11, pigpio.INPUT)
# cb1 = self.pi.callback(11, pigpio.EITHER_EDGE, self.cbf)
while True:
if self.last_problem:
self.report_problem()
# noinspection PyBroadException
try:
z = self.subscriber.recv_multipart(zmq.NOBLOCK)
self.payload = umsgpack.unpackb(z[1])
command = self.payload['command']
if command in self.command_dict:
self.command_dict[command]()
else:
print("can't execute unknown command'")
# time.sleep(.001)
except KeyboardInterrupt:
self.cleanup()
sys.exit(0)
except zmq.error.Again:
time.sleep(.001)
def _fetch_messages(self):
"""
Get an input message from the socket
"""
try:
[_, msg] = self.socket.recv_multipart(flags=zmq.NOBLOCK)
if Global.CONFIG_MANAGER.tracing_mode:
Global.LOGGER.debug("fetched a new message")
self.fetched = self.fetched + 1
obj = pickle.loads(msg)
self._deliver_message(obj)
return obj
except zmq.error.Again:
return None
except Exception as new_exception:
Global.LOGGER.error(new_exception)
raise new_exception
def _main_do_control_send(self):
nr_scheduled = self._control_mqueue.qsize()
nr_done = 0
for i in range(nr_scheduled):
job = self._control_mqueue.get()
if job.identifier is not None:
rc = utils.router_send_json(job.sock, job.identifier, job.payload, flag=zmq.NOBLOCK)
else:
rc = utils.req_send_json(job.sock, job.payload, flag=zmq.NOBLOCK)
if not rc:
if job.countdown > 0:
self._control_mqueue.put(ControlMessage(job[0], job[1], job[2], job.countdown - 1))
else:
nr_done += 1
return nr_done
def _send_raw(self, serialized):
self.create_socket()
self._socket.send_string(serialized, zmq.NOBLOCK)
poller = zmq.Poller()
poller.register(self._socket, zmq.POLLIN)
if poller.poll(self._timeout * 1000):
msg = self._socket.recv()
self.on_message(msg)
self.cleanup_socket()
else:
self._transport.log("Peer " + self._address + " timed out.")
self.cleanup_socket()
self._transport.remove_peer(self._address)