def tokenize(self, language_tag, query):
    """Send a tokenize request over the socket and return a pending Future.

    A fresh request id is taken from ``self._next_id`` (which is then
    incremented) and the returned Future is registered in
    ``self._requests`` under that id.  This method itself only ever *fails*
    the Future, when the socket write reports an exception; it never sets a
    result.  Presumably the code that reads responses resolves the Future
    through ``self._requests`` -- confirm against the response handler.

    :Parameters:
      - `language_tag`: sent as the ``languageTag`` field of the request.
      - `query`: sent as the ``utterance`` field of the request.

    Returns the Future registered for this request.
    """
    # Named ``request_id`` rather than ``id`` so the builtin is not shadowed.
    request_id = self._next_id
    self._next_id += 1
    req = dict(req=request_id, utterance=query, languageTag=language_tag)
    outer = Future()
    self._requests[request_id] = outer

    def then(future):
        error = future.exception()
        if error is not None:
            outer.set_exception(error)
            # pop() instead of ``del``: if the entry was already removed
            # elsewhere, a KeyError raised inside this done-callback would
            # only obscure the original write failure.
            self._requests.pop(request_id, None)

    future = self._socket.write(json.dumps(req).encode())
    future.add_done_callback(then)
    return outer
# Example source code using the Python class Future()
def motor_coroutine(f):
    """Wrap ``f`` as a coroutine that accepts an optional ``callback``.

    ``f`` is turned into a coroutine with ``gen.coroutine``.  The returned
    wrapper removes a ``callback`` keyword argument, if present, before
    calling the coroutine:

    - With a callback, the wrapper returns None and the callback is later
      run exactly once, as ``callback(result, None)`` on success or
      ``callback(None, error)`` on failure.
    - Without a callback, the wrapper returns the coroutine's Future.

    Raises ``callback_type_error`` if a truthy, non-callable ``callback``
    is passed.
    """
    coro = gen.coroutine(f)

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        callback = kwargs.pop('callback', None)
        if callback and not callable(callback):
            raise callback_type_error
        future = coro(*args, **kwargs)
        if not callback:
            return future

        def _callback(future):
            # Only the retrieval of the result is guarded.  Invoking the
            # callback in the ``else`` branch keeps an exception raised by
            # the callback itself from triggering a second invocation with
            # (None, error).
            try:
                result = future.result()
            except Exception as e:  # "as" form is valid on Python 2.6+ and 3
                callback(None, e)
            else:
                callback(result, None)

        future.add_done_callback(_callback)

    return wrapper
def __init__(self, prop, original_class):
    """Set up an Async-like property whose result gets wrapped.

    Behaves like Async, except that before the callback runs or the Future
    resolves, the result is checked: if it is an instance of a PyMongo
    class it is replaced by the corresponding Motor class.  The owner
    object's wrap() method performs the conversion.

    Example: Database.create_collection returns a Collection, so
    MotorDatabase declares::

        create_collection = AsyncCommand().wrap(Collection)

    When Database.create_collection finishes, Motor calls
    MotorDatabase.wrap() on the result, turning the Collection into a
    MotorCollection, and that is what the callback or Future receives.
    Likewise Motor's map_reduce hands a MotorCollection, not a PyMongo
    Collection, to the Future.

    :Parameters:
      - `prop`: An Async; the async method to invoke before its result is
        wrapped in a Motor class.
      - `original_class`: The PyMongo class whose instances get wrapped.
    """
    # Parent initialisation first, then remember which class to wrap.
    super(WrapAsync, self).__init__(prop)
    self.original_class = original_class
def find(self, *args, **kwargs):
    """Build a :class:`MotorCursor` for a query.

    Accepts the same arguments as PyMongo's
    :meth:`~pymongo.collection.Collection.find`.

    ``find`` neither accepts a `callback` nor returns a Future: it only
    constructs a :class:`MotorCursor` and performs no server operation.
    The real work happens in ``MotorCursor`` methods such as
    :meth:`~MotorCursor.to_list` or :meth:`~MotorCursor.count`.

    Raises :class:`pymongo.errors.InvalidOperation` if a ``callback``
    keyword argument is supplied.
    """
    callback_supplied = 'callback' in kwargs
    if callback_supplied:
        raise pymongo.errors.InvalidOperation(
            "Pass a callback to each, to_list, or count, not to find.")

    # Delegate to the wrapped collection, then pair the resulting cursor
    # with this object.
    return MotorCursor(self.delegate.find(*args, **kwargs), self)
