python类from_buffer()的实例源码

optionals.py 文件源码 项目:sqlalchemy-media 作者: pylover 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def magic_mime_from_buffer(buffer: bytes) -> str:
    """
    Try to detect mimetype using ``magic`` library.

    .. warning:: :exc:`.OptionalPackageRequirementError` will be raised if ``python-magic`` is not installed.

    :param buffer: buffer from header of file.

    :return: The mimetype
    """

    if magic is None:  # pragma: no cover
        raise OptionalPackageRequirementError('python-magic')

    return magic.from_buffer(buffer, mime=True)


# wand image
forms.py 文件源码 项目:sample-platform 作者: CCExtractor 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def validate_file(form, field):
        # File cannot end with a forbidden extension
        filename, file_extension = os.path.splitext(field.data.filename)
        if len(file_extension) > 0:
            forbidden_ext = ForbiddenExtension.query.filter(
                ForbiddenExtension.extension == file_extension[1:]).first()
            if forbidden_ext is not None:
                raise ValidationError('Extension not allowed')
        mimedata = field.data
        mimetype = magic.from_buffer(field.data.read(1024), mime=True)
        # File Pointer returns to beginning
        field.data.seek(0, 0)
        # Check for permitted mimetype
        forbidden_mime = ForbiddenMimeType.query.filter(
            ForbiddenMimeType.mimetype == mimetype).first()
        if forbidden_mime is not None:
            raise ValidationError('File MimeType not allowed')
        extension = mimetypes.guess_extension(mimetype)
        if extension is not None:
            forbidden_real = ForbiddenExtension.query.filter(
                ForbiddenExtension.extension == extension[1:]).first()
            if forbidden_real is not None:
                raise ValidationError('Extension not allowed')
__init__.py 文件源码 项目:cloudstrype 作者: btimby 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def write(self, data):
        """
        Write data to multiple clouds.

        Uses a write-through buffer to ensure each chunk is the proper size.
        Close flushes the remainder of the buffer.
        """
        if self._closed:
            raise IOError('I/O operation on closed file.')
        if self._size == 0:
            # First block. See if we can get a more specific mime type by
            # examining the data.
            mime = magic.from_buffer(data, mime=True)
            # Choose the better mimetype somehow, self.mime is determined by
            # the filename. mime is determined by magic.
            if not self.mime or mime != 'application/octet-strem':
                self.mime = mime
        self._size += len(data)
        self._md5.update(data)
        self._sha1.update(data)
        self._write_chunk(data)
__init__.py 文件源码 项目:cloudstrype 作者: btimby 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def write(self, data):
        """
        Write data to multiple clouds.

        Uses a write-through buffer to ensure each chunk is the proper size.
        Close flushes the remainder of the buffer.
        """
        if self._closed:
            raise IOError('I/O operation on closed file.')
        if self._size == 0:
            # First block. See if we can get a more specific mime type by
            # examining the data.
            mime = magic.from_buffer(data, mime=True)
            # Choose the better mimetype somehow, self.mime is determined by
            # the filename. mime is determined by magic.
            if not self.mime or mime != 'application/octet-strem':
                self.mime = mime
        self._size += len(data)
        self._md5.update(data)
        self._sha1.update(data)
        self._write_chunk(data)
__init__.py 文件源码 项目:cloudstrype 作者: btimby 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def write(self, data):
        """
        Write data to multiple clouds.

        Uses a write-through buffer to ensure each chunk is the proper size.
        Close flushes the remainder of the buffer.
        """
        if self._closed:
            raise IOError('I/O operation on closed file.')
        if self._size == 0:
            # First block. See if we can get a more specific mime type by
            # examining the data.
            mime = magic.from_buffer(data, mime=True)
            # Choose the better mimetype somehow, self.mime is determined by
            # the filename. mime is determined by magic.
            if not self.mime or mime != 'application/octet-strem':
                self.mime = mime
        self._size += len(data)
        self._md5.update(data)
        self._sha1.update(data)
        self._write_chunk(data)
