def add_execution_profile(self, name, profile, pool_wait_timeout=5):
"""
Adds an :class:`.ExecutionProfile` to the cluster. This makes it available for use by ``name`` in :meth:`.Session.execute`
and :meth:`.Session.execute_async`. This method will raise if the profile already exists.
Normally profiles will be injected at cluster initialization via ``Cluster(execution_profiles)``. This method
provides a way of adding them dynamically.
Adding a new profile updates the connection pools according to the specified ``load_balancing_policy``. By default,
this method will wait up to five seconds for the pool creation to complete, so the profile can be used immediately
upon return. This behavior can be controlled using ``pool_wait_timeout`` (see
`concurrent.futures.wait <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.wait>`_
for timeout semantics).
"""
if not isinstance(profile, ExecutionProfile):
raise TypeError("profile must be an instance of ExecutionProfile")
if self._config_mode == _ConfigMode.LEGACY:
raise ValueError("Cannot add execution profiles when legacy parameters are set explicitly.")
if name in self.profile_manager.profiles:
raise ValueError("Profile %s already exists")
self.profile_manager.profiles[name] = profile
profile.load_balancing_policy.populate(self, self.metadata.all_hosts())
    # calling on_up after populate allows things like the DC-aware LBP to choose a default local DC
for host in filter(lambda h: h.is_up, self.metadata.all_hosts()):
profile.load_balancing_policy.on_up(host)
futures = set()
for session in self.sessions:
futures.update(session.update_created_pools())
_, not_done = wait_futures(futures, pool_wait_timeout)
if not_done:
        raise OperationTimedOut("Failed to create all new connection pools in the %ss timeout." % pool_wait_timeout)
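For reference, ``concurrent.futures.wait`` returns a named two-tuple of ``done`` and ``not_done`` sets; when the timeout expires, anything still running is simply left in ``not_done``. A minimal standalone sketch of that behavior (the executor and sleep tasks are illustrative, not part of the driver):

from concurrent.futures import ThreadPoolExecutor, wait
import time

executor = ThreadPoolExecutor(max_workers=2)
fast = executor.submit(time.sleep, 0.1)
slow = executor.submit(time.sleep, 10)
done, not_done = wait([fast, slow], timeout=1)
# fast finished within the timeout, so it is in done; slow is still
# running and is left in not_done
executor.shutdown(wait=False)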
def __init__(self, cluster, hosts, keyspace=None):
self.cluster = cluster
self.hosts = hosts
self.keyspace = keyspace
self._lock = RLock()
self._pools = {}
self._profile_manager = cluster.profile_manager
self._metrics = cluster.metrics
self._request_init_callbacks = []
self._protocol_version = self.cluster.protocol_version
self.encoder = Encoder()
# create connection pools in parallel
self._initial_connect_futures = set()
for host in hosts:
future = self.add_or_renew_pool(host, is_host_addition=False)
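        # add_or_renew_pool may return None (e.g. for hosts the load
        # balancing policy ignores), hence the check below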
if future:
self._initial_connect_futures.add(future)
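    # block until at least one pool comes up; keep waiting on the remainder
    # while every future completed so far reports failure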
futures = wait_futures(self._initial_connect_futures, return_when=FIRST_COMPLETED)
while futures.not_done and not any(f.result() for f in futures.done):
futures = wait_futures(futures.not_done, return_when=FIRST_COMPLETED)
if not any(f.result() for f in self._initial_connect_futures):
msg = "Unable to connect to any servers"
if self.keyspace:
msg += " using keyspace '%s'" % self.keyspace
raise NoHostAvailable(msg, [h.address for h in hosts])
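The connect loop above is a reusable "first success" idiom: wait for the first future to complete, and keep waiting on the remainder until one reports success or none are left. A generic sketch, assuming each task returns a truthy value on success and does not raise (as the code above assumes of its pool futures):

from concurrent.futures import wait, FIRST_COMPLETED

def first_success(executor, tasks):
    # submit everything, then wait in rounds of FIRST_COMPLETED
    pending = {executor.submit(t) for t in tasks}
    while pending:
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        if any(f.result() for f in done):
            return True
    return False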
def tearDown(self):
self.executor.shutdown(wait=True)
dt = time.time() - self.t1
if test_support.verbose:
print("%.2fs" % dt)
self.assertLess(dt, 60, "synchronization issue: test lasted too long")
def test_first_completed(self):
future1 = self.executor.submit(mul, 21, 2)
future2 = self.executor.submit(time.sleep, 1.5)
done, not_done = futures.wait(
[CANCELLED_FUTURE, future1, future2],
return_when=futures.FIRST_COMPLETED)
self.assertEqual(set([future1]), done)
self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
def test_first_completed_some_already_completed(self):
future1 = self.executor.submit(time.sleep, 1.5)
finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
return_when=futures.FIRST_COMPLETED)
self.assertEqual(
set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
finished)
self.assertEqual(set([future1]), pending)
def test_first_exception(self):
future1 = self.executor.submit(mul, 2, 21)
future2 = self.executor.submit(sleep_and_raise, 1.5)
future3 = self.executor.submit(time.sleep, 3)
finished, pending = futures.wait(
[future1, future2, future3],
return_when=futures.FIRST_EXCEPTION)
self.assertEqual(set([future1, future2]), finished)
self.assertEqual(set([future3]), pending)
def test_first_exception_some_already_complete(self):
future1 = self.executor.submit(divmod, 21, 0)
future2 = self.executor.submit(time.sleep, 1.5)
finished, pending = futures.wait(
[SUCCESSFUL_FUTURE,
CANCELLED_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
future1, future2],
return_when=futures.FIRST_EXCEPTION)
self.assertEqual(set([SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
future1]), finished)
self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
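Together these tests pin down the ``return_when`` semantics: ``FIRST_COMPLETED`` returns as soon as any future settles (futures that are already done count immediately), while ``FIRST_EXCEPTION`` keeps waiting past ordinary completions until some future raises, or all finish. A small standalone illustration of ``FIRST_EXCEPTION`` (helper names are illustrative):

import time
from concurrent import futures

def sleep_and_raise(t):
    time.sleep(t)
    raise RuntimeError('boom')

executor = futures.ThreadPoolExecutor(max_workers=3)
ok = executor.submit(time.sleep, 0.1)
bad = executor.submit(sleep_and_raise, 0.5)
slow = executor.submit(time.sleep, 5)
done, pending = futures.wait([ok, bad, slow],
                             return_when=futures.FIRST_EXCEPTION)
# wait() returned when bad raised: ok (already finished) and bad are
# in done, while slow is still pending
executor.shutdown(wait=False)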
def test_pending_calls_race(self):
# Issue #14406: multi-threaded race condition when waiting on all
# futures.
event = threading.Event()
def future_func():
event.wait()
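    # sys.getcheckinterval()/setcheckinterval() are Python 2 APIs;
    # Python 3 replaced them with sys.getswitchinterval()/setswitchinterval()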
oldswitchinterval = sys.getcheckinterval()
sys.setcheckinterval(1)
try:
fs = set(self.executor.submit(future_func) for i in range(100))
event.set()
futures.wait(fs, return_when=futures.ALL_COMPLETED)
finally:
sys.setcheckinterval(oldswitchinterval)
def test_map_submits_without_iteration(self):
"""Tests verifying issue 11777."""
finished = []
def record_finished(n):
finished.append(n)
self.executor.map(record_finished, range(10))
self.executor.shutdown(wait=True)
self.assertEqual(len(finished), 10)
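Issue 11777 is about ``Executor.map`` submitting every call eagerly: only result retrieval is lazy, so discarding the returned iterator must not drop work. A quick standalone check of the same property:

from concurrent.futures import ThreadPoolExecutor

calls = []
with ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(calls.append, range(10))  # result iterator discarded on purpose
# the with block called shutdown(wait=True); all ten calls still ran
assert len(calls) == 10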
def run(self):
# init listeners, add them to the event loop
for s in self.sockets:
s.setblocking(False)
self.poller.register(s, selectors.EVENT_READ, self.accept)
timeout = self.cfg.timeout or 0.5
while self.alive:
# notify the arbiter we are alive
self.notify()
# can we accept more connections?
if self.nr_conns < self.worker_connections:
# wait for an event
events = self.poller.select(0.02)
for key, mask in events:
callback = key.data
callback(key.fileobj)
if not self.is_parent_alive():
break
            # handle keepalive timeouts
self.murder_keepalived()
            # if we have fewer in-flight requests than worker threads, there
            # is no need to wait for one to complete
if len(self.futures) < self.cfg.threads:
continue
result = futures.wait(self.futures, timeout=timeout,
return_when=futures.FIRST_COMPLETED)
if not result.done:
break
else:
            for f in result.done:
                self.futures.remove(f)
self.tpool.shutdown(False)
self.poller.close()
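The ``futures.wait`` call in this worker acts as a backpressure valve: once there are as many in-flight requests as worker threads, the loop blocks (up to ``timeout``) until at least one finishes before accepting more connections. The same idiom in isolation (function and parameter names are illustrative):

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def run_with_backpressure(tasks, max_threads=4):
    in_flight = set()
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        for task in tasks:
            if len(in_flight) >= max_threads:
                # pool saturated: block until at least one task finishes
                done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
            in_flight.add(executor.submit(task))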
def listen_vcpu(self, vcpu_io, queue):
"""Listen to an individual virtual CPU"""
logging.info('Start listening on VCPU %s', vcpu_io.vcpu_nb)
    # we need a per-thread continue event
continue_event = threading.Event()
while not self.stop_request.is_set():
try:
nitro_raw_ev = vcpu_io.get_event()
except ValueError as e:
if not self.vm_io.syscall_filters:
                # with no filters every system call is captured, so
                # get_event should never time out; log the unexpected error
logging.debug(str(e))
else:
e = NitroEvent(nitro_raw_ev, vcpu_io)
# put the event in the queue
# and wait for the event to be processed,
# when the main thread will set the continue_event
item = (e, continue_event)
queue.put(item)
continue_event.wait()
# reset continue_event
continue_event.clear()
vcpu_io.continue_vm()
logging.debug('stop listening on VCPU %s', vcpu_io.vcpu_nb)
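``listen_vcpu`` hands each event to the main thread and blocks until it has been processed, using a queue plus a per-thread ``Event`` as an acknowledgement handshake. The same pattern in isolation (names are illustrative):

import queue
import threading

def producer(q, items):
    ack = threading.Event()  # one Event per producer thread
    for item in items:
        q.put((item, ack))
        ack.wait()           # block until the consumer acknowledges the item
        ack.clear()          # re-arm the handshake for the next item

def consumer(q, n):
    for _ in range(n):
        item, ack = q.get()
        # ... process item here ...
        ack.set()            # let the producer continue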