def _stackless_schedule_cb(self, prev, next):
    """Install this thread's trace function on the ``next`` tasklet's stack.

    Judging by its name and signature this is meant to be used as a
    stackless schedule callback.  While it runs, the current tasklet's
    ``trace_function`` is cleared; it is put back (possibly replaced by the
    caller frame's ``f_trace``) in the ``finally`` clause.

    ``prev`` is accepted but not used.  When ``next`` is ``None`` no frames
    are modified.  ``self.stepping`` is always reset to ``STEPPING_NONE``.
    """
    current = stackless.getcurrent()
    if not current:
        return
    current_tf = current.trace_function
    try:
        # Clear the trace function for the duration of the callback.
        current.trace_function = None
        self.stepping = STEPPING_NONE
        # If the current frame has no trace function, we may need to get it
        # from the previous frame, depending on how we ended up in the
        # callback.
        if current_tf is None:
            f_back = current.frame.f_back
            if f_back is not None:
                current_tf = f_back.f_trace
        if next is not None:
            # Assign our trace function to the current stack
            f = next.frame
            if next is current and f is not None:
                # Start one frame further out when the tasklet being
                # switched to is the running one (presumably to skip this
                # callback's own frame).  Guarded so a missing frame does
                # not raise AttributeError.
                f = f.f_back
            while f:
                if isinstance(f, types.FrameType):
                    f.f_trace = self.trace_func
                f = f.f_back
            next.trace_function = self.trace_func
    finally:
        current.trace_function = current_tf
# Example source code for the Python trace_func() method
def trace_func(self, frame, event, arg):
    """Trace function for this debugger thread.

    Dispatches ``event`` to the matching handler in ``self._events`` and
    returns whatever that handler returns.  Returns ``None`` when ``sys``
    is already gone or when a pending detach is processed, and returns
    ``self.trace_func`` without doing any work while ``self.is_sending``
    is set.
    """
    # If we're so far into process shutdown that sys is already gone, just stop tracing.
    if sys is None:
        return None
    elif self.is_sending:
        # https://pytools.codeplex.com/workitem/1864
        # we're currently doing I/O w/ the socket, we don't want to deliver
        # any breakpoints or async breaks because we'll deadlock.  Continue
        # to return the trace function so all of our frames remain
        # balanced.  A better way to deal with this might be to do
        # sys.settrace(None) when we take the send lock, but that's much
        # more difficult because our send context manager is used both
        # inside and outside of the trace function, and so is used when
        # tracing is enabled and disabled, and so it's very easy to get our
        # current frame tracking to be thrown off...
        return self.trace_func
    try:
        # if should_debug_code(frame.f_code) is not true during attach
        # the current frame is None and a pop_frame will cause an exception and
        # break the debugger
        if self.cur_frame is None:
            # happens during attach, we need frame for blocking
            self.push_frame(frame)
        if self.stepping == STEPPING_BREAK and should_debug_code(frame.f_code):
            if self.detach:
                if stackless is not None:
                    stackless.set_schedule_callback(None)
                    # The saved __call__ only exists when tasklet.__call__
                    # was actually patched; don't fail the detach if it
                    # was never stored on this instance.
                    old_call = getattr(self, '__oldstacklesscall__', None)
                    if old_call is not None:
                        stackless.tasklet.__call__ = old_call
                sys.settrace(None)
                return None
            self.async_break()
        return self._events[event](frame, arg)
    except (StackOverflowException, KeyboardInterrupt):
        # NOTE(review): the original comment said "stack overflow, disable
        # tracing", but the trace function is still returned here, so
        # tracing of this frame continues.
        return self.trace_func
def new_external_thread():
    """Create a debugger thread object for the calling thread and trace it.

    The new thread is marked to break when an attach is still in progress
    (``attach_sent_break`` is false) or when ``SEND_BREAK_COMPLETE`` is
    set; otherwise its stepping state is left as created.
    """
    # Named so it does not shadow the ``thread`` module used elsewhere.
    external_thread = new_thread()
    if not attach_sent_break:
        # we are still doing the attach, make this thread break.
        external_thread.stepping = STEPPING_ATTACH_BREAK
    elif SEND_BREAK_COMPLETE:
        # user requested break all, make this thread break
        external_thread.stepping = STEPPING_BREAK
    sys.settrace(external_thread.trace_func)
