def extract_to_temp(
    file: FileStorage,
    ignore_filter: IgnoreFilterManager,
    handle_ignore: IgnoreHandling = IgnoreHandling.keep
) -> str:
    """Extracts the contents of file into a temporary directory.

    The upload is first saved to a temporary archive file, which is removed
    again before this function returns or raises. When anything fails after
    the destination directory has been created, that directory is removed as
    well so that no partially extracted data is left behind.

    :param file: The archive to extract.
    :param ignore_filter: The files and directories that should be ignored.
    :param handle_ignore: Determines how ignored files should be handled.
    :returns: The pathname of the new temporary directory.
    :raises IgnoredFilesException: If ``handle_ignore`` is
        ``IgnoreHandling.error`` and the archive contains ignored files.
    """
    # Imported locally so this function does not depend on a module-level
    # import that is only needed for the failure cleanup below.
    import shutil

    tmpfd, tmparchive = tempfile.mkstemp()
    tmpdir = None
    try:
        # The file created by ``mkstemp`` is only used to obtain a unique
        # prefix; the archive itself is saved under that prefix plus a
        # sanitized version of the uploaded filename.
        os.remove(tmparchive)
        tmparchive += os.path.basename(
            secure_filename('archive_' + file.filename)
        )
        tmpdir = tempfile.mkdtemp()
        file.save(tmparchive)

        if handle_ignore == IgnoreHandling.error:
            arch = archive.Archive(tmparchive)
            wrong_files = ignore_filter.get_ignored_files_in_archive(arch)
            if wrong_files:
                raise IgnoredFilesException(invalid_files=wrong_files)
            arch.extract(to_path=tmpdir, method='safe')
        else:
            archive.extract(tmparchive, to_path=tmpdir, method='safe')
            if handle_ignore == IgnoreHandling.delete:
                ignore_filter.delete_from_dir(tmpdir)
    except BaseException:
        # The caller never receives ``tmpdir`` on failure, so it has to be
        # cleaned up here; the original exception is always re-raised.
        if tmpdir is not None:
            shutil.rmtree(tmpdir, ignore_errors=True)
        raise
    finally:
        os.close(tmpfd)
        try:
            os.remove(tmparchive)
        except FileNotFoundError:
            # Saving may have failed before the archive was written; do not
            # let this cleanup mask the original error.
            pass

    return tmpdir
python类FileStorage()的实例源码
def extract(
    file: FileStorage,
    ignore_filter: IgnoreFilterManager = None,
    handle_ignore: IgnoreHandling = IgnoreHandling.keep
) -> t.Optional[ExtractFileTree]:
    """Unpack an uploaded archive and return its contents as a file tree.

    The archive is extracted into a temporary directory, that directory is
    handed to :py:func:`rename_directory_structure`, and it is removed again
    before this function returns or raises.

    :param werkzeug.datastructures.FileStorage file: The archive to extract.
    :param ignore_filter: What files should be ignored in the given archive.
        This may only be ``None`` when ``handle_ignore`` is
        ``IgnoreHandling.keep``; an empty filter is used in that case.
    :param handle_ignore: How ignored files should be handled.
    :returns: ``None`` if the extracted tree is empty. If the tree has a
        single top-level entry that is a mapping, that mapping is returned.
        Otherwise a mapping from the part of the uploaded filename before
        its first ``.`` to the list of top-level entries.
    :raises ValueError: If ``ignore_filter`` is ``None`` while
        ``handle_ignore`` is not ``IgnoreHandling.keep``.
    """
    if ignore_filter is None:
        if handle_ignore == IgnoreHandling.keep:
            ignore_filter = IgnoreFilterManager([])
        else:  # pragma: no cover
            raise ValueError

    tmpdir = extract_to_temp(file, ignore_filter, handle_ignore)

    # The tree is keyed by the last path component of the temp directory.
    stripped = tmpdir.rstrip(os.sep)
    key_start = stripped.rfind(os.sep) + 1

    try:
        tree = rename_directory_structure(tmpdir)[tmpdir[key_start:]]
        filename: str = file.filename.partition('.')[0]

        if not tree:
            return None

        if len(tree) > 1:
            entries = tree if isinstance(tree, list) else [tree]
            return {filename: entries}

        only_entry = tree[0]
        if isinstance(only_entry, t.MutableMapping):
            return only_entry
        return {filename: tree}
    finally:
        shutil.rmtree(tmpdir)
