python类Sequence()的实例源码

api_torrent.py 文件源码 项目:stig 作者: rndusr 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
async def torrents(self, torrents=None, keys='ALL', autoconnect=True):
        """Fetch and return torrents

        This is a coroutine: it awaits the underlying request helpers, so it
        must be declared with `async def` and awaited by the caller.

        torrents: Iterator of torrent IDs, TorrentFilter object (or its string
                  representation) or None for all torrents
        keys: tuple of Torrent keys to fetch or 'ALL' for all of them
        autoconnect: Whether to attempt to connect automatically if not
                     connected; if False and not connected, return None

        Return Response with the following properties:
            torrents: tuple of Torrent objects with requested torrents
            success: False if no torrents were found, True otherwise
            msgs: list of strings/`ClientError`s caused by the request

        Raise ValueError if `torrents` is none of the accepted types.
        """
        if not autoconnect and not self.rpc.connected:
            return None
        if torrents is None:
            return await self._get_torrents_by_ids(keys)
        # Strings are treated as filter expressions, so this check has to come
        # before the generic Sequence check below (str is a Sequence too).
        if isinstance(torrents, (str, TorrentFilter)):
            return await self._get_torrents_by_filter(keys, tfilter=torrents)
        if isinstance(torrents, abc.Sequence) and \
           all(isinstance(tid, int) for tid in torrents):
            return await self._get_torrents_by_ids(keys, ids=torrents)
        raise ValueError("Invalid 'torrents' argument: {!r}".format(torrents))
selection.py 文件源码 项目:mastic 作者: ADicksonLab 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, member, flags=None):
        """Wrap `member` and set up its selection registry and flag set.

        member: the wrapped object, stored as `self.member`
        flags: optional non-string Set or Sequence of strings; validated with
               assertions and copied into the `self._flags` set
        """
        if flags is not None:
            container_types = (colabc.Set, colabc.Sequence)
            is_container = issubclass(type(flags), container_types)
            # str is itself a Sequence, so it has to be excluded explicitly
            assert is_container and not isinstance(flags, str), \
                "flags must be a container and not a string"
            assert all(isinstance(flag, str) for flag in flags), \
                "all flags must be strings, given{}".format(flags)

        super().__init__()
        self.member = member

        # selections this member has been registered in
        self._registry = []
        # flags for specific kinds of selections
        self._flags = set(flags) if flags else set()
selection.py 文件源码 项目:mastic 作者: ADicksonLab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, selection_list=None, flags=None):
        """Build a list-backed selection and register it with its members.

        selection_list: optional sequence of members; when empty or None the
                        selection starts with an empty `self.data` list
        flags: passed through to the parent initializer and to every
               `register_selection` call
        """
        # Imported locally because the `collections.Sequence` alias was
        # removed in Python 3.10; the ABC lives in `collections.abc`.
        from collections.abc import Sequence

        if not selection_list:
            self.data = []

        super().__init__(selection_list, flags=flags)

        if selection_list:
            assert issubclass(type(selection_list), Sequence), \
                "selection_list must be a subclass of collections.abc.Sequence, not {}".format(
                    type(selection_list))
            self.data = selection_list

        # if values in the selection_list are SelectionMembers update
        # their registries
        for idx, member in enumerate(self.data):
            if issubclass(type(member), SelectionMember):
                member.register_selection(idx, self, flags=flags)
orders.py 文件源码 项目:exchanges 作者: black-omen 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, action, asset_pair, volume, price=None, validate=False):
        """Validate the arguments and initialise the order's state.

        action: must be Order.BUY or Order.SELL
        asset_pair: sequence of exactly two Asset instances
        volume: must not be negative (zero is accepted by the check)
        price: stored as-is; defaults to None
        validate: stored as-is in `self._validate`

        Raise ValueError for an invalid action or a negative volume, and
        TypeError for a malformed asset pair.
        """
        if action not in [Order.BUY, Order.SELL]:
            raise ValueError("Action must be Order.BUY or Order.SELL")

        if not isinstance(asset_pair, Sequence) or len(asset_pair) != 2:
            raise TypeError("asset_pair must be a sequence of length 2")

        if not all(isinstance(a, Asset) for a in asset_pair):
            raise TypeError("asset_pair must be a sequence of assets, "
                            "not {}, {}".format(*map(type, asset_pair)))
        if volume < 0:
            # The check lets zero through, so the message says "not negative"
            # rather than "strictly positive".
            raise ValueError("volume should not be negative")

        self.id = None
        self._action = action
        self._asset_pair = asset_pair
        self._volume = volume
        self._price = price
        self.status = None
        self.time_placed = None
        self.closed_price = None
        self.closed_time = None
        self._validate = validate
