python类Queue()的实例源码

background_thread.py 文件源码 项目:opencensus-python 作者: census-instrumentation 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _get_items(self):
        """Pull one batch of items off ``self._queue``.

        The first item is fetched with a blocking ``get()``, so the batch is
        never empty. Further items are taken with ``get_nowait()`` until the
        batch holds ``self._max_batch_size`` items or the queue reports
        empty. ``task_done()`` is not called for the retrieved items.

        :rtype: Sequence
        :returns: A sequence of items retrieved from the queue.
        """
        # Blocking read: wait here until there is something to batch.
        batch = [self._queue.get()]

        while True:
            if len(batch) >= self._max_batch_size:
                break
            try:
                next_item = self._queue.get_nowait()
            except queue.Empty:
                # Nothing else is immediately available; ship what we have.
                break
            batch.append(next_item)

        return batch
interactive_debugger_server_lib.py 文件源码 项目:tensorboard 作者: tensorflow 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self, receive_port):
    """Set up the channels, run state and tensor store of the data server.

    Args:
      receive_port: The port at which to receive data from the TensorFlow
        debugger. It is forwarded to both base-class initializers below.
    """
    super(InteractiveDebuggerDataServer, self).__init__(
        receive_port, InteractiveDebuggerDataStreamHandler)

    self._incoming_channel = queue.Queue()
    self._outgoing_channel = comm_channel_lib.CommChannel()
    self._run_states = RunStates(breakpoints_func=lambda: self.breakpoints)
    self._tensor_store = tensor_store_lib.TensorStore()

    # Every stream handler is constructed with the same four shared objects
    # as its leading positional arguments.
    shared_state = (
        self._incoming_channel,
        self._outgoing_channel,
        self._run_states,
        self._tensor_store,
    )
    handler_factory = functools.partial(
        InteractiveDebuggerDataStreamHandler, *shared_state)

    # Initialize the servicer base a second time, now with the pre-bound
    # handler factory instead of the bare handler class.
    grpc_debug_server.EventListenerBaseServicer.__init__(
        self, receive_port, handler_factory)
retrying_executor_test.py 文件源码 项目:task_processing 作者: Yelp 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_retry_loop_does_not_retry_task(mock_retrying_executor):
    """Run retry_loop once over a terminal event that must not be retried.

    The executor is marked as stopping and its ``retry`` / ``retry_pred``
    hooks are mocked to return False. After the loop, one event must be on
    ``dest_queue`` and ``task_retries`` must be empty.
    """
    executor = mock_retrying_executor
    event = _get_mock_event(is_terminal=True)

    executor.stopping = True
    executor._is_current_attempt = mock.Mock(return_value=True)
    executor.retry = mock.Mock(return_value=False)
    executor.retry_pred = mock.Mock(return_value=False)
    # Record one retry against the original task id.
    executor.task_retries = executor.task_retries.set(event.task_id, 1)

    # The event fed to the loop carries the task id with a retry suffix.
    retried_event = event.set('task_id', event.task_id + '-retry1')
    executor.src_queue = Queue()
    executor.src_queue.put(retried_event)

    executor.retry_loop()

    assert executor.dest_queue.qsize() == 1
    assert len(executor.task_retries) == 0
retrying_executor.py 文件源码 项目:task_processing 作者: Yelp 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self,
                 downstream_executor,
                 retry_pred=lambda e: not e.success,
                 retries=3):
        """Wrap ``downstream_executor`` and start the retry thread.

        :param downstream_executor: executor whose event queue (obtained via
            ``get_event_queue()``) becomes this object's ``src_queue``.
        :param retry_pred: callable stored as ``self.retry_pred``; the
            default returns ``not e.success`` for an event ``e``.
        :param retries: value stored as ``self.retries``.
        """
        self.executor = downstream_executor
        self.retry_pred = retry_pred
        self.retries = retries

        # Retry bookkeeping and the lock that accompanies it.
        self.task_retries = m()
        self.task_retries_lock = Lock()

        # Events come in from the wrapped executor and go out on our own queue.
        self.src_queue = downstream_executor.get_event_queue()
        self.dest_queue = Queue()
        self.stopping = False

        # Start the loop last so every attribute above exists before it runs.
        worker = Thread(target=self.retry_loop)
        worker.daemon = True
        self.retry_thread = worker
        worker.start()
