def get_filename(self):
    """Build a slugified file name, including extension, for this object.

    The name is made of the doctype name (or 'Unterlage' when there is no
    doctype), the object's name, its version and its localized date, all
    joined with '-'. If the parent object offers ``get_filename_slice()``,
    its slice is put in front. The extension is derived from
    ``self.mimetype`` and falls back to '.bin'.
    """
    # HACK: we want .xls not .xlb for excel files
    if self.mimetype == 'application/vnd.ms-excel':
        suffix = '.xls'
    else:
        suffix = mimetypes.guess_extension(self.mimetype) or '.bin'

    doctype_label = self.doctype.name if self.doctype else 'Unterlage'
    parts = [doctype_label, self.name, self.version]
    parts.append(timezone.localtime(self.date).strftime('%Y.%m.%d'))

    parent = self.parent_object
    if parent and hasattr(parent, 'get_filename_slice'):
        parts = [parent.get_filename_slice()] + parts

    return slugify('-'.join(parts)) + suffix
# Example source code for Python's guess_extension()
def ext_from_mimetype(mimetype):
    """Return the file extension (with leading dot) guessed for *mimetype*.

    Thin wrapper around ``mimetypes.guess_extension``; the result is passed
    through unchanged, so it is ``None`` when no extension is known.
    """
    extension = mimetypes.guess_extension(mimetype)
    return extension
def __get_download_filename__(self):
    """Work out the download's file name and extension.

    Three sources are consulted: the ``content-disposition`` response
    header, the path of ``self.url`` and the ``content-type`` response
    header. The name found in the header wins over the one in the URL.
    The extension is taken from the header, else from the MIME type, else
    from the URL.

    Updates ``self.remote_filename`` and, when it is still empty,
    ``self._filename``. The chosen extension is appended to
    ``self._filename`` unless it already occurs in it.

    NOTE: relies on ``urllib.unquote_plus`` and the ``urlparse`` module,
    i.e. the Python 2 standard-library layout.
    """
    # Get the file name and extension from the Content-Disposition header.
    content_disposition = self.response_headers.get("content-disposition", "")
    cd_filename, cd_ext = "", ""
    if "filename" in content_disposition:
        # "attachment" and "inline" are mutually exclusive branches; the
        # previous if/if/else chain discarded the attachment result.
        if "attachment" in content_disposition:
            pattern = "attachment; filename ?= ?[\"|']?([^\"']+)[\"|']?"
        elif "inline" in content_disposition:
            pattern = "inline; filename ?= ?[\"|']?([^\"']+)[\"|']?"
        else:
            pattern = None
        found = re.match(pattern, content_disposition) if pattern else None
        # A header that does not have the expected layout is ignored
        # instead of raising AttributeError on a failed match.
        if found:
            cd_filename, cd_ext = os.path.splitext(urllib.unquote_plus(found.group(1)))

    url_filename, url_ext = os.path.splitext(
        urllib.unquote_plus(filetools.basename(urlparse.urlparse(self.url)[2])))

    # A missing or generic content type carries no extension information.
    if self.response_headers.get("content-type", "application/octet-stream") != "application/octet-stream":
        mime_ext = mimetypes.guess_extension(self.response_headers.get("content-type"))
    else:
        mime_ext = ""

    # Pick the most suitable name
    chosen_name = cd_filename or url_filename
    if chosen_name:
        self.remote_filename = chosen_name
        if not self._filename:
            self._filename = chosen_name

    # Pick the most suitable extension
    chosen_ext = cd_ext or mime_ext or url_ext
    if chosen_ext:
        if chosen_ext not in self._filename:
            self._filename += chosen_ext
        if self.remote_filename:
            self.remote_filename += chosen_ext
