python类database()的实例源码

storage2.py 文件源码 项目:InplusTrader_Linux 作者: zhengwsh 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _ensure_index(self):
        """
        Ensure indices for all databases and collections.

        first access self._dbs config to get index column names;
        then get collection names from self._collNames and loop
        over all collections.

        """
        if self._collNames and self._dbs:
            try:
                for dbName in self._dbs:
                    # Iterate over database configurations.

                    db = self._dbs[dbName]
                    dbSelf = db['self']
                    index = db['index']
                    collNames = self._collNames[db['collNames']]
                    # db['self'] is the pymongo.Database object.

                    for name in collNames:
                        coll = dbSelf[name]
                        coll.ensure_index([(index, 
                                            pymongo.DESCENDING)], unique=True)
                print '[MONGOD]: MongoDB index set.'
                return 1
            except KeyError:
                msg = '[MONGOD]: Unable to set collection indices; ' + \
                      'infomation in Config.body["dbs"] is incomplete.'
                raise VNPAST_DatabaseError(msg)
            except Exception, e:
                msg = '[MONGOD]: Unable to set collection indices; ' + str(e)
                raise VNPAST_DatabaseError(msg)

    #----------------------------------------------------------------------
    # Download method.
storage2.py 文件源码 项目:InplusTrader_Linux 作者: zhengwsh 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __update(self, key, target1, target2, sessionNum):
        """
        Basic update method.
        Looks into the database specified by 'key', find the latest 
        record in the collection of it. Then update the collections 
        till last trading date.

        parameters
        ----------
        * key: string; a database alias (refer to the database config)
          e.g., 'EQU_D1'.
        * target1: method; pointer to the function with which controller
          obtain all tickers in the database. Concretely, target1 are 
          self._all#Tickers methods.
        * target2: method; pointer to the api overlord requesting functions
          i.e. self._api.get_###_mongod methods.
        * sessionNum: integer; the number of threads.

        """
        try:
            # get databases and tickers
            db = self._dbs[key]['self']
            index = self._dbs[key]['index']
            allTickers = target1()
            coll = db[allTickers[0]]

            # find the latest timestamp in collection.
            latest = coll.find_one(
                     sort=[(index, pymongo.DESCENDING)])[index]
            start = datetime.strftime(
                latest + timedelta(days=1),'%Y%m%d')
            end = datetime.strftime(datetime.now(), '%Y%m%d')

            # then download.
            target2(db, start, end, sessionNum)
            return db

        except Exception, e:
            msg = '[MONGOD]: Unable to update data; ' + str(e)
            raise VNPAST_DatabaseError(msg)
storage2.py 文件源码 项目:InplusTrader_Linux 作者: zhengwsh 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def fetch(self, dbName, ticker, start, end, output='list'):
        """

        """
        # check inputs' validity.
        if output not in ['df', 'list', 'json']:
            raise ValueError('[MONGOD]: Unsupported output type.')
        if dbName not in self._dbNames:
            raise ValueError('[MONGOD]: Unable to locate database name.')

        db = self._dbs[dbName]
        dbSelf = db['self']
        dbIndex = db['index']
        try:
            coll = db[ticker]
            if len(start)==8 and len(end)==8:
                # yyyymmdd, len()=8
                start = datetime.strptime(start, '%Y%m%d')
                end = datetime.strptime(end, '%Y%m%d')
            elif len(start)==14 and len(end)==14:
                # yyyymmdd HH:MM, len()=14
                start = datetime.strptime(start, '%Y%m%d %H:%M')
                end = datetime.strptime(end, '%Y%m%d %H:%M')
            else:
                pass
            docs = []

            # find in MongoDB.
            for doc in coll.find(filter={dbIndex: {'$lte': end,
                '$gte': start}}, projection={'_id': False}):
                docs.append(doc)

            if output == 'list':
                return docs[::-1]

        except Exception, e:
            msg = '[MONGOD]: Error encountered when fetching data' + \
                  'from MongoDB; '+ str(e)
            return -1
mongo.py 文件源码 项目:lemonpay 作者: antfu 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self,database, collection):
        self.database = database
        self.collection = collection
        self.name = self.collection.name
