def backup_media():
    """Pack every file reported by ``explore_storage`` into a gzipped tarball.

    The storage instance comes from ``get_storage_class()()`` and the archive
    is written into the object returned by
    ``utils.create_spooled_temporary_file()``.

    Returns:
        tuple: ``(outputfile, filename)`` -- the file object holding the
        archive, rewound to offset 0, and the generated archive name.
    """
    extension = "tar.gz"
    filename = utils.filename_generate(extension, content_type='media')
    # Create tarball
    media_storage = get_storage_class()()
    outputfile = utils.create_spooled_temporary_file()
    tar_file = tarfile.open(name=filename, fileobj=outputfile, mode='w:gz')
    try:
        for media_filename in explore_storage(media_storage):
            tarinfo = tarfile.TarInfo(media_filename)
            media_file = media_storage.open(media_filename)
            try:
                # Relies on the storage file object supporting len() to
                # report its size -- TODO confirm for custom backends.
                tarinfo.size = len(media_file)
                tar_file.addfile(tarinfo, media_file)
            finally:
                # Release the storage handle as soon as it has been copied,
                # otherwise one handle per media file stays open.
                media_file.close()
    finally:
        # Close the TAR for writing, even if adding a member failed
        tar_file.close()
    # Store backup
    outputfile.seek(0)
    return outputfile, filename
# Example source code for Python's get_storage_class()
def test_dl_no_xls(self):
    """
    Delete the form's xls file from storage, then request the
    ``xls_export`` URL with the anonymous client and expect a 404.
    """
    self._create_xform()
    self.xform.shared_data = True
    self.xform.save()

    storage = get_storage_class()()
    xls_path = self.xform.xls.name
    # The file must be present before it is removed ...
    self.assertEqual(storage.exists(xls_path), True)
    storage.delete(xls_path)
    # ... and gone afterwards.
    self.assertEqual(storage.exists(xls_path), False)

    url_kwargs = {
        'username': self.user.username,
        'id_string': self.xform.id_string,
    }
    url = reverse('xls_export', kwargs=url_kwargs)
    response = self.anon.get(url)
    self.assertEqual(response.status_code, 404)
def test_dotted_fields_csv_export_output(self):
    """
    Publish the ``userone_with_dot_name_fields`` fixture form, submit one
    instance, generate a CSV export and compare it with the expected CSV
    fixture.
    """
    # All three fixtures live in the same directory; build it once.
    fixture_dir = os.path.join(
        os.path.dirname(__file__), 'fixtures', 'userone')
    self._publish_xls_file_and_set_xform(
        os.path.join(fixture_dir, 'userone_with_dot_name_fields.xls'))
    self._make_submission(
        os.path.join(fixture_dir, 'userone_with_dot_name_fields.xml'),
        forced_submission_time=self._submission_time)
    # test csv
    export = generate_export(Export.CSV_EXPORT, 'csv', self.user.username,
                             'userone')
    storage = get_storage_class()()
    self.assertTrue(storage.exists(export.filepath))
    _, ext = os.path.splitext(export.filename)
    self.assertEqual(ext, '.csv')
    expected_path = os.path.join(
        fixture_dir, 'userone_with_dot_name_fields.csv')
    with open(expected_path) as f1:
        expected_content = f1.read()
    with storage.open(export.filepath) as f2:
        actual_content = f2.read()
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(actual_content, expected_content)
def get_media_file_response(metadata):
    """Build the HTTP response for a metadata entry.

    Without a ``data_file`` the caller is redirected to
    ``metadata.data_value``. With one, the file is served through
    ``response_with_mimetype_and_name`` if the storage reports it exists,
    otherwise a not-found response is returned.
    """
    if not metadata.data_file:
        return HttpResponseRedirect(metadata.data_value)

    file_path = metadata.data_file.name
    base_name = file_path.split('/')[-1]
    filename, extension = os.path.splitext(base_name)
    extension = extension.strip('.')

    storage = get_storage_class()()
    if not storage.exists(file_path):
        return HttpResponseNotFound()

    return response_with_mimetype_and_name(
        metadata.data_file_type,
        filename, extension=extension, show_date=False,
        file_path=file_path, full_mime=True)
