def get(self, request, index_name, secret):
    if request.method.lower() == 'head':
        raise Http404
    if secret != conf.SECRET:
        raise Http404
    if index_name not in ALL_INDEXES:
        raise Http404
    cls = ALL_INDEXES[index_name]
    if not issubclass(cls, SphinxXMLIndex):
        raise Http404
    index = cls()
    return StreamingHttpResponse(iter(index), content_type='application/xml')
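StreamingHttpResponse only requires its first argument to be an iterator of text or byte chunks, which is why iter(index) works above. The SphinxXMLIndex subclasses themselves are not part of this snippet; a minimal, hypothetical stand-in that satisfies the same contract could look like this:

class MinimalXMLIndex(object):
    # Hypothetical stand-in: the real SphinxXMLIndex subclasses are not shown here.
    # StreamingHttpResponse only needs an iterable that yields str/bytes chunks.
    def __iter__(self):
        yield '<?xml version="1.0" encoding="utf-8"?>\n'
        yield '<sphinx:docset>\n'
        yield '</sphinx:docset>\n'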
def download_csv(self, request, queryset):
    pseudo_buffer = Echo()
    writer = csv.writer(pseudo_buffer)
    report_header = ["E-mail"]
    report_rows = [report_header]
    for user in queryset:
        # Only export e-mail addresses that do not belong to a registered TungaUser.
        if TungaUser.objects.filter(email=user.email).count() == 0:
            user_info = [
                user.email
            ]
            report_rows.append(user_info)
    response = StreamingHttpResponse((writer.writerow(row) for row in report_rows), content_type="text/csv")
    response['Content-Disposition'] = 'attachment; filename=tunga_email_visitors.csv'
    return response
def serve_zip_file(submissions, zipname="zipfile"):
    """
    Takes a list of submissions and generates a streaming response from them.
    """
    resp = StreamingHttpResponse(submissions_zip_generator(submissions), content_type="application/zip")
    resp["Content-Disposition"] = "attachment; filename={zipname}.zip".format(zipname=zipname)
    return resp
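The submissions_zip_generator helper is not included in this snippet. As a rough sketch, assuming each submission exposes a filename attribute and a content attribute holding bytes, it could build the archive in memory and then yield it in chunks (a fully streaming archive would need something like the zipstream package instead):

import io
import zipfile

def submissions_zip_generator(submissions, chunk_size=8192):
    # Hypothetical sketch only; the real generator is not part of the snippet.
    # Builds the whole ZIP in memory, then yields it chunk by chunk.
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, mode='w', compression=zipfile.ZIP_DEFLATED) as archive:
        for submission in submissions:
            archive.writestr(submission.filename, submission.content)
    buffer.seek(0)
    while True:
        chunk = buffer.read(chunk_size)
        if not chunk:
            break
        yield chunk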
def process_view(self, request, view_func, view_args, view_kwargs):
    """
    Check if the response has information about a regular pending task.
    Loop and check for the task result if the task_id of a PENDING task is found.
    """
    # Ignore socket.io requests
    if not request.path.startswith('/api/'):
        return None

    response = view_func(request, *view_args, **view_kwargs)

    # A valid task response is always a TaskResponse object
    if not isinstance(response, TaskResponse):
        return response

    # api.task.views.task_status should immediately show the task status
    if response.task_status:
        return response

    # Only if task/status is PENDING
    if response.status_code != HTTP_201_CREATED:
        return response

    # We need the task_id
    # noinspection PyBroadException
    try:
        task_id = response.data['task_id']
    except:
        return response

    # This should never happen (the dummy task has its own Response class)
    if is_dummy_task(task_id):
        return response

    # Use streaming only if the client is es or es-compatible
    stream = request.META.get('HTTP_ES_STREAM', None)

    if stream:
        # Render the pending response first, as it sets some headers (Content-Type)
        pending_response = response.rendered_content
        # Switch to a streaming response
        stream_res = StreamingHttpResponse(task_status_loop(request, pending_response, task_id, stream=stream),
                                           status=HTTP_201_CREATED)
        # Copy headers
        # noinspection PyProtectedMember
        stream_res._headers = response._headers
        # Set the custom es_stream header => es will process the stream correctly
        stream_res['es_stream'] = bool(stream)
        stream_res['es_task_id'] = task_id
        return stream_res
    else:
        return response
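Copying stream_res._headers = response._headers relies on a private attribute that newer Django versions (3.2+) replace with a public response.headers mapping. A copy through the dict-like header API, as sketched below, avoids depending on internals and works on both old and new versions:

# Copy headers via the public dict-like API instead of the private _headers dict.
for header, value in response.items():
    stream_res[header] = value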
def form_valid(self, form):
    start_date = form.cleaned_data['start_date']
    start_date = timezone.localtime(timezone.make_aware(timezone.datetime(
        start_date.year, start_date.month, start_date.day
    )))
    end_date = form.cleaned_data['end_date']
    end_date = timezone.localtime(timezone.make_aware(timezone.datetime(
        end_date.year, end_date.month, end_date.day
    )))
    data_type = form.cleaned_data['data_type']

    if data_type == ExportAsCsvForm.DATA_TYPE_DAY:
        source_data = DayStatistics.objects.filter(
            day__gte=start_date.date(), day__lte=end_date.date()
        ).order_by('day')
        export_fields = [
            'day', 'electricity1', 'electricity2', 'electricity1_returned',
            'electricity2_returned', 'gas', 'electricity1_cost', 'electricity2_cost',
            'gas_cost', 'total_cost'
        ]
    else:  # if data_type == ExportAsCsvForm.DATA_TYPE_HOUR:
        source_data = HourStatistics.objects.filter(
            hour_start__gte=start_date, hour_start__lte=end_date
        ).order_by('hour_start')
        export_fields = [
            'hour_start', 'electricity1', 'electricity2', 'electricity1_returned',
            'electricity2_returned', 'gas'
        ]

    # Direct copy from the Django docs.
    class Echo(object):
        """ An object that implements just the write method of the file-like interface. """
        def write(self, value):
            """ Write the value by returning it, instead of storing in a buffer. """
            return value

    pseudo_buffer = Echo()
    writer = csv.writer(pseudo_buffer)
    response = StreamingHttpResponse(
        self._generate_csv_row(writer, source_data, export_fields),
        content_type='text/csv'
    )
    attachment_name = 'dsmrreader-data-export---{}__{}__{}.csv'.format(
        data_type, start_date.date(), end_date.date()
    )
    response['Content-Disposition'] = 'attachment; filename="{}"'.format(attachment_name)
    return response
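The _generate_csv_row method is not shown in this snippet. Given the Echo-based writer, it presumably yields one formatted CSV line per record; a hypothetical sketch could be:

def _generate_csv_row(self, writer, source_data, export_fields):
    # Hypothetical sketch; the real method is not part of the snippet.
    # Each writer.writerow() call returns the formatted line via the Echo pseudo-buffer.
    yield writer.writerow(export_fields)
    for instance in source_data:
        yield writer.writerow([getattr(instance, field) for field in export_fields])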
def download_csv(self, request, queryset):
    pseudo_buffer = Echo()
    writer = csv.writer(pseudo_buffer)
    filter_type = request.GET.get('type__exact', None)

    report_header = [
        "First Name", "Last Name", "E-mail", "Phone Number", "User Type",
        "Company", "Country", "City", "Street", "Plot Number", "ZIP Code", "Postal Address",
        "VAT Number"
    ]
    if filter_type != str(USER_TYPE_DEVELOPER):
        report_header.append("Company Reg. Number")

    report_rows = [report_header]
    for user in queryset:
        phone_number = user.profile and user.profile.phone_number or ""
        user_info = [
            # encode() keeps the names as UTF-8 byte strings (Python 2-era CSV handling).
            user.first_name and user.first_name.encode('utf-8') or '',
            user.last_name and user.last_name.encode('utf-8') or '',
            user.email,
            # Force spreadsheet apps to treat the phone number as text, not a number.
            "=\"%s\"" % phone_number,
            user.display_type,
            user.profile and user.profile.company or "",
            user.profile and user.profile.country_name or "",
            user.profile and user.profile.city_name or "",
            user.profile and user.profile.street or "",
            user.profile and user.profile.plot_number or "",
            user.profile and user.profile.postal_code or "",
            user.profile and user.profile.postal_address or "",
            user.profile and user.profile.vat_number or "",
        ]
        if filter_type != str(USER_TYPE_DEVELOPER):
            user_info.append(user.profile and user.profile.company_reg_no or "")
        report_rows.append(user_info)

    file_suffix = "users"
    if filter_type == str(USER_TYPE_DEVELOPER):
        file_suffix = "developers"
    elif filter_type == str(USER_TYPE_PROJECT_OWNER):
        file_suffix = "clients"

    response = StreamingHttpResponse((writer.writerow(row) for row in report_rows), content_type="text/csv")
    response['Content-Disposition'] = 'attachment; filename=tunga_%s.csv' % file_suffix
    return response
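Both download_csv methods above are written as Django admin actions. The surrounding ModelAdmin is not part of these snippets; as a rough reminder of how such an action is typically hooked up, the registration might look like this:

from django.contrib import admin

class TungaUserAdmin(admin.ModelAdmin):
    # Hypothetical registration; the real ModelAdmin is not shown in the snippets.
    actions = ['download_csv']

    def download_csv(self, request, queryset):
        ...  # build and return a StreamingHttpResponse as in the snippets above
    download_csv.short_description = "Download CSV"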