def __init__(self, id = None):
    """Initialise per-thread debugger state.

    ``id`` is the thread identifier to record; when omitted the result of
    ``thread.get_ident()`` is used.  When ``stackless`` is available,
    ``self._stackless_attach()`` is called.
    """
    self.id = id if id is not None else thread.get_ident()
    # Maps trace event names to the bound handler for that event.
    self._events = {'call' : self.handle_call,
                    'line' : self.handle_line,
                    'return' : self.handle_return,
                    'exception' : self.handle_exception,
                    'c_call' : self.handle_c_call,
                    'c_return' : self.handle_c_return,
                    'c_exception' : self.handle_c_exception,
                    }
    self.cur_frame = None
    self.stepping = STEPPING_NONE
    self.unblock_work = None
    # The block lock is created already held.
    self._block_lock = thread.allocate_lock()
    self._block_lock.acquire()
    self._block_starting_lock = thread.allocate_lock()
    self._is_blocked = False
    self._is_working = False
    self.stopped_on_line = None
    self.detach = False
    self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
    self.prev_trace_func = None
    self.trace_func_stack = []
    self.reported_process_loaded = False
    self.django_stepping = None
    self.is_sending = False
    # stackless changes
    if stackless is not None:
        self._stackless_attach()
    if sys.platform == 'cli':
        self.frames = []
def _stackless_attach(self):
    """Patch ``stackless.tasklet`` when it has no ``trace_function`` attribute.

    In that case ``tasklet.__call__`` is replaced by a wrapper that turns
    tracing on around the tasklet's function, a ``settrace`` helper is
    added to ``tasklet``, and the original ``__call__`` is saved on
    ``self.__oldstacklesscall__``.  Nothing is patched (and nothing is
    saved) when ``trace_function`` exists.
    """
    try:
        stackless.tasklet.trace_function
    except AttributeError:
        # the tasklets need to be traced on a case by case basis
        # sys.trace needs to be called within their calling context
        def __call__(tsk, *args, **kwargs):
            f = tsk.tempval

            def new_f(old_f, args, kwargs):
                sys.settrace(self.trace_func)
                try:
                    if old_f is not None:
                        return old_f(*args, **kwargs)
                finally:
                    sys.settrace(None)

            tsk.tempval = new_f
            stackless.tasklet.setup(tsk, f, args, kwargs)
            return tsk

        def settrace(tsk, tb):
            if hasattr(tsk.frame, "f_trace"):
                tsk.frame.f_trace = tb
            sys.settrace(tb)

        self.__oldstacklesscall__ = stackless.tasklet.__call__
        stackless.tasklet.settrace = settrace
        stackless.tasklet.__call__ = __call__
    # NOTE(review): indentation was lost in this copy; this check is assumed
    # to sit at function level rather than inside the except branch.
    if sys.platform == 'cli':
        self.frames = []
def context_dispatcher(self, old, new):
    """Reset stepping and make sure the ``new`` tasklet's frame is traced.

    ``self.stepping`` is always set to ``STEPPING_NONE``.  When both
    ``old`` and ``new`` are truthy and ``new.frame`` has an unset
    ``f_trace``, ``new.settrace(self.trace_func)`` is invoked through
    ``sys.call_tracing``.
    """
    self.stepping = STEPPING_NONE
    # for those tasklets that started before we started tracing
    # we need to make sure that the trace is set by patching
    # it in the context switch
    if not (old and new):
        return
    if hasattr(new.frame, "f_trace") and not new.frame.f_trace:
        sys.call_tracing(new.settrace, (self.trace_func,))
