python类ObjectId()的实例源码

handler.py 文件源码 项目:Ushio 作者: Hanaasagi 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def post(self):
        """Delete the current user's account after a password check.

        Reads ``password`` from the request body and compares the hex digest
        of ``md5(password + salt)`` with the stored ``password`` field of the
        current user.  On a match the session cookie is cleared, the user
        document is removed, the ``user_session`` entry is deleted and the
        client is redirected to ``/``.  Every other outcome ends in
        ``custom_error``.
        """
        password = self.get_body_argument('password', '')
        if password:
            user = yield self.db.user.find_one({
                'username': self.current_user['username']
            })
            digest = md5(password + self.settings['salt'])
            if user['password'] == digest.hexdigest():
                if self.get_cookie('TORNADOSESSION'):
                    self.clear_cookie('TORNADOSESSION')
                # NOTE(review): the removal is not yielded, so its result is
                # never inspected here -- confirm that is intentional.
                self.db.user.remove({
                    '_id': ObjectId(self.current_user['_id'])
                })
                self.session.delete('user_session')
                self.redirect('/')
                # The response is complete after the redirect; without this
                # return the error handler below would also run on success.
                return
        self.custom_error('????????')
handler.py 文件源码 项目:Ushio 作者: Hanaasagi 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def get(self, topic_id):
        """Render the detail page of a single topic.

        :param topic_id: value converted with ``ObjectId`` and used as the
            topic's ``_id`` in the lookup.

        The topic's ``content`` is passed through ``markdown.markdown`` before
        rendering.  When a user is logged in, ``isfavorite`` tells the
        template whether the topic's ``_id`` is in that user's ``favorite``
        list.
        """
        topic = yield self.db.topic.find_one({
            '_id': ObjectId(topic_id)
        })
        if topic is None:
            self.custom_error()
            # Nothing to render; continuing would index into ``None``.
            return

        topic['content'] = markdown.markdown(topic['content'])

        isfavorite = False
        current_user = self.current_user
        if current_user:
            user = yield self.db.user.find_one({
                '_id': ObjectId(current_user['_id'])
            })
            if topic['_id'] in user['favorite']:
                isfavorite = True
        self.render('topic/template/topic-detail.html',
                    topic=topic, isfavorite=isfavorite)
handler.py 文件源码 项目:Ushio 作者: Hanaasagi 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get(self):
        """Render one page of the current user's favorite topics.

        The ``page`` query argument selects the page (20 topics per page).
        A missing, non-numeric or non-positive value falls back to page 1
        instead of raising or producing a negative skip.
        """
        uid = self.current_user['_id']
        limit = 20
        try:
            page = int(self.get_query_argument('page', '1'))
        except ValueError:
            page = 1
        # A page below 1 would turn into a negative skip value below.
        page = max(page, 1)

        fav_list = yield self.db.user.find_one({
            '_id': ObjectId(uid)
        }, {
            'favorite': 1
        })

        cursor = self.db.topic.find({
            '_id': {
                '$in': fav_list['favorite']
            }
        })
        # Count before limit/skip are applied to the cursor.
        total = yield cursor.count()
        cursor.limit(
            limit).skip((page - 1) * limit)
        topics = yield cursor.to_list(length=limit)
        self.render('topic/template/topic-favorite.html',
                    topics=topics, page=page, limit=limit, total=total)
