python类count()的实例源码

project.py 文件源码 项目:psyplot 作者: Chilipp 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, *args, **kwargs):
        """
        Parameters
        ----------
        %(ArrayList.parameters)s
        main: Project
            The main project this subproject belongs to (or None if this
            project is the main project)
        num: int
            The number of the project
        """
        self.main = kwargs.pop('main', None)
        self._plot = ProjectPlotter(self)
        self.num = kwargs.pop('num', 1)
        self._ds_counter = count()
        with self.block_signals:
            super(Project, self).__init__(*args, **kwargs)
vmf.py 文件源码 项目:srctools 作者: TeamSpen210 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_id(self, desired=-1):
        """Get a valid ID."""

        if desired == -1:
            # Start with the lowest ID, and look upwards
            desired = 1

        if desired not in self:
            # The desired ID is avalible!
            self.add(desired)
            return desired

        # Check every ID in order to find a valid one
        for poss_id in itertools.count(start=1):
            if poss_id not in self:
                self.add(poss_id)
                return poss_id
util.py 文件源码 项目:lang-reps 作者: chaitanyamalaviya 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, i, s, count=1):
        self.i = i
        self.s = s
        self.count = count
util.py 文件源码 项目:lang-reps 作者: chaitanyamalaviya 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def add_string(self, string):
        if string in self.strings:
            self[string].count += 1
            return self[string]
        i = len(self.tokens)
        s = string
        t = Token(i, s)
        self.i2t[i] = t
        self.s2t[s] = t
        self.tokens.add(t)
        self.strings.add(s)
        return t
asynchronous.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _run(self, group, queue):
        LOG.debug("Asynchronous handler started processing %s", group)
        for _ in itertools.count():
            # NOTE(ivc): this is a mock-friendly replacement for 'while True'
            # to allow more controlled environment for unit-tests (e.g. to
            # avoid tests getting stuck in infinite loops)
            try:
                event = queue.get(timeout=self._grace_period)
            except six_queue.Empty:
                break
            # FIXME(ivc): temporary workaround to skip stale events
            # If K8s updates resource while the handler is processing it,
            # when the handler finishes its work it can fail to update an
            # annotation due to the 'resourceVersion' conflict. K8sClient
            # was updated to allow *new* annotations to be set ignoring
            # 'resourceVersion', but it leads to another problem as the
            # Handler will receive old events (i.e. before annotation is set)
            # and will start processing the event 'from scratch'.
            # It has negative effect on handlers' performance (VIFHandler
            # creates ports only to later delete them and LBaaS handler also
            # produces some excess requests to Neutron, although with lesser
            # impact).
            # Possible solutions (can be combined):
            #  - use K8s ThirdPartyResources to store data/annotations instead
            #    of native K8s resources (assuming Kuryr-K8s will own those
            #    resources and no one else would update them)
            #  - use the resulting 'resourceVersion' received from K8sClient's
            #    'annotate' to provide feedback to Async to skip all events
            #    until that version
            #  - stick to the 'get-or-create' behaviour in handlers and
            #    also introduce cache for long operations
            time.sleep(STALE_PERIOD)
            while not queue.empty():
                event = queue.get()
                if queue.empty():
                    time.sleep(STALE_PERIOD)
            self._handler(event)
asynchronous.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _done(self, thread, group):
        LOG.debug("Asynchronous handler stopped processing %s", group)
        queue = self._queues.pop(group)

        if not queue.empty():
            LOG.critical("Asynchronous handler terminated abnormally; "
                         "%(count)s events dropped for %(group)s",
                         {'count': queue.qsize(), 'group': group})

        if not self._queues:
            LOG.debug("Asynchronous handler is idle")
retry.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __call__(self, event):
        deadline = time.time() + self._timeout
        for attempt in itertools.count(1):
            try:
                self._handler(event)
                break
            except self._exceptions:
                with excutils.save_and_reraise_exception() as ex:
                    if self._sleep(deadline, attempt, ex.value):
                        ex.reraise = False
ipaddress.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _compat_bit_length(i):
        for res in itertools.count():
            if i >> res == 0:
                return res
ipaddress.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _count_righthand_zero_bits(number, bits):
    """Count the number of zero bits on the right hand side.

    Args:
        number: an integer.
        bits: maximum number of bits to count.

    Returns:
        The number of zero bits on the right hand side of the number.

    """
    if number == 0:
        return bits
    return min(bits, _compat_bit_length(~number & (number - 1)))
ipaddress.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _compat_bit_length(i):
        for res in itertools.count():
            if i >> res == 0:
                return res
