python类extract_stack()的实例源码

Logs.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def error(*k, **kw):
    """
    Forward the arguments to ``log.error``; when ``verbose`` is greater than 2,
    also log the call stack that led here (this function's own frame excluded)
    """
    global log
    log.error(*k, **kw)
    if not verbose > 2:
        return

    report = []
    # The last stack entry is this function itself, so it is sliced off.
    for path, lineno, func, text in traceback.extract_stack()[:-1]:
        report.append('  File "%s", line %d, in %s' % (path, lineno, func))
        if text:
            report.append('    %s' % text.strip())
    if report:
        log.error("\n".join(report))
config.py 文件源码 项目:SWEETer-Cat 作者: DanielAndreasen 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def worker_int(worker):
    """
    Log the current stack of every thread through the worker's logger.

    An info line announces the INT/QUIT signal; the per-thread stack dump is
    then emitted as a single debug message.

    :param worker: object exposing ``log.info`` and ``log.debug``
    """
    worker.log.info("worker received INT or QUIT signal")

    # Function-scope imports, as in the original.
    import sys
    import threading
    import traceback

    # Map thread idents to readable names for the section headers.
    thread_names = {th.ident: th.name for th in threading.enumerate()}
    report = []
    for thread_id, frame in sys._current_frames().items():
        header = "\n# Thread: %s(%d)" % (thread_names.get(thread_id, ""),
                                         thread_id)
        report.append(header)
        for path, lineno, func, text in traceback.extract_stack(frame):
            report.append('File: "%s", line %d, in %s' % (path, lineno, func))
            if text:
                report.append("  %s" % (text.strip()))
    worker.log.debug("\n".join(report))
Errors.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, msg='', ex=None):
        """
        Store the error message and build the stack that led to this error.

        The stack starts with the stack of the wrapped exception (if any) and
        is followed by the current call stack, this constructor excluded.
        ``verbose_msg`` holds the formatted version of that stack.

        :param msg: error message
        :type msg: string
        :param ex: exception causing this error (optional)
        :type ex: exception
        """
        self.msg = msg
        assert not isinstance(msg, Exception)

        self.stack = []
        if ex:
            if not msg:
                # No explicit message: fall back to the wrapped exception's text.
                self.msg = str(ex)
            if isinstance(ex, WafError):
                # Copy the list: the in-place ``+=`` below would otherwise
                # also extend the wrapped error's own ``stack``.
                self.stack = list(ex.stack)
            else:
                self.stack = traceback.extract_tb(sys.exc_info()[2])
        # Drop the last entry, which is this constructor's own frame.
        self.stack += traceback.extract_stack()[:-1]
        self.verbose_msg = ''.join(traceback.format_list(self.stack))
Logs.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def error(*k, **kw):
    """
    Forward the arguments to ``log.error``; when ``verbose`` is greater than 2,
    also log the call stack that led here (this function's own frame excluded)
    """
    global log
    log.error(*k, **kw)
    if not verbose > 2:
        return

    report = []
    # The last stack entry is this function itself, so it is sliced off.
    for path, lineno, func, text in traceback.extract_stack()[:-1]:
        report.append('  File "%s", line %d, in %s' % (path, lineno, func))
        if text:
            report.append('    %s' % text.strip())
    if report:
        log.error("\n".join(report))
Errors.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __init__(self, msg='', ex=None):
        """
        Store the error message and build the stack that led to this error.

        The stack starts with the stack of the wrapped exception (if any) and
        is followed by the current call stack, this constructor excluded.
        ``verbose_msg`` holds the formatted version of that stack.

        :param msg: error message
        :type msg: string
        :param ex: exception causing this error (optional)
        :type ex: exception
        """
        self.msg = msg
        assert not isinstance(msg, Exception)

        self.stack = []
        if ex:
            if not msg:
                # No explicit message: fall back to the wrapped exception's text.
                self.msg = str(ex)
            if isinstance(ex, WafError):
                # Copy the list: the in-place ``+=`` below would otherwise
                # also extend the wrapped error's own ``stack``.
                self.stack = list(ex.stack)
            else:
                self.stack = traceback.extract_tb(sys.exc_info()[2])
        # Drop the last entry, which is this constructor's own frame.
        self.stack += traceback.extract_stack()[:-1]
        self.verbose_msg = ''.join(traceback.format_list(self.stack))
