def filter(self, record):
    """Rate-limit repeated warnings that originate from the same call site.

    Nothing is filtered unless ``conf.warning_threshold`` is positive.
    Otherwise the call site is identified by the line number of the last
    stack frame seen before the first frame named ``warning`` (or of the
    innermost frame when no frame has that name).  Within one threshold
    window a call site may emit three records, the third with ``"more "``
    prepended to its message; later ones are dropped until the window
    expires.

    :param record: log record being considered; ``record.msg`` may be
        rewritten in place.
    :return: ``1`` to let the record through, ``0`` to suppress it.
    """
    from config import conf

    threshold = conf.warning_threshold
    if not threshold > 0:
        return 1

    call_site = None
    for frame in traceback.extract_stack():
        if frame[2] == 'warning':
            break
        call_site = frame[1]

    window_start, hits = self.warning_table.get(call_site, (0, 0))
    now = time.time()
    if now - window_start > threshold:
        # The previous window has expired: open a new one.
        window_start, hits = now, 0
    elif hits < 2:
        hits += 1
        if hits == 2:
            record.msg = "more " + record.msg
    else:
        return 0
    self.warning_table[call_site] = (window_start, hits)
    return 1
# Example source code using Python's traceback.extract_stack()
def _get_current_traceback(self, thread):
    """Render the current call stack of the given thread as text.

    :param thread: thread whose stack should be rendered
    :type thread: Thread
    :return: a header line followed by one ``File: ...`` line per frame,
        each followed by the stripped source line when it is available
    :raises RuntimeError: if ``sys._current_frames()`` has no entry for
        ``thread.ident``
    """
    frames = sys._current_frames()
    thread_id = thread.ident
    if thread_id not in frames:
        raise RuntimeError("thread not found")

    pieces = ["Traceback ( thread-%d possibly hold at ):\n" % thread_id]
    for filename, lineno, name, line in traceback.extract_stack(frames[thread_id]):
        pieces.append(' File: "%s", line %d, in %s\n' % (filename, lineno, name))
        if line:
            pieces.append(" %s\n" % (line.strip()))
    return "".join(pieces)
def get_thread_traceback(thread):
    """Render the current call stack of the given thread as text.

    :param thread: thread whose stack should be rendered
    :type thread: Thread
    :return: a header line followed by one ``File: ...`` line per frame,
        each followed by the stripped source line when it is available
    :raises RuntimeError: if ``sys._current_frames()`` has no entry for
        ``thread.ident``
    """
    stack = sys._current_frames().get(thread.ident)
    if stack is None:
        raise RuntimeError("thread not found")

    report = ["Traceback ( thread-%d possibly hold at ):\n" % thread.ident]
    for entry in traceback.extract_stack(stack):
        filename, lineno, name, source = entry
        report.append(' File: "%s", line %d, in %s\n' % (filename, lineno, name))
        if source:
            report.append(" %s\n" % (source.strip()))
    return "".join(report)
def caller_trace(back=0):
    """
    Record who called the current function in the global tracing dicts.

    ``caller_bases`` counts how often each traced function (the caller of
    ``caller_trace``) was hit; ``caller_dicts`` counts, per callee key, the
    chains of callers leading to it.  Keys are built from
    ``(filename, lineno, name)`` triples.  The original docstring states that
    these dicts are printed automatically at the end of SCons execution.

    ``back`` is the number of extra caller levels to record beyond the
    immediate one.
    """
    global caller_bases, caller_dicts
    import traceback

    # Innermost frame first: index 0 is caller_trace itself, index 1 is the
    # function being traced, index 2 onward are its callers.
    frames = traceback.extract_stack(limit=3 + back)[::-1]
    callee = frames[1][:3]
    caller_bases[callee] = caller_bases.get(callee, 0) + 1
    for frame in frames[2:]:
        chain = callee + frame[:3]
        counts = caller_dicts.setdefault(callee, {})
        counts[chain] = counts.get(chain, 0) + 1
        callee = chain
