def test_expire_after(self):
# create mongo client
client = pymongo.MongoClient(host=settings.MONGO_HOST, port=settings.MONGO_PORT)
# init MongoItemPool with Mongo client
pool = MongoItemPool(client, DB="pycache", COLLECTION="pytest")
item = CacheItem()
# set expire
item.set("mykey", "myval")
item.expire_after(1)
pool.save(item)
ret_item = pool.get_item("mykey")
assert ret_item.is_hit() == True
time.sleep(2)
ret_item = pool.get_item("mykey")
assert ret_item.is_hit() == False
python类MONGO_PORT的实例源码
def test_Decorator_expire(self):
client = pymongo.MongoClient(host=settings.MONGO_HOST, port=settings.MONGO_PORT)
pool = MongoItemPool(client, DB="pycache", COLLECTION="pytest")
# decorate myAdder function in common way
@cached(CacheItemPool=pool, expire_after=0.5)
def myAdder_expire(a, b):
import time
print 'Add %d + %d need 3 seconds!' % (a, b)
time.sleep(3)
return a + b
assert myAdder_expire(5, 6) == 11 # wait for 3 seconds
time.sleep(1)
cur = time.time()
assert myAdder_expire(5, 6) == 11 # need wait another 3 seconds
assert time.time()-cur > 1
def __init__(self, collection_name: str) -> None:
"""Constructor for mongodb connection
Args:
collection_name: name of collection to operate on
"""
self._host = settings.MONGO_HOST
self._port = settings.MONGO_PORT
self._db_name = settings.MONGO_DB
try:
self.db = pymongo.MongoClient(self._host, self._port)[self._db_name]
self.db = self.db.get_collection(collection_name)
except ConnectionError:
print('Error connecting to database!')
def test_simpleKeyVal(self):
# create mongo client
client = pymongo.MongoClient(host=settings.MONGO_HOST, port=settings.MONGO_PORT)
# init MongoItemPool with Mongo client
pool = MongoItemPool(client, DB="pycache", COLLECTION="pytest")
item = CacheItem()
item.set("mykey", "myval")
pool.save(item)
ret_item = pool.get_item("mykey")
assert ret_item.get() == "myval"
assert ret_item.get_key() == "mykey"
assert ret_item.is_hit() == True
def test_Decorator(self):
client = pymongo.MongoClient(host=settings.MONGO_HOST, port=settings.MONGO_PORT)
pool = MongoItemPool(client, DB="pycache", COLLECTION="pytest")
# decorate myAdder function in common way
@cached(CacheItemPool=pool)
def myAdder(a, b):
import time
print 'Add %d + %d need 3 seconds!' % (a, b)
time.sleep(3)
return a + b
assert myAdder(5, 6) == 11 # wait for 3 seconds
assert myAdder(5, 6) == 11 # no need wait
def connect_mongo():
client = MongoClient(settings.MONGO_HOST, settings.MONGO_PORT)
return client[settings.MONGO_DATABASE]
def test_keywords():
client = MongoClient(settings.MONGO_HOST, settings.MONGO_PORT)
db = client[settings.MONGO_DATABASE]
cursor = db.arch.find().limit(10)
for post in cursor:
id = post['_id']
title = post['title']
tf_idf_dict = util.file2dict(
settings.TFIDF_FILE_PATH, str(id) + '.txt')
util.print_message(title.encode('gbk'))
for item in sorted(tf_idf_dict.items(), key=operator.itemgetter(1), reverse=True)[:10]:
util.print_message('{0}:{1}'.format(item[0].decode(
'utf8').encode('gbk'), item[1]))
def update_keywords():
util.print_message('Start updating keywords...', debug=True)
client = MongoClient(settings.MONGO_HOST, settings.MONGO_PORT)
db = client[settings.MONGO_DATABASE]
cursor = db.arch.find()
for post in cursor:
id = post['_id']
tf_idf_dict = util.file2dict(settings.TFIDF_FILE_PATH, str(id) + '.txt')
tags = []
for item in sorted(tf_idf_dict.items(), key=operator.itemgetter(1), reverse=True)[:20]:
tags.append(item[0])
util.print_message(' '.join(tags))
db.arch.update_one({'_id': id}, {'$set': {'tags': tags}})