def handle(url, data):
    """Download the top image of the article at *url*.

    The article is fetched and parsed with ``Article``; its ``top_image``
    is then requested with the user agent from ``data['user_agent']``.
    The target path is ``data['single_file']`` formatted with the
    extension guessed from the response's content type. An existing file
    is not downloaded again.

    Returns the file path on success and ``False`` otherwise. Any exception
    is reported on stdout and turned into ``False``.
    """
    try:
        config = Config()
        config.browser_user_agent = data['user_agent']
        article = Article(url, config)
        article.download()
        article.parse()

        image_url = article.top_image
        if not image_url:
            return False
        print('\t\tNewspaper located image: %s' % image_url)

        response = requests.get(image_url, headers = {'User-Agent': data['user_agent']}, stream=True)
        if response.status_code != 200:
            print('\t\tError Reading Image: %s responded with code %i!' % (url, response.status_code) )
            return False

        ext = mimetypes.guess_extension(response.headers['content-type'])
        if not ext:
            print('\t\tNewsPaper Error locating file MIME Type: %s' % url)
            return False
        # Normalise every JPEG-like suffix (anything containing '.jp').
        if '.jp' in ext:
            ext = '.jpg'

        path = data['single_file'] % ext
        if os.path.isfile(path):
            return path

        # Parent dir for the full filepath is supplied already.
        parent_dir = data['parent_dir']
        if not os.path.isdir(parent_dir):
            print("\t\t+Building dir: %s" % parent_dir)
            os.makedirs(parent_dir)
        with open(path, 'wb') as out_file:
            response.raw.decode_content = True
            shutil.copyfileobj(response.raw, out_file)
        return path
    except Exception as e:
        print('\t\t"Newspaper" Generic handler failed. '+(str(e).strip()) )
    return False
def play_info(self):
    """Log the play metadata and open the attached images in a viewer.

    ``self.robot_obj.play_info()`` supplies a metadata mapping and an
    iterable of ``(mime, buffer)`` pairs. Each metadata entry is logged.
    Every image whose MIME type maps to a known extension is written to a
    temporary file, and all of those files are handed to the shell's
    ``open`` command in one call.
    """
    metadata, images = self.robot_obj.play_info()
    logger.info("Metadata:")
    for k, v in metadata.items():
        logger.info(" %s=%s", k, v)

    paths = []
    for mime, buf in images or ():
        ext = mimetypes.guess_extension(mime)
        if not ext:
            continue
        # delete=False keeps the file on disk after the handle is closed;
        # closing first guarantees the viewer sees the complete data.
        with NamedTemporaryFile(suffix=ext, delete=False) as ntf:
            ntf.write(buf)
        paths.append(ntf.name)

    # Nothing to show: do not run a bare "open" command.
    if paths:
        os.system("open " + " ".join(paths))
def scan_oneshot(self, filename=None):
    """Take a one-shot scan and open the resulting images in a viewer.

    ``self.task.oneshot()`` supplies ``(mime, buffer)`` pairs. Every image
    whose MIME type maps to a known extension is written to a temporary
    file, and all of those files are handed to the shell's ``open``
    command in one call.

    :param filename: accepted for interface compatibility; not used by
        this method.
    """
    images = self.task.oneshot()

    paths = []
    for mime, buf in images:
        ext = mimetypes.guess_extension(mime)
        if not ext:
            continue
        # delete=False keeps the file on disk after the handle is closed;
        # closing first guarantees the viewer sees the complete data.
        with NamedTemporaryFile(suffix=ext, delete=False) as ntf:
            ntf.write(buf)
        paths.append(ntf.name)

    # Nothing to show: do not run a bare "open" command.
    if paths:
        os.system("open " + " ".join(paths))
def scanimages(self, filename=None):
    """Fetch the scan images and open them in a viewer.

    ``self.task.scanimages()`` supplies ``(mime, buffer)`` pairs. Every
    image whose MIME type maps to a known extension is written to a
    temporary file, and all of those files are handed to the shell's
    ``open`` command in one call.

    :param filename: accepted for interface compatibility; not used by
        this method.
    """
    images = self.task.scanimages()

    paths = []
    for mime, buf in images:
        ext = mimetypes.guess_extension(mime)
        if not ext:
            continue
        # delete=False keeps the file on disk after the handle is closed;
        # closing first guarantees the viewer sees the complete data.
        with NamedTemporaryFile(suffix=ext, delete=False) as ntf:
            ntf.write(buf)
        paths.append(ntf.name)

    # Nothing to show: do not run a bare "open" command.
    if paths:
        os.system("open " + " ".join(paths))
def get_filename(self, content_type):
    """Return ``self._basename`` plus the extension for *content_type*.

    Only the media type in front of the first ';' is used, so parameters
    such as ``charset`` are ignored.

    :param content_type: value of a Content-Type header.
    :return: the file name, or ``None`` when there is no base name, no
        content type, or no known extension for the media type.
    """
    if not self._basename or not content_type:
        return None
    # split() always yields at least one element, so index 0 is safe; this
    # also avoids calling len() on a map object, which fails on Python 3.
    media_type = content_type.split(";", 1)[0].strip()
    extension = mimetypes.guess_extension(media_type)
    if not extension:
        return None
    return "%s%s" % (self._basename, extension)
