python类TimeoutError()的实例源码

testing_test.py 文件源码 项目:projects-2017-2 作者: ncss 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_no_timeout_environment_variable(self):
        """Verify that a ``gen_test`` coroutine outlasting its timeout raises.

        The inner coroutine is decorated with ``timeout=0.01`` and waits on an
        IOLoop timeout scheduled one second ahead.  It is invoked while
        ``ASYNC_TEST_TIMEOUT`` is set to ``'0.1'`` and is expected to raise
        ``ioloop.TimeoutError``.
        """
        @gen_test(timeout=0.01)
        def test_short_timeout(self):
            # Wake-up time one second from "now" on the test's IOLoop clock.
            deadline = self.io_loop.time() + 1
            yield gen.Task(self.io_loop.add_timeout, deadline)

        # With the environment variable set to 0.1 the call must still
        # time out.
        with set_environ('ASYNC_TEST_TIMEOUT', '0.1'), \
                self.assertRaises(ioloop.TimeoutError):
            test_short_timeout(self)

        # Record completion; the consumer of ``finished`` is outside this
        # snippet.
        self.finished = True
testing_test.py 文件源码 项目:projects-2017-2 作者: ncss 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_native_coroutine_timeout(self):
        """Verify that an ``async def`` test exceeding its timeout raises.

        The test body is built from source through ``exec_test``, decorated
        with ``gen_test(timeout=0.1)`` and awaits ``gen.sleep(1)``.  Calling
        it is expected to raise ``ioloop.TimeoutError``; any other outcome
        fails the test.
        """
        # Set a short timeout and exceed it.
        compiled = exec_test(globals(), locals(), """
        @gen_test(timeout=0.1)
        async def test(self):
            await gen.sleep(1)
        """)
        timed_test = compiled['test']

        try:
            timed_test(self)
        except ioloop.TimeoutError:
            # Expected path: the timeout fired.
            self.finished = True
        else:
            self.fail("did not get expected exception")
ioloop_test.py 文件源码 项目:projects-2017-2 作者: ncss 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_timeout(self):
        """Verify that ``run_sync`` raises when the coroutine outlives it.

        The coroutine waits on an IOLoop timeout scheduled one second ahead
        while ``run_sync`` is given ``timeout=0.01``; ``TimeoutError`` (as
        bound in the enclosing module) is expected.
        """
        @gen.coroutine
        def f():
            wake_at = self.io_loop.time() + 1
            yield gen.Task(self.io_loop.add_timeout, wake_at)

        with self.assertRaises(TimeoutError):
            self.io_loop.run_sync(f, timeout=0.01)
testing_test.py 文件源码 项目:aweasome_learning 作者: Knight-ZXW 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_no_timeout_environment_variable(self):
        """Verify that a ``gen_test`` coroutine outlasting its timeout raises.

        The inner coroutine is decorated with ``timeout=0.01`` and waits on an
        IOLoop timeout scheduled one second ahead.  It is invoked while
        ``ASYNC_TEST_TIMEOUT`` is set to ``'0.1'`` and is expected to raise
        ``ioloop.TimeoutError``.
        """
        @gen_test(timeout=0.01)
        def test_short_timeout(self):
            # Wake-up time one second from "now" on the test's IOLoop clock.
            deadline = self.io_loop.time() + 1
            yield gen.Task(self.io_loop.add_timeout, deadline)

        # With the environment variable set to 0.1 the call must still
        # time out.
        with set_environ('ASYNC_TEST_TIMEOUT', '0.1'), \
                self.assertRaises(ioloop.TimeoutError):
            test_short_timeout(self)

        # Record completion; the consumer of ``finished`` is outside this
        # snippet.
        self.finished = True
testing_test.py 文件源码 项目:aweasome_learning 作者: Knight-ZXW 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_native_coroutine_timeout(self):
        """Verify that an ``async def`` test exceeding its timeout raises.

        The test body is built from source through ``exec_test``, decorated
        with ``gen_test(timeout=0.1)`` and awaits ``gen.sleep(1)``.  Calling
        it is expected to raise ``ioloop.TimeoutError``; any other outcome
        fails the test.
        """
        # Set a short timeout and exceed it.
        compiled = exec_test(globals(), locals(), """
        @gen_test(timeout=0.1)
        async def test(self):
            await gen.sleep(1)
        """)
        timed_test = compiled['test']

        try:
            timed_test(self)
        except ioloop.TimeoutError:
            # Expected path: the timeout fired.
            self.finished = True
        else:
            self.fail("did not get expected exception")
