def magic_mime_from_buffer(buffer: bytes) -> str:
    """
    Detect a mimetype with the ``magic`` library.

    .. warning:: :exc:`.OptionalPackageRequirementError` is raised when ``python-magic`` is not installed.

    :param buffer: bytes taken from the header of the file.
    :return: the mimetype reported by ``magic.from_buffer``.
    """
    if magic is not None:
        return magic.from_buffer(buffer, mime=True)

    # The optional dependency is missing; tell the caller which package to install.
    raise OptionalPackageRequirementError('python-magic')  # pragma: no cover
# wand image
# Example source code for Python's from_buffer()
def validate_file(form, field):
    """
    Validate an uploaded file against the forbidden extension and mimetype tables.

    Three checks are made, in order: the extension in the submitted filename,
    the mimetype sniffed from the first 1024 bytes, and the extension that
    ``mimetypes`` associates with that sniffed mimetype.

    :param form: the form being validated (unused).
    :param field: the field whose ``data`` is the uploaded file object.
    :raises ValidationError: if the extension or the mimetype is forbidden.
    """
    upload = field.data

    # File cannot end with a forbidden extension
    _, file_extension = os.path.splitext(upload.filename)
    if file_extension:
        forbidden_ext = ForbiddenExtension.query.filter(
            ForbiddenExtension.extension == file_extension[1:]).first()
        if forbidden_ext is not None:
            raise ValidationError('Extension not allowed')

    try:
        mimetype = magic.from_buffer(upload.read(1024), mime=True)
    finally:
        # File pointer returns to the beginning, even if sniffing failed,
        # so later consumers always see the whole upload.
        upload.seek(0, 0)

    # Check for permitted mimetype
    forbidden_mime = ForbiddenMimeType.query.filter(
        ForbiddenMimeType.mimetype == mimetype).first()
    if forbidden_mime is not None:
        raise ValidationError('File MimeType not allowed')

    # The content may map to a forbidden extension even when the name does not.
    extension = mimetypes.guess_extension(mimetype)
    if extension is not None:
        forbidden_real = ForbiddenExtension.query.filter(
            ForbiddenExtension.extension == extension[1:]).first()
        if forbidden_real is not None:
            raise ValidationError('Extension not allowed')
def write(self, data):
    """
    Write data to multiple clouds.

    Uses a write-through buffer to ensure each chunk is the proper size.
    Close flushes the remainder of the buffer.

    :param data: the bytes to append.
    :raises IOError: if the file has already been closed.
    """
    if self._closed:
        raise IOError('I/O operation on closed file.')
    if self._size == 0:
        # First block. See if we can get a more specific mime type by
        # examining the data.
        mime = magic.from_buffer(data, mime=True)
        # self.mime is determined by the filename, mime by magic. Keep the
        # filename-derived type when magic only yields the generic fallback.
        if not self.mime or mime != 'application/octet-stream':
            self.mime = mime
    self._size += len(data)
    self._md5.update(data)
    self._sha1.update(data)
    self._write_chunk(data)
def write(self, data):
    """
    Write data to multiple clouds.

    Uses a write-through buffer to ensure each chunk is the proper size.
    Close flushes the remainder of the buffer.

    :param data: the bytes to append.
    :raises IOError: if the file has already been closed.
    """
    if self._closed:
        raise IOError('I/O operation on closed file.')
    if self._size == 0:
        # First block. See if we can get a more specific mime type by
        # examining the data.
        mime = magic.from_buffer(data, mime=True)
        # self.mime is determined by the filename, mime by magic. Keep the
        # filename-derived type when magic only yields the generic fallback.
        if not self.mime or mime != 'application/octet-stream':
            self.mime = mime
    self._size += len(data)
    self._md5.update(data)
    self._sha1.update(data)
    self._write_chunk(data)
def write(self, data):
    """
    Write data to multiple clouds.

    Uses a write-through buffer to ensure each chunk is the proper size.
    Close flushes the remainder of the buffer.

    :param data: the bytes to append.
    :raises IOError: if the file has already been closed.
    """
    if self._closed:
        raise IOError('I/O operation on closed file.')
    if self._size == 0:
        # First block. See if we can get a more specific mime type by
        # examining the data.
        mime = magic.from_buffer(data, mime=True)
        # self.mime is determined by the filename, mime by magic. Keep the
        # filename-derived type when magic only yields the generic fallback.
        if not self.mime or mime != 'application/octet-stream':
            self.mime = mime
    self._size += len(data)
    self._md5.update(data)
    self._sha1.update(data)
    self._write_chunk(data)
