def upload_to_s3(css_file):
    """Upload a local file into the ``webpack_bundles/`` folder of an S3 bucket.

    The bucket name and the credentials are read from ``settings``.  The
    object key is ``webpack_bundles/<name>``, where ``<name>`` is the part of
    ``css_file`` after the last ``/``.  After the upload the object's ACL is
    set to ``public-read``.

    :param css_file: path of the local file to upload.
    """
    folder = 'webpack_bundles/'
    filename = css_file.split('/')[-1]

    # Use a context manager so the handle is closed even if read() fails.
    with open(css_file, 'r') as file_obj:
        content = file_obj.read()

    conn = S3Connection(settings.AWS_ACCESS_KEY_ID, settings.AWS_SECRET_ACCESS_KEY)
    bucket = conn.get_bucket(bucket_name=settings.AWS_BUCKET_NAME)

    # guess_type() yields None for unknown extensions; fall back to a generic
    # binary type instead of storing a None Content-Type.
    mime = mimetypes.guess_type(filename)[0] or 'application/octet-stream'

    k = Key(bucket)
    k.key = folder + filename
    k.set_metadata("Content-Type", mime)
    k.set_contents_from_string(content)
    k.set_acl("public-read")
python类guess_type()的实例源码
def serve(self, request, path):
    """Return the file at ``path`` below ``settings.PRIVATE_MEDIA_ROOT``.

    The logic is largely borrowed from ``django.views.static.serve`` and
    django-filetransfers (``filetransfers.backends.default``).

    :param request: the incoming request; its ``HTTP_IF_MODIFIED_SINCE``
        META entry is consulted.
    :param path: path of the file relative to ``settings.PRIVATE_MEDIA_ROOT``.
    :returns: ``HttpResponseNotModified`` when ``was_modified_since`` reports
        no change, otherwise an ``HttpResponse`` carrying the file content and
        a ``Last-Modified`` header.
    :raises Http404: if the resolved path does not exist.
    """
    fullpath = os.path.join(settings.PRIVATE_MEDIA_ROOT, path)
    if not os.path.exists(fullpath):
        raise Http404('"{0}" does not exist'.format(fullpath))
    # Respect the If-Modified-Since header.
    statobj = os.stat(fullpath)
    content_type = mimetypes.guess_type(fullpath)[0] or 'application/octet-stream'
    if not was_modified_since(request.META.get('HTTP_IF_MODIFIED_SINCE'),
                              statobj[stat.ST_MTIME], statobj[stat.ST_SIZE]):
        return HttpResponseNotModified(content_type=content_type)
    # Read through a context manager so the descriptor is released promptly
    # instead of waiting for garbage collection.
    with open(fullpath, 'rb') as fh:
        payload = fh.read()
    response = HttpResponse(payload, content_type=content_type)
    response["Last-Modified"] = http_date(statobj[stat.ST_MTIME])
    # filename = os.path.basename(path)
    # response['Content-Disposition'] = smart_str(u'attachment; filename={0}'.format(filename))
    return response
def send_document_file(self, input_file, entity, caption=''):
    """Send a previously uploaded document to ``entity``.

    :param input_file: the uploaded file; its ``name`` is used both for the
        filename attribute and to guess the mime type.
    :param entity: the entity (or input peer) that receives the document.
    :param caption: optional caption forwarded with the media.
    """
    # guess_type() returns a (type, encoding) tuple; only the type matters.
    # A mime type is mandatory, so unknown files are declared as arbitrary
    # binary data ("application/octet-stream").
    detected = guess_type(input_file.name)[0]

    # TODO If the input file is an audio, find out:
    # Performer and song title and add DocumentAttributeAudio
    document = InputMediaUploadedDocument(
        file=input_file,
        mime_type=detected or 'application/octet-stream',
        attributes=[DocumentAttributeFilename(input_file.name)],
        caption=caption)
    self.send_media_file(document, entity)