ipaddress.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _count_righthand_zero_bits(number, bits):
    """Count the number of zero bits on the right hand side.

    Args:
        number: an integer.
        bits: maximum number of bits to count.

    Returns:
        The number of zero bits on the right hand side of the number.

    """
    if number == 0:
        return bits
    return min(bits, _compat_bit_length(~number & (number - 1)))
drawing.py 文件源码 项目:zellij 作者: nedbat 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __iter__(self):
        nums = itertools.count()
        while True:
            num = next(nums)
            name = self.name.replace("_", f"_{num:04d}")
            dwg = Drawing(name=name, *self.args, **self.kwargs)
            dwg.num = num
            sys.stdout.write(".")
            sys.stdout.flush()
            yield dwg
jar_extract.py 文件源码 项目:pbtk 作者: marin-m 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def namer():
    for length in count(1):
        for name in product(ascii_lowercase, repeat=length):
            yield ''.join(name)
download_planet_python.py 文件源码 项目:python-station-backend 作者: itielshwartz 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def download_posts(output_file, max_page_to_download=None):
    with open(output_file, "w") as f:  # open the output file
        pages_to_download_itr = range(1, max_page_to_download + 1) if max_page_to_download else itertools.count(1)
        for i in pages_to_download_itr:  # start itrate over the pages
            url = BASE_URL.format(i)
            logging.info("fetching %s", format(url))
            page_data = download_with_retry(url)
            if should_stop_page(page_data):  # validate it's not the last page
                return logging.info("Finished Downloading all data")
            f.write(json.dumps(page_data) + "\n")  # write page as jsonline
            logging.info("finished %s", format(url))
download_planet_python.py 文件源码 项目:python-station-backend 作者: itielshwartz 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def download_with_retry(url):
    for sleep_time in itertools.count():
        page_data_raw = requests.get(url, verify=False)  # get the page
        page_data = page_data_raw.text
        if page_data_raw.status_code == 200 or should_stop_page(page_data):
            return page_data
        sleep(sleep_time)  # sleep in case bad response
process.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        assert group is None, 'group argument must be None for now'
        count = _current_process._counter.next()
        self._identity = _current_process._identity + (count,)
        self._authkey = _current_process._authkey
        self._daemonic = _current_process._daemonic
        self._tempdir = _current_process._tempdir
        self._parent_pid = os.getpid()
        self._popen = None
        self._target = target
        self._args = tuple(args)
        self._kwargs = dict(kwargs)
        self._name = name or type(self).__name__ + '-' + \
                     ':'.join(str(i) for i in self._identity)
process.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def __init__(self):
        self._identity = ()
        self._daemonic = False
        self._name = 'MainProcess'
        self._parent_pid = None
        self._popen = None
        self._counter = itertools.count(1)
        self._children = set()
        self._authkey = AuthenticationString(bytes(os.urandom(32), 'latin-1'))
        self._tempdir = None
btm_matcher.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self):
        self.transition_table = {}
        self.fixers = []
        self.id = next(BMNode.count)
        self.content = ''
fixer_base.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def start_tree(self, tree, filename):
        """Some fixers need to maintain tree-wide state.
        This method is called once, at the start of tree fix-up.

        tree - the root node of the tree to be processed.
        filename - the name of the file the tree came from.
        """
        self.used_names = tree.used_names
        self.set_filename(filename)
        self.numbers = itertools.count(1)
        self.first_log = True
heapq.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def nlargest(n, iterable, key=None):
    """Find the n largest elements in a dataset.

    Equivalent to:  sorted(iterable, key=key, reverse=True)[:n]
    """

    # Short-cut for n==1 is to use max() when len(iterable)>0
    if n == 1:
        it = iter(iterable)
        head = list(islice(it, 1))
        if not head:
            return []
        if key is None:
            return [max(chain(head, it))]
        return [max(chain(head, it), key=key)]

    # When n>=size, it's faster to use sorted()
    try:
        size = len(iterable)
    except (TypeError, AttributeError):
        pass
    else:
        if n >= size:
            return sorted(iterable, key=key, reverse=True)[:n]

    # When key is none, use simpler decoration
    if key is None:
        it = izip(iterable, count(0,-1))                    # decorate
        result = _nlargest(n, it)
        return map(itemgetter(0), result)                   # undecorate

    # General case, slowest method
    in1, in2 = tee(iterable)
    it = izip(imap(key, in1), count(0,-1), in2)             # decorate
    result = _nlargest(n, it)
    return map(itemgetter(2), result)                       # undecorate


问题


面经


文章

微信
公众号

扫码关注公众号