def clean_file(self):
    """
    Validate the uploaded ``file`` field and return it.

    When ``self.check_extension`` is set, the file extension must match the
    ``acceptFileTypes`` pattern (case-insensitive). When
    ``self.check_content_type`` is set, the mimetype sniffed from the first
    1024 bytes must be listed in ``allowedContentTypes`` and the upload must
    not exceed ``maxFileSize``.

    :return: the uploaded file taken from ``cleaned_data``.
    :raises forms.ValidationError: with the name of the violated option.
    """
    content = self.cleaned_data[u'file']
    _, extension = os.path.splitext(content.name)
    if self.check_extension:
        if re.match(self._options['acceptFileTypes'], extension, flags=re.I) is None:
            raise forms.ValidationError('acceptFileTypes')
    if self.check_content_type:
        content_type = magic.from_buffer(content.read(1024), mime=True)
        # Rewind so whoever consumes the upload next starts at the beginning.
        content.seek(0)
        if content_type.lower() not in self._options['allowedContentTypes']:
            raise forms.ValidationError("acceptFileTypes")
        # Use the public ``size`` attribute rather than the private ``_size``.
        if content.size > self._options['maxFileSize']:
            raise forms.ValidationError("maxFileSize")
    return content
def load_index(self):
    """
    Fetch the index object from the bucket and decode it.

    The body's encoding is sniffed with ``magic``: plain text / JSON is parsed
    directly, zlib data is decompressed first, and an empty body yields an
    empty index.

    :return: the decoded index, or ``{}`` when the body is empty or the
        request raises ``ClientError``.
    :raises ValueError: if the sniffed content type is not recognised.
    """
    try:
        resp = self.boto.get_object(
            Bucket=self.bucket,
            Key=self.index_path(),
        )
        body = resp['Body'].read()
    except ClientError:
        return {}

    content_type = magic.from_buffer(body, mime=True)
    # Newer libmagic releases report JSON text as application/json instead of
    # text/plain; both denote an uncompressed index.
    if content_type in ('text/plain', 'application/json'):
        logger.debug('Detected plain text encoding for index')
        return json.loads(body.decode('utf-8'))
    if content_type == 'application/zlib':
        logger.debug('Detected zlib encoding for index')
        body = zlib.decompress(body)
        return json.loads(body.decode('utf-8'))
    if content_type == 'application/x-empty':
        return {}
    raise ValueError('Unknown content type for index', content_type)
def get_filetype(data):
    """Return libmagic's description of ``data``, or ``''`` if magic is not loaded.

    There are two versions of python-magic floating around, and annoyingly, the interface
    changed between versions, so we try one method and if it fails, then we try the other.
    NOTE: you may need to alter the magic_file for your system to point to the magic file."""
    # dict.has_key() no longer exists in Python 3; ``in`` works everywhere.
    if 'magic' in sys.modules:
        try:
            # Interface of the libmagic-bundled bindings.
            ms = magic.open(magic.MAGIC_NONE)
            ms.load()
            return ms.buffer(data)
        except Exception:
            try:
                # Interface of the python-magic package.
                return magic.from_buffer(data)
            except magic.MagicException:
                # Raw string: same path as before, without invalid escape sequences.
                magic_custom = magic.Magic(magic_file=r'C:\windows\system32\magic')
                return magic_custom.from_buffer(data)
    return ''
def store(self):
    """
    Validate ``self.data`` and save it as an ``Image`` named after its SHA-1.

    :return: the absolute URL of the saved image.
    :raises TooBigMedia: if the data is ``MAX_SIZE`` bytes or larger.
    :raises InvalidMimeType: if the sniffed mimetype is not allowed.
    """
    if len(self.data) >= self.MAX_SIZE:
        raise TooBigMedia(self.identifying_name, self.MAX_SIZE)
    mime = magic.from_buffer(self.data, mime=True)
    if mime not in self.allowed_mimetypes:
        raise InvalidMimeType(mime)
    self.extension = mimetypes.guess_extension(mime)
    # weirdness from mimetypes
    if self.extension == '.jpe':
        self.extension = '.jpeg'
    checksum = hashlib.sha1(self.data).hexdigest()
    # guess_extension() returns None for types it has no mapping for; do not
    # let that end up as a literal "None" suffix in the filename.
    fn = '{}{}'.format(checksum, self.extension or '')
    img = Image(organization=self.organization)
    img.file.save(fn, ContentFile(self.data))
    return img.get_absolute_url()
