def generate_robot_report(self):
    """
    Regenerate the Robot Framework log and report HTML for the XML source.

    Nothing happens when ``self.xml_source`` is None. Otherwise the current
    ``log_file`` and ``report_file`` are deleted, placeholder files are saved
    under names derived from the XML file name, and ``ResultWriter`` is asked
    to write the real log and report to those derived paths.
    """
    if self.xml_source is None:
        return

    # Imported lazily, only when a report is actually generated.
    from robot.reporting.resultwriter import ResultWriter

    # "<xml path without extension>_log.html" / "..._report.html"
    stem = os.path.splitext(self.xml_source.file.name)[0]
    log_path = stem + "_log.html"
    report_path = stem + "_report.html"

    if self.log_file is not None:
        self.log_file.delete()
    if self.report_file is not None:
        self.report_file.delete()

    # Placeholders are saved first; the writer below is given the same
    # paths to write the real content to.
    placeholder_log = ContentFile('no content')
    self.log_file.save(os.path.basename(log_path), placeholder_log)
    placeholder_report = ContentFile('no content')
    self.report_file.save(os.path.basename(report_path), placeholder_report)

    result_writer = ResultWriter(self.xml_source.file.name)
    status = result_writer.write_results(
        report=report_path, log=log_path, xunit=None)
    if status == -1:
        print("failed to regenerate files")
python类ContentFile()的实例源码
def test_read_and_save_cnt(self):
    """
    Check that the stats storage counts one save and one read.

    The counters start at zero, saving a file bumps only ``save_cnt`` and
    reading it back bumps only ``read_cnt``.
    """
    filename = 'file.txt'
    payload = 'file content'.encode()

    with override_storage.locmem_stats_override_storage() as stats:
        # Nothing has touched the storage yet.
        self.assertEqual(stats.read_cnt, 0)
        self.assertEqual(stats.save_cnt, 0)

        instance = SimpleModel()
        instance.upload_file.save(filename, ContentFile(payload))

        # A save must not be counted as a read.
        self.assertEqual(stats.save_cnt, 1)
        self.assertEqual(stats.read_cnt, 0)

        fetched = instance.upload_file.read()

        self.assertEqual(stats.read_cnt, 1)
        self.assertEqual(payload, fetched)
def _copy_file(self, destination, overwrite=False):
    """
    Copy this object's file to ``destination`` in the same storage.

    The storage is picked from ``self.file.storages`` using the ``'public'``
    or ``'private'`` key, depending on ``self.is_public``.

    :param destination: name under which the copy is saved.
    :param overwrite: not supported; any truthy value raises.
    :return: the value returned by ``storage.save`` for the copy.
    :raises NotImplementedError: if ``overwrite`` is truthy.
    """
    if overwrite:
        # If the destination file already exists default storage backend
        # does not overwrite it but generates another filename.
        # TODO: Find a way to override this behavior.
        raise NotImplementedError

    src_file_name = self.file.name
    storage = self.file.storages['public' if self.is_public else 'private']
    src_file = storage.open(src_file_name)
    try:
        # This is needed because most of the remote File Storage backend do
        # not open the file.
        src_file.open()
        content = src_file.read()
    finally:
        # The content is fully buffered in memory, so the source handle can
        # be released before the copy is written.
        src_file.close()
    return storage.save(destination, ContentFile(content))
def download(self):
    """
    Fetch the source from ``self.get_source_url()`` and store it.

    The response body is saved to ``self.source_file`` under the name
    ``arxiv_id`` plus the extension guessed from the response headers.

    Raises ``DownloadError`` when no extension can be guessed; HTTP error
    statuses are raised by ``raise_for_status()``.
    """
    # Drop any previously stored file first so storage is not cluttered
    # with duplicates. If the download then fails the file is lost, but it
    # can simply be downloaded again.
    if self.source_file.name:
        self.source_file.delete()

    response = requests.get(self.get_source_url())
    response.raise_for_status()

    extension = guess_extension_from_headers(response.headers)
    if not extension:
        template = ("Could not determine file extension from "
                    "headers: Content-Type: {}; "
                    "Content-Encoding: {}")
        raise DownloadError(template.format(
            response.headers.get('content-type'),
            response.headers.get('content-encoding')))

    body = ContentFile(response.content)
    self.source_file.save(self.arxiv_id + extension, body)
