python类next()的实例源码

query.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _fill_result_cache_to_idx(self, idx):
        """
        Ensure every cached row up to and including position ``idx`` has been
        passed through ``self._construct_result``.

        Rows are pulled from ``self._result_generator`` on demand whenever the
        cache is shorter than required. ``self._result_idx`` records the last
        position already converted; nothing is done when ``idx`` is at or
        below it.

        :param idx: position in ``self._result_cache`` to fill up to.
        """
        self._execute_query()
        if self._result_idx is None:
            self._result_idx = -1

        qty = idx - self._result_idx
        if qty < 1:
            return

        for _ in range(qty):
            self._result_idx += 1
            # Only the cache lookup is guarded. An IndexError raised inside
            # _construct_result must propagate instead of being mistaken for a
            # short cache, which would silently drain the result generator.
            while True:
                try:
                    raw = self._result_cache[self._result_idx]
                    break
                except IndexError:
                    self._result_cache.append(next(self._result_generator))
            self._result_cache[self._result_idx] = self._construct_result(raw)
query.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __iter__(self):
        """
        Iterate over the query results, yielding one cache entry per row.

        Rows are drawn lazily from ``self._result_generator`` and stored in
        ``self._result_cache``. An entry that is still a raw ``dict`` is handed
        to ``self._fill_result_cache_to_idx`` before being yielded.
        """
        self._execute_query()

        position = 0
        while True:
            # Fetch one more row when the cache has not reached this position.
            if position >= len(self._result_cache):
                try:
                    fetched = next(self._result_generator)
                except StopIteration:
                    return
                self._result_cache.append(fetched)

            if isinstance(self._result_cache[position], dict):
                self._fill_result_cache_to_idx(position)
            yield self._result_cache[position]

            position += 1
http.py 文件源码 项目:girder_worker 作者: girder 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def read(self, buf_len):
        """
        Return the next chunk of the HTTP response body, or ``b''`` once the
        body iterator is exhausted.

        Implementation note: due to a constraint of the requests library, the
        ``buf_len`` given on the first call fixes the chunk size for every
        later call to ``read``, even if a different ``buf_len`` is passed in
        afterwards.
        """
        if self._iter is None:  # first call: open the streaming request
            http_method = self.input_spec.get('method', 'GET').upper()
            http_headers = self.input_spec.get('headers', {})
            query_params = self.input_spec.get('params', {})
            response = requests.request(
                http_method,
                self.input_spec['url'],
                headers=http_headers,
                params=query_params,
                stream=True,
                allow_redirects=True)
            # Headers are already available here, so a bad status fails early.
            response.raise_for_status()
            self._iter = response.iter_content(buf_len, decode_unicode=False)

        try:
            chunk = six.next(self._iter)
        except StopIteration:
            chunk = b''
        return chunk
query.py 文件源码 项目:infi.clickhouse_orm 作者: Infinidat 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __getitem__(self, s):
        """
        Support ``qs[i]`` and ``qs[start:stop]``.

        - An integer index returns the first item produced by iterating a copy
          of ``self`` whose ``_limits`` is ``(s, 1)``.
        - A slice returns a copy of ``self`` whose ``_limits`` is
          ``(start, stop - start)``; a missing start means 0 and a missing
          stop means "unbounded" (``2**63 - 1``).

        Negative indexes and slice steps other than 1 are rejected with
        ``AssertionError``.
        """
        if isinstance(s, six.integer_types):
            # Single index
            assert s >= 0, 'negative indexes are not supported'
            qs = copy(self)
            qs._limits = (s, 1)
            return six.next(iter(qs))
        else:
            # Slice
            assert s.step in (None, 1), 'step is not supported in slices'
            start = s.start or 0
            # Test against None explicitly: an explicit stop of 0 (qs[0:0]) is
            # falsy and must not be replaced by the "unbounded" sentinel.
            stop = 2**63 - 1 if s.stop is None else s.stop
            assert start >= 0 and stop >= 0, 'negative indexes are not supported'
            assert start <= stop, 'start of slice cannot be greater than its end'
            qs = copy(self)
            qs._limits = (start, stop - start)
            return qs