jsonpointer.py 文件源码 项目:deb-python-json-pointer 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_part(self, doc, part):
        """Convert the pointer token `part` into a key usable on `doc`.

        Mappings and objects that merely define __getitem__ get the token
        unchanged; sequences get '-' unchanged or the token converted to int.
        Raise JsonPointerException for an invalid list index or a document
        that cannot be indexed at all.
        """
        if isinstance(doc, Mapping):
            return part

        if isinstance(doc, Sequence):
            # '-' is passed through untouched instead of being parsed
            if part == '-':
                return part
            if RE_ARRAY_INDEX.match(str(part)):
                return int(part)
            raise JsonPointerException("'%s' is not a valid list index" % (part, ))

        if not hasattr(doc, '__getitem__'):
            raise JsonPointerException("Document '%s' does not support indexing, "
                                       "must be dict/list or support __getitem__" % type(doc))

        # Duck typing: anything with __getitem__ is indexed with the raw token
        return part
tasks.py 文件源码 项目:SoS 作者: vatlab 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def addTags(filename, new_tags):
    """Append tags to the tag line (second line) of a task file.

    filename: path of the task file; it is rewritten in place
    new_tags: a single tag string or a sequence of tag strings

    Tags already present are skipped; if nothing new remains the file is
    left untouched. Raise ValueError if `new_tags` is neither a string nor
    a sequence.
    """
    with open(filename, 'rb') as task:
        header = task.readline()
        # the second line holds the space-separated tags
        tags = task.readline().decode().strip().split(' ')
        if isinstance(new_tags, str):
            additions = [] if new_tags in tags else [new_tags]
        elif isinstance(new_tags, Sequence):
            additions = [tag for tag in new_tags if tag not in tags]
        else:
            raise ValueError(f'Cannot add tags {new_tags} to task {filename}')
        if not additions:
            return
        tags.extend(additions)
        body = task.read()
    tag_line = (' '.join(tags) + '\n').encode()
    with open(filename, 'wb') as task:
        task.writelines([header, tag_line, body])
utils.py 文件源码 项目:SoS 作者: vatlab 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def short_repr(obj, noneAsNA=False):
    '''Return a short representation of obj for clarity.

    None becomes 'None' (or 'unspecified' if noneAsNA is true). Strings
    longer than 80 characters are cut to their first 60 and last 20
    characters. Objects whose str() is shorter than 80 characters, scalars
    and sequences of at most two items are returned via repr(). Longer
    sequences and dicts show only their first item and the item count;
    anything else is cut to the first 60 characters of its repr().
    '''
    # Imported locally: the `collections.Sequence` alias was removed in
    # Python 3.10, the ABC is only available from `collections.abc`.
    from collections.abc import Sequence

    if obj is None:
        return 'unspecified' if noneAsNA else 'None'
    if isinstance(obj, str) and len(obj) > 80:
        return '{}...{}'.format(obj[:60].replace('\n', '\\n'), obj[-20:].replace('\n', '\\n'))
    if isinstance(obj, (str, int, float, bool)) \
            or (isinstance(obj, Sequence) and len(obj) <= 2) \
            or len(str(obj)) < 80:
        return repr(obj)
    if isinstance(obj, Sequence):  # should be a list or tuple
        return f'[{short_repr(obj[0])}, ...] ({len(obj)} items)'
    if isinstance(obj, dict):
        if obj:
            first_key = next(iter(obj))
            return f'{{{first_key!r}:{short_repr(obj[first_key])!r}, ...}} ({len(obj)} items)'
        return '{}'
    return f'{repr(obj)[:60]}...'

