def command_set_lineno(self):
tid = read_int(self.conn)
fid = read_int(self.conn)
lineno = read_int(self.conn)
try:
THREADS_LOCK.acquire()
THREADS[tid].cur_frame.f_lineno = lineno
newline = THREADS[tid].cur_frame.f_lineno
THREADS_LOCK.release()
with _SendLockCtx:
write_bytes(self.conn, SETL)
write_int(self.conn, 1)
write_int(self.conn, tid)
write_int(self.conn, newline)
except:
with _SendLockCtx:
write_bytes(self.conn, SETL)
write_int(self.conn, 0)
write_int(self.conn, tid)
write_int(self.conn, 0)
python类cur_frame()的实例源码
def command_set_lineno(self):
tid = read_int(self.conn)
fid = read_int(self.conn)
lineno = read_int(self.conn)
try:
THREADS_LOCK.acquire()
THREADS[tid].cur_frame.f_lineno = lineno
newline = THREADS[tid].cur_frame.f_lineno
THREADS_LOCK.release()
with _SendLockCtx:
write_bytes(self.conn, SETL)
write_int(self.conn, 1)
write_int(self.conn, tid)
write_int(self.conn, newline)
except:
with _SendLockCtx:
write_bytes(self.conn, SETL)
write_int(self.conn, 0)
write_int(self.conn, tid)
write_int(self.conn, 0)
def command_set_lineno(self):
tid = read_int(self.conn)
fid = read_int(self.conn)
lineno = read_int(self.conn)
try:
THREADS_LOCK.acquire()
THREADS[tid].cur_frame.f_lineno = lineno
newline = THREADS[tid].cur_frame.f_lineno
THREADS_LOCK.release()
with _SendLockCtx:
write_bytes(self.conn, SETL)
write_int(self.conn, 1)
write_int(self.conn, tid)
write_int(self.conn, newline)
except:
with _SendLockCtx:
write_bytes(self.conn, SETL)
write_int(self.conn, 0)
write_int(self.conn, tid)
write_int(self.conn, 0)
def command_set_lineno(self):
tid = read_int(self.conn)
fid = read_int(self.conn)
lineno = read_int(self.conn)
try:
THREADS_LOCK.acquire()
THREADS[tid].cur_frame.f_lineno = lineno
newline = THREADS[tid].cur_frame.f_lineno
THREADS_LOCK.release()
with _SendLockCtx:
write_bytes(self.conn, SETL)
write_int(self.conn, 1)
write_int(self.conn, tid)
write_int(self.conn, newline)
except:
with _SendLockCtx:
write_bytes(self.conn, SETL)
write_int(self.conn, 0)
write_int(self.conn, tid)
write_int(self.conn, 0)
def command_set_lineno(self):
tid = read_int(self.conn)
fid = read_int(self.conn)
lineno = read_int(self.conn)
try:
THREADS_LOCK.acquire()
THREADS[tid].cur_frame.f_lineno = lineno
newline = THREADS[tid].cur_frame.f_lineno
THREADS_LOCK.release()
with _SendLockCtx:
write_bytes(self.conn, SETL)
write_int(self.conn, 1)
write_int(self.conn, tid)
write_int(self.conn, newline)
except:
with _SendLockCtx:
write_bytes(self.conn, SETL)
write_int(self.conn, 0)
write_int(self.conn, tid)
write_int(self.conn, 0)
def command_set_lineno(self):
tid = read_int(self.conn)
fid = read_int(self.conn)
lineno = read_int(self.conn)
try:
THREADS_LOCK.acquire()
THREADS[tid].cur_frame.f_lineno = lineno
newline = THREADS[tid].cur_frame.f_lineno
THREADS_LOCK.release()
with _SendLockCtx:
write_bytes(self.conn, SETL)
write_int(self.conn, 1)
write_int(self.conn, tid)
write_int(self.conn, newline)
except:
with _SendLockCtx:
write_bytes(self.conn, SETL)
write_int(self.conn, 0)
write_int(self.conn, tid)
write_int(self.conn, 0)
def command_auto_resume(self):
tid = read_int(self.conn)
THREADS_LOCK.acquire()
thread = THREADS[tid]
THREADS_LOCK.release()
stepping = thread.stepping
if ((stepping == STEPPING_OVER or stepping == STEPPING_INTO) and thread.cur_frame.f_lineno != thread.stopped_on_line):
report_step_finished(tid)
else:
self.command_resume_all()
def command_set_lineno(self):
tid = read_int(self.conn)
fid = read_int(self.conn)
lineno = read_int(self.conn)
try:
THREADS_LOCK.acquire()
THREADS[tid].cur_frame.f_lineno = lineno
newline = THREADS[tid].cur_frame.f_lineno
THREADS_LOCK.release()
with _SendLockCtx:
write_bytes(self.conn, SETL)
write_int(self.conn, 1)
write_int(self.conn, tid)
write_int(self.conn, newline)
except:
with _SendLockCtx:
write_bytes(self.conn, SETL)
write_int(self.conn, 0)
write_int(self.conn, tid)
write_int(self.conn, 0)
def __init__(self, id = None):
if id is not None:
self.id = id
else:
self.id = thread.get_ident()
self._events = {'call' : self.handle_call,
'line' : self.handle_line,
'return' : self.handle_return,
'exception' : self.handle_exception,
'c_call' : self.handle_c_call,
'c_return' : self.handle_c_return,
'c_exception' : self.handle_c_exception,
}
self.cur_frame = None
self.stepping = STEPPING_NONE
self.unblock_work = None
self._block_lock = thread.allocate_lock()
self._block_lock.acquire()
self._block_starting_lock = thread.allocate_lock()
self._is_blocked = False
self._is_working = False
self.stopped_on_line = None
self.detach = False
self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
self.prev_trace_func = None
self.trace_func_stack = []
self.reported_process_loaded = False
self.django_stepping = None
self.is_sending = False
# stackless changes
if stackless is not None:
self._stackless_attach()
if sys.platform == 'cli':
self.frames = []
def push_frame(self, frame):
self.cur_frame = frame
self.frames.append(frame)
def pop_frame(self):
self.frames.pop()
self.cur_frame = self.frames[-1]
def push_frame(self, frame):
self.cur_frame = frame
def push_frame(self, frame):
self.cur_frame = frame
self.frames.append(frame)
def pop_frame(self):
self.frames.pop()
self.cur_frame = self.frames[-1]
def push_frame(self, frame):
self.cur_frame = frame
def pop_frame(self):
self.cur_frame = self.cur_frame.f_back
def trace_func(self, frame, event, arg):
# If we're so far into process shutdown that sys is already gone, just stop tracing.
if sys is None:
return None
elif self.is_sending:
# https://pytools.codeplex.com/workitem/1864
# we're currently doing I/O w/ the socket, we don't want to deliver
# any breakpoints or async breaks because we'll deadlock. Continue
# to return the trace function so all of our frames remain
# balanced. A better way to deal with this might be to do
# sys.settrace(None) when we take the send lock, but that's much
# more difficult because our send context manager is used both
# inside and outside of the trace function, and so is used when
# tracing is enabled and disabled, and so it's very easy to get our
# current frame tracking to be thrown off...
return self.trace_func
try:
# if should_debug_code(frame.f_code) is not true during attach
# the current frame is None and a pop_frame will cause an exception and
# break the debugger
if self.cur_frame is None:
# happens during attach, we need frame for blocking
self.push_frame(frame)
if self.stepping == STEPPING_BREAK and should_debug_code(frame.f_code):
if self.detach:
if stackless is not None:
stackless.set_schedule_callback(None)
stackless.tasklet.__call__ = self.__oldstacklesscall__
sys.settrace(None)
return None
self.async_break()
return self._events[event](frame, arg)
except (StackOverflowException, KeyboardInterrupt):
# stack overflow, disable tracing
return self.trace_func
def block(self, block_lambda, keep_stopped_on_line = False):
"""blocks the current thread until the debugger resumes it"""
assert not self._is_blocked
#assert self.id == thread.get_ident(), 'wrong thread identity' + str(self.id) + ' ' + str(thread.get_ident()) # we should only ever block ourselves
# send thread frames before we block
self.enum_thread_frames_locally()
if not keep_stopped_on_line:
self.stopped_on_line = self.cur_frame.f_lineno
# need to synchronize w/ sending the reason we're blocking
self._block_starting_lock.acquire()
self._is_blocked = True
block_lambda()
self._block_starting_lock.release()
while not DETACHED:
self._block_lock.acquire()
if self.unblock_work is None:
break
# the debugger wants us to do something, do it, and then block again
self._is_working = True
self.unblock_work()
self.unblock_work = None
self._is_working = False
self._block_starting_lock.acquire()
assert self._is_blocked
self._is_blocked = False
self._block_starting_lock.release()
def run_on_thread(self, text, cur_frame, execution_id, frame_kind, repr_kind = PYTHON_EVALUATION_RESULT_REPR_KIND_NORMAL):
self._block_starting_lock.acquire()
if not self._is_blocked:
report_execution_error('<expression cannot be evaluated at this time>', execution_id)
elif not self._is_working:
self.schedule_work(lambda : self.run_locally(text, cur_frame, execution_id, frame_kind, repr_kind))
else:
report_execution_error('<error: previous evaluation has not completed>', execution_id)
self._block_starting_lock.release()
def run_on_thread_no_report(self, text, cur_frame, frame_kind):
self._block_starting_lock.acquire()
if not self._is_blocked:
pass
elif not self._is_working:
self.schedule_work(lambda : self.run_locally_no_report(text, cur_frame, frame_kind))
else:
pass
self._block_starting_lock.release()
def enum_child_on_thread(self, text, cur_frame, execution_id, frame_kind):
self._block_starting_lock.acquire()
if not self._is_working and self._is_blocked:
self.schedule_work(lambda : self.enum_child_locally(text, cur_frame, execution_id, frame_kind))
self._block_starting_lock.release()
else:
self._block_starting_lock.release()
report_children(execution_id, [])
def get_locals(self, cur_frame, frame_kind):
if frame_kind == FRAME_KIND_DJANGO:
locs = {}
# iterate going forward, so later items replace earlier items
for d in cur_frame.f_locals['context'].dicts:
# hasattr check to defend against someone passing a bad dictionary value
# and us breaking the app.
if hasattr(d, 'keys') and d != DJANGO_BUILTINS:
for key in d.keys():
locs[key] = d[key]
else:
locs = cur_frame.f_locals
return locs
def run_locally(self, text, cur_frame, execution_id, frame_kind, repr_kind = PYTHON_EVALUATION_RESULT_REPR_KIND_NORMAL):
try:
code = self.compile(text, cur_frame)
res = eval(code, cur_frame.f_globals, self.get_locals(cur_frame, frame_kind))
self.locals_to_fast(cur_frame)
# Report any updated variable values first
self.enum_thread_frames_locally()
report_execution_result(execution_id, res, repr_kind)
except:
# Report any updated variable values first
self.enum_thread_frames_locally()
report_execution_exception(execution_id, sys.exc_info())
def run_locally_no_report(self, text, cur_frame, frame_kind):
code = self.compile(text, cur_frame)
res = eval(code, cur_frame.f_globals, self.get_locals(cur_frame, frame_kind))
self.locals_to_fast(cur_frame)
sys.displayhook(res)
def command_step_over(self):
# set step over
tid = read_int(self.conn)
thread = get_thread_from_id(tid)
if thread is not None:
assert thread._is_blocked
if DJANGO_DEBUG:
source_obj = get_django_frame_source(thread.cur_frame)
if source_obj is not None:
thread.django_stepping = True
self.command_resume_all()
return
thread.stepping = STEPPING_OVER
self.command_resume_all()
def command_auto_resume(self):
tid = read_int(self.conn)
THREADS_LOCK.acquire()
thread = THREADS[tid]
THREADS_LOCK.release()
stepping = thread.stepping
if ((stepping == STEPPING_OVER or stepping == STEPPING_INTO) and thread.cur_frame.f_lineno != thread.stopped_on_line):
report_step_finished(tid)
else:
self.command_resume_all()
def execute_code_no_report(self, text, tid, fid, frame_kind):
# execute given text in specified frame, without sending back the results
thread, cur_frame = self.get_thread_and_frame(tid, fid, frame_kind)
if thread is not None and cur_frame is not None:
thread.run_locally_no_report(text, cur_frame, frame_kind)
def command_enum_children(self):
# execute given text in specified frame
text = read_string(self.conn)
tid = read_int(self.conn) # thread id
fid = read_int(self.conn) # frame id
eid = read_int(self.conn) # execution id
frame_kind = read_int(self.conn) # frame kind
thread, cur_frame = self.get_thread_and_frame(tid, fid, frame_kind)
if thread is not None and cur_frame is not None:
thread.enum_child_on_thread(text, cur_frame, eid, frame_kind)
def get_thread_and_frame(self, tid, fid, frame_kind):
thread = get_thread_from_id(tid)
cur_frame = None
if thread is not None:
cur_frame = thread.cur_frame
for i in xrange(fid):
cur_frame = cur_frame.f_back
return thread, cur_frame
def __init__(self, id = None):
if id is not None:
self.id = id
else:
self.id = thread.get_ident()
self._events = {'call' : self.handle_call,
'line' : self.handle_line,
'return' : self.handle_return,
'exception' : self.handle_exception,
'c_call' : self.handle_c_call,
'c_return' : self.handle_c_return,
'c_exception' : self.handle_c_exception,
}
self.cur_frame = None
self.stepping = STEPPING_NONE
self.unblock_work = None
self._block_lock = thread.allocate_lock()
self._block_lock.acquire()
self._block_starting_lock = thread.allocate_lock()
self._is_blocked = False
self._is_working = False
self.stopped_on_line = None
self.detach = False
self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
self.prev_trace_func = None
self.trace_func_stack = []
self.reported_process_loaded = False
self.django_stepping = None
self.is_sending = False
# stackless changes
if stackless is not None:
self._stackless_attach()
if sys.platform == 'cli':
self.frames = []