def _find_attachments_in_email(mesg, expand_attachment, atts):
    """Collect the attachments of *mesg* into the list *atts*.

    Each stored attachment is appended as a
    ``(tempfile_path, filename, content_type)`` tuple.

    * A multipart/related message with a part whose decoded payload starts
      with 'ActiveMime' is skipped entirely.
    * With *expand_attachment* set, a text part that starts with one of
      ``EMAIL_MAGIC`` is parsed as a nested message and searched
      recursively instead of being stored.
    * Parts whose content type is in ``SAFE_MEDIA_TYPE`` are ignored.
    """
    # MHTML detection
    looks_like_mhtml = (mesg.get_content_maintype() == "multipart"
                        and mesg.get_content_subtype() == "related")
    if looks_like_mhtml:
        for candidate in mesg.walk():
            if not candidate.is_multipart():
                body = candidate.get_payload(decode=True)
                if isinstance(body, str) and body.startswith('ActiveMime'):
                    return

    for part in mesg.walk():
        if part.is_multipart():
            continue
        content_type = part.get_content_type()
        payload = part.get_payload(decode=True)

        if expand_attachment and content_type.startswith('text/'):
            stripped = payload.lstrip(" \t\r\n")
            if any(stripped.startswith(magic) for magic in EMAIL_MAGIC):
                nested = email.message_from_string(stripped)
                _find_attachments_in_email(nested, expand_attachment, atts)
                continue

        if content_type in SAFE_MEDIA_TYPE:
            continue

        filename = part.get_filename()
        if filename is not None:
            # Sanitize the header value
            filename = utils.get_filename_from_path(_decode_header(filename))
        else:
            filename = '<unknown>' + (mimetypes.guess_extension(content_type) or '')

        tempfile_path = utils.store_temp_file(payload, filename)
        atts.append((tempfile_path, filename, content_type))
def get_extension(attachment):
    """Return the file extension (with leading dot) for *attachment*.

    The extension of the attachment's file name is preferred. Without a
    file name it is guessed from the content type. '.bin' is returned when
    neither yields an extension, and ``None`` when looking them up raises
    ``AttributeError`` (for example when *attachment* lacks the expected
    methods).
    """
    try:
        name = attachment.get_filename()
        if not name:
            suffix = mimetypes.guess_extension(attachment.get_content_type())
        else:
            suffix = os.path.splitext(name)[1]
    except AttributeError:
        return None
    return suffix if suffix else '.bin'
def _determineExtension(determined_type):
    """Return the file extension to use for *determined_type*.

    An entry in ``type_override`` takes precedence. Otherwise the
    extension is guessed from the MIME type, and ``config.default_ext`` is
    used whenever guessing fails or yields nothing.
    """
    extension = config.default_ext
    if determined_type in type_override:
        return type_override[determined_type]
    try:
        guessed = guess_extension(determined_type)
    except Exception:
        # Best effort: an unusable type value falls back to the default.
        guessed = None
    # guess_extension() returns None for unknown types; that must not
    # replace the configured default.
    return guessed or extension
def each(self, target):
    """Extract every non-container part of the email file *target*.

    Each part is written into a fresh directory obtained from
    ``tempdir()`` and registered with ``self.add_extracted_file``. Parts
    without a usable file name are called ``part-NNN<ext>``, where the
    extension is guessed from the content type ('.bin' if unknown).
    """
    with open(target) as fp:
        msg = email.message_from_file(fp)

    path_temp = tempdir()
    counter = 1
    for part in msg.walk():
        # multipart/* are just containers
        if part.get_content_maintype() == 'multipart':
            continue

        # The name comes from the message itself, so keep only its final
        # path component: a crafted name must not escape path_temp.
        filename = part.get_filename()
        if filename:
            filename = os.path.basename(filename)
        if not filename or filename in ('.', '..'):
            # Use a generic bag-of-bits extension when the type is unknown
            ext = mimetypes.guess_extension(part.get_content_type()) or '.bin'
            filename = 'part-%03d%s' % (counter, ext)
        counter += 1

        # Parts that wrap another message (e.g. message/rfc822) have no
        # payload of their own; their children are visited by walk().
        payload = part.get_payload(decode=True)
        if payload is None:
            continue

        filepath = os.path.join(path_temp, filename)
        with open(filepath, 'wb') as out_file:
            out_file.write(payload)
        self.add_extracted_file(filepath)
