def perform_requests(self):
    """Generator that steps through all batches, then drains ``self.futures``.

    ``self.exit_fast`` is installed as the SIGINT and SIGTERM handler when
    the generator is first advanced.  ``self.state`` is moved through
    ``b'E'`` -> ``b'R'`` -> ``b'W'`` -> ``b'D'`` as the work progresses.

    Yields ``None`` once for every item produced by ``self.split_batch``
    and finally yields ``True`` after no unfinished future remains.
    """
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, self.exit_fast)

    self.state = b'E'
    for batch in self.get_batch():
        # each item is unpacked as a pair; neither element is used here
        for (_, _) in self.split_batch(batch):
            if self.state != b"R":
                self.state = b'R'
            yield

    # wait for all batches to finish before returning
    self.state = b'W'
    while self.futures:
        before = len(self.futures)
        self.futures = [fut for fut in self.futures if not fut.done()]
        remaining = len(self.futures)
        if remaining != before:
            # only log when at least one future was pruned this round
            self.ui.debug('Waiting for final requests to finish. '
                          'remaining requests: {}'
                          ''.format(remaining))
        wait(self.futures, return_when=FIRST_COMPLETED)
    self.state = b'D'
    yield True
# Example source code using Python's FIRST_COMPLETED constant
def __init__(self, cluster, hosts, keyspace=None):
    """Set up session state and open connection pools to ``hosts``.

    :param cluster: object supplying ``profile_manager``, ``metrics`` and
        ``protocol_version``.
    :param hosts: iterable of hosts; each is passed to
        ``self.add_or_renew_pool`` and its ``address`` is reported on
        failure.
    :param keyspace: optional keyspace name, only used here for the error
        message.
    :raises NoHostAvailable: when no pool future yields a truthy result.
    """
    self.cluster = cluster
    self.hosts = hosts
    self.keyspace = keyspace

    self._lock = RLock()
    self._pools = {}
    self._profile_manager = cluster.profile_manager
    self._metrics = cluster.metrics
    self._request_init_callbacks = []
    self._protocol_version = self.cluster.protocol_version

    self.encoder = Encoder()

    # create connection pools in parallel
    self._initial_connect_futures = set()
    for host in hosts:
        pool_future = self.add_or_renew_pool(host, is_host_addition=False)
        if pool_future:
            self._initial_connect_futures.add(pool_future)

    # Keep waiting until some completed future reports a truthy result or
    # there is nothing left to wait on.
    outcome = wait_futures(self._initial_connect_futures, return_when=FIRST_COMPLETED)
    while True:
        if not outcome.not_done:
            break
        if any(finished.result() for finished in outcome.done):
            break
        outcome = wait_futures(outcome.not_done, return_when=FIRST_COMPLETED)

    if any(f.result() for f in self._initial_connect_futures):
        return

    message = "Unable to connect to any servers"
    if self.keyspace:
        message += " using keyspace '%s'" % self.keyspace
    raise NoHostAvailable(message, [h.address for h in hosts])
def _get_layers(self):
"""
Wait for renderers to produce new layers, yields until at least one
layer is active.
"""
# schedule tasks to wait on each renderer queue
for r_idx in range(0, len(self.layers)):
layer = self.layers[r_idx]
if layer.waiter is None or layer.waiter.done():
layer.waiter = ensure_future(self._dequeue(r_idx))
# async wait for at least one completion
waiters = [layer.waiter for layer in self.layers]
if len(waiters) == 0:
return
await asyncio.wait(waiters, return_when=futures.FIRST_COMPLETED)
# check the rest without waiting
for r_idx in range(0, len(self.layers)):
layer = self.layers[r_idx]
if layer.waiter is not None and not layer.waiter.done():
self._dequeue_nowait(r_idx)
def test_first_completed(self):
    """``wait(..., FIRST_COMPLETED)`` reports only the quick task as done.

    A multiplication and a 1.5 second sleep are submitted to
    ``self.executor`` and waited on together with ``CANCELLED_FUTURE``.
    The test asserts that the multiplication is the sole member of the
    done set and that the other two futures are in the not-done set.
    """
    quick = self.executor.submit(mul, 21, 2)
    slow = self.executor.submit(time.sleep, 1.5)

    outcome = futures.wait(
        [CANCELLED_FUTURE, quick, slow],
        return_when=futures.FIRST_COMPLETED)
    completed, outstanding = outcome

    self.assertEqual({quick}, completed)
    self.assertEqual({CANCELLED_FUTURE, slow}, outstanding)
def test_first_completed_some_already_completed(self):
    """``wait(..., FIRST_COMPLETED)`` with futures that are already settled.

    ``CANCELLED_AND_NOTIFIED_FUTURE`` and ``SUCCESSFUL_FUTURE`` are waited
    on together with a freshly submitted 1.5 second sleep.  The test
    asserts that the two pre-built futures form the finished set and the
    sleep is the only pending one.
    """
    sleeper = self.executor.submit(time.sleep, 1.5)
    already_settled = [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]

    finished, pending = futures.wait(
        already_settled + [sleeper],
        return_when=futures.FIRST_COMPLETED)

    self.assertEqual(set(already_settled), finished)
    self.assertEqual({sleeper}, pending)
