def _make_zip(self, project, ty):
    """Export one kind of project data as a zipped CSV and upload it.

    The rows produced by ``self._respond_csv(ty, project.id)`` are spooled
    into a temporary file. That file is added to a temporary zip archive
    (created through ``self._zip_factory``) under a sanitised
    ``<name>_<ty>.csv`` entry name, and the archive is passed to
    ``uploader.upload_file`` in the ``user_<owner_id>`` container.

    If ``_respond_csv`` returns ``None`` nothing is written or uploaded.

    Args:
        project: Object exposing ``id`` and ``owner_id``; it is also passed
            to ``self._project_name_latin_encoded`` and
            ``self.download_name``.
        ty: Export type label, forwarded to ``_respond_csv`` and
            ``download_name`` and embedded in the CSV entry name.

    Returns:
        None.
    """
    name = self._project_name_latin_encoded(project)
    csv_task_generator = self._respond_csv(ty, project.id)
    if csv_task_generator is None:
        return

    # TODO: use temp file from csv generation directly
    with tempfile.NamedTemporaryFile() as datafile:
        try:
            # NOTE(review): NamedTemporaryFile() opens in binary mode by
            # default, so writing ``str(line)`` assumes byte strings
            # (Python 2). Confirm before running this on Python 3.
            for line in csv_task_generator:
                datafile.write(str(line))
            # Make sure every row is on disk before the zip step reads the
            # file back by name.
            datafile.flush()
        finally:
            # Close the generator even when a write fails so that it can
            # release its own resources (the original comment says this
            # deletes the temp csv file).
            csv_task_generator.close()

        with tempfile.NamedTemporaryFile() as zipped_datafile:
            _zip = self._zip_factory(zipped_datafile.name)
            try:
                _zip.write(
                    datafile.name,
                    secure_filename('%s_%s.csv' % (name, ty)))
            finally:
                # Always release the archive handle, including when adding
                # the CSV entry fails.
                _zip.close()

            container = "user_%d" % project.owner_id
            _file = FileStorage(
                filename=self.download_name(project, ty),
                stream=zipped_datafile)
            uploader.upload_file(_file, container=container)
# Comment list
# Table of contents