Logs.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def error(*k, **kw):
    """
    Forward the arguments to ``log.error``; when ``verbose`` is greater than 2,
    also log the call stack that led here (this function's own frame excluded)
    """
    global log
    log.error(*k, **kw)
    if not verbose > 2:
        return

    report = []
    # The last stack entry is this function itself, so it is sliced off.
    for path, lineno, func, text in traceback.extract_stack()[:-1]:
        report.append('  File "%s", line %d, in %s' % (path, lineno, func))
        if text:
            report.append('    %s' % text.strip())
    if report:
        log.error("\n".join(report))
error.py 文件源码 项目:CyberScan 作者: medbenali 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def filter(self, record):
        """
        Throttle warnings that repeat from the same call site.

        The call site is the line number of the stack entry that precedes the
        first entry named 'warning'. ``self.warning_table`` maps it to a
        ``(window_start, repeats)`` pair. While the window is younger than
        ``conf.warning_threshold`` seconds, two repeats are let through (the
        second one prefixed with "more ") and later ones are rejected. A
        threshold that is not positive disables throttling.

        :param record: log record; its ``msg`` may be prefixed in place
        :return: 1 to keep the record, 0 to drop it
        """
        from config import conf
        threshold = conf.warning_threshold
        if not threshold > 0:
            return 1

        site = None
        for _path, lineno, func, _text in traceback.extract_stack():
            if func == 'warning':
                break
            site = lineno

        started, repeats = self.warning_table.get(site, (0, 0))
        now = time.time()
        if now - started > threshold:
            # Window expired (or first sighting): open a new one.
            started, repeats = now, 0
        elif repeats < 2:
            repeats += 1
            if repeats == 2:
                record.msg = "more " + record.msg
        else:
            return 0
        self.warning_table[site] = (started, repeats)
        return 1
error.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def filter(self, record):
        """
        Throttle warnings that repeat from the same call site.

        The call site is the line number of the stack entry that precedes the
        first entry named 'warning'. ``self.warning_table`` maps it to a
        ``(window_start, repeats)`` pair. While the window is younger than
        ``conf.warning_threshold`` seconds, two repeats are let through (the
        second one prefixed with "more ") and later ones are rejected. A
        threshold that is not positive disables throttling.

        :param record: log record; its ``msg`` may be prefixed in place
        :return: 1 to keep the record, 0 to drop it
        """
        from config import conf
        threshold = conf.warning_threshold
        if not threshold > 0:
            return 1

        site = None
        for _path, lineno, func, _text in traceback.extract_stack():
            if func == 'warning':
                break
            site = lineno

        started, repeats = self.warning_table.get(site, (0, 0))
        now = time.time()
        if now - started > threshold:
            # Window expired (or first sighting): open a new one.
            started, repeats = now, 0
        elif repeats < 2:
            repeats += 1
            if repeats == 2:
                record.msg = "more " + record.msg
        else:
            return 0
        self.warning_table[site] = (started, repeats)
        return 1
futures.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def __init__(self, *, loop=None):
        """Set up the future's event loop and its empty callback list.

        ``loop`` is keyword-only. When it is omitted, the loop returned by
        ``events.get_event_loop()`` is used instead. If the chosen loop is in
        debug mode, the call stack of whoever created this future is saved in
        ``_source_traceback``.
        """
        self._loop = events.get_event_loop() if loop is None else loop
        self._callbacks = []

        if self._loop.get_debug():
            # Frame 1 is the caller, so the saved stack ends at the creation site.
            creator_frame = sys._getframe(1)
            self._source_traceback = traceback.extract_stack(creator_frame)
