python类MongoClient()的实例源码

conftest.py 文件源码 项目:picoCTF 作者: picoCTF 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def setup_db():
    """
    Open the testing database, wiping anything left over from an earlier run.

    The database handle is also stored in ``api.common.external_client`` when
    no external client has been registered yet.

    Returns:
        The testing database handle (the client indexed by the testing
        database name).
    """

    db_name = api.config.testing_mongo_db_name
    mongo = MongoClient(api.config.testing_mongo_addr,
                        api.config.testing_mongo_port)
    database = mongo[db_name]

    # Only drop the database when it already holds collections.
    if database.collection_names():
        database.connection.drop_database(db_name)

    # Set debug client for mongo
    if api.common.external_client is None:
        api.common.external_client = database

    return database
conftest.py 文件源码 项目:picoCTF 作者: picoCTF 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def teardown_db():
    """ Drop the testing database, then disconnect from the mongodb server. """
    db_name = api.config.testing_mongo_db_name
    database = MongoClient(api.config.testing_mongo_addr,
                           api.config.testing_mongo_port)[db_name]

    # Both operations go through the connection that owns the database handle.
    server = database.connection
    server.drop_database(db_name)
    server.disconnect()
common.py 文件源码 项目:picoCTF 作者: picoCTF 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_conn():
    """
    Get a database connection.

    The client and the database handle are cached in the module-level globals
    ``__client`` and ``__connection``: they are created on the first call and
    reused by every later call.

    The connection URI is built from the application config keys
    ``MONGO_ADDR``, ``MONGO_PORT`` and ``MONGO_DB_NAME``; when both
    ``MONGO_USER`` and ``MONGO_PW`` are set, they are included together with
    the SCRAM-SHA-1 auth mechanism.

    Returns:
        The database handle for ``MONGO_DB_NAME``.

    Raises:
        SevereInternalException: if the connection fails or the database
            name is invalid.
    """

    global __client, __connection
    if not __connection:
        # Read the config before the try block so the error handlers below
        # can always report the settings that were used.
        conf = api.app.app.config
        try:
            # Allow more complex mongodb connections
            if conf["MONGO_USER"] and conf["MONGO_PW"]:
                uri = "mongodb://{}:{}@{}:{}/{}?authMechanism=SCRAM-SHA-1".format(
                        conf["MONGO_USER"],
                        conf["MONGO_PW"],
                        conf["MONGO_ADDR"],
                        conf["MONGO_PORT"],
                        conf["MONGO_DB_NAME"])
            else:
                uri = "mongodb://{}:{}/{}".format(
                        conf["MONGO_ADDR"],
                        conf["MONGO_PORT"],
                        conf["MONGO_DB_NAME"])

            __client = MongoClient(uri)
            __connection = __client[conf["MONGO_DB_NAME"]]
        except ConnectionFailure:
            # The messages previously formatted names that are not defined in
            # this function, which turned the intended error into a NameError.
            raise SevereInternalException(
                "Could not connect to mongo database {} at {}:{}".format(
                    conf["MONGO_DB_NAME"], conf["MONGO_ADDR"], conf["MONGO_PORT"]))
        except InvalidName as error:
            raise SevereInternalException(
                "Database {} is invalid! - {}".format(conf["MONGO_DB_NAME"], error))

    return __connection
mongo.py 文件源码 项目:database_assetstore 作者: OpenGeoscience 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def connect(self):
        """
        Open a client for ``self.databaseUri`` and look up the configured
        collection.

        The client is kept on ``self.conn`` and the database on
        ``self.database``.

        :returns: the mongo collection.
        """
        client = MongoClient(self.databaseUri)
        self.conn = client
        database = client[self.databaseName]
        self.database = database
        return database[self.collection]