def add_script_project(request):
    """
    Create a ``ScriptProject`` from POST data and store its uploaded files.

    Reads ``name``, ``description``, ``entrance`` and ``arguments`` from
    ``request.POST``, saves the project, makes sure its directory under
    ``SCRIPT_MANAGER_DIR`` exists and saves every file from ``files[]``
    into that directory. Returns an empty ``HttpResponse``.
    """
    form = request.POST
    name = form['name']
    project = ScriptProject(
        name=name,
        desc=form['description'],
        entrance_point=form['entrance'],
        arguments=form['arguments'],
    )
    project.save()

    # Directory name is "<project id>_<canonized project name>".
    dir_name = '%s_%s' % (str(project.id), canonize_project_name(name))
    project_path = os.path.join(SCRIPT_MANAGER_DIR, dir_name)
    if not os.path.exists(project_path):
        os.makedirs(project_path)

    for upload in request.FILES.getlist('files[]'):
        target = os.path.join(project_path, upload.name)
        default_storage.save(target, ContentFile(upload.read()))

    return HttpResponse()
def export(self, file_handle=None):
    '''
    Export the task answers as a CSV file attached to ``self.export_file``.

    ``file_handle`` is the buffer the CSV is written to; a fresh
    ``StringIO.StringIO`` is used when none is given. The status is set to
    ``SUCCESS`` or, if anything raises, to ``FAILURE`` (the exception is
    logged). The object is saved in both cases.
    '''
    try:
        file_handle = file_handle or StringIO.StringIO()
        answers = self.task.answers

        # http://stackoverflow.com/a/11399424
        # The header row is the union of the keys of all answer rows.
        fieldnames = list(set().union(*(answer.keys() for answer in answers)))

        csv_writer = csv.DictWriter(file_handle, fieldnames=fieldnames)
        csv_writer.writeheader()
        for answer in answers:
            csv_writer.writerow(answer)

        csv_content = ContentFile(file_handle.getvalue())
        csv_name = "ST_TASK_{task_id}_EXPORT_{date}.csv".format(
            task_id=self.task.id,
            date=str(datetime.date.today()))
        # save=False: the model is persisted once, at the end.
        self.export_file.save(name=csv_name, content=csv_content, save=False)
        self.status = self.SUCCESS
    except Exception as e:
        LOG.exception(e)
        self.status = self.FAILURE
    self.save()
helpers.py 文件源码
项目:django-open-volunteering-platform
作者: OpenVolunteeringPlatform
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def perform_image_crop(image_obj, crop_rect=None):
    """
    Crop an uploaded image and return it as a new in-memory upload.

    :param image_obj: uploaded file exposing ``name``, ``content_type`` and
        ``read()``.
    :param crop_rect: box passed straight to ``Image.crop``; when None the
        input is returned unchanged.
    :return: ``image_obj`` itself when ``crop_rect`` is None, otherwise an
        ``InMemoryUploadedFile`` holding the cropped image, keeping the
        original name and content type.
    """
    if crop_rect is None:
        return image_obj

    # The output format is taken from the file extension; "JPG" is
    # rewritten to the "JPEG" format name before saving.
    img_ext = os.path.splitext(image_obj.name)[1][1:].upper()
    if img_ext == 'JPG':
        img_ext = 'JPEG'

    base_image = Image.open(BytesIO(image_obj.read()))
    buffer = BytesIO()
    base_image.crop(crop_rect).save(buffer, format=img_ext)
    cropped_bytes = buffer.getvalue()

    # The size argument must be the byte count of the content. The previous
    # code passed the bound method ``tell`` instead of a number.
    return uploadedfile.InMemoryUploadedFile(
        ContentFile(cropped_bytes),
        None,
        image_obj.name,
        image_obj.content_type,
        len(cropped_bytes),
        None,
    )
def save(self, *args, **kwargs):
    """
    Resize ``thumbnail_image`` to a 250x150 cover and then save the model.

    The resized picture is encoded as PNG and stored under the name the
    original file had; the original file is deleted first.
    """
    source = Image.open(self.thumbnail_image)
    resized = resizeimage.resize_cover(source, [250, 150], validate=False)

    buffer = BytesIO()
    resized.save(buffer, format='PNG')

    # Remember the name before deleting the stored file.
    original_name = self.thumbnail_image.name
    self.thumbnail_image.delete(save=False)
    self.thumbnail_image.save(
        original_name,
        content=ContentFile(buffer.getvalue()),
        save=False,
    )
    super(ThumbnailImage, self).save(*args, **kwargs)