def _stackless_schedule_cb(self, prev, next):
    """Install this thread's trace function on the ``next`` tasklet's stack.

    Judging by its name and signature this is meant to be used as a
    stackless schedule callback.  While it runs, the current tasklet's
    ``trace_function`` is cleared; it is put back (possibly replaced by the
    caller frame's ``f_trace``) in the ``finally`` clause.

    ``prev`` is accepted but not used.  When ``next`` is ``None`` no frames
    are modified.  ``self.stepping`` is always reset to ``STEPPING_NONE``.
    """
    current = stackless.getcurrent()
    if not current:
        return
    current_tf = current.trace_function
    try:
        # Clear the trace function for the duration of the callback.
        current.trace_function = None
        self.stepping = STEPPING_NONE
        # If the current frame has no trace function, we may need to get it
        # from the previous frame, depending on how we ended up in the
        # callback.
        if current_tf is None:
            f_back = current.frame.f_back
            if f_back is not None:
                current_tf = f_back.f_trace
        if next is not None:
            # Assign our trace function to the current stack
            f = next.frame
            if next is current and f is not None:
                # Start one frame further out when the tasklet being
                # switched to is the running one (presumably to skip this
                # callback's own frame).  Guarded so a missing frame does
                # not raise AttributeError.
                f = f.f_back
            while f:
                if isinstance(f, types.FrameType):
                    f.f_trace = self.trace_func
                f = f.f_back
            next.trace_function = self.trace_func
    finally:
        current.trace_function = current_tf
def trace_func(self, frame, event, arg):
    """Trace function for this debugger thread.

    Dispatches ``event`` to the matching handler in ``self._events`` and
    returns whatever that handler returns.  Returns ``None`` when ``sys``
    is already gone or when a pending detach is processed, and returns
    ``self.trace_func`` without doing any work while ``self.is_sending``
    is set.
    """
    # If we're so far into process shutdown that sys is already gone, just stop tracing.
    if sys is None:
        return None
    elif self.is_sending:
        # https://pytools.codeplex.com/workitem/1864
        # we're currently doing I/O w/ the socket, we don't want to deliver
        # any breakpoints or async breaks because we'll deadlock.  Continue
        # to return the trace function so all of our frames remain
        # balanced.  A better way to deal with this might be to do
        # sys.settrace(None) when we take the send lock, but that's much
        # more difficult because our send context manager is used both
        # inside and outside of the trace function, and so is used when
        # tracing is enabled and disabled, and so it's very easy to get our
        # current frame tracking to be thrown off...
        return self.trace_func
    try:
        # if should_debug_code(frame.f_code) is not true during attach
        # the current frame is None and a pop_frame will cause an exception and
        # break the debugger
        if self.cur_frame is None:
            # happens during attach, we need frame for blocking
            self.push_frame(frame)
        if self.stepping == STEPPING_BREAK and should_debug_code(frame.f_code):
            if self.detach:
                if stackless is not None:
                    stackless.set_schedule_callback(None)
                    # The saved __call__ only exists when tasklet.__call__
                    # was actually patched; don't fail the detach if it
                    # was never stored on this instance.
                    old_call = getattr(self, '__oldstacklesscall__', None)
                    if old_call is not None:
                        stackless.tasklet.__call__ = old_call
                sys.settrace(None)
                return None
            self.async_break()
        return self._events[event](frame, arg)
    except (StackOverflowException, KeyboardInterrupt):
        # NOTE(review): the original comment said "stack overflow, disable
        # tracing", but the trace function is still returned here, so
        # tracing of this frame continues.
        return self.trace_func
def new_external_thread():
    """Create a debugger thread object for the calling thread and trace it.

    The new thread is marked to break when an attach is still in progress
    (``attach_sent_break`` is false) or when ``SEND_BREAK_COMPLETE`` is
    set; otherwise its stepping state is left as created.
    """
    # Named so it does not shadow the ``thread`` module used elsewhere.
    external_thread = new_thread()
    if not attach_sent_break:
        # we are still doing the attach, make this thread break.
        external_thread.stepping = STEPPING_ATTACH_BREAK
    elif SEND_BREAK_COMPLETE:
        # user requested break all, make this thread break
        external_thread.stepping = STEPPING_BREAK
    sys.settrace(external_thread.trace_func)
