def test_resized_images_updated(self):
    """
    Saving a profile that already has an image with ``update_image=True``
    should leave thumbnails whose bytes differ from what is read back from
    the upload buffer.
    """
    # the profile must already carry the image and both thumbnails
    for attr_name in ('image', 'image_small', 'image_medium'):
        assert getattr(self.profile, attr_name)

    # create a dummy image file in memory for upload
    buffer = BytesIO()
    Image.new('RGBA', size=(50, 50), color=(256, 0, 0)).save(buffer, 'png')
    buffer.seek(0)
    upload = UploadedFile(buffer, "filename.png", "image/png", len(buffer.getvalue()))

    self.profile.image = upload
    self.profile.save(update_image=True)

    # whatever is left to read from the buffer after the save
    remaining_bytes = buffer.read()
    for thumbnail in (self.profile.image_small, self.profile.image_medium):
        assert thumbnail.file.read() != remaining_bytes
# python类UploadedFile()的实例源码 (example source code using the Python class UploadedFile())
def test_case_study_create_api_success(
    mock_create_case_study, supplier_case_study_end_to_end, sso_user,
    all_case_study_data, api_response_200
):
    """
    Creating a case study answers with a 302 to ``company-detail`` and calls
    the create API exactly once with the submitted data and the SSO session id.
    """
    mock_create_case_study.return_value = api_response_200

    response = supplier_case_study_end_to_end()

    assert response.status_code == http.client.FOUND
    assert response.get('Location') == reverse('company-detail')

    # django converts uploaded files to UploadedFile, which makes
    # `assert_called_once_with` tricky, so the image fields match ANY.
    expected_data = dict(
        all_case_study_data,
        image_one=ANY, image_two=ANY, image_three=ANY,
    )
    assert mock_create_case_study.call_count == 1
    expected_call = call(
        data=expected_data,
        sso_session_id=sso_user.session_id,
    )
    assert mock_create_case_study.call_args == expected_call
def test_case_study_update_api_success(
    mock_update_case_study, supplier_case_study_end_to_end, sso_user,
    all_case_study_data, api_response_200
):
    """
    Updating case study ``'1'`` answers with a 302 to ``company-detail`` and
    calls the update API once with the data, the id and the SSO session id.
    """
    mock_update_case_study.return_value = api_response_200

    response = supplier_case_study_end_to_end(case_study_id='1')

    assert response.status_code == http.client.FOUND
    assert response.get('Location') == reverse('company-detail')

    # django converts uploaded files to UploadedFile, which makes
    # `assert_called_once_with` tricky, so the image fields match ANY.
    expected_data = dict(
        all_case_study_data,
        image_one=ANY, image_two=ANY, image_three=ANY,
    )
    mock_update_case_study.assert_called_once_with(
        data=expected_data,
        case_study_id='1',
        sso_session_id=sso_user.session_id,
    )
def _save_to_answer(self, field, answer, value):
    """
    Copy a cleaned form value onto ``answer`` and log the change.

    How the value is stored depends on the type of ``field``:

    * ``ModelMultipleChoiceField``: the selected options are added to
      ``answer.options`` and ``answer.answer`` becomes the comma-separated
      ``str()`` of each option.
    * ``ModelChoiceField``: the selected option is added to
      ``answer.options`` and its ``answer`` attribute is copied; when nothing
      was selected (``value is None``) the options stay empty and
      ``answer.answer`` is set to ``''``.
    * ``FileField``: a freshly uploaded file is saved to
      ``answer.answer_file`` and referenced as ``file://<name>``; in either
      case the logged value is the resulting ``answer.answer``.
    * any other field: ``value`` is stored unchanged.

    For both choice field types an answer without a primary key is saved
    first, while an existing answer has its options cleared.

    NOTE(review): ``answer`` is not saved here after ``answer.answer`` is
    assigned; presumably the caller does that -- confirm.
    """
    # NOTE(review): this yields '...answerupdate' / '...answercreate' with no
    # separator before the verb. Left unchanged because log consumers may
    # match on these exact names -- confirm before adding a dot.
    action = 'pretalx.submission.answer' + ('update' if answer.pk else 'create')

    if isinstance(field, forms.ModelMultipleChoiceField):
        answstr = ', '.join([str(o) for o in value])
        if not answer.pk:
            answer.save()
        else:
            answer.options.clear()
        answer.answer = answstr
        answer.options.add(*value)
    elif isinstance(field, forms.ModelChoiceField):
        if not answer.pk:
            answer.save()
        else:
            answer.options.clear()
        if value is not None:
            answer.options.add(value)
            answer.answer = value.answer
        else:
            # An optional choice left empty: nothing to attach, and no
            # ``value.answer`` to read.
            answer.answer = ''
    elif isinstance(field, forms.FileField):
        if isinstance(value, UploadedFile):
            answer.answer_file.save(value.name, value)
            answer.answer = 'file://' + value.name
        # Log the stored reference rather than the file object itself.
        value = answer.answer
    else:
        answer.answer = value

    answer.log_action(action, person=self.request_user, data={'answer': value})
