def reset(self):
    """Reset the receive state and start a background receiver thread.

    Sets ``status`` to ``READY``, zeroes the received-sample counter and
    clears the drained flag. It then starts a thread that pulls messages
    from a ZeroMQ PULL socket connected to ``self._address``, decodes them
    with msgpack, tags them and puts them on ``self._queue``.

    The started thread is stored in ``self._thread``.
    """
    self.status = READY
    self._num_recv = 0
    self._drained = False

    def run():
        """Receive and enqueue samples until ``self.status`` is ``DRAINED``."""
        context = zmq.Context()
        socket = context.socket(zmq.PULL)
        try:
            socket.set_hwm(32)
            # Receive timeout (ZeroMQ expresses it in milliseconds) so that
            # recv() cannot block forever and the loop condition below is
            # re-evaluated regularly.
            socket.RCVTIMEO = 1
            socket.connect(self._address)

            # NOTE(review): this loop only ends when ``self.status`` equals
            # DRAINED, but receiving the END marker merely sets
            # ``self._drained``. Presumably code outside this method moves
            # ``status`` to DRAINED -- confirm.
            while self.status != DRAINED:
                try:
                    packet = socket.recv(copy=False)
                except zmq.error.Again:
                    # Nothing arrived within the timeout; back off briefly
                    # before polling again.
                    sleep(0.1)
                    continue

                sample = msgpack.loads(packet)
                if sample == b'END':
                    # End-of-stream marker sent by the producer side.
                    self._drained = True
                elif sample is not None:
                    self._num_recv += 1
                    # Assumes every non-END sample supports item assignment
                    # (e.g. a dict) -- TODO confirm against the sender.
                    sample['__process_id__'] = mp.current_process().name
                    sample['__recv_count__'] = self._num_recv
                    self._queue.put(sample)
        finally:
            # Release the socket and context even when decoding or
            # enqueueing raises; previously the socket leaked on any
            # exception and the context was never terminated.
            socket.close()
            context.term()

    self._thread = threading.Thread(target=run)
    self._thread.start()
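# 评论列表 (comment list) -- web-page navigation residue, not part of the code
# 文章目录 (article table of contents) -- web-page navigation residue, not part of the code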