def media_post(self, media_file, mime_type=None, description=None):
    """
    Post an image. `media_file` can either be image data or
    a file name. If image data is passed directly, the mime
    type has to be specified manually, otherwise, it is
    determined from the file name.

    Throws a `MastodonIllegalArgumentError` if the mime type of the
    passed data or file can not be determined properly.

    Returns a `media dict`_. This contains the id that can be used in
    status_post to attach the media file to a toot.
    """
    opened_here = False
    if mime_type is None and os.path.isfile(media_file):
        mime_type = mimetypes.guess_type(media_file)[0]
        media_file = open(media_file, 'rb')
        opened_here = True

    try:
        if mime_type is None:
            raise MastodonIllegalArgumentError('Could not determine mime type'
                                               ' or data passed directly '
                                               'without mime type.')

        random_suffix = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(10))
        # guess_extension() returns None for types it does not know; the
        # upload name then simply carries no extension.
        extension = mimetypes.guess_extension(mime_type) or ''
        file_name = "mastodonpyupload_" + str(time.time()) + "_" + str(random_suffix) + extension

        media_file_description = (file_name, media_file, mime_type)
        return self.__api_request('POST', '/api/v1/media',
                                  files={'file': media_file_description},
                                  params={'description': description})
    finally:
        # Only close what this method opened; caller-supplied data is left alone.
        if opened_here:
            media_file.close()
###
# Writing data: Domain blocks
###
def get_buffer_extension(buffer):
    """Return the file extension for the content held in *buffer*.

    The MIME type reported by ``get_mime_type_buffer`` is mapped to an
    extension; '.png' is returned when no extension is known for it.
    """
    mime_type = get_mime_type_buffer(buffer)
    guessed = mimetypes.guess_extension(mime_type)
    if guessed:
        return guessed
    return '.png'
def _init_document(self, data, mime_type):
"""Upload the data using the documents API."""
filename = 'upload-{0}{1}'.format(uuid4(), guess_extension(mime_type))
document = self.client.documents.create(filename, len(data))
cursor = 0
while cursor < len(data):
chunk = data[cursor:cursor + self.chunk_size]
document.upload(cursor, cursor + len(chunk) - 1, chunk)
cursor += len(chunk)
self._init_url(document.uri)
def email_login():
    """
    Login check for a POSTed HTML body.

    On a POST that carries an ``html`` form field, the signed save key is
    read from the element with id ``save_key`` and resolved to a user. The
    first uploaded file (if any) is saved under a generated name, the entry
    text and user id are stored in the session, and the user is logged in.

    :return: a redirect to the ``protected_save`` endpoint.
    """
    if flask.request.method == 'POST':
        content = flask.request.form.get("html")
        if content is not None:
            print(content)
            soup = BeautifulSoup(content, "html.parser")
            save_key = soup.find(id="save_key").text.strip()
            # session_id will expire after 24 hours
            session_id = save_signer.unsign(save_key, max_age=86400)
            session_id = bytes.decode(session_id)
            user = User.query.filter_by(session_id=session_id).first_or_404()

            # try to save the attachment file (only the first one is kept)
            try:
                for attachment in flask.request.files:
                    upload = flask.request.files[attachment]
                    # guess_extension() returns None for an unknown or
                    # missing mimetype; save the file without extension
                    # instead of failing on str + None.
                    extension = guess_extension(upload.mimetype) or ''
                    file_name = str(uuid.uuid1()) + extension
                    upload.save(file_name)
                    flask.session["file_name"] = file_name
                    break
            except AttributeError:
                flask.session["file_name"] = ""

            flask.session["entry"] = soup.select('div[style]')[0].text
            flask.session["user_real_id"] = user.user_id
            # after login flask_login will push user_id into session and this user_id is our session_id
            # as the get_id method in User model returns user's session id
            flask_login.login_user(user)
            return flask.redirect(flask.url_for('protected_save'))
    return flask.redirect(flask.url_for('protected_save'))
