def get_extension(media):
    """Return the file extension matching a Telegram media object.

    Photo-like media yield ``'.jpg'``. A ``MessageMediaDocument`` wrapping a
    ``Document`` yields the extension guessed from its MIME type. Anything
    else, including an unknown or octet-stream MIME type, yields ``''``.
    """
    # Telegram always compresses photos to JPEG.
    photo_types = (UserProfilePhoto, ChatPhoto, MessageMediaPhoto)
    if isinstance(media, photo_types):
        return '.jpg'

    is_document = (isinstance(media, MessageMediaDocument)
                   and isinstance(media.document, Document))
    if not is_document:
        return ''

    # An octet stream is just bytes, so it has no default extension.
    if media.document.mime_type == 'application/octet-stream':
        return ''
    return guess_extension(media.document.mime_type) or ''
# Example source code using Python's guess_extension()
def _get_email_thread_attachment(ticket, email_category=None):
    """Build a base64-encoded attachment holding a ticket's email thread.

    The ticket's emails are fetched from the registered ``MailerServiceBase``
    implementation, filtered on ``email_category`` and rendered through
    ``utils.get_email_thread_content``.

    :param ticket: The ticket whose emails are exported
    :param str email_category: Value compared with each email's lower-cased
        category
    :rtype: dict
    :return: A dict with `filetype`, `content` (base64) and `name` keys
    :raises InternalServerError: if the emails cannot be fetched or rendered
    """
    try:
        _emails = ImplementationFactory.instance.get_singleton_of(
            'MailerServiceBase'
        ).get_emails(ticket)
    except (KeyError, MailerServiceException) as ex:
        raise InternalServerError(str(ex))

    emails = [email for email in _emails if email.category.lower() == email_category]

    try:
        content, filetype = utils.get_email_thread_content(ticket, emails)
    except (utils.EmailThreadTemplateNotFound, utils.EmailThreadTemplateSyntaxError) as ex:
        raise InternalServerError(str(ex))

    content = base64.b64encode(content)

    # guess_extension() returns None for an unknown type; fall back to no
    # extension instead of embedding the literal text 'None' in the name.
    extension = mimetypes.guess_extension(filetype) or ''
    name = 'ticket_{}_emails_{}{}'.format(
        ticket.publicId,
        datetime.strftime(datetime.now(), '%d-%m-%Y_%H-%M-%S'),
        extension,
    )
    return {'filetype': filetype, 'content': content, 'name': name}
def validate_file(form, field):
    """Reject an upload whose extension or MIME type is forbidden.

    Three checks run in order: the extension of the uploaded filename, the
    MIME type sniffed from the first 1024 bytes, and the extension that
    ``mimetypes`` associates with that sniffed type. A failed check raises
    ``ValidationError``.
    """
    def _extension_is_forbidden(ext):
        # `ext` comes without its leading dot.
        match = ForbiddenExtension.query.filter(
            ForbiddenExtension.extension == ext).first()
        return match is not None

    # File cannot end with a forbidden extension
    declared_ext = os.path.splitext(field.data.filename)[1]
    if declared_ext and _extension_is_forbidden(declared_ext[1:]):
        raise ValidationError('Extension not allowed')

    sniffed_mime = magic.from_buffer(field.data.read(1024), mime=True)
    # Rewind so later consumers read the file from its beginning.
    field.data.seek(0, 0)

    # Check for permitted mimetype
    banned_mime = ForbiddenMimeType.query.filter(
        ForbiddenMimeType.mimetype == sniffed_mime).first()
    if banned_mime is not None:
        raise ValidationError('File MimeType not allowed')

    # The extension implied by the real content must be allowed as well.
    real_ext = mimetypes.guess_extension(sniffed_mime)
    if real_ext is not None and _extension_is_forbidden(real_ext[1:]):
        raise ValidationError('Extension not allowed')
def filename_from_url(url, content_type):
    """Derive a download filename from a URL and a ``Content-Type`` value.

    The last path segment of ``url`` is used, or ``'index'`` when the path
    is empty. When that name contains no dot and ``content_type`` is given,
    an extension guessed from the media type is appended.
    """
    fn = urlsplit(url).path.rstrip('/')
    fn = os.path.basename(fn) if fn else 'index'
    if '.' not in fn and content_type:
        # Keep only the media type: drop parameters such as "; charset=...",
        # the optional whitespace around it, and case differences, so the
        # comparison and the lookup below do not miss.
        content_type = content_type.split(';')[0].strip().lower()
        if content_type == 'text/plain':
            # mimetypes returns '.ksh'
            ext = '.txt'
        else:
            ext = mimetypes.guess_extension(content_type)

        if ext == '.htm':  # Python 3
            ext = '.html'

        if ext:
            fn += ext

    return fn
