python类wait()的实例源码

cluster.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def add_execution_profile(self, name, profile, pool_wait_timeout=5):
        """
        Adds an :class:`.ExecutionProfile` to the cluster. This makes it available for use by ``name`` in :meth:`.Session.execute`
        and :meth:`.Session.execute_async`. This method will raise if the profile already exists.

        Normally profiles will be injected at cluster initialization via ``Cluster(execution_profiles)``. This method
        provides a way of adding them dynamically.

        Adding a new profile updates the connection pools according to the specified ``load_balancing_policy``. By default,
        this method will wait up to five seconds for the pool creation to complete, so the profile can be used immediately
        upon return. This behavior can be controlled using ``pool_wait_timeout`` (see
        `concurrent.futures.wait <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.wait>`_
        for timeout semantics).

        Raises ``TypeError`` if ``profile`` is not an :class:`.ExecutionProfile`, ``ValueError`` if the cluster is in
        legacy configuration mode or ``name`` is already registered, and ``OperationTimedOut`` if some pool futures
        are still pending once ``pool_wait_timeout`` has elapsed.
        """
        if not isinstance(profile, ExecutionProfile):
            raise TypeError("profile must be an instance of ExecutionProfile")
        if self._config_mode == _ConfigMode.LEGACY:
            raise ValueError("Cannot add execution profiles when legacy parameters are set explicitly.")
        if name in self.profile_manager.profiles:
            # Interpolate the name; the one-element tuple keeps tuple-valued names safe with ``%``.
            raise ValueError("Profile %s already exists" % (name,))
        self.profile_manager.profiles[name] = profile
        profile.load_balancing_policy.populate(self, self.metadata.all_hosts())
        # on_up after populate allows things like DCA LBP to choose default local dc
        for host in self.metadata.all_hosts():
            if host.is_up:
                profile.load_balancing_policy.on_up(host)
        futures = set()
        for session in self.sessions:
            futures.update(session.update_created_pools())
        _, not_done = wait_futures(futures, pool_wait_timeout)
        if not_done:
            # Report the timeout that was actually applied instead of a raw "%s" placeholder.
            raise OperationTimedOut("Failed to create all new connection pools in the %ss timeout." % (pool_wait_timeout,))
cluster.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, cluster, hosts, keyspace=None):
        """
        Set up session state and open connection pools to ``hosts`` in parallel.

        Pool creation is started for every host via ``add_or_renew_pool``; the
        constructor then waits until at least one pool future reports a truthy
        result or every future has finished. If no future yields a truthy
        result, ``NoHostAvailable`` is raised with the host addresses.
        """
        self.cluster = cluster
        self.hosts = hosts
        self.keyspace = keyspace

        self._lock = RLock()
        self._pools = {}
        self._profile_manager = cluster.profile_manager
        self._metrics = cluster.metrics
        self._request_init_callbacks = []
        self._protocol_version = self.cluster.protocol_version

        self.encoder = Encoder()

        # Kick off pool creation for all hosts; falsy return values are not tracked.
        self._initial_connect_futures = set()
        for host in hosts:
            pool_future = self.add_or_renew_pool(host, is_host_addition=False)
            if pool_future:
                self._initial_connect_futures.add(pool_future)

        # Keep waiting, one completion at a time, until a pool came up
        # successfully or nothing is left to wait for.
        outcome = wait_futures(self._initial_connect_futures, return_when=FIRST_COMPLETED)
        while True:
            if not outcome.not_done:
                break
            if any(done.result() for done in outcome.done):
                break
            outcome = wait_futures(outcome.not_done, return_when=FIRST_COMPLETED)

        connected = any(f.result() for f in self._initial_connect_futures)
        if not connected:
            keyspace_note = " using keyspace '%s'" % self.keyspace if self.keyspace else ""
            raise NoHostAvailable("Unable to connect to any servers" + keyspace_note,
                                  [h.address for h in hosts])
