def __init__(self):
#_DummyThread_.__init__(self) # pylint:disable=super-init-not-called
# It'd be nice to use a pattern like "greenlet-%d", but maybe somebody out
# there is checking thread names...
self._name = self._Thread__name = __threading__._newname("DummyThread-%d")
self._set_ident()
g = getcurrent()
gid = _get_ident(g) # same as id(g)
__threading__._active[gid] = self
rawlink = getattr(g, 'rawlink', None)
if rawlink is not None:
# raw greenlet.greenlet greenlets don't
# have rawlink...
rawlink(_cleanup)
else:
# ... so for them we use weakrefs.
# See https://github.com/gevent/gevent/issues/918
global _weakref
if _weakref is None:
_weakref = __import__('weakref')
ref = _weakref.ref(g, _make_cleanup_id(gid))
self.__raw_ref = ref
python类_active()的实例源码
def test_foreign_thread(self):
# Check that a "foreign" thread can use the threading module.
def f(mutex):
# Calling current_thread() forces an entry for the foreign
# thread to get made in the threading._active map.
threading.current_thread()
mutex.release()
mutex = threading.Lock()
mutex.acquire()
tid = _thread.start_new_thread(f, (mutex,))
# Wait for the thread to finish.
mutex.acquire()
self.assertIn(tid, threading._active)
self.assertIsInstance(threading._active[tid], threading._DummyThread)
del threading._active[tid]
# PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
# exposed at the Python level. This test relies on ctypes to get at it.
def test_foreign_thread(self):
# Check that a "foreign" thread can use the threading module.
def f(mutex):
# Calling current_thread() forces an entry for the foreign
# thread to get made in the threading._active map.
threading.current_thread()
mutex.release()
mutex = threading.Lock()
mutex.acquire()
tid = thread.start_new_thread(f, (mutex,))
# Wait for the thread to finish.
mutex.acquire()
self.assertIn(tid, threading._active)
self.assertIsInstance(threading._active[tid], threading._DummyThread)
del threading._active[tid]
# PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
# exposed at the Python level. This test relies on ctypes to get at it.
def test_foreign_thread(self):
# Check that a "foreign" thread can use the threading module.
def f(mutex):
# Calling current_thread() forces an entry for the foreign
# thread to get made in the threading._active map.
threading.current_thread()
mutex.release()
mutex = threading.Lock()
mutex.acquire()
tid = thread.start_new_thread(f, (mutex,))
# Wait for the thread to finish.
mutex.acquire()
self.assertIn(tid, threading._active)
self.assertIsInstance(threading._active[tid], threading._DummyThread)
del threading._active[tid]
# PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
# exposed at the Python level. This test relies on ctypes to get at it.
def test_foreign_thread(self):
# Check that a "foreign" thread can use the threading module.
def f(mutex):
# Calling current_thread() forces an entry for the foreign
# thread to get made in the threading._active map.
threading.current_thread()
mutex.release()
mutex = threading.Lock()
mutex.acquire()
tid = _thread.start_new_thread(f, (mutex,))
# Wait for the thread to finish.
mutex.acquire()
self.assertIn(tid, threading._active)
self.assertIsInstance(threading._active[tid], threading._DummyThread)
del threading._active[tid]
# PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
# exposed at the Python level. This test relies on ctypes to get at it.
def test_foreign_thread(self):
# Check that a "foreign" thread can use the threading module.
def f(mutex):
# Calling current_thread() forces an entry for the foreign
# thread to get made in the threading._active map.
threading.current_thread()
mutex.release()
mutex = threading.Lock()
mutex.acquire()
tid = thread.start_new_thread(f, (mutex,))
# Wait for the thread to finish.
mutex.acquire()
self.assertIn(tid, threading._active)
self.assertIsInstance(threading._active[tid], threading._DummyThread)
del threading._active[tid]
# PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
# exposed at the Python level. This test relies on ctypes to get at it.
def test_foreign_thread(self):
# Check that a "foreign" thread can use the threading module.
def f(mutex):
# Calling current_thread() forces an entry for the foreign
# thread to get made in the threading._active map.
threading.current_thread()
mutex.release()
mutex = threading.Lock()
mutex.acquire()
tid = _thread.start_new_thread(f, (mutex,))
# Wait for the thread to finish.
mutex.acquire()
self.assertIn(tid, threading._active)
self.assertIsInstance(threading._active[tid], threading._DummyThread)
del threading._active[tid]
# PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
# exposed at the Python level. This test relies on ctypes to get at it.
def test_foreign_thread(self):
# Check that a "foreign" thread can use the threading module.
def f(mutex):
# Calling current_thread() forces an entry for the foreign
# thread to get made in the threading._active map.
threading.current_thread()
mutex.release()
mutex = threading.Lock()
mutex.acquire()
tid = thread.start_new_thread(f, (mutex,))
# Wait for the thread to finish.
mutex.acquire()
self.assertIn(tid, threading._active)
self.assertIsInstance(threading._active[tid], threading._DummyThread)
del threading._active[tid]
# PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
# exposed at the Python level. This test relies on ctypes to get at it.
def __bootstrap(self):
try:
self._set_ident()
self._Thread__started.set()
threading._active_limbo_lock.acquire()
threading._active[self._Thread__ident] = self
del threading._limbo[self]
threading._active_limbo_lock.release()
if threading._trace_hook:
sys.settrace(threading._trace_hook)
if threading._profile_hook:
sys.setprofile(threading._profile_hook)
try:
self.run()
finally:
self._Thread__exc_clear()
finally:
with threading._active_limbo_lock:
self._Thread__stop()
try:
del threading._active[threading._get_ident()]
except:
pass
def __init__(self):
#_DummyThread_.__init__(self) # pylint:disable=super-init-not-called
# It'd be nice to use a pattern like "greenlet-%d", but maybe somebody out
# there is checking thread names...
self._name = self._Thread__name = __threading__._newname("DummyThread-%d")
self._set_ident()
g = getcurrent()
gid = _get_ident(g) # same as id(g)
__threading__._active[gid] = self
rawlink = getattr(g, 'rawlink', None)
if rawlink is not None:
# raw greenlet.greenlet greenlets don't
# have rawlink...
rawlink(_cleanup)
else:
# ... so for them we use weakrefs.
# See https://github.com/gevent/gevent/issues/918
global _weakref
if _weakref is None:
_weakref = __import__('weakref')
ref = _weakref.ref(g, _make_cleanup_id(gid))
self.__raw_ref = ref
def test_foreign_thread(self):
# Check that a "foreign" thread can use the threading module.
def f(mutex):
# Calling current_thread() forces an entry for the foreign
# thread to get made in the threading._active map.
threading.current_thread()
mutex.release()
mutex = threading.Lock()
mutex.acquire()
tid = _thread.start_new_thread(f, (mutex,))
# Wait for the thread to finish.
mutex.acquire()
self.assertIn(tid, threading._active)
self.assertIsInstance(threading._active[tid], threading._DummyThread)
del threading._active[tid]
# PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
# exposed at the Python level. This test relies on ctypes to get at it.
def test_orig_thread(self):
new_mod = """import eventlet
eventlet.monkey_patch()
from eventlet import patcher
import threading
_threading = patcher.original('threading')
def test():
print(repr(threading.currentThread()))
t = _threading.Thread(target=test)
t.start()
t.join()
print(len(threading._active))
print(len(_threading._active))
"""
self.write_to_tempfile("newmod", new_mod)
output, lines = self.launch_subprocess('newmod.py')
self.assertEqual(len(lines), 4, "\n".join(lines))
assert lines[0].startswith('<Thread'), lines[0]
assert lines[1] == '1', lines
assert lines[2] == '1', lines
def test_threading(self):
new_mod = """import eventlet
eventlet.monkey_patch()
import threading
def test():
print(repr(threading.currentThread()))
t = threading.Thread(target=test)
t.start()
t.join()
print(len(threading._active))
"""
self.write_to_tempfile("newmod", new_mod)
output, lines = self.launch_subprocess('newmod.py')
self.assertEqual(len(lines), 3, "\n".join(lines))
assert lines[0].startswith('<_MainThread'), lines[0]
self.assertEqual(lines[1], "1", lines[1])
def test_greenlet(self):
new_mod = """import eventlet
eventlet.monkey_patch()
from eventlet import event
import threading
evt = event.Event()
def test():
print(repr(threading.currentThread()))
evt.send()
eventlet.spawn_n(test)
evt.wait()
print(len(threading._active))
"""
self.write_to_tempfile("newmod", new_mod)
output, lines = self.launch_subprocess('newmod.py')
self.assertEqual(len(lines), 3, "\n".join(lines))
assert lines[0].startswith('<_MainThread'), lines[0]
self.assertEqual(lines[1], "1", lines[1])
def id2thread_name(thread_id):
return threading.Thread.getName(threading._active[thread_id])
def _cleanup(g):
__threading__._active.pop(id(g), None)
def __init__(self):
#_DummyThread_.__init__(self)
# It'd be nice to use a pattern like "greenlet-%d", but maybe somebody out
# there is checking thread names...
self._name = self._Thread__name = __threading__._newname("DummyThread-%d")
self._set_ident()
__threading__._active[_get_ident()] = self
g = getcurrent()
rawlink = getattr(g, 'rawlink', None)
if rawlink is not None:
rawlink(_cleanup)
def _cleanup(g):
__threading__._active.pop(id(g), None)
def _make_cleanup_id(gid):
def _(_r):
__threading__._active.pop(gid, None)
return _
def test_ident_of_no_threading_threads(self):
# The ident still must work for the main thread and dummy threads.
self.assertFalse(threading.currentThread().ident is None)
def f():
ident.append(threading.currentThread().ident)
done.set()
done = threading.Event()
ident = []
_thread.start_new_thread(f, ())
done.wait()
self.assertFalse(ident[0] is None)
# Kill the "immortal" _DummyThread
del threading._active[ident[0]]
# run with a small(ish) thread stack size (256kB)
def test_ident_of_no_threading_threads(self):
# The ident still must work for the main thread and dummy threads.
self.assertIsNotNone(threading.currentThread().ident)
def f():
ident.append(threading.currentThread().ident)
done.set()
done = threading.Event()
ident = []
thread.start_new_thread(f, ())
done.wait()
self.assertIsNotNone(ident[0])
# Kill the "immortal" _DummyThread
del threading._active[ident[0]]
# run with a small(ish) thread stack size (256kB)
def test_ident_of_no_threading_threads(self):
# The ident still must work for the main thread and dummy threads.
self.assertIsNotNone(threading.currentThread().ident)
def f():
ident.append(threading.currentThread().ident)
done.set()
done = threading.Event()
ident = []
thread.start_new_thread(f, ())
done.wait()
self.assertIsNotNone(ident[0])
# Kill the "immortal" _DummyThread
del threading._active[ident[0]]
# run with a small(ish) thread stack size (256kB)
def test_ident_of_no_threading_threads(self):
# The ident still must work for the main thread and dummy threads.
self.assertFalse(threading.currentThread().ident is None)
def f():
ident.append(threading.currentThread().ident)
done.set()
done = threading.Event()
ident = []
_thread.start_new_thread(f, ())
done.wait()
self.assertFalse(ident[0] is None)
# Kill the "immortal" _DummyThread
del threading._active[ident[0]]
# run with a small(ish) thread stack size (256kB)
def test_ident_of_no_threading_threads(self):
# The ident still must work for the main thread and dummy threads.
self.assertFalse(threading.currentThread().ident is None)
def f():
ident.append(threading.currentThread().ident)
done.set()
done = threading.Event()
ident = []
thread.start_new_thread(f, ())
done.wait()
self.assertFalse(ident[0] is None)
# Kill the "immortal" _DummyThread
del threading._active[ident[0]]
# run with a small(ish) thread stack size (256kB)
def test_ident_of_no_threading_threads(self):
# The ident still must work for the main thread and dummy threads.
self.assertFalse(threading.currentThread().ident is None)
def f():
ident.append(threading.currentThread().ident)
done.set()
done = threading.Event()
ident = []
_thread.start_new_thread(f, ())
done.wait()
self.assertFalse(ident[0] is None)
# Kill the "immortal" _DummyThread
del threading._active[ident[0]]
# run with a small(ish) thread stack size (256kB)
def test_ident_of_no_threading_threads(self):
# The ident still must work for the main thread and dummy threads.
self.assertFalse(threading.currentThread().ident is None)
def f():
ident.append(threading.currentThread().ident)
done.set()
done = threading.Event()
ident = []
thread.start_new_thread(f, ())
done.wait()
self.assertFalse(ident[0] is None)
# Kill the "immortal" _DummyThread
del threading._active[ident[0]]
# run with a small(ish) thread stack size (256kB)
def _cleanup(g):
__threading__._active.pop(id(g), None)
def _make_cleanup_id(gid):
def _(_r):
__threading__._active.pop(gid, None)
return _
def __repr__(self):
owner, count = self._get_data()
try:
owner = threading._active[owner].name
except KeyError:
pass
if owner:
return "<{}, owned by <{}>x{} for {}>".format(self._name, owner, count, self._lease_timer.elapsed)
else:
return "<{}, unowned>".format(self._name)
def test_ident_of_no_threading_threads(self):
# The ident still must work for the main thread and dummy threads.
self.assertFalse(threading.currentThread().ident is None)
def f():
ident.append(threading.currentThread().ident)
done.set()
done = threading.Event()
ident = []
_thread.start_new_thread(f, ())
done.wait()
self.assertFalse(ident[0] is None)
# Kill the "immortal" _DummyThread
del threading._active[ident[0]]
# run with a small(ish) thread stack size (256kB)