python类iteritems()的实例源码

postgres_sqlalchemy.py 文件源码 项目:database_assetstore 作者: OpenGeoscience 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, *args, **kwargs):
        """
        Initialize the connector and register the known column types.

        All positional and keyword arguments are forwarded unchanged to the
        parent initializer.
        """
        # The parent initializer is also the one that validates the connector.
        super(PostgresSAConnector, self).__init__(*args, **kwargs)
        # dbparams can include values in http://www.postgresql.org/docs/
        #   current/static/libpq-connect.html#LIBPQ-PARAMKEYWORDS
        self.databaseOperators = PostgresOperators
        # Expose the shared type table on the instance, then fill it with
        # every visitable type found on the dialect (keyed by attribute name)
        # so that values can be cast using sqlalchemy.
        self.types = KnownTypes
        visitable = sqlalchemy.sql.visitors.VisitableType
        dialect_types = {}
        for attr_name in dir(dialect):
            candidate = getattr(dialect, attr_name)
            if isinstance(candidate, visitable):
                dialect_types[attr_name] = candidate
        KnownTypes.update(dialect_types)
        # Also pick up types registered in the ischema_names table (this is
        # done, for instance, by the geoalchemy module).  Only the registered
        # type objects matter here; existing entries are never replaced.
        for registered in six.itervalues(dialect.base.ischema_names):
            visit_name = getattr(registered, '__visit_name__', None)
            if visit_name and visit_name not in KnownTypes:
                KnownTypes[visit_name] = registered
period.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __getstate__(self):
        """
        Build the state dictionary used to serialize this object.

        The result holds every instance attribute whose name does not start
        with an underscore, a few explicitly listed private attributes, plain
        ``dict`` copies of the order/transaction mappings, and a version
        marker stored under ``VERSION_LABEL``.
        """
        STATE_VERSION = 3

        # Start with the public (non-underscore) instance attributes.
        state_dict = {}
        for name, value in iteritems(self.__dict__):
            if not name.startswith('_'):
                state_dict[name] = value

        # Private stores that are carried over as-is.
        for private_name in ('_portfolio_store', '_account_store'):
            state_dict[private_name] = getattr(self, private_name)

        # These mappings are converted to plain dicts before being stored.
        for mapping_name in ('processed_transactions',
                             'orders_by_id',
                             'orders_by_modified'):
            state_dict[mapping_name] = dict(getattr(self, mapping_name))

        state_dict['_payout_last_sale_prices'] = self._payout_last_sale_prices
        state_dict[VERSION_LABEL] = STATE_VERSION
        return state_dict
position_tracker.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_positions(self):
        """
        Bring the positions store in line with ``self.positions`` and
        return it.

        Entries whose tracked amount is zero are removed from the store; all
        other entries get their amount, cost basis and last sale price
        copied over.
        """
        store = self._positions_store

        for sid, tracked in iteritems(self.positions):
            if tracked.amount == 0:
                # The position has become empty since the previous call, so
                # drop it.  Catching the KeyError is faster than checking
                # `if sid in store`, and this may run in a tight inner loop.
                try:
                    del store[sid]
                except KeyError:
                    pass
            else:
                # Note that indexing the store will create a position if
                # there is currently no entry for this sid.
                entry = store[sid]
                entry.amount = tracked.amount
                entry.cost_basis = tracked.cost_basis
                entry.last_sale_price = tracked.last_sale_price
        return store
synthetic.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, constants, dates, sids):
        """
        Create one ``DataFrameLoader`` per column in ``constants``.

        :param constants: mapping of column -> value used to fill the
            baseline frame for that column
        :param dates: index of every baseline frame
        :param sids: columns of every baseline frame
        """
        def build_loader(column, const):
            # Baseline frame filled from `const`, using the column's dtype.
            baseline = DataFrame(
                const,
                index=dates,
                columns=sids,
                dtype=column.dtype,
            )
            return DataFrameLoader(
                column=column,
                baseline=baseline,
                adjustments=None,
            )

        self._loaders = {
            column: build_loader(column, const)
            for column, const in iteritems(constants)
        }
