def snapshot_to_json(self):
    """Dump the Result rows for the election date to a timestamped JSON file.

    The file is written to ``<snapshots dir>/<election_date_string>/<now>.json``
    where ``now`` is the local time formatted as ``YYYY-MM-DD_HH-MM-SS``.

    NOTE(review): ``election_date_string`` is not defined inside this
    method; presumably it is a module-level name -- confirm. The target
    directory is not created here, so it must already exist.
    """
    now = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    ## don't include trailing slash in path -- it's in fullpath
    ## server (change parent dir to fixtures, add that path to Django settings and just leave dated subdirectory)
    path = '/home/ubuntu/mccelections/electionsproject/snapshots/%s' % (election_date_string)
    fullpath = '%s/%s.json' % (path, now)
    snapshotresults = serializers.serialize('json', Result.objects.filter(election_date=election_date_string))
    # The context manager closes the handle even when the write raises.
    with open(fullpath, 'w') as f:
        f.write(snapshotresults)
# Example source code using Python's serialize()
def create_resource_type_simple_patterned_ajax(request):
    """Create an EconomicResourceType and its facet values from a POST.

    On an invalid main form the form errors are returned. Otherwise the
    resource type is saved with ``created_by`` set to the requesting user,
    one ResourceTypeFacetValue is stored per valid facet form, and the new
    resource type is returned serialized as JSON (``id`` and ``name`` only).
    """
    form = EconomicResourceTypeAjaxForm(request.POST, request.FILES)
    if not form.is_valid():
        return HttpResponse(form.errors, content_type="text/json-comment-filtered")

    resource_type = form.save(commit=False)
    resource_type.created_by = request.user
    resource_type.save()

    slot = request.POST["slot"]
    pattern = ProcessPattern.objects.get(id=request.POST["pattern"])
    formset = create_patterned_facet_formset(pattern, slot, data=request.POST)
    for facet_form in formset.forms:
        # Invalid facet forms are skipped silently.
        if not facet_form.is_valid():
            continue
        facet_value = FacetValue.objects.get(id=facet_form.cleaned_data["value"])
        if not facet_value:
            continue
        link = ResourceTypeFacetValue()
        link.resource_type = resource_type
        link.facet_value = facet_value
        link.save()

    created = EconomicResourceType.objects.filter(id=resource_type.id)
    return_data = serializers.serialize("json", created, fields=('id', 'name',))
    return HttpResponse(return_data, content_type="text/json-comment-filtered")
def create_value_equation_bucket_rule(request, bucket_id):
    """Create a rule for a value-equation bucket, then redirect.

    The bucket is looked up by ``bucket_id`` (404 when missing). On POST,
    a rule is built from the rule form and saved only when the filter-set
    form also validates; its serialized form becomes the rule's
    ``filter_rule``. Every request ends with a redirect to the edit page
    of the bucket's value equation.
    """
    bucket = get_object_or_404(ValueEquationBucket, id=bucket_id)
    value_equation = bucket.value_equation
    if request.method == "POST":
        # Both forms share the bucket id as their prefix.
        rule_form = ValueEquationBucketRuleForm(
            prefix=str(bucket_id), data=request.POST)
        if rule_form.is_valid():
            rule = rule_form.save(commit=False)
            rule.value_equation_bucket = bucket
            rule.created_by = request.user
            filter_form = BucketRuleFilterSetForm(
                context_agent=None,
                event_type=None,
                pattern=None,
                prefix=str(bucket_id),
                data=request.POST)
            if filter_form.is_valid():
                rule.filter_rule = filter_form.serialize()
                rule.save()
    target = '/%s/%s/' % ('accounting/edit-value-equation', value_equation.id)
    return HttpResponseRedirect(target)
