python类_current_frames()的实例源码

utils.py 文件源码 项目:satori-rtm-sdk-python 作者: satori-com 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def print_all_stacktraces():
    print("\n*** STACKTRACE - START ***\n")
    code = []
    for threadId, stack in sys._current_frames().items():
        threadName = ''
        for t in threading.enumerate():
            if t.ident == threadId:
                threadName = t.name
        code.append("\n# ThreadID: %s %s" % (threadId, threadName))
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (filename,
                                                        lineno, name))
            if line:
                code.append("  %s" % (line.strip()))

    for line in code:
        print(line)
    print("\n*** STACKTRACE - END ***\n")
block_reporter.py 文件源码 项目:stackimpact-python 作者: stackimpact 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def process_sample(self, signal_frame, sample_time, main_thread_id):
        if self.profile:
            start = time.clock()

            current_frames = sys._current_frames()
            items = current_frames.items()
            for thread_id, thread_frame in items:
                if thread_id == main_thread_id:
                    thread_frame = signal_frame

                stack = self.recover_stack(thread_frame)
                if stack:
                    current_node = self.profile
                    for frame in reversed(stack):
                        current_node = current_node.find_or_add_child(str(frame))
                    current_node.increment(sample_time, 1)

                thread_id, thread_frame, stack = None, None, None

            items = None
            current_frames = None

            self.profile._overhead += (time.clock() - start)
config.py 文件源码 项目:SWEETer-Cat 作者: DanielAndreasen 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def worker_int(worker):
    worker.log.info("worker received INT or QUIT signal")

    ## get traceback info
    import threading, sys, traceback
    id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
    code = []
    for threadId, stack in sys._current_frames().items():
        code.append("\n# Thread: %s(%d)" % (id2name.get(threadId,""),
            threadId))
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (filename,
                lineno, name))
            if line:
                code.append("  %s" % (line.strip()))
    worker.log.debug("\n".join(code))
gunicorn.py 文件源码 项目:nanobox-adapter-libcloud 作者: nanobox-io 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def worker_int(worker):
    worker.log.info("worker received INT or QUIT signal")

    ## get traceback info
    import threading, sys, traceback
    id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
    code = []
    for threadId, stack in sys._current_frames().items():
        code.append("\n# Thread: %s(%d)" % (id2name.get(threadId,""),
            threadId))
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (filename,
                lineno, name))
            if line:
                code.append("  %s" % (line.strip()))
    worker.log.debug("\n".join(code))
cli.py 文件源码 项目:incubator-airflow-old 作者: apache 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def sigquit_handler(sig, frame):
    """Helps debug deadlocks by printing stacktraces when this gets a SIGQUIT
    e.g. kill -s QUIT <PID> or CTRL+\
    """
    print("Dumping stack traces for all threads in PID {}".format(os.getpid()))
    id_to_name = dict([(th.ident, th.name) for th in threading.enumerate()])
    code = []
    for thread_id, stack in sys._current_frames().items():
        code.append("\n# Thread: {}({})"
                    .format(id_to_name.get(thread_id, ""), thread_id))
        for filename, line_number, name, line in traceback.extract_stack(stack):
            code.append('File: "{}", line {}, in {}'
                        .format(filename, line_number, name))
            if line:
                code.append("  {}".format(line.strip()))
    print("\n".join(code))
testcase.py 文件源码 项目:QTAF 作者: Tencent 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _get_current_traceback(self, thread ):
        '''????????????

        :param thread: ????????
        :type thread: Thread
        '''
        for thread_id, stack in sys._current_frames().items():
            if thread_id != thread.ident:
                continue
            tb = "Traceback ( thread-%d possibly hold at ):\n" % thread_id
            for filename, lineno, name, line in traceback.extract_stack(stack):
                tb += '  File: "%s", line %d, in %s\n' % (filename, lineno, name)
                if line:
                    tb += "    %s\n" % (line.strip())
            return tb
        else:
            raise RuntimeError("thread not found")
