def put(self, request):
    """
    Add a request to the worker queue, spawning a worker first if needed.

    While holding the work tracker's lock: if no tracked worker is
    available and the number of tracked workers is below the configured
    ``nntp_processing['threads']`` limit, one more worker is spawned.
    The request is then appended to the work queue.

    The lock is always released, even if spawning the worker or queueing
    the request raises; the exception itself still propagates.
    """
    tracker = self._work_tracker

    # Determine if we need to spin a worker or not
    tracker.lock.acquire(blocking=True)
    try:
        if len(tracker.available) == 0:
            max_threads = self._settings.nntp_processing['threads']
            if len(tracker) < max_threads:
                # Spin up more work
                self.spawn_workers(count=1)

        # Append to Queue for processing
        self._work_queue.put(request)
    finally:
        # Release our lock no matter what happened above; otherwise a
        # single failure would block every later put() call.
        tracker.lock.release()
# Example source code for the Python class Queue()
def _concurrent_execute(self, context, start_req, parser, pool, pool_size):
    """
    Run the start requests concurrently on a pool and collect the results.

    Every request in ``start_req`` is placed on a fresh queue.  Requests
    are then taken off the queue one at a time, passed through
    ``self._check_req``, given ``parser`` when they have none of their
    own, and spawned on the pool with ``context`` and the queue as
    arguments (presumably so a running request can enqueue follow-ups --
    confirm against the request type).

    Scheduling stops as soon as ``Empty`` is raised inside the loop,
    normally because nothing arrived on the queue within one second.
    When ``pool`` is None a ``GeventPool(pool_size)`` is created.

    Returns a list with the ``get()`` result of every spawned greenlet,
    in spawn order.
    """
    pending = Queue()
    for request in start_req:
        pending.put_nowait(request)

    workers = GeventPool(pool_size) if pool is None else pool

    spawned = []
    while True:
        try:
            request = self._check_req(pending.get(timeout=1))
            if request.parser is None:
                request.parser = parser
            spawned.append(workers.spawn(request, context, pending))
        except Empty:
            # Queue stayed empty for the whole timeout: stop scheduling.
            break

    return [job.get() for job in spawned]
def test_queue(self):
    """Fill an unbounded Queue, then let three greenlet workers drain it."""
    tasks = Queue()

    def worker(name):
        # Keep taking tasks until the queue reports empty, yielding to the
        # other greenlets after each one.
        while not tasks.empty():
            item = tasks.get()
            _log.info('Worker %s got task %s' % (name, item))
            gevent.sleep(0)
        _log.info('Quitting time!')

    def boss():
        # Enqueue the numbers 1..24 without blocking.
        for number in xrange(1, 25):
            tasks.put_nowait(number)

    # The boss is joined first, so the queue is full before workers start.
    gevent.spawn(boss).join()

    workers = [gevent.spawn(worker, who) for who in ('steve', 'john', 'nancy')]
    gevent.joinall(workers)
def stop(self):
    """
    Stop the greenlet workers and empty all queues.

    Does nothing when the handler is not running.  Otherwise, under the
    state-change lock: marks the handler stopped, pushes the ``_STOP``
    sentinel onto the callback queue, joins every worker, replaces the
    callback queue with a fresh one and unregisters this method from the
    atexit hooks.
    """
    with self._state_change:
        if self._running:
            self._running = False

            # Tell the worker reading this queue to shut down.
            self.callback_queue.put(_STOP)

            # Wait for each worker, most recently added first.
            while self._workers:
                self._workers.pop().join()

            # Clear the queues
            self.callback_queue = Queue()  # pragma: nocover

            python2atexit.unregister(self.stop)