async def run(self):
    """Start every job as a task and process each one as it finishes.

    Declared ``async`` because the body awaits ``asyncio.wait``; a plain
    ``def`` containing ``await`` does not compile.

    For each job in ``self.jobs`` a task is created on ``self.root.loop``,
    recorded in ``self.task_job_map`` and announced via
    ``self.job_started_cb``.  As tasks complete they are removed from the
    map, ``self.job_finished_cb`` is called and the job's ``cleanup()``
    coroutine is handed to ``self.root.start_coro``.  Once the map is
    empty ``self.status`` is set to ``"finished"``.
    """
    for job in self.jobs:
        task = self.root.loop.create_task(job.run())
        self.task_job_map[task] = job
        self.job_started_cb(job, task)

    while self.task_job_map:
        # snapshot the keys: the map is mutated while handling results
        done, _ = await asyncio.wait(list(self.task_job_map),
                                     return_when=FIRST_COMPLETED)
        for task in done:
            job = self.task_job_map.pop(task)
            self.job_finished_cb(job, task)
            self.root.start_coro(job.cleanup())

    self.status = "finished"
    logging.info("%s: all jobs finished.", self)
def wait_fs(self, fs):
    """Block (cooperatively) until every future in ``fs`` has completed.

    :param fs: dict where key is future and value is related object;
        entries are removed from it as their futures complete, so it is
        empty when this generator finishes
    """
    self.log.debug("Waiting for %s" % fs.values())
    while fs:
        outstanding = list(fs.keys())
        done, pending = yield from asyncio.wait(
            outstanding, return_when=futures.FIRST_COMPLETED)
        for finished in done:
            # tolerate a future that is no longer tracked in fs
            fs.pop(finished, None)
        self.log.debug("Pending %s" % pending)
def run(self):
    """Run the worker loop until it stops, then shut down pool and poller.

    Each listener in ``self.sockets`` is made non-blocking and registered
    with ``self.poller`` using ``self.accept`` as its callback.  While
    ``self.alive`` is true the loop notifies, dispatches ready poller
    events (only when below ``self.worker_connections``), checks the
    parent, and reaps keepalive connections.  When at least
    ``self.cfg.threads`` futures are outstanding it waits for one to
    complete and drops the finished ones from ``self.futures``.

    The loop ends when ``self.alive`` becomes false, when
    ``self.is_parent_alive()`` is false, or when no future completes
    within the wait timeout (``self.cfg.timeout``, or 0.5 if that is
    falsy).
    """
    # init listeners, add them to the event loop
    for s in self.sockets:
        s.setblocking(False)
        self.poller.register(s, selectors.EVENT_READ, self.accept)

    timeout = self.cfg.timeout or 0.5

    while self.alive:
        # notify the arbiter we are alive
        self.notify()

        # can we accept more connections?
        if self.nr_conns < self.worker_connections:
            # wait for an event
            for key, _mask in self.poller.select(0.02):
                callback = key.data
                callback(key.fileobj)

        if not self.is_parent_alive():
            break

        # handle keepalive timeouts
        self.murder_keepalived()

        # if the number of connections is < to the max we can handle at
        # the same time there is no need to wait for one
        if len(self.futures) < self.cfg.threads:
            continue

        result = futures.wait(self.futures, timeout=timeout,
                              return_when=futures.FIRST_COMPLETED)

        if not result.done:
            # nothing completed within the timeout: leave the loop
            break

        # explicit loop rather than a comprehension used for side effects
        for fut in result.done:
            self.futures.remove(fut)

    self.tpool.shutdown(False)
    self.poller.close()
def test_first_completed(self):
    """``wait(..., FIRST_COMPLETED)`` reports only the quick task as done.

    A multiplication and a 1.5 second sleep are submitted to
    ``self.executor`` and waited on together with ``CANCELLED_FUTURE``.
    The test asserts that the multiplication is the sole member of the
    done set and that the other two futures are in the not-done set.
    """
    quick = self.executor.submit(mul, 21, 2)
    slow = self.executor.submit(time.sleep, 1.5)

    outcome = futures.wait(
        [CANCELLED_FUTURE, quick, slow],
        return_when=futures.FIRST_COMPLETED)
    completed, outstanding = outcome

    self.assertEqual({quick}, completed)
    self.assertEqual({CANCELLED_FUTURE, slow}, outstanding)