def get_step_files(self, step):
    """
    Return the files stored for ``step`` as ``{field: UploadedFile}``.

    Each file is opened from ``self.file_storage`` the first time it is
    requested and then reused from ``self._files``. Returns ``None`` when
    the step has no stored files.

    Raises ``NoFileStorageConfigured`` if files are stored for the step but
    no ``file_storage`` is set.
    """
    stored = self.data[self.step_files_key].get(step, {})
    if stored and not self.file_storage:
        raise NoFileStorageConfigured(
            "You need to define 'file_storage' in your "
            "wizard view in order to handle file uploads.")

    result = {}
    for field, stored_attrs in six.iteritems(stored):
        # work on a copy so the stored metadata keeps its 'tmp_name'
        upload_kwargs = stored_attrs.copy()
        tmp_name = upload_kwargs.pop('tmp_name')
        cache_key = (step, field)
        if cache_key not in self._files:
            self._files[cache_key] = UploadedFile(
                file=self.file_storage.open(tmp_name), **upload_kwargs)
        result[field] = self._files[cache_key]
    return result if result else None
def setUp(self):
    """
    Prepare a published form with one media attachment and a Briefcase client.

    Publishes the transportation form, submits an instance with an
    attachment, uploads ``screenshot.png`` as form media (asserting that one
    ``MetaData`` row was added), then logs out, logs in as ``deno`` and
    builds ``self.bc`` pointing at the profile URL of ``self.user``.
    """
    TestBase.setUp(self)
    self._publish_transportation_form()
    self._submit_transport_instance_w_attachment()
    src = os.path.join(self.this_directory, "fixtures",
                       "transportation", "screenshot.png")
    count = MetaData.objects.count()
    # PNG data is binary: open in 'rb' and close the handle once the upload
    # has been stored instead of leaking it for the rest of the test run.
    with open(src, 'rb') as media_file:
        uf = UploadedFile(file=media_file, content_type='image/png')
        MetaData.media_upload(self.xform, uf)
    self.assertEqual(MetaData.objects.count(), count + 1)
    url = urljoin(
        self.base_url,
        reverse(profile, kwargs={'username': self.user.username})
    )
    self._logout()
    self._create_user_and_login('deno', 'deno')
    self.bc = BriefcaseClient(
        username='bob', password='bob',
        url=url,
        user=self.user
    )
def clean_file(self):
    """
    Validate the uploaded ``file`` field and return it as ``upload.pdf``.

    Rejects a missing file and any file whose first four bytes are not
    ``PDF_MAGIC``. The upload is then passed through ``decrypt_pdf`` (the
    sanitization step) and the result is wrapped in a new ``UploadedFile``.

    Raises ``ValidationError`` for a missing file, a non-PDF file, or when
    ``decrypt_pdf`` raises ``ValueError``.
    """
    upload = self.cleaned_data['file']
    if not upload:
        raise ValidationError(_('no file'))

    # pdf magic check
    header = upload.read(4)
    if header != PDF_MAGIC:
        raise ValidationError(_('This file is not a PDF document.'))
    upload.seek(0)

    # sanitization
    try:
        sanitized = decrypt_pdf(upload)
    except ValueError:
        raise ValidationError(_('The PDF-File seems to broken. For more Information click on the question mark in the sidebar.'))

    sanitized.seek(0)
    return UploadedFile(sanitized, content_type='application/pdf', name='upload.pdf')