def stream_topic(topic_name):
    """
    GET /api/<version>/topic_stream/<topic_name>

    Stream a topic over HTTP by keeping the http connection alive.

    Each message received on the topic is written to the response as one
    line of text: the message's ``data`` attribute when it has one,
    otherwise a dict of its ``header`` and ``status`` attributes.

    Returns a 404 error response when the topic cannot be resolved.  A
    ``rostopic.ROSTopicIOException`` raised by the lookup propagates to
    the caller unchanged.
    """
    topic_name = "/" + topic_name
    msg_class, real_topic, _ = rostopic.get_topic_class(topic_name)
    if not real_topic:
        return error("Topic does not exist", 404)

    # Small buffer between the subscriber callback and the HTTP generator.
    queue = Queue(5)

    def callback(dataIn, queue=queue):
        data = getattr(dataIn, "data", None)
        if data is None:
            data = {"header": getattr(dataIn, "header"), "status": getattr(dataIn, "status")}
        queue.put(data)

    sub = rospy.Subscriber(real_topic, msg_class, callback)

    def gen(queue=queue):
        try:
            while True:
                x = queue.get()
                yield str(x) + "\n"
        finally:
            # Runs when the generator is closed (for example when the
            # response is closed after the client disconnects); without it
            # every request would leave a subscriber registered.
            sub.unregister()

    return Response(gen(), mimetype='text/plain')
def mocked_rpc_server():
    """
    Build and return a stand-in RPC server class.

    A new class is created on every call.  Its ``queue`` and ``outbox``
    are class attributes, so all instances of one returned class share
    them.
    """
    class MockedRpcServer(object):
        # Messages waiting to be "received" via recv().
        queue = Queue()
        # Serialized form of every message passed to send().
        outbox = []

        def __init__(self, host, port):
            # Connection details are accepted but ignored.
            pass

        @classmethod
        def mocked_send(cls, message):
            # Queue the serialized message so a later recv() returns it.
            serialized = message.serialize()
            cls.queue.put(serialized)

        def recv(self):
            # Take the next queued payload and turn it back into a Message.
            payload = self.queue.get()
            return Message.unserialize(payload)

        def send(self, message):
            # Record what would have been sent instead of sending it.
            self.outbox.append(message.serialize())

    return MockedRpcServer
def get_weibo_users_timeline_async(self, id_str):
    """
    Fetch the timelines of several Weibo users concurrently.

    ``id_str`` is a comma-separated string of user ids; only the first
    ten are processed.  One greenlet is spawned per API account, and each
    greenlet keeps taking ids from a shared task queue until it is empty.

    The API accounts are loaded from ``self.weiboDAO`` the first time
    they are needed and cached on ``self.api_accounts``.

    Returns a list with one timeline result per processed id, in the
    order the results were produced.
    """
    results = Queue()
    tasks = Queue()

    def get_timeline_data(api_account):
        while not tasks.empty():
            client = WeiboAPIService(appKey=api_account[1], appSecret=api_account[2], token=api_account[3])
            user_id = tasks.get_nowait()
            results.put_nowait(client.get_weibo_user_timeline(user_id))

    for user_id in id_str.split(",")[0:10]:
        tasks.put_nowait(user_id)

    # Lazily load the API accounts; identity comparison avoids calling a
    # custom __eq__ on whatever object is stored there.
    if self.api_accounts is None:
        self.api_accounts = self.weiboDAO.get_weibo_accounts()

    greenlets = [
        gevent.spawn(get_timeline_data, account)
        for account in self.api_accounts
    ]
    gevent.joinall(greenlets)

    result_data = []
    while not results.empty():
        result_data.append(results.get_nowait())
    return result_data
def set_options(self, **options):
    """
    Consume this command's own options, then delegate to the parent.

    ``faster``, ``workers`` and ``use_multiprocessing`` are removed from
    ``options`` before the remaining options are passed to the parent
    ``set_options``.  The task queue and worker spawn method are chosen
    according to ``use_multiprocessing``.
    """
    self.faster = options.pop('faster')
    self.queue_worker_amount = int(options.pop('workers'))
    self.use_multiprocessing = options.pop('use_multiprocessing')

    if not self.use_multiprocessing:
        self.task_queue = GeventQueue()
        self.worker_spawn_method = self.gevent_spawn
    else:
        self.task_queue = multiprocessing.JoinableQueue()
        self.worker_spawn_method = self.mp_spawn

    super(Command, self).set_options(**options)

    if not self.faster:
        return

    # The original management command of Django collects all the files and
    # calls the post_process method of the storage backend within the same
    # method. Because we are using a task queue, post processing is started
    # before all files were collected.  Remember the requested value and
    # switch the built-in post processing off.
    self.post_process_original = self.post_process
    self.post_process = False