def test_first_completed_some_already_completed(self):
    """``wait(..., FIRST_COMPLETED)`` with futures that are already settled.

    ``CANCELLED_AND_NOTIFIED_FUTURE`` and ``SUCCESSFUL_FUTURE`` are waited
    on together with a freshly submitted 1.5 second sleep.  The test
    asserts that the two pre-built futures form the finished set and the
    sleep is the only pending one.
    """
    sleeper = self.executor.submit(time.sleep, 1.5)
    already_settled = [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]

    finished, pending = futures.wait(
        already_settled + [sleeper],
        return_when=futures.FIRST_COMPLETED)

    self.assertEqual(set(already_settled), finished)
    self.assertEqual({sleeper}, pending)
def run(self):
    """Run the worker loop until it stops, then shut down pool and poller.

    Each listener in ``self.sockets`` is made non-blocking and registered
    with ``self.poller`` using ``self.accept`` as its callback.  While
    ``self.alive`` is true the loop notifies, dispatches ready poller
    events (only when below ``self.worker_connections``), checks the
    parent, and reaps keepalive connections.  When at least
    ``self.cfg.threads`` futures are outstanding it waits for one to
    complete and drops the finished ones from ``self.futures``.

    The loop ends when ``self.alive`` becomes false, when
    ``self.is_parent_alive()`` is false, or when no future completes
    within the wait timeout (``self.cfg.timeout``, or 0.5 if that is
    falsy).
    """
    # init listeners, add them to the event loop
    for s in self.sockets:
        s.setblocking(False)
        self.poller.register(s, selectors.EVENT_READ, self.accept)

    timeout = self.cfg.timeout or 0.5

    while self.alive:
        # notify the arbiter we are alive
        self.notify()

        # can we accept more connections?
        if self.nr_conns < self.worker_connections:
            # wait for an event
            for key, _mask in self.poller.select(0.02):
                callback = key.data
                callback(key.fileobj)

        if not self.is_parent_alive():
            break

        # handle keepalive timeouts
        self.murder_keepalived()

        # if the number of connections is < to the max we can handle at
        # the same time there is no need to wait for one
        if len(self.futures) < self.cfg.threads:
            continue

        result = futures.wait(self.futures, timeout=timeout,
                              return_when=futures.FIRST_COMPLETED)

        if not result.done:
            # nothing completed within the timeout: leave the loop
            break

        # explicit loop rather than a comprehension used for side effects
        for fut in result.done:
            self.futures.remove(fut)

    self.tpool.shutdown(False)
    self.poller.close()
def test_first_completed(self):
    """``wait(..., FIRST_COMPLETED)`` reports only the quick task as done.

    A multiplication and a 1.5 second sleep are submitted to
    ``self.executor`` and waited on together with ``CANCELLED_FUTURE``.
    The test asserts that the multiplication is the sole member of the
    done set and that the other two futures are in the not-done set.
    """
    quick = self.executor.submit(mul, 21, 2)
    slow = self.executor.submit(time.sleep, 1.5)

    outcome = futures.wait(
        [CANCELLED_FUTURE, quick, slow],
        return_when=futures.FIRST_COMPLETED)
    completed, outstanding = outcome

    self.assertEqual({quick}, completed)
    self.assertEqual({CANCELLED_FUTURE, slow}, outstanding)
def test_first_completed_some_already_completed(self):
    """``wait(..., FIRST_COMPLETED)`` with futures that are already settled.

    ``CANCELLED_AND_NOTIFIED_FUTURE`` and ``SUCCESSFUL_FUTURE`` are waited
    on together with a freshly submitted 1.5 second sleep.  The test
    asserts that the two pre-built futures form the finished set and the
    sleep is the only pending one.
    """
    sleeper = self.executor.submit(time.sleep, 1.5)
    already_settled = [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]

    finished, pending = futures.wait(
        already_settled + [sleeper],
        return_when=futures.FIRST_COMPLETED)

    self.assertEqual(set(already_settled), finished)
    self.assertEqual({sleeper}, pending)
def test_first_completed(self):
    """``wait(..., FIRST_COMPLETED)`` reports only the quick task as done.

    A multiplication and a 1.5 second sleep are submitted to
    ``self.executor`` and waited on together with ``CANCELLED_FUTURE``.
    The test asserts that the multiplication is the sole member of the
    done set and that the other two futures are in the not-done set.
    """
    quick = self.executor.submit(mul, 21, 2)
    slow = self.executor.submit(time.sleep, 1.5)

    outcome = futures.wait(
        [CANCELLED_FUTURE, quick, slow],
        return_when=futures.FIRST_COMPLETED)
    completed, outstanding = outcome

    self.assertEqual({quick}, completed)
    self.assertEqual({CANCELLED_FUTURE, slow}, outstanding)