util.py 文件源码 项目:QTAF 作者: Tencent 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_thread_traceback(thread):
    '''????????????

    :param thread: ????????
    :type thread: Thread
    '''
    for thread_id, stack in sys._current_frames().items():
        if thread_id != thread.ident:
            continue
        tb = "Traceback ( thread-%d possibly hold at ):\n" % thread_id
        for filename, lineno, name, line in traceback.extract_stack(stack):
            tb += '  File: "%s", line %d, in %s\n' % (filename, lineno, name)
            if line:
                tb += "    %s\n" % (line.strip())
        return tb
    else:
        raise RuntimeError("thread not found")
ikpdb.py 文件源码 项目:ikpdb 作者: audaxis 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def enable_tracing(self):
        """ Enable tracing if it is disabled and debugged program is running, 
        else do nothing.
        Do this on all threads but the debugger thread.
        :return: True if tracing has been enabled, False else.
        """
        _logger.x_debug("enable_tracing()")
        #self.dump_tracing_state("before enable_tracing()")
        if not self.tracing_enabled and self.execution_started:
            # Restore or set trace function on all existing frames appart from 
            # debugger
            threading.settrace(self._tracer)  # then enable on all threads to come
            for thr in threading.enumerate():
                if thr.ident != self.debugger_thread_ident:  # skip debugger thread
                    a_frame = sys._current_frames()[thr.ident]
                    while a_frame:
                        a_frame.f_trace = self._tracer
                        a_frame = a_frame.f_back
            iksettrace._set_trace_on(self._tracer, self.debugger_thread_ident)
            self.tracing_enabled = True

        #self.dump_tracing_state("after enable_tracing()")
        return self.tracing_enabled
config.py 文件源码 项目:TripMeal 作者: DanielAndreasen 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def worker_int(worker):
    worker.log.info("worker received INT or QUIT signal")

    ## get traceback info
    import threading, sys, traceback
    id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
    code = []
    for threadId, stack in sys._current_frames().items():
        code.append("\n# Thread: %s(%d)" % (id2name.get(threadId,""),
            threadId))
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (filename,
                lineno, name))
            if line:
                code.append("  %s" % (line.strip()))
    worker.log.debug("\n".join(code))
app.py 文件源码 项目:data-store 作者: HumanCellAtlas 项目源码 文件源码 阅读 62 收藏 0 点赞 0 评论 0
def timeout_response() -> chalice.Response:
    """
    Produce a chalice Response object that indicates a timeout.  Stacktraces for all running threads, other than the
    current thread, are provided in the response object.
    """
    frames = sys._current_frames()
    current_threadid = threading.get_ident()
    trace_dump = {
        thread_id: traceback.format_stack(frame)
        for thread_id, frame in frames.items()
        if thread_id != current_threadid}

    problem = {
        'status': requests.codes.gateway_timeout,
        'code': "timed_out",
        'title': "Timed out processing request.",
        'traces': trace_dump,
    }
    return chalice.Response(
        status_code=problem['status'],
        headers={"Content-Type": "application/problem+json"},
        body=json.dumps(problem),
    )
base.py 文件源码 项目:py-ipv8 作者: qstokkink 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def setUpClass(cls):
        cls.__lockup_timestamp__ = time.time()
        def check_twisted():
            while time.time() - cls.__lockup_timestamp__ < cls.MAX_TEST_TIME:
                time.sleep(2)
                # If the test class completed normally, exit
                if not cls.__testing__:
                    return
            # If we made it here, there is a serious issue which we cannot recover from.
            # Most likely the Twisted threadpool got into a deadlock while shutting down.
            import os, traceback
            print >> sys.stderr, "The test-suite locked up! Force quitting! Thread dump:"
            for tid, stack in sys._current_frames().items():
                if tid != threading.currentThread().ident:
                    print >> sys.stderr, "THREAD#%d" % tid
                    for line in traceback.format_list(traceback.extract_stack(stack)):
                        print >> sys.stderr, "|", line[:-1].replace('\n', '\n|   ')
            os._exit(1)
        t = threading.Thread(target=check_twisted)
        t.daemon = True
        t.start()