def __init__(self, config_file=None):
    """Set up logging, the ffmpeg command, output settings and buckets.

    Options are read from ``self.sd``: ``ffmpeg_args`` and ``output_ext``
    are optional, the extension otherwise being guessed from
    ``output_mimetype``. If a first read of the input queue returns
    nothing, ``queue_files()`` is called.
    """
    super(SonOfMMM, self).__init__(config_file)

    # Log to '<instance_id>.log' inside the working directory.
    self.log_file = '%s.log' % self.instance_id
    self.log_path = os.path.join(self.working_dir, self.log_file)
    boto.set_file_logger(self.name, self.log_path)

    self.command = (
        '/usr/local/bin/ffmpeg ' + self.sd.get('ffmpeg_args')
        if self.sd.has_option('ffmpeg_args')
        else '/usr/local/bin/ffmpeg -y -i %s %s'
    )

    self.output_mimetype = self.sd.get('output_mimetype')
    self.output_ext = (
        self.sd.get('output_ext')
        if self.sd.has_option('output_ext')
        else mimetypes.guess_extension(self.output_mimetype)
    )

    self.output_bucket = self.sd.get_obj('output_bucket')
    self.input_bucket = self.sd.get_obj('input_bucket')

    # Seed the queue when a first read finds no pending message.
    if not self.input_queue.read(1):
        self.queue_files()
def save_zip(self, stream, content_type, name=None):
    '''Save the zip stream to disk and extract its contents.

    The archive is written below ``self._storage_path`` as ``<name><ext>``,
    where ``ext`` is guessed from ``content_type`` and ``name`` defaults to
    a freshly generated uuid.

    Returns a tuple of the id derived from the extracted directory and the
    name that was used.
    '''
    # Make sure the storage path exists
    ensure_dir(self._storage_path)

    # guess_extension() returns None for unknown content types; without the
    # fallback the archive would be named '<name>None'.
    ext = mimetypes.guess_extension(content_type) or ''
    if not name:
        name = str(self._uuidgen())
    fname = '{uuid}{ext}'.format(uuid=name, ext=ext)
    archive_path = os.path.join(self._storage_path, fname)
    self._write(archive_path, stream)

    # extract the zip file
    directory = extract_model(archive_path, name, self._storage_path)
    return fmdb.id_from_path(directory), name
def store(self):
    """Validate the raw media bytes and save them as an ``Image``.

    Raises ``TooBigMedia`` when the payload reaches ``MAX_SIZE`` and
    ``InvalidMimeType`` when the sniffed MIME type is not allowed. The file
    is named after the SHA-1 digest of its content plus the guessed
    extension. Returns the absolute URL of the saved image.
    """
    if not len(self.data) < self.MAX_SIZE:
        raise TooBigMedia(self.identifying_name, self.MAX_SIZE)

    detected = magic.from_buffer(self.data, mime=True)
    if detected not in self.allowed_mimetypes:
        raise InvalidMimeType(detected)

    self.extension = mimetypes.guess_extension(detected)
    # weirdness from mimetypes
    if self.extension == '.jpe':
        self.extension = '.jpeg'

    digest = hashlib.sha1(self.data).hexdigest()
    target_name = '{}{}'.format(digest, self.extension)

    image = Image(organization=self.organization)
    image.file.save(target_name, ContentFile(self.data))
    return image.get_absolute_url()
def __init__(self, config_file=None):
    """Set up logging, the ffmpeg command, output settings and buckets.

    Options are read from ``self.sd``: ``ffmpeg_args`` and ``output_ext``
    are optional, the extension otherwise being guessed from
    ``output_mimetype``. If a first read of the input queue returns
    nothing, ``queue_files()`` is called.
    """
    super(SonOfMMM, self).__init__(config_file)

    # Log to '<instance_id>.log' inside the working directory.
    self.log_file = '%s.log' % self.instance_id
    self.log_path = os.path.join(self.working_dir, self.log_file)
    boto.set_file_logger(self.name, self.log_path)

    self.command = (
        '/usr/local/bin/ffmpeg ' + self.sd.get('ffmpeg_args')
        if self.sd.has_option('ffmpeg_args')
        else '/usr/local/bin/ffmpeg -y -i %s %s'
    )

    self.output_mimetype = self.sd.get('output_mimetype')
    self.output_ext = (
        self.sd.get('output_ext')
        if self.sd.has_option('output_ext')
        else mimetypes.guess_extension(self.output_mimetype)
    )

    self.output_bucket = self.sd.get_obj('output_bucket')
    self.input_bucket = self.sd.get_obj('input_bucket')

    # Seed the queue when a first read finds no pending message.
    if not self.input_queue.read(1):
        self.queue_files()