def test_first_completed_some_already_completed(self):
    """``wait(..., FIRST_COMPLETED)`` with futures that are already settled.

    ``CANCELLED_AND_NOTIFIED_FUTURE`` and ``SUCCESSFUL_FUTURE`` are waited
    on together with a freshly submitted 1.5 second sleep.  The test
    asserts that the two pre-built futures form the finished set and the
    sleep is the only pending one.
    """
    sleeper = self.executor.submit(time.sleep, 1.5)
    already_settled = [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]

    finished, pending = futures.wait(
        already_settled + [sleeper],
        return_when=futures.FIRST_COMPLETED)

    self.assertEqual(set(already_settled), finished)
    self.assertEqual({sleeper}, pending)
def run(self):
    """Run the worker loop until it stops, then shut down pool and poller.

    Each listener in ``self.sockets`` is made non-blocking and registered
    with ``self.poller`` using ``self.accept`` as its callback.  While
    ``self.alive`` is true the loop notifies, dispatches ready poller
    events (only when below ``self.worker_connections``), checks the
    parent, and reaps keepalive connections.  When at least
    ``self.cfg.threads`` futures are outstanding it waits for one to
    complete and drops the finished ones from ``self.futures``.

    The loop ends when ``self.alive`` becomes false, when
    ``self.is_parent_alive()`` is false, or when no future completes
    within the wait timeout (``self.cfg.timeout``, or 0.5 if that is
    falsy).
    """
    # init listeners, add them to the event loop
    for s in self.sockets:
        s.setblocking(False)
        self.poller.register(s, selectors.EVENT_READ, self.accept)

    timeout = self.cfg.timeout or 0.5

    while self.alive:
        # notify the arbiter we are alive
        self.notify()

        # can we accept more connections?
        if self.nr_conns < self.worker_connections:
            # wait for an event
            for key, _mask in self.poller.select(0.02):
                callback = key.data
                callback(key.fileobj)

        if not self.is_parent_alive():
            break

        # handle keepalive timeouts
        self.murder_keepalived()

        # if the number of connections is < to the max we can handle at
        # the same time there is no need to wait for one
        if len(self.futures) < self.cfg.threads:
            continue

        result = futures.wait(self.futures, timeout=timeout,
                              return_when=futures.FIRST_COMPLETED)

        if not result.done:
            # nothing completed within the timeout: leave the loop
            break

        # explicit loop rather than a comprehension used for side effects
        for fut in result.done:
            self.futures.remove(fut)

    self.tpool.shutdown(False)
    self.poller.close()
def run(self):
    """Run the worker loop until it stops, then shut down pool and poller.

    Each listener in ``self.sockets`` is made non-blocking and registered
    with ``self.poller`` using ``self.accept`` as its callback.  While
    ``self.alive`` is true the loop notifies, dispatches ready poller
    events (only when below ``self.worker_connections``), checks the
    parent, and reaps keepalive connections.  When at least
    ``self.cfg.threads`` futures are outstanding it waits for one to
    complete and drops the finished ones from ``self.futures``.

    The loop ends when ``self.alive`` becomes false, when
    ``self.is_parent_alive()`` is false, or when no future completes
    within the wait timeout (``self.cfg.timeout``, or 0.5 if that is
    falsy).
    """
    # init listeners, add them to the event loop
    for s in self.sockets:
        s.setblocking(False)
        self.poller.register(s, selectors.EVENT_READ, self.accept)

    timeout = self.cfg.timeout or 0.5

    while self.alive:
        # notify the arbiter we are alive
        self.notify()

        # can we accept more connections?
        if self.nr_conns < self.worker_connections:
            # wait for an event
            for key, _mask in self.poller.select(0.02):
                callback = key.data
                callback(key.fileobj)

        if not self.is_parent_alive():
            break

        # handle keepalive timeouts
        self.murder_keepalived()

        # if the number of connections is < to the max we can handle at
        # the same time there is no need to wait for one
        if len(self.futures) < self.cfg.threads:
            continue

        result = futures.wait(self.futures, timeout=timeout,
                              return_when=futures.FIRST_COMPLETED)

        if not result.done:
            # nothing completed within the timeout: leave the loop
            break

        # explicit loop rather than a comprehension used for side effects
        for fut in result.done:
            self.futures.remove(fut)

    self.tpool.shutdown(False)
    self.poller.close()
