def block_maybe_attach(self):
    """Block this thread and tell the debugger client why it stopped.

    When the thread is stopping for an attach break, only the first thread
    to get here (tracked by the module-level ``attach_sent_break`` flag,
    guarded by ``attach_lock``) reports anything; later threads block
    silently.

    NOTE(review): indentation was lost in this copy of the file; the nesting
    below is reconstructed from the control flow -- confirm against upstream.
    """
    global attach_sent_break

    should_report = True
    if self.stepping == STEPPING_ATTACH_BREAK:
        # only one thread should send the attach break in
        attach_lock.acquire()
        already_sent = attach_sent_break
        attach_sent_break = True
        attach_lock.release()
        if already_sent:
            should_report = False

    probe_stack()

    # Remember why we stopped, then clear the thread's stepping state.
    prior_stepping = self.stepping
    self.stepping = STEPPING_NONE

    def block_cond():
        # Callback handed to self.block(); sends the appropriate report.
        if not should_report:
            return None
        if prior_stepping in (STEPPING_OVER, STEPPING_INTO):
            report_step_finished(self.id)
            return mark_all_threads_for_break(skip_thread = self)
        if DETACHED:
            return None
        if prior_stepping == STEPPING_ATTACH_BREAK:
            self.reported_process_loaded = True
        return report_process_loaded(self.id)

    update_all_thread_stacks(self)
    self.block(block_cond)
python类stepping()的实例源码
def mark_all_threads_for_break(stepping = STEPPING_BREAK, skip_thread = None):
    """Set the stepping state of every registered thread.

    Args:
        stepping: value stored in each thread's ``stepping`` attribute
            (defaults to ``STEPPING_BREAK``).
        skip_thread: optional thread object to leave untouched; compared
            by identity.
    """
    # `with` guarantees THREADS_LOCK is released even if the loop raises;
    # the bare acquire()/release() pair would leave it held.
    with THREADS_LOCK:
        for thread in THREADS.values():
            if thread is not skip_thread:
                thread.stepping = stepping
def command_step_into(self):
    """Handle a step-into request for the thread id read from the connection.

    Does nothing when the id does not resolve to a thread.
    NOTE(review): indentation was lost in this copy; the resume is assumed to
    happen only when the thread was found -- confirm against upstream.
    """
    target = get_thread_from_id(read_int(self.conn))
    if target is None:
        return

    assert target._is_blocked
    target.stepping = STEPPING_INTO
    self.command_resume_all()
def command_step_over(self):
    """Handle a step-over request for the thread id read from the connection.

    When ``DJANGO_DEBUG`` is set and the thread's current frame has a Django
    source object, the thread is flagged with ``django_stepping`` instead of
    ``STEPPING_OVER``. In both cases all threads are then resumed. Does
    nothing when the id does not resolve to a thread.

    NOTE(review): indentation was lost in this copy; nesting is reconstructed
    from the control flow -- confirm against upstream.
    """
    # set step over
    target = get_thread_from_id(read_int(self.conn))
    if target is None:
        return

    assert target._is_blocked
    if DJANGO_DEBUG and get_django_frame_source(target.cur_frame) is not None:
        target.django_stepping = True
    else:
        target.stepping = STEPPING_OVER
    self.command_resume_all()
def command_resume_all(self):
    """Clear pending break requests and unblock every blocked thread.

    A snapshot of the registered threads is taken under ``THREADS_LOCK``;
    each thread is then updated while holding its own
    ``_block_starting_lock``.
    """
    # resume all
    with THREADS_LOCK:
        all_threads = list(THREADS.values())

    for thread in all_threads:
        # `with` releases the per-thread lock even if unblock() raises; the
        # bare acquire()/release() pair would leave it held forever.
        with thread._block_starting_lock:
            if thread.stepping in (STEPPING_BREAK, STEPPING_ATTACH_BREAK):
                thread.stepping = STEPPING_NONE
            if thread._is_blocked:
                thread.unblock()
def command_auto_resume(self):
    """Either report a finished step or resume all threads.

    Reads a thread id from the connection. If that thread is stepping
    over/into and its current frame is no longer on the line it stopped on,
    the step is reported as finished; otherwise every thread is resumed.

    Raises:
        Whatever ``THREADS[tid]`` raises for an unknown id (unchanged from
        the original behaviour).
    """
    tid = read_int(self.conn)

    # The lookup can raise for an unknown id. `with` makes sure THREADS_LOCK
    # is released in that case instead of staying locked and deadlocking
    # every later user of the lock.
    with THREADS_LOCK:
        thread = THREADS[tid]

    stepping = thread.stepping
    is_user_step = stepping == STEPPING_OVER or stepping == STEPPING_INTO
    if is_user_step and thread.cur_frame.f_lineno != thread.stopped_on_line:
        report_step_finished(tid)
    else:
        self.command_resume_all()
