Python 模块 test.support 的实例源码

_test_multiprocessing.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_traceback(self):
        """Check that a worker's traceback survives the trip to the parent.

        The traceback from the child process must be contained in the
        traceback raised in the main process.  The checks only run when
        ``self.TYPE`` is ``'processes'``.
        """
        if self.TYPE != 'processes':
            return

        with self.Pool(1) as pool:
            try:
                pool.apply(self._test_traceback)
            except Exception as err:
                # Keep a reference: ``err`` is unbound once the handler exits.
                raised = err
            else:
                raise AssertionError('expected RuntimeError')

        expected_line = 'raise RuntimeError(123) # some comment'

        self.assertIs(type(raised), RuntimeError)
        self.assertEqual(raised.args, (123,))
        remote = raised.__cause__
        self.assertIs(type(remote), multiprocessing.pool.RemoteTraceback)
        self.assertIn(expected_line, remote.tb)

        # Re-raise locally and let the excepthook print it; the remote
        # source line must show up in what gets written to stderr.
        with test.support.captured_stderr() as captured:
            try:
                raise raised
            except RuntimeError:
                sys.excepthook(*sys.exc_info())
        self.assertIn(expected_line, captured.getvalue())
_test_multiprocessing.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_fd_transfer(self):
        """Send an open file's handle to a child process over a pipe.

        The child runs ``self._writefd`` with the payload ``b"foo"``; after
        it exits, the test file must contain exactly that payload.
        Skipped unless ``self.TYPE`` is ``'processes'``.
        """
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        parent_end, child_end = self.Pipe(duplex=True)

        child = self.Process(target=self._writefd, args=(child_end, b"foo"))
        child.daemon = True
        child.start()

        target_path = test.support.TESTFN
        self.addCleanup(test.support.unlink, target_path)
        with open(target_path, "wb") as out:
            handle = out.fileno()
            if msvcrt:
                # With msvcrt available, send the OS-level file handle
                # instead of the C runtime descriptor.
                handle = msvcrt.get_osfhandle(handle)
            reduction.send_handle(parent_end, handle, child.pid)
        child.join()

        with open(target_path, "rb") as result:
            self.assertEqual(result.read(), b"foo")
_test_multiprocessing.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_large_fd_transfer(self):
        """Transfer a file descriptor numbered 256 or above (issue #11657).

        The test file's descriptor is duplicated onto the first unassigned
        descriptor in ``range(256, MAXFD)`` and that duplicate is sent to
        the child.  Afterwards the file must contain ``b"bar"``.
        Skipped unless ``self.TYPE`` is ``'processes'``.
        """
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        parent_end, child_end = self.Pipe(duplex=True)

        child = self.Process(target=self._writefd,
                             args=(child_end, b"bar", True))
        child.daemon = True
        child.start()

        target_path = test.support.TESTFN
        self.addCleanup(test.support.unlink, target_path)
        with open(target_path, "wb") as out:
            small_fd = out.fileno()
            # First free descriptor at or above 256, or None if all are taken.
            large_fd = next(
                (candidate for candidate in range(256, MAXFD)
                 if not self._is_fd_assigned(candidate)),
                None,
            )
            if large_fd is None:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(small_fd, large_fd)
            try:
                reduction.send_handle(parent_end, large_fd, child.pid)
            finally:
                # The duplicate is ours to close whether or not sending worked.
                os.close(large_fd)
        child.join()

        with open(target_path, "rb") as result:
            self.assertEqual(result.read(), b"bar")
_test_multiprocessing.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _listener(cls, conn, families):
        """Accept one connection per family, then one on a plain socket.

        For every entry in *families* a ``cls.connection.Listener`` is
        created, its address is sent over *conn*, and the accepted
        connection object is itself sent over *conn* before being closed.
        The same exchange is then repeated with a raw ``socket.socket``
        bound to ``test.support.HOST`` on an ephemeral port.  Finally the
        function blocks on ``conn.recv()``.
        """
        for family in families:
            listener = cls.connection.Listener(family=family)
            conn.send(listener.address)
            accepted = listener.accept()
            conn.send(accepted)
            accepted.close()
            listener.close()

        server = socket.socket()
        server.bind((test.support.HOST, 0))
        server.listen(1)
        conn.send(server.getsockname())
        accepted, _peer = server.accept()
        conn.send(accepted)
        accepted.close()
        server.close()

        # Wait for a message from the other end before returning.
        conn.recv()
