python类extract_stack()的实例源码

error.py 文件源码 项目:scapy-radio 作者: BastilleResearch 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def filter(self, record):        
        from config import conf
        wt = conf.warning_threshold
        if wt > 0:
            stk = traceback.extract_stack()
            caller=None
            for f,l,n,c in stk:
                if n == 'warning':
                    break
                caller = l
            tm,nb = self.warning_table.get(caller, (0,0))
            ltm = time.time()
            if ltm-tm > wt:
                tm = ltm
                nb = 0
            else:
                if nb < 2:
                    nb += 1
                    if nb == 2:
                        record.msg = "more "+record.msg
                else:
                    return 0
            self.warning_table[caller] = (tm,nb)
        return 1
testcase.py 文件源码 项目:QTAF 作者: Tencent 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _get_current_traceback(self, thread ):
        '''????????????

        :param thread: ????????
        :type thread: Thread
        '''
        for thread_id, stack in sys._current_frames().items():
            if thread_id != thread.ident:
                continue
            tb = "Traceback ( thread-%d possibly hold at ):\n" % thread_id
            for filename, lineno, name, line in traceback.extract_stack(stack):
                tb += '  File: "%s", line %d, in %s\n' % (filename, lineno, name)
                if line:
                    tb += "    %s\n" % (line.strip())
            return tb
        else:
            raise RuntimeError("thread not found")
util.py 文件源码 项目:QTAF 作者: Tencent 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_thread_traceback(thread):
    '''????????????

    :param thread: ????????
    :type thread: Thread
    '''
    for thread_id, stack in sys._current_frames().items():
        if thread_id != thread.ident:
            continue
        tb = "Traceback ( thread-%d possibly hold at ):\n" % thread_id
        for filename, lineno, name, line in traceback.extract_stack(stack):
            tb += '  File: "%s", line %d, in %s\n' % (filename, lineno, name)
            if line:
                tb += "    %s\n" % (line.strip())
        return tb
    else:
        raise RuntimeError("thread not found")
Debug.py 文件源码 项目:StatisKit 作者: StatisKit 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def caller_trace(back=0):
    """
    Trace caller stack and save info into global dicts, which
    are printed automatically at the end of SCons execution.
    """
    global caller_bases, caller_dicts
    import traceback
    tb = traceback.extract_stack(limit=3+back)
    tb.reverse()
    callee = tb[1][:3]
    caller_bases[callee] = caller_bases.get(callee, 0) + 1
    for caller in tb[2:]:
        caller = callee + caller[:3]
        try:
            entry = caller_dicts[callee]
        except KeyError:
            caller_dicts[callee] = entry = {}
        entry[caller] = entry.get(caller, 0) + 1
        callee = caller

# print a single caller and its callers, if any
pydevd_console.py 文件源码 项目:specto 作者: mrknow 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_interactive_console(thread_id, frame_id, frame, console_message):
    """returns the global interactive console.
    interactive console should have been initialized by this time
    :rtype: DebugConsole
    """
    if InteractiveConsoleCache.thread_id == thread_id and InteractiveConsoleCache.frame_id == frame_id:
        return InteractiveConsoleCache.interactive_console_instance

    InteractiveConsoleCache.interactive_console_instance = DebugConsole()
    InteractiveConsoleCache.thread_id = thread_id
    InteractiveConsoleCache.frame_id = frame_id

    console_stacktrace = traceback.extract_stack(frame, limit=1)
    if console_stacktrace:
        current_context = console_stacktrace[0] # top entry from stacktrace
        context_message = 'File "%s", line %s, in %s' % (current_context[0], current_context[1], current_context[2])
        console_message.add_console_message(CONSOLE_OUTPUT, "[Current context]: %s" % (context_message,))
    return InteractiveConsoleCache.interactive_console_instance
error.py 文件源码 项目:isf 作者: w3h 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def filter(self, record):        
        from config import conf
        wt = conf.warning_threshold
        if wt > 0:
            stk = traceback.extract_stack()
            caller=None
            for f,l,n,c in stk:
                if n == 'warning':
                    break
                caller = l
            tm,nb = self.warning_table.get(caller, (0,0))
            ltm = time.time()
            if ltm-tm > wt:
                tm = ltm
                nb = 0
            else:
                if nb < 2:
                    nb += 1
                    if nb == 2:
                        record.msg = "more "+record.msg
                else:
                    return 0
            self.warning_table[caller] = (tm,nb)
        return 1
test_zipimport.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def doTraceback(self, module):
        try:
            module.do_raise()
        except:
            tb = sys.exc_info()[2].tb_next

            f,lno,n,line = extract_tb(tb, 1)[0]
            self.assertEqual(line, raise_src.strip())

            f,lno,n,line = extract_stack(tb.tb_frame, 1)[0]
            self.assertEqual(line, raise_src.strip())

            s = io.StringIO()
            print_tb(tb, 1, s)
            self.assertTrue(s.getvalue().endswith(raise_src))
        else:
            raise AssertionError("This ought to be impossible")
