def test_getframe(self):
    """Check sys._getframe() argument validation and the frame it returns.

    Calling it with two arguments must raise TypeError, asking for an
    impossibly deep stack offset must raise ValueError, and calling it with
    no argument must return the frame that is executing this method's code
    object.
    """
    self.assertRaises(TypeError, sys._getframe, 42, 42)
    self.assertRaises(ValueError, sys._getframe, 2000000000)
    # ``__code__`` is reachable through the class attribute on Python 3 (a
    # plain function) and on Python 2.6+ (method objects forward attribute
    # lookups to the wrapped function), unlike the Python 2-only
    # ``im_func.func_code`` spelling.
    self.assertIs(
        SysModuleTest.test_getframe.__code__,
        sys._getframe().f_code,
    )
# sys._current_frames() is a CPython-only gimmick.
# Example source code for Python's sys._current_frames()
def test_current_frames(self):
have_threads = True
try:
import thread
except ImportError:
have_threads = False
if have_threads:
self.current_frames_with_threads()
else:
self.current_frames_without_threads()
# Test sys._current_frames() when thread support doesn't exist.
def current_frames_without_threads(self):
    """Check sys._current_frames() when there is only a single thread.

    The mapping is expected to hold exactly one entry, keyed by the
    artificial "thread id" 0, whose value is the frame running this method.
    """
    frames_by_thread = sys._current_frames()
    self.assertEqual(len(frames_by_thread), 1)
    self.assertIn(0, frames_by_thread)
    sole_frame = frames_by_thread[0]
    self.assertTrue(sole_frame is sys._getframe())
def test_getframe(self):
    """Check sys._getframe() argument validation and the frame it returns.

    Calling it with two arguments must raise TypeError, asking for an
    impossibly deep stack offset must raise ValueError, and calling it with
    no argument must return the frame that is executing this method's code
    object.
    """
    self.assertRaises(TypeError, sys._getframe, 42, 42)
    self.assertRaises(ValueError, sys._getframe, 2000000000)
    # ``__code__`` is reachable through the class attribute on Python 3 (a
    # plain function) and on Python 2.6+ (method objects forward attribute
    # lookups to the wrapped function), unlike the Python 2-only
    # ``im_func.func_code`` spelling.
    self.assertIs(
        SysModuleTest.test_getframe.__code__,
        sys._getframe().f_code,
    )
# sys._current_frames() is a CPython-only gimmick.
def test_current_frames(self):
have_threads = True
try:
import thread
except ImportError:
have_threads = False
if have_threads:
self.current_frames_with_threads()
else:
self.current_frames_without_threads()
# Test sys._current_frames() when thread support doesn't exist.
def current_frames_without_threads(self):
    """Check sys._current_frames() when there is only a single thread.

    The mapping is expected to hold exactly one entry, keyed by the
    artificial "thread id" 0, whose value is the frame running this method.
    """
    frames_by_thread = sys._current_frames()
    self.assertEqual(len(frames_by_thread), 1)
    self.assertIn(0, frames_by_thread)
    sole_frame = frames_by_thread[0]
    self.assertTrue(sole_frame is sys._getframe())
def dumpstacks(self, data):
    """Return a text dump of the current stack of every thread.

    ``data`` is accepted but not used.  Each thread contributes a
    ``# Thread: name(id)`` header followed by one ``File: ...`` line per
    stack entry and, when the source text is available, the stripped source
    line.  The pieces are joined with CRLF.
    """
    import threading
    import sys
    import traceback

    # Thread id -> thread name, so the headers can show readable names.
    names_by_ident = {worker.ident: worker.name for worker in threading.enumerate()}
    report = []
    for ident, top_frame in sys._current_frames().items():
        header = "\n# Thread: %s(%d)" % (names_by_ident.get(ident, ""), ident)
        report.append(header)
        for path, line_number, func_name, source in traceback.extract_stack(top_frame):
            report.append('File: "%s", line %d, in %s' % (path, line_number, func_name))
            if source:
                report.append(" %s" % (source.strip()))
    return "\r\n".join(report)
def iter_frames(self, t):
    """Return the topmost frame of thread ``t`` as a zero- or one-item list.

    ``t`` is expected to expose its thread id as ``t.ident``.  The result is
    ``[frame]`` when sys._current_frames() has an entry for that id and
    ``[]`` otherwise.  An object without a usable ``ident`` (for example
    ``None``) also yields ``[]`` instead of raising AttributeError, so
    callers that pass ``None`` keep working.
    """
    ident = getattr(t, 'ident', None)
    if ident is None:
        # Nothing to look up: ``t`` is None or carries no thread id.
        return []
    # sys._current_frames(): dictionary with thread id -> topmost frame
    frame = sys._current_frames().get(ident)
    return [] if frame is None else [frame]