def clean(self, value, *args, **kwargs):
    """
    Clean the field and convert a newly uploaded image to PNG.

    Values that are not an ``UploadedFile`` are returned as cleaned by the
    parent class. If PIL cannot be imported, the upload is returned
    unconverted. Otherwise the image is re-saved as PNG and returned as a
    ``SimpleUploadedFile`` named ``picture.png`` with content type
    ``image/png``.

    Raises ``ValidationError`` when the image cannot be read or written
    (``IOError``).
    """
    value = super().clean(value, *args, **kwargs)
    if isinstance(value, UploadedFile):
        try:
            from PIL import Image
        except ImportError:
            return value

        value.open('rb')
        value.seek(0)
        try:
            # The temp file is written by name through PIL and read back
            # through the handle opened here.
            with Image.open(value) as im, tempfile.NamedTemporaryFile('rb', suffix='.png') as tmpfile:
                im.save(tmpfile.name)
                tmpfile.seek(0)
                # 'image/png' is the registered media type; the previous
                # 'image png' was not a valid content type.
                return SimpleUploadedFile('picture.png', tmpfile.read(), 'image/png')
        except IOError:
            logger.exception('Could not convert image to PNG.')
            raise ValidationError(
                _('The file you uploaded could not be converted to PNG format.')
            )
    return value
def clean(self, *args, **kwargs):
    """
    Clean the field and enforce the upload restrictions.

    The checks only run when the underlying file is a new ``UploadedFile``.
    Raises ``ValidationError`` if ``self.content_types`` is set and does not
    contain the upload's content type, or if ``self.max_file_size`` is set
    and the upload is larger.
    """
    data = super(PrivateFileField, self).clean(*args, **kwargs)
    upload = data.file
    if not isinstance(upload, UploadedFile):
        return data

    # content_type is only available for uploaded files,
    # and not for files which are already stored in the model.
    mime_type = upload.content_type
    if self.content_types and mime_type not in self.content_types:
        logger.debug('Rejected uploaded file type: %s', mime_type)
        raise ValidationError(self.error_messages['invalid_file_type'])

    if self.max_file_size and upload.size > self.max_file_size:
        message = self.error_messages['file_too_large'].format(
            max_size=filesizeformat(self.max_file_size),
            size=filesizeformat(upload.size)
        )
        raise ValidationError(message)

    return data
def clean(self, value, *args, **kwargs):
    """
    Clean the field and convert a newly uploaded image to PNG.

    Values that are not an ``UploadedFile`` are returned as cleaned by the
    parent class. If PIL cannot be imported, the upload is returned
    unconverted. Otherwise the image is re-saved as PNG and returned as a
    ``SimpleUploadedFile`` named ``picture.png`` with content type
    ``image/png``.

    Raises ``ValidationError`` when the image cannot be read or written
    (``IOError``).
    """
    value = super().clean(value, *args, **kwargs)
    if isinstance(value, UploadedFile):
        try:
            from PIL import Image
        except ImportError:
            return value

        value.open('rb')
        value.seek(0)
        try:
            # The temp file is written by name through PIL and read back
            # through the handle opened here.
            with Image.open(value) as im, tempfile.NamedTemporaryFile('rb', suffix='.png') as tmpfile:
                im.save(tmpfile.name)
                tmpfile.seek(0)
                # 'image/png' is the registered media type; the previous
                # 'image png' was not a valid content type.
                return SimpleUploadedFile('picture.png', tmpfile.read(), 'image/png')
        except IOError:
            logger.exception('Could not convert image to PNG.')
            raise ValidationError(
                _('The file you uploaded could not be converted to PNG format.')
            )
    return value
