python类guess_extension()的实例源码

utils.py 文件源码 项目:Telethon 作者: LonamiWebs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_extension(media):
    """Gets the corresponding extension for any Telegram media"""

    # Photos are always compressed as .jpg by Telegram
    if isinstance(media, (UserProfilePhoto, ChatPhoto, MessageMediaPhoto)):
        return '.jpg'

    # Documents will come with a mime type
    if isinstance(media, MessageMediaDocument):
        if isinstance(media.document, Document):
            if media.document.mime_type == 'application/octet-stream':
                # Octet stream are just bytes, which have no default extension
                return ''
            else:
                extension = guess_extension(media.document.mime_type)
                return extension if extension else ''

    return ''
TicketsController.py 文件源码 项目:cerberus-core 作者: ovh 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _get_email_thread_attachment(ticket, email_category=None):

    try:
        _emails = ImplementationFactory.instance.get_singleton_of(
            'MailerServiceBase'
        ).get_emails(ticket)
    except (KeyError, MailerServiceException) as ex:
        raise InternalServerError(str(ex))

    emails = [email for email in _emails if email.category.lower() == email_category]

    try:
        content, filetype = utils.get_email_thread_content(ticket, emails)
    except (utils.EmailThreadTemplateNotFound, utils.EmailThreadTemplateSyntaxError) as ex:
        raise InternalServerError(str(ex))

    content = base64.b64encode(content)
    name = 'ticket_{}_emails_{}{}'.format(
        ticket.publicId,
        datetime.strftime(datetime.now(), '%d-%m-%Y_%H-%M-%S'),
        mimetypes.guess_extension(filetype),
    )

    return {'filetype': filetype, 'content': content, 'name': name}
forms.py 文件源码 项目:sample-platform 作者: CCExtractor 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def validate_file(form, field):
        # File cannot end with a forbidden extension
        filename, file_extension = os.path.splitext(field.data.filename)
        if len(file_extension) > 0:
            forbidden_ext = ForbiddenExtension.query.filter(
                ForbiddenExtension.extension == file_extension[1:]).first()
            if forbidden_ext is not None:
                raise ValidationError('Extension not allowed')
        mimedata = field.data
        mimetype = magic.from_buffer(field.data.read(1024), mime=True)
        # File Pointer returns to beginning
        field.data.seek(0, 0)
        # Check for permitted mimetype
        forbidden_mime = ForbiddenMimeType.query.filter(
            ForbiddenMimeType.mimetype == mimetype).first()
        if forbidden_mime is not None:
            raise ValidationError('File MimeType not allowed')
        extension = mimetypes.guess_extension(mimetype)
        if extension is not None:
            forbidden_real = ForbiddenExtension.query.filter(
                ForbiddenExtension.extension == extension[1:]).first()
            if forbidden_real is not None:
                raise ValidationError('Extension not allowed')
downloads.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def filename_from_url(url, content_type):
    fn = urlsplit(url).path.rstrip('/')
    fn = os.path.basename(fn) if fn else 'index'
    if '.' not in fn and content_type:
        content_type = content_type.split(';')[0]
        if content_type == 'text/plain':
            # mimetypes returns '.ksh'
            ext = '.txt'
        else:
            ext = mimetypes.guess_extension(content_type)

        if ext == '.htm':  # Python 3
            ext = '.html'

        if ext:
            fn += ext

    return fn
