def test_threads_join(self):
# Non-daemon threads should be joined at subinterpreter shutdown
# (issue #18808)
r, w = os.pipe()
self.addCleanup(os.close, r)
self.addCleanup(os.close, w)
code = r"""if 1:
import os
import threading
import time
def f():
# Sleep a bit so that the thread is still running when
# Py_EndInterpreter is called.
time.sleep(0.05)
os.write(%d, b"x")
threading.Thread(target=f).start()
""" % (w,)
ret = test.support.run_in_subinterp(code)
self.assertEqual(ret, 0)
# The thread was joined properly.
self.assertEqual(os.read(r, 1), b"x")
评论列表
文章目录