def should_break(self, thread, ex_type, ex_value, trace):
    """Return the break type to report for a raised exception.

    The break mode is looked up in ``self.break_on`` under the name given by
    ``get_exception_name(ex_type)``, falling back to ``self.default_mode``.

    Returns ``BREAK_TYPE_HANDLED``, ``BREAK_TYPE_UNHANDLED`` or
    ``BREAK_TYPE_NONE``.  A ``SystemExit`` whose value (or ``code``) is
    falsy yields ``BREAK_TYPE_NONE`` unless ``BREAK_ON_SYSTEMEXIT_ZERO``
    is set.
    """
    probe_stack()
    exc_name = get_exception_name(ex_type)
    configured_mode = self.break_on.get(exc_name, self.default_mode)

    decision = BREAK_TYPE_NONE
    if configured_mode & BREAK_MODE_ALWAYS:
        handled = self.is_handled(thread, ex_type, ex_value, trace)
        decision = BREAK_TYPE_HANDLED if handled else BREAK_TYPE_UNHANDLED
    elif (configured_mode & BREAK_MODE_UNHANDLED) and not self.is_handled(thread, ex_type, ex_value, trace):
        decision = BREAK_TYPE_UNHANDLED

    # A pending break is cancelled for a SystemExit with a falsy value/code,
    # unless BREAK_ON_SYSTEMEXIT_ZERO is set.
    if decision and issubclass(ex_type, SystemExit) and not BREAK_ON_SYSTEMEXIT_ZERO:
        clean_exit = not ex_value or (isinstance(ex_value, SystemExit) and not ex_value.code)
        if clean_exit:
            decision = BREAK_TYPE_NONE
    return decision
# Example source code for Python's trace() (python类trace()的实例源码)
def handle_exception(self, frame, arg):
    """Handle an 'exception' trace event raised in ``frame``.

    ``arg`` is unpacked into ``BREAK_ON.should_break`` after ``self``.  When
    a break type is returned, the thread stacks are refreshed and
    ``self.block`` is called with a callback that reports the exception.
    The event is then forwarded to any previous trace function.

    Returns ``self.trace_func``.
    """
    if self.stepping == STEPPING_ATTACH_BREAK:
        self.block_maybe_attach()

    if not DETACHED and should_debug_code(frame.f_code):
        break_type = BREAK_ON.should_break(self, *arg)
        if break_type:
            update_all_thread_stacks(self)

            def send_report():
                return report_exception(frame, arg, self.id, break_type)

            self.block(send_report)

    # Chain to the previously installed trace function, if any; whatever it
    # returns becomes the new "previous" trace function.
    previous = self.prev_trace_func
    if previous is not None:
        self.prev_trace_func = previous(frame, 'exception', arg)

    return self.trace_func
def should_break(self, thread, ex_type, ex_value, trace):
    """Return the break type to report for a raised exception.

    The break mode is looked up in ``self.break_on`` under the name given by
    ``get_exception_name(ex_type)``, falling back to ``self.default_mode``.

    Returns ``BREAK_TYPE_HANDLED``, ``BREAK_TYPE_UNHANDLED`` or
    ``BREAK_TYPE_NONE``.  A ``SystemExit`` whose value (or ``code``) is
    falsy yields ``BREAK_TYPE_NONE`` unless ``BREAK_ON_SYSTEMEXIT_ZERO``
    is set.
    """
    probe_stack()
    exc_name = get_exception_name(ex_type)
    configured_mode = self.break_on.get(exc_name, self.default_mode)

    decision = BREAK_TYPE_NONE
    if configured_mode & BREAK_MODE_ALWAYS:
        handled = self.is_handled(thread, ex_type, ex_value, trace)
        decision = BREAK_TYPE_HANDLED if handled else BREAK_TYPE_UNHANDLED
    elif (configured_mode & BREAK_MODE_UNHANDLED) and not self.is_handled(thread, ex_type, ex_value, trace):
        decision = BREAK_TYPE_UNHANDLED

    # A pending break is cancelled for a SystemExit with a falsy value/code,
    # unless BREAK_ON_SYSTEMEXIT_ZERO is set.
    if decision and issubclass(ex_type, SystemExit) and not BREAK_ON_SYSTEMEXIT_ZERO:
        clean_exit = not ex_value or (isinstance(ex_value, SystemExit) and not ex_value.code)
        if clean_exit:
            decision = BREAK_TYPE_NONE
    return decision