def process_formdata(self, valuelist):
    """Store the first non-empty ``FileStorage`` of ``valuelist`` as data.

    When ``valuelist`` holds no such value, ``self.raw_data`` is set to an
    empty tuple and ``self.data`` is left untouched.
    """
    for candidate in valuelist:
        if isinstance(candidate, FileStorage) and candidate:
            self.data = candidate
            return
    self.raw_data = ()
def has_file(self):
    """Report whether ``self.data`` is truthy, after warning about deprecation.

    .. deprecated:: 0.14.1
        ``data`` is no longer set if the input is not a non-empty
        ``FileStorage``. Check ``form.data is not None`` instead.
    """
    deprecation = FlaskWTFDeprecationWarning(
        '"has_file" is deprecated and will be removed in 1.0. The data is '
        'checked during processing instead.'
    )
    warnings.warn(deprecation)
    return True if self.data else False
def __call__(self, form, field):
    """Stop validation unless ``field.data`` is a non-empty ``FileStorage``.

    :raises StopValidation: With ``self.message`` when it is set, otherwise
        with the field's translation of ``'This field is required.'``.
    """
    upload = field.data
    if isinstance(upload, FileStorage) and upload:
        return

    message = self.message
    if message is None:
        message = field.gettext('This field is required.')
    raise StopValidation(message)
def process_formdata(self, valuelist):
    """Keep the first non-empty ``FileStorage`` found in ``valuelist``.

    The match is assigned to ``self.data``. If nothing matches,
    ``self.raw_data`` is reset to an empty tuple instead.
    """
    for item in valuelist:
        if isinstance(item, FileStorage) and item:
            self.data = item
            return
    self.raw_data = ()
def has_file(self):
    """Return the truthiness of ``self.data`` and emit a deprecation warning.

    .. deprecated:: 0.14.1
        ``data`` is no longer set if the input is not a non-empty
        ``FileStorage``. Check ``form.data is not None`` instead.
    """
    notice = FlaskWTFDeprecationWarning(
        '"has_file" is deprecated and will be removed in 1.0. The data is '
        'checked during processing instead.'
    )
    warnings.warn(notice)
    return True if self.data else False
def __call__(self, form, field):
    """Require ``field.data`` to be a non-empty ``FileStorage``.

    :raises StopValidation: If it is not; the message is ``self.message``
        when set, else the field's translation of
        ``'This field is required.'``.
    """
    value = field.data
    if isinstance(value, FileStorage) and value:
        return

    message = self.message
    if message is None:
        message = field.gettext('This field is required.')
    raise StopValidation(message)
test_base_handler.py 文件源码
项目:territoriali-backend
作者: algorithm-ninja
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def test_call_file(self):
    """``_call`` on the dummy ``file`` handler should yield the upload's name."""
    request = Request(Environ({"wsgi.input": None}))
    request.files = {"file": FileStorage(filename="foo")}
    dummy = TestBaseHandler.DummyHandler()

    result = dummy._call(dummy.file, {}, request)

    self.assertEqual("foo", result)
test_base_handler.py 文件源码
项目:territoriali-backend
作者: algorithm-ninja
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def test_get_file_name(self):
    """``_get_file_name`` should return the filename of the uploaded file."""
    upload = FileStorage(filename="foo")
    request = Request(Environ())
    request.files = {"file": upload}

    name = BaseHandler._get_file_name(request)

    self.assertEqual("foo", name)
test_base_handler.py 文件源码
项目:territoriali-backend
作者: algorithm-ninja
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def test_get_file_content(self):
    """``_get_file_content`` should return the bytes of the uploaded stream."""
    payload = _io.BytesIO("hello world".encode())
    upload = FileStorage(stream=payload, filename="foo")
    request = Request(Environ())
    request.files = {"file": upload}

    content = BaseHandler._get_file_content(request)

    self.assertEqual("hello world", content.decode())
