Source code examples of the Python class JoinableQueue()
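multiprocessing.JoinableQueue is a Queue subclass that also tracks unfinished work: every put() must eventually be answered by a consumer calling task_done(), and join() blocks until each queued item has been marked done. The snippets below are collected from open-source projects. As a baseline for reading them, here is a minimal producer/consumer sketch of the put / task_done / join contract (written for this page, not taken from any of the projects below):

import multiprocessing


def consumer(queue):
    while True:
        item = queue.get()
        try:
            if item is None:      # poison pill: stop this worker
                break
            print("processed", item)
        finally:
            queue.task_done()     # must be called exactly once per get()


if __name__ == "__main__":
    queue = multiprocessing.JoinableQueue()
    worker = multiprocessing.Process(target=consumer, args=(queue,))
    worker.daemon = True
    worker.start()

    for i in range(5):
        queue.put(i)
    queue.join()                  # returns once all five items are task_done()

    queue.put(None)               # tell the worker to exit
    worker.join()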

__init__.py (project: cc98, author: zjuchenyuan)
def _producer_multi_threads(queue_task, queue_product, worker_function):
    """
    Worker loop for a single producer thread: take tasks from queue_task,
    run worker_function on each and push (task, result) pairs onto queue_product.
    :type queue_task: multiprocessing.JoinableQueue
    :type queue_product: multiprocessing.JoinableQueue
    :type worker_function: Callable[[Any], Any]
    """
    while True:
        try:
            task = queue_task.get()
            if isinstance(task, _QueueEndSignal):  # end-of-queue signal received
                # the finally clause below still runs task_done() before this break
                # takes effect, so a pending queue_task.join() will not hang
                break
            if isinstance(task, dict):
                result = worker_function(**task)
            elif isinstance(task, (tuple, list)):
                result = worker_function(*task)
            else:
                result = worker_function(task)

            queue_product.put((task, result))
        except:
            traceback.print_exc()
        finally:
            queue_task.task_done()
rollouts.py (project: -NIPS-2017-Learning-to-Run, author: kyleliang919)
def __init__(self, args):
        self.args = args

        self.tasks = multiprocessing.JoinableQueue()
        self.results = multiprocessing.Queue()

        self.actors = []
        self.actors.append(Actor(self.args, self.tasks, self.results, 9999, args.monitor))

        for i in xrange(self.args.num_threads-1):
            self.actors.append(Actor(self.args, self.tasks, self.results, 37*(i+3), False))

        for a in self.actors:
            a.start()

        # we will start by running 20,000 / 1000 = 20 episodes for the first iteration

        self.average_timesteps_in_episode = 1000
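The Actor class is not part of this snippet (the same pool also appears in the parallel-trpo example further down). In this kind of rollout pool it is typically a multiprocessing.Process subclass that consumes the JoinableQueue; a hypothetical sketch, not the project's actual Actor:

import multiprocessing

class Actor(multiprocessing.Process):
    # Hypothetical worker: pulls rollout requests, pushes results.
    def __init__(self, args, task_queue, result_queue, seed, monitor):
        multiprocessing.Process.__init__(self)
        self.args = args
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.seed = seed
        self.monitor = monitor

    def run(self):
        while True:
            task = self.task_queue.get()
            if task is None:                  # shutdown sentinel
                self.task_queue.task_done()
                break
            paths = self.rollout(task)        # assumed per-episode rollout routine
            self.result_queue.put(paths)
            self.task_queue.task_done()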
save_article_content.py (project: web_develop, author: dongweiming)
def use_multiprocessing_with_queue2():
    queue = multiprocessing.JoinableQueue()
    num_consumers = multiprocessing.cpu_count() * 2
    results_queue = multiprocessing.Queue()

    for article in Article.objects.all():
        queue.put(article)

    for _ in range(num_consumers):
        p = multiprocessing.Process(target=save_article_result_with_queue2,
                                    args=(queue, results_queue))
        p.start()

    queue.join()

    results = []

    while 1:
        try:
            updated_article = results_queue.get(timeout=1)
        except Empty:
            break
        results.append(updated_article)
    print len(results)
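The consumer function save_article_result_with_queue2 is defined elsewhere in the web_develop project. For queue.join() above to return, it has to call task_done() once per fetched article; a hedged sketch of that shape (save_article_result is a placeholder for the real processing step, and Empty comes from the standard Queue module as imported in the original file):

def save_article_result_with_queue2(queue, results_queue):
    # sketch of the consumer's shape, not the project's actual code
    while True:
        try:
            article = queue.get(timeout=1)   # give up once the queue stays empty
        except Empty:
            break
        try:
            results_queue.put(save_article_result(article))  # placeholder processing step
        finally:
            queue.task_done()                # required, otherwise queue.join() never returns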
save_article_content.py (project: web_develop, author: dongweiming)
def use_multiprocessing_with_queue2():
    queue = multiprocessing.JoinableQueue()
    num_consumers = multiprocessing.cpu_count() * 2
    results_queue = multiprocessing.Queue()

    for article in Article.objects.all()[5:8]:
        queue.put(article)

    for _ in range(num_consumers):
        p = multiprocessing.Process(target=save_article_result_with_queue2,
                                    args=(queue, results_queue))
        p.start()

    queue.join()

    results = []

    while 1:
        try:
            updated_article = results_queue.get(timeout=1)
        except Empty:
            break
        results.append(updated_article)
    print len(results)
sqli-scanner.py (project: sqli-scanner, author: the-c0d3r)
def __init__(self, inFile, outFile, processcount=None):
        """
        Initiate controller procedure
        :param inFile: the file containing the URLs
        :param outFile: the output file, "result.txt" by default
        """
        try:
            self.urllist = deduplicate(FileReader(inFile).read()).result
            self.workerCount = int(processcount) if processcount else multiprocessing.cpu_count() * 2
            self.taskQ = multiprocessing.JoinableQueue()
            self.resultQ = multiprocessing.Queue()
            self.workers = []
            self.outfile = outFile

            self.start()
            logging.info("[+] All work done, saving file")
        except KeyboardInterrupt:
            pass
        finally:
            self.cleanup()
collectstatic.py (project: django-collectfaster, author: dreipol)
def set_options(self, **options):
        self.faster = options.pop('faster')
        self.queue_worker_amount = int(options.pop('workers'))
        self.use_multiprocessing = options.pop('use_multiprocessing')

        if self.use_multiprocessing:
            self.task_queue = multiprocessing.JoinableQueue()
            self.worker_spawn_method = self.mp_spawn
        else:
            self.task_queue = GeventQueue()
            self.worker_spawn_method = self.gevent_spawn

        super(Command, self).set_options(**options)

        if self.faster:
            # Django's original management command collects all the files and calls the post_process method of
            # the storage backend within the same method. Because we are using a task queue, post-processing would
            # otherwise start before all the files have been collected, so the original hook is stashed and disabled here.
            self.post_process_original = self.post_process
            self.post_process = False
rollouts.py (project: parallel-trpo, author: kvfrans)
def __init__(self, args):
        self.args = args

        self.tasks = multiprocessing.JoinableQueue()
        self.results = multiprocessing.Queue()

        self.actors = []
        self.actors.append(Actor(self.args, self.tasks, self.results, 9999, args.monitor))

        for i in xrange(self.args.num_threads-1):
            self.actors.append(Actor(self.args, self.tasks, self.results, 37*(i+3), False))

        for a in self.actors:
            a.start()

        # we will start by running 20,000 / 1000 = 20 episodes for the first iteration

        self.average_timesteps_in_episode = 1000
_test_multiprocessing.py (project: ouroboros, author: pybee)
def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
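The companion worker _test_task_done is defined next to this test in the same CPython test module; roughly, it drains the queue until the None sentinels arrive, acknowledging each real item. A sketch of its shape (DELTA being the test module's short delay constant):

@classmethod
def _test_task_done(cls, q):
    for obj in iter(q.get, None):   # exit on the None sentinel
        time.sleep(DELTA)           # simulate a little work
        q.task_done()               # acknowledge each real item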
insert_data.py (project: bug_report_distributing_server, author: XY-e)
def insert_all():
    import warnings
    warnings.filterwarnings("ignore")

    queue = multiprocessing.JoinableQueue()

    get_bug_list(queue)

    # print("totally: ", queue.qsize())

    process_num = multiprocessing.cpu_count() * 2

    for i in range(process_num):
        t = multiprocessing.Process(target=get_one_bug, args=(queue,))
        t.start()

    queue.join()

    print("finished")
    save_fail_queue()
TFManager.py (project: TensorFlowOnSpark, author: yahoo)
def start(authkey, queues, mode='local'):
  """Create a new multiprocess.Manager (or return existing one).

  Args:
    :authkey: string authorization key
    :queues: *INTERNAL_USE*
    :mode: 'local' indicates that the manager will only be accessible from the same host, otherwise remotely accessible.

  Returns:
    A TFManager instance, which is also cached in local memory of the Python worker process.
  """
  global mgr, qdict, kdict
  qdict.clear()
  kdict.clear()
  for q in queues:
    qdict[q] = JoinableQueue()
  TFManager.register('get_queue', callable=lambda qname: qdict[qname])
  TFManager.register('get', callable=lambda key: _get(key))
  TFManager.register('set', callable=lambda key, value: _set(key, value))
  if mode == 'remote':
    mgr = TFManager(address=('',0), authkey=authkey)
  else:
    mgr = TFManager(authkey=authkey)
  mgr.start()
  return mgr
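Once start() returns, callers reach the queues through the manager proxy registered above. A hedged usage sketch (the queue names and the record value are illustrative, not part of the TensorFlowOnSpark API):

mgr = start(authkey=b'secret', queues=['input', 'output'], mode='remote')
in_q = mgr.get_queue('input')   # proxy to the JoinableQueue created in start()
in_q.put({'features': [1.0, 2.0]})   # illustrative record
in_q.join()                     # blocks until a worker calls task_done() for the item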
extractor.py (project: hyperbolic-caching, author: kantai)
def load_articles(worker, num_procs = 64):
    input_file = "enwiki-20080103-pages-articles.xml.bz2"

    q = multiprocessing.JoinableQueue(25000)
    procs = []
    for i in range(num_procs):         
        procs.append( multiprocessing.Process(
            target=worker(q, talker = (i == 0))))
        procs[-1].daemon = True
        procs[-1].start()
    def make_article_callback(aid, t, pc):
        q.put((aid,t,pc))
    sys.stderr.write("starting...\n")
    process(input_file, cb = make_article_callback, lim = None)
    q.join()
    for p in procs:
        q.put( None )
    q.join()
    sys.stderr.write("\n")
resource_score_extractor.py (project: FSquaDRA2, author: zyrikby)
def calculate_resources_similarity(out_file, in_dir, apk_pairs_file, scorerers, threads, timeout):
    field_names = ["apk1", "apk2", "result"]
    for sc in scorerers:
        field_names.extend(["%s_%s" % (sc.get_name(), RES_TYPE_NAMES[k]) for k in RES_TYPE_NAMES.keys()])

    in_queue = multiprocessing.JoinableQueue(IN_QUEUE_SIZE)
    out_queue = multiprocessing.JoinableQueue(OUT_QUEUE_SIZE)

    queue_populator = QueuePopulatorThread(in_queue, in_dir, apk_pairs_file)
    results_writer = FileWriterThread(out_queue, out_file, tuple(field_names))
    processor = ScorererProcessorPool(in_queue=in_queue, out_queue=out_queue, samples_directory=in_dir, scorerers=scorerers, threads=threads, timeout=timeout)

    queue_populator.start()
    processor.start_processes()
    results_writer.start()

    in_queue.join()    
    queue_populator.join()

    processor.stop_processes()

    out_queue.join()
    results_writer.stop_thread()
    results_writer.join()
Concurrent_AP.py (project: ProjectOfDataMining, author: IljaNovo)
def compute_responsibilities(hdf5_file, N_columns, damping, N_processes):
    """Organize the computation and update of the responsibility matrix
        for Affinity Propagation clustering with 'damping' as the eponymous 
        damping parameter. Each of the processes concurrently involved in this task 
        is an instance of the class 'Responsibilities_worker' defined above.
    """

    slice_queue = multiprocessing.JoinableQueue()

    pid_list = []
    for i in xrange(N_processes):
        worker = Responsibilities_worker(hdf5_file, '/aff_prop_group',
                   N_columns, damping, slice_queue)
        worker.daemon = True
        worker.start()
        pid_list.append(worker.pid)

    for rows_slice in chunk_generator(N_columns, 8 * N_processes):
        slice_queue.put(rows_slice)

    slice_queue.join()
    slice_queue.close()

    terminate_processes(pid_list)
refactor.py (project: kinect-2-libras, author: inessadl)
def refactor(self, items, write=False, doctests_only=False,
                 num_processes=1):
        if num_processes == 1:
            return super(MultiprocessRefactoringTool, self).refactor(
                items, write, doctests_only)
        try:
            import multiprocessing
        except ImportError:
            raise MultiprocessingUnsupported
        if self.queue is not None:
            raise RuntimeError("already doing multiple processes")
        self.queue = multiprocessing.JoinableQueue()
        self.output_lock = multiprocessing.Lock()
        processes = [multiprocessing.Process(target=self._child)
                     for i in xrange(num_processes)]
        try:
            for p in processes:
                p.start()
            super(MultiprocessRefactoringTool, self).refactor(items, write,
                                                              doctests_only)
        finally:
            self.queue.join()
            for i in xrange(num_processes):
                self.queue.put(None)
            for p in processes:
                if p.is_alive():
                    p.join()
            self.queue = None
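The _child method spawned as each process's target is the other half of this queue protocol; in lib2to3 it is essentially the following loop, which keeps refactoring files until one of the None sentinels put in the finally block arrives (shown here as a sketch for context, not part of the snippet above):

def _child(self):
        # worker loop: consume (args, kwargs) tasks until a None sentinel arrives
        task = self.queue.get(block=True)
        while task is not None:
            args, kwargs = task
            try:
                super(MultiprocessRefactoringTool, self).refactor_file(*args, **kwargs)
            finally:
                self.queue.task_done()   # lets queue.join() in the parent return
            task = self.queue.get(block=True)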
plugins_process_pool.py (project: midip-sslyze, author: soukupa5)
def __init__(self, available_plugins, network_retries=DEFAULT_NETWORK_RETRIES,
                 network_timeout=DEFAULT_NETWORK_TIMEOUT,
                 max_processes_nb=DEFAULT_MAX_PROCESSES_NB,
                 max_processes_per_hostname_nb=DEFAULT_PROCESSES_PER_HOSTNAME_NB):
        """
        Args:
            available_plugins (PluginsFinder): An object encapsulating the list of available plugins.
            network_retries (Optional[int]): How many times plugins should retry a connection that timed out.
            network_timeout (Optional[int]): The time until an ongoing connection times out within all plugins.
            max_processes_nb (Optional[int]): The maximum number of processes to spawn for running scans concurrently.
            max_processes_per_hostname_nb (Optional[int]): The maximum of processes that can be used for running scans
                concurrently on a single server.

        Returns:
            PluginsProcessPool: An object for queueing scan commands to be run concurrently.

        """

        self._available_plugins = available_plugins
        self._network_retries = network_retries
        self._network_timeout = network_timeout
        self._max_processes_nb = max_processes_nb
        self._max_processes_per_hostname_nb = max_processes_per_hostname_nb

        # Create hostname-specific queues to ensure aggressive scan commands targeting this hostname are never
        # run concurrently
        self._hostname_queues_dict = {}
        self._processes_dict = {}

        self._task_queue = JoinableQueue()  # Processes get tasks from task_queue and
        self._result_queue = JoinableQueue()  # put the result of each task in result_queue
        self._queued_tasks_nb = 0
plugins_process_pool.py (project: midip-sslyze, author: soukupa5)
def _check_and_create_process(self, hostname):
        if hostname not in self._hostname_queues_dict.keys():
            # We haven't seen this hostname before
            if self._get_current_processes_nb() < self._max_processes_nb:
                # Create a new process and new queue for this hostname
                hostname_queue = JoinableQueue()
                self._hostname_queues_dict[hostname] = hostname_queue

                process = WorkerProcess(hostname_queue, self._task_queue, self._result_queue,
                                        self._available_plugins.get_commands(), self._network_retries,
                                        self._network_timeout)
                process.start()
                self._processes_dict[hostname] = [process]
            else:
                # We are already using the maximum number of processes
                # Do not create a process and re-use a random existing hostname queue
                self._hostname_queues_dict[hostname] = random.choice(self._hostname_queues_dict.values())
                self._processes_dict[hostname] = []

        else:
            # We have seen this hostname before - create a new process if possible
            if len(self._processes_dict[hostname]) < self._max_processes_per_hostname_nb \
                    and self._get_current_processes_nb() < self._max_processes_nb:
                # We can create a new process; no need to create a queue as it already exists
                process = WorkerProcess(self._hostname_queues_dict[hostname], self._task_queue, self._result_queue,
                                        self._available_plugins.get_commands(), self._network_retries,
                                        self._network_timeout)
                process.start()
                self._processes_dict[hostname].append(process)
test_ringbuffer.py (project: ringbuffer, author: bslatkin)
def new_queue(self):
        return multiprocessing.JoinableQueue()
woker.py (project: waterflowers, author: chaodalong)
def __init__(self, worker_nums=None, callback=None):
        if worker_nums is not None:
            self.__worker_nums = worker_nums

        self.master_pid = os.getpid()
        self.queue = multiprocessing.JoinableQueue()
        self.__callback = callback

        # create worker
        self.__create_worker()
__init__.py (project: cc98, author: zjuchenyuan)
def _producer_multi_processes(queue_task,
                              queue_product,
                              threads_per_process,
                              worker_function):
    """
    Producer entry point for a worker subprocess: bridges the inter-process queues
    to thread-local queues and fans the work out to threads_per_process threads.

    :type queue_task: multiprocessing.JoinableQueue
    :type queue_product: multiprocessing.JoinableQueue
    :type threads_per_process: int
    :type worker_function: Callable[[Any], Any]
    """
    _queue_task = queue.Queue(maxsize=threads_per_process)
    _queue_product = queue.Queue()

    pool = [threading.Thread(target=_producer_multi_threads, args=(_queue_task, _queue_product, worker_function))
            for _ in range(threads_per_process)]
    for t in pool:
        t.daemon = True
        t.start()

    th = threading.Thread(target=_subprocesses_queue_transfer, args=(queue_task, _queue_task))
    th.daemon = True
    th.start()

    th = threading.Thread(target=_subprocesses_queue_transfer, args=(_queue_product, queue_product))
    th.daemon = True
    th.start()

    # wait for all producer threads to finish
    for t in pool:
        t.join()
        logger.debug("subthread {} of {} stopped".format(t.name, multiprocessing.current_process().name))
    logger.debug("subprocess {} completed".format(multiprocessing.current_process().name))
multiprocessing_utils.py (project: warriorframework, author: warriorframework)
def create_and_start_process_with_queue(target_module, args_dict, jobs_list, output_q, p_name=''):
    """Creates python multiprocesses for the provided target module with the
    provided arguments and  starts them

    Arguments:
    1. target_module = module for which multiple processes has to be started
    2. args_list = list of arguments to be passed to the target module
    3. jobs_list = list of process created
    4. output_q  = multiprocessing.Queue object to handle returns from the target module
    """

    # This is to handle the first process: when output_q is None,
    # create a new queue and use the same queue for all
    # subsequently started processes
    if output_q is None:
        # output_q = multiprocessing.JoinableQueue()
        output_q = multiprocessing.Manager().Queue()

    args_dict["output_q"] = output_q

    # now we need to convert the args_dict into
    # a tuple so first create a listout of the dict
    # and then convert the list into a tuple
    args_list = []
    for _, value in args_dict.iteritems():
        args_list.append(value)
    args_tuple = tuple(args_list)

    process = multiprocessing.Process(name=p_name, target=target_module, args=args_tuple)
    jobs_list.append(process)

    process.start()

    return process, jobs_list, output_q
dtforchestrator.py (project: distributed-tensorflow-orchestration, author: ct-clmsn)
def launch_mesos_tf(marathon_url_str, tsknom_str, cpu_float, mem_float, ntasks_int, uri_str, marathon_usr, marathon_usrpwd, localhost_str, mxattempts=10):
   toret_nodes = dict()

   docker = False
   if uri_str.find('docker') > -1:
      uri_str = uri_str.replace('docker://', '')
      docker = True

   uri_str = uri_str.rstrip('/')
   marathon_url_str = marathon_url_str.rstrip('/') 

   counter = 0
   tq = JoinableQueue()
   q = Queue()
   plist = list()

   consumers = [ Consumer(tq, q) for i in xrange(ntasks_int) ]
   for c in consumers:
      c.start()

   for i in xrange(ntasks_int):
      tq.put(Task(post_marathon_tasks, (marathon_url_str, tsknom_str, cpu_float, mem_float, i+1, ntasks_int, uri_str, marathon_usr, marathon_usrpwd, localhost_str, mxattempts, docker)))

   for i in xrange(ntasks_int):
      tq.put(None)

   tq.join()

   for i in xrange(1, ntasks_int+1):
      toret_nodes[i] = q.get()

   return toret_nodes
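Consumer and Task are defined elsewhere in the project; the calls above imply the classic JoinableQueue worker pattern, roughly like this sketch (not the project's exact classes):

import multiprocessing

class Consumer(multiprocessing.Process):
   def __init__(self, task_queue, result_queue):
      multiprocessing.Process.__init__(self)
      self.task_queue = task_queue
      self.result_queue = result_queue

   def run(self):
      while True:
         task = self.task_queue.get()
         if task is None:                  # None sentinel shuts the worker down
            self.task_queue.task_done()
            break
         self.result_queue.put(task())     # Task.__call__ runs the wrapped function
         self.task_queue.task_done()

class Task(object):
   def __init__(self, func, args):
      self.func = func
      self.args = args

   def __call__(self):
      return self.func(*self.args)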
processqueue.py (project: core-python, author: yidao620c)
def demo():
    q = multiprocessing.JoinableQueue()  # a joinable queue shared with the consumer process
    cons_p = multiprocessing.Process(target=consumer, args=(q, ))
    cons_p.daemon = True
    cons_p.start()

    seq = [1, 2, 3, 4, 5]
    producer(seq, q)

    q.join()  # block until the consumer has called task_done() for every item put on the queue
sabo.py (project: sabo, author: tokers)
def sabo_init(conf):
    sabo_log_init(conf["base"]["log_path"])
    sabo_error_log("info", "sabo start...")
    task_queue = multiprocessing.JoinableQueue()
    result_queue = multiprocessing.JoinableQueue()

    return task_queue, result_queue
mpEngineProdCons.py (project: appcompatprocessor, author: mbevilacqua)
def __init__(self, maxCores, producer_Class, consumer_Class, governorOffFlag = False):
        logger.debug("mpEngine initializing")
        self.governorOffFlag = governorOffFlag
        self.maxCores = maxCores
        self.__deleting__ = False
        self.__internalLock__ = multiprocessing.Lock()
        self.killed_event = multiprocessing.Event()

        # Producers
        self.num_producers = 0
        self.next_worker_num = 0
        self.producer_Class = producer_Class
        self.producer_pool = []
        self.producer_pool_exitEvent = []
        self.producer_task_queue = multiprocessing.JoinableQueue()
        self.producer_results_queue = multiprocessing.JoinableQueue()
        self.producer_pool_progress = multiprocessing.Value('i', 0)

        # Consumers
        self.num_consumers = 0
        self.next_consumer_num = 0
        self.consumer_Class = consumer_Class
        self.consumer_pool = []
        # Note: consumer_pool_exitEvent is used both to notify a worker it should end and for the worker to notify it has done so
        self.consumer_pool_exitEvent = []
        self.consumer_task_queue = self.producer_results_queue
        self.consumer_results_queue = multiprocessing.JoinableQueue()
        self.consumer_pool_progress = multiprocessing.Value('i', 0)

        # Tasks
        self.num_tasks = multiprocessing.Value('i', 0)
        self.tasks_added = False

        # Rebalance checks
        self._rebalance_last_kick = datetime.now()
        self.rebalance_backoff_timer = 60 * 1
        self._rebalance_mem_last_kick = datetime.now()
        self.rebalance_mem_backoff_timer = 60 * 2
refactor.py (project: zippy, author: securesystemslab)
def refactor(self, items, write=False, doctests_only=False,
                 num_processes=1):
        if num_processes == 1:
            return super(MultiprocessRefactoringTool, self).refactor(
                items, write, doctests_only)
        try:
            import multiprocessing
        except ImportError:
            raise MultiprocessingUnsupported
        if self.queue is not None:
            raise RuntimeError("already doing multiple processes")
        self.queue = multiprocessing.JoinableQueue()
        self.output_lock = multiprocessing.Lock()
        processes = [multiprocessing.Process(target=self._child)
                     for i in range(num_processes)]
        try:
            for p in processes:
                p.start()
            super(MultiprocessRefactoringTool, self).refactor(items, write,
                                                              doctests_only)
        finally:
            self.queue.join()
            for i in range(num_processes):
                self.queue.put(None)
            for p in processes:
                if p.is_alive():
                    p.join()
            self.queue = None
save_article_content.py (project: web_develop, author: dongweiming)
def use_multiprocessing_with_queue():
    queue = multiprocessing.JoinableQueue()
    num_consumers = multiprocessing.cpu_count() * 2

    for article in Article.objects.all():
        queue.put(article)

    for _ in range(num_consumers):
        p = multiprocessing.Process(target=save_article_result_with_queue,
                                    args=(queue,))
        p.start()

    queue.join()
save_article_content.py (project: web_develop, author: dongweiming)
def use_multiprocessing_with_queue():
    queue = multiprocessing.JoinableQueue()
    num_consumers = multiprocessing.cpu_count() * 2

    for article in Article.objects.all()[:4]:
        queue.put(article)

    for _ in range(num_consumers):
        p = multiprocessing.Process(target=save_article_result_with_queue,
                                    args=(queue,))
        p.start()

    queue.join()
local_executor.py (project: incubator-airflow-old, author: apache)
def start(self):
            self.executor.queue = multiprocessing.JoinableQueue()

            self.executor.workers = [
                QueuedLocalWorker(self.executor.queue, self.executor.result_queue)
                for _ in range(self.executor.parallelism)
            ]

            self.executor.workers_used = len(self.executor.workers)

            for w in self.executor.workers:
                w.start()
parallel.py (project: luna16, author: gzuidhof)
def __iter__(self):
        queue = JoinableQueue(maxsize=self.max_queue_size)

        n_batches, job_queue = self._start_producers(queue)

        # Run as consumer (read items from queue, in current thread)
        for x in xrange(n_batches):
            item = queue.get()
            #print queue.qsize(), "GET"
            yield item # Yield the item to the consumer (user)
            queue.task_done()

        queue.close()
        job_queue.close()

