Example source code for Python's filterfalse()

utils.py (project: minihydra, author: VillanCh)
from itertools import filterfalse


def unique_everseen(iterable, key=None):
    """
    List unique elements, preserving order, and remember all elements ever
    seen. Taken from the itertools recipes.

    Args:
        iterable: An iterable to process.
        key: Optional function applied to each element before the uniqueness
            check (e.g., str.lower).

    Yields:
        Each element the first time it (or its key) is seen.
    """
    seen = set()
    seen_add = seen.add

    if key is None:
        for element in filterfalse(seen.__contains__, iterable):
            seen_add(element)
            yield element
    else:
        for element in iterable:
            k = key(element)
            if k not in seen:
                seen_add(k)
                yield element
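
A quick usage sketch (not from the original project), assuming the unique_everseen definition above is in scope; the input strings are made up for illustration:

print(list(unique_everseen("AAAABBBCCDAABBB")))         # ['A', 'B', 'C', 'D']
print(list(unique_everseen("ABBCcAD", key=str.lower)))  # ['A', 'B', 'C', 'D']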
filecmp.py (project: kbe_server, author: xiaohaoppy)
def phase1(self): # Compute common names
        a = dict(zip(map(os.path.normcase, self.left_list), self.left_list))
        b = dict(zip(map(os.path.normcase, self.right_list), self.right_list))
        self.common = list(map(a.__getitem__, filter(b.__contains__, a)))
        self.left_only = list(map(a.__getitem__, filterfalse(b.__contains__, a)))
        self.right_only = list(map(b.__getitem__, filterfalse(a.__contains__, b)))
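
A self-contained sketch of the same three-way split outside the dircmp class; the file names are made up for illustration, and the expected results assume they map to themselves under os.path.normcase:

import os
from itertools import filterfalse

left_list = ["setup.py", "docs", "build"]
right_list = ["docs", "tests"]

a = dict(zip(map(os.path.normcase, left_list), left_list))
b = dict(zip(map(os.path.normcase, right_list), right_list))

common = list(map(a.__getitem__, filter(b.__contains__, a)))           # ['docs']
left_only = list(map(a.__getitem__, filterfalse(b.__contains__, a)))   # ['setup.py', 'build']
right_only = list(map(b.__getitem__, filterfalse(a.__contains__, b)))  # ['tests']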
filecmp.py (project: kbe_server, author: xiaohaoppy)
def _filter(flist, skip):
    return list(filterfalse(skip.__contains__, flist))


# Demonstration and testing.
#
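
A one-line check of _filter, assuming the definition above with filterfalse imported from itertools; the sample names and skip set are made up:

print(_filter(['RCS', 'tags', 'src'], {'RCS', 'CVS', 'tags'}))  # ['src']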
compare_table_predicate.py (project: SkiRaff, author: mathiascj)
def difference(a, b):
    """
    Equivalent to A - B in set theory (difference / relative complement).

    :param a: First list of dicts
    :param b: Second list of dicts
    :return: List of elements in a but not in b
    """
    return list(filterfalse(lambda x: x in b, a))
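
The same expression written out as a quick check (sample dicts are made up), assuming filterfalse is imported from itertools:

from itertools import filterfalse

a = [{"id": 1}, {"id": 2}, {"id": 3}]
b = [{"id": 2}]
print(list(filterfalse(lambda x: x in b, a)))  # [{'id': 1}, {'id': 3}]

Note that the membership test `x in b` is a linear scan over b for every element of a, so this is O(len(a) * len(b)); that is acceptable for small lists of dicts, which cannot be placed in a set because dicts are unhashable.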
util.py (project: open-synthesis, author: twschiller)
def partition(pred, iterable):
    """Use a predicate to partition entries into false entries and true entries."""
    # https://stackoverflow.com/questions/8793772/how-to-split-a-sequence-according-to-a-predicate
    # NOTE: this might iterate over the collection twice
    # NOTE: need to use filter(s) here because we're lazily dealing with iterators
    it1, it2 = itertools.tee(iterable)
    return itertools.filterfalse(pred, it1), filter(pred, it2)  # pylint: disable=bad-builtin
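
A minimal sketch of the same split applied to a range, with the tee/filterfalse/filter calls written out; the parity predicate is just for illustration:

import itertools

it1, it2 = itertools.tee(range(10))
evens = itertools.filterfalse(lambda n: n % 2, it1)  # entries where the predicate is falsy
odds = filter(lambda n: n % 2, it2)                  # entries where the predicate is truthy
print(list(evens))  # [0, 2, 4, 6, 8]
print(list(odds))   # [1, 3, 5, 7, 9]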
utils.py (project: mysplunk_csc, author: patel-bhavin)
from itertools import filterfalse


def unique_everseen(iterable, key=None):
    """
    List unique elements, preserving order, and remember all elements ever
    seen. Taken from the itertools recipes.

    Args:
        iterable: An iterable to process.
        key: Optional function applied to each element before the uniqueness
            check (e.g., str.lower).

    Yields:
        Each element the first time it (or its key) is seen.
    """
    seen = set()
    seen_add = seen.add

    if key is None:
        for element in filterfalse(seen.__contains__, iterable):
            seen_add(element)
            yield element
    else:
        for element in iterable:
            k = key(element)
            if k not in seen:
                seen_add(k)
                yield element
fields.py (project: Kiwi, author: kiwitcms)
def to_python(self, value):

        if not value:
            return []

        if not isinstance(value, str):
            raise ValidationError(
                '%s is not a valid string value.' % str(value))

        result = [item.strip() for item in filterfalse(
            lambda item: item.strip() == '', value.split(self.delimiter))]
        return result
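
A standalone sketch of the same cleanup with filterfalse dropping blank items; the comma delimiter and sample value are assumptions, since self.delimiter is not shown in this excerpt:

from itertools import filterfalse

value = "alpha, , beta,,gamma "
delimiter = ","
result = [item.strip() for item in filterfalse(
    lambda item: item.strip() == '', value.split(delimiter))]
print(result)  # ['alpha', 'beta', 'gamma']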
parse.py (project: wos_parser, author: alexander-belikov)
def parse_address(branch):
    """
    expected address structure:

    required:
        organization : str
        add_no : int

    optional:
        full_address : str
        organization synonyms : list of str
        country : str
        city : str
        state : str
        zipcode : str
        street : str
    """
    success = True

    try:
        org_names = branch.findall(org_path)

        def condition(x):
            return x.attrib and 'pref' in x.attrib and x.attrib['pref'] == 'Y'

        # find first org with pref='Y'
        orgs_pref = list(filter(condition, org_names))
        orgs_pref = list(map(lambda x: x.text, filterfalse(lambda x: x is None, orgs_pref)))
        result_dict = {'organizations_pref': orgs_pref}

        orgs_rest = list(filterfalse(condition, org_names))
        orgs_rest = list(map(lambda x: x.text, filterfalse(lambda x: x is None, orgs_rest)))
        result_dict.update({'organizations': orgs_rest})

        suborg_names = branch.findall(suborg_path)
        suborgs = list(map(lambda x: x.text, filterfalse(lambda y: y is None, suborg_names)))
        result_dict.update({'suborganizations': suborgs})

        if branch.attrib:
            if add_no_key in branch.attrib:
                # TODO add try-catch-raise with logging
                # if not int-able : exception triggered
                addr_number = int(branch.attrib[add_no_key])
                result_dict.update({add_no_key: addr_number})
        else:
            result_dict.update({add_no_key: 1})

        # entries below are optional
        add_entry(result_dict, branch, full_address_path)
        add_entry(result_dict, branch, country_path)
        add_entry(result_dict, branch, city_path)
        add_entry(result_dict, branch, state_path)
        add_entry(result_dict, branch, zipcode_path)
        add_entry(result_dict, branch, street_path)

    except:
        success = False
        result_dict = etree_to_dict(branch)
    return success, result_dict
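
A simplified sketch of the pref='Y' split above, using plain stand-in objects instead of the XML elements returned by branch.findall(org_path); the organization names are made up:

from itertools import filterfalse

class Org:
    def __init__(self, text, attrib):
        self.text = text
        self.attrib = attrib

def condition(x):
    return x.attrib and 'pref' in x.attrib and x.attrib['pref'] == 'Y'

org_names = [Org("MIT", {'pref': 'Y'}), Org("CERN", {}), Org("ETH", {'pref': 'N'})]

orgs_pref = list(map(lambda x: x.text, filter(condition, org_names)))       # ['MIT']
orgs_rest = list(map(lambda x: x.text, filterfalse(condition, org_names)))  # ['CERN', 'ETH']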
test_standard_library.py (project: packaging, author: blockstack)
def test_future_moves(self):
        """
        Ensure everything is available from the future.moves interface that we
        claim and expect. (Issue #104).
        """
        from future.moves.collections import Counter, OrderedDict   # backported to Py2.6
        from future.moves.collections import UserDict, UserList, UserString

        from future.moves import configparser
        from future.moves import copyreg

        from future.moves.itertools import filterfalse, zip_longest

        from future.moves import html
        import future.moves.html.entities
        import future.moves.html.parser

        from future.moves import http
        import future.moves.http.client
        import future.moves.http.cookies
        import future.moves.http.cookiejar
        import future.moves.http.server

        from future.moves import queue

        from future.moves import socketserver

        from future.moves.subprocess import check_output              # even on Py2.6
        from future.moves.subprocess import getoutput, getstatusoutput

        from future.moves.sys import intern

        from future.moves import urllib
        import future.moves.urllib.error
        import future.moves.urllib.parse
        import future.moves.urllib.request
        import future.moves.urllib.response
        import future.moves.urllib.robotparser

        try:
            # Is _winreg available on Py2? If so, ensure future.moves._winreg is available too:
            import _winreg
        except ImportError:
            pass
        else:
            from future.moves import winreg

        from future.moves import xmlrpc
        import future.moves.xmlrpc.client
        import future.moves.xmlrpc.server

        from future.moves import _dummy_thread
        from future.moves import _markupbase
        from future.moves import _thread
denite.py (project: denite.nvim, author: Shougo)
def filter_candidates(self, context):
        for source in self._current_sources:
            ctx = source.context
            ctx['matchers'] = context['matchers']
            ctx['input'] = context['input']
            if context['smartcase']:
                ctx['ignorecase'] = re.search(r'[A-Z]', ctx['input']) is None
            ctx['mode'] = context['mode']
            ctx['async_timeout'] = 0.03 if ctx['mode'] != 'insert' else 0.02
            if ctx['prev_input'] != ctx['input'] and ctx['is_interactive']:
                ctx['event'] = 'interactive'
                ctx['all_candidates'] = self._gather_source_candidates(
                    ctx, source)
            ctx['prev_input'] = ctx['input']
            entire = ctx['all_candidates']
            if ctx['is_async']:
                ctx['event'] = 'async'
                entire += self._gather_source_candidates(ctx, source)
            if not entire:
                yield source.name, entire, [], []
                continue
            partial = []
            ctx['candidates'] = entire
            for i in range(0, len(entire), 1000):
                ctx['candidates'] = entire[i:i+1000]
                matchers = [self._filters[x] for x in
                            (ctx['matchers'].split(',') if ctx['matchers']
                             else source.matchers)
                            if x in self._filters]
                self.match_candidates(ctx, matchers)
                partial += ctx['candidates']
                if len(partial) >= 1000:
                    break
            ctx['candidates'] = partial
            for f in [self._filters[x]
                      for x in source.sorters + source.converters
                      if x in self._filters]:
                ctx['candidates'] = f.filter(ctx)
            partial = ctx['candidates']
            for c in partial:
                c['source'] = source.name
            ctx['candidates'] = []

            patterns = filterfalse(lambda x: x == '', (
                self._filters[x].convert_pattern(context['input'])
                for x in source.matchers if self._filters[x]))

            yield source.name, entire, partial, patterns
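
The same "drop empty converted patterns" idea in isolation, with stand-in converter functions instead of the denite filter objects:

from itertools import filterfalse

converters = [lambda s: s.upper(), lambda s: '', lambda s: s[::-1]]
patterns = filterfalse(lambda x: x == '',
                       (convert('abc') for convert in converters))
print(list(patterns))  # ['ABC', 'cba']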
redis.py (project: quantbube, author: nooperpudd)
def add_many(self, name, timestamp_pairs, chunks_size=2000, *args, **kwargs):
        """
        :param name:
        :param timestamp_pairs: [("timestamp",data)]
        :param chunks_size:
        :param args:
        :param kwargs:
        :return:
        """
        incr_key = self.incr_format.format(key=name)
        hash_key = self.hash_format.format(key=name)

        # remove data that already exists in storage
        # TODO: there may be a better way to optimize this filtering
        sorted_timestamps = sorted(timestamp_pairs, key=itemgetter(0))

        max_timestamp = sorted_timestamps[-1][0]  # max
        min_timestamp = sorted_timestamps[0][0]  # min

        filter_data = self.get_slice(name, start=min_timestamp, end=max_timestamp)
        if filter_data:
            timestamp_set = set(map(lambda x: x[0], filter_data))
            filter_results = itertools.filterfalse(lambda x: x[0] in timestamp_set, sorted_timestamps)
        else:
            filter_results = sorted_timestamps
        chunks_data = helper.chunks(filter_results, chunks_size)

        with self._pipe_acquire() as pipe:
            for chunks in chunks_data:
                start_id = self.client.get(incr_key) or 1  # if the key does not exist, start the id at 1
                end_id = self.client.incrby(incr_key, amount=len(chunks))  # incr the add length

                start_id = int(start_id)
                end_id = int(end_id)

                ids_range = range(start_id, end_id)

                dumps_results = map(lambda x: (x[0], self.serializer.dumps(x[1])), chunks)

                mix_data = itertools.zip_longest(dumps_results, ids_range)  # [(("timestamp",data),id),...]
                mix_data = list(mix_data)  # need converted as list

                timestamp_ids = map(lambda seq: (seq[0][0], seq[1]), mix_data)  # [("timestamp",id),...]
                ids_pairs = map(lambda seq: (seq[1], seq[0][1]), mix_data)  # [("id",data),...]

                timestamp_ids = itertools.chain.from_iterable(timestamp_ids)
                ids_values = {k: v for k, v in ids_pairs}

                pipe.multi()
                pipe.zadd(name, *timestamp_ids)
                pipe.hmset(hash_key, ids_values)
                pipe.execute()
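
The duplicate-filtering step above in isolation: pairs whose timestamp is already stored are dropped before chunking; the set below stands in for the timestamps returned by get_slice(), and the sample pairs are made up:

import itertools

sorted_timestamps = [(1, "a"), (2, "b"), (3, "c")]
existing = {2}
fresh = list(itertools.filterfalse(lambda x: x[0] in existing, sorted_timestamps))
print(fresh)  # [(1, 'a'), (3, 'c')]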