def convert_image(image, width, height):
    """
    Resize ``image`` in place to the size a template needs.

    With ``width`` set to None the image is resized to ``height`` only;
    otherwise it is resized to cover ``[width, height]``. The result is
    written back under the original name, in the source image's format,
    without saving the owning model.
    """
    original_name = image.name
    source = Image.open(image)

    if width is not None:
        resized = resizeimage.resize_cover(
            source, [width, height], validate=False)
    else:
        resized = resizeimage.resize_height(source, height, validate=False)

    buffer = BytesIO()
    resized.save(buffer, format=source.format)

    image.delete(save=False)
    image.save(
        original_name,
        content=ContentFile(buffer.getvalue()),
        save=False,
    )
def put(self, request, *args, **kwargs):
    '''
    Repack a JSON-encoded thumbnail as a ``ContentFile`` before updating.

    If there is a thumbnail, and it was sent as part of an
    application/json payload, it arrives as an object with ``name`` and
    ``base64`` entries. That object is replaced in ``request.data`` by a
    ``ContentFile`` built from the decoded bytes.

    The thumbnail is optional and may have a different shape, so the
    repacking is best-effort: any ordinary error leaves ``request.data``
    untouched and the request is handed to the parent ``put`` as is.
    '''
    try:
        thumbnail = request.data['thumbnail']
        # do we actually need to repack as ContentFile?
        if thumbnail['name'] and thumbnail['base64']:
            name = thumbnail['name']
            encdata = thumbnail['base64']
            proxy = ContentFile(base64.b64decode(encdata), name=name)
            request.data['thumbnail'] = proxy
    except Exception:
        # Deliberately best-effort (missing key, non-dict thumbnail, bad
        # base64, ...). Unlike a bare ``except:``, this no longer swallows
        # SystemExit or KeyboardInterrupt.
        pass
    return super(UserProfileAPIView, self).put(request, *args, **kwargs)
def test_publish_xml_xlsform_download(self):
    """
    Publish an XML form and check that no XLS download is offered for it.

    The download view is expected to respond with a message containing
    'No XLS file for your form '.
    """
    count = XForm.objects.count()
    path = os.path.join(
        self.this_directory, '..', '..', 'api', 'tests', 'fixtures',
        'forms', 'contributions', 'contributions.xml')
    # ``with`` closes the fixture even when reading fails.
    with open(path) as f:
        xml_file = ContentFile(f.read())
    xml_file.name = 'contributions.xml'
    self.xform = publish_xml_form(xml_file, self.user)
    self.assertTrue(XForm.objects.count() > count)
    response = self.client.get(reverse(download_xlsform, kwargs={
        'username': self.user.username,
        'id_string': 'contributions'
    }), follow=True)
    self.assertContains(response, 'No XLS file for your form ')
def _contributions_form_submissions(self):
    """
    Publish the contributions form and submit every fixture instance.

    Each sub-directory of the ``instances`` fixture directory is expected
    to hold a ``submission.xml``; six instances must exist afterwards.
    """
    count = XForm.objects.count()
    path = os.path.join(os.path.dirname(__file__),
                        '..', 'fixtures', 'forms', 'contributions')
    form_path = os.path.join(path, 'contributions.xml')
    # ``with`` closes the fixture even when reading fails.
    with open(form_path) as f:
        xml_file = ContentFile(f.read())
    xml_file.name = 'contributions.xml'
    self.xform = publish_xml_form(xml_file, self.user)
    self.assertTrue(XForm.objects.count() > count)
    instances_path = os.path.join(path, 'instances')
    for uuid in os.listdir(instances_path):
        s_path = os.path.join(instances_path, uuid, 'submission.xml')
        # Previously the submission handle was never closed.
        with open(s_path) as submission:
            create_instance(self.user.username, submission, [])
    self.assertEqual(self.xform.instances.count(), 6)