def dispatch_line(self, frame):
    """Handle a 'line' trace event for ``frame``.

    For the logical bottom frame the tracer is re-installed with
    ``sys.settrace`` and returned.  Frames whose file name has no entry in
    ``self.codeContainerProvider`` are not traced (``None`` is returned).
    Otherwise the frame is recorded as ``self.currentframe`` and the event
    is delegated to ``bdb.Bdb.dispatch_line``.
    """
    traceenter("dispatch_line", _dumpf(frame), _dumpf(self.botframe))

    at_logical_bottom = frame is self.logicalbotframe
    if at_logical_bottom:
        trace("dispatch_line", _dumpf(frame), "for bottom frame returing tracer")
        # The next code run in the frame above may be a builtin (e.g.
        # apply()), for which the system trace function must be set.
        sys.settrace(self.trace_dispatch)
        # Also return the tracer in case Python code runs next, because
        # then the system tracer is ignored.
        return self.trace_dispatch

    document = self.codeContainerProvider.FromFileName(frame.f_code.co_filename)
    if document is None:
        trace("dispatch_line has no document for", _dumpf(frame), "- skipping trace!")
        return None

    # Remember the most recent debuggable frame for the stack sniffer.
    self.currentframe = frame
    return bdb.Bdb.dispatch_line(self, frame)
def dispatch_call(self, frame, arg):
    """Handle a 'call' trace event for ``frame``.

    The value of ``axdebug.GetStackAddress()`` is stored in the frame's
    locals under ``'__axstack_address__'``.  The tracer is returned for the
    bottom frame and for any frame whose file name has an entry in
    ``self.codeContainerProvider``; other frames return ``None`` so they
    run untraced.  ``bdb.Bdb.dispatch_call`` is not invoked here.
    """
    traceenter("dispatch_call",_dumpf(frame))
    stack_address = axdebug.GetStackAddress()
    frame.f_locals['__axstack_address__'] = stack_address

    if frame is not self.botframe:
        # Not our bottom frame: trace it only if we have a document for it,
        # otherwise let it run at full speed.
        if self.codeContainerProvider.FromFileName(frame.f_code.co_filename) is None:
            trace("dispatch_call has no document for", _dumpf(frame), "- skipping trace!")
            return None
    else:
        trace("dispatch_call is self.botframe - returning tracer")
    return self.trace_dispatch
def CloseApp(self):
    """Release the debugger's application-level resources.

    Calls ``self.reset()``, unregisters the stack frame sniffer, disconnects
    the application event connection, closes the code container provider,
    and sets the corresponding attributes to ``None``.
    """
    traceenter("ClosingApp")
    self.reset()
    self.logicalbotframe = None

    sniffer_cookie = self.stackSnifferCookie
    if sniffer_cookie is not None:
        try:
            self.debugApplication.RemoveStackFrameSniffer(self.stackSnifferCookie)
        except pythoncom.com_error:
            # Best effort: a COM failure here is only traced.
            trace("*** Could not RemoveStackFrameSniffer %d" % (self.stackSnifferCookie))
    if self.stackSniffer:
        _wrap_remove(self.stackSniffer)
    self.stackSnifferCookie = None
    self.stackSniffer = None

    connection = self.appEventConnection
    if connection is not None:
        connection.Disconnect()
        self.appEventConnection = None
    self.debugApplication = None
    self.appDebugger = None

    provider = self.codeContainerProvider
    if provider is not None:
        provider.Close()
        self.codeContainerProvider = None
def ResetAXDebugging(self):
    """Undo one level of AX debugging setup on the debugging thread.

    Calls made from any thread other than ``self.debuggingThread`` are
    traced and ignored.  When ``self.recursiveData`` is empty, the
    per-thread debugging state is cleared.  Otherwise the first saved entry
    of ``self.recursiveData`` is restored and removed from the list.
    """
    traceenter("ResetAXDebugging", self, "with refcount", len(self.recursiveData))
    if win32api.GetCurrentThreadId() != self.debuggingThread:
        trace("ResetAXDebugging called on other thread")
        return
    if not self.recursiveData:
        # Final reset: nothing saved, so drop all per-thread state.
        self.logicalbotframe = None
        self.debuggingThread = None
        self.currentframe = None
        self.debuggingThreadStateHandle = None
        return
    # Restore the most recently saved context.  The bottom frame goes back
    # into ``logicalbotframe`` -- the attribute the rest of this code reads
    # and resets -- rather than the otherwise unused ``logbotframe``.
    (self.logicalbotframe,
     self.stopframe,
     self.currentframe,
     self.debuggingThreadStateHandle) = self.recursiveData[0]
    self.recursiveData = self.recursiveData[1:]
def dispatch_line(self, frame):
    """Handle a 'line' trace event for ``frame``.

    For the logical bottom frame the tracer is re-installed with
    ``sys.settrace`` and returned.  Frames whose file name has no entry in
    ``self.codeContainerProvider`` are not traced (``None`` is returned).
    Otherwise the frame is recorded as ``self.currentframe`` and the event
    is delegated to ``bdb.Bdb.dispatch_line``.
    """
    traceenter("dispatch_line", _dumpf(frame), _dumpf(self.botframe))

    at_logical_bottom = frame is self.logicalbotframe
    if at_logical_bottom:
        trace("dispatch_line", _dumpf(frame), "for bottom frame returing tracer")
        # The next code run in the frame above may be a builtin (e.g.
        # apply()), for which the system trace function must be set.
        sys.settrace(self.trace_dispatch)
        # Also return the tracer in case Python code runs next, because
        # then the system tracer is ignored.
        return self.trace_dispatch

    document = self.codeContainerProvider.FromFileName(frame.f_code.co_filename)
    if document is None:
        trace("dispatch_line has no document for", _dumpf(frame), "- skipping trace!")
        return None

    # Remember the most recent debuggable frame for the stack sniffer.
    self.currentframe = frame
    return bdb.Bdb.dispatch_line(self, frame)
