def post(self):
    """Delete the current user's account after confirming the password.

    Reads ``password`` from the request body, hashes it together with the
    configured salt and compares the hex digest with the stored one. On a
    match the ``TORNADOSESSION`` cookie is cleared (if present), the user
    document is removed, the ``user_session`` entry is deleted and the client
    is redirected to ``/``. In every other case ``custom_error`` is called.

    NOTE(review): the body uses ``yield``, so this is presumably wrapped by a
    coroutine decorator that is outside this view -- confirm.
    NOTE(review): the ``remove`` call is not yielded, so its outcome is not
    awaited here -- confirm that is intended.
    """
    password = self.get_body_argument('password', '')
    if password:
        user = yield self.db.user.find_one({
            'username': self.current_user['username']
        })
        digest = md5(password + self.settings['salt']).hexdigest()
        # A missing user document is treated like a wrong password.
        if user and user['password'] == digest:
            if self.get_cookie('TORNADOSESSION'):
                self.clear_cookie('TORNADOSESSION')
            self.db.user.remove({
                '_id': ObjectId(self.current_user['_id'])
            })
            self.session.delete('user_session')
            self.redirect('/')
            # Stop here: falling through would report an error on a request
            # that has already been answered with the redirect.
            return
    self.custom_error('????????')
python类ObjectId()的实例源码
def get(self, topic_id):
    """Render the detail page of a single topic.

    :param topic_id: string form of the topic ``_id``; converted with
        ``ObjectId`` before the lookup.

    The topic ``content`` is passed through ``markdown.markdown`` before
    rendering. ``isfavorite`` is True when a user is logged in and the topic
    id is contained in that user's ``favorite`` list.
    """
    topic = yield self.db.topic.find_one({
        '_id': ObjectId(topic_id)
    })
    if topic is None:
        self.custom_error()
        # Nothing to render; continuing would subscript None below.
        return
    topic['content'] = markdown.markdown(topic['content'])
    isfavorite = False
    current_user = self.current_user
    if current_user:
        user = yield self.db.user.find_one({
            '_id': ObjectId(current_user['_id'])
        })
        if topic['_id'] in user['favorite']:
            isfavorite = True
    self.render('topic/template/topic-detail.html',
                topic=topic, isfavorite=isfavorite)
def get(self):
    """Render one page of the current user's favorite topics.

    Reads the ``page`` query argument (default ``'1'``), loads the user's
    ``favorite`` id list and renders at most 20 matching topics together with
    the total number of matches.

    A ``page`` value that is not an integer or is smaller than 1 is treated
    as page 1, so a negative offset is never handed to ``skip``.
    """
    uid = self.current_user['_id']
    limit = 20
    try:
        page = max(int(self.get_query_argument('page', '1')), 1)
    except ValueError:
        page = 1
    fav_list = yield self.db.user.find_one({
        '_id': ObjectId(uid)
    }, {
        'favorite': 1
    })
    cursor = self.db.topic.find({
        '_id': {
            '$in': fav_list['favorite']
        }
    })
    # Count before applying limit/skip so ``total`` covers every match.
    total = yield cursor.count()
    cursor.limit(limit).skip((page - 1) * limit)
    topics = yield cursor.to_list(length=limit)
    self.render('topic/template/topic-favorite.html',
                topics=topics, page=page, limit=limit, total=total)
def post(self, tid):
    """Update the title, content and price of an existing topic.

    :param tid: string form of the topic ``_id`` taken from the URL.

    The submitted fields are validated by calling a ``TopicModel`` instance.
    When validation fails, or ``tid`` cannot be converted to an ``ObjectId``,
    ``custom_error`` is called and no update is performed. In the invalid-id
    case the submitted values are first stored in ``self.cache['topic']``.
    """
    topic = {
        'title': self.get_body_argument('title', ''),
        'content': self.get_body_argument('content', ''),
        'price': self.get_body_argument('price', 0)
    }
    model = TopicModel()
    if not model(topic):
        self.custom_error()
        return
    try:
        tid = ObjectId(tid)
    except Exception:
        # Keep the submitted values around, report the problem and stop:
        # running the update with an unconverted id would match nothing.
        self.cache['topic'] = topic
        self.custom_error('????????????')
        return
    yield self.db.topic.update({
        '_id': tid
    }, {
        '$set': topic
    })
    self.redirect('/topics/{}'.format(tid))
