mpi.py 文件源码

python
阅读 27 收藏 0 点赞 0 评论 0

项目:odin 作者: imito 项目源码 文件源码
def _init_zmq(self):
    """Start the worker processes and connect one ZMQ PAIR socket to each.

    ``self._ncpu`` processes are started; worker ``i`` binds a PAIR socket
    at ``ipc:///tmp/<self._ID + i>`` and streams its results through it.
    After the workers are started, this method connects a matching PAIR
    socket for every worker and stores on the instance:

    * ``self._processes`` -- the started ``Process`` objects
    * ``self._ctx`` -- the parent-side ``zmq.Context``
    * ``self._sockets`` -- the parent-side sockets, in worker order
    * ``self._zmq_noblock`` / ``self._zmq_again`` -- ``zmq.NOBLOCK`` and
      ``zmq.error.Again``, kept so they can be used without importing zmq
      again (presumably by the code that reads the sockets -- not visible
      here).
    """
    # local import: ugly, but it works well
    import zmq

    # the task queue only yields indices; the actual jobs are looked up
    # in self._jobs
    def wrapped_map(pID, tasks, remain_jobs):
      """Worker loop: run ``self._func`` on queued jobs and send the results.

      pID: worker index, added to ``self._ID`` to form the ipc endpoint.
      tasks: queue-like object; ``get()`` returns a list of job indices,
        or ``None`` to stop the worker.
      remain_jobs: counter-like object; ``add(-k)`` is called with the
        number of jobs just taken.
      """
      # ====== create ZMQ socket ====== #
      ctx = zmq.Context()
      sk = ctx.socket(zmq.PAIR)
      sk.set(zmq.SNDHWM, self._hwm) # high-water mark for outgoing messages
      sk.set(zmq.LINGER, -1) # linger indefinitely: keep unsent messages
      sk.bind("ipc:///tmp/%d" % (self._ID + pID))

      # ====== Doing the jobs ====== #
      t = tasks.get()
      while t is not None:
        # `t` is just a list of indices
        t = [self._jobs[i] for i in t]
        # keep track of the current number of remaining jobs
        remain_jobs.add(-len(t))
        if self._batch == 1: # batch=1, NO need for a list of inputs
          ret = self._func(t[0])
        else: # the input is a list of inputs here
          ret = self._func(t)
        # if a generator is returned, traverse it and send each result;
        # any other value is treated as a single result
        if not isinstance(ret, types.GeneratorType):
          ret = (ret,)
        for r in ret:
          # ignore None values (None is reserved as the ending signal)
          if r is not None:
            sk.send_pyobj(r)
        # delete old data (this works, checked)
        del ret
        # get the next tasks
        t = tasks.get()
      # ending signal
      sk.send_pyobj(None)
      # wait for the ending message before tearing the socket down
      sk.recv()
      sk.close()
      ctx.term()
      sys.exit(0)
    # ====== start the processes ====== #
    self._processes = [Process(target=wrapped_map,
                               args=(i, self._tasks, self._remain_jobs))
                       for i in range(self._ncpu)]
    # plain loop: start() is called only for its side effect
    for p in self._processes:
      p.start()
    # ====== pyzmq PAIR sockets (one per worker) ====== #
    ctx = zmq.Context()
    sockets = []
    for i in range(self._ncpu):
      sk = ctx.socket(zmq.PAIR)
      sk.set(zmq.RCVHWM, 0) # no limit on receiving
      # same endpoint that worker `i` binds in wrapped_map
      sk.connect("ipc:///tmp/%d" % (self._ID + i))
      sockets.append(sk)
    self._ctx = ctx
    self._sockets = sockets
    self._zmq_noblock = zmq.NOBLOCK
    self._zmq_again = zmq.error.Again
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号