def detach_threads():
    """Tell all threads to stop tracing and wake any that are blocked.

    Unless ``_INTERCEPTING_FOR_ATTACH`` is set, every thread in ``THREADS``
    gets ``detach = True`` and ``stepping = STEPPING_BREAK``, and ``THREADS``
    is emptied afterwards. Blocked threads are unblocked and ``BREAKPOINTS``
    is cleared in either case.
    """
    # Snapshot under the lock so the loop below runs without holding it.
    # try/finally guarantees the lock is released even if the copy fails.
    THREADS_LOCK.acquire()
    try:
        all_threads = list(THREADS.values())
    finally:
        THREADS_LOCK.release()

    for py_thread in all_threads:
        if not _INTERCEPTING_FOR_ATTACH:
            py_thread.detach = True
            py_thread.stepping = STEPPING_BREAK
        if py_thread._is_blocked:
            py_thread.unblock()

    if not _INTERCEPTING_FOR_ATTACH:
        THREADS_LOCK.acquire()
        try:
            THREADS.clear()
        finally:
            THREADS_LOCK.release()
    BREAKPOINTS.clear()
# Example source code for the Python unblock() method
def detach_threads():
    """Tell all threads to stop tracing and wake any that are blocked.

    Unless ``_INTERCEPTING_FOR_ATTACH`` is set, every thread in ``THREADS``
    gets ``detach = True`` and ``stepping = STEPPING_BREAK``, and ``THREADS``
    is emptied afterwards. Blocked threads are unblocked and ``BREAKPOINTS``
    is cleared in either case.
    """
    # Snapshot under the lock so the loop below runs without holding it.
    # try/finally guarantees the lock is released even if the copy fails.
    THREADS_LOCK.acquire()
    try:
        all_threads = list(THREADS.values())
    finally:
        THREADS_LOCK.release()

    for py_thread in all_threads:
        if not _INTERCEPTING_FOR_ATTACH:
            py_thread.detach = True
            py_thread.stepping = STEPPING_BREAK
        if py_thread._is_blocked:
            py_thread.unblock()

    if not _INTERCEPTING_FOR_ATTACH:
        THREADS_LOCK.acquire()
        try:
            THREADS.clear()
        finally:
            THREADS_LOCK.release()
    BREAKPOINTS.clear()
def detach_threads():
    """Tell all threads to stop tracing and wake any that are blocked.

    Unless ``_INTERCEPTING_FOR_ATTACH`` is set, every thread in ``THREADS``
    gets ``detach = True`` and ``stepping = STEPPING_BREAK``, and ``THREADS``
    is emptied afterwards. Blocked threads are unblocked and ``BREAKPOINTS``
    is cleared in either case.
    """
    # Snapshot under the lock so the loop below runs without holding it.
    # try/finally guarantees the lock is released even if the copy fails.
    THREADS_LOCK.acquire()
    try:
        all_threads = list(THREADS.values())
    finally:
        THREADS_LOCK.release()

    for py_thread in all_threads:
        if not _INTERCEPTING_FOR_ATTACH:
            py_thread.detach = True
            py_thread.stepping = STEPPING_BREAK
        if py_thread._is_blocked:
            py_thread.unblock()

    if not _INTERCEPTING_FOR_ATTACH:
        THREADS_LOCK.acquire()
        try:
            THREADS.clear()
        finally:
            THREADS_LOCK.release()
    BREAKPOINTS.clear()
def detach_threads():
    """Tell all threads to stop tracing and wake any that are blocked.

    ``THREADS_LOCK`` is held for the whole operation. Unless
    ``_INTERCEPTING_FOR_ATTACH`` is set, every thread in ``THREADS`` gets
    ``detach = True`` and ``stepping = STEPPING_BREAK``, and ``THREADS`` is
    emptied afterwards. Blocked threads are unblocked and ``BREAKPOINTS`` is
    cleared in either case.
    """
    THREADS_LOCK.acquire()
    try:
        # Iterate over a copy so the loop never walks a live dict view.
        for py_thread in list(THREADS.values()):
            if not _INTERCEPTING_FOR_ATTACH:
                py_thread.detach = True
                py_thread.stepping = STEPPING_BREAK
            if py_thread._is_blocked:
                py_thread.unblock()

        if not _INTERCEPTING_FOR_ATTACH:
            THREADS.clear()
        BREAKPOINTS.clear()
    finally:
        # Release even if unblock() raises; otherwise the lock would stay
        # held and every later acquirer would hang.
        THREADS_LOCK.release()
