def __init__(self, file_obj, orig_filename=None):
    """Wrap raw file content and record its name, extension and MIME type.

    :param file_obj: raw content of the file; it is handed to ``BytesIO``.
    :param orig_filename: original file name, or ``None`` when unknown.

    Sets ``file_obj``, ``orig_filename``, ``final_filename``, ``extension``,
    ``log_details``, ``log_string``, ``mimetype``, ``main_type`` and
    ``sub_type`` on the instance.
    """
    self.file_obj = BytesIO(file_obj)
    self.orig_filename = orig_filename
    self.log_details = {'origFilename': self.orig_filename}
    self.log_string = ''

    if self.orig_filename:
        self.final_filename = self.orig_filename
        _, self.extension = os.path.splitext(self.orig_filename)
    else:
        # No name supplied: fall back to a generic name and no extension.
        self.final_filename = 'unknownfile.bin'
        self.extension = None

    try:
        mt = magic.from_buffer(self.file_obj.getvalue(), mime=True)
    except UnicodeEncodeError as e:
        # FIXME: The encoding of the file is broken (possibly UTF-16)
        mt = ''
        self.log_details.update({'UnicodeError': e})

    # The detected value is decoded when it offers a working ``decode``;
    # otherwise it is kept as returned. Only the failures that decoding
    # can legitimately produce are caught, so unrelated errors (and
    # KeyboardInterrupt) are no longer silently swallowed.
    try:
        self.mimetype = mt.decode("utf-8")
    except (AttributeError, UnicodeError):
        self.mimetype = mt

    if self.mimetype and '/' in self.mimetype:
        # Split on the first slash only so an unexpected extra slash
        # cannot raise ValueError during unpacking.
        self.main_type, self.sub_type = self.mimetype.split('/', 1)
    else:
        self.main_type = ''
        self.sub_type = ''
python类from_buffer()的实例源码
def genReqStr( params ):
    """Serialise ``params`` into a multipart/mixed request body.

    Values exposing ``name``, ``read`` and ``mimetype`` are emitted as file
    parts; lists produce one part per element; anything else produces a
    single part.

    :param params: mapping of field name to value.
    :returns: tuple ``(body, boundary)``, where ``boundary`` is the encoded
        random boundary string used between parts.
    """
    alphabet = string.ascii_lowercase + string.ascii_uppercase + string.digits
    boundary_str = "---" + ''.join([random.choice(alphabet) for _ in range(13)])
    boundary = boundary_str.encode("UTF-8")
    delimiter = b'\n--' + boundary
    # Header shared by every non-file part, up to the opening quote of name=.
    plain_prefix = b'\nContent-Type: application/octet-stream\nMIME-Version: 1.0\nContent-Disposition: form-data; name="'

    res = b'Content-Type: multipart/mixed; boundary="' + boundary + b'"\nMIME-Version: 1.0\n'
    res += delimiter
    for key, value in list(params.items()):
        members = dir(value)
        if "name" in members and "read" in members and "mimetype" in members:
            # File-like value
            payload = value.read()
            content_type = value.mimetype
            if content_type == "application/octet-stream":
                # Generic type declared: sniff the real one from the content.
                content_type = magic.from_buffer(payload, mime=True)
            encoded_type = content_type.encode("UTF-8")
            encoded_key = key.encode("UTF-8")
            encoded_name = os.path.basename(value.name).decode(sys.getfilesystemencoding()).encode("UTF-8")
            res += (b'\nContent-Type: ' + encoded_type
                    + b'\nMIME-Version: 1.0\nContent-Disposition: form-data; name="' + encoded_key
                    + b'"; filename="' + encoded_name + b'"\n\n')
            res += payload
            res += delimiter
        elif isinstance(value, list):
            for item in value:
                res += plain_prefix + key.encode("UTF-8") + b'"\n\n'
                if isinstance(item, unicode):
                    res += item.encode("UTF-8")
                else:
                    res += str(item)
                res += delimiter
        else:
            res += plain_prefix + key.encode("UTF-8") + b'"\n\n'
            if isinstance(value, unicode):
                res += unicode(value).encode("UTF-8")
            else:
                res += str(value)
            res += delimiter
    res += b'--\n'
    return (res, boundary)