async def screenshot(self, options: dict = None, **kwargs: Any) -> bytes:
    """Take a screenshot.

    The image type is chosen as follows: an explicit ``type`` option wins;
    otherwise it is derived from the mime type guessed for the ``path``
    option (``image/png`` or ``image/jpeg``); otherwise ``'png'`` is used.

    :param options: screenshot options; not modified by this call.
    :param kwargs: extra options, merged over ``options``.
    :returns: whatever ``self._screenshotTask`` produces for the chosen type
        and the merged options.
    :raises PageError: if ``path`` is given and its guessed mime type is
        neither PNG nor JPEG.
    """
    # Merge into a fresh dict so the caller's ``options`` is left untouched.
    options = dict(options or {}, **kwargs)
    screenshotType = None
    if 'path' in options:
        mimeType, _ = mimetypes.guess_type(options['path'])
        if mimeType == 'image/png':
            screenshotType = 'png'
        elif mimeType == 'image/jpeg':
            screenshotType = 'jpeg'
        else:
            raise PageError('Unsupported screenshot '
                            f'mime type: {mimeType}')
    if 'type' in options:
        screenshotType = options['type']
    if not screenshotType:
        screenshotType = 'png'
    # ``await`` requires a coroutine function, hence ``async def`` above.
    return await self._screenshotTask(screenshotType, options)
def serve_static(abe, path, start_response):
    """Open a static file below ``abe.htdocs`` and start the HTTP response.

    :param abe: object providing ``static_path`` (the required prefix of
        ``path``) and ``htdocs`` (the directory holding the files).
    :param path: request path; must start with ``abe.static_path``.
    :param start_response: callable invoked as
        ``start_response(status, headers)`` once the file is open.
    :returns: the file object, opened in binary mode and left open for the
        caller.
    :raises PageNotFound: if ``path`` lacks the static prefix, resolves
        outside ``abe.htdocs``, or the file cannot be opened.
    """
    import mimetypes
    import os.path

    slen = len(abe.static_path)
    if path[:slen] != abe.static_path:
        raise PageNotFound()
    path = path[slen:]

    # Reject paths that lexically escape htdocs (e.g. via ".." segments);
    # the request path is untrusted input.
    root = os.path.abspath(abe.htdocs)
    target = os.path.abspath(abe.htdocs + '/' + path)
    if target != root and not target.startswith(root.rstrip(os.sep) + os.sep):
        raise PageNotFound()

    try:
        # Serve static content.
        # XXX Should check file modification time and handle HTTP
        # if-modified-since.  Or just hope serious users will map
        # our htdocs as static in their web server.
        # XXX is "+ '/' + path" adequate for non-POSIX systems?
        found = open(abe.htdocs + '/' + path, "rb")
        content_type, enc = mimetypes.guess_type(path)
        # XXX Should do something with enc if not None.
        # XXX Should set Content-length.
        start_response('200 OK', [('Content-type', content_type or 'text/plain')])
        return found
    except IOError:
        raise PageNotFound()
# Change this if you want empty or multi-byte address versions.
def serve_static(abe, path, start_response):
    """Open a static file below ``abe.htdocs`` and start the HTTP response.

    :param abe: object providing ``static_path`` (the required prefix of
        ``path``) and ``htdocs`` (the directory holding the files).
    :param path: request path; must start with ``abe.static_path``.
    :param start_response: callable invoked as
        ``start_response(status, headers)`` once the file is open.
    :returns: the file object, opened in binary mode and left open for the
        caller.
    :raises PageNotFound: if ``path`` lacks the static prefix, resolves
        outside ``abe.htdocs``, or the file cannot be opened.
    """
    import mimetypes
    import os.path

    slen = len(abe.static_path)
    if path[:slen] != abe.static_path:
        raise PageNotFound()
    path = path[slen:]

    # Reject paths that lexically escape htdocs (e.g. via ".." segments);
    # the request path is untrusted input.
    root = os.path.abspath(abe.htdocs)
    target = os.path.abspath(abe.htdocs + '/' + path)
    if target != root and not target.startswith(root.rstrip(os.sep) + os.sep):
        raise PageNotFound()

    try:
        # Serve static content.
        # XXX Should check file modification time and handle HTTP
        # if-modified-since.  Or just hope serious users will map
        # our htdocs as static in their web server.
        # XXX is "+ '/' + path" adequate for non-POSIX systems?
        found = open(abe.htdocs + '/' + path, "rb")
        content_type, enc = mimetypes.guess_type(path)
        # XXX Should do something with enc if not None.
        # XXX Should set Content-length.
        start_response('200 OK', [('Content-type', content_type or 'text/plain')])
        return found
    except IOError:
        raise PageNotFound()
