def test_timeout(self):
# Set a short timeout and exceed it.
@gen_test(timeout=0.1)
def test(self):
yield gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)
# This can't use assertRaises because we need to inspect the
# exc_info triple (and not just the exception object)
try:
test(self)
self.fail("did not get expected exception")
except ioloop.TimeoutError:
# The stack trace should blame the add_timeout line, not just
# unrelated IOLoop/testing internals.
self.assertIn(
"gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)",
traceback.format_exc())
self.finished = True
python类Task()的实例源码
def test_run_with_stack_context(self):
@gen.coroutine
def f1():
self.assertEqual(self.active_contexts, ['c1'])
yield run_with_stack_context(
StackContext(functools.partial(self.context, 'c2')),
f2)
self.assertEqual(self.active_contexts, ['c1'])
@gen.coroutine
def f2():
self.assertEqual(self.active_contexts, ['c1', 'c2'])
yield gen.Task(self.io_loop.add_callback)
self.assertEqual(self.active_contexts, ['c1', 'c2'])
self.assertEqual(self.active_contexts, [])
yield run_with_stack_context(
StackContext(functools.partial(self.context, 'c1')),
f1)
self.assertEqual(self.active_contexts, [])
def test_body_size_override_reset(self):
# The max_body_size override is reset between requests.
stream = IOStream(socket.socket())
try:
yield stream.connect(('127.0.0.1', self.get_http_port()))
# Use a raw stream so we can make sure it's all on one connection.
stream.write(b'PUT /streaming?expected_size=10240 HTTP/1.1\r\n'
b'Content-Length: 10240\r\n\r\n')
stream.write(b'a' * 10240)
headers, response = yield gen.Task(read_stream_body, stream)
self.assertEqual(response, b'10240')
# Without the ?expected_size parameter, we get the old default value
stream.write(b'PUT /streaming HTTP/1.1\r\n'
b'Content-Length: 10240\r\n\r\n')
with ExpectLog(gen_log, '.*Content-Length too long'):
data = yield stream.read_until_close()
self.assertEqual(data, b'')
finally:
stream.close()
def test_stack_context_leak_exception(self):
# same as previous, but with a function that exits with an exception
@gen.engine
def inner(callback):
yield gen.Task(self.io_loop.add_callback)
1 / 0
@gen.engine
def outer():
for i in range(10):
try:
yield gen.Task(inner)
except ZeroDivisionError:
pass
stack_increase = len(stack_context._state.contexts) - initial_stack_depth
self.assertTrue(stack_increase <= 2)
self.stop()
initial_stack_depth = len(stack_context._state.contexts)
self.run_gen(outer)
def test_task_refcounting(self):
# On CPython, tasks and their arguments should be released immediately
# without waiting for garbage collection.
@gen.engine
def f():
class Foo(object):
pass
arg = Foo()
self.arg_ref = weakref.ref(arg)
task = gen.Task(self.io_loop.add_callback, arg=arg)
self.task_ref = weakref.ref(task)
yield task
self.stop()
self.run_gen(f)
self.assertIs(self.arg_ref(), None)
self.assertIs(self.task_ref(), None)
def test_async_await_mixed_multi_native_yieldpoint(self):
namespace = exec_test(globals(), locals(), """
async def f1():
await gen.Task(self.io_loop.add_callback)
return 42
""")
@gen.coroutine
def f2():
yield gen.Task(self.io_loop.add_callback)
raise gen.Return(43)
f2(callback=(yield gen.Callback('cb')))
results = yield [namespace['f1'](), gen.Wait('cb')]
self.assertEqual(results, [42, 43])
self.finished = True
def test_swallow_context_exception(self):
# Test exception handling: exceptions thrown into the stack context
# can be caught and ignored.
@gen.coroutine
def f2():
(yield gen.Callback(1))()
yield gen.Wait(1)
self.io_loop.add_callback(lambda: 1 / 0)
try:
yield gen.Task(self.io_loop.add_timeout,
self.io_loop.time() + 10)
except ZeroDivisionError:
raise gen.Return(42)
result = yield f2()
self.assertEqual(result, 42)
self.finished = True
def test_streaming_body(self):
self.prepared = Future()
self.data = Future()
self.finished = Future()
stream = self.connect(b"/stream_body", connection_close=True)
yield self.prepared
stream.write(b"4\r\nasdf\r\n")
# Ensure the first chunk is received before we send the second.
data = yield self.data
self.assertEqual(data, b"asdf")
self.data = Future()
stream.write(b"4\r\nqwer\r\n")
data = yield self.data
self.assertEquals(data, b"qwer")
stream.write(b"0\r\n")
yield self.finished
data = yield gen.Task(stream.read_until_close)
# This would ideally use an HTTP1Connection to read the response.
self.assertTrue(data.endswith(b"{}"))
stream.close()
def test_future_traceback(self):
@return_future
@gen.engine
def f(callback):
yield gen.Task(self.io_loop.add_callback)
try:
1 / 0
except ZeroDivisionError:
self.expected_frame = traceback.extract_tb(
sys.exc_info()[2], limit=1)[0]
raise
try:
yield f()
self.fail("didn't get expected exception")
except ZeroDivisionError:
tb = traceback.extract_tb(sys.exc_info()[2])
self.assertIn(self.expected_frame, tb)
# The following series of classes demonstrate and test various styles
# of use, with and without generators and futures.
def test_timeout(self):
# Set a short timeout and exceed it.
@gen_test(timeout=0.1)
def test(self):
yield gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)
# This can't use assertRaises because we need to inspect the
# exc_info triple (and not just the exception object)
try:
test(self)
self.fail("did not get expected exception")
except ioloop.TimeoutError:
# The stack trace should blame the add_timeout line, not just
# unrelated IOLoop/testing internals.
self.assertIn(
"gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)",
traceback.format_exc())
self.finished = True
def test_run_with_stack_context(self):
@gen.coroutine
def f1():
self.assertEqual(self.active_contexts, ['c1'])
yield run_with_stack_context(
StackContext(functools.partial(self.context, 'c2')),
f2)
self.assertEqual(self.active_contexts, ['c1'])
@gen.coroutine
def f2():
self.assertEqual(self.active_contexts, ['c1', 'c2'])
yield gen.Task(self.io_loop.add_callback)
self.assertEqual(self.active_contexts, ['c1', 'c2'])
self.assertEqual(self.active_contexts, [])
yield run_with_stack_context(
StackContext(functools.partial(self.context, 'c1')),
f1)
self.assertEqual(self.active_contexts, [])
def test_body_size_override_reset(self):
# The max_body_size override is reset between requests.
stream = IOStream(socket.socket())
try:
yield stream.connect(('127.0.0.1', self.get_http_port()))
# Use a raw stream so we can make sure it's all on one connection.
stream.write(b'PUT /streaming?expected_size=10240 HTTP/1.1\r\n'
b'Content-Length: 10240\r\n\r\n')
stream.write(b'a' * 10240)
headers, response = yield gen.Task(read_stream_body, stream)
self.assertEqual(response, b'10240')
# Without the ?expected_size parameter, we get the old default value
stream.write(b'PUT /streaming HTTP/1.1\r\n'
b'Content-Length: 10240\r\n\r\n')
with ExpectLog(gen_log, '.*Content-Length too long'):
data = yield stream.read_until_close()
self.assertEqual(data, b'')
finally:
stream.close()
def test_stack_context_leak_exception(self):
# same as previous, but with a function that exits with an exception
@gen.engine
def inner(callback):
yield gen.Task(self.io_loop.add_callback)
1 / 0
@gen.engine
def outer():
for i in range(10):
try:
yield gen.Task(inner)
except ZeroDivisionError:
pass
stack_increase = len(stack_context._state.contexts) - initial_stack_depth
self.assertTrue(stack_increase <= 2)
self.stop()
initial_stack_depth = len(stack_context._state.contexts)
self.run_gen(outer)
def test_task_refcounting(self):
# On CPython, tasks and their arguments should be released immediately
# without waiting for garbage collection.
@gen.engine
def f():
class Foo(object):
pass
arg = Foo()
self.arg_ref = weakref.ref(arg)
task = gen.Task(self.io_loop.add_callback, arg=arg)
self.task_ref = weakref.ref(task)
yield task
self.stop()
self.run_gen(f)
self.assertIs(self.arg_ref(), None)
self.assertIs(self.task_ref(), None)
def test_async_await_mixed_multi_native_yieldpoint(self):
namespace = exec_test(globals(), locals(), """
async def f1():
await gen.Task(self.io_loop.add_callback)
return 42
""")
@gen.coroutine
def f2():
yield gen.Task(self.io_loop.add_callback)
raise gen.Return(43)
f2(callback=(yield gen.Callback('cb')))
results = yield [namespace['f1'](), gen.Wait('cb')]
self.assertEqual(results, [42, 43])
self.finished = True
def test_swallow_context_exception(self):
# Test exception handling: exceptions thrown into the stack context
# can be caught and ignored.
@gen.coroutine
def f2():
(yield gen.Callback(1))()
yield gen.Wait(1)
self.io_loop.add_callback(lambda: 1 / 0)
try:
yield gen.Task(self.io_loop.add_timeout,
self.io_loop.time() + 10)
except ZeroDivisionError:
raise gen.Return(42)
result = yield f2()
self.assertEqual(result, 42)
self.finished = True
def test_streaming_body(self):
self.prepared = Future()
self.data = Future()
self.finished = Future()
stream = self.connect(b"/stream_body", connection_close=True)
yield self.prepared
stream.write(b"4\r\nasdf\r\n")
# Ensure the first chunk is received before we send the second.
data = yield self.data
self.assertEqual(data, b"asdf")
self.data = Future()
stream.write(b"4\r\nqwer\r\n")
data = yield self.data
self.assertEquals(data, b"qwer")
stream.write(b"0\r\n")
yield self.finished
data = yield gen.Task(stream.read_until_close)
# This would ideally use an HTTP1Connection to read the response.
self.assertTrue(data.endswith(b"{}"))
stream.close()
def test_future_traceback(self):
@return_future
@gen.engine
def f(callback):
yield gen.Task(self.io_loop.add_callback)
try:
1 / 0
except ZeroDivisionError:
self.expected_frame = traceback.extract_tb(
sys.exc_info()[2], limit=1)[0]
raise
try:
yield f()
self.fail("didn't get expected exception")
except ZeroDivisionError:
tb = traceback.extract_tb(sys.exc_info()[2])
self.assertIn(self.expected_frame, tb)
# The following series of classes demonstrate and test various styles
# of use, with and without generators and futures.
def test_timeout(self):
# Set a short timeout and exceed it.
@gen_test(timeout=0.1)
def test(self):
yield gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)
# This can't use assertRaises because we need to inspect the
# exc_info triple (and not just the exception object)
try:
test(self)
self.fail("did not get expected exception")
except ioloop.TimeoutError:
# The stack trace should blame the add_timeout line, not just
# unrelated IOLoop/testing internals.
self.assertIn(
"gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)",
traceback.format_exc())
self.finished = True
def test_run_with_stack_context(self):
@gen.coroutine
def f1():
self.assertEqual(self.active_contexts, ['c1'])
yield run_with_stack_context(
StackContext(functools.partial(self.context, 'c2')),
f2)
self.assertEqual(self.active_contexts, ['c1'])
@gen.coroutine
def f2():
self.assertEqual(self.active_contexts, ['c1', 'c2'])
yield gen.Task(self.io_loop.add_callback)
self.assertEqual(self.active_contexts, ['c1', 'c2'])
self.assertEqual(self.active_contexts, [])
yield run_with_stack_context(
StackContext(functools.partial(self.context, 'c1')),
f1)
self.assertEqual(self.active_contexts, [])
def test_stack_context_leak_exception(self):
# same as previous, but with a function that exits with an exception
@gen.engine
def inner(callback):
yield gen.Task(self.io_loop.add_callback)
1 / 0
@gen.engine
def outer():
for i in range(10):
try:
yield gen.Task(inner)
except ZeroDivisionError:
pass
stack_increase = len(stack_context._state.contexts) - initial_stack_depth
self.assertTrue(stack_increase <= 2)
self.stop()
initial_stack_depth = len(stack_context._state.contexts)
self.run_gen(outer)
def test_task_refcounting(self):
# On CPython, tasks and their arguments should be released immediately
# without waiting for garbage collection.
@gen.engine
def f():
class Foo(object):
pass
arg = Foo()
self.arg_ref = weakref.ref(arg)
task = gen.Task(self.io_loop.add_callback, arg=arg)
self.task_ref = weakref.ref(task)
yield task
self.stop()
self.run_gen(f)
self.assertIs(self.arg_ref(), None)
self.assertIs(self.task_ref(), None)
def test_async_await_mixed_multi_native_yieldpoint(self):
namespace = exec_test(globals(), locals(), """
async def f1():
await gen.Task(self.io_loop.add_callback)
return 42
""")
@gen.coroutine
def f2():
yield gen.Task(self.io_loop.add_callback)
raise gen.Return(43)
f2(callback=(yield gen.Callback('cb')))
results = yield [namespace['f1'](), gen.Wait('cb')]
self.assertEqual(results, [42, 43])
self.finished = True
def test_swallow_context_exception(self):
# Test exception handling: exceptions thrown into the stack context
# can be caught and ignored.
@gen.coroutine
def f2():
(yield gen.Callback(1))()
yield gen.Wait(1)
self.io_loop.add_callback(lambda: 1 / 0)
try:
yield gen.Task(self.io_loop.add_timeout,
self.io_loop.time() + 10)
except ZeroDivisionError:
raise gen.Return(42)
result = yield f2()
self.assertEqual(result, 42)
self.finished = True
def test_streaming_body(self):
self.prepared = Future()
self.data = Future()
self.finished = Future()
stream = self.connect(b"/stream_body", connection_close=True)
yield self.prepared
stream.write(b"4\r\nasdf\r\n")
# Ensure the first chunk is received before we send the second.
data = yield self.data
self.assertEqual(data, b"asdf")
self.data = Future()
stream.write(b"4\r\nqwer\r\n")
data = yield self.data
self.assertEquals(data, b"qwer")
stream.write(b"0\r\n")
yield self.finished
data = yield gen.Task(stream.read_until_close)
# This would ideally use an HTTP1Connection to read the response.
self.assertTrue(data.endswith(b"{}"))
stream.close()
def test_future_traceback(self):
@return_future
@gen.engine
def f(callback):
yield gen.Task(self.io_loop.add_callback)
try:
1 / 0
except ZeroDivisionError:
self.expected_frame = traceback.extract_tb(
sys.exc_info()[2], limit=1)[0]
raise
try:
yield f()
self.fail("didn't get expected exception")
except ZeroDivisionError:
tb = traceback.extract_tb(sys.exc_info()[2])
self.assertIn(self.expected_frame, tb)
# The following series of classes demonstrate and test various styles
# of use, with and without generators and futures.
def get(self, geneid=None):
'''/gene/<geneid>
geneid can be entrezgene, ensemblgene, retired entrezgene ids.
/gene/1017
/gene/1017?fields=symbol,name
/gene/1017?fields=symbol,name,reporter.HG-U133_Plus_2
'''
if geneid:
kwargs = self.get_query_params()
kwargs.setdefault('scopes', 'entrezgene,ensemblgene,retired')
kwargs.setdefault('species', 'all')
gene = yield Task(self.esq.get_gene2, geneid, **kwargs)
if gene:
self.return_json(gene)
self.ga_track(event={'category': 'v2_api',
'action': 'gene_get'})
else:
raise HTTPError(404)
else:
raise HTTPError(404)
def post(self, geneid=None):
'''
This is essentially the same as post request in QueryHandler, with different defaults.
parameters:
ids
fields
species
'''
kwargs = self.get_query_params()
ids = kwargs.pop('ids', None)
if ids:
ids = re.split('[\s\r\n+|,]+', ids)
scopes = 'entrezgene,ensemblgene,retired'
fields = kwargs.pop('fields', None)
res = yield Task(self.esq.mget_gene2, ids, fields=fields, scopes=scopes, **kwargs)
else:
res = {'success': False, 'error': "Missing required parameters."}
self.return_json(res)
self.ga_track(event={'category': 'v2_api',
'action': 'gene_post',
'label': 'qsize',
'value': len(ids) if ids else 0})
testing_test.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def test_timeout(self):
# Set a short timeout and exceed it.
@gen_test(timeout=0.1)
def test(self):
yield gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)
# This can't use assertRaises because we need to inspect the
# exc_info triple (and not just the exception object)
try:
test(self)
self.fail("did not get expected exception")
except ioloop.TimeoutError:
# The stack trace should blame the add_timeout line, not just
# unrelated IOLoop/testing internals.
self.assertIn(
"gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)",
traceback.format_exc())
self.finished = True
stack_context_test.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def test_run_with_stack_context(self):
@gen.coroutine
def f1():
self.assertEqual(self.active_contexts, ['c1'])
yield run_with_stack_context(
StackContext(functools.partial(self.context, 'c2')),
f2)
self.assertEqual(self.active_contexts, ['c1'])
@gen.coroutine
def f2():
self.assertEqual(self.active_contexts, ['c1', 'c2'])
yield gen.Task(self.io_loop.add_callback)
self.assertEqual(self.active_contexts, ['c1', 'c2'])
self.assertEqual(self.active_contexts, [])
yield run_with_stack_context(
StackContext(functools.partial(self.context, 'c1')),
f1)
self.assertEqual(self.active_contexts, [])