def trace_func(self, frame, event, arg):
    """Trace callback for this thread: route ``event`` to its handler.

    Args:
        frame: Frame object the trace event refers to.
        event: Trace event name; used as the key into ``self._events``.
        arg: Event payload, forwarded unchanged to the selected handler.

    Returns:
        ``None`` when tracing should stop (``sys`` is already gone during
        shutdown, or a pending detach was honoured), ``self.trace_func``
        while a send is in progress or after a stack overflow / keyboard
        interrupt, and otherwise whatever the event handler returns.
    """
    # If we're so far into process shutdown that sys is already gone, just stop tracing.
    if sys is None:
        return None

    if self.is_sending:
        # https://pytools.codeplex.com/workitem/1864
        # we're currently doing I/O w/ the socket, we don't want to deliver
        # any breakpoints or async breaks because we'll deadlock.  Continue
        # to return the trace function so all of our frames remain
        # balanced.  A better way to deal with this might be to do
        # sys.settrace(None) when we take the send lock, but that's much
        # more difficult because our send context manager is used both
        # inside and outside of the trace function, and so is used when
        # tracing is enabled and disabled, and so it's very easy to get our
        # current frame tracking to be thrown off...
        return self.trace_func

    try:
        # if should_debug_code(frame.f_code) is not true during attach
        # the current frame is None and a pop_frame will cause an exception and
        # break the debugger
        if self.cur_frame is None:
            # happens during attach, we need frame for blocking
            self.push_frame(frame)

        if self.stepping == STEPPING_BREAK and should_debug_code(frame.f_code):
            if self.detach:
                if stackless is not None:
                    stackless.set_schedule_callback(None)
                    # _stackless_attach only saves the original __call__ when
                    # it actually patched tasklet.__call__ (stackless builds
                    # without tasklet.trace_function).  Restore it only in
                    # that case instead of raising AttributeError on detach.
                    saved_call = getattr(self, '__oldstacklesscall__', None)
                    if saved_call is not None:
                        stackless.tasklet.__call__ = saved_call
                sys.settrace(None)
                return None

            self.async_break()

        return self._events[event](frame, arg)
    except (StackOverflowException, KeyboardInterrupt):
        # Stack overflow (or Ctrl+C) while handling the event: the event is
        # dropped.  NOTE: returning self.trace_func keeps tracing this frame;
        # it does not disable tracing.
        return self.trace_func
# Example source code for Python trace()
def handle_return(self, frame, arg):
    """Handle a 'return' trace event for ``frame``.

    Pops the frame from this thread's frame tracking, updates the stepping
    state (possibly blocking to report a finished step), forwards the event
    to the previously installed trace function and then restores the trace
    function saved for the calling frame.

    Args:
        frame: Frame that is returning.
        arg: Trace argument for the event; forwarded to the previous trace
            function unchanged.
    """
    def finish_step():
        # The step is complete: clear stepping, refresh the thread stacks
        # and block while reporting the finished step.
        self.stepping = STEPPING_NONE
        update_all_thread_stacks(self)
        self.block(lambda: (report_step_finished(self.id), mark_all_threads_for_break(skip_thread = self)))

    self.pop_frame()

    if not DETACHED:
        stepping = self.stepping
        # only update stepping state when this frame is debuggable (matching handle_call)
        # Compare by value: stepping is a number that is incremented and
        # decremented below, so object identity is not a reliable test.
        if stepping != STEPPING_NONE and should_debug_code(frame.f_code):
            if stepping > STEPPING_OVER:
                self.stepping -= 1
            elif stepping < STEPPING_OUT:
                self.stepping += 1
            elif stepping in USER_STEPPING:
                if self.cur_frame is None or frame.f_code.co_name == "<module>":
                    # only return to user code modules
                    if self.should_block_on_frame(frame):
                        # restore back the module frame for the step out of a module
                        self.push_frame(ModuleExitFrame(frame))
                        finish_step()
                        self.pop_frame()
                elif self.should_block_on_frame(self.cur_frame):
                    # if we're returning into non-user code then don't block in the
                    # non-user code, wait until we hit user code again
                    finish_step()

    # forward call to previous trace function, if any
    old_trace_func = self.prev_trace_func
    if old_trace_func is not None:
        old_trace_func(frame, 'return', arg)

    # restore previous frames trace function if there is one
    if self.trace_func_stack:
        self.prev_trace_func = self.trace_func_stack.pop()