def command_clear_stepping(self):
    """Reset the stepping state of the thread whose id is read from the connection.

    Does nothing when the id does not resolve to a thread.
    """
    target = get_thread_from_id(read_int(self.conn))
    if target is None:
        return
    target.stepping = STEPPING_NONE
def new_external_thread():
    """Register the calling thread with the debugger and start tracing it.

    The new thread object is marked to break when an attach is still in
    progress (``attach_sent_break`` not yet set) or when
    ``SEND_BREAK_COMPLETE`` is set.
    """
    cur_thread = new_thread()

    if not attach_sent_break:
        # we are still doing the attach, make this thread break.
        cur_thread.stepping = STEPPING_ATTACH_BREAK
    elif SEND_BREAK_COMPLETE:
        # user requested break all, make this thread break
        cur_thread.stepping = STEPPING_BREAK

    sys.settrace(cur_thread.trace_func)
def __len__(self):
    """Return the length stored in ``self.len_value``."""
    stored_length = self.len_value
    return stored_length
# Specifies the list of files not to debug. Can be extended by other modules
# (the REPL does this for $attach support and to avoid stepping into the REPL).
def __init__(self, id = None):
    """Initialise the per-thread debugger state.

    Args:
        id: thread identifier to record; when None the identifier of the
            calling thread (``thread.get_ident()``) is used.
    """
    self.id = thread.get_ident() if id is None else id

    # trace event name -> bound handler, used by trace_func for dispatch
    self._events = {
        'call' : self.handle_call,
        'line' : self.handle_line,
        'return' : self.handle_return,
        'exception' : self.handle_exception,
        'c_call' : self.handle_c_call,
        'c_return' : self.handle_c_return,
        'c_exception' : self.handle_c_exception,
    }

    self.cur_frame = None
    self.stepping = STEPPING_NONE
    self.unblock_work = None

    # _block_lock starts out held (acquired right after allocation).
    self._block_lock = thread.allocate_lock()
    self._block_lock.acquire()
    self._block_starting_lock = thread.allocate_lock()

    self._is_blocked = False
    self._is_working = False
    self.stopped_on_line = None
    self.detach = False

    # replace self.trace_func w/ a bound method so we don't need to
    # re-create these regularly
    self.trace_func = self.trace_func
    self.prev_trace_func = None
    self.trace_func_stack = []

    self.reported_process_loaded = False
    self.django_stepping = None
    self.is_sending = False

    # stackless changes
    if stackless is not None:
        self._stackless_attach()

    if sys.platform == 'cli':
        self.frames = []
def context_dispatcher(self, old, new):
    """Reset stepping on a context switch and make sure ``new`` is traced.

    Args:
        old: the object being switched away from (only its truthiness is used).
        new: the object being switched to; its ``frame`` and ``settrace``
            attributes are used.
    """
    self.stepping = STEPPING_NONE

    if not (old and new):
        return

    # for those tasklets that started before we started tracing
    # we need to make sure that the trace is set by patching
    # it in the context switch
    if not hasattr(new.frame, "f_trace") or new.frame.f_trace:
        return
    sys.call_tracing(new.settrace, (self.trace_func,))