def run(self):
    """Run the worker loop until it stops, then shut down pool and poller.

    Each listener in ``self.sockets`` is made non-blocking and registered
    with ``self.poller`` using ``self.accept`` as its callback.  While
    ``self.alive`` is true the loop notifies, dispatches ready poller
    events (only when below ``self.worker_connections``), checks the
    parent, and reaps keepalive connections.  When at least
    ``self.cfg.threads`` futures are outstanding it waits for one to
    complete and drops the finished ones from ``self.futures``.

    The loop ends when ``self.alive`` becomes false, when
    ``self.is_parent_alive()`` is false, or when no future completes
    within the wait timeout (``self.cfg.timeout``, or 0.5 if that is
    falsy).
    """
    # init listeners, add them to the event loop
    for s in self.sockets:
        s.setblocking(False)
        self.poller.register(s, selectors.EVENT_READ, self.accept)

    timeout = self.cfg.timeout or 0.5

    while self.alive:
        # notify the arbiter we are alive
        self.notify()

        # can we accept more connections?
        if self.nr_conns < self.worker_connections:
            # wait for an event
            for key, _mask in self.poller.select(0.02):
                callback = key.data
                callback(key.fileobj)

        if not self.is_parent_alive():
            break

        # handle keepalive timeouts
        self.murder_keepalived()

        # if the number of connections is < to the max we can handle at
        # the same time there is no need to wait for one
        if len(self.futures) < self.cfg.threads:
            continue

        result = futures.wait(self.futures, timeout=timeout,
                              return_when=futures.FIRST_COMPLETED)

        if not result.done:
            # nothing completed within the timeout: leave the loop
            break

        # explicit loop rather than a comprehension used for side effects
        for fut in result.done:
            self.futures.remove(fut)

    self.tpool.shutdown(False)
    self.poller.close()
def test_first_completed(self):
    """``wait(..., FIRST_COMPLETED)`` reports only the quick task as done.

    A multiplication and a 1.5 second sleep are submitted to
    ``self.executor`` and waited on together with ``CANCELLED_FUTURE``.
    The test asserts that the multiplication is the sole member of the
    done set and that the other two futures are in the not-done set.
    """
    quick = self.executor.submit(mul, 21, 2)
    slow = self.executor.submit(time.sleep, 1.5)

    outcome = futures.wait(
        [CANCELLED_FUTURE, quick, slow],
        return_when=futures.FIRST_COMPLETED)
    completed, outstanding = outcome

    self.assertEqual({quick}, completed)
    self.assertEqual({CANCELLED_FUTURE, slow}, outstanding)
def test_first_completed_some_already_completed(self):
    """``wait(..., FIRST_COMPLETED)`` with futures that are already settled.

    ``CANCELLED_AND_NOTIFIED_FUTURE`` and ``SUCCESSFUL_FUTURE`` are waited
    on together with a freshly submitted 1.5 second sleep.  The test
    asserts that the two pre-built futures form the finished set and the
    sleep is the only pending one.
    """
    sleeper = self.executor.submit(time.sleep, 1.5)
    already_settled = [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]

    finished, pending = futures.wait(
        already_settled + [sleeper],
        return_when=futures.FIRST_COMPLETED)

    self.assertEqual(set(already_settled), finished)
    self.assertEqual({sleeper}, pending)
def run(self):
    """Run the worker loop until it stops, then shut down pool and poller.

    Each listener in ``self.sockets`` is made non-blocking and registered
    with ``self.poller`` using ``self.accept`` as its callback.  While
    ``self.alive`` is true the loop notifies, dispatches ready poller
    events (only when below ``self.worker_connections``), checks the
    parent, and reaps keepalive connections.  When at least
    ``self.cfg.threads`` futures are outstanding it waits for one to
    complete and drops the finished ones from ``self.futures``.

    The loop ends when ``self.alive`` becomes false, when
    ``self.is_parent_alive()`` is false, or when no future completes
    within the wait timeout (``self.cfg.timeout``, or 0.5 if that is
    falsy).
    """
    # init listeners, add them to the event loop
    for s in self.sockets:
        s.setblocking(False)
        self.poller.register(s, selectors.EVENT_READ, self.accept)

    timeout = self.cfg.timeout or 0.5

    while self.alive:
        # notify the arbiter we are alive
        self.notify()

        # can we accept more connections?
        if self.nr_conns < self.worker_connections:
            # wait for an event
            for key, _mask in self.poller.select(0.02):
                callback = key.data
                callback(key.fileobj)

        if not self.is_parent_alive():
            break

        # handle keepalive timeouts
        self.murder_keepalived()

        # if the number of connections is < to the max we can handle at
        # the same time there is no need to wait for one
        if len(self.futures) < self.cfg.threads:
            continue

        result = futures.wait(self.futures, timeout=timeout,
                              return_when=futures.FIRST_COMPLETED)

        if not result.done:
            # nothing completed within the timeout: leave the loop
            break

        # explicit loop rather than a comprehension used for side effects
        for fut in result.done:
            self.futures.remove(fut)

    self.tpool.shutdown(False)
    self.poller.close()
def test_first_completed(self):
    """``wait(..., FIRST_COMPLETED)`` reports only the quick task as done.

    A multiplication and a 1.5 second sleep are submitted to
    ``self.executor`` and waited on together with ``CANCELLED_FUTURE``.
    The test asserts that the multiplication is the sole member of the
    done set and that the other two futures are in the not-done set.
    """
    quick = self.executor.submit(mul, 21, 2)
    slow = self.executor.submit(time.sleep, 1.5)

    outcome = futures.wait(
        [CANCELLED_FUTURE, quick, slow],
        return_when=futures.FIRST_COMPLETED)
    completed, outstanding = outcome

    self.assertEqual({quick}, completed)
    self.assertEqual({CANCELLED_FUTURE, slow}, outstanding)