def __call__(self, value):
    """Validate the MIME type sniffed from the first 1024 bytes of ``value``.

    :param value: file-like object exposing ``read``.
    :raises ValidationError: when ``self.allowed_mimetypes`` is non-empty
        and the detected type is not one of them; the message is
        ``self.mime_message`` formatted with ``mimetype`` and
        ``allowed_mimetypes``.
    """
    # Check the content type
    mimetype = magic.from_buffer(value.read(1024), mime=True)
    # Rewind so whoever consumes the file after validation sees it from the
    # start instead of missing the bytes sniffed above.
    if hasattr(value, 'seek'):
        value.seek(0)
    if self.allowed_mimetypes and mimetype not in self.allowed_mimetypes:
        message = self.mime_message % {
            'mimetype': mimetype,
            'allowed_mimetypes': ', '.join(self.allowed_mimetypes)
        }
        raise ValidationError(message)
def create_in_memory_image(image, name, size):
    """
    Resize ``image`` into an in-memory buffer and wrap it for upload.

    :param image: source image, passed through to ``image_resize``.
    :param name: file name given to the resulting upload object.
    :param size: target size, passed through to ``image_resize``.
    :returns: an ``InMemoryUploadedFile`` backed by the resized image data.
    """
    output = io.BytesIO()
    # resize the image and save it to the in-memory buffer
    image_resize(image, output, size)
    content = output.getvalue()
    # get MIME type of the resized image from its bytes
    mime = magic.from_buffer(content, mime=True)
    # Report the payload length as the upload size; sys.getsizeof() would
    # give the interpreter's object footprint, not the number of data bytes.
    return uploadedfile.InMemoryUploadedFile(output, 'ImageField', name,
                                             mime, len(content), None)
def validate_file(self):
    """Return True when the MIME type sniffed from ``self.body.data`` is allowed.

    Only the first 16 bytes are inspected; the stream is rewound afterwards.
    The detected type is looked up in ``self.allowed_extensions``.
    """
    stream = self.body.data
    header = stream.read(16)
    stream.seek(0)
    detected = magic.from_buffer(header, mime=True)
    return detected in self.allowed_extensions
def validate_file(self):
    """Return True when the MIME type sniffed from ``self.testcase.data`` is allowed.

    Only the first 16 bytes are inspected; the stream is rewound afterwards.
    The detected type is looked up in ``self.allowed_extensions``.
    """
    stream = self.testcase.data
    header = stream.read(16)
    stream.seek(0)
    detected = magic.from_buffer(header, mime=True)
    return detected in self.allowed_extensions
def validate_file(self):
    """Return True when ``self.code.data`` is detected as a ``text/*`` type.

    Only the first 16 bytes are inspected; the stream is rewound afterwards.
    Unlike a whitelist lookup, any MIME type starting with ``text/`` passes.
    """
    stream = self.code.data
    header = stream.read(16)
    stream.seek(0)
    detected = magic.from_buffer(header, mime=True)
    return detected.startswith('text/')
def detect(self, path):
    """Classify ``path`` as a directory or by its sniffed MIME type.

    :param path: filesystem path to inspect.
    :returns: ``DIRECTORY`` for directories; ``TypeString(mimetype)`` when
        the first 128 bytes yield a MIME type that is not listed in
        ``UNKNOWN_MIMETYPE``; otherwise ``None``.
    """
    if os.path.isdir(path):
        return DIRECTORY
    with open(path, 'rb') as fd:
        header = fd.read(128)
        mimetype = magic.from_buffer(header, mime=True)
        if not mimetype or mimetype in UNKNOWN_MIMETYPE:
            return None
        return TypeString(mimetype)
def mimetype(self):
    """ Readable mimetype, guessed from the first 1024 bytes of ``self.file`` """
    source = self.file
    source.open()
    head = source.read(1024)
    return magic.from_buffer(head, mime=True)
