def test_thread_leak(self):
    # The lock shouldn't leak a Thread instance when used from a foreign
    # (non-threading) thread.
    lock = self.locktype()
    def f():
        lock.acquire()
        lock.release()
    n = len(threading.enumerate())
    # We run many threads in the hope that existing thread ids won't be
    # recycled.
    Bunch(f, 15).wait_for_finished()
    if len(threading.enumerate()) != n:
        # There is a small window during which a Thread instance's
        # target function has finished running, but the Thread is still
        # alive and registered. Avoid spurious failures by waiting a
        # bit more (seen on a buildbot).
        time.sleep(0.4)
        self.assertEqual(n, len(threading.enumerate()))
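
The Bunch helper and self.locktype above come from CPython's lock-test scaffolding and are not shown here. A minimal standalone sketch of the counting technique the test relies on, using only stock threading.Thread workers (demo_enumerate and release are illustrative names):

import threading

def demo_enumerate():
    # Workers block on an Event so all of them are still alive when counted.
    release = threading.Event()
    baseline = len(threading.enumerate())

    workers = [threading.Thread(target=release.wait) for _ in range(15)]
    for t in workers:
        t.start()

    # Every started-but-unfinished Thread shows up in threading.enumerate().
    assert len(threading.enumerate()) == baseline + 15

    release.set()
    for t in workers:
        t.join()

    # After join() each worker has deregistered itself again.
    assert len(threading.enumerate()) == baseline

demo_enumerate()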
def test_enumerate_after_join(self):
    # Try hard to trigger #1703448: a thread is still returned in
    # threading.enumerate() after it has been join()ed.
    enum = threading.enumerate
    old_interval = sys.getswitchinterval()
    try:
        for i in range(1, 100):
            sys.setswitchinterval(i * 0.0002)
            t = threading.Thread(target=lambda: None)
            t.start()
            t.join()
            l = enum()
            self.assertNotIn(t, l,
                "#1703448 triggered after %d trials: %s" % (i, l))
    finally:
        sys.setswitchinterval(old_interval)
def test_enumerate_after_join(self):
    # Try hard to trigger #1703448: a thread is still returned in
    # threading.enumerate() after it has been join()ed.
    enum = threading.enumerate
    old_interval = sys.getcheckinterval()
    try:
        for i in xrange(1, 100):
            # Try a couple times at each thread-switching interval
            # to get more interleavings.
            sys.setcheckinterval(i // 5)
            t = threading.Thread(target=lambda: None)
            t.start()
            t.join()
            l = enum()
            self.assertNotIn(t, l,
                "#1703448 triggered after %d trials: %s" % (i, l))
    finally:
        sys.setcheckinterval(old_interval)
def flush(self):
    """Flush buffered output."""
    orphans = []
    self.lock.acquire()
    try:
        # Detect threads that no longer exist.
        indexes = (getattr(t, 'index', None) for t in threading.enumerate())
        indexes = filter(None, indexes)
        for index in self.__output_buffers:
            if index not in indexes:
                orphans.append((index, self.__output_buffers[index][0]))
        for orphan in orphans:
            del self.__output_buffers[orphan[0]]
    finally:
        self.lock.release()
    # Don't keep the lock while writing, or a '\n' may get appended when it
    # shouldn't be.
    for orphan in orphans:
        if orphan[1]:
            self._wrapped.write('%d>%s\n' % (orphan[0], orphan[1]))
    return self._wrapped.flush()
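
The method above snapshots the orphaned buffers while holding the lock and only writes to the wrapped stream after releasing it. A minimal sketch of that collect-under-lock, write-outside-lock pattern, with a plain module-level buffers dict standing in for self.__output_buffers:

import sys
import threading

lock = threading.Lock()
buffers = {}   # illustrative stand-in: thread index -> pending text

def flush_orphans(live_indexes, out=sys.stdout):
    # Phase 1: under the lock, move orphaned entries out of the shared dict.
    with lock:
        orphans = [(idx, text) for idx, text in buffers.items()
                   if idx not in live_indexes]
        for idx, _ in orphans:
            del buffers[idx]
    # Phase 2: do the (potentially slow) writes without holding the lock.
    for idx, text in orphans:
        if text:
            out.write('%d>%s\n' % (idx, text))
    return out.flush()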
def multithread_engine(object, redirect, credentials):
    start_time = datetime.datetime.now()
    index = 0
    if object == initialize.ntw_device:
        arguments = credentials
    if object == initialize.switchport:
        arguments = credentials
    for i in object:
        my_thread = threading.Thread(target=getattr(object[index], redirect),
                                     args=(arguments,))
        my_thread.start()
        index = index + 1
    main_thread = threading.currentThread()
    for some_thread in threading.enumerate():
        if some_thread != main_thread:
            print(some_thread)
            some_thread.join()
    print("\n")
    print("TIME ELAPSED: {}\n".format(datetime.datetime.now() - start_time))
def multithread_engine(object, redirect, credentials):
    start_time = datetime.datetime.now()
    index = 0
    if object == initialize.ntw_device:
        arguments = None
    if object == initialize.switchport:
        arguments = credentials
    for i in object:
        my_thread = threading.Thread(target=getattr(object[index], redirect),
                                     args=(arguments,))
        my_thread.start()
        index = index + 1
    main_thread = threading.currentThread()
    for some_thread in threading.enumerate():
        if some_thread != main_thread:
            print(some_thread)
            some_thread.join()
    print("\n")
    print("TIME ELAPSED: {}\n".format(datetime.datetime.now() - start_time))
def __del__(self):
    import threading
    key = object.__getattribute__(self, '_local__key')
    try:
        threads = list(threading.enumerate())
    except:
        return
    for thread in threads:
        try:
            __dict__ = thread.__dict__
        except AttributeError:
            continue
        if key in __dict__:
            try:
                del __dict__[key]
            except KeyError:
                pass
def shutdown_components(self):
    """Execute before the reactor is shut down"""
    self.log.info('exiting-on-keyboard-interrupt')
    for component in reversed(registry.iterate()):
        yield component.stop()

    import threading
    self.log.info('THREADS:')
    main_thread = threading.current_thread()
    for t in threading.enumerate():
        if t is main_thread:
            continue
        if not t.isDaemon():
            continue
        self.log.info('joining thread {} {}'.format(
            t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
        t.join()
def sigquit_handler(sig, frame):
    """Helps debug deadlocks by printing stacktraces when this gets a SIGQUIT
    e.g. kill -s QUIT <PID> or CTRL+\
    """
    print("Dumping stack traces for all threads in PID {}".format(os.getpid()))
    id_to_name = dict([(th.ident, th.name) for th in threading.enumerate()])
    code = []
    for thread_id, stack in sys._current_frames().items():
        code.append("\n# Thread: {}({})"
                    .format(id_to_name.get(thread_id, ""), thread_id))
        for filename, line_number, name, line in traceback.extract_stack(stack):
            code.append('File: "{}", line {}, in {}'
                        .format(filename, line_number, name))
            if line:
                code.append("  {}".format(line.strip()))
    print("\n".join(code))
def worker_int(worker):
    worker.log.info("worker received INT or QUIT signal")

    ## get traceback info
    import threading, sys, traceback
    id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
    code = []
    for threadId, stack in sys._current_frames().items():
        code.append("\n# Thread: %s(%d)" % (id2name.get(threadId, ""),
                                            threadId))
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (filename,
                                                        lineno, name))
            if line:
                code.append("  %s" % (line.strip()))
    worker.log.debug("\n".join(code))
def iter_thread_frames():
    main_thread_frame = None
    for ident, frame in sys._current_frames().items():
        if IDENT_TO_UUID.get(ident) == MAIN_UUID:
            main_thread_frame = frame
            # The MainThread should be shown in its "greenlet" version.
            continue
        yield ident, frame

    for thread in threading.enumerate():
        if not getattr(thread, '_greenlet', None):
            # Some in-between state, before the greenlet started or after it died?
            pass
        elif thread._greenlet.gr_frame:
            yield thread.ident, thread._greenlet.gr_frame
        else:
            # A thread with a greenlet but without a gr_frame will be fetched from
            # sys._current_frames. If we switch to another greenlet by the time we
            # get there, we will get an inconsistent duplicate of threads.
            # TODO - make a best-effort attempt to show a coherent thread dump
            yield thread.ident, main_thread_frame
def run_thread(req_list, name=None, is_lock=True, limit_num=8):
    '''
    Run a list of tasks in worker threads.
    - req_list  task list, list; each element is a (function, argument-tuple) pair, e.g.
      [
          (func_0, (para_0_1, para_0_2, ...)),
          (func_1, (para_1_1, para_1_2, ...)),
          ...
      ]
    - name      thread name, str, default None
    - is_lock   whether to block until all threads finish, bool, default True
                (False returns without waiting)
    - limit_num maximum number of concurrent threads, int, default 8
    '''
    queue = deque(req_list)
    while len(queue):
        if threading.active_count() <= limit_num:
            para = queue.popleft()
            now_thread = threading.Thread(
                target=para[0], args=para[1], name=name, daemon=True)
            now_thread.start()
    if is_lock:
        for now_thread in threading.enumerate():
            if now_thread is not threading.currentThread():
                now_thread.join()
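
A hedged usage sketch for run_thread; the fetch function and URLs below are placeholders, and run_thread itself needs `from collections import deque` and `import threading` at module level:

import time

def fetch(url):
    # Placeholder task; any callable works.
    time.sleep(0.5)
    print("fetched", url)

tasks = [(fetch, ("http://example.com/page/%d" % i,)) for i in range(20)]

# Run at most limit_num threads at a time and block until all tasks finish.
run_thread(tasks, name="fetcher", is_lock=True, limit_num=8)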
def test_builtin_channels(self):
    b = wspbus.Bus()

    self.responses, expected = [], []

    for channel in b.listeners:
        for index, priority in enumerate([100, 50, 0, 51]):
            b.subscribe(channel,
                        self.get_listener(channel, index), priority)

    for channel in b.listeners:
        b.publish(channel)
        expected.extend([msg % (i, channel, None) for i in (2, 1, 3, 0)])

        b.publish(channel, arg=79347)
        expected.extend([msg % (i, channel, 79347) for i in (2, 1, 3, 0)])

    self.assertEqual(self.responses, expected)
def test_custom_channels(self):
    b = wspbus.Bus()

    self.responses, expected = [], []

    custom_listeners = ('hugh', 'louis', 'dewey')
    for channel in custom_listeners:
        for index, priority in enumerate([None, 10, 60, 40]):
            b.subscribe(channel,
                        self.get_listener(channel, index), priority)

    for channel in custom_listeners:
        b.publish(channel, 'ah so')
        expected.extend([msg % (i, channel, 'ah so')
                         for i in (1, 3, 0, 2)])

        b.publish(channel)
        expected.extend([msg % (i, channel, None) for i in (1, 3, 0, 2)])

    self.assertEqual(self.responses, expected)
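
The msg template and get_listener helper in these two tests come from the surrounding test class. A standalone sketch of the priority behaviour they exercise, assuming the usual cherrypy.process.wspbus import; as the expected orderings above imply, lower priority values run first and None falls back to the default of 50:

from cherrypy.process import wspbus

calls = []

def make_listener(tag):
    def listener(arg=None):
        calls.append((tag, arg))
    return listener

b = wspbus.Bus()
b.subscribe('my-channel', make_listener('early'), 10)
b.subscribe('my-channel', make_listener('default'), None)
b.subscribe('my-channel', make_listener('late'), 60)

b.publish('my-channel', 'ah so')
print(calls)   # expected order: early, default, late (all with arg='ah so')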
def watchThreads(self) -> None:
    while True:
        try:
            workingThreads: List[Thread] = threading.enumerate()
            with open(f"{self.core.config.directory.api}/{API.Thread.value}", "w") as f:
                json.dump([x.name for x in workingThreads], f, sort_keys=True, indent=4)
            # Restart any registered thread that is no longer alive.
            for i, (thread, func) in enumerate(self.threads):
                if not thread.is_alive() or thread not in workingThreads:
                    if thread.name in [x.meta.name for x in self.core.PM.plugins.values()]:
                        self.threads[i] = self.startThread(func, name=thread.name)
                    else:
                        self.threads[i] = self.startThread(func, name=thread.name, args=[self.core])
        except:
            pass
        finally:
            time.sleep(10)
def get_full_thread_dump():
    """Returns a string containing a traceback for all threads"""
    output = io.StringIO()
    time = strftime("%Y-%m-%d %H:%M:%S", gmtime())
    thread_names = {}
    for thread in threading.enumerate():
        thread_names[thread.ident] = thread.name
    output.write("\n>>>> Begin stack trace (%s) >>>>\n" % time)
    for threadId, stack in current_frames().items():
        output.write(
            "\n# ThreadID: %s (%s)\n" %
            (threadId, thread_names.get(threadId, "unknown")))
        for filename, lineno, name, line in traceback.extract_stack(stack):
            output.write(
                'File: "%s", line %d, in %s\n' %
                (filename, lineno, name))
            if line:
                output.write("  %s\n" % (line.strip()))
    output.write("\n<<<< End stack trace <<<<\n\n")

    thread_dump = output.getvalue()
    output.close()
    return thread_dump
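
get_full_thread_dump uses current_frames, strftime and gmtime unqualified, so its module presumably imports them by name; a hedged guess at those imports plus a trivial demonstration (the sleeper thread is illustrative):

import io
import threading
import time
import traceback
from sys import _current_frames as current_frames
from time import gmtime, strftime

# get_full_thread_dump() as defined above ...

if __name__ == "__main__":
    threading.Thread(target=time.sleep, args=(5,), name="sleeper", daemon=True).start()
    print(get_full_thread_dump())   # MainThread plus the sleeper show up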