def list_virtual_tns():
    """
    Resource endpoint that reports every VirtualTN currently in the pool.

    The JSON response carries one entry per number (its ``value`` and
    ``session_id``), the overall pool size, and how many numbers are
    available (no session attached) versus in use.
    """
    entries = []
    free_count = 0
    # Single pass: build the listing and count unassigned numbers together.
    for number in VirtualTN.query.all():
        entries.append({'value': number.value,
                        'session_id': number.session_id})
        if number.session_id is None:
            free_count += 1
    total = len(entries)
    payload = {"virtual_tns": entries,
               "pool_size": total,
               "available": free_count,
               "in_use": total - free_count}
    return Response(json.dumps(payload), content_type="application/json")
# Example source code using the Python class Response()
def list_proxy_sessions():
    """
    Resource endpoint that lists every ProxySession.

    Each session is serialized with its id, creation timestamp, virtual TN,
    both participants and (when set) its expiry timestamp. The response
    also reports the total number of sessions.
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    serialized = []
    for record in ProxySession.query.all():
        # A falsy expiry date is reported as null.
        expiry = None
        if record.expiry_date:
            expiry = record.expiry_date.strftime(timestamp_format)
        serialized.append({
            'id': record.id,
            'date_created': record.date_created.strftime(timestamp_format),
            'virtual_tn': record.virtual_TN,
            'participant_a': record.participant_a,
            'participant_b': record.participant_b,
            'expiry_date': expiry,
        })
    body = json.dumps({"total_sessions": len(serialized),
                       "sessions": serialized})
    return Response(body, content_type="application/json")
def delete_session():
    """
    The ProxySession resource endpoint for removing a ProxySession
    from the pool.

    Reads ``session_id`` from the JSON request body, verifies that a
    ProxySession with that id exists, terminates it, and sends
    SESSION_END_MSG to both participants from the session's virtual TN.
    Returns a JSON confirmation containing the session id.

    Raises:
        InvalidAPIUsage: when ``session_id`` cannot be read from the body
            (missing key, or the body is not a JSON object), or - with
            status code 404 - when no ProxySession has that id.
    """
    body = request.json
    try:
        session_id = str(body['session_id'])
    except (KeyError, TypeError):
        # TypeError covers a body that is not a mapping (e.g. ``None`` when
        # no JSON was sent, or a JSON list); report it as a usage error
        # instead of letting it surface as an unhandled exception.
        raise InvalidAPIUsage(
            "Required argument: 'session_id' (str)",
            payload={'reason':
                     'invalidAPIUsage'})
    try:
        # Existence check only; the row itself is not needed here.
        ProxySession.query.filter_by(id=session_id).one()
    except NoResultFound:
        msg = ("ProxySession {} could not be deleted because"
               " it does not exist".format(session_id))
        log.info({"message": msg,
                  "status": "failed"})
        raise InvalidAPIUsage(
            msg, status_code=404,
            payload={'reason':
                     'ProxySession not found'})
    participant_a, participant_b, virtual_tn = ProxySession.terminate(
        session_id)
    recipients = [participant_a, participant_b]
    # Let both parties know the session is over.
    send_message(
        recipients,
        virtual_tn.value,
        SESSION_END_MSG,
        session_id,
        is_system_msg=True)
    msg = "Ended session {} and released {} back to pool".format(
        session_id, virtual_tn.value)
    log.info({"message": msg, "status": "succeeded"})
    return Response(
        json.dumps({"message": "Successfully ended the session.",
                    "status": "succeeded",
                    "session_id": session_id}),
        content_type="application/json")
def getSwaggerJson():
    """
    Serve the contents of ``./messages.swagger.json`` as a JSON response.

    The path is relative to the process working directory. The file is read
    inside a ``with`` block so the handle is closed even if reading fails.
    """
    swagger_path = os.path.join("./", "messages.swagger.json")
    with open(swagger_path, "r") as swagger_file:
        # Named ``swagger_text`` rather than ``json`` to avoid shadowing the
        # ``json`` module inside this function.
        swagger_text = swagger_file.read()
    return Response(response=swagger_text,
                    status=200,
                    mimetype="application/json")
def corpora_list():
    """
    Return the sentences selected by the request's query-string filters.

    Recognised query parameters: ``page`` (defaults to 1), ``query``,
    ``category`` and ``doc`` (URL-unquoted before use). Absent filters are
    passed on as ``None``.
    """
    params = request.args
    document = unquote(params['doc']) if 'doc' in params else None
    sentences = get_sentences(page=params.get('page', 1),
                              query=params.get('query'),
                              category=params.get('category'),
                              document=document)
    return Response(json.dumps(sentences), mimetype='application/json')
def corpora_docs_list():
    """Return the result of ``get_doc_list()`` serialized as JSON."""
    body = json.dumps(get_doc_list())
    return Response(body, mimetype='application/json')
def docs_list():
    """
    Return one page of the document queue as JSON.

    Query parameters: ``page`` (defaults to 1), ``doc_type`` and ``state``.
    The requested state is widened before querying: ST_PENDING also
    includes ST_PROCESSING, and ST_PROCESSED also includes ST_ERROR.
    """
    params = request.args
    page = params.get('page', 1)
    doc_type = params.get('doc_type')
    state = params.get('state')

    # The list always starts with the requested state (possibly None).
    states = [state]
    if state == ST_PENDING:
        states += [ST_PROCESSING]
    if state == ST_PROCESSED:
        states += [ST_ERROR]

    queue = get_queue_list(page=page, doc_type=doc_type, states=states)
    body = json.dumps(queue, default=datetime_handler)
    return Response(body, mimetype='application/json')
def geocoding_list():
    """
    Return the geocoding list as JSON.

    The optional ``activity_id`` and ``queue_id`` query parameters are
    forwarded to ``get_geocoding_list``; missing ones are passed as ``None``.
    """
    params = request.args
    geocodings = get_geocoding_list(activity_id=params.get('activity_id'),
                                    queue_id=params.get('queue_id'))
    body = json.dumps(geocodings, default=datetime_handler)
    return Response(body, mimetype='application/json')
def activity_list():
    """
    Return the activity list as JSON.

    The optional ``document_id`` query parameter is forwarded to
    ``get_activity_list``; it is ``None`` when not supplied.
    """
    document_id = request.args.get('document_id')
    activities = get_activity_list(document_id=document_id)
    body = json.dumps(activities, default=datetime_handler)
    return Response(body, mimetype='application/json')
def haproxy_config():
    """Serve whatever ``Haproxy().config`` holds as the response body."""
    config_body = Haproxy().config
    # NOTE(review): 'application/txt' is not a registered media type;
    # 'text/plain' may have been intended - confirm with consumers before
    # changing it.
    return flask.Response(config_body, mimetype='application/txt')
def process_demoroom_members():
    """
    Handle requests for the demo room membership endpoint.

    On POST, the form field ``email`` is passed to ``send_welcome_message``
    and its reply is returned with status 201; a missing ``email`` field
    yields a 400 JSON error. Any other method gets a plain "OK" (200).
    """
    # NOTE: the request authorization step (valid_request_check) is
    # currently disabled for this endpoint.
    if request.method != "POST":
        return Response("OK", status=200)

    form = request.form
    try:
        sys.stderr.write("Adding %s to demo room.\n" % (form["email"]))
        reply = send_welcome_message(form["email"])
        return Response(reply, content_type='application/json', status=201)
    except KeyError:
        error = {"Error":"API Expects dictionary object with single element and key of 'email'"}
        return Response(json.dumps(error),
                        content_type='application/json',
                        status=400)
def remove_authorised_client(ip_addr_str=None):
    """Forget a recently seen client so that tests can be run again.

    The client is identified by the request's ``X-Forwarded-For`` header;
    ``ip_addr_str`` is accepted but not used. Always answers 204.
    """
    forwarded_for = request.headers["X-Forwarded-For"]
    # No-op when the client was never recorded.
    _client_map.pop(forwarded_for, None)
    return Response(status=204)
def custom_css():
    """Serve the value stored under the ``css`` config key as a stylesheet."""
    stylesheet = get_config("css")
    return Response(stylesheet, mimetype='text/css')
# Static HTML files
def post_json(self, endpoint: str, data: JsonDict) -> Response:
    """POST ``data``, serialized as JSON, to ``endpoint`` via ``self.client``."""
    payload = json.dumps(data)
    return self.client.post(endpoint,
                            data=payload,
                            content_type="application/json")
def test_permalinks_work(self):
    """
    A prediction made against an app with a demo database returns a slug,
    and posting that slug to ``/permadata`` returns the stored model name,
    request data and response data. An unknown slug is rejected with 400.
    """
    demo_db = InMemoryDemoDatabase()
    app = make_app(build_dir=self.TEST_DIR, demo_db=demo_db)
    app.predictors = {"counting": CountingPredictor()}
    app.testing = True
    client = app.test_client()

    def post(endpoint: str, data: JsonDict) -> Response:
        return client.post(endpoint, content_type="application/json", data=json.dumps(data))

    request_data = {"some": "input"}

    # Make a prediction; the response must carry a permalink slug.
    prediction_response = post("/predict/counting", data=request_data)
    assert prediction_response.status_code == 200
    prediction = json.loads(prediction_response.get_data())
    slug = prediction.get("slug")
    assert slug is not None

    # An unknown slug is a client error.
    bad_response = post("/permadata", data={"slug": "not the right slug"})
    assert bad_response.status_code == 400

    # The real slug returns everything that was stored for the prediction.
    perma_response = post("/permadata", data={"slug": slug})
    assert perma_response.status_code == 200
    stored = json.loads(perma_response.get_data())
    assert set(stored.keys()) == {"modelName", "requestData", "responseData"}
    assert stored["modelName"] == "counting"
    assert stored["requestData"] == request_data
    assert stored["responseData"] == prediction
def show_page(page):
    """
    Render the template ``<page>.html`` for a valid page name.

    A name is valid when it is at least MIN_PAGE_NAME_LENGTH characters long
    and, lower-cased, consists only of the letters a-z. Any failure (invalid
    name, template rendering error, ...) is reported to rollbar and answered
    with a "404 Not Found" body carrying an actual 404 status code.
    """
    try:
        valid_length = len(page) >= MIN_PAGE_NAME_LENGTH
        valid_name = re.match('^[a-z]+$', page.lower()) is not None
        if valid_length and valid_name:
            return render_template("{}.html".format(page))
        msg = "Sorry, couldn't find page with name {}".format(page)
        raise NotFound(msg)
    except Exception:
        # ``Exception`` rather than a bare ``except`` so SystemExit and
        # KeyboardInterrupt are not swallowed.
        rollbar.report_exc_info()
        # The status must be set explicitly; otherwise the "404" body would
        # be delivered with the default 200 status.
        return Response("404 Not Found", status=404)
def make_response(self, response_data, headers):
    """
    Build a CSV download response for ``response_data``.

    Sets the ``Content-Disposition`` and ``Content-Type`` entries on the
    passed-in ``headers`` (mutating it) and returns a
    ``(response, headers)`` pair whose body comes from ``self.csvify``.
    """
    # TODO: pass in optional filename
    attachment_name = "response.csv"
    disposition = "attachment; filename=\"{}\"".format(attachment_name)
    content_type = "{}; charset=utf-8".format(CSV_CONTENT_TYPE)
    headers["Content-Disposition"] = disposition
    headers["Content-Type"] = content_type
    csv_body = self.csvify(response_data)
    return Response(csv_body, mimetype=CSV_CONTENT_TYPE), headers
def csvify(self, response_data):
    """
    Generate the CSV text for a JSON-like response.

    ``response_data`` is either a single ``dict`` (one CSV row) or a ``dict``
    with an ``"items"`` key holding a list of ``dict`` rows. The header
    starts with ``self.response_schema.csv_column_order`` when the schema
    defines it, followed by any remaining fields of the first row in their
    own order. An empty ``items`` list produces a header-only document.

    This is a generator yielding the whole CSV document as one string.

    Raises:
        KeyError: when a row lacks one of the header columns.
    """
    if "items" in response_data:
        rows = response_data["items"]
    else:
        rows = [response_data]
    # Fields are taken from the first row; there are none without rows.
    response_fields = list(rows[0].keys()) if rows else []
    column_order = getattr(self.response_schema, "csv_column_order", None)
    # Work on a copy: extending the schema's own list in place would make it
    # grow with every call and leak columns between responses.
    # We should still be able to return a CSV even if no column order has
    # been specified, hence the empty starting point.
    column_names = list(column_order) if column_order is not None else []
    # The column order may be only partially specified
    extra_fields = [field_name for field_name in response_fields
                    if field_name not in column_names]
    column_names.extend(extra_fields)
    output = StringIO()
    csv_writer = writer(output, quoting=QUOTE_MINIMAL)
    csv_writer.writerow(column_names)
    for item in rows:
        csv_writer.writerow([item[column] for column in column_names])
    # Ideally we'd want to `yield` each line to stream the content
    # But something downstream seems to break streaming
    yield output.getvalue()
def setup(self):
    """
    Register the POST endpoints ``/anomaly`` and ``/monitor`` on ``self.app``.

    Each endpoint accepts only requests whose Content-Type header is exactly
    ``application/json``; the parsed body is handed to the matching
    ``self.app.monitor`` method and the outcome is turned into a response by
    ``self.handle_response``. Any other (or missing) Content-Type is
    answered with 415 and an echo of the raw request body.
    """

    def dispatch(processor):
        # Shared implementation of both endpoints.
        # Check the media type before touching ``request.json`` so that a
        # non-JSON body is never parsed.
        if request.headers.get('Content-Type') == 'application/json':
            data = request.json
            success = processor(data)
            return self.handle_response(success, data)
        # Echo the raw body as text: the parsed JSON is not a string (and is
        # unavailable here), so concatenating it would raise TypeError.
        raw_body = request.get_data(as_text=True)
        return Response("Unsupported media type\n" + raw_body, status=415)

    @self.app.route('/anomaly', methods=['POST'])
    def api_anomaly():
        return dispatch(self.app.monitor.process_anomaly_data)

    @self.app.route('/monitor', methods=['POST'])
    def api_monitor():
        return dispatch(self.app.monitor.process_monitor_flows)
def handle_response(self, success, data):
    """
    Build the response for a processed request.

    The body is a status line ("OK" or "BAD REQUEST") followed by ``data``
    serialized as JSON; the status code is 200 when ``success`` is truthy
    and 400 otherwise.
    """
    if success:
        status_line, status_code = "OK\n", 200
    else:
        status_line, status_code = "BAD REQUEST\n", 400
    return Response(status_line + json.dumps(data), status=status_code)