#
# SoS Workflow dictionary
#
hosts.py 文件源码 项目:SoS 作者: vatlab 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _get_path_map(self):
        """Return the configured path map as a {from: to} dictionary.

        `self.config['path_map']` may be a single 'from -> to' string, a
        sequence of such strings, or a dictionary. A missing or empty entry
        yields an empty dictionary.

        Raise ValueError if a string entry does not contain exactly one
        ' -> ' separator, or if the entry has an unsupported type.
        """
        res = {}
        # if user-specified path_map, it overrides CONFIG
        path_map = self.config.get('path_map', [])
        if not path_map:
            return res
        if isinstance(path_map, str):
            path_map = [path_map]
        if isinstance(path_map, Sequence):
            for v in path_map:
                if ' -> ' not in v or v.count(' -> ') > 1:
                    raise ValueError(f'Path map should be separated as from -> to, {v} specified')
                source, target = v.split(' -> ')
                res[source] = target
        elif isinstance(path_map, dict):
            res.update(path_map)
        else:
            raise ValueError(f'Unacceptable value for configuration path_map: {path_map}')
        return res
jsonpointer.py 文件源码 项目:yatta_reader 作者: sound88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_part(self, doc, part):
        """Convert the pointer token `part` into a key usable on `doc`.

        Mappings and objects that merely define __getitem__ get the token
        unchanged; sequences get '-' unchanged or the token converted to int
        after it matched `self._RE_ARRAY_INDEX`. Raise JsonPointerException
        for an invalid index or a document that cannot be indexed.
        """
        if isinstance(doc, Mapping):
            return part

        if isinstance(doc, Sequence):
            # '-' is passed through untouched instead of being parsed
            if part == '-':
                return part
            if self._RE_ARRAY_INDEX.match(str(part)) is None:
                raise JsonPointerException("'%s' is not a valid sequence index" % part)
            return int(part)

        if hasattr(doc, '__getitem__'):
            # Duck typing: the target defines __getitem__, use the raw token
            return part

        raise JsonPointerException("Document '%s' does not support indexing, "
                                   "must be mapping/sequence or support __getitem__" % type(doc))
jsonpointer.py 文件源码 项目:yatta_reader 作者: sound88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def walk(self, doc, part):
        """Take one step into `doc` and return the element `part` refers to.

        The token is first normalised with `self.get_part`. For sequences,
        '-' yields an EndOfList wrapper and an out-of-range index raises
        JsonPointerException; for everything else a missing key raises
        JsonPointerException.
        """
        part = self.get_part(doc, part)

        assert hasattr(doc, '__getitem__'), "invalid document type %s" % (type(doc),)

        if not isinstance(doc, Sequence):
            # Mapping, or any object with custom __getitem__ indexing
            try:
                return doc[part]
            except KeyError:
                raise JsonPointerException("member '%s' not found in %s" % (part, doc))

        if part == '-':
            return EndOfList(doc)

        try:
            return doc[part]
        except IndexError:
            raise JsonPointerException("index '%s' is out of bounds" % (part, ))
pools.py 文件源码 项目:python-pool-performance 作者: JohnStarich 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run_test(work_type: str, job_sets: Sequence, trials: int,
             pool_class: type, worker_count: int) -> list:
    """Run one kind of test on a fresh pool and summarize every job set.

    work_type: 'compute' or 'network'; selects which pool test method runs
    job_sets: job sets, each passed to the test method in turn
    trials: forwarded to the test method
    pool_class: pool type, instantiated with `worker_count`
    worker_count: number of workers handed to `pool_class`

    Return a list with one `summarize_test` result per job set.
    Raise Exception for an unknown `work_type`. The pool is destroyed in
    every case, including when a test or the validation raises.
    """
    pool = pool_class(worker_count)
    try:
        if work_type == 'compute':
            test_func = pool.run_compute_test
        elif work_type == 'network':
            test_func = pool.run_network_test
        else:
            raise Exception("Invalid work type: {}".format(work_type))
        return [
            summarize_test(test_func(jobs, trials, show_progress=True))
            for jobs in tqdm(job_sets, desc=pool_class.__name__)
        ]
    finally:
        # Always release the workers, even if a test run blew up
        pool.destroy_pool()
