def test_race_condition_handling(self):
# Hold on to original os.remove
original_remove = os.remove
def race_remove(path):
"Patched os.remove to raise ENOENT (No such file or directory)"
original_remove(path)
raise OSError(errno.ENOENT, 'Fake ENOENT')
try:
os.remove = race_remove
self.default_storage.save('race.file', ContentFile('Fake ENOENT'))
self.default_storage.delete('race.file')
self.assertFalse(self.default_storage.exists('race.file'))
finally:
# Restore os.remove
os.remove = original_remove
python类ContentFile()的实例源码
def saveImage(image):
pwd = os.getcwd()
filetype = image.content_type.split('/')[1]
filePath = os.path.abspath(os.path.dirname(pwd)) + '/Django/image/' + str(time.time()) + '.' + filetype
if isinstance(image, TemporaryUploadedFile):
temp_file = open(image.temporary_file_path(), 'rb+')
content = cStringIO.StringIO(temp_file.read())
image = Image.open(content)
temp_file.close()
# image.seek(0)
# img = Image.open(image)
# img.save(filePath)
default_storage.save(filePath, ContentFile(image.read()))
path = detectObject(filePath, filetype)
# new_img = Image.open(path)
fileName = path.split('/')[-1]
return fileName
# return HttpResponse("<img src="{% static img %}" style="width: 80%;alignment: center">")
def load_mugshots(data_dir='/root/mugshots'):
from .models import DepartmentUser
files = [x for x in os.listdir(data_dir) if os.path.isfile(os.path.join(data_dir, x))]
valid = 0
for f in files:
name = os.path.splitext(f)[0]
qs = DepartmentUser.objects.filter(username__iexact=name)
if qs:
with open(os.path.join(data_dir, f)) as fp:
qs[0].photo.save(f, ContentFile(fp.read()))
print('Updated photo for {}'.format(name))
valid += 1
else:
print('ERROR: Username {} not found'.format(name))
print('{}/{} photos valid'.format(valid, len(files)))
def test_post(self):
# Build a fake file
fake_file = ContentFile(b("A boring example document"))
fake_file.name = 'test.txt'
# Submit
post_data = {
'title': "Test document",
'file': fake_file,
}
response = self.client.post(reverse('wagtaildocs:add'), post_data)
# User should be redirected back to the index
self.assertRedirects(response, reverse('wagtaildocs:index'))
# Document should be created, and be placed in the root collection
self.assertTrue(models.Document.objects.filter(title="Test document").exists())
root_collection = Collection.get_first_root_node()
self.assertEqual(
models.Document.objects.get(title="Test document").collection,
root_collection
)
def test_post(self):
# Build a fake file
fake_file = ContentFile(b("A boring example document"))
fake_file.name = 'test.txt'
# Submit
post_data = {
'title': "Test document",
'file': fake_file,
}
response = self.client.post(reverse('wagtaildocs:add'), post_data)
# User should be redirected back to the index
self.assertRedirects(response, reverse('wagtaildocs:index'))
# Document should be created in the 'evil plans' collection,
# despite there being no collection field in the form, because that's the
# only one the user has access to
self.assertTrue(models.Document.objects.filter(title="Test document").exists())
self.assertEqual(
models.Document.objects.get(title="Test document").collection,
self.evil_plans_collection
)
def test_post(self):
# Build a fake file
fake_file = ContentFile(b("A boring example document"))
fake_file.name = 'test.txt'
# Submit title change
post_data = {
'title': "Test document changed!",
'file': fake_file,
}
response = self.client.post(reverse('wagtaildocs:edit', args=(self.document.id,)), post_data)
# User should be redirected back to the index
self.assertRedirects(response, reverse('wagtaildocs:index'))
# Document title should be changed
self.assertEqual(models.Document.objects.get(id=self.document.id).title, "Test document changed!")
def test_post(self):
# Build a fake file
fake_file = ContentFile(b("A boring example document"))
fake_file.name = 'test.txt'
# Submit
post_data = {
'title': "Test document",
'file': fake_file,
}
response = self.client.post(reverse('wagtaildocs:chooser_upload'), post_data)
# Check that the response is a javascript file saying the document was chosen
self.assertTemplateUsed(response, 'wagtaildocs/chooser/document_chosen.js')
self.assertContains(response, "modal.respond('documentChosen'")
# Document should be created
self.assertTrue(models.Document.objects.filter(title="Test document").exists())
def test_post(self):
# Build a fake file
fake_file = ContentFile(b("A boring example document"))
fake_file.name = 'test.txt'
# Submit
post_data = {
'title': "Test document",
'file': fake_file,
}
response = self.client.post(reverse('wagtaildocs:chooser_upload'), post_data)
# Check that the response is a javascript file saying the document was chosen
self.assertTemplateUsed(response, 'wagtaildocs/chooser/document_chosen.js')
self.assertContains(response, "modal.respond('documentChosen'")
# Document should be created
doc = models.Document.objects.filter(title="Test document")
self.assertTrue(doc.exists())
# Document should be in the 'evil plans' collection
self.assertEqual(doc.get().collection, self.evil_plans_collection)
def add_document(self, **params):
# Build a fake file
fake_file = ContentFile(b("A boring example document"))
fake_file.name = 'test.txt'
# Submit
post_data = {
'title': "Test document",
'file': fake_file,
}
post_data.update(params)
response = self.client.post(reverse('wagtaildocs:add'), post_data)
# User should be redirected back to the index
self.assertRedirects(response, reverse('wagtaildocs:index'))
# Document should be created
doc = models.Document.objects.filter(title=post_data['title'])
self.assertTrue(doc.exists())
return doc.first()
def thumbnail_from_big_image(big_image, size=None):
try:
src_w, src_h = get_image_dimensions(big_image)
img = Image.open(big_image)
img.thumbnail((512,512))
th_w, th_h = img.size
if size is None:
size = settings.DEFAULT_MAX_THUMB_SIZE
if th_w > size or th_h > size:
x = (th_w - size)/2.0
if x < 0:
x = 0
y = (th_h - size)/2.0
if y < 0:
y = 0
img = img.crop((x,y, size+x, size+y))
output = StringIO()
img.save(output, format="PNG")
contents = output.getvalue()
output.close()
return ContentFile(contents)
except IOError:
return None
def test_file_handling(self):
fileobj = ContentFile("foo bar")
file1 = File.objects.create(
name='baz.js',
type='default',
size=7,
)
results = file1.putfile(fileobj, 3)
assert len(results) == 3
assert results[0].offset == 0
assert results[1].offset == 3
assert results[2].offset == 6
with file1.getfile() as fp:
assert fp.read() == 'foo bar'
fp.seek(2)
assert fp.read() == 'o bar'
fp.seek(0)
assert fp.read() == 'foo bar'
fp.seek(4)
assert fp.read() == 'bar'
def test_publish_xml_xlsform_download(self):
count = XForm.objects.count()
path = os.path.join(
self.this_directory, '..', '..', 'api', 'tests', 'fixtures',
'forms', 'contributions', 'contributions.xml')
f = open(path)
xml_file = ContentFile(f.read())
f.close()
xml_file.name = 'contributions.xml'
project = get_user_default_project(self.user)
self.xform = publish_xml_form(xml_file, self.user, project)
self.assertTrue(XForm.objects.count() > count)
response = self.client.get(reverse(download_xlsform, kwargs={
'username': self.user.username,
'id_string': 'contributions'
}), follow=True)
self.assertContains(response, 'No XLS file for your form ')
def _contributions_form_submissions(self):
count = XForm.objects.count()
path = os.path.join(os.path.dirname(__file__),
'..', 'fixtures', 'forms', 'contributions')
form_path = os.path.join(path, 'contributions.xml')
f = open(form_path)
xml_file = ContentFile(f.read())
f.close()
xml_file.name = 'contributions.xml'
project = get_user_default_project(self.user)
self.xform = publish_xml_form(xml_file, self.user, project)
self.assertTrue(XForm.objects.count() > count)
instances_path = os.path.join(path, 'instances')
for uuid in os.listdir(instances_path):
s_path = os.path.join(instances_path, uuid, 'submission.xml')
create_instance(self.user.username, open(s_path), [])
self.assertEqual(self.xform.instances.count(), 6)
def download_media_files(self, xml_doc, media_path):
for media_node in xml_doc.getElementsByTagName('mediaFile'):
filename_node = media_node.getElementsByTagName('filename')
url_node = media_node.getElementsByTagName('downloadUrl')
if filename_node and url_node:
filename = filename_node[0].childNodes[0].nodeValue
path = os.path.join(media_path, filename)
if default_storage.exists(path):
continue
download_url = url_node[0].childNodes[0].nodeValue
if self._get_media_response(download_url):
download_res = self._current_response
media_content = ContentFile(download_res.content)
default_storage.save(path, media_content)
self.logger.debug("Fetched %s." % filename)
else:
self.logger.error("Failed to fetch %s." % filename)
def _upload_instance(self, xml_file, instance_dir_path, files):
xml_doc = clean_and_parse_xml(xml_file.read())
xml = StringIO()
de_node = xml_doc.documentElement
for node in de_node.firstChild.childNodes:
xml.write(node.toxml())
new_xml_file = ContentFile(xml.getvalue())
new_xml_file.content_type = 'text/xml'
xml.close()
attachments = []
for attach in de_node.getElementsByTagName('mediaFile'):
filename_node = attach.getElementsByTagName('filename')
filename = filename_node[0].childNodes[0].nodeValue
if filename in files:
file_obj = default_storage.open(
os.path.join(instance_dir_path, filename))
mimetype, encoding = mimetypes.guess_type(file_obj.name)
media_obj = django_file(file_obj, 'media_files[]', mimetype)
attachments.append(media_obj)
create_instance(self.user.username, new_xml_file, attachments)
def __init__(self, file_contents=None, content_type=None, file_name=None, *args, **kwargs):
super(WsFileResponse, self).__init__(*args, **kwargs)
self.content = ContentFile(file_contents)
self["Content-Disposition"] = "attachment; filename=%s" % (file_name,)
self["Content-Type"] = content_type
# Static Methods
# Class Methods
# Public Methods
# Protected Methods
# Private Methods
# Properties
# Representation and Comparison
def value_from_datadict(self, data, files, name):
file_data = data["id_" + name]
"""
file_data: <file_name>:::data:<file content type>;base64,<base64 encoded file data>
Example : PG Deletion.txt:::data:text/plain;base64,UEcgRGVsZXRpb2tpIjsKCg==
"""
_data_list = file_data.split(":::")
if len(_data_list) == 1:
return None
file_name = _data_list[0]
_data_list = _data_list[1].split(";base64,")
file_extension = _data_list[0].split("data:")[1]
file_content = _data_list[1]
return ContentFile(base64.b64decode(file_content), name=file_name)
def test_assign(cls):
"""Tests whether the :see:LocalizedFileValueDescriptor works properly"""
temp_file = tempfile.NamedTemporaryFile(dir=MEDIA_ROOT)
instance = cls.FileFieldModel()
instance.file = {'en': temp_file.name}
assert isinstance(instance.file.en, LocalizedFieldFile)
assert instance.file.en.name == temp_file.name
field_dump = pickle.dumps(instance.file)
instance = cls.FileFieldModel()
instance.file = pickle.loads(field_dump)
assert instance.file.en.field == instance._meta.get_field('file')
assert instance.file.en.instance == instance
assert isinstance(instance.file.en, LocalizedFieldFile)
instance = cls.FileFieldModel()
instance.file = {'en': ContentFile("test", "testfilename")}
assert isinstance(instance.file.en, LocalizedFieldFile)
assert instance.file.en.name == "testfilename"
another_instance = cls.FileFieldModel()
another_instance.file = {'ro': instance.file.en}
assert another_instance == another_instance.file.ro.instance
assert another_instance.file.ro.lang == 'ro'
def duplicate(self, settings):
new_design = deepcopy(self)
new_design.pk = None
new_design.badge_settings = settings
for var in ('bg_front', 'bg_back'):
if getattr(self, var):
tmp = ContentFile(getattr(self, var).read())
tmp.name = os.path.basename(getattr(self, var).name)
setattr(new_design, var, tmp)
new_design.save()
return new_design
def save_file(self, name, content):
expected_path = original_storage.path(name)
obj = SimpleModel()
obj.upload_file.save(name, ContentFile(content))
return expected_path