def fileinfo(self, path):
    """Log the file info of ``path`` and open its preview images.

    ``self.robot_obj.file_info(path)`` supplies a mapping of info values and
    a sequence of images. Each image is indexed as ``[0]`` (a MIME type) and
    ``[1]`` (the data written to disk). Images whose MIME type maps to a
    known extension are written to temporary files and handed to ``open``.
    """
    info, images = self.robot_obj.file_info(path)
    for key, value in info.items():
        # Long values are truncated to their first 30 items for the log.
        shown = value[:30] if len(value) > 30 else value
        logger.info(" :%s => %s", key, shown)
    logger.info("%s", info)

    previews = []
    for img in images:
        ext = mimetypes.guess_extension(img[0])
        if ext:
            # Close the file before launching the viewer: an open handle may
            # still hold unflushed data, and was previously never released.
            # delete=False keeps the file on disk after closing.
            with NamedTemporaryFile(suffix=ext, delete=False) as ntf:
                ntf.write(img[1])
            previews.append(ntf.name)

    if previews:
        os.system("open " + " ".join(previews))
def add_media_to_archive(self, media, mime, name=''):
    """Adds to "Pictures" archive folder the file in `media` and register
    it into manifest file.

    When `name` is empty and `media` has a ``name`` attribute, both the
    stored name and the extension are taken from it; otherwise the
    extension is guessed from `mime`. `media` is read from its start and
    closed afterwards when it has a ``close`` method.

    Returns the path of the media inside the archive.
    """
    extension = None
    if hasattr(media, 'name') and not name:
        name, extension = path.splitext(media.name)
    if not extension:
        # guess_extension() returns None for unknown MIME types; fall back
        # to no extension rather than producing 'Pictures/<name>None'.
        extension = guess_extension(mime) or ''

    media_path = 'Pictures/%s%s' % (name, extension)
    media.seek(0)
    self.files[media_path] = media.read(-1)
    if hasattr(media, 'close'):
        media.close()

    # Register the new file in the manifest.
    files_node = self.manifest.getElementsByTagName('manifest:manifest')[0]
    node = self.create_node(self.manifest, 'manifest:file-entry', files_node)
    node.setAttribute('manifest:full-path', media_path)
    node.setAttribute('manifest:media-type', mime)
    return media_path
def filename_from_url(url, content_type):
    """Derive a download filename from a URL and a ``Content-Type`` value.

    The last path segment of ``url`` is used, or ``'index'`` when the path
    is empty. When that name contains no dot and ``content_type`` is given,
    an extension guessed from the media type is appended.
    """
    fn = urlsplit(url).path.rstrip('/')
    fn = os.path.basename(fn) if fn else 'index'
    if '.' not in fn and content_type:
        # Keep only the media type: drop parameters such as "; charset=...",
        # the optional whitespace around it, and case differences, so the
        # comparison and the lookup below do not miss.
        content_type = content_type.split(';')[0].strip().lower()
        if content_type == 'text/plain':
            # mimetypes returns '.ksh'
            ext = '.txt'
        else:
            ext = mimetypes.guess_extension(content_type)

        if ext == '.htm':  # Python 3
            ext = '.html'

        if ext:
            fn += ext

    return fn
def generate_email_files(msg):
    """Yield ``(filename, data, date)`` triples for the parts of an email.

    Multipart containers are skipped. A part without a filename is named
    ``part-NNN<ext>``, the extension being guessed from its content type
    (``.bin`` when unknown). A part whose name ends in ``.zip`` is expanded
    through ``generate_zip_files``; any other part is yielded with the
    timestamp parsed from the message's ``Date`` header.
    """
    upload_date = time.mktime(email.utils.parsedate(msg["Date"]))

    # multipart/* are just containers
    leaves = (p for p in msg.walk() if p.get_content_maintype() != 'multipart')

    for counter, part in enumerate(leaves, 1):
        # Applications should really sanitize the given filename so that an
        # email message can't be used to overwrite important files
        filename = part.get_filename()
        if not filename:
            # Use a generic bag-of-bits extension when nothing can be guessed
            ext = mimetypes.guess_extension(part.get_content_type()) or '.bin'
            filename = 'part-%03d%s' % (counter, ext)

        data = part.get_payload(decode=True)
        if parse_pathname(filename).ext != '.zip':
            yield filename, data, upload_date
            continue

        for member_name, member_data, member_date in generate_zip_files(data):
            yield member_name, member_data, member_date
