def get_zip(self, project, ty):
"""Get a ZIP file directly from uploaded directory
or generate one on the fly and upload it if not existing."""
filename = self.download_name(project, ty)
if not self.zip_existing(project, ty):
print "Warning: Generating %s on the fly now!" % filename
self._make_zip(project, ty)
if isinstance(uploader, local.LocalUploader):
filepath = self._download_path(project)
res = send_file(filename_or_fp=safe_join(filepath, filename),
mimetype='application/octet-stream',
as_attachment=True,
attachment_filename=filename)
# fail safe mode for more encoded filenames.
# It seems Flask and Werkzeug do not support RFC 5987 http://greenbytes.de/tech/tc2231/#encoding-2231-char
# res.headers['Content-Disposition'] = 'attachment; filename*=%s' % filename
return res
else:
return redirect(url_for('rackspace', filename=filename,
container=self._container(project),
_external=True))
python类send_file()的实例源码
def collage():
if request.method=='GET':
return render_template('collage.html',input=True)
elif request.method == 'POST':
username = request.form['lastfm_username']
duration = request.form['duration']
print(username)
payload = {
'user': username,
'api_key': config.LASTFM_API_KEY,
'method': 'user.gettopalbums',
'period':duration,
'limit':9,
'format':'json'
}
r = requests.get(config.url, params = payload)
filenames = get_album_art(r.text, username)
generate_collage(filenames,username)
return send_file('static/' + username+'.jpg', mimetype='image/jpg')
def get_image_file(uuid, image_file_path):
"""GetImageFile (GET /images/:uuid/file)
Return the image file.
"""
encoded_md5 = get_image_file_md5(uuid, image_file_path)
if NGINX_ENABLED:
res = make_response('')
res.headers['Content-Type'] = 'application/octet-stream'
res.headers['X-Accel-Redirect'] = '/static/images/%s/file?md5=%s' % (uuid, encoded_md5)
else:
res = make_response(send_file(image_file_path, add_etags=False, cache_timeout=0))
res.headers['Content-Length'] = get_image_file_size(image_file_path)
res.headers['Content-MD5'] = encoded_md5
return res
def get_job_output(job_id, filename, as_attach=False, mimetype=None, tail=None, head=None, job=None):
try:
output_file = job.relative_path(filename)
if tail or head:
if tail and head:
return "Cannot specify tail AND head", 500
cmd = "head" if head else "tail"
count = tail or head
p = subprocess.Popen([cmd, "-n", count, output_file], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
tail_data, tail_error = p.communicate()
resp = make_response(tail_data)
if as_attach:
resp.headers["Content-Disposition"] = "attachment; filename={}".format(filename)
if mimetype:
resp.headers["Content-Type"] = mimetype
return resp
else:
return send_file(output_file, as_attachment=as_attach, mimetype=mimetype)
except Exception as e:
print e
return "File Not Found", 404
def send_file(filename, attachment_filename, mimetype, **kwargs):
response = flask_send_file(filename, mimetype=mimetype)
try:
attachment_filename = attachment_filename.encode('latin-1')
except UnicodeEncodeError:
filenames = {
'filename': unicodedata
.normalize('NFKD', attachment_filename)
.encode('latin-1', 'ignore'),
'filename*': "UTF-8''{}".format(
url_quote(attachment_filename)),
}
else:
filenames = {'filename': attachment_filename}
response.headers.set(
'Content-Disposition', 'attachment', **filenames)
return response
def test_send_file_xsendfile(self):
app = flask.Flask(__name__)
app.use_x_sendfile = True
with app.test_request_context():
rv = flask.send_file('static/index.html')
self.assert_true(rv.direct_passthrough)
self.assert_in('x-sendfile', rv.headers)
self.assert_equal(rv.headers['x-sendfile'],
os.path.join(app.root_path, 'static/index.html'))
self.assert_equal(rv.mimetype, 'text/html')
rv.close()
def serve_static_files(p, index_on_error=True):
"""Securely serve static files for the given path using send_file."""
# Determine the canonical path of the file
full_path = os.path.realpath(os.path.join(app.static_folder, p))
# We have a problem if either:
# - the path is not a sub-path of app.static_folder; or
# - the path does not refer to a real file.
if (os.path.commonprefix([app.static_folder, full_path]) != app.static_folder or
not os.path.isfile(full_path)):
file_to_return = app.config.get('STATIC_FILE_ON_404', None)
if file_to_return is not None:
full_path = os.path.realpath(os.path.join(app.static_folder, file_to_return))
else:
return abort(404)
return send_file(full_path)
def shrug():
import os, os.path
from tools.slack_posting import postImage
filename = os.path.join(os.getcwd(), tools.settings.getValue('misc', 'shinoa_shrug'))
if request.method == 'GET':
return send_file(filename, mimetype='image/{}'.format(filename.split('.')[-1]))
elif request.method == 'POST':
if (not (isValidCommand(request, 'shrug'))):
return "You dun goof'd."
postImage(filename, request.form['channel_id'], None, "shrug", "*shrug*")
return ""
def pages(page_path):
if not validate_custom_page_path(page_path):
raise ApiException(error=Error.NOT_ALLOWED, message='The visit of path "{}" is not allowed.'.format(page_path))
rel_url, exists = storage.fix_relative_url('page', page_path)
if exists:
file_path = rel_url
return send_file(file_path)
elif rel_url is None: # pragma: no cover, it seems impossible to make this happen, see code of 'fix_relative_url'
raise ApiException(error=Error.BAD_PATH, message='The path "{}" cannot be recognized.'.format(page_path))
else:
page_d = cache.get('api-handler.' + rel_url)
if page_d is not None:
return page_d # pragma: no cover, here just get the cached dict
page = storage.get_page(rel_url, include_draft=False)
if page is None:
raise ApiException(error=Error.RESOURCE_NOT_EXISTS)
page_d = page.to_dict()
del page_d['raw_content']
page_d['content'] = get_parser(page.format).parse_whole(page.raw_content)
cache.set('api-handler.' + rel_url, page_d, timeout=2 * 60)
return page_d
def report_get(task_id, report_format="json"):
task = Task.query.get(task_id)
if not task:
return json_error(404, "Task not found")
if task.status == Task.DELETED:
return json_error(404, "Task report has been deleted")
if task.status != Task.FINISHED:
return json_error(420, "Task not finished yet")
report_path = os.path.join(settings.reports_directory,
"%d" % task_id, "report.%s" % report_format)
if not os.path.isfile(report_path):
return json_error(404, "Report format not found")
return send_file(report_path)
def pcap_get(task_id):
task = Task.query.get(task_id)
if not task:
return json_error(404, "Task not found")
if task.status == Task.DELETED:
return json_error(404, "Task files has been deleted")
if task.status != Task.FINISHED:
return json_error(420, "Task not finished yet")
pcap_path = os.path.join(settings.reports_directory,
"%s" % task_id, "dump.pcap")
if not os.path.isfile(pcap_path):
return json_error(404, "Pcap file not found")
return send_file(pcap_path)
def shakemap_overlay(shakemap_id):
session = Session()
shakemap = (session.query(ShakeMap)
.filter(ShakeMap.shakemap_id == shakemap_id)
.order_by(desc(ShakeMap.shakemap_version))
.limit(1)).first()
if shakemap is not None:
img = os.path.join(app.config['EARTHQUAKES'],
shakemap_id,
shakemap_id + '-' + str(shakemap.shakemap_version),
'ii_overlay.png')
else:
img = app.send_static_file('sc_logo.png')
Session.remove()
return send_file(img, mimetype='image/gif')
def index():
path = request.args.get('path')
frame = request.args.get('frame')
id = request.args.get('id')
basename = os.path.split(path)[1]
video_path = '{}/{}'.format(FILE_DIR, basename)
if not os.path.isfile(video_path):
blob = bucket.get_blob(path)
with open(video_path, 'wb') as f:
blob.download_to_file(f)
sp.check_call(shlex.split("ffmpeg -y -i {} -vf \"select=eq(n\\,{})\" -frames:v 1 {}/%05d.jpg".format(video_path, frame, FILE_DIR)))
blob = bucket.blob('public/thumbnails/tvnews/frame_{}.jpg'.format(id))
img_path = '{}/00001.jpg'.format(FILE_DIR)
blob.upload_from_filename(img_path)
return send_file(img_path)
def media_file(filename):
"""
Retrieves a media file.
"""
outfile = media_storage.get_file(filename)
return send_file(outfile,mimetype='image/png')
def showHtml(filename):
return send_file(os.path.join(app.config['REPORT'], filename))
# ????????
def download_homework(filename):
return send_file(os.path.join(app.config['HOMEWORK'], filename))
def scripts():
return send_file('ui/cached_dist/Bock.js')
def styles():
return send_file('ui/cached_dist/Bock.css')
def fonts(route):
return send_file('ui/cached_dist/fonts/{}'.format(route))
def route(route):
return send_file('ui/cached_dist/index.html')
def index():
return send_file('ui/cached_dist/index.html')
def test_send_file_regular(self):
app = flask.Flask(__name__)
with app.test_request_context():
rv = flask.send_file('static/index.html')
self.assert_true(rv.direct_passthrough)
self.assert_equal(rv.mimetype, 'text/html')
with app.open_resource('static/index.html') as f:
rv.direct_passthrough = False
self.assert_equal(rv.data, f.read())
rv.close()
def test_attachment(self):
app = flask.Flask(__name__)
with catch_warnings() as captured:
with app.test_request_context():
f = open(os.path.join(app.root_path, 'static/index.html'))
rv = flask.send_file(f, as_attachment=True)
value, options = parse_options_header(rv.headers['Content-Disposition'])
self.assert_equal(value, 'attachment')
rv.close()
# mimetypes + etag
self.assert_equal(len(captured), 2)
with app.test_request_context():
self.assert_equal(options['filename'], 'index.html')
rv = flask.send_file('static/index.html', as_attachment=True)
value, options = parse_options_header(rv.headers['Content-Disposition'])
self.assert_equal(value, 'attachment')
self.assert_equal(options['filename'], 'index.html')
rv.close()
with app.test_request_context():
rv = flask.send_file(StringIO('Test'), as_attachment=True,
attachment_filename='index.txt',
add_etags=False)
self.assert_equal(rv.mimetype, 'text/plain')
value, options = parse_options_header(rv.headers['Content-Disposition'])
self.assert_equal(value, 'attachment')
self.assert_equal(options['filename'], 'index.txt')
rv.close()
def test_static_file(self):
app = flask.Flask(__name__)
# default cache timeout is 12 hours
with app.test_request_context():
# Test with static file handler.
rv = app.send_static_file('index.html')
cc = parse_cache_control_header(rv.headers['Cache-Control'])
self.assert_equal(cc.max_age, 12 * 60 * 60)
rv.close()
# Test again with direct use of send_file utility.
rv = flask.send_file('static/index.html')
cc = parse_cache_control_header(rv.headers['Cache-Control'])
self.assert_equal(cc.max_age, 12 * 60 * 60)
rv.close()
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 3600
with app.test_request_context():
# Test with static file handler.
rv = app.send_static_file('index.html')
cc = parse_cache_control_header(rv.headers['Cache-Control'])
self.assert_equal(cc.max_age, 3600)
rv.close()
# Test again with direct use of send_file utility.
rv = flask.send_file('static/index.html')
cc = parse_cache_control_header(rv.headers['Cache-Control'])
self.assert_equal(cc.max_age, 3600)
rv.close()
class StaticFileApp(flask.Flask):
def get_send_file_max_age(self, filename):
return 10
app = StaticFileApp(__name__)
with app.test_request_context():
# Test with static file handler.
rv = app.send_static_file('index.html')
cc = parse_cache_control_header(rv.headers['Cache-Control'])
self.assert_equal(cc.max_age, 10)
rv.close()
# Test again with direct use of send_file utility.
rv = flask.send_file('static/index.html')
cc = parse_cache_control_header(rv.headers['Cache-Control'])
self.assert_equal(cc.max_age, 10)
rv.close()
def document():
url = request.args.get('url')
if not url or not os.path.exists(url):
return abort(400, "File %s not found " % url)
return send_file(url)
def api_request_basic():
"""Establishes the endpoint that is used for the basic fetch mode
POST body should be json formated.
Required field(s):
- url : actual url that is requested to be fetched by the minion
Optional field(s):
- user-agent : user agent to be used during the request of the url
if this argument is not provided, then the default
user agent in the config file will be used.
"""
try:
data = request.get_json()
if data is None:
raise BadRequest
file_path = run_job(data, "basic")
return send_file(file_path)
except BadRequest as e:
return prepare_400("api_request_basic", str(e))
except ValueError as e:
return prepare_400("api_request_basic", str(e))
except Exception as e:
print type(e)
return prepare_500("api_request_basic", str(e))
def api_request_website():
"""Establishes the endpoint that is used for the website fetch mode
POST body should be json formated.
Required field(s):
- url : actual url that is requested to be fetched by the minion
Optional field(s):
- user-agent : user agent to be used during the request of the url
if this argument is not provided, then the default
user agent in the config file will be used.
"""
try:
data = request.get_json()
if data is None:
raise BadRequest
file_path = run_job(data, "website")
return send_file(file_path)
except BadRequest as e:
return prepare_400("api_request_website", str(e))
except ValueError as e:
return prepare_400("api_request_website", str(e))
except Exception as e:
print type(e)
return prepare_500("api_request_website", str(e))
def api_basic_results(job_id):
"""Establishes endpoint for retrieval of historical boomerang results
:param job_id: id of the specific job that you wish to look up
"""
if minion.config['BOOMERANG_STORE_RESULTS']:
try:
file_path = get_job_results(job_id)
return send_file(file_path)
except BadRequest as e:
return prepare_400("api_basic_results", str(e))
except ValueError:
return prepare_404('api_basic_results')
except Exception as e:
return prepare_500("api_basic_results", str(e))
else:
return prepare_400("api_basic_results", error="Config set to not store results")
def GET_v1_compile_job_id_hex(job_id):
"""Download a compiled firmware
"""
job = get_job_metadata(job_id)
if not job:
return error("Compile job not found", 404)
if job['result']['firmware']:
return send_file(job['result']['firmware'], mimetype='application/octet-stream', as_attachment=True, attachment_filename=job['result']['firmware_filename'])
return error("Compile job not finished or other error.", 422)
def GET_v1_compile_job_id_src(job_id):
"""Download a completed compile job.
"""
job = get_job_metadata(job_id)
if not job:
return error("Compile job not found", 404)
if job['result']['firmware']:
source_zip = qmk_storage.get('%(id)s/%(source_archive)s' % job['result'])
return send_file(source_zip, mimetype='application/octet-stream', as_attachment=True, attachment_filename=job['result']['source_archive'])
return error("Compile job not finished or other error.", 422)