database.py 文件源码 项目:infi.clickhouse_orm 作者: Infinidat 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def select(self, query, model_class=None, settings=None):
        '''
        Run a query and lazily yield one model instance per result row.

        - `query`: the SQL query to execute.
        - `model_class`: the model class matching the query's table,
          or `None` to get back instances of an ad-hoc model.
        - `settings`: query settings to send as HTTP GET parameters
        '''
        sql = query + ' FORMAT TabSeparatedWithNamesAndTypes'
        sql = self._substitute(sql, model_class)
        response = self._send(sql, settings, True)
        rows = response.iter_lines()
        # The first two lines are parsed as the column names and column types.
        field_names = parse_tsv(next(rows))
        field_types = parse_tsv(next(rows))
        if not model_class:
            model_class = ModelBase.create_ad_hoc_model(zip(field_names, field_types))
        for row in rows:
            # skip blank line left by WITH TOTALS modifier
            if not row:
                continue
            yield model_class.from_tsv(row, field_names, self.server_timezone, self)
models.py 文件源码 项目:infi.clickhouse_orm 作者: Infinidat 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def from_tsv(cls, line, field_names=None, timezone_in_use=pytz.utc, database=None):
        '''
        Create a model instance from a tab-separated line. The line may or may not include a newline.
        The `field_names` list must match the fields defined in the model, but does not have to include all of them.
        If omitted, it is assumed to be the names of all fields in the model, in order of definition.

        - `line`: the TSV-formatted data.
        - `field_names`: names of the model fields in the data.
        - `timezone_in_use`: the timezone to use when parsing dates and datetimes.
        - `database`: if given, sets the database that this instance belongs to.
        '''
        # The builtin next() is used directly; the former function-local
        # `from six import next` re-ran an import on every call only to shadow
        # the builtin with an equivalent callable.
        field_names = field_names or [name for name, field in cls._fields]
        values = iter(parse_tsv(line))
        kwargs = {}
        for name in field_names:
            field = getattr(cls, name)
            # Values are consumed positionally, one per requested field name.
            kwargs[name] = field.to_python(next(values), timezone_in_use)

        obj = cls(**kwargs)
        if database is not None:
            obj.set_database(database)

        return obj
query_set.py 文件源码 项目:cqlmapper 作者: reddit 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def iter(self, conn):
        """ Lazily iterate over every object returned by the query.

        Rows are drawn from ``self._result_generator`` as needed and kept in
        ``self._result_cache``; entries that are still raw dicts are passed to
        ``self._fill_result_cache_to_idx`` before being yielded.

        :param conn: Cassandra connection wrapper used to execute the query.
        :type: cqlengine.ConnectionInterface subclass
        """
        self._execute_query(conn)

        position = 0
        while True:
            # Fetch one more row when the cache has not reached this position.
            if position >= len(self._result_cache):
                try:
                    fetched = next(self._result_generator)
                except StopIteration:
                    return
                self._result_cache.append(fetched)

            if isinstance(self._result_cache[position], dict):
                self._fill_result_cache_to_idx(conn, position)
            yield self._result_cache[position]

            position += 1
test_completion.py 文件源码 项目:azure-cli-shell 作者: Azure 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_second_completion(self):
        """ checks inputs that yield no completion, then one that yields a parameter """
        self.init3()

        # Each of these inputs must produce no completion at all.
        for text in (u'crea ', u'create --fun ', u'command d '):
            gen = self.completer.get_completions(Document(text), None)
            with self.assertRaises(StopIteration):
                six.next(gen)

        gen = self.completer.get_completions(
            Document(u'create --funtimes "life" --hello'), None)
        expected = Completion("--helloworld", -7)
        self.assertEqual(six.next(gen), expected)