def wait(self, timeout=None):
    """Wait for `.notify`.

    Returns a `.Future` that resolves ``True`` if the condition is notified,
    or ``False`` after a timeout.  Only the ``False`` (timeout) result is
    set in this method; the waiter is appended to ``self._waiters`` so it
    can be resolved from elsewhere.

    ``timeout`` is handed unchanged to ``IOLoop.add_timeout``.  A falsy
    value (``None`` or ``0``) means no timeout is scheduled.
    """
    waiter = Future()
    self._waiters.append(waiter)
    if timeout:
        def on_timeout():
            # The waiter may already have been resolved by the time the
            # timeout fires (for example when done-callbacks, and thus the
            # remove_timeout below, run later than the resolution).  Never
            # set a result on a finished Future.
            if not waiter.done():
                waiter.set_result(False)
            self._garbage_collect()
        io_loop = ioloop.IOLoop.current()
        timeout_handle = io_loop.add_timeout(timeout, on_timeout)
        # Cancel the pending timeout as soon as the waiter completes.
        waiter.add_done_callback(
            lambda _: io_loop.remove_timeout(timeout_handle))
    return waiter
def write(self, chunk, callback=None):
    """Implements `.HTTPConnection.write`.

    For backwards compatibility it is allowed, but deprecated, to skip
    `write_headers` and call `write()` directly with a pre-encoded header
    block.

    Returns a Future when no ``callback`` is given or when the stream is
    already closed (in which case the Future carries a
    ``StreamClosedError``).  Returns None when a ``callback`` is given and
    the stream is open.
    """
    if self.stream.closed():
        failed = Future()
        self._write_future = failed
        failed.set_exception(iostream.StreamClosedError())
        # NOTE(review): the return value is discarded; presumably this call
        # only marks the exception as retrieved -- confirm against Future.
        failed.exception()
        return failed

    if callback is None:
        result = Future()
        self._write_future = result
    else:
        result = None
        self._write_callback = stack_context.wrap(callback)

    formatted = self._format_chunk(chunk)
    self._pending_write = self.stream.write(formatted)
    self._pending_write.add_done_callback(self._on_write_complete)
    return result
def test_replace_context_exception(self):
    # Exceptions thrown into the stack context can be caught and replaced
    # by a different exception.
    # This test (and the one after it) covers behavior that is no longer
    # really supported: coroutines do not create a stack context
    # automatically any more, but one is still created after the first
    # YieldPoint (i.e. something that is not a Future).
    @gen.coroutine
    def replace_error():
        resume = yield gen.Callback(1)
        resume()
        yield gen.Wait(1)

        def divide_by_zero():
            return 1 / 0

        self.io_loop.add_callback(divide_by_zero)
        try:
            yield gen.Task(self.io_loop.add_timeout,
                           self.io_loop.time() + 10)
        except ZeroDivisionError:
            # Swap the ZeroDivisionError for a KeyError.
            raise KeyError()

    outcome = replace_error()
    with self.assertRaises(KeyError):
        yield outcome
    self.finished = True
def test_moment(self):
    # Records the order in which two coroutines run, depending on what
    # they yield.
    calls = []

    @gen.coroutine
    def record(label, yieldable):
        for _ in range(5):
            calls.append(label)
            yield yieldable

    # Baseline without moment: an already-resolved Future lets each
    # coroutine run to completion before the other one starts.
    resolved = Future()
    resolved.set_result(None)
    yield [record('a', resolved), record('b', resolved)]
    self.assertEqual(''.join(calls), 'aaaaabbbbb')

    # Both yield moment: the coroutines alternate.
    calls = []
    yield [record('a', gen.moment), record('b', gen.moment)]
    self.assertEqual(''.join(calls), 'ababababab')
    self.finished = True

    # Mixed: only 'a' yields moment.
    calls = []
    yield [record('a', gen.moment), record('b', resolved)]
    self.assertEqual(''.join(calls), 'abbbbbaaaa')