# Change this if you want empty or multi-byte address versions.
def add_file(self, name, file, filename=None, content_type=None):
    """Add a new file to the dict.

    ``file`` may be a file name, a file-like object or a
    :class:`FileStorage` object.  A :class:`FileStorage` is stored as given;
    anything else is wrapped in a new :class:`FileStorage`.

    :param name: the name of the field.
    :param file: a filename or :class:`file`-like object
    :param filename: an optional filename
    :param content_type: an optional content type
    """
    if isinstance(file, FileStorage):
        self.add(name, file)
        return

    stream = file
    if isinstance(stream, string_types):
        # A path was given: it doubles as the filename unless one was
        # supplied, and the file is opened for binary reading.
        if filename is None:
            filename = stream
        stream = open(stream, 'rb')

    if filename and content_type is None:
        guessed = mimetypes.guess_type(filename)[0]
        content_type = guessed or 'application/octet-stream'

    self.add(name, FileStorage(stream, filename, name, content_type))
def checkforhtml(self, info, url):
    """Report whether the resource should be treated as HTML.

    Detection is currently switched off: ``info`` and ``url`` are ignored and
    every resource is reported as HTML by returning ``1``.
    """
    # Disabled logic, kept as a description for reference: take the
    # content type from info['content-type'] (dropping any "; charset=..."
    # suffix) or, when the header is missing, treat URLs ending in "/" as
    # HTML and otherwise guess the type from the URL with
    # mimetypes.guess_type(); answer 1 for 'text/html', else note
    # " Not HTML, mime type %s" and answer 0.
    is_html = 1
    return is_html
def __init__(self, filename, mimetype=None, chunksize=DEFAULT_CHUNK_SIZE,
             resumable=False):
    """Constructor.

    Args:
      filename: string, Name of the file.
      mimetype: string, Mime-type of the file. If None then a mime-type will be
        guessed from the file extension; if that guess fails,
        'application/octet-stream' is used.
      chunksize: int, File will be uploaded in chunks of this many bytes. Only
        used if resumable=True. Pass in a value of -1 if the file is to be
        uploaded in a single chunk. Note that Google App Engine has a 5MB limit
        on request size, so you should never set your chunksize larger than 5MB,
        or to -1.
      resumable: bool, True if this is a resumable upload. False means upload
        in a single request.
    """
    self._filename = filename
    fd = open(self._filename, 'rb')
    if mimetype is None:
        # guess_type() returns (type, encoding); only the type is needed.
        mimetype = mimetypes.guess_type(filename)[0]
        if mimetype is None:
            # Unknown extension: never hand None to the base class.
            mimetype = 'application/octet-stream'
    super(MediaFileUpload, self).__init__(fd, mimetype, chunksize=chunksize,
                                          resumable=resumable)
