async def test_deref(self, test_db):
    """Exercise ``test_db.dereference`` with invalid, missing and valid refs.

    The test is a coroutine because every database call in the body is
    awaited; ``await`` inside a plain ``def`` is a syntax error.
    """
    # Anything that is not a DBRef must be rejected with TypeError.
    for not_a_ref in (5, 'hello', None):
        with pytest.raises(TypeError):
            await test_db.dereference(not_a_ref)

    # A reference to a document that was never inserted resolves to None.
    assert await test_db.dereference(DBRef("test", ObjectId())) is None

    obj = {'x': True}
    key = (await test_db.test.insert_one(obj)).inserted_id
    assert await test_db.dereference(DBRef('test', key)) == obj
    assert await test_db.dereference(DBRef('test', key, 'aiomongo_test')) == obj

    # A DBRef naming a different database is refused.
    with pytest.raises(ValueError):
        await test_db.dereference(DBRef('test', key, 'foo'))

    # Same behaviour with a non-ObjectId ``_id``: None before the insert,
    # the stored document afterwards.
    assert await test_db.dereference(DBRef('test', 4)) is None
    obj = {'_id': 4}
    await test_db.test.insert_one(obj)
    assert await test_db.dereference(DBRef('test', 4)) == obj
# Example source code using the Python class DBRef()
def validate_unwrap(self, value, session=None):
    """Check a DBRef as far as possible without loading the referenced object.

    The value must be a ``DBRef``.  When this field has a ``type``, the
    reference's collection must equal that type's collection name.  When
    ``db_required`` is set the reference must name a database, and when
    both the field and the reference name a database they must agree.
    Failures are reported through ``_fail_validation_type`` /
    ``_fail_validation``.  ``session`` is accepted but not used here.
    """
    if not isinstance(value, DBRef):
        self._fail_validation_type(value, DBRef)

    if self.type:
        expected = self.type.type.get_collection_name()
        got = value.collection
        if got != expected:
            message = ('Wrong collection for reference: '
                       'got "%s" instead of "%s" ') % (got, expected)
            self._fail_validation(value, message)

    ref_db = value.database
    if self.db_required and not ref_db:
        self._fail_validation(value, 'db_required=True, but not database specified')

    if self.db and ref_db and self.db != ref_db:
        message = ('Wrong database for reference: '
                   ' got "%s" instead of "%s" ') % (ref_db, self.db)
        self._fail_validation(value, message)
def setUp(self):
    """Build a model whose reference is stored as a DBRef, plus its fixtures.

    Stores the two document classes, a fresh source id, the JSON payload
    to decode, the data expected after decoding, and the object hook
    generated for the model.
    """
    from bson import DBRef, ObjectId

    class Source(db.Document):
        pass

    class Model(db.Document):
        src = db.ReferenceField(Source, dbref=True)

    source_id = ObjectId()
    payload = {"src": {"collection": "source", "id": str(source_id)}}

    self.src_cls = Source
    self.model_cls = Model
    self.src_id = source_id
    self.data = json.dumps(payload)
    self.expected_data = {"src": DBRef("source", source_id)}
    self.hook = generate_object_hook(Model)
def setUp(self):
    """Build a model with a plain (non-DBRef) reference, plus its fixtures.

    The JSON payload carries the source id as a bare string; the expected
    decoded data holds a ``DBRef`` to the ``source`` collection.
    """
    from bson import ObjectId, DBRef

    class Source(db.Document):
        pass

    class Model(db.Document):
        src = db.ReferenceField(Source)

    source_id = ObjectId()

    self.src_cls = Source
    self.model_cls = Model
    self.src_id = source_id
    self.data = json.dumps({"src": str(source_id)})
    self.expected_data = {"src": DBRef("source", source_id)}
    self.hook = generate_object_hook(Model)
