def _init_zmq(self):
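    """Spawn ``self._ncpu`` worker processes, each bound to its own
    ZeroMQ PAIR socket; workers pull task indices from the shared
    queue, apply ``self._func`` to the corresponding jobs and stream
    the results back to the parent process."""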
    # this is ugly but works well; sys, types and Process are imported
    # locally here like `import zmq` (harmless if they are already
    # imported at module level)
    import sys
    import types
    import zmq
    from multiprocessing import Process
    # the tasks queue yields only indices; the actual job objects
    # must be looked up in self._jobs
    def wrapped_map(pID, tasks, remain_jobs):
        # ====== create ZMQ socket ====== #
        ctx = zmq.Context()
        sk = ctx.socket(zmq.PAIR)
        # cap how many unsent results this worker may buffer
        sk.set(zmq.SNDHWM, self._hwm)
        # LINGER = -1: block on close until all queued messages are sent
        sk.set(zmq.LINGER, -1)
        # each worker binds its own ipc endpoint, named after the parent
        # ID plus the process index, so the parent can connect to it
        sk.bind("ipc:///tmp/%d" % (self._ID + pID))
        # ====== Doing the jobs ====== #
        t = tasks.get()
        while t is not None:
            # `t` is just a list of indices
            t = [self._jobs[i] for i in t]
            # update the count of remaining jobs
            remain_jobs.add(-len(t))
            if self._batch == 1:  # batch == 1: a single input, no list needed
                ret = self._func(t[0])
            else:  # batch > 1: the function receives a list of inputs
                ret = self._func(t)
            # if a generator is returned, traverse the iterator and
            # send each result
            if not isinstance(ret, types.GeneratorType):
                ret = (ret,)
            for r in ret:
                # ignore None values
                if r is not None:
                    sk.send_pyobj(r)
            # free the sent results (verified to release the memory)
            del ret
            # get the next batch of tasks
            t = tasks.get()
        # signal the parent that this worker has finished
        sk.send_pyobj(None)
        # wait for the parent's acknowledgement before closing
        sk.recv()
        sk.close()
        ctx.term()
        sys.exit(0)
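    # NOTE: wrapped_map is a closure over `self`, so this relies on the
    # fork start method (the default on Linux); with spawn, a nested
    # function cannot be pickled as a Process target.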
    # ====== start the processes ====== #
    self._processes = [Process(target=wrapped_map,
                               args=(i, self._tasks, self._remain_jobs))
                       for i in range(self._ncpu)]
    for p in self._processes:
        p.start()
    # ====== pyzmq PAIR sockets (parent side) ====== #
    # one PAIR socket per worker (rather than a shared PULL socket)
    # keeps each worker on its own channel, so the per-worker SNDHWM
    # throttles every producer independently
    ctx = zmq.Context()
    sockets = []
    for i in range(self._ncpu):
        sk = ctx.socket(zmq.PAIR)
        sk.set(zmq.RCVHWM, 0)  # no limit on buffered incoming results
        sk.connect("ipc:///tmp/%d" % (self._ID + i))
        sockets.append(sk)
    self._ctx = ctx
    self._sockets = sockets
    # cache the flag/exception used for non-blocking receives later on
    self._zmq_noblock = zmq.NOBLOCK
    self._zmq_again = zmq.error.Again
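
# A minimal sketch (not part of the original class) of how the parent
# side could drain results through these PAIR sockets, using the cached
# zmq.NOBLOCK flag and zmq.error.Again exception from above; the method
# name `_iter_results` and the round-robin polling are assumptions, not
# the library's actual receive loop.
def _iter_results(self):
    finished = 0
    while finished < len(self._sockets):
        for sk in self._sockets:
            try:
                r = sk.recv_pyobj(self._zmq_noblock)
            except self._zmq_again:
                continue  # nothing ready on this socket yet
            if r is None:  # a worker's ending signal
                sk.send(b'')  # acknowledge so the worker can exit
                finished += 1
            else:
                yield r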