def get_content_type(self):
    """Returns the ``Content-Type`` header to be used for this request.

    The value is derived from the type and encoding that ``mimetypes``
    guesses for ``self.absolute_path``.

    .. versionadded:: 3.1
    """
    guessed, compression = mimetypes.guess_type(self.absolute_path)
    # per RFC 6713, use the appropriate type for a gzip compressed file
    if compression == "gzip":
        return "application/gzip"
    # As of 2015-07-21 there is no bzip2 encoding defined at
    # http://www.iana.org/assignments/media-types/media-types.xhtml
    # So for that (and any other encoding), and for files whose type
    # could not be detected, use octet-stream.
    if compression is not None or guessed is None:
        return "application/octet-stream"
    return guessed
def get_content_type(self):
    """Returns the ``Content-Type`` header to be used for this request.

    Based on what ``mimetypes.guess_type`` reports for
    ``self.absolute_path``.

    .. versionadded:: 3.1
    """
    content_type, content_encoding = mimetypes.guess_type(self.absolute_path)
    if content_encoding is None:
        # Not an encoded file: use the detected type, or octet-stream when
        # the type could not be detected.
        if content_type is None:
            return "application/octet-stream"
        return content_type
    # per RFC 6713, gzip compressed files get application/gzip.  As of
    # 2015-07-21 there is no bzip2 encoding defined at
    # http://www.iana.org/assignments/media-types/media-types.xhtml
    # so that (and any other encoding) falls back to octet-stream.
    return "application/gzip" if content_encoding == "gzip" else "application/octet-stream"
def get_content_type(self):
    """Returns the ``Content-Type`` header to be used for this request.

    The header is chosen from the (type, encoding) pair guessed for
    ``self.absolute_path``.

    .. versionadded:: 3.1
    """
    fallback = "application/octet-stream"
    kind, wrapper = mimetypes.guess_type(self.absolute_path)
    if wrapper is not None:
        # per RFC 6713, use the appropriate type for a gzip compressed file.
        # As of 2015-07-21 there is no bzip2 encoding defined at
        # http://www.iana.org/assignments/media-types/media-types.xhtml
        # so that (and any other encoding) uses the fallback.
        header = "application/gzip" if wrapper == "gzip" else fallback
    elif kind is None:
        # mime type not detected
        header = fallback
    else:
        header = kind
    return header
def get_delta(asked_file_path, cached_file_path):
    """Return a tuple of (body, mime_type), given two file paths.

    The body is the result of ``calculate_delta`` for the two file contents
    with mime type ``"text/sw-delta"``.  When that delta is longer than the
    asked file itself, the asked file's content is returned instead, together
    with the mime type guessed from its path (which may be ``None``).

    :param asked_file_path: path of the requested file.
    :param cached_file_path: path of the cached file to diff against.
    :raises InvalidAskedFile: if ``asked_file_path`` does not exist.
    :raises InvalidCachedFile: if ``cached_file_path`` does not exist.
    """
    if not exists(asked_file_path):
        raise InvalidAskedFile("%s not found." % asked_file_path)
    if not exists(cached_file_path):
        # Report the path that is actually missing (the cached one).
        raise InvalidCachedFile("%s not found." % cached_file_path)
    with open(asked_file_path, 'r') as filep:
        asked_file_string = filep.read()
    with open(cached_file_path, 'r') as filep:
        cached_file_string = filep.read()
    body = calculate_delta(asked_file_string, cached_file_string)
    mime_type = "text/sw-delta"
    if len(body) > len(asked_file_string):
        # The delta is no saving: send the full file instead.
        body = asked_file_string
        mime_type, _ = guess_type(asked_file_path)
    return body, mime_type
