def setUp(self):
self.superuser = create_superuser()
self.client.login(username='admin', password='secret')
self.img = create_image()
self.image_name = 'test_file.jpg'
self.filename = os.path.join(settings.FILE_UPLOAD_TEMP_DIR, self.image_name)
self.img.save(self.filename, 'JPEG')
self.file = DjangoFile(open(self.filename, 'rb'), name=self.image_name)
# This is actually a "file" for filer considerations
self.image = Image.objects.create(owner=self.superuser,
original_filename=self.image_name,
file=self.file)
self.clipboard = Clipboard.objects.create(user=self.superuser)
self.clipboard.append_file(self.image)
self.folder = Folder.objects.create(name='test_folder')
python类File()的实例源码
def test_file_change_upload_to_destination(self):
"""
Test that the file is actualy move from the private to the public
directory when the is_public is checked on an existing private file.
"""
file_obj = DjangoFile(open(self.filename, 'rb'), name=self.image_name)
image = Image.objects.create(owner=self.superuser,
is_public=False,
original_filename=self.image_name,
file=file_obj)
image.save()
self.assertTrue(image.file.path.startswith(filer_settings.FILER_PRIVATEMEDIA_STORAGE.location))
image.is_public = True
image.save()
self.assertTrue(image.file.path.startswith(filer_settings.FILER_PUBLICMEDIA_STORAGE.location))
self.assertEqual(len(image.icons), len(filer_settings.FILER_ADMIN_ICON_SIZES))
image.is_public = False
image.save()
self.assertTrue(image.file.path.startswith(filer_settings.FILER_PRIVATEMEDIA_STORAGE.location))
self.assertEqual(len(image.icons), len(filer_settings.FILER_ADMIN_ICON_SIZES))
def save(self, name, content, max_length=None):
"""
Saves new content to the file specified by name. The content should be
a proper File object or any python file-like object, ready to be read
from the beginning.
"""
# Get the proper name for the file, as it will actually be saved.
if name is None:
name = content.name
if not hasattr(content, 'chunks'):
content = File(content, name)
name = self.get_available_name(name, max_length=max_length)
return self._save(name, content)
# These methods are part of the public API, with default implementations.
def _get_encoded_attachments(self):
attachments = self.get_attachments()
if attachments:
new_attachments = []
for attachment in attachments:
if isinstance(attachment, File):
attachment.seek(0)
new_attachments.append((attachment.name, attachment.read(), guess_type(attachment.name)[0]))
else:
new_attachments.append(attachment)
attachments = new_attachments
return jsonpickle.dumps(attachments)
archivecalaccessprocessedfile.py 文件源码
项目:django-calaccess-processed-data
作者: california-civic-data-coalition
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def archive(self, processed_file):
"""
Write the .csv file and upload a copy to the archive.
"""
# Remove previous .CSV files
processed_file.file_archive.delete()
# Export a new one
processed_file.make_csv_copy()
# Open up the .CSV file for reading so we can wrap it in the Django File obj
with open(processed_file.csv_path, 'rb') as csv_file:
# Save the .CSV on the processed data file
processed_file.file_archive.save(
'%s.csv' % self.model_name,
File(csv_file),
)
# Save it to the model
processed_file.file_size = os.path.getsize(processed_file.csv_path)
processed_file.save()
return
def save(self, name, content, max_length=None):
"""
Saves new content to the file specified by name. The content should be
a proper File object or any python file-like object, ready to be read
from the beginning.
"""
# Get the proper name for the file, as it will actually be saved.
if name is None:
name = content.name
if not hasattr(content, 'chunks'):
content = File(content, name)
name = self.get_available_name(name, max_length=max_length)
return self._save(name, content)
# These methods are part of the public API, with default implementations.
def save(self, name, content, max_length=None):
"""
Saves new content to the file specified by name. The content should be
a proper File object or any python file-like object, ready to be read
from the beginning.
"""
# Get the proper name for the file, as it will actually be saved.
if name is None:
name = content.name
if not hasattr(content, 'chunks'):
content = File(content, name)
name = self.get_available_name(name, max_length=max_length)
return self._save(name, content)
# These methods are part of the public API, with default implementations.
def put(self, request):
try:
zip_file = request.FILES['file']
archive = zipfile.ZipFile(zip_file)
except (MultiValueDictKeyError, zipfile.BadZipfile):
raise NotZIPFileError
try:
csv_name = [item for item in archive.namelist() if item.endswith('csv')][0]
except IndexError:
raise NoCSVInArchiveFoundError
with archive.open(csv_name) as zip_csv_file:
# Convert zipfile handle to Django file handle
csv_file = File(zip_csv_file)
dataset = Dataset.objects.create(
name=zip_csv_file.name,
content=csv_file,
uploaded_by=request.user)
# Start tasks for feature calculation
initialize_from_dataset.delay(dataset_id=dataset.id)
serializer = DatasetSerializer(instance=dataset)
return Response(serializer.data)
def save(self, *args: typing.Any, **kwargs: typing.Any) -> None:
super(Gallery, self).save(*args, **kwargs)
if self.thumbnail_url and not self.thumbnail:
response = request_with_retries(
self.thumbnail_url,
{
'timeout': 25,
'stream': True
},
post=False,
)
if response:
disassembled = urlparse(self.thumbnail_url)
file_name = basename(disassembled.path)
lf = NamedTemporaryFile()
if response.status_code == requests.codes.ok:
for chunk in response.iter_content(chunk_size=1024):
if chunk: # filter out keep-alive new chunks
lf.write(chunk)
self.thumbnail.save(file_name, File(lf), save=False)
lf.close()
super(Gallery, self).save(force_update=True)
def save_img(self, img_link: str) -> None:
tf2 = NamedTemporaryFile()
request_file = requests.get(img_link, stream='True', timeout=25)
for chunk in request_file.iter_content(4096):
tf2.write(chunk)
self.image.save(os.path.splitext(img_link)[1], File(tf2), save=False)
tf2.close()
im = PImage.open(self.image.path)
if im.mode != 'RGB':
im = im.convert('RGB')
# large thumbnail
im.thumbnail((200, 290), PImage.ANTIALIAS)
thumb_relative_path = upload_announce_thumb_handler(self, os.path.splitext(img_link)[1])
thumb_fn = pjoin(settings.MEDIA_ROOT, thumb_relative_path)
os.makedirs(os.path.dirname(thumb_fn), exist_ok=True)
im.save(thumb_fn, "JPEG")
self.thumbnail.name = thumb_relative_path
self.save()
def copy_img(self, img_path: str) -> None:
tf2 = NamedTemporaryFile()
shutil.copy(img_path, tf2.name)
self.image.save(os.path.splitext(img_path)[1], File(tf2), save=False)
tf2.close()
im = PImage.open(self.image.path)
if im.mode != 'RGB':
im = im.convert('RGB')
# large thumbnail
im.thumbnail((200, 290), PImage.ANTIALIAS)
thumb_relative_path = upload_announce_thumb_handler(self, os.path.splitext(img_path)[1])
thumb_fn = pjoin(settings.MEDIA_ROOT, thumb_relative_path)
os.makedirs(os.path.dirname(thumb_fn), exist_ok=True)
im.save(thumb_fn, "JPEG")
self.thumbnail.name = thumb_relative_path
self.save()
def set_image(instance, field, width=1920, height=1440):
""" FileField / ImageField """
manager = getattr(instance, field.name)
try:
image_type = random.choice(['people', 'places', 'things'])
response = requests.get(
'https://placem.at/%s?w=%d&h=%d&random=1&txt=' % (image_type, width, height),
timeout=5,
stream=True
)
except (ConnectionError, Timeout):
response = requests.get('http://baconmockup.com/%d/%d/' % (width, height), stream=True)
tfp = tempfile.NamedTemporaryFile(delete=False)
with tfp:
for chunk in response.iter_content(1024 * 1024):
tfp.write(chunk)
tfp.seek(0)
manager.save('image.jpg', File(tfp), save=False)
def save(self, name, content, max_length=None):
"""
Saves new content to the file specified by name. The content should be
a proper File object or any python file-like object, ready to be read
from the beginning.
"""
# Get the proper name for the file, as it will actually be saved.
if name is None:
name = content.name
if not hasattr(content, 'chunks'):
content = File(content, name)
name = self.get_available_name(name, max_length=max_length)
return self._save(name, content)
# These methods are part of the public API, with default implementations.
forms.py 文件源码
项目:Django-Web-Development-with-Python
作者: PacktPublishing
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def save(self, commit=True):
instance = super(InspirationQuoteForm, self).save(commit=True)
if self.cleaned_data["delete_picture"] and instance.picture:
instance.picture.delete()
if self.cleaned_data["picture_path"]:
tmp_path = self.cleaned_data["picture_path"]
abs_tmp_path = os.path.join(settings.MEDIA_ROOT, tmp_path)
filename = InspirationQuote._meta.get_field("picture").upload_to(instance, tmp_path)
instance.picture.save(filename, File(open(abs_tmp_path, "rb")), False)
os.remove(abs_tmp_path)
instance.save()
return instance
def save(self, name, content, max_length=None):
"""
Saves new content to the file specified by name. The content should be
a proper File object or any python file-like object, ready to be read
from the beginning.
"""
# Get the proper name for the file, as it will actually be saved.
if name is None:
name = content.name
if not hasattr(content, 'chunks'):
content = File(content, name)
name = self.get_available_name(name, max_length=max_length)
return self._save(name, content)
# These methods are part of the public API, with default implementations.
def save(self, name, content, max_length=None):
"""
Saves new content to the file specified by name. The content should be
a proper File object or any python file-like object, ready to be read
from the beginning.
"""
# Get the proper name for the file, as it will actually be saved.
if name is None:
name = content.name
if not hasattr(content, 'chunks'):
content = File(content, name)
name = self.get_available_name(name, max_length=max_length)
return self._save(name, content)
# These methods are part of the public API, with default implementations.
def save(self, name, content, max_length=None):
"""
Saves new content to the file specified by name. The content should be
a proper File object or any python file-like object, ready to be read
from the beginning.
"""
# Get the proper name for the file, as it will actually be saved.
if name is None:
name = content.name
if not hasattr(content, 'chunks'):
content = File(content, name)
name = self.get_available_name(name, max_length=max_length)
return self._save(name, content)
# These methods are part of the public API, with default implementations.
def test_get_model_with_file(self):
emmen = Zoo(name='Wildlands Adventure Zoo Emmen')
with temp_imagefile(100, 200, 'jpeg') as file:
emmen.floor_plan.save('plan.jpg', File(file), save=False)
emmen.save()
response = self.client.get('/zoo/%d/' % emmen.id)
self.assertEqual(response.status_code, 200)
result = jsonloads(response.content)
self.assertEqual(emmen.id, result['data']['id'])
self.assertEqual(emmen.name, result['data']['name'], 'Wildlands Adventure Zoo Emmen')
self.assertEqual('/zoo/%d/floor_plan/' % emmen.id, result['data']['floor_plan'])
# This is a basic regression test for a bug due to the router
# singleton refactor, GET would crash if the model simply
# _contained_ a file attribute.
def test_get_related_model_with_file(self):
emmen = Zoo(name='Wildlands Adventure Zoo Emmen')
with temp_imagefile(100, 200, 'jpeg') as file:
emmen.floor_plan.save('plan.jpg', File(file), save=False)
emmen.save()
donald = Animal(name='Donald Duck', zoo=emmen)
donald.save()
response = self.client.get('/animal/%d/' % donald.id, data={'with': 'zoo'})
self.assertEqual(response.status_code, 200)
result = jsonloads(response.content)
self.assertEqual(donald.id, result['data']['id'])
self.assertEqual({'zoo': 'zoo'}, result['with_mapping'])
zoo = result['with']['zoo'][0]
self.assertEqual(emmen.id, zoo['id'])
self.assertEqual(emmen.name, zoo['name'], 'Wildlands Adventure Zoo Emmen')
self.assertEqual('/zoo/%d/floor_plan/' % emmen.id, zoo['floor_plan'])
# Same as above, but in multi-put's code path
def test_multi_put_model_with_existing_file(self):
emmen = Zoo(name='Wildlands Adventure Zoo Emmen')
with temp_imagefile(100, 200, 'jpeg') as file:
emmen.floor_plan.save('plan.jpg', File(file), save=False)
emmen.save()
model_data = {
'data': [{
'id': emmen.id,
'name': 'Wildlands!',
}]
}
response = self.client.put('/zoo/', data=json.dumps(model_data), content_type='application/json')
self.assertEqual(response.status_code, 200)
def test_get_model_with_file(self):
emmen = Zoo(name='Wildlands Adventure Zoo Emmen')
with temp_imagefile(100, 200, 'jpeg') as file:
emmen.floor_plan.save('plan.jpg', File(file), save=False)
emmen.save()
response = self.client.get('/zoo/%d/' % emmen.id)
self.assertEqual(response.status_code, 200)
result = jsonloads(response.content)
self.assertEqual(emmen.id, result['data']['id'])
self.assertEqual(emmen.name, result['data']['name'], 'Wildlands Adventure Zoo Emmen')
self.assertEqual('/zoo/%d/floor_plan/' % emmen.id, result['data']['floor_plan'])
# This is a basic regression test for a bug due to the router
# singleton refactor, GET would crash if the model simply
# _contained_ a file attribute.
def test_get_related_model_with_file(self):
emmen = Zoo(name='Wildlands Adventure Zoo Emmen')
with temp_imagefile(100, 200, 'jpeg') as file:
emmen.floor_plan.save('plan.jpg', File(file), save=False)
emmen.save()
donald = Animal(name='Donald Duck', zoo=emmen)
donald.save()
response = self.client.get('/animal/%d/' % donald.id, data={'with': 'zoo'})
self.assertEqual(response.status_code, 200)
result = jsonloads(response.content)
self.assertEqual(donald.id, result['data']['id'])
self.assertEqual({'zoo': 'zoo'}, result['with_mapping'])
zoo = result['with']['zoo'][0]
self.assertEqual(emmen.id, zoo['id'])
self.assertEqual(emmen.name, zoo['name'], 'Wildlands Adventure Zoo Emmen')
self.assertEqual('/zoo/%d/floor_plan/' % emmen.id, zoo['floor_plan'])
# Same as above, but in multi-put's code path
def save(self, name, content, max_length=None):
"""
Saves new content to the file specified by name. The content should be
a proper File object or any python file-like object, ready to be read
from the beginning.
"""
# Get the proper name for the file, as it will actually be saved.
if name is None:
name = content.name
if not hasattr(content, 'chunks'):
content = File(content, name)
name = self.get_available_name(name, max_length=max_length)
return self._save(name, content)
# These methods are part of the public API, with default implementations.
def setUp(self):
self.superuser = create_superuser()
self.client.login(username='admin', password='secret')
self.img = create_image()
self.image_name = 'test_file.jpg'
self.filename = os.path.join(settings.FILE_UPLOAD_TEMP_DIR, self.image_name)
self.img.save(self.filename, 'JPEG')
self.file = DjangoFile(open(self.filename, 'rb'), name=self.image_name)
# This is actually a "file" for filer considerations
self.image = Image.objects.create(owner=self.superuser,
original_filename=self.image_name,
file=self.file)
self.clipboard = Clipboard.objects.create(user=self.superuser)
self.clipboard.append_file(self.image)
self.folder = Folder.objects.create(name='test_folder')
def test_file_change_upload_to_destination(self):
"""
Test that the file is actualy move from the private to the public
directory when the is_public is checked on an existing private file.
"""
file_obj = DjangoFile(open(self.filename, 'rb'), name=self.image_name)
image = Image.objects.create(owner=self.superuser,
is_public=False,
original_filename=self.image_name,
file=file_obj)
image.save()
self.assertTrue(image.file.path.startswith(filer_settings.FILER_PRIVATEMEDIA_STORAGE.location))
image.is_public = True
image.save()
self.assertTrue(image.file.path.startswith(filer_settings.FILER_PUBLICMEDIA_STORAGE.location))
self.assertEqual(len(image.icons), len(filer_settings.FILER_ADMIN_ICON_SIZES))
image.is_public = False
image.save()
self.assertTrue(image.file.path.startswith(filer_settings.FILER_PRIVATEMEDIA_STORAGE.location))
self.assertEqual(len(image.icons), len(filer_settings.FILER_ADMIN_ICON_SIZES))
generate_initial_dictionaries.py 文件源码
项目:jatumba-backend
作者: YetAnotherTeam
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def handle(self, *args, **options):
base_dir = os.path.dirname(os.path.abspath(__file__))
dictionaries_path = os.path.join(base_dir, self.DATA_DIR)
instruments_dirs = os.listdir(dictionaries_path)
for instrument_dir in instruments_dirs:
# ?????????? ??????? ????? ? ?????.
if not instrument_dir.startswith('.'):
instrument, _ = Instrument.objects.get_or_create(name=instrument_dir)
sounds_path = os.path.join(dictionaries_path, instrument_dir)
sounds_names = [
sound_name for sound_name in os.listdir(sounds_path)
if os.path.isfile(os.path.join(sounds_path, sound_name)) and
not sound_name.startswith('.')
]
for sound_name in sounds_names:
with open(os.path.join(sounds_path, sound_name), 'rb') as sound_file:
wrapped_file = File(sound_file)
sound, _ = Sound.objects.get_or_create(
name=os.path.splitext(sound_name)[0],
instrument=instrument
)
sound.file = wrapped_file
sound.save()
self.stdout.write(self.style.SUCCESS('Successfully created'))
def save(self, name, content, max_length=None):
"""
Saves new content to the file specified by name. The content should be
a proper File object or any python file-like object, ready to be read
from the beginning.
"""
# Get the proper name for the file, as it will actually be saved.
if name is None:
name = content.name
if not hasattr(content, 'chunks'):
content = File(content, name)
name = self.get_available_name(name, max_length=max_length)
return self._save(name, content)
# These methods are part of the public API, with default implementations.
def get_or_import_file(self, options):
assert self.post_type == 'attachment'
if self.file:
return self.file
# download content into deleted temp_file
temp_file = NamedTemporaryFile(delete=True)
temp_file.write(urlopen(force_bytes(self.guid)).read())
temp_file.flush()
# create DjangoFile object
django_file = DjangoFile(temp_file, name=self.guid.split('/')[-1])
# choose folder
if self.parent:
folder = self.parent.get_or_create_folder(options)
else:
folder = options.file_folder
# import file
self.file = FileImporter().import_file(file_obj=django_file, folder=folder)
# set date and owner
self.file.created_at = self.pub_date
self.file.owner = self.created_by.user
self.file.save()
# return imported file
self.save()
return self.file
def _make_bound_form(email, file_attached=False, course="", program="", course_mode="", notify=""):
"""
Builds bound ManageLearnersForm.
"""
form_data = {
ManageLearnersForm.Fields.EMAIL_OR_USERNAME: email,
ManageLearnersForm.Fields.COURSE: course,
ManageLearnersForm.Fields.PROGRAM: program,
ManageLearnersForm.Fields.COURSE_MODE: course_mode,
ManageLearnersForm.Fields.NOTIFY: notify,
}
file_data = {}
if file_attached:
mock_file = mock.Mock(spec=File)
mock_file.name = "some_file.csv"
mock_file.read.return_value = "fake file contents"
file_data = {ManageLearnersForm.Fields.BULK_UPLOAD: mock_file}
customer = EnterpriseCustomerFactory(
catalog=99,
)
return ManageLearnersForm(form_data, file_data, enterprise_customer=customer)
def test_image_size(self, is_valid_image_size, image_size):
"""
Test image size in KB's, image_size < 512 KB.
Default valid max image size is 512 KB (512 * 1024 bytes).
See config `valid_max_image_size` in apps.py.
"""
file_mock = mock.MagicMock(spec=File, name="FileMock")
file_mock.name = "test1.png"
file_mock.size = image_size * 1024 # image size in bytes
branding_configuration = EnterpriseCustomerBrandingConfiguration(
enterprise_customer=factories.EnterpriseCustomerFactory(),
logo=file_mock
)
if not is_valid_image_size:
with self.assertRaises(ValidationError) as validation_error:
branding_configuration.full_clean()
expected_validation_message = 'The logo image file size must be less than or equal to 512 KB.'
self.assertEqual(validation_error.exception.messages[0], expected_validation_message)
else:
branding_configuration.full_clean() # exception here will fail the test