def _pull_from(self, subscription):
    """Fetch a subscribed user's new images and push them to each subscriber.

    :param subscription: mapping read for its ``'username'`` key and for its
        ``'subscribers'`` sequence, whose entries each provide a ``'chat_id'``.
    """
    account = self.site.get_user(subscription['username'])
    cache = self.image_cache_handler
    cache.get_or_create(username=account.username)
    fresh_images = cache.get_the_news(account.images)
    # NOTE(review): the original remark said this call has to run after all
    # images were sent, because the bulk operation raises "InvalidOperation:
    # Bulk operations can only be executed once" -- yet it is executed before
    # the pushes below. Confirm the intended ordering.
    cache.add_the_images(fresh_images)
    recipients = [entry['chat_id'] for entry in subscription['subscribers']]
    workers = pool.Pool(5)
    for recipient in recipients:
        workers.spawn(self._push_to, recipient, fresh_images)
    # Wait for every push spawned on the pool to finish.
    workers.join()
# Example source code for Python's spawn() (python类spawn()的实例源码)
def election(self, handle):
    """Keep running elections until the locker is acquired, then call *handle*.

    :param handle: callback invoked once the locker has been acquired.
    :type handle: callable
    :raises err.OctpProgramError: if *handle* is not callable.
    :return: None
    """
    if not callable(handle):
        raise err.OctpProgramError('Parameter `handler` must be callable.')
    while True:
        self._election()  # one election round
        if not self._locker.is_acquired:
            log.debug('Get locker failed, start watcher.')
            # Wait for the watcher greenlet to finish before trying again.
            gevent.spawn(self._watcher_handler).join()
            continue
        log.debug('Got locker')
        # The heartbeat greenlet is started and left running in the background.
        gevent.spawn(self._heartbeat_handler)
        handle()
        return
def _publish(self):
    """Run the publish handler in a greenlet, retrying on connection failures.

    Up to ``constant.ETCD_RECONNECT_MAX_RETRY_INIT`` attempts are made.

    :raises err.OctpEtcdConnectError: when every attempt failed.
    :return: None
    """
    for retry in range(constant.ETCD_RECONNECT_MAX_RETRY_INIT):
        try:
            worker = gevent.spawn(self._publish_handler)
            # Wait at most ETCD_CONNECT_TIMEOUT for the handler greenlet.
            worker.join(constant.ETCD_CONNECT_TIMEOUT)
            failure = worker.exception
            if failure:
                # Surface whatever the handler raised in this greenlet.
                raise failure
            # NOTE(review): the greenlet is killed even if it is still running
            # after the join timeout -- confirm that this is intended.
            worker.kill()
        except (etcd.EtcdConnectionFailed, gevent.Timeout):
            log.info('Connect to etcd failed, Retry(%d)...', retry)
            gevent.sleep(constant.ETCD_RECONNECT_INTERVAL)
            continue
        log.info('Publish OK.')
        return
    # Every attempt ended in a connection failure or timeout.
    raise err.OctpEtcdConnectError('Max attempts exceeded.')
def gevent_run(app, monkey_patch=True, start=True, debug=False,
               **kwargs):  # pragma: no cover
    """Spawn ``app.run`` in a gevent greenlet and optionally wait for it to stop.

    :param app: object exposing ``run(debug=..., **kwargs)`` and a ``stopped``
        attribute
    :param monkey_patch: boolean, call ``gevent.monkey.patch_all()`` before
        spawning, default: True
    :param start: boolean, if True, loop (sleeping 0.1s per iteration) until
        ``app.stopped`` becomes true
    :param debug: forwarded to ``app.run`` as ``debug``
    :param kwargs: other keyword arguments forwarded to ``app.run``
    :return: None
    """
    if monkey_patch:
        from gevent import monkey as gevent_monkey
        gevent_monkey.patch_all()
    import gevent
    gevent.spawn(app.run, debug=debug, **kwargs)
    if not start:
        return
    # Simple polling loop that keeps the caller alive while the app runs.
    while not app.stopped:
        gevent.sleep(0.1)
