def delete_traffic_event_documents(self, object_ids=None, event_ids=None):
    """
    Remove several traffic_event_documents with a single delete_many call.

    The documents are selected by '_id' when object_ids is provided,
    otherwise by 'event_id' when event_ids is provided. When both are
    None nothing is sent to the collection.

    :param object_ids: [ObjectId]
    :param event_ids: [string]
    :return: The number of deleted documents (0 if no selector was given).
    """
    if object_ids is not None:
        # Each entry is passed through ObjectId() before it is used in the filter.
        selector = {'_id': {'$in': [ObjectId(entry) for entry in object_ids]}}
    elif event_ids is not None:
        selector = {'event_id': {'$in': event_ids}}
    else:
        return 0

    outcome = self.traffic_event_documents_collection.delete_many(selector)
    return outcome.deleted_count
python类ObjectId()的实例源码
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def delete_way_document(self, object_id=None, osm_id=None):
    """
    Remove a single way_document.

    object_id takes precedence over osm_id; if neither is provided, no
    request is made and False is returned.

    :param object_id: ObjectId
    :param osm_id: int
    :return: True if exactly one way_document was deleted, otherwise False.
    """
    if object_id is None and osm_id is None:
        return False

    if object_id is not None:
        selector = {'_id': ObjectId(object_id)}
    else:
        selector = {'osm_id': osm_id}

    outcome = self.way_documents_collection.delete_one(selector)
    return outcome.deleted_count == 1
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def delete_way_documents(self, object_ids=None, osm_ids=None):
    """
    Remove several way_documents with a single delete_many call.

    object_ids takes precedence over osm_ids; when both are None the
    collection is not touched.

    :param object_ids: [ObjectId]
    :param osm_ids: [int]
    :return: The number of deleted documents (0 if no selector was given).
    """
    if object_ids is None and osm_ids is None:
        return 0

    if object_ids is not None:
        field, values = '_id', [ObjectId(entry) for entry in object_ids]
    else:
        field, values = 'osm_id', osm_ids

    outcome = self.way_documents_collection.delete_many({field: {'$in': values}})
    return outcome.deleted_count
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def find_bus_vehicle_document(self, object_id=None, bus_vehicle_id=None):
    """
    Look up a single bus_vehicle_document.

    The lookup uses '_id' when object_id is provided, otherwise
    'bus_vehicle_id' when bus_vehicle_id is provided.

    :param object_id: ObjectId
    :param bus_vehicle_id: int
    :return: bus_vehicle_document (None if no selector was given).
    """
    selector = None

    if object_id is not None:
        selector = {'_id': ObjectId(object_id)}
    elif bus_vehicle_id is not None:
        selector = {'bus_vehicle_id': bus_vehicle_id}

    if selector is None:
        return None
    return self.bus_vehicle_documents_collection.find_one(selector)
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def find_edge_document(self, object_id=None, starting_node_osm_id=None, ending_node_osm_id=None):
    """
    Retrieve an edge_document.

    The lookup uses '_id' when object_id is provided. Otherwise both
    starting_node_osm_id and ending_node_osm_id must be provided, and the
    edge is matched on the osm_id of its starting and ending node.

    :param object_id: ObjectId
    :param starting_node_osm_id: int
    :param ending_node_osm_id: int
    :return: edge_document (None if no complete selector was given).
    """
    if object_id is not None:
        edge_document = self.edge_documents_collection.find_one({
            '_id': ObjectId(object_id)
        })
    elif starting_node_osm_id is not None and ending_node_osm_id is not None:
        edge_document = self.edge_documents_collection.find_one({
            'starting_node.osm_id': starting_node_osm_id,
            # Previously spelled 'ending_node.oms_id', which never matched.
            'ending_node.osm_id': ending_node_osm_id
        })
    else:
        return None
    return edge_document
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def find_timetable_document(self, object_id=None, timetable_id=None):
    """
    Retrieve a timetable_document.

    The lookup uses '_id' when object_id is provided, otherwise
    'timetable_id' when timetable_id is provided.

    :param object_id: ObjectId
    :param timetable_id: int
    :return: timetable_document (None if no selector was given).
    """
    if object_id is not None:
        timetable_document = self.timetable_documents_collection.find_one({
            '_id': ObjectId(object_id)
        })
    elif timetable_id is not None:
        # This branch used to query '_id': ObjectId(object_id) with object_id
        # being None, ignoring timetable_id altogether.
        # NOTE(review): the field name 'timetable_id' mirrors the parameter
        # name - confirm it against the stored document schema.
        timetable_document = self.timetable_documents_collection.find_one({
            'timetable_id': timetable_id
        })
    else:
        return None
    return timetable_document
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def find_traffic_event_document(self, object_id=None, event_id=None):
    """
    Look up a single traffic_event_document.

    object_id takes precedence over event_id; if neither is provided,
    the collection is not queried.

    :param object_id: ObjectId
    :param event_id: string
    :return: traffic_event_document (None if no selector was given).
    """
    if object_id is None and event_id is None:
        return None

    selector = (
        {'_id': ObjectId(object_id)} if object_id is not None
        else {'event_id': event_id}
    )
    return self.traffic_event_documents_collection.find_one(selector)
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def find_way_document(self, object_id=None, osm_id=None):
    """
    Look up a single way_document.

    The lookup uses '_id' when object_id is provided, otherwise 'osm_id'
    when osm_id is provided.

    :param object_id: ObjectId
    :param osm_id: int
    :return: way_document (None if no selector was given).
    """
    if object_id is not None:
        selector = {'_id': ObjectId(object_id)}
    elif osm_id is not None:
        selector = {'osm_id': osm_id}
    else:
        # Nothing to search by.
        return None

    return self.way_documents_collection.find_one(selector)
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def insert_address_document(self, address_document=None, name=None, node_id=None, point=None):
    """
    Store an address_document.

    A ready-made address_document is inserted as is. When it is None,
    the document is assembled from name, node_id and the longitude and
    latitude attributes of point.

    :param address_document
    :param name: string
    :param node_id: int
    :param point: Point
    :return: new_object_id: ObjectId
    """
    if address_document is None:
        coordinates = {
            'longitude': point.longitude,
            'latitude': point.latitude
        }
        address_document = {'name': name, 'node_id': node_id, 'point': coordinates}

    return self.address_documents_collection.insert_one(address_document).inserted_id
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def insert_bus_stop_document(self, bus_stop_document=None, osm_id=None, name=None, point=None):
    """
    Store a bus_stop_document.

    A ready-made bus_stop_document is inserted as is. When it is None,
    the document is assembled from osm_id, name and the longitude and
    latitude attributes of point.

    :param bus_stop_document
    :param osm_id: int
    :param name: string
    :param point: Point
    :return: new_object_id: ObjectId
    """
    document = bus_stop_document

    if document is None:
        document = {
            'osm_id': osm_id,
            'name': name,
            'point': {'longitude': point.longitude, 'latitude': point.latitude}
        }

    outcome = self.bus_stop_documents_collection.insert_one(document)
    return outcome.inserted_id
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def insert_node_document(self, node_document=None, osm_id=None, tags=None, point=None):
    """
    Store a node_document.

    A ready-made node_document is inserted as is. When it is None, the
    document is assembled from osm_id, tags and the longitude and
    latitude attributes of point.

    :param node_document
    :param osm_id: int
    :param tags: dict
    :param point: Point
    :return: new_object_id: ObjectId
    """
    if node_document is None:
        node_document = dict(
            osm_id=osm_id,
            tags=tags,
            point=dict(longitude=point.longitude, latitude=point.latitude)
        )

    insertion = self.node_documents_collection.insert_one(node_document)
    return insertion.inserted_id
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def insert_point_document(self, point_document=None, osm_id=None, point=None):
    """
    Store a point_document.

    A ready-made point_document is inserted as is. When it is None, the
    document is assembled from osm_id and the longitude and latitude
    attributes of point.

    :param point_document
    :param osm_id: int
    :param point: Point
    :return: new_object_id: ObjectId
    """
    if point_document is not None:
        payload = point_document
    else:
        payload = {
            'osm_id': osm_id,
            'point': {'longitude': point.longitude, 'latitude': point.latitude}
        }

    return self.point_documents_collection.insert_one(payload).inserted_id
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def insert_timetable_document(self, timetable):
    """
    Upsert a timetable_document: the entry whose '_id' equals
    ObjectId(timetable.get('_id')) is updated, or created if it is missing.

    Only 'bus_line_id', 'timetable_entries' and 'travel_requests' are
    written, each read from timetable with .get().

    :param timetable: timetable_document
    :return: new_object_id: ObjectId (the upserted_id reported by update_one)
    """
    selector = {'_id': ObjectId(timetable.get('_id'))}

    stored_fields = {
        'bus_line_id': timetable.get('bus_line_id'),
        'timetable_entries': timetable.get('timetable_entries'),
        'travel_requests': timetable.get('travel_requests')
    }

    outcome = self.timetable_documents_collection.update_one(
        selector, {'$set': stored_fields}, upsert=True
    )
    return outcome.upserted_id
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def insert_traffic_event_document(self, traffic_event_document):
    """
    Upsert a traffic_event_document: the entry with the same 'event_id'
    is updated, or created if it is missing.

    Only 'event_type', 'event_level', 'point' and 'datetime' are written,
    each read from traffic_event_document with .get().

    :param traffic_event_document
    :return: new_object_id: ObjectId (the upserted_id reported by update_one)
    """
    read = traffic_event_document.get

    selector = {'event_id': read('event_id')}
    changes = {
        '$set': {
            'event_type': read('event_type'),
            'event_level': read('event_level'),
            'point': read('point'),
            'datetime': read('datetime')
        }
    }

    outcome = self.traffic_event_documents_collection.update_one(selector, changes, upsert=True)
    return outcome.upserted_id
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def insert_traffic_event_documents(self, traffic_event_documents):
    """
    Upsert several traffic_event_documents, one at a time, by delegating
    each of them to insert_traffic_event_document.

    :param traffic_event_documents: [traffic_event_document]
    :return: new_object_ids: [ObjectId] (one entry per input document, in order)
    """
    return [
        self.insert_traffic_event_document(traffic_event_document=document)
        for document in traffic_event_documents
    ]
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def insert_way_document(self, way_document=None, osm_id=None, tags=None, references=None):
    """
    Store a way_document.

    A ready-made way_document is inserted as is. When it is None, the
    document is assembled from osm_id, tags and references.

    :param way_document
    :param osm_id: int
    :param tags: dict
    :param references: [osm_id]
    :return: new_object_id: ObjectId
    """
    payload = way_document
    if payload is None:
        payload = {'osm_id': osm_id, 'tags': tags, 'references': references}

    return self.way_documents_collection.insert_one(payload).inserted_id
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def print_address_document(self, object_id=None, name=None, node_id=None, longitude=None, latitude=None):
    """
    Print an address_document.

    All arguments are forwarded unchanged to find_address_document and
    whatever it returns is written to standard output.

    :param object_id: ObjectId
    :param name: string
    :param node_id: int
    :param longitude: float
    :param latitude: float
    :return: None
    """
    address_document = self.find_address_document(
        object_id=object_id,
        name=name,
        node_id=node_id,
        longitude=longitude,
        latitude=latitude
    )
    # Parenthesised form: same output on Python 2, valid syntax on Python 3.
    print(address_document)
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def print_bus_stop_document(self, object_id=None, osm_id=None, name=None, longitude=None, latitude=None):
    """
    Print a bus_stop_document.

    All arguments are forwarded unchanged to find_bus_stop_document and
    whatever it returns is written to standard output.

    :param object_id: ObjectId
    :param osm_id: int
    :param name: string
    :param longitude: float
    :param latitude: float
    :return: None
    """
    bus_stop_document = self.find_bus_stop_document(
        object_id=object_id,
        osm_id=osm_id,
        name=name,
        longitude=longitude,
        latitude=latitude
    )
    # Parenthesised form: same output on Python 2, valid syntax on Python 3.
    print(bus_stop_document)
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def print_bus_vehicle_documents(self, object_ids=None, bus_vehicle_ids=None, counter=None):
    """
    Print multiple bus_vehicle_documents, followed by their total number.

    The documents are obtained from find_bus_vehicle_documents. When
    counter is None, or not smaller than the number of documents, all of
    them are printed; otherwise only the first counter documents are.
    The result of find_bus_vehicle_documents is passed to len() and
    indexed by position, so it has to support both operations.

    :param object_ids: [ObjectId]
    :param bus_vehicle_ids: [int]
    :param counter: int
    :return: None
    """
    bus_vehicle_documents = self.find_bus_vehicle_documents(
        object_ids=object_ids,
        bus_vehicle_ids=bus_vehicle_ids
    )
    number_of_bus_vehicle_documents = len(bus_vehicle_documents)

    if counter is None or number_of_bus_vehicle_documents <= counter:
        for bus_vehicle_document in bus_vehicle_documents:
            print(bus_vehicle_document)
    else:
        for i in range(0, counter):
            print(bus_vehicle_documents[i])

    # A single pre-formatted operand keeps the output identical on Python 2
    # (where print('a', b) would display a tuple) and on Python 3.
    print('number_of_bus_vehicle_documents: %s' % (number_of_bus_vehicle_documents,))
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def print_node_document(self, object_id=None, osm_id=None, longitude=None, latitude=None):
    """
    Print a node_document.

    All arguments are forwarded unchanged to find_node_document and
    whatever it returns is written to standard output.

    :param object_id: ObjectId
    :param osm_id: int
    :param longitude: float
    :param latitude: float
    :return: None
    """
    node_document = self.find_node_document(
        object_id=object_id,
        osm_id=osm_id,
        longitude=longitude,
        latitude=latitude
    )
    # Parenthesised form: same output on Python 2, valid syntax on Python 3.
    print(node_document)
