def test_post_submission_require_auth_anonymous_user(self):
    """Unauthenticated submissions get 401 when the profile requires auth.

    Sets ``require_auth`` on the user's profile, posts a submission with a
    media file and no credentials, and checks that the view answers 401
    both without and with a ``username`` argument. The attachment count
    must be unchanged afterwards.
    """
    self.user.profile.require_auth = True
    self.user.profile.save()
    count = Attachment.objects.count()
    s = self.surveys[0]
    media_file = "1335783522563.jpg"
    instance_dir = os.path.join(self.main_directory, 'fixtures',
                                'transportation', 'instances', s)
    path = os.path.join(instance_dir, media_file)
    submission_path = os.path.join(instance_dir, s + '.xml')
    # The JPEG is binary data; 'rb' keeps it from being decoded as text
    # (and from newline translation) when the request body is built.
    with open(path, 'rb') as image_fh:
        f = InMemoryUploadedFile(image_fh, 'media_file', media_file,
                                 'image/jpg', os.path.getsize(path), None)
        with open(submission_path) as sf:
            data = {'xml_submission_file': sf, 'media_file': f}
            request = self.factory.post('/submission', data)
            response = self.view(request)
            self.assertEqual(response.status_code, 401)
            response = self.view(request, username=self.user.username)
            self.assertEqual(response.status_code, 401)
            self.assertEqual(count, Attachment.objects.count())
python类InMemoryUploadedFile()的实例源码
def create_media(media):
    """Download the file behind ``media.data_value`` and attach it.

    When ``media.data_value`` is a valid URL, the target is streamed into a
    temporary file, ``media.data_value`` is replaced by the last path
    segment of the URL, and ``media.data_file`` is set to an
    ``InMemoryUploadedFile`` wrapping the download.

    Returns:
        The updated ``media`` object, or ``None`` when ``data_value`` is
        not a valid URL.
    """
    if not is_valid_url(media.data_value):
        return None

    filename = media.data_value.split('/')[-1]
    # guess_type() returns a (type, encoding) pair; only the MIME type
    # string belongs in the upload's content_type.
    content_type = mimetypes.guess_type(filename)[0]
    data_file = NamedTemporaryFile()
    with closing(requests.get(media.data_value, stream=True)) as r:
        for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
            if chunk:
                data_file.write(chunk)

    # Take the size from the stream position at end-of-file so it always
    # matches what was written through this handle.
    data_file.seek(0, os.SEEK_END)
    size = data_file.tell()
    data_file.seek(0)

    media.data_value = filename
    media.data_file = InMemoryUploadedFile(
        data_file, 'data_file', filename, content_type,
        size, charset=None)
    return media
def test_windows_csv_file_upload(self):
    """A CSV uploaded as ``application/octet-stream`` is stored as text/csv.

    Uploads the transportation CSV fixture through ``MetaData.media_upload``
    with a generic binary content type, then checks that exactly one media
    record was added and that its ``data_file_type`` is ``text/csv``.
    """
    count = MetaData.objects.filter(data_type='media').count()
    media_file = os.path.join(
        self.this_directory, 'fixtures', 'transportation',
        'transportation.csv')
    # Use a context manager so the fixture handle is closed once the
    # upload has been processed instead of leaking until garbage collection.
    with open(media_file) as csv_fh:
        f = InMemoryUploadedFile(csv_fh,
                                 'media',
                                 'transportation.csv',
                                 'application/octet-stream',
                                 2625,
                                 None)
        MetaData.media_upload(self.xform, f)
    media_list = MetaData.objects.filter(data_type='media')
    new_count = media_list.count()
    self.assertEqual(count + 1, new_count)
    media = media_list.get(data_value='transportation.csv')
    self.assertEqual(media.data_file_type, 'text/csv')