sonofmmm.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, config_file=None):
        super(SonOfMMM, self).__init__(config_file)
        self.log_file = '%s.log' % self.instance_id
        self.log_path = os.path.join(self.working_dir, self.log_file)
        boto.set_file_logger(self.name, self.log_path)
        if self.sd.has_option('ffmpeg_args'):
            self.command = '/usr/local/bin/ffmpeg ' + self.sd.get('ffmpeg_args')
        else:
            self.command = '/usr/local/bin/ffmpeg -y -i %s %s'
        self.output_mimetype = self.sd.get('output_mimetype')
        if self.sd.has_option('output_ext'):
            self.output_ext = self.sd.get('output_ext')
        else:
            self.output_ext = mimetypes.guess_extension(self.output_mimetype)
        self.output_bucket = self.sd.get_obj('output_bucket')
        self.input_bucket = self.sd.get_obj('input_bucket')
        # check to see if there are any messages queue
        # if not, create messages for all files in input_bucket
        m = self.input_queue.read(1)
        if not m:
            self.queue_files()
utils.py 文件源码 项目:tucluster 作者: JamesRamm 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def save_zip(self, stream, content_type, name=None):
        '''Save the zip stream to disk and extract its contents
        '''
        # Make sure the storage path exists
        ensure_dir(self._storage_path)

        ext = mimetypes.guess_extension(content_type)
        if not name:
            name = str(self._uuidgen())
        fname = '{uuid}{ext}'.format(uuid=name, ext=ext)
        archive_path = os.path.join(self._storage_path, fname)

        self._write(archive_path, stream)
        # extract the zip file
        directory = extract_model(archive_path, name, self._storage_path)
        return fmdb.id_from_path(directory), name
models.py 文件源码 项目:munch-core 作者: crunchmail 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def store(self):
        if len(self.data) >= self.MAX_SIZE:
            raise TooBigMedia(self.identifying_name, self.MAX_SIZE)

        mime = magic.from_buffer(self.data, mime=True)
        if mime not in self.allowed_mimetypes:
            raise InvalidMimeType(mime)

        self.extension = mimetypes.guess_extension(mime)

        # weirdness from mimetypes
        if self.extension == '.jpe':
            self.extension = '.jpeg'

        checksum = hashlib.sha1(self.data).hexdigest()
        fn = '{}{}'.format(checksum, self.extension)

        img = Image(organization=self.organization)
        img.file.save(fn, ContentFile(self.data))
        return img.get_absolute_url()
sonofmmm.py 文件源码 项目:learneveryword 作者: karan 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, config_file=None):
        super(SonOfMMM, self).__init__(config_file)
        self.log_file = '%s.log' % self.instance_id
        self.log_path = os.path.join(self.working_dir, self.log_file)
        boto.set_file_logger(self.name, self.log_path)
        if self.sd.has_option('ffmpeg_args'):
            self.command = '/usr/local/bin/ffmpeg ' + self.sd.get('ffmpeg_args')
        else:
            self.command = '/usr/local/bin/ffmpeg -y -i %s %s'
        self.output_mimetype = self.sd.get('output_mimetype')
        if self.sd.has_option('output_ext'):
            self.output_ext = self.sd.get('output_ext')
        else:
            self.output_ext = mimetypes.guess_extension(self.output_mimetype)
        self.output_bucket = self.sd.get_obj('output_bucket')
        self.input_bucket = self.sd.get_obj('input_bucket')
        # check to see if there are any messages queue
        # if not, create messages for all files in input_bucket
        m = self.input_queue.read(1)
        if not m:
            self.queue_files()
robot_console.py 文件源码 项目:flux_line_bot 作者: blesscat 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def fileinfo(self, path):
        info, images = self.robot_obj.file_info(path)
        for key, value in info.items():
            if len(value) > 30:
                logger.info("    :%s => %s", key, value[:30])
            else:
                logger.info("    :%s => %s", key, value)
        logger.info("%s" % info)

        previews = []
        for img in images:
            ext = mimetypes.guess_extension(img[0])
            if ext:
                ntf = NamedTemporaryFile(suffix=ext, delete=False)
                ntf.write(img[1])
                previews.append(ntf.name)
        if previews:
            os.system("open " + " ".join(previews))