forms.py 文件源码 项目:multiuploader 作者: vinaypost 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def clean_file(self):

        content = self.cleaned_data[u'file']
        filename, extension = os.path.splitext(content.name)

        if self.check_extension:
            if re.match(self._options['acceptFileTypes'], extension, flags=re.I) is None:
                raise forms.ValidationError('acceptFileTypes')

        if self.check_content_type:
            content_type = magic.from_buffer(content.read(1024), mime=True)
            if content_type.lower() in self._options['allowedContentTypes']:
                if content._size > self._options['maxFileSize']:
                    raise forms.ValidationError("maxFileSize")
            else:
                raise forms.ValidationError("acceptFileTypes")

        return content
s3.py 文件源码 项目:S4 作者: MichaelAquilina 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def load_index(self):
        try:
            resp = self.boto.get_object(
                Bucket=self.bucket,
                Key=self.index_path(),
            )
            body = resp['Body'].read()
            content_type = magic.from_buffer(body, mime=True)
            if content_type == 'text/plain':
                logger.debug('Detected plain text encoding for index')
                return json.loads(body.decode('utf-8'))
            elif content_type == 'application/zlib':
                logger.debug('Detected zlib encoding for index')
                body = zlib.decompress(body)
                return json.loads(body.decode('utf-8'))
            elif content_type == 'application/x-empty':
                return {}
            else:
                raise ValueError('Unknown content type for index', content_type)
        except (ClientError):
            return {}
pescanner.py 文件源码 项目:analyst-scripts 作者: Te-k 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_filetype(data):
    """There are two versions of python-magic floating around, and annoyingly, the interface
    changed between versions, so we try one method and if it fails, then we try the other.
    NOTE: you may need to alter the magic_file for your system to point to the magic file."""
    if sys.modules.has_key('magic'):
        try:
            ms = magic.open(magic.MAGIC_NONE)
            ms.load()
            return ms.buffer(data)
        except:
            try:
                return magic.from_buffer(data)
            except magic.MagicException:
                magic_custom = magic.Magic(magic_file='C:\windows\system32\magic')
                return magic_custom.from_buffer(data)
    return ''
models.py 文件源码 项目:munch-core 作者: crunchmail 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def store(self):
        if len(self.data) >= self.MAX_SIZE:
            raise TooBigMedia(self.identifying_name, self.MAX_SIZE)

        mime = magic.from_buffer(self.data, mime=True)
        if mime not in self.allowed_mimetypes:
            raise InvalidMimeType(mime)

        self.extension = mimetypes.guess_extension(mime)

        # weirdness from mimetypes
        if self.extension == '.jpe':
            self.extension = '.jpeg'

        checksum = hashlib.sha1(self.data).hexdigest()
        fn = '{}{}'.format(checksum, self.extension)

        img = Image(organization=self.organization)
        img.file.save(fn, ContentFile(self.data))
        return img.get_absolute_url()
channel.py 文件源码 项目:daisychain 作者: daisychainme 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def new_video(self, fb_user, fields):
        if 'message' not in fields:
            fields['message'] = ''
        if 'title' not in fields:
            fields['title'] = ''

        video_file = fields['video']
        video_file.seek(0)
        mime_type = magic.from_buffer(video_file.read(), mime=True)
        video_file.seek(0)

        post_data = [('access_token', (None, fb_user.access_token)),
                     ('source', (str(uuid4()) + '.' + mime_type.split('/')[1], video_file)),
                     ('message', (None, fields['message']))]

        try:
            fb_request_url = Config.get("API_BASE_URI_VIDEO") + "/me/videos"
            resp = requests.post(fb_request_url, files=post_data)
        except Exception:
            pass
            log.error(_("A failure occurred while posting on Facebook : "
                        "called with data: {}".format(post_data)))
        video_file.close()
static.py 文件源码 项目:cuckoodroid-2.0 作者: idanr1986 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _get_filetype(self, data):
        """Gets filetype, uses libmagic if available.
        @param data: data to be analyzed.
        @return: file type or None.
        """
        if not HAVE_MAGIC:
            return None

        try:
            ms = magic.open(magic.MAGIC_NONE)
            ms.load()
            file_type = ms.buffer(data)
        except:
            try:
                file_type = magic.from_buffer(data)
            except Exception:
                return None
        finally:
            try:
                ms.close()
            except:
                pass

        return file_type
