def to_dict(self):
    """Return a dict representation of this instance.

    The output of ``model_to_dict`` is round-tripped through the instance
    serializer (``to_json`` then ``from_json``). The document type and id are
    then added, the ``cbnosync_ptr`` and ``csrfmiddlewaretoken`` keys are
    removed when present, and date-time, list, embedded and reference fields
    are post-processed by the matching helper methods.

    :return: the resulting dict
    """
    result = model_to_dict(self)
    as_json = self._serializer.to_json(result)
    result = self._serializer.from_json(as_json)

    result[DOC_TYPE_FIELD_NAME] = self.get_doc_type()
    result['id'] = self.get_id()

    # Keys that must not appear in the output.
    for unwanted in ('cbnosync_ptr', 'csrfmiddlewaretoken'):
        if unwanted in result:
            del result[unwanted]

    # The checks are deliberately independent (no elif): each matching
    # field type gets its own post-processing step.
    for fld in self._meta.fields:
        if isinstance(fld, DateTimeField):
            result[fld.name] = self._string_from_date(fld.name)
        if isinstance(fld, ListField):
            item_field = fld.item_field
            if isinstance(item_field, EmbeddedModelField):
                self.to_dict_nested_list(fld.name, result)
            if isinstance(item_field, ModelReferenceField):
                self.to_dict_reference_list(fld.name, result)
        if isinstance(fld, EmbeddedModelField):
            self.to_dict_nested(fld.name, result)
        if isinstance(fld, ModelReferenceField):
            self.to_dict_reference(fld.name, result)
    return result
python类model_to_dict()的实例源码
def create(body):
    """ Create a new tag

        :param body: dict of tag fields; `id` is discarded, `tagType` must
            be one of `TAG_TYPE` and `name` is required
        :return: the tag as returned by `model_to_dict`
        :raises BadRequest: on an invalid tag type, an already used name
            (case-insensitive) or invalid fields
    """
    try:
        body.pop('id', None)
        if body.get('tagType') not in TAG_TYPE:
            raise BadRequest('Invalid or missing tag type')

        known_names = [
            name.lower()
            for name in Tag.objects.all().values_list('name', flat=True)
        ]
        if body['name'].lower().strip() in known_names:
            raise BadRequest('Tag already exists')

        # The codename is the lowercased name with spaces replaced by underscores.
        body['codename'] = body['name'].lower().replace(' ', '_')
        tag = Tag.objects.get_or_create(**body)[0]
    except (AttributeError, KeyError, FieldError, IntegrityError, ValueError):
        raise BadRequest('Invalid fields in body')
    return model_to_dict(tag)
def update(tag_id, body):
    """ Update a tag

        :param tag_id: id of the tag to update
        :param body: dict of fields to update; `id` is discarded and `name`
            is required
        :return: the refreshed tag as returned by `model_to_dict`
        :raises NotFound: when no tag matches `tag_id`
        :raises BadRequest: when another tag already uses the name
            (case-insensitive) or the fields are invalid
    """
    try:
        tag = Tag.objects.get(id=tag_id)
    except (ObjectDoesNotExist, ValueError):
        raise NotFound('Tag not found')

    try:
        body.pop('id', None)
        # Names of every *other* tag, lowercased for comparison.
        other_names = [
            name.lower()
            for name in Tag.objects.exclude(id=tag.id).values_list('name', flat=True)
        ]
        if body['name'].lower().strip() in other_names:
            raise BadRequest('Tag already exists')
        Tag.objects.filter(pk=tag.pk).update(**body)
        # Reload so the returned dict reflects the stored values.
        tag = Tag.objects.get(pk=tag.pk)
    except (AttributeError, KeyError, FieldError, IntegrityError, ValueError, TypeError):
        raise BadRequest('Invalid fields in body')
    return model_to_dict(tag)
