python类send_file()的实例源码

__init__.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get_zip(self, project, ty):
        """Return a response that delivers the ZIP export of ``project``.

        If no ZIP exists yet for this project/type it is generated first.
        With a local uploader the archive is streamed back as an attachment;
        otherwise the client is redirected to the ``rackspace`` endpoint.

        :param project: project whose export is requested.
        :param ty: export type, forwarded to the naming/ZIP helpers.
        :returns: a ``send_file`` response or a redirect response.
        """
        zip_name = self.download_name(project, ty)
        if not self.zip_existing(project, ty):
            print("Warning: Generating %s on the fly now!" % zip_name)
            self._make_zip(project, ty)

        if not isinstance(uploader, local.LocalUploader):
            # Archive is not stored locally: hand the client an external URL.
            return redirect(url_for('rackspace', filename=zip_name,
                                    container=self._container(project),
                                    _external=True))

        local_zip = safe_join(self._download_path(project), zip_name)
        # Original author's note: Flask and Werkzeug seemed not to support
        # RFC 5987 encoded filenames
        # (http://greenbytes.de/tech/tc2231/#encoding-2231-char), so the plain
        # attachment filename is relied upon here.
        return send_file(filename_or_fp=local_zip,
                         mimetype='application/octet-stream',
                         as_attachment=True,
                         attachment_filename=zip_name)
app.py 文件源码 项目:hack-lastfm 作者: CSEC-NITH 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def collage():
    """Render the collage form (GET) or build and return a collage (POST).

    On POST the Last.fm username and period are read from the form, the
    user's top albums are requested from the configured API URL, the album
    art is fetched, a collage is generated and the resulting JPEG is sent.

    :returns: the rendered ``collage.html`` template for GET, or the
        generated image for POST. Other methods fall through and return
        ``None``.
    """
    if request.method == 'GET':
        return render_template('collage.html', input=True)
    elif request.method == 'POST':
        username = request.form['lastfm_username']
        duration = request.form['duration']
        print(username)
        payload = {
            'user': username,
            'api_key': config.LASTFM_API_KEY,
            'method': 'user.gettopalbums',
            'period': duration,
            'limit': 9,
            'format': 'json',
        }
        r = requests.get(config.url, params=payload)
        filenames = get_album_art(r.text, username)
        generate_collage(filenames, username)
        # NOTE(review): ``username`` comes straight from the form and is used
        # to build a filesystem path; it should be validated wherever the
        # collage file name is chosen (generate_collage must agree on it).
        # 'image/jpeg' is the registered media type ('image/jpg' is not).
        return send_file('static/' + username + '.jpg', mimetype='image/jpeg')
views.py 文件源码 项目:esdc-shipment 作者: erigones 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_image_file(uuid, image_file_path):
    """GetImageFile (GET /images/:uuid/file)

    Build the response for an image file download. When NGINX_ENABLED is
    set, an empty body with an ``X-Accel-Redirect`` header is returned;
    otherwise the file is sent directly. In both cases ``Content-Length``
    and ``Content-MD5`` headers are set from the helper functions.
    """
    md5_value = get_image_file_md5(uuid, image_file_path)

    if not NGINX_ENABLED:
        response = make_response(
            send_file(image_file_path, add_etags=False, cache_timeout=0))
    else:
        response = make_response('')
        response.headers['Content-Type'] = 'application/octet-stream'
        redirect_target = '/static/images/%s/file?md5=%s' % (uuid, md5_value)
        response.headers['X-Accel-Redirect'] = redirect_target

    response.headers['Content-Length'] = get_image_file_size(image_file_path)
    response.headers['Content-MD5'] = md5_value
    return response
