def test_add_operators_to_field_reference_field():
    """Check the operators generated for a ReferenceField.

    The generated mapping must contain exactly the ``in``, ``nin`` and ``ne``
    operators (as formatted by ``format_fields``): the first two as
    ``graphene.List`` instances, the last one as a ``graphene.ID`` instance.
    """
    from mongoengine import ReferenceField, Document, StringField
    from graphene_mongo.operators import gen_operators_of_field, allowed_operators
    from graphene_mongo.fields import respective_special_fields

    class Other(Document):
        name = StringField()

    class Test(Document):
        test = ReferenceField(Other)

    field = Test.test
    # Pick the graphene counterpart registered for this mongoengine field type.
    converter = respective_special_fields[type(field)]
    graphene_field = converter('test', field)
    operators = gen_operators_of_field(
        'test', field, graphene_field, allowed_operators(field)
    )

    assert sorted(operators.keys()) == format_fields(['in', 'nin', 'ne'])

    expected_types = (
        ('test__in', graphene.List),
        ('test__nin', graphene.List),
        ('test__ne', graphene.ID),
    )
    for operator_name, graphene_type in expected_types:
        assert isinstance(operators[operator_name], graphene_type)
# Example source code using the Python class ReferenceField()
def __init__(self, *args, **kwargs):
    """
    Initialize the field.

    The options owned by this class are removed from ``kwargs`` and stored
    on the instance; everything else is forwarded to the parent initializer.

    Parameters:
        *args, **kwargs: Any arguments to be passed to ReferenceField.
    Keyword Arguments:
        id_check: Set False to disable the id check. Defaults to True.
        autosave: Set True to save/update the referenced document when
            to_python is called. Defaults to False.
        max_depth: Depth limit for loop-references. Defaults to 3.
    """
    own_options = (
        ("id_check", True),
        ("autosave", False),
        ("max_depth", 3),
    )
    # Pop our own options first so the parent never sees unknown keywords.
    for option, default in own_options:
        setattr(self, option, kwargs.pop(option, default))
    super(FollowReferenceField, self).__init__(*args, **kwargs)
def setUp(self):
    """Prepare models, JSON input and expected output for a dbref field.

    The JSON input describes the reference as a mapping with ``collection``
    and ``id`` keys; the expected decoded value is a ``DBRef``.
    """
    from bson import DBRef, ObjectId

    class Source(db.Document):
        pass

    class Model(db.Document):
        src = db.ReferenceField(Source, dbref=True)

    source_id = ObjectId()
    payload = {"src": {"collection": "source", "id": str(source_id)}}

    self.src_cls = Source
    self.model_cls = Model
    self.src_id = source_id
    self.data = json.dumps(payload)
    self.expected_data = {"src": DBRef("source", source_id)}
    self.hook = generate_object_hook(Model)
def setUp(self):
    """Prepare models, JSON input and expected output for a plain reference.

    The JSON input carries the reference as a bare id string; the expected
    decoded value is a ``DBRef`` pointing at the ``source`` collection.
    """
    from bson import ObjectId, DBRef

    class Source(db.Document):
        pass

    class Model(db.Document):
        src = db.ReferenceField(Source)

    source_id = ObjectId()
    payload = {"src": str(source_id)}

    self.src_cls = Source
    self.model_cls = Model
    self.src_id = source_id
    self.data = json.dumps(payload)
    self.expected_data = {"src": DBRef("source", source_id)}
    self.hook = generate_object_hook(Model)
def test_mongoengine_field_references_self():
    """Building Options for a model with a ReferenceField to 'self' must fail.

    The raised exception's message has to name both the schema and the
    offending field.
    """
    from mongoengine import Document, ReferenceField

    class Test(Document):
        parent = ReferenceField('self')

    with pytest.raises(Exception) as e_info:
        Options('TestSchema', {'model': Test})

    # Checked after the ``with`` block: nothing following the raising call
    # inside the block would be executed.
    expected_message = (
        "It was not possible to generate schema for {} because the field {} is a "
        "ReferenceField to self and this is not supported yet."
    ).format("TestSchema", 'parent')
    assert str(e_info.value) == expected_message
