def test_all_completed(self):
    future1 = self.executor.submit(divmod, 2, 0)
    future2 = self.executor.submit(mul, 2, 21)
    finished, pending = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             future1,
             future2],
            return_when=futures.ALL_COMPLETED)
    self.assertEqual(set([SUCCESSFUL_FUTURE,
                          CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          future1,
                          future2]), finished)
    self.assertEqual(set(), pending)
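The test above drives concurrent.futures.wait() with return_when=ALL_COMPLETED against a mix of already-settled fixtures and live futures. A minimal standalone sketch of the same call, assuming nothing from the test suite (slow_square is an illustrative helper, not part of the original code):

import concurrent.futures as futures
import time

def slow_square(x):
    time.sleep(0.1)
    return x * x

with futures.ThreadPoolExecutor(max_workers=4) as executor:
    fs = [executor.submit(slow_square, i) for i in range(4)]
    # wait() blocks until every future has finished, then returns two sets.
    done, not_done = futures.wait(fs, return_when=futures.ALL_COMPLETED)
    assert not_done == set()
    print(sorted(f.result() for f in done))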
def shutdown(self):
    """
    Close all connections. ``Session`` instances should not be used
    for any purpose after being shutdown.
    """
    with self._lock:
        if self.is_shutdown:
            return
        else:
            self.is_shutdown = True

    # PYTHON-673. If shutdown was called shortly after session init, avoid
    # a race by cancelling any initial connection attempts that haven't
    # started, then blocking on any that have.
    for future in self._initial_connect_futures:
        future.cancel()
    wait_futures(self._initial_connect_futures)

    for pool in tuple(self._pools.values()):
        pool.shutdown()
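The cancel-then-wait pattern above, where wait_futures appears to be concurrent.futures.wait imported under another name, can be sketched in isolation roughly like this (the executor and the sleeping tasks are illustrative stand-ins for the driver's initial connection attempts):

import concurrent.futures as futures
import time

executor = futures.ThreadPoolExecutor(max_workers=2)
connect_futures = [executor.submit(time.sleep, 0.5) for _ in range(4)]

# cancel() discards futures that have not started running yet; it is a
# no-op for futures that are already in flight.
for f in connect_futures:
    f.cancel()
# Cancelled futures count as done, so this only blocks on the in-flight ones.
futures.wait(connect_futures)
executor.shutdown()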
def test_timeout(self):
    future1 = self.executor.submit(mul, 6, 7)
    future2 = self.executor.submit(time.sleep, 3)
    finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             future1, future2],
            timeout=1.5,
            return_when=futures.ALL_COMPLETED)
    self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          SUCCESSFUL_FUTURE,
                          future1]), finished)
    self.assertEqual(set([future2]), pending)
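With a timeout, wait() hands back whatever finished before the deadline and leaves the rest in the pending set, which is exactly what the assertion on future2 relies on. A rough sketch of that behaviour, independent of the test fixtures:

import concurrent.futures as futures
import time

with futures.ThreadPoolExecutor(max_workers=2) as executor:
    fast = executor.submit(pow, 2, 10)
    slow = executor.submit(time.sleep, 5)
    done, not_done = futures.wait([fast, slow], timeout=1,
                                  return_when=futures.ALL_COMPLETED)
    # The sleeper missed the deadline, so it is reported as not done.
    assert fast in done and slow in not_done

Note that leaving the with block still waits for the sleeping future to finish; the timeout only bounds the wait() call itself.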
def optimiz(currencies, debug):
    currencies = sorted(currencies)
    if len(currencies) < 2 or len(currencies) > 10:
        return {"error": "2 to 10 currencies"}
    # Before Python 3.5, ThreadPoolExecutor has no default max_workers.
    max_workers = 4 if sys.version_info[1] < 5 else None
    executor = ThreadPoolExecutor(max_workers)
    # Fetch data for every currency in parallel; each future returns a
    # (currency, data) pair.
    data = dict(future.result() for future in
                wait([executor.submit(get_ochl, cur) for cur in currencies]).done)
    data = [data[cur] for cur in currencies]
    errors = [x['error'] for x in data if 'error' in x]
    if errors:
        return {"error": "Currencies not found : " + str(errors)}
    weights, m, s, a, b = markowitz_optimization(data, debug)
    if debug:
        import matplotlib as mpl
        mpl.use('Agg')
        import matplotlib.pyplot as plt
        fig, ax = plt.subplots()
        plt.plot(s, m, 'o', markersize=1)
        plt.plot(b, a, 'or')
        fig.savefig("chalu.png")
    result = dict()
    for i, cur in enumerate(currencies):
        result[cur] = weights[i]
    return {"result": result}
def stop_listen(self):
    """Stop listening for events"""
    self.set_traps(False)
    self.stop_request.set()
    nb_threads = len([f for f in self.futures if f.running()])
    if nb_threads:
        # ack current thread
        self.current_cont_event.set()
        # wait for current thread to terminate
        while len([f for f in self.futures if f.running()]) == nb_threads:
            time.sleep(0.1)
        # ack the rest of the threads
        while [f for f in self.futures if f.running()]:
            if self.queue.full():
                (*rest, continue_event) = self.queue.get()
                continue_event.set()
            # let the threads terminate
            time.sleep(0.1)
    # wait for threads to exit
    wait(self.futures)
def test_timeout(self):
    future1 = self.executor.submit(mul, 6, 7)
    future2 = self.executor.submit(time.sleep, 6)
    finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             future1, future2],
            timeout=5,
            return_when=futures.ALL_COMPLETED)
    self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          SUCCESSFUL_FUTURE,
                          future1]), finished)
    self.assertEqual(set([future2]), pending)
def doCrawl(self):
    flag = True
    while flag:
        futures = []
        # crawl 5 pages at a time
        with ThreadPoolExecutor(max_workers=5) as executor:
            for i in range(self.context['currentPage'], self.context['currentPage'] + 5):
                futures.append(executor.submit(self.getMangaDataByPage, i))
            wait(futures)
        # collect the results
        for f in futures:
            gmetadata = f.result()
            # stop when a page returns no data
            if gmetadata is None:
                flag = False
                break
            for data in gmetadata:
                # insert entries older than the current position marker
                if int(data['posted']) < self.context['currentPosted']:
                    self.db.insertEromanga(data)
                    self.context['currentPosted'] = int(data['posted'])
            self.context['currentPage'] += 1
def doCrawlNewest(self):
    flag = True
    while flag:
        futures = []
        # crawl 5 pages at a time
        with ThreadPoolExecutor(max_workers=5) as executor:
            for i in range(self.context['currentPage'], self.context['currentPage'] + 5):
                futures.append(executor.submit(self.getMangaDataByPage, i))
            wait(futures)
        # collect the results
        for f in futures:
            gmetadata = f.result()
            for data in gmetadata:
                # stop once we reach data from the previous crawl
                if int(data['posted']) <= self.context['newestPosted']:
                    info("info", "excrawler.crawlNewest",
                         "reached data from the previous crawl, posted:" + data['posted'])
                    setConfig("app", "new_context", "")
                    flag = False
                    exit()
                # insert entries older than the current position marker
                if int(data['posted']) < self.context['currentPosted']:
                    self.db.insertEromanga(data)
                    self.context['currentPosted'] = int(data['posted'])
            self.context['currentPage'] += 1
def run(self, funcs):
    """Run a set of functions in parallel, returning their results.

    Make sure any function you pass exits with a reasonable timeout. If it
    doesn't return within the timeout, or its result is ignored due to an
    exception in a separate thread, it will continue to stick around until
    it finishes, including blocking process exit.

    Args:
      funcs: An iterable of functions or iterable of args to functools.partial.

    Returns:
      A list of return values with the values matching the order in funcs.

    Raises:
      Propagates the first exception encountered in one of the functions.
    """
    funcs = [f if callable(f) else functools.partial(*f) for f in funcs]
    if len(funcs) == 1:  # Avoid threads when they're not needed.
        return [funcs[0]()]
    if len(funcs) > self._workers:  # Lazy init and grow as needed.
        self.shutdown()
        self._workers = len(funcs)
        self._executor = futures.ThreadPoolExecutor(self._workers)
    futs = [self._executor.submit(f) for f in funcs]
    done, not_done = futures.wait(futs, self._timeout, futures.FIRST_EXCEPTION)
    # Make sure to propagate any exceptions.
    for f in done:
        if not f.cancelled() and f.exception() is not None:
            if not_done:
                # If there are some calls that haven't finished, cancel and
                # recreate the thread pool. Otherwise we may have a thread
                # running forever blocking parallel calls.
                for nd in not_done:
                    nd.cancel()
                self.shutdown(False)  # Don't wait, they may be deadlocked.
            raise f.exception()
    # Either done or timed out, so don't wait again.
    return [f.result(timeout=0) for f in futs]
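The FIRST_EXCEPTION mode used above is what lets run() notice a failure without waiting out self._timeout for the remaining calls. A minimal sketch of that mode on its own (boom is an illustrative failing task, not part of the class above):

import concurrent.futures as futures
import time

def boom():
    raise ValueError("failed early")

with futures.ThreadPoolExecutor(max_workers=2) as executor:
    futs = [executor.submit(time.sleep, 2), executor.submit(boom)]
    # Returns as soon as any future finishes by raising, without waiting
    # for the sleeper; if nothing raises it behaves like ALL_COMPLETED.
    done, not_done = futures.wait(futs, return_when=futures.FIRST_EXCEPTION)
    failed = [f for f in done if f.exception() is not None]
    assert failed and not_done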
def _shutdown_instance(self, inst, port):
    force_shutdown_time = time.time() + _SHUTDOWN_TIMEOUT
    try:
        environ = self.build_request_environ(
            'GET', '/_ah/stop', [], '', '0.1.0.3', port, fake_login=True)
        self._handle_request(environ,
                             start_response_utils.null_start_response,
                             inst=inst,
                             request_type=instance.SHUTDOWN_REQUEST)
        logging.debug('Sent shutdown request: %s', inst)
    except:
        logging.exception('Internal error while handling shutdown request.')
    finally:
        time_to_wait = force_shutdown_time - time.time()
        self._quit_event.wait(time_to_wait)
        inst.quit(force=True)
def quit(self):
    """Stops the Module."""
    self._quit_event.set()
    # The instance adjustment thread depends on the balanced module and the
    # watcher, so wait for it to exit before quitting them.
    if self._watcher:
        self._watcher.quit()
    self._change_watcher_thread.join()
    self._balanced_module.quit()
    for wsgi_servr in self._wsgi_servers:
        wsgi_servr.quit()
    with self._condition:
        instances = self._instances
        self._instances = []
        self._condition.notify_all()
    for inst in instances:
        inst.quit(force=True)
def quit(self):
    """Stops the Module."""
    self._quit_event.set()
    self._change_watcher_thread.join()
    # The instance adjustment thread depends on the balanced module and the
    # watcher, so wait for it to exit before quitting them.
    if self._watcher:
        self._watcher.quit()
    self._balanced_module.quit()
    for wsgi_servr in self._wsgi_servers:
        wsgi_servr.quit()
    with self._condition:
        instances = self._instances
        self._instances = []
        self._condition.notify_all()
    for inst in instances:
        inst.quit(force=True)
def _choose_instance(self, timeout_time):
    """Returns an Instance to handle a request or None if all are busy."""
    with self._condition:
        while time.time() < timeout_time and not self._quit_event.is_set():
            for inst in self._instances:
                if inst.can_accept_requests:
                    return inst
            else:
                inst = self._start_any_instance()
                if inst:
                    break
                self._condition.wait(timeout_time - time.time())
        else:
            return None
    if inst:
        inst.wait(timeout_time)
    return inst
def live_migrate_servers_from_host(self, image, host_to_evacuate,
                                   flavor, block_migration=False, destination_host=None,
                                   disk_over_commit=False, number_of_parallel_migrations=1,
                                   **kwargs):
    """Live migrate servers.

    This scenario migrates all the VMs on the specified compute host to
    another compute node in the same availability zone.

    :param image: image to be used to boot an instance
    :param flavor: flavor to be used to boot an instance
    :param block_migration: Specifies the migration type
    :param disk_over_commit: Specifies whether to allow overcommit
                             on migrated instance or not
    """
    servers_to_migrate = self._get_servers_from_compute(host_to_evacuate)
    print("migrating servers: " + str(servers_to_migrate))
    executor = ThreadPoolExecutor(max_workers=int(number_of_parallel_migrations))
    futures = []
    for server in servers_to_migrate:
        a = executor.submit(self._migrate_server, server, destination_host,
                            block_migration, disk_over_commit)
        futures.append(a)
    print(wait(futures))
def perform_requests(self):
    signal.signal(signal.SIGINT, self.exit_fast)
    signal.signal(signal.SIGTERM, self.exit_fast)

    self.state = b'E'
    for q_batch in self.get_batch():
        for (_, _) in self.split_batch(q_batch):
            if self.state != b"R":
                self.state = b'R'
            yield
            continue

    # wait for all batches to finish before returning
    self.state = b'W'
    while self.futures:
        f_len = len(self.futures)
        self.futures = [i for i in self.futures if not i.done()]
        if f_len != len(self.futures):
            self.ui.debug('Waiting for final requests to finish. '
                          'remaining requests: {}'
                          ''.format(len(self.futures)))
        wait(self.futures, return_when=FIRST_COMPLETED)
    self.state = b'D'
    yield True
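The drain loop above leans on FIRST_COMPLETED so the generator wakes up as soon as any request finishes rather than sleeping through a fixed interval. A condensed sketch of the same polling pattern, with illustrative sleep tasks in place of scoring requests:

import concurrent.futures as futures
import time

with futures.ThreadPoolExecutor(max_workers=3) as executor:
    pending = [executor.submit(time.sleep, d) for d in (0.2, 0.5, 1.0)]
    while pending:
        # Returns as soon as at least one future finishes; then re-filter.
        futures.wait(pending, return_when=futures.FIRST_COMPLETED)
        pending = [f for f in pending if not f.done()]
        print("remaining requests:", len(pending))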
def memdump(self, local_filename, remote_filename=None, compress=False):
    dump_object = self.start_memdump(remote_filename=remote_filename, compress=compress)
    dump_object.wait()
    dump_object.get(local_filename)
    dump_object.delete()

def get(self, local_filename):
    if not self._done:
        self.wait()
    if self._error:
        raise self._error
    src = self.lr_session.get_raw_file(self.remote_filename, timeout=3600, delay=5)
    dst = open(local_filename, "wb")
    shutil.copyfileobj(src, dst)

def wait(self):
    self.lr_session._poll_command(self.memdump_id, timeout=3600, delay=5)
    self._done = True
def run(self):
    log.debug("Starting Live Response Job Scheduler")

    while True:
        log.debug("Waiting for item on Scheduler Queue")
        item = self.schedule_queue.get(block=True)
        log.debug("Got item: {0}".format(item))
        if isinstance(item, WorkItem):
            # new WorkItem available
            self._unscheduled_jobs[item.sensor_id].append(item)
        elif isinstance(item, CompletionNotification):
            # job completed
            self._idle_workers.add(item.sensor_id)
        elif isinstance(item, WorkerStatus):
            if item.status == "error":
                log.error("Error encountered by JobWorker[{0}]: {1}".format(item.sensor_id,
                                                                            item.exception))
            elif item.status == "exiting":
                log.debug("JobWorker[{0}] has exited, waiting...".format(item.sensor_id))
                self._job_workers[item.sensor_id].join()
                log.debug("JobWorker[{0}] deleted".format(item.sensor_id))
                del self._job_workers[item.sensor_id]
                try:
                    self._idle_workers.remove(item.sensor_id)
                except KeyError:
                    pass
            elif item.status == "ready":
                log.debug("JobWorker[{0}] now ready to accept jobs, session established".format(item.sensor_id))
                self._idle_workers.add(item.sensor_id)
            else:
                log.debug("Unknown status from JobWorker[{0}]: {1}".format(item.sensor_id, item.status))
        else:
            log.debug("Received unknown item on the scheduler Queue, exiting")
            # exiting the scheduler if we get None
            # TODO: wait for all worker threads to exit
            return

        self._schedule_jobs()
def test_wait_for_all():
    def f(sleep_time: int):
        sleep(sleep_time)
        return sleep_time

    def calc(fs):
        fs_done = wait(fs).done
        r = sum(r.result() for r in fs_done)
        return r

    pool = ThreadPoolExecutor()
    fs = [pool.submit(f, arg) for arg in (3, 2, 5)]
    result = pool.submit(calc, fs).result()
    assert result == 10
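As the .done attribute access in calc() (and in optimiz() earlier) suggests, wait() returns a named 2-tuple of sets, so attribute access and tuple unpacking are interchangeable. A small sketch:

import concurrent.futures as futures

with futures.ThreadPoolExecutor() as pool:
    fs = [pool.submit(pow, 2, n) for n in range(5)]
    res = futures.wait(fs)
    done, not_done = res
    # Both views describe the same result object.
    assert done == res.done and not_done == res.not_done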