step.py 文件源码 项目:sequent 作者: Acrisel 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, ev, step, progname, repeat=[1,], iter_triggers=(), end_triggers=(),):
        """Store the container's configuration and reset its loop state.

        ev, step, progname: stored as attributes of the same name
        repeat: wrapped in IterGen when it is a sequence, otherwise stored
                as given
        iter_triggers / end_triggers: stored as `starters` / `enders`
        """
        init_message = "[ step %s ] Container initialization\n    iter_triggers: %s\n    end_triggers: %s\n    repeat: %s" % (progname, iter_triggers, end_triggers, repeat)
        module_logger.debug(init_message)

        self.ev = ev
        self.progname = progname
        self.starters, self.enders = iter_triggers, end_triggers
        self.repeat = IterGen(repeat) if isinstance(repeat, AbcSequence) else repeat

        # loop bookkeeping starts out empty
        self.loop_index = 0
        self.initiating_sequence = None

        self.step = step
        self.max_concurrent = self.step.config['max_concurrent']
        self.triggers = None
transformations.py 文件源码 项目:mfnf-pdf-export 作者: Lodifice 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __call__(self, obj):
        """Transform the JSON object `obj`.

        Non-string sequences go to `act_on_list`, mappings to `act_on_dict`;
        strings and every other value are returned unchanged.
        """
        # str is itself a Sequence, so it must be ruled out first
        if not isinstance(obj, str):
            if isinstance(obj, Sequence):
                return self.act_on_list(obj)
            if isinstance(obj, Mapping):
                return self.act_on_dict(obj)
        return obj
utils.py 文件源码 项目:pudzu 作者: Udzu 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def non_string_sequence(v, types=None):
    """Return whether `v` is a Sequence other than str.

    If `types` is given, every element must additionally be an instance of
    at least one of the types produced by `make_iterable(types)`.
    """
    if not isinstance(v, Sequence) or isinstance(v, str):
        return False
    if types is None:
        return True
    return all(
        any(isinstance(x, t) for t in make_iterable(types))
        for x in v
    )
filelist.py 文件源码 项目:stig 作者: rndusr 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _file_is_filtered(self, tfile):
        """Return True if `tfile` is excluded by the current file filter.

        `self._ffilter` may be None (nothing is filtered), a Sequence or Set
        of file IDs (files whose 'id' is not listed are filtered), or an
        object with a `match` method (files it does not match are filtered).
        """
        ffilter = self._ffilter
        if ffilter is None:
            return False  # No filter specified
        if isinstance(ffilter, (abc.Sequence, abc.Set)):
            # ffilter is a collection of file IDs
            return tfile['id'] not in ffilter
        # ffilter is a TorrentFileFilter instance
        return not ffilter.match(tfile)
filelist.py 文件源码 项目:stig 作者: rndusr 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def focused_file_ids(self):
        """File IDs of the focused files in a tuple

        Returns None when no widget is focused.
        """
        focused = self.focused_widget
        if focused is None:
            return None
        # The focused widget in the list can be a file or a directory.  If
        # it's a directory, the 'file_id' property returns the IDs of all
        # the contained files recursively.
        fid = focused.file_id
        if isinstance(fid, (abc.Sequence, abc.Set)):
            return tuple(fid)
        return (fid,)
trequestpool.py 文件源码 项目:stig 作者: rndusr 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def register(self, sid, callback, keys=(), tfilter=None):
        """Add new request to request pool

        sid: Subscriber ID (any hashable)
        callback: Callable that receives a tuple of Torrents on updates
        keys: Wanted Torrent keys
        tfilter: None for all torrents, a TorrentFilter instance, or a
                 sequence of torrent IDs (converted to an 'id=...' filter)
        """
        if isinstance(tfilter, abc.Sequence):
            # Turn a list of IDs into one OR-ed filter expression
            id_terms = ['id=%s' % tid for tid in tfilter]
            tfilter = TorrentFilter('|'.join(id_terms))

        log.debug('Registering subscriber: %s', sid)
        event = blinker.signal(sid)
        event.connect(callback)
        self._keys[event] = tuple(keys)
        self._tfilters[event] = tfilter

        # An ongoing request may not collect the keys this new callback
        # needs.  It would finish AFTER the callback was added and call it
        # with lacking keys, resulting in a KeyError.  Asking the poller to
        # dump the result of the ongoing request prevents that.
        if self.running:
            self.skip_ongoing_request()

        self._combine_requests()