def __init__(self, id = None):
    """Initialise per-thread debugger state.

    ``id`` is the thread identifier to record; when omitted the result of
    ``thread.get_ident()`` is used.  When ``stackless`` is available,
    ``self._stackless_attach()`` is called.
    """
    self.id = id if id is not None else thread.get_ident()
    # Maps trace event names to the bound handler for that event.
    self._events = {'call' : self.handle_call,
                    'line' : self.handle_line,
                    'return' : self.handle_return,
                    'exception' : self.handle_exception,
                    'c_call' : self.handle_c_call,
                    'c_return' : self.handle_c_return,
                    'c_exception' : self.handle_c_exception,
                    }
    self.cur_frame = None
    self.stepping = STEPPING_NONE
    self.unblock_work = None
    # The block lock is created already held.
    self._block_lock = thread.allocate_lock()
    self._block_lock.acquire()
    self._block_starting_lock = thread.allocate_lock()
    self._is_blocked = False
    self._is_working = False
    self.stopped_on_line = None
    self.detach = False
    self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
    self.prev_trace_func = None
    self.trace_func_stack = []
    self.reported_process_loaded = False
    self.django_stepping = None
    self.is_sending = False
    # stackless changes
    if stackless is not None:
        self._stackless_attach()
    if sys.platform == 'cli':
        self.frames = []
def _stackless_attach(self):
    """Patch ``stackless.tasklet`` when it has no ``trace_function`` attribute.

    In that case ``tasklet.__call__`` is replaced by a wrapper that turns
    tracing on around the tasklet's function, a ``settrace`` helper is
    added to ``tasklet``, and the original ``__call__`` is saved on
    ``self.__oldstacklesscall__``.  Nothing is patched (and nothing is
    saved) when ``trace_function`` exists.
    """
    try:
        stackless.tasklet.trace_function
    except AttributeError:
        # the tasklets need to be traced on a case by case basis
        # sys.trace needs to be called within their calling context
        def __call__(tsk, *args, **kwargs):
            f = tsk.tempval

            def new_f(old_f, args, kwargs):
                sys.settrace(self.trace_func)
                try:
                    if old_f is not None:
                        return old_f(*args, **kwargs)
                finally:
                    sys.settrace(None)

            tsk.tempval = new_f
            stackless.tasklet.setup(tsk, f, args, kwargs)
            return tsk

        def settrace(tsk, tb):
            if hasattr(tsk.frame, "f_trace"):
                tsk.frame.f_trace = tb
            sys.settrace(tb)

        self.__oldstacklesscall__ = stackless.tasklet.__call__
        stackless.tasklet.settrace = settrace
        stackless.tasklet.__call__ = __call__
    # NOTE(review): indentation was lost in this copy; this check is assumed
    # to sit at function level rather than inside the except branch.
    if sys.platform == 'cli':
        self.frames = []
def context_dispatcher(self, old, new):
    """Reset stepping and make sure the ``new`` tasklet's frame is traced.

    ``self.stepping`` is always set to ``STEPPING_NONE``.  When both
    ``old`` and ``new`` are truthy and ``new.frame`` has an unset
    ``f_trace``, ``new.settrace(self.trace_func)`` is invoked through
    ``sys.call_tracing``.
    """
    self.stepping = STEPPING_NONE
    # for those tasklets that started before we started tracing
    # we need to make sure that the trace is set by patching
    # it in the context switch
    if not (old and new):
        return
    if hasattr(new.frame, "f_trace") and not new.frame.f_trace:
        sys.call_tracing(new.settrace, (self.trace_func,))
def _stackless_schedule_cb(self, prev, next):
    """Install this thread's trace function on the ``next`` tasklet's stack.

    Judging by its name and signature this is meant to be used as a
    stackless schedule callback.  While it runs, the current tasklet's
    ``trace_function`` is cleared; it is put back (possibly replaced by the
    caller frame's ``f_trace``) in the ``finally`` clause.

    ``prev`` is accepted but not used.  When ``next`` is ``None`` no frames
    are modified.  ``self.stepping`` is always reset to ``STEPPING_NONE``.
    """
    current = stackless.getcurrent()
    if not current:
        return
    current_tf = current.trace_function
    try:
        # Clear the trace function for the duration of the callback.
        current.trace_function = None
        self.stepping = STEPPING_NONE
        # If the current frame has no trace function, we may need to get it
        # from the previous frame, depending on how we ended up in the
        # callback.
        if current_tf is None:
            f_back = current.frame.f_back
            if f_back is not None:
                current_tf = f_back.f_trace
        if next is not None:
            # Assign our trace function to the current stack
            f = next.frame
            if next is current and f is not None:
                # Start one frame further out when the tasklet being
                # switched to is the running one (presumably to skip this
                # callback's own frame).  Guarded so a missing frame does
                # not raise AttributeError.
                f = f.f_back
            while f:
                if isinstance(f, types.FrameType):
                    f.f_trace = self.trace_func
                f = f.f_back
            next.trace_function = self.trace_func
    finally:
        current.trace_function = current_tf
