def test_all_completed(self):
future1 = self.executor.submit(divmod, 2, 0)
future2 = self.executor.submit(mul, 2, 21)
finished, pending = futures.wait(
[SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
future1,
future2],
return_when=futures.ALL_COMPLETED)
self.assertEqual(set([SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
future1,
future2]), finished)
self.assertEqual(set(), pending)
python类ALL_COMPLETED的实例源码
def stop(self):
if self.renderer.running:
tasks = []
if self.task is not None and not self.task.done():
self.task.cancel()
tasks.append(self.task)
if self.waiter is not None and not self.waiter.done():
self.waiter.cancel()
tasks.append(self.waiter)
await self.renderer._stop()
if len(tasks) > 0:
await asyncio.wait(tasks, return_when=futures.ALL_COMPLETED)
self.renderer.finish(self._frame)
def _stop(self):
"""
Stop this AnimationLoop
Shuts down the loop and triggers cleanup tasks.
"""
if not self.running:
return False
self.running = False
for layer in self.layers[::-1]:
await self.remove_layer(layer)
if self._anim_task is not None and not self._anim_task.done():
self._anim_task.cancel()
await asyncio.wait([self._anim_task], return_when=futures.ALL_COMPLETED)
self._logger.info("AnimationLoop stopped")
def _close_input_devices(self):
if not hasattr(self, '_opened') or not self._opened:
return
self._opened = False
for event_device in self._event_devices:
asyncio.get_event_loop().remove_reader(event_device.fileno())
event_device.close()
tasks = []
for task in self._tasks:
if not task.done():
task.cancel()
tasks.append(task)
await asyncio.wait(tasks, return_when=futures.ALL_COMPLETED)
self._event_devices.clear()
def test_timeout(self):
future1 = self.executor.submit(mul, 6, 7)
future2 = self.executor.submit(time.sleep, 3)
finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2],
timeout=1.5,
return_when=futures.ALL_COMPLETED)
self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1]), finished)
self.assertEqual(set([future2]), pending)
def test_all_completed(self):
future1 = self.executor.submit(divmod, 2, 0)
future2 = self.executor.submit(mul, 2, 21)
finished, pending = futures.wait(
[SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
future1,
future2],
return_when=futures.ALL_COMPLETED)
self.assertEqual(set([SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
future1,
future2]), finished)
self.assertEqual(set(), pending)
def test_timeout(self):
future1 = self.executor.submit(mul, 6, 7)
future2 = self.executor.submit(time.sleep, 3)
finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2],
timeout=1.5,
return_when=futures.ALL_COMPLETED)
self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1]), finished)
self.assertEqual(set([future2]), pending)
def test_all_completed(self):
future1 = self.executor.submit(divmod, 2, 0)
future2 = self.executor.submit(mul, 2, 21)
finished, pending = futures.wait(
[SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
future1,
future2],
return_when=futures.ALL_COMPLETED)
self.assertEqual(set([SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
future1,
future2]), finished)
self.assertEqual(set(), pending)
def test_timeout(self):
future1 = self.executor.submit(mul, 6, 7)
future2 = self.executor.submit(time.sleep, 6)
finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2],
timeout=5,
return_when=futures.ALL_COMPLETED)
self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1]), finished)
self.assertEqual(set([future2]), pending)
def test_all_completed(self):
future1 = self.executor.submit(divmod, 2, 0)
future2 = self.executor.submit(mul, 2, 21)
finished, pending = futures.wait(
[SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
future1,
future2],
return_when=futures.ALL_COMPLETED)
self.assertEqual(set([SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
future1,
future2]), finished)
self.assertEqual(set(), pending)
def test_timeout(self):
future1 = self.executor.submit(mul, 6, 7)
future2 = self.executor.submit(time.sleep, 6)
finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2],
timeout=5,
return_when=futures.ALL_COMPLETED)
self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1]), finished)
self.assertEqual(set([future2]), pending)
def test_all_completed(self):
future1 = self.executor.submit(divmod, 2, 0)
future2 = self.executor.submit(mul, 2, 21)
finished, pending = futures.wait(
[SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
future1,
future2],
return_when=futures.ALL_COMPLETED)
self.assertEqual(set([SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
future1,
future2]), finished)
self.assertEqual(set(), pending)
def test_timeout(self):
future1 = self.executor.submit(mul, 6, 7)
future2 = self.executor.submit(time.sleep, 6)
finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2],
timeout=5,
return_when=futures.ALL_COMPLETED)
self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1]), finished)
self.assertEqual(set([future2]), pending)
def test_all_completed(self):
future1 = self.executor.submit(divmod, 2, 0)
future2 = self.executor.submit(mul, 2, 21)
finished, pending = futures.wait(
[SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
future1,
future2],
return_when=futures.ALL_COMPLETED)
self.assertEqual(set([SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
future1,
future2]), finished)
self.assertEqual(set(), pending)
def test_timeout(self):
future1 = self.executor.submit(mul, 6, 7)
future2 = self.executor.submit(time.sleep, 3)
finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2],
timeout=1.5,
return_when=futures.ALL_COMPLETED)
self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1]), finished)
self.assertEqual(set([future2]), pending)
def _run(self):
dm = UChromaDeviceManager()
atexit.register(UChromaServer.exit, self._loop)
dbus = DeviceManagerAPI(dm, self._logger)
power = PowerMonitor()
for sig in (signal.SIGINT, signal.SIGTERM):
self._loop.add_signal_handler(sig, self._shutdown_callback)
try:
dbus.run()
power.start()
ensure_future(dm.monitor_start(), loop=self._loop)
self._loop.run_forever()
except KeyboardInterrupt:
pass
finally:
for sig in (signal.SIGTERM, signal.SIGINT):
self._loop.remove_signal_handler(sig)
power.stop()
self._loop.run_until_complete(asyncio.wait( \
[dm.close_devices(), dm.monitor_stop()],
return_when=futures.ALL_COMPLETED))
def exit(loop):
try:
loop.run_until_complete(asyncio.wait( \
list(asyncio.Task.all_tasks()),
return_when=futures.ALL_COMPLETED))
loop.close()
except KeyboardInterrupt:
pass
def test_pending_calls_race(self):
# Issue #14406: multi-threaded race condition when waiting on all
# futures.
event = threading.Event()
def future_func():
event.wait()
oldswitchinterval = sys.getcheckinterval()
sys.setcheckinterval(1)
try:
fs = set(self.executor.submit(future_func) for i in range(100))
event.set()
futures.wait(fs, return_when=futures.ALL_COMPLETED)
finally:
sys.setcheckinterval(oldswitchinterval)
def run(self):
self._load_config()
for service in self.config.iter_instances("service", "Service"):
self._start_service(service)
listen = self.config.raw_data[0]["core"]["listen"]
self.http = HTTP(self.loop, listen)
self.http.add_route("*", r"/{dir:.*}", self._http_handler)
for prov in self.config.iter_providers():
self.providers[prov.name] = prov
yield from prov.start()
yield from self.http.start()
reload_fut = asyncio.async(self.reload(), loop=self.loop)
yield from self.stop_event.wait()
self.log.info("Interrupted.")
reload_fut.cancel()
yield from reload_fut
for obj in self._running_objects:
obj.cancel()
yield from asyncio.wait(self._running_objects,
return_when=futures.ALL_COMPLETED)
if self._running_cleanups:
yield from asyncio.wait(self._running_cleanups,
return_when=futures.ALL_COMPLETED)
for provider in self.providers.values():
yield from prov.stop()
self.http.stop()
yield from self.http.wait_closed()
self.log.info("Exit.")
def test_pending_calls_race(self):
# Issue #14406: multi-threaded race condition when waiting on all
# futures.
event = threading.Event()
def future_func():
event.wait()
oldswitchinterval = sys.getswitchinterval()
sys.setswitchinterval(1e-6)
try:
fs = {self.executor.submit(future_func) for i in range(100)}
event.set()
futures.wait(fs, return_when=futures.ALL_COMPLETED)
finally:
sys.setswitchinterval(oldswitchinterval)
def test_pending_calls_race(self):
# Issue #14406: multi-threaded race condition when waiting on all
# futures.
event = threading.Event()
def future_func():
event.wait()
oldswitchinterval = sys.getswitchinterval()
sys.setswitchinterval(1e-6)
try:
fs = {self.executor.submit(future_func) for i in range(100)}
event.set()
futures.wait(fs, return_when=futures.ALL_COMPLETED)
finally:
sys.setswitchinterval(oldswitchinterval)
def test_pending_calls_race(self):
# Issue #14406: multi-threaded race condition when waiting on all
# futures.
event = threading.Event()
def future_func():
event.wait()
oldswitchinterval = sys.getswitchinterval()
sys.setswitchinterval(1e-6)
try:
fs = {self.executor.submit(future_func) for i in range(100)}
event.set()
futures.wait(fs, return_when=futures.ALL_COMPLETED)
finally:
sys.setswitchinterval(oldswitchinterval)
def test_pending_calls_race(self):
# Issue #14406: multi-threaded race condition when waiting on all
# futures.
event = threading.Event()
def future_func():
event.wait()
oldswitchinterval = sys.getcheckinterval()
sys.setcheckinterval(1)
try:
fs = set(self.executor.submit(future_func) for i in range(100))
event.set()
futures.wait(fs, return_when=futures.ALL_COMPLETED)
finally:
sys.setcheckinterval(oldswitchinterval)
def pull_stats(config, storage_dir, loop, executor):
"""
Launch coroutines for pulling statistics from UNIX sockets.
This a delegating routine.
Arguments:
config (obj): A configParser object which holds configuration.
storage_dir (str): The absolute directory path to save the statistics.
loop (obj): A base event loop.
executor(obj): A ThreadPoolExecutor object.
Returns:
True if statistics from *all* UNIX sockets are fetched False otherwise.
"""
# absolute directory path which contains UNIX socket files.
results = [] # stores the result of finished tasks
socket_dir = config.get('pull', 'socket-dir')
pull_timeout = config.getfloat('pull', 'pull-timeout')
if int(pull_timeout) == 0:
pull_timeout = None
socket_files = [f for f in glob.glob(socket_dir + '/*')
if is_unix_socket(f)]
if not socket_files:
log.error("found zero UNIX sockets under %s to connect to", socket_dir)
return False
log.debug('pull statistics')
coroutines = [get(socket_file, cmd, storage_dir, loop, executor, config)
for socket_file in socket_files
for cmd in CMDS]
# Launch all connections.
done, pending = yield from asyncio.wait(coroutines,
timeout=pull_timeout,
return_when=ALL_COMPLETED)
for task in done:
log.debug('task status: %s', task)
results.append(task.result())
log.debug('task report, done:%s pending:%s succeed:%s failed:%s',
len(done),
len(pending),
results.count(True),
results.count(False))
for task in pending:
log.warning('cancelling task %s as it reached its timeout threshold of'
' %.2f seconds', task, pull_timeout)
task.cancel()
# only when all tasks are finished successfully we claim success
return not pending and len(set(results)) == 1 and True in set(results)