def test_first_completed_some_already_completed(self):
    """``wait(..., FIRST_COMPLETED)`` with futures that are already settled.

    ``CANCELLED_AND_NOTIFIED_FUTURE`` and ``SUCCESSFUL_FUTURE`` are waited
    on together with a freshly submitted 1.5 second sleep.  The test
    asserts that the two pre-built futures form the finished set and the
    sleep is the only pending one.
    """
    sleeper = self.executor.submit(time.sleep, 1.5)
    already_settled = [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]

    finished, pending = futures.wait(
        already_settled + [sleeper],
        return_when=futures.FIRST_COMPLETED)

    self.assertEqual(set(already_settled), finished)
    self.assertEqual({sleeper}, pending)
def __init__(self, cluster, hosts, keyspace=None):
    """Set up session state and open connection pools to ``hosts``.

    :param cluster: object supplying ``profile_manager``, ``metrics`` and
        ``protocol_version``.
    :param hosts: iterable of hosts; each is passed to
        ``self.add_or_renew_pool`` and its ``address`` is reported on
        failure.
    :param keyspace: optional keyspace name, only used here for the error
        message.
    :raises NoHostAvailable: when no pool future yields a truthy result.
    """
    self.cluster = cluster
    self.hosts = hosts
    self.keyspace = keyspace

    self._lock = RLock()
    self._pools = {}
    self._profile_manager = cluster.profile_manager
    self._metrics = cluster.metrics
    self._request_init_callbacks = []
    self._protocol_version = self.cluster.protocol_version

    self.encoder = Encoder()

    # create connection pools in parallel
    self._initial_connect_futures = set()
    for host in hosts:
        pool_future = self.add_or_renew_pool(host, is_host_addition=False)
        if pool_future:
            self._initial_connect_futures.add(pool_future)

    # Keep waiting until some completed future reports a truthy result or
    # there is nothing left to wait on.
    outcome = wait_futures(self._initial_connect_futures, return_when=FIRST_COMPLETED)
    while True:
        if not outcome.not_done:
            break
        if any(finished.result() for finished in outcome.done):
            break
        outcome = wait_futures(outcome.not_done, return_when=FIRST_COMPLETED)

    if any(f.result() for f in self._initial_connect_futures):
        return

    message = "Unable to connect to any servers"
    if self.keyspace:
        message += " using keyspace '%s'" % self.keyspace
    raise NoHostAvailable(message, [h.address for h in hosts])