test_futures.py 文件源码 项目:deb-python-concurrent.futures 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Shut the executor down and fail if the test ran for 60 seconds or more."""
        self.executor.shutdown(wait=True)
        elapsed = time.time() - self.t1
        if test_support.verbose:
            # Report the wall-clock duration measured from self.t1.
            print("%.2fs" % elapsed)
        self.assertLess(elapsed, 60, "synchronization issue: test lasted too long")
test_futures.py 文件源码 项目:deb-python-concurrent.futures 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_first_completed(self):
        # With FIRST_COMPLETED, wait() is expected to return with only the
        # mul task done; CANCELLED_FUTURE and the sleeping task stay pending.
        product_future = self.executor.submit(mul, 21, 2)
        sleeper_future = self.executor.submit(time.sleep, 1.5)

        outcome = futures.wait(
                [CANCELLED_FUTURE, product_future, sleeper_future],
                return_when=futures.FIRST_COMPLETED)

        self.assertEqual(set([product_future]), outcome.done)
        self.assertEqual(set([CANCELLED_FUTURE, sleeper_future]), outcome.not_done)
test_futures.py 文件源码 项目:deb-python-concurrent.futures 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_first_completed_some_already_completed(self):
        # Futures that are already finished when wait() is called must be
        # reported as done while the sleeping task is still pending.
        sleeper_future = self.executor.submit(time.sleep, 1.5)

        outcome = futures.wait(
                [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, sleeper_future],
                return_when=futures.FIRST_COMPLETED)

        expected_done = set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE])
        self.assertEqual(expected_done, outcome.done)
        self.assertEqual(set([sleeper_future]), outcome.not_done)
test_futures.py 文件源码 项目:deb-python-concurrent.futures 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_first_exception(self):
        # With FIRST_EXCEPTION, wait() is expected to return once the
        # sleep_and_raise task has finished, leaving the 3 second sleep pending.
        product_future = self.executor.submit(mul, 2, 21)
        raising_future = self.executor.submit(sleep_and_raise, 1.5)
        sleeper_future = self.executor.submit(time.sleep, 3)

        outcome = futures.wait(
                [product_future, raising_future, sleeper_future],
                return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([product_future, raising_future]), outcome.done)
        self.assertEqual(set([sleeper_future]), outcome.not_done)
test_futures.py 文件源码 项目:deb-python-concurrent.futures 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_first_exception_some_already_complete(self):
        # divmod(21, 0) raises, so wait() should return with that future and
        # the already-finished ones done; CANCELLED_FUTURE and the sleeping
        # task are expected to remain pending.
        failing_future = self.executor.submit(divmod, 21, 0)
        sleeper_future = self.executor.submit(time.sleep, 1.5)

        watched = [SUCCESSFUL_FUTURE,
                   CANCELLED_FUTURE,
                   CANCELLED_AND_NOTIFIED_FUTURE,
                   failing_future, sleeper_future]
        outcome = futures.wait(watched, return_when=futures.FIRST_EXCEPTION)

        expected_done = set([SUCCESSFUL_FUTURE,
                             CANCELLED_AND_NOTIFIED_FUTURE,
                             failing_future])
        self.assertEqual(expected_done, outcome.done)
        self.assertEqual(set([CANCELLED_FUTURE, sleeper_future]), outcome.not_done)
test_futures.py 文件源码 项目:deb-python-concurrent.futures 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        release = threading.Event()

        def future_func():
            release.wait()

        # Force very frequent thread switches to provoke the race.
        # sys.getcheckinterval/setcheckinterval were removed in Python 3.9, so
        # prefer the switch-interval API when the interpreter provides it and
        # fall back to the legacy check-interval API otherwise.
        if hasattr(sys, 'setswitchinterval'):
            get_interval = sys.getswitchinterval
            set_interval = sys.setswitchinterval
            aggressive = 1e-6
        else:
            get_interval = sys.getcheckinterval
            set_interval = sys.setcheckinterval
            aggressive = 1
        oldswitchinterval = get_interval()
        set_interval(aggressive)
        try:
            fs = set(self.executor.submit(future_func) for _ in range(100))
            release.set()
            futures.wait(fs, return_when=futures.ALL_COMPLETED)
        finally:
            # Always restore the interpreter-wide setting, even on failure.
            set_interval(oldswitchinterval)
test_futures.py 文件源码 项目:deb-python-concurrent.futures 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777.

        The iterator returned by ``map`` is deliberately never consumed; all
        ten calls must still have run once the executor has shut down.
        """
        seen = []

        def remember(value):
            seen.append(value)

        self.executor.map(remember, range(10))
        self.executor.shutdown(wait=True)
        self.assertEqual(len(seen), 10)