def from_tweets(cls, tweets, metadata=None, **kwargs):
    """Build and save an instance whose ``json`` field describes *tweets*.

    :param tweets: an iterable of tweets, serialized with the JSON serializer
    :param metadata: stored verbatim under the ``metadata`` key
    :param kwargs: extra per-tweet attributes; each value must be a sequence
        as long as the tweet list, otherwise that attribute is ignored.
        Values should be JSON serializable.
    :return: the saved instance
    """
    timeline = cls()
    serialized_tweets = json.loads(serializers.serialize("json", tweets))
    for attr_name, attr_values in kwargs.items():
        if len(attr_values) == len(serialized_tweets):
            for entry, attr_value in zip(serialized_tweets, attr_values):
                entry['fields'][attr_name] = attr_value
    # First save happens before ``pk`` and ``datetime`` are read below.
    timeline.save()
    timeline.json = json.dumps({
        'metadata': metadata,
        'tweets': serialized_tweets,
        'pk': timeline.pk,
        'created_at': timeline.datetime.isoformat(),
    })
    timeline.save()
    return timeline
def serialize_subs(subs, filepath, *other_objects):
    """Write *subs* and their related objects to *filepath* as indented JSON.

    The output holds, in order: each submission's applicant's visitor, the
    applicants, the submissions, their applications, the applications'
    status updates, the status updates' notifications, and finally the
    contents of every iterable passed in *other_objects*.

    :param subs: form submissions; iterated more than once, so it must be
        re-iterable (for example a list or queryset)
    :param filepath: destination file, overwritten
    :param other_objects: extra iterables of model instances to append
    """
    applicants = [sub.applicant for sub in subs]
    visitors = [applicant.visitor for applicant in applicants]
    applications = models.Application.objects.filter(
        form_submission__in=subs)
    status_updates = models.StatusUpdate.objects.filter(
        application__in=applications)
    status_notifications = models.StatusNotification.objects.filter(
        status_update__in=status_updates)
    data = [*visitors, *applicants, *subs, *applications,
            *status_updates, *status_notifications]
    for object_set in other_objects:
        data.extend(object_set)
    # Serialize before opening the file, so a query or serialization
    # failure does not leave a truncated file behind.
    serialized = serializers.serialize(
        'json', data, indent=2, use_natural_foreign_keys=True)
    with open(filepath, 'w') as f:
        f.write(serialized)
def handle(self, *args, **kwargs):
    """Create a CREATE CRUDEvent for every FormSubmission that has none.

    A submission counts as covered when any CRUDEvent whose content type
    model is ``formsubmission`` carries its id as ``object_id``. The number
    of submissions processed is written to stdout.
    """
    logged_events = CRUDEvent.objects.filter(
        content_type__model='formsubmission')
    logged_ids = set(logged_events.values_list('object_id', flat=True))
    missing = FormSubmission.objects.exclude(id__in=logged_ids)
    for submission in missing:
        CRUDEvent.objects.create(
            event_type=CRUDEvent.CREATE,
            object_repr=str(submission),
            object_json_repr=serializers.serialize("json", [submission]),
            content_type=ContentType.objects.get_for_model(submission),
            object_id=submission.pk,
            user=None,
            datetime=timezone.now(),
            user_pk_as_string=None
        )
    summary = "Created CRUDEvents for {} FormSubmissions".format(len(missing))
    self.stdout.write(summary)
def log_error(message, instance):
    """Log *message* followed by a JSON dump of the model *instance*.

    The instance is serialized with the JSON serializer (see
    https://stackoverflow.com/questions/757022/how-do-you-serialize-a-model-instance-in-django).
    When possible, the serialized ``function`` field is replaced by a
    readable ``module.name(signature)`` string, plus ``args``/``kwargs``
    if the callable has them. Any failure while building that string is
    ignored and the serialized value is logged unchanged.
    """
    record = json.loads(serialize('json', [instance, ]))
    try:
        # replace long String64 function with useful info
        func = instance.function
        described = str(inspect.signature(func))
        for extra in ('args', 'kwargs'):
            if hasattr(func, extra):
                described += ', ' + extra + '=' + str(getattr(func, extra))
        qualified = func.__module__ + "." + func.__name__ + described
        record[0]['fields']['function'] = str(qualified)
    except Exception:
        # Best effort only: fall back to the plain serialized field.
        pass
    logger.error(message + ' ' + json.dumps(record[0]))