mongo.py 文件源码 项目:database_assetstore 作者: OpenGeoscience 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def getTableList(uri, internalTables=False, **kwargs):
        """
        List the databases reachable through ``uri`` together with their
        collections.

        The result has the form ``[{'database': (database 1), 'tables':
        [{'table': (collection 1), 'name': (collection 1)}, ...]},
        {'database': (database 2), 'tables': [...]}, ...]``.  When the uri
        names a database, only that database is listed; otherwise every
        database reported by the server is.

        :param uri: uri to connect to the database.
        :param internalTables: True to return tables about the database itself.
            Ignored for Mongo.
        :returns: A list of known collections, grouped by database.
        """
        conn = MongoClient(uri)
        databaseName = base.databaseFromUri(uri)
        databaseNames = (
            conn.database_names() if databaseName is None else [databaseName])

        def describe(name):
            # ``False`` is passed through to collection_names() unchanged.
            collections = conn[name].collection_names(False)
            return {
                'database': name,
                'tables': [{'table': collection, 'name': collection}
                           for collection in collections]
            }

        return [describe(name) for name in databaseNames]
mongodb.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def Conn(self):
        """
        Open a client for ``self.ip``/``self.port`` and keep handles to the
        databases this object works with.
        """
        client = pymongo.MongoClient(self.ip, self.port)
        self.client = client
        self.connection = client.stock  # stock information
        self.index = client.index  # index data
        self.pool = client.pool  # pool data
        self.treasure = client.treasure
mongo_operator.py 文件源码 项目:apiTest 作者: wuranxu 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self):
        """
        Connect to MongoDB and authenticate against the ``yitu8`` database.

        Uses ``self.HOST``/``self.PORT`` for the client and
        ``self.user``/``self.pwd`` for the login.  Any failure, including a
        rejected login, is logged and swallowed rather than raised.
        """
        try:
            self.Client = pymongo.MongoClient(host=self.HOST, port=self.PORT)
            self.db = self.Client.yitu8
            # Do not authenticate inside an ``assert``: assert statements are
            # removed under ``python -O``, which would skip the login call.
            if not self.db.authenticate(self.user, self.pwd):
                raise AssertionError("mongo???????!")
        except Exception as err:
            logging.error("mongo connect error: {}".format(str(err)))
def get_connection(self):
        """
        Get the most secure kind of connection available.

        The target is the first entry of ``self.cred['nodelist']``.  While no
        ``conn`` attribute exists on this object, a TLS connection is
        requested; once one exists, it is closed and a plain connection is
        requested instead.

        Returns:
          pymongo.MongoClient instance

        """
        fqdn, port = self.cred['nodelist'][0]
        if not hasattr(self, 'conn'):
            return self.get_tls_connection(fqdn, port)

        # A connection already exists: release it before falling back.
        self.conn.close()
        return self.get_plain_connection(fqdn, port)
conn.py 文件源码 项目:tipi-engine 作者: CIECODE-Madrid 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _connDB(self):
        """
        Create the Mongo client and select the configured database.

        The client is stored on ``self._client``.  The database is selected
        and, only when a username is configured, authenticated; on success it
        is stored on ``self._currentDB``.  A failure in that step is logged
        and otherwise ignored, leaving ``self._currentDB`` unassigned by this
        call.
        """
        import logging

        configuration = MongoConfig()
        mongo_client = pymongo.MongoClient(host=configuration.host, port=configuration.port)
        self._client = mongo_client
        try:
            db = mongo_client[configuration.db_name]
            if configuration.username is not None:
                db.authenticate(configuration.username, password=configuration.password)
            self._currentDB = db
        except Exception:
            # Still best-effort, but leave a trace instead of hiding the
            # failure; KeyboardInterrupt/SystemExit are no longer swallowed.
            logging.getLogger(__name__).exception(
                "Could not select or authenticate Mongo database %r",
                configuration.db_name)