test_events.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_completeness(self):
        """
        Check that this suite has a test for every rule class.

        A rule class is any class found in ``zipline.utils.events`` that is a
        strict subclass of ``self.class_``.  A test counts for a rule when
        its attribute name starts with 'test' and the name with its first
        five characters removed equals the rule's name.
        """
        base = self.class_
        if not base:
            # This is the base class testing, it is always complete.
            return

        rule_names = set()
        for name, obj in iteritems(vars(zipline.utils.events)):
            if (isinstance(obj, type) and
                    issubclass(obj, base) and
                    obj is not base):
                rule_names.add(name)

        tested_names = set()
        for attr in dir(self):
            suffix = attr[5:]
            if attr.startswith('test') and suffix in rule_names:
                tested_names.add(suffix)

        self.assertTrue(
            rule_names <= tested_names,
            msg='This suite is missing tests for the following classes:\n' +
            '\n'.join(map(repr, rule_names - tested_names)),
        )
test_pipeline_algo.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def create_bar_reader(cls, tempdir):
        """
        Load the test CSVs and write them out as a bcolz daily bar table.

        :param tempdir: object whose ``getpath`` supplies the output location
        :return: ``(raw_data, reader)`` where ``raw_data`` maps each asset to
            its CSV contents indexed by 'day', and ``reader`` is a
            ``BcolzDailyBarReader`` over the written table
        """
        csv_names = (
            (cls.AAPL, 'AAPL.csv'),
            (cls.MSFT, 'MSFT.csv'),
            (cls.BRK_A, 'BRK-A.csv'),
        )
        resources = {
            asset: join(TEST_RESOURCE_PATH, file_name)
            for asset, file_name in csv_names
        }

        raw_data = {}
        for asset, path in iteritems(resources):
            frame = read_csv(path, parse_dates=['day']).set_index('day')
            # Add 'price' column as an alias because all kinds of stuff in
            # zipline depends on it being present. :/
            frame['price'] = frame['close']
            raw_data[asset] = frame

        data_path = tempdir.getpath('testdata.bcolz')
        table = DailyBarWriterFromCSVs(resources).write(
            data_path, trading_days, cls.assets,
        )
        return raw_data, BcolzDailyBarReader(table)
base.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def with_defaults(**default_funcs):
    """
    Decorator that fills in missing keyword arguments of a method on demand.

    Each keyword given to `with_defaults` names an argument of the decorated
    method and maps it to a one-argument function.  When the method is called
    without that keyword, the function is called with `self` and its result
    is passed as the argument.  A value supplied by keyword is used as given.

    Example:

    @with_defaults(foo=lambda self: self.x + self.y)
    def func(self, foo):
        ...
    """
    def decorator(f):
        @wraps(f)
        def wrapper(self, *args, **kwargs):
            for key, make_default in iteritems(default_funcs):
                if key in kwargs:
                    # A value passed by keyword always wins.
                    continue
                kwargs[key] = make_default(self)
            return f(self, *args, **kwargs)
        return wrapper
    return decorator
test_buyback_auth.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def pipeline_event_loader_args(self, dates):
        """
        Wrap the parent's per-sid frames into a single blaze data source.

        The second element of the parent's result is treated as a mapping of
        sid -> frame.  The announcement, share-count and timestamp columns of
        each frame are copied, a sid column is added, and all frames are
        concatenated with a fresh index.

        :return: a one-element tuple holding the ``bz.data`` object
        """
        _, mapping = super(
            BlazeShareBuybackAuthLoaderTestCase,
            self,
        ).pipeline_event_loader_args(dates)

        copied_fields = (
            BUYBACK_ANNOUNCEMENT_FIELD_NAME,
            SHARE_COUNT_FIELD_NAME,
            TS_FIELD_NAME,
        )

        def tagged_frame(sid, frame):
            # Copy the selected columns and tag every row with the sid.
            columns = {field: frame[field] for field in copied_fields}
            columns[SID_FIELD_NAME] = sid
            return pd.DataFrame(columns)

        combined = pd.concat(
            tagged_frame(sid, frame) for sid, frame in iteritems(mapping)
        ).reset_index(drop=True)
        return (bz.data(combined),)