def dispatch_call(self, frame, arg):
    """Handle a 'call' trace event for ``frame``.

    The value of ``axdebug.GetStackAddress()`` is stored in the frame's
    locals under ``'__axstack_address__'``.  The tracer is returned for the
    bottom frame and for any frame whose file name has an entry in
    ``self.codeContainerProvider``; other frames return ``None`` so they
    run untraced.  ``bdb.Bdb.dispatch_call`` is not invoked here.
    """
    traceenter("dispatch_call",_dumpf(frame))
    stack_address = axdebug.GetStackAddress()
    frame.f_locals['__axstack_address__'] = stack_address

    if frame is not self.botframe:
        # Not our bottom frame: trace it only if we have a document for it,
        # otherwise let it run at full speed.
        if self.codeContainerProvider.FromFileName(frame.f_code.co_filename) is None:
            trace("dispatch_call has no document for", _dumpf(frame), "- skipping trace!")
            return None
    else:
        trace("dispatch_call is self.botframe - returning tracer")
    return self.trace_dispatch
def CloseApp(self):
    """Release the debugger's application-level resources.

    Calls ``self.reset()``, unregisters the stack frame sniffer, disconnects
    the application event connection, closes the code container provider,
    and sets the corresponding attributes to ``None``.
    """
    traceenter("ClosingApp")
    self.reset()
    self.logicalbotframe = None

    sniffer_cookie = self.stackSnifferCookie
    if sniffer_cookie is not None:
        try:
            self.debugApplication.RemoveStackFrameSniffer(self.stackSnifferCookie)
        except pythoncom.com_error:
            # Best effort: a COM failure here is only traced.
            trace("*** Could not RemoveStackFrameSniffer %d" % (self.stackSnifferCookie))
    if self.stackSniffer:
        _wrap_remove(self.stackSniffer)
    self.stackSnifferCookie = None
    self.stackSniffer = None

    connection = self.appEventConnection
    if connection is not None:
        connection.Disconnect()
        self.appEventConnection = None
    self.debugApplication = None
    self.appDebugger = None

    provider = self.codeContainerProvider
    if provider is not None:
        provider.Close()
        self.codeContainerProvider = None
def ResetAXDebugging(self):
    """Undo one level of AX debugging setup on the debugging thread.

    Calls made from any thread other than ``self.debuggingThread`` are
    traced and ignored.  When ``self.recursiveData`` is empty, the
    per-thread debugging state is cleared.  Otherwise the first saved entry
    of ``self.recursiveData`` is restored and removed from the list.
    """
    traceenter("ResetAXDebugging", self, "with refcount", len(self.recursiveData))
    if win32api.GetCurrentThreadId() != self.debuggingThread:
        trace("ResetAXDebugging called on other thread")
        return
    if not self.recursiveData:
        # Final reset: nothing saved, so drop all per-thread state.
        self.logicalbotframe = None
        self.debuggingThread = None
        self.currentframe = None
        self.debuggingThreadStateHandle = None
        return
    # Restore the most recently saved context.  The bottom frame goes back
    # into ``logicalbotframe`` -- the attribute the rest of this code reads
    # and resets -- rather than the otherwise unused ``logbotframe``.
    (self.logicalbotframe,
     self.stopframe,
     self.currentframe,
     self.debuggingThreadStateHandle) = self.recursiveData[0]
    self.recursiveData = self.recursiveData[1:]
def should_break(self, thread, ex_type, ex_value, trace):
    """Return the break type to report for a raised exception.

    The break mode is looked up in ``self.break_on`` under the name given by
    ``get_exception_name(ex_type)``, falling back to ``self.default_mode``.

    Returns ``BREAK_TYPE_HANDLED``, ``BREAK_TYPE_UNHANDLED`` or
    ``BREAK_TYPE_NONE``.  A ``SystemExit`` whose value (or ``code``) is
    falsy yields ``BREAK_TYPE_NONE`` unless ``BREAK_ON_SYSTEMEXIT_ZERO``
    is set.
    """
    probe_stack()
    exc_name = get_exception_name(ex_type)
    configured_mode = self.break_on.get(exc_name, self.default_mode)

    decision = BREAK_TYPE_NONE
    if configured_mode & BREAK_MODE_ALWAYS:
        handled = self.is_handled(thread, ex_type, ex_value, trace)
        decision = BREAK_TYPE_HANDLED if handled else BREAK_TYPE_UNHANDLED
    elif (configured_mode & BREAK_MODE_UNHANDLED) and not self.is_handled(thread, ex_type, ex_value, trace):
        decision = BREAK_TYPE_UNHANDLED

    # A pending break is cancelled for a SystemExit with a falsy value/code,
    # unless BREAK_ON_SYSTEMEXIT_ZERO is set.
    if decision and issubclass(ex_type, SystemExit) and not BREAK_ON_SYSTEMEXIT_ZERO:
        clean_exit = not ex_value or (isinstance(ex_value, SystemExit) and not ex_value.code)
        if clean_exit:
            decision = BREAK_TYPE_NONE
    return decision