traceback_utils.py 文件源码 项目:MIT-Thesis 作者: alec-heif 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def first_spark_call():
    """
    Build a CallSite for the first stack entry located under this module's directory.

    The directory of the file holding the innermost stack entry is used as the
    path prefix. The result names the function of the first entry whose file
    starts with that prefix, together with the file and line of the entry just
    before it. When the very first stack entry already matches, that entry
    alone is described. Returns None for an empty stack.
    """
    frames = traceback.extract_stack()
    if not frames:
        return None

    innermost_path = frames[-1][0]
    prefix = os.path.dirname(innermost_path)

    # Falls back to the innermost entry when no earlier entry matches.
    match_idx = len(frames) - 1
    for idx, entry in enumerate(frames):
        if entry[0].startswith(prefix):
            match_idx = idx
            break

    if match_idx == 0:
        path, lineno, func, _text = frames[0]
        return CallSite(function=func, file=path, linenum=lineno)

    _mpath, _mline, matched_func, _mtext = frames[match_idx]
    prev_path, prev_line, _pfunc, _ptext = frames[match_idx - 1]
    return CallSite(function=matched_func, file=prev_path, linenum=prev_line)
error.py 文件源码 项目:CVE-2016-6366 作者: RiskSense-Ops 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def filter(self, record):
        """
        Throttle warnings that repeat from the same call site.

        The call site is the line number of the stack entry that precedes the
        first entry named 'warning'. ``self.warning_table`` maps it to a
        ``(window_start, repeats)`` pair. While the window is younger than
        ``conf.warning_threshold`` seconds, two repeats are let through (the
        second one prefixed with "more ") and later ones are rejected. A
        threshold that is not positive disables throttling.

        :param record: log record; its ``msg`` may be prefixed in place
        :return: 1 to keep the record, 0 to drop it
        """
        from config import conf
        threshold = conf.warning_threshold
        if not threshold > 0:
            return 1

        site = None
        for _path, lineno, func, _text in traceback.extract_stack():
            if func == 'warning':
                break
            site = lineno

        started, repeats = self.warning_table.get(site, (0, 0))
        now = time.time()
        if now - started > threshold:
            # Window expired (or first sighting): open a new one.
            started, repeats = now, 0
        elif repeats < 2:
            repeats += 1
            if repeats == 2:
                record.msg = "more " + record.msg
        else:
            return 0
        self.warning_table[site] = (started, repeats)
        return 1
logfile.py 文件源码 项目:BlenderRobotDesigner 作者: HBPNeurorobotics 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def log_callstack_last(back_trace=False):
    """
    Return "<file name>:<line>" for the last stack entry that is not filtered out.

    Entries whose function name is in BACKTRACE_FILTER_FUNC or
    BACKTRACE_FILTER_HIDE_CODE are ignored. Every inspected entry is also
    printed to stdout.

    :param back_trace: when true, walk the traceback of the exception being
        handled instead of the current call stack (this function excluded)
    :return: location string, or "empty" when no entry qualifies
    """
    if back_trace:
        stack = traceback.extract_tb(sys.exc_info()[2])
    else:
        stack = traceback.extract_stack()[:-1]

    message = "empty"

    print("Parsing stack")

    for path, line, func, _code in stack:
        print(path,func)
        if func in BACKTRACE_FILTER_FUNC or func in BACKTRACE_FILTER_HIDE_CODE:
            continue
        # Later entries overwrite earlier ones, so the last match wins.
        message = "%s:%s" % (os.path.basename(path), line)

    return message