handler.py 文件源码 项目:Ushio 作者: Hanaasagi 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def post(self, tid):
        """Update the title, content and price of an existing topic.

        :param tid: string form of the topic's ``_id``.

        The submitted fields are validated with ``TopicModel``.  If ``tid``
        cannot be converted to an ``ObjectId`` the submitted topic is stored
        in ``self.cache['topic']`` and an error is reported.  Otherwise the
        topic is updated and the client is redirected to its detail page.
        """
        topic = {
            'title': self.get_body_argument('title', ''),
            'content': self.get_body_argument('content', ''),
            'price': self.get_body_argument('price', 0)
        }
        model = TopicModel()
        if not model(topic):
            self.custom_error()
            return

        try:
            tid = ObjectId(tid)
        except Exception:
            # Narrowed from a bare ``except`` so that interpreter-level
            # exits (KeyboardInterrupt, SystemExit) are not swallowed.
            self.cache['topic'] = topic
            self.custom_error('????????????')
            # Do not fall through to the update with an unconverted id.
            return

        yield self.db.topic.update({
            '_id': tid
        }, {
            '$set': topic
        })
        self.redirect('/topics/{}'.format(tid))
def user_action(self, *args, **kwargs):
        """Apply the submitted profile fields to a user, then show that user.

        ``uid``, ``email``, ``website``, ``qq`` and ``address`` are required
        body arguments; ``password`` is optional and only written when it is
        non-empty.  After the update the client is redirected to the user's
        detail page under ``/manage/userdetail/``.
        """
        uid = self.get_body_argument('uid')
        changes = {}
        for field in ('email', 'website', 'qq', 'address'):
            changes[field] = self.get_body_argument(field)

        # An empty password means "leave the stored one untouched".
        new_password = self.get_body_argument('password', '')
        if new_password:
            # NOTE(review): the value returned by md5(...) is stored as-is;
            # confirm it is the same form the login check compares against.
            changes['password'] = md5(new_password, self.settings['salt'])

        selector = {'_id': ObjectId(uid)}
        yield self.db.user.find_and_modify(selector, {'$set': changes})
        self.redirect('/manage/userdetail/{}'.format(uid))
handler.py 文件源码 项目:Ushio 作者: Hanaasagi 项目源码 文件源码 阅读 65 收藏 0 点赞 0 评论 0
def post(self):
        """Remove the tag whose id is given in the ``id`` body argument.

        Writes ``{"success":true}`` when the remove call returns a truthy
        result; a missing id or a falsy result is reported through
        ``custom_error``.
        """
        id_ = self.get_body_argument('id', None)
        if not id_:
            self.custom_error()
            # Without an id there is nothing to remove; stop here instead of
            # continuing to ``ObjectId(None)``.
            return
        rtn = yield self.db.tag.remove({
            '_id': ObjectId(id_)
        })
        if rtn:
            self.write('{"success":true}')
            self.finish()
        else:
            self.custom_error()


#
# ???????
#
base.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def handle_aggregation(self, callback):
        """Wrap ``callback`` so it receives converted aggregation results.

        The returned function takes the response as its first positional
        argument and the error as its second.  A truthy error raises
        ``RuntimeError``; otherwise each item of ``response['result']`` is
        passed through ``self.fill_ids`` and wrapped in ``edict``, and the
        resulting list is handed to ``callback``.
        """
        def handle(*arguments, **kw):
            response, error = arguments[0], arguments[1]
            if error:
                raise RuntimeError('Aggregation failed due to: %s' % str(error))

            def convert(document):
                # fill_ids works on the document in place before wrapping.
                self.fill_ids(document)
                return edict(document)

            callback([convert(document) for document in response['result']])

        return handle
test_collection.py 文件源码 项目:aiomongo 作者: ZeoAlliance 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
async def test_id_can_be_anything(self, test_db):
        """Check that ``_id`` may be auto-generated, an int, or a document.

        Declared ``async`` because the body uses ``await``, ``async with``
        and ``async for``, which are only valid inside a coroutine function.
        """
        await test_db.test.delete_many({})

        # No explicit _id: insert_one is expected to add an ObjectId.
        auto_id = {'hello': 'world'}
        await test_db.test.insert_one(auto_id)
        assert isinstance(auto_id['_id'], ObjectId)

        # Explicit integer _id is kept unchanged.
        numeric = {'_id': 240, 'hello': 'world'}
        await test_db.test.insert_one(numeric)
        assert numeric['_id'] == 240

        # A whole document used as _id is kept unchanged as well.
        obj = {'_id': numeric, 'hello': 'world'}
        await test_db.test.insert_one(obj)
        assert obj['_id'] == numeric

        async with test_db.test.find() as cursor:
            async for x in cursor:
                assert x['hello'] == 'world'
                assert '_id' in x
