python类Sequence()的实例源码

test_collections.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_Sequence(self):
        for sample in [tuple, list, bytes, str]:
            self.assertIsInstance(sample(), Sequence)
            self.assertTrue(issubclass(sample, Sequence))
        self.assertIsInstance(range(10), Sequence)
        self.assertTrue(issubclass(range, Sequence))
        self.assertIsInstance(memoryview(b""), Sequence)
        self.assertTrue(issubclass(memoryview, Sequence))
        self.assertTrue(issubclass(str, Sequence))
        self.validate_abstract_methods(Sequence, '__contains__', '__iter__', '__len__',
            '__getitem__')
factory.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def CheckSqliteRowAsSequence(self):
        """ Checks if the row object can act like a sequence """
        self.con.row_factory = sqlite.Row
        row = self.con.execute("select 1 as a, 2 as b").fetchone()

        as_tuple = tuple(row)
        self.assertEqual(list(reversed(row)), list(reversed(as_tuple)))
        self.assertIsInstance(row, Sequence)
test_collections.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_Sequence(self):
        for sample in [tuple, list, bytes, str]:
            self.assertIsInstance(sample(), Sequence)
            self.assertTrue(issubclass(sample, Sequence))
        self.assertIsInstance(range(10), Sequence)
        self.assertTrue(issubclass(range, Sequence))
        self.assertIsInstance(memoryview(b""), Sequence)
        self.assertTrue(issubclass(memoryview, Sequence))
        self.assertTrue(issubclass(str, Sequence))
        self.validate_abstract_methods(Sequence, '__contains__', '__iter__', '__len__',
            '__getitem__')
factory.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def CheckSqliteRowAsSequence(self):
        """ Checks if the row object can act like a sequence """
        self.con.row_factory = sqlite.Row
        row = self.con.execute("select 1 as a, 2 as b").fetchone()

        as_tuple = tuple(row)
        self.assertEqual(list(reversed(row)), list(reversed(as_tuple)))
        self.assertIsInstance(row, Sequence)
jsonpointer.py 文件源码 项目:deb-python-json-pointer 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def walk(self, doc, part):
        """ Walks one step in doc and returns the referenced part """

        part = self.get_part(doc, part)

        assert (type(doc) in (dict, list) or hasattr(doc, '__getitem__')), "invalid document type %s" % (type(doc),)

        if isinstance(doc, Mapping):
            try:
                return doc[part]

            except KeyError:
                raise JsonPointerException("member '%s' not found in %s" % (part, doc))

        elif isinstance(doc, Sequence):

            if part == '-':
                return EndOfList(doc)

            try:
                return doc[part]

            except IndexError:
                raise JsonPointerException("index '%s' is out of bounds" % (part, ))

        else:
            # Object supports __getitem__, assume custom indexing
            return doc[part]
utils.py 文件源码 项目:skorch 作者: dnouri 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def to_tensor(X, use_cuda):
    """Turn to torch Variable.

    Handles the cases:
      * Variable
      * PackedSequence
      * numpy array
      * torch Tensor
      * list or tuple of one of the former
      * dict of one of the former

    """
    to_tensor_ = partial(to_tensor, use_cuda=use_cuda)

    if isinstance(X, (Variable, nn.utils.rnn.PackedSequence)):
        return X

    if isinstance(X, dict):
        return {key: to_tensor_(val) for key, val in X.items()}

    if isinstance(X, (list, tuple)):
        return [to_tensor_(x) for x in X]

    if isinstance(X, np.ndarray):
        X = torch.from_numpy(X)

    if isinstance(X, Sequence):
        X = torch.from_numpy(np.array(X))
    elif np.isscalar(X):
        X = torch.from_numpy(np.array([X]))

    if not is_torch_data_type(X):
        raise TypeError("Cannot convert this data type to a torch tensor.")

    if use_cuda:
        X = X.cuda()
    return X
step_executor.py 文件源码 项目:SoS 作者: vatlab 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def append(self, task_def):
        self._unsubmitted_tasks.append(task_def)
        if isinstance(task_def[2], Sequence):
            self._all_output.extend(task_def[2])
        self._all_ids.append(task_def[0])
step_executor.py 文件源码 项目:SoS 作者: vatlab 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def has_output(self, output):
        if not isinstance(output, Sequence) or not self._unsubmitted_tasks:
            return False
        return any(x in self._all_output for x in output)
