def handle_exception(self, frame, arg):
    """Handle an 'exception' trace event for this thread.

    If the thread is in the STEPPING_ATTACH_BREAK state,
    ``block_maybe_attach`` is called first. When the debugger is not
    detached and the frame's code object is one we debug, ``BREAK_ON`` is
    asked whether this exception should break; if so, all thread stacks
    are refreshed and the thread blocks with a callback that reports the
    exception.

    Args:
        frame: frame in which the exception event fired.
        arg: trace-event argument; it is unpacked into
            ``BREAK_ON.should_break`` (presumably the
            ``(type, value, traceback)`` tuple -- confirm against caller).

    Returns:
        ``self.trace_func``, so tracing continues for this frame.
    """
    if self.stepping == STEPPING_ATTACH_BREAK:
        self.block_maybe_attach()

    debuggable = not DETACHED and should_debug_code(frame.f_code)
    if debuggable:
        break_type = BREAK_ON.should_break(self, *arg)
        if break_type:
            update_all_thread_stacks(self)

            def send_report():
                return report_exception(frame, arg, self.id, break_type)

            self.block(send_report)

    # Chain to the trace function that was installed before ours (if any);
    # whatever it returns becomes the new "previous" trace function.
    chained = self.prev_trace_func
    if chained is not None:
        self.prev_trace_func = chained(frame, 'exception', arg)

    return self.trace_func
# python类stepping()的实例源码 (Example source code for the Python class stepping())
def async_break(self):
    """Leave stepping mode and block this thread for an asynchronous break.

    The nested callback is handed to ``self.block``. Under the send lock it
    checks the module-level ``SEND_BREAK_COMPLETE`` flag; when the flag is
    True or equals this thread's id, the flag is cleared and the ``ASBR``
    command followed by this thread's id is written to ``conn``.
    """
    def async_break_send():
        global SEND_BREAK_COMPLETE

        announced = False
        with _SendLockCtx:
            # Several threads may reach this point; only the first one that
            # still sees the flag set gets to announce the break.
            # NOTE: ``== True`` (not ``is True``) is kept on purpose to
            # preserve the original comparison semantics.
            if SEND_BREAK_COMPLETE == True or SEND_BREAK_COMPLETE == self.id:
                SEND_BREAK_COMPLETE = False
                announced = True
                write_bytes(conn, ASBR)
                write_int(conn, self.id)

        if not announced:
            return

        # Threads that have not broken yet get their frame list captured
        # and sent now. If they block later, an updated (and possibly more
        # accurate - if there are any thread locals) list will be sent.
        update_all_thread_stacks(self)

    self.stepping = STEPPING_NONE
    self.block(async_break_send)
def detach_threads():
    """Tell every registered debugger thread to stop tracing.

    Takes a snapshot of ``THREADS`` under ``THREADS_LOCK`` and then, for
    each thread object, flags it for detach (unless
    ``_INTERCEPTING_FOR_ATTACH`` is set) and unblocks it if it is currently
    blocked. Afterwards the thread registry is cleared (again unless
    intercepting for attach) and all breakpoints are dropped.
    """
    # ``with`` guarantees the lock is released even if taking the snapshot
    # raises; the loop below deliberately runs without holding the lock.
    with THREADS_LOCK:
        all_threads = list(THREADS.values())

    for py_thread in all_threads:
        if not _INTERCEPTING_FOR_ATTACH:
            py_thread.detach = True
            py_thread.stepping = STEPPING_BREAK

        if py_thread._is_blocked:
            py_thread.unblock()

    if not _INTERCEPTING_FOR_ATTACH:
        with THREADS_LOCK:
            THREADS.clear()

    BREAKPOINTS.clear()
def handle_exception(self, frame, arg):
    """Handle an 'exception' trace event for this thread.

    If the thread is in the STEPPING_ATTACH_BREAK state,
    ``block_maybe_attach`` is called first. When the debugger is not
    detached and the frame's code object is one we debug, ``BREAK_ON`` is
    asked whether this exception should break; if so, all thread stacks
    are refreshed and the thread blocks with a callback that reports the
    exception.

    Args:
        frame: frame in which the exception event fired.
        arg: trace-event argument; it is unpacked into
            ``BREAK_ON.should_break`` (presumably the
            ``(type, value, traceback)`` tuple -- confirm against caller).

    Returns:
        ``self.trace_func``, so tracing continues for this frame.
    """
    if self.stepping == STEPPING_ATTACH_BREAK:
        self.block_maybe_attach()

    debuggable = not DETACHED and should_debug_code(frame.f_code)
    if debuggable:
        break_type = BREAK_ON.should_break(self, *arg)
        if break_type:
            update_all_thread_stacks(self)

            def send_report():
                return report_exception(frame, arg, self.id, break_type)

            self.block(send_report)

    # Chain to the trace function that was installed before ours (if any);
    # whatever it returns becomes the new "previous" trace function.
    chained = self.prev_trace_func
    if chained is not None:
        self.prev_trace_func = chained(frame, 'exception', arg)

    return self.trace_func
