def bookmarklet_js():
base_url = request.url.replace(
"browser-tools/bookmarklet.js",
"static/browser-tools/"
)
if "localhost:" not in base_url:
# seems like this shouldn't be necessary. but i think
# flask's request.url is coming in with http even when
# we asked for https on the server. weird.
base_url = base_url.replace("http://", "https://")
rendered = render_template(
"browser-tools/bookmarklet.js",
base_url=base_url
)
resp = make_response(rendered, 200)
resp.mimetype = "application/javascript"
return resp
python类make_response()的实例源码
def session_chain_add(session_name, table_name, chain_name):
try:
req = request.get_json()
if req is None:
req = {}
fw.sessions[session_name].chains.add(
name=chain_name, table=table_name, **req)
return make_response(jsonify(message='Chain created'), status.HTTP_201_CREATED)
except ChainExists as e:
http_code = status.HTTP_409_CONFLICT
except ChainNotValid as e:
http_code = status.HTTP_400_BAD_REQUEST
except Exception as e:
http_code = status.HTTP_500_INTERNAL_SERVER_ERROR
return make_response(jsonify(message='Failed to create chain', error=e.message), http_code)
def get(self):
if request.cookies.get('save_id'):
resp = make_response(redirect(url_for('.exit')))
resp.set_cookie('user_name', expires=0)
resp.set_cookie('login_time', expires=0)
resp.set_cookie('save_id', expires=0)
return resp
if session.get('name'):
session.pop('name')
if session.get('show_name'):
session.pop('show_name')
if session.get('user_id'):
session.pop('user_id')
return redirect(url_for('.login'))
# ?config.json ???? is_register ?false??????? ??????????????
def post_file():
if "var.json" in request.files.keys():
usrvar = json.load(request.files["var.json"])
else:
return make_response("no var.json", 200)
if "data.csv" in request.files.keys():
csvfile = request.files["data.csv"]
resp = { "server-connected": False, "data-posted": False, "err-occur": False }
rpmng = ReportManager(usrvar)
try:
if rpmng.connect_server():
resp["server-connected"] = True
if rpmng.submit_progress(csvfile=csvfile): resp["data-posted"] = True
except:
resp["err-occur"] = True
rpmng.finalize()
return make_response(jsonify(resp), 200)
else:
return make_response("no data.csv", 200)
def session_interface_add(session_name, interface_name):
try:
req = request.get_json()
if req is None:
req = {}
fw.sessions[session_name].interfaces.add(
name=interface_name, **req)
return make_response(jsonify(message='Interface created'), status.HTTP_201_CREATED)
except InterfaceExists as e:
http_code = status.HTTP_409_CONFLICT
except InterfaceNotValid as e:
http_code = status.HTTP_400_BAD_REQUEST
except Exception as e:
http_code = status.HTTP_500_INTERNAL_SERVER_ERROR
return make_response(jsonify(message='Failed to create interface', error=e.message), http_code)
def post(self):
if (request.form['username']):
data = {"user": request.form['username'], "key": request.form['password']}
result = dockletRequest.unauthorizedpost('/login/', data)
ok = result and result.get('success', None)
if (ok and (ok == "true")):
# set cookie:docklet-jupyter-cookie for jupyter notebook
resp = make_response(redirect(request.args.get('next',None) or '/dashboard/'))
app_key = os.environ['APP_KEY']
resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(request.form['username'], app_key))
# set session for docklet
session['username'] = request.form['username']
session['nickname'] = result['data']['nickname']
session['description'] = result['data']['description']
session['avatar'] = '/static/avatar/'+ result['data']['avatar']
session['usergroup'] = result['data']['group']
session['status'] = result['data']['status']
session['token'] = result['data']['token']
return resp
else:
return redirect('/login/')
else:
return redirect('/login/')
def get(self):
form = external_generate.external_auth_generate_request()
result = dockletRequest.unauthorizedpost('/external_login/', form)
ok = result and result.get('success', None)
if (ok and (ok == "true")):
# set cookie:docklet-jupyter-cookie for jupyter notebook
resp = make_response(redirect(request.args.get('next',None) or '/dashboard/'))
app_key = os.environ['APP_KEY']
resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(result['data']['username'], app_key))
# set session for docklet
session['username'] = result['data']['username']
session['nickname'] = result['data']['nickname']
session['description'] = result['data']['description']
session['avatar'] = '/static/avatar/'+ result['data']['avatar']
session['usergroup'] = result['data']['group']
session['status'] = result['data']['status']
session['token'] = result['data']['token']
return resp
else:
return redirect('/login/')
def post(self):
form = external_generate.external_auth_generate_request()
result = dockletRequest.unauthorizedpost('/external_login/', form)
ok = result and result.get('success', None)
if (ok and (ok == "true")):
# set cookie:docklet-jupyter-cookie for jupyter notebook
resp = make_response(redirect(request.args.get('next',None) or '/dashboard/'))
app_key = os.environ['APP_KEY']
resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(result['data']['username'], app_key))
# set session for docklet
session['username'] = result['data']['username']
session['nickname'] = result['data']['nickname']
session['description'] = result['data']['description']
session['avatar'] = '/static/avatar/'+ result['data']['avatar']
session['usergroup'] = result['data']['group']
session['status'] = result['data']['status']
session['token'] = result['data']['token']
return resp
else:
return redirect('/login/')
def verify_request_signature(func):
@wraps(func)
def decorated(*args, **kwargs):
signature = request.headers.get('x-hub-signature', None)
if signature:
elements = signature.split('=')
method = elements[0]
signature_hash = elements[1]
expected_hash = hmac.new(APP_SECRET, msg=request.get_data(), digestmod=method).hexdigest()
if signature_hash != expected_hash:
LOGGER.error('Signature was invalid')
return make_response('', 403)
else:
LOGGER.error('Could not validate the signature')
return func(*args, **kwargs)
return decorated
def webhook_callback():
data = request.json
if data['object'] == 'page':
for page_entry in data['entry']:
page_id = page_entry['id']
time_of_event = page_entry['time']
for message_event in page_entry['messaging']:
if 'optin' in message_event:
LOGGER.info('Webhook received message event: option from page %s at %d', page_id, time_of_event)
elif 'message' in message_event:
received_message(message_event)
elif 'delivery' in message_event:
LOGGER.info('Webhook received message event: delivery from page %s at %d', page_id, time_of_event)
elif 'postback' in message_event:
received_postback(message_event)
elif 'read' in message_event:
LOGGER.info('Webhook received message event: read from page %s at %d', page_id, time_of_event)
elif 'account_linking' in message_event:
LOGGER.info('Webhook received message event: account linking from page %s at %d', page_id,
time_of_event)
else:
LOGGER.info('Webhook received unknown message event: %s from page %s at %d', message_event, page_id,
time_of_event)
return make_response('', 200)
def test_view_decorators(self):
app = flask.Flask(__name__)
def add_x_parachute(f):
def new_function(*args, **kwargs):
resp = flask.make_response(f(*args, **kwargs))
resp.headers['X-Parachute'] = 'awesome'
return resp
return new_function
class Index(flask.views.View):
decorators = [add_x_parachute]
def dispatch_request(self):
return 'Awesome'
app.add_url_rule('/', view_func=Index.as_view('index'))
c = app.test_client()
rv = c.get('/')
self.assert_equal(rv.headers['X-Parachute'], 'awesome')
self.assert_equal(rv.data, b'Awesome')
def test_make_response(self):
app = flask.Flask(__name__)
with app.test_request_context():
rv = flask.make_response()
self.assert_equal(rv.status_code, 200)
self.assert_equal(rv.data, b'')
self.assert_equal(rv.mimetype, 'text/html')
rv = flask.make_response('Awesome')
self.assert_equal(rv.status_code, 200)
self.assert_equal(rv.data, b'Awesome')
self.assert_equal(rv.mimetype, 'text/html')
rv = flask.make_response('W00t', 404)
self.assert_equal(rv.status_code, 404)
self.assert_equal(rv.data, b'W00t')
self.assert_equal(rv.mimetype, 'text/html')
def __call__(self, handler=None, error=None, request_type=None):
"""
:type error: ??
:param request_type: ????
:return:
"""
result = super(HTMLResponse, self).__call__(handler, error, request_type)
if self.request_type == RequestTypeEnum.Flask:
from flask import make_response
html = '<p>code:%s message:%s</p>' % (result["code"], result["message"])
response = make_response(html)
response.headers["Content-Type"] = "text/html; charset=utf-8"
return response
else:
html = '<p>code:%s message:%s</p>' % (result["code"], result["message"])
self.handler.set_header("Content-Type", "text/html; charset=utf-8")
return self.handler.write(html)
def password_required(short_name):
(project, owner, n_tasks, n_task_runs,
overall_progress, last_activity,
n_results) = project_by_shortname(short_name)
form = PasswordForm(request.form)
if request.method == 'POST' and form.validate():
password = request.form.get('password')
cookie_exp = current_app.config.get('PASSWD_COOKIE_TIMEOUT')
passwd_mngr = ProjectPasswdManager(CookieHandler(request, signer, cookie_exp))
if passwd_mngr.validates(password, project):
response = make_response(redirect(request.args.get('next')))
return passwd_mngr.update_response(response, project, get_user_id_or_ip())
flash(gettext('Sorry, incorrect password'))
return render_template('projects/password.html',
project=project,
form=form,
short_name=short_name,
next=request.args.get('next'))
def run_cmd():
"""Execute general (non-host specific) command."""
#Read 'command' argument from query string.
comm = request.args.get('command')
if not comm or comm in EXCLUDED_FUNCTIONS:
abort(400)
try:
#Read 'args' argument from query string.
args = request.args.getlist('args')
#Cast arguments to string.
for i, val in enumerate(args):
args[i] = str(val)
except KeyError:
args = []
#Execute command.
ret = RSPET_API.call_plugin(comm, args)
if ret['code'] == rspet_server.ReturnCodes.OK:
http_code = 200
else:
http_code = 404
return make_response(jsonify(ret), http_code)
def ThreeDTilesRead(table, column, bounds, lod):
session = Session(table, column)
# offsets = [round(off, 2) for off in list_from_str(offsets)]
box = list_from_str(bounds)
# requested = [scales, offsets]
stored_patches = session.lopocstable.filter_stored_output()
schema = stored_patches['point_schema']
pcid = stored_patches['pcid']
# scales = [scale] * 3
scales = stored_patches['scales']
offsets = stored_patches['offsets']
[tile, npoints] = get_points(session, box, lod, offsets, pcid, scales, schema)
if Config.DEBUG:
tile.sync()
print("NPOINTS: ", npoints)
# build flask response
response = make_response(tile.to_array().tostring())
response.headers['content-type'] = 'application/octet-stream'
return response
def api_send_loves():
sender = request.form.get('sender')
recipients = sanitize_recipients(request.form.get('recipient'))
message = request.form.get('message')
try:
recipients = send_loves(recipients, message, sender_username=sender)
recipients_display_str = ', '.join(recipients)
link_url = create_love_link(recipients_display_str, message).url
return make_response(
u'Love sent to {}! Share: {}'.format(recipients_display_str, link_url),
LOVE_CREATED_STATUS_CODE,
{}
)
except TaintedLove as exc:
return make_response(
exc.user_message,
LOVE_FAILED_STATUS_CODE if exc.is_error else LOVE_CREATED_STATUS_CODE,
{}
)
def test_view_decorators(self):
app = flask.Flask(__name__)
def add_x_parachute(f):
def new_function(*args, **kwargs):
resp = flask.make_response(f(*args, **kwargs))
resp.headers['X-Parachute'] = 'awesome'
return resp
return new_function
class Index(flask.views.View):
decorators = [add_x_parachute]
def dispatch_request(self):
return 'Awesome'
app.add_url_rule('/', view_func=Index.as_view('index'))
c = app.test_client()
rv = c.get('/')
self.assert_equal(rv.headers['X-Parachute'], 'awesome')
self.assert_equal(rv.data, b'Awesome')
def test_make_response(self):
app = flask.Flask(__name__)
with app.test_request_context():
rv = flask.make_response()
self.assert_equal(rv.status_code, 200)
self.assert_equal(rv.data, b'')
self.assert_equal(rv.mimetype, 'text/html')
rv = flask.make_response('Awesome')
self.assert_equal(rv.status_code, 200)
self.assert_equal(rv.data, b'Awesome')
self.assert_equal(rv.mimetype, 'text/html')
rv = flask.make_response('W00t', 404)
self.assert_equal(rv.status_code, 404)
self.assert_equal(rv.data, b'W00t')
self.assert_equal(rv.mimetype, 'text/html')
def plot_correlation():
"""
Generate the Correlation heat map as a PNG
Parameters
----------
job_id: str
Returns
-------
image/png
"""
job_id = request.args.get('job_id')
if job_id is None:
return None
job = mongo_get_job(job_id)
s3_file_key = job['s3_file_key']
data = s3_to_df(s3_file_key)
fig = plot_correlation_fig(data)
correlation_plot = fig_to_png(fig)
response = make_response(correlation_plot.getvalue())
response.mimetype = 'image/png'
return response
def convert(content_type, content):
cachekey = request.path
cachevalue = cache.get(cachekey)
if cachevalue is None:
content = decode(content)
content_type = decode(content_type).decode()
if content_type.startswith('image/svg+xml'):
content = __svg2png(content)
content_type = 'image/png'
cache.set(cachekey, (content, content_type), timeout=CACHE_RETENTION)
else:
content = cachevalue[0]
content_type = cachevalue[1]
response = make_response(content)
response.content_type = content_type
response.cache_control.max_age = CACHE_RETENTION
return response
def get_inbox():
pyldnlog.debug("Requested inbox data of {} in {}".format(request.url, request.headers['Accept']))
if not request.headers['Accept'] or request.headers['Accept'] == '*/*' or 'text/html' in request.headers['Accept']:
resp = make_response(inbox_graph.serialize(format='application/ld+json'))
resp.headers['Content-Type'] = 'application/ld+json'
elif request.headers['Accept'] in ACCEPTED_TYPES:
resp = make_response(inbox_graph.serialize(format=request.headers['Accept']))
resp.headers['Content-Type'] = request.headers['Accept']
else:
return 'Requested format unavailable', 415
resp.headers['X-Powered-By'] = 'https://github.com/albertmeronyo/pyldn'
resp.headers['Allow'] = "GET, HEAD, OPTIONS, POST"
resp.headers['Link'] = '<http://www.w3.org/ns/ldp#Resource>; rel="type", <http://www.w3.org/ns/ldp#RDFSource>; rel="type", <http://www.w3.org/ns/ldp#Container>; rel="type", <http://www.w3.org/ns/ldp#BasicContainer>; rel="type"'
resp.headers['Accept-Post'] = 'application/ld+json, text/turtle'
return resp
def get_notification(id):
pyldnlog.debug("Requested notification data of {}".format(request.url))
pyldnlog.debug("Headers: {}".format(request.headers))
# Check if the named graph exists
pyldnlog.debug("Dict key is {}".format(pyldnconf._inbox_url + id))
if pyldnconf._inbox_url + id not in graphs:
return 'Requested notification does not exist', 404
if 'Accept' not in request.headers or request.headers['Accept'] == '*/*' or 'text/html' in request.headers['Accept']:
resp = make_response(graphs[pyldnconf._inbox_url + id].serialize(format='application/ld+json'))
resp.headers['Content-Type'] = 'application/ld+json'
elif request.headers['Accept'] in ACCEPTED_TYPES:
resp = make_response(graphs[pyldnconf._inbox_url + id].serialize(format=request.headers['Accept']))
resp.headers['Content-Type'] = request.headers['Accept']
else:
return 'Requested format unavailable', 415
resp.headers['X-Powered-By'] = 'https://github.com/albertmeronyo/pyldn'
resp.headers['Allow'] = "GET"
return resp
def post(self, meta_type):
try:
if meta_type == 'item':
service = self.item_meta_service
new_metadata = ItemMetadata(request.get_json(), version=1, active=True)
elif meta_type == 'user':
service = self.user_meta_service
new_metadata = UserMetadata(request.get_json(), version=1, active=True)
else:
raise StatusCodeException('Invalid type', 400)
if not service.get_active():
service.insert(new_metadata.to_database())
return make_response(new_metadata.to_json())
else:
raise StatusCodeException('Conflict', 409)
except StatusCodeException as ex:
return ex.to_response()
except Exception as ex:
return StatusCodeException(ex.message, 500).to_response()
def get(self, meta_type):
try:
if meta_type == 'item':
json_metadata = [ItemMetadata(meta).to_json()
for meta in self.item_meta_service.get_all()]
elif meta_type == 'user':
json_metadata = [UserMetadata(meta).to_json()
for meta in self.user_meta_service.get_all()]
else:
raise StatusCodeException('Invalid type', 400)
return make_response(json_metadata)
except StatusCodeException as ex:
return ex.to_response()
except Exception as ex:
return StatusCodeException(ex.message, 500).to_response()
def validate_json(required):
def decorator(f):
@wraps(f)
def wrapper(*args, **kw):
try:
request.json
except Exception:
return json_response(
"This endpoint requires a json request body", 400
)
for r in required.split(","):
if r not in (request.json or {}):
log.warning(
"Required field not specified: %s, json is %s",
r, request.json
)
return make_response(jsonify(
{
"message": "Required field not specified: %s" % r,
"status": 500
}), 500)
return f(*args, **kw)
return wrapper
return decorator
def post_recipe():
payload = request.get_json()
topology = payload.get("topology")
scenarios = payload.get("scenarios")
headers = payload.get("headers")
#pattern = payload.get("header_pattern")
if not topology:
abort(400, "Topology required")
if not scenarios:
abort(400, "Failure scenarios required")
if headers and type(headers)!=dict:
abort(400, "Headers must be a dictionary")
# if not pattern:
# abort(400, "Header_pattern required")
appgraph = ApplicationGraph(topology)
fg = A8FailureGenerator(appgraph, a8_controller_url='{0}/v1/rules'.format(a8_controller_url), a8_controller_token=a8_controller_token, headers=headers, debug=debug)
fg.setup_failures(scenarios)
return make_response(jsonify(recipe_id=fg.get_id()), 201, {'location': url_for('get_recipe_results', recipe_id=fg.get_id())})
def test_view_decorators(self):
app = flask.Flask(__name__)
def add_x_parachute(f):
def new_function(*args, **kwargs):
resp = flask.make_response(f(*args, **kwargs))
resp.headers['X-Parachute'] = 'awesome'
return resp
return new_function
class Index(flask.views.View):
decorators = [add_x_parachute]
def dispatch_request(self):
return 'Awesome'
app.add_url_rule('/', view_func=Index.as_view('index'))
c = app.test_client()
rv = c.get('/')
self.assert_equal(rv.headers['X-Parachute'], 'awesome')
self.assert_equal(rv.data, b'Awesome')
def test_make_response(self):
app = flask.Flask(__name__)
with app.test_request_context():
rv = flask.make_response()
self.assert_equal(rv.status_code, 200)
self.assert_equal(rv.data, b'')
self.assert_equal(rv.mimetype, 'text/html')
rv = flask.make_response('Awesome')
self.assert_equal(rv.status_code, 200)
self.assert_equal(rv.data, b'Awesome')
self.assert_equal(rv.mimetype, 'text/html')
rv = flask.make_response('W00t', 404)
self.assert_equal(rv.status_code, 404)
self.assert_equal(rv.data, b'W00t')
self.assert_equal(rv.mimetype, 'text/html')
def test_make_response_with_response_instance(self):
app = flask.Flask(__name__)
with app.test_request_context():
rv = flask.make_response(
flask.jsonify({'msg': 'W00t'}), 400)
self.assertEqual(rv.status_code, 400)
self.assertEqual(rv.data, b'{\n "msg": "W00t"\n}')
self.assertEqual(rv.mimetype, 'application/json')
rv = flask.make_response(
flask.Response(''), 400)
self.assertEqual(rv.status_code, 400)
self.assertEqual(rv.data, b'')
self.assertEqual(rv.mimetype, 'text/html')
rv = flask.make_response(
flask.Response('', headers={'Content-Type': 'text/html'}),
400, [('X-Foo', 'bar')])
self.assertEqual(rv.status_code, 400)
self.assertEqual(rv.headers['Content-Type'], 'text/html')
self.assertEqual(rv.headers['X-Foo'], 'bar')