secretary.py 文件源码 项目:pytemplate 作者: krotos139 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def add_media_to_archive(self, media, mime, name=''):
        """Adds to "Pictures" archive folder the file in `media` and register
        it into manifest file."""
        extension = None
        if hasattr(media, 'name') and not name:
            extension = path.splitext(media.name)
            name      = extension[0]
            extension = extension[1]

        if not extension:
            extension = guess_extension(mime)

        media_path = 'Pictures/%s%s' % (name, extension)
        media.seek(0)
        self.files[media_path] = media.read(-1)
        if hasattr(media, 'close'):
            media.close()

        files_node = self.manifest.getElementsByTagName('manifest:manifest')[0]
        node = self.create_node(self.manifest, 'manifest:file-entry', files_node)
        node.setAttribute('manifest:full-path', media_path)
        node.setAttribute('manifest:media-type', mime)

        return media_path
downloads.py 文件源码 项目:webapp 作者: superchilli 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def filename_from_url(url, content_type):
    fn = urlsplit(url).path.rstrip('/')
    fn = os.path.basename(fn) if fn else 'index'
    if '.' not in fn and content_type:
        content_type = content_type.split(';')[0]
        if content_type == 'text/plain':
            # mimetypes returns '.ksh'
            ext = '.txt'
        else:
            ext = mimetypes.guess_extension(content_type)

        if ext == '.htm':  # Python 3
            ext = '.html'

        if ext:
            fn += ext

    return fn
12-parse-email.py 文件源码 项目:xd 作者: century-arcade 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def generate_email_files(msg):
    counter = 1
    upload_date = time.mktime(email.utils.parsedate(msg["Date"]))
    for part in msg.walk():
        # multipart/* are just containers
        if part.get_content_maintype() == 'multipart':
            continue
        # Applications should really sanitize the given filename so that an
        # email message can't be used to overwrite important files
        filename = part.get_filename()
        if not filename:
            ext = mimetypes.guess_extension(part.get_content_type())
            if not ext:
                # Use a generic bag-of-bits extension
                ext = '.bin'
            filename = 'part-%03d%s' % (counter, ext)
        counter += 1

        data = part.get_payload(decode=True)
        if parse_pathname(filename).ext == '.zip':
            for zipfn, zipdata, zipdt in generate_zip_files(data):
                yield zipfn, zipdata, zipdt
        else:
            yield filename, data, upload_date
sonofmmm.py 文件源码 项目:Chromium_DepotTools 作者: p07r0457 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, config_file=None):
        Service.__init__(self, config_file)
        self.log_file = '%s.log' % self.instance_id
        self.log_path = os.path.join(self.working_dir, self.log_file)
        boto.set_file_logger(self.name, self.log_path)
        if self.sd.has_option('ffmpeg_args'):
            self.command = '/usr/local/bin/ffmpeg ' + self.sd.get('ffmpeg_args')
        else:
            self.command = '/usr/local/bin/ffmpeg -y -i %s %s'
        self.output_mimetype = self.sd.get('output_mimetype')
        if self.sd.has_option('output_ext'):
            self.output_ext = self.sd.get('output_ext')
        else:
            self.output_ext = mimetypes.guess_extension(self.output_mimetype)
        self.output_bucket = self.sd.get_obj('output_bucket')
        self.input_bucket = self.sd.get_obj('input_bucket')
        # check to see if there are any messages queue
        # if not, create messages for all files in input_bucket
        m = self.input_queue.read(1)
        if not m:
            self.queue_files()
sonofmmm.py 文件源码 项目:node-gn 作者: Shouqun 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, config_file=None):
        Service.__init__(self, config_file)
        self.log_file = '%s.log' % self.instance_id
        self.log_path = os.path.join(self.working_dir, self.log_file)
        boto.set_file_logger(self.name, self.log_path)
        if self.sd.has_option('ffmpeg_args'):
            self.command = '/usr/local/bin/ffmpeg ' + self.sd.get('ffmpeg_args')
        else:
            self.command = '/usr/local/bin/ffmpeg -y -i %s %s'
        self.output_mimetype = self.sd.get('output_mimetype')
        if self.sd.has_option('output_ext'):
            self.output_ext = self.sd.get('output_ext')
        else:
            self.output_ext = mimetypes.guess_extension(self.output_mimetype)
        self.output_bucket = self.sd.get_obj('output_bucket')
        self.input_bucket = self.sd.get_obj('input_bucket')
        # check to see if there are any messages queue
        # if not, create messages for all files in input_bucket
        m = self.input_queue.read(1)
        if not m:
            self.queue_files()