gthread.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run(self):
        """
        Main loop of the worker.

        Registers every listening socket with the poller, then loops while
        ``self.alive``: notifies the arbiter, polls for events when more
        connections can be accepted, reaps keepalive timeouts and, once
        ``cfg.threads`` futures are outstanding, waits for one to complete.
        The loop ends when the parent is gone or when no future completes
        within the timeout. The thread pool and poller are shut down on exit.
        """
        # init listeners, add them to the event loop
        for listener in self.sockets:
            listener.setblocking(False)
            self.poller.register(listener, selectors.EVENT_READ, self.accept)

        wait_timeout = self.cfg.timeout or 0.5

        while self.alive:
            # notify the arbiter we are alive
            self.notify()

            # can we accept more connections?
            if self.nr_conns < self.worker_connections:
                # wait for an event and dispatch it to the callback stored
                # as the selector key's data
                for key, _mask in self.poller.select(0.02):
                    handler = key.data
                    handler(key.fileobj)

            if not self.is_parent_alive():
                break

            # handle keepalive timeouts
            self.murder_keepalived()

            # if the number of connections is < to the max we can handle at
            # the same time there is no need to wait for one
            if len(self.futures) < self.cfg.threads:
                continue

            outcome = futures.wait(self.futures, timeout=wait_timeout,
                                   return_when=futures.FIRST_COMPLETED)

            # nothing finished within the timeout: leave the loop
            if not outcome.done:
                break

            for finished in outcome.done:
                self.futures.remove(finished)

        self.tpool.shutdown(False)
        self.poller.close()
listener.py 文件源码 项目:nitro 作者: KVM-VMI 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def listen_vcpu(self, vcpu_io, queue):
        """Listen to an individual virtual CPU.

        Until ``self.stop_request`` is set, fetch events from ``vcpu_io``,
        hand each one to ``queue`` together with a per-thread event, and block
        until that event is set before letting the VM continue.
        """
        logging.info('Start listening on VCPU %s', vcpu_io.vcpu_nb)
        # each listener thread owns its own "continue" event
        resume = threading.Event()
        while not self.stop_request.is_set():
            try:
                raw_event = vcpu_io.get_event()
            except ValueError as err:
                # if there are no filters, get_event should not timeout
                # since we capture all system calls, so log the error
                if not self.vm_io.syscall_filters:
                    logging.debug(str(err))
                continue

            # put the event in the queue and wait for it to be processed;
            # the consumer is expected to set the resume event when done
            queue.put((NitroEvent(raw_event, vcpu_io), resume))
            resume.wait()
            # reset the event for the next round
            resume.clear()
            vcpu_io.continue_vm()

        logging.debug('stop listening on VCPU %s', vcpu_io.vcpu_nb)