def _compute_nodes(self, names, raise_exceptions=False):
"""
Submit the COMPUTABLE nodes among ``names`` to executors and process
their results as they complete, submitting successors that become
COMPUTABLE (and are also in ``names``) along the way.

:param names: collection of node names; iterated, tested with ``in``
and logged via ``str``.
:param raise_exceptions: forwarded unchanged to ``self._eval_node``.
:raises LoopDetectedException: when a successor of a freshly computed
node is already in the local ``computed`` set.

NOTE(review): the indentation of this block has been lost, so the
nesting described in the comments below is inferred from the statement
order - confirm against the upstream source.
"""
LOG.debug('Computing nodes {}'.format(list(map(str, names))))
# maps each submitted future to the name of the node it evaluates
futs = {}
# helper: pick the executor for ``name`` and submit its evaluation
def run(name):
f, executor_name, args, kwds = self._get_func_args_kwds(name)
# no executor name -> fall back to the default executor
if executor_name is None:
executor = self.default_executor
else:
executor = self.executor_map[executor_name]
fut = executor.submit(self._eval_node, name, f, args, kwds, raise_exceptions)
futs[fut] = name
# names whose future has already been processed during this call
computed = set()
# initial pass: submit every requested node that is currently COMPUTABLE
for name in names:
node0 = self.dag.node[name]
state = node0[_AN_STATE]
if state == States.COMPUTABLE:
run(name)
# keep going until no submitted future is outstanding
while len(futs) > 0:
done, not_done = wait(futs.keys(), return_when=FIRST_COMPLETED)
for fut in done:
name = futs.pop(fut)
node0 = self.dag.node[name]
# the future's result is unpacked as a 5-tuple
value, exc, tb, start_dt, end_dt = fut.result()
delta = (end_dt - start_dt).total_seconds()
# success path: store the value and timing, then look at successors
if exc is None:
self._set_state_and_value(name, States.UPTODATE, value)
node0[_AN_TIMING] = TimingData(start_dt, end_dt, delta)
self._set_descendents(name, States.STALE)
for n in self.dag.successors(name):
logging.debug(str(name) + ' ' + str(n) + ' ' + str(computed))
# NOTE(review): the check is on successor ``n`` but the message formats ``name`` - confirm intended
if n in computed:
raise LoopDetectedException("Calculating {} for the second time".format(name))
self._try_set_computable(n)
# node0/state are rebound here to the successor's entry
node0 = self.dag.node[n]
state = node0[_AN_STATE]
# only successors that were requested in ``names`` are submitted
if state == States.COMPUTABLE and n in names:
run(n)
# failure path (presumably paired with ``if exc is None``): record the error
else:
self._set_state_and_value(name, States.ERROR, Error(exc, tb))
self._set_descendents(name, States.STALE)
computed.add(name)
def run(self):
    """Serve until the worker stops, then release its resources.

    Every listener in ``self.sockets`` is made non-blocking and registered
    with ``self.poller``; its callback is ``self.accept`` pre-bound to the
    listener's socket name.  While ``self.alive`` is true the loop
    notifies, dispatches ready poller events (only when below
    ``self.worker_connections``), removes completed futures from
    ``self.futures``, checks the parent and reaps keepalive connections.

    After the loop the thread pool is shut down without waiting, the
    poller and listeners are closed, and outstanding futures are given up
    to ``self.cfg.graceful_timeout`` to finish.
    """
    # init listeners, add them to the event loop
    for listener in self.sockets:
        listener.setblocking(False)
        # a race condition during graceful shutdown may make the listener
        # name unavailable in the request handler so capture it once here
        server_name = listener.getsockname()
        bound_accept = partial(self.accept, server_name)
        self.poller.register(listener, selectors.EVENT_READ, bound_accept)

    while self.alive:
        # notify the arbiter we are alive
        self.notify()

        if self.nr_conns < self.worker_connections:
            # room for more connections: wait for an event and dispatch it
            for key, _mask in self.poller.select(1.0):
                handler = key.data
                handler(key.fileobj)
            # then check (but do not wait) for finished requests
            wait_timeout = 0
        else:
            # at capacity: wait for a request to finish
            wait_timeout = 1.0

        outcome = futures.wait(self.futures, timeout=wait_timeout,
                               return_when=futures.FIRST_COMPLETED)

        # clean up finished requests
        for finished in outcome.done:
            self.futures.remove(finished)

        if not self.is_parent_alive():
            break

        # handle keepalive timeouts
        self.murder_keepalived()

    self.tpool.shutdown(False)
    self.poller.close()

    for listener in self.sockets:
        listener.close()

    futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
def run(self):
    """Serve until the worker stops, then release its resources.

    Every listener in ``self.sockets`` is made non-blocking and registered
    with ``self.poller``; its callback is ``self.accept`` pre-bound to the
    listener's socket name.  While ``self.alive`` is true the loop
    notifies, dispatches ready poller events (only when below
    ``self.worker_connections``), removes completed futures from
    ``self.futures``, checks the parent and reaps keepalive connections.

    After the loop the thread pool is shut down without waiting, the
    poller and listeners are closed, and outstanding futures are given up
    to ``self.cfg.graceful_timeout`` to finish.
    """
    # init listeners, add them to the event loop
    for listener in self.sockets:
        listener.setblocking(False)
        # a race condition during graceful shutdown may make the listener
        # name unavailable in the request handler so capture it once here
        server_name = listener.getsockname()
        bound_accept = partial(self.accept, server_name)
        self.poller.register(listener, selectors.EVENT_READ, bound_accept)

    while self.alive:
        # notify the arbiter we are alive
        self.notify()

        if self.nr_conns < self.worker_connections:
            # room for more connections: wait for an event and dispatch it
            for key, _mask in self.poller.select(1.0):
                handler = key.data
                handler(key.fileobj)
            # then check (but do not wait) for finished requests
            wait_timeout = 0
        else:
            # at capacity: wait for a request to finish
            wait_timeout = 1.0

        outcome = futures.wait(self.futures, timeout=wait_timeout,
                               return_when=futures.FIRST_COMPLETED)

        # clean up finished requests
        for finished in outcome.done:
            self.futures.remove(finished)

        if not self.is_parent_alive():
            break

        # handle keepalive timeouts
        self.murder_keepalived()

    self.tpool.shutdown(False)
    self.poller.close()

    for listener in self.sockets:
        listener.close()

    futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