def test_iterator(self):
    # Drive a WaitIterator over four futures and check the value and the
    # index reported on selected iterations.
    pending = [Future() for _ in range(4)]
    self.finish_coroutines(0, pending)
    waiter = gen.WaitIterator(*pending)

    # iteration number -> (expected result, expected current_index)
    expectations = {0: (24, 2), 2: (42, 1), 3: (84, 3)}

    step = 0
    while not waiter.done():
        try:
            value = yield waiter.next()
        except ZeroDivisionError:
            self.assertIs(waiter.current_future, pending[0],
                          'exception future invalid')
        else:
            if step in expectations:
                want_value, want_index = expectations[step]
                self.assertEqual(value, want_value,
                                 'iterator value incorrect')
                self.assertEqual(waiter.current_index, want_index,
                                 'wrong index')
        step += 1
def test_streaming_body(self):
    # Sends a chunked request body in two pieces and checks that each
    # piece is observed before the next one is written.
    # NOTE(review): self.prepared, self.data and self.finished are
    # presumably resolved by the request handler under test -- confirm.
    self.prepared = Future()
    self.data = Future()
    self.finished = Future()

    stream = self.connect(b"/stream_body", connection_close=True)
    yield self.prepared
    stream.write(b"4\r\nasdf\r\n")
    # Ensure the first chunk is received before we send the second.
    data = yield self.data
    self.assertEqual(data, b"asdf")
    # Fresh Future for the second chunk.
    self.data = Future()
    stream.write(b"4\r\nqwer\r\n")
    data = yield self.data
    # assertEqual rather than the deprecated alias assertEquals, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(data, b"qwer")
    stream.write(b"0\r\n")
    yield self.finished
    data = yield gen.Task(stream.read_until_close)
    # This would ideally use an HTTP1Connection to read the response.
    self.assertTrue(data.endswith(b"{}"))
    stream.close()
def test_future_close_callback(self):
    # Regression test: the Future-based read interfaces must interact
    # correctly with IOStream._maybe_add_error_listener.
    server, client = self.make_iostream_pair()
    state = {'closed': False}

    def on_server_close():
        state['closed'] = True
        self.stop()

    server.set_close_callback(on_server_close)
    try:
        client.write(b'a')
        self.io_loop.add_future(server.read_bytes(1), self.stop)
        read_result = self.wait()
        self.assertEqual(read_result.result(), b'a')
        # Reading a byte must not have triggered the close callback.
        self.assertFalse(state['closed'])
        client.close()
        self.wait()
        self.assertTrue(state['closed'])
    finally:
        server.close()
        client.close()
def test_wait_for_handshake_callback(self):
    # Callback form of wait_for_handshake: the server registers a callback
    # and this test waits until that callback has fired.
    testcase = self
    handshake_complete = Future()

    class TestServer(TCPServer):
        def handle_stream(self, stream, address):
            # Handshake not finished yet, so no cipher is reported.
            testcase.assertIsNone(stream.socket.cipher())
            self.stream = stream
            stream.wait_for_handshake(self.handshake_done)

        def handshake_done(self):
            # Handshake finished: ssl information is now available.
            testcase.assertIsNotNone(self.stream.socket.cipher())
            handshake_complete.set_result(None)

    yield self.connect_to_server(TestServer)
    yield handshake_complete
def test_wait_for_handshake_future(self):
    # Future form of wait_for_handshake: the server yields the value
    # returned by wait_for_handshake() inside a coroutine.
    testcase = self
    handshake_complete = Future()

    class TestServer(TCPServer):
        def handle_stream(self, stream, address):
            # No cipher is reported at the time the stream is handed over.
            testcase.assertIsNone(stream.socket.cipher())
            testcase.io_loop.spawn_callback(self.handle_connection, stream)

        @gen.coroutine
        def handle_connection(self, stream):
            yield stream.wait_for_handshake()
            handshake_complete.set_result(None)

    yield self.connect_to_server(TestServer)
    yield handshake_complete
def test_wait_for_handshake_already_connected(self):
    # wait_for_handshake is called a second time from inside the first
    # handshake callback; the second callback must still run.
    second_callback_ran = Future()

    class TestServer(TCPServer):
        def handle_stream(self, stream, address):
            self.stream = stream
            stream.wait_for_handshake(self.handshake_done)

        def handshake_done(self):
            # Ask again from within the first callback.
            self.stream.wait_for_handshake(self.handshake2_done)

        def handshake2_done(self):
            second_callback_ran.set_result(None)

    yield self.connect_to_server(TestServer)
    yield second_callback_ran
