def support_jsonp(f):
    """Wrap a view's JSON response in a JSONP callback when one is requested.

    If the query string carries a non-empty ``callback`` argument, the body
    of the wrapped view's response is decoded as UTF-8 and sent back as
    ``callback(<body>)``. Without a callback the view's own response is
    returned unchanged.

    Adapted from
    https://gist.github.com/richardchien/7b7c2727feb3e8993845a3ce61bad808

    :param f: view function; its return value must expose the encoded body
        through a ``data`` attribute.
    :returns: the decorated view function.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        callback = request.args.get('callback', False)
        if not callback:
            return f(*args, **kwargs)
        # NOTE(review): ``callback`` is taken verbatim from the query string
        # and echoed unescaped; consider validating it before use.
        body = f(*args, **kwargs).data.decode('utf-8')
        content = str(callback) + '(' + body + ')'
        # A JSONP payload is JavaScript, not JSON, so it must not be labelled
        # 'application/json'.
        return current_app.response_class(
            content, mimetype='application/javascript')
    return decorated_function
# Example source code using Python's response_class()
def jsonp(func):
    """Wrap JSONified output for JSONP requests.

    When the query string carries a non-empty ``callback`` argument, the
    wrapped view's response body is returned as ``callback(<body>)`` with the
    ``application/javascript`` mimetype. Otherwise the view's response is
    returned unchanged.

    :param func: view function returning a response object.
    :returns: the decorated view function.
    """
    @wraps(func)
    def decorated_function(*args, **kwargs):
        callback = request.args.get('callback', False)
        if not callback:
            return func(*args, **kwargs)
        # Decode the body to text: calling str() on the raw bytes would embed
        # their repr (b'...') in the payload on Python 3.
        data = func(*args, **kwargs).get_data(as_text=True)
        content = str(callback) + '(' + data + ')'
        return current_app.response_class(
            content,
            mimetype='application/javascript',
        )
    return decorated_function
def status(code):
    """Build a responder that answers with the HTTP status ``code``.

    The returned callable accepts either positional or keyword arguments
    (never both). A truthy payload is serialized with ``json.dumps`` and
    followed by a newline; a falsy payload produces a response without a
    body.

    :param code: status passed to the response class.
    :returns: a function producing ``current_app.response_class`` instances.
    """
    def response(*args, **kwargs):
        if args and kwargs:
            raise TypeError("Cannot pass both args and kwargs.")
        # A single positional argument is the payload itself; otherwise take
        # whichever of the two collections was supplied.
        payload = args[0] if len(args) == 1 else (args or kwargs)
        body = (json.dumps(payload), '\n') if payload else None
        return current_app.response_class(
            response=body,
            status=code,
            mimetype=current_app.config['JSONIFY_MIMETYPE'],
        )
    return response
def stream(self):
    """
    View function that streams server-sent events.

    Any :mailheader:`Last-Event-ID` header on the request is not consulted.
    The channel is taken from the ``channel`` query parameter and falls back
    to ``"sse"`` when the parameter is missing or empty.
    """
    selected = request.args.get('channel')
    if not selected:
        selected = 'sse'

    @stream_with_context
    def event_source():
        # Each message is rendered through str() before being sent.
        for event in self.messages(channel=selected):
            text = str(event)
            yield text

    return current_app.response_class(
        event_source(),
        mimetype='text/event-stream',
    )
def stream(self):
    """
    View function that streams server-sent events.

    Any :mailheader:`Last-Event-ID` header on the request is not consulted.
    Every message is emitted as one ``message`` event, with one ``data:``
    line per line of the message text.
    """
    current_app.logger.debug('in ServerSentEventsBlueprint.stream')

    @stream_with_context
    def event_source():
        for text in self.messages():
            # The event-type line comes first, then one data line per line.
            frame = ["event:{value}".format(value='message')]
            frame.extend(
                "data:{value}".format(value=row) for row in text.splitlines()
            )
            # A blank line terminates the event.
            yield "\n".join(frame) + "\n\n"

    return current_app.response_class(
        event_source(),
        mimetype='text/event-stream',
    )
def to_json_response(self, response, headers=None):
    """Turn a plain result into a Flask JSON response.

    When ``self.raw`` is truthy the value is handed back untouched.
    Otherwise it is serialized with ``dumps`` (two-space indent) and any
    supplied ``headers`` are appended to the response headers.
    """
    if self.raw:
        return response

    body = dumps(response, indent=2)
    flask_response = current_app.response_class(
        body, mimetype='application/json')
    if headers:
        flask_response.headers.extend(headers)
    return flask_response
def jsonpify(f):
    """Wrap JSONified output for JSONP.

    When the query string carries a non-empty ``callback`` argument, the
    wrapped view's response body is returned as ``callback(<body>)`` with the
    ``application/javascript`` mimetype. Otherwise the view's response is
    returned unchanged.

    :param f: view function returning a response object.
    :returns: the decorated view function.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        callback = request.args.get('callback', False)
        if not callback:
            return f(*args, **kwargs)
        # Decode the body to text: calling str() on the raw bytes would embed
        # their repr (b'...') in the payload on Python 3.
        body = f(*args, **kwargs).get_data(as_text=True)
        content = str(callback) + '(' + body + ')'
        return current_app.response_class(
            content, mimetype='application/javascript')
    return decorated_function
