def pop_frame(self):
self.frames.pop()
self.cur_frame = self.frames[-1]
python类cur_frame()的实例源码
def push_frame(self, frame):
self.cur_frame = frame
def pop_frame(self):
self.cur_frame = self.cur_frame.f_back
def push_frame(self, frame):
self.cur_frame = frame
self.frames.append(frame)
def pop_frame(self):
self.frames.pop()
self.cur_frame = self.frames[-1]
def pop_frame(self):
self.cur_frame = self.cur_frame.f_back
def trace_func(self, frame, event, arg):
# If we're so far into process shutdown that sys is already gone, just stop tracing.
if sys is None:
return None
elif self.is_sending:
# https://pytools.codeplex.com/workitem/1864
# we're currently doing I/O w/ the socket, we don't want to deliver
# any breakpoints or async breaks because we'll deadlock. Continue
# to return the trace function so all of our frames remain
# balanced. A better way to deal with this might be to do
# sys.settrace(None) when we take the send lock, but that's much
# more difficult because our send context manager is used both
# inside and outside of the trace function, and so is used when
# tracing is enabled and disabled, and so it's very easy to get our
# current frame tracking to be thrown off...
return self.trace_func
try:
# if should_debug_code(frame.f_code) is not true during attach
# the current frame is None and a pop_frame will cause an exception and
# break the debugger
if self.cur_frame is None:
# happens during attach, we need frame for blocking
self.push_frame(frame)
if self.stepping == STEPPING_BREAK and should_debug_code(frame.f_code):
if self.detach:
if stackless is not None:
stackless.set_schedule_callback(None)
stackless.tasklet.__call__ = self.__oldstacklesscall__
sys.settrace(None)
return None
self.async_break()
return self._events[event](frame, arg)
except (StackOverflowException, KeyboardInterrupt):
# stack overflow, disable tracing
return self.trace_func
def handle_return(self, frame, arg):
self.pop_frame()
if not DETACHED:
stepping = self.stepping
# only update stepping state when this frame is debuggable (matching handle_call)
if stepping is not STEPPING_NONE and should_debug_code(frame.f_code):
if stepping > STEPPING_OVER:
self.stepping -= 1
elif stepping < STEPPING_OUT:
self.stepping += 1
elif stepping in USER_STEPPING:
if self.cur_frame is None or frame.f_code.co_name == "<module>" :
# only return to user code modules
if self.should_block_on_frame(frame):
# restore back the module frame for the step out of a module
self.push_frame(ModuleExitFrame(frame))
self.stepping = STEPPING_NONE
update_all_thread_stacks(self)
self.block(lambda: (report_step_finished(self.id), mark_all_threads_for_break(skip_thread = self)))
self.pop_frame()
elif self.should_block_on_frame(self.cur_frame):
# if we're returning into non-user code then don't block in the
# non-user code, wait until we hit user code again
self.stepping = STEPPING_NONE
update_all_thread_stacks(self)
self.block(lambda: (report_step_finished(self.id), mark_all_threads_for_break(skip_thread = self)))
# forward call to previous trace function, if any
old_trace_func = self.prev_trace_func
if old_trace_func is not None:
old_trace_func(frame, 'return', arg)
# restore previous frames trace function if there is one
if self.trace_func_stack:
self.prev_trace_func = self.trace_func_stack.pop()
def block(self, block_lambda, keep_stopped_on_line = False):
"""blocks the current thread until the debugger resumes it"""
assert not self._is_blocked
#assert self.id == thread.get_ident(), 'wrong thread identity' + str(self.id) + ' ' + str(thread.get_ident()) # we should only ever block ourselves
# send thread frames before we block
self.enum_thread_frames_locally()
if not keep_stopped_on_line:
self.stopped_on_line = self.cur_frame.f_lineno
# need to synchronize w/ sending the reason we're blocking
self._block_starting_lock.acquire()
self._is_blocked = True
block_lambda()
self._block_starting_lock.release()
while not DETACHED:
self._block_lock.acquire()
if self.unblock_work is None:
break
# the debugger wants us to do something, do it, and then block again
self._is_working = True
self.unblock_work()
self.unblock_work = None
self._is_working = False
self._block_starting_lock.acquire()
assert self._is_blocked
self._is_blocked = False
self._block_starting_lock.release()
def run_on_thread(self, text, cur_frame, execution_id, frame_kind, repr_kind = PYTHON_EVALUATION_RESULT_REPR_KIND_NORMAL):
self._block_starting_lock.acquire()
if not self._is_blocked:
report_execution_error('<expression cannot be evaluated at this time>', execution_id)
elif not self._is_working:
self.schedule_work(lambda : self.run_locally(text, cur_frame, execution_id, frame_kind, repr_kind))
else:
report_execution_error('<error: previous evaluation has not completed>', execution_id)
self._block_starting_lock.release()
def enum_child_on_thread(self, text, cur_frame, execution_id, frame_kind):
self._block_starting_lock.acquire()
if not self._is_working and self._is_blocked:
self.schedule_work(lambda : self.enum_child_locally(text, cur_frame, execution_id, frame_kind))
self._block_starting_lock.release()
else:
self._block_starting_lock.release()
report_children(execution_id, [])
def get_locals(self, cur_frame, frame_kind):
if frame_kind == FRAME_KIND_DJANGO:
locs = {}
# iterate going forward, so later items replace earlier items
for d in cur_frame.f_locals['context'].dicts:
# hasattr check to defend against someone passing a bad dictionary value
# and us breaking the app.
if hasattr(d, 'keys') and d != DJANGO_BUILTINS:
for key in d.keys():
locs[key] = d[key]
else:
locs = cur_frame.f_locals
return locs
def compile(self, text, cur_frame):
try:
code = compile(text, '<debug input>', 'eval')
except:
code = compile(text, '<debug input>', 'exec')
return code
def run_locally(self, text, cur_frame, execution_id, frame_kind, repr_kind = PYTHON_EVALUATION_RESULT_REPR_KIND_NORMAL):
try:
code = self.compile(text, cur_frame)
res = eval(code, cur_frame.f_globals, self.get_locals(cur_frame, frame_kind))
self.locals_to_fast(cur_frame)
# Report any updated variable values first
self.enum_thread_frames_locally()
report_execution_result(execution_id, res, repr_kind)
except:
# Report any updated variable values first
self.enum_thread_frames_locally()
report_execution_exception(execution_id, sys.exc_info())
def run_locally_no_report(self, text, cur_frame, frame_kind):
code = self.compile(text, cur_frame)
res = eval(code, cur_frame.f_globals, self.get_locals(cur_frame, frame_kind))
self.locals_to_fast(cur_frame)
sys.displayhook(res)
def command_auto_resume(self):
tid = read_int(self.conn)
THREADS_LOCK.acquire()
thread = THREADS[tid]
THREADS_LOCK.release()
stepping = thread.stepping
if ((stepping == STEPPING_OVER or stepping == STEPPING_INTO) and thread.cur_frame.f_lineno != thread.stopped_on_line):
report_step_finished(tid)
else:
self.command_resume_all()
def command_execute_code(self):
# execute given text in specified frame
text = read_string(self.conn)
tid = read_int(self.conn) # thread id
fid = read_int(self.conn) # frame id
eid = read_int(self.conn) # execution id
frame_kind = read_int(self.conn)
repr_kind = read_int(self.conn)
thread, cur_frame = self.get_thread_and_frame(tid, fid, frame_kind)
if thread is not None and cur_frame is not None:
thread.run_on_thread(text, cur_frame, eid, frame_kind, repr_kind)
def execute_code_no_report(self, text, tid, fid, frame_kind):
# execute given text in specified frame, without sending back the results
thread, cur_frame = self.get_thread_and_frame(tid, fid, frame_kind)
if thread is not None and cur_frame is not None:
thread.run_locally_no_report(text, cur_frame, frame_kind)
def command_enum_children(self):
# execute given text in specified frame
text = read_string(self.conn)
tid = read_int(self.conn) # thread id
fid = read_int(self.conn) # frame id
eid = read_int(self.conn) # execution id
frame_kind = read_int(self.conn) # frame kind
thread, cur_frame = self.get_thread_and_frame(tid, fid, frame_kind)
if thread is not None and cur_frame is not None:
thread.enum_child_on_thread(text, cur_frame, eid, frame_kind)
def __init__(self, id = None):
if id is not None:
self.id = id
else:
self.id = thread.get_ident()
self._events = {'call' : self.handle_call,
'line' : self.handle_line,
'return' : self.handle_return,
'exception' : self.handle_exception,
'c_call' : self.handle_c_call,
'c_return' : self.handle_c_return,
'c_exception' : self.handle_c_exception,
}
self.cur_frame = None
self.stepping = STEPPING_NONE
self.unblock_work = None
self._block_lock = thread.allocate_lock()
self._block_lock.acquire()
self._block_starting_lock = thread.allocate_lock()
self._is_blocked = False
self._is_working = False
self.stopped_on_line = None
self.detach = False
self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
self.prev_trace_func = None
self.trace_func_stack = []
self.reported_process_loaded = False
self.django_stepping = None
self.is_sending = False
# stackless changes
if stackless is not None:
self._stackless_attach()
if sys.platform == 'cli':
self.frames = []