def setUp(self):
    """Setup the class.

    Creates three ``SelfReferenceDocument`` instances that reference each
    other in a ring (each one points at the next, the last one at the
    first), wraps their ``to_json`` in a ``MagicMock`` so calls can be
    inspected, and builds a ``TestDocument`` that lists all of them.
    """
    class SelfReferenceDocument(Document):
        name = db.StringField()
        reference = db.ReferenceField("self")

    class TestDocument(Document):
        title = db.StringField()
        references = db.ListField(
            db.ReferenceField(SelfReferenceDocument)
        )

    self.references = [
        SelfReferenceDocument(
            pk=ObjectId(), name=("test {}").format(counter)
        ) for counter in range(3)
    ]
    for (index, srd) in enumerate(self.references):
        # Modulo wraps the last document back around to the first one.
        srd.reference = self.references[
            (index + 1) % len(self.references)
        ]
        # side_effect keeps the real behaviour while recording the calls.
        srd.to_json = MagicMock(side_effect=srd.to_json)

    self.model_cls = TestDocument
    self.model = TestDocument(
        pk=ObjectId(), title="Test", references=self.references
    )
    # The MagicMock previously assigned to ``to_mongo`` here was overwritten
    # by the assignment below before it could ever be used, so only the
    # effective replacement is kept.
    self.model.to_mongo = lambda x: {
        "id": self.model.pk,
        "title": "Test",
        "references": [str(srd.pk) for srd in self.references]
    }
def owner(self):
    """Look up and return the Organization (instance) owning self.

    The owner is resolved from ``self.owner_id`` with an explicit query.
    It is deliberately not stored as a me.ReferenceField, in order to
    avoid automatic/unwanted dereferencing.
    """
    organizations = Organization.objects
    return organizations.get(id=self.owner_id)
def conv_List(self, model, field, kwargs):
    """Convert a list field into a form field.

    * A list of references becomes a ``ModelSelectMultipleField`` over the
      referenced document type.
    * A list whose inner field has choices is converted through the inner
      field with ``multiple`` set to True.
    * Anything else becomes a ``FieldList`` of the converted inner field.
    """
    inner = field.field

    if isinstance(inner, ReferenceField):
        return ModelSelectMultipleField(model=inner.document_type, **kwargs)

    if inner.choices:
        kwargs['multiple'] = True
        return self.convert(model, inner, kwargs)

    # Arguments meant for the inner field travel under "field_args".
    inner_args = kwargs.pop("field_args", {})
    unbound_field = self.convert(model, inner, inner_args)

    # Reset validators/filters for the enclosing list and make sure
    # min_entries is always present.
    kwargs['validators'] = []
    kwargs['filters'] = []
    kwargs['min_entries'] = kwargs.get('min_entries', 0)
    return f.FieldList(unbound_field, **kwargs)
def _document_typeof(doc_cls, field_name):
    """Return the document type a field points at, or None.

    The field is looked up in ``doc_cls._fields``. A list field is unwrapped
    to its inner field first. Only reference and embedded-document fields
    have a document type; for any other field, for an unknown field name, or
    when ``doc_cls`` has no ``_fields`` attribute, None is returned.
    """
    try:
        candidate = doc_cls._fields[field_name]
    except (KeyError, AttributeError):
        return None

    inner = candidate.field if isinstance(candidate, ListField) else candidate
    if not isinstance(inner, (ReferenceField, EmbeddedDocumentField)):
        return None
    return inner.document_type
def is_to_one_relationship(field):
    """
    Tell whether *field* is one of the reference field types:

    * :class:`mongoengine.ReferenceField`
    * :class:`mongoengine.CachedReferenceField`
    * :class:`mongoengine.GenericReferenceField`

    All of these fields describe a *to-one* relationship. Returns True for
    an instance of any of them, False otherwise.
    """
    if isinstance(field, mongoengine.ReferenceField):
        return True
    if isinstance(field, mongoengine.CachedReferenceField):
        return True
    return isinstance(field, mongoengine.GenericReferenceField)