def download_media_files(self, xml_doc, media_path):
    """
    Download every ``mediaFile`` listed in ``xml_doc`` into ``media_path``.

    Nodes lacking a ``filename`` or ``downloadUrl`` element are ignored, and
    files already present in the default storage are skipped. Each fetch is
    logged at debug level on success and at error level on failure.
    """
    for media_node in xml_doc.getElementsByTagName('mediaFile'):
        name_nodes = media_node.getElementsByTagName('filename')
        url_nodes = media_node.getElementsByTagName('downloadUrl')
        if not (name_nodes and url_nodes):
            continue

        filename = name_nodes[0].childNodes[0].nodeValue
        target = os.path.join(media_path, filename)
        if default_storage.exists(target):
            continue

        download_url = url_nodes[0].childNodes[0].nodeValue
        if not self._get_media_response(download_url):
            self.logger.error("Failed to fetch %s." % filename)
            continue

        # On success the response is read from ``self._current_response``.
        response = self._current_response
        default_storage.save(target, ContentFile(response.content))
        self.logger.debug("Fetched %s." % filename)
def _upload_instance(self, xml_file, instance_dir_path, files):
    """
    Rebuild a submission from ``xml_file`` and create an instance from it.

    The new XML is the serialized children of the document element's first
    child. Every ``mediaFile`` whose filename appears in ``files`` is opened
    from ``instance_dir_path`` in the default storage and passed along as an
    attachment.
    """
    xml_doc = clean_and_parse_xml(xml_file.read())
    root = xml_doc.documentElement

    buffer = StringIO()
    for child in root.firstChild.childNodes:
        buffer.write(child.toxml())
    new_xml_file = ContentFile(buffer.getvalue())
    new_xml_file.content_type = 'text/xml'
    buffer.close()

    attachments = []
    for media_node in root.getElementsByTagName('mediaFile'):
        name_nodes = media_node.getElementsByTagName('filename')
        filename = name_nodes[0].childNodes[0].nodeValue
        if filename not in files:
            continue
        stored = default_storage.open(
            os.path.join(instance_dir_path, filename))
        mimetype, encoding = mimetypes.guess_type(stored.name)
        attachments.append(django_file(stored, 'media_files[]', mimetype))

    create_instance(self.user.username, new_xml_file, attachments)
def _save_thumbnails(image, path, size, suffix):
    """
    Shrink ``image`` to ``size`` and store it at ``get_path(path, suffix)``.

    :param image: image object that is resized in place via ``thumbnail``.
    :param path: base path handed to ``get_path``.
    :param size: target size; converted to float before computing
        dimensions.
    :param suffix: suffix handed to ``get_path``.

    The thumbnail is written to a temporary file with the
    ``settings.IMG_FILE_TYPE`` extension and then copied into the storage
    returned by ``get_storage_class()``.
    """
    default_storage = get_storage_class()()
    # The context manager closes the temporary file even when saving or
    # the storage write raises.
    with NamedTemporaryFile(suffix='.%s' % settings.IMG_FILE_TYPE) as nm:
        try:
            # Ensure conversion to float in operations
            image.thumbnail(
                get_dimensions(image.size, float(size)), Image.ANTIALIAS)
        except ZeroDivisionError:
            pass
        try:
            image.save(nm.name)
        except IOError:
            # e.g. `IOError: cannot write mode P as JPEG`, which gets raised
            # when someone uploads an image in an indexed-color format like
            # GIF
            image.convert('RGB').save(nm.name)
        default_storage.save(
            get_path(path, suffix), ContentFile(nm.read()))
def test_fire_message_with_attachments(self):
    """
    Check that a stored attachment ends up on the outgoing mail.

    A ``foo.txt`` file is saved to the default storage, attached to the
    message, and the message is set to 'sending'. The first mail must then
    carry exactly one attachment with the expected name, content and
    MIME type.
    """
    MailFactory(message=self.message)
    default_storage.save('foo.txt', ContentFile('testfoo'))
    try:
        ma = MessageAttachment(message=self.message, file='foo.txt')
        self.message.author.organization.settings.notify_message_status = False
        ma.save()
        # Fires the message
        self.message.status = 'sending'
        self.message.save()
        first_mail = self.message.mails.all().first()
        message = first_mail.message.to_mail(first_mail)
        self.assertEqual(len(message.attachments), 1)
        filename, content, mimetype = message.attachments[0]
        self.assertEqual(filename, 'foo.txt')
        self.assertEqual(content, 'testfoo')
        self.assertEqual(mimetype, 'text/plain')
    finally:
        # Remove the file even when an assertion above fails, so it does
        # not linger in storage for later runs.
        default_storage.delete('foo.txt')