def test_create_export(self):
    """
    Generate XLS and CSV exports and check that each file exists in storage
    with the right extension; then check that passing an existing export id
    reuses that export record.
    """
    self._publish_transportation_form_and_submit_instance()
    storage = get_storage_class()()

    # test xls, then csv
    export_kinds = (
        (Export.XLS_EXPORT, 'xls'),
        (Export.CSV_EXPORT, 'csv'),
    )
    for export_type, extension in export_kinds:
        export = generate_export(export_type, extension,
                                 self.user.username, self.xform.id_string)
        self.assertTrue(storage.exists(export.filepath))
        path, ext = os.path.splitext(export.filename)
        self.assertEqual(ext, '.' + extension)

    # test xls with existing export_id
    existing_export = Export.objects.create(
        xform=self.xform, export_type=Export.XLS_EXPORT)
    export = generate_export(
        Export.XLS_EXPORT, 'xls', self.user.username,
        self.xform.id_string, existing_export.id)
    self.assertEqual(existing_export.id, export.id)
def test_graceful_exit_on_export_delete_if_file_doesnt_exist(self):
    """
    Posting to the export delete view must still answer with a 302 when the
    export's file has already been removed and its filename/filedir are
    cleared.
    """
    self._publish_transportation_form()
    self._submit_transport_instance()
    export = generate_export(
        Export.XLS_EXPORT, 'xls', self.user.username, self.xform.id_string)
    storage = get_storage_class()()

    # delete file
    storage.delete(export.filepath)
    self.assertFalse(storage.exists(export.filepath))

    # clear filename, like it would be in an incomplete export
    export.filename = None
    export.filedir = None
    export.save()

    # delete export record, which should try to delete file as well
    url_kwargs = {
        'username': self.user.username,
        'id_string': self.xform.id_string,
        'export_type': 'xls'
    }
    delete_url = reverse(delete_export, kwargs=url_kwargs)
    response = self.client.post(delete_url, {'export_id': export.id})
    self.assertEqual(response.status_code, 302)
def create_attachments_zipfile(attachments, temporary_file=None):
    """Write the media files of *attachments* into an uncompressed ZIP.

    Args:
        attachments: iterable of objects exposing ``media_file.name``.
        temporary_file: file object to write the archive into; a new
            ``NamedTemporaryFile`` is created when this is falsy.

    Returns:
        The file object containing the archive, rewound to offset 0.

    Attachments whose file is missing from storage are skipped. A failure
    while adding one file is passed to ``report_exception`` and does not
    abort the archive.
    """
    if not temporary_file:
        temporary_file = NamedTemporaryFile()

    storage = get_storage_class()()
    with zipfile.ZipFile(temporary_file, 'w', zipfile.ZIP_STORED,
                         allowZip64=True) as zip_file:
        for attachment in attachments:
            media_name = attachment.media_file.name
            if not storage.exists(media_name):
                continue
            try:
                with storage.open(media_name, 'rb') as source_file:
                    zip_file.writestr(media_name, source_file.read())
            # "except X as e" is valid on Python 2.6+ and Python 3, unlike
            # the old "except X, e" form.
            except Exception as e:
                report_exception(
                    "Error adding file \"{}\" to archive.".format(media_name),
                    e)

    # Be kind; rewind.
    temporary_file.seek(0)

    return temporary_file
def test_dl_no_xls(self):
    """
    With the form's xls file removed from storage, an anonymous GET of the
    ``xls_export`` URL is expected to return 404.
    """
    self._create_xform()
    self.xform.shared_data = True
    self.xform.save()

    file_storage = get_storage_class()()
    stored_xls = self.xform.xls.name

    # Present before deletion, absent after.
    self.assertEqual(file_storage.exists(stored_xls), True)
    file_storage.delete(stored_xls)
    self.assertEqual(file_storage.exists(stored_xls), False)

    export_url = reverse(
        'xls_export',
        kwargs={
            'username': self.user.username,
            'id_string': self.xform.id_string,
        })
    anon_response = self.anon.get(export_url)
    self.assertEqual(anon_response.status_code, 404)