def json_response(fn):
    """Decorator that serializes a view's return value to a JSON response.

    Values that already are ``current_app.response_class`` instances pass
    through unchanged. Lists and tuples are wrapped as ``{'items': ...}``
    before being dumped with ``ModelJSONEncoder``.
    """
    @wraps(fn)
    def wrapped(*args, **kwargs):
        outcome = fn(*args, **kwargs)
        if isinstance(outcome, current_app.response_class):
            return outcome

        if isinstance(outcome, (list, tuple)):
            payload = {'items': outcome}
        else:
            payload = outcome
        return current_app.response_class(
            json.dumps(payload, cls=ModelJSONEncoder),
            mimetype='application/json',
        )
    return wrapped
def get_actions():
    """
    Return the configured actions as a JSON array.

    Each entry of ``actions`` is rendered with ``to_dict`` for the domain and
    user stored on ``g``.
    """
    serialized = []
    for action in actions:
        serialized.append(action.to_dict(domain=g.domain, user=g.user))
    body = json.dumps(serialized)
    return current_app.response_class(body, mimetype='application/json')
def send_stream():
    """Return an event-stream response backed by ``EvtStream``.

    The channel is the session's ``sid`` attribute, or ``'common'`` when the
    session has none. The last seen event id is read from the
    ``Last-Event-Id`` header, falling back to the ``lastid`` query argument.
    """
    connection = ConnectionPool.get_connection()

    sid = getattr(session, 'sid', None)
    channel = 'common' if sid is None else sid

    resume_from = request.headers.get('Last-Event-Id')
    if resume_from is None:
        resume_from = request.args.get('lastid')

    events = EvtStream(connection, channel, resume_from)
    return current_app.response_class(
        events,
        direct_passthrough=True,
        mimetype='text/event-stream',
    )
def error_handler(ctx):
    """Answer with HTTP 400 and the JSON form of ``ctx.exception``."""
    body = error_to_json(ctx.exception)
    return current_app.response_class(
        body,
        status=400,
        mimetype='application/json',
    )
def home(filename=None):
    """Serve the web app: a templated stylesheet, a static file, or the index.

    * An existing file under ``ui/app/`` whose name contains ``main.css`` is
      rendered as a template and served as ``text/css``.
    * Any other existing file under ``ui/app/`` is sent as a static file.
    * A missing ``views/*.html`` file yields an empty 404.
    * Everything else renders ``index.html``.
    """
    context = {
        'bower': full_url_for('home') + 'bower_components',
        'node_modules': full_url_for('home') + 'node_modules',
        'project_names': list(crasher.projects),
        'type_names': list(crasher.types),
        'thresholds': [str(level) for level in crasher.thresholds],
        'basehref': full_url_for('home'),
        'restbase': full_url_for('root'),
        'default_threshold': str(crasher.default_threshold),
        'fixed_fields': list(crasher.config.UserInterface.fixed_summary_fields),
    }

    if not filename:
        # No file requested: this is a route inside the web app.
        return render_template('index.html', **context)

    if os.path.exists(relative('ui/app/', filename)):
        if 'main.css' not in filename:
            # It's a static file.
            return send_from_directory(relative('ui/app/'), filename)
        stylesheet = render_template(filename, **context)
        return current_app.response_class(stylesheet, mimetype="text/css")

    if 'views/' in filename and filename.endswith('.html'):
        # 404 on a missing view. Without this, Angular could keep loading the
        # index page in an infinite loop.
        return '', 404

    # Otherwise, it's a route in the web app.
    return render_template('index.html', **context)