job_handlers.py 文件源码 项目:encore 作者: statgen 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_job_output(job_id, filename, as_attach=False, mimetype=None, tail=None, head=None, job=None):
    """Return a job's output file, or only its first/last lines.

    :param job_id: identifier of the job (not used by this body).
    :param filename: name of the output file, resolved through
        ``job.relative_path``.
    :param as_attach: when true, mark the response as an attachment.
    :param mimetype: optional content type for the response.
    :param tail: number of trailing lines to return (runs ``tail -n``).
    :param head: number of leading lines to return (runs ``head -n``).
    :param job: object providing ``relative_path(filename)``.
    :returns: a response object, or a ``(message, status)`` tuple: 500 when
        both ``tail`` and ``head`` are given, 404 when anything raises.
    """
    try:
        output_file = job.relative_path(filename)
        if tail or head:
            if tail and head:
                return "Cannot specify tail AND head", 500
            cmd = "head" if head else "tail"
            # Popen requires every argv element to be a string; an integer
            # count previously raised TypeError and surfaced as a bogus 404.
            count = str(tail or head)
            p = subprocess.Popen([cmd, "-n", count, output_file], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            tail_data, tail_error = p.communicate()
            resp = make_response(tail_data)
            if as_attach:
                resp.headers["Content-Disposition"] = "attachment; filename={}".format(filename)
            if mimetype:
                resp.headers["Content-Type"] = mimetype
            return resp
        else:
            return send_file(output_file, as_attachment=as_attach, mimetype=mimetype)
    except Exception as e:
        # Best-effort endpoint: any failure is reported to the client as 404.
        print(e)
        return "File Not Found", 404
util.py 文件源码 项目:gepify 作者: nvlbg 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def send_file(filename, attachment_filename, mimetype, **kwargs):
    """Send ``filename`` as an attachment with a Unicode-safe file name.

    The file is sent through ``flask_send_file`` and a
    ``Content-Disposition: attachment`` header is then set. If the
    attachment name cannot be encoded as latin-1, a lossy ``filename``
    (NFKD-normalised, unencodable characters dropped) is sent together with
    a ``filename*`` parameter carrying the URL-quoted name prefixed with
    ``UTF-8''``.

    Extra keyword arguments are accepted but not used by this body.
    """
    response = flask_send_file(filename, mimetype=mimetype)

    try:
        latin1_name = attachment_filename.encode('latin-1')
    except UnicodeEncodeError:
        decomposed = unicodedata.normalize('NFKD', attachment_filename)
        disposition_params = {
            'filename': decomposed.encode('latin-1', 'ignore'),
            'filename*': "UTF-8''{}".format(url_quote(attachment_filename)),
        }
    else:
        disposition_params = {'filename': latin1_name}

    response.headers.set('Content-Disposition', 'attachment',
                         **disposition_params)
    return response
helpers.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_send_file_xsendfile(self):
        """With ``use_x_sendfile`` on, send_file must emit an x-sendfile header."""
        resource = 'static/index.html'
        app = flask.Flask(__name__)
        app.use_x_sendfile = True
        with app.test_request_context():
            response = flask.send_file(resource)
            self.assert_true(response.direct_passthrough)
            self.assert_in('x-sendfile', response.headers)
            # The header is expected to hold the path under the app root.
            self.assert_equal(response.headers['x-sendfile'],
                              os.path.join(app.root_path, resource))
            self.assert_equal(response.mimetype, 'text/html')
            response.close()
static.py 文件源码 项目:rc-niceties 作者: mjec 项目源码 文件源码 阅读 75 收藏 0 点赞 0 评论 0
def serve_static_files(p, index_on_error=True):
    """Securely serve static files for the given path using send_file.

    The requested path is resolved to its canonical location and must lie
    strictly inside the (canonical) static folder and refer to a regular
    file. Otherwise the file named by the ``STATIC_FILE_ON_404`` config
    entry is served, or a 404 is raised when that entry is not set.

    :param p: path of the requested file, relative to the static folder.
    :param index_on_error: accepted for compatibility; this body does not
        consult it.
    :returns: the ``send_file`` response (or the result of ``abort(404)``).
    """
    # Canonicalise both sides so symlinks cannot defeat the containment test.
    static_root = os.path.realpath(app.static_folder)
    full_path = os.path.realpath(os.path.join(static_root, p))

    # Compare on a path-component boundary. os.path.commonprefix works
    # character by character, so '/srv/static_evil/x' would wrongly be
    # treated as living inside '/srv/static'.
    root_prefix = static_root.rstrip(os.sep) + os.sep
    inside_root = full_path.startswith(root_prefix)

    if not inside_root or not os.path.isfile(full_path):
        file_to_return = app.config.get('STATIC_FILE_ON_404', None)
        if file_to_return is None:
            return abort(404)
        full_path = os.path.realpath(os.path.join(static_root, file_to_return))
    return send_file(full_path)
server.py 文件源码 项目:mal 作者: chaosking121 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def shrug():
    """Serve the configured shrug image (GET) or post it to a channel (POST).

    The image path is the current working directory joined with the
    ``misc``/``shinoa_shrug`` setting. For POST the request must pass
    ``isValidCommand(request, 'shrug')``; the image is then posted to the
    channel named in the form and an empty body is returned.
    """
    import os
    import os.path
    from tools.slack_posting import postImage

    image_path = os.path.join(os.getcwd(),
                              tools.settings.getValue('misc', 'shinoa_shrug'))
    method = request.method

    if method == 'GET':
        # The image subtype is taken from the text after the last dot.
        extension = image_path.split('.')[-1]
        return send_file(image_path, mimetype='image/{}'.format(extension))

    if method == 'POST':
        if not isValidCommand(request, 'shrug'):
            return "You dun goof'd."
        postImage(image_path, request.form['channel_id'], None, "shrug", "*shrug*")
        return ""
handlers.py 文件源码 项目:veripress 作者: veripress 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def pages(page_path):
    """Return a custom page, either as a raw file or as a parsed dict.

    The path is validated first. If ``storage.fix_relative_url`` reports
    that the target exists, it is sent as a file. Otherwise the page is
    looked up (drafts excluded), its raw content is replaced by the parsed
    content, and the resulting dict is cached for two minutes.

    :raises ApiException: when the path is not allowed, cannot be
        recognized, or the page does not exist.
    """
    if not validate_custom_page_path(page_path):
        raise ApiException(error=Error.NOT_ALLOWED, message='The visit of path "{}" is not allowed.'.format(page_path))

    rel_url, exists = storage.fix_relative_url('page', page_path)
    if exists:
        return send_file(rel_url)
    if rel_url is None:  # pragma: no cover, it seems impossible to make this happen, see code of 'fix_relative_url'
        raise ApiException(error=Error.BAD_PATH, message='The path "{}" cannot be recognized.'.format(page_path))

    cached_page = cache.get('api-handler.' + rel_url)
    if cached_page is not None:
        return cached_page  # pragma: no cover, here just get the cached dict

    page = storage.get_page(rel_url, include_draft=False)
    if page is None:
        raise ApiException(error=Error.RESOURCE_NOT_EXISTS)

    result = page.to_dict()
    del result['raw_content']
    result['content'] = get_parser(page.format).parse_whole(page.raw_content)

    cache.set('api-handler.' + rel_url, result, timeout=2 * 60)
    return result
api.py 文件源码 项目:cuckoodroid-2.0 作者: idanr1986 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def report_get(task_id, report_format="json"):
    """Send the stored report of a finished task in the requested format.

    Returns a JSON error instead when the task is unknown (404), deleted
    (404), not finished (420), or has no report file in that format (404).
    """
    task = Task.query.get(task_id)
    if not task:
        return json_error(404, "Task not found")

    if task.status == Task.DELETED:
        return json_error(404, "Task report has been deleted")

    if task.status != Task.FINISHED:
        return json_error(420, "Task not finished yet")

    task_dir = "%d" % task_id
    report_name = "report.%s" % report_format
    report_file = os.path.join(settings.reports_directory, task_dir, report_name)
    if os.path.isfile(report_file):
        return send_file(report_file)

    return json_error(404, "Report format not found")
api.py 文件源码 项目:cuckoodroid-2.0 作者: idanr1986 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def pcap_get(task_id):
    """Send the ``dump.pcap`` file stored for a finished task.

    Returns a JSON error instead when the task is unknown (404), deleted
    (404), not finished (420), or has no pcap file (404).
    """
    task = Task.query.get(task_id)
    if not task:
        return json_error(404, "Task not found")

    if task.status == Task.DELETED:
        return json_error(404, "Task files has been deleted")

    if task.status != Task.FINISHED:
        return json_error(420, "Task not finished yet")

    task_dir = "%s" % task_id
    pcap_file = os.path.join(settings.reports_directory, task_dir, "dump.pcap")
    if os.path.isfile(pcap_file):
        return send_file(pcap_file)

    return json_error(404, "Pcap file not found")
web_server.py 文件源码 项目:shakecast 作者: usgs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def shakemap_overlay(shakemap_id):
    """Send the intensity overlay image for the newest version of a ShakeMap.

    The ShakeMap row with the highest version for ``shakemap_id`` is looked
    up. If one exists, its ``ii_overlay.png`` under the configured
    ``EARTHQUAKES`` directory is sent; otherwise the static ``sc_logo.png``
    is sent as a placeholder.

    :param shakemap_id: identifier of the ShakeMap.
    :returns: a ``send_file`` response with a PNG content type.
    """
    session = Session()
    try:
        shakemap = (session.query(ShakeMap)
                        .filter(ShakeMap.shakemap_id == shakemap_id)
                        .order_by(desc(ShakeMap.shakemap_version))
                        .limit(1)).first()
        if shakemap is not None:
            img = os.path.join(app.config['EARTHQUAKES'],
                               shakemap_id,
                               shakemap_id + '-' + str(shakemap.shakemap_version),
                               'ii_overlay.png')
        else:
            # send_file needs a path; app.send_static_file() would return a
            # complete Response object, which cannot be passed to send_file.
            img = os.path.join(app.static_folder, 'sc_logo.png')
    finally:
        # Release the session even when the query raises.
        Session.remove()

    # Both candidate files are PNGs, so advertise the matching media type.
    return send_file(img, mimetype='image/png')
server.py 文件源码 项目:esper 作者: scanner-research 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def index():
    """Extract one frame of a video as a JPEG, upload it and send it back.

    Query parameters read: ``path`` (blob path of the video), ``frame``
    (frame number to select) and ``id`` (used in the uploaded thumbnail's
    name). The video is downloaded into FILE_DIR if not already present,
    ffmpeg writes the selected frame to ``FILE_DIR/00001.jpg``, and that
    file is uploaded to ``public/thumbnails/tvnews/frame_<id>.jpg``.

    :returns: a ``send_file`` response with the extracted frame.
    """
    path = request.args.get('path')
    frame = request.args.get('frame')
    frame_id = request.args.get('id')

    # Only the base name is used locally, so the request cannot choose a
    # directory outside FILE_DIR.
    basename = os.path.split(path)[1]
    video_path = '{}/{}'.format(FILE_DIR, basename)

    if not os.path.isfile(video_path):
        blob = bucket.get_blob(path)
        with open(video_path, 'wb') as f:
            blob.download_to_file(f)

    # Build argv explicitly instead of shlex-splitting an interpolated
    # string: request-supplied values can then never add extra ffmpeg
    # arguments, and paths containing spaces or quotes keep working.
    ffmpeg_cmd = [
        'ffmpeg', '-y',
        '-i', video_path,
        '-vf', 'select=eq(n\\,{})'.format(frame),
        '-frames:v', '1',
        '{}/%05d.jpg'.format(FILE_DIR),
    ]
    sp.check_call(ffmpeg_cmd)

    blob = bucket.blob('public/thumbnails/tvnews/frame_{}.jpg'.format(frame_id))
    img_path = '{}/00001.jpg'.format(FILE_DIR)
    blob.upload_from_filename(img_path)

    return send_file(img_path)
webhook.py 文件源码 项目:talkback 作者: Axilent 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def media_file(filename):
    """
    Fetch ``filename`` from the media storage and send it as a PNG image.
    """
    stored_media = media_storage.get_file(filename)
    response = send_file(stored_media, mimetype='image/png')
    return response
run.py 文件源码 项目:apiTest 作者: wuranxu 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def showHtml(filename):
    """Send ``filename`` from the directory configured as ``REPORT``."""
    report_file = os.path.join(app.config['REPORT'], filename)
    return send_file(report_file)


# ????????
run.py 文件源码 项目:apiTest 作者: wuranxu 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def download_homework(filename):
    """Send ``filename`` from the directory configured as ``HOMEWORK``."""
    homework_file = os.path.join(app.config['HOMEWORK'], filename)
    return send_file(homework_file)
__init__.py 文件源码 项目:bock 作者: afreeorange 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def scripts():
    """Send the cached UI JavaScript bundle."""
    bundle_path = 'ui/cached_dist/Bock.js'
    return send_file(bundle_path)
__init__.py 文件源码 项目:bock 作者: afreeorange 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def styles():
    """Send the cached UI stylesheet."""
    stylesheet_path = 'ui/cached_dist/Bock.css'
    return send_file(stylesheet_path)
__init__.py 文件源码 项目:bock 作者: afreeorange 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def fonts(route):
    """Send the font file ``route`` from the cached UI fonts directory."""
    font_path = 'ui/cached_dist/fonts/{}'.format(route)
    return send_file(font_path)
__init__.py 文件源码 项目:bock 作者: afreeorange 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def route(route):
    """Send the cached UI index page; the ``route`` argument is not used."""
    index_page = 'ui/cached_dist/index.html'
    return send_file(index_page)
__init__.py 文件源码 项目:bock 作者: afreeorange 项目源码 文件源码 阅读 48 收藏 0 点赞 0 评论 0
def index():
    """Send the cached UI index page."""
    index_page = 'ui/cached_dist/index.html'
    return send_file(index_page)
helpers.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_send_file_regular(self):
        """send_file on a path yields a passthrough HTML response with the file's bytes."""
        resource = 'static/index.html'
        app = flask.Flask(__name__)
        with app.test_request_context():
            response = flask.send_file(resource)
            self.assert_true(response.direct_passthrough)
            self.assert_equal(response.mimetype, 'text/html')
            with app.open_resource(resource) as expected:
                # Passthrough is switched off before the body is read
                # through ``response.data``.
                response.direct_passthrough = False
                self.assert_equal(response.data, expected.read())
            response.close()
helpers.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_attachment(self):
        """Check the Content-Disposition header produced by ``as_attachment=True``.

        Three inputs are exercised: an open file object, a file path, and a
        ``StringIO`` with an explicit ``attachment_filename``.
        """
        app = flask.Flask(__name__)
        # Case 1: an already opened file object; warnings raised while
        # sending are collected in ``captured``.
        with catch_warnings() as captured:
            with app.test_request_context():
                f = open(os.path.join(app.root_path, 'static/index.html'))
                rv = flask.send_file(f, as_attachment=True)
                value, options = parse_options_header(rv.headers['Content-Disposition'])
                self.assert_equal(value, 'attachment')
                rv.close()
            # mimetypes + etag
            self.assert_equal(len(captured), 2)

        # Case 2: a path relative to the application.
        with app.test_request_context():
            # NOTE(review): at this point ``options`` still holds the value
            # parsed in case 1; it is only re-parsed two lines below.
            self.assert_equal(options['filename'], 'index.html')
            rv = flask.send_file('static/index.html', as_attachment=True)
            value, options = parse_options_header(rv.headers['Content-Disposition'])
            self.assert_equal(value, 'attachment')
            self.assert_equal(options['filename'], 'index.html')
            rv.close()

        # Case 3: an in-memory stream with an explicit attachment filename
        # and etags disabled.
        with app.test_request_context():
            rv = flask.send_file(StringIO('Test'), as_attachment=True,
                                 attachment_filename='index.txt',
                                 add_etags=False)
            self.assert_equal(rv.mimetype, 'text/plain')
            value, options = parse_options_header(rv.headers['Content-Disposition'])
            self.assert_equal(value, 'attachment')
            self.assert_equal(options['filename'], 'index.txt')
            rv.close()
helpers.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_static_file(self):
        """Check the Cache-Control max-age of static file responses.

        Each scenario checks both ``app.send_static_file`` and
        ``flask.send_file``: the default value, a value set through
        ``SEND_FILE_MAX_AGE_DEFAULT``, and a value returned by an overridden
        ``get_send_file_max_age``.
        """
        app = flask.Flask(__name__)
        # default cache timeout is 12 hours
        with app.test_request_context():
            # Test with static file handler.
            rv = app.send_static_file('index.html')
            cc = parse_cache_control_header(rv.headers['Cache-Control'])
            self.assert_equal(cc.max_age, 12 * 60 * 60)
            rv.close()
            # Test again with direct use of send_file utility.
            rv = flask.send_file('static/index.html')
            cc = parse_cache_control_header(rv.headers['Cache-Control'])
            self.assert_equal(cc.max_age, 12 * 60 * 60)
            rv.close()
        # Scenario 2: the max age is overridden through the app config.
        app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 3600
        with app.test_request_context():
            # Test with static file handler.
            rv = app.send_static_file('index.html')
            cc = parse_cache_control_header(rv.headers['Cache-Control'])
            self.assert_equal(cc.max_age, 3600)
            rv.close()
            # Test again with direct use of send_file utility.
            rv = flask.send_file('static/index.html')
            cc = parse_cache_control_header(rv.headers['Cache-Control'])
            self.assert_equal(cc.max_age, 3600)
            rv.close()
        # Scenario 3: a Flask subclass returns a fixed max age of 10 seconds
        # from its get_send_file_max_age hook.
        class StaticFileApp(flask.Flask):
            def get_send_file_max_age(self, filename):
                return 10
        app = StaticFileApp(__name__)
        with app.test_request_context():
            # Test with static file handler.
            rv = app.send_static_file('index.html')
            cc = parse_cache_control_header(rv.headers['Cache-Control'])
            self.assert_equal(cc.max_age, 10)
            rv.close()
            # Test again with direct use of send_file utility.
            rv = flask.send_file('static/index.html')
            cc = parse_cache_control_header(rv.headers['Cache-Control'])
            self.assert_equal(cc.max_age, 10)
            rv.close()
app.py 文件源码 项目:supervising-ui 作者: USCDataScience 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def document():
    """Send the file named by the ``url`` query parameter.

    :returns: a ``send_file`` response, or aborts with 400 when the
        parameter is missing or does not name a regular file.
    """
    url = request.args.get('url')
    # NOTE(review): ``url`` is client-supplied and is used as a filesystem
    # path as-is, so any file readable by the server process can be fetched.
    # Restrict it to a known base directory if this endpoint is exposed.
    #
    # isfile (rather than exists) rejects directories up front; they would
    # otherwise pass the check and fail inside send_file.
    if not url or not os.path.isfile(url):
        return abort(400, "File %s not found " % url)
    return send_file(url)
minion.py 文件源码 项目:boomerang 作者: EmersonElectricCo 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def api_request_basic():
    """Establishes the endpoint that is used for the basic fetch mode

    POST body should be JSON formatted.
    Required field(s):
      - url : actual url that is requested to be fetched by the minion

    Optional field(s):
      - user-agent : user agent to be used during the request of the url
                     if this argument is not provided, then the default
                     user agent in the config file will be used.

    Returns the job's result file, a 400 response for a bad request or a
    ValueError, and a 500 response for any other exception.
    """
    endpoint = "api_request_basic"
    try:
        payload = request.get_json()
        if payload is None:
            raise BadRequest

        return send_file(run_job(payload, "basic"))

    except (BadRequest, ValueError) as e:
        return prepare_400(endpoint, str(e))
    except Exception as e:
        print(type(e))
        return prepare_500(endpoint, str(e))
minion.py 文件源码 项目:boomerang 作者: EmersonElectricCo 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def api_request_website():
    """Establishes the endpoint that is used for the website fetch mode

    POST body should be JSON formatted.
    Required field(s):
      - url : actual url that is requested to be fetched by the minion

    Optional field(s):
      - user-agent : user agent to be used during the request of the url
                     if this argument is not provided, then the default
                     user agent in the config file will be used.

    Returns the job's result file, a 400 response for a bad request or a
    ValueError, and a 500 response for any other exception.
    """
    endpoint = "api_request_website"
    try:
        payload = request.get_json()
        if payload is None:
            raise BadRequest

        return send_file(run_job(payload, "website"))

    except (BadRequest, ValueError) as e:
        return prepare_400(endpoint, str(e))
    except Exception as e:
        print(type(e))
        return prepare_500(endpoint, str(e))
minion.py 文件源码 项目:boomerang 作者: EmersonElectricCo 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def api_basic_results(job_id):
    """Establishes endpoint for retrieval of historical boomerang results

    :param job_id: id of the specific job that you wish to look up

    Responds with 400 when result storage is disabled in the config or the
    request is bad, 404 on a ValueError, and 500 on any other exception.
    """
    # Guard: nothing can be looked up when results are not being stored.
    if not minion.config['BOOMERANG_STORE_RESULTS']:
        return prepare_400("api_basic_results", error="Config set to not store results")

    try:
        return send_file(get_job_results(job_id))
    except BadRequest as e:
        return prepare_400("api_basic_results", str(e))
    except ValueError:
        return prepare_404('api_basic_results')
    except Exception as e:
        return prepare_500("api_basic_results", str(e))
web.py 文件源码 项目:qmk_compiler_api 作者: qmk 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def GET_v1_compile_job_id_hex(job_id):
    """Download a compiled firmware

    Responds with 404 when the job is unknown and 422 when the job's result
    has no firmware.
    """
    job = get_job_metadata(job_id)
    if not job:
        return error("Compile job not found", 404)

    firmware = job['result']['firmware']
    if not firmware:
        return error("Compile job not finished or other error.", 422)

    return send_file(
        firmware,
        mimetype='application/octet-stream',
        as_attachment=True,
        attachment_filename=job['result']['firmware_filename'],
    )
web.py 文件源码 项目:qmk_compiler_api 作者: qmk 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def GET_v1_compile_job_id_src(job_id):
    """Download a completed compile job.

    Sends the source archive fetched from storage under
    ``<id>/<source_archive>``. Responds with 404 when the job is unknown and
    422 when the job's result has no firmware.
    """
    job = get_job_metadata(job_id)
    if not job:
        return error("Compile job not found", 404)

    if not job['result']['firmware']:
        return error("Compile job not finished or other error.", 422)

    storage_key = '%(id)s/%(source_archive)s' % job['result']
    archive = qmk_storage.get(storage_key)
    return send_file(
        archive,
        mimetype='application/octet-stream',
        as_attachment=True,
        attachment_filename=job['result']['source_archive'],
    )


问题


面经


文章

微信
公众号

扫码关注公众号