def _format_ticket_response(tickets, user):
    """ Expand foreign keys of ticket dicts, in place

        For each ticket dict, the `defendant`, `service`, `treatedBy` and
        `tags` entries (when truthy) are replaced by data loaded from the
        database, and `commentsCount` / `starredByMe` are added.

        :param tickets: iterable of ticket dicts (mutated in place)
        :param user: user used to compute `starredByMe`
    """
    for ticket in tickets:
        # Flat foreign models
        defendant_id = ticket.get('defendant')
        if defendant_id:
            defendant = Defendant.objects.get(id=defendant_id)
            defendant_dict = model_to_dict(defendant)
            defendant_dict['email'] = defendant.details.email
            ticket['defendant'] = defendant_dict

        service_id = ticket.get('service')
        if service_id:
            ticket['service'] = model_to_dict(Service.objects.get(id=service_id))

        treated_by_id = ticket.get('treatedBy')
        if treated_by_id:
            ticket['treatedBy'] = User.objects.get(id=treated_by_id).username

        if ticket.get('tags'):
            ticket['tags'] = [
                model_to_dict(tag)
                for tag in Ticket.objects.get(id=ticket['id']).tags.all()
            ]

        ticket_id = ticket['id']
        ticket['commentsCount'] = TicketComment.objects.filter(ticket=ticket_id).count()
        ticket['starredByMe'] = StarredTicket.objects.filter(
            ticket_id=ticket_id,
            user=user
        ).exists()
def get_ticket_attachments(ticket_id):
    """
        Get the attachments of a ticket

        Combines the documents attached to the ticket's reports with the
        ticket's own attachments, without duplicates.

        :param ticket_id: id of the ticket
        :return: list of `model_to_dict` dicts, one per attachment
        :raises NotFound: when the ticket cannot be loaded
    """
    try:
        ticket = Ticket.objects.get(id=ticket_id)
    except (IndexError, ObjectDoesNotExist, ValueError):
        raise NotFound('Ticket not found')

    report_ids = ticket.reportTicket.all().values_list(
        'id',
        flat=True
    ).distinct()
    from_reports = AttachedDocument.objects.filter(report__id__in=report_ids).distinct()

    merged = list(from_reports)
    merged.extend(ticket.attachments.all())
    # A set removes documents reachable through both paths.
    unique_docs = list(set(merged))
    return [model_to_dict(doc) for doc in unique_docs]
def get_jobs_status(ticket_id):
    """
        Get the jobs of a ticket, ordered by creation date

        :param ticket_id: id of the ticket
        :return: list of job dicts; a truthy `action` entry is replaced by
            the `model_to_dict` of the matching `ServiceAction`
        :raises NotFound: when the ticket cannot be loaded
    """
    try:
        ticket = Ticket.objects.get(id=ticket_id)
    except (ObjectDoesNotExist, ValueError):
        raise NotFound('Ticket not found')

    statuses = []
    for job in ticket.jobs.all().order_by('creationDate'):
        job_info = model_to_dict(job)
        action_id = job_info.get('action')
        if action_id:
            job_info['action'] = model_to_dict(ServiceAction.objects.get(id=action_id))
        statuses.append(job_info)
    return statuses
def update(news_id, body, user):
    """ Update news

        Superusers may update any news item; other users only the ones
        whose author is themselves.

        :param news_id: id of the news item to update
        :param body: dict of fields to update; the `author`, `date` and
            `tags` keys are ignored
        :param user: user performing the update
        :return: the refreshed news item as returned by `model_to_dict`
        :raises NotFound: when no matching news item exists for this user
        :raises BadRequest: when `body` contains invalid fields
    """
    try:
        if user.is_superuser:
            news = News.objects.get(id=news_id)
        else:
            news = News.objects.get(id=news_id, author__id=user.id)
    except (ObjectDoesNotExist, ValueError):
        # Must be raised, not returned: returning the exception object would
        # hand it to the caller as if it were a successful result.
        raise NotFound('News not found')

    try:
        body = {k: v for k, v in body.iteritems() if k not in ['author', 'date', 'tags']}
        News.objects.filter(pk=news.pk).update(**body)
        # Reload so the returned dict reflects the stored values.
        news = News.objects.get(pk=news.pk)
    except (KeyError, FieldError, IntegrityError):
        raise BadRequest('Invalid fields in body')
    return model_to_dict(news)