def _dumpf(frame):
if frame is None:
return "<None>"
else:
addn = "(with trace!)"
if frame.f_trace is None:
addn = " **No Trace Set **"
return "Frame at %d, file %s, line: %d%s" % (id(frame), frame.f_code.co_filename, frame.f_lineno, addn)
def dispatch_return(self, frame, arg):
    """Handle a 'return' event, never tracing above the logical bottom frame.

    When ``frame`` is ``self.logicalbotframe`` the global trace function is
    cleared and ``None`` is returned.  Otherwise ``self.currentframe`` moves
    to the caller and the event is delegated to ``bdb.Bdb.dispatch_return``.
    """
    traceenter("dispatch_return", _dumpf(frame), arg)
    if self.logicalbotframe is not frame:
        # self.bSetTrace = 0
        self.currentframe = frame.f_back
        return bdb.Bdb.dispatch_return(self, frame, arg)

    # We don't want to debug parent frames.
    tracev("dispatch_return resetting sys.trace")
    sys.settrace(None)
    return None
def trace_dispatch(self, frame, event, arg):
    """Trace entry point: delegate to ``bdb`` only while an application is attached.

    Returns ``None`` (after logging) when ``self.debugApplication`` is not
    set, otherwise the result of ``bdb.Bdb.trace_dispatch``.
    """
    traceenter("trace_dispatch", _dumpf(frame), event, arg)
    if self.debugApplication is not None:
        return bdb.Bdb.trace_dispatch(self, frame, event, arg)
    trace("trace_dispatch has no application!")
    return None
#
# The user functions do bugger all!
#
# def user_call(self, frame, argument_list):
# traceenter("user_call",_dumpf(frame))
def AttachApp(self, debugApplication, codeContainerProvider):
    """Attach this debugger to ``debugApplication``.

    Stores both arguments on the instance, registers a wrapped stack-frame
    sniffer with the application (keeping the returned cookie) and opens a
    connection for the application's remote-debug events.
    """
    # traceenter("AttachApp", debugApplication, codeContainerProvider)
    self.codeContainerProvider = codeContainerProvider
    self.debugApplication = debugApplication

    sniffer = stackframe.DebugStackFrameSniffer(self)
    self.stackSniffer = _wrap(sniffer, axdebug.IID_IDebugStackFrameSniffer)
    self.stackSnifferCookie = debugApplication.AddStackFrameSniffer(self.stackSniffer)
    # trace("StackFrameSniffer added (%d)" % self.stackSnifferCookie)

    # Connect to the application events.
    self.appEventConnection = win32com.client.connect.SimpleConnection(
        self.debugApplication,
        self,
        axdebug.IID_IRemoteDebugApplicationEvents,
    )
def SetupAXDebugging(self, baseFrame = None, userFrame = None):
    """Get ready for potential debugging.  Must be called on the thread
    that is being debugged.

    Args:
        baseFrame: Frame recorded as both ``botframe`` and
            ``logicalbotframe``.
        userFrame: Frame recorded as ``stopframe``; for non AXScript
            debugging this is the first frame of the user's code.
            Defaults to ``baseFrame``.

    The call is ignored when a different thread is already registered as
    the debugging thread.
    """
    if userFrame is not None:
        # We have missed the "dispatch_call" function, so set this up now!
        userFrame.f_locals['__axstack_address__'] = axdebug.GetStackAddress()
    else:
        userFrame = baseFrame

    traceenter("SetupAXDebugging", self)
    self._threadprotectlock.acquire()
    try:
        thisThread = win32api.GetCurrentThreadId()
        if self.debuggingThread is None:
            self.debuggingThread = thisThread
        elif self.debuggingThread != thisThread:
            trace("SetupAXDebugging called on other thread - ignored!")
            return
        else:
            # Re-entered on the debugging thread: push our context.
            saved_context = (
                self.logicalbotframe,
                self.stopframe,
                self.currentframe,
                self.debuggingThreadStateHandle,
            )
            self.recursiveData.insert(0, saved_context)
    finally:
        self._threadprotectlock.release()

    trace("SetupAXDebugging has base frame as", _dumpf(baseFrame))
    self.botframe = baseFrame
    self.stopframe = userFrame
    self.logicalbotframe = baseFrame
    self.currentframe = None
    self.debuggingThreadStateHandle = axdebug.GetThreadStateHandle()

    self._BreakFlagsChanged()