def form_valid(self, form):
    """Store the new alert, or stash it in the session for anonymous users.

    Authenticated users get the alert saved under their account, a success
    message, and a redirect to ``alert-list``. Otherwise the unsaved alert
    is serialized as GeoJSON (``location`` only) into the session under
    ``new-alert`` and the user is redirected to ``create-alert-user``.
    """
    new_alert = models.Alert()
    new_alert.location = form.cleaned_data['location']

    current_user = self.request.user
    if not current_user.is_authenticated:
        # Keep the pending alert in the session for the next step.
        self.request.session['new-alert'] = serialize(
            'geojson', [new_alert], fields=('location',))
        return redirect(reverse('create-alert-user'))

    new_alert.user = current_user
    new_alert.save()
    messages.add_message(self.request, messages.SUCCESS, success_message)
    return redirect(reverse('alert-list'))
def get_context_data(self, **kwargs):
    """Build the template context for the site dashboard.

    Starts from the parent view's context and adds the site (``obj``), its
    currently active roles distinct by user (``peoples_involved``), the
    four submission groups returned by ``get_site_submission``, the site
    serialized as ``custom_geojson`` (``data``), and the values and keys
    of the line-chart data.

    The parent context is extended rather than replaced, so entries it
    supplies stay available to the template. Keys set here take
    precedence on a name clash.
    """
    dashboard_data = super(SiteDashboardView, self).get_context_data(**kwargs)
    obj = Site.objects.get(pk=self.kwargs.get('pk'))
    # Only roles that have not ended, one row per user.
    peoples_involved = obj.site_roles.filter(ended_at__isnull=True).distinct('user')
    data = serialize('custom_geojson', [obj], geometry_field='location',
                     fields=('name', 'public_desc', 'additional_desc', 'address', 'location', 'phone', 'id'))
    line_chart = LineChartGeneratorSite(obj)
    line_chart_data = line_chart.data()
    outstanding, flagged, approved, rejected = obj.get_site_submission()
    dashboard_data.update({
        'obj': obj,
        'peoples_involved': peoples_involved,
        'outstanding': outstanding,
        'flagged': flagged,
        'approved': approved,
        'rejected': rejected,
        'data': data,
        'cumulative_data': line_chart_data.values(),
        'cumulative_labels': line_chart_data.keys(),
    })
    return dashboard_data
def retrieveResources(request):
    """Call ``Resource.objects.retrieveResources`` and return its result as JSON.

    The JSON body always contains ``success`` ('1' or '0'). If the manager
    call raises, the exception text is reported under ``exception``;
    otherwise the serialized result is placed under ``resources``.
    """
    response_data = {}
    try:
        resources = Resource.objects.retrieveResources(request.GET)
    except Exception as e:
        response_data['success'] = '0'
        response_data['exception'] = str(e)
    else:
        response_data['success'] = '1'
        # The serialized text is a local, not a module-level global, so
        # concurrent requests cannot overwrite each other's payload.
        try:
            serialized = serializers.serialize('json', resources)
        except Exception:
            # Fallback for a result the serializer cannot iterate
            # (presumably a single model instance): wrap it in a list.
            serialized = serializers.serialize('json', [resources])
        response_data["resources"] = json.loads(serialized)
    return JsonResponse(response_data)
def retrieveLatestAnnouncements(request):
    """Call ``Announcement.objects.retrieveLatestAnnouncements`` and return JSON.

    The JSON body always contains ``success`` ('1' or '0'). If the manager
    call raises, the exception text is reported under ``exception``;
    otherwise the serialized result is placed under ``announcements``.
    """
    response_data = {}
    try:
        announcements = Announcement.objects.retrieveLatestAnnouncements(request.GET)
    except Exception as e:
        response_data['success'] = '0'
        response_data['exception'] = str(e)
    else:
        response_data['success'] = '1'
        # The serialized text is a local, not a module-level global, so
        # concurrent requests cannot overwrite each other's payload.
        try:
            serialized = serializers.serialize('json', announcements)
        except Exception:
            # Fallback for a result the serializer cannot iterate
            # (presumably a single model instance): wrap it in a list.
            serialized = serializers.serialize('json', [announcements])
        response_data["announcements"] = json.loads(serialized)
    return JsonResponse(response_data)