test_collection.py 文件源码 项目:aiomongo 作者: ZeoAlliance 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
async def test_find_one(self, test_db):
        """Exercise ``find_one`` with ids, filters, projections and misses.

        Declared ``async`` because the body uses ``await``, which is only
        valid inside a coroutine function.
        """
        _id = (await test_db.test.insert_one({'hello': 'world', 'foo': 'bar'})).inserted_id

        # Every way of addressing the single document returns the same result.
        assert 'world' == (await test_db.test.find_one())['hello']
        assert await test_db.test.find_one(_id) == await test_db.test.find_one()
        assert await test_db.test.find_one(None) == await test_db.test.find_one()
        assert await test_db.test.find_one({}) == await test_db.test.find_one()
        assert await test_db.test.find_one({'hello': 'world'}) == await test_db.test.find_one()

        # Projections restrict the returned keys.
        assert 'hello' in await test_db.test.find_one(projection=['hello'])
        assert 'hello' not in await test_db.test.find_one(projection=['foo'])
        assert ['_id'] == list(await test_db.test.find_one(projection=[]))

        # Non-matching filter and a fresh ObjectId both yield None.
        assert await test_db.test.find_one({'hello': 'foo'}) is None
        assert await test_db.test.find_one(ObjectId()) is None
test_database.py 文件源码 项目:aiomongo 作者: ZeoAlliance 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
async def test_deref(self, test_db):
        """Exercise ``dereference`` with invalid input and with ``DBRef`` values.

        Declared ``async`` because the body uses ``await``, which is only
        valid inside a coroutine function.
        """
        # Anything that is not a DBRef is rejected with TypeError.
        with pytest.raises(TypeError):
            await test_db.dereference(5)
        with pytest.raises(TypeError):
            await test_db.dereference('hello')
        with pytest.raises(TypeError):
            await test_db.dereference(None)

        assert await test_db.dereference(DBRef("test", ObjectId())) is None
        obj = {'x': True}
        key = (await test_db.test.insert_one(obj)).inserted_id
        assert await test_db.dereference(DBRef('test', key)) == obj
        assert await test_db.dereference(DBRef('test', key, 'aiomongo_test')) == obj

        # A DBRef naming the database 'foo' is expected to raise ValueError.
        with pytest.raises(ValueError):
            await test_db.dereference(DBRef('test', key, 'foo'))

        assert await test_db.dereference(DBRef('test', 4)) is None
        obj = {'_id': 4}
        await test_db.test.insert_one(obj)
        assert await test_db.dereference(DBRef('test', 4)) == obj
test_database.py 文件源码 项目:aiomongo 作者: ZeoAlliance 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
async def test_insert_find_one(self, test_db):
        """Insert a document, read it back, replace it and count the result.

        Declared ``async`` because the body uses ``await``, ``async with``
        and ``async for``, which are only valid inside a coroutine function.
        """
        a_doc = SON({'hello': 'world'})
        a_key = (await test_db.test.insert_one(a_doc)).inserted_id
        assert isinstance(a_doc['_id'], ObjectId)
        assert a_doc['_id'] == a_key
        assert a_doc == await test_db.test.find_one({'_id': a_doc['_id']})
        assert a_doc == await test_db.test.find_one(a_key)
        assert await test_db.test.find_one(ObjectId()) is None
        assert a_doc == await test_db.test.find_one({'hello': 'world'})
        assert await test_db.test.find_one({'hello': 'test'}) is None

        # Replace the stored document and verify the change is visible.
        b = await test_db.test.find_one()
        b['hello'] = 'mike'
        await test_db.test.replace_one({'_id': b['_id']}, b)

        assert a_doc != await test_db.test.find_one(a_key)
        assert b == await test_db.test.find_one(a_key)
        assert b == await test_db.test.find_one()

        # The replacement must not have created a second document.
        count = 0

        async with test_db.test.find() as cursor:
            async for _ in cursor:
                count += 1
        assert count == 1