arachas.py 文件源码 项目:arachas 作者: GwentAPI 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def run(self):
        while True:
            # The name will be used for saving the file
            name, url = self.imageQueue.get()
            res = requests.get(url, headers=HEADERS, timeout=TIMEOUT, stream=True)

            if res.status_code == 200:
                content_type = res.headers['content-type']
                # With the content type received from the web server, use mimetypes to guess the file extension.
                extension = mimetypes.guess_extension(content_type)

                filepath = os.path.join('./' + IMAGE_FOLDER + '/' + name + extension)
                with open(filepath, 'wb') as f:
                    # Stream the files.
                    for chunk in res:
                        f.write(chunk)
            # Notify that we have finished one task.
            self.imageQueue.task_done()

# Function to retrieve a list of URL for every pages of cards.
# The url parameter is the entry point of the website where we might extract the information.
sonofmmm.py 文件源码 项目:alfred-ec2 作者: SoMuchToGrok 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, config_file=None):
        super(SonOfMMM, self).__init__(config_file)
        self.log_file = '%s.log' % self.instance_id
        self.log_path = os.path.join(self.working_dir, self.log_file)
        boto.set_file_logger(self.name, self.log_path)
        if self.sd.has_option('ffmpeg_args'):
            self.command = '/usr/local/bin/ffmpeg ' + self.sd.get('ffmpeg_args')
        else:
            self.command = '/usr/local/bin/ffmpeg -y -i %s %s'
        self.output_mimetype = self.sd.get('output_mimetype')
        if self.sd.has_option('output_ext'):
            self.output_ext = self.sd.get('output_ext')
        else:
            self.output_ext = mimetypes.guess_extension(self.output_mimetype)
        self.output_bucket = self.sd.get_obj('output_bucket')
        self.input_bucket = self.sd.get_obj('input_bucket')
        # check to see if there are any messages queue
        # if not, create messages for all files in input_bucket
        m = self.input_queue.read(1)
        if not m:
            self.queue_files()
downloads.py 文件源码 项目:python-flask-security 作者: weinbergdavid 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def filename_from_url(url, content_type):
    fn = urlsplit(url).path.rstrip('/')
    fn = os.path.basename(fn) if fn else 'index'
    if '.' not in fn and content_type:
        content_type = content_type.split(';')[0]
        if content_type == 'text/plain':
            # mimetypes returns '.ksh'
            ext = '.txt'
        else:
            ext = mimetypes.guess_extension(content_type)

        if ext == '.htm':  # Python 3
            ext = '.html'

        if ext:
            fn += ext

    return fn
sonofmmm.py 文件源码 项目:depot_tools 作者: webrtc-uwp 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, config_file=None):
        Service.__init__(self, config_file)
        self.log_file = '%s.log' % self.instance_id
        self.log_path = os.path.join(self.working_dir, self.log_file)
        boto.set_file_logger(self.name, self.log_path)
        if self.sd.has_option('ffmpeg_args'):
            self.command = '/usr/local/bin/ffmpeg ' + self.sd.get('ffmpeg_args')
        else:
            self.command = '/usr/local/bin/ffmpeg -y -i %s %s'
        self.output_mimetype = self.sd.get('output_mimetype')
        if self.sd.has_option('output_ext'):
            self.output_ext = self.sd.get('output_ext')
        else:
            self.output_ext = mimetypes.guess_extension(self.output_mimetype)
        self.output_bucket = self.sd.get_obj('output_bucket')
        self.input_bucket = self.sd.get_obj('input_bucket')
        # check to see if there are any messages queue
        # if not, create messages for all files in input_bucket
        m = self.input_queue.read(1)
        if not m:
            self.queue_files()