def new_video(self, fb_user, fields):
    """
    Post the video in ``fields['video']`` to the user's Facebook videos.

    Missing ``message`` and ``title`` entries are defaulted to ``''`` in
    ``fields``. The upload is best-effort: a failure is logged, not raised.
    The video file is always closed before returning.

    :param fb_user: object providing the ``access_token`` for the request.
    :param fields: dict with a ``video`` file object and optional
        ``message`` / ``title`` strings.
    """
    fields.setdefault('message', '')
    fields.setdefault('title', '')
    video_file = fields['video']
    try:
        video_file.seek(0)
        mime_type = magic.from_buffer(video_file.read(), mime=True)
        video_file.seek(0)
        # The upload name is a random id plus the mimetype's subtype.
        upload_name = str(uuid4()) + '.' + mime_type.split('/')[1]
        post_data = [('access_token', (None, fb_user.access_token)),
                     ('source', (upload_name, video_file)),
                     ('message', (None, fields['message']))]
        try:
            fb_request_url = Config.get("API_BASE_URI_VIDEO") + "/me/videos"
            requests.post(fb_request_url, files=post_data)
        except Exception:
            # Translate the template first, then format it; formatting first
            # would make the catalog lookup miss. The access token (first
            # entry) is kept out of the log.
            log.error(_("A failure occurred while posting on Facebook : "
                        "called with data: {}").format(post_data[1:]))
    finally:
        video_file.close()
def _get_filetype(self, data):
    """Gets filetype, uses libmagic if available.
    @param data: data to be analyzed.
    @return: file type or None.
    """
    if not HAVE_MAGIC:
        return None

    # Bound up front so the cleanup below never touches an unbound name when
    # magic.open() itself fails.
    ms = None
    try:
        # Interface of the libmagic-bundled bindings.
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        file_type = ms.buffer(data)
    except Exception:
        try:
            # Interface of the python-magic package.
            file_type = magic.from_buffer(data)
        except Exception:
            return None
    finally:
        if ms is not None:
            try:
                ms.close()
            except Exception:
                # Closing is best-effort; the result is already computed.
                pass
    return file_type
def check_word_or_excel(self, fileobj, detected_type, extension):
    """
    Return the proper mimetype for Word or Excel files.

    The first READ_SIZE bytes of ``fileobj`` are described by ``magic`` and the
    description is searched for known Word / Excel / OOXML markers; the file
    is rewound afterwards. ``detected_type`` is returned unchanged when
    nothing matches.
    """
    word_strings = ['Microsoft Word', 'Microsoft Office Word', 'Microsoft Macintosh Word']
    excel_strings = ['Microsoft Excel', 'Microsoft Office Excel', 'Microsoft Macintosh Excel']
    office_strings = ['Microsoft OOXML']

    description = magic.from_buffer(fileobj.read(READ_SIZE))
    fileobj.seek(0)

    def mentions(markers):
        # True when any marker occurs in libmagic's description.
        return any(marker in description for marker in markers)

    if mentions(word_strings):
        return 'application/msword'
    if mentions(excel_strings):
        return 'application/vnd.ms-excel'
    if mentions(office_strings) or detected_type == 'application/vnd.ms-office':
        # Generic Office container: fall back to the file extension.
        if extension in ('.doc', '.docx'):
            detected_type = 'application/msword'
        if extension in ('.xls', '.xlsx'):
            detected_type = 'application/vnd.ms-excel'
    return detected_type