def trace_func(self, frame, event, arg):
    """Trace callback: handle async breaks, then dispatch to the event handler.

    Args:
        frame: the frame the event occurred in.
        event: event name; used as a key into ``self._events``.
        arg: event argument, passed through to the handler.

    Returns:
        None to stop tracing (``sys`` already torn down, or detaching),
        ``self.trace_func`` while sending or after a swallowed
        StackOverflowException/KeyboardInterrupt, otherwise whatever the
        event handler returns.

    NOTE(review): indentation was lost in this copy; nesting is reconstructed
    from the control flow -- confirm against upstream.
    """
    # If we're so far into process shutdown that sys is already gone, just stop tracing.
    if sys is None:
        return None

    if self.is_sending:
        # https://pytools.codeplex.com/workitem/1864
        # We're currently doing I/O w/ the socket, so we don't want to
        # deliver any breakpoints or async breaks because we'll deadlock.
        # Keep returning the trace function so all of our frames remain
        # balanced. A better way to deal with this might be to do
        # sys.settrace(None) when we take the send lock, but that's much
        # more difficult because our send context manager is used both
        # inside and outside of the trace function (tracing enabled and
        # disabled), which makes it very easy to throw off our current
        # frame tracking.
        return self.trace_func

    try:
        # if should_debug_code(frame.f_code) is not true during attach
        # the current frame is None and a pop_frame will cause an exception
        # and break the debugger
        if self.cur_frame is None:
            # happens during attach, we need frame for blocking
            self.push_frame(frame)

        break_requested = self.stepping == STEPPING_BREAK and should_debug_code(frame.f_code)
        if break_requested:
            if not self.detach:
                self.async_break()
            else:
                # Detaching: undo the stackless hooks and stop tracing.
                if stackless is not None:
                    stackless.set_schedule_callback(None)
                    stackless.tasklet.__call__ = self.__oldstacklesscall__
                sys.settrace(None)
                return None

        handler = self._events[event]
        return handler(frame, arg)
    except (StackOverflowException, KeyboardInterrupt):
        # stack overflow or interrupt while handling the event: swallow it
        # and hand back this trace function
        return self.trace_func
def handle_return(self, frame, arg):
    """Handle a 'return' trace event for ``frame``.

    Pops the debugger's frame record, updates the stepping state (blocking
    and reporting a finished step when a user step completes), forwards the
    event to any previously installed trace function and restores the
    previous entry from ``trace_func_stack``.

    Args:
        frame: the frame that is returning.
        arg: the event argument, forwarded unchanged to the previous trace
            function.

    NOTE(review): indentation was lost in this copy; nesting is reconstructed
    from the control flow -- confirm against upstream.
    """
    self.pop_frame()

    if not DETACHED:
        stepping = self.stepping
        # only update stepping state when this frame is debuggable (matching handle_call)
        # Compare by value: the stepping states are integers (they are
        # incremented/decremented below), and `is not` on integers relies on
        # interpreter-specific object caching.
        if stepping != STEPPING_NONE and should_debug_code(frame.f_code):
            if stepping > STEPPING_OVER:
                self.stepping -= 1
            elif stepping < STEPPING_OUT:
                self.stepping += 1
            elif stepping in USER_STEPPING:
                if self.cur_frame is None or frame.f_code.co_name == "<module>":
                    # only return to user code modules
                    if self.should_block_on_frame(frame):
                        # restore back the module frame for the step out of a module
                        self.push_frame(ModuleExitFrame(frame))
                        self.stepping = STEPPING_NONE
                        update_all_thread_stacks(self)
                        self.block(lambda: (report_step_finished(self.id), mark_all_threads_for_break(skip_thread = self)))
                        self.pop_frame()
                elif self.should_block_on_frame(self.cur_frame):
                    # if we're returning into non-user code then don't block in the
                    # non-user code, wait until we hit user code again
                    self.stepping = STEPPING_NONE
                    update_all_thread_stacks(self)
                    self.block(lambda: (report_step_finished(self.id), mark_all_threads_for_break(skip_thread = self)))

    # forward call to previous trace function, if any
    old_trace_func = self.prev_trace_func
    if old_trace_func is not None:
        old_trace_func(frame, 'return', arg)

    # restore previous frames trace function if there is one
    if self.trace_func_stack:
        self.prev_trace_func = self.trace_func_stack.pop()
def block_maybe_attach(self):
    """Block this thread and tell the debugger client why it stopped.

    When the thread is stopping for an attach break, only the first thread
    to get here (tracked by the module-level ``attach_sent_break`` flag,
    guarded by ``attach_lock``) reports anything; later threads block
    silently.

    NOTE(review): indentation was lost in this copy of the file; the nesting
    below is reconstructed from the control flow -- confirm against upstream.
    """
    global attach_sent_break

    should_report = True
    if self.stepping == STEPPING_ATTACH_BREAK:
        # only one thread should send the attach break in
        attach_lock.acquire()
        already_sent = attach_sent_break
        attach_sent_break = True
        attach_lock.release()
        if already_sent:
            should_report = False

    probe_stack()

    # Remember why we stopped, then clear the thread's stepping state.
    prior_stepping = self.stepping
    self.stepping = STEPPING_NONE

    def block_cond():
        # Callback handed to self.block(); sends the appropriate report.
        if not should_report:
            return None
        if prior_stepping in (STEPPING_OVER, STEPPING_INTO):
            report_step_finished(self.id)
            return mark_all_threads_for_break(skip_thread = self)
        if DETACHED:
            return None
        if prior_stepping == STEPPING_ATTACH_BREAK:
            self.reported_process_loaded = True
        return report_process_loaded(self.id)

    update_all_thread_stacks(self)
    self.block(block_cond)