def setUp(self):
    """
    Prepare a published form with one media attachment and a Briefcase client.

    Publishes the transportation form, submits an instance with an
    attachment, uploads ``screenshot.png`` as form media (asserting that one
    ``MetaData`` row was added), then logs out, logs in as ``deno`` and
    builds ``self.bc`` pointing at the profile URL of ``self.user``.
    """
    TestBase.setUp(self)
    self._publish_transportation_form()
    self._submit_transport_instance_w_attachment()
    src = os.path.join(self.this_directory, "fixtures",
                       "transportation", "screenshot.png")
    count = MetaData.objects.count()
    # PNG data is binary: open in 'rb' and close the handle once the upload
    # has been stored instead of leaking it for the rest of the test run.
    with open(src, 'rb') as media_file:
        uf = UploadedFile(file=media_file, content_type='image/png')
        MetaData.media_upload(self.xform, uf)
    self.assertEqual(MetaData.objects.count(), count + 1)
    url = urljoin(
        self.base_url,
        reverse(profile, kwargs={'username': self.user.username})
    )
    self._logout()
    self._create_user_and_login('deno', 'deno')
    self.bc = BriefcaseClient(
        username='bob', password='bob',
        url=url,
        user=self.user
    )
def test_files(rf):
    """
    A file posted to ``/upload`` is returned by ``read_parameters`` as an
    ``UploadedFile`` under the ``file`` key.
    """
    payload = {'file': ContentFile(b'foo', name='foo.txt')}
    request = rf.post('/upload', payload)
    operation = router.get_path('/upload').get_operation('post')
    request.api_info = APIInfo(operation)

    parameters = read_parameters(request, {})

    assert isinstance(parameters['file'], UploadedFile)
def setUp(self):
    """
    Give ``self.profile`` a 50x50 in-memory PNG as its image and save it
    with ``update_image=True``.
    """
    super().setUp()
    # create a dummy image file in memory for upload
    buffer = BytesIO()
    Image.new('RGBA', size=(50, 50), color=(256, 0, 0)).save(buffer, 'png')
    buffer.seek(0)
    upload = UploadedFile(buffer, "filename.png", "image/png", len(buffer.getvalue()))
    self.profile.image = upload
    self.profile.save(update_image=True)
def image_one():
    """Return an ``UploadedFile`` named ``one.png`` wrapping a test PNG image."""
    upload_kwargs = {
        'file': create_test_image('png'),
        'name': 'one.png',
        'content_type': 'image/png',
        'size': 100,
    }
    return UploadedFile(**upload_kwargs)
def image_two():
    """Return an ``UploadedFile`` named ``two.png`` wrapping a test PNG image."""
    # The name was a copy-paste of 'one.png'; each image gets its own file
    # name so the three uploads can be told apart.
    return UploadedFile(
        file=create_test_image('png'),
        name='two.png',
        content_type='image/png',
        size=100,
    )
def image_three():
    """Return an ``UploadedFile`` named ``three.png`` wrapping a test PNG image."""
    # The name was a copy-paste of 'one.png'; each image gets its own file
    # name so the three uploads can be told apart.
    return UploadedFile(
        file=create_test_image('png'),
        name='three.png',
        content_type='image/png',
        size=100,
    )
