def get_zip(self, project, ty):
"""Get a ZIP file directly from uploaded directory
or generate one on the fly and upload it if not existing."""
filename = self.download_name(project, ty)
if not self.zip_existing(project, ty):
print "Warning: Generating %s on the fly now!" % filename
self._make_zip(project, ty)
if isinstance(uploader, local.LocalUploader):
filepath = self._download_path(project)
res = send_file(filename_or_fp=safe_join(filepath, filename),
mimetype='application/octet-stream',
as_attachment=True,
attachment_filename=filename)
# fail safe mode for more encoded filenames.
# It seems Flask and Werkzeug do not support RFC 5987 http://greenbytes.de/tech/tc2231/#encoding-2231-char
# res.headers['Content-Disposition'] = 'attachment; filename*=%s' % filename
return res
else:
return redirect(url_for('rackspace', filename=filename,
container=self._container(project),
_external=True))
python类safe_join()的实例源码
def _download_path(self, project):
container = self._container(project)
if isinstance(uploader, local.LocalUploader):
filepath = safe_join(uploader.upload_folder, container)
else:
print("The method Exporter _download_path should not be used for Rackspace etc.!") # TODO: Log this stuff
filepath = container
return filepath
def files_head(path):
"""
access(path) -> HEAD /files/<path>
200
404 Not Found
"""
if path == "/":
path = app.config['serve_dir']
else:
path = flask.safe_join(app.config['serve_dir'], path)
if not os.path.exists(path):
return "File not found.", 404
return ""
def files_post(path):
"""
create(path) -> POST /files/<path>?op=create
200
mkdir(path) -> POST /files/<path>?op=mkdir
200
400 Directory exists.
"""
if 'op' not in flask.request.args:
return 'Missing operation.', 400
op = flask.request.args['op']
path = flask.safe_join(app.config['serve_dir'], path)
if op == "create":
fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755)
os.close(fd)
return ""
elif op == "mkdir":
try:
fd = os.mkdir(path, 0o755)
except FileExistsError:
return 'Directory exists.', 400
return ""
return 'Unknown operation.', 400
def files_put(path):
"""
write(path, data, offset) -> PUT /files/<path>
{"data": "<base64 str>", "offset": <offset int>}
200 <bytes written int>
404 File not found.
"""
path = flask.safe_join(app.config['serve_dir'], path)
payload = flask.request.json
if 'data' not in payload:
return 'Missing data.', 400
elif 'offset' not in payload:
return 'Missing offset.', 400
data = base64.b64decode(payload['data'])
offset = int(payload['offset'])
# Open, seek, write, close
try:
fd = os.open(path, os.O_WRONLY)
os.lseek(fd, offset, os.SEEK_SET)
n = os.write(fd, data)
os.close(fd)
except FileNotFoundError:
return 'File not found.', 404
return flask.jsonify({"count": n})
def files_delete(path):
"""
unlink(path) -> DELETE /files/<path>?op=unlink
200
404 File not found.
rmdir(path) -> DELETE /files/<path>?op=rmdir
200
404 File not found.
"""
if 'op' not in flask.request.args:
return 'Missing operation.', 400
op = flask.request.args['op']
path = flask.safe_join(app.config['serve_dir'], path)
if op == 'unlink':
try:
os.unlink(path)
except FileNotFoundError:
return 'File not found.', 404
return ""
elif op == 'rmdir':
try:
os.rmdir(path)
except FileNotFoundError:
return 'File not found.', 404
except OSError as e:
return 'Errno {}'.format(e.errno), 400
return ""
return 'Unknown operation.', 400
def get_file_path(path):
"""
Gets the real filepath of a file within the Alexandria environment
"""
return safe_join(app.config.get('storage','location',None),path)
def preview_file(name):
# get information about the file
basepath=app.config.get("storage", "location")
# Now, we're going to join the two together and normalize path
fullpath = safe_join(basepath, name)
# TODO: This could be nicer.
if(not os.path.exists(fullpath)):
abort(404)
parent_dir='/'.join(name.split('/')[:-1])
file_stat = os.stat(fullpath)
file_record = {
"path":name,
"name":name.split('/')[-1],
"size":sizeof_fmt(file_stat.st_size)
}
previews = get_previews(name)
if(len(previews) > 0):
file_record["previews"] = previews
# Render the preview template.
return render_template('browser/preview_panel.html',
parent_dir=parent_dir,
file=file_record
)
def upload(path=""):
"""
Uploads a file to a place.
"""
# Check that uploads are OK
if not can_upload():
return redirect(sign_auth_path(request.full_path))
if request.method=='POST':
# handle uploaded files
if not 'file' in request.files:
abort(400) # General UA error
file = request.files["file"]
# We default to using the name of the file provided,
# but allow the filename to be changed via POST details.
fname = file.filename
if 'name' in request.form:
fname = request.form['name']
safe_fname = secure_filename(fname)
if not allowed_filename(safe_fname):
abort(400) # General client error
# We're handling a potentially dangerous path, better run it through
# The flask path jointer.
basepath=app.config.get("storage", "location")
fullpath = safe_join(basepath, path)
file.save(os.path.join(fullpath,fname))
flash("File uploaded successfully")
return redirect(url_for('browser.upload',path=path))
else:
return render_template("browser/upload.html",path=path,title="Upload Files")
def get_file_path(path):
"""
Gets the real filepath of a file within the Alexandria environment
"""
return safe_join(app.config.get('storage','location',None),path)
def preview_file(name):
# get information about the file
basepath=app.config.get("storage", "location")
# Now, we're going to join the two together and normalize path
fullpath = safe_join(basepath, name)
# TODO: This could be nicer.
if(not os.path.exists(fullpath)):
abort(404)
parent_dir='/'.join(name.split('/')[:-1])
file_stat = os.stat(fullpath)
file_record = {
"path":name,
"name":name.split('/')[-1],
"size":sizeof_fmt(file_stat.st_size)
}
previews = get_previews(name)
if(len(previews) > 0):
file_record["previews"] = previews
# Render the preview template.
return render_template('browser/preview_panel.html',
parent_dir=parent_dir,
file=file_record
)
def upload(path=""):
"""
Uploads a file to a place.
"""
# Check that uploads are OK
if not can_upload():
return redirect(sign_auth_path(request.full_path))
if request.method=='POST':
# handle uploaded files
if not 'file' in request.files:
abort(400) # General UA error
file = request.files["file"]
# We default to using the name of the file provided,
# but allow the filename to be changed via POST details.
fname = file.filename
if 'name' in request.form:
fname = request.form['name']
safe_fname = secure_filename(fname)
if not allowed_filename(safe_fname):
abort(400) # General client error
# We're handling a potentially dangerous path, better run it through
# The flask path jointer.
basepath=app.config.get("storage", "location")
fullpath = safe_join(basepath, path)
file.save(os.path.join(fullpath,fname))
flash("File uploaded successfully")
return redirect(url_for('browser.upload',path=path))
else:
return render_template("browser/upload.html",path=path,title="Upload Files")
def send_js(path):
return send_from_directory(safe_join(app.config['STATIC_FOLDER'], 'js'), path)
def send_css(path):
return send_from_directory(safe_join(app.config['STATIC_FOLDER'], 'css'), path)
def send_fonts(path):
return send_from_directory(safe_join(app.config['STATIC_FOLDER'], 'fonts'), path)
def root():
print(safe_join(app.config['FRONT_END_ROOT'], 'index.html'))
file = send_from_directory(app.config['FRONT_END_ROOT'], 'index.html')
return file
def files_get(path):
"""
getattr(path) -> GET /files/<path>?op=getattr
200 {"st_mode": <mode int>, "st_size": <size int>}
404 Not Found
readdir(path) -> GET /files/<path>?op=readdir
200 {"files": ["<file str>", ... ]}
404 Not Found
read(path, size, offset) -> GET /files/<path>?op=read&size=<size>&offset=<offset>
200 "<base64 str>"
404 Not Found
"""
if 'op' not in flask.request.args:
return 'Missing operation.', 400
op = flask.request.args['op']
if path == "/":
path = app.config['serve_dir']
else:
path = flask.safe_join(app.config['serve_dir'], path)
if op == 'getattr':
try:
info = os.stat(path)
return flask.jsonify({'st_mode': info.st_mode, 'st_size': info.st_size})
except FileNotFoundError:
return 'File not found.', 404
elif op == 'readdir':
try:
return flask.jsonify({"files": os.listdir(path)})
except FileNotFoundError:
return 'File not found.', 404
elif op == 'read':
if 'size' not in flask.request.args:
return 'Missing size.', 400
elif 'offset' not in flask.request.args:
return 'Missing offset.', 400
size = int(flask.request.args['size'])
offset = int(flask.request.args['offset'])
# Open, seek, read, close
try:
fd = os.open(path, os.O_RDONLY)
os.lseek(fd, offset, os.SEEK_SET)
buf = os.read(fd, size)
os.close(fd)
except FileNotFoundError:
return 'File not found.', 404
return flask.jsonify({"data": base64.b64encode(buf).decode()})
return 'Unknown operation.', 400
def get_file(
file_name: str, name: str = 'export'
) -> werkzeug.wrappers.Response:
"""Serve some specific file in the uploads folder.
.. :quickref: File; Get an uploaded file directory.
.. note::
Only files uploaded using :http:post:`/api/v1/files/` may be retrieved.
:param str file_name: The filename of the file to get.
:returns: The requested file.
:raises PermissionException: If there is no logged in user. (NOT_LOGGED_IN)
"""
name = request.args.get('name', name)
directory = app.config['MIRROR_UPLOAD_DIR']
error = False
@after_this_request
def delete_file(response: t.Any) -> t.Any:
if not error:
filename = safe_join(directory, file_name)
os.unlink(filename)
return response
try:
mimetype = request.args.get('mime', None)
as_attachment = request.args.get('not_as_attachment', False)
return send_from_directory(
directory,
file_name,
attachment_filename=name,
as_attachment=as_attachment,
mimetype=mimetype
)
except NotFound:
error = True
raise APIException(
'The specified file was not found',
f'The file with name "{file_name}" was not found or is deleted.',
APICodes.OBJECT_NOT_FOUND,
404,
)