def _dispatch_client_change_callback(self, client):
if self._is_destroyed:
return
log.info("Start dispatching client change callback.")
for callback in self._client_callbacks:
try:
callback(client)
except (Exception, gevent.Timeout):
self._sc.increment("errors.zk.client.change_callback.failure",
tags={'host': hostname},
sample_rate=1)
log.exception("Failed to exec client change callback.")
python类Timeout()的实例源码
def stop(self):
""" Stop the node. """
self.alarm.stop_async()
self.protocol.stop_and_wait()
wait_for = [self.alarm]
wait_for.extend(self.protocol.greenlets)
wait_for.extend(self.greenlet_task_dispatcher.stop())
# We need a timeout to prevent an endless loop from trying to
# contact the disconnected client
gevent.wait(wait_for, timeout=self.shutdown_timeout)
# Filters must be uninstalled after the alarm task has stopped. Since
# the events are polled by a alarm task callback, if the filters are
# uninstalled before the alarm task is fully stopped the callback
# `poll_blockchain_events` will fail.
#
# We need a timeout to prevent an endless loop from trying to
# contact the disconnected client
try:
with gevent.Timeout(self.shutdown_timeout):
self.blockchain_events.uninstall_all_event_listeners()
except gevent.timeout.Timeout:
pass
# save the state after all tasks are done
if self.serialization_file:
save_snapshot(self.serialization_file, self)
if self.db_lock is not None:
self.db_lock.release()
def timeout_ctx(self):
return gevent.Timeout(self.cfg.keepalive, False)
def _wait(self, action=None, timeout=None):
"""
???service_name??service??
:param timeout:
:type timeout: float
:return:
"""
remain = timeout
waiter = Waiter()
self._oc.add_waiter(self.service_name, waiter)
try:
while True:
with Timeout(remain, _TimeOut):
start = time.time()
cur_action = waiter.get()
remain = remain - (time.time() - start)
if action is None: # ???????????
break
elif action == cur_action: # ????????
break
elif remain < 0.001: # ????????1ms
raise _TimeOut
else:
continue
except _TimeOut: # ????
return False
except Exception as e:
raise err.OctpParamError('catch unexpect error: %s. more: %s', e, traceback.format_exc())
else:
return True
finally:
self._oc.del_waiter(self.service_name, waiter)
def __call__(self, func):
func_logger = logging.getLogger(func.__module__)
@functools.wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
request = kwargs.get('request')
if request:
_of = getattr(func, 'original', func)
if 'request' not in _of.__code__.co_varnames:
del kwargs['request']
response = kwargs.get('response')
if response:
_of = getattr(func, 'original', func)
if 'response' not in _of.__code__.co_varnames:
del kwargs['response']
try:
if self.timeout:
with gevent.Timeout(self.timeout):
return self._process(func, args, kwargs, request, response)
else:
return self._process(func, args, kwargs, request, response)
except falcon.http_status.HTTPStatus:
raise
except Exception as e:
return self._process_exception_output(e, func_logger, request, response, args, kwargs)
finally:
execution_time = (time.time() - start) * 1000
self._finish_exec(execution_time, func_logger, args, kwargs, request, func)
return self._gevent_wrapper(wrapper)
def __call__(self, *args, **kwargs):
with gevent.Timeout(self.timeout):
return self._gevent_wrapper(self.app)(*args, **kwargs)
def handle_request(self, *args): # pragma: no cover
"""
Apply the configured 'timeout' value to each individual request.
Note that self.timeout is set to half the configured timeout by
the arbiter, so we use the value directly from the config.
"""
with gevent.Timeout(self.cfg.timeout):
return super(GeventWorker, self).handle_request(*args)
def rollback(self):
with gevent.Timeout(5):
super(RoutingSession, self).rollback()
def close(self):
current_transactions = tuple()
if self.transaction is not None:
current_transactions = self.transaction._iterate_parents()
try:
with gevent.Timeout(5):
super(RoutingSession, self).close()
# pylint: disable=E0712
except gevent.Timeout:
# pylint: enable=E0712
close_connections(self.engines.itervalues(), current_transactions)
raise
def execute(self, **kwargs):
"""
Runs the fulfillment strategy on the initiator until the conditions are met.
:return:
"""
with gevent.Timeout(self.timeout):
result = self.fulfillment.run(self.initiator, self.conditions, **kwargs)
result.event_name = self.name
return result
def run(self, event_name, event_result_q):
"""
Execute an individual event.
Success:
- return a result with 'success' = True
Failure:
- Raise an exception
- Timeout
- Return Result with 'success' = False
:param event_name:
:param event_result_q:
:return:
"""
event = self.events_dict[event_name]
try:
result = event.execute(event_results=self.event_results)
self.event_results.add(result)
except (Exception, Timeout) as e:
logger.error('%s', {
'message': 'event_execution_error',
'exception': e,
'event_name': event_name,
})
logger.error(traceback.format_exc())
return event_result_q.put(EVENT_RESULT.FAILURE)
event_result_q.put(result.success())
def timeout_ctx(self):
return gevent.Timeout(self.cfg.keepalive, False)
def timeout_ctx(self):
return gevent.Timeout(self.cfg.keepalive, False)
def _wait_write(self):
assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
self.__writable = AsyncResult()
# timeout is because libzmq cannot be trusted to properly signal a new send event:
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__writable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__writable.set()
def _wait_read(self):
assert self.__readable.ready(), "Only one greenlet can be waiting on this event"
self.__readable = AsyncResult()
# timeout is because libzmq cannot always be trusted to play nice with libevent.
# I can only confirm that this actually happens for send, but lets be symmetrical
# with our dirty hacks.
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__readable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLIN:
print("BUG: gevent may have missed a libzmq recv event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__readable.set()
def test_timeout(self):
a,b = self.create_bound_pair()
g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
timeout = gevent.Timeout(0.1)
timeout.start()
self.assertRaises(gevent.Timeout, b.recv)
g.kill()
def test_green_device(self):
rep = self.context.socket(zmq.REP)
req = self.context.socket(zmq.REQ)
self.sockets.extend([req, rep])
port = rep.bind_to_random_port('tcp://127.0.0.1')
g = gevent.spawn(zmq.green.device, zmq.QUEUE, rep, rep)
req.connect('tcp://127.0.0.1:%i' % port)
req.send(b'hi')
timeout = gevent.Timeout(3)
timeout.start()
receiver = gevent.spawn(req.recv)
self.assertEqual(receiver.get(2), b'hi')
timeout.cancel()
g.kill(block=True)
def _wait_write(self):
assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
self.__writable = AsyncResult()
# timeout is because libzmq cannot be trusted to properly signal a new send event:
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__writable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__writable.set()
def _wait_read(self):
assert self.__readable.ready(), "Only one greenlet can be waiting on this event"
self.__readable = AsyncResult()
# timeout is because libzmq cannot always be trusted to play nice with libevent.
# I can only confirm that this actually happens for send, but lets be symmetrical
# with our dirty hacks.
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__readable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLIN:
print("BUG: gevent may have missed a libzmq recv event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__readable.set()
def test_timeout(self):
a,b = self.create_bound_pair()
g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
timeout = gevent.Timeout(0.1)
timeout.start()
self.assertRaises(gevent.Timeout, b.recv)
g.kill()