def index(**kwargs):
    """ List mail templates, optionally filtered

        :param kwargs: may contain a `filters` entry, a JSON string that is
            URL-unquoted twice before decoding
        :return: list of `model_to_dict` dicts ordered by template name
        :raises BadRequest: when the filters cannot be decoded or turned
            into a query
    """
    query_errors = (AttributeError, KeyError, IndexError, FieldError,
                    SyntaxError, TypeError, ValueError)

    filters = {}
    raw_filters = kwargs.get('filters')
    if raw_filters:
        try:
            filters = json.loads(unquote(unquote(raw_filters)))
        except (ValueError, SyntaxError, TypeError) as ex:
            raise BadRequest(str(ex.message))

    try:
        where = generate_request_filter(filters)
    except query_errors as ex:
        raise BadRequest(str(ex.message))

    try:
        templates = MailTemplate.objects.filter(where).order_by('name')
    except query_errors as ex:
        raise BadRequest(str(ex.message))

    return [model_to_dict(template) for template in templates]
def create(body):
    """ Create provider

        :param body: dict of provider fields; `email` is required,
            `defaultCategory` (a category name) is optional, and only keys
            listed in `PROVIDER_FIELDS` are kept
        :return: the created provider as returned by `model_to_dict`
        :raises BadRequest: when `email` is missing, a provider with this
            email already exists, the category is unknown or the fields
            are invalid
    """
    if 'email' not in body:
        raise BadRequest('Email field required')
    # A single existing provider is already a duplicate; the previous
    # `len(...) > 1` test only triggered once two of them existed.
    if Provider.objects.filter(email=body['email']).exists():
        raise BadRequest('Provider already exists')

    try:
        cat = None
        if body.get('defaultCategory'):
            cat = Category.objects.get(name=body['defaultCategory'])
        # Always drop the raw name: the category is passed explicitly below.
        body.pop('defaultCategory', None)
        body = {k: v for k, v in body.iteritems() if k in PROVIDER_FIELDS}
        provider = Provider.objects.create(defaultCategory=cat, **body)
        return model_to_dict(provider)
    except (FieldError, IntegrityError, ObjectDoesNotExist) as ex:
        raise BadRequest(str(ex.message))
def update(prov, body):
    """ Update a provider

        :param prov: email of the provider to update
        :param body: dict of fields to update; only keys listed in
            `PROVIDER_FIELDS` are kept, and `defaultCategory` is read as a
            category name
        :return: the refreshed provider as returned by `model_to_dict`
        :raises NotFound: when no provider has this email
        :raises BadRequest: when the category is unknown or the fields are
            invalid
    """
    try:
        provider = Provider.objects.get(email=prov)
    except (ObjectDoesNotExist, ValueError):
        raise NotFound('Provider does not exist')

    try:
        allowed = {key: value for key, value in body.iteritems() if key in PROVIDER_FIELDS}
        category_name = allowed.get('defaultCategory')
        category = None
        if category_name:
            category = Category.objects.get(name=category_name)
        # The category object is passed explicitly, so drop the raw name.
        allowed.pop('defaultCategory', None)
        Provider.objects.filter(pk=provider.pk).update(defaultCategory=category, **allowed)
        provider = Provider.objects.get(pk=provider.pk)
    except (FieldError, IntegrityError, ObjectDoesNotExist) as ex:
        raise BadRequest(str(ex.message))
    return model_to_dict(provider)
def add_tag(provider_email, body):
    """ Add a tag to a provider

        :param provider_email: email of the provider
        :param body: dict of lookup arguments identifying the tag
        :return: the provider as returned by `model_to_dict`
        :raises BadRequest: when the tag's `tagType` differs from the
            provider's class name
        :raises NotFound: when the provider or the tag cannot be loaded
    """
    try:
        tag = Tag.objects.get(**body)
        provider = Provider.objects.get(email=provider_email)
        # A tag only applies to the model named by its tagType.
        if type(provider).__name__ != tag.tagType:
            raise BadRequest('Invalid tag for provider')
        provider.tags.add(tag)
        provider.save()
    except (KeyError, FieldError, IntegrityError, ObjectDoesNotExist, ValueError):
        raise NotFound('Provider or tag not found')
    return model_to_dict(provider)
