python类with_timeout()的实例源码

locks.py 文件源码 项目:My-Web-Server-Framework-With-Python2.7 作者: syjsu 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def wait(self, timeout=None):
        """Block until the internal flag is true.

        Returns a Future, which raises `tornado.gen.TimeoutError` after a
        timeout.
        """
        if timeout is None:
            return self._future
        else:
            return gen.with_timeout(timeout, self._future)
gen_test.py 文件源码 项目:My-Web-Server-Framework-With-Python2.7 作者: syjsu 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_timeout(self):
        with self.assertRaises(gen.TimeoutError):
            yield gen.with_timeout(datetime.timedelta(seconds=0.1),
                                   Future())
gen_test.py 文件源码 项目:My-Web-Server-Framework-With-Python2.7 作者: syjsu 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_completes_before_timeout(self):
        future = Future()
        self.io_loop.add_timeout(datetime.timedelta(seconds=0.1),
                                 lambda: future.set_result('asdf'))
        result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
                                        future, io_loop=self.io_loop)
        self.assertEqual(result, 'asdf')
gen_test.py 文件源码 项目:My-Web-Server-Framework-With-Python2.7 作者: syjsu 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_fails_before_timeout(self):
        future = Future()
        self.io_loop.add_timeout(
            datetime.timedelta(seconds=0.1),
            lambda: future.set_exception(ZeroDivisionError()))
        with self.assertRaises(ZeroDivisionError):
            yield gen.with_timeout(datetime.timedelta(seconds=3600),
                                   future, io_loop=self.io_loop)
gen_test.py 文件源码 项目:My-Web-Server-Framework-With-Python2.7 作者: syjsu 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_already_resolved(self):
        future = Future()
        future.set_result('asdf')
        result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
                                        future, io_loop=self.io_loop)
        self.assertEqual(result, 'asdf')
gen_test.py 文件源码 项目:My-Web-Server-Framework-With-Python2.7 作者: syjsu 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_completed_concurrent_future(self):
        with futures.ThreadPoolExecutor(1) as executor:
            yield gen.with_timeout(datetime.timedelta(seconds=3600),
                                   executor.submit(lambda: None))
gen_test.py 文件源码 项目:My-Web-Server-Framework-With-Python2.7 作者: syjsu 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_no_ref(self):
        # In this usage, there is no direct hard reference to the
        # WaitIterator itself, only the Future it returns. Since
        # WaitIterator uses weak references internally to improve GC
        # performance, this used to cause problems.
        yield gen.with_timeout(datetime.timedelta(seconds=0.1),
                               gen.WaitIterator(gen.sleep(0)).next())
locks.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def wait(self, timeout=None):
        """Block until the internal flag is true.

        Returns a Future, which raises `tornado.gen.TimeoutError` after a
        timeout.
        """
        if timeout is None:
            return self._future
        else:
            return gen.with_timeout(timeout, self._future)
gen_test.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_timeout(self):
        with self.assertRaises(gen.TimeoutError):
            yield gen.with_timeout(datetime.timedelta(seconds=0.1),
                                   Future())
gen_test.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_completes_before_timeout(self):
        future = Future()
        self.io_loop.add_timeout(datetime.timedelta(seconds=0.1),
                                 lambda: future.set_result('asdf'))
        result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
                                        future, io_loop=self.io_loop)
        self.assertEqual(result, 'asdf')
gen_test.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_fails_before_timeout(self):
        future = Future()
        self.io_loop.add_timeout(
            datetime.timedelta(seconds=0.1),
            lambda: future.set_exception(ZeroDivisionError()))
        with self.assertRaises(ZeroDivisionError):
            yield gen.with_timeout(datetime.timedelta(seconds=3600),
                                   future, io_loop=self.io_loop)
gen_test.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_already_resolved(self):
        future = Future()
        future.set_result('asdf')
        result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
                                        future, io_loop=self.io_loop)
        self.assertEqual(result, 'asdf')
gen_test.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_completed_concurrent_future(self):
        with futures.ThreadPoolExecutor(1) as executor:
            yield gen.with_timeout(datetime.timedelta(seconds=3600),
                                   executor.submit(lambda: None))
gen_test.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_no_ref(self):
        # In this usage, there is no direct hard reference to the
        # WaitIterator itself, only the Future it returns. Since
        # WaitIterator uses weak references internally to improve GC
        # performance, this used to cause problems.
        yield gen.with_timeout(datetime.timedelta(seconds=0.1),
                               gen.WaitIterator(gen.sleep(0)).next())
locks.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 67 收藏 0 点赞 0 评论 0
def wait(self, timeout=None):
        """Block until the internal flag is true.

        Returns a Future, which raises `tornado.gen.TimeoutError` after a
        timeout.
        """
        if timeout is None:
            return self._future
        else:
            return gen.with_timeout(timeout, self._future)
gen_test.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_timeout(self):
        with self.assertRaises(gen.TimeoutError):
            yield gen.with_timeout(datetime.timedelta(seconds=0.1),
                                   Future())
gen_test.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_completes_before_timeout(self):
        future = Future()
        self.io_loop.add_timeout(datetime.timedelta(seconds=0.1),
                                 lambda: future.set_result('asdf'))
        result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
                                        future, io_loop=self.io_loop)
        self.assertEqual(result, 'asdf')
gen_test.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_already_resolved(self):
        future = Future()
        future.set_result('asdf')
        result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
                                        future, io_loop=self.io_loop)
        self.assertEqual(result, 'asdf')
gen_test.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_timeout_concurrent_future(self):
        with futures.ThreadPoolExecutor(1) as executor:
            with self.assertRaises(gen.TimeoutError):
                yield gen.with_timeout(self.io_loop.time(),
                                       executor.submit(time.sleep, 0.1))
gen_test.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_completed_concurrent_future(self):
        with futures.ThreadPoolExecutor(1) as executor:
            yield gen.with_timeout(datetime.timedelta(seconds=3600),
                                   executor.submit(lambda: None))


问题


面经


文章

微信
公众号

扫码关注公众号