DBHandler.py 文件源码 项目:CaptsLog 作者: jaehoonhwang 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def search_entries_by_id(self, id):
        """Search the Entries_Table for the entry with the given object ID.

        Args:
            id(string): the objectID you are searching for

        Return:
            result: whatever ``find`` returns for the ``_id`` filter, or
                ``True`` if a ``ServerSelectionTimeoutError`` is raised.

        """

        entries_table = self.db["Entries_Table"]
        try:
            return entries_table.find(
                {"_id": ObjectId(id)})
        except errors.ServerSelectionTimeoutError:
            print('ERROR : No connection could be made because'
                  ' the target machine actively refused it')
            # NOTE(review): True on a connection failure is kept only for
            # backward compatibility; confirm callers really expect it.
            return True
DBHandler.py 文件源码 项目:CaptsLog 作者: jaehoonhwang 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def update_entries(self, _id, vals):
        """Update an entry in the Entries_Table

        The new values are written with ``$set`` together with a
        ``Last_Modified`` timestamp.  The caller's ``vals`` is not modified.

        Args:
            _id(ObjectId):  ObjectID of the entry you want to change
            vals(collection): New values

        Return:
            result(bool):True if an entry with that ID was found and updated.
                False if no entry matched or the server could not be reached
        """

        entries_table = self.db["Entries_Table"]
        # Copy so the timestamp is not added to the caller's own dict.
        changes = dict(vals)
        changes["Last_Modified"] = datetime.now()
        try:
            # A single update replaces the former find-then-update pair;
            # matched_count tells whether the entry existed.
            outcome = entries_table.update_one({"_id": ObjectId(_id)},
                                               {"$set": changes})
        except errors.ServerSelectionTimeoutError:
            print('ERROR : No connection could be made because'
                  ' the target machine actively refused it')
            return False
        return outcome.matched_count > 0
DBHandler.py 文件源码 项目:CaptsLog 作者: jaehoonhwang 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def delete_entries(self, _id):
        """Delete an entry from the Entries_Table

        Args:
            _id(str):  Object ID of the entry you want to delete

        Return:
            result(bool):True if an entry was deleted. False if no entry
                matched or the server could not be reached
        """

        entries_table = self.db["Entries_Table"]
        try:
            # A single delete replaces the former find-then-delete pair;
            # deleted_count tells whether the entry existed.
            outcome = entries_table.delete_one({"_id": ObjectId(_id)})
        except errors.ServerSelectionTimeoutError:
            print('ERROR : No connection could be made because'
                  ' the target machine actively refused it')
            return False
        return outcome.deleted_count == 1
mongodb.py 文件源码 项目:sacredboard 作者: chovanecm 项目源码 文件源码 阅读 97 收藏 0 点赞 0 评论 0
def get_run(self, run_id):
        """
        Fetch one run document by its ID.

        The ID is first tried as an integer; if ``int(run_id)`` raises
        ``ValueError`` it is looked up as a ``bson.ObjectId`` instead.

        :param run_id: The ID of the run.
        :return: The last document yielded by the query, or None if there
            was no match.
        """
        collection = getattr(self._db, self._collection_name)
        try:
            matches = collection.find({"_id": int(run_id)})
        except ValueError:
            # Not a number, so treat it as an ObjectId string.
            matches = collection.find({"_id": bson.ObjectId(run_id)})

        found = None
        for found in matches:
            pass
        return found