jinja2ext.py 文件源码 项目:pyramid_webpack 作者: stevearc 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def parse(self, parser):
        """
        Parse a ``webpack`` ... ``endwebpack`` block into a ``CallBlock`` node
        that invokes ``_get_graph``.
        """
        # The first token is the one that opened the tag (a name token whose
        # value is `webpack`). Its line number is reused for the node that is
        # built by hand below.
        lineno = six.next(parser.stream).lineno
        ctx_ref = nodes.ContextReference()

        # A single expression: the 'bundle' or 'config:bundle'.
        bundle_expr = parser.parse_expression()

        # A comma means the user supplied an 'extensions' argument.
        if parser.stream.skip_if('comma'):
            extensions_expr = parser.parse_expression()
        else:
            extensions_expr = nodes.Const(None)
        args = [ctx_ref, bundle_expr, extensions_expr]

        # Parse the block body up to `endwebpack`, dropping the needle (which
        # is always `endwebpack` here).
        body = parser.parse_statements(['name:endwebpack'], drop_needle=True)

        call_args = [nodes.Name('ASSET', 'param')]
        call = self.call_method('_get_graph', args)
        block_node = nodes.CallBlock(call, call_args, [], body)
        return block_node.set_lineno(lineno)
pipelines_common_test.py 文件源码 项目:magenta 作者: tensorflow 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testRandomPartition(self):
    """Checks that each input lands in exactly the partition picked by rand_func."""
    partition_names = ['a', 'b', 'c']
    random_partition = pipelines_common.RandomPartition(
        str, partition_names, [0.1, 0.4])
    random_nums = [0.55, 0.05, 0.34, 0.99]
    choices = ['c', 'a', 'b', 'c']
    # Feed the scripted random numbers to the partitioner one call at a time.
    random_partition.rand_func = functools.partial(six.next, iter(random_nums))
    self.assertEqual(random_partition.input_type, str)
    self.assertEqual(random_partition.output_type,
                     {'a': str, 'b': str, 'c': str})
    inputs = ['hello', 'qwerty', '1234567890', 'zxcvbnm']
    for position, text in enumerate(inputs):
      results = random_partition.transform(text)
      self.assertTrue(isinstance(results, dict))
      self.assertEqual(set(results.keys()), set(partition_names))
      self.assertEqual(len(results.values()), 3)
      empty_count = len([v for v in results.values() if v == []])  # pylint: disable=g-explicit-bool-comparison
      self.assertEqual(empty_count, 2)
      self.assertEqual(results[choices[position]], [text])
arrayiterator.py 文件源码 项目:ngraph 作者: NervanaSystems 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __next__(self):
        """
        Return the next minibatch, wrapping around the end of the data.

        Returns:
            dict: one entry per key of ``self.data_arrays`` holding that
            array's slice for this batch, plus an ``'iteration'`` entry set to
            the updated ``self.index``.

        Raises:
            StopIteration: once ``self.index`` reaches ``self.total_iterations``.
        """

        if self.index >= self.total_iterations:
            raise StopIteration

        begin = (self.start + self.index * self.batch_size) % self.ndata
        available = min(self.batch_size, self.ndata - begin)
        window = slice(begin, begin + available)
        self.index += 1

        # Number of items missing at the tail, taken from the front instead.
        shortfall = self.batch_size - available
        batch_bufs = {}
        for key, src in self.data_arrays.items():
            if shortfall > 0:
                batch_bufs[key] = np.concatenate([src[window], src[:shortfall]])
            else:
                batch_bufs[key] = src[window]

        batch_bufs['iteration'] = self.index
        return batch_bufs