def test_csv_export_output(self):
    """
    Publish the ``tutorial_w_repeats`` fixture form, submit one instance,
    generate a CSV export and compare it with the expected CSV fixture.
    """
    path = os.path.join(self.fixture_dir, 'tutorial_w_repeats.xls')
    self._publish_xls_file_and_set_xform(path)
    path = os.path.join(self.fixture_dir, 'tutorial_w_repeats.xml')
    self._make_submission(
        path, forced_submission_time=self._submission_time)
    # test csv
    export = generate_export(Export.CSV_EXPORT, 'csv', self.user.username,
                             'tutorial_w_repeats')
    storage = get_storage_class()()
    self.assertTrue(storage.exists(export.filepath))
    _, ext = os.path.splitext(export.filename)
    self.assertEqual(ext, '.csv')
    expected_path = os.path.join(self.fixture_dir, 'tutorial_w_repeats.csv')
    with open(expected_path) as f1:
        expected_content = f1.read()
    with storage.open(export.filepath) as f2:
        actual_content = f2.read()
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(actual_content, expected_content)
def test_dotted_fields_csv_export_output(self):
    """
    Generate a CSV export for the ``userone_with_dot_name_fields`` fixture
    form and check that its content equals the expected CSV fixture.
    """
    # The xls, xml and csv fixtures share one directory; compute it once.
    userone_dir = os.path.join(
        os.path.dirname(__file__), 'fixtures', 'userone')
    xls_path = os.path.join(userone_dir, 'userone_with_dot_name_fields.xls')
    self._publish_xls_file_and_set_xform(xls_path)
    xml_path = os.path.join(userone_dir, 'userone_with_dot_name_fields.xml')
    self._make_submission(
        xml_path, forced_submission_time=self._submission_time)
    # test csv
    export = generate_export(Export.CSV_EXPORT, 'csv', self.user.username,
                             'userone')
    storage = get_storage_class()()
    self.assertTrue(storage.exists(export.filepath))
    _, ext = os.path.splitext(export.filename)
    self.assertEqual(ext, '.csv')
    csv_path = os.path.join(userone_dir, 'userone_with_dot_name_fields.csv')
    with open(csv_path) as f1:
        expected_content = f1.read()
    with storage.open(export.filepath) as f2:
        actual_content = f2.read()
    # assertEquals is a deprecated alias; use assertEqual.
    self.assertEqual(actual_content, expected_content)
def get_media_file_response(metadata):
    """Return the response used to deliver a metadata entry's media.

    * no ``data_file``: redirect to ``metadata.data_value``;
    * ``data_file`` missing from storage: not-found response;
    * otherwise: the result of ``response_with_mimetype_and_name``.
    """
    has_file = bool(metadata.data_file)
    if has_file:
        stored_path = metadata.data_file.name
        stem, suffix = os.path.splitext(stored_path.split('/')[-1])
        suffix = suffix.strip('.')
        file_storage = get_storage_class()()
        if file_storage.exists(stored_path):
            return response_with_mimetype_and_name(
                metadata.data_file_type,
                stem, extension=suffix, show_date=False,
                file_path=stored_path, full_mime=True)
        return HttpResponseNotFound()
    return HttpResponseRedirect(metadata.data_value)
def test_graceful_exit_on_export_delete_if_file_doesnt_exist(self):
    """
    Delete an export's file and clear its filename fields, then post to the
    delete view and expect a 302 response.
    """
    self._publish_transportation_form()
    self._submit_transport_instance()
    username = self.user.username
    id_string = self.xform.id_string
    export = generate_export(Export.XLS_EXPORT, 'xls', username, id_string)
    file_storage = get_storage_class()()

    # delete file
    file_storage.delete(export.filepath)
    self.assertFalse(file_storage.exists(export.filepath))

    # clear filename, like it would be in an incomplete export
    export.filename = None
    export.filedir = None
    export.save()

    # delete export record, which should try to delete file as well
    delete_url = reverse(delete_export, kwargs={
        'username': self.user.username,
        'id_string': self.xform.id_string,
        'export_type': 'xls'
    })
    post_data = {'export_id': export.id}
    delete_response = self.client.post(delete_url, post_data)
    self.assertEqual(delete_response.status_code, 302)
def _setup(self):
    """Instantiate the ``settings.STATICFILES_STORAGE`` class as ``_wrapped``."""
    storage_class = get_storage_class(settings.STATICFILES_STORAGE)
    self._wrapped = storage_class()
def _setup(self):
    """Set ``self._wrapped`` to an instance of the static files storage."""
    # The class is looked up from the STATICFILES_STORAGE setting.
    storage_cls = get_storage_class(settings.STATICFILES_STORAGE)
    self._wrapped = storage_cls()