# RemoteDebugApplicationEvents
def _BreakFlagsChanged(self):
    """React to a change in breakpoints or break flags.

    When there are breakpoints or break flags, ``self.trace_dispatch`` is
    installed as ``f_trace`` on the logical bottom frame, or on the stop
    frame when there is no bottom frame.  When break flags are set and the
    call comes from a thread other than the debugging thread, the trace
    function is also set through the saved thread-state handle.
    """
    traceenter("_BreakFlagsChanged to %s with our thread = %s, and debugging thread = %s" % (self.breakFlags, self.debuggingThread, win32api.GetCurrentThreadId()))
    trace("_BreakFlagsChanged has breaks", self.breaks)
    # If a request comes on our debugging thread, then do it now!
    # if self.debuggingThread!=win32api.GetCurrentThreadId():
    #     return

    if len(self.breaks) or self.breakFlags:
        bottom = self.logicalbotframe
        if bottom:
            trace("BreakFlagsChange with bot frame", _dumpf(bottom))
            # We have frames not to be debugged (eg, Scripting engine frames
            # (sys.settrace will be set when our logicalbotframe is hit -
            # this may not be the right thing to do, as it may not cause the
            # immediate break we desire.)
            bottom.f_trace = self.trace_dispatch
        else:
            trace("BreakFlagsChanged, but no bottom frame")
            if self.stopframe is not None:
                self.stopframe.f_trace = self.trace_dispatch

    # If we have the thread-state for the thread being debugged, then
    # we dynamically set its trace function - it is possible that the thread
    # being debugged is in a blocked call (eg, a message box) and we
    # want to hit the debugger the instant we return
    retarget_thread = (
        self.debuggingThreadStateHandle is not None
        and self.breakFlags
        and self.debuggingThread != win32api.GetCurrentThreadId()
    )
    if retarget_thread:
        axdebug.SetThreadStateTrace(self.debuggingThreadStateHandle, self.trace_dispatch)
def _dumpf(frame):
if frame is None:
return "<None>"
else:
addn = "(with trace!)"
if frame.f_trace is None:
addn = " **No Trace Set **"
return "Frame at %d, file %s, line: %d%s" % (id(frame), frame.f_code.co_filename, frame.f_lineno, addn)
def dispatch_return(self, frame, arg):
    """Process a frame 'return' event.

    Returning from ``self.logicalbotframe`` clears the global trace
    function and yields ``None``, so frames above it are not debugged.
    For every other frame ``self.currentframe`` becomes the caller's frame
    and ``bdb.Bdb.dispatch_return`` produces the result.
    """
    traceenter("dispatch_return", _dumpf(frame), arg)

    leaving_bottom = self.logicalbotframe is frame
    if leaving_bottom:
        # We don't want to debug parent frames.
        tracev("dispatch_return resetting sys.trace")
        sys.settrace(None)
        return None

    # self.bSetTrace = 0
    self.currentframe = frame.f_back
    return bdb.Bdb.dispatch_return(self, frame, arg)
def trace_dispatch(self, frame, event, arg):
    """Forward a trace event to ``bdb`` unless no application is attached.

    With ``self.debugApplication`` unset the event is only logged and
    ``None`` is returned.
    """
    traceenter("trace_dispatch", _dumpf(frame), event, arg)

    attached = self.debugApplication is not None
    if not attached:
        trace("trace_dispatch has no application!")
        return None
    return bdb.Bdb.trace_dispatch(self, frame, event, arg)