def crop_image(self, data_image, ratio):
    """Crop an uploaded image and return the result as a new upload.

    Args:
        data_image: Uploaded image; its ``name`` and ``content_type`` are
            reused for the returned file.
        ratio: Crop box passed unchanged to ``Image.crop``.

    Returns:
        An ``InMemoryUploadedFile`` (field name ``'photo'``) holding the
        cropped image, rewound to the start and carrying its real size.
    """
    image = Image.open(data_image)
    cropped_image = image.crop(ratio)

    # The MIME sub-type, upper-cased, is used as the format name for save().
    image_format = data_image.content_type.split('/')[-1].upper()

    # saving image to memory
    thumb_io = io.BytesIO()
    cropped_image.save(thumb_io, image_format)

    # Report the actual byte count instead of None, and rewind so that
    # consumers calling read() see the image rather than an empty stream.
    thumb_io.seek(0, io.SEEK_END)
    size = thumb_io.tell()
    thumb_io.seek(0)

    # creating new InMemoryUploadedFile() based on the modified file
    return InMemoryUploadedFile(
        thumb_io,
        'photo',
        data_image.name,
        data_image.content_type,
        size, None,
    )
def test_render_with_uploaded_image(self):
    """
    Rendering a freshly uploaded image must delegate to the parent
    widget and leave ``template_with_initial`` untouched.
    """
    # start from a known template_with_initial value
    self.widget.template_with_initial = "bar"
    # stand-in for an image that has just been uploaded
    uploaded = mock.MagicMock(spec=InMemoryUploadedFile)
    # replace the parent's render method for the duration of the call
    parent_patch = mock.patch.object(
        AdminFileWidget, 'render', return_value='foo')
    with parent_patch as parent_render:
        rendered = widgets.ImageWidget.render(self.widget, 'name', uploaded)
        # the parent must have received the same arguments
        parent_render.assert_called_with('name', uploaded, None)
    # the parent's result is passed straight through
    self.assertEqual(rendered, 'foo')
    # and the template is exactly what it was before rendering
    self.assertEqual(self.widget.template_with_initial, 'bar')
helpers.py 文件源码
项目:django-open-volunteering-platform
作者: OpenVolunteeringPlatform
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def perform_image_crop(image_obj, crop_rect=None):
    """Crop an uploaded image to ``crop_rect``.

    Args:
        image_obj: Uploaded image file; ``name`` and ``content_type`` are
            carried over to the result.
        crop_rect: Crop box passed to ``Image.crop``. When ``None`` the
            input is returned unchanged.

    Returns:
        ``image_obj`` itself when no crop is requested, otherwise a new
        ``InMemoryUploadedFile`` containing the cropped image.
    """
    if crop_rect is None:
        return image_obj

    # Derive the save format from the file extension; 'JPG' is mapped to
    # the 'JPEG' format name.
    img_ext = os.path.splitext(image_obj.name)[1][1:].upper()
    img_ext = 'JPEG' if img_ext == 'JPG' else img_ext

    base_image = Image.open(BytesIO(image_obj.read()))
    cropped_buffer = BytesIO()
    base_image.crop(crop_rect).save(cropped_buffer, format=img_ext)
    payload = cropped_buffer.getvalue()

    # The size argument must be an integer byte count; the previous code
    # passed the bound method ``tell`` itself rather than a number.
    return uploadedfile.InMemoryUploadedFile(
        ContentFile(payload), None, image_obj.name, image_obj.content_type,
        len(payload), None
    )
def sms_media_to_file(file_object, name):
    """Wrap SMS media content in an ``InMemoryUploadedFile``.

    Args:
        file_object: Either raw content (bytes, or text which is encoded
            as UTF-8) or an already-open seekable file-like object.
        name: File name for the upload; surrounding whitespace is stripped
            and the content type is guessed from it.

    Returns:
        An ``InMemoryUploadedFile`` positioned at the start of the content.
    """
    # ``basestring`` only exists on Python 2; check bytes/str explicitly so
    # the helper works on both major versions.
    if isinstance(file_object, bytes):
        file_object = io.BytesIO(file_object)
    elif isinstance(file_object, str):
        file_object = io.BytesIO(file_object.encode('utf-8'))

    # Measure by seeking to the end rather than reading the whole stream.
    file_object.seek(0, io.SEEK_END)
    size = file_object.tell()
    file_object.seek(0)

    name = name.strip()
    # NOTE(review): the second value from guess_type() is the content
    # encoding (e.g. gzip), which is forwarded as ``charset`` here exactly
    # as before -- confirm this is intended.
    content_type, charset = mimetypes.guess_type(name)
    return InMemoryUploadedFile(file=file_object, name=name,
                                field_name=None, content_type=content_type,
                                charset=charset, size=size)