def __init__(self, document_type, dbref=False,
             reverse_delete_rule=DO_NOTHING, **kwargs):
    """Set up the reference field.

    :param document_type: a document class, or a string naming one.
    :param dbref: store the reference as a
        :class:`~pymongo.dbref.DBRef` rather than as the bare
        :class:`~pymongo.objectid.ObjectId` id.
    :param reverse_delete_rule: what to do when the referring object is
        deleted.

    .. note ::
        A reference to an abstract document type is always stored as a
        :class:`~pymongo.dbref.DBRef`, regardless of the value of `dbref`.
    """
    # Strings are checked first so issubclass() is never handed a non-class.
    acceptable = (
        isinstance(document_type, six.string_types) or
        issubclass(document_type, Document)
    )
    if not acceptable:
        self.error('Argument to ReferenceField constructor must be a '
                   'document class or a string')

    self.dbref = dbref
    self.document_type_obj = document_type
    self.reverse_delete_rule = reverse_delete_rule
    super(ReferenceField, self).__init__(**kwargs)
def __get__(self, instance, owner):
    """Descriptor access that lazily replaces a stored DBRef by its document.

    Accessed on the class (``instance is None``) the field itself is
    returned.  Otherwise, when auto-dereferencing is enabled and the
    stored value is a ``DBRef``, the referenced document is fetched and
    written back into ``instance._data`` before delegating to the parent
    descriptor.  Raises ``DoesNotExist`` when the reference cannot be
    resolved.
    """
    if instance is None:
        # Accessed via the document class, not a document object.
        return self

    raw = instance._data.get(self.name)
    self._auto_dereference = instance._fields[self.name]._auto_dereference

    if self._auto_dereference and isinstance(raw, DBRef):
        # A ``cls`` attribute on the reference names the concrete class
        # to load; otherwise fall back to the field's document type.
        doc_cls = get_document(raw.cls) if hasattr(raw, 'cls') else self.document_type
        son = doc_cls._get_db().dereference(raw)
        if son is None:
            raise DoesNotExist('Trying to dereference unknown document %s' % raw)
        instance._data[self.name] = doc_cls._from_son(son)

    return super(ReferenceField, self).__get__(instance, owner)
def __get__(self, instance, owner):
    """Descriptor access that resolves a stored DBRef on first read.

    Returns the field itself for class-level access.  For instance
    access, a ``DBRef`` value is dereferenced through the field's
    document type (when auto-dereferencing is on) and stored back on the
    instance; an unresolvable reference raises ``DoesNotExist``.
    """
    if instance is None:
        # Accessed via the document class, not a document object.
        return self

    raw = instance._data.get(self.name)
    self._auto_dereference = instance._fields[self.name]._auto_dereference

    if self._auto_dereference and isinstance(raw, DBRef):
        son = self.document_type._get_db().dereference(raw)
        if son is None:
            raise DoesNotExist('Trying to dereference unknown document %s' % raw)
        instance._data[self.name] = self.document_type._from_son(son)

    return super(CachedReferenceField, self).__get__(instance, owner)
def to_mongo(self, document, use_db_field=True, fields=None):
    """Serialize a referenced document as a SON of ``_id`` plus cached fields.

    ``document`` must be a saved ``Document``; anything else, or a
    document without a primary key, is reported through ``self.error``.
    When ``fields`` is given, only those of this field's cached
    ``self.fields`` that also appear in ``fields`` are included.
    """
    id_field = self.document_type._fields[self.document_type._meta['id_field']]

    if isinstance(document, Document):
        # The id of the saved object is needed to build the reference.
        id_ = document.pk
        if id_ is None:
            self.error('You can only reference documents once they have'
                       ' been saved to the database')
    else:
        # NOTE(review): relies on self.error raising; otherwise ``id_``
        # is unbound on the next statement.
        self.error('Only accept a document object')

    value = SON([('_id', id_field.to_mongo(id_))])

    new_fields = [f for f in self.fields if f in fields] if fields else self.fields
    cached = document.to_mongo(use_db_field, fields=new_fields)
    value.update(dict(cached))
    return value