contractor.py 文件源码 项目:g3ar 作者: VillanCh 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _worker(self, dictobj):
        """"""
        func = dictobj['func']
        args = dictobj['args']
        argv = dictobj['argv']

        try:
            result = func(*args, **argv)
        except Exception as e:
            #print 'ecp occured'
            result = tuple([e, traceback.extract_stack()])

        self.lock.acquire()
        self._executed_task_count = self._executed_task_count + 1
        self._add_result_to_queue(result=result)
        self.lock.release()
contractor.py 文件源码 项目:g3ar 作者: VillanCh 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _worker(self, dictobj):
        """"""
        func = dictobj['func']
        args = dictobj['args']
        argv = dictobj['argv']

        try:
            result = func(*args, **argv)
        except Exception as e:
            #print 'ecp occured'
            result = tuple([e, traceback.extract_stack()])

        self.lock.acquire()
        self._executed_task_count = self._executed_task_count + 1
        self._add_result_to_queue(result=result)
        self.lock.release()
config.py 文件源码 项目:TripMeal 作者: DanielAndreasen 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def worker_int(worker):
    worker.log.info("worker received INT or QUIT signal")

    ## get traceback info
    import threading, sys, traceback
    id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
    code = []
    for threadId, stack in sys._current_frames().items():
        code.append("\n# Thread: %s(%d)" % (id2name.get(threadId,""),
            threadId))
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (filename,
                lineno, name))
            if line:
                code.append("  %s" % (line.strip()))
    worker.log.debug("\n".join(code))
base_validator.py 文件源码 项目:tripoli 作者: DDMAL 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def mute_errors(self, fn, *args, **kwargs):
        """Run given function and catch any of the errors it logged.

        The self._errors key will not be changed by using this function.

        Works by patching the BaseValidator.log_error function.
        BaseValidator.log_error should not be overridden in children.
        """
        caught_errors = set()

        def patched_log_error(self, field, msg):
            tb = traceback.extract_stack()[:-1] if self.debug else None
            caught_errors.add(ValidatorLogError(msg, self._path + (field,), tb))

        old_log_error = BaseValidator.log_error
        try:
            BaseValidator.log_error = patched_log_error
            val = fn(*args, **kwargs)
        finally:
            BaseValidator.log_error = old_log_error
        return val, caught_errors
base_validator.py 文件源码 项目:tripoli 作者: DDMAL 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def log_error(self, field, msg):
        """Add an error to the validator.

        This method should not be overridden in subclasses, as doing so
        is likely to break the error and warning coercion decorators.

        :param field: The field the error was raised on.
        :param msg: The message to associate with the error.
        """
        if self.collect_errors:
            tb = traceback.extract_stack()[:-1] if self.debug else None
            err = ValidatorLogError(msg, self._path + (field,), tb)
            if self.verbose:
                self._IIIFValidator.logger.error(str(err))
            self._errors.add(err)
        if self.fail_fast:
            raise FailFastException