def remove_tag(provider_email, tag_id):
    """ Remove a tag from a provider

        :param provider_email: email of the provider
        :param tag_id: id of the tag to remove
        :return: the provider as returned by `model_to_dict`
        :raises BadRequest: when the tag's `tagType` differs from the
            provider's class name
        :raises NotFound: when the provider or the tag cannot be loaded
    """
    try:
        tag = Tag.objects.get(id=tag_id)
        provider = Provider.objects.get(email=provider_email)
        # A tag only applies to the model named by its tagType.
        if type(provider).__name__ != tag.tagType:
            raise BadRequest('Invalid tag for provider')
        provider.tags.remove(tag)
        provider.save()
    except (ObjectDoesNotExist, FieldError, IntegrityError, ValueError):
        raise NotFound('Provider or tag not found')
    return model_to_dict(provider)
def __format_report_response(reports):
    """ Expand foreign keys of report dicts, in place

        For each report dict, the `defendant`, `plaintiff`, `service`,
        `provider` and `tags` entries (when truthy) are replaced by data
        loaded from the database or from `ProvidersController.show`.

        :param reports: iterable of report dicts (mutated in place)
    """
    for rep in reports:
        # Flat foreign models
        defendant_id = rep.get('defendant')
        if defendant_id:
            defendant = Defendant.objects.get(id=defendant_id)
            defendant_dict = model_to_dict(defendant)
            defendant_dict['email'] = defendant.details.email
            rep['defendant'] = defendant_dict

        plaintiff_id = rep.get('plaintiff')
        if plaintiff_id:
            rep['plaintiff'] = model_to_dict(Plaintiff.objects.get(id=plaintiff_id))

        service_id = rep.get('service')
        if service_id:
            rep['service'] = model_to_dict(Service.objects.get(id=service_id))

        provider_ref = rep.get('provider')
        if provider_ref:
            rep['provider'] = ProvidersController.show(provider_ref)

        if rep.get('tags'):
            rep['tags'] = [
                model_to_dict(tag)
                for tag in Report.objects.get(id=rep['id']).tags.all()
            ]
def post(self, request, *args, **kwargs):
    """Create a team and its report from the JSON request body.

    The body is validated against ``team_schema``. Validation errors and a
    duplicate team name are both reported as a JSON ``error`` payload.

    :param request: request whose body is UTF-8 encoded JSON
    :return: ``JsonResponse`` with either an ``error`` or a ``team`` entry
    """
    user = request.user
    team_info = json.loads(request.body.decode("utf-8"))

    validator = Validator(team_schema)
    if not validator.validate(team_info):
        return JsonResponse({'error': validator.errors})

    # Both times are truncated to the minute.
    survey_send_time = parser.parse(team_info['send_time'])
    survey_send_time = survey_send_time.replace(second=0, microsecond=0)
    summary_send_time = parser.parse(team_info['summary_time'])
    summary_send_time = summary_send_time.replace(second=0, microsecond=0)

    rule = recurrence.Rule(recurrence.WEEKLY, byday=team_info['days_of_week'])
    now = datetime.datetime.now()
    # When the send time is already in the past, today (at midnight) is
    # listed as an exclusion date of the recurrence.
    if survey_send_time < now:
        exdates = [now.replace(hour=0, minute=0, second=0, microsecond=0)]
    else:
        exdates = []
    rec = recurrence.Recurrence(rrules=[rule], exdates=exdates)

    try:
        team = Team.objects.create(admin=user, name=team_info['name'])
    except IntegrityError:
        return JsonResponse({'error': {"name": _("team with this name already exists")}})

    Report.objects.create(
        team=team,
        recurrences=rec,
        survey_send_time=survey_send_time,
        summary_send_time=summary_send_time,
    )
    team_dict = model_to_dict(team, exclude=['users'])
    team_dict['report'] = self.report_dict(team)
    return JsonResponse({'team': team_dict})