def __init__(self, config_file=None):
    """Set up logging, the ffmpeg command, output settings and buckets.

    Options are read from ``self.sd``: ``ffmpeg_args`` and ``output_ext``
    are optional, the extension otherwise being guessed from
    ``output_mimetype``. If a first read of the input queue returns
    nothing, ``queue_files()`` is called.
    """
    Service.__init__(self, config_file)

    # Log to '<instance_id>.log' inside the working directory.
    self.log_file = '%s.log' % self.instance_id
    self.log_path = os.path.join(self.working_dir, self.log_file)
    boto.set_file_logger(self.name, self.log_path)

    self.command = (
        '/usr/local/bin/ffmpeg ' + self.sd.get('ffmpeg_args')
        if self.sd.has_option('ffmpeg_args')
        else '/usr/local/bin/ffmpeg -y -i %s %s'
    )

    self.output_mimetype = self.sd.get('output_mimetype')
    self.output_ext = (
        self.sd.get('output_ext')
        if self.sd.has_option('output_ext')
        else mimetypes.guess_extension(self.output_mimetype)
    )

    self.output_bucket = self.sd.get_obj('output_bucket')
    self.input_bucket = self.sd.get_obj('input_bucket')

    # Seed the queue when a first read finds no pending message.
    if not self.input_queue.read(1):
        self.queue_files()
def __init__(self, config_file=None):
    """Set up logging, the ffmpeg command, output settings and buckets.

    Options are read from ``self.sd``: ``ffmpeg_args`` and ``output_ext``
    are optional, the extension otherwise being guessed from
    ``output_mimetype``. If a first read of the input queue returns
    nothing, ``queue_files()`` is called.
    """
    Service.__init__(self, config_file)

    # Log to '<instance_id>.log' inside the working directory.
    self.log_file = '%s.log' % self.instance_id
    self.log_path = os.path.join(self.working_dir, self.log_file)
    boto.set_file_logger(self.name, self.log_path)

    self.command = (
        '/usr/local/bin/ffmpeg ' + self.sd.get('ffmpeg_args')
        if self.sd.has_option('ffmpeg_args')
        else '/usr/local/bin/ffmpeg -y -i %s %s'
    )

    self.output_mimetype = self.sd.get('output_mimetype')
    self.output_ext = (
        self.sd.get('output_ext')
        if self.sd.has_option('output_ext')
        else mimetypes.guess_extension(self.output_mimetype)
    )

    self.output_bucket = self.sd.get_obj('output_bucket')
    self.input_bucket = self.sd.get_obj('input_bucket')

    # Seed the queue when a first read finds no pending message.
    if not self.input_queue.read(1):
        self.queue_files()
def run(self):
    """Worker loop: download every ``(name, url)`` pair taken from the queue.

    A response with status 200 is streamed to
    ``./<IMAGE_FOLDER>/<name><ext>``, the extension being guessed from the
    response's content type. Other responses are ignored.
    """
    while True:
        # The name will be used for saving the file
        name, url = self.imageQueue.get()
        try:
            res = requests.get(url, headers=HEADERS, timeout=TIMEOUT, stream=True)
            if res.status_code == 200:
                # Keep only the media type: parameters such as
                # "; charset=..." defeat the mimetypes lookup, and the
                # header may be absent altogether.
                content_type = res.headers.get('content-type', '')
                content_type = content_type.split(';')[0].strip()
                # guess_extension() returns None for unknown types, which
                # cannot be concatenated to the name.
                extension = mimetypes.guess_extension(content_type) or ''
                filepath = os.path.join('./' + IMAGE_FOLDER + '/' + name + extension)
                with open(filepath, 'wb') as f:
                    # Stream the files.
                    for chunk in res:
                        f.write(chunk)
        finally:
            # Notify that we have finished one task, even when the download
            # raised, so the task is never left unaccounted for.
            self.imageQueue.task_done()
# Function to retrieve a list of URLs, one for every page of cards.
# The url parameter is the entry point of the website from which the information is extracted.
def __init__(self, config_file=None):
    """Set up logging, the ffmpeg command, output settings and buckets.

    Options are read from ``self.sd``: ``ffmpeg_args`` and ``output_ext``
    are optional, the extension otherwise being guessed from
    ``output_mimetype``. If a first read of the input queue returns
    nothing, ``queue_files()`` is called.
    """
    super(SonOfMMM, self).__init__(config_file)

    # Log to '<instance_id>.log' inside the working directory.
    self.log_file = '%s.log' % self.instance_id
    self.log_path = os.path.join(self.working_dir, self.log_file)
    boto.set_file_logger(self.name, self.log_path)

    self.command = (
        '/usr/local/bin/ffmpeg ' + self.sd.get('ffmpeg_args')
        if self.sd.has_option('ffmpeg_args')
        else '/usr/local/bin/ffmpeg -y -i %s %s'
    )

    self.output_mimetype = self.sd.get('output_mimetype')
    self.output_ext = (
        self.sd.get('output_ext')
        if self.sd.has_option('output_ext')
        else mimetypes.guess_extension(self.output_mimetype)
    )

    self.output_bucket = self.sd.get_obj('output_bucket')
    self.input_bucket = self.sd.get_obj('input_bucket')

    # Seed the queue when a first read finds no pending message.
    if not self.input_queue.read(1):
        self.queue_files()
