def test_uri_prioritised_over_host_and_port(self):
self.app.config['MONGO_URI'] = 'mongodb://localhost:27017/database_name'
self.app.config['MONGO_HOST'] = 'some_other_host'
self.app.config['MONGO_PORT'] = 27018
self.app.config['MONGO_DBNAME'] = 'not_the_correct_db_name'
with warnings.catch_warnings():
# URI connections without a username and password
# work, but warn that auth should be supplied
warnings.simplefilter('ignore')
mongo = flask.ext.pymongo.PyMongo(self.app)
if pymongo.version_tuple[0] > 2:
time.sleep(0.2)
assert ('localhost', 27017) == mongo.cx.address
else:
assert mongo.cx.host == 'localhost'
assert mongo.cx.port == 27017
assert mongo.db.name == 'database_name'
python类version_tuple()的实例源码
def test_create_with_document_class(self):
""" This test doesn't use self.mongo, because it has to change config
It uses second mongo connection, using a CUSTOM prefix to avoid
duplicate config_prefix exception. To make use of tearDown and thus DB
deletion even in case of failure, it uses same DBNAME.
"""
# copying standard DBNAME, so this DB gets also deleted by tearDown
self.app.config['CUSTOM_DBNAME'] = self.app.config['MONGO_DBNAME']
self.app.config['CUSTOM_DOCUMENT_CLASS'] = CustomDict
# not using self.mongo, because we want to use updated config
# also using CUSTOM, to avoid duplicate config_prefix exception
mongo = flask.ext.pymongo.PyMongo(self.app, 'CUSTOM')
assert mongo.db.things.find_one() is None
# write document and retrieve, to check if type is really CustomDict
if pymongo.version_tuple[0] > 2:
# Write Concern is set to w=1 by default in pymongo > 3.0
mongo.db.things.insert_one({'_id': 'thing', 'val': 'foo'})
else:
mongo.db.things.insert({'_id': 'thing', 'val': 'foo'}, w=1)
assert type(mongo.db.things.find_one()) == CustomDict
def test_uri_prioritised_over_host_and_port(self):
self.app.config['MONGO_URI'] = 'mongodb://localhost:27017/database_name'
self.app.config['MONGO_HOST'] = 'some_other_host'
self.app.config['MONGO_PORT'] = 27018
self.app.config['MONGO_DBNAME'] = 'not_the_correct_db_name'
with warnings.catch_warnings():
# URI connections without a username and password
# work, but warn that auth should be supplied
warnings.simplefilter('ignore')
mongo = flask.ext.pymongo.PyMongo(self.app)
if pymongo.version_tuple[0] > 2:
time.sleep(0.2)
assert ('localhost', 27017) == mongo.cx.address
else:
assert mongo.cx.host == 'localhost'
assert mongo.cx.port == 27017
assert mongo.db.name == 'database_name'
def test_create_with_document_class(self):
""" This test doesn't use self.mongo, because it has to change config
It uses second mongo connection, using a CUSTOM prefix to avoid
duplicate config_prefix exception. To make use of tearDown and thus DB
deletion even in case of failure, it uses same DBNAME.
"""
# copying standard DBNAME, so this DB gets also deleted by tearDown
self.app.config['CUSTOM_DBNAME'] = self.app.config['MONGO_DBNAME']
self.app.config['CUSTOM_DOCUMENT_CLASS'] = CustomDict
# not using self.mongo, because we want to use updated config
# also using CUSTOM, to avoid duplicate config_prefix exception
mongo = flask.ext.pymongo.PyMongo(self.app, 'CUSTOM')
assert mongo.db.things.find_one() is None
# write document and retrieve, to check if type is really CustomDict
if pymongo.version_tuple[0] > 2:
# Write Concern is set to w=1 by default in pymongo > 3.0
mongo.db.things.insert_one({'_id': 'thing', 'val': 'foo'})
else:
mongo.db.things.insert({'_id': 'thing', 'val': 'foo'}, w=1)
assert type(mongo.db.things.find_one()) == CustomDict
def _get_connection(self, host, port):
ckey = "{}:{}".format(host, port)
conn = self._conns.get(ckey, None)
if conn is None:
mps = ('max_pool_size' if pymongo.version_tuple[0] == 2
else 'maxPoolSize')
conn = pymongo.MongoClient(host, port, **{mps: self.MAX_POOL})
self._conns[ckey] = conn
return conn
def mongo_database():
connection_params = {'host': os.environ.get('MONGODB_HOST', 'localhost'),
'port': int(os.environ.get('MONGODB_PORT', 27017))}
if pymongo.version_tuple < (3, 0):
connection_params['safe'] = True
mongo = pymongo.MongoClient(**connection_params)
db = mongo.elasticapm_test
yield db
mongo.drop_database('elasticapm_test')
mongo.close()
def unique(self, drop_dups=False):
''' Make this index unique, optionally dropping duplicate entries.
:param drop_dups: Drop duplicate objects while creating the unique \
index? Default to ``False``
'''
self.__unique = True
if drop_dups and pymongo.version_tuple >= (2, 7, 5): # pragma: nocover
raise BadIndexException("drop_dups is not supported on pymongo >= 2.7.5")
self.__drop_dups = drop_dups
return self
def test_find_one_or_404(self):
self.mongo.db.things.remove()
try:
self.mongo.db.things.find_one_or_404({'_id': 'thing'})
except HTTPException as notfound:
assert notfound.code == 404, "raised wrong exception"
if pymongo.version_tuple[0] > 2:
self.mongo.db.things.insert_one({'_id': 'thing', 'val': 'foo'})
else:
self.mongo.db.things.insert({'_id': 'thing', 'val': 'foo'}, w=1)
# now it should not raise
thing = self.mongo.db.things.find_one_or_404({'_id': 'thing'})
assert thing['val'] == 'foo', 'got wrong thing'
# also test with dotted-named collections
self.mongo.db.things.morethings.remove()
try:
self.mongo.db.things.morethings.find_one_or_404({'_id': 'thing'})
except HTTPException as notfound:
assert notfound.code == 404, "raised wrong exception"
if pymongo.version_tuple[0] > 2:
# Write Concern is set to w=1 by default in pymongo > 3.0
self.mongo.db.things.morethings.insert_one({'_id': 'thing', 'val': 'foo'})
else:
self.mongo.db.things.morethings.insert({'_id': 'thing', 'val': 'foo'}, w=1)
# now it should not raise
thing = self.mongo.db.things.morethings.find_one_or_404({'_id': 'thing'})
assert thing['val'] == 'foo', 'got wrong thing'
def test_default_config_prefix(self):
self.app.config['MONGO_DBNAME'] = 'flask_pymongo_test_db'
self.app.config['MONGO_HOST'] = 'localhost'
self.app.config['MONGO_PORT'] = 27017
mongo = flask.ext.pymongo.PyMongo(self.app)
assert mongo.db.name == 'flask_pymongo_test_db', 'wrong dbname: %s' % mongo.db.name
if pymongo.version_tuple[0] > 2:
time.sleep(0.2)
assert ('localhost', 27017) == mongo.cx.address
else:
assert mongo.cx.host == 'localhost'
assert mongo.cx.port == 27017
def test_custom_config_prefix(self):
self.app.config['CUSTOM_DBNAME'] = 'flask_pymongo_test_db'
self.app.config['CUSTOM_HOST'] = 'localhost'
self.app.config['CUSTOM_PORT'] = 27017
mongo = flask.ext.pymongo.PyMongo(self.app, 'CUSTOM')
assert mongo.db.name == 'flask_pymongo_test_db', 'wrong dbname: %s' % mongo.db.name
if pymongo.version_tuple[0] > 2:
time.sleep(0.2)
assert ('localhost', 27017) == mongo.cx.address
else:
assert mongo.cx.host == 'localhost'
assert mongo.cx.port == 27017
def test_converts_str_to_int(self):
self.app.config['MONGO_DBNAME'] = 'flask_pymongo_test_db'
self.app.config['MONGO_HOST'] = 'localhost'
self.app.config['MONGO_PORT'] = '27017'
mongo = flask.ext.pymongo.PyMongo(self.app)
assert mongo.db.name == 'flask_pymongo_test_db', 'wrong dbname: %s' % mongo.db.name
if pymongo.version_tuple[0] > 2:
time.sleep(0.2)
assert ('localhost', 27017) == mongo.cx.address
else:
assert mongo.cx.host == 'localhost'
assert mongo.cx.port == 27017
def test_config_with_uri(self):
self.app.config['MONGO_URI'] = 'mongodb://localhost:27017/flask_pymongo_test_db'
with warnings.catch_warnings():
# URI connections without a username and password
# work, but warn that auth should be supplied
warnings.simplefilter('ignore')
mongo = flask.ext.pymongo.PyMongo(self.app)
assert mongo.db.name == 'flask_pymongo_test_db', 'wrong dbname: %s' % mongo.db.name
if pymongo.version_tuple[0] > 2:
time.sleep(0.2)
assert ('localhost', 27017) == mongo.cx.address
else:
assert mongo.cx.host == 'localhost'
assert mongo.cx.port == 27017
def test_config_with_document_class(self):
self.app.config['MONGO_DOCUMENT_CLASS'] = CustomDict
mongo = flask.ext.pymongo.PyMongo(self.app)
if pymongo.version_tuple[0] > 2:
assert mongo.cx.codec_options.document_class == CustomDict
else:
assert mongo.cx.document_class == CustomDict
def test_config_without_document_class(self):
mongo = flask.ext.pymongo.PyMongo(self.app)
if pymongo.version_tuple[0] > 2:
assert mongo.cx.codec_options.document_class == dict
else:
assert mongo.cx.document_class == dict
def test_host_with_port_does_not_get_overridden_by_separate_port_config_value(self):
self.app.config['MONGO_HOST'] = 'localhost:27017'
self.app.config['MONGO_PORT'] = 27018
with warnings.catch_warnings():
# URI connections without a username and password
# work, but warn that auth should be supplied
warnings.simplefilter('ignore')
mongo = flask.ext.pymongo.PyMongo(self.app)
if pymongo.version_tuple[0] > 2:
time.sleep(0.2)
assert ('localhost', 27017) == mongo.cx.address
else:
assert mongo.cx.host == 'localhost'
assert mongo.cx.port == 27017
def connectMongoEngine(pmcol, conn_uri=None):
if pymongo.version_tuple[0] == 2: #really? REALLY?
#host = pmcol.database.connection.HOST
#port = pmcol.database.connection.PORT
host = pmcol.database.connection.host
port = pmcol.database.connection.port
else:
host = pmcol.database.client.HOST
port = pmcol.database.client.PORT
# Can just use the connection uri, which has credentials
if conn_uri:
return meng.connect(pmcol.database.name, host=conn_uri)
return meng.connect(pmcol.database.name, host=host, port=port)
def test_find_one_or_404(self):
self.mongo.db.things.remove()
try:
self.mongo.db.things.find_one_or_404({'_id': 'thing'})
except HTTPException as notfound:
assert notfound.code == 404, "raised wrong exception"
if pymongo.version_tuple[0] > 2:
self.mongo.db.things.insert_one({'_id': 'thing', 'val': 'foo'})
else:
self.mongo.db.things.insert({'_id': 'thing', 'val': 'foo'}, w=1)
# now it should not raise
thing = self.mongo.db.things.find_one_or_404({'_id': 'thing'})
assert thing['val'] == 'foo', 'got wrong thing'
# also test with dotted-named collections
self.mongo.db.things.morethings.remove()
try:
self.mongo.db.things.morethings.find_one_or_404({'_id': 'thing'})
except HTTPException as notfound:
assert notfound.code == 404, "raised wrong exception"
if pymongo.version_tuple[0] > 2:
# Write Concern is set to w=1 by default in pymongo > 3.0
self.mongo.db.things.morethings.insert_one({'_id': 'thing', 'val': 'foo'})
else:
self.mongo.db.things.morethings.insert({'_id': 'thing', 'val': 'foo'}, w=1)
# now it should not raise
thing = self.mongo.db.things.morethings.find_one_or_404({'_id': 'thing'})
assert thing['val'] == 'foo', 'got wrong thing'
def test_default_config_prefix(self):
self.app.config['MONGO_DBNAME'] = 'flask_pymongo_test_db'
self.app.config['MONGO_HOST'] = 'localhost'
self.app.config['MONGO_PORT'] = 27017
mongo = flask.ext.pymongo.PyMongo(self.app)
assert mongo.db.name == 'flask_pymongo_test_db', 'wrong dbname: %s' % mongo.db.name
if pymongo.version_tuple[0] > 2:
time.sleep(0.2)
assert ('localhost', 27017) == mongo.cx.address
else:
assert mongo.cx.host == 'localhost'
assert mongo.cx.port == 27017
def test_custom_config_prefix(self):
self.app.config['CUSTOM_DBNAME'] = 'flask_pymongo_test_db'
self.app.config['CUSTOM_HOST'] = 'localhost'
self.app.config['CUSTOM_PORT'] = 27017
mongo = flask.ext.pymongo.PyMongo(self.app, 'CUSTOM')
assert mongo.db.name == 'flask_pymongo_test_db', 'wrong dbname: %s' % mongo.db.name
if pymongo.version_tuple[0] > 2:
time.sleep(0.2)
assert ('localhost', 27017) == mongo.cx.address
else:
assert mongo.cx.host == 'localhost'
assert mongo.cx.port == 27017
def test_converts_str_to_int(self):
self.app.config['MONGO_DBNAME'] = 'flask_pymongo_test_db'
self.app.config['MONGO_HOST'] = 'localhost'
self.app.config['MONGO_PORT'] = '27017'
mongo = flask.ext.pymongo.PyMongo(self.app)
assert mongo.db.name == 'flask_pymongo_test_db', 'wrong dbname: %s' % mongo.db.name
if pymongo.version_tuple[0] > 2:
time.sleep(0.2)
assert ('localhost', 27017) == mongo.cx.address
else:
assert mongo.cx.host == 'localhost'
assert mongo.cx.port == 27017
def test_config_with_uri(self):
self.app.config['MONGO_URI'] = 'mongodb://localhost:27017/flask_pymongo_test_db'
with warnings.catch_warnings():
# URI connections without a username and password
# work, but warn that auth should be supplied
warnings.simplefilter('ignore')
mongo = flask.ext.pymongo.PyMongo(self.app)
assert mongo.db.name == 'flask_pymongo_test_db', 'wrong dbname: %s' % mongo.db.name
if pymongo.version_tuple[0] > 2:
time.sleep(0.2)
assert ('localhost', 27017) == mongo.cx.address
else:
assert mongo.cx.host == 'localhost'
assert mongo.cx.port == 27017
def test_config_with_document_class(self):
self.app.config['MONGO_DOCUMENT_CLASS'] = CustomDict
mongo = flask.ext.pymongo.PyMongo(self.app)
if pymongo.version_tuple[0] > 2:
assert mongo.cx.codec_options.document_class == CustomDict
else:
assert mongo.cx.document_class == CustomDict
def test_config_without_document_class(self):
mongo = flask.ext.pymongo.PyMongo(self.app)
if pymongo.version_tuple[0] > 2:
assert mongo.cx.codec_options.document_class == dict
else:
assert mongo.cx.document_class == dict
def test_host_with_port_does_not_get_overridden_by_separate_port_config_value(self):
self.app.config['MONGO_HOST'] = 'localhost:27017'
self.app.config['MONGO_PORT'] = 27018
with warnings.catch_warnings():
# URI connections without a username and password
# work, but warn that auth should be supplied
warnings.simplefilter('ignore')
mongo = flask.ext.pymongo.PyMongo(self.app)
if pymongo.version_tuple[0] > 2:
time.sleep(0.2)
assert ('localhost', 27017) == mongo.cx.address
else:
assert mongo.cx.host == 'localhost'
assert mongo.cx.port == 27017
def do_chunks(init, proc, src_col, dest_col, query, key, sort, verbose, sleep=60):
while housekeep.objects(state = 'done').count() < housekeep.objects.count():
tnow = datetime.datetime.utcnow()
raw = housekeep._collection.find_and_modify(
{'state': 'open'},
{
'$set': {
'state': 'working',
'tstart': tnow,
'procname': procname(),
}
}
)
# if raw==None, someone scooped us
if raw != None:
raw_id = raw['_id']
#reload as mongoengine object -- _id is .start (because set as primary_key)
hko = housekeep.objects(start = raw_id)[0]
# Record git commit for sanity
# hko.git = git.Git('.').rev_parse('HEAD')
# hko.save()
# get data pointed to by housekeep
qq = {'$and': [query, {key: {'$gte': hko.start}}, {key: {'$lte': hko.end}}]}
# Make cursor not timeout, using version-appropriate paramater
if pymongo.version_tuple[0] == 2:
cursor = src_col.find(qq, timeout=False)
elif pymongo.version_tuple[0] == 3:
cursor = src_col.find(qq, no_cursor_timeout=True)
else:
raise Exception("Unknown pymongo version")
# Set the sort parameters on the cursor
if sort[0] == "-":
cursor = cursor.sort(sort[1:], pymongo.DESCENDING)
else:
cursor = cursor.sort(sort, pymongo.ASCENDING)
if verbose & 2: print "mongo_process: %d elements in chunk %s-%s" % (cursor.count(), hko.start, hko.end)
sys.stdout.flush()
# This is where processing happens
hko.good =_process(init, proc, cursor, dest_col, verbose,
hkstart=raw_id)
# Check if another job finished it while this one was plugging away
hko_later = housekeep.objects(start = raw_id).only('state')[0]
if hko.good == -1: # Early exit signal
print "Chunk at %s lost to another process; not updating" % raw_id
sys.stdout.flush()
elif hko_later.state == 'done':
print "Chunk at %s had already finished; not updating" % raw_id
sys.stdout.flush()
else:
hko.state = 'done'
hko.procname = 'none'
hko.time = datetime.datetime.utcnow()
hko.save()
else:
# Not all done, but none were open for processing; thus, wait to
# see if one re-opens
print 'Standing by for reopening of "working" job...'
sys.stdout.flush()
time.sleep(sleep)