def async_break(self):
    """Leave stepping mode and block this thread for an asynchronous break.

    The nested callback is handed to ``self.block``. Under the send lock it
    checks the module-level ``SEND_BREAK_COMPLETE`` flag; when the flag is
    True or equals this thread's id, the flag is cleared and the ``ASBR``
    command followed by this thread's id is written to ``conn``.
    """
    def async_break_send():
        global SEND_BREAK_COMPLETE

        announced = False
        with _SendLockCtx:
            # Several threads may reach this point; only the first one that
            # still sees the flag set gets to announce the break.
            # NOTE: ``== True`` (not ``is True``) is kept on purpose to
            # preserve the original comparison semantics.
            if SEND_BREAK_COMPLETE == True or SEND_BREAK_COMPLETE == self.id:
                SEND_BREAK_COMPLETE = False
                announced = True
                write_bytes(conn, ASBR)
                write_int(conn, self.id)

        if not announced:
            return

        # Threads that have not broken yet get their frame list captured
        # and sent now. If they block later, an updated (and possibly more
        # accurate - if there are any thread locals) list will be sent.
        update_all_thread_stacks(self)

    self.stepping = STEPPING_NONE
    self.block(async_break_send)
def detach_threads():
    """Tell every registered debugger thread to stop tracing.

    Takes a snapshot of ``THREADS`` under ``THREADS_LOCK`` and then, for
    each thread object, flags it for detach (unless
    ``_INTERCEPTING_FOR_ATTACH`` is set) and unblocks it if it is currently
    blocked. Afterwards the thread registry is cleared (again unless
    intercepting for attach) and all breakpoints are dropped.
    """
    # ``with`` guarantees the lock is released even if taking the snapshot
    # raises; the loop below deliberately runs without holding the lock.
    with THREADS_LOCK:
        all_threads = list(THREADS.values())

    for py_thread in all_threads:
        if not _INTERCEPTING_FOR_ATTACH:
            py_thread.detach = True
            py_thread.stepping = STEPPING_BREAK

        if py_thread._is_blocked:
            py_thread.unblock()

    if not _INTERCEPTING_FOR_ATTACH:
        with THREADS_LOCK:
            THREADS.clear()

    BREAKPOINTS.clear()
def handle_exception(self, frame, arg):
    """Handle an 'exception' trace event for this thread.

    If the thread is in the STEPPING_ATTACH_BREAK state,
    ``block_maybe_attach`` is called first. When the debugger is not
    detached and the frame's code object is one we debug, ``BREAK_ON`` is
    asked whether this exception should break; if so, all thread stacks
    are refreshed and the thread blocks with a callback that reports the
    exception.

    Args:
        frame: frame in which the exception event fired.
        arg: trace-event argument; it is unpacked into
            ``BREAK_ON.should_break`` (presumably the
            ``(type, value, traceback)`` tuple -- confirm against caller).

    Returns:
        ``self.trace_func``, so tracing continues for this frame.
    """
    if self.stepping == STEPPING_ATTACH_BREAK:
        self.block_maybe_attach()

    debuggable = not DETACHED and should_debug_code(frame.f_code)
    if debuggable:
        break_type = BREAK_ON.should_break(self, *arg)
        if break_type:
            update_all_thread_stacks(self)

            def send_report():
                return report_exception(frame, arg, self.id, break_type)

            self.block(send_report)

    # Chain to the trace function that was installed before ours (if any);
    # whatever it returns becomes the new "previous" trace function.
    chained = self.prev_trace_func
    if chained is not None:
        self.prev_trace_func = chained(frame, 'exception', arg)

    return self.trace_func