def run(self, directory):
    """
    Walk ``directory`` (following symlinks) and process each executable file.

    A file's type is sniffed from its first 1024 bytes. Files that
    ``is_executable`` accepts are hashed with MD5 and passed to
    ``self.do_run`` unless ``self.is_file_indexed`` reports the hash as known.
    Files that cannot be read or sniffed are logged and skipped.

    :param directory: root directory to scan.
    """
    for root, dirs, files in os.walk(directory, followlinks=True):
        for name in files:
            filename = os.path.join(root, name)
            try:
                # Binary mode: the sniffer needs raw bytes, and decoding an
                # executable as text would fail. ``with`` closes the handle.
                with open(filename, "rb") as handle:
                    file_type = magic.from_buffer(handle.read(1024))
            except Exception:
                log("Error reading file %s: %s" % (filename, str(sys.exc_info()[1])))
                continue
            if not is_executable(file_type):
                continue
            with open(filename, "rb") as handle:
                md5_hash = md5(handle.read()).hexdigest()
            if not self.is_file_indexed(md5_hash):
                self.do_run(filename, file_type)
            else:
                log("File already indexed %s" % name)
#-------------------------------------------------------------------------------
def test_html_to_md(self):
    """HTML to MD

    Posts demo.html to /convert/ and checks the status, the response content
    type, the sniffed type of the result, and that the conversion folder is
    left empty.
    """
    self.set_original_document_from_file('demo.html')
    form_fields = {
        'document': (self.original_document, self.original_document_name),
        'from': 'html',
        'to': 'md',
    }
    reply = self.app.post(
        '/convert/',
        data=form_fields,
        content_type='multipart/form-data',
        buffered=True,
    )
    converted = reply.data
    assert '200' in reply.status
    assert 'octet-stream' in reply.content_type
    sniffed = magic.from_buffer(converted, mime=True)
    self.assertEqual(sniffed, 'text/x-c++')
    # Nothing may be left behind in the conversion folder.
    leftovers = os.listdir(app.config['CONVERSION_FOLDER'])
    assert leftovers == []
def test_html_to_rst(self):
    """HTML to RST

    Posts demo.html to /convert/ and checks the status, the response content
    type, the sniffed type of the result, and that the conversion folder is
    left empty.
    """
    self.set_original_document_from_file('demo.html')
    form_fields = {
        'document': (self.original_document, self.original_document_name),
        'from': 'html',
        'to': 'rst',
    }
    reply = self.app.post(
        '/convert/',
        data=form_fields,
        content_type='multipart/form-data',
        buffered=True,
    )
    converted = reply.data
    assert '200' in reply.status
    assert 'octet-stream' in reply.content_type
    sniffed = magic.from_buffer(converted, mime=True)
    assert sniffed == 'text/x-c'
    # Nothing may be left behind in the conversion folder.
    leftovers = os.listdir(app.config['CONVERSION_FOLDER'])
    assert leftovers == []
def test_html_to_docx(self):
    """HTML to DOCX

    Posts demo.html to /convert/ and checks the status, the response content
    type, the sniffed type of the result, and that the conversion folder is
    left empty.
    """
    docx_mime = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
    self.set_original_document_from_file('demo.html')
    form_fields = {
        'document': (self.original_document, self.original_document_name),
        'from': 'html',
        'to': 'docx',
    }
    reply = self.app.post(
        '/convert/',
        data=form_fields,
        content_type='multipart/form-data',
        buffered=True,
    )
    converted = reply.data
    assert '200' in reply.status
    assert docx_mime in reply.content_type
    sniffed = magic.from_buffer(converted, mime=True)
    self.assertEqual(sniffed, docx_mime)
    # Nothing may be left behind in the conversion folder.
    leftovers = os.listdir(app.config['CONVERSION_FOLDER'])
    assert leftovers == []
#
# From MD
#
def test_md_to_html(self):
    """MD to HTML

    Posts demo.md to /convert/ and checks the status, the response content
    type, the sniffed type of the result, and that the conversion folder is
    left empty.
    """
    self.set_original_document_from_file('demo.md')
    form_fields = {
        'document': (self.original_document, self.original_document_name),
        'from': 'md',
        'to': 'html',
    }
    reply = self.app.post(
        '/convert/',
        data=form_fields,
        content_type='multipart/form-data',
        buffered=True,
    )
    converted = reply.data
    assert '200' in reply.status
    assert 'text/html' in reply.content_type
    sniffed = magic.from_buffer(converted, mime=True)
    assert sniffed == 'text/html'
    # Nothing may be left behind in the conversion folder.
    leftovers = os.listdir(app.config['CONVERSION_FOLDER'])
    assert leftovers == []
