python类ObjectId()的实例源码

mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def delete_traffic_event_documents(self, object_ids=None, event_ids=None):
        """
        Remove several traffic_event_documents with a single delete_many call.

        The documents are selected by '_id' when object_ids is provided
        (each entry is passed through ObjectId first); otherwise they are
        selected by 'event_id'. Without any criterion nothing is deleted.

        :param object_ids: [ObjectId]
        :param event_ids: [string]
        :return: The number of deleted documents (0 if both arguments are None).
        """
        if object_ids is not None:
            selector = {'_id': {'$in': [ObjectId(entry) for entry in object_ids]}}
        elif event_ids is not None:
            selector = {'event_id': {'$in': event_ids}}
        else:
            # No selection criterion: leave the collection untouched.
            return 0

        outcome = self.traffic_event_documents_collection.delete_many(selector)
        return outcome.deleted_count
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def delete_way_document(self, object_id=None, osm_id=None):
        """
        Remove a single way_document.

        object_id takes precedence over osm_id when both are supplied.

        :param object_id: ObjectId
        :param osm_id: int
        :return: True if exactly one way_document was deleted, otherwise False
                 (also False when neither argument is provided).
        """
        if object_id is None and osm_id is None:
            return False

        if object_id is not None:
            criteria = {'_id': ObjectId(object_id)}
        else:
            criteria = {'osm_id': osm_id}

        outcome = self.way_documents_collection.delete_one(criteria)
        return outcome.deleted_count == 1
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def delete_way_documents(self, object_ids=None, osm_ids=None):
        """
        Remove several way_documents with a single delete_many call.

        Selection is done by '_id' when object_ids is provided (each entry is
        passed through ObjectId first), otherwise by 'osm_id'.

        :param object_ids: [ObjectId]
        :param osm_ids: [int]
        :return: The number of deleted documents (0 if both arguments are None).
        """
        if object_ids is None and osm_ids is None:
            return 0

        if object_ids is not None:
            wanted = {'_id': {'$in': [ObjectId(entry) for entry in object_ids]}}
        else:
            wanted = {'osm_id': {'$in': osm_ids}}

        return self.way_documents_collection.delete_many(wanted).deleted_count
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def find_bus_vehicle_document(self, object_id=None, bus_vehicle_id=None):
        """
        Look up a single bus_vehicle_document.

        The lookup uses '_id' when object_id is given, otherwise
        'bus_vehicle_id'.

        :param object_id: ObjectId
        :param bus_vehicle_id: int
        :return: bus_vehicle_document as returned by find_one, or None when
                 neither argument is provided.
        """
        if object_id is not None:
            query = {'_id': ObjectId(object_id)}
        elif bus_vehicle_id is not None:
            query = {'bus_vehicle_id': bus_vehicle_id}
        else:
            return None

        return self.bus_vehicle_documents_collection.find_one(query)
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def find_edge_document(self, object_id=None, starting_node_osm_id=None, ending_node_osm_id=None):
        """
        Retrieve an edge_document.

        The lookup uses '_id' when object_id is given. Otherwise both
        starting_node_osm_id and ending_node_osm_id must be provided and the
        edge is matched on 'starting_node.osm_id' and 'ending_node.osm_id'.

        :param object_id: ObjectId
        :param starting_node_osm_id: int
        :param ending_node_osm_id: int
        :return: edge_document as returned by find_one, or None when the
                 arguments do not identify an edge.
        """
        if object_id is not None:
            query = {'_id': ObjectId(object_id)}
        elif starting_node_osm_id is not None and ending_node_osm_id is not None:
            query = {
                'starting_node.osm_id': starting_node_osm_id,
                # Was misspelled 'ending_node.oms_id', so the ending node
                # condition never referred to the intended field.
                'ending_node.osm_id': ending_node_osm_id
            }
        else:
            return None

        return self.edge_documents_collection.find_one(query)
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def find_timetable_document(self, object_id=None, timetable_id=None):
        """
        Retrieve a timetable_document.

        The lookup uses '_id' when object_id is given, otherwise
        'timetable_id'.

        :param object_id: ObjectId
        :param timetable_id: int
        :return: timetable_document as returned by find_one, or None when
                 neither argument is provided.
        """
        if object_id is not None:
            query = {'_id': ObjectId(object_id)}
        elif timetable_id is not None:
            # This branch used to query '_id': ObjectId(object_id) although
            # object_id is None here, so timetable_id was never used.
            # NOTE(review): the 'timetable_id' field name mirrors the sibling
            # find_* helpers - confirm against the stored timetable schema.
            query = {'timetable_id': timetable_id}
        else:
            return None

        return self.timetable_documents_collection.find_one(query)
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def find_traffic_event_document(self, object_id=None, event_id=None):
        """
        Look up a single traffic_event_document.

        The lookup uses '_id' when object_id is given, otherwise 'event_id'.

        :param object_id: ObjectId
        :param event_id: string
        :return: traffic_event_document as returned by find_one, or None when
                 neither argument is provided.
        """
        if object_id is None and event_id is None:
            return None

        if object_id is not None:
            criteria = {'_id': ObjectId(object_id)}
        else:
            criteria = {'event_id': event_id}

        return self.traffic_event_documents_collection.find_one(criteria)
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def find_way_document(self, object_id=None, osm_id=None):
        """
        Look up a single way_document.

        The lookup uses '_id' when object_id is given, otherwise 'osm_id'.

        :param object_id: ObjectId
        :param osm_id: int
        :return: way_document as returned by find_one, or None when neither
                 argument is provided.
        """
        if object_id is None and osm_id is None:
            return None

        # object_id wins whenever it is supplied.
        criteria = {'osm_id': osm_id} if object_id is None else {'_id': ObjectId(object_id)}
        return self.way_documents_collection.find_one(criteria)
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def insert_address_document(self, address_document=None, name=None, node_id=None, point=None):
        """
        Store an address_document.

        A ready-made address_document is inserted as is. When none is given,
        one is assembled from name, node_id and the longitude/latitude
        attributes of point.

        :param address_document
        :param name: string
        :param node_id: int
        :param point: Point
        :return: new_object_id: ObjectId
        """
        document = address_document

        if document is None:
            coordinates = {'longitude': point.longitude, 'latitude': point.latitude}
            document = {'name': name, 'node_id': node_id, 'point': coordinates}

        return self.address_documents_collection.insert_one(document).inserted_id
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def insert_bus_stop_document(self, bus_stop_document=None, osm_id=None, name=None, point=None):
        """
        Store a bus_stop_document.

        A ready-made bus_stop_document is inserted as is. When none is given,
        one is assembled from osm_id, name and the longitude/latitude
        attributes of point.

        :param bus_stop_document
        :param osm_id: int
        :param name: string
        :param point: Point
        :return: new_object_id: ObjectId
        """
        if bus_stop_document is not None:
            document = bus_stop_document
        else:
            document = {
                'osm_id': osm_id,
                'name': name,
                'point': {'longitude': point.longitude, 'latitude': point.latitude}
            }

        outcome = self.bus_stop_documents_collection.insert_one(document)
        return outcome.inserted_id
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def insert_node_document(self, node_document=None, osm_id=None, tags=None, point=None):
        """
        Store a node_document.

        A ready-made node_document is inserted as is. When none is given,
        one is assembled from osm_id, tags and the longitude/latitude
        attributes of point.

        :param node_document
        :param osm_id: int
        :param tags: dict
        :param point: Point
        :return: new_object_id: ObjectId
        """
        document = node_document

        if document is None:
            location = {'longitude': point.longitude, 'latitude': point.latitude}
            document = {'osm_id': osm_id, 'tags': tags, 'point': location}

        inserted = self.node_documents_collection.insert_one(document)
        return inserted.inserted_id
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def insert_point_document(self, point_document=None, osm_id=None, point=None):
        """
        Store a point_document.

        A ready-made point_document is inserted as is. When none is given,
        one is assembled from osm_id and the longitude/latitude attributes
        of point.

        :param point_document
        :param osm_id: int
        :param point: Point
        :return: new_object_id: ObjectId
        """
        if point_document is not None:
            document = point_document
        else:
            document = {
                'osm_id': osm_id,
                'point': {'longitude': point.longitude, 'latitude': point.latitude}
            }

        return self.point_documents_collection.insert_one(document).inserted_id
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def insert_timetable_document(self, timetable):
        """
        Upsert a timetable_document.

        The document is matched on '_id' (the timetable's '_id' value passed
        through ObjectId) and its 'bus_line_id', 'timetable_entries' and
        'travel_requests' fields are set from the given timetable.

        :param timetable: timetable_document
        :return: new_object_id: ObjectId (the upserted_id reported by update_one)
        """
        selector = {'_id': ObjectId(timetable.get('_id'))}
        updated_fields = dict(
            (field, timetable.get(field))
            for field in ('bus_line_id', 'timetable_entries', 'travel_requests')
        )
        outcome = self.timetable_documents_collection.update_one(
            selector, {'$set': updated_fields}, upsert=True
        )
        return outcome.upserted_id
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def insert_traffic_event_document(self, traffic_event_document):
        """
        Upsert a traffic_event_document.

        The document is matched on 'event_id' and its 'event_type',
        'event_level', 'point' and 'datetime' fields are set from the given
        traffic_event_document.

        :param traffic_event_document
        :return: new_object_id: ObjectId (the upserted_id reported by update_one)
        """
        selector = {'event_id': traffic_event_document.get('event_id')}
        updated_fields = dict(
            (field, traffic_event_document.get(field))
            for field in ('event_type', 'event_level', 'point', 'datetime')
        )
        outcome = self.traffic_event_documents_collection.update_one(
            selector, {'$set': updated_fields}, upsert=True
        )
        return outcome.upserted_id
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def insert_traffic_event_documents(self, traffic_event_documents):
        """
        Upsert several traffic_event_documents, one at a time.

        Every entry is handed to insert_traffic_event_document and the values
        it returns are collected in input order.

        :param traffic_event_documents: [traffic_event_document]
        :return: new_object_ids: [ObjectId]
        """
        return [
            self.insert_traffic_event_document(traffic_event_document=document)
            for document in traffic_event_documents
        ]
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def insert_way_document(self, way_document=None, osm_id=None, tags=None, references=None):
        """
        Store a way_document.

        A ready-made way_document is inserted as is. When none is given, one
        is assembled from osm_id, tags and references.

        :param way_document
        :param osm_id: int
        :param tags: dict
        :param references: [osm_id]
        :return: new_object_id: ObjectId
        """
        document = way_document

        if document is None:
            document = {'osm_id': osm_id, 'tags': tags, 'references': references}

        return self.way_documents_collection.insert_one(document).inserted_id
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def print_address_document(self, object_id=None, name=None, node_id=None, longitude=None, latitude=None):
        """
        Print an address_document.

        All arguments are forwarded unchanged to find_address_document and
        whatever it returns is written to standard output.

        :param object_id: ObjectId
        :param name: string
        :param node_id: int
        :param longitude: float
        :param latitude: float
        :return: None
        """
        address_document = self.find_address_document(
            object_id=object_id,
            name=name,
            node_id=node_id,
            longitude=longitude,
            latitude=latitude
        )
        # Parenthesised single-argument form: same output on Python 2,
        # and valid syntax on Python 3 (the bare print statement is not).
        print(address_document)
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def print_bus_stop_document(self, object_id=None, osm_id=None, name=None, longitude=None, latitude=None):
        """
        Print a bus_stop_document.

        All arguments are forwarded unchanged to find_bus_stop_document and
        whatever it returns is written to standard output.

        :param object_id: ObjectId
        :param osm_id: int
        :param name: string
        :param longitude: float
        :param latitude: float
        :return: None
        """
        bus_stop_document = self.find_bus_stop_document(
            object_id=object_id,
            osm_id=osm_id,
            name=name,
            longitude=longitude,
            latitude=latitude
        )
        # Parenthesised single-argument form: same output on Python 2,
        # and valid syntax on Python 3 (the bare print statement is not).
        print(bus_stop_document)
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def print_bus_vehicle_documents(self, object_ids=None, bus_vehicle_ids=None, counter=None):
        """
        Print multiple bus_vehicle_documents.

        The documents are obtained from find_bus_vehicle_documents. All of
        them are printed when counter is None or not smaller than their
        number; otherwise only the first counter documents are printed.
        The total number of documents found is always printed last.

        :param object_ids: [ObjectId]
        :param bus_vehicle_ids: [int]
        :param counter: int
        :return: None
        """
        bus_vehicle_documents = self.find_bus_vehicle_documents(
            object_ids=object_ids,
            bus_vehicle_ids=bus_vehicle_ids
        )
        number_of_bus_vehicle_documents = len(bus_vehicle_documents)

        # print(...) with a single argument behaves identically on Python 2
        # and is valid syntax on Python 3 (the bare print statement is not).
        if counter is None or number_of_bus_vehicle_documents <= counter:
            for bus_vehicle_document in bus_vehicle_documents:
                print(bus_vehicle_document)
        else:
            for index in range(counter):
                print(bus_vehicle_documents[index])

        # Formatted into one string so the output stays
        # 'number_of_bus_vehicle_documents: N' on both Python 2 and 3.
        print('number_of_bus_vehicle_documents: {0}'.format(number_of_bus_vehicle_documents))