def _setup(self):
    """Create the storage named by ``settings.STATICFILES_STORAGE``.

    The new instance is stored on ``self._wrapped``.
    """
    backend_class = get_storage_class(settings.STATICFILES_STORAGE)
    self._wrapped = backend_class()
def _setup(self):
    """Resolve ``settings.STATICFILES_STORAGE`` and store an instance."""
    dotted_path = settings.STATICFILES_STORAGE
    self._wrapped = get_storage_class(dotted_path)()
def _setup(self):
    """Populate ``self._wrapped`` with a static files storage instance."""
    static_storage_class = get_storage_class(settings.STATICFILES_STORAGE)
    # Instantiated with no arguments.
    self._wrapped = static_storage_class()
def __init__(self, storage=None, **kwargs):
    """Initialise the view and attach a storage instance.

    Args:
        storage: optional callable returning a storage object; when it is
            ``None`` the class from ``get_storage_class()`` is used.
        **kwargs: forwarded unchanged to the parent initialiser.
    """
    super(FileView, self).__init__(**kwargs)
    storage_factory = get_storage_class() if storage is None else storage
    self.storage = storage_factory()
def _setup(self):
    """Build the storage from ``settings.STATICFILES_STORAGE``.

    The result is assigned to ``self._wrapped``.
    """
    wrapped_class = get_storage_class(settings.STATICFILES_STORAGE)
    self._wrapped = wrapped_class()
def response_with_mimetype_and_name(
        mimetype, name, extension=None, show_date=True, file_path=None,
        use_local_filesystem=False, full_mime=False):
    """Build a download response with Content-Type and Content-Disposition.

    Args:
        mimetype: full MIME type when ``full_mime`` is true, otherwise the
            subtype that gets prefixed with ``application/``.
        name: base name passed to ``disposition_ext_and_date``.
        extension: file extension; defaults to the original ``mimetype``.
        show_date: forwarded to ``disposition_ext_and_date``.
        file_path: when truthy, the file is streamed from this path;
            otherwise an empty ``HttpResponse`` is returned.
        use_local_filesystem: read ``file_path`` with ``open()`` instead of
            the storage from ``get_storage_class()()``.
        full_mime: use ``mimetype`` as is.

    Returns:
        A streaming response for the file, a not-found response when an
        ``IOError`` occurs, or an empty ``HttpResponse`` without
        ``file_path``. Content-Disposition is set in all three cases.
    """
    if extension is None:
        extension = mimetype
    if not full_mime:
        mimetype = "application/%s" % mimetype
    if file_path:
        source = None
        try:
            if not use_local_filesystem:
                default_storage = get_storage_class()()
                source = default_storage.open(file_path)
                size = default_storage.size(file_path)
            else:
                # Binary mode: the content is streamed byte-for-byte and
                # Content-Length is a byte count, so no newline translation
                # or text decoding may take place.
                source = open(file_path, 'rb')
                size = os.path.getsize(file_path)
            response = StreamingHttpResponse(FileWrapper(source),
                                             content_type=mimetype)
            response['Content-Length'] = size
        except IOError:
            # Do not leak a handle opened before the size lookup failed.
            if source is not None:
                source.close()
            response = HttpResponseNotFound(
                _(u"The requested file could not be found."))
    else:
        response = HttpResponse(content_type=mimetype)
    response['Content-Disposition'] = disposition_ext_and_date(
        name, extension, show_date)
    return response
def download_metadata(request, username, id_string, data_id):
    """Serve the file attached to a ``MetaData`` record of a form.

    The form is looked up by ``username`` / ``id_string`` (404 if absent).
    Access is granted when ``username`` equals the requesting user's
    username or the form is shared; otherwise a forbidden response is
    returned. A file missing from storage yields a not-found response. A
    successful download is recorded with ``audit_log`` before the file
    response is built.
    """
    xform = get_object_or_404(
        XForm, user__username__iexact=username, id_string__exact=id_string)
    owner = xform.user

    may_download = username == request.user.username or xform.shared
    if not may_download:
        return HttpResponseForbidden(_(u'Permission denied.'))

    data = get_object_or_404(MetaData, pk=data_id)
    file_path = data.data_file.name
    base_name = file_path.split('/')[-1]
    filename, extension = os.path.splitext(base_name)
    extension = extension.strip('.')

    storage = get_storage_class()()
    if not storage.exists(file_path):
        return HttpResponseNotFound()

    audit = {
        'xform': xform.id_string
    }
    message_params = {
        'id_string': xform.id_string,
        'filename': "%s.%s" % (filename, extension)
    }
    audit_log(
        Actions.FORM_UPDATED, request.user, owner,
        _("Document '%(filename)s' for '%(id_string)s' downloaded.") %
        message_params, audit, request)

    return response_with_mimetype_and_name(
        data.data_file_type,
        filename, extension=extension, show_date=False,
        file_path=file_path)