def filename_from_url(url, content_type):
    """Derive a download filename from a URL and a ``Content-Type`` value.

    The last path segment of ``url`` is used, or ``'index'`` when the path
    is empty. When that name contains no dot and ``content_type`` is given,
    an extension guessed from the media type is appended.
    """
    fn = urlsplit(url).path.rstrip('/')
    fn = os.path.basename(fn) if fn else 'index'
    if '.' not in fn and content_type:
        # Keep only the media type: drop parameters such as "; charset=...",
        # the optional whitespace around it, and case differences, so the
        # comparison and the lookup below do not miss.
        content_type = content_type.split(';')[0].strip().lower()
        if content_type == 'text/plain':
            # mimetypes returns '.ksh'
            ext = '.txt'
        else:
            ext = mimetypes.guess_extension(content_type)

        if ext == '.htm':  # Python 3
            ext = '.html'

        if ext:
            fn += ext

    return fn
def __init__(self, config_file=None):
    """Set up logging, the ffmpeg command, output settings and buckets.

    Options are read from ``self.sd``: ``ffmpeg_args`` and ``output_ext``
    are optional, the extension otherwise being guessed from
    ``output_mimetype``. If a first read of the input queue returns
    nothing, ``queue_files()`` is called.
    """
    Service.__init__(self, config_file)

    # Log to '<instance_id>.log' inside the working directory.
    self.log_file = '%s.log' % self.instance_id
    self.log_path = os.path.join(self.working_dir, self.log_file)
    boto.set_file_logger(self.name, self.log_path)

    self.command = (
        '/usr/local/bin/ffmpeg ' + self.sd.get('ffmpeg_args')
        if self.sd.has_option('ffmpeg_args')
        else '/usr/local/bin/ffmpeg -y -i %s %s'
    )

    self.output_mimetype = self.sd.get('output_mimetype')
    self.output_ext = (
        self.sd.get('output_ext')
        if self.sd.has_option('output_ext')
        else mimetypes.guess_extension(self.output_mimetype)
    )

    self.output_bucket = self.sd.get_obj('output_bucket')
    self.input_bucket = self.sd.get_obj('input_bucket')

    # Seed the queue when a first read finds no pending message.
    if not self.input_queue.read(1):
        self.queue_files()
def get_extension(media):
    """Return the file extension matching a Telegram media object.

    ``'.jpg'`` for photo-like media; for a ``MessageMediaDocument`` the
    extension guessed from its MIME type, or ``''`` when none is known;
    ``None`` for any other object.
    """
    # Telegram always compresses photos to JPEG.
    if isinstance(media, (UserProfilePhoto, ChatPhoto, MessageMediaPhoto)):
        return '.jpg'

    if not isinstance(media, MessageMediaDocument):
        return None

    # Documents carry a MIME type from which the extension can be guessed.
    return guess_extension(media.document.mime_type) or ''
def parse_attachment(part):
    """
        Describe one part of an email as an attachment

        :param `Message` part: A `Message`
        :rtype: dict
        :return: A dict with `content_type`, `content` and `filename` keys
    """
    content_type = part.get_content_type()
    if content_type.lower() in ['message/rfc822', 'message/delivery-status']:
        content = str(part)
    else:
        content = part.get_payload(decode=True)
    attachment = {'content_type': content_type, 'content': content}

    filename = part.get_filename()
    if not filename:
        # No declared name: derive one from the SHA-1 of the content.
        # hashlib only accepts bytes, while `content` may be text (the
        # str(part) branch above) or None (no decodable payload).
        hashable = b'' if content is None else content
        if not isinstance(hashable, bytes):
            hashable = hashable.encode('utf-8', 'replace')
        filename = hashlib.sha1(hashable).hexdigest()
        if content_type:
            extension = mimetypes.guess_extension(content_type)
            if extension:
                filename += extension

    attachment['filename'] = get_valid_filename(utils.decode_every_charset_in_the_world(filename))
    return attachment