actions.py 文件源码 项目:SoS 作者: vatlab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def collect_input(script, input):
    # determine file extension
    if input is not None:
        if isinstance(input, (str, file_target)):
            ext = os.path.splitext(input)[-1]
        elif isinstance(input, Sequence) and len(input) > 0:
            ext = os.path.splitext(input[0])[-1]
        else:
            raise ValueError('Unknown input file for action pandoc')
    else:
        ext = '.md'

    input_file = tempfile.NamedTemporaryFile(mode='w+t', suffix=ext, delete=False).name
    with open(input_file, 'w') as tmp:
        if script is not None and script.strip():
            tmp.write(script.rstrip() + '\n\n')
        if isinstance(input, str):
            try:
                with open(input) as ifile:
                    tmp.write(ifile.read() + '\n\n')
            except  Exception as e:
                    raise ValueError(f'Failed to read input file {input}: {e}')
        elif isinstance(input, Sequence):
            for ifile in input:
                try:
                    with open(ifile) as itmp:
                        tmp.write(itmp.read().rstrip() + '\n\n')
                except  Exception as e:
                        raise ValueError(f'Failed to read input file {ifile}: {e}')
    return input_file
workflow_executor.py 文件源码 项目:SoS 作者: vatlab 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def match(self, target, step):
        # for sos_step, we need to match step name
        if isinstance(target, sos_step):
            return step.match(target.target_name())
        if not 'provides' in step.options:
            return False
        patterns = step.options['provides']
        if isinstance(patterns, (str, BaseTarget)):
            patterns = [patterns]
        elif not isinstance(patterns, Sequence):
            raise RuntimeError(f'Unknown target to match: {patterns}')
        #
        for p in patterns:
            # other targets has to match exactly
            if isinstance(target, BaseTarget) or isinstance(p, BaseTarget):
                if target == p:
                    return {}
                else:
                    continue
            # if this is a regular string
            res = extract_pattern(p, [target])
            if res and not any(None in x for x in res.values()):
                return {x:y[0] for x,y in res.items()}
            # string match
            elif file_target(p) == file_target(target):
                return True
        return False
targets.py 文件源码 项目:SoS 作者: vatlab 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, *targets):
        super(remote, self).__init__()
        self.__unresolvable_object__ = True
        if len(targets) == 1:
            self._target = targets[0]
        else:
            # multi-item targets
            self._target = targets
        if isinstance(self._target, Sequence) and not isinstance(self._target, str):
            self.__flattenable__ = True
hosts.py 文件源码 项目:SoS 作者: vatlab 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _get_shared_dirs(self):
        value = self.config.get('shared', [])
        if isinstance(value, str):
            return [value]
        elif isinstance(value, Sequence):
            return value
        else:
            raise ValueError('Option shared can only be a string or a list of strings')
hosts.py 文件源码 项目:SoS 作者: vatlab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _map_path(self, source):
        result = {}
        cwd = os.getcwd()
        if isinstance(source, (str, path)):
            dest = os.path.abspath(os.path.expanduser(source))
            # we use samefile to avoid problems with case-insensitive file system #522
            # we also use the "cwd" name to avoid wrong case for cwd. For example,
            # if the cwd = '/Users/user/Project'
            # then, dest = '/USERS/USER/PROJECT/a.txt'
            # would be converted to '/Users/user/Project/a.txt' before path mapping
            if os.path.exists(dest[:len(cwd)]) and os.path.samefile(dest[:len(cwd)], cwd):
                dest = cwd + dest[len(cwd):]
            matched = [k for k in self.path_map.keys() if os.path.exists(dest[:len(k)]) and os.path.samefile(dest[:len(k)], k)]
            if matched:
                # pick the longest key that matches
                k = max(matched, key=len)
                dest = self.path_map[k] + dest[len(k):]
            else:
                env.logger.warning(
                    f'Path {source} is not under any specified paths of localhost and is mapped to {dest} on remote host.')
            result[source] = dest.replace('\\', '/')
        elif isinstance(source, (Sequence, set)):
            for src in source:
                result.update(self._map_path(src))
        else:
            env.logger.debug(f'Ignore unmappable source {source}')
            return {source: source}
        return result

    #
    # Interface functions
    #
hosts.py 文件源码 项目:SoS 作者: vatlab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _map_var(self, source):
        cwd = os.getcwd()
        if isinstance(source, str):
            dest = os.path.abspath(os.path.expanduser(source))
            # we use samefile to avoid problems with case-insensitive file system #522
            # we also use the "cwd" name to avoid wrong case for cwd. For example,
            # if the cwd = '/Users/user/Project'
            # then, dest = '/USERS/USER/PROJECT/a.txt'
            # would be converted to '/Users/user/Project/a.txt' before path mapping
            if os.path.exists(dest[:len(cwd)]) and os.path.samefile(dest[:len(cwd)], cwd):
                dest = cwd + dest[len(cwd):]
            matched = [k for k in self.path_map.keys() if os.path.exists(dest[:len(k)]) and os.path.samefile(dest[:len(k)], k)]
            if matched:
                # pick the longest key that matches
                k = max(matched, key=len)
                dest = self.path_map[k] + dest[len(k):]
            else:
                env.logger.warning(
                    f'Path {source} is not under any specified paths of localhost and is mapped to {dest} on remote host.')
            return dest.replace('\\', '/')
        elif isinstance(source, (Sequence, set)):
            ret = [self._map_var(x) for x in source]
            return [x for x in ret if x is not None]
        else:
            env.logger.debug(f'Ignore unmappable source {source}')
            return source