api.py 文件源码 项目:umapi-client.py 作者: adobe-apiplatform 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def insert(self, **kwargs):
        """
        Insert commands at the beginning of the sequence.

        This exists because certain commands have to come first (such as
        user creation) but may need to be added after other commands have
        already been specified.  Each command is placed at index 0, so later
        calls to insert put their commands before those of earlier calls.

        Since the order of iterated kwargs is not guaranteed (in Python 2.x),
        you should really only call insert with one keyword at a time.  See
        the doc of append for more details.
        :param kwargs: the key/value pair to put first
        :return: the action, so you can append Action(...).insert(...).append(...)
        """
        for command, payload in six.iteritems(kwargs):
            front_entry = {command: payload}
            self.commands.insert(0, front_entry)
        return self
legacy.py 文件源码 项目:umapi-client.py 作者: adobe-apiplatform 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def do(self, **kwargs):
        """
        Here for compatibility with legacy clients only - DO NOT USE!!!
        A mix of "append" and "insert": commands whose name starts with
        "create", "addAdobe" or "removeFrom" are appended first, and every
        remaining command is appended afterwards, sorted by command name.
        The values of "add" and "remove" are wrapped as {"product": value}.
        :param kwargs: the commands in key=val format
        :return: the Action, so you can do Action(...).do(...).do(...)
        """
        # "create" / "add" / "removeFrom" commands go first.  The matching
        # names are collected up front because kwargs is modified below.
        leading_prefixes = ("create", "addAdobe", "removeFrom")
        leading = [name for name in kwargs if name.startswith(leading_prefixes)]
        for name in leading:
            self.commands.append({name: kwargs.pop(name)})

        # now the other actions, in a canonical order (to avoid py2/py3 variations)
        for name in sorted(kwargs):
            value = kwargs[name]
            if name in ('add', 'remove'):
                value = {"product": value}
            self.commands.append({name: value})
        return self
functional.py 文件源码 项目:umapi-client.py 作者: adobe-apiplatform 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def update(self, email=None, username=None, first_name=None, last_name=None, country=None):
        """
        Queue an update of values on an existing user.  See the API docs for
        what kinds of update are possible.

        Only arguments with a truthy value are included in the update;
        `first_name` and `last_name` are sent under the keys "firstname" and
        "lastname".  A given email or username is validated first.
        :param email: new email for this user
        :param username: new username for this user
        :param first_name: new first name for this user
        :param last_name: new last name for this user
        :param country: new country for this user
        :return: the User, so you can do User(...).update(...).add_to_groups(...)
        """
        if email:
            self._validate(email=email)
        if username:
            self._validate(username=username)
        candidates = dict(email=email, username=username,
                          firstname=first_name, lastname=last_name,
                          country=country)
        updates = {
            field: value
            for field, value in six.iteritems(candidates)
            if value
        }
        return self.append(update=updates)
client.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def clean_headers(headers):
    """Builds a copy of the headers with keys and values passed to _to_bytes.

    The httplib module just concatenates the header keys and values in a way
    that may make the message header a unicode string, which, if it then
    tries to concatenate to a binary request body, may result in a unicode
    decode error.

    Args:
        headers: dict, A dictionary of headers.

    Returns:
        A new dictionary in which every key and value is the result of
        calling _to_bytes on it; anything that is not already of the binary
        type goes through str() first.

    Raises:
        NonAsciiHeaderError: if a UnicodeEncodeError occurs while converting
            a key or value.
    """
    converted = {}
    try:
        for name, value in six.iteritems(headers):
            if not isinstance(name, six.binary_type):
                name = str(name)
            if not isinstance(value, six.binary_type):
                value = str(value)
            converted[_to_bytes(name)] = _to_bytes(value)
    except UnicodeEncodeError:
        # Report the header that was being processed when encoding failed.
        raise NonAsciiHeaderError(name, ': ', value)
    return converted