def handle_exception(self, frame, arg):
    """Handle an 'exception' trace event raised in ``frame``.

    ``arg`` is unpacked into ``BREAK_ON.should_break`` after ``self``.  When
    a break type is returned, the thread stacks are refreshed and
    ``self.block`` is called with a callback that reports the exception.
    The event is then forwarded to any previous trace function.

    Returns ``self.trace_func``.
    """
    if self.stepping == STEPPING_ATTACH_BREAK:
        self.block_maybe_attach()

    if not DETACHED and should_debug_code(frame.f_code):
        break_type = BREAK_ON.should_break(self, *arg)
        if break_type:
            update_all_thread_stacks(self)

            def send_report():
                return report_exception(frame, arg, self.id, break_type)

            self.block(send_report)

    # Chain to the previously installed trace function, if any; whatever it
    # returns becomes the new "previous" trace function.
    previous = self.prev_trace_func
    if previous is not None:
        self.prev_trace_func = previous(frame, 'exception', arg)

    return self.trace_func
def ShouldBreak(self, thread, ex_type, ex_value, trace):
    """Return the break type to report for a raised exception.

    The break mode is looked up in ``self.break_on`` under the name
    ``"<module>.<class name>"`` of ``ex_type``, falling back to
    ``self.default_mode``.

    Returns ``BREAK_TYPE_HANDLED`` when the mode includes
    ``BREAK_MODE_ALWAYS`` and ``self.IsHandled`` reports the exception as
    handled, the "unhandled" break type when it is not handled and either
    mode bit applies, and ``BREAK_TYPE_NONE`` otherwise.  A ``SystemExit``
    with an integer value of zero, or a falsy ``code``, yields
    ``BREAK_TYPE_NONE`` unless ``BREAK_ON_SYSTEMEXIT_ZERO`` is set.
    """
    probe_stack()
    name = ex_type.__module__ + '.' + ex_type.__name__
    mode = self.break_on.get(name, self.default_mode)
    break_type = BREAK_TYPE_NONE
    # NOTE(review): BREAK_TYPE_UNHANLDED looks misspelled; the spelling is
    # kept because the constant's definition is not visible here -- confirm
    # against the definition before renaming.
    if mode & BREAK_MODE_ALWAYS:
        if self.IsHandled(thread, ex_type, ex_value, trace):
            break_type = BREAK_TYPE_HANDLED
        else:
            break_type = BREAK_TYPE_UNHANLDED
    elif (mode & BREAK_MODE_UNHANDLED) and not self.IsHandled(thread, ex_type, ex_value, trace):
        # This branch runs only when the exception is NOT handled, so it
        # must report the unhandled break type (it used to report HANDLED).
        break_type = BREAK_TYPE_UNHANLDED
    if break_type:
        if issubclass(ex_type, SystemExit):
            if not BREAK_ON_SYSTEMEXIT_ZERO:
                if ((isinstance(ex_value, int) and not ex_value) or
                        (isinstance(ex_value, SystemExit) and not ex_value.code)):
                    break_type = BREAK_TYPE_NONE
    return break_type
def handle_exception(self, frame, arg):
    """Handle an 'exception' trace event raised in ``frame``.

    ``arg`` is unpacked into ``BREAK_ON.ShouldBreak`` after ``self``.  When
    a break type is returned, the thread stacks are refreshed and
    ``self.block`` is called with a callback that reports the exception.
    The event is then forwarded to any previous trace function.

    Returns ``self.trace_func``.
    """
    if self.stepping == STEPPING_ATTACH_BREAK:
        self.block_maybe_attach()

    if not DETACHED and should_debug_code(frame.f_code):
        break_type = BREAK_ON.ShouldBreak(self, *arg)
        if break_type:
            update_all_thread_stacks(self)

            def send_report():
                return report_exception(frame, arg, self.id, break_type)

            self.block(send_report)

    # Chain to the previously installed trace function, if any; whatever it
    # returns becomes the new "previous" trace function.
    previous = self.prev_trace_func
    if previous is not None:
        self.prev_trace_func = previous(frame, 'exception', arg)

    return self.trace_func