def test_save_file(self):
    """A saved upload resized with ``RECTANGLE_SIZE`` should be 150x100."""
    file_path_fixture = self.get_fixture_file_path("thumbnails/th01.png")

    # Open the fixture in a context manager so the handle is closed once the
    # upload has been saved, instead of leaking until garbage collection.
    with open(file_path_fixture, "rb") as stream:
        th_file = FileStorage(stream=stream, filename="th01.png")
        full_path = thumbnail.save_file("shots", "instance-id", th_file)

    thumbnail.turn_into_thumbnail(full_path, thumbnail.RECTANGLE_SIZE)

    # Likewise close the resulting image file after reading its size.
    with Image.open(full_path) as im:
        (width, height) = im.size

    self.assertEqual(width, 150)
    self.assertEqual(height, 100)
def _attachments_as_dict(
        filestorages: Iterable[FileStorage],
        attachment_encoder: AttachmentEncoder) -> Iterable[dict]:
    """Lazily convert uploads into ``{'filename', 'content'}`` dictionaries.

    Each upload's stream is read and passed through
    ``attachment_encoder.encode``. Uploads with a falsy filename or falsy
    encoded content are skipped; their stream has still been read.
    """
    for upload in filestorages:
        name = upload.filename
        raw = upload.stream.read()
        encoded = attachment_encoder.encode(raw)
        if not (name and encoded):
            continue
        yield {'filename': name, 'content': encoded}
def process_formdata(self, valuelist):
    """Pick the first non-empty ``FileStorage`` from ``valuelist``.

    It becomes ``self.data``. Without a match ``self.data`` is not assigned
    and ``self.raw_data`` is set to an empty tuple.
    """
    for entry in valuelist:
        if isinstance(entry, FileStorage) and entry:
            self.data = entry
            return
    self.raw_data = ()
def has_file(self):
    """Warn that this method is deprecated, then return ``bool(self.data)``.

    .. deprecated:: 0.14.1
        ``data`` is no longer set if the input is not a non-empty
        ``FileStorage``. Check ``form.data is not None`` instead.
    """
    warning = FlaskWTFDeprecationWarning(
        '"has_file" is deprecated and will be removed in 1.0. The data is '
        'checked during processing instead.'
    )
    warnings.warn(warning)
    return True if self.data else False
def __call__(self, form, field):
    """Pass only when ``field.data`` is a non-empty ``FileStorage``.

    :raises StopValidation: Otherwise, carrying ``self.message`` if it is
        not ``None`` or else the field's translation of
        ``'This field is required.'``.
    """
    data = field.data
    if isinstance(data, FileStorage) and data:
        return

    message = self.message
    if message is None:
        message = field.gettext('This field is required.')
    raise StopValidation(message)
def process_formdata(self, valuelist):
    """Assign the first non-empty ``FileStorage`` in ``valuelist`` to data.

    If ``valuelist`` contains none, ``self.raw_data`` becomes an empty tuple
    and ``self.data`` is not assigned.
    """
    for submitted in valuelist:
        if isinstance(submitted, FileStorage) and submitted:
            self.data = submitted
            return
    self.raw_data = ()
def has_file(self):
    """Emit a deprecation warning and return whether ``self.data`` is truthy.

    .. deprecated:: 0.14.1
        ``data`` is no longer set if the input is not a non-empty
        ``FileStorage``. Check ``form.data is not None`` instead.
    """
    deprecated = FlaskWTFDeprecationWarning(
        '"has_file" is deprecated and will be removed in 1.0. The data is '
        'checked during processing instead.'
    )
    warnings.warn(deprecated)
    return True if self.data else False
def __call__(self, form, field):
    """Validate that ``field.data`` is a non-empty ``FileStorage``.

    :raises StopValidation: When it is not. ``self.message`` is used if
        set; otherwise the field's translation of
        ``'This field is required.'``.
    """
    uploaded = field.data
    if isinstance(uploaded, FileStorage) and uploaded:
        return

    message = self.message
    if message is None:
        message = field.gettext('This field is required.')
    raise StopValidation(message)
def process_formdata(self, valuelist):
    """Use the first non-empty ``FileStorage`` of ``valuelist`` as the data.

    When there is no such value, ``self.raw_data`` is set to an empty tuple
    and ``self.data`` is not assigned.
    """
    for value in valuelist:
        if isinstance(value, FileStorage) and value:
            self.data = value
            return
    self.raw_data = ()