def async_break(self):
    """Leave stepping mode and block this thread for an asynchronous break.

    The nested callback is handed to ``self.block``. Under the send lock it
    checks the module-level ``SEND_BREAK_COMPLETE`` flag; when the flag is
    True or equals this thread's id, the flag is cleared and the ``ASBR``
    command followed by this thread's id is written to ``conn``.
    """
    def async_break_send():
        global SEND_BREAK_COMPLETE

        announced = False
        with _SendLockCtx:
            # Several threads may reach this point; only the first one that
            # still sees the flag set gets to announce the break.
            # NOTE: ``== True`` (not ``is True``) is kept on purpose to
            # preserve the original comparison semantics.
            if SEND_BREAK_COMPLETE == True or SEND_BREAK_COMPLETE == self.id:
                SEND_BREAK_COMPLETE = False
                announced = True
                write_bytes(conn, ASBR)
                write_int(conn, self.id)

        if not announced:
            return

        # Threads that have not broken yet get their frame list captured
        # and sent now. If they block later, an updated (and possibly more
        # accurate - if there are any thread locals) list will be sent.
        update_all_thread_stacks(self)

    self.stepping = STEPPING_NONE
    self.block(async_break_send)
def detach_threads():
    """Tell every registered debugger thread to stop tracing.

    Takes a snapshot of ``THREADS`` under ``THREADS_LOCK`` and then, for
    each thread object, flags it for detach (unless
    ``_INTERCEPTING_FOR_ATTACH`` is set) and unblocks it if it is currently
    blocked. Afterwards the thread registry is cleared (again unless
    intercepting for attach) and all breakpoints are dropped.
    """
    # ``with`` guarantees the lock is released even if taking the snapshot
    # raises; the loop below deliberately runs without holding the lock.
    with THREADS_LOCK:
        all_threads = list(THREADS.values())

    for py_thread in all_threads:
        if not _INTERCEPTING_FOR_ATTACH:
            py_thread.detach = True
            py_thread.stepping = STEPPING_BREAK

        if py_thread._is_blocked:
            py_thread.unblock()

    if not _INTERCEPTING_FOR_ATTACH:
        with THREADS_LOCK:
            THREADS.clear()

    BREAKPOINTS.clear()
def handle_exception(self, frame, arg):
    """Handle an 'exception' trace event for this thread.

    If the thread is in the STEPPING_ATTACH_BREAK state,
    ``block_maybe_attach`` is called first. When the debugger is not
    detached and the frame's code object is one we debug, ``BREAK_ON`` is
    asked (via ``ShouldBreak``) whether this exception should break; if
    so, all thread stacks are refreshed and the thread blocks with a
    callback that reports the exception.

    Args:
        frame: frame in which the exception event fired.
        arg: trace-event argument; it is unpacked into
            ``BREAK_ON.ShouldBreak`` (presumably the
            ``(type, value, traceback)`` tuple -- confirm against caller).

    Returns:
        ``self.trace_func``, so tracing continues for this frame.
    """
    if self.stepping == STEPPING_ATTACH_BREAK:
        self.block_maybe_attach()

    debuggable = not DETACHED and should_debug_code(frame.f_code)
    if debuggable:
        break_type = BREAK_ON.ShouldBreak(self, *arg)
        if break_type:
            update_all_thread_stacks(self)

            def send_report():
                return report_exception(frame, arg, self.id, break_type)

            self.block(send_report)

    # Chain to the trace function that was installed before ours (if any);
    # whatever it returns becomes the new "previous" trace function.
    chained = self.prev_trace_func
    if chained is not None:
        self.prev_trace_func = chained(frame, 'exception', arg)

    return self.trace_func
def async_break(self):
    """Leave stepping mode and block this thread for an asynchronous break.

    The nested callback is handed to ``self.block``. Under the send lock it
    checks the module-level ``SEND_BREAK_COMPLETE`` flag; when the flag is
    True or equals this thread's id, the flag is cleared and the ``ASBR``
    command followed by this thread's id is written to ``conn``.
    """
    def async_break_send():
        global SEND_BREAK_COMPLETE

        announced = False
        with _SendLockCtx:
            # Several threads may reach this point; only the first one that
            # still sees the flag set gets to announce the break.
            # NOTE: ``== True`` (not ``is True``) is kept on purpose to
            # preserve the original comparison semantics.
            if SEND_BREAK_COMPLETE == True or SEND_BREAK_COMPLETE == self.id:
                SEND_BREAK_COMPLETE = False
                announced = True
                write_bytes(conn, ASBR)
                write_int(conn, self.id)

        if not announced:
            return

        # Threads that have not broken yet get their frame list captured
        # and sent now. If they block later, an updated (and possibly more
        # accurate - if there are any thread locals) list will be sent.
        update_all_thread_stacks(self)

    self.stepping = STEPPING_NONE
    self.block(async_break_send)