async def test_deref_kwargs(self, mongo, test_db):
    """Check that extra keyword arguments are forwarded by ``dereference``.

    The test is a coroutine because the database calls are awaited;
    ``await`` inside a plain ``def`` is a syntax error.
    """
    await test_db.test.insert_one({'_id': 4, 'foo': 'bar'})

    # Open the database with SON as document class so the result type
    # can be compared against a SON literal.
    db = mongo.get_database(
        'aiomongo_test', codec_options=CodecOptions(document_class=SON))

    # The projection drops ``_id``, leaving only the ``foo`` entry.
    found = await db.dereference(DBRef('test', 4), projection={'_id': False})
    assert SON([('foo', 'bar')]) == found
def dereference(self, session, ref, allow_none=False):
    """Resolve an object id to this field's underlying type via ``session``.

    The id is wrapped in a ``DBRef`` that points at the collection of the
    field's document type and at the field's database, tagged with that
    type, and handed to ``session.dereference``.
    """
    doc_type = self.type.type
    dbref = DBRef(id=ref,
                  collection=doc_type.get_collection_name(),
                  database=self.db)
    dbref.type = doc_type
    return session.dereference(dbref, allow_none=allow_none)
def unwrap(self, value, fields=None, session=None):
    """Validate ``value`` and return it tagged with this field's type.

    The value is checked with ``validate_unwrap``; its ``type`` attribute
    is then set to ``self.type`` and the same object is returned.
    ``fields`` and ``session`` are accepted but not used here.
    """
    self.validate_unwrap(value)
    setattr(value, 'type', self.type)
    return value
def dereference(self, session, ref, allow_none=False):
    """Resolve a pymongo ``DBRef`` to a document through ``session``.

    The reference's ``type`` is looked up in the ``'global'`` namespace
    of the collection registry by collection name before dereferencing.
    """
    from ommongo.document import collection_registry

    # TODO: namespace support
    registry = collection_registry['global']
    ref.type = registry[ref.collection]
    return session.dereference(ref, allow_none=allow_none)
def to_ref(self, db=None):
    """Return a ``DBRef`` to this document, optionally naming a database."""
    ref_id = self.mongo_id
    collection_name = self.get_collection_name()
    return DBRef(id=ref_id, collection=collection_name, database=db)
def setUp(self):
    """Create two documents that reference the same, freshly made id.

    One is an ``IDCheckDocument``; the other is a
    ``DisabledIDCheckDocument`` created with ``enable_gj=True``.  Each
    gets its own ``DBRef`` to ``referenced_document``.
    """
    self._id = ObjectId()

    checked_ref = DBRef("referenced_document", self._id)
    self.doc = IDCheckDocument(ref=checked_ref)

    unchecked_ref = DBRef("referenced_document", self._id)
    self.disabled_check = DisabledIDCheckDocument(
        enable_gj=True, ref=unchecked_ref
    )
def setUp(self):
    """Save a referenced document and build two documents pointing at it.

    ``self.doc`` references the saved document object directly, while
    ``self.ref_doc`` references it through a ``DBRef`` built from its
    primary key.
    """
    referenced = ReferencedDocument(name="hi")
    self.referenced_doc = referenced
    referenced.save()

    self.doc = IDCheckDocument(ref=referenced)

    by_dbref = DBRef("referenced_document", referenced.pk)
    self.ref_doc = IDCheckDocument(enable_gj=True, ref=by_dbref)
def to_mongo(self, document):
    """Convert a document, DBRef or raw id into its stored reference form.

    * A ``DBRef`` is returned unchanged when ``self.dbref`` is set,
      otherwise only its ``id`` is returned.
    * A ``Document`` must have a primary key (else ``self.error``); its
      own class attributes are used to build the reference.
    * Any other value is treated as a raw id of ``self.document_type``.

    The result is a ``DBRef`` carrying ``cls`` for abstract document
    types, a plain ``DBRef`` when ``self.dbref`` is set, and the
    converted id otherwise.
    """
    if isinstance(document, DBRef):
        return document if self.dbref else document.id

    if isinstance(document, Document):
        # The id of the saved object is needed to build the reference.
        id_ = document.pk
        if id_ is None:
            self.error('You can only reference documents once they have'
                       ' been saved to the database')
        # Attributes of the instance take precedence over those of the
        # field's declared document type.
        cls = document
    else:
        id_, cls = document, self.document_type

    id_field = cls._fields[cls._meta['id_field']]
    id_ = id_field.to_mongo(id_)

    if self.document_type._meta.get('abstract'):
        return DBRef(cls._get_collection_name(), id_, cls=cls._class_name)
    if self.dbref:
        return DBRef(cls._get_collection_name(), id_)
    return id_