def maybe_future(x):
    """Return ``x`` as a `.Future`.

    A value that already is a `.Future` is returned unchanged; anything
    else is placed into a new, already-resolved `.Future`.  Useful as
    ``result = yield gen.maybe_future(f())`` when it is unknown whether
    ``f()`` returns a `.Future`.

    .. deprecated:: 4.3
       Only ``Futures`` are recognised, not other yieldable objects.
       Rather than calling `maybe_future`, test for the non-future result
       types you expect (often just ``None``) and ``yield`` anything
       unknown.
    """
    if not is_future(x):
        wrapped = Future()
        wrapped.set_result(x)
        return wrapped
    return x
def sleep(duration):
    """Return a `.Future` that resolves after ``duration`` seconds.

    Yielded inside a coroutine, this is the non-blocking counterpart of
    `time.sleep` (which blocks and therefore should not be used in
    coroutines)::

        yield gen.sleep(0.5)

    Calling this function by itself has no effect; the returned `.Future`
    has to be waited on, usually by yielding it.

    .. versionadded:: 4.1
    """
    pending = Future()

    def _resolve():
        pending.set_result(None)

    IOLoop.current().call_later(duration, _resolve)
    return pending
def prepare(self):
    """Hook run at the start of a request, before `get`/`post`/etc.

    Subclasses override it to do initialization that is common to every
    request method.  The default implementation does nothing.

    Asynchronous support: decorate the override with `.gen.coroutine` or
    `.return_future` to make it asynchronous (the `asynchronous` decorator
    cannot be used on `prepare`).  If the method returns a `.Future`,
    execution does not continue until that `.Future` is done.

    .. versionadded:: 3.1
       Asynchronous support.
    """
    return None
def __init__(self, prop, original_class):
    """Set up an Async-like property whose result gets wrapped.

    Behaves like Async, except that before the callback runs or the Future
    resolves, the result is checked: if it is an instance of a PyMongo
    class it is replaced by the corresponding Motor class.  The owner
    object's wrap() method performs the conversion.

    Example: Database.create_collection returns a Collection, so
    MotorDatabase declares::

        create_collection = AsyncCommand().wrap(Collection)

    When Database.create_collection finishes, Motor calls
    MotorDatabase.wrap() on the result, turning the Collection into a
    MotorCollection, and that is what the callback or Future receives.
    Likewise Motor's map_reduce hands a MotorCollection, not a PyMongo
    Collection, to the Future.

    :Parameters:
      - `prop`: An Async; the async method to invoke before its result is
        wrapped in a Motor class.
      - `original_class`: The PyMongo class whose instances get wrapped.
    """
    # Parent initialisation first, then remember which class to wrap.
    super(WrapAsync, self).__init__(prop)
    self.original_class = original_class
def find(self, *args, **kwargs):
    """Build a :class:`MotorCursor` for a query.

    Accepts the same arguments as PyMongo's
    :meth:`~pymongo.collection.Collection.find`.

    ``find`` neither accepts a `callback` nor returns a Future: it only
    constructs a :class:`MotorCursor` and performs no server operation.
    The real work happens in ``MotorCursor`` methods such as
    :meth:`~MotorCursor.to_list` or :meth:`~MotorCursor.count`.

    Raises :class:`pymongo.errors.InvalidOperation` if a ``callback``
    keyword argument is supplied.
    """
    callback_supplied = 'callback' in kwargs
    if callback_supplied:
        raise pymongo.errors.InvalidOperation(
            "Pass a callback to each, to_list, or count, not to find.")

    # Delegate to the wrapped collection, then pair the resulting cursor
    # with this object.
    return MotorCursor(self.delegate.find(*args, **kwargs), self)
def wait(self, timeout=None):
    """Wait for `.notify`.

    Returns a `.Future` that resolves ``True`` if the condition is notified,
    or ``False`` after a timeout.  Only the ``False`` (timeout) result is
    set in this method; the waiter is appended to ``self._waiters`` so it
    can be resolved from elsewhere.

    ``timeout`` is handed unchanged to ``IOLoop.add_timeout``.  A falsy
    value (``None`` or ``0``) means no timeout is scheduled.
    """
    waiter = Future()
    self._waiters.append(waiter)
    if timeout:
        def on_timeout():
            # The waiter may already have been resolved by the time the
            # timeout fires (for example when done-callbacks, and thus the
            # remove_timeout below, run later than the resolution).  Never
            # set a result on a finished Future.
            if not waiter.done():
                waiter.set_result(False)
            self._garbage_collect()
        io_loop = ioloop.IOLoop.current()
        timeout_handle = io_loop.add_timeout(timeout, on_timeout)
        # Cancel the pending timeout as soon as the waiter completes.
        waiter.add_done_callback(
            lambda _: io_loop.remove_timeout(timeout_handle))
    return waiter
