def _concurrent_execute(self, context, start_req, parser, pool, pool_size):
    """Spawn every queued request on a pool and collect the results.

    The start requests seed a fresh queue. Each request taken from the
    queue is validated with ``self._check_req``, receives ``parser`` when
    it has none of its own, and is spawned on the pool with ``context``
    and the queue as arguments (presumably so a running request can
    enqueue follow-up requests -- confirm against the request class).

    Scheduling stops once the queue has stayed empty for one second.

    :param context: passed through to every spawned request.
    :param start_req: iterable of initial requests.
    :param parser: fallback parser for requests whose ``parser`` is None.
    :param pool: pool to spawn on; ``GeventPool(pool_size)`` is created
        when this is None.
    :param pool_size: size of the pool created when ``pool`` is None.
    :return: list holding the ``get()`` result of every spawned greenlet,
        in spawn order.
    """
    pending = Queue()
    for initial in start_req:
        pending.put_nowait(initial)

    if pool is None:
        pool = GeventPool(pool_size)

    spawned = []
    while True:
        try:
            request = self._check_req(pending.get(timeout=1))
            if request.parser is None:
                request.parser = parser
            spawned.append(pool.spawn(request, context, pending))
        except Empty:
            # Nothing showed up within the one second wait: stop scheduling.
            break

    results = []
    for job in spawned:
        results.append(job.get())
    return results
# python类Empty()的实例源码 -- source code examples that use Python's Empty() queue exception
def _messages_until_block(self, raiden, expiration_block):
    """ Yields the messages received before block `expiration_block`.

    Polls `self.response_queue` with `DEFAULT_EVENTS_POLL_TIMEOUT` until
    `raiden.get_block_number()` reaches `expiration_block`. Falsy
    responses are not yielded.
    """
    while raiden.get_block_number() < expiration_block:
        try:
            message = self.response_queue.get(
                timeout=DEFAULT_EVENTS_POLL_TIMEOUT,
            )
        except Empty:
            # Nothing arrived during this poll; re-check the block number.
            continue

        if message:
            yield message
def _wait_for_ssh(queue, ssh, command, timeout=1, attempts=40):
    """Wait until a successful connection to the ssh endpoint can be made.

    The first ``(host, port)`` pair is awaited on ``queue`` for up to
    ``timeout * attempts``; ``cli.bad_exit`` is called when none arrives.
    The endpoint is then checked up to ``attempts`` times. After a failed
    check the queue is polled for ``timeout`` for a replacement endpoint.
    On the first successful check ``run_ssh`` is invoked with the command.
    """
    try:
        host, port = queue.get(timeout=timeout * attempts)
    except g_queue.Empty:
        cli.bad_exit("No SSH endpoint found.")

    for _ in six.moves.range(attempts):
        _LOGGER.debug('Checking SSH endpoint %s:%s', host, port)
        reachable = checkout.connect(host, port)
        if reachable:
            run_ssh(host, port, ssh, list(command))
            # Only reached when run_ssh did not end with os.execvp().
            break

        try:
            endpoint = queue.get(timeout=timeout)
        except g_queue.Empty:
            continue
        host, port = endpoint
        queue.task_done()

    # Either every connection attempt failed or run_ssh returned (it did
    # not result in os.execvp), so "clear the queue" to let the thread join.
    queue.task_done()
def update_users(self):
    """Apply the pending user updates queued on ``self.user_updates``.

    Entries are ``(user_id, data)`` pairs. Only the first entry seen for a
    given ``user_id`` during one call is applied; later ones taken from
    the queue in the same call are skipped. The call returns when the
    queue is empty or once more than 10000 distinct users were handled.

    A failing update is logged and does not stop the remaining updates.
    """
    already_updated = set()

    while True:
        # Only update so many at a time
        if len(already_updated) > 10000:
            return

        try:
            user_id, data = self.user_updates.get_nowait()
        except Empty:
            return

        if user_id in already_updated:
            continue
        already_updated.add(user_id)

        try:
            User.update(**data).where(User.user_id == user_id).execute()
        except Exception:
            # Deliberately not a bare ``except``: KeyboardInterrupt,
            # SystemExit and other BaseException-only signals must keep
            # propagating instead of being logged and swallowed here.
            self.log.exception('Failed to update user %s: ', user_id)