def to_python(self, value):
    """Convert a stored value into a reference usable from Python.

    When the field stores DBRefs (``self.dbref``), or the value already
    is a ``DBRef``, ``Document`` or ``EmbeddedDocument``, it is returned
    as is.  Otherwise the raw id is converted by the document type's id
    field and wrapped in a ``DBRef`` to that type's collection.
    """
    if self.dbref:
        return value
    if isinstance(value, (DBRef, Document, EmbeddedDocument)):
        return value

    doc_type = self.document_type
    collection = doc_type._get_collection_name()
    return DBRef(collection, doc_type.id.to_python(value))
def validate(self, value):
    """Validate a value assigned to a reference field.

    Reports an error through ``self.error`` when:

    * the value is neither an instance of the field's document type nor
      a ``DBRef``;
    * the value is a ``Document`` whose ``id`` is ``None`` (not saved);
    * the field's document type is abstract and the value is not an
      instance of it.
    """
    if not isinstance(value, (self.document_type, DBRef)):
        self.error('A ReferenceField only accepts DBRef or documents')

    if isinstance(value, Document) and value.id is None:
        self.error('You can only reference documents once they have been '
                   'saved to the database')

    if self.document_type._meta.get('abstract') and \
            not isinstance(value, self.document_type):
        # The message has two placeholders, so both the offending value
        # and the abstract type name are supplied; with a single argument
        # the formatting itself raised TypeError.
        self.error(
            '%s is not an instance of abstract reference type %s' % (
                value, self.document_type._class_name)
        )
def _validate_choices(self, value):
    """Validate the choice by class name rather than by the raw value.

    A dict (a reference that has not been dereferenced, holding a class
    name and a DBRef) is reduced to its ``'_cls'`` entry, and a
    ``Document`` to its ``_class_name``; other values pass through
    unchanged to the parent implementation.
    """
    choice = value
    if isinstance(value, dict):
        choice = value.get('_cls')
    elif isinstance(value, Document):
        choice = value._class_name
    super(GenericReferenceField, self)._validate_choices(choice)
def validate(self, value):
    """Validate a value assigned to a generic reference field.

    Accepts ``Document``, ``DBRef``, ``dict`` and ``SON`` values.
    Mapping values must contain both ``'_ref'`` and ``'_cls'``; document
    values must have a non-``None`` ``id``.  Problems are reported
    through ``self.error``.
    """
    accepted_types = (Document, DBRef, dict, SON)
    if not isinstance(value, accepted_types):
        self.error('GenericReferences can only contain documents')

    if isinstance(value, (dict, SON)):
        has_both_keys = '_ref' in value and '_cls' in value
        if not has_both_keys:
            self.error('GenericReferences can only contain documents')
    elif isinstance(value, Document) and value.id is None:
        # The id of the saved object is needed to build the reference.
        self.error('You can only reference documents once they have been'
                   ' saved to the database')
def to_mongo(self, document):
    """Convert a document into a ``SON`` of class name and ``DBRef``.

    ``None`` yields ``None``; ``dict``, ``SON``, ``ObjectId`` and
    ``DBRef`` values are returned unchanged.  For anything else the id
    is converted by the id field of the value's class and returned as
    ``SON([('_cls', <class name>), ('_ref', <DBRef>)])``.  A
    ``Document`` without an id is reported through ``self.error``.
    """
    if document is None:
        return None

    already_stored_form = (dict, SON, ObjectId, DBRef)
    if isinstance(document, already_stored_form):
        return document

    doc_cls = document.__class__
    id_field = doc_cls._fields[doc_cls._meta['id_field']]

    id_ = document
    if isinstance(document, Document):
        # The id of the saved object is needed to build the reference.
        id_ = document.id
        if id_ is None:
            self.error('You can only reference documents once they have'
                       ' been saved to the database')

    id_ = id_field.to_mongo(id_)
    ref = DBRef(document._get_collection_name(), id_)
    return SON([
        ('_cls', document._class_name),
        ('_ref', ref),
    ])