test_genericdao.py 文件源码 项目:sacredboard 作者: chovanecm 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_find_record(mongo_client):
    """Look up run records by unknown id, by ObjectId and by a config field."""
    dao = GenericDAO(mongo_client, "testdb")

    # An id that is not in the collection yields no record.
    missing = dao.find_record("runs", {"_id": "NON_EXISTING_ID"})
    assert missing is None

    # Lookup by ObjectId.
    run_id = bson.ObjectId("58163443b1758523257c69ca")
    by_id = dao.find_record("runs", {"_id": run_id})
    assert by_id is not None
    assert by_id["config"] is not None
    assert by_id["config"]["seed"] == 185616783

    # Lookup by a nested config value.
    by_config = dao.find_record("runs", {"config.learning_rate": 0.0001})
    assert by_config is not None
    assert by_config["config"] is not None
    assert by_config["config"]["seed"] == 144363069
test_export.py 文件源码 项目:pymongo-schema 作者: pajachiet 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test04_write_json_with_mongo_objects():
    """Write a schema holding an ObjectId and a datetime as JSON and read it back.

    The output file is removed in a ``finally`` block so that a failing
    assertion does not leave it behind in ``TEST_DIR``.
    """
    schema = {
        'db': {'coll': {'object': {'_id': {'type': 'oid', 'anonymized': ObjectId()},
                                   'date': {'type': 'date', 'anonymized': datetime(2015, 1, 1)}}}}
    }

    output = os.path.join(TEST_DIR, 'output_data_dict.json')
    output_maker = JsonOutput({})
    output_maker.data = schema
    try:
        with open(output, 'w') as out_fd:
            output_maker.write_data(out_fd)
        with open(output, 'r') as out_fd:
            # custom json_options - DEFAULT_JSON_OPTIONS sets tzinfo when loading datetime (!= schema)
            json_options = json_util.JSONOptions(tz_aware=False)
            # object_hook allows to interpret $oid, $date, etc from json and convert them to objects
            object_hook = partial(json_util.object_hook, json_options=json_options)
            assert json.load(out_fd, object_hook=object_hook) == schema
    finally:
        # The file may not exist if opening it for writing failed.
        if os.path.exists(output):
            os.remove(output)