def retrieveMoreAnnouncements(request):
    """Call ``Announcement.objects.retrieveMoreAnnouncements`` and return JSON.

    The JSON body always contains ``success`` ('1' or '0'). If the manager
    call raises, the exception text is reported under ``exception``;
    otherwise the serialized result is placed under ``announcements``.
    """
    response_data = {}
    try:
        announcements = Announcement.objects.retrieveMoreAnnouncements(request.GET)
    except Exception as e:
        response_data['success'] = '0'
        response_data['exception'] = str(e)
    else:
        response_data['success'] = '1'
        # The serialized text is a local, not a module-level global, so
        # concurrent requests cannot overwrite each other's payload.
        try:
            serialized = serializers.serialize('json', announcements)
        except Exception:
            # Fallback for a result the serializer cannot iterate
            # (presumably a single model instance): wrap it in a list.
            serialized = serializers.serialize('json', [announcements])
        response_data["announcements"] = json.loads(serialized)
    return JsonResponse(response_data)
def retrieveLatestNotices(request):
    """Call ``Notice.objects.retrieveLatestNotices`` and return its result as JSON.

    The JSON body always contains ``success`` ('1' or '0'). If the manager
    call raises, the exception text is reported under ``exception``;
    otherwise the serialized result is placed under ``notices``.
    """
    response_data = {}
    try:
        notices = Notice.objects.retrieveLatestNotices(request.GET)
    except Exception as e:
        response_data['success'] = '0'
        response_data['exception'] = str(e)
    else:
        response_data['success'] = '1'
        # The serialized text is a local, not a module-level global, so
        # concurrent requests cannot overwrite each other's payload.
        try:
            serialized = serializers.serialize('json', notices)
        except Exception:
            # Fallback for a result the serializer cannot iterate
            # (presumably a single model instance): wrap it in a list.
            serialized = serializers.serialize('json', [notices])
        response_data["notices"] = json.loads(serialized)
    return JsonResponse(response_data)
def retrieveMoreNotices(request):
    """Call ``Notice.objects.retrieveMoreNotices`` and return its result as JSON.

    The JSON body always contains ``success`` ('1' or '0'). If the manager
    call raises, the exception text is reported under ``exception``;
    otherwise the serialized result is placed under ``notices``.
    """
    response_data = {}
    try:
        notices = Notice.objects.retrieveMoreNotices(request.GET)
    except Exception as e:
        response_data['success'] = '0'
        response_data['exception'] = str(e)
    else:
        response_data['success'] = '1'
        # The serialized text is a local, not a module-level global, so
        # concurrent requests cannot overwrite each other's payload.
        try:
            serialized = serializers.serialize('json', notices)
        except Exception:
            # Fallback for a result the serializer cannot iterate
            # (presumably a single model instance): wrap it in a list.
            serialized = serializers.serialize('json', [notices])
        response_data["notices"] = json.loads(serialized)
    return JsonResponse(response_data)
def retrieveMoreEvents(request):
    """Call ``Event.objects.retrieveMoreEvents`` and return its result as JSON.

    The JSON body always contains ``success`` ('1' or '0'). If the manager
    call raises, the exception text is reported under ``exception``;
    otherwise the serialized result is placed under ``events``.
    """
    response_data = {}
    try:
        events = Event.objects.retrieveMoreEvents(request.GET)
    except Exception as e:
        response_data['success'] = '0'
        response_data['exception'] = str(e)
    else:
        response_data['success'] = '1'
        # The serialized text is a local, not a module-level global, so
        # concurrent requests cannot overwrite each other's payload.
        try:
            serialized = serializers.serialize('json', events)
        except Exception:
            # Fallback for a result the serializer cannot iterate
            # (presumably a single model instance): wrap it in a list.
            serialized = serializers.serialize('json', [events])
        response_data["events"] = json.loads(serialized)
    return JsonResponse(response_data)
def getEventById(request):
    """Call ``Event.objects.getEventById`` and return its result as JSON.

    The JSON body always contains ``success`` ('1' or '0'). If the manager
    call raises, the exception text is reported under ``exception``;
    otherwise the serialized result is placed under ``events``.
    """
    response_data = {}
    try:
        event = Event.objects.getEventById(request.GET)
    except Exception as e:
        response_data['success'] = '0'
        response_data['exception'] = str(e)
    else:
        response_data['success'] = '1'
        # The serialized text is a local, not a module-level global, so
        # concurrent requests cannot overwrite each other's payload.
        try:
            serialized = serializers.serialize('json', event)
        except Exception:
            # Fallback for a result the serializer cannot iterate
            # (presumably a single model instance): wrap it in a list.
            serialized = serializers.serialize('json', [event])
        response_data["events"] = json.loads(serialized)
    return JsonResponse(response_data)
