python类from_buffer()的实例源码

helpers.py 文件源码 项目:PyCIRCLeanMail 作者: CIRCL 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, file_obj, orig_filename=None):
        self.file_obj = BytesIO(file_obj)
        self.orig_filename = orig_filename
        if self.orig_filename:
            self.final_filename = self.orig_filename
        else:
            self.final_filename = 'unknownfile.bin'
        self.log_details = {'origFilename': self.orig_filename}
        self.log_string = ''
        if self.orig_filename:
            a, self.extension = os.path.splitext(self.orig_filename)
        else:
            self.extension = None

        try:
            mt = magic.from_buffer(self.file_obj.getvalue(), mime=True)
        except UnicodeEncodeError as e:
            # FIXME: The encoding of the file is broken (possibly UTF-16)
            mt = ''
            self.log_details.update({'UnicodeError': e})
        try:
            self.mimetype = mt.decode("utf-8")
        except:
            self.mimetype = mt

        if self.mimetype and '/' in self.mimetype:
            self.main_type, self.sub_type = self.mimetype.split('/')
        else:
            self.main_type = ''
            self.sub_type = ''
viur.py 文件源码 项目:transfer 作者: viur-framework 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def genReqStr( params ):
        boundary_str = "---"+''.join( [ random.choice(string.ascii_lowercase+string.ascii_uppercase + string.digits) for x in range(13) ] )
        boundary = boundary_str.encode("UTF-8")
        res = b'Content-Type: multipart/mixed; boundary="'+boundary+b'"\nMIME-Version: 1.0\n'
        res += b'\n--'+boundary
        for(key, value) in list(params.items()):
            if all( [x in dir( value ) for x in ["name", "read", "mimetype"] ] ): #File
                dataVal = value.read()
                type = value.mimetype
                if type=="application/octet-stream":
                    type = magic.from_buffer(dataVal, mime=True)
                res += b'\nContent-Type: '+type.encode("UTF-8")+b'\nMIME-Version: 1.0\nContent-Disposition: form-data; name="'+key.encode("UTF-8")+b'"; filename="'+os.path.basename(value.name).decode(sys.getfilesystemencoding()).encode("UTF-8")+b'"\n\n'
                res += dataVal
                res += b'\n--'+boundary
            elif isinstance( value, list ):
                for val in value:
                    res += b'\nContent-Type: application/octet-stream\nMIME-Version: 1.0\nContent-Disposition: form-data; name="'+key.encode("UTF-8")+b'"\n\n'
                    if isinstance( val, unicode ):
                        res += val.encode("UTF-8")
                    else:
                        res += str(val)
                    res += b'\n--'+boundary
            else:
                res += b'\nContent-Type: application/octet-stream\nMIME-Version: 1.0\nContent-Disposition: form-data; name="'+key.encode("UTF-8")+b'"\n\n'
                if isinstance( value, unicode ):
                    res += unicode( value ).encode("UTF-8")
                else:
                    res += str( value )
                res += b'\n--'+boundary
        res += b'--\n'
        return( res, boundary )
validators.py 文件源码 项目:alexandriadocs 作者: srtab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __call__(self, value):
        # Check the content type
        mimetype = magic.from_buffer(value.read(1024), mime=True)
        if self.allowed_mimetypes and mimetype not in self.allowed_mimetypes:
            message = self.mime_message % {
                'mimetype': mimetype,
                'allowed_mimetypes': ', '.join(self.allowed_mimetypes)
            }
            raise ValidationError(message)
utils.py 文件源码 项目:django-content-gallery 作者: Kemaweyan 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def create_in_memory_image(image, name, size):
    """
    Resizes the image and saves it as InMemoryUploadedFile object
    Returns the InMemoryUploadedFile object with the image data
    """
    output = io.BytesIO()  # create an io object
    # resize the image and save it to the io object
    image_resize(image, output, size)
    # get MIME type of the image
    mime = magic.from_buffer(output.getvalue(), mime=True)
    # create InMemoryUploadedFile using data from the io
    return uploadedfile.InMemoryUploadedFile(output, 'ImageField', name,
        mime, sys.getsizeof(output), None)