def form_valid(self, form):
    """
    Store the uploaded file and describe it in a JSON response.

    Reads the upload from ``request.FILES['file']``, saves it through
    ``self.model`` and tries to build a thumbnail using the optional POST
    parameters ``size`` (default ``'180x80'``) and ``quality`` (default
    ``50``). A thumbnail failure is logged and leaves ``thumbnailUrl`` empty.

    Returns a ``JsonResponse`` whose ``files`` list holds one entry with the
    id, name, size, content type and the URLs of the stored file.
    """
    log.info('received POST to main multiuploader view')
    file_obj = self.request.FILES['file']
    wrapped_file = UploadedFile(file_obj)
    filename = wrapped_file.name
    file_size = wrapped_file.file.size
    log.info('Got file: "%s"' % filename)

    fl = self.model(filename=filename, file=file_obj)
    fl.save()
    log.info('File saving done')

    size = self.request.POST.get('size') or '180x80'
    # POST values arrive as strings while the default is the int 50; convert
    # so the thumbnailer always receives an integer quality. An unparsable
    # value falls back to the default.
    try:
        quality = int(self.request.POST.get('quality') or 50)
    except (TypeError, ValueError):
        quality = 50

    thumb_url = ""
    try:
        im = get_thumbnail(fl.file, size, quality=quality)
        thumb_url = im.url
    except Exception as e:
        # Best effort: the upload itself succeeded, so only log the failure.
        log.error(e)

    # generating json response array
    result = {"files": [{"id": str(fl.id),
                         "name": filename,
                         "size": file_size,
                         'type': file_obj.content_type,
                         "url": reverse('multiuploader', args=[fl.pk]),
                         "thumbnailUrl": thumb_url,
                         "deleteUrl": reverse('multiuploader', args=[fl.pk]),
                         "deleteType": "DELETE", }]
              }
    return JsonResponse(result)
def serialize_form(self, form):
    """
    Turn the cleaned data of this job's bound form into a list of dicts
    that can be kept in the session.

    One entry is produced per form field that has cleaned data, in the
    form's field order. Model instances are replaced by a prefixed
    ``app_label.model:pk`` string. Uploaded files are written to ``_fs`` and
    replaced by their path, with the file name as display value.
    """
    serialized = []
    cleaned = form.cleaned_data
    for field_name, field in form.fields.items():
        if field_name not in cleaned:
            continue

        form_value = cleaned[field_name]
        display_value = None

        if isinstance(form_value, models.Model):
            ctype = ContentType.objects.get_for_model(form_value)
            form_value = '{0}{1}.{2}:{3}'.format(
                _CONTENT_TYPE_PREFIX,
                ctype.app_label,
                ctype.model,
                form_value.pk
            )
        elif isinstance(form_value, UploadedFile):
            file_name = _fs.get_available_name(form_value.name)
            file_path = _fs.path(file_name)
            with open(file_path, 'wb+') as destination:
                for chunk in form_value.chunks():
                    destination.write(chunk)
            form_value = file_path
            display_value = file_name

        label = force_text(field.label) if field.label else None
        serialized.append({
            'name': field_name,
            'label': label,
            'value': form_value,
            'display_value': display_value,
        })
    return serialized
def clean(self):
    """
    Default an empty ``name`` to the uploaded file's name.

    When ``name`` is empty or missing and ``file`` is a new ``UploadedFile``,
    the file's name is used. When no new file was uploaded, the ``name`` key
    is removed from the cleaned data.

    Returns ``self.cleaned_data``.
    """
    file = self.cleaned_data.get('file')
    if not self.cleaned_data.get('name'):
        if isinstance(file, UploadedFile):
            self.cleaned_data['name'] = file.name
        else:
            # 'name' may be absent altogether (e.g. it failed its own
            # validation), so pop() avoids the KeyError ``del`` would raise.
            self.cleaned_data.pop('name', None)
    return self.cleaned_data
def test_metadata_file_hash(self):
    """
    Uploading media for a form adds one ``MetaData`` row and that row
    carries a hash longer than 16 characters.
    """
    self._publish_transportation_form()
    src = os.path.join(self.this_directory, "fixtures",
                       "transportation", "screenshot.png")
    count = MetaData.objects.count()
    # PNG data is binary: open in 'rb' and close the handle after the upload.
    with open(src, 'rb') as media_file:
        uf = UploadedFile(file=media_file, content_type='image/png')
        MetaData.media_upload(self.xform, uf)
    # assert successful insert of new metadata record
    self.assertEqual(MetaData.objects.count(), count + 1)
    md = MetaData.objects.get(xform=self.xform,
                              data_value='screenshot.png')
    # assert checksum string has been generated, hash length > 16
    self.assertTrue(len(md.hash) > 16)