def detach_threads():
    """Tell all threads to stop tracing and wake any that are blocked.

    Unless ``_INTERCEPTING_FOR_ATTACH`` is set, every thread in ``THREADS``
    gets ``detach = True`` and ``stepping = STEPPING_BREAK``, and ``THREADS``
    is emptied afterwards. Blocked threads are unblocked and ``BREAKPOINTS``
    is cleared in either case.
    """
    # Snapshot under the lock so the loop below runs without holding it.
    # try/finally guarantees the lock is released even if the copy fails.
    THREADS_LOCK.acquire()
    try:
        all_threads = list(THREADS.values())
    finally:
        THREADS_LOCK.release()

    for py_thread in all_threads:
        if not _INTERCEPTING_FOR_ATTACH:
            py_thread.detach = True
            py_thread.stepping = STEPPING_BREAK
        if py_thread._is_blocked:
            py_thread.unblock()

    if not _INTERCEPTING_FOR_ATTACH:
        THREADS_LOCK.acquire()
        try:
            THREADS.clear()
        finally:
            THREADS_LOCK.release()
    BREAKPOINTS.clear()
def detach_threads():
    """Tell all threads to stop tracing and wake any that are blocked.

    Unless ``_INTERCEPTING_FOR_ATTACH`` is set, every thread in ``THREADS``
    gets ``detach = True`` and ``stepping = STEPPING_BREAK``, and ``THREADS``
    is emptied afterwards. Blocked threads are unblocked and ``BREAKPOINTS``
    is cleared in either case.
    """
    # Snapshot under the lock so the loop below runs without holding it.
    # try/finally guarantees the lock is released even if the copy fails.
    THREADS_LOCK.acquire()
    try:
        all_threads = list(THREADS.values())
    finally:
        THREADS_LOCK.release()

    for py_thread in all_threads:
        if not _INTERCEPTING_FOR_ATTACH:
            py_thread.detach = True
            py_thread.stepping = STEPPING_BREAK
        if py_thread._is_blocked:
            py_thread.unblock()

    if not _INTERCEPTING_FOR_ATTACH:
        THREADS_LOCK.acquire()
        try:
            THREADS.clear()
        finally:
            THREADS_LOCK.release()
    BREAKPOINTS.clear()
def detach_threads():
    """Tell all threads to stop tracing and wake any that are blocked.

    Unless ``_INTERCEPTING_FOR_ATTACH`` is set, every thread in ``THREADS``
    gets ``detach = True`` and ``stepping = STEPPING_BREAK``, and ``THREADS``
    is emptied afterwards. Blocked threads are unblocked and ``BREAKPOINTS``
    is cleared in either case.
    """
    # Snapshot under the lock so the loop below runs without holding it.
    # try/finally guarantees the lock is released even if the copy fails.
    THREADS_LOCK.acquire()
    try:
        all_threads = list(THREADS.values())
    finally:
        THREADS_LOCK.release()

    for py_thread in all_threads:
        if not _INTERCEPTING_FOR_ATTACH:
            py_thread.detach = True
            py_thread.stepping = STEPPING_BREAK
        if py_thread._is_blocked:
            py_thread.unblock()

    if not _INTERCEPTING_FOR_ATTACH:
        THREADS_LOCK.acquire()
        try:
            THREADS.clear()
        finally:
            THREADS_LOCK.release()
    BREAKPOINTS.clear()
def detach_threads():
    """Tell all threads to stop tracing and wake any that are blocked.

    Unless ``_INTERCEPTING_FOR_ATTACH`` is set, every thread in ``THREADS``
    gets ``detach = True`` and ``stepping = STEPPING_BREAK``, and ``THREADS``
    is emptied afterwards. Blocked threads are unblocked and ``BREAKPOINTS``
    is cleared in either case.
    """
    # Snapshot under the lock so the loop below runs without holding it.
    # try/finally guarantees the lock is released even if the copy fails.
    THREADS_LOCK.acquire()
    try:
        all_threads = list(THREADS.values())
    finally:
        THREADS_LOCK.release()

    for py_thread in all_threads:
        if not _INTERCEPTING_FOR_ATTACH:
            py_thread.detach = True
            py_thread.stepping = STEPPING_BREAK
        if py_thread._is_blocked:
            py_thread.unblock()

    if not _INTERCEPTING_FOR_ATTACH:
        THREADS_LOCK.acquire()
        try:
            THREADS.clear()
        finally:
            THREADS_LOCK.release()
    BREAKPOINTS.clear()