def test_image_resizing(self):
    """
    Check that an image is only resized when ``width`` is set before saving.

    A 100x100 upload stays 100x100 by default and becomes 50x50 when
    ``width`` is 50.
    """
    # No width given: the stored file keeps its size.
    untouched = Image(organization=self.organization)
    upload = ContentFile(self.create_random_image(100, 100).read())
    untouched.file.save('random_image.png', upload, save=False)
    untouched.save()
    stored = PIL.Image.open(untouched.file.file)
    self.assertEqual((100, 100), stored.size)

    # Width given: the stored file is scaled down.
    shrunk = Image(organization=self.organization)
    upload = ContentFile(self.create_random_image(100, 100).read())
    shrunk.file.save('random_image.png', upload, save=False)
    shrunk.width = 50
    shrunk.save()
    stored = PIL.Image.open(shrunk.file.file)
    self.assertEqual((50, 50), stored.size)
def store(self):
    """
    Validate ``self.data`` and store it as an ``Image``.

    Raises ``TooBigMedia`` when the data is ``MAX_SIZE`` bytes or more and
    ``InvalidMimeType`` when the sniffed MIME type is not allowed. The file
    is named after the SHA-1 of the data plus the guessed extension, which
    is also kept on ``self.extension``. Returns the image's absolute URL.
    """
    if len(self.data) >= self.MAX_SIZE:
        raise TooBigMedia(self.identifying_name, self.MAX_SIZE)

    detected = magic.from_buffer(self.data, mime=True)
    if detected not in self.allowed_mimetypes:
        raise InvalidMimeType(detected)

    extension = mimetypes.guess_extension(detected)
    # weirdness from mimetypes
    if extension == '.jpe':
        extension = '.jpeg'
    self.extension = extension

    digest = hashlib.sha1(self.data).hexdigest()
    filename = '{}{}'.format(digest, self.extension)

    stored = Image(organization=self.organization)
    stored.file.save(filename, ContentFile(self.data))
    return stored.get_absolute_url()
def avatar_upload(request):
    """
    Store a base64-encoded avatar and point the current user at it.

    Only POST is served; any other method raises ``Http404``. The decoded
    ``file`` value is saved as a ``.jpg``, the user's ``avatar`` is updated
    with the resulting URL, ``process_token`` is called with the refreshed
    user fields, and the user is returned as JSON.
    """
    if request.method != "POST":
        raise Http404

    encoded = request.POST.get('file', None)
    image = ContentFile(b64decode(encoded))

    avatar = UserAvatar()
    avatar.user_avatar.save(produce_image_name() + '.jpg', image)

    user_id = request.session['currentUser']['_id']
    current_user = User.objects(id=user_id).get()
    image_url = prefURL['ImageURL'] + str(avatar.user_avatar)
    current_user.update(avatar=image_url)
    current_user.reload()

    process_token(
        _id=str(current_user.id),
        nickname=current_user.nickname,
        avatar=current_user.avatar,
        token=current_user.token,
    )
    return HttpResponse(current_user.to_json())
def image_test(request):
    """
    Scratch view for trying out base64 image uploads.

    GET prints the method and answers with a fixed JSON payload. POST
    decodes the base64 ``file`` value, saves it as ``test.jpg`` on a new
    ``ImageTest`` and answers with the same payload.
    """
    method = request.method
    if method == "GET":
        print(method)
        return JsonResponse({"result": 555})
    if method == "POST":
        print(123)
        encoded = request.POST.get('file', None)
        image_data = ContentFile(b64decode(encoded))
        record = ImageTest()
        record.image_avatar.save("test.jpg", image_data)
        return JsonResponse({"result": 555})
def test_post_audio(self):
    """Posting an audio file creates a Media item in the root collection."""
    # Build a fake file
    upload = ContentFile(b("A boring example song"))
    upload.name = 'song.mp3'

    # Submit
    form_data = {
        'title': "Test media",
        'file': upload,
        'duration': 100,
    }
    add_url = reverse('wagtailmedia:add', args=('audio', ))
    response = self.client.post(add_url, form_data)

    # User should be redirected back to the index
    self.assertRedirects(response, reverse('wagtailmedia:index'))

    # Media should be created, and be placed in the root collection
    created = models.Media.objects.filter(title="Test media")
    self.assertTrue(created.exists())
    root_collection = Collection.get_first_root_node()
    media = models.Media.objects.get(title="Test media")
    self.assertEqual(media.collection, root_collection)
    self.assertEqual(media.type, 'audio')