def clean(self, value, *args, **kwargs):
    """
    Clean the field and normalise an uploaded certificate to PEM.

    Values that are not an ``UploadedFile`` are returned as cleaned by the
    parent class. An upload that starts with the PEM BEGIN marker and also
    contains the END marker is returned unchanged as ``cert.pem``. Anything
    else is treated as DER and converted with ``openssl x509``.

    Raises ``ValidationError`` when ``openssl`` exits with a non-zero code.
    """
    value = super().clean(value, *args, **kwargs)
    if not isinstance(value, UploadedFile):
        return value

    value.open('rb')
    value.seek(0)
    content = value.read()
    # A PEM certificate needs both markers; the previous check tested the
    # BEGIN marker twice and never looked for END.
    if content.startswith(b'-----BEGIN CERTIFICATE-----') and b'-----END CERTIFICATE-----' in content:
        return SimpleUploadedFile('cert.pem', content, 'text/plain')

    openssl_cmd = [
        'openssl',
        'x509',
        '-inform',
        'DER',
        '-outform',
        'PEM',
    ]
    process = subprocess.Popen(
        openssl_cmd,
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stdin=subprocess.PIPE,
    )
    # Feed stdin through communicate(): it writes the input, closes the pipe
    # and drains stdout/stderr together, which avoids the deadlock a separate
    # stdin.write() can cause when a pipe buffer fills up.
    pem, error = process.communicate(input=content)
    if process.returncode != 0:
        logger.info('Trying to convert a DER to PEM failed: {}'.format(error))
        raise ValidationError(
            _('This does not look like a X509 certificate in either PEM or DER format'),
        )
    return SimpleUploadedFile('cert.pem', pem, 'text/plain')
def get_step_files(self, step):
    """
    Return the files stored for ``step`` as ``{field: UploadedFile}``.

    Every call opens each stored file again from ``self.file_storage``.
    Returns ``None`` when the step has no stored files.

    Raises ``NoFileStorageConfigured`` if files are stored for the step but
    no ``file_storage`` is set.
    """
    stored = self.data[self.step_files_key].get(step, {})
    if stored and not self.file_storage:
        raise NoFileStorageConfigured(
            "You need to define 'file_storage' in your "
            "wizard view in order to handle file uploads.")

    def _reopen(stored_attrs):
        # copy first so the stored metadata keeps its 'tmp_name'
        upload_kwargs = stored_attrs.copy()
        tmp_name = upload_kwargs.pop('tmp_name')
        return UploadedFile(
            file=self.file_storage.open(tmp_name), **upload_kwargs)

    result = {}
    for field, stored_attrs in six.iteritems(stored):
        result[field] = _reopen(stored_attrs)
    return result or None
def make_value_from_form(self, value):
    """Convert a form value to a property value.

    For an UploadedFile instance (as returned by the FileField) the content
    is read once, kept in ``self.form_value`` and returned wrapped in a
    ``db.Blob``. Any other value is handled by the parent class.
    """
    is_upload = have_uploadedfile and isinstance(value, uploadedfile.UploadedFile)
    if not is_upload:
        return super(BlobProperty, self).make_value_from_form(value)

    # only read the upload if no content has been kept yet
    if not self.form_value:
        self.form_value = value.read()
    return db.Blob(self.form_value)
def get_step_files(self, step):
    """
    Build ``{field: UploadedFile}`` for the files stored under ``step``.

    Each stored entry is re-opened from ``self.file_storage`` by its
    ``tmp_name``; the remaining stored attributes are passed to
    ``UploadedFile``. Returns ``None`` if nothing is stored for the step.

    Raises ``NoFileStorageConfigured`` if files are stored for the step but
    no ``file_storage`` is set.
    """
    step_entries = self.data[self.step_files_key].get(step, {})

    storage_missing = not self.file_storage
    if step_entries and storage_missing:
        raise NoFileStorageConfigured(
            "You need to define 'file_storage' in your "
            "wizard view in order to handle file uploads.")

    opened = {}
    for field, entry in six.iteritems(step_entries):
        # pop 'tmp_name' from a copy; the stored entry must stay intact
        attrs = entry.copy()
        handle = self.file_storage.open(attrs.pop('tmp_name'))
        opened[field] = UploadedFile(file=handle, **attrs)

    if not opened:
        return None
    return opened