test_threading.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_threads_join(self):
        """Non-daemon threads are joined at subinterpreter shutdown.

        See issue #18808.  A thread started inside the subinterpreter
        writes one byte to a pipe after a short sleep; reading that byte
        back shows the thread was allowed to finish.
        """
        read_fd, write_fd = os.pipe()
        for fd in (read_fd, write_fd):
            self.addCleanup(os.close, fd)
        script = r"""if 1:
            import os
            import threading
            import time

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                time.sleep(0.05)
                os.write(%d, b"x")
            threading.Thread(target=f).start()
            """ % (write_fd,)
        status = test.support.run_in_subinterp(script)
        self.assertEqual(status, 0)
        # The byte is only there if the thread was joined properly.
        self.assertEqual(os.read(read_fd, 1), b"x")
test_sys.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Put back the saved ``sys`` hooks/streams and reap child processes."""
        sys.displayhook = self.orig_displayhook
        sys.stderr = self.orig_stderr
        sys.stdout = self.orig_stdout
        test.support.reap_children()
test_sys.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_excepthook(self):
        """Calling sys.excepthook with a non-exception value reports a TypeError.

        The hook is invoked with ``(1, '1', 1)`` while stderr is captured,
        and the captured text must mention the expected TypeError message.
        """
        with test.support.captured_output("stderr") as stderr:
            sys.excepthook(1, '1', 1)
        # assertIn shows the captured output on failure, whereas
        # assertTrue(... in ...) would only report "False is not true".
        self.assertIn("TypeError: print_exception(): Exception expected for "
                      "value, str found", stderr.getvalue())

    # FIXME: testing the code for a lost or replaced excepthook in
    # Python/pythonrun.c::PyErr_PrintEx() is tricky.
test_sys.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_getwindowsversion(self):
        """Check the shape and field types of ``sys.getwindowsversion()``.

        The result must behave as a 5-item sequence (four ints and a str)
        and also expose the named attributes checked below; the first five
        named fields must mirror the positional items.
        """
        # Raise SkipTest if sys doesn't have getwindowsversion attribute
        test.support.get_attribute(sys, "getwindowsversion")
        version = sys.getwindowsversion()
        self.assertEqual(len(version), 5)

        # Positional view: four ints followed by one str, nothing beyond.
        for index in range(4):
            self.assertIsInstance(version[index], int)
        self.assertIsInstance(version[4], str)
        self.assertRaises(IndexError, operator.getitem, version, 5)

        # Named view.
        for name in ("major", "minor", "build", "platform"):
            self.assertIsInstance(getattr(version, name), int)
        self.assertIsInstance(version.service_pack, str)
        for name in ("service_pack_minor", "service_pack_major",
                     "suite_mask", "product_type"):
            self.assertIsInstance(getattr(version, name), int)

        # Each positional item matches the attribute of the same rank.
        positional_names = ("major", "minor", "build", "platform",
                            "service_pack")
        for index, name in enumerate(positional_names):
            self.assertEqual(version[index], getattr(version, name))

        # This is how platform.py calls it. Make sure tuple
        #  still has 5 elements
        major, minor, buildno, plat, csd = sys.getwindowsversion()
test_sys.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Close the test's file object and delete the temporary test file."""
        self.file.close()
        leftover = test.support.TESTFN
        test.support.unlink(leftover)
test_dbm.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def delete_files():
    """Remove every file whose name starts with ``_fname``."""
    # The precise name the underlying database uses is unknown, so glob
    # for everything sharing the prefix.
    pattern = _fname + "*"
    for path in glob.glob(pattern):
        test.support.unlink(path)
test_dbm.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setUp(self):
        """Clear old database files, then open and close one with flag 'c'."""
        delete_files()
        self.filename = path = test.support.TESTFN
        handle = dbm.open(path, 'c')
        self.d = handle
        handle.close()