def detach_threads():
    """Tell every registered debugger thread to stop tracing.

    While holding ``THREADS_LOCK``, each thread object in ``THREADS`` is
    flagged for detach (unless ``_INTERCEPTING_FOR_ATTACH`` is set) and
    unblocked if it is currently blocked; the registry is then cleared
    (again unless intercepting for attach) and all breakpoints are dropped.
    """
    # ``with`` guarantees THREADS_LOCK is released even if unblock() (or
    # anything else below) raises; a bare acquire()/release() pair would
    # leave the lock held forever in that case.
    # NOTE(review): the lock is still held while calling unblock(), exactly
    # as before -- confirm that unblock() never needs THREADS_LOCK itself.
    with THREADS_LOCK:
        for py_thread in THREADS.values():
            if not _INTERCEPTING_FOR_ATTACH:
                py_thread.detach = True
                py_thread.stepping = STEPPING_BREAK

            if py_thread._is_blocked:
                py_thread.unblock()

        if not _INTERCEPTING_FOR_ATTACH:
            THREADS.clear()

        BREAKPOINTS.clear()
def handle_exception(self, frame, arg):
    """Handle an 'exception' trace event for this thread.

    If the thread is in the STEPPING_ATTACH_BREAK state,
    ``block_maybe_attach`` is called first. When the debugger is not
    detached and the frame's code object is one we debug, ``BREAK_ON`` is
    asked whether this exception should break; if so, all thread stacks
    are refreshed and the thread blocks with a callback that reports the
    exception.

    Args:
        frame: frame in which the exception event fired.
        arg: trace-event argument; it is unpacked into
            ``BREAK_ON.should_break`` (presumably the
            ``(type, value, traceback)`` tuple -- confirm against caller).

    Returns:
        ``self.trace_func``, so tracing continues for this frame.
    """
    if self.stepping == STEPPING_ATTACH_BREAK:
        self.block_maybe_attach()

    debuggable = not DETACHED and should_debug_code(frame.f_code)
    if debuggable:
        break_type = BREAK_ON.should_break(self, *arg)
        if break_type:
            update_all_thread_stacks(self)

            def send_report():
                return report_exception(frame, arg, self.id, break_type)

            self.block(send_report)

    # Chain to the trace function that was installed before ours (if any);
    # whatever it returns becomes the new "previous" trace function.
    chained = self.prev_trace_func
    if chained is not None:
        self.prev_trace_func = chained(frame, 'exception', arg)

    return self.trace_func
def async_break(self):
    """Leave stepping mode and block this thread for an asynchronous break.

    The nested callback is handed to ``self.block``. Under the send lock it
    checks the module-level ``SEND_BREAK_COMPLETE`` flag; when the flag is
    True or equals this thread's id, the flag is cleared and the ``ASBR``
    command followed by this thread's id is written to ``conn``.
    """
    def async_break_send():
        global SEND_BREAK_COMPLETE

        announced = False
        with _SendLockCtx:
            # Several threads may reach this point; only the first one that
            # still sees the flag set gets to announce the break.
            # NOTE: ``== True`` (not ``is True``) is kept on purpose to
            # preserve the original comparison semantics.
            if SEND_BREAK_COMPLETE == True or SEND_BREAK_COMPLETE == self.id:
                SEND_BREAK_COMPLETE = False
                announced = True
                write_bytes(conn, ASBR)
                write_int(conn, self.id)

        if not announced:
            return

        # Threads that have not broken yet get their frame list captured
        # and sent now. If they block later, an updated (and possibly more
        # accurate - if there are any thread locals) list will be sent.
        update_all_thread_stacks(self)

    self.stepping = STEPPING_NONE
    self.block(async_break_send)
def detach_threads():
    """Tell every registered debugger thread to stop tracing.

    Takes a snapshot of ``THREADS`` under ``THREADS_LOCK`` and then, for
    each thread object, flags it for detach (unless
    ``_INTERCEPTING_FOR_ATTACH`` is set) and unblocks it if it is currently
    blocked. Afterwards the thread registry is cleared (again unless
    intercepting for attach) and all breakpoints are dropped.
    """
    # ``with`` guarantees the lock is released even if taking the snapshot
    # raises; the loop below deliberately runs without holding the lock.
    with THREADS_LOCK:
        all_threads = list(THREADS.values())

    for py_thread in all_threads:
        if not _INTERCEPTING_FOR_ATTACH:
            py_thread.detach = True
            py_thread.stepping = STEPPING_BREAK

        if py_thread._is_blocked:
            py_thread.unblock()

    if not _INTERCEPTING_FOR_ATTACH:
        with THREADS_LOCK:
            THREADS.clear()

    BREAKPOINTS.clear()