test_ref_field.py 文件源码 项目:OmMongo 作者: bapakode 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_unwrap():
    """Check the types returned by ``unwrap`` for ``RefField`` and ``SRefField``."""
    class A(Document):
        x = IntField()
    s = get_session()

    a = A(x=5)
    s.save(a)

    # The unused dict-style reference local was dropped; only the DBRef and
    # the bare id are exercised below.
    dbaref = DBRef(db='unit-testing', collection='A', id=a.mongo_id)

    ret = RefField(DocumentField(A)).unwrap(dbaref)
    assert isinstance(ret, DBRef), ret

    ret = SRefField(A).unwrap(a.mongo_id)
    assert isinstance(ret, ObjectId), ret
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def delete_address_document(self, object_id=None, name=None):
        """
        Remove a single address_document, selected by ``_id`` or by name.

        ``object_id`` takes precedence when both selectors are given.

        :param object_id: value converted with ObjectId() and matched on '_id'
        :param name: string matched on 'name'
        :return: True if exactly one document was deleted, otherwise False
                 (also False when neither selector is given).
        """
        if object_id is None and name is None:
            return False

        delete_one = self.address_documents_collection.delete_one
        if object_id is not None:
            outcome = delete_one({'_id': ObjectId(object_id)})
        else:
            outcome = delete_one({'name': name})
        return outcome.deleted_count == 1
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def delete_address_documents(self, object_ids=None, names=None):
        """
        Remove several address_documents, selected by ``_id`` or by name.

        ``object_ids`` takes precedence when both selectors are given.

        :param object_ids: iterable of values converted with ObjectId()
        :param names: [string]
        :return: The number of deleted documents (0 when neither selector
                 is given).
        """
        if object_ids is not None:
            selector = {'_id': {'$in': [ObjectId(item) for item in object_ids]}}
        elif names is not None:
            selector = {'name': {'$in': names}}
        else:
            return 0

        outcome = self.address_documents_collection.delete_many(selector)
        return outcome.deleted_count
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def delete_bus_line_documents(self, object_ids=None, bus_line_ids=None):
        """
        Remove several bus_line_documents, selected by ``_id`` or bus_line_id.

        ``object_ids`` takes precedence when both selectors are given.

        :param object_ids: iterable of values converted with ObjectId()
        :param bus_line_ids: [int]
        :return: The number of deleted documents (0 when neither selector
                 is given).
        """
        if object_ids is not None:
            selector = {'_id': {'$in': [ObjectId(item) for item in object_ids]}}
        elif bus_line_ids is not None:
            selector = {'bus_line_id': {'$in': bus_line_ids}}
        else:
            return 0

        outcome = self.bus_line_documents_collection.delete_many(selector)
        return outcome.deleted_count
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def delete_bus_stop_document(self, object_id=None, osm_id=None):
        """
        Remove a single bus_stop_document, selected by ``_id`` or by osm_id.

        ``object_id`` takes precedence when both selectors are given.

        :param object_id: value converted with ObjectId() and matched on '_id'
        :param osm_id: int matched on 'osm_id'
        :return: True if exactly one document was deleted, otherwise False
                 (also False when neither selector is given).
        """
        if object_id is None and osm_id is None:
            return False

        delete_one = self.bus_stop_documents_collection.delete_one
        if object_id is not None:
            outcome = delete_one({'_id': ObjectId(object_id)})
        else:
            outcome = delete_one({'osm_id': osm_id})
        return outcome.deleted_count == 1
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def delete_bus_stop_documents(self, object_ids=None, osm_ids=None):
        """
        Remove several bus_stop_documents, selected by ``_id`` or by osm_id.

        ``object_ids`` takes precedence when both selectors are given.

        :param object_ids: iterable of values converted with ObjectId()
        :param osm_ids: [int]
        :return: The number of deleted documents (0 when neither selector
                 is given).
        """
        if object_ids is not None:
            selector = {'_id': {'$in': [ObjectId(item) for item in object_ids]}}
        elif osm_ids is not None:
            selector = {'osm_id': {'$in': osm_ids}}
        else:
            return 0

        outcome = self.bus_stop_documents_collection.delete_many(selector)
        return outcome.deleted_count
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def delete_bus_vehicle_documents(self, object_ids=None, bus_vehicle_ids=None):
        """
        Remove several bus_vehicle_documents, selected by ``_id`` or by
        bus_vehicle_id.

        ``object_ids`` takes precedence when both selectors are given.

        :param object_ids: iterable of values converted with ObjectId()
        :param bus_vehicle_ids: [int]
        :return: The number of deleted documents (0 when neither selector
                 is given).
        """
        if object_ids is not None:
            selector = {'_id': {'$in': [ObjectId(item) for item in object_ids]}}
        elif bus_vehicle_ids is not None:
            selector = {'bus_vehicle_id': {'$in': bus_vehicle_ids}}
        else:
            return 0

        outcome = self.bus_vehicle_documents_collection.delete_many(selector)
        return outcome.deleted_count
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def delete_edge_document(self, object_id=None, starting_node_osm_id=None, ending_node_osm_id=None):
        """
        Remove a single edge_document, selected by ``_id`` or by the osm_ids
        of both of its end nodes.

        ``object_id`` takes precedence; the node selector is used only when
        both node ids are given.

        :param object_id: value converted with ObjectId() and matched on '_id'
        :param starting_node_osm_id: int
        :param ending_node_osm_id: int
        :return: True if exactly one document was deleted, otherwise False
                 (also False when no complete selector is given).
        """
        by_id = object_id is not None
        by_nodes = starting_node_osm_id is not None and ending_node_osm_id is not None
        if not (by_id or by_nodes):
            return False

        delete_one = self.edge_documents_collection.delete_one
        if by_id:
            outcome = delete_one({'_id': ObjectId(object_id)})
        else:
            outcome = delete_one({
                'starting_node.osm_id': starting_node_osm_id,
                'ending_node.osm_id': ending_node_osm_id
            })
        return outcome.deleted_count == 1
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def delete_node_document(self, object_id=None, osm_id=None):
        """
        Remove a single node_document, selected by ``_id`` or by osm_id.

        ``object_id`` takes precedence when both selectors are given.

        :param object_id: value converted with ObjectId() and matched on '_id'
        :param osm_id: int matched on 'osm_id'
        :return: True if exactly one document was deleted, otherwise False
                 (also False when neither selector is given).
        """
        if object_id is None and osm_id is None:
            return False

        delete_one = self.node_documents_collection.delete_one
        if object_id is not None:
            outcome = delete_one({'_id': ObjectId(object_id)})
        else:
            outcome = delete_one({'osm_id': osm_id})
        return outcome.deleted_count == 1
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 70 收藏 0 点赞 0 评论 0
def delete_node_documents(self, object_ids=None, osm_ids=None):
        """
        Remove several node_documents, selected by ``_id`` or by osm_id.

        ``object_ids`` takes precedence when both selectors are given.

        :param object_ids: iterable of values converted with ObjectId()
        :param osm_ids: [int]
        :return: The number of deleted documents (0 when neither selector
                 is given).
        """
        if object_ids is not None:
            selector = {'_id': {'$in': [ObjectId(item) for item in object_ids]}}
        elif osm_ids is not None:
            selector = {'osm_id': {'$in': osm_ids}}
        else:
            return 0

        outcome = self.node_documents_collection.delete_many(selector)
        return outcome.deleted_count
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def delete_point_documents(self, object_ids=None, osm_ids=None):
        """
        Remove several point_documents, selected by ``_id`` or by osm_id.

        ``object_ids`` takes precedence when both selectors are given.

        :param object_ids: iterable of values converted with ObjectId()
        :param osm_ids: [int]
        :return: The number of deleted documents (0 when neither selector
                 is given).
        """
        if object_ids is not None:
            selector = {'_id': {'$in': [ObjectId(item) for item in object_ids]}}
        elif osm_ids is not None:
            selector = {'osm_id': {'$in': osm_ids}}
        else:
            return 0

        outcome = self.point_documents_collection.delete_many(selector)
        return outcome.deleted_count
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def delete_timetable_document(self, object_id=None, timetable_id=None):
        """
        Remove a single timetable_document, selected by ``_id`` or by
        timetable_id.

        ``object_id`` takes precedence when both selectors are given.

        :param object_id: value converted with ObjectId() and matched on '_id'
        :param timetable_id: int matched on 'timetable_id'
        :return: True if exactly one document was deleted, otherwise False
                 (also False when neither selector is given).
        """
        if object_id is None and timetable_id is None:
            return False

        delete_one = self.timetable_documents_collection.delete_one
        if object_id is not None:
            outcome = delete_one({'_id': ObjectId(object_id)})
        else:
            outcome = delete_one({'timetable_id': timetable_id})
        return outcome.deleted_count == 1
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def delete_traffic_event_document(self, object_id=None, event_id=None):
        """
        Remove a single traffic_event_document, selected by ``_id`` or by
        event_id.

        ``object_id`` takes precedence when both selectors are given.

        :param object_id: value converted with ObjectId() and matched on '_id'
        :param event_id: string matched on 'event_id'
        :return: True if exactly one document was deleted, otherwise False
                 (also False when neither selector is given).
        """
        if object_id is None and event_id is None:
            return False

        delete_one = self.traffic_event_documents_collection.delete_one
        if object_id is not None:
            outcome = delete_one({'_id': ObjectId(object_id)})
        else:
            outcome = delete_one({'event_id': event_id})
        return outcome.deleted_count == 1


问题


面经


文章

微信
公众号

扫码关注公众号