def retrieveMoreNews(request):
    """Call ``News.objects.retrieveMoreNews`` and return its result as JSON.

    The JSON body always contains ``success`` ('1' or '0'). If the manager
    call raises, the exception text is reported under ``exception``;
    otherwise the serialized result is placed under ``news``.
    """
    response_data = {}
    try:
        news_items = News.objects.retrieveMoreNews(request.GET)
    except Exception as e:
        response_data['success'] = '0'
        response_data['exception'] = str(e)
    else:
        response_data['success'] = '1'
        # The serialized text is a local, not a module-level global, so
        # concurrent requests cannot overwrite each other's payload.
        try:
            serialized = serializers.serialize('json', news_items)
        except Exception:
            # Fallback for a result the serializer cannot iterate
            # (presumably a single model instance): wrap it in a list.
            serialized = serializers.serialize('json', [news_items])
        response_data["news"] = json.loads(serialized)
    return JsonResponse(response_data)
def getNewsById(request):
    """Call ``News.objects.getNewsById`` and return its result as JSON.

    The JSON body always contains ``success`` ('1' or '0'). If the manager
    call raises, the exception text is reported under ``exception``;
    otherwise the serialized result is placed under ``news``.
    """
    response_data = {}
    try:
        news_item = News.objects.getNewsById(request.GET)
    except Exception as e:
        response_data['success'] = '0'
        response_data['exception'] = str(e)
    else:
        response_data['success'] = '1'
        # The serialized text is a local, not a module-level global, so
        # concurrent requests cannot overwrite each other's payload.
        try:
            serialized = serializers.serialize('json', news_item)
        except Exception:
            # Fallback for a result the serializer cannot iterate
            # (presumably a single model instance): wrap it in a list.
            serialized = serializers.serialize('json', [news_item])
        response_data["news"] = json.loads(serialized)
    return JsonResponse(response_data)
def retrieveGroups(request):
    """Call ``Group.objects.retrieveGroups`` and return its result as JSON.

    The JSON body always contains ``success`` ('1' or '0'). If the manager
    call raises, the exception text is reported under ``exception``;
    otherwise the serialized result is placed under ``groups``.
    """
    response_data = {}
    try:
        groups = Group.objects.retrieveGroups(request.GET)
    except Exception as e:
        response_data['success'] = '0'
        response_data['exception'] = str(e)
    else:
        response_data['success'] = '1'
        # The serialized text is a local, not a module-level global, so
        # concurrent requests cannot overwrite each other's payload.
        try:
            serialized = serializers.serialize('json', groups)
        except Exception:
            # Fallback for a result the serializer cannot iterate
            # (presumably a single model instance): wrap it in a list.
            serialized = serializers.serialize('json', [groups])
        response_data["groups"] = json.loads(serialized)
    return JsonResponse(response_data)
def retrieveBatches(request):
    """Call ``Batch.objects.retrieveBatches`` and return its result as JSON.

    A failing manager call yields ``success`` '0' plus the exception text
    under ``exception``. Otherwise ``success`` is '1' and the serialized
    result is returned under ``batches``.
    """
    try:
        batches = Batch.objects.retrieveBatches(request.GET)
    except Exception as exc:
        return JsonResponse({'success': '0', 'exception': str(exc)})

    payload = {'success': '1'}
    try:
        encoded = serializers.serialize('json', batches)
    except Exception:
        # Retry with the result wrapped in a list when serializing it
        # directly fails.
        encoded = serializers.serialize('json', [batches, ])
    payload["batches"] = json.loads(encoded)
    return JsonResponse(payload)
