def test_multiple_bind(self):
    # For every address family exposed by self.connection, creating a
    # second Listener on an address that an open Listener already holds
    # must raise OSError.
    for fam in self.connection.families:
        first = self.connection.Listener(family=fam)
        # Close the first listener at test teardown, whatever happens below.
        self.addCleanup(first.close)
        taken_address = first.address
        with self.assertRaises(OSError):
            self.connection.Listener(taken_address, fam)
# Example source code for Python's connection() class
def test_context(self):
    # Listener, Client and the accepted connection are all used as context
    # managers; a value sent from the client must arrive on the accepted end.
    with self.connection.Listener() as listener, \
            self.connection.Client(listener.address) as client, \
            listener.accept() as accepted:
        client.send(1729)
        self.assertEqual(accepted.recv(), 1729)

    # Once the with-block has exited, accept() on the listener is expected
    # to fail with OSError (checked only when TYPE is 'processes').
    if self.TYPE == 'processes':
        with self.assertRaises(OSError):
            listener.accept()
def _test(cls, address):
    """Connect to *address*, send the string 'hello', then close.

    NOTE(review): the first parameter is ``cls``; presumably this is
    decorated with ``@classmethod`` in the full file -- confirm.
    """
    client = cls.connection.Client(address)
    client.send('hello')
    client.close()
def test_issue14725(self):
    # Start a child running self._test against the listener's address, wait
    # a second, and only then accept: the message must still be readable.
    listener = self.connection.Listener()
    child = self.Process(target=self._test, args=(listener.address,))
    child.daemon = True
    child.start()
    time.sleep(1)
    # On Windows the client process should by now have connected,
    # written data and closed the pipe handle.  This causes
    # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
    # 14725.
    server_conn = listener.accept()
    received = server_conn.recv()
    self.assertEqual(received, 'hello')
    server_conn.close()
    child.join()
    listener.close()
def test_issue16955(self):
    # For each address family: after the accepted end sends bytes, poll(1)
    # on the client end must report that data is available.
    for family in self.connection.families:
        listener = self.connection.Listener(family=family)
        client = self.connection.Client(listener.address)
        accepted = listener.accept()
        accepted.send_bytes(b"hello")
        data_ready = client.poll(1)
        self.assertTrue(data_ready)
        # Close in the same order as before: accepted end, client, listener.
        for endpoint in (accepted, client, listener):
            endpoint.close()
def _remote(cls, conn):
    """Echo upper-cased messages back to addresses received over *conn*.

    Phase 1: read ``(address, msg)`` pairs from *conn* until ``None``
    arrives, answering each through ``cls.connection.Client``.
    Phase 2: read one more pair and answer it over a plain socket.
    Finally close *conn*.
    """
    # iter(callable, sentinel) keeps calling conn.recv() until it yields None.
    for reply_address, text in iter(conn.recv, None):
        reply_conn = cls.connection.Client(reply_address)
        reply_conn.send(text.upper())
        reply_conn.close()

    # One final request is answered via a raw socket instead of a Client.
    reply_address, text = conn.recv()
    raw_sock = socket.socket()
    raw_sock.connect(reply_address)
    raw_sock.sendall(text.upper())
    raw_sock.close()

    conn.close()
def test_invalid_handles(self):
conn = multiprocessing.connection.Connection(44977608)
# check that poll() doesn't crash
try:
conn.poll()
except (ValueError, OSError):
pass
finally:
# Hack private attribute _handle to avoid printing an error
# in conn.__del__
conn._handle = None
self.assertRaises((ValueError, OSError),
multiprocessing.connection.Connection, -1)
def test_deliver_challenge_auth_failure(self):
class _FakeConnection(object):
def recv_bytes(self, size):
return b'something bogus'
def send_bytes(self, data):
pass
self.assertRaises(multiprocessing.AuthenticationError,
multiprocessing.connection.deliver_challenge,
_FakeConnection(), b'abc')
def test_wait(self, slow=False):
    # Start four children, each holding the write end of a one-way pipe,
    # and drain all read ends through wait() until every pipe hits EOF.
    # The collected messages must equal (i, pid) for i in range(10) and
    # every child.
    from multiprocessing.connection import wait

    readers = []
    procs = []
    for _ in range(4):
        read_end, write_end = multiprocessing.Pipe(duplex=False)
        child = multiprocessing.Process(target=self._child_test_wait, args=(write_end, slow))
        child.daemon = True
        child.start()
        # Drop the parent's copy of the write end so that the reader sees
        # EOF once the child has closed its own copy.
        write_end.close()
        readers.append(read_end)
        procs.append(child)
        self.addCleanup(child.join)

    messages = []
    while readers:
        for ready in wait(readers):
            try:
                item = ready.recv()
            except EOFError:
                readers.remove(ready)
                ready.close()
            else:
                messages.append(item)

    expected = sorted((i, proc.pid) for i in range(10) for proc in procs)
    self.assertEqual(sorted(messages), expected)
def test_wait_socket(self, slow=False):
    # Same idea as waiting on pipes, but with plain sockets: four children
    # connect to a listening socket, and wait() is used to drain the four
    # accepted sockets until each returns an empty read.
    from multiprocessing.connection import wait

    server = socket.socket()
    server.bind((test.support.HOST, 0))
    server.listen(4)
    addr = server.getsockname()

    procs = []
    for _ in range(4):
        child = multiprocessing.Process(target=self._child_test_wait_socket,
                                        args=(addr, slow))
        child.daemon = True
        child.start()
        procs.append(child)
        self.addCleanup(child.join)

    readers = []
    received = {}
    for _ in range(4):
        sock, _peer = server.accept()
        readers.append(sock)
        received[sock] = []
    server.close()

    while readers:
        for sock in wait(readers):
            chunk = sock.recv(32)
            if chunk:
                received[sock].append(chunk)
            else:
                # An empty read means the peer closed its end.
                readers.remove(sock)
                sock.close()

    expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
    for chunks in received.values():
        self.assertEqual(b''.join(chunks), expected)