mongo_collection.py 文件源码 项目:smappdragon 作者: SMAPPNYU 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, *args, **kwargs):
        """
        Bind this collection to a Mongo database and collection.

        Positional layout when ``passed_mongo`` is given as a keyword:
        ``(user, password, database, collection)`` and the passed client is
        reused.  Otherwise: ``(host, port, user, password, database,
        collection)`` and a new client is created.  Authentication is only
        attempted when both user and password are truthy.
        """
        BaseCollection.__init__(self)
        if 'passed_mongo' in kwargs:
            # No host/port in front: credentials start at index 0.
            shift = 0
            self.mongo = kwargs['passed_mongo']
        else:
            # Host and port come first, so everything else moves by two.
            shift = 2
            self.mongo = pymongo.MongoClient(args[0], int(args[1]))

        self.mongo_database = self.mongo[args[shift + 2]]
        if args[shift] and args[shift + 1]:
            self.mongo_database.authenticate(args[shift], args[shift + 1])
        self.mongo_collection = self.mongo_database[args[shift + 3]]
test_mongo_collection.py 文件源码 项目:smappdragon 作者: SMAPPNYU 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_pass_in_mongo(self):
        """A MongoCollection built around an existing client yields documents."""
        mongo_conf = config['mongo']
        mongo_to_pass = pymongo.MongoClient(mongo_conf['host'], int(mongo_conf['port']))
        collection = MongoCollection(
            mongo_conf['user'],
            mongo_conf['password'],
            mongo_conf['database'],
            mongo_conf['collection'],
            passed_mongo=mongo_to_pass
        )
        # Limit to ten documents; at least one must come back.
        fetched = list(collection.set_limit(10).get_iterator())
        self.assertTrue(len(fetched) > 0)
connector.py 文件源码 项目:webtzite 作者: materialsproject 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _get_connection(self, host, port):
        """
        Return the client for ``host:port``, creating and caching it in
        ``self._conns`` on first use.

        New clients are created with a pool size of ``self.MAX_POOL``.
        """
        cache_key = "{}:{}".format(host, port)
        cached = self._conns.get(cache_key)
        if cached is not None:
            return cached

        # The pool-size keyword is spelled differently for PyMongo major
        # version 2 than for other versions.
        if pymongo.version_tuple[0] == 2:
            pool_option = 'max_pool_size'
        else:
            pool_option = 'maxPoolSize'
        created = pymongo.MongoClient(host, port, **{pool_option: self.MAX_POOL})
        self._conns[cache_key] = created
        return created
mongo_base.py 文件源码 项目:mongodb-monitoring 作者: jruaux 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def stream_events(self, inputs, ew):
        """
        Open a Mongo client for every configured input and stream its events.

        :param inputs: object whose ``inputs`` attribute maps an input name
            to an item providing the ``"server"`` and ``"port"`` entries.
        :param ew: passed through unchanged to ``stream_events_mongo``.
        """
        # ``items()`` exists on both Python 2 and 3; ``iteritems()`` is
        # Python 2 only and raises AttributeError on Python 3.
        for input_name, input_item in inputs.inputs.items():
            host = input_item["server"]
            port = input_item["port"]
            # A missing port is passed on as None; anything else becomes an int.
            if port is not None:
                port = int(port)
            client = pymongo.MongoClient(host, port)
            self.stream_events_mongo(input_name, input_item, client, ew)
QASql.py 文件源码 项目:QUANTAXIS 作者: yutiansut 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def QA_util_sql_mongo_setting(ip='127.0.0.1', port=27017):
    """
    Create a MongoDB client for the given address and log the target.

    :param ip: server address.
    :param port: server port; converted with ``int()`` before use.
    :returns: the created ``pymongo.MongoClient``.
    """
    client = pymongo.MongoClient(ip, int(port))
    message = 'ip:{},port:{}'.format(str(ip), str(port))
    QA_util_log_info(message)
    return client