def receive_product_viewed(sender, product, user, request, response, **kwargs):
    """
    Run the hook tasks registered for a viewed product.

    Nothing happens when ``filer_hookevent(product, 1)`` returns a falsy
    value.

    :param sender: unused
    :param product: product passed to the hook lookup and sent as a dict
    :param user: user whose username (and email, when present) is sent
    :param request: unused
    :param response: unused
    :param kwargs: unused
    :return: None
    """
    hooks = filer_hookevent(product, 1)
    if not hooks:
        return

    user_info = {'user': user.username}
    # Not every user object carries an email attribute.
    if hasattr(user, 'email'):
        user_info['email'] = user.email

    payload = {
        "user": user_info,
        "product": model_to_dict(product)
    }
    run_hook_tasks_job(hooks, payload)
def receive_basket_addition(sender, request, product, user, **kwargs):
    """
    Run the hook tasks registered for a product added to a basket.

    Nothing happens when ``filer_hookevent(product, 4)`` returns a falsy
    value.

    :param sender: unused
    :param request: unused
    :param product: product passed to the hook lookup and sent as a dict
    :param user: user whose username and email are sent
    :param kwargs: unused
    :return: None
    """
    hooks = filer_hookevent(product, 4)
    if not hooks:
        return

    user_info = {'user': user.username, 'email': user.email}
    payload = {
        "user": user_info,
        "product": model_to_dict(product)
    }
    run_hook_tasks_job(hooks, payload)
def receive_order_placed(sender, order, user, **kwargs):
    """
    Run the hook tasks registered for each product of a placed order.

    Every line of the order's basket is handled separately; lines whose
    product has no hooks (``filer_hookevent(product, 9)`` falsy) are skipped.

    :param sender: unused
    :param order: order whose basket lines are walked
    :param user: user whose username and email are sent
    :param kwargs: unused
    :return: None
    """
    for line in order.basket.lines.all():
        product = line.product
        hooks = filer_hookevent(product, 9)
        if not hooks:
            continue

        user_info = {'user': user.username, 'email': user.email}
        payload = {
            "user": user_info,
            "order": order.number,
            "product": model_to_dict(product),
            "price_excl_tax": line.price_excl_tax,
            "price_incl_tax": line.price_incl_tax,
            "quantity": line.quantity
        }
        run_hook_tasks_job(hooks, payload)
def get_submission_formsets_initial(instance):
    """Build the initial formset data for a submission form instance.

    For every formset name, the related objects (ordered by id) are turned
    into dicts without their ``id``. The ``investigatoremployee`` entry
    lists the employees of each investigator, each tagged with the
    position of its investigator in ``investigator_index``.

    :param instance: object exposing the related managers used below
    :return: dict mapping formset name to a list of initial-data dicts
    """
    # NOTE(review): the related managers are queried unconditionally here,
    # while the employee part below guards on `instance` - confirm whether
    # a falsy instance can actually reach this function.
    related_getters = (
        ('measure', lambda sf: sf.measures.filter(category='6.1')),
        ('routinemeasure', lambda sf: sf.measures.filter(category='6.2')),
        ('nontesteduseddrug', lambda sf: sf.nontesteduseddrug_set.all()),
        ('participatingcenternonsubject', lambda sf: sf.participatingcenternonsubject_set.all()),
        ('foreignparticipatingcenter', lambda sf: sf.foreignparticipatingcenter_set.all()),
        ('investigator', lambda sf: sf.investigators.all()),
    )

    formsets = {}
    for name, get_related in related_getters:
        rows = []
        for obj in get_related(instance).order_by('id'):
            rows.append(model_to_dict(obj, exclude=('id',)))
        formsets[name] = rows

    employees = []
    if instance:
        investigators = instance.investigators.order_by('id')
        for index, investigator in enumerate(investigators):
            for employee in investigator.employees.order_by('id'):
                employee_dict = model_to_dict(employee, exclude=('id', 'investigator'))
                employee_dict['investigator_index'] = index
                employees.append(employee_dict)
    formsets['investigatoremployee'] = employees
    return formsets
