def save(self, commit=True):
    """
    Build the model instance from the form, normalise it, optionally save it.

    For each form field name that is also an attribute of the instance:

    * a value whose type is exactly ``unicode`` or ``str`` and that equals
      ``''`` is replaced with ``None``;
    * if the name belongs to the model and its model field type is exactly
      ``ImageField``, and the cleaned value is an ``InMemoryUploadedFile`` or
      ``TemporaryUploadedFile``, the attribute is replaced by the result of
      ``shrinkImageFromData(<file bytes>, <file name>)``.

    :param commit: when true, ``instance.save()`` is called before returning.
    :return: the (possibly unsaved) model instance.
    """
    instance = super(FormWithRequest, self).save(commit=False)

    for name in self.fields:
        if not hasattr(instance, name):
            continue

        # Fix empty strings to None
        current = getattr(instance, name)
        if type(current) in (unicode, str) and current == '':
            setattr(instance, name, None)

        # Only names that exist on the model class can be looked up in _meta.
        if name not in dir(self.Meta.model):
            continue
        if type(self.Meta.model._meta.get_field(name)) != models.models.ImageField:
            continue

        uploaded = self.cleaned_data[name]
        if uploaded and isinstance(uploaded, (InMemoryUploadedFile, TemporaryUploadedFile)):
            original_name = uploaded.name
            shrunk = shrinkImageFromData(uploaded.read(), original_name)
            setattr(instance, name, shrunk)

    if commit:
        instance.save()
    return instance
python类TemporaryUploadedFile()的实例源码
def saveImage(image):
    """
    Persist an uploaded image and run ``detectObject`` on the stored file.

    The target path is built from the parent of the current working
    directory, the fixed ``/Django/image/`` sub-path, the current timestamp
    and the subtype taken from the upload's ``content_type``.

    :param image: uploaded file object exposing ``content_type`` and ``read()``
        (a ``TemporaryUploadedFile`` is read from its on-disk temporary path).
    :return: the last ``/``-separated component of the path returned by
        ``detectObject``.
    """
    pwd = os.getcwd()
    filetype = image.content_type.split('/')[1]
    filePath = os.path.abspath(os.path.dirname(pwd)) + '/Django/image/' + str(time.time()) + '.' + filetype

    # Always hand raw bytes to the storage backend. Rebinding ``image`` to a
    # decoded PIL image would break here, because PIL images have no read().
    if isinstance(image, TemporaryUploadedFile):
        with open(image.temporary_file_path(), 'rb') as temp_file:
            raw_bytes = temp_file.read()
    else:
        raw_bytes = image.read()

    default_storage.save(filePath, ContentFile(raw_bytes))
    path = detectObject(filePath, filetype)
    fileName = path.split('/')[-1]
    return fileName
def test_get_train_data(self):
    """
    Check that ``put_data`` reports one stored item for a single upload.

    The "test" database, the "test_table" table and the label "1" are created
    first when the corresponding search call does not already list them.

    :return: None
    """
    if "test" not in ImageManager().search_all_database():
        self.assertEqual(ImageManager().create_database("test"), "test")

    if "test_table" not in ImageManager().search_database("test"):
        ImageManager().create_table("test", "test_table")

    if "1" not in ImageManager().search_table("test", "test_table"):
        ImageManager().create_label("test", "test_table", "1")

    upload = TemporaryUploadedFile("img_test_data", "byte", 66666, "xxxxxxxxxxxxxxxxxx")
    stored_count = ImageManager().put_data("test", "1", [upload])
    self.assertEqual(stored_count, 1)
    #ImageManager().load_data("test", "test_table", "1")
    tfmsa_logger("==========PASS==========")
def upload_major_info(request):
    """
    Create ``Major`` rows from an uploaded JSON file (superusers only).

    The upload is read from ``request.FILES['file']`` and must contain a JSON
    sequence of objects with ``major_short`` and ``major_full`` keys.

    :param request: request carrying ``user`` and ``FILES``.
    :return: ``Response`` with status
        403 when the user is not a superuser,
        400 when no file was uploaded or it is not valid JSON,
        201 otherwise (creation stops at the first ``IntegrityError``, which
        is printed rather than raised).
    """
    if not request.user.is_superuser:
        return Response(status=status.HTTP_403_FORBIDDEN)

    upload = request.FILES.get('file', False)
    if not upload:
        return Response(status=status.HTTP_400_BAD_REQUEST)

    # Uploaded files are file-like regardless of whether they live in memory
    # or on disk, so parse the upload directly: no second handle to close and
    # no unbound variable for other upload classes.
    try:
        majors = json.load(upload)
    except ValueError:
        return Response(status=status.HTTP_400_BAD_REQUEST)

    try:
        for major in majors:
            Major.objects.create(major_short=major['major_short'], major_full=major['major_full'])
    except IntegrityError as e:
        print(e)
    return Response(status=status.HTTP_201_CREATED)