def trace_func(self, frame, event, arg):
    """Trace function for this debugger thread.

    Dispatches ``event`` to the matching handler in ``self._events`` and
    returns whatever that handler returns.  Returns ``None`` when ``sys``
    is already gone or when a pending detach is processed, and returns
    ``self.trace_func`` without doing any work while ``self.is_sending``
    is set.
    """
    # If we're so far into process shutdown that sys is already gone, just stop tracing.
    if sys is None:
        return None
    elif self.is_sending:
        # https://pytools.codeplex.com/workitem/1864
        # we're currently doing I/O w/ the socket, we don't want to deliver
        # any breakpoints or async breaks because we'll deadlock.  Continue
        # to return the trace function so all of our frames remain
        # balanced.  A better way to deal with this might be to do
        # sys.settrace(None) when we take the send lock, but that's much
        # more difficult because our send context manager is used both
        # inside and outside of the trace function, and so is used when
        # tracing is enabled and disabled, and so it's very easy to get our
        # current frame tracking to be thrown off...
        return self.trace_func
    try:
        # if should_debug_code(frame.f_code) is not true during attach
        # the current frame is None and a pop_frame will cause an exception and
        # break the debugger
        if self.cur_frame is None:
            # happens during attach, we need frame for blocking
            self.push_frame(frame)
        if self.stepping == STEPPING_BREAK and should_debug_code(frame.f_code):
            if self.detach:
                if stackless is not None:
                    stackless.set_schedule_callback(None)
                    # The saved __call__ only exists when tasklet.__call__
                    # was actually patched; don't fail the detach if it
                    # was never stored on this instance.
                    old_call = getattr(self, '__oldstacklesscall__', None)
                    if old_call is not None:
                        stackless.tasklet.__call__ = old_call
                sys.settrace(None)
                return None
            self.async_break()
        return self._events[event](frame, arg)
    except (StackOverflowException, KeyboardInterrupt):
        # NOTE(review): the original comment said "stack overflow, disable
        # tracing", but the trace function is still returned here, so
        # tracing of this frame continues.
        return self.trace_func
def new_external_thread():
    """Create a debugger thread object for the calling thread and trace it.

    The new thread is marked to break when an attach is still in progress
    (``attach_sent_break`` is false) or when ``SEND_BREAK_COMPLETE`` is
    set; otherwise its stepping state is left as created.
    """
    # Named so it does not shadow the ``thread`` module used elsewhere.
    external_thread = new_thread()
    if not attach_sent_break:
        # we are still doing the attach, make this thread break.
        external_thread.stepping = STEPPING_ATTACH_BREAK
    elif SEND_BREAK_COMPLETE:
        # user requested break all, make this thread break
        external_thread.stepping = STEPPING_BREAK
    sys.settrace(external_thread.trace_func)