ccxt_exchange.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_market(self, symbol):
        """
        Look up the CCXT market for a symbol.

        Parameters
        ----------
        symbol:
            The symbol to look up; it is first converted with
            ``self.get_symbol``.

        Returns
        -------
        dict[str, Object]
            The first entry of ``self.markets`` whose ``'symbol'`` equals the
            converted symbol, or ``None`` when no entry matches.

        """
        wanted = self.get_symbol(symbol)
        for candidate in self.markets:
            if candidate['symbol'] == wanted:
                return candidate
        return None
fields.py 文件源码 项目:django-spectator 作者: philgyford 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def find_unique(self, model_instance, field, iterator, *args):
        """
        Take candidates from ``iterator`` until one is truthy and matches no
        row of the queryset, store it on ``model_instance`` under
        ``self.attname`` and return it.
        """
        # Exclude the current model instance from the queryset used to find
        # the next valid value.
        queryset = self.get_queryset(model_instance.__class__, field)
        if model_instance.pk:
            queryset = queryset.exclude(pk=model_instance.pk)

        # Build the filter kwargs that implement any unique_together
        # constraints involving this field.
        kwargs = {}
        for params in model_instance._meta.unique_together:
            if self.attname not in params:
                continue
            for param in params:
                kwargs[param] = getattr(model_instance, param, None)

        while True:
            new = six.next(iterator)
            kwargs[self.attname] = new
            # Accept the first non-empty candidate that matches nothing.
            if new and not queryset.filter(**kwargs):
                break
        setattr(model_instance, self.attname, new)
        return new
ascan.py 文件源码 项目:zap-api-python 作者: zaproxy 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def scan(self, url=None, recurse=None, inscopeonly=None, scanpolicyname=None, method=None, postdata=None, contextid=None, apikey=''):
        """
        Runs the active scanner against the given URL and/or Context.

        Optional parameters: 'recurse' scans URLs under the given URL;
        'inScopeOnly' constrains the scan to URLs that are in scope (ignored
        if a Context is specified); 'scanPolicyName' selects the scan policy
        (the default policy is used when none is given); 'method' and
        'postData' select a given request in conjunction with the given URL.
        Arguments left as ``None`` are not sent.
        """
        optional = (
            ('url', url),
            ('recurse', recurse),
            ('inScopeOnly', inscopeonly),
            ('scanPolicyName', scanpolicyname),
            ('method', method),
            ('postData', postdata),
            ('contextId', contextid),
        )
        params = {'apikey': apikey}
        for key, value in optional:
            if value is not None:
                params[key] = value
        response = self.zap._request(self.zap.base + 'ascan/action/scan/', params)
        # The first value of the response mapping is returned.
        return six.next(six.itervalues(response))
ascan.py 文件源码 项目:zap-api-python 作者: zaproxy 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def scan_as_user(self, url=None, contextid=None, userid=None, recurse=None, scanpolicyname=None, method=None, postdata=None, apikey=''):
        """
        Active Scans from the perspective of a User, obtained using the given
        Context ID and User ID. See 'scan' action for more details.
        Arguments left as ``None`` are not sent.
        """
        optional = (
            ('url', url),
            ('contextId', contextid),
            ('userId', userid),
            ('recurse', recurse),
            ('scanPolicyName', scanpolicyname),
            ('method', method),
            ('postData', postdata),
        )
        params = {'apikey': apikey}
        for key, value in optional:
            if value is not None:
                params[key] = value
        response = self.zap._request(self.zap.base + 'ascan/action/scanAsUser/', params)
        # The first value of the response mapping is returned.
        return six.next(six.itervalues(response))
test_orderedmap.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_iter(self):
        """ checks key, item and value iteration order of an OrderedMap """
        keys = ['first', 'middle', 'last']
        values = [position for position, _ in enumerate(keys)]
        items = list(zip(keys, values))
        om = OrderedMap(items)

        itr = iter(om)
        seen = sum(1 for _ in itr)
        self.assertEqual(seen, len(keys))
        # A drained iterator must keep raising StopIteration.
        with self.assertRaises(StopIteration):
            six.next(itr)

        self.assertEqual(list(iter(om)), keys)
        self.assertEqual(list(six.iteritems(om)), items)
        self.assertEqual(list(six.itervalues(om)), values)
