python类MONGO_PORT的实例源码

test_MongoAdapter.py 文件源码 项目:python-cache 作者: python-cache 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_expire_after(self):
        """Check that a saved cache item stops being a hit once it has expired.

        An item is stored with ``expire_after(1)``; it must be a hit when read
        back immediately and must no longer be a hit after sleeping 2 seconds.
        Talks to the MongoDB server configured in ``settings``.
        """
        # Pool backed by the Mongo server named in settings.
        mongo = pymongo.MongoClient(host=settings.MONGO_HOST, port=settings.MONGO_PORT)
        item_pool = MongoItemPool(mongo, DB="pycache", COLLECTION="pytest")

        # Store an entry that carries an expiry.
        entry = CacheItem()
        entry.set("mykey", "myval")
        entry.expire_after(1)
        item_pool.save(entry)

        # Read back right away: the entry must still be a hit.
        fresh = item_pool.get_item("mykey")
        assert fresh.is_hit() == True

        # Sleep past the expiry, then the same lookup must miss.
        time.sleep(2)
        stale = item_pool.get_item("mykey")
        assert stale.is_hit() == False
test_MongoAdapter.py 文件源码 项目:python-cache 作者: python-cache 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_Decorator_expire(self):
        """Check that a ``@cached`` result with ``expire_after`` is recomputed once stale.

        The decorated function sleeps 3 seconds per real invocation. It is
        called once, the test waits 1 second (longer than ``expire_after=0.5``),
        and the second call is timed: it must take more than 1 second, which
        shows the body ran again instead of being served from the pool.
        Talks to the MongoDB server configured in ``settings``.
        """
        client = pymongo.MongoClient(host=settings.MONGO_HOST, port=settings.MONGO_PORT)
        pool = MongoItemPool(client, DB="pycache", COLLECTION="pytest")

        # decorate the slow adder so its result is cached with an expiry
        @cached(CacheItemPool=pool, expire_after=0.5)
        def myAdder_expire(a, b):
            import time
            # print(...) with one pre-formatted argument emits the same text
            # on Python 2 and is valid syntax on Python 3 (the print statement
            # form is a SyntaxError there).
            print('Add %d + %d need 3 seconds!' % (a, b))
            time.sleep(3)
            return a + b

        assert myAdder_expire(5, 6) == 11  # first call: runs the 3 second body
        time.sleep(1)  # wait longer than expire_after=0.5
        started = time.time()
        assert myAdder_expire(5, 6) == 11  # expected to run the 3 second body again
        assert time.time() - started > 1
mongo_engine.py 文件源码 项目:coder_api 作者: seekheart 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, collection_name: str) -> None:
        """Constructor for mongodb connection

        Reads host, port and database name from ``settings``, builds a
        ``pymongo.MongoClient`` and stores the requested collection on
        ``self.db``.

        Connection errors are reported with a printed message rather than
        raised; in that case ``self.db`` is left unset.

        Args:
            collection_name: name of collection to operate on
        """

        self._host = settings.MONGO_HOST
        self._port = settings.MONGO_PORT
        self._db_name = settings.MONGO_DB

        try:
            database = pymongo.MongoClient(self._host, self._port)[self._db_name]
            # Despite its name, self.db holds the collection, not the database.
            self.db = database.get_collection(collection_name)
        except (ConnectionError, pymongo.errors.ConnectionFailure):
            # PyMongo signals connection problems with its own
            # ConnectionFailure hierarchy, which does not derive from the
            # builtin ConnectionError, so both are caught here.
            # NOTE(review): recent PyMongo versions connect lazily, so a
            # failure may only surface on the first real operation - confirm.
            print('Error connecting to database!')
test_MongoAdapter.py 文件源码 项目:python-cache 作者: python-cache 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_simpleKeyVal(self):
        """Round-trip one key/value pair through a Mongo-backed item pool.

        Saves ``"mykey" -> "myval"`` and checks that the item fetched back
        reports the same value and key and counts as a hit.
        Talks to the MongoDB server configured in ``settings``.
        """
        # Pool backed by the Mongo server named in settings.
        mongo = pymongo.MongoClient(host=settings.MONGO_HOST, port=settings.MONGO_PORT)
        item_pool = MongoItemPool(mongo, DB="pycache", COLLECTION="pytest")

        # Store a single entry.
        entry = CacheItem()
        entry.set("mykey", "myval")
        item_pool.save(entry)

        # Fetch it again and compare value, key and hit flag.
        fetched = item_pool.get_item("mykey")
        assert fetched.get() == "myval"
        assert fetched.get_key() == "mykey"
        assert fetched.is_hit() == True