def jsonify_resource(resource):
    """Dump ``resource`` as a JSON response.

    The output is indented by two spaces when ``JSONIFY_PRETTYPRINT_REGULAR``
    is enabled and the request is not an XHR; otherwise it is compact.
    """
    # NOTE(review): ``request.is_xhr`` is no longer available in recent
    # Werkzeug releases -- confirm against the pinned version.
    pretty = (current_app.config['JSONIFY_PRETTYPRINT_REGULAR']
              and not request.is_xhr)
    indent = 2 if pretty else None
    body = json.dumps(resource, indent=indent)
    return current_app.response_class(body, mimetype='application/json')
def api_private_blockpage_count():
    """Relay the upstream blockpage-count JSON for the ``probe_cc`` argument.

    The document ``blockpage-count-<probe_cc>.json`` is fetched relative to
    ``CENTRIFUGATION_BASE_URL``. An upstream 404 is answered with an empty
    ``results`` list; any other upstream reply is passed through as text.

    :raises Exception: if the ``probe_cc`` query argument is missing.
    """
    probe_cc = request.args.get('probe_cc')
    if probe_cc is None:
        raise Exception('err')

    base_url = current_app.config['CENTRIFUGATION_BASE_URL']
    url = urljoin(base_url, 'blockpage-count-%s.json' % probe_cc)

    upstream = requests.get(url)
    if upstream.status_code == 404:
        body = json.dumps({'results': []})
    else:
        body = upstream.text

    return current_app.response_class(
        body,
        mimetype=current_app.config['JSONIFY_MIMETYPE'],
    )
def api_private_measurement_count_by_country():
    """Relay ``count-by-country.json`` from ``CENTRIFUGATION_BASE_URL``.

    The upstream body is passed through as text with the configured
    ``JSONIFY_MIMETYPE``.
    """
    base_url = current_app.config['CENTRIFUGATION_BASE_URL']
    upstream = requests.get(urljoin(base_url, 'count-by-country.json'))
    body = upstream.text
    return current_app.response_class(
        body,
        mimetype=current_app.config['JSONIFY_MIMETYPE'],
    )
def api_private_measurement_count_total():
    """Relay ``count-total.json`` from ``CENTRIFUGATION_BASE_URL``.

    The upstream body is passed through as text with the configured
    ``JSONIFY_MIMETYPE``.
    """
    base_url = current_app.config['CENTRIFUGATION_BASE_URL']
    upstream = requests.get(urljoin(base_url, 'count-total.json'))
    body = upstream.text
    return current_app.response_class(
        body,
        mimetype=current_app.config['JSONIFY_MIMETYPE'],
    )
def yamlfy(*args, **kwargs):
    """Create a response in YAML, the way jsonify does it for JSON.

    Accepts either positional or keyword arguments, never both. A single
    positional argument is dumped as-is; otherwise the argument tuple or the
    keyword dict is dumped.

    :raises TypeError: if both positional and keyword arguments are given.
    """
    if args and kwargs:
        raise TypeError('yamlfy() behavior undefined when passed both args and kwargs')

    # A single positional argument is handed to the dumper directly.
    payload = args[0] if len(args) == 1 else (args or kwargs)
    body = (yaml.dump(payload), '\n')
    return current_app.response_class(body, mimetype="text/yaml")
def jsonify(mongo_doc, status=200, headers=None):
    """Serialize a Mongo document with ``dumps`` into a Flask JSON response.

    :param mongo_doc: document passed to ``dumps``.
    :param status: HTTP status for the response.
    :param headers: optional headers forwarded to the response class.
    """
    payload = dumps(mongo_doc)
    return current_app.response_class(
        payload,
        status=status,
        headers=headers,
        mimetype='application/json',
    )