def unblock(self):
    """Release ``_block_lock`` so this blocked thread can continue running.

    Asserts that the thread is currently marked blocked and that the caller
    is running on a different thread than the one being unblocked.
    """
    assert self._is_blocked
    caller_id = thread.get_ident()
    # A thread cannot unblock itself; only someone else should unblock us.
    assert caller_id != self.id
    self._block_lock.release()
def schedule_work(self, work):
    """Store *work* as ``unblock_work`` and then unblock this thread.

    The attribute is assigned before ``unblock()`` is called, so the work
    item is already in place when the thread is released.
    """
    self.unblock_work = work
    self.unblock()
def command_resume_all(self):
    """Resume every thread currently registered in ``THREADS``.

    Threads whose ``stepping`` state is ``STEPPING_BREAK`` or
    ``STEPPING_ATTACH_BREAK`` are reset to ``STEPPING_NONE``, and threads
    that are blocked are unblocked.
    """
    # Snapshot under the lock; the loop below runs without holding it.
    THREADS_LOCK.acquire()
    try:
        all_threads = list(THREADS.values())
    finally:
        THREADS_LOCK.release()

    # Local is not called ``thread`` to avoid shadowing the ``thread`` module.
    for py_thread in all_threads:
        py_thread._block_starting_lock.acquire()
        try:
            if py_thread.stepping in (STEPPING_BREAK, STEPPING_ATTACH_BREAK):
                py_thread.stepping = STEPPING_NONE
            if py_thread._is_blocked:
                py_thread.unblock()
        finally:
            # Release even if unblock() raises, so the per-thread lock is
            # never left held.
            py_thread._block_starting_lock.release()
def command_resume_thread(self):
    """Resume the thread whose id is read from ``self.conn``.

    If the thread has ``reported_process_loaded`` set, the flag is cleared
    and all threads are resumed via ``command_resume_all``; otherwise only
    that thread is unblocked.

    An unknown id still propagates whatever ``THREADS[tid]`` raises, but
    ``THREADS_LOCK`` is released first instead of being left held.
    """
    tid = read_int(self.conn)
    THREADS_LOCK.acquire()
    try:
        target = THREADS[tid]
    finally:
        THREADS_LOCK.release()

    if target.reported_process_loaded:
        target.reported_process_loaded = False
        self.command_resume_all()
    else:
        target.unblock()
def schedule_work(self, work):
    """Store *work* as ``unblock_work`` and then unblock this thread.

    The attribute is assigned before ``unblock()`` is called, so the work
    item is already in place when the thread is released.
    """
    self.unblock_work = work
    self.unblock()
def command_resume_all(self):
    """Resume every thread currently registered in ``THREADS``.

    Threads whose ``stepping`` state is ``STEPPING_BREAK`` or
    ``STEPPING_ATTACH_BREAK`` are reset to ``STEPPING_NONE``, and threads
    that are blocked are unblocked.
    """
    # Snapshot under the lock; the loop below runs without holding it.
    THREADS_LOCK.acquire()
    try:
        all_threads = list(THREADS.values())
    finally:
        THREADS_LOCK.release()

    # Local is not called ``thread`` to avoid shadowing the ``thread`` module.
    for py_thread in all_threads:
        py_thread._block_starting_lock.acquire()
        try:
            if py_thread.stepping in (STEPPING_BREAK, STEPPING_ATTACH_BREAK):
                py_thread.stepping = STEPPING_NONE
            if py_thread._is_blocked:
                py_thread.unblock()
        finally:
            # Release even if unblock() raises, so the per-thread lock is
            # never left held.
            py_thread._block_starting_lock.release()
def command_resume_thread(self):
    """Resume the thread whose id is read from ``self.conn``.

    If the thread has ``reported_process_loaded`` set, the flag is cleared
    and all threads are resumed via ``command_resume_all``; otherwise only
    that thread is unblocked.

    An unknown id still propagates whatever ``THREADS[tid]`` raises, but
    ``THREADS_LOCK`` is released first instead of being left held.
    """
    tid = read_int(self.conn)
    THREADS_LOCK.acquire()
    try:
        target = THREADS[tid]
    finally:
        THREADS_LOCK.release()

    if target.reported_process_loaded:
        target.reported_process_loaded = False
        self.command_resume_all()
    else:
        target.unblock()