def test_md_to_rst(self):
    """MD to RST

    Posts demo.md to /convert/ and checks the status, the response content
    type, the sniffed type of the result, and that the conversion folder is
    left empty.
    """
    self.set_original_document_from_file('demo.md')
    response = self.app.post('/convert/',
                             buffered=True,
                             content_type='multipart/form-data',
                             data={
                                 'document': (self.original_document, self.original_document_name),
                                 'from': 'md',
                                 'to': 'rst'
                             })
    destination_document = response.data
    assert '200' in response.status
    assert 'octet-stream' in response.content_type
    assert magic.from_buffer(destination_document, mime=True) == 'text/x-c'
    # Nothing may be left behind in the conversion folder.
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []
def test_rst_to_html(self):
    """RST to HTML

    Posts demo.rst to /convert/ and checks the status, the response content
    type, the sniffed type of the result, and that the conversion folder is
    left empty.
    """
    self.set_original_document_from_file('demo.rst')
    form_fields = {
        'document': (self.original_document, self.original_document_name),
        'from': 'rst',
        'to': 'html',
    }
    reply = self.app.post(
        '/convert/',
        data=form_fields,
        content_type='multipart/form-data',
        buffered=True,
    )
    converted = reply.data
    assert '200' in reply.status
    assert 'text/html' in reply.content_type
    sniffed = magic.from_buffer(converted, mime=True)
    assert sniffed == 'text/html'
    # Nothing may be left behind in the conversion folder.
    leftovers = os.listdir(app.config['CONVERSION_FOLDER'])
    assert leftovers == []
def test_rst_to_md(self):
    """RST to MD

    Posts demo.rst to /convert/ and checks the status, the response content
    type, the sniffed type of the result, and that the conversion folder is
    left empty.
    """
    self.set_original_document_from_file('demo.rst')
    form_fields = {
        'document': (self.original_document, self.original_document_name),
        'from': 'rst',
        'to': 'md',
    }
    reply = self.app.post(
        '/convert/',
        data=form_fields,
        content_type='multipart/form-data',
        buffered=True,
    )
    converted = reply.data
    assert '200' in reply.status
    assert 'octet-stream' in reply.content_type
    sniffed = magic.from_buffer(converted, mime=True)
    assert sniffed == 'text/plain'
    # Nothing may be left behind in the conversion folder.
    leftovers = os.listdir(app.config['CONVERSION_FOLDER'])
    assert leftovers == []
def test_rst_to_docx(self):
    """RST to DOCX

    Posts demo.rst to /convert/ and checks the status, the response content
    type, the sniffed type of the result, and that the conversion folder is
    left empty.
    """
    docx_mime = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
    self.set_original_document_from_file('demo.rst')
    form_fields = {
        'document': (self.original_document, self.original_document_name),
        'from': 'rst',
        'to': 'docx',
    }
    reply = self.app.post(
        '/convert/',
        data=form_fields,
        content_type='multipart/form-data',
        buffered=True,
    )
    converted = reply.data
    assert '200' in reply.status
    assert docx_mime in reply.content_type
    sniffed = magic.from_buffer(converted, mime=True)
    assert sniffed == docx_mime
    # Nothing may be left behind in the conversion folder.
    leftovers = os.listdir(app.config['CONVERSION_FOLDER'])
    assert leftovers == []
#
# From DOCX
#
def test_docx_to_html(self):
    """DOCX to HTML

    Posts demo.docx to /convert/ and checks the status, the response content
    type, the sniffed type of the result, and that the conversion folder is
    left empty.
    """
    self.set_original_document_from_file('demo.docx')
    form_fields = {
        'document': (self.original_document, self.original_document_name),
        'from': 'docx',
        'to': 'html',
    }
    reply = self.app.post(
        '/convert/',
        data=form_fields,
        content_type='multipart/form-data',
        buffered=True,
    )
    converted = reply.data
    assert '200' in reply.status
    assert 'text/html' in reply.content_type
    sniffed = magic.from_buffer(converted, mime=True)
    assert sniffed == 'text/html'
    # Nothing may be left behind in the conversion folder.
    leftovers = os.listdir(app.config['CONVERSION_FOLDER'])
    assert leftovers == []