def save(self, commit=True):
    """Save the form, normalising empty strings and shrinking images.

    For every form field that also exists on the instance:
    * an attribute that is exactly a ``unicode``/``str`` empty string is
      replaced by ``None``;
    * if the model field is an ``ImageField`` and the cleaned value is a
      fresh upload, the image is passed through ``shrinkImageFromData``
      and the result is stored on the instance.

    The instance is written to the database only when ``commit`` is true.
    """
    instance = super(FormWithRequest, self).save(commit=False)
    for field in self.fields.keys():
        if not hasattr(instance, field):
            continue

        # Fix empty strings to None
        current = getattr(instance, field)
        if type(current) in (unicode, str) and current == '':
            setattr(instance, field, None)

        if field not in dir(self.Meta.model):
            continue
        if type(self.Meta.model._meta.get_field(field)) != models.models.ImageField:
            continue

        image = self.cleaned_data[field]
        if image and isinstance(image, (InMemoryUploadedFile, TemporaryUploadedFile)):
            filename = image.name
            image = shrinkImageFromData(image.read(), filename)
            setattr(instance, field, image)

    if commit:
        instance.save()
    return instance
def retrieve(self, key, field_name):
    """Rebuild a stored upload from the backend.

    Looks up the metadata for ``key`` and the content stored under the
    prefixed key with the ``'_content'`` suffix. Returns an
    ``InMemoryUploadedFile`` rewound to the start when both are present
    and truthy, otherwise ``None``.
    """
    metadata = self.metadata(key)
    content = self.backend.get(self.prefix(key) + '_content')
    if not (metadata and content):
        return None

    buffer = BytesIO()
    buffer.write(content)
    restored = InMemoryUploadedFile(
        file=buffer,
        field_name=field_name,
        name=metadata['name'],
        content_type=metadata['content_type'],
        size=metadata['size'],
        charset=metadata['charset'],
    )
    # writing left the position at the end; hand the file back rewound
    restored.file.seek(0)
    return restored
def _createTxtFilestream(self, strData, **kwargs):
    """
    Build an in-memory text upload from a string.

    Parameters :
        strData : str, text content; encoded as UTF-8
    Optional Arguments :
        filename : str, upload name. Defaults to 'test.txt'
    """
    upload_name = kwargs.get('filename', 'test.txt')
    payload = strData.encode('utf-8')
    return InMemoryUploadedFile(
        BytesIO(payload), None, upload_name, 'text', len(payload), None)
def _create_gif_filestream_from_base64(self, str_base64, **kwargs):
    """
    Build an in-memory image upload from base64-encoded data.

    Parameters :
        str_base64 : str, base64-encoded file content
    Optional Arguments :
        filename : str, upload name. Defaults to 'test.gif'
    """
    upload_name = kwargs.get('filename', 'test.gif')
    payload = base64.b64decode(str_base64)
    return InMemoryUploadedFile(
        BytesIO(payload), None, upload_name, 'image', len(payload), None)
def file_complete(self, file_size):
    """
    Wrap the collected file in an InMemoryUploadedFile.

    Returns None when this handler is not activated; otherwise the
    handler's file, rewound, with ``file_size`` as its reported size.
    """
    if self.activated:
        self.file.seek(0)
        return InMemoryUploadedFile(
            file=self.file,
            field_name=self.field_name,
            name=self.file_name,
            content_type=self.content_type,
            size=file_size,
            charset=self.charset,
        )
    return None