def should_break(self, thread, ex_type, ex_value, trace):
    """Return the break type to report for a raised exception.

    The break mode is looked up in ``self.break_on`` under the name given by
    ``get_exception_name(ex_type)``, falling back to ``self.default_mode``.

    Returns ``BREAK_TYPE_HANDLED``, ``BREAK_TYPE_UNHANDLED`` or
    ``BREAK_TYPE_NONE``.  A ``SystemExit`` whose value (or ``code``) is
    falsy yields ``BREAK_TYPE_NONE`` unless ``BREAK_ON_SYSTEMEXIT_ZERO``
    is set.
    """
    probe_stack()
    exc_name = get_exception_name(ex_type)
    configured_mode = self.break_on.get(exc_name, self.default_mode)

    decision = BREAK_TYPE_NONE
    if configured_mode & BREAK_MODE_ALWAYS:
        handled = self.is_handled(thread, ex_type, ex_value, trace)
        decision = BREAK_TYPE_HANDLED if handled else BREAK_TYPE_UNHANDLED
    elif (configured_mode & BREAK_MODE_UNHANDLED) and not self.is_handled(thread, ex_type, ex_value, trace):
        decision = BREAK_TYPE_UNHANDLED

    # A pending break is cancelled for a SystemExit with a falsy value/code,
    # unless BREAK_ON_SYSTEMEXIT_ZERO is set.
    if decision and issubclass(ex_type, SystemExit) and not BREAK_ON_SYSTEMEXIT_ZERO:
        clean_exit = not ex_value or (isinstance(ex_value, SystemExit) and not ex_value.code)
        if clean_exit:
            decision = BREAK_TYPE_NONE
    return decision
def handle_exception(self, frame, arg):
    """Handle an 'exception' trace event raised in ``frame``.

    ``arg`` is unpacked into ``BREAK_ON.should_break`` after ``self``.  When
    a break type is returned, the thread stacks are refreshed and
    ``self.block`` is called with a callback that reports the exception.
    The event is then forwarded to any previous trace function.

    Returns ``self.trace_func``.
    """
    if self.stepping == STEPPING_ATTACH_BREAK:
        self.block_maybe_attach()

    if not DETACHED and should_debug_code(frame.f_code):
        break_type = BREAK_ON.should_break(self, *arg)
        if break_type:
            update_all_thread_stacks(self)

            def send_report():
                return report_exception(frame, arg, self.id, break_type)

            self.block(send_report)

    # Chain to the previously installed trace function, if any; whatever it
    # returns becomes the new "previous" trace function.
    previous = self.prev_trace_func
    if previous is not None:
        self.prev_trace_func = previous(frame, 'exception', arg)

    return self.trace_func
def should_break(self, thread, ex_type, ex_value, trace):
    """Return the break type to report for a raised exception.

    The break mode is looked up in ``self.break_on`` under the name given by
    ``get_exception_name(ex_type)``, falling back to ``self.default_mode``.

    Returns ``BREAK_TYPE_HANDLED``, ``BREAK_TYPE_UNHANDLED`` or
    ``BREAK_TYPE_NONE``.  A ``SystemExit`` whose value is an integer zero,
    or whose ``code`` is falsy, yields ``BREAK_TYPE_NONE`` unless
    ``BREAK_ON_SYSTEMEXIT_ZERO`` is set.
    """
    probe_stack()
    exc_name = get_exception_name(ex_type)
    configured_mode = self.break_on.get(exc_name, self.default_mode)

    decision = BREAK_TYPE_NONE
    if configured_mode & BREAK_MODE_ALWAYS:
        handled = self.is_handled(thread, ex_type, ex_value, trace)
        decision = BREAK_TYPE_HANDLED if handled else BREAK_TYPE_UNHANDLED
    elif (configured_mode & BREAK_MODE_UNHANDLED) and not self.is_handled(thread, ex_type, ex_value, trace):
        decision = BREAK_TYPE_UNHANDLED

    # A pending break is cancelled for a zero/empty SystemExit, unless
    # BREAK_ON_SYSTEMEXIT_ZERO is set.
    if decision and issubclass(ex_type, SystemExit) and not BREAK_ON_SYSTEMEXIT_ZERO:
        zero_int = isinstance(ex_value, int) and not ex_value
        clean_exit = zero_int or (isinstance(ex_value, SystemExit) and not ex_value.code)
        if clean_exit:
            decision = BREAK_TYPE_NONE
    return decision
def handle_exception(self, frame, arg):
    """Handle an 'exception' trace event raised in ``frame``.

    ``arg`` is unpacked into ``BREAK_ON.should_break`` after ``self``.  When
    a break type is returned, the thread stacks are refreshed and
    ``self.block`` is called with a callback that reports the exception.
    The event is then forwarded to any previous trace function.

    Returns ``self.trace_func``.
    """
    if self.stepping == STEPPING_ATTACH_BREAK:
        self.block_maybe_attach()

    if not DETACHED and should_debug_code(frame.f_code):
        break_type = BREAK_ON.should_break(self, *arg)
        if break_type:
            update_all_thread_stacks(self)

            def send_report():
                return report_exception(frame, arg, self.id, break_type)

            self.block(send_report)

    # Chain to the previously installed trace function, if any; whatever it
    # returns becomes the new "previous" trace function.
    previous = self.prev_trace_func
    if previous is not None:
        self.prev_trace_func = previous(frame, 'exception', arg)

    return self.trace_func
