python类connection()的实例源码

_test_multiprocessing.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_multiple_bind(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            self.addCleanup(l.close)
            self.assertRaises(OSError, self.connection.Listener,
                              l.address, family)
_test_multiprocessing.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_context(self):
        with self.connection.Listener() as l:
            with self.connection.Client(l.address) as c:
                with l.accept() as d:
                    c.send(1729)
                    self.assertEqual(d.recv(), 1729)

        if self.TYPE == 'processes':
            self.assertRaises(OSError, l.accept)
_test_multiprocessing.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _test(cls, address):
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()
_test_multiprocessing.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_issue14725(self):
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle by now.  This causes
        # ConnectNamdedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()
_test_multiprocessing.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_issue16955(self):
        for fam in self.connection.families:
            l = self.connection.Listener(family=fam)
            c = self.connection.Client(l.address)
            a = l.accept()
            a.send_bytes(b"hello")
            self.assertTrue(c.poll(1))
            a.close()
            c.close()
            l.close()
_test_multiprocessing.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _remote(cls, conn):
        for (address, msg) in iter(conn.recv, None):
            client = cls.connection.Client(address)
            client.send(msg.upper())
            client.close()

        address, msg = conn.recv()
        client = socket.socket()
        client.connect(address)
        client.sendall(msg.upper())
        client.close()

        conn.close()
_test_multiprocessing.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_invalid_handles(self):
        conn = multiprocessing.connection.Connection(44977608)
        # check that poll() doesn't crash
        try:
            conn.poll()
        except (ValueError, OSError):
            pass
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            conn._handle = None
        self.assertRaises((ValueError, OSError),
                          multiprocessing.connection.Connection, -1)
_test_multiprocessing.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_deliver_challenge_auth_failure(self):
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          _FakeConnection(), b'abc')
_test_multiprocessing.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_wait(self, slow=False):
        from multiprocessing.connection import wait
        readers = []
        procs = []
        messages = []

        for i in range(4):
            r, w = multiprocessing.Pipe(duplex=False)
            p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
            p.daemon = True
            p.start()
            w.close()
            readers.append(r)
            procs.append(p)
            self.addCleanup(p.join)

        while readers:
            for r in wait(readers):
                try:
                    msg = r.recv()
                except EOFError:
                    readers.remove(r)
                    r.close()
                else:
                    messages.append(msg)

        messages.sort()
        expected = sorted((i, p.pid) for i in range(10) for p in procs)
        self.assertEqual(messages, expected)
_test_multiprocessing.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_wait_socket(self, slow=False):
        from multiprocessing.connection import wait
        l = socket.socket()
        l.bind((test.support.HOST, 0))
        l.listen(4)
        addr = l.getsockname()
        readers = []
        procs = []
        dic = {}

        for i in range(4):
            p = multiprocessing.Process(target=self._child_test_wait_socket,
                                        args=(addr, slow))
            p.daemon = True
            p.start()
            procs.append(p)
            self.addCleanup(p.join)

        for i in range(4):
            r, _ = l.accept()
            readers.append(r)
            dic[r] = []
        l.close()

        while readers:
            for r in wait(readers):
                msg = r.recv(32)
                if not msg:
                    readers.remove(r)
                    r.close()
                else:
                    dic[r].append(msg)

        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
        for v in dic.values():
            self.assertEqual(b''.join(v), expected)
_test_multiprocessing.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_neg_timeout(self):
        from multiprocessing.connection import wait
        a, b = multiprocessing.Pipe()
        t = time.time()
        res = wait([a], timeout=-1)
        t = time.time() - t
        self.assertEqual(res, [])
        self.assertLess(t, 1)
        a.close()
        b.close()

#
# Issue 14151: Test invalid family on invalid environment
#
_test_multiprocessing.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_invalid_family(self):
        with self.assertRaises(ValueError):
            multiprocessing.connection.Listener(r'\\.\test')
_test_multiprocessing.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_invalid_family_win32(self):
        with self.assertRaises(ValueError):
            multiprocessing.connection.Listener('/var/test.pipe')

#
# Issue 12098: check sys.flags of child matches that for parent
#
_test_multiprocessing.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _test_timeout(cls, child, address):
        time.sleep(1)
        child.send(123)
        child.close()
        conn = multiprocessing.connection.Client(address)
        conn.send(456)
        conn.close()
_test_multiprocessing.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _test_ignore_listener(cls, conn):
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        with multiprocessing.connection.Listener() as l:
            conn.send(l.address)
            a = l.accept()
            a.send('welcome')
gateway.py 文件源码 项目:tfc 作者: maqp 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def server_establish_socket(self) -> None:
        """Establish IPC server."""
        listener       = multiprocessing.connection.Listener(('localhost', RXM_LISTEN_SOCKET))
        self.interface = listener.accept()
gateway.py 文件源码 项目:tfc 作者: maqp 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def client_establish_socket(self) -> None:
        """Establish IPC client."""
        try:
            phase("Waiting for connection to NH", offset=11)
            while True:
                try:
                    socket_number  = TXM_DD_LISTEN_SOCKET if self.settings.data_diode_sockets else NH_LISTEN_SOCKET
                    self.interface = multiprocessing.connection.Client(('localhost', socket_number))
                    phase("Established", done=True)
                    break
                except socket.error:
                    time.sleep(0.1)

        except KeyboardInterrupt:
            graceful_exit()
_test_multiprocessing.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_wait_integer(self):
        from multiprocessing.connection import wait

        expected = 3
        sorted_ = lambda l: sorted(l, key=lambda x: id(x))
        sem = multiprocessing.Semaphore(0)
        a, b = multiprocessing.Pipe()
        p = multiprocessing.Process(target=self.signal_and_sleep,
                                    args=(sem, expected))

        p.start()
        self.assertIsInstance(p.sentinel, int)
        self.assertTrue(sem.acquire(timeout=20))

        start = time.time()
        res = wait([a, p.sentinel, b], expected + 20)
        delta = time.time() - start

        self.assertEqual(res, [p.sentinel])
        self.assertLess(delta, expected + 2)
        self.assertGreater(delta, expected - 2)

        a.send(None)

        start = time.time()
        res = wait([a, p.sentinel, b], 20)
        delta = time.time() - start

        self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
        self.assertLess(delta, 0.4)

        b.send(None)

        start = time.time()
        res = wait([a, p.sentinel, b], 20)
        delta = time.time() - start

        self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
        self.assertLess(delta, 0.4)

        p.terminate()
        p.join()


问题


面经


文章

微信
公众号

扫码关注公众号