def _internal_service_call(self, share_data):
    """Run the internal image service for a request and store its answer.

    The request data is base64-decoded and wrapped as an uploaded file
    named ``test.jpg``. For request type ``"image"`` the CNN predictor is
    run on it and the predicted key is mapped through ``name_tag`` to the
    output text; any other request type gets the fallback text.

    Args:
        share_data: Request holder providing ``get_request_type``,
            ``get_request_data`` and ``set_output_data``.

    Returns:
        The same ``share_data`` object with its output data set.

    Raises:
        Any exception raised while decoding or predicting propagates
        unchanged. It used to be re-wrapped in a bare ``Exception``, which
        discarded the original type and traceback; callers catching
        ``Exception`` are unaffected.
    """
    # internal : IMAGE
    print("?????????? ??? ?? ?? ?? ?? ?????????? ")
    request_type = share_data.get_request_type()
    decode_text = base64.decodebytes(str.encode(share_data.get_request_data()))
    upload = InMemoryUploadedFile(io.BytesIO(decode_text), None, 'test.jpg', 'image/jpeg', len(decode_text), None)
    ml = MultiValueDict({'test': [upload]})

    # CNN Prediction
    if request_type == "image":
        return_val = PredictNetCnn().run('nn00004', None, ml)
        name_tag = {"KYJ" : "???", "KSW" : "???", "LTY" : "???", "LSH" : "???", "PJH" : "???", "KSS" : "???", "PSC" : "???"}
        predicted_key = return_val['test.jpg']['key'][0]
        print("?????????? ??? ?? ?? ?? ?? : " + predicted_key)
        share_data.set_output_data(name_tag[predicted_key] + "?? ??? ????")
    else:
        share_data.set_output_data("??? ?? ??? ????")
    return share_data
def create_media(media):
    """Download the file behind ``media.data_value`` and attach it.

    When ``media.data_value`` is a valid URL, the target is streamed into a
    temporary file, ``media.data_value`` is replaced by the last path
    segment of the URL, and ``media.data_file`` is set to an
    ``InMemoryUploadedFile`` wrapping the download.

    Returns:
        The updated ``media`` object, or ``None`` when ``data_value`` is
        not a valid URL.
    """
    if not is_valid_url(media.data_value):
        return None

    filename = media.data_value.split('/')[-1]
    # guess_type() returns a (type, encoding) pair; only the MIME type
    # string belongs in the upload's content_type.
    content_type = mimetypes.guess_type(filename)[0]
    data_file = NamedTemporaryFile()
    with closing(requests.get(media.data_value, stream=True)) as r:
        for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
            if chunk:
                data_file.write(chunk)

    # Take the size from the stream position at end-of-file so it always
    # matches what was written through this handle.
    data_file.seek(0, os.SEEK_END)
    size = data_file.tell()
    data_file.seek(0)

    media.data_value = filename
    media.data_file = InMemoryUploadedFile(
        data_file, 'data_file', filename, content_type,
        size, charset=None)
    return media
def test_windows_csv_file_upload(self):
    """A CSV uploaded as ``application/octet-stream`` is stored as text/csv.

    Uploads the transportation CSV fixture through ``MetaData.media_upload``
    with a generic binary content type, then checks that exactly one media
    record was added and that its ``data_file_type`` is ``text/csv``.
    """
    count = MetaData.objects.filter(data_type='media').count()
    media_file = os.path.join(
        self.this_directory, 'fixtures', 'transportation',
        'transportation.csv')
    # Use a context manager so the fixture handle is closed once the
    # upload has been processed instead of leaking until garbage collection.
    with open(media_file) as csv_fh:
        f = InMemoryUploadedFile(csv_fh,
                                 'media',
                                 'transportation.csv',
                                 'application/octet-stream',
                                 2625,
                                 None)
        MetaData.media_upload(self.xform, f)
    media_list = MetaData.objects.filter(data_type='media')
    new_count = media_list.count()
    self.assertEqual(count + 1, new_count)
    media = media_list.get(data_value='transportation.csv')
    self.assertEqual(media.data_file_type, 'text/csv')