def should_break(self, thread, ex_type, ex_value, trace):
    """Return the break type to report for a raised exception.

    The break mode is looked up in ``self.break_on`` under the name given by
    ``get_exception_name(ex_type)``, falling back to ``self.default_mode``.

    Returns ``BREAK_TYPE_HANDLED``, ``BREAK_TYPE_UNHANDLED`` or
    ``BREAK_TYPE_NONE``.  A ``SystemExit`` whose value (or ``code``) is
    falsy yields ``BREAK_TYPE_NONE`` unless ``BREAK_ON_SYSTEMEXIT_ZERO``
    is set.
    """
    probe_stack()
    exc_name = get_exception_name(ex_type)
    configured_mode = self.break_on.get(exc_name, self.default_mode)

    decision = BREAK_TYPE_NONE
    if configured_mode & BREAK_MODE_ALWAYS:
        handled = self.is_handled(thread, ex_type, ex_value, trace)
        decision = BREAK_TYPE_HANDLED if handled else BREAK_TYPE_UNHANDLED
    elif (configured_mode & BREAK_MODE_UNHANDLED) and not self.is_handled(thread, ex_type, ex_value, trace):
        decision = BREAK_TYPE_UNHANDLED

    # A pending break is cancelled for a SystemExit with a falsy value/code,
    # unless BREAK_ON_SYSTEMEXIT_ZERO is set.
    if decision and issubclass(ex_type, SystemExit) and not BREAK_ON_SYSTEMEXIT_ZERO:
        clean_exit = not ex_value or (isinstance(ex_value, SystemExit) and not ex_value.code)
        if clean_exit:
            decision = BREAK_TYPE_NONE
    return decision
def handle_exception(self, frame, arg):
    """Handle an 'exception' trace event raised in ``frame``.

    ``arg`` is unpacked into ``BREAK_ON.should_break`` after ``self``.  When
    a break type is returned, the thread stacks are refreshed and
    ``self.block`` is called with a callback that reports the exception.
    The event is then forwarded to any previous trace function.

    Returns ``self.trace_func``.
    """
    if self.stepping == STEPPING_ATTACH_BREAK:
        self.block_maybe_attach()

    if not DETACHED and should_debug_code(frame.f_code):
        break_type = BREAK_ON.should_break(self, *arg)
        if break_type:
            update_all_thread_stacks(self)

            def send_report():
                return report_exception(frame, arg, self.id, break_type)

            self.block(send_report)

    # Chain to the previously installed trace function, if any; whatever it
    # returns becomes the new "previous" trace function.
    previous = self.prev_trace_func
    if previous is not None:
        self.prev_trace_func = previous(frame, 'exception', arg)

    return self.trace_func
def should_break(self, thread, ex_type, ex_value, trace):
    """Return the break type to report for a raised exception.

    The break mode is looked up in ``self.break_on`` under the name given by
    ``get_exception_name(ex_type)``, falling back to ``self.default_mode``.

    Returns ``BREAK_TYPE_HANDLED``, ``BREAK_TYPE_UNHANDLED`` or
    ``BREAK_TYPE_NONE``.  A ``SystemExit`` whose value (or ``code``) is
    falsy yields ``BREAK_TYPE_NONE`` unless ``BREAK_ON_SYSTEMEXIT_ZERO``
    is set.
    """
    probe_stack()
    exc_name = get_exception_name(ex_type)
    configured_mode = self.break_on.get(exc_name, self.default_mode)

    decision = BREAK_TYPE_NONE
    if configured_mode & BREAK_MODE_ALWAYS:
        handled = self.is_handled(thread, ex_type, ex_value, trace)
        decision = BREAK_TYPE_HANDLED if handled else BREAK_TYPE_UNHANDLED
    elif (configured_mode & BREAK_MODE_UNHANDLED) and not self.is_handled(thread, ex_type, ex_value, trace):
        decision = BREAK_TYPE_UNHANDLED

    # A pending break is cancelled for a SystemExit with a falsy value/code,
    # unless BREAK_ON_SYSTEMEXIT_ZERO is set.
    if decision and issubclass(ex_type, SystemExit) and not BREAK_ON_SYSTEMEXIT_ZERO:
        clean_exit = not ex_value or (isinstance(ex_value, SystemExit) and not ex_value.code)
        if clean_exit:
            decision = BREAK_TYPE_NONE
    return decision
