def _ensure_index(self):
"""
Ensure indices for all databases and collections.
first access self._dbs config to get index column names;
then get collection names from self._collNames and loop
over all collections.
"""
if self._collNames and self._dbs:
try:
for dbName in self._dbs:
# Iterate over database configurations.
db = self._dbs[dbName]
dbSelf = db['self']
index = db['index']
collNames = self._collNames[db['collNames']]
# db['self'] is the pymongo.Database object.
for name in collNames:
coll = dbSelf[name]
coll.ensure_index([(index,
pymongo.DESCENDING)], unique=True)
print '[MONGOD]: MongoDB index set.'
return 1
except KeyError:
msg = '[MONGOD]: Unable to set collection indices; ' + \
'infomation in Config.body["dbs"] is incomplete.'
raise VNPAST_DatabaseError(msg)
except Exception, e:
msg = '[MONGOD]: Unable to set collection indices; ' + str(e)
raise VNPAST_DatabaseError(msg)
#----------------------------------------------------------------------
# Download method.
python类database()的实例源码
def __update(self, key, target1, target2, sessionNum):
"""
Basic update method.
Looks into the database specified by 'key', find the latest
record in the collection of it. Then update the collections
till last trading date.
parameters
----------
* key: string; a database alias (refer to the database config)
e.g., 'EQU_D1'.
* target1: method; pointer to the function with which controller
obtain all tickers in the database. Concretely, target1 are
self._all#Tickers methods.
* target2: method; pointer to the api overlord requesting functions
i.e. self._api.get_###_mongod methods.
* sessionNum: integer; the number of threads.
"""
try:
# get databases and tickers
db = self._dbs[key]['self']
index = self._dbs[key]['index']
allTickers = target1()
coll = db[allTickers[0]]
# find the latest timestamp in collection.
latest = coll.find_one(
sort=[(index, pymongo.DESCENDING)])[index]
start = datetime.strftime(
latest + timedelta(days=1),'%Y%m%d')
end = datetime.strftime(datetime.now(), '%Y%m%d')
# then download.
target2(db, start, end, sessionNum)
return db
except Exception, e:
msg = '[MONGOD]: Unable to update data; ' + str(e)
raise VNPAST_DatabaseError(msg)
def fetch(self, dbName, ticker, start, end, output='list'):
"""
"""
# check inputs' validity.
if output not in ['df', 'list', 'json']:
raise ValueError('[MONGOD]: Unsupported output type.')
if dbName not in self._dbNames:
raise ValueError('[MONGOD]: Unable to locate database name.')
db = self._dbs[dbName]
dbSelf = db['self']
dbIndex = db['index']
try:
coll = db[ticker]
if len(start)==8 and len(end)==8:
# yyyymmdd, len()=8
start = datetime.strptime(start, '%Y%m%d')
end = datetime.strptime(end, '%Y%m%d')
elif len(start)==14 and len(end)==14:
# yyyymmdd HH:MM, len()=14
start = datetime.strptime(start, '%Y%m%d %H:%M')
end = datetime.strptime(end, '%Y%m%d %H:%M')
else:
pass
docs = []
# find in MongoDB.
for doc in coll.find(filter={dbIndex: {'$lte': end,
'$gte': start}}, projection={'_id': False}):
docs.append(doc)
if output == 'list':
return docs[::-1]
except Exception, e:
msg = '[MONGOD]: Error encountered when fetching data' + \
'from MongoDB; '+ str(e)
return -1
def __init__(self,database, collection):
self.database = database
self.collection = collection
self.name = self.collection.name
def _to_dict_inner(self, d):
result = {}
for k, v in d.items():
if isinstance(v, DBRef):
result[k] = self.collection.database.dereference(v).to_dict()
elif isinstance(v, dict):
result[k] = self._to_dict_inner(v)
elif isinstance(v, ObjectId):
result[k] = str(v)
elif isinstance(v, datetime):
result[k] = datetime_to_stamp(v)
else:
result[k] = v
return result
def get_ref(self, key):
if isinstance(self.get(key, None), DBRef):
return self[key].dereference(self.collection.database)
def db(self, collection_name: str = None) \
-> typing.Union[pymongo.collection.Collection, pymongo.database.Database]:
"""Returns the MongoDB database, or the collection (if given)"""
if collection_name:
return self.data.driver.db[collection_name]
return self.data.driver.db
def __init__(self, database, name, _delegate=None):
db_class = create_class_with_framework(
MongoMotorAgnosticDatabase, self._framework, self.__module__)
if not isinstance(database, db_class):
raise TypeError("First argument to MongoMotorCollection must be "
"MongoMotorDatabase, not %r" % database)
delegate = _delegate or Collection(database.delegate, name)
super(AgnosticCollection, self).__init__(delegate)
self.database = database
def __getitem__(self, name):
collection_class = create_class_with_framework(
MongoMotorAgnosticCollection, self._framework, self.__module__)
return collection_class(self.database, self.name + '.' + name)
def __getattr__(self, name):
if name.startswith('_'):
# samething. try get from delegate first
try:
ret = getattr(self.delegate, name)
except AttributeError:
raise AttributeError(
"%s has no attribute %r. To access the %s"
" collection, use database['%s']." % (
self.__class__.__name__, name, name,
name))
return ret
return self[name]
def __getattr__(self, name):
if name.startswith('_'):
# the same. Try get from delegate.
try:
ret = getattr(self.delegate, name)
except AttributeError:
raise AttributeError(
"%s has no attribute %r. To access the %s"
" database, use client['%s']." % (
self.__class__.__name__, name, name, name))
return ret
return self[name]
def setup_db_indices(self):
"""Adds missing database indices.
This does NOT drop and recreate existing indices,
nor does it reconfigure existing indices.
If you want that, drop them manually first.
"""
self.log.debug('Adding any missing database indices.')
import pymongo
db = self.data.driver.db
coll = db['tokens']
coll.create_index([('user', pymongo.ASCENDING)])
coll.create_index([('token', pymongo.ASCENDING)])
coll.create_index([('token_hashed', pymongo.ASCENDING)])
coll = db['notifications']
coll.create_index([('user', pymongo.ASCENDING)])
coll = db['activities-subscriptions']
coll.create_index([('context_object', pymongo.ASCENDING)])
coll = db['nodes']
# This index is used for queries on project, and for queries on
# the combination (project, node type).
coll.create_index([('project', pymongo.ASCENDING),
('node_type', pymongo.ASCENDING)])
coll.create_index([('parent', pymongo.ASCENDING)])
coll.create_index([('short_code', pymongo.ASCENDING)],
sparse=True, unique=True)
# Used for latest assets & comments
coll.create_index([('properties.status', pymongo.ASCENDING),
('node_type', pymongo.ASCENDING),
('_created', pymongo.DESCENDING)])
coll = db['projects']
# This index is used for statistics, and for fetching public projects.
coll.create_index([('is_private', pymongo.ASCENDING)])
coll.create_index([('category', pymongo.ASCENDING)])