__init__.py 文件源码 项目:django-upload-validator 作者: mckinseyacademy 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def check_word_or_excel(self, fileobj, detected_type, extension):
        """
        Returns proper mimetype in case of word or excel files
        """
        word_strings = ['Microsoft Word', 'Microsoft Office Word', 'Microsoft Macintosh Word']
        excel_strings = ['Microsoft Excel', 'Microsoft Office Excel', 'Microsoft Macintosh Excel']
        office_strings = ['Microsoft OOXML']

        file_type_details = magic.from_buffer(fileobj.read(READ_SIZE))

        fileobj.seek(0)

        if any(string in file_type_details for string in word_strings):
            detected_type = 'application/msword'
        elif any(string in file_type_details for string in excel_strings):
            detected_type = 'application/vnd.ms-excel'
        elif any(string in file_type_details for string in office_strings) or \
                (detected_type == 'application/vnd.ms-office'):
            if extension in ('.doc', '.docx'):
                detected_type = 'application/msword'
            if extension in ('.xls', '.xlsx'):
                detected_type = 'application/vnd.ms-excel'

        return detected_type
diaphora_index_batch.py 文件源码 项目:maltindex 作者: joxeankoret 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run(self, directory):
    for root, dirs, files in os.walk(directory, followlinks=True):
      for name in files:
        filename = os.path.join(root, name)
        try:
          file_type = magic.from_buffer(open(filename).read(1024))
        except:
          log("Error reading file %s: %s" % (filename, str(sys.exc_info()[1])))
          continue

        if is_executable(file_type):
          md5_hash = md5(open(filename, "rb").read()).hexdigest()
          if not self.is_file_indexed(md5_hash):
            self.do_run(filename, file_type)
          else:
            log("File already indexed %s" % name)

#-------------------------------------------------------------------------------
main.py 文件源码 项目:chalupas 作者: Antojitos 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_html_to_md(self):
        """HTML to MD"""

        self.set_original_document_from_file('demo.html')

        response = self.app.post('/convert/',
                buffered=True,
                content_type='multipart/form-data',
                data={
                    'document': (self.original_document, self.original_document_name),
                    'from': 'html',
                    'to': 'md'
                })

        destination_document = response.data

        assert '200' in response.status
        assert 'octet-stream' in response.content_type
        self.assertEqual(magic.from_buffer(destination_document, mime=True), 'text/x-c++')
        assert os.listdir(app.config['CONVERSION_FOLDER']) == []
main.py 文件源码 项目:chalupas 作者: Antojitos 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_html_to_rst(self):
        """HTML to RST"""

        self.set_original_document_from_file('demo.html')

        response = self.app.post('/convert/',
                buffered=True,
                content_type='multipart/form-data',
                data={
                    'document': (self.original_document, self.original_document_name),
                    'from': 'html',
                    'to': 'rst'
                })

        destination_document = response.data

        assert '200' in response.status
        assert 'octet-stream' in response.content_type
        assert magic.from_buffer(destination_document, mime=True) == 'text/x-c'
        assert os.listdir(app.config['CONVERSION_FOLDER']) == []
main.py 文件源码 项目:chalupas 作者: Antojitos 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_html_to_docx(self):
        """HTML to DOCX"""

        self.set_original_document_from_file('demo.html')

        response = self.app.post('/convert/',
                buffered=True,
                content_type='multipart/form-data',
                data={
                    'document': (self.original_document, self.original_document_name),
                    'from': 'html',
                    'to': 'docx'
                })

        destination_document = response.data

        assert '200' in response.status
        assert 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' in response.content_type
        self.assertEqual(magic.from_buffer(destination_document, mime=True), 'application/vnd.openxmlformats-officedocument.wordprocessingml.document')
        assert os.listdir(app.config['CONVERSION_FOLDER']) == []

    #
    # From MD
    #
main.py 文件源码 项目:chalupas 作者: Antojitos 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_md_to_html(self):
        """MD to HTML"""

        self.set_original_document_from_file('demo.md')

        response = self.app.post('/convert/',
                buffered=True,
                content_type='multipart/form-data',
                data={
                    'document': (self.original_document, self.original_document_name),
                    'from': 'md',
                    'to': 'html'
                })

        destination_document = response.data

        assert '200' in response.status
        assert 'text/html' in response.content_type
        assert magic.from_buffer(destination_document, mime=True) == 'text/html'
        assert os.listdir(app.config['CONVERSION_FOLDER']) == []