def debug(file, port_num, debug_id, debug_options, currentPid, run_as = 'script'):
    """Run ``file`` under the debugger on the current thread.

    ``port_num``, ``debug_id``, ``debug_options`` and ``currentPid`` are
    forwarded to ``attach_process``.  ``run_as`` selects how ``file`` is
    executed: ``'module'`` uses ``exec_module``, ``'code'`` uses
    ``exec_code`` and anything else uses ``exec_file``.  After execution
    the thread is unregistered, the ``LAST`` command is sent and its
    acknowledgement is awaited for up to 5 seconds.  ``do_wait()`` is
    called afterwards when ``'WaitOnNormalExit'`` is in ``debug_options``.
    """
    global _threading, last_ack_event

    # remove us from modules so there's no trace of us
    sys.modules['$visualstudio_py_debugger'] = sys.modules['visualstudio_py_debugger']
    # NOTE(review): this binds a function-local name only; the module's own
    # __name__ is not changed by it.  Kept as in the original.
    __name__ = '$visualstudio_py_debugger'
    del sys.modules['visualstudio_py_debugger']
    wait_on_normal_exit = 'WaitOnNormalExit' in debug_options
    ## Begin modification by Don Jayamanne
    # Pass current Process id to pass back to debugger
    attach_process(port_num, debug_id, debug_options, currentPid, report = True)
    ## End Modification by Don Jayamanne
    # setup the current thread
    cur_thread = new_thread()
    cur_thread.stepping = STEPPING_LAUNCH_BREAK
    # start tracing on this thread
    sys.settrace(cur_thread.trace_func)
    # now execute main file
    globals_obj = {'__name__': '__main__'}
    try:
        if run_as == 'module':
            exec_module(file, globals_obj)
        elif run_as == 'code':
            exec_code(file, '<string>', globals_obj)
        else:
            exec_file(file, globals_obj)
    finally:
        sys.settrace(None)
        THREADS_LOCK.acquire()
        try:
            # Release the lock even if the thread was already removed.
            del THREADS[cur_thread.id]
        finally:
            THREADS_LOCK.release()
        report_thread_exit(cur_thread)
        # Give VS debugger a chance to process commands
        # by waiting for ack of "last" command
        if _threading is None:
            import threading
            _threading = threading
        last_ack_event = _threading.Event()
        with _SendLockCtx:
            write_bytes(conn, LAST)
        last_ack_event.wait(5)
    if wait_on_normal_exit:
        do_wait()
# Code objects for functions which are going to be at the bottom of the stack, right below the first
# stack frame for user code. When we walk the stack to determine whether to report or block on a given
# frame, hitting any of these means that we walked all the frames that we needed to look at.
def debug(file, port_num, debug_id, debug_options, currentPid, run_as = 'script'):
    """Run ``file`` under the debugger on the current thread.

    ``port_num``, ``debug_id``, ``debug_options`` and ``currentPid`` are
    forwarded to ``attach_process``.  ``run_as`` selects how ``file`` is
    executed: ``'module'`` uses ``exec_module``, ``'code'`` uses
    ``exec_code`` and anything else uses ``exec_file``.  After execution
    the thread is unregistered, the ``LAST`` command is sent and its
    acknowledgement is awaited for up to 5 seconds.  ``do_wait()`` is
    called afterwards when ``'WaitOnNormalExit'`` is in ``debug_options``.
    """
    global _threading, last_ack_event

    # remove us from modules so there's no trace of us
    sys.modules['$visualstudio_py_debugger'] = sys.modules['visualstudio_py_debugger']
    # NOTE(review): this binds a function-local name only; the module's own
    # __name__ is not changed by it.  Kept as in the original.
    __name__ = '$visualstudio_py_debugger'
    del sys.modules['visualstudio_py_debugger']
    wait_on_normal_exit = 'WaitOnNormalExit' in debug_options
    ## Begin modification by Don Jayamanne
    # Pass current Process id to pass back to debugger
    attach_process(port_num, debug_id, debug_options, currentPid, report = True)
    ## End Modification by Don Jayamanne
    # setup the current thread
    cur_thread = new_thread()
    cur_thread.stepping = STEPPING_LAUNCH_BREAK
    # start tracing on this thread
    sys.settrace(cur_thread.trace_func)
    # now execute main file
    globals_obj = {'__name__': '__main__'}
    try:
        if run_as == 'module':
            exec_module(file, globals_obj)
        elif run_as == 'code':
            exec_code(file, '<string>', globals_obj)
        else:
            exec_file(file, globals_obj)
    finally:
        sys.settrace(None)
        THREADS_LOCK.acquire()
        try:
            # Release the lock even if the thread was already removed.
            del THREADS[cur_thread.id]
        finally:
            THREADS_LOCK.release()
        report_thread_exit(cur_thread)
        # Give VS debugger a chance to process commands
        # by waiting for ack of "last" command
        if _threading is None:
            import threading
            _threading = threading
        last_ack_event = _threading.Event()
        with _SendLockCtx:
            write_bytes(conn, LAST)
        last_ack_event.wait(5)
    if wait_on_normal_exit:
        do_wait()
