def log(msg, msgx='', level=logging.INFO, need_tb=False):
"""
Logging in UCC Config Module.
:param msg: message content
:param msgx: detail info.
:param level: logging level
:param need_tb: if need logging traceback
:return:
"""
global LOGGING_STOPPED
if LOGGING_STOPPED:
return
msgx = ' - ' + msgx if msgx else ''
content = 'UCC Config Module: %s%s' % (msg, msgx)
if need_tb:
stack = ''.join(traceback.format_stack())
content = '%s\r\n%s' % (content, stack)
stulog.logger.log(level, content, exc_info=1)
python类format_stack()的实例源码
def timeout_response() -> chalice.Response:
"""
Produce a chalice Response object that indicates a timeout. Stacktraces for all running threads, other than the
current thread, are provided in the response object.
"""
frames = sys._current_frames()
current_threadid = threading.get_ident()
trace_dump = {
thread_id: traceback.format_stack(frame)
for thread_id, frame in frames.items()
if thread_id != current_threadid}
problem = {
'status': requests.codes.gateway_timeout,
'code': "timed_out",
'title': "Timed out processing request.",
'traces': trace_dump,
}
return chalice.Response(
status_code=problem['status'],
headers={"Content-Type": "application/problem+json"},
body=json.dumps(problem),
)
def tryLock(self, timeout=None, id=None):
""" Try to lock the mutex.
@param timeout int: give up after this many milliseconds
@param id: debug id
@return bool: whether locking succeeded
"""
if timeout is None:
locked = QtCore.QMutex.tryLock(self)
else:
locked = QtCore.QMutex.tryLock(self, timeout)
if self.debug and locked:
self.l.lock()
try:
if id is None:
self.tb.append(''.join(traceback.format_stack()[:-1]))
else:
self.tb.append(" " + str(id))
#print 'trylock', self, len(self.tb)
finally:
self.l.unlock()
return locked
def lock(self, id=None):
""" Lock mutex. Will try again every 5 seconds.
@param id: debug id
"""
c = 0
waitTime = 5000 # in ms
while True:
if self.tryLock(waitTime, id):
break
c += 1
if self.debug:
self.l.lock()
try:
logger.debug('Waiting for mutex lock ({:.1} sec). '
'Traceback follows:'.format(c*waitTime/1000.))
logger.debug(''.join(traceback.format_stack()))
if len(self.tb) > 0:
logger.debug('Mutex is currently locked from: {0}\n'
''.format(self.tb[-1]))
else:
logger.debug('Mutex is currently locked from [???]')
finally:
self.l.unlock()
#print 'lock', self, len(self.tb)
def _do_get(self):
if self._checked_out:
if self._checkout_traceback:
suffix = ' at:\n%s' % ''.join(
chop_traceback(self._checkout_traceback))
else:
suffix = ''
raise AssertionError("connection is already checked out" + suffix)
if not self._conn:
self._conn = self._create_connection()
self._checked_out = True
if self._store_traceback:
self._checkout_traceback = traceback.format_stack()
return self._conn
def _do_get(self):
if self._checked_out:
if self._checkout_traceback:
suffix = ' at:\n%s' % ''.join(
chop_traceback(self._checkout_traceback))
else:
suffix = ''
raise AssertionError("connection is already checked out" + suffix)
if not self._conn:
self._conn = self._create_connection()
self._checked_out = True
if self._store_traceback:
self._checkout_traceback = traceback.format_stack()
return self._conn
def _do_get(self):
if self._checked_out:
if self._checkout_traceback:
suffix = ' at:\n%s' % ''.join(
chop_traceback(self._checkout_traceback))
else:
suffix = ''
raise AssertionError("connection is already checked out" + suffix)
if not self._conn:
self._conn = self._create_connection()
self._checked_out = True
if self._store_traceback:
self._checkout_traceback = traceback.format_stack()
return self._conn
def _do_get(self):
if self._checked_out:
if self._checkout_traceback:
suffix = ' at:\n%s' % ''.join(
chop_traceback(self._checkout_traceback))
else:
suffix = ''
raise AssertionError("connection is already checked out" + suffix)
if not self._conn:
self._conn = self._create_connection()
self._checked_out = True
if self._store_traceback:
self._checkout_traceback = traceback.format_stack()
return self._conn
def _do_get(self):
if self._checked_out:
if self._checkout_traceback:
suffix = ' at:\n%s' % ''.join(
chop_traceback(self._checkout_traceback))
else:
suffix = ''
raise AssertionError("connection is already checked out" + suffix)
if not self._conn:
self._conn = self._create_connection()
self._checked_out = True
if self._store_traceback:
self._checkout_traceback = traceback.format_stack()
return self._conn
def run(self):
def log_failure(failure):
logger.exception(failure.value)
if failure.frames:
logger.critical(str("").join(format_tb(failure.getTracebackObject())))
def errback_main(failure):
log_failure(failure)
self.task.start(interval=0).addErrback(errback_main)
def errback_flush_states(failure):
log_failure(failure)
self._flush_states_task.start(interval=300).addErrback(errback_flush_states)
def debug(sig, frame):
logger.critical("Signal received: printing stack trace")
logger.critical(str("").join(format_stack(frame)))
self.task.start(interval=0).addErrback(errback_main)
self._logging_task.start(interval=30)
self._flush_states_task.start(interval=300).addErrback(errback_flush_states)
signal(SIGUSR1, debug)
reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
reactor.run()
def _startRunCallbacks(self, result):
if self.called:
if self._suppressAlreadyCalled:
self._suppressAlreadyCalled = False
return
if self.debug:
if self._debugInfo is None:
self._debugInfo = DebugInfo()
extra = "\n" + self._debugInfo._getDebugTracebacks()
raise AlreadyCalledError(extra)
raise AlreadyCalledError
if self.debug:
if self._debugInfo is None:
self._debugInfo = DebugInfo()
self._debugInfo.invoker = traceback.format_stack()[:-2]
self.called = True
self.result = result
self._runCallbacks()
def get_file(file_id, api=None):
# TODO: remove this function and just use the Pillar SDK directly.
if file_id is None:
return None
if api is None:
api = system_util.pillar_api()
try:
return File.find(file_id, api=api)
except ResourceNotFound:
f = sys.exc_info()[2].tb_frame.f_back
tb = traceback.format_stack(f=f, limit=2)
log.warning('File %s not found, but requested from %s\n%s',
file_id, request.url, ''.join(tb))
return None
def debug(self, truesock_func, *a, **kw):
if self.is_http:
frame = inspect.stack()[0][0]
lines = list(map(utf8, traceback.format_stack(frame)))
message = [
"HTTPretty intercepted and unexpected socket method call.",
("Please open an issue at "
"'https://github.com/gabrielfalcao/HTTPretty/issues'"),
"And paste the following traceback:\n",
"".join(decode_utf8(lines)),
]
raise RuntimeError("\n".join(message))
if not self.truesock:
raise UnmockedError()
return getattr(self.truesock, truesock_func)(*a, **kw)
def _do_get(self):
if self._checked_out:
if self._checkout_traceback:
suffix = ' at:\n%s' % ''.join(
chop_traceback(self._checkout_traceback))
else:
suffix = ''
raise AssertionError("connection is already checked out" + suffix)
if not self._conn:
self._conn = self._create_connection()
self._checked_out = True
if self._store_traceback:
self._checkout_traceback = traceback.format_stack()
return self._conn
def _do_get(self):
if self._checked_out:
if self._checkout_traceback:
suffix = ' at:\n%s' % ''.join(
chop_traceback(self._checkout_traceback))
else:
suffix = ''
raise AssertionError("connection is already checked out" + suffix)
if not self._conn:
self._conn = self._create_connection()
self._checked_out = True
if self._store_traceback:
self._checkout_traceback = traceback.format_stack()
return self._conn
def execute_insert_sql(self, *args, **kwargs):
stack_trace = traceback.format_stack()
query_dict = {
'query': [],
'query_type': QUERY_TYPE_WRITE,
'traceback': stack_trace,
'model': "%s.%s" % (self.query.model.__module__, self.query.model.__name__),
'start_time': datetime.datetime.now(),
}
for sql, params in self.as_sql():
query_dict['query'].append(sql % params)
try:
return self._snoopy_execute_insert_sql(*args, **kwargs)
finally:
# This gets called just before the `return`
query_dict['end_time'] = datetime.datetime.now()
SnoopyRequest.record_query_data(query_dict)
def _do_get(self):
if self._checked_out:
if self._checkout_traceback:
suffix = ' at:\n%s' % ''.join(
chop_traceback(self._checkout_traceback))
else:
suffix = ''
raise AssertionError("connection is already checked out" + suffix)
if not self._conn:
self._conn = self._create_connection()
self._checked_out = True
if self._store_traceback:
self._checkout_traceback = traceback.format_stack()
return self._conn
def _do_get(self):
if self._checked_out:
if self._checkout_traceback:
suffix = ' at:\n%s' % ''.join(
chop_traceback(self._checkout_traceback))
else:
suffix = ''
raise AssertionError("connection is already checked out" + suffix)
if not self._conn:
self._conn = self._create_connection()
self._checked_out = True
if self._store_traceback:
self._checkout_traceback = traceback.format_stack()
return self._conn
def _captureThreadStacks(self):
"""Capture the stacks for all currently running threads.
:return: A list of ``(thread-description, stack)`` tuples. See
`traceback.format_stack` for the format of ``stack``.
"""
threads = {t.ident: t for t in threading.enumerate()}
def describe(ident):
if ident in threads:
return repr(threads[ident])
else:
return "<*Unknown* %d>" % ident
return [
(describe(ident), traceback.format_stack(frame))
for ident, frame in sys._current_frames().items()
]
def _do_get(self):
if self._checked_out:
if self._checkout_traceback:
suffix = ' at:\n%s' % ''.join(
chop_traceback(self._checkout_traceback))
else:
suffix = ''
raise AssertionError("connection is already checked out" + suffix)
if not self._conn:
self._conn = self._create_connection()
self._checked_out = True
if self._store_traceback:
self._checkout_traceback = traceback.format_stack()
return self._conn