query.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def first(self):
        """Return the first item produced by iterating ``self``, or ``None`` if there is none."""
        try:
            head = six.next(iter(self))
        except StopIteration:
            head = None
        return head
conftest.py 文件源码 项目:girder_worker 作者: girder 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def private_folder(girder_client):
    """
    Yield the first folder named 'Private' listed under the current user.

    Raises ``Exception`` when the listing produces no folder.
    """
    current_user = girder_client.get('user/me')
    try:
        matches = girder_client.listFolder(
            current_user['_id'], parentFolderType='user', name='Private')
        folder = six.next(matches)
    except StopIteration:
        raise Exception("User doesn't have a Private folder.")

    yield folder
girder.py 文件源码 项目:girder_worker 作者: girder 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def read(self, buf_len):
        """
        Return the next chunk of the file, or ``b''`` once the download
        iterator is exhausted.

        Implementation note: due to a constraint of the requests library, the
        ``buf_len`` given on the first call fixes the chunk size for every
        later call to ``read``, even if a different ``buf_len`` is passed in
        afterwards.
        """
        if self._iter is None:  # first call: create the download iterator
            self._iter = self._client.downloadFileAsIterator(self._file_id, buf_len)

        try:
            chunk = six.next(self._iter)
        except StopIteration:
            chunk = b''
        return chunk
reader.py 文件源码 项目:pykbart 作者: chill17 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, file_handle, delimiter='\t'):
        """
        Wrap ``file_handle`` in a delimited reader and read its first row into
        ``self.fields``.
        """
        # NOTE(review): the standard-library csv.reader has no `encoding`
        # keyword; presumably `csv` is bound to a unicode-aware drop-in here.
        # Confirm against the file's imports.
        rows = csv.reader(file_handle, delimiter=delimiter, encoding='utf-8')
        self.reader = rows
        header = six.next(rows)
        self.fields = list(header)
reader.py 文件源码 项目:pykbart 作者: chill17 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def __next__(self):
        """Return the next row of ``self.reader`` wrapped in a ``KbartRecord``."""
        row = six.next(self.reader)
        return KbartRecord(row, fields=self.fields)
query_set.py 文件源码 项目:cqlmapper 作者: reddit 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _fill_result_cache_to_idx(self, conn, idx):
        """
        Fill the result cache to the given index.

        Every cached row up to and including ``idx`` is passed through
        ``self._construct_result``; rows are pulled from
        ``self._result_generator`` whenever the cache is too short.
        ``self._result_idx`` records the last position already converted, and
        nothing is done when ``idx`` is at or below it.

        :param conn: Cassandra connection wrapper used to execute the query.
        :type: cqlengine.ConnectionInterface subclass
        :param idx: Index value to fill the result cache to
        :type: int
        """
        self._execute_query(conn)
        if self._result_idx is None:
            self._result_idx = -1

        qty = idx - self._result_idx
        if qty < 1:
            return

        for _ in range(qty):
            self._result_idx += 1
            # Only the cache lookup is guarded. An IndexError raised inside
            # _construct_result must propagate instead of being mistaken for a
            # short cache, which would silently drain the result generator.
            while True:
                try:
                    raw = self._result_cache[self._result_idx]
                    break
                except IndexError:
                    self._result_cache.append(next(self._result_generator))
            self._result_cache[self._result_idx] = self._construct_result(raw)
query_set.py 文件源码 项目:cqlmapper 作者: reddit 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def first(self, conn):
        """Return the first item produced by ``self.iter(conn)``, or ``None`` if there is none."""
        try:
            head = six.next(self.iter(conn))
        except StopIteration:
            head = None
        return head