discovery.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _add_next_methods(self, resourceDesc, schema):
    """Attaches a '<name>_next' method for every pageable method.

    A method is treated as pageable when its response schema lists a
    'nextPageToken' property and the method itself has a 'pageToken'
    parameter.

    Args:
      resourceDesc: dict, the resource description; nothing happens unless
        it has a 'methods' entry.
      schema: object whose get() resolves a '$ref' name to a response schema.
    """
    if 'methods' not in resourceDesc:
      return
    for methodName, methodDesc in six.iteritems(resourceDesc['methods']):
      if 'response' not in methodDesc:
        continue
      responseSchema = methodDesc['response']
      if '$ref' in responseSchema:
        # The response only names its schema, so look the schema up.
        responseSchema = schema.get(responseSchema['$ref'])
      responseProperties = responseSchema.get('properties', {})
      hasNextPageToken = 'nextPageToken' in responseProperties
      hasPageToken = 'pageToken' in methodDesc.get('parameters', {})
      if not (hasNextPageToken and hasPageToken):
        continue
      fixedMethodName, method = createNextMethod(methodName + '_next')
      boundMethod = method.__get__(self, self.__class__)
      self._set_dynamic_attr(fixedMethodName, boundMethod)
def _build_query(self, params):
    """Builds a query string.

    When self.alt_param is not None it is stored in params under 'alt', so
    the dictionary that was passed in is modified.

    Args:
      params: dict, the query parameters

    Returns:
      A '?' followed by the url-encoded parameters.  Every item of a list
      value becomes its own key/value pair.
    """
    if self.alt_param is not None:
      params['alt'] = self.alt_param
    pairs = []
    for name, value in six.iteritems(params):
      if type(value) == list:
        # One pair per list item, each item encoded as UTF-8.
        pairs.extend((name, item.encode('utf-8')) for item in value)
        continue
      if isinstance(value, six.text_type) and callable(value.encode):
        value = value.encode('utf-8')
      pairs.append((name, value))
    return '?' + urlencode(pairs)
test_config.py 文件源码 项目:caduc 作者: tjamet 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_init_with_values(self):
        """
        Check that a Node built from keyword arguments holds exactly those
        values and that building it leaves the source dict unchanged.
        """
        # A single keyword argument ends up as the only item.
        text = self.faker.text()
        single = caduc.config.Node(someAttribute=text)
        dict(single).should.be.eql({'someAttribute':text})

        # A whole dictionary of values, checked item by item and as a whole.
        expected = self.faker.pydict(nb_elements=10, variable_nb_elements=True)
        populated = caduc.config.Node(**expected)
        for key, val in six.iteritems(populated):
            val.should.be.eql(expected[key])
        dict(populated).should.be.eql(expected)

        # ensure Node initializer does not alter source dict
        source = self.faker.pydict(nb_elements=10, variable_nb_elements=True)
        snapshot = dict(source)
        caduc.config.Node(**source)
        source.should.be.eql(snapshot)
image.py 文件源码 项目:caduc 作者: tjamet 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_grace_times(self, names):
        """
        Return the set of grace times that apply to the given names.

        Lookup order:
        1. a truthy "com.caduc.image.grace_time" label on the image;
        2. the 'grace_time' of every entry in the "images" config section
           whose pattern matches one of ``names`` (``None`` and ``-1`` are
           reported as infinity);
        3. ``self.grace_time`` when neither of the above yields a value.

        :param names: iterable of names matched against the config patterns
        :return: a non-empty set of grace times
        """
        label_key = "com.caduc.image.grace_time"
        labels = self.details['Config']['Labels']
        if labels:
            from_label = labels.get(label_key)
            if from_label:
                return {from_label}

        patterns = self.config.get("images")
        collected = set()
        if patterns:
            for name in names:
                for pattern, settings in six.iteritems(patterns):
                    if not fnmatch.fnmatch(name, pattern):
                        continue
                    grace_time = settings['grace_time']
                    unlimited = grace_time is None or grace_time==-1
                    collected.add(float('inf') if unlimited else grace_time)
        return collected if collected else {self.grace_time}
