def maybe_future(x):
"""Converts ``x`` into a `.Future`.
If ``x`` is already a `.Future`, it is simply returned; otherwise
it is wrapped in a new `.Future`. This is suitable for use as
``result = yield gen.maybe_future(f())`` when you don't know whether
``f()`` returns a `.Future` or not.
.. deprecated:: 4.3
This function only handles ``Futures``, not other yieldable objects.
Instead of `maybe_future`, check for the non-future result types
you expect (often just ``None``), and ``yield`` anything unknown.
"""
if is_future(x):
return x
else:
fut = Future()
fut.set_result(x)
return fut
python类Future()的实例源码
def sleep(duration):
"""Return a `.Future` that resolves after the given number of seconds.
When used with ``yield`` in a coroutine, this is a non-blocking
analogue to `time.sleep` (which should not be used in coroutines
because it is blocking)::
yield gen.sleep(0.5)
Note that calling this function on its own does nothing; you must
wait on the `.Future` it returns (usually by yielding it).
.. versionadded:: 4.1
"""
f = Future()
IOLoop.current().call_later(duration, lambda: f.set_result(None))
return f
def convert_yielded(yielded):
"""Convert a yielded object into a `.Future`.
The default implementation accepts lists, dictionaries, and Futures.
If the `~functools.singledispatch` library is available, this function
may be extended to support additional types. For example::
@convert_yielded.register(asyncio.Future)
def _(asyncio_future):
return tornado.platform.asyncio.to_tornado_future(asyncio_future)
.. versionadded:: 4.1
"""
# Lists and dicts containing YieldPoints were handled earlier.
if isinstance(yielded, (list, dict)):
return multi(yielded)
elif is_future(yielded):
return yielded
elif isawaitable(yielded):
return _wrap_awaitable(yielded)
else:
raise BadYieldError("yielded unknown object %r" % (yielded,))
def motor_coroutine(f):
"""A coroutine that accepts an optional callback.
Given a callback, the function returns None, and the callback is run
with (result, error). Without a callback the function returns a Future.
"""
coro = gen.coroutine(f)
@functools.wraps(f)
def wrapper(*args, **kwargs):
callback = kwargs.pop('callback', None)
if callback and not callable(callback):
raise callback_type_error
future = coro(*args, **kwargs)
if callback:
def _callback(future):
try:
result = future.result()
callback(result, None)
except Exception, e:
callback(None, e)
future.add_done_callback(_callback)
else:
return future
return wrapper
def __init__(self, prop, original_class):
"""Like Async, but before it executes the callback or resolves the
Future, checks if result is a PyMongo class and wraps it in a Motor
class. E.g., Motor's map_reduce should pass a MotorCollection instead
of a PyMongo Collection to the Future. Uses the wrap() method on the
owner object to do the actual wrapping. E.g.,
Database.create_collection returns a Collection, so MotorDatabase has:
create_collection = AsyncCommand().wrap(Collection)
Once Database.create_collection is done, Motor calls
MotorDatabase.wrap() on its result, transforming the result from
Collection to MotorCollection, which is passed to the callback or
Future.
:Parameters:
- `prop`: An Async, the async method to call before wrapping its result
in a Motor class.
- `original_class`: A PyMongo class to be wrapped.
"""
super(WrapAsync, self).__init__(prop)
self.original_class = original_class
def find(self, *args, **kwargs):
"""Create a :class:`MotorCursor`. Same parameters as for
PyMongo's :meth:`~pymongo.collection.Collection.find`.
Note that ``find`` does not take a `callback` parameter, nor does
it return a Future, because ``find`` merely creates a
:class:`MotorCursor` without performing any operations on the server.
``MotorCursor`` methods such as :meth:`~MotorCursor.to_list` or
:meth:`~MotorCursor.count` perform actual operations.
"""
if 'callback' in kwargs:
raise pymongo.errors.InvalidOperation(
"Pass a callback to each, to_list, or count, not to find.")
cursor = self.delegate.find(*args, **kwargs)
return MotorCursor(cursor, self)
def acquire(self, timeout=None):
"""Decrement the counter. Returns a Future.
Block if the counter is zero and wait for a `.release`. The Future
raises `.TimeoutError` after the deadline.
"""
waiter = Future()
if self._value > 0:
self._value -= 1
waiter.set_result(_ReleasingContextManager(self))
else:
self._waiters.append(waiter)
if timeout:
def on_timeout():
waiter.set_exception(gen.TimeoutError())
self._garbage_collect()
io_loop = ioloop.IOLoop.current()
timeout_handle = io_loop.add_timeout(timeout, on_timeout)
waiter.add_done_callback(
lambda _: io_loop.remove_timeout(timeout_handle))
return waiter
def write(self, chunk, callback=None):
"""Implements `.HTTPConnection.write`.
For backwards compatibility is is allowed but deprecated to
skip `write_headers` and instead call `write()` with a
pre-encoded header block.
"""
future = None
if self.stream.closed():
future = self._write_future = Future()
self._write_future.set_exception(iostream.StreamClosedError())
self._write_future.exception()
else:
if callback is not None:
self._write_callback = stack_context.wrap(callback)
else:
future = self._write_future = Future()
self._pending_write = self.stream.write(self._format_chunk(chunk))
self._pending_write.add_done_callback(self._on_write_complete)
return future
def test_replace_context_exception(self):
# Test exception handling: exceptions thrown into the stack context
# can be caught and replaced.
# Note that this test and the following are for behavior that is
# not really supported any more: coroutines no longer create a
# stack context automatically; but one is created after the first
# YieldPoint (i.e. not a Future).
@gen.coroutine
def f2():
(yield gen.Callback(1))()
yield gen.Wait(1)
self.io_loop.add_callback(lambda: 1 / 0)
try:
yield gen.Task(self.io_loop.add_timeout,
self.io_loop.time() + 10)
except ZeroDivisionError:
raise KeyError()
future = f2()
with self.assertRaises(KeyError):
yield future
self.finished = True
def test_moment(self):
calls = []
@gen.coroutine
def f(name, yieldable):
for i in range(5):
calls.append(name)
yield yieldable
# First, confirm the behavior without moment: each coroutine
# monopolizes the event loop until it finishes.
immediate = Future()
immediate.set_result(None)
yield [f('a', immediate), f('b', immediate)]
self.assertEqual(''.join(calls), 'aaaaabbbbb')
# With moment, they take turns.
calls = []
yield [f('a', gen.moment), f('b', gen.moment)]
self.assertEqual(''.join(calls), 'ababababab')
self.finished = True
calls = []
yield [f('a', gen.moment), f('b', immediate)]
self.assertEqual(''.join(calls), 'abbbbbaaaa')
def test_iterator(self):
futures = [Future(), Future(), Future(), Future()]
self.finish_coroutines(0, futures)
g = gen.WaitIterator(*futures)
i = 0
while not g.done():
try:
r = yield g.next()
except ZeroDivisionError:
self.assertIs(g.current_future, futures[0],
'exception future invalid')
else:
if i == 0:
self.assertEqual(r, 24, 'iterator value incorrect')
self.assertEqual(g.current_index, 2, 'wrong index')
elif i == 2:
self.assertEqual(r, 42, 'iterator value incorrect')
self.assertEqual(g.current_index, 1, 'wrong index')
elif i == 3:
self.assertEqual(r, 84, 'iterator value incorrect')
self.assertEqual(g.current_index, 3, 'wrong index')
i += 1
def test_future_close_callback(self):
# Regression test for interaction between the Future read interfaces
# and IOStream._maybe_add_error_listener.
server, client = self.make_iostream_pair()
closed = [False]
def close_callback():
closed[0] = True
self.stop()
server.set_close_callback(close_callback)
try:
client.write(b'a')
future = server.read_bytes(1)
self.io_loop.add_future(future, self.stop)
self.assertEqual(self.wait().result(), b'a')
self.assertFalse(closed[0])
client.close()
self.wait()
self.assertTrue(closed[0])
finally:
server.close()
client.close()
def setUp(self):
try:
super(TestIOStreamStartTLS, self).setUp()
self.listener, self.port = bind_unused_port()
self.server_stream = None
self.server_accepted = Future()
netutil.add_accept_handler(self.listener, self.accept)
self.client_stream = IOStream(socket.socket())
self.io_loop.add_future(self.client_stream.connect(
('127.0.0.1', self.port)), self.stop)
self.wait()
self.io_loop.add_future(self.server_accepted, self.stop)
self.wait()
except Exception as e:
print(e)
raise
def test_wait_for_handshake_callback(self):
test = self
handshake_future = Future()
class TestServer(TCPServer):
def handle_stream(self, stream, address):
# The handshake has not yet completed.
test.assertIsNone(stream.socket.cipher())
self.stream = stream
stream.wait_for_handshake(self.handshake_done)
def handshake_done(self):
# Now the handshake is done and ssl information is available.
test.assertIsNotNone(self.stream.socket.cipher())
handshake_future.set_result(None)
yield self.connect_to_server(TestServer)
yield handshake_future
def test_wait_for_handshake_already_connected(self):
handshake_future = Future()
class TestServer(TCPServer):
def handle_stream(self, stream, address):
self.stream = stream
stream.wait_for_handshake(self.handshake_done)
def handshake_done(self):
self.stream.wait_for_handshake(self.handshake2_done)
def handshake2_done(self):
handshake_future.set_result(None)
yield self.connect_to_server(TestServer)
yield handshake_future
def __init__(self, attr_name, has_write_concern, doc=None):
"""A descriptor that wraps a PyMongo method, such as insert or remove,
and returns an asynchronous version of the method, which accepts a
callback or returns a Future.
:Parameters:
- `attr_name`: The name of the attribute on the PyMongo class, if
different from attribute on the Motor class
- `has_write_concern`: Whether the method accepts getLastError options
"""
super(Async, self).__init__(doc)
self.attr_name = attr_name
self.has_write_concern = has_write_concern
def __init__(self, attr_name=None, doc=None):
"""A descriptor that wraps a PyMongo read method like find_one() that
returns a Future.
"""
Async.__init__(
self, attr_name=attr_name, has_write_concern=False, doc=doc)
def __init__(self, attr_name=None, doc=None):
"""A descriptor that wraps a PyMongo command like copy_database() that
returns a Future and does not accept getLastError options.
"""
Async.__init__(
self, attr_name=attr_name, has_write_concern=False, doc=doc)
def open(self):
"""Connect to the server.
Takes an optional callback, or returns a Future that resolves to
``self`` when opened. This is convenient for checking at program
startup time whether you can connect.
.. doctest::
>>> client = MotorClient()
>>> # run_sync() returns the open client.
>>> IOLoop.current().run_sync(client.open)
MotorClient(MongoClient('localhost', 27017))
``open`` raises a :exc:`~pymongo.errors.ConnectionFailure` if it
cannot connect, but note that auth failures aren't revealed until
you attempt an operation on the open client.
:Parameters:
- `callback`: Optional function taking parameters (self, error)
.. versionchanged:: 0.2
:class:`MotorClient` now opens itself on demand, calling ``open``
explicitly is now optional.
"""
yield self._ensure_connected()
raise gen.Return(self)
def open(self):
"""Connect to the server.
Takes an optional callback, or returns a Future that resolves to
``self`` when opened. This is convenient for checking at program
startup time whether you can connect.
.. doctest::
>>> client = MotorClient()
>>> # run_sync() returns the open client.
>>> IOLoop.current().run_sync(client.open)
MotorClient(MongoClient('localhost', 27017))
``open`` raises a :exc:`~pymongo.errors.ConnectionFailure` if it
cannot connect, but note that auth failures aren't revealed until
you attempt an operation on the open client.
:Parameters:
- `callback`: Optional function taking parameters (self, error)
.. versionchanged:: 0.2
:class:`MotorReplicaSetClient` now opens itself on demand, calling
``open`` explicitly is now optional.
"""
yield self._ensure_connected(True)
primary = self._get_member()
if not primary:
raise pymongo.errors.AutoReconnect('no primary is available')
raise gen.Return(self)