def test_post_video(self):
    """Posting a video file creates a Media item in the root collection."""
    # Build a fake file
    upload = ContentFile(b("A boring example movie"))
    upload.name = 'movie.mp4'

    # Submit
    form_data = {
        'title': "Test media",
        'file': upload,
        'duration': 100,
        'width': 720,
        'height': 480,
    }
    add_url = reverse('wagtailmedia:add', args=('video', ))
    response = self.client.post(add_url, form_data)

    # User should be redirected back to the index
    self.assertRedirects(response, reverse('wagtailmedia:index'))

    # Media should be created, and be placed in the root collection
    created = models.Media.objects.filter(title="Test media")
    self.assertTrue(created.exists())
    root_collection = Collection.get_first_root_node()
    media = models.Media.objects.get(title="Test media")
    self.assertEqual(media.collection, root_collection)
    self.assertEqual(media.type, 'video')
def test_post_audio(self):
    """Posting audio places the Media item in the 'evil plans' collection."""
    # Build a fake file
    upload = ContentFile(b("A boring example song"))
    upload.name = 'song.mp3'

    # Submit
    form_data = {
        'title': "Test media",
        'file': upload,
        'duration': 100,
    }
    add_url = reverse('wagtailmedia:add', args=('audio', ))
    response = self.client.post(add_url, form_data)

    # User should be redirected back to the index
    self.assertRedirects(response, reverse('wagtailmedia:index'))

    # Media should be created with type 'audio' and in the 'evil plans'
    # collection, despite there being no collection field in the form,
    # because that's the only one the user has access to
    created = models.Media.objects.filter(title="Test media")
    self.assertTrue(created.exists())
    media = models.Media.objects.get(title="Test media")
    self.assertEqual(media.collection, self.evil_plans_collection)
    self.assertEqual(media.type, 'audio')
def test_post_video(self):
    """Posting video places the Media item in the 'evil plans' collection."""
    # Build a fake file
    upload = ContentFile(b("A boring example movie"))
    upload.name = 'movie.mp4'

    # Submit
    form_data = {
        'title': "Test media",
        'file': upload,
        'duration': 100,
    }
    add_url = reverse('wagtailmedia:add', args=('video', ))
    response = self.client.post(add_url, form_data)

    # User should be redirected back to the index
    self.assertRedirects(response, reverse('wagtailmedia:index'))

    # Media should be created with type 'video' and in the 'evil plans'
    # collection, despite there being no collection field in the form,
    # because that's the only one the user has access to
    created = models.Media.objects.filter(title="Test media")
    self.assertTrue(created.exists())
    media = models.Media.objects.get(title="Test media")
    self.assertEqual(media.collection, self.evil_plans_collection)
    self.assertEqual(media.type, 'video')
def test_post(self):
    """Posting to the edit view changes the title of the existing media."""
    # Build a fake file
    upload = ContentFile(b("A boring example song"))
    upload.name = 'song.mp3'

    # Submit title change
    form_data = {
        'title': "Test media changed!",
        'file': upload,
        'duration': 100,
    }
    edit_url = reverse('wagtailmedia:edit', args=(self.media.id,))
    response = self.client.post(edit_url, form_data)

    # User should be redirected back to the index
    self.assertRedirects(response, reverse('wagtailmedia:index'))

    # Media title should be changed
    refreshed = models.Media.objects.get(id=self.media.id)
    self.assertEqual(refreshed.title, "Test media changed!")
