def to_json_response(self, response, headers=None):
    """Wrap a simple payload in a Flask JSON response.

    When ``self.raw`` is truthy the payload is handed back untouched.
    Otherwise it is dumped with a two-space indent into an
    ``application/json`` response built from the current app's response
    class, and any ``headers`` supplied are appended to that response.
    """
    if self.raw:
        return response

    body = dumps(response, indent=2)
    flask_response = current_app.response_class(
        body, mimetype='application/json')
    if headers:
        flask_response.headers.extend(headers)
    return flask_response
python类json()的实例源码
def post(self, **kwargs):
    """Create a resource.

    The request's JSON body (an empty dict when there is none) is loaded
    into a resource, saved, and returned in its simple representation.
    """
    payload = request.json or {}
    created = self.save(self.load(payload, **kwargs))
    logger.debug('Create a resource (%r)', kwargs)
    return self.to_simple(created, **kwargs)
def test_resource(app, api, client):
    """Exercise two resources connected to the API through the test client."""
    from flask_restler import Resource

    @api.connect
    class HelloResource(Resource):

        def get(self, resource=None, **kwargs):
            # Greet the title-cased resource name, or the world by default.
            who = resource and resource.title() or 'World'
            return 'Hello, %s!' % who

    @api.connect('/hello/<name>/how-are-you')
    class HowAreYouResource(Resource):

        def get(self, resource=None, name=None, **kwargs):
            return 'Hello, %s! How are you?' % name.title()

    # Collection-style URL: no resource name given.
    plain = client.get('/api/v1/hello')
    assert plain.json == 'Hello, World!'

    # A name in the URL is passed through and title-cased.
    named = client.get('/api/v1/hello/mike')
    assert named.json == 'Hello, Mike!'

    # Only ``get`` is defined on the resource, so POST must be rejected.
    rejected = client.post('/api/v1/hello')
    assert rejected.status_code == 405

    # Explicitly routed resource.
    custom = client.get('/api/v1/hello/mike/how-are-you')
    assert custom.json == 'Hello, Mike! How are you?'
def _post_args(self):  # pylint: disable=no-self-use
    # pylint: disable=maybe-no-member
    """ Return the (action, args) pair parsed from the current request.

    ``action`` comes from the JSON body, falling back to the form field of
    the same name and then to an empty string. ``args`` comes from the JSON
    body (default: empty dict) and gains a ``file`` entry when the request
    carries an uploaded file under that name.
    """
    payload = api_utils.change_to_str_in_dict(request.json or {})

    form_action = request.form.get('action', '')
    action = payload.get('action', form_action)

    args = payload.get('args', {})
    try:
        args['file'] = request.files['file']
    except KeyError:
        # No upload named 'file' in this request; nothing to attach.
        pass

    LOGGER.debug('Input args are: action: %s, args: %s', action, args)
    return action, args
def main():
    """Run the request's JSON body through the policy.

    Returns the policy's reply serialized as UTF-8 encoded JSON bytes.
    """
    payload = flask_request.json
    policy = Policy.initialize()
    reply = policy.handle(payload, settings.vi)
    return json.dumps(reply).encode('utf-8')
def put(self, id):
    '''Update an event by ID

    Looks up the first non-disabled event with the given ID, hands it to
    ``abort_if_none`` (404, 'Not Found'), then copies the request's JSON
    body onto it and commits the session.
    '''
    active_events = Event.query.filter(Event.disabled == 0)
    event = active_events.filter(Event.id_event == id).first()
    abort_if_none(event, 404, 'Not Found')

    fill_object(event, request.json)
    db.session.commit()
    return msg('success!')
