def test_rapid_restart(self):
    """Check that a manager can be restarted right away on the same address.

    A first manager is started, exercised once through a helper process
    and shut down.  A second manager is then started on the address that
    was recorded from the first manager's server object.  If that start
    fails with EADDRINUSE it is retried once after a short pause.
    """
    authkey = os.urandom(32)
    manager = QueueManager(
        address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
    srvr = manager.get_server()
    addr = srvr.address
    # Close the connection.Listener socket which gets opened as a part
    # of manager.get_server(). It's not needed for the test.
    srvr.listener.close()
    manager.start()

    p = self.Process(target=self._putter, args=(manager.address, authkey))
    p.daemon = True
    p.start()
    queue = manager.get_queue()
    self.assertEqual(queue.get(), 'hello world')
    del queue
    # Reap the helper so it is not left dangling after the test.
    # NOTE(review): assumes _putter returns once its item has been put.
    p.join()
    manager.shutdown()

    manager = QueueManager(
        address=addr, authkey=authkey, serializer=SERIALIZER)
    try:
        manager.start()
    except OSError as e:
        if e.errno != errno.EADDRINUSE:
            raise
        # Retry after some time, in case the old socket was lingering
        # (sporadic failure on buildbots)
        time.sleep(1.0)
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        # The replacement manager must actually be started; otherwise the
        # shutdown() below would be invoked on a manager that never ran.
        manager.start()
    manager.shutdown()
#
#
#
# Example source code using Python's close()
def _echo(cls, conn):
    """Send every received byte message straight back until SENTINEL arrives.

    The connection is closed once the sentinel has been read.
    """
    while True:
        payload = conn.recv_bytes()
        if payload == SENTINEL:
            break
        conn.send_bytes(payload)
    conn.close()
def test_spawn_close(self):
    """Round-trip a message after the parent closed the child's pipe end.

    The parent closes its copy of the child's connection immediately
    after spawning.  On Windows this would sometimes fail on old versions
    because the connection could be closed before the child had had a
    chance to duplicate it.
    """
    parent_end, child_end = self.Pipe()

    echo_proc = self.Process(target=self._echo, args=(child_end,))
    echo_proc.daemon = True
    echo_proc.start()
    # This close may complete before the child has finished initializing.
    child_end.close()

    payload = latin('hello')
    parent_end.send_bytes(payload)
    self.assertEqual(parent_end.recv_bytes(), payload)

    # Ask the echo process to stop, then wait for it.
    parent_end.send_bytes(SENTINEL)
    parent_end.close()
    echo_proc.join()
def test_large_fd_transfer(self):
    """Transfer a file descriptor numbered 256 or above (issue #11657).

    The test file is duplicated onto the first unassigned descriptor in
    range(256, MAXFD), that descriptor is sent to the child, and the file
    is afterwards expected to contain b"bar".
    """
    if self.TYPE != 'processes':
        self.skipTest("only makes sense with processes")

    parent_end, child_end = self.Pipe(duplex=True)
    writer_proc = self.Process(target=self._writefd,
                               args=(child_end, b"bar", True))
    writer_proc.daemon = True
    writer_proc.start()
    self.addCleanup(test.support.unlink, test.support.TESTFN)

    with open(test.support.TESTFN, "wb") as outfile:
        source_fd = outfile.fileno()
        # Pick the first free descriptor number at or above 256.
        free_fds = (candidate for candidate in range(256, MAXFD)
                    if not self._is_fd_assigned(candidate))
        newfd = next(free_fds, None)
        if newfd is None:
            self.fail("could not find an unassigned large file descriptor")
        os.dup2(source_fd, newfd)
        try:
            reduction.send_handle(parent_end, newfd, writer_proc.pid)
        finally:
            # The duplicate is only needed for the hand-over.
            os.close(newfd)

    writer_proc.join()
    with open(test.support.TESTFN, "rb") as infile:
        self.assertEqual(infile.read(), b"bar")
def test_multiple_bind(self):
    """Binding a second Listener to an address already in use must fail.

    Runs once for every family in ``self.connection.families`` and
    expects OSError from the second Listener construction.
    """
    for family in self.connection.families:
        first = self.connection.Listener(family=family)
        self.addCleanup(first.close)
        with self.assertRaises(OSError):
            self.connection.Listener(first.address, family)
def _test(cls, address):
    """Connect to *address*, send the string 'hello' and disconnect."""
    client = cls.connection.Client(address)
    client.send('hello')
    client.close()
def test_listener_client(self):
    """Accept one client per connection family and check its greeting.

    For every family in ``self.connection.families`` a Listener is
    created, a child process running ``self._test`` is pointed at its
    address, and the accepted connection is expected to deliver 'hello'.
    """
    for family in self.connection.families:
        l = self.connection.Listener(family=family)
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        conn = l.accept()
        try:
            self.assertEqual(conn.recv(), 'hello')
        finally:
            # Close the accepted connection explicitly instead of leaving
            # it to garbage collection.
            conn.close()
        p.join()
        l.close()
def test_issue14725(self):
    """Accept a client that has probably already written and disconnected.

    The parent sleeps for a second before calling accept() so that the
    child gets time to connect, send and close first.
    """
    listener = self.connection.Listener()
    client_proc = self.Process(target=self._test, args=(listener.address,))
    client_proc.daemon = True
    client_proc.start()
    time.sleep(1)
    # On Windows the client process should by now have connected,
    # written data and closed the pipe handle. This causes
    # ConnectNamedPipe() to fail with ERROR_NO_DATA. See Issue
    # 14725.
    accepted = listener.accept()
    self.assertEqual(accepted.recv(), 'hello')
    accepted.close()
    client_proc.join()
    listener.close()
def _child_strings(cls, conn, strings):
    """Send each item of *strings* over *conn*, pausing 0.1s before each.

    The connection is closed after the last item has been sent.
    """
    pause = 0.1
    for chunk in strings:
        time.sleep(pause)
        conn.send_bytes(chunk)
    conn.close()
def test_boundaries(self):
    """Check that a whole message is still readable after the child ran.

    The read end of a one-way pipe is handed to a child process running
    ``self._child_boundaries``.  Two messages are then written, and after
    the child has exited the parent must be able to read one of them
    intact from the read end.
    """
    reader, writer = self.Pipe(False)
    child_proc = self.Process(target=self._child_boundaries, args=(reader,))
    child_proc.start()
    # Give the child time to get going before anything is written.
    time.sleep(2)

    messages = [b"first", b"second"]
    for message in messages:
        writer.send_bytes(message)
    writer.close()

    child_proc.join()
    self.assertIn(reader.recv_bytes(), messages)
def _remote(cls, conn):
    """Answer (address, msg) requests read from *conn* with upper-cased msg.

    Requests are served through ``cls.connection.Client`` until a None
    is received.  One further request is then read and answered over a
    plain socket, after which *conn* is closed.
    """
    while True:
        request = conn.recv()
        if request is None:
            break
        address, msg = request
        client = cls.connection.Client(address)
        client.send(msg.upper())
        client.close()

    # The final request is answered through a raw socket instead.
    address, msg = conn.recv()
    sock = socket.socket()
    sock.connect(address)
    sock.sendall(msg.upper())
    sock.close()

    conn.close()
def test_access(self):
    """Send read-only and write-only pipe ends to a child and use them.

    First the write end of a one-way pipe is sent to the child and
    'all is well' is expected on the read end.  Then the read end of a
    second one-way pipe is sent, 'foobar' is written to its write end,
    and 'foobar' repeated twice is expected back on the main connection.
    """
    # On Windows, if we do not specify a destination pid when
    # using DupHandle then we need to be careful to use the
    # correct access flags for DuplicateHandle(), or else
    # DupHandle.detach() will raise PermissionError.  For example,
    # for a read only pipe handle we should use
    # access=FILE_GENERIC_READ.  (Unfortunately
    # DUPLICATE_SAME_ACCESS does not work.)
    conn, child_conn = self.Pipe()
    p = self.Process(target=self.child_access, args=(child_conn,))
    p.daemon = True
    p.start()
    child_conn.close()

    r, w = self.Pipe(duplex=False)
    conn.send(w)
    w.close()
    self.assertEqual(r.recv(), 'all is well')
    r.close()

    r, w = self.Pipe(duplex=False)
    conn.send(r)
    r.close()
    w.send('foobar')
    w.close()
    self.assertEqual(conn.recv(), 'foobar'*2)

    # Do not leave the child process or the parent's connection dangling.
    # NOTE(review): assumes child_access returns after sending its reply.
    p.join()
    conn.close()
#
#
#
def _test_finalize(cls, conn):
    """Register a series of finalizers reporting through *conn*, then exit.

    Each finalizer calls ``conn.send`` with a short tag.  After all of
    them are registered, multiprocessing's exit function is run, the
    connection is closed and the process is terminated with os._exit(0).
    """
    class Foo:
        pass

    obj_a = Foo()
    util.Finalize(obj_a, conn.send, args=('a',))
    del obj_a           # triggers callback for a

    obj_b = Foo()
    finalize_b = util.Finalize(obj_b, conn.send, args=('b',))
    finalize_b()        # triggers callback for b
    finalize_b()        # does nothing because callback has already been called
    del obj_b           # does nothing because callback has already been called

    # Registered without an exit priority.
    obj_c = Foo()
    util.Finalize(obj_c, conn.send, args=('c',))

    obj_d10 = Foo()
    util.Finalize(obj_d10, conn.send, args=('d10',), exitpriority=1)

    # Three finalizers sharing exit priority 0, registered in this order.
    # The objects are kept referenced so that they stay alive until exit.
    priority_zero_objs = []
    for tag in ('d01', 'd02', 'd03'):
        holder = Foo()
        priority_zero_objs.append(holder)
        util.Finalize(holder, conn.send, args=(tag,), exitpriority=0)

    # Finalizers that are not tied to any object.
    util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
    util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

    # call multiprocessing's cleanup function then exit process without
    # garbage collecting locals
    util._exit_function()
    conn.close()
    os._exit(0)
def test_pool_initializer(self):
    """Check Pool's handling of the *initializer* argument.

    A non-callable initializer must raise TypeError.  With the
    ``initializer`` function and ``self.ns`` as its argument, running a
    one-worker pool must leave ``self.ns.test`` equal to 1.
    """
    with self.assertRaises(TypeError):
        multiprocessing.Pool(initializer=1)

    pool = multiprocessing.Pool(1, initializer, (self.ns,))
    pool.close()
    pool.join()
    self.assertEqual(self.ns.test, 1)
#
# Issue 5155, 5313, 5331: Test process in processes
# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
#
def test_wait_socket(self, slow=False):
    """Drain four client sockets with multiprocessing.connection.wait().

    Four child processes running ``self._child_test_wait_socket`` are
    given the listening address and *slow*.  Everything each accepted
    socket delivers until end-of-stream is collected, and every socket
    is expected to have delivered the numbers 0-9, one per line.
    """
    from multiprocessing.connection import wait

    child_count = 4

    listener = socket.socket()
    listener.bind((test.support.HOST, 0))
    listener.listen(4)
    addr = listener.getsockname()

    children = []
    for _ in range(child_count):
        child_proc = multiprocessing.Process(
            target=self._child_test_wait_socket, args=(addr, slow))
        child_proc.daemon = True
        child_proc.start()
        children.append(child_proc)
        self.addCleanup(child_proc.join)

    pending = []
    received = {}
    for _ in range(child_count):
        sock, _peer = listener.accept()
        pending.append(sock)
        received[sock] = []
    listener.close()

    while pending:
        for sock in wait(pending):
            chunk = sock.recv(32)
            if chunk:
                received[sock].append(chunk)
            else:
                # An empty read means the peer has finished sending.
                pending.remove(sock)
                sock.close()

    expected = ('\n'.join(str(i) for i in range(10)) + '\n').encode('ascii')
    for chunks in received.values():
        self.assertEqual(b''.join(chunks), expected)
def test_neg_timeout(self):
    """wait() with a negative timeout must return promptly with no result.

    Waiting on one end of an idle pipe with ``timeout=-1`` is expected to
    yield an empty list in well under a second.
    """
    from multiprocessing.connection import wait
    a, b = multiprocessing.Pipe()
    try:
        # A monotonic clock is used so that wall-clock adjustments cannot
        # distort the measured interval.
        start = time.monotonic()
        res = wait([a], timeout=-1)
        elapsed = time.monotonic() - start
        self.assertEqual(res, [])
        self.assertLess(elapsed, 1)
    finally:
        # Close both ends even when one of the assertions fails.
        a.close()
        b.close()
#
# Issue 14151: Test invalid family on invalid environment
#
def run_in_child(cls):
    """Print this process's sys.flags and a grandchild's report as JSON.

    A process running ``cls.run_in_grandchild`` is given the write end of
    a one-way pipe.  Whatever it sends back is paired with
    ``tuple(sys.flags)`` of the current process and printed as JSON.
    """
    import json

    reader, writer = multiprocessing.Pipe(duplex=False)
    grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                         args=(writer,))
    grandchild.start()
    flags_from_grandchild = reader.recv()
    grandchild.join()
    reader.close()
    writer.close()

    own_flags = tuple(sys.flags)
    print(json.dumps((own_flags, flags_from_grandchild)))