problem.py 文件源码 项目:ijust_server 作者: koala-team 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def validate_file(self):
        data = self.body.data.read(16)
        self.body.data.seek(0)

        if not magic.from_buffer(data, mime=True) in self.allowed_extensions:
            return False
        return True
problem.py 文件源码 项目:ijust_server 作者: koala-team 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def validate_file(self):
        data = self.testcase.data.read(16)
        self.testcase.data.seek(0)

        if not magic.from_buffer(data, mime=True) in self.allowed_extensions:
            return False
        return True
submission.py 文件源码 项目:ijust_server 作者: koala-team 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def validate_file(self):
        data = self.code.data.read(16)
        self.code.data.seek(0)

        #if not magic.from_buffer(data, mime=True) in self.allowed_extensions:
        #    return False
        #return True
        return magic.from_buffer(data, mime=True).startswith('text/')
core.py 文件源码 项目:omnic 作者: michaelpb 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def detect(self, path):
        if os.path.isdir(path):
            return DIRECTORY
        with open(path, 'rb') as fd:
            mimetype = magic.from_buffer(fd.read(128), mime=True)
            if mimetype and mimetype not in UNKNOWN_MIMETYPE:
                return TypeString(mimetype)
models.py 文件源码 项目:munch-core 作者: crunchmail 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def mimetype(self):
        """ Readable mimetype, guessed from file content """
        self.file.open()
        return magic.from_buffer(self.file.read(1024), mime=True)
subject.py 文件源码 项目:panoptes-python-client 作者: zooniverse 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def add_location(self, location):
        """
        Add a media location to this subject.

        - **location** can be an open :py:class:`file` object, a path to a
          local file, or a :py:class:`dict` containing MIME types and URLs for
          remote media.

        Examples::

            subject.add_location(my_file)
            subject.add_location('/data/image.jpg')
            subject.add_location({'image/png': 'https://example.com/image.png'})
        """
        if type(location) is dict:
            self.locations.append(location)
            self._media_files.append(None)
            return
        elif type(location) in (str,) + _OLD_STR_TYPES:
            f = open(location, 'rb')
        else:
            f = location

        try:
            media_data = f.read()
            if MEDIA_TYPE_DETECTION == 'magic':
                media_type = magic.from_buffer(media_data, mime=True)
            else:
                media_type = 'image/{}'.format(imghdr.what(None, media_data))
            self.locations.append(media_type)
            self._media_files.append(media_data)
        finally:
            f.close()
channel.py 文件源码 项目:daisychain 作者: daisychainme 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def new_picture(self, fb_user, fields):
        if 'message' not in fields:
            fields['message'] = ''

        image_file = fields['image']
        image_file.seek(0)
        mime_type = magic.from_buffer(image_file.read(), mime=True)

        image_file.seek(0)

        post_data = [('access_token', (None, fb_user.access_token)),
                     ('source', (str(uuid4()) + '.' + mime_type.split('/')[1], image_file)),
                     ('caption', (None, fields['message'])),
                     ('message', (None, fields['message']))]

        try:
            fb_request_url = Config.get("API_BASE_URI") + "/{}/photos".format(fb_user.username)
            resp = requests.post(fb_request_url, files=post_data)

            if resp.ok and "post_id" in resp.json():
                log.info(_("A new Post with ID {} published!".format(resp.json()['post_id'])))
            else:
                raise Exception
        except Exception:
            pass
            log.error(_("A failure occurred while posting on Facebook : "
                        "called with data: {}\n response: {}".format(post_data,
                                                                     resp.json()
                                                                     )
                        )
                      )
        image_file.close()
        webhook_data = {
            "time": 0,
            "id": "101915710270588",
            "changed_fields": ["statuses"],
            "uid": fb_user.username
        }
        self.fire_trigger(webhook_data)