peer.py 文件源码 项目:stig 作者: rndusr 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
async def run(self, TORRENT_FILTER, PEER_FILTER, sort, columns):
        """List peers of the selected torrents.

        This is a coroutine: `make_plist` is awaited when it is a coroutine
        function, so the method must be declared with `async def`.

        TORRENT_FILTER / PEER_FILTER: passed to `select_torrents` and
                                      `get_peer_filter`
        sort, columns: None selects the 'sort.peers' / 'columns.peers'
                       config values

        Return False if any argument is rejected with ValueError (the error
        is logged), otherwise whatever `make_plist` returns.
        """
        columns = self.cfg['columns.peers'].value if columns is None else columns
        sort = self.cfg['sort.peers'].value if sort is None else sort
        try:
            tfilter = self.select_torrents(TORRENT_FILTER,
                                           allow_no_filter=True,
                                           discover_torrent=True)
            pfilter = self.get_peer_filter(PEER_FILTER)
            sort    = self.get_peer_sorter(sort)
            columns = self.get_peer_columns(columns)
        except ValueError as e:
            log.error(e)
            return False

        # Unless we're listing peers of exactly one torrent, specified by its
        # ID, automatically add the 'torrent' column.
        if 'torrent' not in columns and \
           (not isinstance(tfilter, abc.Sequence) or len(tfilter) != 1):
            columns.append('torrent')

        log.debug('Listing %s peers of %s torrents', pfilter, tfilter)

        if asyncio.iscoroutinefunction(self.make_plist):
            return await self.make_plist(tfilter, pfilter, sort, columns)
        else:
            return self.make_plist(tfilter, pfilter, sort, columns)
tracker.py 文件源码 项目:stig 作者: rndusr 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
async def run(self, TORRENT_FILTER, TRACKER_FILTER, sort, columns):
        """List trackers of the selected torrents.

        This is a coroutine: `make_trklist` is awaited when it is a
        coroutine function, so the method must be declared with `async def`.

        TORRENT_FILTER / TRACKER_FILTER: passed to `select_torrents` and
                                         `get_tracker_filter`
        sort, columns: None selects the 'sort.trackers' /
                       'columns.trackers' config values

        Return False if any argument is rejected with ValueError (the error
        is logged), otherwise whatever `make_trklist` returns.
        """
        columns = self.cfg['columns.trackers'].value if columns is None else columns
        sort = self.cfg['sort.trackers'].value if sort is None else sort
        try:
            torfilter = self.select_torrents(TORRENT_FILTER,
                                             allow_no_filter=True,
                                             discover_torrent=True)
            trkfilter = self.get_tracker_filter(TRACKER_FILTER)
            sort      = self.get_tracker_sorter(sort)
            columns   = self.get_tracker_columns(columns)
        except ValueError as e:
            log.error(e)
            return False

        # Unless we're listing trackers of exactly one torrent, specified by its
        # ID, automatically add the 'torrent' column.
        if 'torrent' not in columns and \
           (not isinstance(torfilter, abc.Sequence) or len(torfilter) != 1):
            columns.append('torrent')

        log.debug('Listing %s trackers of %s torrents', trkfilter, torfilter)

        if asyncio.iscoroutinefunction(self.make_trklist):
            return await self.make_trklist(torfilter, trkfilter, sort, columns)
        else:
            return self.make_trklist(torfilter, trkfilter, sort, columns)