def test_rst_to_html(self):
    """RST to HTML

    Posts demo.rst to /convert/ and checks the status, the response content
    type, the sniffed type of the result, and that the conversion folder is
    left empty.
    """
    # NOTE(review): a test with this same name and body appears earlier in
    # this file; if both live in one class, this definition shadows it.
    self.set_original_document_from_file('demo.rst')
    form_fields = {
        'document': (self.original_document, self.original_document_name),
        'from': 'rst',
        'to': 'html',
    }
    reply = self.app.post(
        '/convert/',
        data=form_fields,
        content_type='multipart/form-data',
        buffered=True,
    )
    converted = reply.data
    assert '200' in reply.status
    assert 'text/html' in reply.content_type
    sniffed = magic.from_buffer(converted, mime=True)
    assert sniffed == 'text/html'
    # Nothing may be left behind in the conversion folder.
    leftovers = os.listdir(app.config['CONVERSION_FOLDER'])
    assert leftovers == []
def test_docx_to_rst(self):
    """DOCX to RST

    Posts demo.docx to /convert/ and checks the status, the response content
    type, the sniffed type of the result, and that the conversion folder is
    left empty.
    """
    self.set_original_document_from_file('demo.docx')
    form_fields = {
        'document': (self.original_document, self.original_document_name),
        'from': 'docx',
        'to': 'rst',
    }
    reply = self.app.post(
        '/convert/',
        data=form_fields,
        content_type='multipart/form-data',
        buffered=True,
    )
    converted = reply.data
    assert '200' in reply.status
    assert 'octet-stream' in reply.content_type
    sniffed = magic.from_buffer(converted, mime=True)
    assert sniffed == 'text/plain'
    # Nothing may be left behind in the conversion folder.
    leftovers = os.listdir(app.config['CONVERSION_FOLDER'])
    assert leftovers == []
def test_convert_from_string(self):
    """Convert from string

    Posts the string '#1' as an MD document to /convert/ with RST as target
    and checks the status, the response content type, the sniffed type of the
    result, and that the conversion folder is left empty.
    """
    self.set_original_document_from_string('#1')
    form_fields = {
        'document': self.original_document_content_string,
        'from': 'md',
        'to': 'rst',
    }
    reply = self.app.post(
        '/convert/',
        data=form_fields,
        content_type='multipart/form-data',
        buffered=True,
    )
    converted = reply.data
    assert '200' in reply.status
    assert 'octet-stream' in reply.content_type
    sniffed = magic.from_buffer(converted, mime=True)
    assert sniffed == 'text/plain'
    # Nothing may be left behind in the conversion folder.
    leftovers = os.listdir(app.config['CONVERSION_FOLDER'])
    assert leftovers == []
def _get_filetype(data):
    """Gets filetype, uses libmagic if available.
    @param data: data to be analyzed.
    @return: file type or None.
    """
    if not HAVE_MAGIC:
        return None

    # Bound up front so the cleanup below never touches an unbound name when
    # magic.open() itself fails.
    ms = None
    try:
        # Interface of the libmagic-bundled bindings.
        ms = magic.open(magic.MAGIC_SYMLINK)
        ms.load()
        file_type = ms.buffer(data)
    except Exception:
        try:
            # Interface of the python-magic package.
            file_type = magic.from_buffer(data)
        except Exception:
            return None
    finally:
        if ms is not None:
            try:
                ms.close()
            except Exception:
                # Closing is best-effort; the result is already computed.
                pass
    return file_type
def __call__(self, data):
    """
    Validate an uploaded file's size and sniffed content type.

    Each check is skipped when its limit is not configured
    (``max_size`` / ``min_size`` is ``None``, ``content_types`` is ``None``
    or empty).

    :param data: the uploaded file; must provide ``size``, ``read`` and ``seek``.
    :raises ValidationError: with code ``max_size``, ``min_size`` or
        ``content_type`` for the first check that fails.
    """
    if self.max_size is not None and data.size > self.max_size:
        params = {
            'max_size': filesizeformat(self.max_size),
            'size': filesizeformat(data.size),
        }
        raise ValidationError(self.error_messages['max_size'], 'max_size', params)

    if self.min_size is not None and data.size < self.min_size:
        params = {
            # Was ``self.mix_size``, which raised AttributeError instead of
            # the intended validation error.
            'min_size': filesizeformat(self.min_size),
            'size': filesizeformat(data.size),
        }
        raise ValidationError(self.error_messages['min_size'], 'min_size', params)

    if self.content_types:
        content_type = magic.from_buffer(data.read(), mime=True)
        data.seek(0)  # seek to start for future mime checks by django
        if content_type not in self.content_types:
            params = {
                'content_type': content_type
            }
            raise ValidationError(self.error_messages['content_type'], 'content_type', params)