def prepare_file(self):
    """
    Set `self.file` to a fitting :class:`InputFile`
    or a fitting subclass (:class:`InputFileFromDisk`, :class:`InputFileFromURL`).

    With `self.file_content` present, an :class:`InputFile` is built. Its name
    comes from `self.file_path` or `self.file_url` (default "file"), and its
    suffix from `self.file_mime` when that is set (default ".blob").
    Otherwise the file is created from the path or from the URL.

    :return: Nothing
    """
    if not self.file_content:
        if self.file_path:
            self.file = InputFileFromDisk(self.file_path, file_mime=self.file_mime)
        elif self.file_url:
            self.file = InputFileFromURL(self.file_url, file_mime=self.file_mime)
        # end if
        return
    # end if

    stem, suffix = "file", ".blob"
    if self.file_path:
        base_name = os.path.basename(os.path.normpath(self.file_path))  # http://stackoverflow.com/a/3925147
        stem, suffix = os.path.splitext(base_name)  # http://stackoverflow.com/a/541394/3423324
    elif self.file_url:
        from urllib.parse import urlparse  # http://stackoverflow.com/a/18727481/3423324
        url_path = urlparse(self.file_url).path
        stem, suffix = os.path.splitext(os.path.basename(url_path))
    # end if

    if self.file_mime:
        import mimetypes
        suffix = mimetypes.guess_extension(self.file_mime)
        if suffix == '.jpe':  # .jpe -> .jpg
            suffix = '.jpg'
        # end if
    # end if

    if not suffix or not suffix.strip().lstrip("."):
        logger.debug("file_suffix was empty. Using '.blob'")
        suffix = ".blob"
    # end if

    full_name = "{filename}{suffix}".format(filename=stem, suffix=suffix)
    self.file = InputFile(self.file_content, file_name=full_name, file_mime=self.file_mime)
# end def prepare_file
def send(self, sender: PytgbotApiBot, receiver, reply_id)->PytgbotApiMessage:
    """
    Send the prepared file as a photo through `sender`.

    `self.receiver` and `self.reply_id`, when set, take precedence over the
    arguments. A file name without one of the known image suffixes gets one
    appended, derived from the file's mime type.

    :return: the result of `sender.send_photo`
    """
    if self.receiver:
        receiver = self.receiver
    # end if
    if self.reply_id is not DEFAULT_MESSAGE_ID:
        reply_id = self.reply_id
    # end if
    self.prepare_file()
    assert isinstance(self.file, (InputFile, InputFileFromDisk, InputFileFromURL))

    photo_suffixes = (".jpg", ".jpeg", ".gif", ".png", ".tif", ".bmp")
    if not self.file.file_name.endswith(photo_suffixes):
        if self.file.file_mime in ("image/jpg", "image/jpeg", "image/jpe"):
            # manually, to avoid .jpe ending.
            self.file.file_name += ".jpg"
        else:
            import mimetypes
            guessed = mimetypes.guess_extension(self.file.file_mime)  # automatically
            if guessed in photo_suffixes:
                self.file.file_name += guessed
            else:
                # At least we can try setting it as .png
                self.file.file_name += ".unknown-file-type.png"
            # end if
        # end if
    # end if

    try:
        return sender.send_photo(
            receiver, self.file, caption=self.caption, reply_to_message_id=reply_id, reply_markup=self.reply_markup,
            disable_notification = self.disable_notification
        )
    except TgApiServerException as e:
        should_backoff(e)  # checks if it should raise an DoRetryException
        raise  # else it just raises as usual
    # end try