__init__.py 文件源码 项目:stig 作者: rndusr 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __repr__(self):
        """Return '<Command [interfaces] name(args) state>' for debugging.

        Sequence arguments are shown positionally, mapping arguments as
        key=value pairs; any other `_args` value falls back to its repr()
        so that __repr__ itself can never fail on an unexpected type.
        The state is ' success=...' once finished, ' running' otherwise.
        """
        if isinstance(self._args, abc.Sequence):
            argstr = ', '.join(repr(arg) for arg in self._args)
        elif isinstance(self._args, abc.Mapping):
            argstr = ', '.join('%s=%r' % (k,v)
                                for k,v in self._args.items())
        else:
            # Without this branch `argstr` would be unbound below
            argstr = repr(self._args)
        provides = '/'.join(self.provides)
        string = '<Command [{}] {}({})'.format(provides, self.name, argstr)
        if self.finished:
            string += ' success={}'.format(self.success)
        else:
            string += ' running'
        return string + '>'
__init__.py 文件源码 项目:stig 作者: rndusr 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _validate_cmdchain_item(self, item):
        """Check one command chain item against the previously seen one.

        Raise RuntimeError if `item` is neither an operator nor a non-string
        sequence of strings, and CmdError if two operators follow each
        other. The item is remembered for the next call.
        """
        # Test if item is of a valid type
        if not is_op(item):
            is_arg_list = (isinstance(item, abc.Sequence)
                           and not isinstance(item, str)
                           and all(isinstance(arg, str) for arg in item))
            if not is_arg_list:
                raise RuntimeError('Invalid type for command chain item: {!r}'.format(item))

        # Test if item is an operator after another operator; on the very
        # first call there is no previous item yet
        prev_item = getattr(self, '_prev_validation_item', None)
        self._prev_validation_item = item
        if is_op(prev_item) and is_op(item):
            raise CmdError('Consecutive operators: "{} {}"'.format(prev_item, item))
vector.py 文件源码 项目:Chiaki-Nanami 作者: Ikusaba-san 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __add__(self, other):
        """Return the element-wise sum `self + other` as a new instance.

        Non-sequence operands yield NotImplemented; compatibility of the
        operand is checked by `_check_compatibility`.
        """
        if isinstance(other, Sequence):
            self._check_compatibility(other)
            sums = map(operator.add, self, other)
            return type(self)(sums)
        return NotImplemented
vector.py 文件源码 项目:Chiaki-Nanami 作者: Ikusaba-san 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __radd__(self, other):
        """Return the element-wise sum `other + self` as a new instance.

        Non-sequence operands yield NotImplemented; compatibility of the
        operand is checked by `_check_compatibility`.
        """
        if isinstance(other, Sequence):
            self._check_compatibility(other)
            sums = map(operator.add, other, self)
            return type(self)(sums)
        return NotImplemented
vector.py 文件源码 项目:Chiaki-Nanami 作者: Ikusaba-san 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __sub__(self, other):
        """Return the element-wise difference `self - other` as a new instance.

        Non-sequence operands yield NotImplemented; compatibility of the
        operand is checked by `_check_compatibility`.
        """
        if isinstance(other, Sequence):
            self._check_compatibility(other)
            differences = map(operator.sub, self, other)
            return type(self)(differences)
        return NotImplemented
vector.py 文件源码 项目:Chiaki-Nanami 作者: Ikusaba-san 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __rsub__(self, other):
        """Return the element-wise difference `other - self` as a new instance.

        Non-sequence operands yield NotImplemented; compatibility of the
        operand is checked by `_check_compatibility`.
        """
        if isinstance(other, Sequence):
            self._check_compatibility(other)
            differences = map(operator.sub, other, self)
            return type(self)(differences)
        return NotImplemented
context_managers.py 文件源码 项目:Chiaki-Nanami 作者: Ikusaba-san 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def redirect_exception(*exceptions, cls=ChiakiException):
    """Context manager to re-raise exceptions with a proxy exception class.

    The exceptions can either be an exception type or a (exc_type, string) pair.
    When a string is given it becomes the message of the raised `cls`;
    otherwise the message of the original exception is reused. The original
    exception is chained as the cause.

    NOTE(review): this is a bare generator here; presumably it is wrapped
    with a context-manager decorator where it is defined -- confirm.
    """
    exceptions = dict(exc if isinstance(exc, Sequence) else (exc, None)
                      for exc in exceptions)
    try:
        yield
    except tuple(exceptions) as e:
        # `except` also catches subclasses of the registered types, so the
        # exact type of `e` may not be a key. Walk the MRO to find the
        # closest registered base instead of failing with KeyError.
        message = next((exceptions[klass] for klass in type(e).__mro__
                        if klass in exceptions), None)
        raise cls(message or str(e)) from e