def imageLoad(request):
    """
    Store a base64-encoded icon and point the current user at it.

    Only POST is served; any other method raises ``Http404``. The decoded
    ``file`` value is saved as a ``.jpg``, the user's ``icon`` is updated
    with the resulting URL, ``processToken`` is called with the refreshed
    user fields, and the user is returned as JSON with an extra
    ``is_success`` flag set to 1.
    """
    # Local import so this block does not depend on the module's imports.
    import json

    if request.method == 'POST':
        image_stream = request.POST.get('file', None)
        image = ContentFile(b64decode(image_stream))
        icon = UserIcon()
        # Was ``icon.user_iocn``; the attribute read a few lines below is
        # ``user_icon``, so the misspelling is corrected here.
        icon.user_icon.save(produceImageName() + '.jpg', image)
        current_user = User.objects(
            id=request.session['currentUser']['_id']).get()
        image_url = preUrl['ImageUrl'] + icon.user_icon.__str__()
        current_user.update(icon=image_url)
        current_user.reload()
        kw = {
            'id': str(current_user.id),
            'nickname': current_user.nickname,
            'icon': current_user.icon,
            'token': current_user.token
        }
        processToken(**kw)
        # NOTE(review): assumes ``to_json()`` returns a JSON string, as the
        # sibling view passes it straight to HttpResponse. A string does not
        # support item assignment, so it is parsed before the flag is added
        # and serialized again for the response.
        results = json.loads(current_user.to_json())
        results['is_success'] = 1
        return HttpResponse(json.dumps(results))
    raise Http404
def test_update_asset_parent(self):
    """
    Test update Asset parent only.

    After moving the asset to another folder, the bucket must contain only
    the file under the new parent's id.
    """
    logging.info('Test update asset file...')

    old_parent = Folder(name='f')
    old_parent.save()
    new_parent = Folder(name='f2')
    new_parent.save()

    asset = Asset(name='a', parent=old_parent)
    asset.file.save('file.txt', ContentFile('Content'.encode('utf-8')))
    asset.parent = new_parent
    asset.save()

    expected_key = 'media/' + str(asset.parent.id) + '/file.txt'
    self.assertEqual(self.get_bucket_contents(), [expected_key])
def test_update_asset_file_and_parent(self):
    """
    Test update Asset file and parent simultaneously.

    After changing the parent and saving a new file, the bucket must
    contain only the new file under the new parent's id.
    """
    logging.info('Test update asset file and parent simultaneously...')

    old_parent = Folder(name='f')
    old_parent.save()
    new_parent = Folder(name='f2')
    new_parent.save()

    asset = Asset(name='a', parent=old_parent)
    asset.file.save('file.txt', ContentFile('Content'.encode('utf-8')))
    asset.parent = new_parent
    asset.file.save('file2.txt', ContentFile('Content2'.encode('utf-8')))

    expected_key = 'media/' + str(asset.parent.id) + '/file2.txt'
    self.assertEqual(self.get_bucket_contents(), [expected_key])
def new_test_image():
    """
    Build a generated 50x50 PNG test image (deprecated helper).

    Emits a ``DeprecationWarning`` pointing at the ``get_sample_image()``
    context manager. In your testing `tearDown` method make sure to delete
    the test image with the helper function `delete_test_image`.

    The recommended way of using this helper function is as follows:
        object_1.image_property.save(*new_test_image())

    :return: Image name and image content file.
    """
    warnings.warn(DeprecationWarning(
        "new_test_image() is deprecated in favour of the get_sample_image() "
        "context manager."), stacklevel=2)

    # A random UUID is part of the generated file name.
    generated_name = 'test-{}.png'.format(uuid.uuid4())

    picture = Image.new('RGBA', size=(50, 50), color=(256, 0, 0))
    ImageDraw.Draw(picture)

    buffer = BytesIO()
    picture.save(buffer, 'png')
    png_bytes = buffer.getvalue()
    return generated_name, ContentFile(png_bytes, generated_name)
def _copy_file(self, destination, overwrite=False):
    """
    Copy this object's file to ``destination`` in the same storage.

    The storage is picked from ``self.file.storages`` using the ``'public'``
    or ``'private'`` key, depending on ``self.is_public``.

    :param destination: name under which the copy is saved.
    :param overwrite: not supported; any truthy value raises.
    :return: the value returned by ``storage.save`` for the copy.
    :raises NotImplementedError: if ``overwrite`` is truthy.
    """
    if overwrite:
        # If the destination file already exists default storage backend
        # does not overwrite it but generates another filename.
        # TODO: Find a way to override this behavior.
        raise NotImplementedError

    src_file_name = self.file.name
    storage = self.file.storages['public' if self.is_public else 'private']
    src_file = storage.open(src_file_name)
    try:
        # This is needed because most of the remote File Storage backend do
        # not open the file.
        src_file.open()
        content = src_file.read()
    finally:
        # The content is fully buffered in memory, so the source handle can
        # be released before the copy is written.
        src_file.close()
    return storage.save(destination, ContentFile(content))