def _follow_reference(self, max_depth, current_depth,
                      use_db_field, *args, **kwargs):
    """Serialize referenced and embedded documents one level deeper.

    Every field of the document that is a reference or embedded-document
    field (directly or as the inner field of a list), and is not a
    FollowReferenceField, is converted by calling ``to_json`` on the related
    document(s) with ``follow_reference=True`` and ``current_depth + 1`` and
    decoding the result with ``json.loads``. A ``DBRef`` value is fetched
    through the target document type before being serialized.

    Returns:
        A dict mapping field names to the decoded value (a list of decoded
        values for list fields). Fields whose value is None are omitted.
    """
    from .fields import FollowReferenceField

    def _serialize(doc, field):
        # Resolve a raw DBRef into the actual document before dumping it.
        resolved = (
            field.document_type.objects(id=doc.id).get()
            if isinstance(doc, DBRef) else doc
        )
        dumped = resolved.to_json(
            follow_reference=True,
            max_depth=max_depth,
            current_depth=current_depth + 1,
            use_db_field=use_db_field,
            *args, **kwargs
        )
        return json.loads(dumped)

    ret = {}
    for fldname in self:
        fld = self._fields.get(fldname)
        is_list = isinstance(fld, db.ListField)
        target = fld.field if is_list else fld

        followable = all([
            isinstance(
                target, (db.ReferenceField, db.EmbeddedDocumentField)
            ),
            not isinstance(target, FollowReferenceField),
        ])
        if not followable:
            continue

        if is_list:
            value = [
                _serialize(doc, target)
                for doc in getattr(self, fldname, [])
            ]
        else:
            doc = getattr(self, fldname, None)
            # A falsy value is passed through untouched.
            value = _serialize(doc, target) if doc else doc

        if value is not None:
            ret[fldname] = value
    return ret
def from_json(cls, json_str, created=False, *args, **kwargs):
    """
    Decode a document from human-readable json.

    Parameters:
        json_str: JSON string to be decoded.
        created: a parameter that is passed to cls._from_son.
        *args, **kwargs: Any additional arguments that are passed to
            json.loads. Unless the caller supplies ``object_hook``, the
            hook generated for ``cls`` is used.

    Returns:
        The result of ``cls._from_son`` with its plain reference fields
        re-assigned from the normalized values found in the decoded JSON.
    """
    from .fields import FollowReferenceField

    kwargs.setdefault("object_hook", generate_object_hook(cls))
    decoded = json.loads(json_str, *args, **kwargs)

    # Fields flagged as excluded from JSON must not be populated from it.
    for field_name, field in cls._fields.items():
        exclusion_flags = [
            getattr(field, "exclude_from_json", None),
            getattr(field, "exclude_json", None),
        ]
        if any(exclusion_flags):
            decoded.pop(field_name, None)

    document = cls._from_son(SON(decoded), created=created)

    @singledispatch
    def normalize_reference(ref_id, ref_field):
        """Normalize a single reference; falsy input or result gives None."""
        if not ref_id:
            return None
        return ref_field.to_python(ref_id) or None

    @normalize_reference.register(dict)
    def _normalize_mapping(ref_id, ref_field):
        """Normalize a reference given as a dict carrying an "id" key."""
        return ref_field.to_python(ref_id["id"])

    @normalize_reference.register(list)
    def _normalize_sequence(ref_id, ref_field):
        """Normalize every reference of a list via its ``id`` attribute."""
        normalized = []
        for ref in ref_id:
            normalized.append(normalize_reference(ref.id, ref_field))
        return normalized

    for field_name, field in cls._fields.items():
        target = field.field if isinstance(field, db.ListField) else field
        # Only plain reference fields are rewritten; FollowReferenceField
        # is left as produced by _from_son.
        if isinstance(target, db.ReferenceField) and \
                not isinstance(target, FollowReferenceField):
            raw_value = decoded.get(field_name)
            setattr(
                document, field_name,
                normalize_reference(getattr(raw_value, "id", raw_value), target)
            )
    return document