def unblock(self):
    """Release ``_block_lock`` so this blocked thread can continue running.

    Asserts that the thread is currently marked blocked and that the caller
    is running on a different thread than the one being unblocked.
    """
    assert self._is_blocked
    caller_id = thread.get_ident()
    # A thread cannot unblock itself; only someone else should unblock us.
    assert caller_id != self.id
    self._block_lock.release()
def schedule_work(self, work):
    """Store *work* as ``unblock_work`` and then unblock this thread.

    The attribute is assigned before ``unblock()`` is called, so the work
    item is already in place when the thread is released.
    """
    self.unblock_work = work
    self.unblock()
def command_resume_all(self):
    """Resume every thread currently registered in ``THREADS``.

    Threads whose ``stepping`` state is ``STEPPING_BREAK`` or
    ``STEPPING_ATTACH_BREAK`` are reset to ``STEPPING_NONE``, and threads
    that are blocked are unblocked.
    """
    # Snapshot under the lock; the loop below runs without holding it.
    THREADS_LOCK.acquire()
    try:
        all_threads = list(THREADS.values())
    finally:
        THREADS_LOCK.release()

    # Local is not called ``thread`` to avoid shadowing the ``thread`` module.
    for py_thread in all_threads:
        py_thread._block_starting_lock.acquire()
        try:
            if py_thread.stepping in (STEPPING_BREAK, STEPPING_ATTACH_BREAK):
                py_thread.stepping = STEPPING_NONE
            if py_thread._is_blocked:
                py_thread.unblock()
        finally:
            # Release even if unblock() raises, so the per-thread lock is
            # never left held.
            py_thread._block_starting_lock.release()
def command_resume_thread(self):
    """Resume the thread whose id is read from ``self.conn``.

    If the thread has ``reported_process_loaded`` set, the flag is cleared
    and all threads are resumed via ``command_resume_all``; otherwise only
    that thread is unblocked.

    An unknown id still propagates whatever ``THREADS[tid]`` raises, but
    ``THREADS_LOCK`` is released first instead of being left held.
    """
    tid = read_int(self.conn)
    THREADS_LOCK.acquire()
    try:
        target = THREADS[tid]
    finally:
        THREADS_LOCK.release()

    if target.reported_process_loaded:
        target.reported_process_loaded = False
        self.command_resume_all()
    else:
        target.unblock()
def unblock(self):
    """Release ``_block_lock`` so this blocked thread can continue running.

    Asserts that the thread is currently marked blocked and that the caller
    is running on a different thread than the one being unblocked.
    """
    assert self._is_blocked
    caller_id = thread.get_ident()
    # A thread cannot unblock itself; only someone else should unblock us.
    assert caller_id != self.id
    self._block_lock.release()
def schedule_work(self, work):
    """Store *work* as ``unblock_work`` and then unblock this thread.

    The attribute is assigned before ``unblock()`` is called, so the work
    item is already in place when the thread is released.
    """
    self.unblock_work = work
    self.unblock()
def command_resume_all(self):
    """Resume every thread currently registered in ``THREADS``.

    Threads whose ``stepping`` state is ``STEPPING_BREAK`` or
    ``STEPPING_ATTACH_BREAK`` are reset to ``STEPPING_NONE``, and threads
    that are blocked are unblocked.
    """
    # Snapshot under the lock; the loop below runs without holding it.
    THREADS_LOCK.acquire()
    try:
        all_threads = list(THREADS.values())
    finally:
        THREADS_LOCK.release()

    # Local is not called ``thread`` to avoid shadowing the ``thread`` module.
    for py_thread in all_threads:
        py_thread._block_starting_lock.acquire()
        try:
            if py_thread.stepping in (STEPPING_BREAK, STEPPING_ATTACH_BREAK):
                py_thread.stepping = STEPPING_NONE
            if py_thread._is_blocked:
                py_thread.unblock()
        finally:
            # Release even if unblock() raises, so the per-thread lock is
            # never left held.
            py_thread._block_starting_lock.release()