config.py 文件源码 项目:caduc 作者: tjamet 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def update(self, other):
        """
        Merge the items of ``other`` into this node, descending into dicts.

        A key that is not present yet is added; a dict value is first copied
        into a new ``Node``.  For a key that already exists, both the old and
        the new value must be dicts (the old one is then updated with the new
        one) or neither may be (the new value replaces the old one).

        :raises ValueError: when only one of the old and new values is a dict
        """
        for key, new_value in six.iteritems(other):
            new_is_dict = isinstance(new_value, dict)
            try:
                current = self[key]
            except KeyError:
                # New key: dicts are stored as nodes, anything else as given.
                if new_is_dict:
                    child = Node()
                    child.update(new_value)
                    new_value = child
                self[key] = new_value
                continue

            if isinstance(current, dict) != new_is_dict:
                raise ValueError("Can't update uncoherent values for key %s, old value: %r, new value: %r" % (key, current, new_value))
            if new_is_dict:
                current.update(new_value)
            else:
                self[key] = new_value
metadata.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _add_table_metadata(self, table_metadata):
        """
        Register ``table_metadata`` under its name and refresh the index map.

        When metadata for the same table name is already stored, its views
        are carried over to the new metadata, and indexes it had that the new
        metadata lacks are removed from ``self.indexes``.
        """
        previous = self.tables.get(table_metadata.name, None)
        if previous:
            # views are not queried with table, so they must be transferred to new
            table_metadata.views = previous.views
            # indexes will be updated with what is on the new metadata
            previous_indexes = previous.indexes
        else:
            previous_indexes = {}

        # note the intentional order of add before remove
        # this makes sure the maps are never absent something that existed before this update
        current_indexes = table_metadata.indexes
        for name, meta in six.iteritems(current_indexes):
            self.indexes[name] = meta

        for name in previous_indexes:
            if name not in table_metadata.indexes:
                self.indexes.pop(name, None)

        self.tables[table_metadata.name] = table_metadata
cluster.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 72 收藏 0 点赞 0 评论 0
def _get_schema_mismatches(self, peers_result, local_result, local_address):
        """
        Group addresses by the schema version they report.

        The local row (if any) contributes ``local_address``.  A peer row
        contributes its rpc address when it has a schema version and the
        cluster metadata has a host for it whose ``is_up`` is not ``False``.

        :return: ``None`` when exactly one schema version was seen, otherwise
            a dict of schema version -> list of addresses
        """
        peer_rows = dict_factory(*peers_result.results)

        versions = defaultdict(set)
        if local_result.results:
            local_row = dict_factory(*local_result.results)[0]
            local_version = local_row.get("schema_version")
            if local_version:
                versions[local_version].add(local_address)

        for row in peer_rows:
            schema_ver = row.get('schema_version')
            if schema_ver:
                addr = self._rpc_from_peer_row(row)
                peer = self._cluster.metadata.get_host(addr)
                # A host whose state is not known to be down still counts.
                if peer and peer.is_up is not False:
                    versions[schema_ver].add(addr)

        if len(versions) == 1:
            log.debug("[control connection] Schemas match")
            return None

        return {
            version: list(nodes)
            for version, nodes in six.iteritems(versions)
        }
util.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, *args, **kwargs):
        """
        Initialize from an optional source object and keyword items.

        The single optional positional argument is either an object with a
        callable ``keys`` attribute (read as ``source[key]`` for each key) or
        an iterable of key/value pairs.  Keyword arguments are inserted after
        the positional source.

        :raises TypeError: when more than one positional argument is given
        """
        if len(args) > 1:
            raise TypeError('expected at most 1 arguments, got %d' % len(args))

        self._items = []
        self._index = {}
        if args:
            source = args[0]
            if callable(getattr(source, 'keys', None)):
                # Mapping-like source: look every key up on the source.
                pairs = ((key, source[key]) for key in source.keys())
            else:
                pairs = source
            for key, value in pairs:
                self._insert(key, value)

        for key, value in six.iteritems(kwargs):
            self._insert(key, value)


问题


面经


文章

微信
公众号

扫码关注公众号