def to_dict(self):
    """Return a dict representation of this instance.

    The output of ``model_to_dict`` is round-tripped through the instance
    serializer (``to_json`` then ``from_json``). The document type and id are
    then added, the ``cbnosync_ptr`` and ``csrfmiddlewaretoken`` keys are
    removed when present, and date-time, list, embedded and reference fields
    are post-processed by the matching helper methods.

    :return: the resulting dict
    """
    result = model_to_dict(self)
    as_json = self._serializer.to_json(result)
    result = self._serializer.from_json(as_json)

    result[DOC_TYPE_FIELD_NAME] = self.get_doc_type()
    result['id'] = self.get_id()

    # Keys that must not appear in the output.
    for unwanted in ('cbnosync_ptr', 'csrfmiddlewaretoken'):
        if unwanted in result:
            del result[unwanted]

    # The checks are deliberately independent (no elif): each matching
    # field type gets its own post-processing step.
    for fld in self._meta.fields:
        if isinstance(fld, DateTimeField):
            result[fld.name] = self._string_from_date(fld.name)
        if isinstance(fld, ListField):
            item_field = fld.item_field
            if isinstance(item_field, EmbeddedModelField):
                self.to_dict_nested_list(fld.name, result)
            if isinstance(item_field, ModelReferenceField):
                self.to_dict_reference_list(fld.name, result)
        if isinstance(fld, EmbeddedModelField):
            self.to_dict_nested(fld.name, result)
        if isinstance(fld, ModelReferenceField):
            self.to_dict_reference(fld.name, result)
    return result
def post(self, request):
    """
    Handle the POST method: log a user in.

    :param request: request whose body is JSON with `username` and `password`
    :return: HttpResponse with status 200 and a `role` cookie holding the
             user's `is_superuser` value when authentication succeeds, or
             HttpResponseBadRequest with status 401 when the form is
             invalid or the credentials are rejected
    """
    data = json.loads(request.body)
    if not LoginForm(data).is_valid():
        return HttpResponseBadRequest('Invalid input data', status=401)

    username = data.get('username', None)
    password = data.get('password', None)
    user = auth.authenticate(username=username, password=password)
    if not user:
        return HttpResponseBadRequest("Incorrect email or password", status=401)

    role = model_to_dict(User.objects.get(username=username))
    response = HttpResponse('is_supeuser', status=200)
    response.set_cookie('role', value=role['is_superuser'])
    auth.login(request, user)
    return response
def get(self, request, template_id=None):
    """Handle the GET method.

    Args:
        request: Request to View.
        template_id: id of the template to retrieve.
    Returns:
        HttpResponse whose body is the JSON dump of every template when
        template_id is falsy, otherwise of the single template with the
        given template_id.
    """
    if template_id:
        data = model_to_dict(XmlTemplate.get_by_id(template_id))
    else:
        data = [model_to_dict(template) for template in XmlTemplate.get_all()]
    return HttpResponse(json.dumps(data))
def get(self, request, company_id):
    """
    Handle the GET method.

    :args
        request: Request to View.
        company_id: id of the company to return.
    :return: HttpResponse whose body is JSON with the company fields
             (`company`) and its users (`users`).
             If the user is not a superuser and asks for a company other
             than their own, returns HttpResponseBadRequest with a
             'Permission denied' message.
    """
    company_id = int(company_id)
    user = request.user
    if (not user.is_superuser) and (company_id != user.adviseruser.id_company.id):
        return HttpResponseBadRequest("Permission denied")

    # Look the company up once and reuse it for both entries instead of
    # calling Company.get_company twice with the same id.
    company = Company.get_company(company_id)
    data = {"company": model_to_dict(company),
            "users": company.get_users()}
    return HttpResponse(json.dumps(data))