def test_csv_nested_repeat_output(self):
    """
    Publish the ``double_repeat`` fixture form, check the xpaths produced
    for two repeat iterations, then compare the generated CSV export with
    the expected ``export.csv`` fixture.
    """
    path = os.path.join(self.fixture_dir, 'double_repeat.xls')
    self._publish_xls_file(path)
    path = os.path.join(self.fixture_dir, 'instance.xml')
    self._make_submission(
        path, forced_submission_time=self._submission_time)
    # Show full diffs on assertion failure.
    self.maxDiff = None
    dd = DataDictionary.objects.all()[0]
    xpaths = [
        u'/double_repeat/bed_net[1]/member[1]/name',
        u'/double_repeat/bed_net[1]/member[2]/name',
        u'/double_repeat/bed_net[2]/member[1]/name',
        u'/double_repeat/bed_net[2]/member[2]/name',
        u'/double_repeat/meta/instanceID'
    ]
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(dd.xpaths(repeat_iterations=2), xpaths)
    # test csv
    export = generate_export(Export.CSV_EXPORT, 'csv', self.user.username,
                             'double_repeat')
    storage = get_storage_class()()
    self.assertTrue(storage.exists(export.filepath))
    _, ext = os.path.splitext(export.filename)
    self.assertEqual(ext, '.csv')
    with open(os.path.join(self.fixture_dir, 'export.csv')) as f1:
        expected_content = f1.read()
    with storage.open(export.filepath) as f2:
        actual_content = f2.read()
    self.assertEqual(actual_content, expected_content)
def export_delete_callback(sender, **kwargs):
    """Remove the stored file of the export passed as ``kwargs['instance']``.

    Nothing is deleted when the export has no ``filepath`` or the storage
    reports that the file does not exist.
    """
    export = kwargs['instance']
    storage = get_storage_class()()
    if not export.filepath or not storage.exists(export.filepath):
        return
    storage.delete(export.filepath)
def full_filepath(self):
    """Return a local filesystem path for this object's file.

    Returns:
        ``None`` when ``self.filepath`` is falsy. Otherwise the storage's
        own path, or -- when the storage raises ``NotImplementedError`` from
        ``path()`` -- the name of a temporary file holding a copy of the
        content. That temporary file is created with ``delete=False`` and
        is not removed here.
    """
    if not self.filepath:
        return None

    default_storage = get_storage_class()()
    try:
        return default_storage.path(self.filepath)
    except NotImplementedError:
        # read file from s3
        name, ext = os.path.splitext(self.filepath)
        tmp = NamedTemporaryFile(suffix=ext, delete=False)
        try:
            f = default_storage.open(self.filepath)
            try:
                tmp.write(f.read())
            finally:
                # The storage handle was previously never closed.
                f.close()
        finally:
            # Close (and flush) the copy even if reading failed.
            tmp.close()
        return tmp.name
def test_delete_file_on_export_delete(self):
    """Deleting an export object must also remove its file from storage."""
    self._publish_transportation_form()
    self._submit_transport_instance()
    xls_export = generate_export(
        Export.XLS_EXPORT, 'xls', self.user.username, self.xform.id_string)
    file_storage = get_storage_class()()
    # The file exists right after generation ...
    self.assertTrue(file_storage.exists(xls_export.filepath))
    # delete export object
    xls_export.delete()
    # ... and is gone once the export record is deleted.
    self.assertFalse(file_storage.exists(xls_export.filepath))
def _get_csv_data(self, filepath):
    """Return the first data row of the CSV at *filepath* as a dict.

    The file is opened through the storage from ``get_storage_class()()``
    and parsed with ``csv.DictReader``.

    Raises:
        StopIteration: if the reader yields no row.
    """
    storage = get_storage_class()()
    csv_file = storage.open(filepath)
    try:
        reader = csv.DictReader(csv_file)
        # Built-in next() works on Python 2.6+ and 3; reader.next() is
        # Python 2 only.
        return next(reader)
    finally:
        # Close the handle even when reading the row raises.
        csv_file.close()