pool.py 文件源码 项目:python-pool-performance 作者: JohnStarich 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def map(self, work_func: FunctionType, inputs: Sequence) -> Sequence:
        raise NotImplementedError("{} does not implement map"
                                  .format(self.__class__.__name__))
__init__.py 文件源码 项目:yarl 作者: aio-libs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _get_str_query(self, *args, **kwargs):
        if kwargs:
            if len(args) > 0:
                raise ValueError("Either kwargs or single query parameter "
                                 "must be present")
            query = kwargs
        elif len(args) == 1:
            query = args[0]
        else:
            raise ValueError("Either kwargs or single query parameter "
                             "must be present")

        if query is None:
            query = ''
        elif isinstance(query, Mapping):
            quoter = partial(_quote, qs=True)
            lst = []
            for k, v in query.items():
                if isinstance(v, str):
                    pass
                elif type(v) == int:  # no subclasses like bool
                    v = str(v)
                else:
                    raise TypeError("Invalid variable type: mapping value "
                                    "should be str or int, got {!r}".format(v))
                lst.append(
                    quoter(k, safe='/?:@') + '=' + quoter(v, safe='/?:@'))
            query = '&'.join(lst)
        elif isinstance(query, str):
            query = _quote(query, safe='/?:@',
                           protected=PROTECT_CHARS,
                           qs=True)
        elif isinstance(query, (bytes, bytearray, memoryview)):
            raise TypeError("Invalid query type: bytes, bytearray and "
                            "memoryview are forbidden")
        elif isinstance(query, Sequence):
            quoter = partial(_quote, qs=True, safe='/?:@')
            query = '&'.join(quoter(k) + '=' + quoter(v)
                             for k, v in query)
        else:
            raise TypeError("Invalid query type: only str, mapping or "
                            "sequence of (str, str) pairs is allowed")

        return query
attrs.py 文件源码 项目:stellarmagnate 作者: abadger 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def sequence_of_type(_type, mutable, instance, attribute, value):
    """
    Validate that a value is a Sequence containing a specific type.

    :arg _type: The type of the values inside of the sequence
    :arg mutable: selects whether a sequence can be mutable or not
        :mutable: only mutable sequences are allowed
        :immutable: only immutable sequences are allowed
        :both: both mutable and immutable sequences are allowed
    :arg instance: The instance of the attr.s class that is being created
    :arg attribute: The attribute of the attr.s class that is being set
    :arg value: The value the attribute is being set to

    This function will be used with the :meth:`attr.ib` validate parameter and
    :func:`functools.partial`.  Example::

        @attr.s
        class CommodityData:
            type = attr.ib(validator=partial(enum_validator, CommodityType))
    """
    if mutable == 'both':
        msg = 'a Sequence'
    elif mutable == 'mutable':
        msg = 'a MutableSequence'
    elif mutable == 'immutable':
        msg = 'an Immutable Sequence'
    else:
        raise ValueError('sequence_of_type was given an improper argument for mutable')

    if not isinstance(value, Sequence):
        raise ValueError('{} is not {}'.format(value, msg))

    if isinstance(value, MutableSequence):
        if mutable == 'immutable':
            raise ValueError('{} is not {}'.format(value, msg))
    else:
        if mutable == 'mutable':
            raise ValueError('{} is not {}'.format(value, msg))

    for entry in value:
        if not isinstance(entry, _type):
            raise ValueError('The Sequence element {} is not a {}'.format(value, _type))
pypacker_server.py 文件源码 项目:taf 作者: taf3 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __register_methods(self):
        """Register all Pypacker TG methods.

        """
        def xmlrpc_wrap(func, *args, **kwargs):
            """Register all Pypacker TG methods.

            """
            try:
                return func(*args, **kwargs)
            except:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                traceback_message = traceback.format_exception(exc_type, exc_value, exc_traceback)
                raise xmlrpc.Fault(500, traceback_message)

        def wrap_method(method):
            """Register all Pypacker TG methods.

            """
            return lambda args, kwargs: xmlrpc_wrap(getattr(self.pypacker, method),
                                                    *pickle.loads(args.data), **pickle.loads(kwargs.data))

        def wrap_attibute(attr):
            """Register all Pypacker TG attributes.

            """
            return lambda: getattr(self.pypacker, attr)

        # Get full list of pypacker attrs
        pypacker_attrs = (fn for fn in dir(self.pypacker) if not fn.startswith("_"))
        # Register attributes and methods
        for attr in pypacker_attrs:
            attr_instance = getattr(self.pypacker, attr)
            if isinstance(attr_instance, (str, int, Sequence, Mapping)):
                setattr(self, "xmlrpc_{0}".format(attr), wrap_attibute(attr))
                self.class_logger.debug("Registered Pypacker TG attribute %s", attr)
            elif callable(attr_instance):
                setattr(self, "xmlrpc_{0}".format(attr), wrap_method(attr))
                self.class_logger.debug("Registered Pypacker TG method %s", attr)

        # Need to wrap stop_sniff separately
        # because we have to perform additional procedures with sniffed data before sending.
        self.xmlrpc_stop_sniff = self.stop_sniff  # pylint: disable=attribute-defined-outside-init
