def get_data():
cname = request.args.get("collection")
settings = CSETTINGS[cname]
search_string = request.args.get("search_string")
xaxis = request.args.get("xaxis")
yaxis = request.args.get("yaxis")
xaxis = get_mapped_name(settings, xaxis)
yaxis = get_mapped_name(settings, yaxis)
projection = [xaxis, yaxis]
if search_string.strip() != "":
criteria = process_search_string(search_string, settings)
data = []
for r in DB[cname].find(criteria, projection=projection):
x = _get_val(xaxis, r, None)
y = _get_val(yaxis, r, None)
if x and y:
data.append([x, y])
else:
data = []
return jsonify(jsanitize(data))
python类jsonify()的实例源码
def _serve_webui(self, file_name='index.html'): # pylint: disable=redefined-builtin
try:
assert file_name
web3 = self.flask_app.config.get('WEB3_ENDPOINT')
if web3 and 'config.' in file_name and file_name.endswith('.json'):
host = request.headers.get('Host')
if any(h in web3 for h in ('localhost', '127.0.0.1')) and host:
_, _port = split_endpoint(web3)
_host, _ = split_endpoint(host)
web3 = 'http://{}:{}'.format(_host, _port)
response = jsonify({'raiden': self._api_prefix, 'web3': web3})
else:
response = send_from_directory(self.flask_app.config['WEBUI_PATH'], file_name)
except (NotFound, AssertionError):
response = send_from_directory(self.flask_app.config['WEBUI_PATH'], 'index.html')
return response
def delete_executor(executor_id):
"""
Delete an executor
Example request::
DEL /api/1/executors/6890192d8b6c40e5af16f13aa036c7dc
"""
manager.remove_executor(executor_id)
return json.jsonify({
"msg": "Executor %s succesfully deleted" % executor_id
})
########
# UTILS
#######
def join(id):
event = find_event(id)
if event == None:
return not_found()
user = find_user(request.values.get('user_param'), request.values.get('user_param_value'))
if user == None:
return user_not_specified()
join_event(user, event)
return jsonify({
'joined': True,
'rsvps_disabled': False,
'event_archived': False,
'over_capacity': False,
'past_deadline': False,
})
def handle_authn_response():
# parse authn response
authn_response = current_app.rp.client.parse_response(AuthorizationResponse,
info=request.query_string.decode("utf-8"),
sformat="urlencoded")
auth_code = None
if "code" in authn_response:
auth_code = authn_response["code"]
# make token request
args = {
"code": auth_code,
"client_id": current_app.rp.client.client_id,
"client_secret": current_app.rp.client.client_secret
}
token_response = current_app.rp.client.do_access_token_request(scope="openid", request_args=args)
access_token = token_response["access_token"]
id_token = token_response["id_token"].to_dict()
# TODO do userinfo req
else:
id_token = authn_response["id_token"].to_dict()
access_token = authn_response.get("access_token")
return jsonify(dict(auth_code=auth_code, token=access_token, id_token=id_token))
def api_private_counties_by_month():
NOW = datetime.now()
result = []
r = current_app.db_session.query(
Report.test_start_time,
Report.probe_cc) \
.order_by(Report.test_start_time)
# XXX this can be done better in a SQL that is not sqlite
monthly_buckets = {}
for tst, country in r:
if tst > NOW:
# We ignore measurements from time travelers
continue
bkt = tst.strftime("%Y-%m-01")
monthly_buckets[bkt] = monthly_buckets.get(bkt, [])
if country not in monthly_buckets[bkt]:
monthly_buckets[bkt].append(country)
for bkt in monthly_buckets.keys():
result.append({
'date': bkt,
'value': len(monthly_buckets[bkt])
})
return jsonify(result)
def api_private_runs_by_month():
NOW = datetime.now()
result = []
r = current_app.db_session.query(
Report.test_start_time) \
.order_by(Report.test_start_time)
# XXX this can be done better in a SQL that is not sqlite
monthly_buckets = {}
for res in r:
tst = res.test_start_time
if tst > NOW:
# We ignore measurements from time travelers
continue
bkt = tst.strftime("%Y-%m-01")
monthly_buckets[bkt] = monthly_buckets.get(bkt, 0)
monthly_buckets[bkt] += 1
for bkt in monthly_buckets.keys():
result.append({
'date': bkt,
'value': monthly_buckets[bkt]
})
return jsonify(result)
def api_private_blockpage_detected():
q = current_app.db_session.query(
distinct(Report.probe_cc).label('probe_cc'),
).join(Measurement, Measurement.report_no == Report.report_no) \
.filter(Measurement.confirmed == True) \
.filter(or_(
Report.test_name == 'http_requests',
Report.test_name == 'web_connectivity'
))
results = []
for row in q:
results.append({
'probe_cc': row.probe_cc
})
return jsonify({
'results': results
})
def variantset_search():
if not request.json:
return makeGAException(
message='Bad Content Type, please send application/json',
ecode=-1,
rcode=415)
datasetId = request.json.get('datasetId', "")
pageSize = request.json.get('pageSize', None)
pageToken = request.json.get('pageToken', None)
try:
garesp = tileSearch.searchVariantSets(
datasetId=datasetId, pageSize=pageSize, pageToken=pageToken)
except:
return makeGAException(
message='search command exited abnormally', ecode=500, rcode=500)
return jsonify(garesp.gavsetresponse_info)
def put(self, username, transaction_id):
user = self.authenticate()
if user:
new_transaction = request.get_json(force=True)
transaction = user.transactions.filter_by(
transaction_id=transaction_id).first_or_404()
transaction.category_id = new_transaction['category_id']
transaction.person_id = new_transaction['person_id']
transaction.transaction_date = new_transaction['transaction_date']
transaction.value = new_transaction['value']
transaction.notes = new_transaction['notes']
transaction.type = new_transaction['type']
transaction.done = new_transaction['done']
db.session.commit()
return json.jsonify(transaction.as_dict())
raise InvalidUsage()
def post(self, username, transaction_id):
user = self.authenticate()
if user:
transaction = user.transactions.filter_by(
transaction_id=transaction_id).first_or_404()
supposed_item = request.get_json(force=True)
item = TransactionItem()
item.item_id = 1 if len(transaction.transaction_items.all(
)) else transaction.transaction_items.all()[-1].item_id + 1
item.user_id = user.user_id
item.transaction_id = transaction.transaction_id
item.person_id = supposed_item['person_id']
item.item_date = supposed_item['item_date']
item.value = supposed_item['value']
item.notes = supposed_item['notes']
item.type = supposed_item['type']
item.done = supposed_item['done']
db.session.add(item)
db.session.commit()
if item.item_id:
return json.jsonify(item.as_dict())
raise InvalidUsage()
def put(self, username, transaction_id, item_id):
user = self.authenticate()
if user:
transaction = user.transactions.filter_by(
transaction_id=transaction_id).first_or_404()
new_item = request.get_json(force=True)
item = transaction.transaction_items.filter_by(
item_id=item_id).first_or_404()
item.person_id = new_item['person_id']
item.item_date = new_item['item_date']
item.value = new_item['value']
item.notes = new_item['notes']
item.type = new_item['type']
item.done = new_item['done']
db.session.commit()
return json.jsonify(item.as_dict())
raise InvalidUsage()
def stack_analyses_debug(external_request_id):
"""Debug endpoint exposing operational data for particular stack analysis.
This endpoint is not part of the public API.
Note the existence of the data is not guaranteed,
therefore the endpoint can return 404 even for valid request IDs.
"""
results = retrieve_worker_results(rdb, external_request_id)
if not results:
return jsonify(error='No operational data for the request ID'), 404
response = {'tasks': []}
for result in results:
op_data = result.to_dict()
audit = op_data.get('task_result', {}).get('_audit', {})
task_data = {'task_name': op_data.get('worker')}
task_data['started_at'] = audit.get('started_at')
task_data['ended_at'] = audit.get('ended_at')
task_data['error'] = op_data.get('error')
response['tasks'].append(task_data)
return jsonify(response), 200
def hipfrog():
requestdata = json.loads(request.get_data())
callingMessage = requestdata['item']['message']['message'].lower().split()
oauthId = requestdata['oauth_client_id']
installation = messageFunctions.getInstallationFromOauthId(oauthId)
if installation.glassfrogToken is None:
message = strings.set_token_first
message_dict = messageFunctions.createMessageDict(strings.error_color, message)
elif len(callingMessage) == 1:
message = strings.help_hipfrog
message_dict = messageFunctions.createMessageDict(strings.succes_color, message)
elif len(callingMessage) > 1:
# /hipfrog something
message = strings.missing_functionality.format(callingMessage[1])
message_dict = messageFunctions.createMessageDict(strings.error_color, message)
# TODO Generate message_dict and color here
return json.jsonify(message_dict)
def error_handler(error):
"""
Standard Error Handler
"""
if isinstance(error, HTTPException):
return jsonify({
'statusCode': error.code,
'name': error.name,
'description': error.description
}), error.code
else:
return jsonify({
'statusCode': 500,
'name': 'Internal Server Error',
'description': 'An unknown error has occurred'
}), 500
# common errors - add others as needed
def get_group_sections():
q = request.query_string.decode()
if not q:
return json.jsonify({'status': 'error'})
all_cn = []
for cn in rd.smembers('group:%s' % q):
try:
all_cn.insert(0, int(cn))
except ValueError:
continue
schedule = tuple(
lesson.details
for lesson in Lesson.query.filter(Lesson.class_no.in_(all_cn))
.order_by(Lesson.start).all()
)
return json.jsonify({
'status': 'ok',
'events': schedule,
})
def get_ids(collection_name):
settings = CSETTINGS[collection_name]
doc = DB[collection_name].distinct(settings["unique_key"])
return jsonify(jsanitize(doc))
def get_doc_json(collection_name, uid):
settings = CSETTINGS[collection_name]
criteria = {
settings["unique_key"]: process(uid, settings["unique_key_type"])}
doc = DB[collection_name].find_one(criteria)
return jsonify(jsanitize(doc))
def task_in_json(taskid):
if ':' not in taskid:
return json.jsonify({'code': 400, 'error': 'bad project:task_id format'})
project, taskid = taskid.split(':', 1)
taskdb = app.config['taskdb']
task = taskdb.get_task(project, taskid)
if not task:
return json.jsonify({'code': 404, 'error': 'not found'})
task['status_string'] = app.config['taskdb'].status_to_string(task['status'])
return json.jsonify(task)
def handle_invalid_usage(error):
"""
Handler for APIErrors thrown by API endpoints
"""
log.info('%d %s', error.status_code, error.message)
response = json.jsonify(error.to_dict())
response.status_code = error.status_code
return response
def add_plan():
"""
Add a plan.
Example request::
PUT /api/1/executors/3b373155577b4d1bbc62216ffea013a4
Body:
{
"name": "Terminate instances in Playground",
"attack": {
"args": {
"region": "eu-west-1",
"filters": {
"tag:Name": "playground-asg"
}
},
"ref": "terminate_ec2_instance:TerminateEC2Instance"
},
"planner": {
"ref": "simple_planner:SimplePlanner",
"args": {
"min_time" : "10:00",
"max_time" : "19:00",
"times": 4
}
}
}
"""
assert validate_payload(request, plan_schema)
req_json = request.get_json()
name = req_json["name"]
planner_config = req_json["planner"]
attack_config = req_json["attack"]
manager.execute_plan(name, planner_config, attack_config)
return json.jsonify({"msg": "ok"})
def delete_plan(plan_id):
"""
Delete a plan
Example request::
DEL /api/1/plans/6890192d8b6c40e5af16f13aa036c7dc
"""
manager.delete_plan(plan_id)
return json.jsonify({
"msg": "Plan %s successfully deleted" % plan_id
})
def render_event(event):
return jsonify(filter_participants(event))
def render_events(events):
return jsonify([filter_participants(event) for event in events])
def not_found():
response = jsonify({"message":"not_found"})
response.status_code = 404
return response
def user_not_specified():
response = jsonify({"message": "user_not_specified"})
response.status_code = 422
return response
def custom_401(error):
# API CALL
if 'Content-Type' in request.headers and request.headers['Content-Type'].lower() == 'application/json':
return jsonify(errors=['Not authorized']), 401
return render_template('home/401.html', message=error.description['message']), 401
def get_version():
return jsonify({
"version": __version__
})
def api_private_asn_by_month():
NOW = datetime.now()
result = []
r = current_app.db_session.query(
Report.test_start_time,
Report.probe_asn) \
.order_by(Report.test_start_time)
# XXX this can be done better in a SQL that is not sqlite
monthly_buckets = {}
for tst, asn in r:
asn = 'AS%d' % asn
if tst > NOW:
# We ignore measurements from time travelers
continue
bkt = tst.strftime("%Y-%m-01")
monthly_buckets[bkt] = monthly_buckets.get(bkt, [])
if asn not in monthly_buckets[bkt]:
monthly_buckets[bkt].append(asn)
for bkt in monthly_buckets.keys():
result.append({
'date': bkt,
'value': len(monthly_buckets[bkt])
})
return jsonify(result)
def api_private_reports_per_day():
q = current_app.db_session.query(
func.count(func.date_trunc('day', Report.test_start_time)),
func.date_trunc('day', Report.test_start_time)
).group_by(func.date_trunc('day', Report.test_start_time)).order_by(
func.date_trunc('day', Report.test_start_time)
)
result = []
for count, date in q:
result.append({
'count': count,
'date': date.strftime("%Y-%m-%d")
})
return jsonify(result)