ioloop_test.py 文件源码 项目:aweasome_learning 作者: Knight-ZXW 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_timeout(self):
        """Verify that ``run_sync`` raises when the coroutine outlives it.

        The coroutine waits on an IOLoop timeout scheduled one second ahead
        while ``run_sync`` is given ``timeout=0.01``; ``TimeoutError`` (as
        bound in the enclosing module) is expected.
        """
        @gen.coroutine
        def f():
            wake_at = self.io_loop.time() + 1
            yield gen.Task(self.io_loop.add_timeout, wake_at)

        with self.assertRaises(TimeoutError):
            self.io_loop.run_sync(f, timeout=0.01)
testing_test.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_no_timeout_environment_variable(self):
        """Verify that a ``gen_test`` coroutine outlasting its timeout raises.

        The inner coroutine is decorated with ``timeout=0.01`` and waits on an
        IOLoop timeout scheduled one second ahead.  It is invoked while
        ``ASYNC_TEST_TIMEOUT`` is set to ``'0.1'`` and is expected to raise
        ``ioloop.TimeoutError``.
        """
        @gen_test(timeout=0.01)
        def test_short_timeout(self):
            # Wake-up time one second from "now" on the test's IOLoop clock.
            deadline = self.io_loop.time() + 1
            yield gen.Task(self.io_loop.add_timeout, deadline)

        # With the environment variable set to 0.1 the call must still
        # time out.
        with set_environ('ASYNC_TEST_TIMEOUT', '0.1'), \
                self.assertRaises(ioloop.TimeoutError):
            test_short_timeout(self)

        # Record completion; the consumer of ``finished`` is outside this
        # snippet.
        self.finished = True
testing_test.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_native_coroutine_timeout(self):
        """Verify that an ``async def`` test exceeding its timeout raises.

        The test body is built from source through ``exec_test``, decorated
        with ``gen_test(timeout=0.1)`` and awaits ``gen.sleep(1)``.  Calling
        it is expected to raise ``ioloop.TimeoutError``; any other outcome
        fails the test.
        """
        # Set a short timeout and exceed it.
        compiled = exec_test(globals(), locals(), """
        @gen_test(timeout=0.1)
        async def test(self):
            await gen.sleep(1)
        """)
        timed_test = compiled['test']

        try:
            timed_test(self)
        except ioloop.TimeoutError:
            # Expected path: the timeout fired.
            self.finished = True
        else:
            self.fail("did not get expected exception")
ioloop_test.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_timeout(self):
        """Verify that ``run_sync`` raises when the coroutine outlives it.

        The coroutine waits on an IOLoop timeout scheduled one second ahead
        while ``run_sync`` is given ``timeout=0.01``; ``TimeoutError`` (as
        bound in the enclosing module) is expected.
        """
        @gen.coroutine
        def f():
            wake_at = self.io_loop.time() + 1
            yield gen.Task(self.io_loop.add_timeout, wake_at)

        with self.assertRaises(TimeoutError):
            self.io_loop.run_sync(f, timeout=0.01)
testing_test.py 文件源码 项目:browser_vuln_check 作者: lcatro 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_no_timeout_environment_variable(self):
        """Verify that a ``gen_test`` coroutine outlasting its timeout raises.

        The inner coroutine is decorated with ``timeout=0.01`` and waits on an
        IOLoop timeout scheduled one second ahead.  It is invoked while
        ``ASYNC_TEST_TIMEOUT`` is set to ``'0.1'`` and is expected to raise
        ``ioloop.TimeoutError``.
        """
        @gen_test(timeout=0.01)
        def test_short_timeout(self):
            # Wake-up time one second from "now" on the test's IOLoop clock.
            deadline = self.io_loop.time() + 1
            yield gen.Task(self.io_loop.add_timeout, deadline)

        # With the environment variable set to 0.1 the call must still
        # time out.
        with set_environ('ASYNC_TEST_TIMEOUT', '0.1'), \
                self.assertRaises(ioloop.TimeoutError):
            test_short_timeout(self)

        # Record completion; the consumer of ``finished`` is outside this
        # snippet.
        self.finished = True