def post(self):
    '''Create a new event

    The JSON body may carry a ``tags`` list of tag descriptions (each with
    at least a ``name``). A non-disabled tag with the same name is reused
    when one exists; otherwise a new tag is built from the description.
    The remaining body fields are copied onto the new event, which is then
    added to the session and committed.

    A missing or empty ``tags`` entry (or a missing body) is treated as
    "no tags" instead of failing with KeyError/TypeError.

    Returns the new event's ID wrapped by ``msg(..., 'id')``.
    '''
    ev = Event()

    # Work on a shallow copy so the cached request payload is not mutated
    # when the tags are separated from the event's own fields.
    payload = dict(request.json or {})
    tag_specs = payload.pop('tags', None) or []

    for spec in tag_specs:
        tag = (Tag.query
               .filter(Tag.name == spec['name'])
               .filter(Tag.disabled == 0)
               .first())
        if tag is None:
            # No active tag with that name yet: build one from the spec.
            tag = Tag()
            fill_object(tag, spec)
        ev.tags.append(tag)

    fill_object(ev, payload)

    # submit objects to db
    db.session.add(ev)
    db.session.commit()
    return msg(ev.id_event, 'id')
def put(self, id):
    '''Update an event_type by ID

    Fetches the first non-disabled event type with the given ID, passes it
    to ``abort_if_none`` (404, 'Not Found'), then applies the request's
    JSON body to it and commits the session.
    '''
    event_type = (EventType.query
                  .filter(EventType.disabled == 0)
                  .filter(EventType.id_event_type == id)
                  .first())
    abort_if_none(event_type, 404, 'Not Found')

    fill_object(event_type, request.json)
    db.session.commit()
    return msg('altered')
def post(self):
    '''Create a new event_type

    Builds an ``EventType`` from the ``name`` and ``description`` fields of
    the request's JSON body, adds it to the session and commits.

    Returns the new event type's ID wrapped by ``msg(..., 'id')``.
    '''
    # ``et`` is a freshly constructed model instance, not a query, so there
    # is nothing to ``.first()`` or to 404 on here; those calls belonged to
    # the lookup done when updating an existing row.
    et = EventType(request.json['name'], request.json['description'])
    db.session.add(et)
    db.session.commit()
    return msg(et.id_event_type, 'id')
def put(self, id):
    """Update an user by ID

    Fetches the first non-disabled user with the given ID, passes it to
    ``abort_if_none`` (404, 'not found'), then applies the request's JSON
    body to it and commits the session.
    """
    active_users = User.query.filter(User.disabled == 0)
    user = active_users.filter(User.id_user == id).first()
    abort_if_none(user, 404, 'not found')

    fill_object(user, request.json)
    db.session.commit()
    return msg('success!')
def post(self):
    """Create a new user

    Fills a fresh ``User`` from the request's JSON body, persists it and
    returns its ID wrapped by ``msg(..., 'id')``.
    """
    user = User()
    fill_object(user, request.json)

    session = db.session
    session.add(user)
    session.commit()
    return msg(user.id_user, 'id')
def json_resp(thing):
    """Serialize ``thing`` to pretty-printed JSON and wrap it in a response.

    Normally the result is a 200 ``application/json`` response. When the
    request path ends in ``.json`` and the ``FLASK_DEBUG`` environment
    variable equals ``"True"``, the JSON text is rendered through the
    ``debug_api.html`` template and served as ``text/html`` instead.
    """
    json_str = json.dumps(thing, sort_keys=True, default=json_dumper, indent=4)

    debug_view = (request.path.endswith(".json")
                  and os.getenv("FLASK_DEBUG", False) == "True")
    if not debug_view:
        resp = make_response(json_str, 200)
        resp.mimetype = "application/json"
        return resp

    logger.info(u"rendering output through debug_api.html template")
    html = render_template('debug_api.html', data=json_str)
    resp = make_response(html)
    resp.mimetype = "text/html"
    return resp
def abort_json(status_code, msg):
    """Abort the current request with a JSON error body.

    The body carries the status code, the message and an ``error`` flag,
    and is sent with the given HTTP status as ``application/json``.
    """
    payload = json.dumps(
        {"HTTP_status_code": status_code, "message": msg, "error": True},
        sort_keys=True,
        indent=4)
    error_response = make_response(payload, status_code)
    error_response.mimetype = "application/json"
    abort(error_response)
