def update_all_thread_stacks(blocking_thread = None, check_is_blocked = True):
THREADS_LOCK.acquire()
all_threads = list(THREADS.values())
THREADS_LOCK.release()
for cur_thread in all_threads:
if cur_thread is blocking_thread:
continue
cur_thread._block_starting_lock.acquire()
if not check_is_blocked or not cur_thread._is_blocked:
# release the lock, we're going to run user code to evaluate the frames
cur_thread._block_starting_lock.release()
frames = cur_thread.get_frame_list()
# re-acquire the lock and make sure we're still not blocked. If so send
# the frame list.
cur_thread._block_starting_lock.acquire()
if not check_is_blocked or not cur_thread._is_blocked:
cur_thread.send_frame_list(frames)
cur_thread._block_starting_lock.release()
python类_is_blocked()的实例源码
def update_all_thread_stacks(blocking_thread = None, check_is_blocked = True):
THREADS_LOCK.acquire()
all_threads = list(THREADS.values())
THREADS_LOCK.release()
for cur_thread in all_threads:
if cur_thread is blocking_thread:
continue
cur_thread._block_starting_lock.acquire()
if not check_is_blocked or not cur_thread._is_blocked:
# release the lock, we're going to run user code to evaluate the frames
cur_thread._block_starting_lock.release()
frames = cur_thread.get_frame_list()
# re-acquire the lock and make sure we're still not blocked. If so send
# the frame list.
cur_thread._block_starting_lock.acquire()
if not check_is_blocked or not cur_thread._is_blocked:
cur_thread.send_frame_list(frames)
cur_thread._block_starting_lock.release()
def update_all_thread_stacks(blocking_thread = None, check_is_blocked = True):
THREADS_LOCK.acquire()
all_threads = list(THREADS.values())
THREADS_LOCK.release()
for cur_thread in all_threads:
if cur_thread is blocking_thread:
continue
cur_thread._block_starting_lock.acquire()
if not check_is_blocked or not cur_thread._is_blocked:
# release the lock, we're going to run user code to evaluate the frames
cur_thread._block_starting_lock.release()
frames = cur_thread.get_frame_list()
# re-acquire the lock and make sure we're still not blocked. If so send
# the frame list.
cur_thread._block_starting_lock.acquire()
if not check_is_blocked or not cur_thread._is_blocked:
cur_thread.send_frame_list(frames)
cur_thread._block_starting_lock.release()
def update_all_thread_stacks(blocking_thread):
THREADS_LOCK.acquire()
all_threads = list(THREADS.values())
THREADS_LOCK.release()
for cur_thread in all_threads:
if cur_thread is blocking_thread:
continue
cur_thread._block_starting_lock.acquire()
if not cur_thread._is_blocked:
# release the lock, we're going to run user code to evaluate the frames
cur_thread._block_starting_lock.release()
frames = cur_thread.get_frame_list()
# re-acquire the lock and make sure we're still not blocked. If so send
# the frame list.
cur_thread._block_starting_lock.acquire()
if not cur_thread._is_blocked:
cur_thread.send_frame_list(frames)
cur_thread._block_starting_lock.release()
def detach_threads():
# tell all threads to stop tracing...
THREADS_LOCK.acquire()
for tid, pyThread in THREADS.items():
if not _INTERCEPTING_FOR_ATTACH:
pyThread.detach = True
pyThread.stepping = STEPPING_BREAK
if pyThread._is_blocked:
pyThread.unblock()
if not _INTERCEPTING_FOR_ATTACH:
THREADS.clear()
BREAKPOINTS.clear()
THREADS_LOCK.release()
def update_all_thread_stacks(blocking_thread = None, check_is_blocked = True):
THREADS_LOCK.acquire()
all_threads = list(THREADS.values())
THREADS_LOCK.release()
for cur_thread in all_threads:
if cur_thread is blocking_thread:
continue
cur_thread._block_starting_lock.acquire()
if not check_is_blocked or not cur_thread._is_blocked:
# release the lock, we're going to run user code to evaluate the frames
cur_thread._block_starting_lock.release()
frames = cur_thread.get_frame_list()
# re-acquire the lock and make sure we're still not blocked. If so send
# the frame list.
cur_thread._block_starting_lock.acquire()
if not check_is_blocked or not cur_thread._is_blocked:
cur_thread.send_frame_list(frames)
cur_thread._block_starting_lock.release()
def update_all_thread_stacks(blocking_thread = None, check_is_blocked = True):
THREADS_LOCK.acquire()
all_threads = list(THREADS.values())
THREADS_LOCK.release()
for cur_thread in all_threads:
if cur_thread is blocking_thread:
continue
cur_thread._block_starting_lock.acquire()
if not check_is_blocked or not cur_thread._is_blocked:
# release the lock, we're going to run user code to evaluate the frames
cur_thread._block_starting_lock.release()
frames = cur_thread.get_frame_list()
# re-acquire the lock and make sure we're still not blocked. If so send
# the frame list.
cur_thread._block_starting_lock.acquire()
if not check_is_blocked or not cur_thread._is_blocked:
cur_thread.send_frame_list(frames)
cur_thread._block_starting_lock.release()
def update_all_thread_stacks(blocking_thread = None, check_is_blocked = True):
THREADS_LOCK.acquire()
all_threads = list(THREADS.values())
THREADS_LOCK.release()
for cur_thread in all_threads:
if cur_thread is blocking_thread:
continue
cur_thread._block_starting_lock.acquire()
if not check_is_blocked or not cur_thread._is_blocked:
# release the lock, we're going to run user code to evaluate the frames
cur_thread._block_starting_lock.release()
frames = cur_thread.get_frame_list()
# re-acquire the lock and make sure we're still not blocked. If so send
# the frame list.
cur_thread._block_starting_lock.acquire()
if not check_is_blocked or not cur_thread._is_blocked:
cur_thread.send_frame_list(frames)
cur_thread._block_starting_lock.release()
def update_all_thread_stacks(blocking_thread = None, check_is_blocked = True):
THREADS_LOCK.acquire()
all_threads = list(THREADS.values())
THREADS_LOCK.release()
for cur_thread in all_threads:
if cur_thread is blocking_thread:
continue
cur_thread._block_starting_lock.acquire()
if not check_is_blocked or not cur_thread._is_blocked:
# release the lock, we're going to run user code to evaluate the frames
cur_thread._block_starting_lock.release()
frames = cur_thread.get_frame_list()
# re-acquire the lock and make sure we're still not blocked. If so send
# the frame list.
cur_thread._block_starting_lock.acquire()
if not check_is_blocked or not cur_thread._is_blocked:
cur_thread.send_frame_list(frames)
cur_thread._block_starting_lock.release()
def __init__(self, id = None):
if id is not None:
self.id = id
else:
self.id = thread.get_ident()
self._events = {'call' : self.handle_call,
'line' : self.handle_line,
'return' : self.handle_return,
'exception' : self.handle_exception,
'c_call' : self.handle_c_call,
'c_return' : self.handle_c_return,
'c_exception' : self.handle_c_exception,
}
self.cur_frame = None
self.stepping = STEPPING_NONE
self.unblock_work = None
self._block_lock = thread.allocate_lock()
self._block_lock.acquire()
self._block_starting_lock = thread.allocate_lock()
self._is_blocked = False
self._is_working = False
self.stopped_on_line = None
self.detach = False
self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
self.prev_trace_func = None
self.trace_func_stack = []
self.reported_process_loaded = False
self.django_stepping = None
self.is_sending = False
# stackless changes
if stackless is not None:
self._stackless_attach()
if sys.platform == 'cli':
self.frames = []
def block(self, block_lambda, keep_stopped_on_line = False):
"""blocks the current thread until the debugger resumes it"""
assert not self._is_blocked
#assert self.id == thread.get_ident(), 'wrong thread identity' + str(self.id) + ' ' + str(thread.get_ident()) # we should only ever block ourselves
# send thread frames before we block
self.enum_thread_frames_locally()
if not keep_stopped_on_line:
self.stopped_on_line = self.cur_frame.f_lineno
# need to synchronize w/ sending the reason we're blocking
self._block_starting_lock.acquire()
self._is_blocked = True
block_lambda()
self._block_starting_lock.release()
while not DETACHED:
self._block_lock.acquire()
if self.unblock_work is None:
break
# the debugger wants us to do something, do it, and then block again
self._is_working = True
self.unblock_work()
self.unblock_work = None
self._is_working = False
self._block_starting_lock.acquire()
assert self._is_blocked
self._is_blocked = False
self._block_starting_lock.release()
def unblock(self):
"""unblocks the current thread allowing it to continue to run"""
assert self._is_blocked
assert self.id != thread.get_ident() # only someone else should unblock us
self._block_lock.release()
def run_on_thread(self, text, cur_frame, execution_id, frame_kind, repr_kind = PYTHON_EVALUATION_RESULT_REPR_KIND_NORMAL):
self._block_starting_lock.acquire()
if not self._is_blocked:
report_execution_error('<expression cannot be evaluated at this time>', execution_id)
elif not self._is_working:
self.schedule_work(lambda : self.run_locally(text, cur_frame, execution_id, frame_kind, repr_kind))
else:
report_execution_error('<error: previous evaluation has not completed>', execution_id)
self._block_starting_lock.release()
def enum_child_on_thread(self, text, cur_frame, execution_id, frame_kind):
self._block_starting_lock.acquire()
if not self._is_working and self._is_blocked:
self.schedule_work(lambda : self.enum_child_locally(text, cur_frame, execution_id, frame_kind))
self._block_starting_lock.release()
else:
self._block_starting_lock.release()
report_children(execution_id, [])
def command_step_into(self):
tid = read_int(self.conn)
thread = get_thread_from_id(tid)
if thread is not None:
assert thread._is_blocked
thread.stepping = STEPPING_INTO
self.command_resume_all()
def command_step_out(self):
tid = read_int(self.conn)
thread = get_thread_from_id(tid)
if thread is not None:
assert thread._is_blocked
thread.stepping = STEPPING_OUT
self.command_resume_all()
def command_step_over(self):
# set step over
tid = read_int(self.conn)
thread = get_thread_from_id(tid)
if thread is not None:
assert thread._is_blocked
if DJANGO_DEBUG:
source_obj = get_django_frame_source(thread.cur_frame)
if source_obj is not None:
thread.django_stepping = True
self.command_resume_all()
return
thread.stepping = STEPPING_OVER
self.command_resume_all()
def command_resume_all(self):
# resume all
THREADS_LOCK.acquire()
all_threads = list(THREADS.values())
THREADS_LOCK.release()
for thread in all_threads:
thread._block_starting_lock.acquire()
if thread.stepping == STEPPING_BREAK or thread.stepping == STEPPING_ATTACH_BREAK:
thread.stepping = STEPPING_NONE
if thread._is_blocked:
thread.unblock()
thread._block_starting_lock.release()
def __init__(self, id = None):
if id is not None:
self.id = id
else:
self.id = thread.get_ident()
self._events = {'call' : self.handle_call,
'line' : self.handle_line,
'return' : self.handle_return,
'exception' : self.handle_exception,
'c_call' : self.handle_c_call,
'c_return' : self.handle_c_return,
'c_exception' : self.handle_c_exception,
}
self.cur_frame = None
self.stepping = STEPPING_NONE
self.unblock_work = None
self._block_lock = thread.allocate_lock()
self._block_lock.acquire()
self._block_starting_lock = thread.allocate_lock()
self._is_blocked = False
self._is_working = False
self.stopped_on_line = None
self.detach = False
self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
self.prev_trace_func = None
self.trace_func_stack = []
self.reported_process_loaded = False
self.django_stepping = None
self.is_sending = False
# stackless changes
if stackless is not None:
self._stackless_attach()
if sys.platform == 'cli':
self.frames = []
def block(self, block_lambda, keep_stopped_on_line = False):
"""blocks the current thread until the debugger resumes it"""
assert not self._is_blocked
#assert self.id == thread.get_ident(), 'wrong thread identity' + str(self.id) + ' ' + str(thread.get_ident()) # we should only ever block ourselves
# send thread frames before we block
self.enum_thread_frames_locally()
if not keep_stopped_on_line:
self.stopped_on_line = self.cur_frame.f_lineno
# need to synchronize w/ sending the reason we're blocking
self._block_starting_lock.acquire()
self._is_blocked = True
block_lambda()
self._block_starting_lock.release()
while not DETACHED:
self._block_lock.acquire()
if self.unblock_work is None:
break
# the debugger wants us to do something, do it, and then block again
self._is_working = True
self.unblock_work()
self.unblock_work = None
self._is_working = False
self._block_starting_lock.acquire()
assert self._is_blocked
self._is_blocked = False
self._block_starting_lock.release()
def unblock(self):
"""unblocks the current thread allowing it to continue to run"""
assert self._is_blocked
assert self.id != thread.get_ident() # only someone else should unblock us
self._block_lock.release()
def run_on_thread(self, text, cur_frame, execution_id, frame_kind, repr_kind = PYTHON_EVALUATION_RESULT_REPR_KIND_NORMAL):
self._block_starting_lock.acquire()
if not self._is_blocked:
report_execution_error('<expression cannot be evaluated at this time>', execution_id)
elif not self._is_working:
self.schedule_work(lambda : self.run_locally(text, cur_frame, execution_id, frame_kind, repr_kind))
else:
report_execution_error('<error: previous evaluation has not completed>', execution_id)
self._block_starting_lock.release()
def enum_child_on_thread(self, text, cur_frame, execution_id, frame_kind):
self._block_starting_lock.acquire()
if not self._is_working and self._is_blocked:
self.schedule_work(lambda : self.enum_child_locally(text, cur_frame, execution_id, frame_kind))
self._block_starting_lock.release()
else:
self._block_starting_lock.release()
report_children(execution_id, [])
def command_step_into(self):
tid = read_int(self.conn)
thread = get_thread_from_id(tid)
if thread is not None:
assert thread._is_blocked
thread.stepping = STEPPING_INTO
self.command_resume_all()
def command_step_out(self):
tid = read_int(self.conn)
thread = get_thread_from_id(tid)
if thread is not None:
assert thread._is_blocked
thread.stepping = STEPPING_OUT
self.command_resume_all()
def command_step_over(self):
# set step over
tid = read_int(self.conn)
thread = get_thread_from_id(tid)
if thread is not None:
assert thread._is_blocked
if DJANGO_DEBUG:
source_obj = get_django_frame_source(thread.cur_frame)
if source_obj is not None:
thread.django_stepping = True
self.command_resume_all()
return
thread.stepping = STEPPING_OVER
self.command_resume_all()
def command_resume_all(self):
# resume all
THREADS_LOCK.acquire()
all_threads = list(THREADS.values())
THREADS_LOCK.release()
for thread in all_threads:
thread._block_starting_lock.acquire()
if thread.stepping == STEPPING_BREAK or thread.stepping == STEPPING_ATTACH_BREAK:
thread.stepping = STEPPING_NONE
if thread._is_blocked:
thread.unblock()
thread._block_starting_lock.release()
def __init__(self, id = None):
if id is not None:
self.id = id
else:
self.id = thread.get_ident()
self._events = {'call' : self.handle_call,
'line' : self.handle_line,
'return' : self.handle_return,
'exception' : self.handle_exception,
'c_call' : self.handle_c_call,
'c_return' : self.handle_c_return,
'c_exception' : self.handle_c_exception,
}
self.cur_frame = None
self.stepping = STEPPING_NONE
self.unblock_work = None
self._block_lock = thread.allocate_lock()
self._block_lock.acquire()
self._block_starting_lock = thread.allocate_lock()
self._is_blocked = False
self._is_working = False
self.stopped_on_line = None
self.detach = False
self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
self.prev_trace_func = None
self.trace_func_stack = []
self.reported_process_loaded = False
self.django_stepping = None
self.is_sending = False
# stackless changes
if stackless is not None:
self._stackless_attach()
if sys.platform == 'cli':
self.frames = []
def block(self, block_lambda, keep_stopped_on_line = False):
"""blocks the current thread until the debugger resumes it"""
assert not self._is_blocked
#assert self.id == thread.get_ident(), 'wrong thread identity' + str(self.id) + ' ' + str(thread.get_ident()) # we should only ever block ourselves
# send thread frames before we block
self.enum_thread_frames_locally()
if not keep_stopped_on_line:
self.stopped_on_line = self.cur_frame.f_lineno
# need to synchronize w/ sending the reason we're blocking
self._block_starting_lock.acquire()
self._is_blocked = True
block_lambda()
self._block_starting_lock.release()
while not DETACHED:
self._block_lock.acquire()
if self.unblock_work is None:
break
# the debugger wants us to do something, do it, and then block again
self._is_working = True
self.unblock_work()
self.unblock_work = None
self._is_working = False
self._block_starting_lock.acquire()
assert self._is_blocked
self._is_blocked = False
self._block_starting_lock.release()
def unblock(self):
"""unblocks the current thread allowing it to continue to run"""
assert self._is_blocked
assert self.id != thread.get_ident() # only someone else should unblock us
self._block_lock.release()