def user_action(self, *args, **kwargs):
    """Store the submitted profile fields for a user, then show the user page.

    Reads ``uid``, ``email``, ``website``, ``qq`` and ``address`` from the
    request body. When a non-empty ``password`` is submitted, its ``md5``
    result is stored as well. Finally redirects to
    ``/manage/userdetail/<uid>``.

    NOTE(review): ``md5`` is called with two arguments here and its return
    value is stored directly -- confirm which helper this name refers to.
    """
    uid = self.get_body_argument('uid')
    changes = {
        field: self.get_body_argument(field)
        for field in ('email', 'website', 'qq', 'address')
    }
    new_password = self.get_body_argument('password', '')
    # The password is only touched when one was actually submitted.
    if new_password:
        changes['password'] = md5(new_password, self.settings['salt'])
    yield self.db.user.find_and_modify(
        {'_id': ObjectId(uid)},
        {'$set': changes},
    )
    self.redirect('/manage/userdetail/{}'.format(uid))
def post(self):
    """Remove the tag whose id is given in the ``id`` body argument.

    Writes ``{"success":true}`` and finishes the request when the remove call
    returns a truthy value; calls ``custom_error`` when the id is missing or
    the result is falsy.
    """
    id_ = self.get_body_argument('id', None)
    if not id_:
        self.custom_error()
        # Stop here: ObjectId(None) would mint a brand-new id, so the remove
        # below would match nothing and could still be reported as success.
        return
    rtn = yield self.db.tag.remove({
        '_id': ObjectId(id_)
    })
    if rtn:
        self.write('{"success":true}')
        self.finish()
    else:
        self.custom_error()
#
# ???????
#
def handle_aggregation(self, callback):
    """Build a ``(response, error)`` handler that forwards aggregation rows.

    The returned function raises ``RuntimeError`` when its second positional
    argument is truthy. Otherwise every item of ``response['result']`` is
    passed to ``self.fill_ids`` and wrapped with ``edict``; the collected
    list is then handed to ``callback``.
    """
    def handle(*arguments, **kw):
        response, error = arguments[0], arguments[1]
        if error:
            raise RuntimeError('Aggregation failed due to: %s' % str(error))
        converted = []
        for row in response['result']:
            self.fill_ids(row)
            converted.append(edict(row))
        callback(converted)
    return handle
async def test_id_can_be_anything(self, test_db):
    """Check that ``_id`` may be auto-generated, numeric or a sub-document.

    Declared ``async`` because the body awaits database calls and uses
    ``async with`` / ``async for``, which are only valid in a coroutine.
    """
    await test_db.test.delete_many({})

    # No explicit _id: insert_one is expected to fill in an ObjectId.
    auto_id = {'hello': 'world'}
    await test_db.test.insert_one(auto_id)
    assert isinstance(auto_id['_id'], ObjectId)

    numeric = {'_id': 240, 'hello': 'world'}
    await test_db.test.insert_one(numeric)
    assert numeric['_id'] == 240

    # A whole document used as the _id of another document.
    obj = {'_id': numeric, 'hello': 'world'}
    await test_db.test.insert_one(obj)
    assert obj['_id'] == numeric

    async with test_db.test.find() as cursor:
        async for x in cursor:
            assert x['hello'] == 'world'
            assert '_id' in x
async def test_find_one(self, test_db):
    """Exercise ``find_one`` with ids, filters, ``None`` and projections.

    Declared ``async`` because the body awaits database calls, which is only
    valid inside a coroutine.
    """
    _id = (await test_db.test.insert_one({'hello': 'world', 'foo': 'bar'})).inserted_id

    assert 'world' == (await test_db.test.find_one())['hello']

    # Every way of addressing the single document must return the same thing.
    assert await test_db.test.find_one(_id) == await test_db.test.find_one()
    assert await test_db.test.find_one(None) == await test_db.test.find_one()
    assert await test_db.test.find_one({}) == await test_db.test.find_one()
    assert await test_db.test.find_one({'hello': 'world'}) == await test_db.test.find_one()

    assert 'hello' in await test_db.test.find_one(projection=['hello'])
    assert 'hello' not in await test_db.test.find_one(projection=['foo'])
    assert ['_id'] == list(await test_db.test.find_one(projection=[]))

    assert await test_db.test.find_one({'hello': 'foo'}) is None
    assert await test_db.test.find_one(ObjectId()) is None
