def error(*k, **kw):
"""
Wrap logging.errors, display the origin of the message when '-vv' is set
"""
global log
log.error(*k, **kw)
if verbose > 2:
st = traceback.extract_stack()
if st:
st = st[:-1]
buf = []
for filename, lineno, name, line in st:
buf.append(' File "%s", line %d, in %s' % (filename, lineno, name))
if line:
buf.append(' %s' % line.strip())
if buf: log.error("\n".join(buf))
python类extract_stack()的实例源码
def worker_int(worker):
worker.log.info("worker received INT or QUIT signal")
## get traceback info
import threading, sys, traceback
id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
code = []
for threadId, stack in sys._current_frames().items():
code.append("\n# Thread: %s(%d)" % (id2name.get(threadId,""),
threadId))
for filename, lineno, name, line in traceback.extract_stack(stack):
code.append('File: "%s", line %d, in %s' % (filename,
lineno, name))
if line:
code.append(" %s" % (line.strip()))
worker.log.debug("\n".join(code))
def __init__(self, msg='', ex=None):
"""
:param msg: error message
:type msg: string
:param ex: exception causing this error (optional)
:type ex: exception
"""
self.msg = msg
assert not isinstance(msg, Exception)
self.stack = []
if ex:
if not msg:
self.msg = str(ex)
if isinstance(ex, WafError):
self.stack = ex.stack
else:
self.stack = traceback.extract_tb(sys.exc_info()[2])
self.stack += traceback.extract_stack()[:-1]
self.verbose_msg = ''.join(traceback.format_list(self.stack))
def error(*k, **kw):
"""
Wrap logging.errors, display the origin of the message when '-vv' is set
"""
global log
log.error(*k, **kw)
if verbose > 2:
st = traceback.extract_stack()
if st:
st = st[:-1]
buf = []
for filename, lineno, name, line in st:
buf.append(' File "%s", line %d, in %s' % (filename, lineno, name))
if line:
buf.append(' %s' % line.strip())
if buf: log.error("\n".join(buf))
def __init__(self, msg='', ex=None):
"""
:param msg: error message
:type msg: string
:param ex: exception causing this error (optional)
:type ex: exception
"""
self.msg = msg
assert not isinstance(msg, Exception)
self.stack = []
if ex:
if not msg:
self.msg = str(ex)
if isinstance(ex, WafError):
self.stack = ex.stack
else:
self.stack = traceback.extract_tb(sys.exc_info()[2])
self.stack += traceback.extract_stack()[:-1]
self.verbose_msg = ''.join(traceback.format_list(self.stack))
def error(*k, **kw):
"""
Wrap logging.errors, display the origin of the message when '-vv' is set
"""
global log
log.error(*k, **kw)
if verbose > 2:
st = traceback.extract_stack()
if st:
st = st[:-1]
buf = []
for filename, lineno, name, line in st:
buf.append(' File "%s", line %d, in %s' % (filename, lineno, name))
if line:
buf.append(' %s' % line.strip())
if buf: log.error("\n".join(buf))
def filter(self, record):
from config import conf
wt = conf.warning_threshold
if wt > 0:
stk = traceback.extract_stack()
caller=None
for f,l,n,c in stk:
if n == 'warning':
break
caller = l
tm,nb = self.warning_table.get(caller, (0,0))
ltm = time.time()
if ltm-tm > wt:
tm = ltm
nb = 0
else:
if nb < 2:
nb += 1
if nb == 2:
record.msg = "more "+record.msg
else:
return 0
self.warning_table[caller] = (tm,nb)
return 1
def filter(self, record):
from config import conf
wt = conf.warning_threshold
if wt > 0:
stk = traceback.extract_stack()
caller=None
for f,l,n,c in stk:
if n == 'warning':
break
caller = l
tm,nb = self.warning_table.get(caller, (0,0))
ltm = time.time()
if ltm-tm > wt:
tm = ltm
nb = 0
else:
if nb < 2:
nb += 1
if nb == 2:
record.msg = "more "+record.msg
else:
return 0
self.warning_table[caller] = (tm,nb)
return 1
def __init__(self, *, loop=None):
"""Initialize the future.
The optional event_loop argument allows to explicitly set the event
loop object used by the future. If it's not provided, the future uses
the default event loop.
"""
if loop is None:
self._loop = events.get_event_loop() # ????
else:
self._loop = loop
self._callbacks = [] # ??
if self._loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1))
def first_spark_call():
"""
Return a CallSite representing the first Spark call in the current call stack.
"""
tb = traceback.extract_stack()
if len(tb) == 0:
return None
file, line, module, what = tb[len(tb) - 1]
sparkpath = os.path.dirname(file)
first_spark_frame = len(tb) - 1
for i in range(0, len(tb)):
file, line, fun, what = tb[i]
if file.startswith(sparkpath):
first_spark_frame = i
break
if first_spark_frame == 0:
file, line, fun, what = tb[0]
return CallSite(function=fun, file=file, linenum=line)
sfile, sline, sfun, swhat = tb[first_spark_frame]
ufile, uline, ufun, uwhat = tb[first_spark_frame - 1]
return CallSite(function=sfun, file=ufile, linenum=uline)
def filter(self, record):
from config import conf
wt = conf.warning_threshold
if wt > 0:
stk = traceback.extract_stack()
caller=None
for f,l,n,c in stk:
if n == 'warning':
break
caller = l
tm,nb = self.warning_table.get(caller, (0,0))
ltm = time.time()
if ltm-tm > wt:
tm = ltm
nb = 0
else:
if nb < 2:
nb += 1
if nb == 2:
record.msg = "more "+record.msg
else:
return 0
self.warning_table[caller] = (tm,nb)
return 1
def log_callstack_last(back_trace=False):
if not back_trace:
stack = traceback.extract_stack()[:-1]
else:
stack = traceback.extract_tb(sys.exc_info()[2])
message = "empty"
print("Parsing stack")
for path, line, func, code in stack:
print(path,func)
if func not in BACKTRACE_FILTER_FUNC:
if func not in BACKTRACE_FILTER_HIDE_CODE:
file = os.path.split(path)[-1]
message = "%s:%s" % (file, line)
return message
def filter(self, record):
from .config import conf
wt = conf.warning_threshold
if wt > 0:
stk = traceback.extract_stack()
caller=None
for f,l,n,c in stk:
if n == 'warning':
break
caller = l
tm,nb = self.warning_table.get(caller, (0,0))
ltm = time.time()
if ltm-tm > wt:
tm = ltm
nb = 0
else:
if nb < 2:
nb += 1
if nb == 2:
record.msg = "more "+record.msg
else:
return 0
self.warning_table[caller] = (tm,nb)
return 1
def worker_int(worker):
worker.log.info("worker received INT or QUIT signal")
## get traceback info
import threading, sys, traceback
id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
code = []
for threadId, stack in sys._current_frames().items():
code.append("\n# Thread: %s(%d)" % (id2name.get(threadId,""),
threadId))
for filename, lineno, name, line in traceback.extract_stack(stack):
code.append('File: "%s", line %d, in %s' % (filename,
lineno, name))
if line:
code.append(" %s" % (line.strip()))
worker.log.debug("\n".join(code))
def known(s):
tb = traceback.extract_stack()
tb = tb[-1]
position = "%s:%s" % (tb[2], tb[1])
db = get_config().get_db_con()
if db.known(s):
sample_info = db.sample_info_fetch(s)
return RuleResult(position,
result=sample_info.get_result(),
reason=sample_info.reason,
further_analysis=False)
return RuleResult(position,
result=Result.unknown,
reason="Datei ist dem System noch nicht bekannt",
further_analysis=True)
def file_larger_than(s, args):
tb = traceback.extract_stack()
tb = tb[-1]
position = "%s:%s" % (tb[2], tb[1])
size = args['byte']
if s.file_size > size:
return RuleResult(position,
result=Result.unknown,
reason="Datei hat mehr als %d bytes"
% size,
further_analysis=True)
return RuleResult(position,
result=Result.ignored,
reason="Datei ist nur %d bytes lang"
% s.file_size,
further_analysis=False)
def cuckoo_score(s, args):
tb = traceback.extract_stack()
tb = tb[-1]
position = "%s:%s" % (tb[2], tb[1])
threshold = args['higher']
if s.cuckoo_report.score >= threshold:
return RuleResult(position,
result=Result.bad,
reason="Cuckoo score >= %s: %s"
% (threshold, s.cuckoo_report.score),
further_analysis=False)
return RuleResult(position,
result=Result.unknown,
reason="Cuckoo score < %s: %s"
% (threshold, s.cuckoo_report.score),
further_analysis=True)
def doTraceback(self, module):
try:
module.do_raise()
except:
tb = sys.exc_info()[2].tb_next
f,lno,n,line = extract_tb(tb, 1)[0]
self.assertEqual(line, raise_src.strip())
f,lno,n,line = extract_stack(tb.tb_frame, 1)[0]
self.assertEqual(line, raise_src.strip())
s = io.StringIO()
print_tb(tb, 1, s)
self.assertTrue(s.getvalue().endswith(raise_src))
else:
raise AssertionError("This ought to be impossible")
def _raise_typecheck_error(msg, is_return=False, value=None, received_type=None,
expected_type=None, func=None):
if pytypes.warning_mode:
import traceback
tb = traceback.extract_stack()
off = util._calc_traceback_list_offset(tb)
cat = pytypes.ReturnTypeWarning if is_return else pytypes.InputTypeWarning
warn_explicit(msg, cat, tb[off][0], tb[off][1])
# if not func is None:
# warn_explicit(msg, cat, func.__code__.co_filename,
# func.__code__.co_firstlineno, func.__module__)
# else:
# warn(msg, pytypes.ReturnTypeWarning)
else:
if is_return:
raise pytypes.ReturnTypeError(msg)
else:
raise pytypes.InputTypeError(msg)
def _warn_argname(msg, func, slf, clsm, cls=None, warn_tp=pytypes.exceptions.TypeWarning):
if not pytypes.warn_argnames:
return
if cls is None:
if slf or clsm:
try:
cls_name = get_class_that_defined_method(func).__name__
except:
cls_name = '<unknown class>'
else:
cls_name = None
else:
cls_name = cls.__name__
tb = traceback.extract_stack()
off = _calc_traceback_list_offset(tb)
if cls_name is None:
_msg = '%s: %s.%s'%(msg, func.__module__, func.__name__)
else:
_msg = '%s: %s.%s.%s'%(msg, func.__module__, cls_name, func.__name__)
warn_explicit(_msg, warn_tp, tb[off][0], tb[off][1])
def sigquit_handler(sig, frame):
"""Helps debug deadlocks by printing stacktraces when this gets a SIGQUIT
e.g. kill -s QUIT <PID> or CTRL+\
"""
print("Dumping stack traces for all threads in PID {}".format(os.getpid()))
id_to_name = dict([(th.ident, th.name) for th in threading.enumerate()])
code = []
for thread_id, stack in sys._current_frames().items():
code.append("\n# Thread: {}({})"
.format(id_to_name.get(thread_id, ""), thread_id))
for filename, line_number, name, line in traceback.extract_stack(stack):
code.append('File: "{}", line {}, in {}'
.format(filename, line_number, name))
if line:
code.append(" {}".format(line.strip()))
print("\n".join(code))
def filter(self, record):
from config import conf
wt = conf.warning_threshold
if wt > 0:
stk = traceback.extract_stack()
caller=None
for f,l,n,c in stk:
if n == 'warning':
break
caller = l
tm,nb = self.warning_table.get(caller, (0,0))
ltm = time.time()
if ltm-tm > wt:
tm = ltm
nb = 0
else:
if nb < 2:
nb += 1
if nb == 2:
record.msg = "more "+record.msg
else:
return 0
self.warning_table[caller] = (tm,nb)
return 1
def filter(self, record):
from .config import conf
wt = conf.warning_threshold
if wt > 0:
stk = traceback.extract_stack()
caller=None
for f,l,n,c in stk:
if n == 'warning':
break
caller = l
tm,nb = self.warning_table.get(caller, (0,0))
ltm = time.time()
if ltm-tm > wt:
tm = ltm
nb = 0
else:
if nb < 2:
nb += 1
if nb == 2:
record.msg = "more "+record.msg
else:
return 0
self.warning_table[caller] = (tm,nb)
return 1
def extract_address():
#tb = traceback.extract_stack()
# print()
# for t in tb:
# print(t[0], t[1], t[2], t[3])
#frame = tb[-3]
# return '{0}/{1}/{2}'.format(frame[1], frame[2], frame[3])
#return '{0}/{1}'.format(frame[1], frame[2])
# Retun an address in the format:
# 'instruction pointer' / 'qualified function name'
frame = sys._getframe(2)
ip = frame.f_lasti
names = []
var_name = _extract_target_of_assignment()
if var_name is not None:
names.append(var_name)
while frame is not None:
n = frame.f_code.co_name
if n.startswith('<'): break
names.append(n)
if n == _current_function_name: break
frame = frame.f_back
return "{}/{}".format(ip, '.'.join(reversed(names)))
def timeRemaining(seconds=0):
""" Function to be used with 'while' """
global _timers
if seconds == 0: return True
now = time.time()
stack = traceback.extract_stack()
filename, line_no, q1, q2 = stack[-2]
if filename.startswith("<pyshell"):
filename = "pyshell"
if (filename, line_no) not in _timers:
_timers[(filename, line_no)] = (now, seconds)
return True
start, duration = _timers[(filename, line_no)]
if seconds != duration:
_timers[(filename, line_no)] = (now, seconds)
return True
if now - start > duration:
del _timers[(filename, line_no)]
return False
else:
return True
def doTraceback(self, module):
try:
module.do_raise()
except:
tb = sys.exc_info()[2].tb_next
f,lno,n,line = extract_tb(tb, 1)[0]
self.assertEqual(line, raise_src.strip())
f,lno,n,line = extract_stack(tb.tb_frame, 1)[0]
self.assertEqual(line, raise_src.strip())
s = StringIO.StringIO()
print_tb(tb, 1, s)
self.assertTrue(s.getvalue().endswith(raise_src))
else:
raise AssertionError("This ought to be impossible")
def doTraceback(self, module):
try:
module.do_raise()
except:
tb = sys.exc_info()[2].tb_next
f,lno,n,line = extract_tb(tb, 1)[0]
self.assertEqual(line, raise_src.strip())
f,lno,n,line = extract_stack(tb.tb_frame, 1)[0]
self.assertEqual(line, raise_src.strip())
s = StringIO.StringIO()
print_tb(tb, 1, s)
self.assertTrue(s.getvalue().endswith(raise_src))
else:
raise AssertionError("This ought to be impossible")
def filter(self, record):
from config import conf
wt = conf.warning_threshold
if wt > 0:
stk = traceback.extract_stack()
caller=None
for f,l,n,c in stk:
if n == 'warning':
break
caller = l
tm,nb = self.warning_table.get(caller, (0,0))
ltm = time.time()
if ltm-tm > wt:
tm = ltm
nb = 0
else:
if nb < 2:
nb += 1
if nb == 2:
record.msg = "more "+record.msg
else:
return 0
self.warning_table[caller] = (tm,nb)
return 1
def filter(self, record):
from config import conf
wt = conf.warning_threshold
if wt > 0:
stk = traceback.extract_stack()
caller=None
for f,l,n,c in stk:
if n == 'warning':
break
caller = l
tm,nb = self.warning_table.get(caller, (0,0))
ltm = time.time()
if ltm-tm > wt:
tm = ltm
nb = 0
else:
if nb < 2:
nb += 1
if nb == 2:
record.msg = "more "+record.msg
else:
return 0
self.warning_table[caller] = (tm,nb)
return 1
def _raw_log(self, logfn, message, exc_info):
cname = ''
loc = ''
fn = ''
tb = traceback.extract_stack()
if len(tb) > 2:
if self.show_loc:
loc = '(%s:%d):' % (os.path.basename(tb[-3][0]), tb[-3][1])
fn = tb[-3][2]
if fn != '<module>':
if self.__class__.__name__ != Logger.__name__:
fn = self.__class__.__name__ + '.' + fn
fn += '()'
if self.show_fcn:
logfn(loc + cname + fn + ': ' + message, exc_info=exc_info)
else:
logfn(message, exc_info=exc_info)