# print a single caller and its callers, if any
def get_interactive_console(thread_id, frame_id, frame, console_message):
    """Return the cached interactive console, creating it when needed.

    The console held in ``InteractiveConsoleCache`` is reused when it was
    created for the same ``thread_id`` and ``frame_id``.  Otherwise a new
    ``DebugConsole`` replaces it and, when a stack entry can be extracted
    from ``frame``, a "[Current context]" line describing that entry is added
    to ``console_message``.

    :rtype: DebugConsole
    """
    cache = InteractiveConsoleCache
    if cache.thread_id == thread_id and cache.frame_id == frame_id:
        return cache.interactive_console_instance

    cache.interactive_console_instance = DebugConsole()
    cache.thread_id = thread_id
    cache.frame_id = frame_id

    stack = traceback.extract_stack(frame, limit=1)
    if stack:
        top = stack[0]  # the single entry requested via limit=1
        location = 'File "%s", line %s, in %s' % (top[0], top[1], top[2])
        console_message.add_console_message(CONSOLE_OUTPUT, "[Current context]: %s" % (location,))
    return cache.interactive_console_instance
def filter(self, record):
    """Throttle warnings that keep coming from the same call site.

    Filtering only happens when ``conf.warning_threshold`` is positive.  The
    call site is the line number of the last stack frame seen before the
    first frame named ``warning`` (the innermost frame when there is none).
    Per threshold window a call site gets three records through, the third
    one with ``"more "`` prepended to its message; the rest are dropped
    until the window expires.

    :param record: log record being considered; ``record.msg`` may be
        rewritten in place.
    :return: ``1`` to keep the record, ``0`` to drop it.
    """
    from config import conf

    limit = conf.warning_threshold
    if not limit > 0:
        return 1

    origin = None
    for entry in traceback.extract_stack():
        if entry[2] == 'warning':
            break
        origin = entry[1]

    started, count = self.warning_table.get(origin, (0, 0))
    current = time.time()
    if current - started > limit:
        # Window expired: restart counting from now.
        started, count = current, 0
    elif count < 2:
        count += 1
        if count == 2:
            record.msg = "more " + record.msg
    else:
        return 0
    self.warning_table[origin] = (started, count)
    return 1
def doTraceback(self, module):
    """Check that the traceback helpers report the raising source line.

    Calls ``module.do_raise()``, which is expected to raise, then asserts
    that ``extract_tb``, ``extract_stack`` and ``print_tb`` all show
    ``raise_src`` for the frame directly below this method.  Raises
    ``AssertionError`` if ``do_raise()`` returns normally.
    """
    try:
        module.do_raise()
    except:
        # Skip this method's own traceback entry.
        raising_tb = sys.exc_info()[2].tb_next
        expected = raise_src.strip()

        _, _, _, source = extract_tb(raising_tb, 1)[0]
        self.assertEqual(source, expected)

        _, _, _, source = extract_stack(raising_tb.tb_frame, 1)[0]
        self.assertEqual(source, expected)

        buf = io.StringIO()
        print_tb(raising_tb, 1, buf)
        self.assertTrue(buf.getvalue().endswith(raise_src))
    else:
        raise AssertionError("This ought to be impossible")
def _worker(self, dictobj):
    """Run one task and hand its outcome to the result queue.

    :param dictobj: mapping holding the callable under ``'func'``, its
        positional arguments under ``'args'`` and its keyword arguments
        under ``'argv'``.

    The task's return value is passed to ``self._add_result_to_queue``.  If
    the task raises an ``Exception``, the queued result is the tuple
    ``(exception, traceback.extract_stack())`` instead.
    """
    func = dictobj['func']
    args = dictobj['args']
    argv = dictobj['argv']
    try:
        result = func(*args, **argv)
    except Exception as e:
        # NOTE: extract_stack() describes the stack of this handler, not the
        # frames inside the failed call; kept as-is because consumers of the
        # queue may rely on this result shape.
        result = (e, traceback.extract_stack())

    # Release the lock even when the bookkeeping below raises; otherwise one
    # failure would leave the lock held forever.
    self.lock.acquire()
    try:
        self._executed_task_count = self._executed_task_count + 1
        self._add_result_to_queue(result=result)
    finally:
        self.lock.release()
