python类engine()的实例源码

gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_reuse(self):
        @gen.engine
        def f():
            self.io_loop.add_callback((yield gen.Callback(0)))
            yield gen.Wait(0)
            self.stop()
        self.run_gen(f)
        self.run_gen(f)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_task(self):
        @gen.engine
        def f():
            yield gen.Task(self.io_loop.add_callback)
            self.stop()
        self.run_gen(f)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_wait_all(self):
        @gen.engine
        def f():
            (yield gen.Callback("k1"))("v1")
            (yield gen.Callback("k2"))("v2")
            results = yield gen.WaitAll(["k1", "k2"])
            self.assertEqual(results, ["v1", "v2"])
            self.stop()
        self.run_gen(f)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_resume_after_exception_in_yield(self):
        @gen.engine
        def f():
            try:
                yield gen.Wait("k1")
                raise Exception("did not get expected exception")
            except gen.UnknownKeyError:
                pass
            (yield gen.Callback("k2"))("v2")
            self.assertEqual((yield gen.Wait("k2")), "v2")
            self.stop()
        self.run_gen(f)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_orphaned_callback(self):
        @gen.engine
        def f():
            self.orphaned_callback = yield gen.Callback(1)
        try:
            self.run_gen(f)
            raise Exception("did not get expected exception")
        except gen.LeakedCallbackError:
            pass
        self.orphaned_callback()
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_multi(self):
        @gen.engine
        def f():
            (yield gen.Callback("k1"))("v1")
            (yield gen.Callback("k2"))("v2")
            results = yield [gen.Wait("k1"), gen.Wait("k2")]
            self.assertEqual(results, ["v1", "v2"])
            self.stop()
        self.run_gen(f)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_multi_dict(self):
        @gen.engine
        def f():
            (yield gen.Callback("k1"))("v1")
            (yield gen.Callback("k2"))("v2")
            results = yield dict(foo=gen.Wait("k1"), bar=gen.Wait("k2"))
            self.assertEqual(results, dict(foo="v1", bar="v2"))
            self.stop()
        self.run_gen(f)

    # The following tests explicitly run with both gen.Multi
    # and gen.multi_future (Task returns a Future, so it can be used
    # with either).
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_multi_yieldpoint_delayed(self):
        @gen.engine
        def f():
            # callbacks run at different times
            responses = yield gen.Multi([
                gen.Task(self.delay_callback, 3, arg="v1"),
                gen.Task(self.delay_callback, 1, arg="v2"),
            ])
            self.assertEqual(responses, ["v1", "v2"])
            self.stop()
        self.run_gen(f)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_multi_future_delayed(self):
        @gen.engine
        def f():
            # callbacks run at different times
            responses = yield gen.multi_future([
                gen.Task(self.delay_callback, 3, arg="v1"),
                gen.Task(self.delay_callback, 1, arg="v2"),
            ])
            self.assertEqual(responses, ["v1", "v2"])
            self.stop()
        self.run_gen(f)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_multi_future_dict_delayed(self):
        @gen.engine
        def f():
            # callbacks run at different times
            responses = yield gen.multi_future(dict(
                foo=gen.Task(self.delay_callback, 3, arg="v1"),
                bar=gen.Task(self.delay_callback, 1, arg="v2"),
            ))
            self.assertEqual(responses, dict(foo="v1", bar="v2"))
            self.stop()
        self.run_gen(f)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_arguments(self):
        @gen.engine
        def f():
            (yield gen.Callback("noargs"))()
            self.assertEqual((yield gen.Wait("noargs")), None)
            (yield gen.Callback("1arg"))(42)
            self.assertEqual((yield gen.Wait("1arg")), 42)

            (yield gen.Callback("kwargs"))(value=42)
            result = yield gen.Wait("kwargs")
            self.assertTrue(isinstance(result, gen.Arguments))
            self.assertEqual(((), dict(value=42)), result)
            self.assertEqual(dict(value=42), result.kwargs)

            (yield gen.Callback("2args"))(42, 43)
            result = yield gen.Wait("2args")
            self.assertTrue(isinstance(result, gen.Arguments))
            self.assertEqual(((42, 43), {}), result)
            self.assertEqual((42, 43), result.args)

            def task_func(callback):
                callback(None, error="foo")
            result = yield gen.Task(task_func)
            self.assertTrue(isinstance(result, gen.Arguments))
            self.assertEqual(((None,), dict(error="foo")), result)

            self.stop()
        self.run_gen(f)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_stack_context_leak(self):
        # regression test: repeated invocations of a gen-based
        # function should not result in accumulated stack_contexts
        def _stack_depth():
            head = stack_context._state.contexts[1]
            length = 0

            while head is not None:
                length += 1
                head = head.old_contexts[1]

            return length

        @gen.engine
        def inner(callback):
            yield gen.Task(self.io_loop.add_callback)
            callback()

        @gen.engine
        def outer():
            for i in range(10):
                yield gen.Task(inner)

            stack_increase = _stack_depth() - initial_stack_depth
            self.assertTrue(stack_increase <= 2)
            self.stop()
        initial_stack_depth = _stack_depth()
        self.run_gen(outer)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_sync_raise_return(self):
        # gen.Return is allowed in @gen.engine, but it may not be used
        # to return a value.
        @gen.engine
        def f():
            self.stop(42)
            raise gen.Return()

        result = self.run_gen(f)
        self.assertEqual(result, 42)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_async_raise_return(self):
        @gen.engine
        def f():
            yield gen.Task(self.io_loop.add_callback)
            self.stop(42)
            raise gen.Return()

        result = self.run_gen(f)
        self.assertEqual(result, 42)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_sync_raise_return_value(self):
        @gen.engine
        def f():
            raise gen.Return(42)

        with self.assertRaises(gen.ReturnValueIgnoredError):
            self.run_gen(f)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_sync_raise_return_value_tuple(self):
        @gen.engine
        def f():
            raise gen.Return((1, 2))

        with self.assertRaises(gen.ReturnValueIgnoredError):
            self.run_gen(f)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_async_raise_return_value(self):
        @gen.engine
        def f():
            yield gen.Task(self.io_loop.add_callback)
            raise gen.Return(42)

        with self.assertRaises(gen.ReturnValueIgnoredError):
            self.run_gen(f)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def test_return_value(self):
        # It is an error to apply @gen.engine to a function that returns
        # a value.
        @gen.engine
        def f():
            return 42

        with self.assertRaises(gen.ReturnValueIgnoredError):
            self.run_gen(f)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_return_value_tuple(self):
        # It is an error to apply @gen.engine to a function that returns
        # a value.
        @gen.engine
        def f():
            return (1, 2)

        with self.assertRaises(gen.ReturnValueIgnoredError):
            self.run_gen(f)
auth_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get(self):
        if self.get_argument("oauth_token", None):
            user = yield self.get_authenticated_user()
            self.finish(user)
        else:
            # Old style: with @gen.engine we can ignore the Future from
            # authorize_redirect.
            self.authorize_redirect()


问题


面经


文章

微信
公众号

扫码关注公众号