python类Task()的实例源码

testing_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_timeout(self):
        # Set a short timeout and exceed it.
        @gen_test(timeout=0.1)
        def test(self):
            yield gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)

        # This can't use assertRaises because we need to inspect the
        # exc_info triple (and not just the exception object)
        try:
            test(self)
            self.fail("did not get expected exception")
        except ioloop.TimeoutError:
            # The stack trace should blame the add_timeout line, not just
            # unrelated IOLoop/testing internals.
            self.assertIn(
                "gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)",
                traceback.format_exc())

        self.finished = True
stack_context_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_run_with_stack_context(self):
        @gen.coroutine
        def f1():
            self.assertEqual(self.active_contexts, ['c1'])
            yield run_with_stack_context(
                StackContext(functools.partial(self.context, 'c2')),
                f2)
            self.assertEqual(self.active_contexts, ['c1'])

        @gen.coroutine
        def f2():
            self.assertEqual(self.active_contexts, ['c1', 'c2'])
            yield gen.Task(self.io_loop.add_callback)
            self.assertEqual(self.active_contexts, ['c1', 'c2'])

        self.assertEqual(self.active_contexts, [])
        yield run_with_stack_context(
            StackContext(functools.partial(self.context, 'c1')),
            f1)
        self.assertEqual(self.active_contexts, [])
httpserver_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_body_size_override_reset(self):
        # The max_body_size override is reset between requests.
        stream = IOStream(socket.socket())
        try:
            yield stream.connect(('127.0.0.1', self.get_http_port()))
            # Use a raw stream so we can make sure it's all on one connection.
            stream.write(b'PUT /streaming?expected_size=10240 HTTP/1.1\r\n'
                         b'Content-Length: 10240\r\n\r\n')
            stream.write(b'a' * 10240)
            headers, response = yield gen.Task(read_stream_body, stream)
            self.assertEqual(response, b'10240')
            # Without the ?expected_size parameter, we get the old default value
            stream.write(b'PUT /streaming HTTP/1.1\r\n'
                         b'Content-Length: 10240\r\n\r\n')
            with ExpectLog(gen_log, '.*Content-Length too long'):
                data = yield stream.read_until_close()
            self.assertEqual(data, b'')
        finally:
            stream.close()
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_stack_context_leak_exception(self):
        # same as previous, but with a function that exits with an exception
        @gen.engine
        def inner(callback):
            yield gen.Task(self.io_loop.add_callback)
            1 / 0

        @gen.engine
        def outer():
            for i in range(10):
                try:
                    yield gen.Task(inner)
                except ZeroDivisionError:
                    pass
            stack_increase = len(stack_context._state.contexts) - initial_stack_depth
            self.assertTrue(stack_increase <= 2)
            self.stop()
        initial_stack_depth = len(stack_context._state.contexts)
        self.run_gen(outer)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_task_refcounting(self):
        # On CPython, tasks and their arguments should be released immediately
        # without waiting for garbage collection.
        @gen.engine
        def f():
            class Foo(object):
                pass
            arg = Foo()
            self.arg_ref = weakref.ref(arg)
            task = gen.Task(self.io_loop.add_callback, arg=arg)
            self.task_ref = weakref.ref(task)
            yield task
            self.stop()

        self.run_gen(f)
        self.assertIs(self.arg_ref(), None)
        self.assertIs(self.task_ref(), None)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_async_await_mixed_multi_native_yieldpoint(self):
        namespace = exec_test(globals(), locals(), """
        async def f1():
            await gen.Task(self.io_loop.add_callback)
            return 42
        """)

        @gen.coroutine
        def f2():
            yield gen.Task(self.io_loop.add_callback)
            raise gen.Return(43)

        f2(callback=(yield gen.Callback('cb')))
        results = yield [namespace['f1'](), gen.Wait('cb')]
        self.assertEqual(results, [42, 43])
        self.finished = True
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_swallow_context_exception(self):
        # Test exception handling: exceptions thrown into the stack context
        # can be caught and ignored.
        @gen.coroutine
        def f2():
            (yield gen.Callback(1))()
            yield gen.Wait(1)
            self.io_loop.add_callback(lambda: 1 / 0)
            try:
                yield gen.Task(self.io_loop.add_timeout,
                               self.io_loop.time() + 10)
            except ZeroDivisionError:
                raise gen.Return(42)

        result = yield f2()
        self.assertEqual(result, 42)
        self.finished = True