def _test_timeout(cls, child, address):
    """After a one-second delay send 123 on *child* and 456 to *address*.

    *child* is closed after its message; the second message goes through
    a fresh client connection to *address*, which is closed afterwards.
    """
    time.sleep(1)
    child.send(123)
    child.close()

    client = multiprocessing.connection.Client(address)
    client.send(456)
    client.close()
def child(cls, n, conn):
    """Recurse through *n* levels of processes, then report on *conn*.

    While *n* is greater than 1 a new process running ``cls.child`` with
    ``n - 1`` is started and joined.  At the innermost level the current
    size of ``util._afterfork_registry`` is sent over *conn*.  Every
    level closes its own *conn* before returning.
    """
    if n <= 1:
        registry_size = len(util._afterfork_registry)
        conn.send(registry_size)
    else:
        nested = multiprocessing.Process(target=cls.child,
                                         args=(n - 1, conn))
        nested.start()
        nested.join()
    conn.close()
def test_ignore_listener(self):
    """Connect to a child's listener after sending the child SIGUSR1.

    The child, running ``self._test_ignore_listener``, reports an address
    over the pipe.  The parent signals it with SIGUSR1, then connects to
    that address and expects to receive 'welcome'.
    """
    conn, child_conn = multiprocessing.Pipe()
    try:
        p = multiprocessing.Process(target=self._test_ignore_listener,
                                    args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()
        address = conn.recv()
        # Short pauses on either side of the signal.
        # NOTE(review): presumably these let the child reach, and then
        # recover from, its blocking call - confirm against the child code.
        time.sleep(0.1)
        os.kill(p.pid, signal.SIGUSR1)
        time.sleep(0.1)
        client = multiprocessing.connection.Client(address)
        try:
            self.assertEqual(client.recv(), 'welcome')
        finally:
            # Close the client connection explicitly instead of leaving
            # it to garbage collection.
            client.close()
        p.join()
    finally:
        conn.close()
#
#
#