def test_post_submission_require_auth_anonymous_user(self):
    """Unauthenticated submissions get 401 when the profile requires auth.

    Sets ``require_auth`` on the user's profile, posts a submission with a
    media file and no credentials, and checks that the view answers 401
    both without and with a ``username`` argument. The attachment count
    must be unchanged afterwards.
    """
    self.user.profile.require_auth = True
    self.user.profile.save()
    count = Attachment.objects.count()
    s = self.surveys[0]
    media_file = "1335783522563.jpg"
    instance_dir = os.path.join(self.main_directory, 'fixtures',
                                'transportation', 'instances', s)
    path = os.path.join(instance_dir, media_file)
    submission_path = os.path.join(instance_dir, s + '.xml')
    # The JPEG is binary data; 'rb' keeps it from being decoded as text
    # (and from newline translation) when the request body is built.
    with open(path, 'rb') as image_fh:
        f = InMemoryUploadedFile(image_fh, 'media_file', media_file,
                                 'image/jpg', os.path.getsize(path), None)
        with open(submission_path) as sf:
            data = {'xml_submission_file': sf, 'media_file': f}
            request = self.factory.post('/submission', data)
            response = self.view(request)
            self.assertEqual(response.status_code, 401)
            response = self.view(request, username=self.user.username)
            self.assertEqual(response.status_code, 401)
            self.assertEqual(count, Attachment.objects.count())
def upload_major_info(request):
    """Bulk-create ``Major`` records from an uploaded JSON file.

    The upload is read from ``request.FILES['file']`` and must be a JSON
    list of objects with ``major_short`` and ``major_full`` keys.

    Returns:
        403 when the user is not a superuser, 400 when no file was sent or
        it is not valid JSON, and 201 otherwise.
    """
    if not request.user.is_superuser:
        return Response(status=status.HTTP_403_FORBIDDEN)

    upload = request.FILES.get('file', False)
    if not upload:
        return Response(status=status.HTTP_400_BAD_REQUEST)

    try:
        # Uploaded files are readable file objects whether they live in
        # memory or in a temp file, so parse the upload directly. This
        # avoids re-opening the temp file (never closed before) and the
        # unbound variable for any other upload class.
        majors = json.load(upload)
    except ValueError:
        # Malformed JSON is a client error, not a server crash.
        return Response(status=status.HTTP_400_BAD_REQUEST)

    try:
        for major in majors:
            Major.objects.create(major_short=major['major_short'], major_full=major['major_full'])
    except IntegrityError as e:
        # Best effort, as before: report the conflict and still answer 201.
        # print(e) is valid on both Python 2 and 3.
        print(e)
    return Response(status=status.HTTP_201_CREATED)
def upload_professor_info(request):
    """Bulk-create ``Professor`` records from an uploaded JSON file.

    The upload is read from ``request.FILES['file']`` and must be a JSON
    list of objects with a ``name`` key and optional ``email`` and
    ``address`` keys. The name is upper-cased and split on whitespace; the
    first two parts become first and last name.

    Returns:
        403 when the user is not a superuser, 400 when no file was sent or
        it is not valid JSON, and 201 otherwise.
    """
    if not request.user.is_superuser:
        return Response(status=status.HTTP_403_FORBIDDEN)

    upload = request.FILES.get('file', False)
    if not upload:
        return Response(status=status.HTTP_400_BAD_REQUEST)

    try:
        # Uploaded files are readable file objects whether they live in
        # memory or in a temp file, so parse the upload directly. This
        # avoids re-opening the temp file (never closed before) and the
        # unbound variable for any other upload class.
        professors = json.load(upload)
    except ValueError:
        # Malformed JSON is a client error, not a server crash.
        return Response(status=status.HTTP_400_BAD_REQUEST)

    try:
        for professor in professors:
            # NOTE(review): a single-word name raises IndexError on name[1]
            # and extra name parts are dropped -- confirm the desired
            # handling.
            name = professor['name'].upper().split()
            email = professor.get('email', '')
            office = professor.get('address', '')
            Professor.objects.create(first_name=name[0], last_name=name[1], email=email, office=office)
    except IntegrityError as e:
        # Best effort, as before: report the conflict and still answer 201.
        # print(e) is valid on both Python 2 and 3.
        print(e)
    return Response(status=status.HTTP_201_CREATED)