def runAsync(self, func, args, kwargs, title, success, error, errorcb=None, successcb=None):
    """Run *func* in a background greenlet and report the outcome via messages.

    A 'Started' message is sent immediately; once *func* finishes, either a
    success or an error message is sent.

    :param func: callable to execute as ``func(*args, **kwargs)``
    :param args: positional arguments for *func*
    :param kwargs: keyword arguments for *func*
    :param title: title used for every message that is sent
    :param success: message body sent when *func* completes without raising
    :param error: message prefix sent when *func* raises
    :param errorcb: optional callback receiving the processed error condition
    :param successcb: optional callback invoked (without arguments) on success
    """
    def runner():
        try:
            func(*args, **kwargs)
        # ``except ... as e`` is valid on Python 2.6+ and Python 3, unlike the
        # former comma form which only parses on Python 2.
        except (Exception, exceptions.BaseError) as e:
            eco = j.errorconditionhandler.processPythonExceptionObject(e)
            if errorcb:
                try:
                    errorcb(eco)
                except (Exception, exceptions.BaseError):
                    # Best effort: a failing error callback must not prevent
                    # the error message below from being sent.
                    pass
            errormsg = error + "</br> For more info check <a href='/grid/error condition?id=%s'>error</a> details" % eco.guid
            self.sendMessage(title, errormsg, 'error', hide=False)
            return
        refreshhint = self.ctx.env.get('HTTP_REFERER')
        self.sendMessage(title, success, 'success', refresh_hint=refreshhint)
        if successcb:
            successcb()

    self.sendMessage(title, 'Started')
    gevent.spawn(runner)
def __process_request(self, stream, pid, timeout):
    """Process one request stream and always send a response back.

    The work is handed to ``self._pool`` so that userland code cannot block
    request handling; this method waits up to *timeout* for the result.

    :param stream: request stream passed to ``__process_request_stream``
    :param pid: identifier included in log messages
    :param timeout: maximum number of seconds to wait for the result
    """
    res = self._pool.spawn(self.__process_request_stream, stream)
    # Wait for a period of seconds to get the execution result.
    try:
        response = res.get(timeout=timeout)
    except gevent.Timeout:
        # The format string has a single placeholder, so only the duration
        # (in milliseconds) is passed; the PID is added to the log line below.
        msg = 'SDK execution timed out after {}ms'.format(int(timeout * 1000))
        response = create_error_response(msg)
        LOG.warning('{}. PID: {}'.format(msg, pid))
    except:  # noqa: E722 - deliberate catch-all, a response must always be sent
        LOG.exception('Failed to handle request. PID: %d', pid)
        response = create_error_response('Failed to handle request')
    self._send_response(response)
def test_propagation_with_new_context(self):
    # Activate an explicit context first; the span opened inside the spawned
    # greenlet is then expected to carry its trace id and use its span id as
    # the parent id.
    self.tracer.context_provider.activate(Context(trace_id=100, span_id=101))

    def traced_job():
        with self.tracer.trace('greenlet'):
            gevent.sleep(0.01)

    gevent.joinall([gevent.spawn(traced_job)])

    collected = self.tracer.writer.pop_traces()
    eq_(1, len(collected))
    only_trace = collected[0]
    eq_(1, len(only_trace))
    root_span = only_trace[0]
    eq_(root_span.trace_id, 100)
    eq_(root_span.parent_id, 101)
def test_exception(self):
    # An exception raised inside a traced greenlet should be stored on the
    # greenlet and recorded on the span (error flag, message and stack).
    def failing_job():
        with self.tracer.trace('greenlet'):
            raise Exception('Custom exception')

    job = gevent.spawn(failing_job)
    job.join()
    ok_(isinstance(job.exception, Exception))

    collected = self.tracer.writer.pop_traces()
    eq_(1, len(collected))
    first_trace = collected[0]
    eq_(1, len(first_trace))
    span = first_trace[0]
    eq_(1, span.error)
    eq_('Custom exception', span.get_tag('error.msg'))
    stack = span.get_tag('error.stack')
    ok_('Traceback (most recent call last)' in stack)