def multipart_encode(vars, files, boundary=None, buffer=None):
    """Encode form fields and files as a ``multipart/form-data`` body.

    :param vars: iterable of ``(name, value)`` pairs for plain fields;
        ``value`` must be a string.
    :param files: iterable of ``(name, fd)`` pairs; each ``fd`` needs a
        ``name`` attribute plus ``seek()`` and ``read()``.  The part after
        the last ``/`` of ``fd.name`` is sent as the filename.
    :param boundary: multipart boundary; chosen with
        ``mimetools.choose_boundary()`` when omitted.
    :param buffer: optional string the encoded body is appended to.
    :returns: ``(boundary, body)``.
    """
    if boundary is None:
        boundary = mimetools.choose_boundary()
    # Collect the pieces and join once instead of repeated string +=.
    parts = [] if buffer is None else [buffer]
    for key, value in vars:
        parts.append('--%s\r\n' % boundary)
        parts.append('Content-Disposition: form-data; name="%s"' % key)
        parts.append('\r\n\r\n' + value + '\r\n')
    for key, fd in files:
        # No fstat() here: the size is not sent, and requiring a real file
        # descriptor would needlessly rule out in-memory file objects.
        filename = fd.name.split('/')[-1]
        contenttype = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
        parts.append('--%s\r\n' % boundary)
        parts.append('Content-Disposition: form-data; name="%s"; filename="%s"\r\n' % (key, filename))
        parts.append('Content-Type: %s\r\n' % contenttype)
        fd.seek(0)
        parts.append('\r\n' + fd.read() + '\r\n')
    parts.append('--%s--\r\n\r\n' % boundary)
    return boundary, ''.join(parts)
def _create_attachment(self, filename, content, mimetype=None):
"""
Converts the filename, content, mimetype triple into a MIME attachment
object.
"""
if mimetype is None:
mimetype, _ = mimetypes.guess_type(filename)
if mimetype is None:
mimetype = DEFAULT_ATTACHMENT_MIME_TYPE
attachment = self._create_mime_attachment(content, mimetype)
if filename:
try:
filename.encode('ascii')
except UnicodeEncodeError:
if six.PY2:
filename = filename.encode('utf-8')
filename = ('utf-8', '', filename)
attachment.add_header('Content-Disposition', 'attachment',
filename=filename)
return attachment
def encode_file(boundary, key, file):
    """Return the multipart lines (as bytes) describing one uploaded file.

    The filename is the basename of ``file.name`` when that attribute
    exists, otherwise ``key``.  The content type is ``file.content_type``
    when present, else guessed from the filename, else
    ``application/octet-stream``.

    :returns: list of byte strings: boundary line, Content-Disposition,
        Content-Type, an empty line and the file content.
    """
    def to_bytes(s):
        return force_bytes(s, settings.DEFAULT_CHARSET)

    filename = ''
    if hasattr(file, 'name'):
        filename = os.path.basename(file.name)

    content_type = None
    if hasattr(file, 'content_type'):
        content_type = file.content_type
    elif filename:
        content_type = mimetypes.guess_type(filename)[0]
    if content_type is None:
        content_type = 'application/octet-stream'

    # Fall back to the field name when the file has no usable name.
    filename = filename or key

    boundary_line = to_bytes('--%s' % boundary)
    disposition = to_bytes('Content-Disposition: form-data; name="%s"; filename="%s"'
                           % (key, filename))
    type_line = to_bytes('Content-Type: %s' % content_type)
    return [boundary_line, disposition, type_line, b'', to_bytes(file.read())]
def _create_attachment(self, filename, content, mimetype=None):
"""
Converts the filename, content, mimetype triple into a MIME attachment
object.
"""
if mimetype is None:
mimetype, _ = mimetypes.guess_type(filename)
if mimetype is None:
mimetype = DEFAULT_ATTACHMENT_MIME_TYPE
attachment = self._create_mime_attachment(content, mimetype)
if filename:
try:
filename.encode('ascii')
except UnicodeEncodeError:
if six.PY2:
filename = filename.encode('utf-8')
filename = ('utf-8', '', filename)
attachment.add_header('Content-Disposition', 'attachment',
filename=filename)
return attachment
def _get_encoded_attachments(self):
    """Return the attachments serialized with ``jsonpickle.dumps``.

    Each ``File`` attachment is first replaced by a
    ``(name, content, guessed mime type)`` tuple, read from the start of the
    file; other attachments are passed through unchanged.
    """
    attachments = self.get_attachments()
    if attachments:
        encoded = []
        for item in attachments:
            if not isinstance(item, File):
                encoded.append(item)
                continue
            # Rewind so the whole content is read.
            item.seek(0)
            encoded.append((item.name, item.read(), guess_type(item.name)[0]))
        attachments = encoded
    return jsonpickle.dumps(attachments)