error.py 文件源码 项目:hakkuframework 作者: 4shadoww 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def filter(self, record):
        """
        Throttle warnings that repeat from the same call site.

        The call site is the line number of the stack entry that precedes the
        first entry named 'warning'. ``self.warning_table`` maps it to a
        ``(window_start, repeats)`` pair. While the window is younger than
        ``conf.warning_threshold`` seconds, two repeats are let through (the
        second one prefixed with "more ") and later ones are rejected. A
        threshold that is not positive disables throttling.

        :param record: log record; its ``msg`` may be prefixed in place
        :return: 1 to keep the record, 0 to drop it
        """
        from .config import conf
        threshold = conf.warning_threshold
        if not threshold > 0:
            return 1

        site = None
        for _path, lineno, func, _text in traceback.extract_stack():
            if func == 'warning':
                break
            site = lineno

        started, repeats = self.warning_table.get(site, (0, 0))
        now = time.time()
        if now - started > threshold:
            # Window expired (or first sighting): open a new one.
            started, repeats = now, 0
        elif repeats < 2:
            repeats += 1
            if repeats == 2:
                record.msg = "more " + record.msg
        else:
            return 0
        self.warning_table[site] = (started, repeats)
        return 1
gunicorn.py 文件源码 项目:nanobox-adapter-libcloud 作者: nanobox-io 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def worker_int(worker):
    """
    Log the current stack of every thread through the worker's logger.

    An info line announces the INT/QUIT signal; the per-thread stack dump is
    then emitted as a single debug message.

    :param worker: object exposing ``log.info`` and ``log.debug``
    """
    worker.log.info("worker received INT or QUIT signal")

    # Function-scope imports, as in the original.
    import sys
    import threading
    import traceback

    # Map thread idents to readable names for the section headers.
    thread_names = {th.ident: th.name for th in threading.enumerate()}
    report = []
    for thread_id, frame in sys._current_frames().items():
        header = "\n# Thread: %s(%d)" % (thread_names.get(thread_id, ""),
                                         thread_id)
        report.append(header)
        for path, lineno, func, text in traceback.extract_stack(frame):
            report.append('File: "%s", line %d, in %s' % (path, lineno, func))
            if text:
                report.append("  %s" % (text.strip()))
    worker.log.debug("\n".join(report))
rules.py 文件源码 项目:PeekabooAV 作者: scVENUS 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def known(s):
    """
    Rule: reuse the stored verdict when the database already knows the sample.

    :param s: sample handed to ``db.known`` and ``db.sample_info_fetch``
    :return: RuleResult with the stored result and reason and
        further_analysis=False for a known sample; otherwise a Result.unknown
        RuleResult with further_analysis=True
    """
    # The last stack entry is this function: yields "<function name>:<line>".
    here = traceback.extract_stack()[-1]
    position = "%s:%s" % (here[2], here[1])

    db = get_config().get_db_con()
    if not db.known(s):
        return RuleResult(position,
                          result=Result.unknown,
                          reason="Datei ist dem System noch nicht bekannt",
                          further_analysis=True)

    sample_info = db.sample_info_fetch(s)
    return RuleResult(position,
                      result=sample_info.get_result(),
                      reason=sample_info.reason,
                      further_analysis=False)
rules.py 文件源码 项目:PeekabooAV 作者: scVENUS 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def file_larger_than(s, args):
    """
    Rule: decide by file size whether a sample needs further analysis.

    :param s: sample exposing ``file_size``
    :param args: mapping whose 'byte' entry is the size limit
    :return: Result.unknown RuleResult with further_analysis=True when the
        file is larger than the limit; otherwise a Result.ignored RuleResult
        with further_analysis=False
    """
    # The last stack entry is this function: yields "<function name>:<line>".
    here = traceback.extract_stack()[-1]
    position = "%s:%s" % (here[2], here[1])

    limit = args['byte']
    if s.file_size > limit:
        verdict = Result.unknown
        reason = "Datei hat mehr als %d bytes" % limit
        analyse_further = True
    else:
        verdict = Result.ignored
        reason = "Datei ist nur %d bytes lang" % s.file_size
        analyse_further = False

    return RuleResult(position,
                      result=verdict,
                      reason=reason,
                      further_analysis=analyse_further)
