def new_comment():
data = request.json
comments_dir = app._config['henet']['comments_dir']
article_uuid = data['source_path']
article_thread = ArticleThread(comments_dir, article_uuid)
article_thread.add_comment(text=data['text'],
author=data['author'])
article_thread.save()
notifs = app._config['notifications']
moderator = notifs.get('moderate_comment')
if moderator is not None:
app.send_email([moderator], u'Nouveau commentaire',
MODERATE_BODY)
emit(EVENT_CREATED_COMMENT, article_uuid=article_uuid)
return {'result': 'OK'}
def _request(request, request_fallback=None):
''' Extract request fields wherever they may come from: GET, POST, forms, fallback '''
# Use lambdas to avoid evaluating bottle.request.* which may throw an Error
all_dicts = [
lambda: request.json,
lambda: request.forms,
lambda: request.query,
lambda: request.files,
#lambda: request.POST,
lambda: request_fallback
]
request_dict = dict()
for req_dict_ in all_dicts:
try:
req_dict = req_dict_()
except KeyError:
continue
if req_dict is not None and hasattr(req_dict, 'items'):
for req_key, req_val in req_dict.items():
request_dict[req_key] = req_val
return request_dict
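# A minimal usage sketch for _request(), assuming a Bottle app; the /greet
# route and the 'name' field are illustrative only:
from bottle import Bottle, request

app = Bottle()

@app.post('/greet')
def greet():
    fields = _request(request, request_fallback={'name': 'world'})
    return {'greeting': 'Hello, {}!'.format(fields.get('name'))}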
def bot_hook():
"""Entry point for the Telegram connection."""
bot = telegram.Bot(botdata['BotToken'])
dispatcher = Dispatcher(bot, None, workers=0)
dispatcher.add_handler(CommandHandler('Abfahrten', abfahrten, pass_args=True))
dispatcher.add_handler(CommandHandler('abfahrten', abfahrten, pass_args=True))
dispatcher.add_handler(CommandHandler('Abfahrt', abfahrten, pass_args=True))
dispatcher.add_handler(CommandHandler('abfahrt', abfahrten, pass_args=True))
dispatcher.add_handler(CommandHandler('A', abfahrten, pass_args=True))
dispatcher.add_handler(CommandHandler('a', abfahrten, pass_args=True))
dispatcher.add_handler(CommandHandler('Hilfe', hilfe))
dispatcher.add_handler(CommandHandler('hilfe', hilfe))
dispatcher.add_handler(CommandHandler('help', hilfe))
dispatcher.add_handler(MessageHandler(Filters.location, nearest_stations))
update = telegram.update.Update.de_json(request.json, bot)
dispatcher.process_update(update)
return 'OK'
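# Hypothetical wiring for bot_hook(), assuming a Bottle app and a Telegram
# webhook pointed at the /hook path (both the app and the path are assumptions):
from bottle import Bottle

app = Bottle()
app.route('/hook', method='POST', callback=bot_hook)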
def load_config(config_file):
"""
Load configuration from file (output: dict)
"""
if os.path.isfile(config_file):
try:
            with open(config_file, 'r') as f:
config = json.load(f)
except ValueError:
print('Wrong JSON format in {} file'.format(config_file))
sys.exit(3)
except IOError as e:
print('Error while reading from file, {}'.format(e))
sys.exit(2)
else:
return config
else:
print('Configuration file {} not found'.format(config_file))
sys.exit(1)
def add_woman():
new_woman = {}
try:
        max_id_woman = max(EXTRAORDINARY_WOMEN, key=lambda x: x['id'],
                           default={'id': 0})
        max_id = max_id_woman['id'] + 1
data = request.json
new_woman = {
"id": max_id,
"name": data['name'],
"origin": data['origin'],
"occupation": data['occupation']
}
EXTRAORDINARY_WOMEN.append(new_woman)
return HTTPResponse(
status=200,
body=json.dumps({"extraordinary_woman": new_woman}))
except:
return HTTPResponse(
status=400,
body=json.dumps({'error': 'error adding a woman'}))
def network():
global network_graph
    try:
        graph_json = request.json
    except ValueError:
        # malformed or missing JSON body; nothing to draw
        return
# some network nodes could be removed from the graph to avoid confusing the user
# the graph contains
network_json = loads(graph_json)
G = json_graph.node_link_graph(network_json)
fig = plt.figure()
plt.axis('off')
networkx.draw_networkx(G, node_size=80, node_color='c', font_size=8)
network_graph = BytesIO()
fig.savefig(network_graph, format='png')
# redirect any attempts to non-existing pages to the main page
def dtos_get_films_with_actors():
try:
queryObject = QueryObject(
filter = "ReleaseYear='{0}'".format(request.query['releaseYear']),
expand = ['FilmActors.Actor', 'FilmCategories']
)
resultSerialData = dataService.dataViewDto.getItems("Film", queryObject)
return json.dumps(resultSerialData, cls=CustomEncoder, indent=2)
except dalUtils.StatusCodeError as err:
response.status = err.value
except:
abort(400, 'Bad Request')
# POST: api/datasource/crud/operations/dtos/TestAction
# with: Content-Type: application/json and body - {"param1":1}
def post_batch_entityset(entitySetName):
try:
result = dataProviderDto.apiProvider.handleInsertEntityBatch(entitySetName, request.json, dataService)
response.content_type = "application/json; charset=utf-8"
return json.dumps(result, cls=CustomEncoder)
except dalUtils.StatusCodeError as err:
response.status = err.value
except:
abort(400, 'Bad Request')
## DELETE: api/datasource/crud/batch/:entitySetName
#@delete('/api/datasource/crud/batch/<entitySetName>')
#def delete_batch_entityset(entitySetName):
# try:
# result = dataProviderDto.apiProvider.handleDeleteEntityBatch(entitySetName, request.json, dataService)
# response.content_type = "application/json; charset=utf-8"
# return json.dumps(result, cls=CustomEncoder)
# except dalUtils.StatusCodeError as err:
# response.status = err.value
# except:
# abort(400, 'Bad Request')
# DELETE: api/datasource/crud/batch/:entitySetName?keys=key1:1,2,3,4;key2:4,5,6,7
def entities_get_films_with_actors():
try:
queryObject = QueryObject(
filter = "ReleaseYear='{0}'".format(request.query['releaseYear']),
expand = ['FilmActors.Actor', 'FilmCategories']
)
resultSerialData = dataService.from_.remote.dtoView.Films.getItems(queryObject)
return json.dumps(resultSerialData, cls=CustomEncoder, indent=2)
except dalUtils.StatusCodeError as err:
response.status = err.value
except:
abort(400, 'Bad Request')
# POST: api/datasource/crud/operations/entities/TestAction
# with: Content-Type: application/json and body - {"param1":1}
def apply(self, callback, route):
def wrapper(*a, **ka):
try:
rv = callback(*a, **ka)
except HTTPResponse as resp:
rv = resp
if isinstance(rv, dict):
json_response = dumps(rv)
response.content_type = 'application/json'
return json_response
        elif isinstance(rv, HTTPResponse) and isinstance(rv.body, dict):
            rv.body = dumps(rv.body)
            rv.content_type = 'application/json'
        # pass any other response type through unchanged
        return rv
return wrapper
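# A sketch of a Bottle plugin built around an apply() method like the one
# above, and how it would be installed; the DictToJSONPlugin name is an
# assumption and the wrapper is abbreviated to the plain-dict case:
from json import dumps
from bottle import Bottle, response

class DictToJSONPlugin(object):
    name = 'dict_to_json'
    api = 2

    def apply(self, callback, route):
        def wrapper(*a, **ka):
            rv = callback(*a, **ka)
            if isinstance(rv, dict):
                response.content_type = 'application/json'
                return dumps(rv)
            return rv
        return wrapper

app = Bottle()
app.install(DictToJSONPlugin())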
def post_index():
event_type = request.get_header('X-GitHub-Event')
if not is_request_from_github():
abort(403, "Forbidden for IP %s, it's not GitHub's address" % remote_ip())
if request.content_type.split(';')[0] != 'application/json':
abort(415, "Expected application/json, but got %s" % request.content_type)
if event_type == 'ping':
return handle_ping()
elif event_type == 'push':
return handle_push()
else:
abort(400, "Unsupported event type: %s" % event_type)
def build_preview():
if request.content_type == 'application/json':
rst = request.json['rst']
else:
rst = request.POST['rst']
key = md5(rst)
if key in _CACHE:
res, warnings = _CACHE[key]
else:
res, warnings = _CACHE[key] = rst2html(rst, theme='acr')
return {'result': res, 'warnings': warnings,
'valid': len(warnings) == 0}
def __call__(self, *args, **kwargs):
headers = {}
if self._access_token:
headers['Authorization'] = 'Token ' + self._access_token
resp = requests.post(
self._url, json=kwargs,
headers=headers
)
if resp.ok:
data = resp.json()
if data.get('status') == 'failed':
raise Error(resp.status_code, data.get('retcode'))
return data.get('data')
raise Error(resp.status_code)
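# Hypothetical usage of the __call__ method above, assuming it lives on a
# callable API-endpoint wrapper constructed with a URL and an access token
# (the ApiCall name, URL and parameters are assumptions):
api_send = ApiCall('https://bot.example.com/send_private_msg',
                   access_token='secret-token')
reply = api_send(user_id=12345, message='hello')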
def scale_down(k8s_host, **kwargs):
"""
Scale down number of replicas to 0
"""
pass_headers = {}
if 'k8s_api_headers' in kwargs:
headers = kwargs.pop('k8s_api_headers')
pass_headers.update(headers)
pass_headers.update({
'Content-Type': 'application/strategic-merge-patch+json'
})
payload = {
'spec': {
'replicas': 0
}
}
api_path = K8S_API['deployments']
namespace = kwargs['namespace']
specs = kwargs['objects']['deployments']['specification']
if specs['kind'] == 'List':
deployments = specs['items']
else:
deployments = [specs]
for deployment in deployments:
deployment_name = deployment['metadata']['name']
url = '{}/{}/namespaces/{}/deployments/{}'.format(
k8s_host, api_path,
namespace,
deployment_name
)
req('PATCH', url, pass_headers, payload)
def get_kv(consul_host, key, list_keys=False):
"""
Retrieve value for specified key from Consul (output: dict or list)
"""
url = '{}/{}/{}'.format(consul_host, CONSUL_KV_API, key)
if list_keys:
value = req('GET', url + '/?keys')
else:
try:
value = json.loads(b64decode(req('GET', url)[0]['Value']))
except ValueError as e:
abort(422, 'Bad JSON: {}'.format(e))
return value
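# The req() helper used by scale_down() and get_kv() is not defined in these
# snippets; a minimal sketch of what it might look like, assuming it wraps the
# requests library and returns the parsed JSON response body:
import requests

def req(method, url, headers=None, payload=None):
    resp = requests.request(method, url, headers=headers, json=payload)
    resp.raise_for_status()
    return resp.json()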
def list_women():
return HTTPResponse(
status=200,
body=json.dumps({"extraordinary_women": EXTRAORDINARY_WOMEN}))
def get_woman(woman_id):
for woman in EXTRAORDINARY_WOMEN:
if woman['id'] == int(woman_id):
return HTTPResponse(
status=200,
body=json.dumps({'extraordinary_woman': woman}))
else:
return HTTPResponse(
status=404,
body=json.dumps({'error': 'id not found'}))
def delete_woman(woman_id):
for woman in EXTRAORDINARY_WOMEN:
if woman['id'] == int(woman_id):
EXTRAORDINARY_WOMEN.remove(woman)
return HTTPResponse(status=204)
else:
return HTTPResponse(
            status=404,
body=json.dumps({'error': 'id not found'}))
def get_json_profiles():
"""Get all profiles (JSON)"""
results = db.get_profiles()
return json.dumps(results)
def get_json_profile(item):
"""Get one profile info"""
results = db.get_profile(item)
return json.dumps(results)
def update_profile(name):
"""Update profile info (port & autostart)"""
response.set_cookie("indiserver_profile", name,
None, max_age=3600000, path='/')
data = request.json
port = data.get('port', args.indi_port)
autostart = bool(data.get('autostart', 0))
db.update_profile(name, port, autostart)
def save_profile_drivers(name):
"""Add drivers to existing profile"""
data = request.json
db.save_profile_drivers(name, data)
def get_json_profile_labels(item):
"""Get driver labels of specific profile"""
results = db.get_profile_drivers_labels(item)
return json.dumps(results)
def get_server_status():
"""Server status"""
status = [{'status': str(indi_server.is_running()), 'active_profile': active_profile}]
return json.dumps(status)
def get_server_drivers():
"""List server drivers"""
status = []
for driver in indi_server.get_running_drivers():
status.append({'driver': driver})
return json.dumps(status)
def get_json_groups():
"""Get all driver families (JSON)"""
response.content_type = 'application/json'
families = collection.get_families()
return json.dumps(sorted(families.keys()))
def get_json_drivers():
"""Get all drivers (JSON)"""
response.content_type = 'application/json'
return json.dumps([ob.__dict__ for ob in collection.drivers])
def post_data():
global global_cnt
global global_src_ip
global global_dst_ip
global global_proto
    try:
        post = request.json
    except ValueError:
        # malformed or missing JSON body; ignore this packet
        return
# overall received packets
global_cnt += 1
data = loads(post)
    # src ip distribution
    global_src_ip[data[u'ip_src']] = global_src_ip.get(data[u'ip_src'], 0) + 1
    # dst ip distribution
    global_dst_ip[data[u'ip_dst']] = global_dst_ip.get(data[u'ip_dst'], 0) + 1
    # proto distribution
    global_proto[data[u'protocol']] = global_proto.get(data[u'protocol'], 0) + 1
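# A sketch of a client posting one packet record to the handler above; the URL
# is an assumption, and the record is JSON-encoded before sending because the
# handler calls loads() on the request body:
import json

import requests

record = {'ip_src': '10.0.0.1', 'ip_dst': '10.0.0.2', 'protocol': 'TCP'}
requests.post('http://localhost:8080/post_data', json=json.dumps(record))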
def dtos_test_action():
try:
param1 = request.json['param1']
# TODO: Add some actions in here
except dalUtils.StatusCodeError as err:
response.status = err.value
except:
abort(400, 'Bad Request')
def get_metadata():
try:
response.content_type = "application/json; charset=utf-8"
metadataClient = databaseInfo.getMetadataClient()
return json.dumps(metadataClient, cls=MetadataEncoder, indent=2)
except:
abort(500, 'Internal server error')
# GET: api/datasource/crud/:entitySetName?skip=20&top=10