def handle_exception(self, frame, arg):
    """Handle an 'exception' trace event for this thread.

    If the thread is in the STEPPING_ATTACH_BREAK state,
    ``block_maybe_attach`` is called first. When the debugger is not
    detached and the frame's code object is one we debug, ``BREAK_ON`` is
    asked whether this exception should break; if so, all thread stacks
    are refreshed and the thread blocks with a callback that reports the
    exception.

    Args:
        frame: frame in which the exception event fired.
        arg: trace-event argument; it is unpacked into
            ``BREAK_ON.should_break`` (presumably the
            ``(type, value, traceback)`` tuple -- confirm against caller).

    Returns:
        ``self.trace_func``, so tracing continues for this frame.
    """
    if self.stepping == STEPPING_ATTACH_BREAK:
        self.block_maybe_attach()

    debuggable = not DETACHED and should_debug_code(frame.f_code)
    if debuggable:
        break_type = BREAK_ON.should_break(self, *arg)
        if break_type:
            update_all_thread_stacks(self)

            def send_report():
                return report_exception(frame, arg, self.id, break_type)

            self.block(send_report)

    # Chain to the trace function that was installed before ours (if any);
    # whatever it returns becomes the new "previous" trace function.
    chained = self.prev_trace_func
    if chained is not None:
        self.prev_trace_func = chained(frame, 'exception', arg)

    return self.trace_func
def async_break(self):
    """Leave stepping mode and block this thread for an asynchronous break.

    The nested callback is handed to ``self.block``. Under the send lock it
    checks the module-level ``SEND_BREAK_COMPLETE`` flag; when the flag is
    True or equals this thread's id, the flag is cleared and the ``ASBR``
    command followed by this thread's id is written to ``conn``.
    """
    def async_break_send():
        global SEND_BREAK_COMPLETE

        announced = False
        with _SendLockCtx:
            # Several threads may reach this point; only the first one that
            # still sees the flag set gets to announce the break.
            # NOTE: ``== True`` (not ``is True``) is kept on purpose to
            # preserve the original comparison semantics.
            if SEND_BREAK_COMPLETE == True or SEND_BREAK_COMPLETE == self.id:
                SEND_BREAK_COMPLETE = False
                announced = True
                write_bytes(conn, ASBR)
                write_int(conn, self.id)

        if not announced:
            return

        # Threads that have not broken yet get their frame list captured
        # and sent now. If they block later, an updated (and possibly more
        # accurate - if there are any thread locals) list will be sent.
        update_all_thread_stacks(self)

    self.stepping = STEPPING_NONE
    self.block(async_break_send)
def detach_threads():
    """Tell every registered debugger thread to stop tracing.

    Takes a snapshot of ``THREADS`` under ``THREADS_LOCK`` and then, for
    each thread object, flags it for detach (unless
    ``_INTERCEPTING_FOR_ATTACH`` is set) and unblocks it if it is currently
    blocked. Afterwards the thread registry is cleared (again unless
    intercepting for attach) and all breakpoints are dropped.
    """
    # ``with`` guarantees the lock is released even if taking the snapshot
    # raises; the loop below deliberately runs without holding the lock.
    with THREADS_LOCK:
        all_threads = list(THREADS.values())

    for py_thread in all_threads:
        if not _INTERCEPTING_FOR_ATTACH:
            py_thread.detach = True
            py_thread.stepping = STEPPING_BREAK

        if py_thread._is_blocked:
            py_thread.unblock()

    if not _INTERCEPTING_FOR_ATTACH:
        with THREADS_LOCK:
            THREADS.clear()

    BREAKPOINTS.clear()