def _run(self):
    """
    Worker loop: take requests from the work queue and run each one on
    this worker's connection until the exit flag is set.
    """
    while not self._exit.is_set():
        try:
            job = self._work_queue.get()

            if job is StopIteration:
                # A StopIteration object in the queue is the exit marker
                # (per the original note, it is queued during close()).
                return

            if job.is_set():
                # The request was aborted or is no longer needed.
                continue

        except StopIteration:
            # Asked to exit.
            return

        except EmptyQueueException:
            # Nothing to do right now.
            continue

        else:
            # A live request is in hand: flag this worker busy for the
            # duration of the run, then make it available again.
            self._work_tracker.mark_busy(self)
            job.run(connection=self._connection)
            self._work_tracker.mark_available(self)

    # The exit flag ended the loop; close the connection before leaving.
    self._connection.close()
def close(self):
    """
    Shuts the manager down: discards queued work, tells every worker to
    exit, closes the pooled entries and waits for the workers to finish.
    """
    work_queue = self._work_queue

    # Throw away whatever is still waiting to be processed.
    while not work_queue.empty():
        try:
            work_queue.get_nowait()
        except EmptyQueueException:
            # Drained in the meantime.
            break

    for worker in self._workers:
        # Set the exit flag and queue one StopIteration marker per worker.
        worker._exit.set()
        work_queue.put(StopIteration)

    for entry in self._pool:
        entry.close()

    for worker in self._workers:
        logger.info("Waiting for workers to exit.")
        worker.join()

    # Drop the old containers and start over with empty ones.
    del self._pool
    del self._workers
    self._workers = []
    self._pool = []
def test_queue2(self):
    """Run one producer and three consumers against a Queue of maxsize 3."""
    _log.info('test_queue2222222222')
    task_queue = Queue(3)

    def worker(name):
        # Consume until no task shows up for one second.
        try:
            while True:
                job = task_queue.get(timeout=1)  # takes one item off the queue
                print('Worker %s got task %s' % (name, job))
                gevent.sleep(0)
        except Empty:
            print('Quitting time!')

    def boss():
        """
        Hands out two batches of tasks. The queue was created with a
        maxsize of 3, so the boss waits for a free slot before it can
        hand out more work.
        """
        for number in xrange(1, 10):
            task_queue.put(number)
        print('Assigned all work in iteration 1')

        for number in xrange(10, 20):
            task_queue.put(number)
        print('Assigned all work in iteration 2')

    greenlets = [gevent.spawn(boss)]
    for worker_name in ('steve', 'john', 'bob'):
        greenlets.append(gevent.spawn(worker, worker_name))
    gevent.joinall(greenlets)
def _send_and_wait_time(self, raiden, recipient, transfer, timeout):
    """ Sends `transfer` to `recipient` and yields the responses taken from
    `self.response_queue` within `timeout` seconds.

    Yields `TIMEOUT` and stops when the queue stays empty for the time
    that is left. Used to handle multiple messages for the same hashlock
    while respecting the expiration timeout.
    """
    now = time.time()
    deadline = now + timeout

    raiden.send_async(recipient, transfer)

    while now <= deadline:
        remaining = deadline - now

        # wait for a response message (not the Ack for the transfer)
        try:
            message = self.response_queue.get(timeout=remaining)
        except Empty:
            yield TIMEOUT
            return

        yield message
        now = time.time()

    if log.isEnabledFor(logging.DEBUG):
        log.debug('TIMED OUT %s %s', self.__class__, pex(transfer))