# Code objects for functions which are going to be at the bottom of the stack, right below the first
# stack frame for user code. When we walk the stack to determine whether to report or block on a given
# frame, hitting any of these means that we walked all the frames that we needed to look at.
def debug(file, port_num, debug_id, debug_options, run_as = 'script'):
    """Run ``file`` under the debugger on the current thread.

    ``port_num``, ``debug_id`` and ``debug_options`` are forwarded to
    ``attach_process``.  ``run_as`` selects how ``file`` is executed:
    ``'module'`` uses ``exec_module``, ``'code'`` uses ``exec_code`` and
    anything else uses ``exec_file``.  After execution the thread is
    unregistered, the ``LAST`` command is sent and its acknowledgement is
    awaited for up to 5 seconds.  ``do_wait()`` is called afterwards when
    ``'WaitOnNormalExit'`` is in ``debug_options``.
    """
    global _threading, last_ack_event

    # remove us from modules so there's no trace of us
    sys.modules['$visualstudio_py_debugger'] = sys.modules['visualstudio_py_debugger']
    # NOTE(review): this binds a function-local name only; the module's own
    # __name__ is not changed by it.  Kept as in the original.
    __name__ = '$visualstudio_py_debugger'
    del sys.modules['visualstudio_py_debugger']
    wait_on_normal_exit = 'WaitOnNormalExit' in debug_options
    attach_process(port_num, debug_id, debug_options, report = True)
    # setup the current thread
    cur_thread = new_thread()
    cur_thread.stepping = STEPPING_LAUNCH_BREAK
    # start tracing on this thread
    sys.settrace(cur_thread.trace_func)
    # now execute main file
    globals_obj = {'__name__': '__main__'}
    try:
        if run_as == 'module':
            exec_module(file, globals_obj)
        elif run_as == 'code':
            exec_code(file, '<string>', globals_obj)
        else:
            exec_file(file, globals_obj)
    finally:
        sys.settrace(None)
        THREADS_LOCK.acquire()
        try:
            # Release the lock even if the thread was already removed.
            del THREADS[cur_thread.id]
        finally:
            THREADS_LOCK.release()
        report_thread_exit(cur_thread)
        # Give VS debugger a chance to process commands
        # by waiting for ack of "last" command
        if _threading is None:
            import threading
            _threading = threading
        last_ack_event = _threading.Event()
        with _SendLockCtx:
            write_bytes(conn, LAST)
        last_ack_event.wait(5)
    if wait_on_normal_exit:
        do_wait()