def handle_exception(self, frame, arg):
    """Handle an 'exception' trace event for this thread.

    If the thread is in the STEPPING_ATTACH_BREAK state,
    ``block_maybe_attach`` is called first. When the debugger is not
    detached and the frame's code object is one we debug, ``BREAK_ON`` is
    asked whether this exception should break; if so, all thread stacks
    are refreshed and the thread blocks with a callback that reports the
    exception.

    Args:
        frame: frame in which the exception event fired.
        arg: trace-event argument; it is unpacked into
            ``BREAK_ON.should_break`` (presumably the
            ``(type, value, traceback)`` tuple -- confirm against caller).

    Returns:
        ``self.trace_func``, so tracing continues for this frame.
    """
    if self.stepping == STEPPING_ATTACH_BREAK:
        self.block_maybe_attach()

    debuggable = not DETACHED and should_debug_code(frame.f_code)
    if debuggable:
        break_type = BREAK_ON.should_break(self, *arg)
        if break_type:
            update_all_thread_stacks(self)

            def send_report():
                return report_exception(frame, arg, self.id, break_type)

            self.block(send_report)

    # Chain to the trace function that was installed before ours (if any);
    # whatever it returns becomes the new "previous" trace function.
    chained = self.prev_trace_func
    if chained is not None:
        self.prev_trace_func = chained(frame, 'exception', arg)

    return self.trace_func
def async_break(self):
    """Leave stepping mode and block this thread for an asynchronous break.

    The nested callback is handed to ``self.block``. Under the send lock it
    checks the module-level ``SEND_BREAK_COMPLETE`` flag; when the flag is
    True or equals this thread's id, the flag is cleared and the ``ASBR``
    command followed by this thread's id is written to ``conn``.
    """
    def async_break_send():
        global SEND_BREAK_COMPLETE

        announced = False
        with _SendLockCtx:
            # Several threads may reach this point; only the first one that
            # still sees the flag set gets to announce the break.
            # NOTE: ``== True`` (not ``is True``) is kept on purpose to
            # preserve the original comparison semantics.
            if SEND_BREAK_COMPLETE == True or SEND_BREAK_COMPLETE == self.id:
                SEND_BREAK_COMPLETE = False
                announced = True
                write_bytes(conn, ASBR)
                write_int(conn, self.id)

        if not announced:
            return

        # Threads that have not broken yet get their frame list captured
        # and sent now. If they block later, an updated (and possibly more
        # accurate - if there are any thread locals) list will be sent.
        update_all_thread_stacks(self)

    self.stepping = STEPPING_NONE
    self.block(async_break_send)
def detach_threads():
    """Tell every registered debugger thread to stop tracing.

    Takes a snapshot of ``THREADS`` under ``THREADS_LOCK`` and then, for
    each thread object, flags it for detach (unless
    ``_INTERCEPTING_FOR_ATTACH`` is set) and unblocks it if it is currently
    blocked. Afterwards the thread registry is cleared (again unless
    intercepting for attach) and all breakpoints are dropped.
    """
    # ``with`` guarantees the lock is released even if taking the snapshot
    # raises; the loop below deliberately runs without holding the lock.
    with THREADS_LOCK:
        all_threads = list(THREADS.values())

    for py_thread in all_threads:
        if not _INTERCEPTING_FOR_ATTACH:
            py_thread.detach = True
            py_thread.stepping = STEPPING_BREAK

        if py_thread._is_blocked:
            py_thread.unblock()

    if not _INTERCEPTING_FOR_ATTACH:
        with THREADS_LOCK:
            THREADS.clear()

    BREAKPOINTS.clear()
def handle_exception(self, frame, arg):
    """Handle an 'exception' trace event for this thread.

    If the thread is in the STEPPING_ATTACH_BREAK state,
    ``block_maybe_attach`` is called first. When the debugger is not
    detached and the frame's code object is one we debug, ``BREAK_ON`` is
    asked whether this exception should break; if so, all thread stacks
    are refreshed and the thread blocks with a callback that reports the
    exception.

    Args:
        frame: frame in which the exception event fired.
        arg: trace-event argument; it is unpacked into
            ``BREAK_ON.should_break`` (presumably the
            ``(type, value, traceback)`` tuple -- confirm against caller).

    Returns:
        ``self.trace_func``, so tracing continues for this frame.
    """
    if self.stepping == STEPPING_ATTACH_BREAK:
        self.block_maybe_attach()

    debuggable = not DETACHED and should_debug_code(frame.f_code)
    if debuggable:
        break_type = BREAK_ON.should_break(self, *arg)
        if break_type:
            update_all_thread_stacks(self)

            def send_report():
                return report_exception(frame, arg, self.id, break_type)

            self.block(send_report)

    # Chain to the trace function that was installed before ours (if any);
    # whatever it returns becomes the new "previous" trace function.
    chained = self.prev_trace_func
    if chained is not None:
        self.prev_trace_func = chained(frame, 'exception', arg)

    return self.trace_func