def __init__(self, amqp_info):
    """
    Initialise both base classes and the state used for sending.

    ``amqp_info`` is passed straight to ``ExampleConsumer.__init__``.
    A greenlet running ``self.on_timeout_handle`` is spawned as the last
    step.
    """
    Greenlet.__init__(self)
    ExampleConsumer.__init__(self, amqp_info)

    # Bookkeeping for outstanding calls and the outgoing message queue.
    self.callinfos = {}
    self.send_queue = Queue()
    self.lock = BoundedSemaphore(1)

    # Sender greenlet state and shutdown signalling.
    self.send_greenlet = None
    self.handle_stoping = False
    self.send_stop_evt = Event()
    self.timeout_stop_evt = Event()

    # NOTE(review): gevent.spawn presumably already schedules the greenlet,
    # which would make the start() call redundant -- confirm before removing.
    timeout_greenlet = gevent.spawn(self.on_timeout_handle)
    self.timeout_handle_greenlet = timeout_greenlet
    self.timeout_handle_greenlet.start()
def send_task(self):
    """
    Publish queued call requests until asked to stop.

    Loops forever, yielding to other greenlets after every pass.  On each
    pass, one queued call (if any) is published to ``self.Exchange`` with
    routing key ``self.Queue``.  Once the send queue is empty and
    ``self.handle_stoping`` is true, ``self.send_stop_evt`` is set and the
    method returns.
    """
    while True:
        # Logical "and" (the original used bitwise "&"): it short-circuits
        # and also works when handle_stoping is not a strict bool.
        if self.send_queue.empty() and self.handle_stoping:
            self.send_stop_evt.set()
            return

        if not self.send_queue.empty():
            callinfo = self.send_queue.get_nowait()
            # Publish the request; the only property set is reply_to, which
            # names the queue the response should be sent to.
            self._channel.basic_publish(exchange=self.Exchange,
                                        routing_key=self.Queue,
                                        properties=pika.BasicProperties(
                                            reply_to=self.callback_queue,
                                        ),
                                        body=callinfo.body)

        # Yield so other greenlets get a chance to run.
        gevent.sleep(0)
def __init__(self, host, port, password):
    """
    Store the connection parameters and set up the initial client state.

    Only attributes are initialised here; the two greenlet attributes
    start out as None.
    """
    # Connection parameters.
    self.host, self.port, self.password = host, port, password
    self.timeout = 5
    self.connected = False

    # Protocol state.
    self._run = True
    self._EOL = '\n'
    self._commands_sent = []
    self._auth_request_event = Event()

    # Event handling: greenlets (not started here), handlers and queue.
    self._receive_events_greenlet = None
    self._process_events_greenlet = None
    self.event_handlers = {}
    self._esl_event_queue = Queue()
    self._process_esl_event_queue = True
def stop(self):
    """
    Stop the greenlet workers and empty all queues.

    A no-op when the handler is not running.  Otherwise the handler is
    marked as stopped, the ``_STOP`` sentinel is queued, all workers are
    joined, the callback queue is replaced and the atexit registration of
    this method is removed -- all while holding the state-change lock.
    """
    with self._state_change:
        if self._running:
            self._running = False

            # Ask the worker consuming the callback queue to exit.
            self.callback_queue.put(_STOP)

            # Join workers one at a time, taking them from the end.
            while self._workers:
                finished = self._workers.pop()
                finished.join()

            # Clear the queues
            self.callback_queue = Queue()  # pragma: nocover

            python2atexit.unregister(self.stop)