rules.py 文件源码 项目:PeekabooAV 作者: scVENUS 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def cuckoo_score(s, args):
    """
    Rule: compare the sample's Cuckoo report score with a threshold.

    :param s: sample exposing ``cuckoo_report.score``
    :param args: mapping whose 'higher' entry is the threshold
    :return: Result.bad RuleResult with further_analysis=False when the score
        reaches the threshold; otherwise a Result.unknown RuleResult with
        further_analysis=True
    """
    # The last stack entry is this function: yields "<function name>:<line>".
    here = traceback.extract_stack()[-1]
    position = "%s:%s" % (here[2], here[1])

    threshold = args['higher']
    if s.cuckoo_report.score >= threshold:
        verdict = Result.bad
        reason = "Cuckoo score >= %s: %s" % (threshold, s.cuckoo_report.score)
        analyse_further = False
    else:
        verdict = Result.unknown
        reason = "Cuckoo score < %s: %s" % (threshold, s.cuckoo_report.score)
        analyse_further = True

    return RuleResult(position,
                      result=verdict,
                      reason=reason,
                      further_analysis=analyse_further)
test_zipimport.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def doTraceback(self, module):
        """
        Check that traceback helpers report the source line of the raise.

        Calls ``module.do_raise()``, which must raise, and verifies that
        ``extract_tb``, ``extract_stack`` and ``print_tb`` all yield the text
        of ``raise_src`` for the frame below this method.
        """
        try:
            module.do_raise()
        except:
            # Step past the first traceback entry (this method's own frame).
            tb = sys.exc_info()[2].tb_next
            expected = raise_src.strip()

            _, _, _, tb_line = extract_tb(tb, 1)[0]
            self.assertEqual(tb_line, expected)

            _, _, _, stack_line = extract_stack(tb.tb_frame, 1)[0]
            self.assertEqual(stack_line, expected)

            printed = io.StringIO()
            print_tb(tb, 1, printed)
            self.assertTrue(printed.getvalue().endswith(raise_src))
        else:
            raise AssertionError("This ought to be impossible")
type_util.py 文件源码 项目:pytypes 作者: Stewori 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _raise_typecheck_error(msg, is_return=False, value=None, received_type=None,
            expected_type=None, func=None):
    """
    Report a type-check failure as an exception or, in warning mode, a warning.

    Outside ``pytypes.warning_mode`` this raises ``pytypes.ReturnTypeError``
    when ``is_return`` is true and ``pytypes.InputTypeError`` otherwise. In
    warning mode it issues a ReturnTypeWarning/InputTypeWarning attributed to
    the stack entry selected by ``util._calc_traceback_list_offset``.

    ``value``, ``received_type``, ``expected_type`` and ``func`` are accepted
    but not used here.
    """
    if not pytypes.warning_mode:
        error_cls = pytypes.ReturnTypeError if is_return else pytypes.InputTypeError
        raise error_cls(msg)

    import traceback
    stack = traceback.extract_stack()
    offset = util._calc_traceback_list_offset(stack)
    category = pytypes.ReturnTypeWarning if is_return else pytypes.InputTypeWarning
    # Entry fields 0 and 1 are the file name and line number for the warning.
    warn_explicit(msg, category, stack[offset][0], stack[offset][1])