def _send_and_wait_block(self, raiden, recipient, transfer, expiration_block):
    """ Sends `transfer` and yields responses until `expiration_block`.

    Polls `self.response_queue` with `DEFAULT_EVENTS_POLL_TIMEOUT` while
    `raiden.get_block_number()` is below `expiration_block`, yielding each
    truthy response. Once the block is reached, `TIMEOUT` is yielded.
    """
    raiden.send_async(recipient, transfer)

    latest_block = raiden.get_block_number()
    while latest_block < expiration_block:
        try:
            message = self.response_queue.get(
                timeout=DEFAULT_EVENTS_POLL_TIMEOUT,
            )
        except Empty:
            # Nothing arrived during this poll.
            message = None

        if message:
            yield message

        latest_block = raiden.get_block_number()

    if log.isEnabledFor(logging.DEBUG):
        log.debug(
            'TIMED OUT ON BLOCK %s %s %s',
            latest_block,
            self.__class__,
            pex(transfer),
            block_number=latest_block,
        )

    yield TIMEOUT
def ajax_endpoint(environ, start_response):
    """WSGI application that streams items from ``data_source`` as JSON lines.

    Answers with ``200 OK`` and a JSON content type, then loops forever:
    every item obtained from ``data_source`` is yielded as one JSON
    document followed by a newline. A five second wait without data
    simply starts the next wait.

    :param environ: WSGI environment dict (not read here).
    :param start_response: WSGI callable used to send status and headers.
    """
    start_response('200 OK', [('Content-Type', 'application/json')])

    while True:
        try:
            item = data_source.get(timeout=5)
            yield json.dumps(item) + '\n'
        except Empty:
            continue
def poll(uid):
    """Return the next queued message for user ``uid`` as a JSON string.

    Waits up to ten seconds on the user's queue; when nothing arrives an
    empty JSON list is returned instead.
    """
    mailbox = users[uid].queue
    try:
        payload = mailbox.get(timeout=10)
    except queue.Empty:
        payload = []
    return json.dumps(payload)
def test_queue_empty_exception(self):
    """Dispatch a callback that sets an event and then raises ``Empty``,
    and wait for that event to be set."""
    from gevent.queue import Empty

    handler = self._makeOne()
    handler.start()
    fired = self._getEvent()()

    def func():
        fired.set()
        raise Empty()

    handler.dispatch_callback(Callback('completion', func, ()))
    fired.wait()
def _create_greenlet_worker(self, queue):
    """Spawn a greenlet that runs the callables taken from ``queue``.

    The greenlet stops when it takes the ``_STOP`` marker from the queue.
    Exceptions raised by a callable are logged and the loop goes on.

    :return: the greenlet returned by ``gevent.spawn``.
    """
    def greenlet_worker():
        while True:
            try:
                task = queue.get()
                if task is _STOP:
                    return
                task()
            except Empty:
                pass
            except Exception as exc:
                log.warning("Exception in worker greenlet")
                log.exception(exc)

    return gevent.spawn(greenlet_worker)
def save_search_result_with_queue(queue):
    """Process every page currently obtainable from ``queue``.

    Each item is handed to ``save_search_result`` together with the queue
    (presumably so it can enqueue further pages -- confirm against that
    helper). The loop ends the first time a get with ``timeout=0`` finds
    nothing, after which a stop message is printed.
    """
    while True:
        try:
            page = queue.get(timeout=0)
        except Empty:
            break
        save_search_result(page, queue)

    # Written as a call with a single string so the line parses on
    # Python 3 and prints the same text on Python 2.
    print('stopping crawler...')
def save_search_result_with_queue(queue):
    """Process every page currently obtainable from ``queue``, echoing each.

    Each item is printed followed by the word ``page`` and then handed to
    ``save_search_result`` together with the queue (presumably so it can
    enqueue further pages -- confirm against that helper). The loop ends
    the first time a get with ``timeout=0`` finds nothing, after which a
    stop message is printed.
    """
    while True:
        try:
            page = queue.get(timeout=0)
        except Empty:
            break

        # One pre-formatted string keeps the output "<page> page" on both
        # Python 2 and Python 3; the old print statement only parsed on 2.
        print('%s page' % (page,))
        save_search_result(page, queue)

    print('stopping crawler...')