def command_resume_thread(self):
    """Resume the thread whose id is read from ``self.conn``.

    If the thread has ``reported_process_loaded`` set, the flag is cleared
    and all threads are resumed via ``command_resume_all``; otherwise only
    that thread is unblocked.

    An unknown id still propagates whatever ``THREADS[tid]`` raises, but
    ``THREADS_LOCK`` is released first instead of being left held.
    """
    tid = read_int(self.conn)
    THREADS_LOCK.acquire()
    try:
        target = THREADS[tid]
    finally:
        THREADS_LOCK.release()

    if target.reported_process_loaded:
        target.reported_process_loaded = False
        self.command_resume_all()
    else:
        target.unblock()
def unblock(self):
    """Release ``_block_lock`` so this blocked thread can continue running.

    Asserts that the thread is currently marked blocked and that the caller
    is running on a different thread than the one being unblocked.
    """
    assert self._is_blocked
    caller_id = thread.get_ident()
    # A thread cannot unblock itself; only someone else should unblock us.
    assert caller_id != self.id
    self._block_lock.release()
def schedule_work(self, work):
    """Store *work* as ``unblock_work`` and then unblock this thread.

    The attribute is assigned before ``unblock()`` is called, so the work
    item is already in place when the thread is released.
    """
    self.unblock_work = work
    self.unblock()
def command_resume_all(self):
    """Resume every thread currently registered in ``THREADS``.

    Threads whose ``stepping`` state is ``STEPPING_BREAK`` or
    ``STEPPING_ATTACH_BREAK`` are reset to ``STEPPING_NONE``, and threads
    that are blocked are unblocked.
    """
    # Snapshot under the lock; the loop below runs without holding it.
    THREADS_LOCK.acquire()
    try:
        all_threads = list(THREADS.values())
    finally:
        THREADS_LOCK.release()

    # Local is not called ``thread`` to avoid shadowing the ``thread`` module.
    for py_thread in all_threads:
        py_thread._block_starting_lock.acquire()
        try:
            if py_thread.stepping in (STEPPING_BREAK, STEPPING_ATTACH_BREAK):
                py_thread.stepping = STEPPING_NONE
            if py_thread._is_blocked:
                py_thread.unblock()
        finally:
            # Release even if unblock() raises, so the per-thread lock is
            # never left held.
            py_thread._block_starting_lock.release()
def command_resume_thread(self):
    """Resume the thread whose id is read from ``self.conn``.

    If the thread has ``reported_process_loaded`` set, the flag is cleared
    and all threads are resumed via ``command_resume_all``; otherwise only
    that thread is unblocked.

    An unknown id still propagates whatever ``THREADS[tid]`` raises, but
    ``THREADS_LOCK`` is released first instead of being left held.
    """
    tid = read_int(self.conn)
    THREADS_LOCK.acquire()
    try:
        target = THREADS[tid]
    finally:
        THREADS_LOCK.release()

    if target.reported_process_loaded:
        target.reported_process_loaded = False
        self.command_resume_all()
    else:
        target.unblock()
def unblock(self):
    """Release ``_block_lock`` so this blocked thread can continue running.

    Asserts that the thread is currently marked blocked and that the caller
    is running on a different thread than the one being unblocked.
    """
    assert self._is_blocked
    caller_id = thread.get_ident()
    # A thread cannot unblock itself; only someone else should unblock us.
    assert caller_id != self.id
    self._block_lock.release()
def schedule_work(self, work):
    """Store *work* as ``unblock_work`` and then unblock this thread.

    The attribute is assigned before ``unblock()`` is called, so the work
    item is already in place when the thread is released.
    """
    self.unblock_work = work
    self.unblock()
def command_resume_all(self):
    """Resume every thread currently registered in ``THREADS``.

    Threads whose ``stepping`` state is ``STEPPING_BREAK`` or
    ``STEPPING_ATTACH_BREAK`` are reset to ``STEPPING_NONE``, and threads
    that are blocked are unblocked.
    """
    # Snapshot under the lock; the loop below runs without holding it.
    THREADS_LOCK.acquire()
    try:
        all_threads = list(THREADS.values())
    finally:
        THREADS_LOCK.release()

    # Local is not called ``thread`` to avoid shadowing the ``thread`` module.
    for py_thread in all_threads:
        py_thread._block_starting_lock.acquire()
        try:
            if py_thread.stepping in (STEPPING_BREAK, STEPPING_ATTACH_BREAK):
                py_thread.stepping = STEPPING_NONE
            if py_thread._is_blocked:
                py_thread.unblock()
        finally:
            # Release even if unblock() raises, so the per-thread lock is
            # never left held.
            py_thread._block_starting_lock.release()