def bsonify(mongo_doc, status=200, headers=None):
    """Encode a Mongo document as BSON into a Flask response object.

    :param mongo_doc: document passed to ``BSON.encode``.
    :param status: HTTP status for the response.
    :param headers: optional headers forwarded to the response class.
    """
    # Imported lazily, as in the original, so bson is only needed on use.
    from bson import BSON

    encoded = BSON.encode(mongo_doc)
    return current_app.response_class(
        encoded,
        status=status,
        headers=headers,
        mimetype='application/bson',
    )
def jsonp(f):
    """Decorator adding optional JSONP wrapping to a JSON view.

    The wrapped view may return either a response object or a tuple whose
    first two items are ``(response, status)``; a bare response is sent with
    status 200. When the query string carries a ``callback`` argument the
    body is returned as ``callback(<body>)`` with the
    ``application/javascript`` mimetype; otherwise the body is re-sent as
    ``application/json``.

    :param f: view function to wrap.
    :returns: the decorated view function.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        callback = request.args.get('callback', None)
        rtn = f(*args, **kwargs)
        if isinstance(rtn, tuple):
            response, status = rtn[0], rtn[1]
        else:
            response, status = rtn, 200

        if callback:
            # Use the decoded text: formatting the raw bytes would embed
            # their repr (b'...') in the payload on Python 3.
            body = response.get_data(as_text=True)
            content = str(callback) + '(' + body + ')'
            # A JSONP payload is JavaScript, not JSON.
            mimetype = 'application/javascript'
        else:
            content = response.data
            mimetype = 'application/json'
        return current_app.response_class(
            content, mimetype=mimetype, status=status)
    return decorated_function
def jsonp(f):
    """Decorator adding optional JSONP wrapping to a JSON view.

    The wrapped view may return either a response object or a tuple whose
    first two items are ``(response, status)``; a bare response is sent with
    status 200. When the query string carries a ``callback`` argument the
    body is returned as ``callback(<body>)`` with the
    ``application/javascript`` mimetype; otherwise the body is re-sent as
    ``application/json``.

    :param f: view function to wrap.
    :returns: the decorated view function.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        callback = request.args.get('callback', None)
        rtn = f(*args, **kwargs)
        if isinstance(rtn, tuple):
            response, status = rtn[0], rtn[1]
        else:
            response, status = rtn, 200

        if callback:
            # The body used to be passed through eval() here. That executes
            # the response content as Python, fails on JSON literals such as
            # true/false/null, and emits a Python repr instead of JSON. The
            # decoded JSON text is embedded verbatim instead.
            body = response.get_data(as_text=True)
            content = str(callback) + '(' + body + ')'
            # A JSONP payload is JavaScript, not JSON.
            mimetype = 'application/javascript'
        else:
            content = response.data
            mimetype = 'application/json'
        return current_app.response_class(
            content, mimetype=mimetype, status=status)
    return decorated_function
def jsonpify(*args, **kw):
    """Passes the specified arguments directly to :func:`jsonify` with a status
    code of 200, then wraps the response with the name of a JSON-P callback
    function specified as a query parameter called ``'callback'`` (or does
    nothing if no such callback function is specified in the request).

    If the keyword arguments include the string specified by :data:`_HEADERS`,
    its value must be a dictionary specifying headers to set before sending the
    JSONified response to the client. Headers on the response will be
    overwritten by headers specified in this dictionary. A value of ``None``
    is treated like an empty dictionary.

    If the keyword arguments include the string specified by :data:`_STATUS`,
    its value must be an integer representing the status code of the response.
    Otherwise, the status code of the response will be :http:status:`200`.

    """
    # HACK In order to make the headers and status code available in the
    # content of the response, we need to send it from the view function to
    # this jsonpify function via its keyword arguments. This is a limitation of
    # the mimerender library: it has no way of making the headers and status
    # code known to the rendering functions.
    headers = kw.pop(_HEADERS, None)
    if headers is None:
        # Normalize up front: the JSONP branch assigns into ``headers``, which
        # would raise TypeError if an explicit None were left in place.
        headers = {}
    status_code = kw.pop(_STATUS, 200)
    response = jsonify(*args, **kw)
    callback = request.args.get('callback', False)
    if callback:
        # Reload the data from the constructed JSON string so we can wrap it in
        # a JSONP function.
        data = json.loads(response.data)
        # Force the 'Content-Type' header to be 'application/javascript'.
        #
        # Note that this is different from the mimetype used in Flask for JSON
        # responses; Flask uses 'application/json'. We use
        # 'application/javascript' because a JSONP response is valid
        # Javascript, but not valid JSON.
        headers['Content-Type'] = 'application/javascript'
        # Add the headers and status code as metadata to the JSONP response.
        meta = _headers_to_json(headers)
        meta['status'] = status_code
        inner = json.dumps(dict(meta=meta, data=data))
        content = '{0}({1})'.format(callback, inner)
        # The replacement response carries the JavaScript mimetype as well.
        mimetype = 'application/javascript'
        response = current_app.response_class(content, mimetype=mimetype)
    # Set the headers on the HTTP response as well.
    if headers:
        set_headers(response, headers)
    response.status_code = status_code
    return response