def get_oid(id_str):
    """
    Convert id_str to a bson ObjectId.

    :param id_str: value handed to bson.objectid.ObjectId
    :return: the ObjectId, or None when the value is rejected by bson
    """
    try:
        return bson.objectid.ObjectId(id_str)
    except (bson.errors.InvalidId, TypeError):
        # InvalidId covers malformed strings; TypeError covers values of an
        # unsupported type (e.g. an int), which used to escape as an exception.
        # NOTE(review): per the bson documentation ObjectId(None) generates a
        # fresh id instead of failing, so None is not reported as invalid
        # here - confirm whether callers can pass None.
        return None
def str2id(document_id: str) -> bson.ObjectId:
    """Returns the document ID as ObjectID, or raises a BadRequest exception.

    A falsy document_id is rejected before bson is consulted; any other
    value is rejected when bson.ObjectId raises InvalidId or TypeError.
    Every rejection is logged at debug level.

    :raises: wz_exceptions.BadRequest
    """

    def invalid_id_error():
        # Log the rejection and build the exception for the caller to raise.
        log.debug('str2id(%r): Invalid Object ID', document_id)
        # The 1-tuple keeps the formatting correct even if document_id is
        # itself a tuple; a bare operand would be unpacked by '%'.
        return wz_exceptions.BadRequest('Invalid object ID %r' % (document_id,))

    if not document_id:
        raise invalid_id_error()

    try:
        return bson.ObjectId(document_id)
    except (bson.objectid.InvalidId, TypeError):
        # Raised inside the handler so the original error stays attached
        # as the exception context, as before.
        raise invalid_id_error()