def add_location(self, location):
    """
    Register one media location on this subject.

    - **location** may be an open :py:class:`file` object, a path to a
      local file, or a :py:class:`dict` mapping MIME types to URLs of
      remote media.

    A dict is stored as-is with no media data. For a path or file object
    the content is read, its media type detected, and the file is closed
    afterwards (including a file object supplied by the caller).

    Examples::

        subject.add_location(my_file)
        subject.add_location('/data/image.jpg')
        subject.add_location({'image/png': 'https://example.com/image.png'})
    """
    location_type = type(location)
    if location_type is dict:
        # Remote media: keep the mapping, no local payload to upload.
        self.locations.append(location)
        self._media_files.append(None)
        return

    if location_type in (str,) + _OLD_STR_TYPES:
        handle = open(location, 'rb')
    else:
        handle = location

    try:
        payload = handle.read()
        if MEDIA_TYPE_DETECTION == 'magic':
            detected = magic.from_buffer(payload, mime=True)
        else:
            detected = 'image/{}'.format(imghdr.what(None, payload))
        self.locations.append(detected)
        self._media_files.append(payload)
    finally:
        handle.close()
def new_picture(self, fb_user, fields):
    """Post the image in ``fields['image']`` to the user's photos endpoint.

    :param fb_user: object providing ``access_token`` and ``username``.
    :param fields: dict holding the ``image`` file object and an optional
        ``message`` (defaulted to ``''`` in place when missing).

    Failures while posting are logged, not raised. The image file is always
    closed and ``self.fire_trigger`` is always called afterwards.
    """
    if 'message' not in fields:
        fields['message'] = ''
    image_file = fields['image']
    image_file.seek(0)
    mime_type = magic.from_buffer(image_file.read(), mime=True)
    # Rewind after sniffing so the upload sends the whole file.
    image_file.seek(0)
    post_data = [('access_token', (None, fb_user.access_token)),
                 ('source', (str(uuid4()) + '.' + mime_type.split('/')[1], image_file)),
                 ('caption', (None, fields['message'])),
                 ('message', (None, fields['message']))]
    # Pre-bind so the failure log below cannot hit an unbound name when the
    # request itself raises before a response exists.
    resp = None
    try:
        fb_request_url = Config.get("API_BASE_URI") + "/{}/photos".format(fb_user.username)
        resp = requests.post(fb_request_url, files=post_data)
        if resp.ok and "post_id" in resp.json():
            log.info(_("A new Post with ID {} published!".format(resp.json()['post_id'])))
        else:
            raise Exception
    except Exception:
        # Best effort: report the failure without letting the report itself
        # raise (no response at all, or a body that is not valid JSON).
        response_body = None
        if resp is not None:
            try:
                response_body = resp.json()
            except ValueError:
                response_body = resp.text
        log.error(_("A failure occurred while posting on Facebook : "
                    "called with data: {}\n response: {}".format(post_data,
                                                                 response_body)))
    finally:
        image_file.close()
    webhook_data = {
        "time": 0,
        "id": "101915710270588",
        "changed_fields": ["statuses"],
        "uid": fb_user.username
    }
    self.fire_trigger(webhook_data)
def __call__(self, fileobj):
    """Validate the detected MIME type and the extension of ``fileobj``.

    :param fileobj: file-like object exposing ``read``, ``seek`` and ``name``.
    :raises ValidationError: with code ``invalid_type`` when the detected
        type is not in ``self.allowed_mimes``, or with code
        ``invalid_extension`` when ``self.allowed_exts`` is set and the
        lower-cased extension is not in it.
    """
    header = fileobj.read(READ_SIZE)
    detected_type = magic.from_buffer(header, mime=True)
    extension = os.path.splitext(fileobj.name.lower())[1]
    # rewind so a valid file can be read later without resetting the position
    fileobj.seek(0)

    # some versions of libmagic do not report proper mimes for Office
    # subtypes; use detection details to transform it to a proper mime
    ambiguous_types = ('application/octet-stream', 'application/vnd.ms-office')
    if detected_type in ambiguous_types:
        detected_type = self.check_word_or_excel(fileobj, detected_type, extension)

    if detected_type not in self.allowed_mimes:
        # show only the subtype part of each mime in the feedback message
        readable_types = ', '.join(
            mime_type.split('/')[1] for mime_type in self.allowed_mimes
        )
        raise ValidationError(
            message=self.type_message,
            params={
                'detected_type': detected_type,
                'allowed_types': readable_types
            },
            code='invalid_type'
        )

    if self.allowed_exts and extension not in self.allowed_exts:
        readable_exts = ', '.join(self.allowed_exts)
        raise ValidationError(
            message=self.extension_message,
            params={
                'extension': extension,
                'allowed_extensions': readable_exts
            },
            code='invalid_extension'
        )