test_threading.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_clear_threads_states_after_fork(self):
        # Issue #17094: check that threads states are cleared after fork()

        # start a bunch of threads
        threads = []
        for i in range(16):
            t = threading.Thread(target=lambda : time.sleep(0.3))
            threads.append(t)
            t.start()

        pid = os.fork()
        if pid == 0:
            # check that threads states have been cleared
            if len(sys._current_frames()) == 1:
                os._exit(0)
            else:
                os._exit(1)
        else:
            _, status = os.waitpid(pid, 0)
            self.assertEqual(0, status)

        for t in threads:
            t.join()
cli.py 文件源码 项目:airflow 作者: apache-airflow 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def sigquit_handler(sig, frame):
    """Helps debug deadlocks by printing stacktraces when this gets a SIGQUIT
    e.g. kill -s QUIT <PID> or CTRL+\
    """
    print("Dumping stack traces for all threads in PID {}".format(os.getpid()))
    id_to_name = dict([(th.ident, th.name) for th in threading.enumerate()])
    code = []
    for thread_id, stack in sys._current_frames().items():
        code.append("\n# Thread: {}({})"
                    .format(id_to_name.get(thread_id, ""), thread_id))
        for filename, line_number, name, line in traceback.extract_stack(stack):
            code.append('File: "{}", line {}, in {}'
                        .format(filename, line_number, name))
            if line:
                code.append("  {}".format(line.strip()))
    print("\n".join(code))
gunicorn.py 文件源码 项目:pypers 作者: frankosan 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def worker_int(worker):
    worker.log.info("worker received INT or QUIT signal")

    ## get traceback info
    import threading, sys, traceback
    id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
    code = []
    for threadId, stack in sys._current_frames().items():
        code.append("\n# Thread: %s(%d)" % (id2name.get(threadId,""),
            threadId))
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (filename,
                lineno, name))
            if line:
                code.append("  %s" % (line.strip()))
    worker.log.debug("\n".join(code))
threadtree.py 文件源码 项目:easypy 作者: weka-io 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def iter_thread_frames():
        main_thread_frame = None
        for ident, frame in sys._current_frames().items():
            if IDENT_TO_UUID.get(ident) == MAIN_UUID:
                main_thread_frame = frame
                # the MainThread should be shown in it's "greenlet" version
                continue
            yield ident, frame

        for thread in threading.enumerate():
            if not getattr(thread, '_greenlet', None):
                # some inbetween state, before greenlet started or after it died?...
                pass
            elif thread._greenlet.gr_frame:
                yield thread.ident, thread._greenlet.gr_frame
            else:
                # a thread with greenlet but without gr_frame will be fetched from sys._current_frames
                # If we switch to another greenlet by the time we get there we will get inconsistent dup of threads.
                # TODO - make best-effort attempt to show coherent thread dump
                yield thread.ident, main_thread_frame
test_threading.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_clear_threads_states_after_fork(self):
        # Issue #17094: check that threads states are cleared after fork()

        # start a bunch of threads
        threads = []
        for i in range(16):
            t = threading.Thread(target=lambda : time.sleep(0.3))
            threads.append(t)
            t.start()

        pid = os.fork()
        if pid == 0:
            # check that threads states have been cleared
            if len(sys._current_frames()) == 1:
                os._exit(0)
            else:
                os._exit(1)
        else:
            _, status = os.waitpid(pid, 0)
            self.assertEqual(0, status)

        for t in threads:
            t.join()