def handle(self, source, address):
    """Answer a plain HTTP GET probe, or proxy the connection to the TCP service.

    The first chunk received from *source* is inspected: when it starts with
    ``GET`` a minimal ``200 OK`` reply is written and the handler returns.
    Otherwise a connection to ``self.tcp_service`` is opened, the chunk that
    was already read is passed on, and two greenlets forward traffic in both
    directions until they finish.

    :param source: client socket
    :param address: client address (unused)
    """
    init_data = source.recv(BUFFER_SIZE)
    try:
        if len(init_data) > 3 and init_data[:3] == b'GET':
            source.sendall(b'HTTP/1.1 200 OK\r\n' + format_date_time(time.time()).encode() + b'\r\n\r\nOK')
            return
        dest = create_connection(self.tcp_service)
    except IOError as ex:
        sys.stderr.write('Error on create connection: {}'.format(ex))
        return
    if init_data:
        # The bytes consumed for the GET sniff above belong to the proxied
        # stream; without this they would never reach the destination.
        try:
            dest.sendall(init_data)
        except IOError as ex:
            sys.stderr.write('Error on forwarding initial data: {}'.format(ex))
            dest.close()
            return
    forwarders = (
        gevent.spawn(forward, source, dest, self),
        gevent.spawn(forward, dest, source, self),
    )
    gevent.joinall(forwarders)
def run(self):
    """Log in, update every symbol through a pool of size 5, then close the output."""
    if not self.client.login():
        log.error('login error')
        return
    # The original note mentioned a tqdm bug and leaving the bar as None for
    # now; the truthiness check before close() below is kept as it was.
    progress = tqdm.tqdm(total=len(self.symbols), desc='overall')
    workers = gevent.pool.Pool(5)
    for symbol in self.symbols:
        workers.spawn(self.update_symbol, symbol, progress)
    workers.join()
    if progress:
        progress.close()
    self.out.close()
def run(self):
    """Log in, watch every batch of symbols concurrently, then close the output.

    The symbols are split with ``self.split(self.symbols, self.size)`` and one
    ``client.watch`` greenlet is spawned per batch. ``self.on_data`` is only
    passed as the data callback when ``self.out`` is set, and parsing is
    disabled when ``self.raw`` is true.
    """
    if not self.client.login():
        log.error('login failed')
        return
    on_data = self.on_data if self.out else None
    parse = not self.raw
    watchers = gevent.pool.Group()
    for symbols in self.split(self.symbols, self.size):
        watchers.spawn(self.client.watch, symbols, on_data, parse)
    watchers.join()
    # ``self.out`` is treated as optional above, so only close it when set
    # instead of failing with AttributeError on a missing output.
    if self.out:
        self.out.close()
def spawn_watchs(self, w, symbols_list):
    """Spawn one ``client.watch`` greenlet per symbol batch and wait for all.

    :param w: value bound as the first argument of ``self.child_on_data``
    :param symbols_list: iterable of symbol batches, one greenlet per batch
    """
    should_parse = not self.raw
    callback = None
    if self.out:
        # Only deliver data to the child handler when an output is configured.
        callback = functools.partial(self.child_on_data, w)
    watchers = gevent.pool.Group()
    for batch in symbols_list:
        watchers.spawn(self.client.watch, batch, callback, should_parse)
    watchers.join()
def make_app(raw_config):
    """Build the socket server application from the raw configuration.

    Creates the metrics client, error reporter, secrets store, message
    dispatcher and message source, wires them into a ``SocketServer``,
    installs a SIGUSR2 handler that quiesces the app and starts pumping
    messages in a background greenlet.

    :param raw_config: raw configuration passed to the ``*_from_config``
        helpers and parsed against ``CONFIG_SPEC``
    :return: the configured ``SocketServer`` instance
    """
    settings = config.parse_config(raw_config, CONFIG_SPEC)
    metrics = metrics_client_from_config(raw_config)
    reporter = error_reporter_from_config(raw_config, __name__)
    secret_store = secrets_store_from_config(raw_config)

    message_dispatcher = MessageDispatcher(metrics=metrics)
    message_source = MessageSource(config=settings.amqp)

    web = settings.web
    server = SocketServer(
        metrics=metrics,
        dispatcher=message_dispatcher,
        secrets=secret_store,
        error_reporter=reporter,
        ping_interval=web.ping_interval,
        admin_auth=web.admin_auth,
        conn_shed_rate=web.conn_shed_rate,
    )

    # SIGUSR2 triggers app quiescing, which is useful when the app processes
    # sit behind a process manager like einhorn.
    def _quiesce_on_signal(signum, frame):
        server._quiesce({}, bypass_auth=True)

    signal.signal(signal.SIGUSR2, _quiesce_on_signal)
    signal.siginterrupt(signal.SIGUSR2, False)

    # Incoming messages go to the dispatcher; status updates go out through
    # the message source.
    message_source.message_handler = message_dispatcher.on_message_received
    server.status_publisher = message_source.send_message

    gevent.spawn(message_source.pump_messages)
    return server
