def _make_zip(self, project, ty):
name = self._project_name_latin_encoded(project)
json_task_generator = self._respond_json(ty, project.id)
if json_task_generator is not None:
datafile = tempfile.NamedTemporaryFile()
try:
datafile.write(json.dumps(json_task_generator))
datafile.flush()
zipped_datafile = tempfile.NamedTemporaryFile()
try:
_zip = self._zip_factory(zipped_datafile.name)
_zip.write(datafile.name, secure_filename('%s_%s.json' % (name, ty)))
_zip.close()
container = "user_%d" % project.owner_id
_file = FileStorage(filename=self.download_name(project, ty), stream=zipped_datafile)
uploader.upload_file(_file, container=container)
finally:
zipped_datafile.close()
finally:
datafile.close()
python类FileStorage()的实例源码
def _make_zip(self, project, ty):
name = self._project_name_latin_encoded(project)
csv_task_generator = self._respond_csv(ty, project.id)
if csv_task_generator is not None:
# TODO: use temp file from csv generation directly
datafile = tempfile.NamedTemporaryFile()
try:
for line in csv_task_generator:
datafile.write(str(line))
datafile.flush()
csv_task_generator.close() # delete temp csv file
zipped_datafile = tempfile.NamedTemporaryFile()
try:
_zip = self._zip_factory(zipped_datafile.name)
_zip.write(
datafile.name, secure_filename('%s_%s.csv' % (name, ty)))
_zip.close()
container = "user_%d" % project.owner_id
_file = FileStorage(
filename=self.download_name(project, ty), stream=zipped_datafile)
uploader.upload_file(_file, container=container)
finally:
zipped_datafile.close()
finally:
datafile.close()
def test_crop(self):
"""Test UPLOADER crop works."""
u = Uploader()
size = (100, 100)
im = Image.new('RGB', size)
folder = tempfile.mkdtemp()
u.upload_folder = folder
im.save(os.path.join(folder, 'image.png'))
coordinates = (0, 0, 50, 50)
file = FileStorage(filename=os.path.join(folder, 'image.png'))
with patch('pybossa.uploader.Image', return_value=True):
err_msg = "It should crop the image"
assert u.crop(file, coordinates) is True, err_msg
with patch('pybossa.uploader.Image.open', side_effect=IOError):
err_msg = "It should return false"
assert u.crop(file, coordinates) is False, err_msg
def test_environ_builder_unicode_file_mix(self):
for use_tempfile in False, True:
f = FileStorage(BytesIO(u'\N{SNOWMAN}'.encode('utf-8')),
'snowman.txt')
d = MultiDict(dict(f=f, s=u'\N{SNOWMAN}'))
stream, length, boundary = stream_encode_multipart(
d, use_tempfile, threshold=150)
self.assert_true(isinstance(stream, BytesIO) != use_tempfile)
_, form, files = parse_form_data({
'wsgi.input': stream,
'CONTENT_LENGTH': str(length),
'CONTENT_TYPE': 'multipart/form-data; boundary="%s"' %
boundary
})
self.assert_strict_equal(form['s'], u'\N{SNOWMAN}')
self.assert_strict_equal(files['f'].name, 'f')
self.assert_strict_equal(files['f'].filename, u'snowman.txt')
self.assert_strict_equal(files['f'].read(),
u'\N{SNOWMAN}'.encode('utf-8'))
stream.close()
def __call__(self, form, field):
if not (isinstance(field.data, FileStorage) and field.data):
return
filename = field.data.filename.lower()
if isinstance(self.upload_set, Iterable):
if any(filename.endswith('.' + x) for x in self.upload_set):
return
raise StopValidation(self.message or field.gettext(
'File does not have an approved extension: {extensions}'
).format(extensions=', '.join(self.upload_set)))
if not self.upload_set.file_allowed(field.data, filename):
raise StopValidation(self.message or field.gettext(
'File does not have an approved extension.'
))
def __call__(self, form, field):
if not (isinstance(field.data, FileStorage) and field.data):
return
filename = field.data.filename.lower()
if isinstance(self.upload_set, Iterable):
if any(filename.endswith('.' + x) for x in self.upload_set):
return
raise StopValidation(self.message or field.gettext(
'File does not have an approved extension: {extensions}'
).format(extensions=', '.join(self.upload_set)))
if not self.upload_set.file_allowed(field.data, filename):
raise StopValidation(self.message or field.gettext(
'File does not have an approved extension.'
))
def __call__(self, form, field):
if not (isinstance(field.data, FileStorage) and field.data):
return
filename = field.data.filename.lower()
if isinstance(self.upload_set, Iterable):
if any(filename.endswith('.' + x) for x in self.upload_set):
return
raise StopValidation(self.message or field.gettext(
'File does not have an approved extension: {extensions}'
).format(extensions=', '.join(self.upload_set)))
if not self.upload_set.file_allowed(field.data, filename):
raise StopValidation(self.message or field.gettext(
'File does not have an approved extension.'
))
def __call__(self, form, field):
if not (isinstance(field.data, FileStorage) and field.data):
return
filename = field.data.filename.lower()
if isinstance(self.upload_set, Iterable):
if any(filename.endswith('.' + x) for x in self.upload_set):
return
raise StopValidation(self.message or field.gettext(
'File does not have an approved extension: {extensions}'
).format(extensions=', '.join(self.upload_set)))
if not self.upload_set.file_allowed(field.data, filename):
raise StopValidation(self.message or field.gettext(
'File does not have an approved extension.'
))
def __call__(self, form, field):
if not (isinstance(field.data, FileStorage) and field.data):
return
filename = field.data.filename.lower()
if isinstance(self.upload_set, Iterable):
if any(filename.endswith('.' + x) for x in self.upload_set):
return
raise StopValidation(self.message or field.gettext(
'File does not have an approved extension: {extensions}'
).format(extensions=', '.join(self.upload_set)))
if not self.upload_set.file_allowed(field.data, filename):
raise StopValidation(self.message or field.gettext(
'File does not have an approved extension.'
))
def convert(self, value, op):
# Don't cast None
if value is None:
if self.nullable:
return None
else:
raise ValueError('Must not be null!')
# and check if we're expecting a filestorage and haven't overridden `type`
# (required because the below instantiation isn't valid for FileStorage)
elif isinstance(value, FileStorage) and self.type == FileStorage:
return value
try:
return self.type(value, self.name, op)
except TypeError:
try:
if self.type is decimal.Decimal:
return self.type(str(value), self.name)
else:
return self.type(value, self.name)
except TypeError:
return self.type(value)
def convert(self, value, op):
# Don't cast None
if value is None:
if self.nullable:
return None
else:
raise ValueError('Must not be null!')
# and check if we're expecting a filestorage and haven't overridden `type`
# (required because the below instantiation isn't valid for FileStorage)
elif isinstance(value, FileStorage) and self.type == FileStorage:
return value
try:
return self.type(value, self.name, op)
except TypeError:
try:
if self.type is decimal.Decimal:
return self.type(str(value), self.name)
else:
return self.type(value, self.name)
except TypeError:
return self.type(value)
def __call__(self, form, field):
if not (isinstance(field.data, FileStorage) and field.data):
return
filename = field.data.filename.lower()
if isinstance(self.upload_set, Iterable):
if any(filename.endswith('.' + x) for x in self.upload_set):
return
raise StopValidation(self.message or field.gettext(
'File does not have an approved extension: {extensions}'
).format(extensions=', '.join(self.upload_set)))
if not self.upload_set.file_allowed(field.data, filename):
raise StopValidation(self.message or field.gettext(
'File does not have an approved extension.'
))
def test_rackspace_uploader_upload_correct_file(self, mock, mock2):
"""Test RACKSPACE UPLOADER upload file works."""
with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:
mycf.upload_file.return_value=True
mycf.get_object.side_effect = NoSuchObject
u = RackspaceUploader()
u.init_app(self.flask_app)
file = FileStorage(filename='test.jpg')
err_msg = "Upload file should return True"
assert u.upload_file(file, container='user_3') is True, err_msg
calls = [call.get_container('user_3'),
call.get_container().get_object('test.jpg')]
mycf.assert_has_calls(calls, any_order=True)
def test_rackspace_uploader_upload_correct_purgin_first_file(self, mock, mock2):
"""Test RACKSPACE UPLOADER upload file purging first file works."""
with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:
mycf.upload_file.return_value=True
mycf.get_object.side_effect = True
u = RackspaceUploader()
u.init_app(self.flask_app)
file = FileStorage(filename='test.jpg')
err_msg = "Upload file should return True"
assert u.upload_file(file, container='user_3') is True, err_msg
calls = [call.get_container('user_3'),
call.get_container().get_object().delete(),
call.get_container().get_object('test.jpg')]
print mycf.mock_calls
mycf.assert_has_calls(calls, any_order=True)
def test_rackspace_uploader_upload_file_fails(self, mock, mock2):
"""Test RACKSPACE UPLOADER upload file fail works."""
with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:
from pyrax.exceptions import UploadFailed
mycf.upload_file.side_effect = UploadFailed
u = RackspaceUploader()
u.init_app(self.flask_app)
file = FileStorage(filename='test.jpg')
err_msg = "Upload file should return False"
assert u.upload_file(file, container='user_3') is False, err_msg
def test_rackspace_uploader_upload_wrong_file(self, mock, mock2):
"""Test RACKSPACE UPLOADER upload wrong file extension works."""
with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:
mycf.upload_file.return_value = True
u = RackspaceUploader()
u.init_app(self.flask_app)
file = FileStorage(filename='test.docs')
err_msg = "Upload file should return False"
res = u.upload_file(file, container='user_3')
assert res is False, err_msg
def test_local_uploader_upload_fails(self, mock):
"""Test LOCAL UPLOADER upload fails."""
u = LocalUploader()
file = FileStorage(filename='test.jpg')
res = u.upload_file(file, container='user_3')
err_msg = ("Upload file should return False, \
as there is an exception")
assert res is False, err_msg
def test_local_uploader_upload_correct_file(self, mock):
"""Test LOCAL UPLOADER upload works."""
mock.save.return_value = None
u = LocalUploader()
file = FileStorage(filename='test.jpg')
res = u.upload_file(file, container='user_3')
err_msg = ("Upload file should return True, \
as this extension is allowed")
assert res is True, err_msg
def test_local_uploader_upload_wrong_file(self, mock):
"""Test LOCAL UPLOADER upload works with wrong extension."""
mock.save.return_value = None
u = LocalUploader()
file = FileStorage(filename='test.txt')
res = u.upload_file(file, container='user_3')
err_msg = ("Upload file should return False, \
as this extension is not allowed")
assert res is False, err_msg
def test_file_exists_for_real_file(self):
"""Test LOCAL UPLOADER file_exists returns True if the file exists"""
u = LocalUploader()
u.upload_folder = tempfile.mkdtemp()
file = FileStorage(filename='test.jpg')
container = 'mycontainer'
u.upload_file(file, container=container)
assert u.file_exists('test.jpg', container) is True
def process_formdata(self, valuelist):
valuelist = (x for x in valuelist if isinstance(x, FileStorage) and x)
data = next(valuelist, None)
if data is not None:
self.data = data
else:
self.raw_data = ()
def has_file(self):
"""Return ``True`` if ``self.data`` is a
:class:`~werkzeug.datastructures.FileStorage` object.
.. deprecated:: 0.14.1
``data`` is no longer set if the input is not a non-empty
``FileStorage``. Check ``form.data is not None`` instead.
"""
warnings.warn(FlaskWTFDeprecationWarning(
'"has_file" is deprecated and will be removed in 1.0. The data is '
'checked during processing instead.'
))
return bool(self.data)
def __call__(self, form, field):
if not (isinstance(field.data, FileStorage) and field.data):
if self.message is None:
message = field.gettext('This field is required.')
else:
message = self.message
raise StopValidation(message)
def __call__(self, form, field):
if not (isinstance(field.data, FileStorage) and field.data):
return
filename = field.data.filename.lower()
if isinstance(self.upload_set, Iterable):
if any(filename.endswith('.' + x) for x in self.upload_set):
return
raise StopValidation(self.message or field.gettext(
'File does not have an approved extension: {extensions}'
).format(extensions=', '.join(self.upload_set)))
if not self.upload_set.file_allowed(field.data, filename):
raise StopValidation(self.message or field.gettext(
'File does not have an approved extension.'
))
def process_formdata(self, valuelist):
valuelist = (x for x in valuelist if isinstance(x, FileStorage) and x)
data = next(valuelist, None)
if data is not None:
self.data = data
else:
self.raw_data = ()
def has_file(self):
"""Return ``True`` if ``self.data`` is a
:class:`~werkzeug.datastructures.FileStorage` object.
.. deprecated:: 0.14.1
``data`` is no longer set if the input is not a non-empty
``FileStorage``. Check ``form.data is not None`` instead.
"""
warnings.warn(FlaskWTFDeprecationWarning(
'"has_file" is deprecated and will be removed in 1.0. The data is '
'checked during processing instead.'
))
return bool(self.data)
def __call__(self, form, field):
if not (isinstance(field.data, FileStorage) and field.data):
if self.message is None:
message = field.gettext('This field is required.')
else:
message = self.message
raise StopValidation(message)
def __call__(self, form, field):
if not (isinstance(field.data, FileStorage) and field.data):
return
filename = field.data.filename.lower()
if isinstance(self.upload_set, Iterable):
if any(filename.endswith('.' + x) for x in self.upload_set):
return
raise StopValidation(self.message or field.gettext(
'File does not have an approved extension: {extensions}'
).format(extensions=', '.join(self.upload_set)))
if not self.upload_set.file_allowed(field.data, filename):
raise StopValidation(self.message or field.gettext(
'File does not have an approved extension.'
))
def setUp(self):
self.app = Flask(__name__)
self.app.config["SECRET_KEY"] = "test"
self.app.config["WTF_CSRF_ENABLED"] = False
self.app.config['TESTING'] = True
self.app.config["FILEUPLOAD_ALLOWED_EXTENSIONS"] = ["png", "jpg"]
self.app.config["FILEUPLOAD_CONVERT_TO_SNAKE_CASE"] = True
self.login_manager = LoginManager(self.app)
self.ffu = FlaskFileUpload(self.app)
self.dummy_stream = io.BytesIO(b"some initial text data")
self.fs = FileStorage(self.dummy_stream, "dummy.png")
def is_archive(file: FileStorage) -> bool:
"""Checks whether the file ends with a known archive file extension.
File extensions are known if they are recognized by the archive module.
:param file: Some file
:returns: True if the file has a known extension
"""
return file.filename.endswith(_known_archive_extensions)