def remux_detect(f):
    """Remux ``f`` with ffmpeg (stream copy) and report the resulting size.

    The container extension is guessed from the detected MIME type, with a
    naive fallback to the MIME subtype. Returns ``(size, False)`` when the
    remuxed temporary file is non-empty, and ``None`` otherwise.
    """
    from detection.utils import filetype

    f = os.path.abspath(f)
    mime = filetype(f)

    guessed = mimetypes.guess_extension(mime, strict=False)
    if not guessed:
        # naive get extension from mime
        ext = mime.split('/')[1]
        if ext.startswith('x-'):
            ext = ext[2:]
    else:
        ext = guessed[1:] if guessed[0] == '.' else guessed
        if ext == 'ogx':
            ext = 'ogg'

    base_command = ['ffmpeg', '-loglevel', 'warning', '-y', '-i', f, '-c', 'copy']
    with tempfile.NamedTemporaryFile(suffix='.' + ext) as tmp:
        subprocess.call(base_command + [tmp.name])
        size = os.path.getsize(tmp.name)

    if size:
        return size, False
def save(self, image_stream, image_content_type):
    """Stream an uploaded image into the storage directory.

    The file is named ``<uuid><ext>``, the extension being guessed from
    ``image_content_type`` (no extension when the type is unknown), and is
    written in chunks of ``self._CHUNK_SIZE_BYTES``.

    Returns the generated file name.
    """
    # guess_extension() returns None for unknown types; without the
    # fallback the file would get a literal 'None' suffix.
    ext = mimetypes.guess_extension(image_content_type) or ''
    name = '{uuid}{ext}'.format(uuid=self._uuidgen(), ext=ext)
    image_path = os.path.join(self._storage_path, name)

    with self._fopen(image_path, 'wb') as image_file:
        while True:
            chunk = image_stream.read(self._CHUNK_SIZE_BYTES)
            if not chunk:
                break
            image_file.write(chunk)

    return name
def upload(url):
    """Upload the media behind ``url`` to pomf.se and return its public URL.

    URLs recognised as YouTube links or matched by cclive's supported
    formats are downloaded to /tmp with youtube-dl or cclive. Any other URL
    is fetched directly into a temporary file. Every failure is reported as
    an ``"Error: ..."`` string instead of being raised.
    """
    try:
        from shlex import quote
    except ImportError:  # Python 2
        from pipes import quote

    cclive = subprocess.Popen("cclive --support | xargs | tr ' ' '|'", stdout=subprocess.PIPE, shell=True)
    (cclive_formats, err) = cclive.communicate()
    if isinstance(cclive_formats, bytes):
        # Formatting raw bytes would embed "b'...'" into the pattern.
        cclive_formats = cclive_formats.decode()
    # Drop the trailing newline / separators, and leave the alternative out
    # entirely when cclive reported nothing: an empty alternative would
    # make the pattern match every URL.
    cclive_formats = cclive_formats.strip().strip('|')

    re_youtube = r"youtube|youtu\.be|yooouuutuuube"
    alternatives = [re_youtube]
    if cclive_formats:
        alternatives.append(cclive_formats)
    search = ".*(?:{}).*".format("|".join(alternatives))

    temp = None
    try:
        if re.match(search, url, re.I):
            if re.match(".*(?:{}).*".format(re_youtube), url, re.I):
                # The URL is untrusted input: quote it for the shell.
                cmd = r"youtube-dl --quiet --recode-video webm --format webm/mp4 --output /tmp/%\(id\)s.webm {}".format(quote(url))
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
                # Wait for the download; the file is opened right below.
                p.communicate()
                yt = r".*(?:youtube.*?(?:v=|/v/)|youtu\.be/|yooouuutuuube.*?id=)([-_a-zA-Z0-9]+).*"
                file = "/tmp/{}.webm".format(re.match(yt, url, re.I).group(1))
            else:
                cmd = "cclive --quiet -f fmt43_360p {} --O /tmp/pomf.webm --exec 'echo -n %f'".format(quote(url))
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
                (file, err) = p.communicate()
        else:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:37.0) Gecko/20100101 Firefox/37.0',
                'Referer': 'http://www.amazon.com/'
            }
            # guess_type() yields None for unknown URLs, which
            # guess_extension() cannot take.
            mime = guess_type(url)[0]
            extension = guess_extension(mime) if mime else None
            # Only the exact '.jpe' alias is normalised; a substring
            # replace would also turn '.jpeg' into '.jpgg'.
            if extension == '.jpe':
                extension = '.jpg'
            temp = tempfile.NamedTemporaryFile(suffix=extension or '')
            # NOTE(review): `headers` used to be built but never sent;
            # sending them is assumed to be the intent - confirm.
            content = requests.get(url, headers=headers).content
            temp.write(content)
            # Flush so the second handle opened below sees the whole body.
            temp.flush()
            file = temp.name

        with open(file, "rb") as fh:
            content = requests.post(url="http://pomf.se/upload.php", files={"files[]": fh})
        if not content.status_code // 100 == 2:
            raise Exception("Unexpected response {}".format(content))
        return "http://a.pomf.se/{}".format(content.json()["files"][0]["url"])
    except Exception as e:
        return "Error: {}".format(e)
    finally:
        if temp is not None:
            temp.close()