def _app():
    """
    Return the REST app wired to a fake broker and a pre-filled store.

    The fake broker emits one event per entry in ``LIFECYCLE_EVENTS``
    followed by ``StopIteration``; those events are copied into an
    ``InMemoryStore`` that is attached to the app.
    """
    class Broker:
        def subscribe(self, subscriber):
            # One event per lifecycle entry, then the end-of-stream marker.
            for position, _ in enumerate(LIFECYCLE_EVENTS):
                subscriber.put(event(position))
            subscriber.put(StopIteration)

        def unsubscribe(self, queue):
            queue.put(StopIteration)

    app = vadvisor.app.rest.app
    fake_broker = Broker()
    app.eventBroker = fake_broker
    app.eventStore = InMemoryStore()

    # Iterating the queue is assumed to end at the StopIteration marker
    # (gevent-style queue) -- TODO confirm which `queue` module is imported.
    events = queue.Queue()
    fake_broker.subscribe(events)
    for item in events:
        app.eventStore.put(item)

    return app
def pipeline(stages, initial_data):
    """
    Chain ``stages`` together with queues and start a monitor for each.

    ``initial_data`` feeds the first stage and must be iterable.  When it
    has an ``append`` method, ``StopIteration`` is appended to it IN PLACE
    as the end-of-data marker (the caller's object is modified).

    Each stage gets ``in_q``/``out_q`` attributes so that one stage's
    output queue is the next stage's input queue, and a ``stage_monitor``
    greenlet is spawned for it in a ``Group``.

    Returns a ``PipelineResult`` built from the monitor group and the
    last stage's output queue, or from the group and an empty list when
    ``stages`` is empty.

    Raises:
        TypeError: if ``initial_data`` is not iterable.
    """
    # Make sure items in initial_data are iterable.  Only TypeError is
    # translated: a bare except here would also swallow GreenletExit,
    # KeyboardInterrupt and genuine errors raised by a custom __iter__.
    if not isinstance(initial_data, types.GeneratorType):
        try:
            iter(initial_data)
        except TypeError:
            raise TypeError('initial_data must be iterable')

    monitors = Group()

    # The StopIteration will bubble through the queues as it is reached.
    # Once a stage monitor sees it, it indicates that the stage will read
    # no more data and the monitor can wait for the current work to complete
    # and clean up.
    if hasattr(initial_data, 'append'):
        initial_data.append(StopIteration)

    if not stages:
        return PipelineResult(monitors, [])

    # chain stage queue io
    #  Each stage shares an output queue with the next stage's input.
    qs = [initial_data] + [Queue() for _ in stages]
    for stage, in_q, out_q in zip(stages, qs[:-1], qs[1:]):
        stage.in_q = in_q
        stage.out_q = out_q
        monitors.spawn(stage_monitor, stage)

    # Yield once so the freshly spawned monitors get a chance to start.
    gevent.sleep(0)
    return PipelineResult(monitors, stages[-1].out_q)
def __init__(self, target, id=''):
    """
    Store the scan target and identifier, reset all result containers,
    then call ``self.init()``.
    """
    self.target = target
    self.id = id

    # IP bookkeeping; `dns_ip` is a fixed list of addresses.
    # NOTE(review): presumably addresses that indicate a bogus or hijacked
    # DNS answer -- confirm where dns_ip is consumed.
    self.ip = []
    self.dns_ip = [
        '1.1.1.1',
        '127.0.0.1',
        '0.0.0.0',
        '202.102.110.203',
        '202.102.110.204',
        '220.250.64.225',
    ]

    # HTTP request headers.
    self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64; rv:49.0) Gecko/20100101 Firefox/49.0'}

    # Work queue and thread count setting.
    self.queue = Queue()
    self.thread_num = 60

    # Result containers, all empty to begin with.
    self.c_count = {}
    self.domain = []
    self.domains = {}
    self.title = {}
    self.appname = {}
    self.removed_domains = []

    self.init()