def upload_professor_info(request):
    """
    Create ``Professor`` rows from an uploaded JSON file (superusers only).

    The upload is read from ``request.FILES['file']`` and must contain a JSON
    sequence of objects with a ``name`` key and optional ``email`` and
    ``address`` keys (both default to ``''``). The name is upper-cased and
    split on whitespace; its first two tokens become first and last name.

    :param request: request carrying ``user`` and ``FILES``.
    :return: ``Response`` with status
        403 when the user is not a superuser,
        400 when no file was uploaded or it is not valid JSON,
        201 otherwise (creation stops at the first ``IntegrityError``, which
        is printed rather than raised).
    """
    if not request.user.is_superuser:
        return Response(status=status.HTTP_403_FORBIDDEN)

    upload = request.FILES.get('file', False)
    if not upload:
        return Response(status=status.HTTP_400_BAD_REQUEST)

    # Uploaded files are file-like regardless of whether they live in memory
    # or on disk, so parse the upload directly: no second handle to close and
    # no unbound variable for other upload classes.
    try:
        professors = json.load(upload)
    except ValueError:
        return Response(status=status.HTTP_400_BAD_REQUEST)

    try:
        for professor in professors:
            # NOTE(review): a name with fewer than two words raises IndexError
            # here; confirm whether such entries can occur in the input.
            name = professor['name'].upper().split()
            email = professor.get('email', '')
            office = professor.get('address', '')
            Professor.objects.create(first_name=name[0], last_name=name[1], email=email, office=office)
    except IntegrityError as e:
        print(e)
    return Response(status=status.HTTP_201_CREATED)
def image_create(request, **kwargs):
    """Create an image through the Glance client.

    :param kwargs: forwarded to ``images.create`` after ``data`` is removed:
        * copy_from: URL from which the Glance server should immediately
                     copy the data and store it in its configured image store.
        * data: form data posted from the client.
        * location: URL where the data for this image already resides.

        With 'copy_from' or 'location' the Glance server answers the create
        call immediately and handles the data asynchronously.

        With 'data' the upload may take some time, so it is handed off to a
        separate thread running ``image_update``.
    :return: the image object returned by ``images.create``.
    """
    payload = kwargs.pop('data', None)
    image = glanceclient(request).images.create(**kwargs)
    if not payload:
        return image

    if isinstance(payload, TemporaryUploadedFile):
        # Hack to fool Django, so the file stays open for the new thread.
        payload.file.close_called = True
    if isinstance(payload, InMemoryUploadedFile):
        # Django will close the original in-memory file, so hand the thread
        # its own copy.
        payload = SimpleUploadedFile(payload.name,
                                     payload.read(),
                                     payload.content_type)

    update_args = (request, image.id)
    update_kwargs = {'data': payload, 'purge_props': False}
    thread.start_new_thread(image_update, update_args, update_kwargs)
    return image
def new_file(self, file_name, *args, **kwargs):
    """
    Begin a new upload and create the file object that incoming data will be
    appended to.

    The parent handler's ``new_file`` runs first; ``self.file`` is then set to
    a ``TemporaryUploadedFile`` built from ``self.file_name``,
    ``self.content_type``, an initial size of 0 and ``self.charset``.
    """
    parent_handler = super(TemporaryFileUploadHandler, self)
    parent_handler.new_file(file_name, *args, **kwargs)
    self.file = TemporaryUploadedFile(
        self.file_name,
        self.content_type,
        0,
        self.charset,
    )
def test_upload_file(self):
    """
    Post to the image-file label endpoint for this test class's table, then
    check that ``put_data`` reports one stored item for a single upload.

    The host is taken from the ``HOSTNAME`` environment variable, port 8989.
    """
    # Pass the table name itself; ``rand_name.join`` would hand put_data the
    # bound ``str.join`` method instead of a string.
    table_name = self.__class__.rand_name
    host_name = "{0}:{1}".format(os.environ['HOSTNAME'], "8989")
    requests.post('http://' + host_name + '/api/v1/type/imagefile/base/mes'
                  + '/table/' + table_name + '/label/1/')
    temp_file = TemporaryUploadedFile("img_test_data", "byte", 66666, "xxxxxxxxxxxxxxxxxxxxxxxxxx")
    self.assertEqual(ImageManager().put_data("mes", table_name, "1", [temp_file]), 1)
    tfmsa_logger("==========PASS==========")
def upload_file(url, timeout=5):
    """
    Download the resource at ``url`` into a ``TemporaryUploadedFile``.

    The file name is the last ``/``-separated component of ``url``; the
    content type comes from the response's ``content-type`` header. The size
    comes from ``content-length`` when that header is a valid integer,
    otherwise from the number of bytes actually written.

    Example:
        from libs.upload import *
        ...

        try:
            uploaded_file = upload_file('http://host.ru/image.jpg')
        except URLError as e:
            return JsonResponse({
                'message': str(e.msg),
            }, status=e.code)

        request.user.avatar.save(uploaded_file.name, uploaded_file, save=False)
        uploaded_file.close()

        try:
            request.user.full_clean()
        except ValidationError as e:
            request.user.avatar.delete(save=False)
            return JsonResponse({
                'message': ', '.join(e.messages),
            }, status=400)
        else:
            request.user.save()

    :param url: address of the resource to download.
    :param timeout: timeout in seconds passed to ``urlopen``.
    :return: the temporary uploaded file, flushed and rewound to position 0.
        The caller is responsible for closing it.
    """
    logger.debug('Uploading %s...', url)
    with contextlib.closing(urlopen(url, timeout=timeout)) as fp:
        headers = fp.info()
        file_name = url.split('/')[-1]
        content_type = headers.get('content-type')
        # The header is a string (or missing); the upload object wants a number.
        try:
            file_size = int(headers.get('content-length'))
        except (TypeError, ValueError):
            file_size = None
        charset = 'utf-8'

        tmp = TemporaryUploadedFile(file_name, content_type, file_size, charset, {})
        try:
            written = 0
            while True:
                block = fp.read(8 * 1024)
                if not block:
                    break
                tmp.write(block)
                written += len(block)
        except Exception:
            # Do not leave the temporary file open when the download fails.
            tmp.close()
            raise

    if file_size is None:
        tmp.size = written

    logger.debug('Uploaded %s to file %s', url, tmp.file.name)
    # Push buffered data to disk before rewinding for the caller.
    tmp.flush()
    tmp.seek(0)
    return tmp