async def test_deref(self, test_db):
    """Exercise ``dereference`` with invalid inputs and with ``DBRef`` values.

    Declared ``async`` because the body awaits database calls, which is only
    valid inside a coroutine.
    """
    # Anything that is not a DBRef is rejected.
    with pytest.raises(TypeError):
        await test_db.dereference(5)
    with pytest.raises(TypeError):
        await test_db.dereference('hello')
    with pytest.raises(TypeError):
        await test_db.dereference(None)

    assert await test_db.dereference(DBRef("test", ObjectId())) is None

    obj = {'x': True}
    key = (await test_db.test.insert_one(obj)).inserted_id
    assert await test_db.dereference(DBRef('test', key)) == obj
    assert await test_db.dereference(DBRef('test', key, 'aiomongo_test')) == obj
    # A DBRef naming database 'foo' is expected to be refused.
    with pytest.raises(ValueError):
        await test_db.dereference(DBRef('test', key, 'foo'))

    assert await test_db.dereference(DBRef('test', 4)) is None
    obj = {'_id': 4}
    await test_db.test.insert_one(obj)
    assert await test_db.dereference(DBRef('test', 4)) == obj
async def test_insert_find_one(self, test_db):
    """Insert one document, look it up several ways, replace it and re-check.

    Declared ``async`` because the body awaits database calls and uses
    ``async with`` / ``async for``, which are only valid in a coroutine.
    """
    a_doc = SON({'hello': 'world'})
    a_key = (await test_db.test.insert_one(a_doc)).inserted_id
    assert isinstance(a_doc['_id'], ObjectId)
    assert a_doc['_id'] == a_key

    assert a_doc == await test_db.test.find_one({'_id': a_doc['_id']})
    assert a_doc == await test_db.test.find_one(a_key)
    assert await test_db.test.find_one(ObjectId()) is None
    assert a_doc == await test_db.test.find_one({'hello': 'world'})
    assert await test_db.test.find_one({'hello': 'test'}) is None

    # Replace the stored document and make sure the old content is gone.
    b = await test_db.test.find_one()
    b['hello'] = 'mike'
    await test_db.test.replace_one({'_id': b['_id']}, b)

    assert a_doc != await test_db.test.find_one(a_key)
    assert b == await test_db.test.find_one(a_key)
    assert b == await test_db.test.find_one()

    count = 0
    async with test_db.test.find() as cursor:
        async for _ in cursor:
            count += 1
    assert count == 1
def search_entries_by_id(self, id):
    """Search the Entries_Table for the entry with the given ObjectId.

    Args:
        id(string): the objectID you are searching for

    Return:
        result(collection): whatever ``find`` returns for the ``_id`` filter.
            When a ``ServerSelectionTimeoutError`` is caught an error line is
            printed and ``True`` is returned (kept for existing callers).
    """
    entries_table = self.db["Entries_Table"]
    try:
        return entries_table.find({"_id": ObjectId(id)})
    except errors.ServerSelectionTimeoutError:
        print('ERROR : No connection could be made because'
              ' the target machine actively refused it')
        return True
def update_entries(self, _id, vals):
    """Update an entry in the Entries_Table.

    Args:
        _id(ObjectId): ObjectID of the entry you want to change
        vals(collection): New values. The caller's object is left untouched;
            a ``Last_Modified`` timestamp is added to the copy that is
            written.

    Return:
        result(bool): True if an entry with that id was matched by the
            update. False if no entry matched or the server could not be
            reached.
    """
    entries_table = self.db["Entries_Table"]
    changes = dict(vals)
    changes["Last_Modified"] = datetime.now()
    try:
        # One round trip: the update result says whether the entry existed,
        # so no separate lookup (and no gap between lookup and write).
        outcome = entries_table.update_one({"_id": ObjectId(_id)},
                                           {"$set": changes})
    except errors.ServerSelectionTimeoutError:
        print('ERROR : No connection could be made because'
              ' the target machine actively refused it')
        return False
    return outcome.matched_count > 0