def protected_view(request, path, server="django", as_download=False):
    """Serve a protected media file, directly or by delegating to a server.

    With ``server == "django"`` the file is served by ``serve`` from
    ``PROTECTED_MEDIA_ROOT``.  For any other ``server`` an empty response is
    returned whose header named by ``server_header(server)`` holds the
    UTF-8 encoded join of ``PROTECTED_MEDIA_LOCATION_PREFIX`` and ``path``
    (presumably an internal-redirect header for the front-end server;
    confirm against ``server_header``).

    :param request: the incoming request.
    :param path: path of the requested file.
    :param server: name of the serving backend; ``"django"`` serves directly.
    :param as_download: when true (non-django servers only), add a
        ``Content-Disposition: attachment`` header.
    :returns: the response object.
    """
    if server == "django":
        return serve(
            request, path, document_root=PROTECTED_MEDIA_ROOT,
            show_indexes=False
        )

    mimetype, encoding = mimetypes.guess_type(path)
    response = HttpResponse()
    # guess_type() returns None for unknown extensions; never emit a
    # "None" Content-Type header.
    response["Content-Type"] = mimetype or "application/octet-stream"
    if encoding:
        response["Content-Encoding"] = encoding
    if as_download:
        response["Content-Disposition"] = "attachment; filename={}".format(
            basename(path))
    response[server_header(server)] = os.path.join(
        PROTECTED_MEDIA_LOCATION_PREFIX, path
    ).encode("utf8")
    return response
def protected_view(request, path, server="django", as_download=False):
    """Serve a protected media file, directly or by delegating to a server.

    With ``server == "django"`` the file is served by ``serve`` from
    ``PROTECTED_MEDIA_ROOT``.  For any other ``server`` an empty response is
    returned whose header named by ``server_header(server)`` holds the
    UTF-8 encoded join of ``PROTECTED_MEDIA_LOCATION_PREFIX`` and ``path``
    (presumably an internal-redirect header for the front-end server;
    confirm against ``server_header``).

    :param request: the incoming request.
    :param path: path of the requested file.
    :param server: name of the serving backend; ``"django"`` serves directly.
    :param as_download: when true (non-django servers only), add a
        ``Content-Disposition: attachment`` header.
    :returns: the response object.
    """
    if server != "django":
        mimetype, encoding = mimetypes.guess_type(path)
        response = HttpResponse()
        # Unknown extensions make guess_type() return None; send a generic
        # binary type rather than a "None" Content-Type header.
        response["Content-Type"] = mimetype or "application/octet-stream"
        if encoding:
            response["Content-Encoding"] = encoding
        if as_download:
            response["Content-Disposition"] = "attachment; filename={}".format(
                basename(path))
        response[server_header(server)] = os.path.join(
            PROTECTED_MEDIA_LOCATION_PREFIX, path
        ).encode("utf8")
    else:
        response = serve(
            request, path, document_root=PROTECTED_MEDIA_ROOT,
            show_indexes=False
        )
    return response
def getType(file):
    """Return the mime type for the path ``file``, or ``None`` if unknown.

    ``guess_type`` is tried first.  When it has no answer, a few special
    cases are recognised: a name ending in ``video_ts.ifo`` (DVD), the
    fixed audio-CD playlist path, and ``avseq??.dat`` files (VCD).
    """
    mime = guess_type(file)[0]
    if mime is not None:
        return mime

    # Detect some unknown types
    if file[-12:].lower() == "video_ts.ifo":
        return "video/x-dvd"
    if file == "/media/audiocd/cdplaylist.cdpls":
        return "audio/x-cda"

    dot = file.rfind('.')
    if dot == -1:
        return None
    extension = file[dot + 1:].lower()
    # "avseq" must sit in the five characters before the two characters
    # that precede ".dat".
    if extension == "dat" and file[-11:-6].lower() == "avseq":
        return "video/x-vcd"
    return None