__init__.py 文件源码 项目:stig 作者: rndusr 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, filters=''):
        if not isinstance(self.filterclass, type) or not issubclass(self.filterclass, Filter):
            raise RuntimeError('Attribute "filterclass" must be set to a Filter class, not {!r}'
                               .format(self.filterclass))

        if isinstance(filters, str):  # Because str is also instance of abc.Sequence
            pass
        elif isinstance(filters, abc.Sequence) and all(isinstance(f, str) for f in filters):
            filters = '|'.join(filters)
        elif not isinstance(filters, str):
            raise TypeError('filters must be string or sequence of strings, not {}: {!r}'
                            .format(type(filters).__name__, filters))

        # self._filterchains is a tuple of tuples.  Each inner tuple combines
        # filters with AND.  The outer tuple combines the inner, AND-combined
        # tuples with OR.
        parts = tuple(part for part in self._op_regex.split(filters) if part is not '')
        if len(parts) < 1:
            self._filterchains = ()
        else:
            if parts[0] in '&|':
                raise ValueError('Filter can\'t start with operator: {!r}'.format(parts[0]))
            elif parts[-1] in '&|':
                raise ValueError('Filter can\'t end with operator: {!r}'.format(parts[-1]))

            filters = []
            ops = []
            expect = 'filter'
            nofilter = self.filterclass()
            for i,part in enumerate(parts):
                if expect is 'filter':
                    if part not in '&|':
                        f = self.filterclass(part)
                        if f == nofilter:
                            # part is something like 'all' or '*' - this
                            # disables all other filters
                            filters = []
                            ops = []
                            break
                        else:
                            filters.append(f)
                            expect = 'operator'
                            continue
                elif expect is 'operator':
                    if part in '&|':
                        ops.append(part)
                        expect = 'filter'
                        continue
                raise ValueError('Consecutive operators: {!r}'.format(''.join(parts[i-2:i+2])))

            if filters:
                fchain = [[]]
                for filter,op in zip_longest(filters, ops):
                    fchain[-1].append(filter)
                    if op is '|':
                        fchain.append([])
                self._filterchains = tuple(tuple(x) for x in fchain)
            else:
                self._filterchains = ()
emitjson.py 文件源码 项目:emitjson 作者: atsuoishimoto 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def repository():
    @singledispatch
    def _repogitory(obj):
        return obj

    def _register(cls, func=None):
        if func is None:
            return lambda f: _register(cls, f)

        if isinstance(func, type):
            if issubclass(func, ObjConverter):
                func = func(cls)

        if isinstance(func, ObjConverter):
            func.repogitory = _repogitory
            func = func.run

        return _repogitory.org_register(cls, func)

    _repogitory.org_register = _repogitory.register
    _repogitory.register = _register

    def fromSQLAlchemyModel(model, attrs=None, ignores=None):
        names = [col.name for col in model.__table__.columns]
        ObjConverter.build(_repogitory, model, names, attrs, ignores)

    _repogitory.fromSQLAlchemyModel = fromSQLAlchemyModel


    def fromDjangoModel(model, attrs, ignores):
        ObjConverter.build(_repogitory, model, _django_get_all_field_names(model),
                    attrs, ignores)

    _repogitory.fromDjangoModel = fromDjangoModel

    def raw(obj):
        return obj
    _repogitory.register(str, raw)

    def conv_seq(obj):
        return tuple(_repogitory(o) for o in obj)

    _repogitory.register(abc.Sequence, conv_seq)
    _repogitory.register(abc.Set, conv_seq)

    @_repogitory.register(abc.Mapping)
    def conv_mapping(obj):
        return {_repogitory(k):_repogitory(v) for k, v in obj.items()}

    def conv_date(obj):
        return obj.isoformat()
    _repogitory.register(datetime.date, conv_date)
    _repogitory.register(datetime.datetime, conv_date)

    return _repogitory


问题


面经


文章

微信
公众号

扫码关注公众号