web_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_streaming_body(self):
        self.prepared = Future()
        self.data = Future()
        self.finished = Future()

        stream = self.connect(b"/stream_body", connection_close=True)
        yield self.prepared
        stream.write(b"4\r\nasdf\r\n")
        # Ensure the first chunk is received before we send the second.
        data = yield self.data
        self.assertEqual(data, b"asdf")
        self.data = Future()
        stream.write(b"4\r\nqwer\r\n")
        data = yield self.data
        self.assertEquals(data, b"qwer")
        stream.write(b"0\r\n")
        yield self.finished
        data = yield gen.Task(stream.read_until_close)
        # This would ideally use an HTTP1Connection to read the response.
        self.assertTrue(data.endswith(b"{}"))
        stream.close()
concurrent_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_future_traceback(self):
        @return_future
        @gen.engine
        def f(callback):
            yield gen.Task(self.io_loop.add_callback)
            try:
                1 / 0
            except ZeroDivisionError:
                self.expected_frame = traceback.extract_tb(
                    sys.exc_info()[2], limit=1)[0]
                raise
        try:
            yield f()
            self.fail("didn't get expected exception")
        except ZeroDivisionError:
            tb = traceback.extract_tb(sys.exc_info()[2])
            self.assertIn(self.expected_frame, tb)

# The following series of classes demonstrate and test various styles
# of use, with and without generators and futures.
testing_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_timeout(self):
        # Set a short timeout and exceed it.
        @gen_test(timeout=0.1)
        def test(self):
            yield gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)

        # This can't use assertRaises because we need to inspect the
        # exc_info triple (and not just the exception object)
        try:
            test(self)
            self.fail("did not get expected exception")
        except ioloop.TimeoutError:
            # The stack trace should blame the add_timeout line, not just
            # unrelated IOLoop/testing internals.
            self.assertIn(
                "gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)",
                traceback.format_exc())

        self.finished = True
stack_context_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_run_with_stack_context(self):
        @gen.coroutine
        def f1():
            self.assertEqual(self.active_contexts, ['c1'])
            yield run_with_stack_context(
                StackContext(functools.partial(self.context, 'c2')),
                f2)
            self.assertEqual(self.active_contexts, ['c1'])

        @gen.coroutine
        def f2():
            self.assertEqual(self.active_contexts, ['c1', 'c2'])
            yield gen.Task(self.io_loop.add_callback)
            self.assertEqual(self.active_contexts, ['c1', 'c2'])

        self.assertEqual(self.active_contexts, [])
        yield run_with_stack_context(
            StackContext(functools.partial(self.context, 'c1')),
            f1)
        self.assertEqual(self.active_contexts, [])