def guess_type(self, path, allow_directory=True):
    """
    Guess the type of a file.

    Parameters
    ----------
    path: string
        Path to classify.
    allow_directory: bool
        If False, don't consider the possibility that the file is a
        directory.

    Returns
    -------
    "notebook" for paths ending in ".ipynb", "directory" when directories
    are allowed and ``self.dir_exists(path)`` is true, otherwise "file".
    """
    if path.endswith(".ipynb"):
        return "notebook"
    if allow_directory and self.dir_exists(path):
        return "directory"
    return "file"
def get(self, path, content=True, type=None, format=None):
    """Get a file or directory model.

    ``path`` is stripped of leading and trailing slashes.  When ``type`` is
    not given it is determined with ``self.guess_type``.  The request is
    then dispatched to ``_get_directory``, ``_get_notebook`` or
    ``_get_file``.

    Raises ``ValueError`` for a type that is none of "directory",
    "notebook" or "file".
    """
    self.log.debug("S3contents.GenericManager.get] path('%s') type(%s) format(%s)", path, type, format)
    path = path.strip('/')
    kind = self.guess_type(path) if type is None else type
    handlers = {
        "directory": self._get_directory,
        "notebook": self._get_notebook,
        "file": self._get_file,
    }
    if kind not in handlers:
        raise ValueError("Unknown type passed: '{}'".format(kind))
    return handlers[kind](path=path, content=content, format=format)
def _file_model_from_path(self, path, content=False, format=None):
    """
    Build a file model for ``path``.

    The model starts from ``base_model(path)`` with type "file".  Its
    timestamps come from ``self.fs.lstat`` when ``path`` is a file and are
    ``DUMMY_CREATED_DATE`` otherwise.  When ``content`` is truthy the file
    is read and "format", "content" and "mimetype" are filled in; for
    ``format == "base64"`` the content is passed through ``b64decode``.
    """
    model = base_model(path)
    model["type"] = "file"
    if self.fs.isfile(path):
        stamp = self.fs.lstat(path)["ST_MTIME"]
    else:
        stamp = DUMMY_CREATED_DATE
    model["last_modified"] = model["created"] = stamp

    if not content:
        return model

    # Keep the incoming value if an error handler returns instead of raising.
    data = content
    try:
        data = self.fs.read(path)
    except NoSuchFile as e:
        self.no_such_entity(e.path)
    except GenericFSError as e:
        self.do_error(str(e), 500)

    model["format"] = format or "text"
    model["content"] = data
    model["mimetype"] = mimetypes.guess_type(path)[0] or "text/plain"
    if format == "base64":
        model["format"] = format or "base64"
        from base64 import b64decode
        model["content"] = b64decode(data)
    return model
def _convert_file_records(self, paths):
"""
Applies _notebook_model_from_s3_path or _file_model_from_s3_path to each entry of `paths`,
depending on the result of `guess_type`.
"""
ret = []
for path in paths:
# path = self.fs.remove_prefix(path, self.prefix) # Remove bucket prefix from paths
if os.path.basename(path) == self.fs.dir_keep_file:
continue
type_ = self.guess_type(path, allow_directory=True)
if type_ == "notebook":
ret.append(self._notebook_model_from_path(path, False))
elif type_ == "file":
ret.append(self._file_model_from_path(path, False, None))
elif type_ == "directory":
ret.append(self._directory_model_from_path(path, False))
else:
self.do_error("Unknown file type %s for file '%s'" % (type_, path), 500)
return ret
def body_producer(boundary, files):
    """Return a ``multipart/form-data`` body (bytes) for the given files.

    :param boundary: the multipart boundary string.
    :param files: iterable of ``(filename, data)`` pairs; ``filename`` is
        used both as the field name and as the filename, and ``data`` is
        the raw bytes payload.
    """
    delimiter = '--{}\r\n'.format(boundary).encode('utf-8')
    chunks = []
    for filename, data in files:
        disposition = 'Content-Disposition: form-data; name="{}"; filename="{}"\r\n'.format(
            filename, filename)
        mtype = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
        chunks.extend((
            delimiter,
            disposition.encode('utf-8'),
            'Content-Type: {}\r\n'.format(mtype).encode('utf-8'),
            b'\r\n',
            data,
            b'\r\n',
        ))
    # Closing delimiter.
    chunks.append('--{}--\r\n'.format(boundary).encode('utf-8'))
    return b''.join(chunks)