def __init__(self, settings=None, hooks=None, *args, **kwargs):
    """
    Initialize the NNTPManager() based on the provided settings.

    ``settings`` is presumed to be a loaded NNTPSettings() object; when
    it is None a default NNTPSettings() is created.  ``hooks``, when
    given, are added to a new HookManager.

    Raises AttributeError when the settings define no NNTP servers.
    """
    # A connection pool of NNTPConnections
    self._pool = []

    # A mapping of active worker threads
    self._workers = []

    # Tracks which workers are available for processing; this is what
    # decides whether another worker has to be spun up.
    self._work_tracker = WorkTracker()

    # Queue Control
    self._work_queue = Queue()

    # Map signal
    gevent.signal(signal.SIGQUIT, gevent.kill)

    # Define our hooks (if any)
    self.hooks = HookManager()
    if hooks:
        self.hooks.add(hooks=hooks)

    # Fall back to the defaults when no settings were supplied.
    settings = NNTPSettings() if settings is None else settings

    if len(settings.nntp_servers) == 0:
        logger.warning("There were no NNTP Servers defined to load.")
        raise AttributeError('No NNTP Servers Defined')

    # Store our defined settings
    self._settings = settings
def group(self, name, block=True):
    """
    Queue an NNTPRequest that issues GROUP for ``name`` (fetching details
    on that group specifically).

    With ``block`` true the call waits for the request to finish and
    returns the first entry of its response.  Otherwise the request
    object itself is returned and it is up to the calling application to
    monitor it until it is complete.

    Since the Request Object is inherited from a gevent.Event()
    object, one can easily check the status with the ready()
    call or, wait() if they want to block until content is ready.

    See http://www.gevent.org/gevent.event.html#module-gevent.event
    for more details.

    To remain thread-safe, it is recommended that you do not change
    any of the response contents or articles contents prior to
    its flag being set (marking completion).
    """
    # Each action is a ('function', (*args), {**kwargs}) triple.
    actions = [('group', (name, ), {})]
    request = NNTPConnectionRequest(actions=actions)

    # Append to Queue for processing
    self.put(request)

    if not block:
        # The caller monitors the request itself.
        return request

    # The request carries its own response once it has been handled.
    request.wait()
    return request.response[0]
def groups(self, filters=None, lazy=True, block=True):
    """
    Queue an NNTPRequest that fetches the NNTP group list, passing
    ``filters`` and ``lazy`` through to the connection's ``groups`` call.

    With ``block`` true the call waits for the request to finish and
    returns the first entry of its response.  Otherwise the request
    object itself is returned and it is up to the calling application to
    monitor it until it is complete.

    Since the Request Object is inherited from a gevent.Event()
    object, one can easily check the status with the ready()
    call or, wait() if they want to block until content is ready.

    See http://www.gevent.org/gevent.event.html#module-gevent.event
    for more details.

    To remain thread-safe, it is recommended that you do not change
    any of the response contents or articles contents prior to
    its flag being set (marking completion).
    """
    # Each action is a ('function', (*args), {**kwargs}) triple.
    call_kwargs = {'filters': filters, 'lazy': lazy}
    actions = [('groups', [], call_kwargs)]
    request = NNTPConnectionRequest(actions=actions)

    # Append to Queue for processing
    self.put(request)

    if not block:
        # The caller monitors the request itself.
        return request

    # The request carries its own response once it has been handled.
    request.wait()
    return request.response[0]
def stat(self, id, full=None, group=None, block=True):
    """
    Queue an NNTPRequest that issues ``stat`` for ``id``, passing
    ``group`` and ``full`` through as keyword arguments.

    With ``block`` true the call waits for the request to finish and
    returns the first entry of its response.  Otherwise the request
    object itself is returned and it is up to the calling application to
    monitor it until it is complete.

    Since the Request Object is inherited from a gevent.Event()
    object, one can easily check the status with the ready()
    call or, wait() if they want to block until content is ready.

    See http://www.gevent.org/gevent.event.html#module-gevent.event
    for more details.

    To remain thread-safe, it is recommended that you do not change
    any of the response contents or articles contents prior to
    its flag being set (marking completion).
    """
    # Each action is a ('function', (*args), {**kwargs}) triple.
    call_kwargs = {'group': group, 'full': full}
    actions = [('stat', (id, ), call_kwargs)]
    request = NNTPConnectionRequest(actions=actions)

    # Append to Queue for processing
    self.put(request)

    if not block:
        # The caller monitors the request itself.
        return request

    # The request carries its own response once it has been handled.
    request.wait()
    return request.response[0]