# IFDEF CYTHON
# def create_db_frame(self, *args, **kwargs):
# raise AssertionError('This method should not be called on cython (PyDbFrame should be used directly).')
# ELSE
# just create the db frame directly
def create_db_frame(self, args):
    """Build a PyDBFrame from ``args``, register it and return it.

    The last element of ``args`` is stored on the new object as ``.frame``
    before it is handed to ``self._AddDbFrame``.
    """
    # The frame has to be cached as a weak-ref; what we hand back is the
    # actual db frame, which stays alive for as long as its trace_dispatch
    # method is still referenced.
    # This is a large workaround because:
    # 1. weak-references to python frame objects are not possible
    # 2. the interpreter only offers _current_frames from 2.5 onwards
    tracked = PyDBFrame(args)
    tracked.frame = args[-1]
    self._AddDbFrame(tracked)
    return tracked
def __str__(self):
    """Return a one-line summary of the stepping state and frame count.

    The frame count is the length of ``self.iter_frames(None)``.
    """
    fields = (
        self.pydev_state,
        self.pydev_step_stop,
        self.pydev_step_cmd,
        self.pydev_notify_kill,
        len(self.iter_frames(None)),
    )
    return 'State:%s Stop:%s Cmd: %s Kill:%s Frames:%s' % fields
#=======================================================================================================================
# NOW, WE HAVE TO DEFINE WHICH THREAD INFO TO USE
# (whether we have to keep references to the frames or not)
# from version 2.5 onwards, we can use sys._current_frames to get a dict with the threads
# and frames, but to support other versions, we can't rely on that.
#=======================================================================================================================
def run(self):
    """Sleep ten seconds, then write a stack dump of every thread to stderr.

    The dump starts with a banner saying that threads were still found
    running after the tests finished, then lists each thread returned by
    sys._current_frames() with its name (when known) and its stack entries.
    """
    time.sleep(10)

    thread_id_to_name = {}
    try:
        for t in threading.enumerate():
            thread_id_to_name[t.ident] = '%s (daemon: %s)' % (t.name, t.daemon)
    except Exception:
        # Best effort: the names are only cosmetic, so a failure here must
        # not prevent the dump.  Only ``Exception`` is caught so that
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        pass

    stack_trace = [
        '===============================================================================',
        'pydev pyunit runner: Threads still found running after tests finished',
        '================================= Thread Dump =================================']

    for thread_id, stack in sys._current_frames().items():
        stack_trace.append('\n-------------------------------------------------------------------------------')
        # Falls back to the raw thread id when no name was collected.
        stack_trace.append(" Thread %s" % thread_id_to_name.get(thread_id, thread_id))
        stack_trace.append('')

        if 'self' in stack.f_locals:
            # NOTE(review): this is written to stderr immediately, i.e.
            # before the buffered dump below -- confirm that is intended.
            sys.stderr.write(str(stack.f_locals['self']) + '\n')

        for filename, lineno, name, line in traceback.extract_stack(stack):
            stack_trace.append(' File "%s", line %d, in %s' % (filename, lineno, name))
            if line:
                stack_trace.append(" %s" % (line.strip()))

    stack_trace.append('\n=============================== END Thread Dump ===============================')
    sys.stderr.write('\n'.join(stack_trace))
def test_getframe(self):
    """sys._getframe(): bad arguments raise, and depth 0 is this method."""
    # One argument too many, then a depth far beyond any real stack.
    self.assertRaises(TypeError, sys._getframe, 42, 42)
    self.assertRaises(ValueError, sys._getframe, 2000000000)

    expected_code = SysModuleTest.test_getframe.__code__
    running_code = sys._getframe().f_code
    self.assertTrue(expected_code is running_code)
# sys._current_frames() is a CPython-only gimmick.
def test_current_frames(self):
have_threads = True
try:
import _thread
except ImportError:
have_threads = False
if have_threads:
self.current_frames_with_threads()
else:
self.current_frames_without_threads()
# Test sys._current_frames() when thread support doesn't exist.
def current_frames_without_threads(self):
    """Check sys._current_frames() when there is only a single thread.

    The mapping is expected to hold exactly one entry, keyed by the
    artificial "thread id" 0, whose value is the frame running this method.
    """
    frames_by_thread = sys._current_frames()
    self.assertEqual(len(frames_by_thread), 1)
    self.assertIn(0, frames_by_thread)
    sole_frame = frames_by_thread[0]
    self.assertTrue(sole_frame is sys._getframe())
def dumpstacks(self):
    """Log the current stack of every thread through ``self.logger.debug``.

    Each thread produces a ``# Thread: name(id)`` message followed by one
    ``File: ...`` message per stack entry and, when the source text is
    available, a message holding the stripped source line.
    """
    # Thread id -> thread name, so the headers can show readable names.
    names_by_ident = {worker.ident: worker.name for worker in threading.enumerate()}
    for ident, top_frame in sys._current_frames().items():
        header = "# Thread: %s(%d)" % (names_by_ident.get(ident, ""), ident)
        self.logger.debug(header)
        for path, line_number, func_name, source in traceback.extract_stack(top_frame):
            location = 'File: "%s", line %d, in %s' % (path, line_number, func_name)
            self.logger.debug(location)
            if source:
                self.logger.debug(" %s" % (source.strip()))