def async_break(self):
    """Leave stepping mode and block this thread for an asynchronous break.

    The nested callback is handed to ``self.block``. Under the send lock it
    checks the module-level ``SEND_BREAK_COMPLETE`` flag; when the flag is
    True or equals this thread's id, the flag is cleared and the ``ASBR``
    command followed by this thread's id is written to ``conn``.
    """
    def async_break_send():
        global SEND_BREAK_COMPLETE

        announced = False
        with _SendLockCtx:
            # Several threads may reach this point; only the first one that
            # still sees the flag set gets to announce the break.
            # NOTE: ``== True`` (not ``is True``) is kept on purpose to
            # preserve the original comparison semantics.
            if SEND_BREAK_COMPLETE == True or SEND_BREAK_COMPLETE == self.id:
                SEND_BREAK_COMPLETE = False
                announced = True
                write_bytes(conn, ASBR)
                write_int(conn, self.id)

        if not announced:
            return

        # Threads that have not broken yet get their frame list captured
        # and sent now. If they block later, an updated (and possibly more
        # accurate - if there are any thread locals) list will be sent.
        update_all_thread_stacks(self)

    self.stepping = STEPPING_NONE
    self.block(async_break_send)
def command_auto_resume(self):
    """Handle the auto-resume command read from the debugger connection.

    Reads a thread id from ``self.conn`` and looks the thread up in
    ``THREADS``. If that thread is stepping over or into and its current
    line differs from the line it stopped on, the step is reported as
    finished; otherwise all threads are resumed.

    Raises:
        KeyError: if the thread id read from the connection is not in
            ``THREADS`` (unchanged from the original behaviour).
    """
    tid = read_int(self.conn)

    # ``with`` releases THREADS_LOCK even when the lookup raises KeyError;
    # the previous acquire()/release() pair left the lock held forever in
    # that case, dead-locking every later user of the lock.
    with THREADS_LOCK:
        # Named ``py_thread`` so the ``thread`` module is not shadowed.
        py_thread = THREADS[tid]

    stepping = py_thread.stepping
    is_line_step = stepping == STEPPING_OVER or stepping == STEPPING_INTO
    if is_line_step and py_thread.cur_frame.f_lineno != py_thread.stopped_on_line:
        report_step_finished(tid)
    else:
        self.command_resume_all()
def detach_threads():
    """Tell every registered debugger thread to stop tracing.

    Takes a snapshot of ``THREADS`` under ``THREADS_LOCK`` and then, for
    each thread object, flags it for detach (unless
    ``_INTERCEPTING_FOR_ATTACH`` is set) and unblocks it if it is currently
    blocked. Afterwards the thread registry is cleared (again unless
    intercepting for attach) and all breakpoints are dropped.
    """
    # ``with`` guarantees the lock is released even if taking the snapshot
    # raises; the loop below deliberately runs without holding the lock.
    with THREADS_LOCK:
        all_threads = list(THREADS.values())

    for py_thread in all_threads:
        if not _INTERCEPTING_FOR_ATTACH:
            py_thread.detach = True
            py_thread.stepping = STEPPING_BREAK

        if py_thread._is_blocked:
            py_thread.unblock()

    if not _INTERCEPTING_FOR_ATTACH:
        with THREADS_LOCK:
            THREADS.clear()

    BREAKPOINTS.clear()
def __len__(self):
    """Return the length stored in ``self.len_value``."""
    stored_length = self.len_value
    return stored_length
# Specifies list of files not to debug. Can be extended by other modules
# (the REPL does this for $attach support and not stepping into the REPL).
def __init__(self, id = None):
    """Set up the per-thread debugger state.

    Args:
        id: identifier to store as ``self.id``; when None the value of
            ``thread.get_ident()`` is used instead.
    """
    self.id = thread.get_ident() if id is None else id

    # Trace event name -> bound handler method, used to dispatch events.
    event_handlers = (
        ('call', self.handle_call),
        ('line', self.handle_line),
        ('return', self.handle_return),
        ('exception', self.handle_exception),
        ('c_call', self.handle_c_call),
        ('c_return', self.handle_c_return),
        ('c_exception', self.handle_c_exception),
    )
    self._events = dict(event_handlers)

    self.cur_frame = None
    self.stepping = STEPPING_NONE
    self.unblock_work = None

    # The block lock is acquired immediately after it is created.
    self._block_lock = thread.allocate_lock()
    self._block_lock.acquire()
    self._block_starting_lock = thread.allocate_lock()

    self._is_blocked = False
    self._is_working = False
    self.stopped_on_line = None
    self.detach = False

    # Store the bound method on the instance so it does not have to be
    # re-created on every attribute access.
    self.trace_func = self.trace_func
    self.prev_trace_func = None
    self.trace_func_stack = []

    self.reported_process_loaded = False
    self.django_stepping = None
    self.is_sending = False

    # stackless changes
    if stackless is not None:
        self._stackless_attach()

    if sys.platform == 'cli':
        self.frames = []