httpserver_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_body_size_override_reset(self):
        # The max_body_size override is reset between requests.
        stream = IOStream(socket.socket())
        try:
            yield stream.connect(('127.0.0.1', self.get_http_port()))
            # Use a raw stream so we can make sure it's all on one connection.
            stream.write(b'PUT /streaming?expected_size=10240 HTTP/1.1\r\n'
                         b'Content-Length: 10240\r\n\r\n')
            stream.write(b'a' * 10240)
            headers, response = yield gen.Task(read_stream_body, stream)
            self.assertEqual(response, b'10240')
            # Without the ?expected_size parameter, we get the old default value
            stream.write(b'PUT /streaming HTTP/1.1\r\n'
                         b'Content-Length: 10240\r\n\r\n')
            with ExpectLog(gen_log, '.*Content-Length too long'):
                data = yield stream.read_until_close()
            self.assertEqual(data, b'')
        finally:
            stream.close()
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_stack_context_leak_exception(self):
        # same as previous, but with a function that exits with an exception
        @gen.engine
        def inner(callback):
            yield gen.Task(self.io_loop.add_callback)
            1 / 0

        @gen.engine
        def outer():
            for i in range(10):
                try:
                    yield gen.Task(inner)
                except ZeroDivisionError:
                    pass
            stack_increase = len(stack_context._state.contexts) - initial_stack_depth
            self.assertTrue(stack_increase <= 2)
            self.stop()
        initial_stack_depth = len(stack_context._state.contexts)
        self.run_gen(outer)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_task_refcounting(self):
        # On CPython, tasks and their arguments should be released immediately
        # without waiting for garbage collection.
        @gen.engine
        def f():
            class Foo(object):
                pass
            arg = Foo()
            self.arg_ref = weakref.ref(arg)
            task = gen.Task(self.io_loop.add_callback, arg=arg)
            self.task_ref = weakref.ref(task)
            yield task
            self.stop()

        self.run_gen(f)
        self.assertIs(self.arg_ref(), None)
        self.assertIs(self.task_ref(), None)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_async_await_mixed_multi_native_yieldpoint(self):
        namespace = exec_test(globals(), locals(), """
        async def f1():
            await gen.Task(self.io_loop.add_callback)
            return 42
        """)

        @gen.coroutine
        def f2():
            yield gen.Task(self.io_loop.add_callback)
            raise gen.Return(43)

        f2(callback=(yield gen.Callback('cb')))
        results = yield [namespace['f1'](), gen.Wait('cb')]
        self.assertEqual(results, [42, 43])
        self.finished = True
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_swallow_context_exception(self):
        # Test exception handling: exceptions thrown into the stack context
        # can be caught and ignored.
        @gen.coroutine
        def f2():
            (yield gen.Callback(1))()
            yield gen.Wait(1)
            self.io_loop.add_callback(lambda: 1 / 0)
            try:
                yield gen.Task(self.io_loop.add_timeout,
                               self.io_loop.time() + 10)
            except ZeroDivisionError:
                raise gen.Return(42)

        result = yield f2()
        self.assertEqual(result, 42)
        self.finished = True
web_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_streaming_body(self):
        self.prepared = Future()
        self.data = Future()
        self.finished = Future()

        stream = self.connect(b"/stream_body", connection_close=True)
        yield self.prepared
        stream.write(b"4\r\nasdf\r\n")
        # Ensure the first chunk is received before we send the second.
        data = yield self.data
        self.assertEqual(data, b"asdf")
        self.data = Future()
        stream.write(b"4\r\nqwer\r\n")
        data = yield self.data
        self.assertEquals(data, b"qwer")
        stream.write(b"0\r\n")
        yield self.finished
        data = yield gen.Task(stream.read_until_close)
        # This would ideally use an HTTP1Connection to read the response.
        self.assertTrue(data.endswith(b"{}"))
        stream.close()
concurrent_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_future_traceback(self):
        @return_future
        @gen.engine
        def f(callback):
            yield gen.Task(self.io_loop.add_callback)
            try:
                1 / 0
            except ZeroDivisionError:
                self.expected_frame = traceback.extract_tb(
                    sys.exc_info()[2], limit=1)[0]
                raise
        try:
            yield f()
            self.fail("didn't get expected exception")
        except ZeroDivisionError:
            tb = traceback.extract_tb(sys.exc_info()[2])
            self.assertIn(self.expected_frame, tb)

# The following series of classes demonstrate and test various styles
# of use, with and without generators and futures.
testing_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_timeout(self):
        # Set a short timeout and exceed it.
        @gen_test(timeout=0.1)
        def test(self):
            yield gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)

        # This can't use assertRaises because we need to inspect the
        # exc_info triple (and not just the exception object)
        try:
            test(self)
            self.fail("did not get expected exception")
        except ioloop.TimeoutError:
            # The stack trace should blame the add_timeout line, not just
            # unrelated IOLoop/testing internals.
            self.assertIn(
                "gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)",
                traceback.format_exc())

        self.finished = True