def dump_tracing_state(self, context):
    """Print the tracing state of this object and of every thread.

    A debug tool.  First prints ``context`` and the ``tracing_enabled``,
    ``execution_started``, ``frame_beginning`` and ``debugger_thread_ident``
    attributes of ``self``.  Then, for each thread from
    threading.enumerate(), prints its name and id (marking the calling
    thread) and walks its stack from the topmost frame outwards, printing
    each frame's file, line, function name and ``f_trace`` value.  Frames
    identical to ``self.frame_beginning`` or to this method's own frame are
    flagged ``beginning`` / ``current``.

    Uses ``print(...)`` with a single argument so the code runs on both
    Python 2 and Python 3.
    """
    print("Dumping all threads Tracing state: (%s)" % context)
    print(" self.tracing_enabled=%s" % self.tracing_enabled)
    print(" self.execution_started=%s" % self.execution_started)
    print(" self.frame_beginning=%s" % self.frame_beginning)
    print(" self.debugger_thread_ident=%s" % self.debugger_thread_ident)

    # Loop invariants: take one snapshot of all stacks and resolve the
    # calling thread / calling frame once instead of on every iteration.
    frames_by_ident = sys._current_frames()
    current_ident = threading.current_thread().ident
    own_frame = inspect.currentframe()

    for thr in threading.enumerate():
        is_current_thread = thr.ident == current_ident
        print(" Thread: %s, %s %s" % (thr.name, thr.ident, "<= Current*" if is_current_thread else ''))
        # A thread listed by enumerate() may have no entry in the frame
        # snapshot; .get() avoids a KeyError and simply skips its stack.
        a_frame = frames_by_ident.get(thr.ident)
        while a_frame is not None:
            flags = []
            if a_frame is self.frame_beginning:
                flags.append("beginning")
            if a_frame is own_frame:
                flags.append("current")
            flags_str = "**" + ",".join(flags) if flags else ""
            print(" => %s, %s:%s(%s) | %s %s" % (a_frame,
                                                a_frame.f_code.co_filename,
                                                a_frame.f_lineno,
                                                a_frame.f_code.co_name,
                                                a_frame.f_trace,
                                                flags_str))
            a_frame = a_frame.f_back
def stackdump(sig, frm):
    """Print the current stack of every thread to standard output.

    ``sig`` and ``frm`` are accepted but not used (presumably so the
    function can be installed as a signal handler -- confirm at the call
    site).  Each thread contributes a ``# ThreadID: id`` header followed by
    one ``File: ...`` line per stack entry and, when the source text is
    available, the stripped source line.

    Uses ``print(...)`` with a single argument so the code runs on both
    Python 2 and Python 3.
    """
    code = []
    for threadId, stack in sys._current_frames().items():
        code.append("\n# ThreadID: %s" % threadId)
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
            if line:
                code.append(" %s" % (line.strip()))
    print("\n".join(code))
def stackdump(sig, frm):
    """Print the current stack of every thread to standard output.

    ``sig`` and ``frm`` are accepted but not used (presumably so the
    function can be installed as a signal handler -- confirm at the call
    site).  Each thread contributes a ``# ThreadID: id`` header followed by
    one ``File: ...`` line per stack entry and, when the source text is
    available, the stripped source line.

    Uses ``print(...)`` with a single argument so the code runs on both
    Python 2 and Python 3.
    """
    code = []
    for threadId, stack in sys._current_frames().items():
        code.append("\n# ThreadID: %s" % threadId)
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
            if line:
                code.append(" %s" % (line.strip()))
    print("\n".join(code))
def threads_handler():
    """Return ``(headers, body)`` describing the stack of every thread.

    ``headers`` is a dict declaring a plain-text content type.  ``body``
    starts with the number of threads found and then holds, per thread, its
    id in hexadecimal followed by its formatted stack.
    """
    snapshot = sys._current_frames()
    parts = ['%d threads found\n\n' % len(snapshot)]
    parts.extend(
        'Thread 0x%x:\n%s\n' % (ident, ''.join(traceback.format_stack(top_frame)))
        for ident, top_frame in snapshot.items()
    )
    headers = {'Content-Type': 'text/plain'}
    return headers, ''.join(parts)
def threads_handler():
    """Return ``(headers, body)`` describing the stack of every thread.

    ``headers`` is a dict declaring a plain-text content type.  ``body``
    starts with the number of threads found and then holds, per thread, its
    id in hexadecimal followed by its formatted stack.
    """
    snapshot = sys._current_frames()
    parts = ['%d threads found\n\n' % len(snapshot)]
    parts.extend(
        'Thread 0x%x:\n%s\n' % (ident, ''.join(traceback.format_stack(top_frame)))
        for ident, top_frame in snapshot.items()
    )
    headers = {'Content-Type': 'text/plain'}
    return headers, ''.join(parts)