utils.py 文件源码 项目:BitBot 作者: crack00r 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_extension(media):
    """Gets the corresponding extension for any Telegram media"""

    # Photos are always compressed as .jpg by Telegram
    if (isinstance(media, UserProfilePhoto) or isinstance(media, ChatPhoto) or
            isinstance(media, MessageMediaPhoto)):
        return '.jpg'

    # Documents will come with a mime type, from which we can guess their mime type
    if isinstance(media, MessageMediaDocument):
        extension = guess_extension(media.document.mime_type)
        return extension if extension else ''

    return None
parser.py 文件源码 项目:cerberus-core 作者: ovh 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def parse_attachment(part):
    """
        Get attachments of an email

        :param `Message` part: A `Message`
        :rtype: list
        :return: The list of attachments
    """
    attachment = {}
    attachment['content_type'] = part.get_content_type()

    if attachment['content_type'].lower() in ['message/rfc822', 'message/delivery-status']:
        attachment['content'] = str(part)
    else:
        attachment['content'] = part.get_payload(decode=True)

    filename = part.get_filename()

    if not filename:
        filename = hashlib.sha1(attachment['content']).hexdigest()
        if attachment['content_type']:
            extension = mimetypes.guess_extension(attachment['content_type'])
            if extension:
                filename += extension

    attachment['filename'] = get_valid_filename(utils.decode_every_charset_in_the_world(filename))
    return attachment
ffmpeg.py 文件源码 项目:embeddeddata 作者: toolforge 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def remux_detect(f):
    from detection.utils import filetype

    f = os.path.abspath(f)
    mime = filetype(f)
    ext = mimetypes.guess_extension(mime, strict=False)
    if ext:
        if ext[0] == '.':
            ext = ext[1:]
        if ext == 'ogx':
            ext = 'ogg'
    else:
        # naive get extension from mime
        ext = mime.split('/')[1]
        if ext[:2] == 'x-':
            ext = ext[2:]
    with tempfile.NamedTemporaryFile(suffix='.'+ext) as tmp:
        args = ['ffmpeg',
                '-loglevel', 'warning',
                '-y',
                '-i', f,
                '-c', 'copy',
                tmp.name]
        subprocess.call(args)

        size = os.path.getsize(tmp.name)
        if size:
            return size, False
images.py 文件源码 项目:deb-python-falcon 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def save(self, image_stream, image_content_type):
        ext = mimetypes.guess_extension(image_content_type)
        name = '{uuid}{ext}'.format(uuid=self._uuidgen(), ext=ext)
        image_path = os.path.join(self._storage_path, name)

        with self._fopen(image_path, 'wb') as image_file:
            while True:
                chunk = image_stream.read(self._CHUNK_SIZE_BYTES)
                if not chunk:
                    break

                image_file.write(chunk)

        return name