stack_context_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_run_with_stack_context(self):
        @gen.coroutine
        def f1():
            self.assertEqual(self.active_contexts, ['c1'])
            yield run_with_stack_context(
                StackContext(functools.partial(self.context, 'c2')),
                f2)
            self.assertEqual(self.active_contexts, ['c1'])

        @gen.coroutine
        def f2():
            self.assertEqual(self.active_contexts, ['c1', 'c2'])
            yield gen.Task(self.io_loop.add_callback)
            self.assertEqual(self.active_contexts, ['c1', 'c2'])

        self.assertEqual(self.active_contexts, [])
        yield run_with_stack_context(
            StackContext(functools.partial(self.context, 'c1')),
            f1)
        self.assertEqual(self.active_contexts, [])
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_stack_context_leak_exception(self):
        # same as previous, but with a function that exits with an exception
        @gen.engine
        def inner(callback):
            yield gen.Task(self.io_loop.add_callback)
            1 / 0

        @gen.engine
        def outer():
            for i in range(10):
                try:
                    yield gen.Task(inner)
                except ZeroDivisionError:
                    pass
            stack_increase = len(stack_context._state.contexts) - initial_stack_depth
            self.assertTrue(stack_increase <= 2)
            self.stop()
        initial_stack_depth = len(stack_context._state.contexts)
        self.run_gen(outer)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_task_refcounting(self):
        # On CPython, tasks and their arguments should be released immediately
        # without waiting for garbage collection.
        @gen.engine
        def f():
            class Foo(object):
                pass
            arg = Foo()
            self.arg_ref = weakref.ref(arg)
            task = gen.Task(self.io_loop.add_callback, arg=arg)
            self.task_ref = weakref.ref(task)
            yield task
            self.stop()

        self.run_gen(f)
        self.assertIs(self.arg_ref(), None)
        self.assertIs(self.task_ref(), None)
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_async_await_mixed_multi_native_yieldpoint(self):
        namespace = exec_test(globals(), locals(), """
        async def f1():
            await gen.Task(self.io_loop.add_callback)
            return 42
        """)

        @gen.coroutine
        def f2():
            yield gen.Task(self.io_loop.add_callback)
            raise gen.Return(43)

        f2(callback=(yield gen.Callback('cb')))
        results = yield [namespace['f1'](), gen.Wait('cb')]
        self.assertEqual(results, [42, 43])
        self.finished = True
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_swallow_context_exception(self):
        # Test exception handling: exceptions thrown into the stack context
        # can be caught and ignored.
        @gen.coroutine
        def f2():
            (yield gen.Callback(1))()
            yield gen.Wait(1)
            self.io_loop.add_callback(lambda: 1 / 0)
            try:
                yield gen.Task(self.io_loop.add_timeout,
                               self.io_loop.time() + 10)
            except ZeroDivisionError:
                raise gen.Return(42)

        result = yield f2()
        self.assertEqual(result, 42)
        self.finished = True
web_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_streaming_body(self):
        self.prepared = Future()
        self.data = Future()
        self.finished = Future()

        stream = self.connect(b"/stream_body", connection_close=True)
        yield self.prepared
        stream.write(b"4\r\nasdf\r\n")
        # Ensure the first chunk is received before we send the second.
        data = yield self.data
        self.assertEqual(data, b"asdf")
        self.data = Future()
        stream.write(b"4\r\nqwer\r\n")
        data = yield self.data
        self.assertEquals(data, b"qwer")
        stream.write(b"0\r\n")
        yield self.finished
        data = yield gen.Task(stream.read_until_close)
        # This would ideally use an HTTP1Connection to read the response.
        self.assertTrue(data.endswith(b"{}"))
        stream.close()