async def downloadImage(url, folder, name, loop, chunkSize=20):
    """Download ``url`` into ``folder`` when it answers 200 with an image.

    The function awaits and uses ``async with``, so it has to be a
    coroutine (``async def``).

    Returns a dict of three flags: ``canAccessURL`` (status was 200),
    ``isImage`` (the content type mentions "image") and ``fileSaved``.
    """
    result = {'canAccessURL': False, 'isImage': False, 'fileSaved': False}
    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
        'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
        'Accept-Encoding': 'none',
        'Accept-Language': 'en-US,en;q=0.8',
        'Connection': 'keep-alive'}
    async with aiohttp.ClientSession(loop=loop) as session:
        with aiohttp.Timeout(10, loop=session.loop):
            async with session.get(url, headers=headers) as response:
                # A response without the header is simply "not an image".
                content_type = response.headers.get('content-type', '')
                if response.status == 200:
                    result['canAccessURL'] = True
                if "image" in content_type:
                    result['isImage'] = True
                if not result['canAccessURL'] or not result['isImage']:
                    return result

                # Parameters such as "; charset=..." defeat the lookup, and
                # an unknown type yields None, which cannot be concatenated.
                media_type = content_type.split(';')[0].strip()
                extension = mimetypes.guess_extension(media_type) or ''
                if extension == '.jpe':
                    extension = '.jpg'

                with open(folder + "/" + name + extension, 'wb') as fd:
                    while True:
                        chunk = await response.content.read(chunkSize)
                        if not chunk:
                            break
                        fd.write(chunk)
                result['fileSaved'] = True
                return result
async def downloadImage(url, folder, name, loop, chunkSize=20):
    """Download ``url`` into ``folder`` when it answers 200 with an image.

    The function awaits and uses ``async with``, so it has to be a
    coroutine (``async def``).

    Returns a dict of three flags: ``canAccessURL`` (status was 200),
    ``isImage`` (the content type mentions "image") and ``fileSaved``.
    """
    result = {'canAccessURL': False, 'isImage': False, 'fileSaved': False}
    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
        'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
        'Accept-Encoding': 'none',
        'Accept-Language': 'en-US,en;q=0.8',
        'Connection': 'keep-alive'}
    async with aiohttp.ClientSession(loop=loop) as session:
        with aiohttp.Timeout(10, loop=session.loop):
            async with session.get(url, headers=headers) as response:
                # A response without the header is simply "not an image".
                content_type = response.headers.get('content-type', '')
                if response.status == 200:
                    result['canAccessURL'] = True
                if "image" in content_type:
                    result['isImage'] = True
                if not result['canAccessURL'] or not result['isImage']:
                    return result

                # Parameters such as "; charset=..." defeat the lookup, and
                # an unknown type yields None, which cannot be concatenated.
                media_type = content_type.split(';')[0].strip()
                extension = mimetypes.guess_extension(media_type) or ''
                if extension == '.jpe':
                    extension = '.jpg'

                with open(folder + "/" + name + extension, 'wb') as fd:
                    while True:
                        chunk = await response.content.read(chunkSize)
                        if not chunk:
                            break
                        fd.write(chunk)
                result['fileSaved'] = True
                return result
def upload_image(self, image_uri, sync, username, userid, channel_name):
    """Download an image with the API token and relay it to the hangout.

    The image is fetched from ``image_uri`` with a bearer token, given an
    extension matching its content type when the URL's own extension
    disagrees, uploaded through the bot client and finally sent to the
    internal chat identified by ``sync.hangoutid``.

    This is a generator-based coroutine (it uses ``yield from``).
    """
    token = self.apikey
    logger.info('downloading %s', image_uri)
    filename = os.path.basename(image_uri)
    request = urllib.request.Request(image_uri)
    request.add_header("Authorization", "Bearer %s" % token)
    image_response = urllib.request.urlopen(request)
    content_type = image_response.info().get_content_type()

    # guess_extension() returns None for an unknown content type; treat
    # that as "no extension" instead of failing on None.lower().
    filename_extension = (mimetypes.guess_extension(content_type) or "").lower()  # returns with "."
    physical_extension = "." + filename.rsplit(".", 1).pop().lower()

    if physical_extension == filename_extension:
        pass
    elif filename_extension == ".jpe" and physical_extension in [ ".jpg", ".jpeg", ".jpe", ".jif", ".jfif" ]:
        # account for mimetypes idiosyncrasy to return jpe for valid jpeg
        pass
    else:
        logger.warning("unable to determine extension: {} {}".format(filename_extension, physical_extension))
        filename += filename_extension

    logger.info('uploading as %s', filename)
    image_id = yield from self.bot._client.upload_image(image_response, filename=filename)

    logger.info('sending HO message, image_id: %s', image_id)
    yield from sync._bridgeinstance._send_to_internal_chat(
        sync.hangoutid,
        "shared media from slack",
        { "sync": sync,
          "source_user": username,
          "source_uid": userid,
          "source_title": channel_name },
        image_id=image_id )