testing_test.py 文件源码 项目:browser_vuln_check 作者: lcatro 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def test_native_coroutine_timeout(self):
        """Verify that an ``async def`` test exceeding its timeout raises.

        The test body is built from source through ``exec_test``, decorated
        with ``gen_test(timeout=0.1)`` and awaits ``gen.sleep(1)``.  Calling
        it is expected to raise ``ioloop.TimeoutError``; any other outcome
        fails the test.
        """
        # Set a short timeout and exceed it.
        compiled = exec_test(globals(), locals(), """
        @gen_test(timeout=0.1)
        async def test(self):
            await gen.sleep(1)
        """)
        timed_test = compiled['test']

        try:
            timed_test(self)
        except ioloop.TimeoutError:
            # Expected path: the timeout fired.
            self.finished = True
        else:
            self.fail("did not get expected exception")
ioloop_test.py 文件源码 项目:browser_vuln_check 作者: lcatro 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_timeout(self):
        """Verify that ``run_sync`` raises when the coroutine outlives it.

        The coroutine waits on an IOLoop timeout scheduled one second ahead
        while ``run_sync`` is given ``timeout=0.01``; ``TimeoutError`` (as
        bound in the enclosing module) is expected.
        """
        @gen.coroutine
        def f():
            wake_at = self.io_loop.time() + 1
            yield gen.Task(self.io_loop.add_timeout, wake_at)

        with self.assertRaises(TimeoutError):
            self.io_loop.run_sync(f, timeout=0.01)
testing_test.py 文件源码 项目:PyQYT 作者: collinsctk 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_no_timeout_environment_variable(self):
        """Verify that a ``gen_test`` coroutine outlasting its timeout raises.

        The inner coroutine is decorated with ``timeout=0.01`` and waits on an
        IOLoop timeout scheduled one second ahead.  It is invoked while
        ``ASYNC_TEST_TIMEOUT`` is set to ``'0.1'`` and is expected to raise
        ``ioloop.TimeoutError``.
        """
        @gen_test(timeout=0.01)
        def test_short_timeout(self):
            # Wake-up time one second from "now" on the test's IOLoop clock.
            deadline = self.io_loop.time() + 1
            yield gen.Task(self.io_loop.add_timeout, deadline)

        # With the environment variable set to 0.1 the call must still
        # time out.
        with set_environ('ASYNC_TEST_TIMEOUT', '0.1'), \
                self.assertRaises(ioloop.TimeoutError):
            test_short_timeout(self)

        # Record completion; the consumer of ``finished`` is outside this
        # snippet.
        self.finished = True
testing_test.py 文件源码 项目:PyQYT 作者: collinsctk 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_native_coroutine_timeout(self):
        """Verify that an ``async def`` test exceeding its timeout raises.

        The test body is built from source through ``exec_test``, decorated
        with ``gen_test(timeout=0.1)`` and awaits ``gen.sleep(1)``.  Calling
        it is expected to raise ``ioloop.TimeoutError``; any other outcome
        fails the test.
        """
        # Set a short timeout and exceed it.
        compiled = exec_test(globals(), locals(), """
        @gen_test(timeout=0.1)
        async def test(self):
            await gen.sleep(1)
        """)
        timed_test = compiled['test']

        try:
            timed_test(self)
        except ioloop.TimeoutError:
            # Expected path: the timeout fired.
            self.finished = True
        else:
            self.fail("did not get expected exception")
ioloop_test.py 文件源码 项目:PyQYT 作者: collinsctk 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_timeout(self):
        """Verify that ``run_sync`` raises when the coroutine outlives it.

        The coroutine waits on an IOLoop timeout scheduled one second ahead
        while ``run_sync`` is given ``timeout=0.01``; ``TimeoutError`` (as
        bound in the enclosing module) is expected.
        """
        @gen.coroutine
        def f():
            wake_at = self.io_loop.time() + 1
            yield gen.Task(self.io_loop.add_timeout, wake_at)

        with self.assertRaises(TimeoutError):
            self.io_loop.run_sync(f, timeout=0.01)