def get_measurement(measurement_id):
    """Return the raw JSON of one measurement, cut out of its LZ4 frame.

    The id is parsed with ``RE_MSM_ID``, the measurement's frame and offset
    bookkeeping is looked up in the database, the compressed frame is fetched
    with an HTTP Range request, decompressed, and the measurement's slice is
    returned without decoding.

    :raises BadRequest: if the id does not match ``RE_MSM_ID`` or no row is
        found for it.
    :raises RuntimeError: if the fetched frame or the decompressed slice has
        an unexpected length, or the slice is not delimited by braces.
    """
    # XXX this query is SUPER slow
    matched = RE_MSM_ID.match(measurement_id)
    if not matched:
        raise BadRequest("Invalid measurement_id")
    msm_no = int(matched.group(1))

    query = current_app.db_session.query(
        Measurement.report_no.label('report_no'),
        Measurement.frame_off.label('frame_off'),
        Measurement.frame_size.label('frame_size'),
        Measurement.intra_off.label('intra_off'),
        Measurement.intra_size.label('intra_size'),
        Report.report_no.label('r_report_no'),
        Report.autoclaved_no.label('r_autoclaved_no'),
        Autoclaved.filename.label('a_filename'),
        Autoclaved.autoclaved_no.label('a_autoclaved_no'),
    )
    query = query.filter(Measurement.msm_no == msm_no)
    query = query.join(Report, Report.report_no == Measurement.report_no)
    query = query.join(
        Autoclaved, Autoclaved.autoclaved_no == Report.autoclaved_no)

    try:
        row = query.one()
    except exc.MultipleResultsFound:
        # Duplicates are tolerated: log them and use the first row.
        current_app.logger.warning("Duplicate rows for measurement_id: %s" % measurement_id)
        row = query.first()
    except exc.NoResultFound:
        # XXX we should actually return a 404 here
        raise BadRequest("No measurement found")

    # Usual size of LZ4 frames is 256kb of decompressed text.
    # Largest size of LZ4 frame was ~55Mb compressed and ~56Mb decompressed. :-/
    last_byte = row.frame_off + row.frame_size - 1
    range_header = "bytes={}-{}".format(row.frame_off, last_byte)
    frame_url = urljoin(
        current_app.config['AUTOCLAVED_BASE_URL'], row.a_filename)
    reply = requests.get(frame_url, headers={"Range": range_header})
    reply.raise_for_status()

    frame = reply.content
    if len(frame) != row.frame_size:
        raise RuntimeError('Failed to fetch LZ4 frame', len(frame), row.frame_size)

    slice_end = row.intra_off + row.intra_size
    blob = lz4framed.decompress(frame)[row.intra_off:slice_end]
    looks_like_json = blob[:1] == b'{' and blob[-1:] == b'}'
    if len(blob) != row.intra_size or not looks_like_json:
        raise RuntimeError('Failed to decompress LZ4 frame to measurement.json', len(blob), row.intra_size, blob[:1], blob[-1:])

    # There is no replacement of `measurement_id` with `msm_no` or anything
    # else to keep sanity. Maybe it'll happen as part of orchestration update.
    # Also, blob is not decoded intentionally to save CPU
    return current_app.response_class(
        blob,
        mimetype=current_app.config['JSONIFY_MIMETYPE'],
    )
