def set_exc_info(self, exc_info):
"""Sets the exception information of a ``Future.``
Preserves tracebacks on Python 2.
.. versionadded:: 4.0
"""
self._exc_info = exc_info
self._log_traceback = True
if not _GC_CYCLE_FINALIZERS:
self._tb_logger = _TracebackLogger(exc_info)
try:
self._set_done()
finally:
# Activate the logger after all callbacks have had a
# chance to call result() or exception().
if self._log_traceback and self._tb_logger is not None:
self._tb_logger.activate()
self._exc_info = exc_info
python类Future()的实例源码
def set_exc_info(self, exc_info):
"""Sets the exception information of a ``Future.``
Preserves tracebacks on Python 2.
.. versionadded:: 4.0
"""
self._exc_info = exc_info
self._log_traceback = True
if not _GC_CYCLE_FINALIZERS:
self._tb_logger = _TracebackLogger(exc_info)
try:
self._set_done()
finally:
# Activate the logger after all callbacks have had a
# chance to call result() or exception().
if self._log_traceback and self._tb_logger is not None:
self._tb_logger.activate()
self._exc_info = exc_info
def set_exc_info(self, exc_info):
"""Sets the exception information of a ``Future.``
Preserves tracebacks on Python 2.
.. versionadded:: 4.0
"""
self._exc_info = exc_info
self._log_traceback = True
if not _GC_CYCLE_FINALIZERS:
self._tb_logger = _TracebackLogger(exc_info)
try:
self._set_done()
finally:
# Activate the logger after all callbacks have had a
# chance to call result() or exception().
if self._log_traceback and self._tb_logger is not None:
self._tb_logger.activate()
self._exc_info = exc_info
def funcWrapper(*args) -> bool:
"""can be passed to executor, workflow step is loaded from args[0], funcWrapper wait for futures and stores values
in steps before a launcher is started"""
assert (len(args) == 1)
aSWorkflowStepState = args[0]
assert (isinstance(aSWorkflowStepState, states.WorkflowStepState))
# wait for prior future results
for key, input in aSWorkflowStepState.inputPortStates.items():
currentValue = input.getEvaluation() # future
if isinstance(currentValue, Future):
# Quick and dirty: blocks until port-states are written by launchTool;
# Later: this wont work in multi-CPU / multi-node enviroments because port states might be not in sync!
input.setValue(currentValue.result())
logging.getLogger('system').debug(currentValue.result())
else:
input.setValue(currentValue)
launcher.launchTool(aSWorkflowStepState)
return True
def mock_session_pools(f):
"""
Helper decorator that allows tests to initialize :class:.`Session` objects
without actually connecting to a Cassandra cluster.
"""
@wraps(f)
def wrapper(*args, **kwargs):
with patch.object(Session, "add_or_renew_pool") as mocked_add_or_renew_pool:
future = Future()
future.set_result(object())
mocked_add_or_renew_pool.return_value = future
f(*args, **kwargs)
return wrapper
def __del__(self):
if self.formatted_tb:
app_log.error('Future exception was never retrieved: %s',
''.join(self.formatted_tb).rstrip())
def result(self, timeout=None):
"""If the operation succeeded, return its result. If it failed,
re-raise its exception.
This method takes a ``timeout`` argument for compatibility with
`concurrent.futures.Future` but it is an error to call it
before the `Future` is done, so the ``timeout`` is never used.
"""
self._clear_tb_log()
if self._result is not None:
return self._result
if self._exc_info is not None:
raise_exc_info(self._exc_info)
self._check_done()
return self._result
def exception(self, timeout=None):
"""If the operation raised an exception, return the `Exception`
object. Otherwise returns None.
This method takes a ``timeout`` argument for compatibility with
`concurrent.futures.Future` but it is an error to call it
before the `Future` is done, so the ``timeout`` is never used.
"""
self._clear_tb_log()
if self._exc_info is not None:
return self._exc_info[1]
else:
self._check_done()
return None
def add_done_callback(self, fn):
"""Attaches the given callback to the `Future`.
It will be invoked with the `Future` as its argument when the Future
has finished running and its result is available. In Tornado
consider using `.IOLoop.add_future` instead of calling
`add_done_callback` directly.
"""
if self._done:
fn(self)
else:
self._callbacks.append(fn)
def set_result(self, result):
"""Sets the result of a ``Future``.
It is undefined to call any of the ``set`` methods more than once
on the same object.
"""
self._result = result
self._set_done()
def __del__(self):
if not self._log_traceback:
# set_exception() was not called, or result() or exception()
# has consumed the exception
return
tb = traceback.format_exception(*self._exc_info)
app_log.error('Future %r exception was never retrieved: %s',
self, ''.join(tb).rstrip())
def __del__(self):
if self.formatted_tb:
app_log.error('Future exception was never retrieved: %s',
''.join(self.formatted_tb).rstrip())
def result(self, timeout=None):
"""If the operation succeeded, return its result. If it failed,
re-raise its exception.
This method takes a ``timeout`` argument for compatibility with
`concurrent.futures.Future` but it is an error to call it
before the `Future` is done, so the ``timeout`` is never used.
"""
self._clear_tb_log()
if self._result is not None:
return self._result
if self._exc_info is not None:
raise_exc_info(self._exc_info)
self._check_done()
return self._result
def exception(self, timeout=None):
"""If the operation raised an exception, return the `Exception`
object. Otherwise returns None.
This method takes a ``timeout`` argument for compatibility with
`concurrent.futures.Future` but it is an error to call it
before the `Future` is done, so the ``timeout`` is never used.
"""
self._clear_tb_log()
if self._exc_info is not None:
return self._exc_info[1]
else:
self._check_done()
return None
def set_result(self, result):
"""Sets the result of a ``Future``.
It is undefined to call any of the ``set`` methods more than once
on the same object.
"""
self._result = result
self._set_done()
def set_exception(self, exception):
"""Sets the exception of a ``Future.``"""
self.set_exc_info(
(exception.__class__,
exception,
getattr(exception, '__traceback__', None)))
def __del__(self):
if not self._log_traceback:
# set_exception() was not called, or result() or exception()
# has consumed the exception
return
tb = traceback.format_exception(*self._exc_info)
app_log.error('Future %r exception was never retrieved: %s',
self, ''.join(tb).rstrip())
def __del__(self):
if self.formatted_tb:
app_log.error('Future exception was never retrieved: %s',
''.join(self.formatted_tb).rstrip())
def exception(self, timeout=None):
"""If the operation raised an exception, return the `Exception`
object. Otherwise returns None.
This method takes a ``timeout`` argument for compatibility with
`concurrent.futures.Future` but it is an error to call it
before the `Future` is done, so the ``timeout`` is never used.
"""
self._clear_tb_log()
if self._exc_info is not None:
return self._exc_info[1]
else:
self._check_done()
return None
def add_done_callback(self, fn):
"""Attaches the given callback to the `Future`.
It will be invoked with the `Future` as its argument when the Future
has finished running and its result is available. In Tornado
consider using `.IOLoop.add_future` instead of calling
`add_done_callback` directly.
"""
if self._done:
fn(self)
else:
self._callbacks.append(fn)