concurrent_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_future_traceback(self):
        @return_future
        @gen.engine
        def f(callback):
            yield gen.Task(self.io_loop.add_callback)
            try:
                1 / 0
            except ZeroDivisionError:
                self.expected_frame = traceback.extract_tb(
                    sys.exc_info()[2], limit=1)[0]
                raise
        try:
            yield f()
            self.fail("didn't get expected exception")
        except ZeroDivisionError:
            tb = traceback.extract_tb(sys.exc_info()[2])
            self.assertIn(self.expected_frame, tb)

# The following series of classes demonstrate and test various styles
# of use, with and without generators and futures.
handlers_async.py 文件源码 项目:mygene.info 作者: biothings 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get(self, geneid=None):
        '''/gene/<geneid>
           geneid can be entrezgene, ensemblgene, retired entrezgene ids.
           /gene/1017
           /gene/1017?fields=symbol,name
           /gene/1017?fields=symbol,name,reporter.HG-U133_Plus_2
        '''
        if geneid:
            kwargs = self.get_query_params()
            kwargs.setdefault('scopes', 'entrezgene,ensemblgene,retired')
            kwargs.setdefault('species', 'all')
            gene = yield Task(self.esq.get_gene2, geneid, **kwargs)
            if gene:
                self.return_json(gene)
                self.ga_track(event={'category': 'v2_api',
                                     'action': 'gene_get'})
            else:
                raise HTTPError(404)
        else:
            raise HTTPError(404)
handlers_async.py 文件源码 项目:mygene.info 作者: biothings 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def post(self, geneid=None):
        '''
           This is essentially the same as post request in QueryHandler, with different defaults.

           parameters:
            ids
            fields
            species
        '''
        kwargs = self.get_query_params()
        ids = kwargs.pop('ids', None)
        if ids:
            ids = re.split('[\s\r\n+|,]+', ids)
            scopes = 'entrezgene,ensemblgene,retired'
            fields = kwargs.pop('fields', None)
            res = yield Task(self.esq.mget_gene2, ids, fields=fields, scopes=scopes, **kwargs)
        else:
            res = {'success': False, 'error': "Missing required parameters."}

        self.return_json(res)
        self.ga_track(event={'category': 'v2_api',
                             'action': 'gene_post',
                             'label': 'qsize',
                             'value': len(ids) if ids else 0})
testing_test.py 文件源码 项目:My-Web-Server-Framework-With-Python2.7 作者: syjsu 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_timeout(self):
        # Set a short timeout and exceed it.
        @gen_test(timeout=0.1)
        def test(self):
            yield gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)

        # This can't use assertRaises because we need to inspect the
        # exc_info triple (and not just the exception object)
        try:
            test(self)
            self.fail("did not get expected exception")
        except ioloop.TimeoutError:
            # The stack trace should blame the add_timeout line, not just
            # unrelated IOLoop/testing internals.
            self.assertIn(
                "gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)",
                traceback.format_exc())

        self.finished = True
stack_context_test.py 文件源码 项目:My-Web-Server-Framework-With-Python2.7 作者: syjsu 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_run_with_stack_context(self):
        @gen.coroutine
        def f1():
            self.assertEqual(self.active_contexts, ['c1'])
            yield run_with_stack_context(
                StackContext(functools.partial(self.context, 'c2')),
                f2)
            self.assertEqual(self.active_contexts, ['c1'])

        @gen.coroutine
        def f2():
            self.assertEqual(self.active_contexts, ['c1', 'c2'])
            yield gen.Task(self.io_loop.add_callback)
            self.assertEqual(self.active_contexts, ['c1', 'c2'])

        self.assertEqual(self.active_contexts, [])
        yield run_with_stack_context(
            StackContext(functools.partial(self.context, 'c1')),
            f1)
        self.assertEqual(self.active_contexts, [])


问题


面经


文章

微信
公众号

扫码关注公众号