ThreadedAppServer.py 文件源码 项目:w4py 作者: Cito 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def threadDump(signum, frame):
    """Signal handler for dumping thread stack frames to stdout."""
    print
    print "App server has been signaled to attempt a thread dump."
    print
    print "Thread stack frame dump at", asclocaltime()
    sys.stdout.flush()
    frames = sys._current_frames()
    print
    print "-" * 79
    print
    for threadID in sorted(frames):
        frame = frames[threadID]
        print "Thread ID: %d (reference count = %d)" % (
            threadID, sys.getrefcount(frame))
        print ''.join(traceback.format_list(traceback.extract_stack(frame)))
    print "-" * 79
    sys.stdout.flush()
debug.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_full_thread_dump():
    """Returns a string containing a traceback for all threads"""
    output = io.StringIO()
    time = strftime("%Y-%m-%d %H:%M:%S", gmtime())
    thread_names = {}
    for thread in threading.enumerate():
        thread_names[thread.ident] = thread.name
    output.write("\n>>>> Begin stack trace (%s) >>>>\n" % time)
    for threadId, stack in current_frames().items():
        output.write(
            "\n# ThreadID: %s (%s)\n" %
            (threadId, thread_names.get(threadId, "unknown")))
        for filename, lineno, name, line in traceback.extract_stack(stack):
            output.write(
                'File: "%s", line %d, in %s\n' %
                (filename, lineno, name))
            if line:
                output.write("  %s\n" % (line.strip()))
    output.write("\n<<<< End stack trace <<<<\n\n")

    thread_dump = output.getvalue()
    output.close()
    return thread_dump
runtest.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _captureThreadStacks(self):
        """Capture the stacks for all currently running threads.

        :return: A list of ``(thread-description, stack)`` tuples. See
            `traceback.format_stack` for the format of ``stack``.
        """
        threads = {t.ident: t for t in threading.enumerate()}

        def describe(ident):
            if ident in threads:
                return repr(threads[ident])
            else:
                return "<*Unknown* %d>" % ident

        return [
            (describe(ident), traceback.format_stack(frame))
            for ident, frame in sys._current_frames().items()
        ]
nrlock.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _debug_lock_release(self):
                errbuf = ""
                owner = self._RLock__owner
                if not owner:
                        return errbuf

                # Get stack of current owner, if lock is owned.
                for tid, stack in sys._current_frames().items():
                        if tid != owner.ident:
                                continue
                        errbuf += "Stack of owner:\n"
                        for filenm, lno, func, txt in \
                            traceback.extract_stack(stack):
                                errbuf += "  File: \"{0}\", line {1:d},in {2}".format(
                                    filenm, lno, func)
                                if txt:
                                        errbuf += "\n    {0}".format(txt.strip())
                                errbuf += "\n"
                        break

                return errbuf
funcserver.py 文件源码 项目:funcserver 作者: deep-compute 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def dump_stacks(self):
        '''
        Dumps the stack of all threads. This function
        is meant for debugging. Useful when a deadlock happens.

        borrowed from: http://blog.ziade.org/2012/05/25/zmq-and-gevent-debugging-nightmares/
        '''

        dump = []

        # threads
        threads = dict([(th.ident, th.name)
                            for th in threading.enumerate()])

        for thread, frame in sys._current_frames().items():
            if thread not in threads: continue
            dump.append('Thread 0x%x (%s)\n' % (thread, threads[thread]))
            dump.append(''.join(traceback.format_stack(frame)))
            dump.append('\n')

        return ''.join(dump)