def _worker(self, dictobj):
    """Execute a single task description and queue whatever it produced.

    :param dictobj: mapping with the callable under ``'func'``, positional
        arguments under ``'args'`` and keyword arguments under ``'argv'``.

    On success the callable's return value is queued through
    ``self._add_result_to_queue``; if it raises an ``Exception`` the queued
    value is ``(exception, traceback.extract_stack())``.
    """
    func = dictobj['func']
    args = dictobj['args']
    argv = dictobj['argv']
    try:
        result = func(*args, **argv)
    except Exception as e:
        # NOTE: this is the stack of the handler itself, not of the failed
        # call; left unchanged so the queued result keeps its shape.
        result = (e, traceback.extract_stack())

    # try/finally guarantees the lock is released if queuing fails, so other
    # users of the lock are not blocked indefinitely.
    self.lock.acquire()
    try:
        self._executed_task_count = self._executed_task_count + 1
        self._add_result_to_queue(result=result)
    finally:
        self.lock.release()
def worker_int(worker):
    """Log the call stack of every thread after an INT or QUIT signal.

    Writes one info message announcing the signal, then a single debug
    message that lists, per thread, its name and id followed by each stack
    frame's location and stripped source line (when available).
    """
    worker.log.info("worker received INT or QUIT signal")

    ## get traceback info
    import threading
    import sys
    import traceback

    thread_names = {th.ident: th.name for th in threading.enumerate()}
    report = []
    for thread_id, frame in sys._current_frames().items():
        report.append("\n# Thread: %s(%d)" % (thread_names.get(thread_id, ""), thread_id))
        for filename, lineno, name, source in traceback.extract_stack(frame):
            report.append('File: "%s", line %d, in %s' % (filename, lineno, name))
            if source:
                report.append(" %s" % (source.strip()))
    worker.log.debug("\n".join(report))
def mute_errors(self, fn, *args, **kwargs):
    """Call ``fn`` and capture the errors it logs instead of recording them.

    While ``fn(*args, **kwargs)`` runs, ``BaseValidator.log_error`` is
    replaced by a collector that stores each error in a local set; the
    original method is restored afterwards, even if ``fn`` raises.  Because
    the patch is applied on the class, ``BaseValidator.log_error`` should not
    be overridden in subclasses.

    :return: tuple ``(value returned by fn, set of captured errors)``.
    """
    caught_errors = set()

    def _collect(self, field, msg):
        # A stack trace (minus this collector's own frame) is attached only
        # in debug mode.
        if self.debug:
            tb = traceback.extract_stack()[:-1]
        else:
            tb = None
        caught_errors.add(ValidatorLogError(msg, self._path + (field,), tb))

    original = BaseValidator.log_error
    try:
        BaseValidator.log_error = _collect
        outcome = fn(*args, **kwargs)
    finally:
        BaseValidator.log_error = original
    return outcome, caught_errors
def log_error(self, field, msg):
    """Add an error to the validator.

    This method should not be overridden in subclasses, as doing so
    is likely to break the error and warning coercion decorators.

    When ``self.collect_errors`` is set, a ``ValidatorLogError`` is built for
    ``self._path + (field,)``, logged if ``self.verbose`` is set, and added
    to ``self._errors``.  A stack trace is attached only in debug mode.

    :param field: The field the error was raised on.
    :param msg: The message to associate with the error.
    :raises FailFastException: when ``self.fail_fast`` is set.
    """
    if self.collect_errors:
        if self.debug:
            # Drop the last entry, which is this method's own frame.
            stack = traceback.extract_stack()[:-1]
        else:
            stack = None
        error = ValidatorLogError(msg, self._path + (field,), stack)
        if self.verbose:
            self._IIIFValidator.logger.error(str(error))
        self._errors.add(error)
    if self.fail_fast:
        raise FailFastException