def run(args):
    """Check every resolver concurrently, showing a progress bar.

    :param args: parsed arguments; reads ``download``, ``resolvers`` and
        ``concurrency`` and is passed through to ``check_resolver``
    """
    if args.download:
        resolvers = download_resolvers()
    else:
        resolvers = load_resolvers(args.resolvers)
    random.shuffle(resolvers)

    workers = gevent.pool.Pool(args.concurrency)
    bar = progressbar.ProgressBar(redirect_stdout=True, redirect_stderr=True)
    for resolver in bar(resolvers):
        # Pool.spawn waits for a free slot *before* starting the greenlet, so
        # the concurrency limit is honoured; gevent.spawn followed by
        # pool.add() starts the greenlet first and only then waits for a slot.
        workers.spawn(check_resolver, args, resolver)
    workers.join()
def initialize_reactor(cls):
    """Create the timer manager, timeout watcher and new-timer event once.

    Does nothing when ``cls._timers`` is already set to a truthy value.
    """
    if cls._timers:
        return
    cls._timers = TimerManager()
    # Greenlet running ``cls.service_timeouts`` in the background.
    cls._timeout_watcher = gevent.spawn(cls.service_timeouts)
    cls._new_timer = gevent.event.Event()
def __init__(self, *args, **kwargs):
    """Initialise the connection, its write queue, socket and I/O greenlets.

    All positional and keyword arguments are passed unchanged to
    ``Connection.__init__``.
    """
    Connection.__init__(self, *args, **kwargs)
    # Presumably holds outgoing data consumed by handle_write -- TODO confirm.
    self._write_queue = Queue()
    self._connect_socket()
    # The read and write greenlets are started only after _connect_socket()
    # has been called.
    self._read_watcher = gevent.spawn(self.handle_read)
    self._write_watcher = gevent.spawn(self.handle_write)
    self._send_options_message()
def copy_current_request_context(f):
    """Decorate *f* so that it runs inside a copy of the current request context.

    The copy is taken at the moment the function is decorated and is entered
    each time the decorated function is called. This is useful when handing
    work over to greenlets.

    Example::

        import gevent
        from flask import copy_current_request_context

        @app.route('/')
        def index():
            @copy_current_request_context
            def do_some_work():
                # do some work here, it can access flask.request like you
                # would otherwise in the view function.
                ...
            gevent.spawn(do_some_work)
            return 'Regular response'

    :raises RuntimeError: if no request context is on the stack.

    .. versionadded:: 0.10
    """
    active = _request_ctx_stack.top
    if active is None:
        raise RuntimeError(
            'This decorator can only be used at local scopes '
            'when a request context is on the stack. For instance within '
            'view functions.'
        )
    snapshot = active.copy()

    def wrapper(*args, **kwargs):
        # Enter the copied context for the duration of the call.
        with snapshot:
            return f(*args, **kwargs)

    return update_wrapper(wrapper, f)
def run(self):
    """Spawn ``self.callback`` in a new greenlet without waiting for it."""
    target = self.callback
    gevent.spawn(target)
def fire(self, name):
    """Pop the queued events for *name* and run each one in its own greenlet.

    Event names are looked up case-insensitively (the name is lower-cased).
    Draining stops early if a popped event is falsy.

    :param name: event name
    :return: ``False`` when nothing is queued for *name* (or the name cannot
        be looked up), ``True`` otherwise.
    """
    try:
        key = name.lower()
        ev = self._events[key].pop(0)
    except (AttributeError, KeyError, IndexError):
        # Invalid name, unknown event name, or nothing queued for it.
        return False
    while ev:
        gevent.spawn(ev.run)
        try:
            ev = self._events[key].pop(0)
        except (KeyError, IndexError):
            # Queue drained (or removed in the meantime).
            break
    return True
def check_proxy(self):
    """Check every URL in ``self.url_list`` concurrently.

    Returns early (with ``None``) when ``check_httpbin()`` reports failure.
    Otherwise ``_before_check()`` runs, one ``_check(index, url)`` greenlet is
    spawned per URL and joined, and ``_after_check()`` runs afterwards.
    """
    if not self.check_httpbin():
        return
    self._before_check()
    jobs = [
        gevent.spawn(self._check, position, target)
        for position, target in enumerate(self.url_list)
    ]
    gevent.joinall(jobs)
    self._after_check()