def handle_exception(self, frame, arg):
    """Handle an 'exception' trace event raised in ``frame``.

    ``arg`` is unpacked into ``BREAK_ON.should_break`` after ``self``.  When
    a break type is returned, the thread stacks are refreshed and
    ``self.block`` is called with a callback that reports the exception.
    The event is then forwarded to any previous trace function.

    Returns ``self.trace_func``.
    """
    if self.stepping == STEPPING_ATTACH_BREAK:
        self.block_maybe_attach()

    if not DETACHED and should_debug_code(frame.f_code):
        break_type = BREAK_ON.should_break(self, *arg)
        if break_type:
            update_all_thread_stacks(self)

            def send_report():
                return report_exception(frame, arg, self.id, break_type)

            self.block(send_report)

    # Chain to the previously installed trace function, if any; whatever it
    # returns becomes the new "previous" trace function.
    previous = self.prev_trace_func
    if previous is not None:
        self.prev_trace_func = previous(frame, 'exception', arg)

    return self.trace_func
def _stackless_attach(self):
    """Hook tracing into Stackless tasklets.

    When ``stackless.tasklet`` has no ``trace_function`` attribute, the
    tasklet class is patched: ``__call__`` wraps the tasklet's callable so
    that ``sys.settrace(self.trace_func)`` runs inside the tasklet, and a
    ``settrace`` method is added.  The original ``__call__`` is saved on
    ``self.__oldstacklesscall__``.  On the ``'cli'`` platform,
    ``self.frames`` is reset to an empty list.
    """
    try:
        stackless.tasklet.trace_function
    except AttributeError:
        # Tasklets have to be traced one by one: the trace function must be
        # installed from within each tasklet's own calling context.
        def __call__(tsk, *args, **kwargs):
            wrapped = tsk.tempval

            def new_f(target, call_args, call_kwargs):
                sys.settrace(self.trace_func)
                try:
                    if target is not None:
                        return target(*call_args, **call_kwargs)
                finally:
                    sys.settrace(None)

            tsk.tempval = new_f
            stackless.tasklet.setup(tsk, wrapped, args, kwargs)
            return tsk

        def settrace(tsk, tb):
            if hasattr(tsk.frame, "f_trace"):
                tsk.frame.f_trace = tb
            sys.settrace(tb)

        self.__oldstacklesscall__ = stackless.tasklet.__call__
        stackless.tasklet.settrace = settrace
        stackless.tasklet.__call__ = __call__

    if sys.platform == 'cli':
        self.frames = []
def context_dispatcher(self, old, new):
    """Reset stepping on a context switch and trace untraced tasklets.

    Stepping is set to ``STEPPING_NONE``.  When both ``old`` and ``new`` are
    truthy and ``new.frame`` has an ``f_trace`` attribute that is falsy,
    ``new.settrace`` is invoked through ``sys.call_tracing`` with
    ``self.trace_func``.
    """
    self.stepping = STEPPING_NONE
    if not (old and new):
        return
    # Tasklets started before tracing began have no trace function yet, so
    # it is patched in here, at the context switch.
    if hasattr(new.frame, "f_trace") and not new.frame.f_trace:
        sys.call_tracing(new.settrace, (self.trace_func,))
def _stackless_schedule_cb(self, prev, next):
    """Schedule callback that installs our trace function on ``next``.

    While the callback runs, the current tasklet's ``trace_function`` is
    set to ``None``; it is restored in a ``finally`` clause (falling back
    to the caller frame's ``f_trace`` when it was ``None``).  Stepping is
    reset to ``STEPPING_NONE``.  When ``next`` is not ``None``, every frame
    object in its stack gets ``self.trace_func`` as ``f_trace`` and
    ``next.trace_function`` is set to ``self.trace_func``.
    """
    active = stackless.getcurrent()
    if not active:
        return
    saved_tf = active.trace_function
    try:
        active.trace_function = None
        self.stepping = STEPPING_NONE

        # With no trace function on the current tasklet, the one to restore
        # may have to come from the previous frame, depending on how this
        # callback was reached.
        if saved_tf is None:
            caller = active.frame.f_back
            if caller is not None:
                saved_tf = caller.f_trace

        if next is not None:
            # Put our trace function on every frame of the target stack.
            frame = next.frame
            if next is active:
                frame = frame.f_back
            while frame:
                if isinstance(frame, types.FrameType):
                    frame.f_trace = self.trace_func
                frame = frame.f_back
            next.trace_function = self.trace_func
    finally:
        active.trace_function = saved_tf
