def get_messages(self, timeout=0.1, count=1):
    """Yield up to ``count`` message payloads from the subscriber socket.

    The socket is polled without blocking. Whenever a poll finds nothing,
    the generator stops if more than ``timeout`` seconds have elapsed since
    iteration began; otherwise it sleeps for a tenth of ``timeout`` and
    polls again.

    For every received multipart message, frame 2 is unpacked with the
    ``">II"`` format into a partition and a global sequence number
    (presumably via ``struct.unpack`` -- confirm against the file's
    imports). The one selected by ``self.count_global`` is checked against
    ``self.counter``, and frame 1 is yielded to the caller.

    :param timeout: polling budget in seconds.
    :param count: maximum number of payloads to yield.
    """
    began = time()
    pause = timeout / 10.0
    while count:
        try:
            frames = self.subscriber.recv_multipart(copy=True, flags=zmq.NOBLOCK)
        except zmq.Again:
            # Nothing queued right now: stop once the budget is spent,
            # otherwise back off briefly and poll again.
            if time() - began > timeout:
                return
            sleep(pause)
            continue

        partition_seqno, global_seqno = unpack(">II", frames[2])
        if self.count_global:
            seqno = global_seqno
        else:
            seqno = partition_seqno

        if not self.counter:
            # No expectation is held at the moment: adopt what arrived.
            self.counter = seqno
        elif self.counter != seqno:
            if self.seq_warnings:
                self.logger.warning("Sequence counter mismatch: expected %d, got %d. Check if system "
                                    "isn't missing messages." % (self.counter, seqno))
            # Drop the expectation so the next message re-synchronises it.
            self.counter = None

        yield frames[1]
        count -= 1
        if self.counter:
            self.counter += 1
        self.stats[self.stat_key] += 1
python类NOBLOCK的实例源码
def _get_data(self, blocking=True):
    """Receive one batch from the socket and decode it.

    The raw message is decoded according to ``self.structure``:

    * ``'array'``   -- interpreted as a flat buffer of ``self.dtype`` items
      and reshaped to ``self.shape``;
    * ``'dict'``    -- parsed as JSON;
    * ``'boolean'`` -- converted with ``bool()``;
    * anything else -- returned unchanged.

    :param blocking: when true, wait for a message; when false, poll once.
    :return: the decoded batch, or ``None`` if ``blocking`` is false and no
        message was available.
    :raises EOCError: if the received message equals ``TERM_MSG``.
    """
    if blocking:
        batch = self.socket.recv()
    else:
        try:
            batch = self.socket.recv(flags=zmq.NOBLOCK)
        except zmq.Again:
            # Nothing queued on the socket right now.
            return None

    if batch == TERM_MSG:
        raise EOCError()

    if self.structure == 'array':
        # numpy.fromstring's binary mode is deprecated (and removed in
        # recent NumPy releases); frombuffer is the documented replacement.
        # frombuffer returns a read-only view over the bytes object, so copy
        # to keep handing callers a writable array as fromstring did.
        batch = numpy.frombuffer(batch, dtype=self.dtype).copy()
        batch = numpy.reshape(batch, self.shape)
    elif self.structure == 'dict':
        batch = json.loads(batch)
    elif self.structure == 'boolean':
        # NOTE(review): bool() of any non-empty bytes object is True, so
        # this only yields False for an empty message -- confirm that this
        # matches what the sender transmits.
        batch = bool(batch)
    return batch
def _receiveFromListener(self, quota) -> int:
    """
    Drains pending messages from the listener socket without blocking

    :param quota: maximum number of non-empty messages to receive
    :return: number of non-empty messages received
    """
    assert quota
    received = 0
    while received < quota:
        try:
            ident, msg = self.listener.recv_multipart(flags=zmq.NOBLOCK)
            if msg:
                received += 1
                if self.onlyListener and ident not in self.remotesByKeys:
                    self.peersWithoutRemotes.add(ident)
                self._verifyAndAppend(msg, ident)
            # Otherwise: router probing sends an empty message on
            # connection; it is skipped and does not count towards quota.
        except zmq.Again:
            # Nothing more is queued on the socket
            break
    if received > 0:
        logger.trace('{} got {} messages through listener'.
                     format(self, received))
    return received