mongodb_database_connection.py 文件源码 项目:dynamic-bus-scheduling 作者: pinac0099 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def print_node_document(self, object_id=None, osm_id=None, longitude=None, latitude=None):
        """
        Print a node_document.

        All arguments are forwarded unchanged to find_node_document and
        whatever it returns is written to standard output.

        :param object_id: ObjectId
        :param osm_id: int
        :param longitude: float
        :param latitude: float
        :return: None
        """
        node_document = self.find_node_document(
            object_id=object_id,
            osm_id=osm_id,
            longitude=longitude,
            latitude=latitude
        )
        # Parenthesised single-argument form: same output on Python 2,
        # and valid syntax on Python 3 (the bare print statement is not).
        print(node_document)
utils.py 文件源码 项目:dbs-back 作者: Beit-Hatfutsot 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_oid(id_str):
    """Convert *id_str* to a BSON ObjectId, returning None when it cannot be converted.

    :param id_str: candidate document id (normally a string).
    :return: bson.objectid.ObjectId, or None if the value is rejected as
             malformed (InvalidId) or is of an unsupported type (TypeError).
    """
    try:
        return bson.objectid.ObjectId(id_str)
    except (bson.errors.InvalidId, TypeError):
        # TypeError covers inputs of an unsupported type, which are just as
        # "not an id" as a malformed string.
        # NOTE(review): None is not rejected by this check - confirm what
        # callers expect for a missing id.
        return None
__init__.py 文件源码 项目:pillar 作者: armadillica 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def str2id(document_id: str) -> bson.ObjectId:
    """Convert a document ID string to an ObjectId.

    An empty/falsy ID, or one that bson.ObjectId rejects, is logged at debug
    level and reported to the caller as a BadRequest.

    :raises: wz_exceptions.BadRequest
    """

    if document_id:
        try:
            return bson.ObjectId(document_id)
        except (bson.objectid.InvalidId, TypeError):
            log.debug('str2id(%r): Invalid Object ID', document_id)
            raise wz_exceptions.BadRequest('Invalid object ID %r' % document_id)

    # Falsy input never reaches the ObjectId constructor.
    log.debug('str2id(%r): Invalid Object ID', document_id)
    raise wz_exceptions.BadRequest('Invalid object ID %r' % document_id)


问题


面经


文章

微信
公众号

扫码关注公众号