def log_request(resp):
    """Ship a summary of a DOI lookup response to the remote log collector.

    Only requests routed to ``get_doi_endpoint`` are logged. The first
    entry of the response's ``results`` list supplies the DOI, year and
    open-access color; a missing or falsy color is reported as ``"gray"``.
    Responses whose body is not JSON, has no ``results`` key, or has an
    empty ``results`` list are silently skipped.

    Returns None in every case.
    """
    if request.endpoint != "get_doi_endpoint":
        return

    try:
        results = json.loads(resp.get_data())["results"][0]
    except (ValueError, RuntimeError, KeyError, IndexError, TypeError):
        # don't bother logging if no results; IndexError covers an empty
        # results list and TypeError a payload that is not a JSON object
        return

    oa_color = results["oa_color"] or "gray"

    body = {
        "timestamp": datetime.utcnow().isoformat(),
        "elapsed": elapsed(g.request_start_time, 2),
        "ip": get_ip(),
        "status_code": resp.status_code,
        "email": request.args.get("email", None),
        "doi": results["doi"],
        "year": results.get("year", None),
        "oa_color": oa_color
    }
    h = {
        "content-type": "text/json",
        "X-Forwarded-For": get_ip()
    }

    # The open-access color doubles as the tag in the collector URL.
    url = "http://logs-01.loggly.com/inputs/6470410b-1d7f-4cb2-a625-72d8fa867d61/tag/{}/".format(
        oa_color)
    # NOTE(review): this POST has no timeout, so a slow collector can stall
    # the caller; confirm whether adding one is acceptable here.
    requests.post(url, headers=h, data=json.dumps(body))
def get_doi_endpoint(doi):
    """GET API endpoint: return the publication for ``doi`` as JSON.

    The publication's dict form is wrapped in a one-element ``results``
    list.
    """
    publication = get_pub_from_doi(doi)
    payload = {"results": [publication.to_dict()]}
    return jsonify(payload)
def post_gs_cache_endpoint():
    """Pass the request's JSON body to ``post_gs_cache`` as keyword arguments.

    Returns the resulting object's dict form as a JSON response.
    """
    cached = post_gs_cache(**request.json)
    return jsonify(cached.to_dict())
def restart_endpoint():
    """Restart every dyno named in the posted alert payload.

    The caller is authorized when any query-string value equals the
    ``HEROKU_API_KEY`` environment variable. The form field ``payload``
    holds JSON with an ``events`` list; each event's ``program`` value has
    the form ``<something>/<dyno name>``, and each distinct dyno name is
    restarted once.

    Returns a JSON response describing the outcome.
    """
    logger.info(u"in restart endpoint")

    # Read the key once. An unset or empty key never authorizes anyone,
    # so an empty query value cannot match an empty configuration.
    api_key = os.getenv("HEROKU_API_KEY")
    # ``values()`` exists on both Python 2 and 3 (``iteritems()`` does not).
    allowed_to_reboot = bool(api_key) and any(
        value == api_key for value in request.args.values())

    if not allowed_to_reboot:
        logger.info(u"not allowed to reboot in restart_endpoint")
        return jsonify({
            "response": "not allowed to reboot, didn't send right heroku api key"
        })

    payload_json = json.loads(request.form["payload"])

    dynos_to_restart = set()
    for event in payload_json["events"]:
        logger.info(u"dyno {}".format(event["program"]))
        dyno_name = event["program"].split("/")[1]
        dynos_to_restart.add(dyno_name)

    # just restart each dyno once
    logger.info(u"restarting dynos: {}".format(dynos_to_restart))
    for dyno_name in dynos_to_restart:
        restart_dyno("oadoi", dyno_name)

    return jsonify({
        "response": "restarted dynos: {}".format(dynos_to_restart)
    })
def webhook():
    """Forward the incoming payload to the optional custom webhook handler.

    The handler module is imported lazily. When it is present and has a
    truthy ``handle`` attribute, that callable receives the request's JSON
    body, or the form data when there is no JSON. An ImportError is
    reported on stdout. The reply is always an empty 204.
    """
    try:
        from custom import webhook_handler as handler
        callback = handler.handle
        if callback:
            callback(request.json or request.form)
    except ImportError:
        print('There is no webhook handler.')
    return '', 204