util.py 文件源码 项目:pytypes 作者: Stewori 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _warn_argname(msg, func, slf, clsm, cls=None, warn_tp=pytypes.exceptions.TypeWarning):
    """
    Emit an argument-name warning that names the offending function.

    Does nothing unless ``pytypes.warn_argnames`` is set. The message is
    suffixed with the qualified name ``module[.class].function``. The class
    part comes from ``cls`` when given, else it is looked up for methods
    (``slf`` or ``clsm`` true) with '<unknown class>' as the fallback. The
    warning is attributed to the stack entry chosen by
    ``_calc_traceback_list_offset``.
    """
    if not pytypes.warn_argnames:
        return

    if cls is not None:
        owner = cls.__name__
    elif slf or clsm:
        try:
            owner = get_class_that_defined_method(func).__name__
        except:
            owner = '<unknown class>'
    else:
        owner = None

    stack = traceback.extract_stack()
    offset = _calc_traceback_list_offset(stack)
    if owner is None:
        full_msg = '%s: %s.%s'%(msg, func.__module__, func.__name__)
    else:
        full_msg = '%s: %s.%s.%s'%(msg, func.__module__, owner, func.__name__)
    # Entry fields 0 and 1 are the file name and line number for the warning.
    warn_explicit(full_msg, warn_tp, stack[offset][0], stack[offset][1])
cli.py 文件源码 项目:incubator-airflow-old 作者: apache 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def sigquit_handler(sig, frame):
    """Print a stack trace for every thread of this process to stdout.

    Helps debug deadlocks; meant to run when the process gets a SIGQUIT,
    e.g. ``kill -s QUIT <PID>`` or CTRL+\\.

    :param sig: signal number (unused)
    :param frame: interrupted stack frame (unused)
    """
    print("Dumping stack traces for all threads in PID {}".format(os.getpid()))
    # Map thread idents to readable names for the section headers.
    thread_names = {th.ident: th.name for th in threading.enumerate()}
    report = []
    for thread_id, stack in sys._current_frames().items():
        header = "\n# Thread: {}({})".format(
            thread_names.get(thread_id, ""), thread_id)
        report.append(header)
        for path, lineno, func, text in traceback.extract_stack(stack):
            report.append(
                'File: "{}", line {}, in {}'.format(path, lineno, func))
            if text:
                report.append("  {}".format(text.strip()))
    print("\n".join(report))
error.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def filter(self, record):
        """
        Throttle warnings that repeat from the same call site.

        The call site is the line number of the stack entry that precedes the
        first entry named 'warning'. ``self.warning_table`` maps it to a
        ``(window_start, repeats)`` pair. While the window is younger than
        ``conf.warning_threshold`` seconds, two repeats are let through (the
        second one prefixed with "more ") and later ones are rejected. A
        threshold that is not positive disables throttling.

        :param record: log record; its ``msg`` may be prefixed in place
        :return: 1 to keep the record, 0 to drop it
        """
        from config import conf
        threshold = conf.warning_threshold
        if not threshold > 0:
            return 1

        site = None
        for _path, lineno, func, _text in traceback.extract_stack():
            if func == 'warning':
                break
            site = lineno

        started, repeats = self.warning_table.get(site, (0, 0))
        now = time.time()
        if now - started > threshold:
            # Window expired (or first sighting): open a new one.
            started, repeats = now, 0
        elif repeats < 2:
            repeats += 1
            if repeats == 2:
                record.msg = "more " + record.msg
        else:
            return 0
        self.warning_table[site] = (started, repeats)
        return 1
error.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def filter(self, record):
        """
        Throttle warnings that repeat from the same call site.

        The call site is the line number of the stack entry that precedes the
        first entry named 'warning'. ``self.warning_table`` maps it to a
        ``(window_start, repeats)`` pair. While the window is younger than
        ``conf.warning_threshold`` seconds, two repeats are let through (the
        second one prefixed with "more ") and later ones are rejected. A
        threshold that is not positive disables throttling.

        :param record: log record; its ``msg`` may be prefixed in place
        :return: 1 to keep the record, 0 to drop it
        """
        from .config import conf
        threshold = conf.warning_threshold
        if not threshold > 0:
            return 1

        site = None
        for _path, lineno, func, _text in traceback.extract_stack():
            if func == 'warning':
                break
            site = lineno

        started, repeats = self.warning_table.get(site, (0, 0))
        now = time.time()
        if now - started > threshold:
            # Window expired (or first sighting): open a new one.
            started, repeats = now, 0
        elif repeats < 2:
            repeats += 1
            if repeats == 2:
                record.msg = "more " + record.msg
        else:
            return 0
        self.warning_table[site] = (started, repeats)
        return 1