test_dbm.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_main():
    """Run WhichDBTestCase plus one AnyDBMTestCase subclass per dbm module.

    A subclass named ``"TestCase-" + mod.__name__`` is generated for each
    module yielded by ``dbm_iterator()``, with its ``module`` attribute
    set to that module.
    """
    classes = [WhichDBTestCase]
    classes.extend(
        type("TestCase-" + mod.__name__, (AnyDBMTestCase,), {'module': mod})
        for mod in dbm_iterator()
    )
    test.support.run_unittest(*classes)
test_threading.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def setUp(self):
        """Store the result of ``threading_setup()`` for use in tearDown."""
        snapshot = test.support.threading_setup()
        self._threads = snapshot
test_threading.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Run threading cleanup with the stored state, then reap children."""
        snapshot = self._threads
        test.support.threading_cleanup(*snapshot)
        test.support.reap_children()
test_threading.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_various_ops_small_stack(self):
        """Run ``test_various_ops`` with a 256kB (262144-byte) thread stack.

        Raises:
            unittest.SkipTest: if the platform refuses to change the
                thread stack size.
        """
        if verbose:
            print('with 256kB thread stack size...')
        try:
            threading.stack_size(262144)
        except _thread.error:
            raise unittest.SkipTest(
                'platform does not support changing thread stack size')
        try:
            self.test_various_ops()
        finally:
            # Always restore the default stack size (0), even when the
            # wrapped test fails, so later tests are not affected.
            threading.stack_size(0)

    # run with a large thread stack size (1MB)
test_threading.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_various_ops_large_stack(self):
        """Run ``test_various_ops`` with a 1MB (0x100000-byte) thread stack.

        Raises:
            unittest.SkipTest: if the platform refuses to change the
                thread stack size.
        """
        if verbose:
            print('with 1MB thread stack size...')
        try:
            threading.stack_size(0x100000)
        except _thread.error:
            raise unittest.SkipTest(
                'platform does not support changing thread stack size')
        try:
            self.test_various_ops()
        finally:
            # Always restore the default stack size (0), even when the
            # wrapped test fails, so later tests are not affected.
            threading.stack_size(0)
test_threading.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_main():
    """Hand every threading test-case class to ``run_unittest``."""
    suites = (
        LockTests, PyRLockTests, CRLockTests, EventTests,
        ConditionAsRLockTests, ConditionTests,
        SemaphoreTests, BoundedSemaphoreTests,
        ThreadTests,
        ThreadJoinOnShutdown,
        ThreadingExceptionTests,
        BarrierTests,
    )
    test.support.run_unittest(*suites)
test_memoryview.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_writable_readonly(self):
        """A view over a read-only object must not be usable with readinto.

        Issue #10451: memoryview incorrectly exposes a readonly
        buffer as writable causing a segfault if using mmap.

        Skipped when the test class defines no read-only type
        (``self.ro_type is None``).
        """
        tp = self.ro_type
        if tp is None:
            # Report a skip rather than returning silently, which would
            # be counted as a pass although nothing was checked.
            self.skipTest("no read-only type to test")
        b = tp(self._source)
        m = self._view(b)
        i = io.BytesIO(b'ZZZZ')
        self.assertRaises(TypeError, i.readinto, m)

# Variations on source objects for the buffer: bytes-like objects, then arrays
# with itemsize > 1.
# NOTE: support for multi-dimensional objects is unimplemented.
test_memoryview.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_main():
    """Run the tests of this module, identified by its ``__name__``."""
    module_name = __name__
    test.support.run_unittest(module_name)
test_multiprocessing.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_sys_exit(self):
        """Check child exit codes for various ``sys.exit`` arguments.

        See Issue 13854.  First, ``self._test_sys_exit`` is run in a child
        with a *reason* and the test file name; the child's exit code must
        match the paired expected code and the test file must contain
        ``str(reason)``.  Second, ``sys.exit`` itself is the process
        target and the exit code must equal the argument it was given.

        Skipped when ``self.TYPE`` is ``'threads'``.
        """
        if self.TYPE == 'threads':
            # Report a skip rather than returning silently, which would
            # be counted as a pass although nothing was checked.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)

        for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, code)

            with open(testfn, 'r') as f:
                self.assertEqual(f.read().rstrip(), str(reason))

        # True and False compare equal to exit codes 1 and 0 respectively.
        for reason in (True, False, 8):
            p = self.Process(target=sys.exit, args=(reason,))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, reason)

#
#
#


问题


面经


文章

微信
公众号

扫码关注公众号