def handle_return(self, frame, arg):
    """Handle a 'return' trace event for ``frame``.

    Pops the current frame, updates the stepping state (blocking and
    reporting a finished step where appropriate), forwards the event to any
    previous trace function, and finally restores the previous frame's
    trace function from ``self.trace_func_stack`` when one is available.
    """
    self.pop_frame()

    def step_finished():
        return (report_step_finished(self.id), mark_all_threads_for_break(skip_thread = self))

    if not DETACHED:
        step_state = self.stepping
        # Stepping state only changes when this frame is debuggable
        # (matching handle_call).
        if step_state is not STEPPING_NONE and should_debug_code(frame.f_code):
            if step_state > STEPPING_OVER:
                self.stepping -= 1
            elif step_state < STEPPING_OUT:
                self.stepping += 1
            elif step_state in USER_STEPPING:
                leaving_top = self.cur_frame is None or frame.f_code.co_name == "<module>"
                if leaving_top:
                    # Only return to user code modules.
                    if self.should_block_on_frame(frame):
                        # Put the module frame back for stepping out of a
                        # module, and drop it again once the block returns.
                        self.push_frame(ModuleExitFrame(frame))
                        self.stepping = STEPPING_NONE
                        update_all_thread_stacks(self)
                        self.block(step_finished)
                        self.pop_frame()
                elif self.should_block_on_frame(self.cur_frame):
                    # When returning into non-user code, do not block there;
                    # wait until user code is reached again.
                    self.stepping = STEPPING_NONE
                    update_all_thread_stacks(self)
                    self.block(step_finished)

    # Forward the event to the previous trace function, if any.
    previous = self.prev_trace_func
    if previous is not None:
        previous(frame, 'return', arg)

    # Restore the previous frame's trace function if there is one.
    if self.trace_func_stack:
        self.prev_trace_func = self.trace_func_stack.pop()
def _stackless_attach(self):
    """Hook tracing into Stackless tasklets.

    When ``stackless.tasklet`` has no ``trace_function`` attribute, the
    tasklet class is patched: ``__call__`` wraps the tasklet's callable so
    that ``sys.settrace(self.trace_func)`` runs inside the tasklet, and a
    ``settrace`` method is added.  The original ``__call__`` is saved on
    ``self.__oldstacklesscall__``.  On the ``'cli'`` platform,
    ``self.frames`` is reset to an empty list.
    """
    try:
        stackless.tasklet.trace_function
    except AttributeError:
        # Tasklets have to be traced one by one: the trace function must be
        # installed from within each tasklet's own calling context.
        def __call__(tsk, *args, **kwargs):
            wrapped = tsk.tempval

            def new_f(target, call_args, call_kwargs):
                sys.settrace(self.trace_func)
                try:
                    if target is not None:
                        return target(*call_args, **call_kwargs)
                finally:
                    sys.settrace(None)

            tsk.tempval = new_f
            stackless.tasklet.setup(tsk, wrapped, args, kwargs)
            return tsk

        def settrace(tsk, tb):
            if hasattr(tsk.frame, "f_trace"):
                tsk.frame.f_trace = tb
            sys.settrace(tb)

        self.__oldstacklesscall__ = stackless.tasklet.__call__
        stackless.tasklet.settrace = settrace
        stackless.tasklet.__call__ = __call__

    if sys.platform == 'cli':
        self.frames = []
def context_dispatcher(self, old, new):
    """Reset stepping on a context switch and trace untraced tasklets.

    Stepping is set to ``STEPPING_NONE``.  When both ``old`` and ``new`` are
    truthy and ``new.frame`` has an ``f_trace`` attribute that is falsy,
    ``new.settrace`` is invoked through ``sys.call_tracing`` with
    ``self.trace_func``.
    """
    self.stepping = STEPPING_NONE
    if not (old and new):
        return
    # Tasklets started before tracing began have no trace function yet, so
    # it is patched in here, at the context switch.
    if hasattr(new.frame, "f_trace") and not new.frame.f_trace:
        sys.call_tracing(new.settrace, (self.trace_func,))
def _stackless_schedule_cb(self, prev, next):
    """Schedule callback that installs our trace function on ``next``.

    While the callback runs, the current tasklet's ``trace_function`` is
    set to ``None``; it is restored in a ``finally`` clause (falling back
    to the caller frame's ``f_trace`` when it was ``None``).  Stepping is
    reset to ``STEPPING_NONE``.  When ``next`` is not ``None``, every frame
    object in its stack gets ``self.trace_func`` as ``f_trace`` and
    ``next.trace_function`` is set to ``self.trace_func``.
    """
    active = stackless.getcurrent()
    if not active:
        return
    saved_tf = active.trace_function
    try:
        active.trace_function = None
        self.stepping = STEPPING_NONE

        # With no trace function on the current tasklet, the one to restore
        # may have to come from the previous frame, depending on how this
        # callback was reached.
        if saved_tf is None:
            caller = active.frame.f_back
            if caller is not None:
                saved_tf = caller.f_trace

        if next is not None:
            # Put our trace function on every frame of the target stack.
            frame = next.frame
            if next is active:
                frame = frame.f_back
            while frame:
                if isinstance(frame, types.FrameType):
                    frame.f_trace = self.trace_func
                frame = frame.f_back
            next.trace_function = self.trace_func
    finally:
        active.trace_function = saved_tf