def send_file(self, filename, base='fs', version=-1, cache_for=31536000):
    """Return an instance of the :attr:`~flask.Flask.response_class`
    containing the named file, and implement conditional GET semantics
    (using :meth:`~werkzeug.wrappers.ETagResponseMixin.make_conditional`).

    .. code-block:: python

        @app.route('/uploads/<path:filename>')
        def get_upload(filename):
            return mongo.send_file(filename)

    :param str filename: the filename of the file to return
    :param str base: the base name of the GridFS collections to use
    :param int version: if positive, return the Nth revision of the file
       identified by filename; if negative, return the Nth most recent
       revision. If no such version exists, return with HTTP status 404.
    :param int cache_for: number of seconds that browsers should be
       instructed to cache responses
    :raises TypeError: if ``base``, ``version`` or ``cache_for`` has the
       wrong type
    """
    if not isinstance(base, text_type):
        raise TypeError('"base" must be string or unicode')
    if not isinstance(version, num_type):
        raise TypeError('"version" must be an integer')
    if not isinstance(cache_for, num_type):
        raise TypeError('"cache_for" must be an integer')

    storage = GridFS(self.db, base)

    try:
        fileobj = storage.get_version(filename=filename, version=version)
    except NoFile:
        abort(404)

    # mostly copied from flask/helpers.py, with
    # modifications for GridFS
    data = wrap_file(request.environ, fileobj, buffer_size=1024 * 256)
    response = current_app.response_class(
        data,
        mimetype=fileobj.content_type,
        direct_passthrough=True)
    response.content_length = fileobj.length
    response.last_modified = fileobj.upload_date
    response.set_etag(fileobj.md5)
    response.cache_control.max_age = cache_for
    # The Werkzeug property for the ``s-maxage`` directive is ``s_maxage``;
    # assigning ``s_max_age`` never reached the Cache-Control header.
    response.cache_control.s_maxage = cache_for
    response.cache_control.public = True
    response.make_conditional(request)
    return response
def send_file(self, filename, base='fs', version=-1, cache_for=31536000):
    """Return an instance of the :attr:`~flask.Flask.response_class`
    containing the named file, and implement conditional GET semantics
    (using :meth:`~werkzeug.wrappers.ETagResponseMixin.make_conditional`).

    .. code-block:: python

        @app.route('/uploads/<path:filename>')
        def get_upload(filename):
            return mongo.send_file(filename)

    :param str filename: the filename of the file to return
    :param str base: the base name of the GridFS collections to use
    :param int version: if positive, return the Nth revision of the file
       identified by filename; if negative, return the Nth most recent
       revision. If no such version exists, return with HTTP status 404.
    :param int cache_for: number of seconds that browsers should be
       instructed to cache responses
    :raises TypeError: if ``base``, ``version`` or ``cache_for`` has the
       wrong type
    """
    if not isinstance(base, text_type):
        raise TypeError('"base" must be string or unicode')
    if not isinstance(version, num_type):
        raise TypeError('"version" must be an integer')
    if not isinstance(cache_for, num_type):
        raise TypeError('"cache_for" must be an integer')

    storage = GridFS(self.db, base)

    try:
        fileobj = storage.get_version(filename=filename, version=version)
    except NoFile:
        abort(404)

    # mostly copied from flask/helpers.py, with
    # modifications for GridFS
    data = wrap_file(request.environ, fileobj, buffer_size=1024 * 256)
    response = current_app.response_class(
        data,
        mimetype=fileobj.content_type,
        direct_passthrough=True)
    response.content_length = fileobj.length
    response.last_modified = fileobj.upload_date
    response.set_etag(fileobj.md5)
    response.cache_control.max_age = cache_for
    # The Werkzeug property for the ``s-maxage`` directive is ``s_maxage``;
    # assigning ``s_max_age`` never reached the Cache-Control header.
    response.cache_control.s_maxage = cache_for
    response.cache_control.public = True
    response.make_conditional(request)
    return response