state.py 文件源码 项目:pyprob 作者: probprog 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def extract_address():
    """
    Return an address of the form '<instruction pointer>/<qualified name>'.

    The instruction pointer is ``f_lasti`` of the frame two levels above this
    function. The qualified name joins, outermost first, the function names
    found by walking up from that frame, followed by the assignment target
    reported by ``_extract_target_of_assignment()`` when there is one. The
    walk stops before a code name starting with '<' and after reaching
    ``_current_function_name``.
    """
    # Both lookups below depend on call depth; keep them in this frame.
    start_frame = sys._getframe(2)
    instruction_pointer = start_frame.f_lasti

    target = _extract_target_of_assignment()
    # Collected innermost-first; reversed before joining.
    parts = [] if target is None else [target]

    current = start_frame
    while current is not None:
        code_name = current.f_code.co_name
        if code_name.startswith('<'):
            break
        parts.append(code_name)
        if code_name == _current_function_name:
            break
        current = current.f_back

    parts.reverse()
    return "{}/{}".format(instruction_pointer, '.'.join(parts))
__init__.py 文件源码 项目:chuck 作者: Calysto 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def timeRemaining(seconds=0):
    """
    Tell whether a per-call-site countdown is still running (for 'while' loops).

    The timer is keyed by the caller's (file name, line number) in ``_timers``.
    The first call from a site, or a call with a different ``seconds`` value,
    (re)starts the timer and returns True. Later calls return True until more
    than ``seconds`` have elapsed, then the timer is removed and False is
    returned. ``seconds == 0`` always returns True.
    """
    global _timers
    if seconds == 0:
        return True

    now = time.time()
    # Entry -2 is the caller of this function.
    filename, line_no, _func, _text = traceback.extract_stack()[-2]
    if filename.startswith("<pyshell"):
        filename = "pyshell"
    key = (filename, line_no)

    if key not in _timers or _timers[key][1] != seconds:
        _timers[key] = (now, seconds)
        return True

    start, duration = _timers[key]
    expired = now - start > duration
    if expired:
        del _timers[key]
    return not expired
test_zipimport.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def doTraceback(self, module):
        """
        Check that traceback helpers report the source line of the raise.

        Calls ``module.do_raise()``, which must raise, and verifies that
        ``extract_tb``, ``extract_stack`` and ``print_tb`` all yield the text
        of ``raise_src`` for the frame below this method.
        """
        try:
            module.do_raise()
        except:
            # Step past the first traceback entry (this method's own frame).
            tb = sys.exc_info()[2].tb_next
            expected = raise_src.strip()

            _, _, _, tb_line = extract_tb(tb, 1)[0]
            self.assertEqual(tb_line, expected)

            _, _, _, stack_line = extract_stack(tb.tb_frame, 1)[0]
            self.assertEqual(stack_line, expected)

            printed = StringIO.StringIO()
            print_tb(tb, 1, printed)
            self.assertTrue(printed.getvalue().endswith(raise_src))
        else:
            raise AssertionError("This ought to be impossible")
test_zipimport.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def doTraceback(self, module):
        """
        Check that traceback helpers report the source line of the raise.

        Calls ``module.do_raise()``, which must raise, and verifies that
        ``extract_tb``, ``extract_stack`` and ``print_tb`` all yield the text
        of ``raise_src`` for the frame below this method.
        """
        try:
            module.do_raise()
        except:
            # Step past the first traceback entry (this method's own frame).
            tb = sys.exc_info()[2].tb_next
            expected = raise_src.strip()

            _, _, _, tb_line = extract_tb(tb, 1)[0]
            self.assertEqual(tb_line, expected)

            _, _, _, stack_line = extract_stack(tb.tb_frame, 1)[0]
            self.assertEqual(stack_line, expected)

            printed = StringIO.StringIO()
            print_tb(tb, 1, printed)
            self.assertTrue(printed.getvalue().endswith(raise_src))
        else:
            raise AssertionError("This ought to be impossible")
