def get_messages(self, timeout=0.1, count=1):
started = time()
sleep_time = timeout / 10.0
while count:
try:
msg = self.subscriber.recv_multipart(copy=True, flags=zmq.NOBLOCK)
except zmq.Again:
if time() - started > timeout:
break
sleep(sleep_time)
else:
partition_seqno, global_seqno = unpack(">II", msg[2])
seqno = global_seqno if self.count_global else partition_seqno
if not self.counter:
self.counter = seqno
elif self.counter != seqno:
if self.seq_warnings:
self.logger.warning("Sequence counter mismatch: expected %d, got %d. Check if system "
"isn't missing messages." % (self.counter, seqno))
self.counter = None
yield msg[1]
count -= 1
if self.counter:
self.counter += 1
self.stats[self.stat_key] += 1
评论列表
文章目录