#
# The user functions do bugger all!
#
# def user_call(self, frame, argument_list):
# traceenter("user_call",_dumpf(frame))
def AttachApp(self, debugApplication, codeContainerProvider):
    """Hook this debugger up to ``debugApplication``.

    Records the application and the code container provider, adds a
    wrapped ``DebugStackFrameSniffer`` to the application (remembering the
    cookie it returns) and connects to the application's events.
    """
    # traceenter("AttachApp", debugApplication, codeContainerProvider)
    self.codeContainerProvider = codeContainerProvider
    self.debugApplication = debugApplication

    wrapped_sniffer = _wrap(
        stackframe.DebugStackFrameSniffer(self),
        axdebug.IID_IDebugStackFrameSniffer,
    )
    self.stackSniffer = wrapped_sniffer
    self.stackSnifferCookie = debugApplication.AddStackFrameSniffer(self.stackSniffer)
    # trace("StackFrameSniffer added (%d)" % self.stackSnifferCookie)

    # Connect to the application events.
    events_iid = axdebug.IID_IRemoteDebugApplicationEvents
    self.appEventConnection = win32com.client.connect.SimpleConnection(self.debugApplication, self, events_iid)
def SetupAXDebugging(self, baseFrame = None, userFrame = None):
    """Get ready for potential debugging.  Must be called on the thread
    that is being debugged.

    ``baseFrame`` becomes ``botframe`` and ``logicalbotframe``;
    ``userFrame`` (the first frame of the user's code for non AXScript
    debugging, defaulting to ``baseFrame``) becomes ``stopframe``.  A call
    made from a thread other than the registered debugging thread is
    logged and ignored.
    """
    # userFrame is for non AXScript debugging.  This is the first frame of the
    # users code.
    if userFrame is None:
        userFrame = baseFrame
    else:
        # We have missed the "dispatch_call" function, so set this up now!
        stack_address = axdebug.GetStackAddress()
        userFrame.f_locals['__axstack_address__'] = stack_address

    traceenter("SetupAXDebugging", self)

    self._threadprotectlock.acquire()
    try:
        thisThread = win32api.GetCurrentThreadId()
        first_call = self.debuggingThread is None
        if first_call:
            self.debuggingThread = thisThread
        else:
            if self.debuggingThread != thisThread:
                trace("SetupAXDebugging called on other thread - ignored!")
                return
            # push our context.
            context = (self.logicalbotframe, self.stopframe, self.currentframe, self.debuggingThreadStateHandle)
            self.recursiveData.insert(0, context)
    finally:
        # Always release the lock, including on the early return above.
        self._threadprotectlock.release()

    trace("SetupAXDebugging has base frame as", _dumpf(baseFrame))
    self.botframe = baseFrame
    self.stopframe = userFrame
    self.logicalbotframe = baseFrame
    self.currentframe = None
    self.debuggingThreadStateHandle = axdebug.GetThreadStateHandle()
    self._BreakFlagsChanged()
# RemoteDebugApplicationEvents
def _BreakFlagsChanged(self):
    """Apply the current breakpoints / break flags to the tracked frames.

    With any breakpoints or break flags present, ``self.trace_dispatch``
    is set as ``f_trace`` on ``self.logicalbotframe`` when there is one,
    else on ``self.stopframe`` when that is not ``None``.  If break flags
    are set and the caller is not the debugging thread, the trace function
    is additionally installed via the stored thread-state handle.
    """
    current_state = (self.breakFlags, self.debuggingThread, win32api.GetCurrentThreadId())
    traceenter("_BreakFlagsChanged to %s with our thread = %s, and debugging thread = %s" % current_state)
    trace("_BreakFlagsChanged has breaks", self.breaks)
    # If a request comes on our debugging thread, then do it now!
    # if self.debuggingThread!=win32api.GetCurrentThreadId():
    #     return

    if len(self.breaks) or self.breakFlags:
        if not self.logicalbotframe:
            trace("BreakFlagsChanged, but no bottom frame")
            if self.stopframe is not None:
                self.stopframe.f_trace = self.trace_dispatch
        else:
            trace("BreakFlagsChange with bot frame", _dumpf(self.logicalbotframe))
            # We have frames not to be debugged (eg, Scripting engine frames
            # (sys.settrace will be set when our logicalbotframe is hit -
            # this may not be the right thing to do, as it may not cause the
            # immediate break we desire.)
            self.logicalbotframe.f_trace = self.trace_dispatch

    # If we have the thread-state for the thread being debugged, then
    # we dynamically set its trace function - it is possible that the thread
    # being debugged is in a blocked call (eg, a message box) and we
    # want to hit the debugger the instant we return
    if self.debuggingThreadStateHandle is None:
        return
    if self.breakFlags and self.debuggingThread != win32api.GetCurrentThreadId():
        axdebug.SetThreadStateTrace(self.debuggingThreadStateHandle, self.trace_dispatch)