def test_initialization(self):
    """Check that ``model_to_dict`` of the fixture flow cell has the expected values."""
    # The literal used to contain 'status' twice. In a dict display the last
    # duplicate wins, so only the effective entry ('seq_complete') is kept.
    EXPECTED = {
        'id': self.flow_cell.pk,
        'demux_operator': None,
        'owner': self.user.pk,
        'description': 'Description',
        'sequencing_machine': self.machine.pk,
        'num_lanes': 8,
        'operator': 'John Doe',
        'is_paired': True,
        'index_read_count': 1,
        'rta_version': models.RTA_VERSION_V2,
        'read_length': 151,
        'label': 'LABEL',
        'run_date': datetime.date(2016, 3, 3),
        'run_number': 815,
        'slot': 'A',
        'status': 'seq_complete',
        'vendor_id': 'BCDEFGHIXX',
    }
    self.assertEqual(model_to_dict(self.flow_cell), EXPECTED)
def test_render(self):
    """Test that the flow cell delete POST works"""
    # Precondition: exactly one flow cell exists
    self.assertEqual(FlowCell.objects.all().count(), 1)

    # Simulate the POST
    delete_url = reverse('flowcell_delete', kwargs={'pk': self.flow_cell.pk})
    with self.login(self.user):
        response = self.client.post(delete_url)

    # The flow cell must be gone from the database
    self.assertEqual(FlowCell.objects.all().count(), 0)

    # Check call to sending emails
    self.email_mock.assert_called_once_with(self.user, ANY)
    # Compare the two flow cells field by field, ignoring their ids
    actual = model_to_dict(self.arg_flowcell)
    del actual['id']
    expected = model_to_dict(self.flow_cell)
    del expected['id']
    self.assertEqual(actual, expected)

    # Check resulting response
    with self.login(self.user):
        self.assertRedirects(response, reverse('flowcell_list'))
def api_deploys(request):
    """Return the ten most recently created deploys as JSON.

    Each deploy row is extended with ``host``, ``user`` and ``project``
    dicts. Only ``created_at``, ``email``, ``id`` and ``username`` are
    requested for the user.

    :param request: incoming request (unused)
    :return: ``JsonResponse`` of the form
        ``{"rc": 0, "data": {"deploys": [...], "count": ...}}``
    """
    deploys = Deploy.objects.all().order_by('-created_at')[:10]

    rows = []
    for row in list(deploys.values()):
        host = model_to_dict(Host.objects.get(id=row["host_id"]))
        project = model_to_dict(Project.objects.get(id=row["project_id"]))
        user = model_to_dict(
            User.objects.get(id=row["user_id"]),
            fields=["created_at", "email", "id", "username"],
        )
        row["host"] = host
        row["user"] = user
        row["project"] = project
        rows.append(row)

    payload = dict(rc=0, data=dict(deploys=rows, count=deploys.count()))
    return JsonResponse(payload)
def add_view(self, request, form_url='', extra_context=None):
    """Render the add view, prepopulated from a ``source`` entry when given.

    When the query string has a ``source`` parameter, the fields of that
    object are copied into ``request.GET``. File values are also put into
    ``request.FILES`` unless the matching "clear" checkbox is ticked or a
    file was already uploaded under that name.
    """
    # Prepopulate new configuration entries with the value of the current config, if given:
    if 'source' in request.GET:
        params = request.GET.copy()
        source_pk = int(params.pop('source')[0])
        source = get_object_or_404(self.model, pk=source_pk)
        for field_name, field_value in models.model_to_dict(source).items():
            # read files into request.FILES, if:
            # * user hasn't ticked the "clear" checkbox
            # * user hasn't uploaded a new file
            is_file = field_value and isinstance(field_value, File)
            if is_file and request.POST.get('{0}-clear'.format(field_name)) != 'on':
                request.FILES.setdefault(field_name, field_value)
            params[field_name] = field_value
        request.GET = params
    # Call our grandparent's add_view, skipping the parent code
    # because the parent code has a different way to prepopulate new configuration entries
    # with the value of the latest config, which doesn't make sense for keyed models.
    # pylint: disable=bad-super-call
    return super(ConfigurationModelAdmin, self).add_view(request, form_url, extra_context)