def seek_by_date(self, refdate, group=None, block=True):
    """
    Queue an NNTPRequest that seeks, within the selected group, to the
    pointer identified by the date ``refdate``.

    With ``block`` true the call waits for the request to finish and
    returns the first entry of its response.  Otherwise the request
    object itself is returned and it is up to the calling application to
    monitor it until it is complete.

    Since the Request Object is inherited from a gevent.Event()
    object, one can easily check the status with the ready()
    call or, wait() if they want to block until content is ready.

    See http://www.gevent.org/gevent.event.html#module-gevent.event
    for more details.

    To remain thread-safe, it is recommended that you do not change
    any of the response contents or articles contents prior to
    its flag being set (marking completion).
    """
    # Each action is a ('function', (*args), {**kwargs}) triple.
    actions = [('seek_by_date', (refdate, ), {'group': group})]
    request = NNTPConnectionRequest(actions=actions)

    # Append to Queue for processing
    self.put(request)

    if not block:
        # The caller monitors the request itself.
        return request

    # The request carries its own response once it has been handled.
    request.wait()
    return request.response[0]
def __init__(self, *args, **kwargs):
    """
    Initialise the base Connection, connect the socket, start the reader
    and writer greenlets and send the options message.

    The statements run in a fixed order: the write queue exists before
    the writer greenlet is spawned, and the socket is connected before
    either watcher starts.
    """
    Connection.__init__(self, *args, **kwargs)

    # Outgoing data is queued here; handle_write presumably drains it.
    self._write_queue = Queue()

    self._connect_socket()

    # One greenlet per direction.
    reader = gevent.spawn(self.handle_read)
    self._read_watcher = reader
    writer = gevent.spawn(self.handle_write)
    self._write_watcher = writer

    self._send_options_message()
def __init__(self):
    """Create the instance's empty queue."""
    fresh_queue = Queue()
    self.queue_ = fresh_queue
def __init__(self, *args, **kwargs):
    """
    Set up the shared queue and the canvas greenlet, then initialise the
    underlying DatagramServer.

    The ``options`` keyword argument is required; it is handed to the
    Canvas and removed before the remaining arguments reach
    ``DatagramServer.__init__``.
    """
    self.queue = Queue()

    # DatagramServer must not see 'options', so take it out of kwargs.
    options = kwargs.pop('options')
    canvas = Canvas(self.queue, options)

    # Start the canvas update loop in its own greenlet.
    spawn(canvas.CanvasUpdate)

    DatagramServer.__init__(self, *args, **kwargs)
def __init__(self, *args, **kwargs):
    """
    Create the request queue, start the canvas updater and then run the
    DatagramServer initialiser.

    ``kwargs`` must contain ``options``; the value goes to the Canvas and
    the key is dropped before DatagramServer receives the rest.
    """
    self.queue = Queue()

    canvas_options = kwargs.pop('options')
    pixel_canvas = Canvas(self.queue, canvas_options)

    # The greenlet handle is not kept; the updater just runs in the
    # background.
    spawn(pixel_canvas.CanvasUpdate)

    DatagramServer.__init__(self, *args, **kwargs)
def test_queue2(self):
    """Run one producer and three consumers against a Queue built with size 3."""
    _log.info('test_queue2222222222')
    tasks = Queue(3)

    def worker(name):
        # Consume until no task shows up for a full second.
        try:
            while True:
                item = tasks.get(timeout=1)  # decrements queue size by 1
                print('Worker %s got task %s' % (name, item))
                gevent.sleep(0)
        except Empty:
            print('Quitting time!')

    def boss():
        """
        Boss will wait to hand out work until a individual worker is
        free since the maxsize of the task queue is 3.
        """
        for number in xrange(1, 10):
            tasks.put(number)
        print('Assigned all work in iteration 1')

        for number in xrange(10, 20):
            tasks.put(number)
        print('Assigned all work in iteration 2')

    # The boss is spawned first, followed by the workers in this order.
    greenlets = [gevent.spawn(boss)]
    greenlets += [gevent.spawn(worker, who) for who in ('steve', 'john', 'bob')]
    gevent.joinall(greenlets)