main.py 文件源码 项目:chalupas 作者: Antojitos 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_md_to_rst(self):
        """HTML to RST"""

        self.set_original_document_from_file('demo.md')

        response = self.app.post('/convert/',
                buffered=True,
                content_type='multipart/form-data',
                data={
                    'document': (self.original_document, self.original_document_name),
                    'from': 'md',
                    'to': 'rst'
                })

        destination_document = response.data

        assert '200' in response.status
        assert 'octet-stream' in response.content_type
        assert magic.from_buffer(destination_document, mime=True) == 'text/x-c'
        assert os.listdir(app.config['CONVERSION_FOLDER']) == []
main.py 文件源码 项目:chalupas 作者: Antojitos 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_rst_to_html(self):
        """RST to HTML"""

        self.set_original_document_from_file('demo.rst')

        response = self.app.post('/convert/',
                buffered=True,
                content_type='multipart/form-data',
                data={
                    'document': (self.original_document, self.original_document_name),
                    'from': 'rst',
                    'to': 'html'
                })

        destination_document = response.data

        assert '200' in response.status
        assert 'text/html' in response.content_type
        assert magic.from_buffer(destination_document, mime=True) == 'text/html'
        assert os.listdir(app.config['CONVERSION_FOLDER']) == []
main.py 文件源码 项目:chalupas 作者: Antojitos 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_rst_to_md(self):
        """RST to MD"""

        self.set_original_document_from_file('demo.rst')

        response = self.app.post('/convert/',
                buffered=True,
                content_type='multipart/form-data',
                data={
                    'document': (self.original_document, self.original_document_name),
                    'from': 'rst',
                    'to': 'md'
                })

        destination_document = response.data

        assert '200' in response.status
        assert 'octet-stream' in response.content_type
        assert magic.from_buffer(destination_document, mime=True) == 'text/plain'
        assert os.listdir(app.config['CONVERSION_FOLDER']) == []
main.py 文件源码 项目:chalupas 作者: Antojitos 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_rst_to_docx(self):
        """RST to DOCX"""

        self.set_original_document_from_file('demo.rst')

        response = self.app.post('/convert/',
                buffered=True,
                content_type='multipart/form-data',
                data={
                    'document': (self.original_document, self.original_document_name),
                    'from': 'rst',
                    'to': 'docx'
                })

        destination_document = response.data

        assert '200' in response.status
        assert 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' in response.content_type
        assert magic.from_buffer(destination_document, mime=True) == 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
        assert os.listdir(app.config['CONVERSION_FOLDER']) == []

    #
    # From DOCX
    #
main.py 文件源码 项目:chalupas 作者: Antojitos 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_docx_to_html(self):
        """DOCX to HTML"""

        self.set_original_document_from_file('demo.docx')

        response = self.app.post('/convert/',
                buffered=True,
                content_type='multipart/form-data',
                data={
                    'document': (self.original_document, self.original_document_name),
                    'from': 'docx',
                    'to': 'html'
                })

        destination_document = response.data

        assert '200' in response.status
        assert 'text/html' in response.content_type
        assert magic.from_buffer(destination_document, mime=True) == 'text/html'
        assert os.listdir(app.config['CONVERSION_FOLDER']) == []
main.py 文件源码 项目:chalupas 作者: Antojitos 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_rst_to_html(self):
        """RST to HTML"""

        self.set_original_document_from_file('demo.rst')

        response = self.app.post('/convert/',
                buffered=True,
                content_type='multipart/form-data',
                data={
                    'document': (self.original_document, self.original_document_name),
                    'from': 'rst',
                    'to': 'html'
                })

        destination_document = response.data

        assert '200' in response.status
        assert 'text/html' in response.content_type
        assert magic.from_buffer(destination_document, mime=True) == 'text/html'
        assert os.listdir(app.config['CONVERSION_FOLDER']) == []
main.py 文件源码 项目:chalupas 作者: Antojitos 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_docx_to_rst(self):
        """DOCX to RST"""

        self.set_original_document_from_file('demo.docx')

        response = self.app.post('/convert/',
                buffered=True,
                content_type='multipart/form-data',
                data={
                    'document': (self.original_document, self.original_document_name),
                    'from': 'docx',
                    'to': 'rst'
                })

        destination_document = response.data

        assert '200' in response.status
        assert 'octet-stream' in response.content_type
        assert magic.from_buffer(destination_document, mime=True) == 'text/plain'
        assert os.listdir(app.config['CONVERSION_FOLDER']) == []