# asynccontextmanager when
test_collections.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_Sequence(self):
        """Check that the builtin sequence types satisfy the Sequence ABC."""
        for sample in (tuple, list, bytes, str):
            self.assertIsInstance(sample(), Sequence)
            self.assertTrue(issubclass(sample, Sequence))
        self.assertIsInstance(range(10), Sequence)
        for klass in (range, str):
            self.assertTrue(issubclass(klass, Sequence))
        # Names handed to the abstract-method validation helper
        abstract_names = ('__contains__', '__iter__', '__len__', '__getitem__')
        self.validate_abstract_methods(Sequence, *abstract_names)
compare.py 文件源码 项目:deoplete-latex 作者: poppyschmo 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def fake_deepdiff(one, two, indent=4, path=None, strict_strings=None):
    """Compare two term dictionaries. ``strict_strings=False`` treats
    strings that contain the same combination of words as equal.

    Every key of ``one`` whose value differs from ``two`` is printed,
    preceded by the chain of parent keys leading to it. Nested mutable
    mappings are compared recursively with ``indent`` increased by 4.
    Mutable sequences count as equal when their sequence-typed elements
    form the same set of tuples. Nothing is returned.
    """
    for k, v in one.items():
        _one = v
        _two = two.get(k)
        if _one == _two:
            continue
        if all(isinstance(d, abc.MutableMapping) for d in (_one, _two)):
            # Build a fresh list for the child call: extending the caller's
            # list in place would leak this key into the printed path of
            # every following sibling key.
            _path = list(path) if path is not None else []
            _path.append('{:<{width}}{}'.format('', k, width=indent))
            fake_deepdiff(_one, _two, indent + 4, _path, strict_strings)
            continue
        if (all(isinstance(l, abc.MutableSequence) for l in (_one, _two)) and
                set(tuple(x) for x in _one if isinstance(x, abc.Sequence)) ==
                set(tuple(x) for x in _two if isinstance(x, abc.Sequence))):
            continue
        if all(isinstance(l, str) for l in (_one, _two)):
            if (strict_strings is False and
                    set(c.strip(';:,.?=_-\n') for c in _one.split()) ==
                    set(c.strip(';:,.?=_-\n') for c in _two.split())):
                continue
            else:
                _one = _one.strip().replace('\n', '')
                _two = _two.strip().replace('\n', '')
        print('\n'.join(path) if path else '')
        print('{:<{width}}{}'.format('', k, width=indent))
        print('{:<{width}}one: {}'.format('', _one, width=indent + 4))
        print('{:<{width}}two: {}'.format('', _two, width=indent + 4))
types.py 文件源码 项目:deoplete-latex 作者: poppyschmo 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def is_seq(s):
    """Return True if sequence is list or tuple or some facsimile.
    Reject str, bytes, bytearray, memoryview and array.array, as well as
    anything that is not a Sequence at all (e.g. dictionary views, sets).
    """
    # Imported locally so the block does not depend on a file-level import
    from array import array

    # bytearray and memoryview are registered Sequences, so they have to be
    # excluded explicitly alongside the text/bytes types.
    if isinstance(s, (str, bytes, bytearray, memoryview, array)):
        return False
    return isinstance(s, abc.Sequence)
types.py 文件源码 项目:deoplete-latex 作者: poppyschmo 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def enlist(*args, ret_type=tuple):
    """Merge strings and sequences into one sorted, duplicate-free result.

    Strings are added as single items, other Sequences contribute their
    elements; arguments that are neither (sets included) are ignored.
    Return a tuple when ``ret_type`` is ``tuple``, a list otherwise.
    """
    inset = set()
    for s in args:
        if isinstance(s, str):
            inset.add(s)
        elif isinstance(s, abc.Sequence):
            # tuple, list or another Sequence: take its elements
            inset.update(s)
    ordered = sorted(inset)
    return tuple(ordered) if ret_type is tuple else ordered


问题


面经


文章

微信
公众号

扫码关注公众号