def test_can_save_images(self):
self.assertNotIn("test.png", os.listdir(MEDIA_ROOT))
self.assertEqual(MediaFile.objects.all().count(), 0)
media_file = SimpleUploadedFile("test.png", b"\x00\x01\x02\x03")
image = MediaFile(mediatitle="test", mediafile=media_file)
image.save()
self.assertEqual(MediaFile.objects.all().count(), 1)
self.assertIn(
datetime.now().strftime("%Y%m%d%H%M%S") + ".png",
os.listdir(MEDIA_ROOT)
)
retrieved_image = MediaFile.objects.first()
self.assertEqual(retrieved_image, image)
python类SimpleUploadedFile()的实例源码
def test_tile_symmetry(self):
'''
Make sure that tiles are symmetric
'''
upload_file = open('data/Dixon2012-J1-NcoI-R1-filtered.100kb.multires.cool', 'rb')
tileset = tm.Tileset.objects.create(
datafile=dcfu.SimpleUploadedFile(upload_file.name, upload_file.read()),
filetype='cooler',
datatype='matrix',
owner=self.user1,
uuid='aa')
ret = self.client.get('/api/v1/tiles/?d=aa.0.0.0')
contents = json.loads(ret.content.decode('utf-8'))
import base64
r = base64.decodestring(contents['aa.0.0.0']['dense'].encode('utf-8'))
q = np.frombuffer(r, dtype=np.float16)
q = q.reshape((256,256))
def test_reset_deletes_tmp_files(self):
request = get_request()
storage = self.get_storage()('wizard1', request, temp_storage)
step = 'start'
file_ = SimpleUploadedFile('file.txt', b'content')
storage.set_step_files(step, {'file': file_})
with storage.get_step_files(step)['file'] as file:
tmp_name = file.name
self.assertTrue(storage.file_storage.exists(tmp_name))
storage.reset()
storage.update_response(HttpResponse())
self.assertFalse(storage.file_storage.exists(tmp_name))
def upload_file(request):
if request.method != 'POST':
return error(request, 'Wrong HTTP method!')
try:
filename = request.GET['filename']
except KeyError:
return error(request, 'Need a filename param!')
if not os.path.exists(settings.CML_UPLOAD_ROOT):
try:
os.makedirs(settings.CML_UPLOAD_ROOT)
except OSError:
return error(request, 'Can\'t create upload directory!')
filename = os.path.basename(filename)
temp_file = SimpleUploadedFile(filename, request.read(), content_type='text/xml')
with open(os.path.join(settings.CML_UPLOAD_ROOT, filename), 'wb') as f:
for chunk in temp_file.chunks():
f.write(chunk)
return success(request)
def setUp(self):
self.user1 = dcam.User.objects.create_user(
username='user1', password='pass'
)
upload_file = open(
(
'data/dixon2012-h1hesc-hindiii-allreps-filtered.1000kb'
'.multires.cool'
),
'rb'
)
self.cooler = tm.Tileset.objects.create(
datafile=dcfu.SimpleUploadedFile(
upload_file.name, upload_file.read()
),
filetype='cooler',
uuid='c1',
owner=self.user1
)
def setUp(self):
self.user1 = dcam.User.objects.create_user(
username='user1', password='pass'
)
upload_file = open('data/wgEncodeCaltechRnaSeqHuvecR1x75dTh1014IlnaPlusSignalRep2.bigWig', 'rb')
#x = upload_file.read()
self.tileset = tm.Tileset.objects.create(
datafile=dcfu.SimpleUploadedFile(upload_file.name, upload_file.read()),
filetype='bigwig',
datatype='vector',
owner=self.user1,
coordSystem='hg19',
coordSystem2='hg19',
name="x",
uuid='bw')
def test_unbalanced(self):
'''
Try to get tiles from an unbalanced dataset
'''
upload_file = open('data/G15509.K-562.2_sampleDown.multires.cool', 'rb')
tileset = tm.Tileset.objects.create(
datafile=dcfu.SimpleUploadedFile(upload_file.name, upload_file.read()),
filetype='cooler',
datatype='matrix',
owner=self.user1,
uuid='g1a')
ret = self.client.get('/api/v1/tiles/?d=g1a.0.0.0')
contents = json.loads(ret.content.decode('utf-8'))
self.assertIn('g1a.0.0.0', contents)
def setUp(self):
self.user1 = dcam.User.objects.create_user(
username='user1', password='pass'
)
self.user2 = dcam.User.objects.create_user(
username='user2', password='pass'
)
upload_file = open('data/dixon2012-h1hesc-hindiii-allreps-filtered.1000kb.multires.cool', 'rb')
self.cooler = tm.Tileset.objects.create(
datafile=dcfu.SimpleUploadedFile(upload_file.name, upload_file.read()),
filetype='cooler',
owner=self.user1
)
upload_file=open('data/wgEncodeCaltechRnaSeqHuvecR1x75dTh1014IlnaPlusSignalRep2.hitile', 'rb')
self.hitile = tm.Tileset.objects.create(
datafile=dcfu.SimpleUploadedFile(upload_file.name, upload_file.read()),
filetype='hitile',
owner=self.user1
)
def test_create_private_tileset(self):
"""Test to make sure that when we create a private dataset, we can only
access it if we're logged in as the proper user
"""
upload_file =open('data/wgEncodeCaltechRnaSeqHuvecR1x75dTh1014IlnaPlusSignalRep2.hitile', 'rb')
private_obj = tm.Tileset.objects.create(
datafile=dcfu.SimpleUploadedFile(upload_file.name, upload_file.read()),
filetype='hitile',
private=True,
owner=self.user1
)
c = dt.Client()
c.login(username='user1', password='pass')
returned = json.loads(
self.client.get(
'/api/v1/tileset_info/?d={uuid}'.format(uuid=private_obj.uuid)
).content.decode('utf-8')
)
def unzip(file_obj):
"""
Take a path to a zipfile and checks if it is a valid zip file
and returns...
"""
files = []
# TODO: implement try-except here
zip = ZipFile(file_obj)
bad_file = zip.testzip()
if bad_file:
raise Exception('"%s" in the .zip archive is corrupt.' % bad_file)
infolist = zip.infolist()
for zipinfo in infolist:
if zipinfo.filename.startswith('__'): # do not process meta files
continue
file_obj = SimpleUploadedFile(name=zipinfo.filename, content=zip.read(zipinfo))
files.append((file_obj, zipinfo.filename))
zip.close()
return files
def test_valid_file_upload(admin_client):
task = Task.objects.create(slug='test')
data = b"a"
url = reverse('task', kwargs={'task_id': task.id})
zip_file = SimpleUploadedFile(
"task1.zip", data, content_type="application/zip")
response = admin_client.post(url, {'zip_file': zip_file})
assert response.status_code == 302
assert urlparse(response.url).path == url
submissions = TaskSubmission.objects.all()
assert len(submissions) == 1
submission = submissions[0]
with open(submission.get_submission_path(), 'rb') as f:
assert data == f.read()
def create_assignmentype(prof, title=test_assignment_title, description='test',
nb_grading=2,
deadline_submission='2020-02-02 22:59:30+00:00',
deadline_grading='2020-02-12 22:59:30+00:00',
nb_questions=3, questions_coeff=[2, 1, 1],
list_students=file_students):
"""
Create an assignmentype and associated assignments
"""
fs = SimpleUploadedFile(name='list_students.csv',
content=open(file_students, 'rb').read())
assignmentype = Assignmentype.objects.create(
prof=prof, title=title, description=description,
nb_grading=nb_grading, deadline_submission=deadline_submission,
deadline_grading=deadline_grading, nb_questions=nb_questions,
questions_coeff=questions_coeff, list_students=fs)
# Get new and existing students, associated to the assignmetype
existing_students, new_students = tasks.get_students(assignmentype.
list_students.path)
# Create their assignment
tasks.create_assignment(assignmentype.id, existing_students, new_students)
return assignmentype
def test_valid_form(self):
"""
Checks whether the form validates correct data
"""
# open the image file
with open(self.image_path, 'rb') as upload_file:
# dict with valid values
post_dict = {
'position': 0,
'content_type': self.ctype.pk,
'object_id': 1
}
# dict with valid uploaded file
file_dict = {
# create an uploaded file object
# using opened image file
'image': SimpleUploadedFile(
upload_file.name,
upload_file.read()
)
}
# create a form with both dicts
form = forms.ImageAdminForm(post_dict, file_dict)
# check whether the form is valid
self.assertTrue(form.is_valid())
def test_invalid_form_without_content_type(self):
"""
Checks whether the form without specified content type is invalid
"""
# open the image file
with open(self.image_path, 'rb') as upload_file:
# dict without content_type
post_dict = {
'position': 0,
'object_id': 1
}
# dict with valid uploaded file
file_dict = {
'image': SimpleUploadedFile(
upload_file.name,
upload_file.read()
)
}
# create a form with both dicts
form = forms.ImageAdminForm(post_dict, file_dict)
# check whether the form is not valid
self.assertFalse(form.is_valid())
def test_invalid_form_without_object_id(self):
"""
Checks whether the form without specified object id is invalid
"""
# open the image file
with open(self.image_path, 'rb') as upload_file:
# dict without object_id
post_dict = {
'position': 0,
'content_type': self.ctype.pk
}
# dict with valid uploaded file
file_dict = {
'image': SimpleUploadedFile(
upload_file.name,
upload_file.read()
)
}
# create a form with both dicts
form = forms.ImageAdminForm(post_dict, file_dict)
# check whether the form is not valid
self.assertFalse(form.is_valid())
def test_valid_form_without_position(self):
"""
Checks whether the form without specified position
is valid since the 'posion' field is optional
"""
# open the image file
with open(self.image_path, 'rb') as upload_file:
# dict without position
post_dict = {
'content_type': self.ctype.pk,
'object_id': 1
}
# dict with valid uploaded file
file_dict = {
'image': SimpleUploadedFile(
upload_file.name,
upload_file.read()
)
}
# create a form with both dicts
form = forms.ImageAdminForm(post_dict, file_dict)
# check whether the form is valid
self.assertTrue(form.is_valid())
def test_add_attachment(self):
# We log as admin
self.client.login(username='admin', password='supertest')
data = {
'name': 'Test attachment',
'attachment_name': 'foo.txt',
'buildable': False,
'file': SimpleUploadedFile('test.txt', b'toto')
}
resp = self.client.post(reverse('attachment_add'), data=data)
self.assertRedirects(resp, reverse('attachment_list'))
qs = Attachment.objects.filter(name='Test attachment')
self.assertEqual(qs.count(), 1)
attachment = qs.first()
self.assertEqual(attachment.attachment_name, 'foo.txt')
self.assertEqual(attachment.buildable, False)
def test_edit_attachment_permissions(self):
# We create an attachment
att = Attachment.objects.create(
name='Test attachment',
attachment_name='foo.txt',
buildable=False,
file=SimpleUploadedFile('test.txt', b'toto')
)
# We try to access edit page
resp = self.client.get(reverse('attachment_edit', args=(att.pk,)))
self.assertEqual(resp.status_code, 302)
self.assertIn(resp.url, '%s?next=%s' % (reverse('login'),
reverse('attachment_edit',
args=(att.pk,))))
# We log as admin
self.client.login(username='admin', password='supertest')
resp = self.client.get(reverse('attachment_edit', args=(att.pk,)))
self.assertEqual(resp.status_code, 200)
# TODO: Test user that is not admin
def test_delete_attachment(self):
# We log as admin
self.client.login(username='admin', password='supertest')
# We create an attachment
att = Attachment.objects.create(
name='Test attachment',
attachment_name='foo.txt',
buildable=False,
file=SimpleUploadedFile('test.txt', b'toto')
)
# We remove it and check that it correctly redirect
resp = self.client.post(reverse('attachment_delete', args=(att.pk,)))
self.assertRedirects(resp, reverse('attachment_list'))
# We check that the landing page doesn't exists anymor
self.assertEqual(Attachment.objects.filter(pk=att.pk).count(), 0)
def test_delete_attachment_permissions(self):
# We create an attachment
att = Attachment.objects.create(
name='Test attachment',
attachment_name='foo.txt',
buildable=False,
file=SimpleUploadedFile('test.txt', b'toto')
)
# We try to access edit page
resp = self.client.get(reverse('attachment_delete', args=(att.pk,)))
self.assertEqual(resp.status_code, 302)
self.assertIn(resp.url, '%s?next=%s' % (reverse('login'),
reverse('attachment_delete',
args=(att.pk,))))
# We log as admin
self.client.login(username='admin', password='supertest')
resp = self.client.get(reverse('attachment_delete', args=(att.pk,)))
self.assertEqual(resp.status_code, 200)
# TODO: Test user that is not admin
def test_post_large_file(make_dirs, monkeypatch, settings): # noqa
monkeypatch.setattr(views, 'default_storage', FakeS3Storage())
file = create_file()
upload = SimpleUploadedFile('text.txt', open(file.name, 'rb').read())
upload.size = settings.AWS['MAX_FILE_SIZE'] + 1
request = HttpRequest()
setattr(request, 'method', 'POST')
setattr(request, 'FILES', {'file': upload})
setattr(request, 'POST', {'key': 'subdir/text.txt'})
response = views.fake_s3_upload(request)
assert response.status_code == 400
assert response.content.decode('utf-8') == errors.EXCEED_MAX_SIZE.format(
max_size=settings.AWS['MAX_FILE_SIZE'],
proposed_size=settings.AWS['MAX_FILE_SIZE'] + 1)
assert not os.path.isfile(
os.path.join(settings.MEDIA_ROOT, 's3/uploads/subdir', 'text.txt'))
def test_has_pdf_bundle_url_if_needed(self):
"""ApplicationBundleDetailView return pdf url if needed
create an `ApplicationBundle` that needs a pdf
try to access `ApplicationBundleDetailView` using `id`
assert that the url for `FilledPDFBundle` is in the template.
"""
self.be_sfpubdef_user()
mock_pdf = SimpleUploadedFile(
'a.pdf', b"things", content_type="application/pdf")
bundle = BundlesService.create_bundle_from_submissions(
organization=self.sf_pubdef,
submissions=self.sf_pubdef_submissions,
bundled_pdf=mock_pdf
)
url = bundle.get_pdf_bundle_url()
result = self.client.get(reverse(
'intake-app_bundle_detail',
kwargs=dict(bundle_id=bundle.id)))
self.assertContains(result, url)
def test_authenticated_user_can_see_filled_pdf(self):
self.be_sfpubdef_user()
submission = self.sf_pubdef_submissions[0]
filled_pdf_bytes = self.fillable.fill(submission)
pdf_file = SimpleUploadedFile('filled.pdf', filled_pdf_bytes,
content_type='application/pdf')
filled_pdf_model = models.FilledPDF(
original_pdf=self.fillable,
submission=submission,
pdf=pdf_file
)
filled_pdf_model.save()
pdf = self.client.get(reverse('intake-filled_pdf',
kwargs=dict(
submission_id=submission.id
)))
self.assertTrue(len(pdf.content) > 69000)
self.assertEqual(type(pdf.content), bytes)
def test_uploading_image(self, mock_get_thumbnail):
"""Test upload single image file"""
mock_get_thumbnail.return_value = ""
with open(settings.TEST_DATA_DIR+'/test.png', 'rb') as att:
form_data = {'file': SimpleUploadedFile(att.name, att.read()), 'media_type': 'images'}
resp = self.client.post(reverse('multiuploader'), data=form_data)
self.assertEqual(resp.status_code, 200)
data = {"files": [{"id": "1",
"name": "test.png",
"size": 180,
"type": "text/plain",
"url": "/multiuploader/1/",
"thumbnailUrl": "",
"deleteUrl": "/multiuploader/1/",
"deleteType": "DELETE"
}]
}
self.assertEqual(resp.content, JsonResponse(data).content)
resp = self.client.get(reverse('multiuploader', args=[1]), HTTP_USER_AGENT='Mozilla/5.0')
self.assertEqual(resp.status_code, 200)
resp = self.client.delete(reverse('multiuploader', args=[1]))
self.assertEqual(resp.status_code, 200)
def test_uploading_mp3(self, mock_get_thumbnail):
"""Test upload single mp3 file"""
mock_get_thumbnail.return_value = ""
with open(settings.TEST_DATA_DIR + '/test.mp3', 'rb') as att:
form_data = {'file': SimpleUploadedFile(att.name, att.read()), 'media_type': 'audio'}
resp = self.client.post(reverse('multiuploader'), data=form_data)
self.assertEqual(resp.status_code, 200)
data = {"files": [{"id": "2",
"name": "test.mp3",
"size": 3742720,
'type': "text/plain",
"url": "/multiuploader/2/",
"thumbnailUrl": "",
"deleteUrl": "/multiuploader/2/",
"deleteType": "DELETE", }]
}
self.assertEqual(resp.content, JsonResponse(data).content)
resp = self.client.get(reverse('multiuploader', args=[2]), HTTP_USER_AGENT='Mozilla/5.0')
self.assertEqual(resp.status_code, 200)
resp = self.client.delete(reverse('multiuploader', args=[2]))
self.assertEqual(resp.status_code, 200)
def test_uploading_pdf(self, mock_get_thumbnail):
"""Test upload single pdf file"""
mock_get_thumbnail.return_value = ""
with open(settings.TEST_DATA_DIR + '/test.pdf', 'rb') as att:
form_data = {'file': SimpleUploadedFile(att.name, att.read()), 'media_type': 'default'}
resp = self.client.post(reverse('multiuploader'), data=form_data)
self.assertEqual(resp.status_code, 200)
data = {"files": [{"id": "3",
"name": "test.pdf",
"size": 6763,
'type': "text/plain",
"url": "/multiuploader/3/",
"thumbnailUrl": "",
"deleteUrl": "/multiuploader/3/",
"deleteType": "DELETE", }]
}
self.assertEqual(resp.content, JsonResponse(data).content)
resp = self.client.get(reverse('multiuploader', args=[3]), HTTP_USER_AGENT='Mozilla/5.0')
self.assertEqual(resp.status_code, 200)
resp = self.client.delete(reverse('multiuploader', args=[3]))
self.assertEqual(resp.status_code, 200)
def test_uploading_thumbnail_quality(self, mock_get_thumbnail):
"""Test upload single image file with quality"""
mock_get_thumbnail.return_value = ""
with open(settings.TEST_DATA_DIR + '/test.png', 'rb') as att:
form_data = {'file': SimpleUploadedFile(att.name, att.read()), 'media_type': 'images', 'quality': 70}
resp = self.client.post(reverse('multiuploader'), data=form_data)
self.assertEqual(resp.status_code, 200)
data = {"files": [{"id": "4",
"name": "test.png",
"size": 180,
'type': "text/plain",
"url": "/multiuploader/4/",
"thumbnailUrl": "",
"deleteUrl": "/multiuploader/4/",
"deleteType": "DELETE", }]
}
self.assertEqual(resp.content, JsonResponse(data).content)
resp = self.client.get(reverse('multiuploader', args=[4]), HTTP_USER_AGENT='Mozilla/5.0')
self.assertEqual(resp.status_code, 200)
resp = self.client.delete(reverse('multiuploader', args=[4]))
self.assertEqual(resp.status_code, 200)
def test_uploading_thumbnail_size(self, mock_get_thumbnail):
"""Test upload single image file with size"""
mock_get_thumbnail.return_value = ""
with open(settings.TEST_DATA_DIR+'/test.png', 'rb') as att:
form_data = {'file': SimpleUploadedFile(att.name, att.read()), 'media_type': 'images', 'size': '280x80'}
resp = self.client.post(reverse('multiuploader'), data=form_data)
self.assertEqual(resp.status_code, 200)
data = {"files": [{"id": "5",
"name": "test.png",
"size": 180,
"type": "text/plain",
"url": "/multiuploader/5/",
"thumbnailUrl": "",
"deleteUrl": "/multiuploader/5/",
"deleteType": "DELETE"
}]
}
self.assertEqual(resp.content, JsonResponse(data).content)
resp = self.client.get(reverse('multiuploader', args=[5]), HTTP_USER_AGENT='Mozilla/5.0')
self.assertEqual(resp.status_code, 200)
resp = self.client.delete(reverse('multiuploader', args=[5]))
self.assertEqual(resp.status_code, 200)
def setUp(self):
super(FileItemResourceTest, self).setUp()
# turn on streaming_support so that test_view can test the view endpoint
# without streaming_supported set to True, view endpoint will behave exactly like download
settings.FILESERVICE_CONFIG['streaming_supported'] = True
self.image_filename = 'image.jpg'
self.image_file = SimpleUploadedFile(
name=self.image_filename,
content='',
content_type='image/jpg',
)
self.upload_url = '/api/fileservice/'
self.download_url_template = '/api/fileservice/download/{0}'
self.view_url_template = '/api/fileservice/view/{0}'
def test_reset_deletes_tmp_files(self):
request = get_request()
storage = self.get_storage()('wizard1', request, temp_storage)
step = 'start'
file_ = SimpleUploadedFile('file.txt', b'content')
storage.set_step_files(step, {'file': file_})
with storage.get_step_files(step)['file'] as file:
tmp_name = file.name
self.assertTrue(storage.file_storage.exists(tmp_name))
storage.reset()
storage.update_response(HttpResponse())
self.assertFalse(storage.file_storage.exists(tmp_name))