def strip_img_metadata(in_memory_img):
    """Re-encode an uploaded JPEG, intended to drop embedded metadata.

    Returns ``None`` for a falsy input. Inputs that are exactly an
    ``InMemoryUploadedFile`` with a JPEG content type are re-saved as JPEG
    and returned as a new upload rewound to the start; everything else is
    returned unchanged.
    """
    if not in_memory_img:
        return None

    # XXX some testcases seem to create files with wrong file format. In case of erroneous magic
    # bytes we do not strip any data.
    # TODO is there something we can do about that?
    is_memory_upload = type(in_memory_img) == InMemoryUploadedFile
    content_type = in_memory_img.content_type if is_memory_upload else ""

    if content_type not in ("image/jpeg", "image/jpg"):
        # TODO remove additional metadata for other formats
        return in_memory_img

    # jpeg
    img = Image.open(in_memory_img)
    buffer = io.BytesIO()
    img.save(buffer, format='JPEG')
    stripped = InMemoryUploadedFile(buffer, None, in_memory_img.name, 'image/jpeg',
                                    buffer.getbuffer().nbytes, None)
    stripped.seek(0)
    img.close()
    return stripped
def image_create(request, **kwargs):
    """Create an image through the Glance client.

    :param kwargs:
        * copy_from: URL the Glance server should immediately copy the
            data from into its configured image store.
        * data: Form data posted from the client.
        * location: URL where the data for this image already resides.

    With 'copy_from' or 'location' the Glance server answers the create
    call immediately and processes the data asynchronously.

    With 'data' the upload may take some time, so it is handed off to a
    separate thread that runs ``image_update``.
    """
    upload = kwargs.pop('data', None)
    image = glanceclient(request).images.create(**kwargs)
    if not upload:
        return image

    if isinstance(upload, TemporaryUploadedFile):
        # Hack to fool Django, so we can keep file open in the new thread.
        upload.file.close_called = True
    if isinstance(upload, InMemoryUploadedFile):
        # Clone a new file for InMemoryUploadedFile,
        # because the old one will be closed by Django.
        upload = SimpleUploadedFile(upload.name,
                                    upload.read(),
                                    upload.content_type)

    update_kwargs = {'data': upload, 'purge_props': False}
    thread.start_new_thread(image_update, (request, image.id), update_kwargs)
    return image
def test_save_no_location(self, mock_blob, mock_bucket):
    """Saving without a storage location uses the bare file name."""
    storage = Storage()
    upload = InMemoryUploadedFile(
        six.StringIO('1'), '', 'test.jpg', 'text/plain', 1, 'utf8')
    with upload:
        storage._save('', upload)
        mock_blob.assert_called_once_with('test.jpg', mock_bucket())
def test_save_with_location(self, mock_blob, mock_bucket):
    """Saving with a storage location prefixes the file name with it."""
    storage = Storage(location='images')
    upload = InMemoryUploadedFile(
        six.StringIO('1'), '', 'test.jpg', 'text/plain', 1, 'utf8')
    with upload:
        storage._save('', upload)
        mock_blob.assert_called_once_with('images/test.jpg', mock_bucket())
def get_image_in_memory_data():
    """
    Creates the InMemoryUploadedFile object using the data returned by
    ``get_image_data()`` so it can be saved into an ImageField.

    The reported size is the number of bytes in the stream, and the
    returned file is rewound to the beginning.
    """
    import os

    # Named image_io rather than io to avoid shadowing the stdlib module.
    image_io = get_image_data()  # red 200x200px rectangle, per the original note
    # sys.getsizeof() measures the Python object's memory footprint, not
    # the content length; take the size from the stream position instead.
    image_io.seek(0, os.SEEK_END)
    size = image_io.tell()
    # create the InMemoryUploadedFile object with the 'foo.jpg' file
    image_file = InMemoryUploadedFile(image_io, None, 'foo.jpg',
                                      'jpeg', size, None)
    image_file.seek(0)  # seek to the beginning
    return image_file