# Code objects for functions which are going to be at the bottom of the stack, right below the first
# stack frame for user code. When we walk the stack to determine whether to report or block on a given
# frame, hitting any of these means that we walked all the frames that we needed to look at.
def __init__(self, id = None):
    """Initialise per-thread debugger state and patch stackless if present.

    ``id`` is the thread identifier to record; when omitted the result of
    ``thread.get_ident()`` is used.  When ``stackless`` is available,
    ``self.context_dispatcher`` is registered as the schedule callback,
    ``tasklet.__call__`` is wrapped so tracing is enabled around each
    tasklet's function, and the original ``__call__`` is saved on
    ``self.__oldstacklesscall__``.
    """
    self.id = id if id is not None else thread.get_ident()
    # Maps trace event names to the bound handler for that event.
    self._events = {'call' : self.handle_call,
                    'line' : self.handle_line,
                    'return' : self.handle_return,
                    'exception' : self.handle_exception,
                    'c_call' : self.handle_c_call,
                    'c_return' : self.handle_c_return,
                    'c_exception' : self.handle_c_exception,
                    }
    self.cur_frame = None
    self.stepping = STEPPING_NONE
    self.unblock_work = None
    # The block lock is created already held.
    self._block_lock = thread.allocate_lock()
    self._block_lock.acquire()
    self._block_starting_lock = thread.allocate_lock()
    self._is_blocked = False
    self._is_working = False
    self.stopped_on_line = None
    self.detach = False
    self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
    self.prev_trace_func = None
    self.trace_func_stack = []
    self.reported_process_loaded = False
    self.django_stepping = None
    # Other versions of the trace function read this flag on every event;
    # make sure it always exists.
    self.is_sending = False
    # stackless changes
    if stackless is not None:
        stackless.set_schedule_callback(self.context_dispatcher)
        # the tasklets need to be traced on a case by case basis
        # sys.trace needs to be called within their calling context
        def __call__(tsk, *args, **kwargs):
            f = tsk.tempval

            def new_f(old_f, args, kwargs):
                sys.settrace(self.trace_func)
                try:
                    if old_f is not None:
                        return old_f(*args, **kwargs)
                finally:
                    sys.settrace(None)

            tsk.tempval = new_f
            stackless.tasklet.setup(tsk, f, args, kwargs)
            return tsk

        def settrace(tsk, tb):
            if hasattr(tsk.frame, "f_trace"):
                tsk.frame.f_trace = tb
            sys.settrace(tb)

        self.__oldstacklesscall__ = stackless.tasklet.__call__
        stackless.tasklet.settrace = settrace
        stackless.tasklet.__call__ = __call__
    if sys.platform == 'cli':
        self.frames = []
def handle_line(self, frame, arg):
    """Handle a ``'line'`` trace event for ``frame``.

    Unless ``DETACHED`` is set, this blocks for a pending step or
    launch/attach break, and otherwise checks ``BREAKPOINTS`` for the
    current line and blocks on the first matching breakpoint whose
    condition (if any) asks for a break.  The event is then forwarded to
    ``self.prev_trace_func`` when one is set.  Always returns
    ``self.trace_func``.
    """
    if not DETACHED:
        stepping = self.stepping
        # http://pytools.codeplex.com/workitem/815
        # if we block for a step into/over we don't want to block again for a breakpoint
        blocked_for_stepping = False
        if stepping != STEPPING_NONE: # check for the common case of no stepping first...
            if (((stepping == STEPPING_OVER or stepping == STEPPING_INTO) and frame.f_lineno != self.stopped_on_line)
                or stepping == STEPPING_LAUNCH_BREAK
                or stepping == STEPPING_ATTACH_BREAK):
                if ((stepping == STEPPING_LAUNCH_BREAK and not MODULES) or
                    not should_debug_code(frame.f_code)): # don't break into our own debugger / non-user code
                    # don't break into initial Python code needed to set things up
                    return self.trace_func
                blocked_for_stepping = stepping != STEPPING_LAUNCH_BREAK and stepping != STEPPING_ATTACH_BREAK
                self.block_maybe_attach()
        if BREAKPOINTS and not blocked_for_stepping:
            bp = BREAKPOINTS.get(frame.f_lineno)
            if bp is not None:
                code_filename = frame.f_code.co_filename
                for (filename, bp_id), (condition, bound) in bp.items():
                    if filename == code_filename or (not bound and filename_is_same(filename, code_filename)):
                        if condition:
                            try:
                                # The condition text is evaluated in the
                                # frame's own namespaces.
                                res = eval(condition.condition, frame.f_globals, frame.f_locals)
                                if condition.break_when_changed:
                                    block = condition.last_value != res
                                    condition.last_value = res
                                else:
                                    block = res
                            except:
                                # Any failure while evaluating the
                                # condition is treated as a hit.
                                block = True
                        else:
                            block = True
                        if block:
                            probe_stack()
                            update_all_thread_stacks(self)
                            self.block(lambda: (report_breakpoint_hit(bp_id, self.id), mark_all_threads_for_break()))
                            break
    # forward call to previous trace function, if any, updating trace function appropriately
    old_trace_func = self.prev_trace_func
    if old_trace_func is not None:
        self.prev_trace_func = None # clear first in case old_trace_func stack overflows
        self.prev_trace_func = old_trace_func(frame, 'line', arg)
    return self.trace_func