def is_tarfile(path):
    """
    Returns if the path belongs to a tarfile or not.

    .. versionadded:: 1.2.0

    :param str path: path to be checked

    :returns: **True** if the path belongs to a tarball, **False** otherwise
    """
    # Inspecting the file itself may fail due to permissions, in which case
    # we fall back to judging by the mime type of the name...
    #
    #   IOError: [Errno 13] Permission denied: '/vmlinuz.old'
    #
    # With python 3 insufficient permissions raises an AttributeError
    # instead...
    #
    #   http://bugs.python.org/issue17059
    try:
        verdict = tarfile.is_tarfile(path)
    except (IOError, AttributeError):
        guessed_type, _ = mimetypes.guess_type(path)
        verdict = guessed_type == 'application/x-tar'
    return verdict
def add_file(self, name, file, filename=None, content_type=None):
    """Register a file under the field ``name``.

    ``file`` can be a file name, a :class:`file`-like object or a
    :class:`FileStorage` object.  Only non-:class:`FileStorage` values are
    wrapped; for those, a missing content type is guessed from the filename.

    :param name: the name of the field.
    :param file: a filename or :class:`file`-like object
    :param filename: an optional filename
    :param content_type: an optional content type
    """
    value = file
    if not isinstance(file, FileStorage):
        if isinstance(file, string_types):
            # Given a path: default the filename to it and open the file.
            filename = file if filename is None else filename
            file = open(file, 'rb')
        if filename and content_type is None:
            mime = mimetypes.guess_type(filename)[0]
            content_type = mime or 'application/octet-stream'
        value = FileStorage(file, filename, name, content_type)
    self.add(name, value)
def nfs_open(self, req):
    """Answer an ``nfs`` URL request with a file taken from ``tempdir``.

    Prints a description of the simulated mount, then returns
    ``response.addinfourl`` wrapping an ``NFSFile`` for the requested file
    name, with ``Content-type`` (guessed from the name, defaulting to
    ``application/octet-stream``) and ``Content-length`` headers.
    """
    remote_dir, file_name = os.path.split(req.full_url)
    server_name = req.host

    print('FauxNFSHandler simulating mount:')
    print(' Remote path: {}'.format(remote_dir))
    print(' Server : {}'.format(server_name))
    print(' Local path : {}'.format(os.path.basename(tempdir)))
    print(' Filename : {}'.format(file_name))

    local_file = os.path.join(tempdir, file_name)
    fp = NFSFile(tempdir, file_name)
    guessed = mimetypes.guess_type(file_name)[0]
    headers = {
        'Content-type': guessed or 'application/octet-stream',
        # Size of the local copy of the file.
        'Content-length': os.stat(local_file).st_size,
    }
    return response.addinfourl(fp, headers, req.get_full_url())
def guess_mimetype(resource_path):
    """
    Guesses the mimetype of a given resource.

    Args:
        resource_path: the path to a given resource.

    Returns:
        The mimetype string; "application/octet-stream" when the type
        cannot be guessed.
    """
    guessed, _ = mimetypes.guess_type(resource_path)
    return guessed if guessed is not None else "application/octet-stream"