def _get_xls_data(self, filepath):
    """Map the header row to the first data row of the "transportation" sheet.

    The workbook is read from the storage file at *filepath*; the sheet is
    asserted to have more than one row.
    """
    storage = get_storage_class()()
    with storage.open(filepath) as xls_file:
        workbook = open_workbook(file_contents=xls_file.read())
    sheet = workbook.sheet_by_name("transportation")
    self.assertTrue(sheet.nrows > 1)
    header_row = sheet.row_values(0)
    first_data_row = sheet.row_values(1)
    return dict(zip(header_row, first_data_row))
def _xls_file_io(self):
    '''
    Read the form's xls file through the storage and return it as a
    file-like object: ``.csv`` content goes through ``convert_csv_to_xls``,
    anything else is wrapped in ``StringIO``. Returns ``None`` when the
    name is empty or the file is not in storage.
    this should be used sparingly
    '''
    xls_name = self.xls.name
    storage = get_storage_class()()
    if xls_name == '' or not storage.exists(xls_name):
        return None
    with storage.open(xls_name) as xls_handle:
        contents = xls_handle.read()
        if xls_name.endswith('.csv'):
            return convert_csv_to_xls(contents)
        return StringIO(contents)
def to_xlsform(self):
    '''Return this form's XLSForm as a file-like object, or ``None``.

    ``.csv`` sources are passed through ``convert_csv_to_xls``; any other
    source is wrapped in ``io.BytesIO``. ``None`` is returned when the file
    name is empty or the file is not in storage.
    '''
    source_name = self.xls.name
    storage = get_storage_class()()
    if source_name == '' or not storage.exists(source_name):
        return None
    with storage.open(source_name) as xlsform_file:
        raw = xlsform_file.read()
        if source_name.endswith('.csv'):
            xlsform_io = convert_csv_to_xls(raw)
        else:
            xlsform_io = io.BytesIO(raw)
    return xlsform_io
def handle(self, *args, **kwargs):
    """Copy files of ``Attachment`` and ``XForm`` from local disk to S3.

    Exits with status 1 when either storage backend cannot be instantiated
    or when the default storage is not the S3 backend. For every object
    whose file exists locally and is not yet at its upload target on S3,
    the file is re-saved through its file field; every other object is
    reported as skipped.
    """
    try:
        fs = get_storage_class(
            'django.core.files.storage.FileSystemStorage')()
        s3 = get_storage_class('storages.backends.s3boto.S3BotoStorage')()
    # Not a bare "except:", which would also swallow SystemExit and
    # KeyboardInterrupt.
    except Exception:
        # NOTE(review): the two literals join as "-rrequirements/s3.pip"
        # (no space); left byte-identical because the text is a translation
        # msgid.
        print(_(u"Missing necessary libraries. Try running: pip install -r"
                "requirements/s3.pip"))
        sys.exit(1)

    default_storage = get_storage_class()()
    if default_storage.__class__ != s3.__class__:
        print(_(u"You must first set your default storage to s3 in your "
                "local_settings.py file."))
        sys.exit(1)

    classes_to_move = [
        (Attachment, 'media_file', attachment_upload_to),
        (XForm, 'xls', xform_upload_to),
    ]

    for cls, file_field, upload_to in classes_to_move:
        print(_("Moving %(class)ss to s3...") % {'class': cls.__name__})
        for i in cls.objects.all():
            f = getattr(i, file_field)
            old_filename = f.name
            if f.name and fs.exists(f.name) and not s3.exists(
                    upload_to(i, f.name)):
                local_path = fs.path(f.name)
                local_file = fs.open(local_path)
                try:
                    f.save(local_path, local_file)
                finally:
                    # The local source handle was previously left open.
                    local_file.close()
                print(_("\t+ '%(fname)s'\n\t---> '%(url)s'")
                      % {'fname': fs.path(old_filename), 'url': f.url})
            else:
                print("\t- (f.name=%s, fs.exists(f.name)=%s, not s3.exist"
                      "s(upload_to(i, f.name))=%s)" % (
                          f.name, fs.exists(f.name),
                          not s3.exists(upload_to(i, f.name))))