plugins.py 文件源码 项目:inmanta 作者: inmanta 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def run_sync(self, function: typing.Callable, timeout: int=5):
        """
            Execute the async function and return its result. This method takes care of starting and stopping the ioloop. The
            main use for this function is to use the inmanta internal rpc to communicate with the server.

            :param function: The async function to execute. This function should return a yieldable object.
            :param timeout: A timeout for the async function; it is passed through unchanged to tornado's
                            ``IOLoop.run_sync``.
            :return: The result of the async call.
            :raises ConnectionRefusedError: When the function timeouts this exception is raised. The original tornado
                                            ``TimeoutError`` is attached as ``__cause__``.
        """
        # Imported locally, as in the original; this also keeps tornado's
        # TimeoutError from shadowing the builtin outside this method.
        from tornado.ioloop import IOLoop, TimeoutError
        try:
            return IOLoop.current().run_sync(function, timeout)
        except TimeoutError as exc:
            # Callers catch ConnectionRefusedError, so the type is unchanged;
            # a message and an explicit cause make the failure diagnosable.
            raise ConnectionRefusedError(
                "async call did not complete within %s seconds" % timeout
            ) from exc
testing_test.py 文件源码 项目:ProgrameFacil 作者: Gpzim98 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_no_timeout_environment_variable(self):
        """Verify that a ``gen_test`` coroutine outlasting its timeout raises.

        The inner coroutine is decorated with ``timeout=0.01`` and waits on an
        IOLoop timeout scheduled one second ahead.  It is invoked while
        ``ASYNC_TEST_TIMEOUT`` is set to ``'0.1'`` and is expected to raise
        ``ioloop.TimeoutError``.
        """
        @gen_test(timeout=0.01)
        def test_short_timeout(self):
            # Wake-up time one second from "now" on the test's IOLoop clock.
            deadline = self.io_loop.time() + 1
            yield gen.Task(self.io_loop.add_timeout, deadline)

        # With the environment variable set to 0.1 the call must still
        # time out.
        with set_environ('ASYNC_TEST_TIMEOUT', '0.1'), \
                self.assertRaises(ioloop.TimeoutError):
            test_short_timeout(self)

        # Record completion; the consumer of ``finished`` is outside this
        # snippet.
        self.finished = True
testing_test.py 文件源码 项目:ProgrameFacil 作者: Gpzim98 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_native_coroutine_timeout(self):
        """Verify that an ``async def`` test exceeding its timeout raises.

        The test body is built from source through ``exec_test``, decorated
        with ``gen_test(timeout=0.1)`` and awaits ``gen.sleep(1)``.  Calling
        it is expected to raise ``ioloop.TimeoutError``; any other outcome
        fails the test.
        """
        # Set a short timeout and exceed it.
        compiled = exec_test(globals(), locals(), """
        @gen_test(timeout=0.1)
        async def test(self):
            await gen.sleep(1)
        """)
        timed_test = compiled['test']

        try:
            timed_test(self)
        except ioloop.TimeoutError:
            # Expected path: the timeout fired.
            self.finished = True
        else:
            self.fail("did not get expected exception")
ioloop_test.py 文件源码 项目:ProgrameFacil 作者: Gpzim98 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_timeout(self):
        """Verify that ``run_sync`` raises when the coroutine outlives it.

        The coroutine waits on an IOLoop timeout scheduled one second ahead
        while ``run_sync`` is given ``timeout=0.01``; ``TimeoutError`` (as
        bound in the enclosing module) is expected.
        """
        @gen.coroutine
        def f():
            wake_at = self.io_loop.time() + 1
            yield gen.Task(self.io_loop.add_timeout, wake_at)

        with self.assertRaises(TimeoutError):
            self.io_loop.run_sync(f, timeout=0.01)
testing_test.py 文件源码 项目:ProgrameFacil 作者: Gpzim98 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_native_coroutine_timeout(self):
        """Verify that an ``async def`` test exceeding its timeout raises.

        The test body is built from source through ``exec_test``, decorated
        with ``gen_test(timeout=0.1)`` and awaits ``gen.sleep(1)``.  Calling
        it is expected to raise ``ioloop.TimeoutError``; any other outcome
        fails the test.
        """
        # Set a short timeout and exceed it.
        compiled = exec_test(globals(), locals(), """
        @gen_test(timeout=0.1)
        async def test(self):
            await gen.sleep(1)
        """)
        timed_test = compiled['test']

        try:
            timed_test(self)
        except ioloop.TimeoutError:
            # Expected path: the timeout fired.
            self.finished = True
        else:
            self.fail("did not get expected exception")


问题


面经


文章

微信
公众号

扫码关注公众号