def run(self):
    """Poll the socket and dispatch incoming messages until terminated.

    Each message is expected to consist of two frames: a tag and a UTF-8
    JSON body. Messages tagged ``b"queryResponse"`` are passed to
    ``self._bus.resolve_response``; every other message is handled on a
    freshly started ``ZMQHandlerThread``.

    The loop ends once ``self._terminate`` is true, which this method sets
    itself when the socket reports ``ETERM``. Other errors are printed and
    the loop carries on.
    """
    while not self._terminate:
        connection_message = None
        try:
            connection_message = self._socket.recv_multipart(zmq.NOBLOCK)
            tag, json_message = connection_message
            message = json.loads(json_message.decode('utf8'))
            if tag == b"queryResponse":
                self._bus.resolve_response(message)
                continue
            worker = ZMQHandlerThread(self._bus, tag.decode('utf-8'), message)
            worker.start()
        except zmq.ZMQError as e:
            if e.errno == zmq.EAGAIN:
                # No message waiting: idle briefly before polling again.
                time.sleep(.001)
            elif e.errno == zmq.ETERM:
                self._terminate = True
            else:
                print("message zmq exception:", self._address, e, e.errno)
        except Exception as e:
            print("message exception:", self._address, e, connection_message)
def send(output):
    """Publish the endpoint string on a PUB socket forever.

    A new context and PUB socket are created, the socket is connected to
    ``output``, and the encoded ``output`` string is then sent with the
    NOBLOCK flag in an endless loop, pausing 0.1 s after every send.
    This function never returns normally.
    """
    context = zmq.Context()
    publisher = context.socket(zmq.PUB)
    publisher.connect(output)
    # The payload never changes, so encode it once up front.
    payload = output.encode()
    while True:
        publisher.send(payload, zmq.NOBLOCK)
        time.sleep(0.1)
def send(self, frame):
    """
    Forward the zmq frame to this output's connection with the NOBLOCK flag.
    """
    connection = self.connection
    connection.send(frame, zmq.NOBLOCK)
def receive_loop(self):
    """
    Receive loop for Banyan messages.

    Polls the subscriber socket without blocking. Each received message is
    treated as two frames: frame 0 is decoded to a string and frame 1 is
    unpacked (with ``msgpack`` and the ``m.decode`` object hook when
    ``self.numpy`` is set, with ``umsgpack`` otherwise); both are handed to
    ``incoming_message_processing``.

    When no message is available the loop sleeps for ``self.loop_time``.
    A KeyboardInterrupt during that sleep triggers ``clean_up`` and is then
    raised to the caller.

    This method may be overwritten to meet the needs of the application
    before handling received messages.
    """
    while True:
        try:
            frames = self.subscriber.recv_multipart(zmq.NOBLOCK)
            if self.numpy:
                payload = msgpack.unpackb(frames[1], object_hook=m.decode)
                topic = frames[0].decode()
            else:
                topic = frames[0].decode()
                payload = umsgpack.unpackb(frames[1])
            self.incoming_message_processing(topic, payload)
        except zmq.error.Again:
            # zmq raises this when no message is available
            try:
                time.sleep(self.loop_time)
            except KeyboardInterrupt:
                self.clean_up()
                raise KeyboardInterrupt
def send(self, data, flags=0, copy=True, track=False):
    """Send ``data``, blocking only the current greenlet.

    If the caller passes NOBLOCK, the parent class's send is attempted once
    and any error (EAGAIN included) propagates unchanged. Otherwise NOBLOCK
    is added to the flags and the send is retried, calling ``_wait_write``
    after every EAGAIN, until it either succeeds or fails with a different
    error.

    ``__state_changed`` is called exactly once (on success or failure) at
    the end of this method, except while ``__in_send_multipart`` is set.
    """
    if flags & zmq.NOBLOCK:
        # The caller asked for non-blocking behaviour: act like the plain
        # socket and let EAGAIN surface.
        try:
            return super(_Socket, self).send(data, flags, copy, track)
        finally:
            if not self.__in_send_multipart:
                self.__state_changed()

    # Always attempt the call non-blocking so that waiting goes through
    # _wait_write instead of blocking inside the parent send.
    flags |= zmq.NOBLOCK
    while True:
        try:
            result = super(_Socket, self).send(data, flags, copy, track)
        except zmq.ZMQError as err:
            # Only EAGAIN means "try again later"; anything else is final.
            if err.errno != zmq.EAGAIN:
                if not self.__in_send_multipart:
                    self.__state_changed()
                raise
        else:
            if not self.__in_send_multipart:
                self.__state_changed()
            return result
        # EAGAIN: wait until the socket is reported writable, then retry.
        self._wait_write()