def delete_entries(self, _id):
    """Delete an entry from the Entries_Table.

    Args:
        _id(str): Object ID of the entry you want to delete

    Return:
        result(bool): True if an entry with that id was deleted. False if no
            such entry existed or the server could not be reached.
    """
    entries_table = self.db["Entries_Table"]
    try:
        # One round trip: the delete result says whether anything was
        # removed, so no separate existence check is needed.
        outcome = entries_table.delete_one({"_id": ObjectId(_id)})
    except errors.ServerSelectionTimeoutError:
        print('ERROR : No connection could be made because'
              ' the target machine actively refused it')
        return False
    return outcome.deleted_count > 0
def get_run(self, run_id):
    """
    Fetch one run document by its id.

    The id is used as an integer when ``int(run_id)`` succeeds and as a
    ``bson.ObjectId`` when that conversion raises ``ValueError``.

    :param run_id: The ID of the run.
    :return: The last document yielded by the query, or None if there is none.
    """
    collection = getattr(self._db, self._collection_name)
    try:
        lookup_id = int(run_id)
    except ValueError:
        # Probably not a number.
        lookup_id = bson.ObjectId(run_id)
    found = None
    for document in collection.find({"_id": lookup_id}):
        found = document
    return found
def test_find_record(mongo_client):
    """Check ``GenericDAO.find_record`` for a miss, an id hit and a field hit."""
    dao = GenericDAO(mongo_client, "testdb")

    # An id that is not in the collection yields no record.
    missing = dao.find_record("runs", {"_id": "NON_EXISTING_ID"})
    assert missing is None

    # Lookup by ObjectId.
    run_oid = bson.ObjectId("58163443b1758523257c69ca")
    by_id = dao.find_record("runs", {"_id": run_oid})
    assert by_id is not None
    assert by_id["config"] is not None
    assert by_id["config"]["seed"] == 185616783

    # Lookup by a nested config field.
    by_field = dao.find_record("runs", {"config.learning_rate": 0.0001})
    assert by_field is not None
    assert by_field["config"] is not None
    assert by_field["config"]["seed"] == 144363069
def test04_write_json_with_mongo_objects():
    """Round-trip a schema holding an ObjectId and a datetime through JSON.

    The schema is written with ``JsonOutput.write_data`` and read back with
    ``json.load`` using ``json_util.object_hook``; the result must equal the
    original schema. The output file is removed even when the test fails.
    """
    schema = {
        'db': {'coll': {'object': {'_id': {'type': 'oid', 'anonymized': ObjectId()},
                                   'date': {'type': 'date', 'anonymized': datetime(2015, 1, 1)}}}}
    }
    output = os.path.join(TEST_DIR, 'output_data_dict.json')
    output_maker = JsonOutput({})
    output_maker.data = schema
    try:
        with open(output, 'w') as out_fd:
            output_maker.write_data(out_fd)
        with open(output, 'r') as in_fd:
            # custom json_options - DEFAULT_JSON_OPTIONS sets tzinfo when loading datetime (!= schema)
            json_options = json_util.JSONOptions(tz_aware=False)
            # object_hook allows to interpret $oid, $date, etc from json and convert them to objects
            object_hook = partial(json_util.object_hook, json_options=json_options)
            assert json.load(in_fd, object_hook=object_hook) == schema
    finally:
        # Do not leave the file behind when writing or the assertion fails.
        if os.path.exists(output):
            os.remove(output)