def to_python(self, value):
    """Convert a MongoDB-compatible container into Python values.

    Strings are returned untouched and objects exposing ``to_python``
    convert themselves.  Non-mapping iterables are processed as an
    index-keyed mapping and turned back into a list, ordered by index,
    at the end; non-iterable values are returned unchanged.  Items are
    converted by ``self.field`` when one is set; otherwise documents
    become ``DBRef``s (they must have a primary key), objects with
    ``to_python`` convert themselves, and everything else recurses
    through this method.
    """
    if isinstance(value, six.string_types):
        return value

    if hasattr(value, 'to_python'):
        return value.to_python()

    is_list = False
    if not hasattr(value, 'items'):
        try:
            is_list = True
            value = dict(enumerate(value))
        except TypeError:
            # Not iterable: hand the value back as is.
            return value

    if self.field:
        self.field._auto_dereference = self._auto_dereference
        value_dict = {}
        for key, item in list(value.items()):
            value_dict[key] = self.field.to_python(item)
    else:
        Document = _import_class('Document')
        value_dict = {}
        for key, item in list(value.items()):
            if isinstance(item, Document):
                # The id of the saved object is needed to build the DBRef.
                if item.pk is None:
                    self.error('You can only reference documents once they'
                               ' have been saved to the database')
                value_dict[key] = DBRef(item._get_collection_name(), item.pk)
            elif hasattr(item, 'to_python'):
                value_dict[key] = item.to_python()
            else:
                value_dict[key] = self.to_python(item)

    if not is_list:
        return value_dict

    # Restore list form, ordered by the original positions.
    ordered = sorted(value_dict.items(), key=operator.itemgetter(0))
    return [item for _, item in ordered]
def _deserialize(self, value, attr, data):
    """Deserialize a reference given as DBRef, Reference, document or raw id.

    * ``None`` is returned as ``None``.
    * A ``DBRef`` must target this field's document collection; its
      ``id`` is then deserialized like a raw value.
    * A ``Reference`` must point at ``self.document_cls``; it is
      returned directly, rebuilt as ``self.reference_cls`` when it is of
      another exact type.
    * An instance of ``self.document_cls`` must already be created; its
      ``pk`` is then deserialized like a raw value.
    * An instance of another document implementation class is rejected.

    Raises ``ValidationError`` for each rejected case.
    """
    if value is None:
        return None
    if isinstance(value, DBRef):
        if self._document_cls.collection.name != value.collection:
            # The placeholder is named, so the value must be passed by
            # keyword; a positional argument made format() raise KeyError
            # instead of producing the validation message.
            raise ValidationError(_("DBRef must be on collection `{collection}`.").format(
                collection=self._document_cls.collection.name))
        value = value.id
    elif isinstance(value, Reference):
        if value.document_cls != self.document_cls:
            raise ValidationError(_("`{document}` reference expected.").format(
                document=self.document_cls.__name__))
        if type(value) is not self.reference_cls:
            value = self.reference_cls(value.document_cls, value.pk)
        return value
    elif isinstance(value, self.document_cls):
        if not value.is_created:
            raise ValidationError(
                _("Cannot reference a document that has not been created yet."))
        value = value.pk
    elif isinstance(value, self._document_implementation_cls):
        raise ValidationError(_("`{document}` reference expected.").format(
            document=self.document_cls.__name__))
    value = super()._deserialize(value, attr, data)
    # `value` is similar to data received from the database so we
    # can use `_deserialize_from_mongo` to finish the deserialization
    return self._deserialize_from_mongo(value)