# async
wsgi.py 文件源码 项目:nationalparks-py 作者: openshift-roadshow 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get(self):
        """
        Reload the ``nationalparks`` collection from ``DATASET_FILE``.

        The collection is emptied, a 2D index on ``Location`` is created, and
        the file is read with one JSON document per line.  Each document gets
        a ``Location`` built from its first two ``coordinates`` in swapped
        order.  Documents are inserted in batches of 1000.

        :returns: a message reporting the collection's document count.
        """
        collection = MongoClient(DB_URI)[DB_NAME].nationalparks

        collection.remove({})
        collection.create_index([('Location', GEO2D)])

        with open(DATASET_FILE, 'r') as fp:
            batch = []

            for line in fp:
                entry = json.loads(line)
                entry['Location'] = [entry['coordinates'][1],
                                     entry['coordinates'][0]]
                batch.append(entry)

                # Flush a full batch and start a new one.
                if len(batch) >= 1000:
                    collection.insert_many(batch)
                    batch = []

            # Insert whatever is left over after the last full batch.
            if batch:
                collection.insert_many(batch)

        return 'Items inserted in database: %s' % collection.count()
wsgi.py 文件源码 项目:nationalparks-py 作者: openshift-roadshow 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get(self):
        """Return every document of the ``nationalparks`` collection, formatted."""
        parks = MongoClient(DB_URI)[DB_NAME].nationalparks
        cursor = parks.find()
        return format_result(cursor)
wsgi.py 文件源码 项目:nationalparks-py 作者: openshift-roadshow 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get(self):
        """
        Return the parks whose ``Location`` lies within the requested box.

        The box corners come from the request arguments ``lon1``/``lat1`` and
        ``lon2``/``lat2``, each converted to ``float``.
        """
        args = request.args

        first_corner = [float(args['lon1']), float(args['lat1'])]
        second_corner = [float(args['lon2']), float(args['lat2'])]
        query = {'Location': {'$within': {'$box': [first_corner, second_corner]}}}

        parks = MongoClient(DB_URI)[DB_NAME].nationalparks
        return format_result(parks.find(query))
wechat_sele.py 文件源码 项目:NewsScrapy 作者: yinzishao 项目源码 文件源码 阅读 60 收藏 0 点赞 0 评论 0
def insertMongoDB(items):
    """
    Insert every item into the ``wechat`` collection.

    Each item is given a fresh ``_id`` (a stringified ObjectId) in place
    before a dict copy of it is inserted.
    """
    collection_name = 'wechat'
    database = pymongo.MongoClient(MONGO_URI)[MONGO_DATABASE]
    target = database[collection_name]
    for item in items:
        item['_id'] = str(ObjectId())
        target.insert(dict(item))
pipelines.py 文件源码 项目:NewsScrapy 作者: yinzishao 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def open_spider(self, spider):
        """
        Open the Mongo client for ``self.mongo_uri`` and select the
        ``self.mongo_db`` database.  ``spider`` is not used here.
        """
        client = pymongo.MongoClient(self.mongo_uri)
        self.client = client
        self.db = client[self.mongo_db]
table_test.py 文件源码 项目:girder_worker 作者: girder 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_bson(self):
        """
        Run the analysis with MongoDB-backed BSON inputs and output, then
        check the output format and the documents written to ``test.temp``.
        """
        import pymongo

        def mongo_spec(collection):
            # Binding for an objectlist.bson value stored in the 'test' db.
            return {
                'format': 'objectlist.bson',
                'mode': 'mongodb',
                'db': 'test',
                'collection': collection
            }

        outputs = run(
            self.analysis,
            inputs={'a': mongo_spec('a'), 'b': mongo_spec('b')},
            outputs={'c': mongo_spec('temp')})
        self.assertEqual(outputs['c']['format'], 'objectlist.bson')
        coll = pymongo.MongoClient('mongodb://localhost')['test']['temp']
        self.assertEqual(list(coll.find()), [self.aobj, self.bobj])


问题


面经


文章

微信
公众号

扫码关注公众号