def __exit__(self, exc_type, exc_val, exc_tb):
self.before_exit()
signal_responses = results_collected.send_robust(
sender=self, results=self.get_results_to_send(),
context=copy.deepcopy(context.current.data))
if exc_type is None:
for (receiver, response) in signal_responses:
if isinstance(response, BaseException):
orig_tb = ''.join(
traceback.format_tb(response.__traceback__))
error_msg = '{}{}: {}'.format(
orig_tb,
type(response).__name__,
str(response)
)
if hasattr(response, 'clone_with_more_info'):
new_exc = response.clone_with_more_info(
orig_tb=orig_tb)
else:
new_exc = type(response)(error_msg)
raise new_exc
python类format_tb()的实例源码
def _report_unexpected_error(self):
error_type, error, tb = sys.exc_info()
origin = tb
while origin.tb_next is not None:
origin = origin.tb_next
filename = origin.tb_frame.f_code.co_filename
lineno = origin.tb_lineno
message = '{0} at "{1}", line {2:d} : {3}'.format(error_type.__name__, filename, lineno, error)
environment.splunklib_logger.error(message + '\nTraceback:\n' + ''.join(traceback.format_tb(tb)))
self.write_error(message)
# endregion
# region Types
def _report_unexpected_error(self):
error_type, error, tb = sys.exc_info()
origin = tb
while origin.tb_next is not None:
origin = origin.tb_next
filename = origin.tb_frame.f_code.co_filename
lineno = origin.tb_lineno
message = '{0} at "{1}", line {2:d} : {3}'.format(error_type.__name__, filename, lineno, error)
environment.splunklib_logger.error(message + '\nTraceback:\n' + ''.join(traceback.format_tb(tb)))
self.write_error(message)
# endregion
# region Types
def roster_background_thread(self, sr):
'''Entry for background roster update thread'''
try:
logging.debug('roster_thread for ' + str(sr))
# Allow test hooks with static ejabberd_controller
if hasattr(self.ctx, 'ejabberd_controller') and self.ctx.ejabberd_controller is not None:
e = self.ctx.ejabberd_controller
else:
e = ejabberdctl(self.ctx)
groups, commands = self.roster_update_users(e, sr)
self.roster_update_groups(e, groups)
# For some reason, the vcard changes are not pushed to the clients. Rinse and repeat.
# Maybe not necessary with synchronous thread?
# for cmd in commands:
# e.execute(cmd)
self.ctx.shared_roster_db.sync()
except AttributeError:
pass # For tests
except Exception, err:
(etype, value, tb) = sys.exc_info()
traceback.print_exception(etype, value, tb)
logging.warn('roster_groups thread: %s:\n%s'
% (str(err), ''.join(traceback.format_tb(tb))))
return False
def format_exception(exc_info, encoding='UTF-8'):
ec, ev, tb = exc_info
# Our exception object may have been turned into a string, and Python 3's
# traceback.format_exception() doesn't take kindly to that (it expects an
# actual exception object). So we work around it, by doing the work
# ourselves if ev is not an exception object.
if not is_base_exception(ev):
tb_data = force_unicode(
''.join(traceback.format_tb(tb)),
encoding)
ev = exc_to_unicode(ev)
return tb_data + ev
else:
return force_unicode(
''.join(traceback.format_exception(*exc_info)),
encoding)
def print_exception(type=None, value=None, tb=None, limit=None):
if type is None:
type, value, tb = sys.exc_info()
import traceback
print
print "<H3>Traceback (most recent call last):</H3>"
list = traceback.format_tb(tb, limit) + \
traceback.format_exception_only(type, value)
print "<PRE>%s<B>%s</B></PRE>" % (
escape("".join(list[:-1])),
escape(list[-1]),
)
del tb
def pytest_collection_modifyitems(session, config, items):
failure = None
item_ids = _get_set_of_item_ids(items)
try:
seed = getattr(config, 'random_order_seed', None)
bucket_type = config.getoption('random_order_bucket')
_shuffle_items(items, bucket_key=_random_order_item_keys[bucket_type], disable=_disable, seed=seed)
except Exception as e:
# See the finally block -- we only fail if we have lost user's tests.
_, _, exc_tb = sys.exc_info()
failure = 'pytest-random-order plugin has failed with {0!r}:\n{1}'.format(
e, ''.join(traceback.format_tb(exc_tb, 10))
)
config.warn(0, failure, None)
finally:
# Fail only if we have lost user's tests
if item_ids != _get_set_of_item_ids(items):
if not failure:
failure = 'pytest-random-order plugin has failed miserably'
raise RuntimeError(failure)
def test_errors():
def f1():
assert 0, 'holy moly'
def raise_later(e):
errors.reraise(e)
def f2():
try:
f1()
except AssertionError as e:
prepared_e = errors.prepare_for_reraise(e)
else:
assert False, 'f1 should have raised AssertionError'
raise_later(prepared_e)
try:
f2()
except AssertionError:
formatted = traceback.format_tb(sys.exc_info()[2])
formatted_message = ''.join(formatted)
assert_in('holy moly', formatted_message)
assert_in('f1', formatted_message)
else:
assert False, 'f2 should have raised AssertionError'
def errorMsg():
try:
tb = sys.exc_info()[2]
tbinfo = traceback.format_tb(tb)[0]
theMsg = tbinfo + " \n" + str(sys.exc_type)+ ": " + str(sys.exc_value) + " \n"
arcMsg = "ArcPy ERRORS:\n" + arcpy.GetMessages(2) + "\n"
PrintMsg(theMsg, 2)
PrintMsg(arcMsg, 2)
except:
PrintMsg("Unhandled error in errorMsg method", 2)
#======================================================================================================================================
def test_traceback_format(self):
try:
raise KeyError('blah')
except KeyError:
type_, value, tb = sys.exc_info()
traceback_fmt = 'Traceback (most recent call last):\n' + \
''.join(traceback.format_tb(tb))
file_ = StringIO()
_testcapi.traceback_print(tb, file_)
python_fmt = file_.getvalue()
else:
raise Error("unable to create test traceback string")
# Make sure that Python and the traceback module format the same thing
self.assertEqual(traceback_fmt, python_fmt)
# Make sure that the traceback is properly indented.
tb_lines = python_fmt.splitlines()
self.assertEqual(len(tb_lines), 3)
banner, location, source_line = tb_lines
self.assertTrue(banner.startswith('Traceback'))
self.assertTrue(location.startswith(' File'))
self.assertTrue(source_line.startswith(' raise'))
def test_MemoryError(self):
# PyErr_NoMemory always raises the same exception instance.
# Check that the traceback is not doubled.
import traceback
from _testcapi import raise_memoryerror
def raiseMemError():
try:
raise_memoryerror()
except MemoryError as e:
tb = e.__traceback__
else:
self.fail("Should have raises a MemoryError")
return traceback.format_tb(tb)
tb1 = raiseMemError()
tb2 = raiseMemError()
self.assertEqual(tb1, tb2)
def timer_actions(self):
now = time.time()
try:
# voter will update via RPC once every interval.
self.voter.update()
if now - self.last_vote > self.vote_interval:
self.last_vote = now
self.voter.vote_for_comments()
except Exception as e:
self.logger.error(str(e))
self.logger.error(''.join(traceback.format_tb(sys.exc_info()[2])))
# Update tracked comments.
tracked_comments = set(self.voter.db.tracked_comments.values())
if self.last_tracked_comments != tracked_comments:
self.comments_widget.update_comments()
self.last_tracked_comments = tracked_comments
self.voting_power_label.setText('Voting power: %s' % self.voter.get_voting_power())
def load_wsgi(self):
try:
self.wsgi = self.app.wsgi()
except SyntaxError as e:
if not self.cfg.reload:
raise
self.log.exception(e)
# fix from PR #1228
# storing the traceback into exc_tb will create a circular reference.
# per https://docs.python.org/2/library/sys.html#sys.exc_info warning,
# delete the traceback after use.
try:
exc_type, exc_val, exc_tb = sys.exc_info()
self.reloader.add_extra_file(exc_val.filename)
tb_string = traceback.format_tb(exc_tb)
self.wsgi = util.make_fail_app(tb_string)
finally:
del exc_tb
def _report_unexpected_error(self):
error_type, error, tb = sys.exc_info()
origin = tb
while origin.tb_next is not None:
origin = origin.tb_next
filename = origin.tb_frame.f_code.co_filename
lineno = origin.tb_lineno
message = '{0} at "{1}", line {2:d} : {3}'.format(error_type.__name__, filename, lineno, error)
environment.splunklib_logger.error(message + '\nTraceback:\n' + ''.join(traceback.format_tb(tb)))
self.write_error(message)
# endregion
# region Types
def getPreviousExceptions(limit=0):
"""
sys.exc_info() returns : type,value,traceback
traceback.extract_tb(traceback) : returns (filename, line number, function name, text)
"""
try:
exinfo = sys.exc_info()
if exinfo[0] is not None:
stack = traceback.format_tb(exinfo[2])
return str('\n'.join(['Tracebacks (most recent call last):',
''.join(stack[(len(stack)>1 and 1 or 0):]),
': '.join([str(exinfo[0].__name__),str(exinfo[1])])
]))
else:
return ''
except Exception,e:
print 'Aaaargh!'
return traceback.format_exc()
def test_traceback_format(self):
from _testcapi import traceback_print
try:
raise KeyError('blah')
except KeyError:
type_, value, tb = sys.exc_info()
traceback_fmt = 'Traceback (most recent call last):\n' + \
''.join(traceback.format_tb(tb))
file_ = StringIO()
traceback_print(tb, file_)
python_fmt = file_.getvalue()
else:
raise Error("unable to create test traceback string")
# Make sure that Python and the traceback module format the same thing
self.assertEqual(traceback_fmt, python_fmt)
# Make sure that the traceback is properly indented.
tb_lines = python_fmt.splitlines()
self.assertEqual(len(tb_lines), 3)
banner, location, source_line = tb_lines
self.assertTrue(banner.startswith('Traceback'))
self.assertTrue(location.startswith(' File'))
self.assertTrue(source_line.startswith(' raise'))
def test_traceback_format(self):
from _testcapi import traceback_print
try:
raise KeyError('blah')
except KeyError:
type_, value, tb = sys.exc_info()
traceback_fmt = 'Traceback (most recent call last):\n' + \
''.join(traceback.format_tb(tb))
file_ = StringIO()
traceback_print(tb, file_)
python_fmt = file_.getvalue()
else:
raise Error("unable to create test traceback string")
# Make sure that Python and the traceback module format the same thing
self.assertEqual(traceback_fmt, python_fmt)
# Make sure that the traceback is properly indented.
tb_lines = python_fmt.splitlines()
self.assertEqual(len(tb_lines), 3)
banner, location, source_line = tb_lines
self.assertTrue(banner.startswith('Traceback'))
self.assertTrue(location.startswith(' File'))
self.assertTrue(source_line.startswith(' raise'))
def load_wsgi(self):
try:
self.wsgi = self.app.wsgi()
except SyntaxError as e:
if not self.cfg.reload:
raise
self.log.exception(e)
# fix from PR #1228
# storing the traceback into exc_tb will create a circular reference.
# per https://docs.python.org/2/library/sys.html#sys.exc_info warning,
# delete the traceback after use.
try:
exc_type, exc_val, exc_tb = sys.exc_info()
self.reloader.add_extra_file(exc_val.filename)
tb_string = traceback.format_tb(exc_tb)
self.wsgi = util.make_fail_app(tb_string)
finally:
del exc_tb
def format_exception(exc_info, encoding='UTF-8'):
ec, ev, tb = exc_info
# Our exception object may have been turned into a string, and Python 3's
# traceback.format_exception() doesn't take kindly to that (it expects an
# actual exception object). So we work around it, by doing the work
# ourselves if ev is not an exception object.
if not is_base_exception(ev):
tb_data = force_unicode(
''.join(traceback.format_tb(tb)),
encoding)
ev = exc_to_unicode(ev)
return tb_data + ev
else:
return force_unicode(
''.join(traceback.format_exception(*exc_info)),
encoding)
def _report_unexpected_error(self):
error_type, error, tb = sys.exc_info()
origin = tb
while origin.tb_next is not None:
origin = origin.tb_next
filename = origin.tb_frame.f_code.co_filename
lineno = origin.tb_lineno
message = '{0} at "{1}", line {2:d} : {3}'.format(error_type.__name__, filename, lineno, error)
environment.splunklib_logger.error(message + '\nTraceback:\n' + ''.join(traceback.format_tb(tb)))
self.write_error(message)
# endregion
# region Types
def test_exceptions_csv(self):
try:
raise Exception("Test exception")
except Exception as e:
tb = sys.exc_info()[2]
runners.locust_runner.log_exception("local", str(e), "".join(traceback.format_tb(tb)))
runners.locust_runner.log_exception("local", str(e), "".join(traceback.format_tb(tb)))
response = requests.get("http://127.0.0.1:%i/exceptions/csv" % self.web_port)
self.assertEqual(200, response.status_code)
reader = csv.reader(StringIO(response.text))
rows = []
for row in reader:
rows.append(row)
self.assertEqual(2, len(rows))
self.assertEqual("Test exception", rows[1][1])
self.assertEqual(2, int(rows[1][0]), "Exception count should be 2")
def test_traceback_format(self):
try:
raise KeyError('blah')
except KeyError:
type_, value, tb = sys.exc_info()
traceback_fmt = 'Traceback (most recent call last):\n' + \
''.join(traceback.format_tb(tb))
file_ = StringIO()
traceback_print(tb, file_)
python_fmt = file_.getvalue()
else:
raise Error("unable to create test traceback string")
# Make sure that Python and the traceback module format the same thing
self.assertEqual(traceback_fmt, python_fmt)
# Make sure that the traceback is properly indented.
tb_lines = python_fmt.splitlines()
self.assertEqual(len(tb_lines), 3)
banner, location, source_line = tb_lines
self.assertTrue(banner.startswith('Traceback'))
self.assertTrue(location.startswith(' File'))
self.assertTrue(source_line.startswith(' raise'))
def test_MemoryError(self):
# PyErr_NoMemory always raises the same exception instance.
# Check that the traceback is not doubled.
import traceback
from _testcapi import raise_memoryerror
def raiseMemError():
try:
raise_memoryerror()
except MemoryError as e:
tb = e.__traceback__
else:
self.fail("Should have raises a MemoryError")
return traceback.format_tb(tb)
tb1 = raiseMemError()
tb2 = raiseMemError()
self.assertEqual(tb1, tb2)
def __repr__(self):
if self.has_exception:
return "Attempts: {0}, Error:\n{1}".format(self.attempt_number, "".join(traceback.format_tb(self.value[2])))
else:
return "Attempts: {0}, Value: {1}".format(self.attempt_number, self.value)
def __repr__(self):
if self.has_exception:
return "Attempts: {0}, Error:\n{1}".format(self.attempt_number, "".join(traceback.format_tb(self.value[2])))
else:
return "Attempts: {0}, Value: {1}".format(self.attempt_number, self.value)
def query(self, obj, operator=None):
# Get the complete current value via the base class interface.
value = self._query(obj)
if value is None:
return any.to_any(value)
# Don't apply an operator if the value is already a CORBA Any, which
# should only occur in the case of a (legacy) user callback.
if isinstance(value, CORBA.Any):
return value
# Attempt to apply the operator to the returned value.
try:
if operator in ("[]", "[*]"):
value = self._getDefaultOperator(value, operator)
elif operator == "[?]":
value = self._getKeysOperator(value, operator)
return any.to_any(check_type_for_long(value))
elif operator == "[@]":
value = self._getKeyValuePairsOperator(value, operator)
return any.to_any(check_type_for_long(value))
elif operator == "[#]":
value = len(self._getDefaultOperator(value, operator))
return any.to_any(check_type_for_long(value))
elif operator != None:
value = self._getSliceOperator(value, operator)
except Exception, e:
raise AttributeError, "error processing operator '%s': '%s' %s" % (operator, e, "\n".join(traceback.format_tb(sys.exc_traceback)))
# At this point, value must be a normal sequence, so we can use the
# standard conversion routine.
return self._toAny(value)
def print_exception(type=None, value=None, tb=None, limit=None):
if type is None:
type, value, tb = sys.exc_info()
import traceback
print
print "<H3>Traceback (most recent call last):</H3>"
list = traceback.format_tb(tb, limit) + \
traceback.format_exception_only(type, value)
print "<PRE>%s<B>%s</B></PRE>" % (
escape("".join(list[:-1])),
escape(list[-1]),
)
del tb
def __repr__(self):
if self.has_exception:
return "Attempts: {0}, Error:\n{1}".format(self.attempt_number, "".join(traceback.format_tb(self.value[2])))
else:
return "Attempts: {0}, Value: {1}".format(self.attempt_number, self.value)
def execute(self):
# noinspection PyBroadException
try:
if self._argv is None:
self._argv = os.path.splitext(os.path.basename(self._path))[0]
self._execute(self._path, self._argv, self._environ)
except:
error_type, error, tb = sys.exc_info()
message = 'Command execution failed: ' + unicode(error)
self._logger.error(message + '\nTraceback:\n' + ''.join(traceback.format_tb(tb)))
sys.exit(1)
def __repr__(self):
if self.has_exception:
return "Attempts: {0}, Error:\n{1}".format(self.attempt_number, "".join(traceback.format_tb(self.value[2])))
else:
return "Attempts: {0}, Value: {1}".format(self.attempt_number, self.value)