def post_create_issue(self, request, issue, data):
    """
    Attach every file uploaded under ``media`` to the newly created issue.

    Creates one ``IssueMedia`` row per file, in upload order. Raises
    ``ValidationError`` as soon as a file larger than 100 MiB is found.
    """
    from issues_media.models import IssueMedia

    max_bytes = 100 * 1024 * 1024
    uploads = request.FILES.getlist('media', ())
    for upload in uploads:
        assert isinstance(upload, UploadedFile)
        if upload.size > max_bytes:
            raise ValidationError("File %s is too large" % upload)
        # TODO: Add mimetype validation somewhere around here
        IssueMedia.objects.create(issue=issue, file=upload)
def test_metadata_file_hash(self):
    """
    Uploading media for a form adds one ``MetaData`` row and that row
    carries a hash longer than 16 characters.
    """
    self._publish_transportation_form()
    src = os.path.join(self.this_directory, "fixtures",
                       "transportation", "screenshot.png")
    count = MetaData.objects.count()
    # PNG data is binary: open in 'rb' and close the handle after the upload.
    with open(src, 'rb') as media_file:
        uf = UploadedFile(file=media_file, content_type='image/png')
        MetaData.media_upload(self.xform, uf)
    # assert successful insert of new metadata record
    self.assertEqual(MetaData.objects.count(), count + 1)
    md = MetaData.objects.get(xform=self.xform,
                              data_value='screenshot.png')
    # assert checksum string has been generated, hash length > 16
    self.assertTrue(len(md.hash) > 16)
def save(self) -> None:
    """
    Write every changed form value to the settings store ``self._s``.

    A newly uploaded file replaces the stored one, an unchanged ``File`` is
    skipped, and an emptied file field deletes both the stored file and the
    setting. For other fields ``None`` deletes the setting and any value
    that differs from the stored one is written.
    """
    def _delete_stored_file(key):
        # Remove the file currently stored under ``key``, if there is one.
        stored = self._s.get(key, as_type=File)
        if not stored:
            return
        try:
            default_storage.delete(stored.name)
        except OSError:  # pragma: no cover
            logger.error('Deleting file %s failed.' % stored.name)

    for name, field in self.fields.items():
        value = self.cleaned_data[name]

        if isinstance(value, UploadedFile):
            # Delete old file, then create the new one
            _delete_stored_file(name)
            newname = default_storage.save(self.get_new_filename(value.name), value)
            value._name = newname
            self._s.set(name, value)
        elif isinstance(value, File):
            # file is unchanged
            continue
        elif isinstance(field, forms.FileField):
            # file is deleted
            _delete_stored_file(name)
            del self._s[name]
        elif value is None:
            del self._s[name]
        elif self._s.get(name, as_type=type(value)) != value:
            self._s.set(name, value)
def value_from_datadict(self, data: Dict[str, str], files: Dict[str, UploadedFile], name: str) -> str:
    """Return the parent widget's values for ``name`` joined with commas."""
    return ",".join(super().value_from_datadict(data, files, name))
def test_upload_avatar_inTemp(self):
    """
    Post the profile edit form with an avatar image and check the response
    contains the uploaded file, then run ``delete_image_test``.
    """
    new_dict = default_dict2.copy()
    # The context manager closes the image even when an assertion fails;
    # previously the handle leaked on failure.
    with open(image_path, "rb") as avatar:
        new_dict['avatar'] = avatar
        avatar2 = UploadedFile(avatar, name=avatar.name, content_type="image/png", size=os.path.getsize(image_path))
        response = self.client.post(reverse('user_profile:edit_profile', kwargs={"username": user_name}),
                                    new_dict, follow=True)
        self.assertContains(response, avatar2)
        self.delete_image_test()
# helper function
def _save(self, name, content):
    """
    Upload ``content`` under the normalised, prefixed ``name`` and return
    the ``public_id`` from the upload response.
    """
    target_name = self._prepend_prefix(self._normalise_name(name))
    upload = UploadedFile(content, target_name)
    return self._upload(target_name, upload)['public_id']