main.py 文件源码 项目:chalupas 作者: Antojitos 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_convert_from_string(self):
        """Convert from string"""

        self.set_original_document_from_string('#1')

        response = self.app.post('/convert/',
                buffered=True,
                content_type='multipart/form-data',
                data={
                    'document': self.original_document_content_string,
                    'from': 'md',
                    'to': 'rst'
                })

        destination_document = response.data

        assert '200' in response.status
        assert 'octet-stream' in response.content_type
        assert magic.from_buffer(destination_document, mime=True) == 'text/plain'
        assert os.listdir(app.config['CONVERSION_FOLDER']) == []
static.py 文件源码 项目:CAPE 作者: ctxis 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _get_filetype(data):
    """Gets filetype, uses libmagic if available.
    @param data: data to be analyzed.
    @return: file type or None.
    """
    if not HAVE_MAGIC:
        return None

    try:
        ms = magic.open(magic.MAGIC_SYMLINK)
        ms.load()
        file_type = ms.buffer(data)
    except:
        try:
            file_type = magic.from_buffer(data)
        except Exception:
            return None
    finally:
        try:
            ms.close()
        except:
            pass

    return file_type
validators.py 文件源码 项目:wypok 作者: arturtamborski 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __call__(self, data):
        if self.max_size is not None and data.size > self.max_size:
            params = {
                'max_size': filesizeformat(self.max_size),
                'size': filesizeformat(data.size),
            }
            raise ValidationError(self.error_messages['max_size'], 'max_size', params)

        if self.min_size is not None and data.size < self.min_size:
            params = {
                'min_size': filesizeformat(self.mix_size),
                'size': filesizeformat(data.size)
            }
            raise ValidationError(self.error_messages['min_size'], 'min_size', params)

        if self.content_types is not None and len(self.content_types):
            content_type = magic.from_buffer(data.read(), mime=True)
            data.seek(0) # seek to start for future mime checks by django

            if content_type not in self.content_types:
                params = {
                    'content_type': content_type
                }
                raise ValidationError(self.error_messages['content_type'], 'content_type', params)
main.py 文件源码 项目:auto-tagging-image-search 作者: OsamaJBR 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def upload_image():
    name = request.forms.get('name')
    data = request.files.get('data')
    if name and data and data.file:
        raw = data.file.read()
        filename = data.filename
        save_path="{path}/{file}".format(
                path=config.get('storage','imagesdir'),
                file=filename
                )
        if not os.path.exists(config.get('storage','imagesdir')):
            os.makedirs(save_path)
        if 'image' not in magic.from_buffer(raw):
            return HTTPResponse(status=400,body=json.dumps({'error' : 'file type is not allowed'}))
        with open(save_path,'w') as open_file:
            open_file.write(raw)
        if queue.add_to_queue(queue_name='images',image=save_path):
            return HTTPResponse(status=200,body=json.dumps({'status' : 'Image Stored'}))
        else:
            return HTTPResponse(status=500,body=json.dumps({'error' : 'Internal Server Error'}))
    else:
        return HTTPResponse(status=400,body=json.dumps({'error' : 'missing fields'}))
utils.py 文件源码 项目:Snakepit 作者: K4lium 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_type(data):
    try:
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        file_type = ms.buffer(data)
    except:
        try:
            file_type = magic.from_buffer(data)
        except:
            return ''
    finally:
        try:
            ms.close()
        except:
            pass

    return file_type
validators.py 文件源码 项目:a4-opin 作者: liqd 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def validate_file_type_and_size(upload):

    file_max_mb = 5
    max_size = file_max_mb*10**6
    fileformats = settings.FILE_ALIASES['*']['fileformats']
    mimetypes = [mimetype for name, mimetype in fileformats]
    names = [name for name, mimetype in fileformats]

    errors = []

    filetype = magic.from_buffer(upload.read(), mime=True)
    if filetype.lower() not in mimetypes:
        msg = _(
            'Unsupported file format. Supported formats are {}.'.format(
                ', '.join(names)
            )
        )
        errors.append(ValidationError(msg))
    if upload.size > max_size:
        msg = _('File should be at most {} MB'.format(file_max_mb))
        errors.append(ValidationError(msg))
    if errors:
        raise ValidationError(errors)
    return upload


问题


面经


文章

微信
公众号

扫码关注公众号