def get_filetype(data):
    """Describe ``data`` using whichever ``magic`` binding is loaded.

    There are two versions of python-magic floating around with different
    interfaces, so the ``magic.open(...)`` interface is tried first and
    ``magic.from_buffer`` is used when that fails.

    :param data: buffer to identify.
    :returns: the description produced by ``magic``, or ``None`` when the
        ``magic`` module has not been loaded.
    """
    # ``dict.has_key`` does not exist on Python 3; ``in`` works everywhere.
    if 'magic' not in sys.modules:
        return None
    try:
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        return ms.buffer(data)
    except Exception:
        # The other binding lacks this interface; use its module-level helper.
        return magic.from_buffer(data)
def MIME_TYPE(data, mime=True):
    """Identify ``data`` with ``magic.from_buffer``.

    :param data: buffer to identify.
    :param mime: forwarded as the ``mime`` argument of ``magic.from_buffer``.
    :returns: the detection result, or ``"none/none"`` when
        ``magic.MagicException`` is raised.
    """
    try:
        detected = magic.from_buffer(data, mime=mime)
    except magic.MagicException:
        detected = "none/none"
    return detected
def file_is(file_description, fmt):
    """Tell whether a file or buffer is detected as a `fmt` document.

    :file_description: Full path of a file (``str``) or a buffer (``bytes``)
        holding the data to check.
    :fmt: Pattern searched for, case-insensitively, in the detected MIME
        type. It is interpolated into a regular expression unescaped.
    :returns: True if the MIME type matches `fmt`; False otherwise,
        including when `file_description` is neither ``str`` nor ``bytes``.
    """
    import magic
    logger.debug("Checking filetype")
    pattern = r".*%s.*" % fmt
    # Pre-bind so an unsupported input type yields False rather than an
    # UnboundLocalError at the return statement.
    result = None
    if isinstance(file_description, str):
        # This means that the file_description is a path
        result = re.match(
            pattern, magic.from_file(file_description, mime=True),
            re.IGNORECASE
        )
        if result:
            logger.debug(
                "File %s appears to be of type %s" % (file_description, fmt)
            )
    elif isinstance(file_description, bytes):
        # Suppose that file_description is a buffer; match it with the same
        # case-insensitive rule as the path branch.
        result = re.match(
            pattern, magic.from_buffer(file_description, mime=True),
            re.IGNORECASE
        )
        if result:
            logger.debug(
                "Buffer appears to be of type %s" % (fmt)
            )
    return bool(result)
def process(self):
    """Return what ``magic.from_buffer`` reports for ``self.buffer``."""
    payload = self.buffer
    return magic.from_buffer(payload)
def pack_file(f):
    """Read ``f`` fully and wrap its content in an ``ndef.Record``.

    The record type is the MIME type sniffed from the content; the record
    name is ``f.name`` (``'<stdin>'`` when absent) cut to 255 characters.
    """
    payload = f.read()
    mime_type = magic.from_buffer(payload, mime=1)
    full_name = getattr(f, 'name', '<stdin>')
    return ndef.Record(mime_type, full_name[0:255], payload)
def mimetype(file_header):
    """
    Detect the MIME type from the leading bytes of a file.

    file_header: first bytes of the file
    """
    detected = magic.from_buffer(file_header, mime=True)
    return detected
def mimetype(file_path):
    """
    Detect mime type by reading first 1024 bytes of file

    file_path: file to detect mime type
    """
    # Read only the first 1024 bytes; the context manager closes the handle
    # promptly instead of leaving it to the garbage collector.
    with open(file_path, 'rb') as fd:
        header = fd.read(1024)
    return magic.from_buffer(header, mime=True)
filetype.py 文件源码
项目:Intruder-detector-with-Raspberry-Pi-and-Pushbullet
作者: DeligenceTechnologies
项目源码
文件源码
阅读 15
收藏 0
点赞 0
评论 0
def _magic_get_file_type(f, _):
    """Sniff the MIME type from the first 1024 bytes of ``f`` and rewind it.

    The second argument is ignored. The detected value is passed through
    ``maybe_decode`` before being returned.
    """
    header = f.read(1024)
    detected = magic.from_buffer(header, mime=True)
    f.seek(0)
    return maybe_decode(detected)