def recv(self, flags=0, copy=True, track=False):
    """Receive a message, blocking only the current greenlet.

    If the caller passes NOBLOCK, the parent class's recv is attempted once
    and any error (EAGAIN included) propagates unchanged. Otherwise NOBLOCK
    is added to the flags and the receive is retried, calling
    ``_wait_read`` after every EAGAIN, until it either succeeds or fails
    with a different error.

    ``__state_changed`` is called exactly once (on success or failure) at
    the end of this method, except while ``__in_recv_multipart`` is set.
    """
    if flags & zmq.NOBLOCK:
        # The caller asked for non-blocking behaviour: act like the plain
        # socket and let EAGAIN surface.
        try:
            return super(_Socket, self).recv(flags, copy, track)
        finally:
            if not self.__in_recv_multipart:
                self.__state_changed()

    # Always attempt the call non-blocking so that waiting goes through
    # _wait_read instead of blocking inside the parent recv.
    flags |= zmq.NOBLOCK
    while True:
        try:
            result = super(_Socket, self).recv(flags, copy, track)
        except zmq.ZMQError as err:
            # Only EAGAIN means "try again later"; anything else is final.
            if err.errno != zmq.EAGAIN:
                if not self.__in_recv_multipart:
                    self.__state_changed()
                raise
        else:
            if not self.__in_recv_multipart:
                self.__state_changed()
            return result
        # EAGAIN: wait until the socket is reported readable, then retry.
        self._wait_read()
def test_topic(self):
    """A SUB socket subscribed to ``b'x'`` only gets messages starting with it."""
    publisher, subscriber = self.create_bound_pair(zmq.PUB, zmq.SUB)
    subscriber.setsockopt(zmq.SUBSCRIBE, b'x')
    # Presumably gives the subscription time to take effect before sending.
    time.sleep(0.1)

    # A message without the subscribed prefix must not be delivered.
    publisher.send(b'message')
    self.assertRaisesErrno(zmq.EAGAIN, subscriber.recv, zmq.NOBLOCK)

    # A message carrying the prefix must arrive unchanged.
    matching = b'xmessage'
    publisher.send(matching)
    received = subscriber.recv()
    self.assertEqual(matching, received)
def test_again(self):
    """A non-blocking recv on a fresh REP socket raises Again / EAGAIN."""
    s = self.context.socket(zmq.REP)
    try:
        self.assertRaises(Again, s.recv, zmq.NOBLOCK)
        self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
    finally:
        # Close the socket even when an assertion fails, so a failing test
        # does not leave it open.
        s.close()
def start_listener(self):
    """Poll ``self.s`` forever and queue ``ON_SIGNAL`` for every message.

    The socket is read with the NOBLOCK flag; the content of a received
    message is discarded. When nothing is available the loop sleeps 50 ms
    before polling again. This method never returns normally.
    """
    print('ZMQ listener started')
    while True:
        try:
            self.s.recv(zmq.NOBLOCK)  # non-blocking on purpose
        except zmq.Again:
            # No message to receive: idle briefly, then poll again.
            time.sleep(0.05)
            continue
        self.on_q.put(ON_SIGNAL)
def header(self):
    """Receive the header frame, remember it, and return it parsed as JSON.

    The raw bytes are stored in ``self.raw_header``. The receive blocks when
    ``self.block`` is true and uses the NOBLOCK flag otherwise.
    """
    if self.block:
        flags = 0
    else:
        flags = zmq.NOBLOCK
    raw = self.socket.recv(flags=flags)
    self.raw_header = raw
    return json.loads(raw.decode("utf-8"))
def next(self, as_json=False):
    """Return the next raw frame, or ``None`` if receiving fails.

    A header previously stored in ``self.raw_header`` is consumed first (and
    the attribute reset to ``None``); otherwise a frame is read from the
    socket, blocking only when ``self.block`` is true. The frame's length is
    added to ``self.statistics.bytes_received``.

    :param as_json: when true, decode the frame as UTF-8 JSON before
        returning it.
    :return: the frame (raw or parsed), or ``None`` on ``zmq.ZMQError``.
    """
    try:
        raw = self.raw_header
        if raw:
            # Hand out the buffered header exactly once.
            self.raw_header = None
        else:
            flags = 0 if self.block else zmq.NOBLOCK
            raw = self.socket.recv(flags=flags, copy=self.zmq_copy, track=self.zmq_track)

        self.statistics.bytes_received += len(raw)
        return json.loads(raw.decode("utf-8")) if as_json else raw
    except zmq.ZMQError:
        return None