# end def send
# end class PhotoMessage
def _download_http_url(link, session, temp_dir, hashes):
    """Download link url into temp_dir using provided session

    The file name defaults to ``link.filename`` and is replaced by the
    ``filename`` parameter of a Content-Disposition header when present.
    A name without extension gets one from the content type or, after a
    redirect, from the final URL.

    Returns a ``(file_path, content_type)`` tuple. HTTP errors are logged
    and re-raised.
    """
    target_url = link.url.split('#', 1)[0]
    try:
        resp = session.get(
            target_url,
            # We use Accept-Encoding: identity here because requests
            # defaults to accepting compressed responses. This breaks in
            # a variety of ways depending on how the server is configured.
            # - Some servers will notice that the file isn't a compressible
            #   file and will leave the file alone and with an empty
            #   Content-Encoding
            # - Some servers will notice that the file is already
            #   compressed and will leave the file alone and will add a
            #   Content-Encoding: gzip header
            # - Some servers won't notice anything at all and will take
            #   a file that's already been compressed and compress it again
            #   and set the Content-Encoding: gzip header
            # By setting this to request only the identity encoding We're
            # hoping to eliminate the third case. Hopefully there does not
            # exist a server which when given a file will notice it is
            # already compressed and that you're not asking for a
            # compressed file and will then decompress it before sending
            # because if that's the case I don't think it'll ever be
            # possible to make this work.
            headers={"Accept-Encoding": "identity"},
            stream=True,
        )
        resp.raise_for_status()
    except requests.HTTPError as exc:
        logger.critical(
            "HTTP error %s while getting %s", exc.response.status_code, link,
        )
        raise

    content_type = resp.headers.get('content-type', '')
    filename = link.filename  # fallback
    # Have a look at the Content-Disposition header for a better guess
    content_disposition = resp.headers.get('content-disposition')
    if content_disposition:
        _type, params = cgi.parse_header(content_disposition)
        # The header is controlled by the server: keep only the final path
        # component so the download cannot be written outside temp_dir.
        # We use ``or`` here because we don't want to use an "empty" value
        # from the filename param.
        filename = os.path.basename(params.get('filename') or '') or filename
    ext = splitext(filename)[1]
    if not ext:
        ext = mimetypes.guess_extension(content_type)
        if ext:
            filename += ext
    if not ext and link.url != resp.url:
        ext = os.path.splitext(resp.url)[1]
        if ext:
            filename += ext
    file_path = os.path.join(temp_dir, filename)
    with open(file_path, 'wb') as content_file:
        _download_url(resp, link, content_file, hashes)
    return file_path, content_type
def _download_http_url(link, session, temp_dir, hashes):
    """Download link url into temp_dir using provided session

    The file name defaults to ``link.filename`` and is replaced by the
    ``filename`` parameter of a Content-Disposition header when present.
    A name without extension gets one from the content type or, after a
    redirect, from the final URL.

    Returns a ``(file_path, content_type)`` tuple. HTTP errors are logged
    and re-raised.
    """
    target_url = link.url.split('#', 1)[0]
    try:
        resp = session.get(
            target_url,
            # We use Accept-Encoding: identity here because requests
            # defaults to accepting compressed responses. This breaks in
            # a variety of ways depending on how the server is configured.
            # - Some servers will notice that the file isn't a compressible
            #   file and will leave the file alone and with an empty
            #   Content-Encoding
            # - Some servers will notice that the file is already
            #   compressed and will leave the file alone and will add a
            #   Content-Encoding: gzip header
            # - Some servers won't notice anything at all and will take
            #   a file that's already been compressed and compress it again
            #   and set the Content-Encoding: gzip header
            # By setting this to request only the identity encoding We're
            # hoping to eliminate the third case. Hopefully there does not
            # exist a server which when given a file will notice it is
            # already compressed and that you're not asking for a
            # compressed file and will then decompress it before sending
            # because if that's the case I don't think it'll ever be
            # possible to make this work.
            headers={"Accept-Encoding": "identity"},
            stream=True,
        )
        resp.raise_for_status()
    except requests.HTTPError as exc:
        logger.critical(
            "HTTP error %s while getting %s", exc.response.status_code, link,
        )
        raise

    content_type = resp.headers.get('content-type', '')
    filename = link.filename  # fallback
    # Have a look at the Content-Disposition header for a better guess
    content_disposition = resp.headers.get('content-disposition')
    if content_disposition:
        _type, params = cgi.parse_header(content_disposition)
        # The header is controlled by the server: keep only the final path
        # component so the download cannot be written outside temp_dir.
        # We use ``or`` here because we don't want to use an "empty" value
        # from the filename param.
        filename = os.path.basename(params.get('filename') or '') or filename
    ext = splitext(filename)[1]
    if not ext:
        ext = mimetypes.guess_extension(content_type)
        if ext:
            filename += ext
    if not ext and link.url != resp.url:
        ext = os.path.splitext(resp.url)[1]
        if ext:
            filename += ext
    file_path = os.path.join(temp_dir, filename)
    with open(file_path, 'wb') as content_file:
        _download_url(resp, link, content_file, hashes)
    return file_path, content_type