timeout_executor.py 文件源码 项目:task_processing 作者: Yelp 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, downstream_executor):
        """Wrap ``downstream_executor`` and start the timeout thread.

        :param downstream_executor: executor whose event queue (obtained via
            ``get_event_queue()``) becomes this object's ``src_queue``.
        """
        self.downstream_executor = downstream_executor

        self.tasks_lock = Lock()
        self.killed_tasks = []   # tasks that are pending termination
        self.running_tasks = []  # tasks that are currently running

        # Events come in from the wrapped executor and go out on our own queue.
        self.src_queue = downstream_executor.get_event_queue()
        self.dest_queue = Queue()
        self.stopping = False

        # Start the loop last so every attribute above exists before it runs.
        worker = Thread(target=self.timeout_loop)
        worker.daemon = True
        self.timeout_thread = worker
        worker.start()
nikon.py 文件源码 项目:sequoia-ptpy 作者: Parrot-Developers 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def session(self):
        '''
        Manage Nikon session with context manager.

        Always runs inside the parent class session. Unless polling is
        disabled, an event queue and a polling thread are set up before
        yielding, and `_nikon_shutdown` is called once the caller's block
        exits, whether or not it raised.
        '''
        if self.__no_polling:
            # When raw device, do not perform polling: plain parent session.
            with super(Nikon, self).session():
                yield
        else:
            # Within a normal PTP session
            with super(Nikon, self).session():
                # launch a polling thread
                self.__event_queue = Queue()
                poller = Thread(
                    name='NikonEvtPolling',
                    target=self.__nikon_poll_events
                )
                self.__nikon_event_proc = poller
                poller.daemon = False
                # Also register shutdown at interpreter exit, in addition to
                # the finally clause below.
                atexit.register(self._nikon_shutdown)
                poller.start()

                try:
                    yield
                finally:
                    self._nikon_shutdown()
ip.py 文件源码 项目:sequoia-ptpy 作者: Parrot-Developers 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, device=None):
        '''
        Set up the IP transport state for the given PTP device.

        `device` must be provided: discovery is not implemented, so passing
        None raises NotImplementedError. Registers `_shutdown` with atexit.
        '''
        self.__setup_constructors()
        logger.debug('Init IP')

        # Stored before the check, so it is set even when the check raises.
        self.__dev = device
        if device is None:
            raise NotImplementedError(
                'IP discovery not implemented. Please provide a device.'
            )
        self.__device = device

        # Implicit-session signalling:
        #   open     -> an implicit session is usable
        #   shutdown -> the implicit session is shutting down
        self.__implicit_session_open = Event()
        self.__implicit_session_shutdown = Event()

        self.__check_session_lock = Lock()
        self.__transaction_lock = Lock()

        self.__event_queue = Queue()

        atexit.register(self._shutdown)
event_source.py 文件源码 项目:rqalpha 作者: ricequant 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self, fps, mod_config):
        """Store the configuration and create, but do not start, the workers.

        :param fps: value stored as ``self.fps``.
        :param mod_config: configuration object stored as ``self.mod_config``;
            its ``redis_uri`` attribute decides whether a quotation thread
            is created.
        """
        self._env = Environment.get_instance()
        self.mod_config = mod_config
        self.fps = fps
        self.event_queue = Queue()

        # All three "last fired" dates start at the same placeholder date.
        initial_date = datetime.date(2000, 1, 1)
        self.before_trading_fire_date = initial_date
        self.after_trading_fire_date = initial_date
        self.settlement_fire_date = initial_date

        # The quotation thread is only created when no redis_uri is set.
        if not mod_config.redis_uri:
            quotation_thread = Thread(target=self.quotation_worker)
            quotation_thread.daemon = True
            self.quotation_engine_thread = quotation_thread

        clock_thread = Thread(target=self.clock_worker)
        clock_thread.daemon = True
        self.clock_engine_thread = clock_thread
concurrency.py 文件源码 项目:ternarynet 作者: czhu95 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, predictors, batch_size=5):
        """Create one worker thread per predictor, all fed by one input queue.

        :param predictors: a non-empty list of OnlinePredictor; each must
            have ``return_input`` equal to False.
        :param batch_size: forwarded to every PredictorWorkerThread.
        """
        assert len(predictors)
        for predictor in predictors:
            # TODO use predictors.return_input here
            assert predictor.return_input == False

        # The shared queue is bounded at 100 slots per predictor.
        self.input_queue = queue.Queue(maxsize=len(predictors)*100)

        workers = []
        for worker_id, predictor in enumerate(predictors):
            workers.append(PredictorWorkerThread(
                self.input_queue, predictor, worker_id,
                batch_size=batch_size))
        self.threads = workers

        if six.PY2:
            # TODO XXX set logging here to avoid affecting TF logging
            import tornado.options as options
            options.parse_command_line(['--logging=debug'])