def test_unwrap():
    """Check what ``RefField`` and ``SRefField`` return from ``unwrap``."""
    class A(Document):
        x = IntField()

    session = get_session()
    doc = A(x=5)
    session.save(doc)

    # Plain-dict form of the reference; built but not used by the checks below.
    aref = {'$id': doc.mongo_id, '$ref': 'A'}
    dbaref = DBRef(db='unit-testing', collection='A', id=doc.mongo_id)

    unwrapped = RefField(DocumentField(A)).unwrap(dbaref)
    assert isinstance(unwrapped, DBRef), unwrapped

    unwrapped = SRefField(A).unwrap(doc.mongo_id)
    assert isinstance(unwrapped, ObjectId), unwrapped
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 32
收藏 0
点赞 0
评论 0
def delete_address_document(self, object_id=None, name=None):
    """
    Remove one address_document, selected by ``_id`` or, failing that, by name.

    :param object_id: ObjectId
    :param name: string
    :return: True if exactly one document was deleted; False otherwise,
             including when neither selector is given.
    """
    if object_id is not None:
        selector = {'_id': ObjectId(object_id)}
    elif name is not None:
        selector = {'name': name}
    else:
        return False
    outcome = self.address_documents_collection.delete_one(selector)
    return outcome.deleted_count == 1
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def delete_address_documents(self, object_ids=None, names=None):
    """
    Remove several address_documents, selected by ``_id`` list or by names.

    :param object_ids: [ObjectId]
    :param names: [string]
    :return: The number of deleted documents (0 when no selector is given).
    """
    if object_ids is not None:
        selector = {'_id': {'$in': [ObjectId(oid) for oid in object_ids]}}
    elif names is not None:
        selector = {'name': {'$in': names}}
    else:
        return 0
    outcome = self.address_documents_collection.delete_many(selector)
    return outcome.deleted_count
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def delete_bus_line_documents(self, object_ids=None, bus_line_ids=None):
    """
    Remove several bus_line_documents, selected by ``_id`` list or by
    bus_line_id list.

    :param object_ids: [ObjectId]
    :param bus_line_ids: [int]
    :return: The number of deleted documents (0 when no selector is given).
    """
    if object_ids is not None:
        selector = {'_id': {'$in': [ObjectId(oid) for oid in object_ids]}}
    elif bus_line_ids is not None:
        selector = {'bus_line_id': {'$in': bus_line_ids}}
    else:
        return 0
    outcome = self.bus_line_documents_collection.delete_many(selector)
    return outcome.deleted_count
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def delete_bus_stop_document(self, object_id=None, osm_id=None):
    """
    Remove one bus_stop_document, selected by ``_id`` or, failing that, by
    osm_id.

    :param object_id: ObjectId
    :param osm_id: int
    :return: True if exactly one document was deleted; False otherwise,
             including when neither selector is given.
    """
    if object_id is not None:
        selector = {'_id': ObjectId(object_id)}
    elif osm_id is not None:
        selector = {'osm_id': osm_id}
    else:
        return False
    outcome = self.bus_stop_documents_collection.delete_one(selector)
    return outcome.deleted_count == 1
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def delete_bus_stop_documents(self, object_ids=None, osm_ids=None):
    """
    Remove several bus_stop_documents, selected by ``_id`` list or by
    osm_id list.

    :param object_ids: [ObjectId]
    :param osm_ids: [int]
    :return: The number of deleted documents (0 when no selector is given).
    """
    if object_ids is not None:
        selector = {'_id': {'$in': [ObjectId(oid) for oid in object_ids]}}
    elif osm_ids is not None:
        selector = {'osm_id': {'$in': osm_ids}}
    else:
        return 0
    outcome = self.bus_stop_documents_collection.delete_many(selector)
    return outcome.deleted_count
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def delete_bus_vehicle_documents(self, object_ids=None, bus_vehicle_ids=None):
    """
    Remove several bus_vehicle_documents, selected by ``_id`` list or by
    bus_vehicle_id list.

    :param object_ids: [ObjectId]
    :param bus_vehicle_ids: [int]
    :return: The number of deleted documents (0 when no selector is given).
    """
    if object_ids is not None:
        selector = {'_id': {'$in': [ObjectId(oid) for oid in object_ids]}}
    elif bus_vehicle_ids is not None:
        selector = {'bus_vehicle_id': {'$in': bus_vehicle_ids}}
    else:
        return 0
    outcome = self.bus_vehicle_documents_collection.delete_many(selector)
    return outcome.deleted_count
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def delete_edge_document(self, object_id=None, starting_node_osm_id=None, ending_node_osm_id=None):
    """
    Remove one edge_document, selected by ``_id`` or, failing that, by the
    osm_ids of both of its end nodes.

    :param object_id: ObjectId
    :param starting_node_osm_id: int
    :param ending_node_osm_id: int
    :return: True if exactly one document was deleted; False otherwise,
             including when no complete selector is given.
    """
    if object_id is not None:
        selector = {'_id': ObjectId(object_id)}
    elif starting_node_osm_id is not None and ending_node_osm_id is not None:
        # Both end nodes are required to identify an edge without its _id.
        selector = {
            'starting_node.osm_id': starting_node_osm_id,
            'ending_node.osm_id': ending_node_osm_id,
        }
    else:
        return False
    outcome = self.edge_documents_collection.delete_one(selector)
    return outcome.deleted_count == 1
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def delete_node_document(self, object_id=None, osm_id=None):
    """
    Remove one node_document, selected by ``_id`` or, failing that, by osm_id.

    :param object_id: ObjectId
    :param osm_id: int
    :return: True if exactly one document was deleted; False otherwise,
             including when neither selector is given.
    """
    if object_id is not None:
        selector = {'_id': ObjectId(object_id)}
    elif osm_id is not None:
        selector = {'osm_id': osm_id}
    else:
        return False
    outcome = self.node_documents_collection.delete_one(selector)
    return outcome.deleted_count == 1
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 70
收藏 0
点赞 0
评论 0
def delete_node_documents(self, object_ids=None, osm_ids=None):
    """
    Remove several node_documents, selected by ``_id`` list or by osm_id list.

    :param object_ids: [ObjectId]
    :param osm_ids: [int]
    :return: The number of deleted documents (0 when no selector is given).
    """
    if object_ids is not None:
        selector = {'_id': {'$in': [ObjectId(oid) for oid in object_ids]}}
    elif osm_ids is not None:
        selector = {'osm_id': {'$in': osm_ids}}
    else:
        return 0
    outcome = self.node_documents_collection.delete_many(selector)
    return outcome.deleted_count
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def delete_point_documents(self, object_ids=None, osm_ids=None):
    """
    Remove several point_documents, selected by ``_id`` list or by osm_id list.

    :param object_ids: [ObjectId]
    :param osm_ids: [int]
    :return: The number of deleted documents (0 when no selector is given).
    """
    if object_ids is not None:
        selector = {'_id': {'$in': [ObjectId(oid) for oid in object_ids]}}
    elif osm_ids is not None:
        selector = {'osm_id': {'$in': osm_ids}}
    else:
        return 0
    outcome = self.point_documents_collection.delete_many(selector)
    return outcome.deleted_count
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def delete_timetable_document(self, object_id=None, timetable_id=None):
    """
    Remove one timetable_document, selected by ``_id`` or, failing that, by
    timetable_id.

    :param object_id: ObjectId
    :param timetable_id: int
    :return: True if exactly one document was deleted; False otherwise,
             including when neither selector is given.
    """
    if object_id is not None:
        selector = {'_id': ObjectId(object_id)}
    elif timetable_id is not None:
        selector = {'timetable_id': timetable_id}
    else:
        return False
    outcome = self.timetable_documents_collection.delete_one(selector)
    return outcome.deleted_count == 1
mongodb_database_connection.py 文件源码
项目:dynamic-bus-scheduling
作者: pinac0099
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def delete_traffic_event_document(self, object_id=None, event_id=None):
    """
    Remove one traffic_event_document, selected by ``_id`` or, failing that,
    by event_id.

    :param object_id: ObjectId
    :param event_id: string
    :return: True if exactly one document was deleted; False otherwise,
             including when neither selector is given.
    """
    if object_id is not None:
        selector = {'_id': ObjectId(object_id)}
    elif event_id is not None:
        selector = {'event_id': event_id}
    else:
        return False
    outcome = self.traffic_event_documents_collection.delete_one(selector)
    return outcome.deleted_count == 1