pomf.py 文件源码 项目:Taigabot 作者: FrozenPigs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def upload(url):
    cclive = subprocess.Popen("cclive --support | xargs | tr ' ' '|'", stdout=subprocess.PIPE, shell=True)
    (cclive_formats, err) = cclive.communicate()

    re_youtube = "youtube|youtu\.be|yooouuutuuube"
    search = ".*(?:{}|{}).*".format(re_youtube, cclive_formats)
    try:
        if re.match(search, url, re.I):
            if re.match(".*(?:{}).*".format(re_youtube), url, re.I):
                cmd = "youtube-dl --quiet --recode-video webm --format webm/mp4 --output /tmp/%\(id\)s.webm {}".format(url)
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
                yt = ".*(?:youtube.*?(?:v=|/v/)|youtu\.be/|yooouuutuuube.*?id=)([-_a-zA-Z0-9]+).*"
                file = "/tmp/{}.webm".format(re.match(yt, url, re.I).group(1))
            else:
                cmd = "cclive --quiet -f fmt43_360p {} --O /tmp/pomf.webm --exec 'echo -n %f'".format(url, "/tmp")
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
                (file, err) = p.communicate()
        else:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:37.0) Gecko/20100101 Firefox/37.0',
                'Referer': 'http://www.amazon.com/'
            }

            extension = guess_extension(guess_type(url)[0]).replace('jpe','jpg')
            temp = tempfile.NamedTemporaryFile(suffix=extension)
            content = requests.get(url).content
            temp.write(content)
            file = temp.name

        fh = open(file, "rb")
        fh.seek(0)

        content = requests.post(url="http://pomf.se/upload.php", files={"files[]":fh})
        if not content.status_code // 100 == 2:
            raise Exception("Unexpected response {}".format(content))
        return "http://a.pomf.se/{}".format(content.json()["files"][0]["url"])
    except Exception as e:
        return "Error: {}".format(e)
utilities.py 文件源码 项目:discordbot.py 作者: rauenzi 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def downloadImage(url, folder, name, loop, chunkSize=20):
    result = {'canAccessURL': False, 'isImage': False, 'fileSaved': False}
    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
        'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
        'Accept-Encoding': 'none',
        'Accept-Language': 'en-US,en;q=0.8',
        'Connection': 'keep-alive'}
    async with aiohttp.ClientSession(loop=loop) as session:
        with aiohttp.Timeout(10, loop=session.loop):
            async with session.get(url, headers=headers) as response:
                content_type = response.headers['content-type']
                if response.status == 200:
                    result['canAccessURL'] = True
                if "image" in content_type:
                    result['isImage'] = True
                if not result['canAccessURL'] or not result['isImage']:
                    return result
                extension = mimetypes.guess_extension(content_type)
                if extension == '.jpe':
                    extension = '.jpg'

                with open(folder + "/" + name + extension, 'wb') as fd:
                    while True:
                        chunk = await response.content.read(chunkSize)
                        if not chunk:
                            break
                        fd.write(chunk)
                result['fileSaved'] = True
                return result
webutilities.py 文件源码 项目:discordbot.py 作者: rauenzi 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def downloadImage(url, folder, name, loop, chunkSize=20):
        result = {'canAccessURL': False, 'isImage': False, 'fileSaved': False}
        headers = {
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
            'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
            'Accept-Encoding': 'none',
            'Accept-Language': 'en-US,en;q=0.8',
            'Connection': 'keep-alive'}
        async with aiohttp.ClientSession(loop=loop) as session:
            with aiohttp.Timeout(10, loop=session.loop):
                async with session.get(url, headers=headers) as response:
                    content_type = response.headers['content-type']
                    if response.status == 200:
                        result['canAccessURL'] = True
                    if "image" in content_type:
                        result['isImage'] = True
                    if not result['canAccessURL'] or not result['isImage']:
                        return result
                    extension = mimetypes.guess_extension(content_type)
                    if extension == '.jpe':
                        extension = '.jpg'

                    with open(folder + "/" + name + extension, 'wb') as fd:
                        while True:
                            chunk = await response.content.read(chunkSize)
                            if not chunk:
                                break
                            fd.write(chunk)
                    result['fileSaved'] = True
                    return result