def command_step_into(self):
    """Handle a step-into request for the thread id read from the connection.

    Does nothing when the id does not resolve to a thread.
    NOTE(review): indentation was lost in this copy; the resume is assumed to
    happen only when the thread was found -- confirm against upstream.
    """
    target = get_thread_from_id(read_int(self.conn))
    if target is None:
        return

    assert target._is_blocked
    target.stepping = STEPPING_INTO
    self.command_resume_all()
def command_step_out(self):
    """Handle a step-out request for the thread id read from the connection.

    Does nothing when the id does not resolve to a thread.
    NOTE(review): indentation was lost in this copy; the resume is assumed to
    happen only when the thread was found -- confirm against upstream.
    """
    target = get_thread_from_id(read_int(self.conn))
    if target is None:
        return

    assert target._is_blocked
    target.stepping = STEPPING_OUT
    self.command_resume_all()
def command_step_over(self):
    """Handle a step-over request for the thread id read from the connection.

    When ``DJANGO_DEBUG`` is set and the thread's current frame has a Django
    source object, the thread is flagged with ``django_stepping`` instead of
    ``STEPPING_OVER``. In both cases all threads are then resumed. Does
    nothing when the id does not resolve to a thread.

    NOTE(review): indentation was lost in this copy; nesting is reconstructed
    from the control flow -- confirm against upstream.
    """
    # set step over
    target = get_thread_from_id(read_int(self.conn))
    if target is None:
        return

    assert target._is_blocked
    if DJANGO_DEBUG and get_django_frame_source(target.cur_frame) is not None:
        target.django_stepping = True
    else:
        target.stepping = STEPPING_OVER
    self.command_resume_all()
def command_resume_all(self):
    """Clear pending break requests and unblock every blocked thread.

    A snapshot of the registered threads is taken under ``THREADS_LOCK``;
    each thread is then updated while holding its own
    ``_block_starting_lock``.
    """
    # resume all
    with THREADS_LOCK:
        all_threads = list(THREADS.values())

    for thread in all_threads:
        # `with` releases the per-thread lock even if unblock() raises; the
        # bare acquire()/release() pair would leave it held forever.
        with thread._block_starting_lock:
            if thread.stepping in (STEPPING_BREAK, STEPPING_ATTACH_BREAK):
                thread.stepping = STEPPING_NONE
            if thread._is_blocked:
                thread.unblock()
def command_auto_resume(self):
    """Either report a finished step or resume all threads.

    Reads a thread id from the connection. If that thread is stepping
    over/into and its current frame is no longer on the line it stopped on,
    the step is reported as finished; otherwise every thread is resumed.

    Raises:
        Whatever ``THREADS[tid]`` raises for an unknown id (unchanged from
        the original behaviour).
    """
    tid = read_int(self.conn)

    # The lookup can raise for an unknown id. `with` makes sure THREADS_LOCK
    # is released in that case instead of staying locked and deadlocking
    # every later user of the lock.
    with THREADS_LOCK:
        thread = THREADS[tid]

    stepping = thread.stepping
    is_user_step = stepping == STEPPING_OVER or stepping == STEPPING_INTO
    if is_user_step and thread.cur_frame.f_lineno != thread.stopped_on_line:
        report_step_finished(tid)
    else:
        self.command_resume_all()
def new_thread(tid = None, set_break = False, frame = None):
    """Create a Thread record, register it in THREADS and return it.

    Args:
        tid: thread id passed to ``Thread``; a value equal to
            ``debugger_thread_id`` is skipped.
        set_break: when true, the new thread is marked with
            ``STEPPING_ATTACH_BREAK``.
        frame: frame pushed onto the new thread record.

    Returns:
        The new thread object, or None when ``tid`` equals
        ``debugger_thread_id``.
    """
    # called during attach w/ a thread ID provided.
    if tid == debugger_thread_id:
        return None

    created = Thread(tid)

    # Register the record under its own id while holding the registry lock.
    THREADS_LOCK.acquire()
    THREADS[created.id] = created
    THREADS_LOCK.release()

    created.push_frame(frame)
    if set_break:
        created.stepping = STEPPING_ATTACH_BREAK

    if not DETACHED:
        report_new_thread(created)

    return created