def refresh():
    """Load override files and copy their names into this module's globals.

    A candidate path named ``OVERRIDE_FILE`` is built next to the source file
    of every frame on the current call stack, followed by the bare
    ``OVERRIDE_FILE`` name itself.  Each candidate that exists is executed as
    a module, and every attribute whose name does not start with ``"__"`` is
    copied into ``globals()``.  Candidates are applied innermost frame
    first, so later ones win on name clashes.
    """
    candidates = []
    for frame in traceback.extract_stack():
        candidate = os.path.join(os.path.dirname(frame[0]), OVERRIDE_FILE)
        if candidate not in candidates:
            candidates.insert(0, candidate)

    # The bare name always goes last, exactly once.
    if OVERRIDE_FILE in candidates:
        candidates.remove(OVERRIDE_FILE)
    candidates.append(OVERRIDE_FILE)

    def import_path(path):
        # Execute the file at ``path`` as a module object and return it.
        if sys.version_info >= (3, 5, 0):
            import importlib.util
            spec = importlib.util.spec_from_file_location(__name__, path)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            return module
        from importlib.machinery import SourceFileLoader
        return SourceFileLoader(__name__, path).load_module()

    for path in candidates:
        if os.path.isfile(path):
            loaded = import_path(path)
            globals().update({n: getattr(loaded, n) for n in dir(loaded) if not n.startswith("__")})
def blockingCallFromThread(reactor, f, *args, **kwargs):
    """
    Improved version of twisted's blockingCallFromThread that shows the complete
    stacktrace when an exception is raised on the reactor's thread.

    When already running on the reactor thread, ``f`` is simply called and
    its result returned.  Otherwise the call is scheduled on the reactor,
    this thread blocks until the outcome arrives, and a failure is logged
    with both threads' tracebacks before being re-raised.
    """
    if isInIOThread():
        return f(*args, **kwargs)

    queue = Queue.Queue()

    def _callFromThread():
        deferred = defer.maybeDeferred(f, *args, **kwargs)
        deferred.addBoth(queue.put)

    reactor.callFromThread(_callFromThread)
    result = queue.get()
    if not isinstance(result, failure.Failure):
        return result

    reactor_tb = traceback.extract_tb(result.getTracebackObject())
    local_tb = traceback.extract_stack()
    logger.error("Exception raised on the reactor's thread %s: \"%s\".\n Traceback from this thread:\n%s\n"
                 " Traceback from the reactor's thread:\n %s", result.type.__name__, result.getErrorMessage(),
                 ''.join(traceback.format_list(local_tb)), ''.join(traceback.format_list(reactor_tb)))
    result.raiseException()
    return result
def caller_trace(back=0):
    """
    Trace the caller stack and store the counts in the global dicts.

    ``caller_bases`` counts hits per traced function (the caller of
    ``caller_trace``); ``caller_dicts`` counts, for each callee key, the
    caller chains that led to it.  Keys are built from
    ``(filename, lineno, name)`` triples.  The original docstring states that
    these dicts are printed automatically at the end of SCons execution.

    ``back`` is the number of additional caller levels to record.
    """
    global caller_bases, caller_dicts
    import traceback

    # After reversing, entry 0 is caller_trace itself, entry 1 the traced
    # function and the remaining entries its callers, nearest first.
    stack = list(reversed(traceback.extract_stack(limit=3 + back)))
    callee = stack[1][:3]
    caller_bases[callee] = caller_bases.get(callee, 0) + 1
    for outer in stack[2:]:
        key = callee + outer[:3]
        per_callee = caller_dicts.setdefault(callee, {})
        per_callee[key] = per_callee.get(key, 0) + 1
        callee = key