def _stackless_attach(self):
    """Prepare stackless tasklets for tracing by this thread.

    When ``stackless.tasklet`` has no ``trace_function`` attribute, the
    tasklet class is patched: ``__call__`` is replaced so the tasklet's
    callable runs with ``self.trace_func`` installed, and a ``settrace``
    method is added.  The original ``__call__`` is kept in
    ``self.__oldstacklesscall__``.  On the 'cli' platform ``self.frames``
    is reset to an empty list.
    """
    try:
        # Probe only: raises AttributeError when the attribute is missing.
        stackless.tasklet.trace_function
    except AttributeError:
        # the tasklets need to be traced on a case by case basis
        # sys.trace needs to be called within their calling context
        def settrace(tsk, tb):
            if hasattr(tsk.frame, "f_trace"):
                tsk.frame.f_trace = tb
            sys.settrace(tb)

        def __call__(tsk, *args, **kwargs):
            target = tsk.tempval

            def new_f(old_f, args, kwargs):
                # Trace only for the duration of the wrapped callable.
                sys.settrace(self.trace_func)
                try:
                    if old_f is None:
                        return None
                    return old_f(*args, **kwargs)
                finally:
                    sys.settrace(None)

            tsk.tempval = new_f
            stackless.tasklet.setup(tsk, target, args, kwargs)
            return tsk

        self.__oldstacklesscall__ = stackless.tasklet.__call__
        stackless.tasklet.settrace = settrace
        stackless.tasklet.__call__ = __call__

    if sys.platform == 'cli':
        self.frames = []
def context_dispatcher(self, old, new):
    """Reset stepping on a context switch and make sure ``new`` is traced.

    Nothing beyond the stepping reset happens unless both ``old`` and
    ``new`` are truthy.
    """
    self.stepping = STEPPING_NONE

    if not old or not new:
        return

    # for those tasklets that started before we started tracing
    # we need to make sure that the trace is set by patching
    # it in the context switch
    if not hasattr(new.frame, "f_trace"):
        return
    if not new.frame.f_trace:
        sys.call_tracing(new.settrace, (self.trace_func,))
def _stackless_schedule_cb(self, prev, next):
    """Schedule callback: reset stepping and trace the frames of ``next``.

    The current tasklet's ``trace_function`` is cleared while the callback
    runs and restored in ``finally``; if it was ``None`` the caller frame's
    ``f_trace`` is restored instead.  ``prev`` is not used.
    NOTE(review): ``prev``/``next`` are presumably the tasklets being
    switched from/to - confirm against the stackless callback contract.
    """
    running = stackless.getcurrent()
    if not running:
        return

    restore_tf = running.trace_function
    try:
        running.trace_function = None
        self.stepping = STEPPING_NONE

        # If the current frame has no trace function, we may need to get it
        # from the previous frame, depending on how we ended up in the
        # callback.
        if restore_tf is None:
            caller = running.frame.f_back
            if caller is not None:
                restore_tf = caller.f_trace

        if next is not None:
            # Assign our trace function to the current stack
            cursor = next.frame
            if next is running:
                # Skip the frame we are currently executing in.
                cursor = cursor.f_back
            while cursor:
                if isinstance(cursor, types.FrameType):
                    cursor.f_trace = self.trace_func
                cursor = cursor.f_back
            next.trace_function = self.trace_func
    finally:
        running.trace_function = restore_tf