def __eq__(self, other):
    """Compare this reference with a document, another reference or a DBRef.

    Documents of ``self.document_cls`` compare by primary key;
    references by primary key and document class; DBRefs by id and
    collection name.  Any other operand yields ``NotImplemented``.
    """
    if isinstance(other, self.document_cls):
        return other.pk == self.pk

    if isinstance(other, Reference):
        return (self.pk == other.pk and
                self.document_cls == other.document_cls)

    if isinstance(other, DBRef):
        return (self.pk == other.id and
                self.document_cls.collection.name == other.collection)

    return NotImplemented
def __eq__(self, other):
    """Compare this document with another document, a DBRef or a Reference.

    A document without a primary key is only equal to itself.
    Otherwise: same-class documents that have a primary key compare by
    it, DBRefs compare by collection name and id, and references compare
    by document class membership and primary key.  Any other operand
    yields ``NotImplemented``.
    """
    from .data_objects import Reference

    own_pk = self.pk
    if own_pk is None:
        return self is other

    if isinstance(other, self.__class__) and other.pk is not None:
        return self.pk == other.pk

    if isinstance(other, DBRef):
        return (other.collection == self.collection.name and
                other.id == self.pk)

    if isinstance(other, Reference):
        return (isinstance(self, other.document_cls) and
                self.pk == other.pk)

    return NotImplemented
def dbref(self):
    """Return a pymongo ``DBRef`` pointing at this document.

    Raises ``NotCreatedError`` when the document has not been created.
    """
    if self.is_created:
        return DBRef(collection=self.collection.name, id=self.pk)

    raise NotCreatedError('Must create the document before'
                          ' having access to DBRef')
def test_dbref(self):
    """``dbref`` raises before creation and yields the matching DBRef after."""
    student = self.Student()

    # Reading the attribute on a document that is not created must fail.
    with pytest.raises(exceptions.NotCreatedError):
        student.dbref

    # Fake document creation
    fake_id = ObjectId('573b352e13adf20d13d01523')
    student.id = fake_id
    student.is_created = True
    student.clear_modified()

    expected = DBRef(collection='student', id=fake_id)
    assert student.dbref == expected
def test_dereference(self):
    """A stored dict is returned when its DBRef is dereferenced."""
    collection_name = 'test.collection'
    doc_id = '1234567890'

    data = {'_id': doc_id, 'content': 'testcontent'}
    ref = DBRef(collection_name, doc_id)
    db.add_oneDictIntoCollection(data, collection_name)

    fetched = db.dereference(ref)
    self.assertEqual(data, fetched)
def _follow_reference(self, max_depth, current_depth,
                      use_db_field, *args, **kwargs):
    """Serialize referenced and embedded documents one level deeper.

    Walks the names yielded by iterating ``self``.  Fields (or list item
    fields) that are ``ReferenceField`` / ``EmbeddedDocumentField`` but
    not ``FollowReferenceField`` are converted via ``to_json`` with
    ``current_depth + 1`` and parsed back with ``json.loads``.  Returns
    a dict mapping field name to the converted value; fields whose
    value ends up ``None`` are left out.
    """
    from .fields import FollowReferenceField

    def _to_plain(doc, field):
        # A raw DBRef is first loaded through the field's document type.
        if isinstance(doc, DBRef):
            resolved = field.document_type.objects(id=doc.id).get()
        else:
            resolved = doc
        return json.loads(resolved.to_json(
            follow_reference=True,
            max_depth=max_depth,
            current_depth=current_depth + 1,
            use_db_field=use_db_field,
            *args, **kwargs
        ))

    ret = {}
    for fldname in self:
        fld = self._fields.get(fldname)
        is_list = isinstance(fld, db.ListField)
        # For list fields the item field decides whether to follow.
        target = fld.field if is_list else fld

        is_ref_like = isinstance(
            target, (db.ReferenceField, db.EmbeddedDocumentField)
        )
        is_follow_field = isinstance(target, FollowReferenceField)
        if not is_ref_like or is_follow_field:
            continue

        if is_list:
            value = [
                _to_plain(item, target)
                for item in getattr(self, fldname, [])
            ]
        else:
            doc = getattr(self, fldname, None)
            # Falsy values (e.g. None) are passed through unconverted.
            value = _to_plain(doc, target) if doc else doc

        if value is not None:
            ret[fldname] = value
    return ret