def flush(self, success=True):
    """Discard the remaining sub-messages and optionally update statistics.

    While ``has_more()`` reports further parts, each one is received and
    dropped; a ``zmq.ZMQError`` during such a receive is ignored and the
    loop re-checks ``has_more()``.

    :param success: when true, the bytes counted for the current message
        are added to the running total, the per-message byte counter is
        reset, and the message counter is incremented.
    """
    flags = 0 if self.block else zmq.NOBLOCK

    # Clear remaining sub-messages
    while self.has_more():
        try:
            self.socket.recv(flags=flags, copy=self.zmq_copy, track=self.zmq_track)
        except zmq.ZMQError:
            continue
        logger.info('Skipping sub-message')

    if not success:
        return

    # Update statistics
    stats = self.statistics
    stats.total_bytes_received += stats.bytes_received
    stats.bytes_received = 0
    stats.messages_received += 1
test_zmq_pub_sub.py 文件源码
项目:integration-prototype
作者: SKA-ScienceDataProcessor
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def recv_messages(zmq_subscriber, timeout_count, message_count):
    """Test utility function.

    Subscriber thread that receives and counts ZMQ messages. The socket is
    polled without blocking; the loop ends when ``message_count`` messages
    have arrived or when ``timeout_count`` polls in a row found nothing.

    Args:
        zmq_subscriber (zmq.Socket): ZMQ subscriber socket.
        timeout_count (int): No. of failed receives until exit.
        message_count (int): No. of messages expected to be received.

    Returns:
        (int) Number of messages received.
    """
    # pylint: disable=E1101
    misses = 0  # Consecutive receives that didn't return a message.
    received = 0  # Total number of messages received.
    while misses < timeout_count:
        try:
            zmq_subscriber.recv_string(flags=zmq.NOBLOCK)
        except zmq.ZMQError as error:
            # Only "nothing available" counts as a miss; anything else is
            # a real error and propagates.
            if error.errno != zmq.EAGAIN:
                raise
            misses += 1
            time.sleep(1e-6)
            continue
        misses = 0
        received += 1
        if received == message_count:
            break
    return received
logging_aggregator.py 文件源码
项目:integration-prototype
作者: SKA-ScienceDataProcessor
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def run(self):
    """Run loop.

    Receives log messages from connected publishers and logs them via
    a python logging interface.

    Each message is read without blocking as a (topic, values) pair; the
    values frame is decoded as UTF-8 JSON, turned into a log record and
    handled by the ``sip.logging_aggregator`` logger. After every pass the
    loop waits on the stop event, using a wait time that grows with the
    number of consecutive empty polls. The loop ends once the stop event
    is set.
    """
    log = logging.getLogger('sip.logging_aggregator')
    max_misses = 100
    # Exponential relaxation of the wait in the event loop: 1e-6 s .. 1e-2 s.
    waits = np.logspace(-6, -2, max_misses)
    misses = 0
    while not self._stop_requested.is_set():
        try:
            _topic, values = self._subscriber.recv_multipart(zmq.NOBLOCK)
            text = values.decode('utf-8')
            try:
                record = logging.makeLogRecord(json.loads(text))
                log.handle(record)
                misses = 0
            except json.decoder.JSONDecodeError:
                print('ERROR: Unable to convert JSON log record.')
                raise
        except zmq.ZMQError as e:
            # Anything other than "no message available" is re-raised.
            if e.errno != zmq.EAGAIN:
                raise
            misses += 1
        # Clamp to the last (longest) wait once the table is exhausted.
        self._stop_requested.wait(waits[min(misses, max_misses - 1)])
def receive(self):
    """ Non-blocking reception and pyobj de-serialization of one message.

    NOTE(review): if ``self.socket`` is a pyzmq socket, ``recv_pyobj``
    unpickles the payload, which is only safe with trusted peers -- confirm.
    """
    message = self.socket.recv_pyobj(zmq.NOBLOCK)
    return message
def send_check_address(self, address_name):
    """ Send a non-blocking CHECK_ADDRESS request for ``address_name``.

    If the socket cannot accept the message right now, an error is logged
    and the request is dropped.
    """
    self.logger.trace('send CHECK_ADDRESS {}'.format(address_name))
    request = (DeferredRequestHeaders.CHECK_ADDRESS, (address_name, ))
    try:
        self.socket.send_pyobj(request, zmq.NOBLOCK)
    except zmq.error.Again:
        self.logger.error('CHECK_ADDRESS not sent')
def send_isolate_addresses(self, address_names):
    """ Send a non-blocking ISOLATE_ADDRESSES request for ``address_names``.

    If the socket cannot accept the message right now, an error is logged
    and the request is dropped.
    """
    self.logger.trace('send ISOLATE_ADDRESSES {}'.format(address_names))
    request = (DeferredRequestHeaders.ISOLATE_ADDRESSES, address_names)
    try:
        self.socket.send_pyobj(request, zmq.NOBLOCK)
    except zmq.error.Again:
        self.logger.error('ISOLATE_ADDRESSES not sent')