test_completion.py 文件源码 项目:azure-cli-shell 作者: Azure 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_command_completion(self):
        """ tests general command completion """
        self.init1()

        # (typed text, completions expected in order from the generator)
        expectations = [
            (u'', [Completion("command"), Completion("create")]),
            (u'c', [Completion("command", -1), Completion("create", -1)]),
            (u'cr', [Completion("create", -2)]),
            (u'command ', [Completion("can")]),
        ]
        for text, expected in expectations:
            gen = self.completer.get_completions(Document(text), None)
            for completion in expected:
                self.assertEqual(six.next(gen), completion)

        # Nothing is offered after a full command followed by a space.
        gen = self.completer.get_completions(Document(u'create '), None)
        with self.assertRaises(StopIteration):
            six.next(gen)
test_completion.py 文件源码 项目:azure-cli-shell 作者: Azure 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_param_completion(self):
        """ tests param completion """
        self.init2()

        gen = self.completer.get_completions(Document(u'create -'), None)
        expected = Completion(
            "-funtime", -1, display_meta="There is no work life balance, it's just your life")
        self.assertEqual(six.next(gen), expected)

        # No completion is offered for this input.
        gen = self.completer.get_completions(Document(u'command can -'), None)
        with self.assertRaises(StopIteration):
            six.next(gen)
test_completion.py 文件源码 项目:azure-cli-shell 作者: Azure 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_param_double(self):
        """ tests not generating doubles for parameters """
        self.init3()

        gen = self.completer.get_completions(Document(u'create -f --'), None)
        expected = Completion("--helloworld", -2)
        self.assertEqual(six.next(gen), expected)

        # No completion is offered for this input.
        gen = self.completer.get_completions(Document(u'create -f -'), None)
        with self.assertRaises(StopIteration):
            six.next(gen)
test_completion.py 文件源码 项目:azure-cli-shell 作者: Azure 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_substring_completion(self):
        """ tests completing a command that is a prefix of a longer one """
        self.init4()
        gen = self.completer.get_completions(Document(u'create'), None)
        expected = Completion("createmore", -6)
        self.assertEqual(six.next(gen), expected)
archiveiterator.py 文件源码 项目:warcio 作者: webrecorder 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __next__(self):
        """Advance ``self.the_iter`` and return the item it produces."""
        item = six.next(self.the_iter)
        return item
arrayiterator.py 文件源码 项目:ngraph 作者: NervanaSystems 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def next(self):
        """Forward to ``__next__`` and return its result."""
        advance = self.__next__
        return advance()
arrayiterator.py 文件源码 项目:ngraph 作者: NervanaSystems 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __iter__(self):
        """
        Generate minibatches of sequence windows until ``self.current_iter``
        reaches ``self.total_iterations``.

        Yields:
            dictionary: ``self.samples``. The same dict object is refilled in
                place and yielded on every iteration.
                samples[key]: numpy array with shape (batch_size, seq_len, feature_dim)
        """

        while self.current_iter < self.total_iterations:
            for batch_idx in range(self.batch_size):
                if self.shuffle:
                    base = self.start + (self.current_iter * self.stride)
                    offset = batch_idx * self.nbatches * self.seq_len
                else:
                    base = self.start + (self.current_iter * self.batch_size * self.stride)
                    offset = batch_idx * self.stride
                seq_start = base + offset

                # Window indices wrap around the end of the data.
                window = np.arange(seq_start, seq_start + self.seq_len) % self.ndata
                for key, source in self.data_arrays.items():
                    self.samples[key][batch_idx] = source[window]

            self.current_iter += 1

            if self.reverse_target:
                target = self.samples[self.tgt_key]
                target[:] = target[:, ::-1]

            if self.get_prev_target:
                self.samples['prev_tgt'] = np.roll(self.samples[self.tgt_key], shift=1, axis=1)

            if self.include_iteration is True:
                # NOTE(review): this reads self.index, not self.current_iter;
                # confirm that is the intended counter.
                self.samples['iteration'] = self.index
            yield self.samples


问题


面经


文章

微信
公众号

扫码关注公众号