future_full_pipeline.py 文件源码 项目:python-dse-driver 作者: datastax 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def run(self):
        """Issue ``self.num_queries`` queries, keeping at most 120 in flight.

        Profiling is started first. Each query is launched through
        ``self.run_query`` with a key of the form ``<thread_num>-<i>`` and
        its future is queued. Once 120 futures are pending, the oldest one
        is waited on before the next query is issued. After the last query
        every remaining future is waited on and profiling is finished.
        """
        futures = queue.Queue(maxsize=121)

        self.start_profile()

        for i in range(self.num_queries):
            if i >= 120:
                # Pipeline is full: block on the oldest outstanding future
                # before adding another one.
                old_future = futures.get_nowait()
                old_future.result()

            key = "{}-{}".format(self.thread_num, i)
            future = self.run_query(key)
            futures.put_nowait(future)

        # Wait for whatever is still in flight.
        while True:
            try:
                futures.get_nowait().result()
            except queue.Empty:
                break

        # This must be a call; a bare attribute reference never finishes
        # the profile.
        self.finish_profile()
future_batches.py 文件源码 项目:python-dse-driver 作者: datastax 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def run(self):
        """Issue ``self.num_queries`` queries in batches of 120.

        Profiling is started first. Each query is launched through
        ``self.run_query`` with a key of the form ``<thread_num>-<i>`` and
        its future is queued. Before every 120th query (except the very
        first) all queued futures are waited on. After the last query the
        remaining futures are waited on and profiling is finished.
        """
        pending = queue.Queue(maxsize=121)

        def wait_for_pending():
            # Block on each queued future in FIFO order until none are left.
            while True:
                try:
                    pending.get_nowait().result()
                except queue.Empty:
                    return

        self.start_profile()

        for i in range(self.num_queries):
            batch_boundary = i > 0 and i % 120 == 0
            if batch_boundary:
                # clear the existing queue
                wait_for_pending()

            pending.put_nowait(
                self.run_query("{0}-{1}".format(self.thread_num, i)))

        wait_for_pending()

        self.finish_profile()
asynchronous.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __call__(self, event):
        """Put ``event`` on the queue that belongs to its group.

        The group is computed with ``self._group_by(event)``. The first time
        a group is seen, a queue of depth ``self._queue_depth`` is created
        for it, and a thread running ``self._run(group, queue)`` is added to
        the thread group and linked to ``self._done(group)``.
        """
        group = self._group_by(event)
        try:
            group_queue = self._queues[group]
        except KeyError:
            # First event for this group: create its queue and its worker.
            group_queue = self._queues[group] = six_queue.Queue(
                self._queue_depth)
            worker = self._thread_group.add_thread(
                self._run, group, group_queue)
            worker.link(self._done, group)
        group_queue.put(event)
a3c.py 文件源码 项目:human-rl 作者: gsastry 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, env, policy, num_local_steps, render=True):
        """Initialise the thread object and its state; does not start it.

        :param env: stored as ``self.env``.
        :param policy: stored as ``self.policy``.
        :param num_local_steps: stored as ``self.num_local_steps``.
        :param render: stored as ``self.render``.
        """
        threading.Thread.__init__(self)
        self.daemon = True

        # Constructor arguments.
        self.env = env
        self.policy = policy
        self.num_local_steps = num_local_steps
        self.render = render

        # Bounded queue with room for five entries.
        self.queue = queue.Queue(5)

        # Left unset here; nothing in this method fills them in.
        self.last_features = None
        self.sess = None
        self.summary_writer = None
a3c.py 文件源码 项目:human-rl 作者: gsastry 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, env, policy, num_local_steps, render=True):
        """Prepare the thread and its attributes without starting it.

        :param env: kept on the instance as ``env``.
        :param policy: kept on the instance as ``policy``.
        :param num_local_steps: kept on the instance as ``num_local_steps``.
        :param render: kept on the instance as ``render``.
        """
        threading.Thread.__init__(self)

        # Output queue, capped at five entries.
        self.queue = queue.Queue(5)

        self.env, self.policy = env, policy
        self.num_local_steps = num_local_steps
        self.render = render

        # Placeholders that this method leaves empty.
        self.last_features = None
        self.sess = None
        self.summary_writer = None

        self.daemon = True