test_concurrent_futures.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Wait for the executor to shut down and check the test took under 60 seconds."""
        self.executor.shutdown(wait=True)
        duration = time.time() - self.t1
        if test.support.verbose:
            # Print the duration measured from self.t1 on the current line.
            print("%.2fs" % duration, end=' ')
        self.assertLess(duration, 60, "synchronization issue: test lasted too long")
test_concurrent_futures.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_first_completed(self):
        # FIRST_COMPLETED: only the mul task is expected in the done set;
        # CANCELLED_FUTURE and the sleeping task should be reported pending.
        fast = self.executor.submit(mul, 21, 2)
        slow = self.executor.submit(time.sleep, 1.5)

        watched = [CANCELLED_FUTURE, fast, slow]
        result = futures.wait(watched, return_when=futures.FIRST_COMPLETED)

        self.assertEqual({fast}, result.done)
        self.assertEqual({CANCELLED_FUTURE, slow}, result.not_done)
test_concurrent_futures.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_first_completed_some_already_completed(self):
        # Already-finished futures are expected in the done set right away,
        # with the sleeping task still pending.
        slow = self.executor.submit(time.sleep, 1.5)

        watched = [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, slow]
        result = futures.wait(watched, return_when=futures.FIRST_COMPLETED)

        self.assertEqual(
                {CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE},
                result.done)
        self.assertEqual({slow}, result.not_done)
test_concurrent_futures.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_first_exception(self):
        # FIRST_EXCEPTION: the mul and sleep_and_raise tasks are expected to
        # be done when wait() returns; the 3 second sleep should be pending.
        fast = self.executor.submit(mul, 2, 21)
        raiser = self.executor.submit(sleep_and_raise, 1.5)
        slow = self.executor.submit(time.sleep, 3)

        watched = [fast, raiser, slow]
        result = futures.wait(watched, return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({fast, raiser}, result.done)
        self.assertEqual({slow}, result.not_done)
test_concurrent_futures.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_first_exception_some_already_complete(self):
        # divmod(21, 0) raises, which should end the wait; the finished
        # fixtures plus the failing future are expected in the done set,
        # CANCELLED_FUTURE and the sleeping task in the pending set.
        failing = self.executor.submit(divmod, 21, 0)
        slow = self.executor.submit(time.sleep, 1.5)

        result = futures.wait(
                [SUCCESSFUL_FUTURE,
                 CANCELLED_FUTURE,
                 CANCELLED_AND_NOTIFIED_FUTURE,
                 failing, slow],
                return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({SUCCESSFUL_FUTURE,
                          CANCELLED_AND_NOTIFIED_FUTURE,
                          failing}, result.done)
        self.assertEqual({CANCELLED_FUTURE, slow}, result.not_done)
gthread.py 文件源码 项目:Price-Comparator 作者: Thejas-1 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def run(self):
        """
        Drive the worker until it is told to stop.

        Every socket in ``self.sockets`` is made non-blocking and registered
        for read events with ``self.accept`` as callback. Each loop iteration
        notifies the arbiter, dispatches ready events (if below
        ``worker_connections``), reaps keepalive timeouts and, when
        ``cfg.threads`` futures are outstanding, waits for one to finish.
        The loop exits when ``self.alive`` is false, the parent is gone, or a
        wait times out with nothing done; the pool and poller are then closed.
        """
        # init listeners, add them to the event loop
        for sock in self.sockets:
            sock.setblocking(False)
            self.poller.register(sock, selectors.EVENT_READ, self.accept)

        # fall back to half a second when cfg.timeout is falsy
        max_wait = self.cfg.timeout or 0.5

        while self.alive:
            # notify the arbiter we are alive
            self.notify()

            # can we accept more connections?
            can_accept = self.nr_conns < self.worker_connections
            if can_accept:
                # wait for an event
                ready = self.poller.select(0.02)
                for selector_key, _ in ready:
                    selector_key.data(selector_key.fileobj)

            if not self.is_parent_alive():
                break

            # handle keepalive timeouts
            self.murder_keepalived()

            # if the number of connections is < to the max we can handle at
            # the same time there is no need to wait for one
            if len(self.futures) >= self.cfg.threads:
                waited = futures.wait(self.futures, timeout=max_wait,
                                      return_when=futures.FIRST_COMPLETED)

                if waited.done:
                    # drop every completed future from the outstanding set
                    for completed in waited.done:
                        self.futures.remove(completed)
                else:
                    break

        self.tpool.shutdown(False)
        self.poller.close()
test_concurrent_futures.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Shut down the executor, then assert the test finished in under 60 seconds."""
        self.executor.shutdown(wait=True)
        took = time.time() - self.t1
        if test.support.verbose:
            # Show how long the test took, measured from self.t1.
            print("%.2fs" % took, end=' ')
        self.assertLess(took, 60, "synchronization issue: test lasted too long")
test_concurrent_futures.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_first_completed(self):
        # wait(..., FIRST_COMPLETED) is expected to report the mul task as
        # done and both CANCELLED_FUTURE and the sleeping task as pending.
        mul_future = self.executor.submit(mul, 21, 2)
        sleep_future = self.executor.submit(time.sleep, 1.5)

        completed, remaining = futures.wait(
                [CANCELLED_FUTURE, mul_future, sleep_future],
                return_when=futures.FIRST_COMPLETED)

        self.assertEqual({mul_future}, completed)
        self.assertEqual({CANCELLED_FUTURE, sleep_future}, remaining)
test_concurrent_futures.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_first_completed_some_already_completed(self):
        # Futures that are finished before wait() is called should be
        # returned as done; the sleeping task should still be pending.
        sleep_future = self.executor.submit(time.sleep, 1.5)

        completed, remaining = futures.wait(
                [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, sleep_future],
                return_when=futures.FIRST_COMPLETED)

        already_done = {CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE}
        self.assertEqual(already_done, completed)
        self.assertEqual({sleep_future}, remaining)


问题


面经


文章

微信
公众号

扫码关注公众号