def upload_image():
    """
    Store an uploaded image in the configured images directory and queue it.

    Reads the ``name`` form field and the ``data`` file from the request.

    :return: an ``HTTPResponse`` with status 200 when the image is stored and
        queued, 400 when fields are missing or the payload is not an image,
        and 500 when queueing fails.
    """
    name = request.forms.get('name')
    data = request.files.get('data')
    if not (name and data and data.file):
        return HTTPResponse(status=400,body=json.dumps({'error' : 'missing fields'}))

    raw = data.file.read()
    # Validate the payload before touching the filesystem.
    if 'image' not in magic.from_buffer(raw):
        return HTTPResponse(status=400,body=json.dumps({'error' : 'file type is not allowed'}))

    images_dir = config.get('storage','imagesdir')
    # basename() keeps a client-supplied name from escaping images_dir.
    filename = os.path.basename(data.filename)
    save_path="{path}/{file}".format(
        path=images_dir,
        file=filename
    )
    # Create the directory itself, not a directory at the file's own path.
    if not os.path.exists(images_dir):
        os.makedirs(images_dir)
    # Binary mode: ``raw`` holds the uploaded bytes.
    with open(save_path,'wb') as open_file:
        open_file.write(raw)

    if queue.add_to_queue(queue_name='images',image=save_path):
        return HTTPResponse(status=200,body=json.dumps({'status' : 'Image Stored'}))
    return HTTPResponse(status=500,body=json.dumps({'error' : 'Internal Server Error'}))
def get_type(data):
    """
    Return libmagic's description of ``data``, or ``''`` when detection fails.

    Tries the ``magic.open`` interface first and falls back to
    ``magic.from_buffer`` if that raises.

    :param data: the bytes to analyse.
    :return: the file type description, or an empty string.
    """
    # Bound up front so the cleanup below never touches an unbound name when
    # magic.open() itself fails.
    ms = None
    try:
        # Interface of the libmagic-bundled bindings.
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        file_type = ms.buffer(data)
    except Exception:
        try:
            # Interface of the python-magic package.
            file_type = magic.from_buffer(data)
        except Exception:
            return ''
    finally:
        if ms is not None:
            try:
                ms.close()
            except Exception:
                # Closing is best-effort; the result is already computed.
                pass
    return file_type
def validate_file_type_and_size(upload):
    """
    Validate an upload's sniffed mimetype and its size.

    The allowed formats come from ``settings.FILE_ALIASES['*']['fileformats']``
    as ``(name, mimetype)`` pairs. The size limit is 5 MB (5 * 10**6 bytes).
    Both checks always run, so a single error can report both problems.

    :param upload: the uploaded file; must provide ``read``, ``seek`` and ``size``.
    :return: the same ``upload`` when it is valid.
    :raises ValidationError: wrapping one error per failed check.
    """
    file_max_mb = 5
    max_size = file_max_mb*10**6

    fileformats = settings.FILE_ALIASES['*']['fileformats']
    # Named to avoid shadowing the stdlib ``mimetypes`` module.
    allowed_mimetypes = [mimetype for name, mimetype in fileformats]
    names = [name for name, mimetype in fileformats]

    errors = []

    filetype = magic.from_buffer(upload.read(), mime=True)
    # Rewind so whoever consumes the upload next starts at the beginning.
    upload.seek(0)
    if filetype.lower() not in allowed_mimetypes:
        # Translate the template first, then format it; formatting first
        # would make the catalog lookup miss.
        msg = _(
            'Unsupported file format. Supported formats are {}.'
        ).format(', '.join(names))
        errors.append(ValidationError(msg))

    if upload.size > max_size:
        msg = _('File should be at most {} MB').format(file_max_mb)
        errors.append(ValidationError(msg))

    if errors:
        raise ValidationError(errors)

    return upload