def stop(self):
    """Stop the greenlet workers and reset the callback queue.

    The whole shutdown runs while holding ``self._state_change``. If the
    handler is not running this is a no-op. Otherwise the running flag is
    cleared, ``_STOP`` is put on the callback queue, every worker in
    ``self._workers`` is joined, the callback queue is replaced with a new
    ``Queue`` and this method is unregistered from ``python2atexit``.
    """
    with self._state_change:
        if not self._running:
            return
        self._running = False

        # Presumably _STOP tells the worker loop to exit -- confirm against
        # the worker implementation.
        self.callback_queue.put(_STOP)

        # Wait for each worker to finish, removing it from the list as we go.
        while self._workers:
            self._workers.pop().join()

        # Discard anything still queued by starting over with a fresh queue.
        self.callback_queue = Queue()  # pragma: nocover

        python2atexit.unregister(self.stop)
python类put()的实例源码
def stop(self):
    """Stop the greenlet workers and reset the callback queue.

    The whole shutdown runs while holding ``self._state_change``. If the
    handler is not running this is a no-op. Otherwise the running flag is
    cleared, ``_STOP`` is put on the callback queue, every worker in
    ``self._workers`` is joined, the callback queue is replaced with a new
    ``Queue`` and this method is unregistered from ``python2atexit``.
    """
    with self._state_change:
        if not self._running:
            return
        self._running = False

        # Presumably _STOP tells the worker loop to exit -- confirm against
        # the worker implementation.
        self.callback_queue.put(_STOP)

        # Wait for each worker to finish, removing it from the list as we go.
        while self._workers:
            self._workers.pop().join()

        # Discard anything still queued by starting over with a fresh queue.
        self.callback_queue = Queue()  # pragma: nocover

        python2atexit.unregister(self.stop)
def _app():
    """Return the vadvisor REST app wired to a stub broker and a filled store.

    A local ``Broker`` stand-in is attached as ``app.eventBroker`` and a new
    ``InMemoryStore`` as ``app.eventStore``. The broker is then asked to feed
    a queue, and everything read from that queue is put into the store
    before the app is returned.
    """

    class Broker:
        """Stub broker that pushes a fixed series of events to a subscriber."""

        def subscribe(self, subscriber):
            # One event per LIFECYCLE_EVENTS entry; only the position is
            # passed to event(), the entry itself is ignored.
            for position, _entry in enumerate(LIFECYCLE_EVENTS):
                subscriber.put(event(position))
            # StopIteration is put on the queue as the end-of-stream marker.
            subscriber.put(StopIteration)

        def unsubscribe(self, queue):
            queue.put(StopIteration)

    app = vadvisor.app.rest.app
    stub_broker = Broker()
    app.eventBroker = stub_broker
    app.eventStore = InMemoryStore()

    pending = queue.Queue()
    stub_broker.subscribe(pending)
    # NOTE(review): iterating the queue directly needs an iterable queue type
    # (e.g. gevent.queue.Queue, which ends iteration on a StopIteration
    # item); the stdlib queue.Queue is not iterable -- confirm which `queue`
    # module this file imports.
    for queued_event in pending:
        app.eventStore.put(queued_event)

    return app
def dispatch_callback(self, callback):
    """Queue *callback* for later execution.

    A zero-argument callable that invokes ``callback.func`` with
    ``callback.args`` is put on ``self.callback_queue``; nothing is called
    here. See :class:`SequentialGeventHandler` for how queued callbacks
    are run.
    """

    def _invoke():
        # func and args are looked up when the task runs, not when queued.
        return callback.func(*callback.args)

    self.callback_queue.put(_invoke)
def init_queue_with_item(queue, item=None):
    """Empty *queue*, then optionally seed it with a single item.

    Args:
        queue: Queue-like object providing ``qsize()``, ``get()`` and
            ``put()``.
        item: Value to enqueue once the queue has been drained. ``None``
            (the default) leaves the queue empty.
    """
    # Drain whatever is currently queued.
    while queue.qsize() > 0:
        queue.get()

    # Test against None rather than truthiness so that falsy payloads
    # (0, "", False, empty containers) are still enqueued.
    if item is not None:
        queue.put(item)
def dispatch_callback(self, callback):
    """Queue *callback* for later execution.

    A zero-argument callable that invokes ``callback.func`` with
    ``callback.args`` is put on ``self.callback_queue``; nothing is called
    here. See :class:`SequentialGeventHandler` for how queued callbacks
    are run.
    """

    def _invoke():
        # func and args are looked up when the task runs, not when queued.
        return callback.func(*callback.args)

    self.callback_queue.put(_invoke)