def worker(n):
    """Consume tasks from ``tasks`` until none arrives for one second.

    Every task is reported on stdout, followed by a 1.5 second sleep.

    :param n: label for this worker, used in the printed messages.
    """
    while True:
        try:
            job = tasks.get(timeout=1)  # takes one item off the queue
        except Empty:
            print('Quitting time!')
            return
        print('Worker %s got task %s' % (n, job))
        gevent.sleep(1.5)
def test_queue_empty_exception(self):
    """Dispatch a callback that sets an event and then raises ``Empty``,
    then block until the event has been set."""
    from gevent.queue import Empty

    handler = self._makeOne()
    handler.start()
    event = self._getEvent()()

    def func():
        event.set()
        raise Empty()

    completion = Callback('completion', func, ())
    handler.dispatch_callback(completion)
    event.wait()
def _create_greenlet_worker(self, queue):
    """Start a greenlet that calls every item it takes from ``queue``.

    Taking the ``_STOP`` marker ends the greenlet. An exception raised by
    a called item is logged and the greenlet keeps going.

    :return: the greenlet returned by ``gevent.spawn``.
    """
    def greenlet_worker():
        running = True
        while running:
            try:
                item = queue.get()
                if item is _STOP:
                    running = False
                else:
                    item()
            except Empty:
                continue
            except Exception as exc:
                log.warning("Exception in worker greenlet")
                log.exception(exc)

    return gevent.spawn(greenlet_worker)
def _callback_wrapper(self, command_id, command_obj, callback, greenlet):
    """Finish bookkeeping for a command whose greenlet has completed.

    Steps, in order: start the next queued command id (if any); mark the
    command as finished; re-check the greenlet result and mark the command
    as failed (recording the exception) when getting it raises; mark the
    command as stopped when the value is a ``GreenletExit``; pass the
    value through the decorator registered for ``command_id`` (if any);
    finally call ``callback`` with the resulting value.

    :param command_id: id of the command that just finished.
    :param command_obj: command object whose ``state`` / ``exception``
        attributes are updated.
    :param callback: callable receiving the greenlet value, or None.
    :param greenlet: the greenlet that ran the command.
    """
    assert callable(callback) or callback is None

    # Hand the freed slot to the next queued command, if there is one.
    if not self._queue.empty():
        try:
            queued_id = self._queue.get_nowait()
            log.d("Starting command id", queued_id, " next in queue in service '{}'".format(self.name))
            self._start(queued_id)
        except queue.Empty:
            pass

    states = command.CommandState
    command_obj.state = states.finished

    try:
        greenlet.get()
    except BaseException:
        log.exception("Command", "{}({})".format(command_obj.__class__.__name__, command_id), "raised an exception")
        command_obj.state = states.failed
        command_obj.exception = greenlet.exception
        if constants.dev:
            raise  # doesnt work

    if isinstance(greenlet.value, gevent.GreenletExit):
        command_obj.state = states.stopped
        greenlet.value = None

    if command_id in self._decorators:
        decorate = self._decorators[command_id]
        greenlet.value = decorate(greenlet.value)

    log.d(
        "Command id",
        command_id,
        "in service '{}'".format(self.name),
        "has finished running with state:",
        str(command_obj.state),
    )

    if callback:
        callback(greenlet.value)