def acquire(self, timeout=None):
    """Decrement the counter. Returns a Future.

    If the counter is positive it is decremented and the returned Future
    is already resolved with a ``_ReleasingContextManager`` for this
    object.  Otherwise the Future is queued in ``self._waiters`` to wait
    for a `.release`; it raises `.TimeoutError` after the deadline.

    ``timeout`` is handed unchanged to ``IOLoop.add_timeout``.  A falsy
    value (``None`` or ``0``) means no timeout is scheduled.
    """
    waiter = Future()
    if self._value > 0:
        self._value -= 1
        waiter.set_result(_ReleasingContextManager(self))
    else:
        self._waiters.append(waiter)
        if timeout:
            def on_timeout():
                # The waiter may already have been resolved by the time
                # the timeout fires; never set an exception on a finished
                # Future.
                if not waiter.done():
                    waiter.set_exception(gen.TimeoutError())
                self._garbage_collect()
            io_loop = ioloop.IOLoop.current()
            timeout_handle = io_loop.add_timeout(timeout, on_timeout)
            # Cancel the pending timeout as soon as the waiter completes.
            waiter.add_done_callback(
                lambda _: io_loop.remove_timeout(timeout_handle))
    return waiter
def write(self, chunk, callback=None):
    """Implements `.HTTPConnection.write`.

    For backwards compatibility it is allowed, but deprecated, to skip
    `write_headers` and call `write()` directly with a pre-encoded header
    block.

    Returns a Future when no ``callback`` is given or when the stream is
    already closed (in which case the Future carries a
    ``StreamClosedError``).  Returns None when a ``callback`` is given and
    the stream is open.
    """
    if self.stream.closed():
        failed = Future()
        self._write_future = failed
        failed.set_exception(iostream.StreamClosedError())
        # NOTE(review): the return value is discarded; presumably this call
        # only marks the exception as retrieved -- confirm against Future.
        failed.exception()
        return failed

    if callback is None:
        result = Future()
        self._write_future = result
    else:
        result = None
        self._write_callback = stack_context.wrap(callback)

    formatted = self._format_chunk(chunk)
    self._pending_write = self.stream.write(formatted)
    self._pending_write.add_done_callback(self._on_write_complete)
    return result
def test_replace_context_exception(self):
    # Exceptions thrown into the stack context can be caught and replaced
    # by a different exception.
    # This test (and the one after it) covers behavior that is no longer
    # really supported: coroutines do not create a stack context
    # automatically any more, but one is still created after the first
    # YieldPoint (i.e. something that is not a Future).
    @gen.coroutine
    def replace_error():
        resume = yield gen.Callback(1)
        resume()
        yield gen.Wait(1)

        def divide_by_zero():
            return 1 / 0

        self.io_loop.add_callback(divide_by_zero)
        try:
            yield gen.Task(self.io_loop.add_timeout,
                           self.io_loop.time() + 10)
        except ZeroDivisionError:
            # Swap the ZeroDivisionError for a KeyError.
            raise KeyError()

    outcome = replace_error()
    with self.assertRaises(KeyError):
        yield outcome
    self.finished = True
def test_moment(self):
    # Records the order in which two coroutines run, depending on what
    # they yield.
    calls = []

    @gen.coroutine
    def record(label, yieldable):
        for _ in range(5):
            calls.append(label)
            yield yieldable

    # Baseline without moment: an already-resolved Future lets each
    # coroutine run to completion before the other one starts.
    resolved = Future()
    resolved.set_result(None)
    yield [record('a', resolved), record('b', resolved)]
    self.assertEqual(''.join(calls), 'aaaaabbbbb')

    # Both yield moment: the coroutines alternate.
    calls = []
    yield [record('a', gen.moment), record('b', gen.moment)]
    self.assertEqual(''.join(calls), 'ababababab')
    self.finished = True

    # Mixed: only 'a' yields moment.
    calls = []
    yield [record('a', gen.moment), record('b', resolved)]
    self.assertEqual(''.join(calls), 'abbbbbaaaa')