def create_in_memory_image(image, name, size):
    """
    Resizes the image and saves it as InMemoryUploadedFile object.

    ``image`` and ``size`` are passed to ``image_resize``, which writes the
    result into an in-memory buffer; ``name`` becomes the upload's name.

    Returns the InMemoryUploadedFile object with the image data, its MIME
    type detected from the content and its size set to the byte count.
    """
    output = io.BytesIO()  # create an io object
    # resize the image and save it to the io object
    image_resize(image, output, size)
    resized_bytes = output.getvalue()
    # get MIME type of the image
    mime = magic.from_buffer(resized_bytes, mime=True)
    # sys.getsizeof() reports the buffer object's memory footprint, not the
    # image length, so use the actual number of bytes written.
    return uploadedfile.InMemoryUploadedFile(output, 'ImageField', name,
                                             mime, len(resized_bytes), None)
def test_post_submission_anonymous(self):
    """An anonymous submission to a user's endpoint succeeds with 201.

    Posts the XML submission and its media file as ``AnonymousUser`` and
    checks the success message, the OpenRosa headers, the content type and
    the ``Location`` header of the response.
    """
    s = self.surveys[0]
    media_file = "1335783522563.jpg"
    path = os.path.join(self.main_directory, 'fixtures',
                        'transportation', 'instances', s, media_file)
    # The JPEG is binary data; 'rb' keeps it from being decoded as text
    # (and from newline translation) when the request body is built.
    with open(path, 'rb') as image_fh:
        f = InMemoryUploadedFile(image_fh, 'media_file', media_file,
                                 'image/jpg', os.path.getsize(path), None)
        submission_path = os.path.join(
            self.main_directory, 'fixtures',
            'transportation', 'instances', s, s + '.xml')
        with open(submission_path) as sf:
            data = {'xml_submission_file': sf, 'media_file': f}
            request = self.factory.post(
                '/%s/submission' % self.user.username, data)
            request.user = AnonymousUser()
            response = self.view(request, username=self.user.username)
            self.assertContains(response, 'Successful submission',
                                status_code=201)
            self.assertTrue(response.has_header('X-OpenRosa-Version'))
            self.assertTrue(
                response.has_header('X-OpenRosa-Accept-Content-Length'))
            self.assertTrue(response.has_header('Date'))
            self.assertEqual(response['Content-Type'],
                             'text/xml; charset=utf-8')
            self.assertEqual(response['Location'],
                             'http://testserver/%s/submission'
                             % self.user.username)
def test_post_submission_authenticated(self):
    """A digest-authenticated submission succeeds after a 401 challenge.

    The first post carries no credentials and must be answered with 401.
    The request is then rebuilt, signed with digest auth derived from that
    response, and must succeed with 201 and the expected headers.
    """
    s = self.surveys[0]
    media_file = "1335783522563.jpg"
    path = os.path.join(self.main_directory, 'fixtures',
                        'transportation', 'instances', s, media_file)
    # The JPEG is binary data; 'rb' keeps it from being decoded as text
    # (and from newline translation) when the request body is built.
    with open(path, 'rb') as image_fh:
        f = InMemoryUploadedFile(image_fh, 'media_file', media_file,
                                 'image/jpg', os.path.getsize(path), None)
        submission_path = os.path.join(
            self.main_directory, 'fixtures',
            'transportation', 'instances', s, s + '.xml')
        with open(submission_path) as sf:
            data = {'xml_submission_file': sf, 'media_file': f}
            request = self.factory.post('/submission', data)
            response = self.view(request)
            self.assertEqual(response.status_code, 401)

            # Rewind BOTH files and redo the request since they were
            # consumed by the first one; without rewinding the media
            # upload, the second request would carry an empty attachment.
            sf.seek(0)
            f.seek(0)
            request = self.factory.post('/submission', data)
            auth = DigestAuth('bob', 'bobbob')
            request.META.update(auth(request.META, response))
            response = self.view(request, username=self.user.username)
            self.assertContains(response, 'Successful submission',
                                status_code=201)
            self.assertTrue(response.has_header('X-OpenRosa-Version'))
            self.assertTrue(
                response.has_header('X-OpenRosa-Accept-Content-Length'))
            self.assertTrue(response.has_header('Date'))
            self.assertEqual(response['Content-Type'],
                             'text/xml; charset=utf-8')
            self.assertEqual(response['Location'],
                             'http://testserver/submission')