core.py 文件源码 项目:hangoutsbot 作者: das7pad 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def upload_image(self, image_uri, sync, username, userid, channel_name):
        token = self.apikey
        logger.info('downloading %s', image_uri)
        filename = os.path.basename(image_uri)
        request = urllib.request.Request(image_uri)
        request.add_header("Authorization", "Bearer %s" % token)
        image_response = urllib.request.urlopen(request)
        content_type = image_response.info().get_content_type()

        filename_extension = mimetypes.guess_extension(content_type).lower() # returns with "."
        physical_extension = "." + filename.rsplit(".", 1).pop().lower()

        if physical_extension == filename_extension:
            pass
        elif filename_extension == ".jpe" and physical_extension in [ ".jpg", ".jpeg", ".jpe", ".jif", ".jfif" ]:
            # account for mimetypes idiosyncrancy to return jpe for valid jpeg
            pass
        else:
            logger.warning("unable to determine extension: {} {}".format(filename_extension, physical_extension))
            filename += filename_extension

        logger.info('uploading as %s', filename)
        image_id = yield from self.bot._client.upload_image(image_response, filename=filename)

        logger.info('sending HO message, image_id: %s', image_id)
        yield from sync._bridgeinstance._send_to_internal_chat(
            sync.hangoutid,
            "shared media from slack",
            {   "sync": sync,
                "source_user": username,
                "source_uid": userid,
                "source_title": channel_name },
            image_id=image_id )
image_downloader.py 文件源码 项目:Python-Scripts 作者: amitsagtani97 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def download(url, filename):
    file_url = requests.get(url)
    file_extension = mimetypes.guess_extension(file_url.headers['content-type'])
    with open(filename+file_extension, 'wb') as file:
        file.write(file_url.content)
typestring.py 文件源码 项目:omnic 作者: michaelpb 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, s):
        self.str = s

        # Extract arguments (anything that follows ':')
        if ':' in s:
            self.ts_format, _, arguments_str = s.partition(':')
            self.arguments = tuple(arguments_str.split(','))
        else:
            self.ts_format = s
            self.arguments = tuple()

        # Check if is mimetype, extension or qualifier
        self.is_qualifier = False
        self.mimetype = None
        self.extension = None
        if '/' in self.ts_format:
            self.mimetype = self.ts_format
            ext = mimetypes.guess_extension(self.mimetype)
            if ext:
                self.extension = ext.strip('.').upper()
        elif self.ts_format.isupper():
            self.extension = self.ts_format
            fn = 'fn.%s' % self.extension
            self.mimetype, _ = mimetypes.guess_type(fn)  # discard encoding
        else:
            # Is qualifier, can't determine mimetype OR extension
            self.is_qualifier = True
utils.py 文件源码 项目:osp-scraper 作者: opensyllabus 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def guess_extension(mimetype):
    """guess a file extension from mimetype, without leading `.`

    Returns `unknown` if an extension could not be guessed
    """
    x = (mimetypes.guess_extension(mimetype.split(';')[0]) or '.unknown')[1:]
    return x if x != 'htm' else 'html'
config.py 文件源码 项目:aioworkers 作者: aioworkers 项目源码 文件源码 阅读 46 收藏 0 点赞 0 评论 0
def load_conf(self, fd, *, path=None, mime_type=None, response=None):
        if isinstance(response, http.client.HTTPResponse):
            url = URL(response.geturl())
            self.uris.append(url)
            mime_type = response.headers.get('Content-Type')
            if mime_type:
                mime_type = mime_type.split(';')[0].strip()
            logger.info('Config found: {} [{}]'.format(url, mime_type))
        if path:
            loader = registry.get(path.suffix)
            path = path.absolute()
            self.uris.append(path)
            logger.info('Config found: {}'.format(path))
        elif mime_type in registry:
            loader = registry.get(mime_type)
        elif mimetypes.guess_extension(mime_type) in registry:
            loader = registry.get(mimetypes.guess_extension(mime_type))
        elif not mime_type:
            raise LookupError('Not found mime_type %s' % mime_type)
        else:
            raise NotImplemented
        if response is not None:
            return loader.load_bytes(response.read())
        elif fd is None:
            return loader.load_path(path)
        with fd:
            return loader.load_fd(fd)


问题


面经


文章

微信
公众号

扫码关注公众号