configuration.py 文件源码 项目:modelforge 作者: src-d 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def refresh():
    override_files = []
    for stack in traceback.extract_stack():
        f = os.path.join(os.path.dirname(stack[0]), OVERRIDE_FILE)
        if f not in override_files:
            override_files.insert(0, f)
    if OVERRIDE_FILE in override_files:
        del override_files[override_files.index(OVERRIDE_FILE)]
    override_files.append(OVERRIDE_FILE)

    def import_path(path):
        if sys.version_info < (3, 5, 0):
            from importlib.machinery import SourceFileLoader
            return SourceFileLoader(__name__, path).load_module()
        import importlib.util
        spec = importlib.util.spec_from_file_location(__name__, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module

    for override_file in override_files:
        if not os.path.isfile(override_file):
            continue
        mod = import_path(override_file)
        globals().update({n: getattr(mod, n) for n in dir(mod) if not n.startswith("__")})
util.py 文件源码 项目:py-ipv8 作者: qstokkink 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def blockingCallFromThread(reactor, f, *args, **kwargs):
    """
    Improved version of twisted's blockingCallFromThread that shows the complete
    stacktrace when an exception is raised on the reactor's thread.
    If being called from the reactor thread already, just return the result of execution of the callable.
    """
    if isInIOThread():
            return f(*args, **kwargs)
    else:
        queue = Queue.Queue()

        def _callFromThread():
            result = defer.maybeDeferred(f, *args, **kwargs)
            result.addBoth(queue.put)
        reactor.callFromThread(_callFromThread)
        result = queue.get()
        if isinstance(result, failure.Failure):
            other_thread_tb = traceback.extract_tb(result.getTracebackObject())
            this_thread_tb = traceback.extract_stack()
            logger.error("Exception raised on the reactor's thread %s: \"%s\".\n Traceback from this thread:\n%s\n"
                         " Traceback from the reactor's thread:\n %s", result.type.__name__, result.getErrorMessage(),
                         ''.join(traceback.format_list(this_thread_tb)), ''.join(traceback.format_list(other_thread_tb)))
            result.raiseException()
        return result
Debug.py 文件源码 项目:objEnhancer 作者: BabbageCom 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def caller_trace(back=0):
    """
    Trace caller stack and save info into global dicts, which
    are printed automatically at the end of SCons execution.
    """
    global caller_bases, caller_dicts
    import traceback
    tb = traceback.extract_stack(limit=3+back)
    tb.reverse()
    callee = tb[1][:3]
    caller_bases[callee] = caller_bases.get(callee, 0) + 1
    for caller in tb[2:]:
        caller = callee + caller[:3]
        try:
            entry = caller_dicts[callee]
        except KeyError:
            caller_dicts[callee] = entry = {}
        entry[caller] = entry.get(caller, 0) + 1
        callee = caller

# print a single caller and its callers, if any
test_zipimport.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def doTraceback(self, module):
        try:
            module.do_raise()
        except:
            tb = sys.exc_info()[2].tb_next

            f,lno,n,line = extract_tb(tb, 1)[0]
            self.assertEqual(line, raise_src.strip())

            f,lno,n,line = extract_stack(tb.tb_frame, 1)[0]
            self.assertEqual(line, raise_src.strip())

            s = StringIO.StringIO()
            print_tb(tb, 1, s)
            self.assertTrue(s.getvalue().endswith(raise_src))
        else:
            raise AssertionError("This ought to be impossible")
traceback_utils.py 文件源码 项目:pyspark 作者: v-v-vishnevskiy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def first_spark_call():
    """
    Return a CallSite representing the first Spark call in the current call stack.
    """
    tb = traceback.extract_stack()
    if len(tb) == 0:
        return None
    file, line, module, what = tb[len(tb) - 1]
    sparkpath = os.path.dirname(file)
    first_spark_frame = len(tb) - 1
    for i in range(0, len(tb)):
        file, line, fun, what = tb[i]
        if file.startswith(sparkpath):
            first_spark_frame = i
            break
    if first_spark_frame == 0:
        file, line, fun, what = tb[0]
        return CallSite(function=fun, file=file, linenum=line)
    sfile, sline, sfun, swhat = tb[first_spark_frame]
    ufile, uline, ufun, uwhat = tb[first_spark_frame - 1]
    return CallSite(function=sfun, file=ufile, linenum=uline)
test_zipimport.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def doTraceback(self, module):
        try:
            module.do_raise()
        except:
            tb = sys.exc_info()[2].tb_next

            f,lno,n,line = extract_tb(tb, 1)[0]
            self.assertEqual(line, raise_src.strip())

            f,lno,n,line = extract_stack(tb.tb_frame, 1)[0]
            self.assertEqual(line, raise_src.strip())

            s = io.StringIO()
            print_tb(tb, 1, s)
            self.assertTrue(s.getvalue().endswith(raise_src))
        else:
            raise AssertionError("This ought to be impossible")
agl.py 文件源码 项目:autoxd 作者: nessessary 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def getCallerName():
    """?????????????????, ???????????????"""
    import traceback
    s_trace = traceback.extract_stack()
    full_function_name = s_trace[-3][2]
    #?????????
    #s_trace = s_trace[-4]
    #full_function_name = s_trace[-1]
    #s_filters = list('().')
    #for s_filter in s_filters:
        #if full_function_name != None:
            #full_function_name = str.replace(full_function_name, s_filter, '')
        #else:
            #full_function_name = ''
    #???? ????
    filename = sys._getframe(2).f_code.co_filename
    filename = os.path.basename(filename)
    filename = filename.split('.')[0]
    return filename + "_" + full_function_name
sign_observation.py 文件源码 项目:autoxd 作者: nessessary 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def assemble(*args, **kwargs):
    """????, ????????, ????????
    args: tuple boll
    return: bool
    """
    #?????????????? ??four<-0.3, ???????
    #???????? ???????? ??????
    s_trace = traceback.extract_stack()
    if len(s_trace)>=2 and is_allow:
        #print s_trace[-2]
        fname,line,mod,fn_name = s_trace[-2]
        fn_name = s_trace[-1][-2]
        args_text = _getFunctionArgs(fname, line, fn_name)
        #print args_text
    #print args
    args = np.array(args)
    return args.all()


问题


面经


文章

微信
公众号

扫码关注公众号