test_MongoAdapter.py 文件源码 项目:python-cache 作者: python-cache 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_Decorator(self):
        """Check that a function wrapped with ``@cached`` returns the same result twice.

        The decorated function sleeps 3 seconds per real invocation; it is
        called twice with the same arguments and must return 11 both times.
        The second call is expected to come from the pool, but its duration is
        not asserted here.
        Talks to the MongoDB server configured in ``settings``.
        """
        client = pymongo.MongoClient(host=settings.MONGO_HOST, port=settings.MONGO_PORT)
        pool = MongoItemPool(client, DB="pycache", COLLECTION="pytest")

        # decorate the slow adder in the common way (no expiry given)
        @cached(CacheItemPool=pool)
        def myAdder(a, b):
            import time
            # print(...) with one pre-formatted argument emits the same text
            # on Python 2 and is valid syntax on Python 3 (the print statement
            # form is a SyntaxError there).
            print('Add %d + %d need 3 seconds!' % (a, b))
            time.sleep(3)
            return a + b

        assert myAdder(5, 6) == 11  # wait for 3 seconds
        assert myAdder(5, 6) == 11  # no need wait
kafka_mongo_connector.py 文件源码 项目:nlp 作者: aaronz 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def connect_mongo():
    """Return the database named by ``settings.MONGO_DATABASE``.

    A new ``MongoClient`` is built from ``settings.MONGO_HOST`` and
    ``settings.MONGO_PORT`` on every call.
    """
    return MongoClient(settings.MONGO_HOST, settings.MONGO_PORT)[settings.MONGO_DATABASE]
mongo_tags.py 文件源码 项目:nlp 作者: aaronz 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_keywords():
    """Print the title and the ten highest-valued entries for up to 10 ``arch`` documents.

    For each document, a dict is loaded via ``util.file2dict`` from
    ``settings.TFIDF_FILE_PATH`` and the file name ``<_id>.txt``. Its items
    are sorted by value in descending order and the first ten are printed as
    ``key:value`` lines through ``util.print_message``.
    """
    database = MongoClient(settings.MONGO_HOST, settings.MONGO_PORT)[settings.MONGO_DATABASE]

    for doc in database.arch.find().limit(10):
        doc_id = doc['_id']
        heading = doc['title']
        scores = util.file2dict(
            settings.TFIDF_FILE_PATH, str(doc_id) + '.txt')
        # Titles and keys are re-encoded to GBK before printing.
        # NOTE(review): presumably for a GBK console - confirm.
        util.print_message(heading.encode('gbk'))
        ranked = sorted(scores.items(), key=operator.itemgetter(1), reverse=True)
        for term, score in ranked[:10]:
            line = '{0}:{1}'.format(term.decode('utf8').encode('gbk'), score)
            util.print_message(line)
mongo_tags.py 文件源码 项目:nlp 作者: aaronz 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def update_keywords():
    """Store the twenty highest-valued keys of each ``arch`` document as its ``tags``.

    For every document, a dict is loaded via ``util.file2dict`` from
    ``settings.TFIDF_FILE_PATH`` and the file name ``<_id>.txt``. Its items
    are sorted by value in descending order, the keys of the first twenty are
    printed space-separated, and the document's ``tags`` field is set to that
    list with ``update_one``.
    """
    util.print_message('Start updating keywords...', debug=True)
    database = MongoClient(settings.MONGO_HOST, settings.MONGO_PORT)[settings.MONGO_DATABASE]

    for doc in database.arch.find():
        doc_id = doc['_id']
        scores = util.file2dict(settings.TFIDF_FILE_PATH, str(doc_id) + '.txt')
        ranked = sorted(scores.items(), key=operator.itemgetter(1), reverse=True)
        # Keep only the keys of the top twenty (key, value) pairs.
        tags = [pair[0] for pair in ranked[:20]]
        util.print_message(' '.join(tags))
        database.arch.update_one({'_id': doc_id}, {'$set': {'tags': tags}})


问题


面经


文章

微信
公众号

扫码关注公众号