def handle_return(self, frame, arg):
    """Handle a 'return' trace event for ``frame``.

    Pops the frame from this thread's frame tracking, updates the stepping
    state (possibly blocking to report a finished step), forwards the event
    to the previously installed trace function and then restores the trace
    function saved for the calling frame.

    Args:
        frame: Frame that is returning.
        arg: Trace argument for the event; forwarded to the previous trace
            function unchanged.
    """
    def finish_step():
        # The step is complete: clear stepping, refresh the thread stacks
        # and block while reporting the finished step.
        self.stepping = STEPPING_NONE
        update_all_thread_stacks(self)
        self.block(lambda: (report_step_finished(self.id), mark_all_threads_for_break(skip_thread = self)))

    self.pop_frame()

    if not DETACHED:
        stepping = self.stepping
        # only update stepping state when this frame is debuggable (matching handle_call)
        # Compare by value: stepping is a number that is incremented and
        # decremented below, so object identity is not a reliable test.
        if stepping != STEPPING_NONE and should_debug_code(frame.f_code):
            if stepping > STEPPING_OVER:
                self.stepping -= 1
            elif stepping < STEPPING_OUT:
                self.stepping += 1
            elif stepping in USER_STEPPING:
                if self.cur_frame is None or frame.f_code.co_name == "<module>":
                    # only return to user code modules
                    if self.should_block_on_frame(frame):
                        # restore back the module frame for the step out of a module
                        self.push_frame(ModuleExitFrame(frame))
                        finish_step()
                        self.pop_frame()
                elif self.should_block_on_frame(self.cur_frame):
                    # if we're returning into non-user code then don't block in the
                    # non-user code, wait until we hit user code again
                    finish_step()

    # forward call to previous trace function, if any
    old_trace_func = self.prev_trace_func
    if old_trace_func is not None:
        old_trace_func(frame, 'return', arg)

    # restore previous frames trace function if there is one
    if self.trace_func_stack:
        self.prev_trace_func = self.trace_func_stack.pop()
def context_dispatcher(self, old, new):
    """Reset stepping on a context switch and trace ``new`` if needed.

    A falsy ``old`` (starting new) or a falsy ``new`` (killing prev) needs
    no further work.
    """
    self.stepping = STEPPING_NONE
    # for those tasklets that started before we started tracing
    # we need to make sure that the trace is set by patching
    # it in the context switch
    if old and new:
        needs_trace = hasattr(new.frame, "f_trace") and not new.frame.f_trace
        if needs_trace:
            sys.call_tracing(new.settrace, (self.trace_func,))
def _stackless_attach(self):
    """Patch ``stackless.tasklet`` for tracing when it lacks ``trace_function``.

    If reading ``stackless.tasklet.trace_function`` raises
    ``AttributeError``, ``tasklet.__call__`` is replaced with a wrapper
    that runs the tasklet's callable under ``self.trace_func``, a
    ``settrace`` method is added to the class, and the original
    ``__call__`` is saved as ``self.__oldstacklesscall__``.  On the 'cli'
    platform ``self.frames`` is reset to an empty list.
    """
    try:
        # Probe only: raises AttributeError when the attribute is missing.
        stackless.tasklet.trace_function
    except AttributeError:
        # the tasklets need to be traced on a case by case basis
        # sys.trace needs to be called within their calling context
        def __call__(tsk, *args, **kwargs):
            wrapped = tsk.tempval

            def new_f(old_f, args, kwargs):
                # Install our trace function for the wrapped call only.
                sys.settrace(self.trace_func)
                try:
                    result = None
                    if old_f is not None:
                        result = old_f(*args, **kwargs)
                    return result
                finally:
                    sys.settrace(None)

            tsk.tempval = new_f
            stackless.tasklet.setup(tsk, wrapped, args, kwargs)
            return tsk

        def settrace(tsk, tb):
            if hasattr(tsk.frame, "f_trace"):
                tsk.frame.f_trace = tb
            sys.settrace(tb)

        tasklet_type = stackless.tasklet
        self.__oldstacklesscall__ = tasklet_type.__call__
        tasklet_type.settrace = settrace
        tasklet_type.__call__ = __call__

    if sys.platform == 'cli':
        self.frames = []