def test_windows_csv_file_upload_to_metadata(self):
    """Posting the CSV fixture as form metadata records it as text/csv."""
    data_value = 'transportation.csv'
    fixture_path = os.path.join(self.fixture_dir, data_value)
    with open(fixture_path) as csv_fh:
        upload = InMemoryUploadedFile(
            csv_fh, 'media', data_value, 'text/csv', 2625, None)
        payload = {
            'data_value': data_value,
            'data_file': upload,
            'data_type': 'media',
            'xform': self.xform.pk
        }
        self._post_form_metadata(payload)
        self.assertEqual(self.metadata.data_file_type, 'text/csv')
def django_file(path, field_name, content_type):
    """Wrap the file at ``path`` in an ``InMemoryUploadedFile``.

    Args:
        path: Filesystem path of the file to wrap.
        field_name: Form field name to report on the upload.
        content_type: MIME type to report on the upload.

    Returns:
        An ``InMemoryUploadedFile`` whose name is the opened file's name
        and whose size is the on-disk size. The underlying handle is left
        open for the caller, who is responsible for closing it.
    """
    # adapted from here: http://groups.google.com/group/django-users/browse_th\
    # read/thread/834f988876ff3c45/
    # Open in binary mode: the content type is arbitrary, so the bytes must
    # not be decoded as text or subjected to newline translation.
    f = open(path, 'rb')
    return InMemoryUploadedFile(
        file=f,
        field_name=field_name,
        name=f.name,
        content_type=content_type,
        size=os.path.getsize(path),
        charset=None
    )
def django_file(file_obj, field_name, content_type):
    """Wrap an existing file object in an ``InMemoryUploadedFile``.

    The upload's name and size are taken from ``file_obj.name`` and
    ``file_obj.size``; no charset is set.
    """
    upload_kwargs = {
        'file': file_obj,
        'field_name': field_name,
        'name': file_obj.name,
        'content_type': content_type,
        'size': file_obj.size,
        'charset': None,
    }
    return InMemoryUploadedFile(**upload_kwargs)
def capture_screenshot(project):
    """Captures screenshot of Project website and saves it as
    project photo. If Project website is not available, then
    project source is used.

    The page is rendered in a 1024x420 PhantomJS window, cropped to that
    box, encoded as PNG and stored through ``Photo.objects.update_or_create``
    for the given project.
    """
    screenshot_url = project.website_link or project.source_link

    driver = webdriver.PhantomJS()
    try:
        driver.set_window_size(1024, 420)
        # Get URL
        driver.get(screenshot_url)
        screenshot = driver.get_screenshot_as_png()
    finally:
        # Always shut the browser down; otherwise every call leaves a
        # PhantomJS process running.
        driver.quit()

    img = Image.open(BytesIO(screenshot))
    # Crop image
    box = (0, 0, 1024, 420)
    img = img.crop(box)

    # Save image in a buffer and create a content file
    buffer_ = BytesIO()
    img.save(buffer_, 'PNG')
    png_bytes = buffer_.getvalue()
    content_file = ContentFile(png_bytes)

    # Save content file in memory and pass it to Project photo
    # File is automatically renamed by Django in case of conflicts
    # The size must be an integer byte count; the previous code passed the
    # bound method ``tell`` itself rather than a number.
    image_file = InMemoryUploadedFile(content_file, None, 'project.png',
                                      'image/png', len(png_bytes), None)
    Photo.objects.update_or_create(project=project,
                                   defaults={'image': image_file})