# print a single caller and its callers, if any
def doTraceback(self, module):
    """Check that the traceback helpers report the raising source line.

    Calls ``module.do_raise()``, which is expected to raise, then asserts
    that ``extract_tb``, ``extract_stack`` and ``print_tb`` all show
    ``raise_src`` for the frame directly below this method.  Raises
    ``AssertionError`` if ``do_raise()`` returns normally.  This variant
    uses the Python 2 ``StringIO`` module.
    """
    try:
        module.do_raise()
    except:
        # Skip this method's own traceback entry.
        inner_tb = sys.exc_info()[2].tb_next
        wanted = raise_src.strip()

        _, _, _, text = extract_tb(inner_tb, 1)[0]
        self.assertEqual(text, wanted)

        _, _, _, text = extract_stack(inner_tb.tb_frame, 1)[0]
        self.assertEqual(text, wanted)

        out = StringIO.StringIO()
        print_tb(inner_tb, 1, out)
        self.assertTrue(out.getvalue().endswith(raise_src))
    else:
        raise AssertionError("This ought to be impossible")
def first_spark_call():
    """
    Return a CallSite representing the first Spark call in the current call stack.

    The directory of this function's own source file is used as the prefix
    that marks a frame as a "Spark" frame.  The result names the first such
    frame's function together with the file and line of the frame that
    called it; if the very first stack frame already matches, that frame
    alone is described.  Returns ``None`` for an empty stack.
    """
    tb = traceback.extract_stack()
    if not tb:
        return None

    sparkpath = os.path.dirname(tb[-1][0])
    first_spark_frame = next(
        (idx for idx, frame in enumerate(tb) if frame[0].startswith(sparkpath)),
        len(tb) - 1,
    )
    if first_spark_frame == 0:
        filename, lineno, func, _ = tb[0]
        return CallSite(function=func, file=filename, linenum=lineno)

    spark_func = tb[first_spark_frame][2]
    user_file, user_line = tb[first_spark_frame - 1][:2]
    return CallSite(function=spark_func, file=user_file, linenum=user_line)
def doTraceback(self, module):
    """Verify that traceback helpers can recover the raising source line.

    ``module.do_raise()`` is expected to raise.  The traceback entry below
    this method is then inspected through ``extract_tb``, ``extract_stack``
    and ``print_tb``, each of which must show ``raise_src``.  An
    ``AssertionError`` is raised if ``do_raise()`` returns normally.
    """
    try:
        module.do_raise()
    except:
        # tb_next skips the entry belonging to this method.
        target_tb = sys.exc_info()[2].tb_next
        stripped_src = raise_src.strip()

        tb_entry = extract_tb(target_tb, 1)[0]
        _, _, _, shown = tb_entry
        self.assertEqual(shown, stripped_src)

        stack_entry = extract_stack(target_tb.tb_frame, 1)[0]
        _, _, _, shown = stack_entry
        self.assertEqual(shown, stripped_src)

        sink = io.StringIO()
        print_tb(target_tb, 1, sink)
        self.assertTrue(sink.getvalue().endswith(raise_src))
    else:
        raise AssertionError("This ought to be impossible")
def getCallerName():
    """Return ``"<file stem>_<function name>"`` for the caller's caller.

    Both parts describe the frame two levels above this function: the
    function name comes from the extracted stack, and the file stem is that
    frame's source file name up to its first ``'.'``.
    """
    import traceback

    # [-1] is this function, [-2] its caller, [-3] the caller's caller.
    function_name = traceback.extract_stack()[-3][2]
    source_path = sys._getframe(2).f_code.co_filename
    file_stem = os.path.basename(source_path).split('.')[0]
    return file_stem + "_" + function_name
def assemble(*args, **kwargs):
    """Return ``np.array(args).all()`` for the positional arguments.

    Keyword arguments are accepted but not used.  When the module-level
    ``is_allow`` flag is truthy and the stack has at least two frames,
    ``_getFunctionArgs`` is also called with the caller's file name and line
    number and this function's own name; its result is not used here.

    args: positional values to combine
    return: result of ``numpy.ndarray.all`` over the arguments
    """
    s_trace = traceback.extract_stack()
    if len(s_trace) >= 2 and is_allow:
        caller = s_trace[-2]
        own_name = s_trace[-1][-2]
        # Result intentionally discarded; the call is kept in case the
        # helper has side effects.
        _getFunctionArgs(caller[0], caller[1], own_name)
    return np.array(args).all()