error.py 文件源码 项目:scapy-bpf 作者: guedou 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def filter(self, record):
        """
        Throttle warnings that repeat from the same call site.

        The call site is the line number of the stack entry that precedes the
        first entry named 'warning'. ``self.warning_table`` maps it to a
        ``(window_start, repeats)`` pair. While the window is younger than
        ``conf.warning_threshold`` seconds, two repeats are let through (the
        second one prefixed with "more ") and later ones are rejected. A
        threshold that is not positive disables throttling.

        :param record: log record; its ``msg`` may be prefixed in place
        :return: 1 to keep the record, 0 to drop it
        """
        from config import conf
        threshold = conf.warning_threshold
        if not threshold > 0:
            return 1

        site = None
        for _path, lineno, func, _text in traceback.extract_stack():
            if func == 'warning':
                break
            site = lineno

        started, repeats = self.warning_table.get(site, (0, 0))
        now = time.time()
        if now - started > threshold:
            # Window expired (or first sighting): open a new one.
            started, repeats = now, 0
        elif repeats < 2:
            repeats += 1
            if repeats == 2:
                record.msg = "more " + record.msg
        else:
            return 0
        self.warning_table[site] = (started, repeats)
        return 1
error.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def filter(self, record):
        """
        Throttle warnings that repeat from the same call site.

        The call site is the line number of the stack entry that precedes the
        first entry named 'warning'. ``self.warning_table`` maps it to a
        ``(window_start, repeats)`` pair. While the window is younger than
        ``conf.warning_threshold`` seconds, two repeats are let through (the
        second one prefixed with "more ") and later ones are rejected. A
        threshold that is not positive disables throttling.

        :param record: log record; its ``msg`` may be prefixed in place
        :return: 1 to keep the record, 0 to drop it
        """
        from config import conf
        threshold = conf.warning_threshold
        if not threshold > 0:
            return 1

        site = None
        for _path, lineno, func, _text in traceback.extract_stack():
            if func == 'warning':
                break
            site = lineno

        started, repeats = self.warning_table.get(site, (0, 0))
        now = time.time()
        if now - started > threshold:
            # Window expired (or first sighting): open a new one.
            started, repeats = now, 0
        elif repeats < 2:
            repeats += 1
            if repeats == 2:
                record.msg = "more " + record.msg
        else:
            return 0
        self.warning_table[site] = (started, repeats)
        return 1
logger.py 文件源码 项目:Andes 作者: cuihantao 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _raw_log(self, logfn, message, exc_info):
        """
        Send ``message`` to ``logfn``, optionally prefixed with its origin.

        With ``self.show_fcn`` set, the message is prefixed with the name of
        the function found three entries from the end of the call stack
        (qualified with this logger's class name when that class is not
        ``Logger``) and, if ``self.show_loc`` is also set, with its
        '(file:line):' location.

        :param logfn: callable invoked as ``logfn(text, exc_info=exc_info)``
        :param message: text to log
        :param exc_info: passed through to ``logfn`` unchanged
        """
        location = ''
        origin = ''
        stack = traceback.extract_stack()
        if len(stack) > 2:
            # Entry -1 is this method; -3 is two calls above it.
            source = stack[-3]
            if self.show_loc:
                location = '(%s:%d):' % (os.path.basename(source[0]), source[1])
            origin = source[2]
            if origin != '<module>':
                own_class = self.__class__.__name__
                if own_class != Logger.__name__:
                    origin = own_class + '.' + origin
                origin += '()'

        if self.show_fcn:
            text = location + origin + ': ' + message
        else:
            text = message
        logfn(text, exc_info=exc_info)


问题


面经


文章

微信
公众号

扫码关注公众号