def test_neg_timeout(self):
from multiprocessing.connection import wait
a, b = multiprocessing.Pipe()
t = time.time()
res = wait([a], timeout=-1)
t = time.time() - t
self.assertEqual(res, [])
self.assertLess(t, 1)
a.close()
b.close()
#
# Issue 14151: Test invalid family on invalid environment
#
def test_invalid_family(self):
with self.assertRaises(ValueError):
multiprocessing.connection.Listener(r'\\.\test')
def test_invalid_family_win32(self):
    # Creating a Listener on the path '/var/test.pipe' must raise ValueError.
    # NOTE(review): the name suggests this is meant to run on Windows only;
    # presumably a skip decorator exists in the full file -- confirm.
    filesystem_style_address = '/var/test.pipe'
    self.assertRaises(ValueError,
                      multiprocessing.connection.Listener,
                      filesystem_style_address)
#
# Issue 12098: check sys.flags of child matches that for parent
#
def _test_timeout(cls, child, address):
    """Sleep one second, then send 123 on *child* and 456 to *address*.

    *child* is closed after its message is sent; the second message goes
    through a fresh ``multiprocessing.connection.Client`` that is closed
    afterwards as well.
    """
    time.sleep(1)
    child.send(123)
    child.close()

    client = multiprocessing.connection.Client(address)
    client.send(456)
    client.close()
def _test_ignore_listener(cls, conn):
    """Install a no-op SIGUSR1 handler, then serve one listener client.

    The listener's address is reported back over *conn*; the first
    connection accepted on the listener is sent the string 'welcome'.
    """
    # A handler that does nothing: the signal is received but has no effect
    # beyond interrupting whatever call is in progress.
    signal.signal(signal.SIGUSR1, lambda signum, frame: None)
    with multiprocessing.connection.Listener() as listener:
        conn.send(listener.address)
        accepted = listener.accept()
        accepted.send('welcome')
def server_establish_socket(self) -> None:
    """Establish the IPC server side.

    Listens on ``('localhost', RXM_LISTEN_SOCKET)`` and stores the first
    accepted connection in ``self.interface``. Blocks until a client
    connects.
    """
    bind_address = ('localhost', RXM_LISTEN_SOCKET)
    ipc_listener = multiprocessing.connection.Listener(bind_address)
    self.interface = ipc_listener.accept()
def client_establish_socket(self) -> None:
    """Establish the IPC client side.

    Retries the connection to localhost every 0.1 s until it succeeds and
    stores the connection in ``self.interface``. The port is
    ``TXM_DD_LISTEN_SOCKET`` when ``self.settings.data_diode_sockets`` is
    truthy, otherwise ``NH_LISTEN_SOCKET``. A KeyboardInterrupt while
    waiting triggers ``graceful_exit()``.
    """
    try:
        phase("Waiting for connection to NH", offset=11)
        while True:
            try:
                if self.settings.data_diode_sockets:
                    socket_number = TXM_DD_LISTEN_SOCKET
                else:
                    socket_number = NH_LISTEN_SOCKET
                target = ('localhost', socket_number)
                self.interface = multiprocessing.connection.Client(target)
                phase("Established", done=True)
                break
            except socket.error:
                # Nothing is listening yet; back off briefly and retry.
                time.sleep(0.1)
    except KeyboardInterrupt:
        graceful_exit()
def test_wait_integer(self):
    # wait() must accept a process sentinel (an integer handle) alongside
    # Connection objects.  Three phases are checked:
    #   1. only the sentinel becomes ready, roughly `expected` seconds
    #      after the child signals the semaphore;
    #   2. after a.send(None), the sentinel and b are ready almost at once;
    #   3. after b.send(None), a, the sentinel and b are all ready.
    from multiprocessing.connection import wait

    def by_identity(items):
        # Order by object identity so that result lists can be compared
        # regardless of the order in which wait() returned them.
        return sorted(items, key=id)

    expected = 3
    sem = multiprocessing.Semaphore(0)
    a, b = multiprocessing.Pipe()
    p = multiprocessing.Process(target=self.signal_and_sleep,
                                args=(sem, expected))
    p.start()
    try:
        self.assertIsInstance(p.sentinel, int)
        self.assertTrue(sem.acquire(timeout=20))

        # Elapsed times use the monotonic clock; the wall clock can jump
        # if the system time is adjusted while the test runs.
        start = time.monotonic()
        res = wait([a, p.sentinel, b], expected + 20)
        delta = time.monotonic() - start

        self.assertEqual(res, [p.sentinel])
        self.assertLess(delta, expected + 2)
        self.assertGreater(delta, expected - 2)

        a.send(None)

        start = time.monotonic()
        res = wait([a, p.sentinel, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(by_identity(res), by_identity([p.sentinel, b]))
        self.assertLess(delta, 0.4)

        b.send(None)

        start = time.monotonic()
        res = wait([a, p.sentinel, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(by_identity(res), by_identity([a, p.sentinel, b]))
        self.assertLess(delta, 0.4)
    finally:
        # Always reap the child, even when an assertion above fails.
        p.terminate()
        p.join()