threaded_queue.py 文件源码 项目:cloud-volume 作者: seung-lab 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, n_threads, queue_size=0):
    """
    Create the work and error queues, reset the counters, and start
    n_threads threads via self.start_threads.

    n_threads: number of threads passed to self.start_threads
    queue_size: maxsize for both queues; 0 = infinite size
    """
    self._n_threads = n_threads

    # Both queues share the same bound.
    self._queue = Queue.Queue(maxsize=queue_size)
    self._error_queue = Queue.Queue(maxsize=queue_size)

    self._threads = ()
    self._terminate = threading.Event()

    self._processed_lock = threading.Lock()
    self.processed = self._inserted = 0

    self.with_progress = None

    # Last step: everything above is in place before threads are started.
    self.start_threads(n_threads)
threaded_queue.py 文件源码 项目:cloud-volume 作者: seung-lab 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def _consume_queue(self, terminate_evt):
    """
    Main thread function: repeatedly takes a function from self._queue,
    binds the interface object returned by self._initialize_interface()
    as its first argument, and runs it through
    self._consume_queue_execution.

    An exception raised while running a function is put on
    self._error_queue instead of ending the loop.

    terminate_evt is checked before each queue read; because the read
    times out after 0.01 seconds, a set event is noticed even when the
    queue stays empty. When the loop ends, the interface is handed to
    self._close_interface.

    Returns: void
    """
    interface = self._initialize_interface()

    while not terminate_evt.is_set():
      try:
        task = self._queue.get(block=True, timeout=0.01)
      except Queue.Empty:
        # nothing to do yet; go back and re-check the terminate event
        pass
      else:
        bound_task = partial(task, interface)
        try:
          self._consume_queue_execution(bound_task)
        except Exception as exc:
          self._error_queue.put(exc)

    self._close_interface(interface)
threaded_queue.py 文件源码 项目:cloud-volume 作者: seung-lab 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def _check_errors(self):
    """
    Re-raise the oldest error waiting on self._error_queue, if there is one.

    Does a non-blocking read of the error queue. When it is empty, nothing
    happens. Otherwise the entry is marked done, self.kill_threads() is
    called, and the retrieved exception is raised.

    Only the queue read is guarded by the Queue.Empty handler. A retrieved
    error that is itself a Queue.Empty instance, or a Queue.Empty escaping
    from kill_threads(), is therefore propagated rather than mistaken for
    "no errors" and swallowed.

    Returns: void
    Raises: the exception object taken from the error queue
    """
    try:
      err = self._error_queue.get(block=False)
    except Queue.Empty:
      return

    self._error_queue.task_done()
    self.kill_threads()
    raise err
connectionpools.py 文件源码 项目:cloud-volume 作者: seung-lab 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self):
        """Create an empty, unbounded pool and install signal handlers.

        SIGINT and SIGTERM are both routed to a handler that calls
        ``self.reset_pool()``; this replaces whatever handlers were
        installed for those signals before.
        """
        self.pool = Queue.Queue(maxsize=0)
        self.outstanding = 0
        self._lock = threading.Lock()

        def reset_on_signal(signum, frame):
            self.reset_pool()

        for sig in (signal.SIGINT, signal.SIGTERM):
            signal.signal(sig, reset_on_signal)
connectionpools.py 文件源码 项目:cloud-volume 作者: seung-lab 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def get_connection(self):
        """Hand out a connection, reusing a pooled one when available.

        Under ``self._lock``: takes a connection from ``self.pool`` without
        blocking, or calls ``self._create_connection()`` when the pool is
        empty. ``self.outstanding`` is incremented only once a connection
        has actually been obtained, so a failure while creating one does
        not leave the counter permanently too high.

        :returns: the pooled or newly created connection.
        :raises: whatever ``self._create_connection()`` raises.
        """
        with self._lock:
            try:
                conn = self.pool.get(block=False)
            except Queue.Empty:
                conn = self._create_connection()
            else:
                # Balance the get() so the queue's unfinished-task count
                # does not grow.
                self.pool.task_done()
            self.outstanding += 1

        return conn
test_sender.py 文件源码 项目:kinesis_producer 作者: ludia 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_init(config):
    """Smoke test: build a Sender, then start, close and join it."""
    record_queue = queue.Queue()
    sender = Sender(
        queue=record_queue,
        accumulator=RecordAccumulator(RawBuffer, config),
        client=mock.Mock(),
        partitioner=partitioner,
    )

    # Full lifecycle; the test passes as long as none of these calls raise.
    sender.start()
    sender.close()
    sender.join()


问题


面经


文章

微信
公众号

扫码关注公众号