topotest.py 文件源码 项目:topotests 作者: FRRouting 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def checkAddressSanitizerError(output, router, component):
    "Checks for AddressSanitizer in output. If found, then logs it and returns true, false otherwise"

    addressSantizerError = re.search('(==[0-9]+==)ERROR: AddressSanitizer: ([^\s]*) ', output)
    if addressSantizerError:
        sys.stderr.write("%s: %s triggered an exception by AddressSanitizer\n" % (router, component))
        # Sanitizer Error found in log
        pidMark = addressSantizerError.group(1)
        addressSantizerLog = re.search('%s(.*)%s' % (pidMark, pidMark), output, re.DOTALL)
        if addressSantizerLog:
            callingTest = os.path.basename(sys._current_frames().values()[0].f_back.f_back.f_globals['__file__'])
            callingProc = sys._getframe(2).f_code.co_name
            with open("/tmp/AddressSanitzer.txt", "a") as addrSanFile:
                sys.stderr.write('\n'.join(addressSantizerLog.group(1).splitlines()) + '\n')
                addrSanFile.write("## Error: %s\n\n" % addressSantizerError.group(2))
                addrSanFile.write("### AddressSanitizer error in topotest `%s`, test `%s`, router `%s`\n\n" % (callingTest, callingProc, router))
                addrSanFile.write('    '+ '\n    '.join(addressSantizerLog.group(1).splitlines()) + '\n')
                addrSanFile.write("\n---------------\n")
        return True
    return False
gunicorn.conf.py 文件源码 项目:tornadopy 作者: xubigshu 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def worker_int(worker):
    worker.log.info("worker received INT or QUIT signal")

    ## get traceback info
    import threading, sys, traceback

    id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
    code = []
    for threadId, stack in sys._current_frames().items():
        code.append("\n# Thread: %s(%d)" % (id2name.get(threadId, ""),
                                            threadId))
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (filename,
                                                        lineno, name))
            if line:
                code.append("  %s" % (line.strip()))
    worker.log.debug("\n".join(code))
debug.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run(self):
        while True:
            with self.lock:
                if self._stop is True:
                    return

            print("\n=============  THREAD FRAMES:  ================")
            for id, frame in sys._current_frames().items():
                if id == threading.current_thread().ident:
                    continue
                print("<< thread %d >>" % id)
                traceback.print_stack(frame)
            print("===============================================\n")

            time.sleep(self.interval)
debug.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def run(self):
        while True:
            with self.lock:
                if self._stop is True:
                    return

            print("\n=============  THREAD FRAMES:  ================")
            for id, frame in sys._current_frames().items():
                if id == threading.current_thread().ident:
                    continue
                print("<< thread %d >>" % id)
                traceback.print_stack(frame)
            print("===============================================\n")

            time.sleep(self.interval)
eventlet_backdoor.py 文件源码 项目:iotronic 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _print_nativethreads():
    for threadId, stack in sys._current_frames().items():
        print(threadId)
        traceback.print_stack(stack)
        print()
rest_server.py 文件源码 项目:storperf 作者: opnfv 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def delete(self):
        self.logger.info("Threads: %s" % sys._current_frames())
        print sys._current_frames()
        try:
            return jsonify({'Slaves': storperf.terminate_workloads()})
        except Exception as e:
            abort(400, str(e))
gunicorn_conf.py 文件源码 项目:site 作者: alphageek-xyz 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def worker_int(worker):
    worker.log.info("worker received INT or QUIT signal")
    import threading, sys, traceback
    id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
    code = []
    for threadId, stack in sys._current_frames().items():
        code.append("\n# Thread: %s(%d)" % (id2name.get(threadId,""),
            threadId))
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (filename,
                lineno, name))
            if line:
                code.append("  %s" % (line.strip()))
    worker.log.debug("\n".join(code))
test_sys.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_current_frames(self):
        have_threads = True
        try:
            import _thread
        except ImportError:
            have_threads = False

        if have_threads:
            self.current_frames_with_threads()
        else:
            self.current_frames_without_threads()

    # Test sys._current_frames() in a WITH_THREADS build.
test_sys.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def current_frames_without_threads(self):
        # Not much happens here:  there is only one thread, with artificial
        # "thread id" 0.
        d = sys._current_frames()
        self.assertEqual(len(d), 1)
        self.assertIn(0, d)
        self.assertTrue(d[0] is sys._getframe())


问题


面经


文章

微信
公众号

扫码关注公众号