def test_iterator(self):
    # Drive a WaitIterator over four futures and check the value and the
    # index reported on selected iterations.
    pending = [Future() for _ in range(4)]
    self.finish_coroutines(0, pending)
    waiter = gen.WaitIterator(*pending)

    # iteration number -> (expected result, expected current_index)
    expectations = {0: (24, 2), 2: (42, 1), 3: (84, 3)}

    step = 0
    while not waiter.done():
        try:
            value = yield waiter.next()
        except ZeroDivisionError:
            self.assertIs(waiter.current_future, pending[0],
                          'exception future invalid')
        else:
            if step in expectations:
                want_value, want_index = expectations[step]
                self.assertEqual(value, want_value,
                                 'iterator value incorrect')
                self.assertEqual(waiter.current_index, want_index,
                                 'wrong index')
        step += 1
def test_streaming_body(self):
    # Sends a chunked request body in two pieces and checks that each
    # piece is observed before the next one is written.
    # NOTE(review): self.prepared, self.data and self.finished are
    # presumably resolved by the request handler under test -- confirm.
    self.prepared = Future()
    self.data = Future()
    self.finished = Future()

    stream = self.connect(b"/stream_body", connection_close=True)
    yield self.prepared
    stream.write(b"4\r\nasdf\r\n")
    # Ensure the first chunk is received before we send the second.
    data = yield self.data
    self.assertEqual(data, b"asdf")
    # Fresh Future for the second chunk.
    self.data = Future()
    stream.write(b"4\r\nqwer\r\n")
    data = yield self.data
    # assertEqual rather than the deprecated alias assertEquals, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(data, b"qwer")
    stream.write(b"0\r\n")
    yield self.finished
    data = yield gen.Task(stream.read_until_close)
    # This would ideally use an HTTP1Connection to read the response.
    self.assertTrue(data.endswith(b"{}"))
    stream.close()
def test_future_close_callback(self):
    # Regression test: the Future-based read interfaces must interact
    # correctly with IOStream._maybe_add_error_listener.
    server, client = self.make_iostream_pair()
    state = {'closed': False}

    def on_server_close():
        state['closed'] = True
        self.stop()

    server.set_close_callback(on_server_close)
    try:
        client.write(b'a')
        self.io_loop.add_future(server.read_bytes(1), self.stop)
        read_result = self.wait()
        self.assertEqual(read_result.result(), b'a')
        # Reading a byte must not have triggered the close callback.
        self.assertFalse(state['closed'])
        client.close()
        self.wait()
        self.assertTrue(state['closed'])
    finally:
        server.close()
        client.close()
def setUp(self):
    # Binds a listener on an unused port, connects a client IOStream to
    # it, and waits first for the connect and then for the accept.
    # Any exception is printed and then re-raised unchanged.
    try:
        super(TestIOStreamStartTLS, self).setUp()
        listener, port = bind_unused_port()
        self.listener = listener
        self.port = port
        self.server_stream = None
        self.server_accepted = Future()
        netutil.add_accept_handler(self.listener, self.accept)
        self.client_stream = IOStream(socket.socket())
        connecting = self.client_stream.connect(('127.0.0.1', self.port))
        self.io_loop.add_future(connecting, self.stop)
        self.wait()
        self.io_loop.add_future(self.server_accepted, self.stop)
        self.wait()
    except Exception as exc:
        print(exc)
        raise
def test_wait_for_handshake_future(self):
    # Future form of wait_for_handshake: the server yields the value
    # returned by wait_for_handshake() inside a coroutine.
    testcase = self
    handshake_complete = Future()

    class TestServer(TCPServer):
        def handle_stream(self, stream, address):
            # No cipher is reported at the time the stream is handed over.
            testcase.assertIsNone(stream.socket.cipher())
            testcase.io_loop.spawn_callback(self.handle_connection, stream)

        @gen.coroutine
        def handle_connection(self, stream):
            yield stream.wait_for_handshake()
            handshake_complete.set_result(None)

    yield self.connect_to_server(TestServer)
    yield handshake_complete
def test_wait_for_handshake_already_connected(self):
    # wait_for_handshake is called a second time from inside the first
    # handshake callback; the second callback must still run.
    second_callback_ran = Future()

    class TestServer(TCPServer):
        def handle_stream(self, stream, address):
            self.stream = stream
            stream.wait_for_handshake(self.handshake_done)

        def handshake_done(self):
            # Ask again from within the first callback.
            self.stream.wait_for_handshake(self.handshake2_done)

        def handshake2_done(self):
            second_callback_ran.set_result(None)

    yield self.connect_to_server(TestServer)
    yield second_callback_ran