def init(self):
    """Create the stdout/stderr queues and launch ``sflowtool -l``."""
    self.stdout_queue = Queue()
    self.stderr_queue = Queue()

    # Command line handed to spawn_process: the tool name and its flag.
    command = ['sflowtool', '-l']
    self.spawn_process(command)
def __init__(self):
    """Initialise the parent class, then create the backing queue."""
    super(NotifyingQueue, self).__init__()
    backing_queue = Queue()
    self._queue = backing_queue
def __init__(self, api, token_address):
    """
    Set up an echo node for ``token_address`` on top of ``api``.

    When no channel for the token is in the opened state, the node's
    token balance is checked and the node connects to the token network
    with its full balance.  Afterwards the bookkeeping state is created,
    the alarm callback is registered and the echo worker greenlet is
    spawned.

    Raises ValueError when a connection is needed but the node's token
    balance is not positive.
    """
    assert isinstance(api, RaidenAPI)
    self.ready = Event()

    self.api = api
    self.token_address = token_address

    existing_channels = self.api.get_channel_list(self.token_address)
    open_channels = [
        candidate
        for candidate in existing_channels
        if candidate.state == CHANNEL_STATE_OPENED
    ]

    if not open_channels:
        token = self.api.raiden.chain.token(self.token_address)
        has_funds = token.balance_of(self.api.raiden.address) > 0
        if not has_funds:
            raise ValueError('not enough funds for echo node %s for token %s' % (
                pex(self.api.raiden.address),
                pex(self.token_address),
            ))
        # The balance is queried again here, exactly as before.
        self.api.connect_token_network(
            self.token_address,
            token.balance_of(self.api.raiden.address),
            initial_channel_target=10,
            joinable_funds_target=0.5,
        )

    self.last_poll_block = self.api.raiden.get_block_number()
    self.received_transfers = Queue()
    self.stop_signal = None  # used to signal REMOVE_CALLBACK and stop echo_workers
    self.greenlets = []
    self.lock = BoundedSemaphore()
    # Bounded history: holds at most TRANSFER_MEMORY entries.
    self.seen_transfers = deque([], TRANSFER_MEMORY)
    self.num_handled_transfers = 0
    self.lottery_pool = Queue()

    # register ourselves with the raiden alarm task
    self.api.raiden.alarm.register_callback(self.echo_node_alarm_callback)
    self.echo_worker_greenlet = gevent.spawn(self.echo_worker)
def __init__(self, func, iterable, spawn=None):
    """
    Prepare a greenlet that maps ``func`` over ``iterable``.

    ``spawn``, when given, replaces the instance's ``spawn`` attribute.
    ``self._on_finish`` is linked so it is called when this greenlet
    finishes.
    """
    # Imported locally, as in the original.
    from gevent.queue import Queue

    Greenlet.__init__(self)

    # Only override the spawn attribute when a callable was supplied.
    if spawn is not None:
        self.spawn = spawn

    self.func, self.iterable = func, iterable
    self.queue = Queue()
    self.count = 0

    self.rawlink(self._on_finish)
def __init__(self, func, iterable, spawn=None):
    """
    Prepare a greenlet that maps ``func`` over ``iterable``, with extra
    index bookkeeping.

    ``spawn``, when given, replaces the instance's ``spawn`` attribute.
    ``self._on_finish`` is linked so it is called when this greenlet
    finishes.
    """
    # Imported locally, as in the original.
    from gevent.queue import Queue

    Greenlet.__init__(self)

    # Only override the spawn attribute when a callable was supplied.
    if spawn is not None:
        self.spawn = spawn

    self.func, self.iterable = func, iterable
    self.queue = Queue()
    self.count = 0

    # NOTE(review): presumably used to hand results back in input order --
    # confirm against the code that fills `waiting`.
    self.waiting = []  # QQQ maybe deque will work faster there?
    self.index = 0
    self.maxindex = -1

    self.rawlink(self._on_finish)