def get_changeform_initial_data(self, request):
    """Sets the initial state of the Create FOIA form

    The reporter defaults to the current user, and the tags default to the
    user's default project when one is set. When the query string has a
    ``duplicatefoia`` parameter, a few attributes are copied from that FOIA.
    """
    initial_data = super(FoiaAdmin, self).get_changeform_initial_data(request)
    initial_data['reporter'] = request.user.pk

    try:
        default_project = request.user.specialperson.default_project
        if default_project:
            initial_data['tags'] = [request.user.specialperson.default_project]
    except SpecialPerson.DoesNotExist:
        pass

    if 'duplicatefoia' in request.GET:
        source_pk = request.GET['duplicatefoia']
        source_foia = model_to_dict(Foia.objects.get(pk=source_pk))
        copied_fields = ('filed_date', 'is_state_foia', 'request_subject', 'request_notes')
        initial_data.update({name: source_foia[name] for name in copied_fields})
        # nothing is done with this right now.
        # eventually I'd like to keep track of this.
        initial_data["duplicate_of"] = source_pk
    return initial_data
def add_view(self, request, form_url='', extra_context=None):
    """Render the add view, prepopulated from a ``source`` entry when given.

    When the query string has a ``source`` parameter, the fields of that
    object are copied into ``request.GET``. File values are also put into
    ``request.FILES`` unless the matching "clear" checkbox is ticked or a
    file was already uploaded under that name.
    """
    # Prepopulate new configuration entries with the value of the current config, if given:
    if 'source' in request.GET:
        params = request.GET.copy()
        source_pk = int(params.pop('source')[0])
        source = get_object_or_404(self.model, pk=source_pk)
        for field_name, field_value in models.model_to_dict(source).items():
            # read files into request.FILES, if:
            # * user hasn't ticked the "clear" checkbox
            # * user hasn't uploaded a new file
            is_file = field_value and isinstance(field_value, File)
            if is_file and request.POST.get('{0}-clear'.format(field_name)) != 'on':
                request.FILES.setdefault(field_name, field_value)
            params[field_name] = field_value
        request.GET = params
    # Call our grandparent's add_view, skipping the parent code
    # because the parent code has a different way to prepopulate new configuration entries
    # with the value of the latest config, which doesn't make sense for keyed models.
    # pylint: disable=bad-super-call
    return super(ConfigurationModelAdmin, self).add_view(request, form_url, extra_context)
def form_args(self, bundle):
    """Build the keyword arguments used to create the form for ``bundle``.

    ``data`` starts from ``model_to_dict(bundle.obj)`` when the object has
    a ``pk`` attribute (otherwise from an empty dict) and is then updated
    with the bundle data. ``instance`` is set only when the form class is
    a ``ModelForm`` subclass.

    :param bundle: object exposing ``data`` and ``obj``
    :return: dict with a ``data`` entry and possibly an ``instance`` entry
    """
    # Ensure we get a bound Form, regardless of the state of the bundle.
    submitted = {} if bundle.data is None else bundle.data

    form_kwargs = {'data': {}}
    target = bundle.obj
    if hasattr(target, 'pk'):
        if issubclass(self.form_class, ModelForm):
            form_kwargs['instance'] = target
        form_kwargs['data'] = model_to_dict(target)

    # Submitted values take precedence over the object's current values.
    form_kwargs['data'].update(submitted)
    return form_kwargs
def _copy(self, id):
    """Duplicate a ``BKPPic`` row together with its ``BKPTarget`` rows.

    The new picture is created from every field of the source except
    ``id`` and ``status``. Each target of the source is then re-saved with
    its id cleared and pointed at the new picture.

    :param id: id of the ``BKPPic`` to copy
    :return: the ``('msg', '????')`` tuple
    """
    from django.forms import model_to_dict

    source = BKPPic.objects.get(id=id)
    field_values = model_to_dict(source, exclude=['id', 'status'])
    duplicate = BKPPic.objects.create(**field_values)

    # copy targets
    for target in BKPTarget.objects.filter(bkp_pic_id=id).all():
        # presumably clearing the id makes save() insert a new row - confirm
        target.id = None
        target.bkp_pic_id = duplicate.id
        target.save()
    return ('msg', '????')