def retrieveCourseCurriculum(request):
    """Call ``CourseCurriculum.objects.retrieveCourseCurriculum`` and return JSON.

    The JSON body always contains ``success`` ('1' or '0'). If the manager
    call raises, the exception text is reported under ``exception``;
    otherwise the serialized result is placed under ``coursecurriculum``.
    """
    response_data = {}
    try:
        curriculum = CourseCurriculum.objects.retrieveCourseCurriculum(request.GET)
    except Exception as e:
        response_data['success'] = '0'
        response_data['exception'] = str(e)
    else:
        response_data['success'] = '1'
        # The serialized text is a local, not a module-level global, so
        # concurrent requests cannot overwrite each other's payload.
        try:
            serialized = serializers.serialize('json', curriculum)
        except Exception:
            # Fallback for a result the serializer cannot iterate
            # (presumably a single model instance): wrap it in a list.
            serialized = serializers.serialize('json', [curriculum])
        response_data["coursecurriculum"] = json.loads(serialized)
    return JsonResponse(response_data)
def retrieveCourseGroups(request):
    """Call ``CourseGroup.objects.retrieveCourseGroups`` and return JSON.

    The JSON body always contains ``success`` ('1' or '0'). If the manager
    call raises, the exception text is reported under ``exception``;
    otherwise the serialized result is placed under ``coursegroups``.
    """
    response_data = {}
    try:
        course_groups = CourseGroup.objects.retrieveCourseGroups(request.GET)
    except Exception as e:
        response_data['success'] = '0'
        response_data['exception'] = str(e)
    else:
        response_data['success'] = '1'
        # The serialized text is a local, not a module-level global, so
        # concurrent requests cannot overwrite each other's payload.
        try:
            serialized = serializers.serialize('json', course_groups)
        except Exception:
            # Fallback for a result the serializer cannot iterate
            # (presumably a single model instance): wrap it in a list.
            serialized = serializers.serialize('json', [course_groups])
        response_data["coursegroups"] = json.loads(serialized)
    return JsonResponse(response_data)
def addCourse(request):
    """Create a course from the POST data and return it as JSON.

    request : +courseId, +courseName, +courseType, +credits, +sessMaxMarks,
    +endMaxSemMarks, +maxMarks, +minPassingMarks, +semester, +degreeCode,
    +degreeType, +branchCode

    A failing ``Course.objects.addCourse`` call yields ``success`` '0' plus
    the exception text under ``exception``. Otherwise ``success`` is '1'
    and the new course is returned serialized under ``course``.
    """
    try:
        course = Course.objects.addCourse(request.POST)
    except Exception as exc:
        return JsonResponse({'success': '0', 'exception': str(exc)})

    payload = {'success': '1'}
    # The serializer is handed a one-element list holding the course.
    encoded = serializers.serialize('json', [course, ])
    payload["course"] = json.loads(encoded)
    return JsonResponse(payload)
def getCourseById(request):
    """Look up a course from the GET data and return it as JSON.

    request : +courseId

    A failing ``Course.objects.getCourseById`` call yields ``success`` 0
    plus the exception text under ``exception``. Otherwise ``success`` is 1
    and the course is returned serialized under ``course``. Note that
    ``success`` is an integer here.
    """
    try:
        course = Course.objects.getCourseById(request.GET)
    except Exception as exc:
        return JsonResponse({"success": 0, 'exception': str(exc)})

    payload = {"success": 1}
    # The serializer is handed a one-element list holding the course.
    encoded = serializers.serialize('json', [course, ])
    payload["course"] = json.loads(encoded)
    return JsonResponse(payload)
def retrieveDegrees(request):
    """Call ``Degree.objects.retrieveDegrees`` and return its result as JSON.

    A failing manager call yields ``success`` '0' plus the exception text
    under ``exception``. Otherwise ``success`` is '1' and the serialized
    result is returned under ``degrees``.
    """
    try:
        degrees = Degree.objects.retrieveDegrees(request.GET)
    except Exception as exc:
        return JsonResponse({'success': '0', 'exception': str(exc)})

    payload = {'success': '1'}
    try:
        encoded = serializers.serialize('json', degrees)
    except Exception:
        # Retry with the result wrapped in a list when serializing it
        # directly fails.
        encoded = serializers.serialize('json', [degrees, ])
    payload["degrees"] = json.loads(encoded)
    return JsonResponse(payload)
