def generate_attachments_zip_export(
export_type, extension, username, id_string, export_id=None,
filter_query=None):
xform = XForm.objects.get(user__username=username, id_string=id_string)
attachments = Attachment.objects.filter(instance__xform=xform)
basename = "%s_%s" % (id_string,
datetime.now().strftime("%Y_%m_%d_%H_%M_%S"))
filename = basename + "." + extension
file_path = os.path.join(
username,
'exports',
id_string,
export_type,
filename)
with NamedTemporaryFile('wb+', prefix='media_zip_export_', suffix='.zip') as temporary_file:
create_attachments_zipfile(attachments, temporary_file=temporary_file)
export_filename = get_storage_class()().save(
file_path,
File(temporary_file, file_path))
dir_name, basename = os.path.split(export_filename)
# get or create export object
if(export_id):
export = Export.objects.get(id=export_id)
else:
export = Export.objects.create(xform=xform, export_type=export_type)
export.filedir = dir_name
export.filename = basename
export.internal_status = Export.SUCCESSFUL
export.save()
return export
python类get_storage_class()的实例源码
def generate_kml_export(
export_type, extension, username, id_string, export_id=None,
filter_query=None):
user = User.objects.get(username=username)
xform = XForm.objects.get(user__username=username, id_string=id_string)
response = render_to_response(
'survey.kml', {'data': kml_export_data(id_string, user)})
basename = "%s_%s" % (id_string,
datetime.now().strftime("%Y_%m_%d_%H_%M_%S"))
filename = basename + "." + extension
file_path = os.path.join(
username,
'exports',
id_string,
export_type,
filename)
storage = get_storage_class()()
temp_file = NamedTemporaryFile(suffix=extension)
temp_file.write(response.content)
temp_file.seek(0)
export_filename = storage.save(
file_path,
File(temp_file, file_path))
temp_file.close()
dir_name, basename = os.path.split(export_filename)
# get or create export object
if(export_id):
export = Export.objects.get(id=export_id)
else:
export = Export.objects.create(xform=xform, export_type=export_type)
export.filedir = dir_name
export.filename = basename
export.internal_status = Export.SUCCESSFUL
export.save()
return export
def image_urls(instance):
default_storage = get_storage_class()()
urls = []
suffix = settings.THUMB_CONF['medium']['suffix']
for a in instance.attachments.all():
if default_storage.exists(get_path(a.media_file.name, suffix)):
url = default_storage.url(
get_path(a.media_file.name, suffix))
else:
url = a.media_file.url
urls.append(url)
return urls
def response_with_mimetype_and_name(
mimetype, name, extension=None, show_date=True, file_path=None,
use_local_filesystem=False, full_mime=False):
if extension is None:
extension = mimetype
if not full_mime:
mimetype = "application/%s" % mimetype
if file_path:
try:
if not use_local_filesystem:
default_storage = get_storage_class()()
wrapper = FileWrapper(default_storage.open(file_path))
response = StreamingHttpResponse(wrapper,
content_type=mimetype)
response['Content-Length'] = default_storage.size(file_path)
else:
wrapper = FileWrapper(open(file_path))
response = StreamingHttpResponse(wrapper,
content_type=mimetype)
response['Content-Length'] = os.path.getsize(file_path)
except IOError:
response = HttpResponseNotFound(
_(u"The requested file could not be found."))
else:
response = HttpResponse(content_type=mimetype)
response['Content-Disposition'] = disposition_ext_and_date(
name, extension, show_date)
return response
def resize(filename):
default_storage = get_storage_class()()
path = default_storage.url(filename)
req = requests.get(path)
if req.status_code == 200:
im = StringIO(req.content)
image = Image.open(im)
conf = settings.THUMB_CONF
[_save_thumbnails(
image, filename,
conf[key]['size'],
conf[key]['suffix']) for key in settings.THUMB_ORDER]
def resize_local_env(filename):
default_storage = get_storage_class()()
path = default_storage.path(filename)
image = Image.open(path)
conf = settings.THUMB_CONF
[_save_thumbnails(
image, path, conf[key]['size'],
conf[key]['suffix']) for key in settings.THUMB_ORDER]
def image_url(attachment, suffix):
'''Return url of an image given size(@param suffix)
e.g large, medium, small, or generate required thumbnail
'''
url = attachment.media_file.url
if suffix == 'original':
return url
else:
default_storage = get_storage_class()()
fs = get_storage_class('django.core.files.storage.FileSystemStorage')()
if suffix in settings.THUMB_CONF:
size = settings.THUMB_CONF[suffix]['suffix']
filename = attachment.media_file.name
if default_storage.exists(filename):
if default_storage.exists(get_path(filename, size)) and\
default_storage.size(get_path(filename, size)) > 0:
url = default_storage.url(
get_path(filename, size))
else:
if default_storage.__class__ != fs.__class__:
resize(filename)
else:
resize_local_env(filename)
return image_url(attachment, suffix)
else:
return None
return url
def _setup(self):
self._wrapped = get_storage_class(settings.STATICFILES_STORAGE)()
def _setup(self):
self._wrapped = get_storage_class(settings.STATICFILES_STORAGE)()
def _setup(self):
self._wrapped = get_storage_class(settings.STATICFILES_STORAGE)()
def _setup(self):
self._wrapped = get_storage_class(settings.STATICFILES_STORAGE)()
def _setup(self):
self._wrapped = get_storage_class(settings.STATICFILES_STORAGE)()
def _setup(self):
self._wrapped = get_storage_class(settings.STATICFILES_STORAGE)()
def _setup(self):
self._wrapped = get_storage_class(settings.STATICFILES_STORAGE)()
def _setup(self):
self._wrapped = get_storage_class(settings.STATICFILES_STORAGE)()
def _setup(self):
self._wrapped = get_storage_class(getattr(settings, 'STATICFILES_STORAGE', default_storage))()
def _setup(self):
self._wrapped = get_storage_class(
settings.THUMBNAIL_DEFAULT_STORAGE)()
def _setup(self):
self._wrapped = get_storage_class('compressor.storage.GzipCompressorFileStorage')()
def _setup(self):
self._wrapped = get_storage_class(settings.COMPRESS_STORAGE)()
def _setup(self):
self._wrapped = get_storage_class(settings.STATICFILES_STORAGE)()