core.py 文件源码

python
阅读 29 收藏 0 点赞 0 评论 0

项目:clopure 作者: vbkaisetsu 项目源码 文件源码
def clopure_iter_mp_split(self, fn, local_vars):
        def iter_split_generator(*g):
            q_in = Queue()
            q_out = Queue()
            exit_input_thread = False
            semaphore = Semaphore(self.queue_size)
            ps = [Process(target=self.iter_split_evaluate_wrapper, args=(fn, local_vars, len(g), q_in, q_out)) for i in range(self.procs)]
            for p in ps:
                p.start()
            def input_thread():
                try:
                    for i, item in enumerate(zip(*g)):
                        semaphore.acquire()
                        if exit_input_thread:
                            return
                        q_in.put((i, item))
                except BaseException:
                    traceback.print_exc(file=sys.stdout)
                for i in range(self.procs):
                    q_in.put((0, EOFMessage))

            t = Thread(target=input_thread)
            t.start()
            cur = 0
            n_working_procs = self.procs
            l = [None] * self.queue_size
            while True:
                k, data = q_out.get()
                if data is EOFMessage:
                    n_working_procs -= 1
                    if n_working_procs == 0:
                        break
                    continue
                l[k - cur] = (k, data)
                while l[0]:
                    yield l.pop(0)[1]
                    l.append(None)
                    cur += 1
                    semaphore.release()
            exit_input_thread = True
            semaphore.release()
        return iter_split_generator
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号