def run(self):
    """Serve until the worker stops, then release its resources.

    Every listener in ``self.sockets`` is made non-blocking and registered
    with ``self.poller``; its callback is ``self.accept`` pre-bound to the
    listener's socket name.  While ``self.alive`` is true the loop
    notifies, dispatches ready poller events (only when below
    ``self.worker_connections``), removes completed futures from
    ``self.futures``, checks the parent and reaps keepalive connections.

    After the loop the thread pool is shut down without waiting, the
    poller and listeners are closed, and outstanding futures are given up
    to ``self.cfg.graceful_timeout`` to finish.
    """
    # init listeners, add them to the event loop
    for listener in self.sockets:
        listener.setblocking(False)
        # a race condition during graceful shutdown may make the listener
        # name unavailable in the request handler so capture it once here
        server_name = listener.getsockname()
        bound_accept = partial(self.accept, server_name)
        self.poller.register(listener, selectors.EVENT_READ, bound_accept)

    while self.alive:
        # notify the arbiter we are alive
        self.notify()

        if self.nr_conns < self.worker_connections:
            # room for more connections: wait for an event and dispatch it
            for key, _mask in self.poller.select(1.0):
                handler = key.data
                handler(key.fileobj)
            # then check (but do not wait) for finished requests
            wait_timeout = 0
        else:
            # at capacity: wait for a request to finish
            wait_timeout = 1.0

        outcome = futures.wait(self.futures, timeout=wait_timeout,
                               return_when=futures.FIRST_COMPLETED)

        # clean up finished requests
        for finished in outcome.done:
            self.futures.remove(finished)

        if not self.is_parent_alive():
            break

        # handle keepalive timeouts
        self.murder_keepalived()

    self.tpool.shutdown(False)
    self.poller.close()

    for listener in self.sockets:
        listener.close()

    futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
def run(self):
    """Serve until the worker stops, then release its resources.

    Every listener in ``self.sockets`` is made non-blocking and registered
    with ``self.poller``; its callback is ``self.accept`` pre-bound to the
    listener's socket name.  While ``self.alive`` is true the loop
    notifies, dispatches ready poller events (only when below
    ``self.worker_connections``), removes completed futures from
    ``self.futures``, checks the parent and reaps keepalive connections.

    After the loop the thread pool is shut down without waiting, the
    poller and listeners are closed, and outstanding futures are given up
    to ``self.cfg.graceful_timeout`` to finish.
    """
    # init listeners, add them to the event loop
    for listener in self.sockets:
        listener.setblocking(False)
        # a race condition during graceful shutdown may make the listener
        # name unavailable in the request handler so capture it once here
        server_name = listener.getsockname()
        bound_accept = partial(self.accept, server_name)
        self.poller.register(listener, selectors.EVENT_READ, bound_accept)

    while self.alive:
        # notify the arbiter we are alive
        self.notify()

        if self.nr_conns < self.worker_connections:
            # room for more connections: wait for an event and dispatch it
            for key, _mask in self.poller.select(1.0):
                handler = key.data
                handler(key.fileobj)
            # then check (but do not wait) for finished requests
            wait_timeout = 0
        else:
            # at capacity: wait for a request to finish
            wait_timeout = 1.0

        outcome = futures.wait(self.futures, timeout=wait_timeout,
                               return_when=futures.FIRST_COMPLETED)

        # clean up finished requests
        for finished in outcome.done:
            self.futures.remove(finished)

        if not self.is_parent_alive():
            break

        # handle keepalive timeouts
        self.murder_keepalived()

    self.tpool.shutdown(False)
    self.poller.close()

    for listener in self.sockets:
        listener.close()

    futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
def run(self):
    """Serve until the worker stops, then release its resources.

    Every listener in ``self.sockets`` is made non-blocking and registered
    with ``self.poller``; its callback is ``self.accept`` pre-bound to the
    listener's socket name.  While ``self.alive`` is true the loop
    notifies, dispatches ready poller events (only when below
    ``self.worker_connections``), removes completed futures from
    ``self.futures``, checks the parent and reaps keepalive connections.

    After the loop the thread pool is shut down without waiting, the
    poller and listeners are closed, and outstanding futures are given up
    to ``self.cfg.graceful_timeout`` to finish.
    """
    # init listeners, add them to the event loop
    for listener in self.sockets:
        listener.setblocking(False)
        # a race condition during graceful shutdown may make the listener
        # name unavailable in the request handler so capture it once here
        server_name = listener.getsockname()
        bound_accept = partial(self.accept, server_name)
        self.poller.register(listener, selectors.EVENT_READ, bound_accept)

    while self.alive:
        # notify the arbiter we are alive
        self.notify()

        if self.nr_conns < self.worker_connections:
            # room for more connections: wait for an event and dispatch it
            for key, _mask in self.poller.select(1.0):
                handler = key.data
                handler(key.fileobj)
            # then check (but do not wait) for finished requests
            wait_timeout = 0
        else:
            # at capacity: wait for a request to finish
            wait_timeout = 1.0

        outcome = futures.wait(self.futures, timeout=wait_timeout,
                               return_when=futures.FIRST_COMPLETED)

        # clean up finished requests
        for finished in outcome.done:
            self.futures.remove(finished)

        if not self.is_parent_alive():
            break

        # handle keepalive timeouts
        self.murder_keepalived()

    self.tpool.shutdown(False)
    self.poller.close()

    for listener in self.sockets:
        listener.close()

    futures.wait(self.futures, timeout=self.cfg.graceful_timeout)