def download(url, filename):
    """Fetch ``url`` and write its body to ``filename`` plus an extension.

    The extension is guessed from the response's content type; nothing is
    appended when the type is missing or unknown.
    """
    file_url = requests.get(url)
    # Keep only the media type: parameters such as "; charset=..." defeat
    # the lookup, and the header may be absent altogether.
    content_type = file_url.headers.get('content-type', '').split(';')[0].strip()
    # guess_extension() returns None for unknown types, which cannot be
    # concatenated to the file name.
    file_extension = mimetypes.guess_extension(content_type) or ''
    with open(filename + file_extension, 'wb') as out:
        out.write(file_url.content)
def __init__(self, s):
    """Parse a type string of the form ``FORMAT[:arg1,arg2,...]``.

    ``FORMAT`` is treated as a MIME type when it contains ``/``, as an
    extension when it is upper-case, and as a qualifier otherwise. For the
    first two, the missing half (extension or MIME type) is guessed with
    ``mimetypes`` and may stay ``None``.
    """
    self.str = s

    # Extract arguments (anything that follows the first ':')
    fmt, colon, raw_args = s.partition(':')
    self.ts_format = fmt
    self.arguments = tuple(raw_args.split(',')) if colon else tuple()

    # Defaults, refined below depending on the kind of format.
    self.is_qualifier = False
    self.mimetype = None
    self.extension = None

    if '/' in fmt:
        self.mimetype = fmt
        guessed = mimetypes.guess_extension(fmt)
        if guessed:
            self.extension = guessed.strip('.').upper()
        return

    if fmt.isupper():
        self.extension = fmt
        # guess_type() also returns an encoding, which is discarded.
        self.mimetype = mimetypes.guess_type('fn.%s' % fmt)[0]
        return

    # Is qualifier, can't determine mimetype OR extension
    self.is_qualifier = True
def guess_extension(mimetype):
    """guess a file extension from mimetype, without leading `.`

    Parameters following ``;`` are ignored and ``htm`` is reported as
    ``html``.

    Returns `unknown` if an extension could not be guessed
    """
    media_type = mimetype.split(';')[0]
    dotted = mimetypes.guess_extension(media_type)
    if not dotted:
        return 'unknown'

    bare = dotted[1:]
    if bare == 'htm':
        return 'html'
    return bare
def load_conf(self, fd, *, path=None, mime_type=None, response=None):
    """Load a configuration with the loader matching its source.

    The loader is looked up in ``registry``: by ``path.suffix`` when a path
    is given, otherwise by ``mime_type`` or by the extension guessed from
    it. For an ``http.client.HTTPResponse`` the MIME type is taken from its
    ``Content-Type`` header.

    The data is then read from ``response`` when one is given, from
    ``path`` when ``fd`` is None, and from ``fd`` otherwise.

    Raises ``LookupError`` when no path and no MIME type are available, and
    ``NotImplementedError`` when no loader is registered for the MIME type.
    """
    if isinstance(response, http.client.HTTPResponse):
        url = URL(response.geturl())
        self.uris.append(url)
        mime_type = response.headers.get('Content-Type')
        if mime_type:
            mime_type = mime_type.split(';')[0].strip()
        logger.info('Config found: {} [{}]'.format(url, mime_type))

    if path:
        loader = registry.get(path.suffix)
        path = path.absolute()
        self.uris.append(path)
        logger.info('Config found: {}'.format(path))
    elif mime_type in registry:
        loader = registry.get(mime_type)
    elif not mime_type:
        # Must come before the extension guess: guess_extension() cannot
        # take None, so this branch used to be unreachable for a missing
        # MIME type.
        raise LookupError('Not found mime_type %s' % mime_type)
    else:
        extension = mimetypes.guess_extension(mime_type)
        if extension not in registry:
            # NotImplemented is a comparison sentinel, not an exception;
            # raising it only produced an unrelated TypeError.
            raise NotImplementedError('No loader for mime_type %s' % mime_type)
        loader = registry.get(extension)

    if response is not None:
        return loader.load_bytes(response.read())
    elif fd is None:
        return loader.load_path(path)
    with fd:
        return loader.load_fd(fd)