__init__.py 文件源码 项目:django-upload-validator 作者: mckinseyacademy 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __call__(self, fileobj):
        detected_type = magic.from_buffer(fileobj.read(READ_SIZE), mime=True)
        root, extension = os.path.splitext(fileobj.name.lower())

        # seek back to start so a valid file could be read
        # later without resetting the position
        fileobj.seek(0)

        # some versions of libmagic do not report proper mimes for Office subtypes
        # use detection details to transform it to proper mime
        if detected_type in ('application/octet-stream', 'application/vnd.ms-office'):
            detected_type = self.check_word_or_excel(fileobj, detected_type, extension)

        if detected_type not in self.allowed_mimes:
            # use more readable file type names for feedback message
            allowed_types = map(lambda mime_type: mime_type.split('/')[1], self.allowed_mimes)

            raise ValidationError(
                message=self.type_message,
                params={
                    'detected_type': detected_type,
                    'allowed_types': ', '.join(allowed_types)
                },
                code='invalid_type'
            )

        if self.allowed_exts and (extension not in self.allowed_exts):
            raise ValidationError(
                message=self.extension_message,
                params={
                    'extension': extension,
                    'allowed_extensions': ', '.join(self.allowed_exts)
                },
                code='invalid_extension'
            )
pescanner.py 文件源码 项目:codex-backend 作者: codexgigassys 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_filetype(data):
    """There are two versions of python-magic floating around, and annoyingly, the interface
        changed between versions, so we try one method and if it fails, then we try the other"""
        if sys.modules.has_key('magic'):
            try:
                ms = magic.open(magic.MAGIC_NONE)
                        ms.load()
                        return ms.buffer(data)
                except:
                    return magic.from_buffer(data)
InfoExtractor.py 文件源码 项目:codex-backend 作者: codexgigassys 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def MIME_TYPE(data, mime=True):
    try:
        return magic.from_buffer(data, mime=mime)
    except magic.MagicException:
        return "none/none"
utils.py 文件源码 项目:papis 作者: alejandrogallo 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def file_is(file_description, fmt):
    """Get if file stored in `file_path` is a `fmt` document.

    :file_path: Full path for a `fmt` file or a buffer containing `fmt` data.
    :returns: True if is `fmt` and False otherwise

    """
    import magic
    logger.debug("Checking filetype")
    if isinstance(file_description, str):
        # This means that the file_description is a string
        result = re.match(
            r".*%s.*" % fmt, magic.from_file(file_description, mime=True),
            re.IGNORECASE
        )
        if result:
            logger.debug(
                "File %s appears to be of type %s" % (file_description, fmt)
            )
    elif isinstance(file_description, bytes):
        # Suppose that file_description is a buffer
        result = re.match(
            r".*%s.*" % fmt, magic.from_buffer(file_description, mime=True)
        )
        if result:
            logger.debug(
                "Buffer appears to be of type %s" % (fmt)
            )
    return True if result else False
hashdd_filemagic.py 文件源码 项目:pyhashdd 作者: hashdd 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def process(self):
        return magic.from_buffer(self.buffer)
Load.py 文件源码 项目:ndeftool 作者: nfcpy 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def pack_file(f):
    record_data = f.read()
    record_type = magic.from_buffer(record_data, mime=1)
    record_name = getattr(f, 'name', '<stdin>')[0:255]
    return ndef.Record(record_type, record_name, record_data)
s3.py 文件源码 项目:curl2share 作者: cuongnv23 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def mimetype(file_header):
        """
        Detect mime type by reading file header.
        file_header: first bytes to read
        """
        return magic.from_buffer(file_header, mime=True)
filesystem.py 文件源码 项目:curl2share 作者: cuongnv23 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def mimetype(file_path):
        """
        Detect mime type by reading first 1024 bytes of file
        file_path: file to detect mime type
        """
        # read first 1024 bytes of file to determine mime type
        return magic.from_buffer(open(file_path, 'rb').read(1024), mime=True)
filetype.py 文件源码 项目:Intruder-detector-with-Raspberry-Pi-and-Pushbullet 作者: DeligenceTechnologies 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def _magic_get_file_type(f, _):
    file_type = magic.from_buffer(f.read(1024), mime=True)
    f.seek(0)
    return maybe_decode(file_type)


问题


面经


文章

微信
公众号

扫码关注公众号