def test_add_operators_to_field_list_field():
from mongoengine import ListField, SortedListField
from graphene_mongo.operators import gen_operators_of_field, allowed_operators
from graphene_mongo.fields.respective import respective_special_fields, respective_fields
for m_field in [ListField, SortedListField]:
for f, r_graphene in respective_fields.items():
field = m_field(f())
applied_operators = gen_operators_of_field('test', field, respective_special_fields[m_field],
allowed_operators(field))
expected = format_fields(['size'])
assert len(applied_operators.keys()) == len(expected)
assert sorted(list(applied_operators.keys())) == sorted(expected)
obj_list_field = applied_operators['test']('listTest', field)
assert isinstance(obj_list_field, graphene.List)
# here we test to assert that the type of items of the list is what is suppose to be
assert isinstance(obj_list_field.of_type, type(r_graphene))
python类ListField()的实例源码
def __get_doc(self, fld, item):
"""Get document as dict or a list of documents."""
from .fields import FollowReferenceField
@singledispatch
def doc(fld, item):
return item
@doc.register(db.ListField)
def doc_list(fld, item):
return [self.__get_doc(fld.field, el) for el in item]
@doc.register(FollowReferenceField)
def doc_frl(fld, item):
doc = fld.document_type.objects(id=item).get()
doc.begin_goodjson()
result = doc.to_mongo()
doc.end_goodjson()
return result
result = doc(fld, item)
if isinstance(result, dict) and "id" not in result and "_id" in result:
result["id"] = result.pop("_id")
return result
def is_to_many_relationship(field):
"""
Returns True, if the *field* describes a *to many* relationship.
The field types are:
* :class:`mongoengine.ListField`
* :class:`mongoengine.SortedListField`
with a simple reference field as element.
"""
if isinstance(field, mongoengine.ListField) \
and is_to_one_relationship(field.field):
return True
if isinstance(field, mongoengine.SortedListField)\
and is_to_one_relationship(field.field):
return True
return False
def test_list_field_no_field():
""" Assert that raises error if a ListField is given without a type, for instance: ListField() """
from graphene_mongo.fields.special_fields import list_field
from mongoengine import ListField
with pytest.raises(Exception) as e_info:
list_field('test_field', ListField())
assert str(e_info.value) == str(AttributeError('Error in {} field, have sure that this is defined with a '
'mongoengine field'.format('test_field')))
def __set_gj_flag_sub_field(self, instance, fld, cur_depth):
"""Set $$good_json$$ flag to subfield."""
from .fields import FollowReferenceField
def set_good_json(fld):
setattr(fld, "$$good_json$$", True)
setattr(fld, "$$cur_depth$$", cur_depth)
@singledispatch
def set_flag_recursive(fld, instance):
set_good_json(fld)
@set_flag_recursive.register(db.ListField)
def set_flag_list(fld, instance):
set_good_json(fld.field)
@set_flag_recursive.register(db.EmbeddedDocumentField)
def set_flag_emb(fld, instance):
if isinstance(instance, Helper):
instance.begin_goodjson(cur_depth)
@set_flag_recursive.register(FollowReferenceField)
def set_flag_self(fld, instance):
set_good_json(fld)
set_flag_recursive(fld, instance)
def __unset_gj_flag_sub_field(self, instance, fld, cur_depth):
"""Unset $$good_json$$ to subfield."""
from .fields import FollowReferenceField
def unset_flag(fld):
setattr(fld, "$$good_json$$", None)
setattr(fld, "$$cur_depth$$", None)
delattr(fld, "$$good_json$$")
delattr(fld, "$$cur_depth$$")
@singledispatch
def unset_flag_recursive(fld, instance):
unset_flag(fld)
@unset_flag_recursive.register(db.ListField)
def unset_flag_list(fld, instance):
unset_flag(fld.field)
@unset_flag_recursive.register(db.EmbeddedDocumentField)
def unset_flag_emb(fld, instance):
if isinstance(instance, Helper):
instance.end_goodjson(cur_depth)
@unset_flag_recursive.register(FollowReferenceField)
def unset_flag_self(fld, instance):
unset_flag(fld)
unset_flag_recursive(fld, instance)
def setUp(self):
"""Setup the class."""
class SelfReferenceDocument(Document):
name = db.StringField()
reference = db.ReferenceField("self")
class TestDocument(Document):
title = db.StringField()
references = db.ListField(
db.ReferenceField(SelfReferenceDocument)
)
self.references = [
SelfReferenceDocument(
pk=ObjectId(), name=("test {}").format(counter)
) for counter in range(3)
]
for (index, srd) in enumerate(self.references):
srd.reference = self.references[
(index + 1) % len(self.references)
]
srd.to_json = MagicMock(side_effect=srd.to_json)
self.model_cls = TestDocument
self.model = TestDocument(
pk=ObjectId(), title="Test", references=self.references
)
self.model.to_mongo = MagicMock(
return_value={
"id": self.model.id, "title": self.model.title,
"references": self.references
}
)
self.model.to_mongo = lambda x: {
"id": self.model.pk,
"title": "Test",
"references": [str(srd.pk) for srd in self.references]
}
def _document_typeof(doc_cls, field_name):
try:
orm_field = doc_cls._fields[field_name]
except (KeyError, AttributeError):
return None
if isinstance(orm_field, ListField):
orm_field = orm_field.field
if isinstance(orm_field, (ReferenceField, EmbeddedDocumentField)):
return orm_field.document_type
return None
def _follow_reference(self, max_depth, current_depth,
use_db_field, *args, **kwargs):
from .fields import FollowReferenceField
ret = {}
for fldname in self:
fld = self._fields.get(fldname)
is_list = isinstance(fld, db.ListField)
target = fld.field if is_list else fld
if all([
isinstance(
target, (db.ReferenceField, db.EmbeddedDocumentField)
), not isinstance(target, FollowReferenceField)
]):
value = None
if is_list:
value = []
for doc in getattr(self, fldname, []):
value.append(json.loads((
target.document_type.objects(
id=doc.id
).get() if isinstance(doc, DBRef) else doc
).to_json(
follow_reference=True,
max_depth=max_depth,
current_depth=current_depth + 1,
use_db_field=use_db_field,
*args, **kwargs
)))
else:
doc = getattr(self, fldname, None)
value = json.loads(
(
target.document_type.objects(
id=doc.id
).get() if isinstance(doc, DBRef) else doc
).to_json(
follow_reference=True,
max_depth=max_depth,
current_depth=current_depth + 1,
use_db_field=use_db_field,
*args, **kwargs
)
) if doc else doc
if value is not None:
ret.update({fldname: value})
return ret
def from_json(cls, json_str, created=False, *args, **kwargs):
"""
Decode from human-readable json.
Parameters:
json_str: JSON string that should be passed to the serialized
created: a parameter that is passed to cls._from_son.
*args, **kwargs: Any additional arguments that is passed to
json.loads.
"""
from .fields import FollowReferenceField
hook = generate_object_hook(cls)
if "object_hook" not in kwargs:
kwargs["object_hook"] = hook
dct = json.loads(json_str, *args, **kwargs)
for name, fld in cls._fields.items():
if any([
getattr(fld, "exclude_from_json", None),
getattr(fld, "exclude_json", None)
]):
dct.pop(name, None)
from_son_result = cls._from_son(SON(dct), created=created)
@singledispatch
def normalize_reference(ref_id, fld):
"""Normalize Reference."""
return ref_id and fld.to_python(ref_id) or None
@normalize_reference.register(dict)
def normalize_reference_dict(ref_id, fld):
"""Normalize Reference for dict."""
return fld.to_python(ref_id["id"])
@normalize_reference.register(list)
def normalize_reference_list(ref_id, fld):
"""Normalize Reference for list."""
return [
normalize_reference(ref.id, fld) for ref in ref_id
]
for fldname, fld in cls._fields.items():
target = fld.field if isinstance(fld, db.ListField) else fld
if not isinstance(target, db.ReferenceField) or \
isinstance(target, FollowReferenceField):
continue
value = dct.get(fldname)
setattr(
from_son_result, fldname,
normalize_reference(getattr(value, "id", value), target)
)
return from_son_result