def test_semaphore_tracker(self):
import subprocess
cmd = '''if 1:
import multiprocessing as mp, time, os
mp.set_start_method("spawn")
lock1 = mp.Lock()
lock2 = mp.Lock()
os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
time.sleep(10)
'''
r, w = os.pipe()
p = subprocess.Popen([sys.executable,
'-c', cmd % (w, w)],
pass_fds=[w],
stderr=subprocess.PIPE)
os.close(w)
with open(r, 'rb', closefd=True) as f:
name1 = f.readline().rstrip().decode('ascii')
name2 = f.readline().rstrip().decode('ascii')
_multiprocessing.sem_unlink(name1)
p.terminate()
p.wait()
time.sleep(2.0)
with self.assertRaises(OSError) as ctx:
_multiprocessing.sem_unlink(name2)
# docs say it should be ENOENT, but OSX seems to give EINVAL
self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
err = p.stderr.read().decode('utf-8')
p.stderr.close()
expected = 'semaphore_tracker: There appear to be 2 leaked semaphores'
self.assertRegex(err, expected)
self.assertRegex(err, 'semaphore_tracker: %r: \[Errno' % name1)
#
# Mixins
#
评论列表
文章目录