def retrieveDepartments(request):
    """Call ``Department.objects.retrieveDepartments`` and return JSON.

    The JSON body always contains ``success`` ('1' or '0'). If the manager
    call raises, the exception text is reported under ``exception``;
    otherwise the serialized result is placed under ``departments``.
    """
    response_data = {}
    try:
        departments = Department.objects.retrieveDepartments(request.GET)
    except Exception as e:
        response_data['success'] = '0'
        response_data['exception'] = str(e)
    else:
        response_data['success'] = '1'
        # The serialized text is a local, not a module-level global, so
        # concurrent requests cannot overwrite each other's payload.
        try:
            serialized = serializers.serialize('json', departments)
        except Exception:
            # Fallback for a result the serializer cannot iterate
            # (presumably a single model instance): wrap it in a list.
            serialized = serializers.serialize('json', [departments])
        response_data["departments"] = json.loads(serialized)
    return JsonResponse(response_data)
def retrieveProjects(request):
    """Call ``Project.objects.retrieveProjects`` and return its result as JSON.

    The JSON body always contains ``success`` ('1' or '0'). If the manager
    call raises, the exception text is reported under ``exception``;
    otherwise the serialized result is placed under ``projects``.
    """
    response_data = {}
    try:
        projects = Project.objects.retrieveProjects(request.GET)
    except Exception as e:
        response_data['success'] = '0'
        response_data['exception'] = str(e)
    else:
        response_data['success'] = '1'
        # The serialized text is a local, not a module-level global, so
        # concurrent requests cannot overwrite each other's payload.
        try:
            serialized = serializers.serialize('json', projects)
        except Exception:
            # Fallback for a result the serializer cannot iterate
            # (presumably a single model instance): wrap it in a list.
            serialized = serializers.serialize('json', [projects])
        response_data["projects"] = json.loads(serialized)
    return JsonResponse(response_data)
def addProject(request):
    """Create a project from the POST data and return it as JSON.

    The posted ``startDate`` and ``endDate`` are printed to stdout before
    ``Project.objects.addProject`` is called. These lookups sit inside the
    ``try``, so a missing key is reported like any other failure:
    ``success`` '0' plus the exception text under ``exception``. On
    success, ``success`` is '1' and the serialized project is returned
    under the ``student`` key.
    """
    try:
        posted = request.POST
        print(posted['startDate'])
        print(posted['endDate'])
        project = Project.objects.addProject(request.POST)
    except Exception as exc:
        return JsonResponse({'success': '0', 'exception': str(exc)})

    payload = {'success': '1'}
    # NOTE(review): the project is returned under "student"; kept as is
    # because clients may depend on that key.
    encoded = serializers.serialize('json', [project, ])
    payload["student"] = json.loads(encoded)
    return JsonResponse(payload)
def editProject(request):
    """Edit a project from the POST data and return it as JSON.

    The posted ``startDate`` and ``endDate`` are printed to stdout before
    ``Project.objects.editProject`` is called. These lookups sit inside the
    ``try``, so a missing key is reported like any other failure:
    ``success`` '0' plus the exception text under ``exception``. On
    success, ``success`` is '1' and the serialized project is returned
    under the ``student`` key.
    """
    try:
        posted = request.POST
        print(posted['startDate'])
        print(posted['endDate'])
        project = Project.objects.editProject(request.POST)
    except Exception as exc:
        return JsonResponse({'success': '0', 'exception': str(exc)})

    payload = {'success': '1'}
    # NOTE(review): the project is returned under "student"; kept as is
    # because clients may depend on that key.
    encoded = serializers.serialize('json', [project, ])
    payload["student"] = json.loads(encoded)
    return JsonResponse(payload)
def retrieveFields(request):
    """Call ``Field.objects.retrieveFields`` and return its result as JSON.

    The JSON body always contains ``success`` ('1' or '0'). If the manager
    call raises, the exception text is reported under ``exception``;
    otherwise the serialized result is placed under ``fields``.
    """
    response_data = {}
    try:
        fields = Field.objects.retrieveFields(request.GET)
    except Exception as e:
        response_data['success'] = '0'
        response_data['exception'] = str(e)
    else:
        response_data['success'] = '1'
        # The serialized text is a local, not a module-level global, so
        # concurrent requests cannot overwrite each other's payload.
        try:
            serialized = serializers.serialize('json', fields)
        except Exception:
            # Fallback for a result the serializer cannot iterate
            # (presumably a single model instance): wrap it in a list.
            serialized = serializers.serialize('json', [fields])
        response_data["fields"] = json.loads(serialized)
    return JsonResponse(response_data)