def context_dispatcher(self, old, new):
    """Reset stepping on a context switch and install tracing on ``new``.

    Tasklets that started before tracing began have no trace function, so
    it is patched in here during the context switch.

    Args:
        old: the context being switched away from.
        new: the context being switched to; its ``frame`` and ``settrace``
            attributes are used when both ``old`` and ``new`` are truthy.
    """
    self.stepping = STEPPING_NONE

    if not (old and new):
        return

    target_frame_untraced = hasattr(new.frame, "f_trace") and not new.frame.f_trace
    if target_frame_untraced:
        sys.call_tracing(new.settrace,(self.trace_func,))
def _stackless_schedule_cb(self, prev, next):
    """Schedule callback that installs our trace function on ``next``.

    While the callback runs, the current tasklet's trace function is set to
    None; it is restored in a ``finally`` clause on the way out.

    Args:
        prev: not used by this callback.
        next: the tasklet to install tracing on, or None.
    """
    active = stackless.getcurrent()
    if not active:
        return

    restore_tf = active.trace_function
    try:
        active.trace_function = None
        self.stepping = STEPPING_NONE

        # With no trace function on the current tasklet, fall back to the
        # one on the calling frame (which one applies depends on how we
        # ended up in this callback).
        if restore_tf is None:
            caller = active.frame.f_back
            if caller is not None:
                restore_tf = caller.f_trace

        if next is not None:
            # Walk the target stack and assign our trace function to every
            # real frame; when the target is the current tasklet, start
            # one frame up.
            walker = next.frame.f_back if next is active else next.frame
            while walker:
                if isinstance(walker, types.FrameType):
                    walker.f_trace = self.trace_func
                walker = walker.f_back
            next.trace_function = self.trace_func
    finally:
        active.trace_function = restore_tf
def trace_func(self, frame, event, arg):
    """Trace entry point: dispatch ``event`` to the matching handler.

    Args:
        frame: the frame the event occurred in.
        event: event name, used as a key into ``self._events``.
        arg: event-specific argument, forwarded to the handler.

    Returns:
        None when ``sys`` is already gone or when the thread detaches;
        ``self.trace_func`` while a send is in progress or when a
        StackOverflowException / KeyboardInterrupt is caught; otherwise
        whatever the event handler returns.
    """
    # If we're so far into process shutdown that sys is already gone, just stop tracing.
    if sys is None:
        return None

    if self.is_sending:
        # https://pytools.codeplex.com/workitem/1864
        # Socket I/O is in progress, so delivering a breakpoint or async
        # break now would deadlock. Keep returning the trace function so
        # all of our frames remain balanced. Calling sys.settrace(None)
        # while holding the send lock might be cleaner, but the send
        # context manager is used both inside and outside of the trace
        # function (with tracing enabled and disabled), which makes it
        # very easy to throw off the current-frame tracking.
        return self.trace_func

    try:
        # If should_debug_code(frame.f_code) is not true during attach,
        # the current frame is None and a pop_frame would raise and break
        # the debugger -- so push a frame now; it is needed for blocking.
        if self.cur_frame is None:
            self.push_frame(frame)

        break_requested = self.stepping == STEPPING_BREAK and should_debug_code(frame.f_code)
        if break_requested:
            if self.detach:
                if stackless is not None:
                    stackless.set_schedule_callback(None)
                    stackless.tasklet.__call__ = self.__oldstacklesscall__
                sys.settrace(None)
                return None

            self.async_break()

        handler = self._events[event]
        return handler(frame, arg)
    except (StackOverflowException, KeyboardInterrupt):
        # Stack overflow or interrupt: skip the handler for this event.
        # NOTE(review): the original comment said "disable tracing", but
        # the trace function is still returned here -- confirm intent.
        return self.trace_func