mongo.py 文件源码 项目:lemonpay 作者: antfu 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _to_dict_inner(self, d):
        result = {}
        for k, v in d.items():
            if isinstance(v, DBRef):
                result[k] = self.collection.database.dereference(v).to_dict()
            elif isinstance(v, dict):
                result[k] = self._to_dict_inner(v)
            elif isinstance(v, ObjectId):
                result[k] = str(v)
            elif isinstance(v, datetime):
                result[k] = datetime_to_stamp(v)
            else:
                result[k] = v
        return result
mongo.py 文件源码 项目:lemonpay 作者: antfu 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_ref(self, key):
        if isinstance(self.get(key, None), DBRef):
            return self[key].dereference(self.collection.database)
__init__.py 文件源码 项目:pillar 作者: armadillica 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def db(self, collection_name: str = None) \
            -> typing.Union[pymongo.collection.Collection, pymongo.database.Database]:
        """Returns the MongoDB database, or the collection (if given)"""

        if collection_name:
            return self.data.driver.db[collection_name]
        return self.data.driver.db
core.py 文件源码 项目:mongomotor 作者: jucacrispim 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, database, name, _delegate=None):

        db_class = create_class_with_framework(
            MongoMotorAgnosticDatabase, self._framework, self.__module__)

        if not isinstance(database, db_class):
            raise TypeError("First argument to MongoMotorCollection must be "
                            "MongoMotorDatabase, not %r" % database)

        delegate = _delegate or Collection(database.delegate, name)
        super(AgnosticCollection, self).__init__(delegate)
        self.database = database
core.py 文件源码 项目:mongomotor 作者: jucacrispim 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __getitem__(self, name):
        collection_class = create_class_with_framework(
            MongoMotorAgnosticCollection, self._framework, self.__module__)

        return collection_class(self.database, self.name + '.' + name)
core.py 文件源码 项目:mongomotor 作者: jucacrispim 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __getattr__(self, name):
        if name.startswith('_'):
            # samething. try get from delegate first
            try:
                ret = getattr(self.delegate, name)
            except AttributeError:
                raise AttributeError(
                    "%s has no attribute %r. To access the %s"
                    " collection, use database['%s']." % (
                        self.__class__.__name__, name, name,
                        name))
            return ret

        return self[name]
core.py 文件源码 项目:mongomotor 作者: jucacrispim 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __getattr__(self, name):
        if name.startswith('_'):
            # the same. Try get from delegate.
            try:
                ret = getattr(self.delegate, name)
            except AttributeError:

                raise AttributeError(
                    "%s has no attribute %r. To access the %s"
                    " database, use client['%s']." % (
                        self.__class__.__name__, name, name, name))
            return ret

        return self[name]
__init__.py 文件源码 项目:pillar 作者: armadillica 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def setup_db_indices(self):
        """Adds missing database indices.

        This does NOT drop and recreate existing indices,
        nor does it reconfigure existing indices.
        If you want that, drop them manually first.
        """

        self.log.debug('Adding any missing database indices.')

        import pymongo

        db = self.data.driver.db

        coll = db['tokens']
        coll.create_index([('user', pymongo.ASCENDING)])
        coll.create_index([('token', pymongo.ASCENDING)])
        coll.create_index([('token_hashed', pymongo.ASCENDING)])

        coll = db['notifications']
        coll.create_index([('user', pymongo.ASCENDING)])

        coll = db['activities-subscriptions']
        coll.create_index([('context_object', pymongo.ASCENDING)])

        coll = db['nodes']
        # This index is used for queries on project, and for queries on
        # the combination (project, node type).
        coll.create_index([('project', pymongo.ASCENDING),
                           ('node_type', pymongo.ASCENDING)])
        coll.create_index([('parent', pymongo.ASCENDING)])
        coll.create_index([('short_code', pymongo.ASCENDING)],
                          sparse=True, unique=True)
        # Used for latest assets & comments
        coll.create_index([('properties.status', pymongo.ASCENDING),
                           ('node_type', pymongo.ASCENDING),
                           ('_created', pymongo.DESCENDING)])

        coll = db['projects']
        # This index is used for statistics, and for fetching public projects.
        coll.create_index([('is_private', pymongo.ASCENDING)])
        coll.create_index([('category', pymongo.ASCENDING)])


问题


面经


文章

微信
公众号

扫码关注公众号