def tracks():
    """Event Source endpoint for search queries.

    Reads the ``q`` request argument (default ``'pink floyd'``),
    lower-cases it and collects candidate artists: a tag's top artists,
    an artist's similar artists, or both when the query matches neither
    check. The hit track of every artist is fetched on a pool of 10, and
    each hit with a youtube id is streamed as a ``song`` event. After the
    stream ends the collected hits are passed to ``Track.save_hits`` and
    a ``finish`` event is sent.

    :return: a ``text/event-stream`` response.
    """
    query = request.args.get('q') or 'pink floyd'
    query = query.lower()

    artists = set()
    if Tag.is_tag(query):
        log.info('Query "%s" seems a tag', query)
        artists.update(Tag(name=query).top_artists)
    elif Artist.is_artist(query):
        log.info('Query "%s" seems an artist name', query)
        artists.update(Artist(name=query).similar_artists)
    else:
        log.info('Query "%s" might be tag or artist', query)
        artists.update(Artist(name=query).similar_artists)
        artists.update(Tag(name=query).top_artists)

    def fetch_artist(artist):
        """Fetch helper to run inside greenlet."""
        hit = artist.hit_track
        if not hit:
            return
        if not hit.youtube_id:
            log.info("Couldn't find youtube id, skipping track %s", hit)
            return
        hit_queue.put({
            'name': str(hit),
            'youtubeId': hit.youtube_id,
            'thumbnailUrl': hit.thumbnail_url,
        })

    hit_queue = queue.Queue()
    fetch_pool = pool.Pool(10)
    glet = fetch_pool.map_async(fetch_artist, artists)

    def gen():
        """Generate response by yielding to event source."""
        hits = []
        tried = 0
        # Keep reading while the fetch is running OR results are still
        # queued. Testing only ``glet`` dropped every hit that was queued
        # but not yet read at the moment the fetch greenlet finished.
        while glet or not hit_queue.empty():
            try:
                hit = hit_queue.get(timeout=1)
            except queue.Empty:
                tried += 1
                if tried >= 30:
                    log.info('Query "%s" took too much, giving up', query)
                    fetch_pool.kill()
                    break
                if not glet:
                    # Fetch finished and nothing left to read.
                    break
                continue
            hits.append(hit)
            yield ('event: song\ndata: %s\n\n' % json.dumps(hit)).encode('utf-8')

        Track.save_hits(hits)
        yield 'event: finish\ndata: finish\n\n'.encode('utf-8')

    return Response(gen(), mimetype='text/event-stream')
def Call(self, _func, *params):
    """Invoke the remote function ``_func`` with ``params`` and wait for
    the result.

    Every positional parameter is tagged with a type constant from
    ``public`` (STRING, LONG, DOUBLE, BOOL, INT, BYTES or MAP). ``Bytes``
    values are base64-encoded before sending. The call goes through
    ``self.local_client`` when set, otherwise ``self.remote_client``.

    :param _func: identifier of the function to call.
    :param params: arguments for the call; None and unsupported types are
        rejected.
    :return: ``(result, error)``. On any local failure ``result`` is None
        and ``error`` is a message (or the error returned by the client).
    """
    if self.stoping:
        return None, "Process ready to stop!"

    ArgsType = []
    args = []
    for index, arg in enumerate(params):
        if arg is None:
            return None, "Parameter cannot be None!"
        elif isinstance(arg, (str, unicode)):
            ArgsType.append(public.STRING)
            args.append(arg)
        elif isinstance(arg, long):
            ArgsType.append(public.LONG)
            args.append(arg)
        elif isinstance(arg, float):
            ArgsType.append(public.DOUBLE)
            args.append(arg)
        elif isinstance(arg, bool):
            ArgsType.append(public.BOOL)
            args.append(arg)
        elif isinstance(arg, int):
            # bool is a subclass of int, so the bool test above has to
            # come before this one.
            ArgsType.append(public.INT)
            args.append(arg)
        elif isinstance(arg, Bytes):
            ArgsType.append(public.BYTES)
            # Sent base64-encoded.
            args.append(arg.b64encode())
        elif isinstance(arg, dict):
            ArgsType.append(public.MAP)
            args.append(arg)
        else:
            return None, "args[%d] [%s] Types not allowed" % (index, type(arg))

    callInfo = CallInfo(_func, args, ArgsType, timeout=5)
    callback = Queue()

    if self.local_client is not None:
        err = self.local_client.Call(callInfo, callback)
    elif self.remote_client is not None:
        err = self.remote_client.Call(callInfo, callback)
    else:
        return None, "rpc service connection failed"

    if err is not None:
        return None, err

    try:
        # NOTE(review): get() is called without a timeout, so this looks
        # like it blocks until the client delivers a result and the Empty
        # branch below may be unreachable -- confirm against the Queue
        # implementation in use before relying on it.
        resultInfo = callback.get()
        return resultInfo.Result, resultInfo.Error
    except Empty:
        return None, "Quitting time!"