def line_call_back():
    """Handle an incoming LINE webhook request.

    When ``PRODUCTION`` equals ``'1'`` the signature header is validated
    against the request body first, and ``"NOT PASS"`` is returned on a
    mismatch. The decoded body is then handed to ``bot.process_new_event``.

    Returns:
        ``"NOT PASS"`` when signature validation fails, otherwise ``"OK"``.
        ``"OK"`` is also returned when processing raised; the error is
        logged instead of propagated.
    """
    try:
        # Decode once and reuse the text for validation and processing.
        body = request.get_data().decode("utf-8")
        if PRODUCTION == '1':
            signature = request.headers.get('X-Line-Channelsignature')
            if not bot.bot_api.client.validate_signature(signature, body):
                return "NOT PASS"
        bot.process_new_event(body)
    except Exception:
        # logging.exception records the traceback, which logging.error(ex)
        # would have dropped.
        logging.exception("Failed to process LINE callback")
    return "OK"
# Example source code using Python's get_data()
def edison_done():
    """Pass the UTF-8 decoded request body to ``bot.take_photo_done``.

    Returns:
        The literal string ``'OK'``.
    """
    raw_body = request.get_data()
    bot.take_photo_done(raw_body.decode("utf-8"))
    return 'OK'
def webhook():
    """Forward the request body, read as text, to ``page.handle_webhook``.

    Returns:
        The literal string ``"ok"``.
    """
    payload = request.get_data(as_text=True)
    page.handle_webhook(payload)
    return "ok"
### Main method (Handles user messages, db) ###
def callback():
    """Receive a webhook call and dispatch it through ``handler``.

    The ``X-Line-Signature`` header and the request body (as text) are
    passed to ``handler.handle``. The body is logged at info level first.

    Returns:
        ``'OK'`` once the handler has run. The request is aborted with 400
        if the handler raises ``InvalidSignatureError``.
    """
    # Signature header supplied by the sender.
    incoming_headers = request.headers
    signature = incoming_headers['X-Line-Signature']

    # Raw request body, decoded to text.
    body = request.get_data(as_text=True)
    log_line = "Request body: " + body
    app.logger.info(log_line)

    # Let the handler verify the signature and process the payload.
    try:
        handler.handle(body, signature)
    except InvalidSignatureError:
        abort(400)

    return 'OK'
def generate_secure_token():
    """Encrypt the raw request body with ``SecureToken.encrypt``.

    Returns:
        The encrypted token followed by a newline byte.
    """
    encrypted = SecureToken.encrypt(request.get_data())
    return encrypted + b'\n'
def parse_frames(pipeline):
    """Run ``proc_input`` on the request text and return the result as JSON.

    For GET requests the text is taken from the ``t`` query parameter; for
    any other method it is the request body read as text.

    Args:
        pipeline: Pipeline name; it is upper-cased before being passed to
            ``proc_input``.

    Returns:
        The ``jsonify``-ed result of ``proc_input``.

    The request is aborted with 400 when a GET request carries no ``t``
    parameter, and with 500 (after logging the traceback) when processing
    raises.
    """
    if request.method == 'GET':
        # parse from text parameter
        d = request.args.get('t')
        if d is None:
            abort(400)
    else:
        d = request.get_data(as_text=True)
    try:
        r = proc_input(d, pipeline.upper())
    except Exception:
        # Exception rather than a bare except, so SystemExit and
        # KeyboardInterrupt are not converted into a 500 response.
        LOGGER.exception("Error processing input")
        abort(500)
    return jsonify(r)
def server_error_page(error):
    """Roll back the database session and reply with a JSON 500 error.

    Args:
        error: The error passed in by the caller; not used here.

    Returns:
        A ``(response, 500)`` tuple whose body is a JSON error object.
    """
    db.session.rollback()
    error_body = {'error': 'internal server error'}
    return jsonify(error_body), 500
# logger
# @app.before_request
# def log_request_info():
# app.logger.debug('Headers: %s', request.headers)
# app.logger.debug('Body: %s', request.get_data())
def get_sql_audit_info():
    """Convert the request body with ``get_object_from_json_tmp`` and return
    the result of ``sql_manager.audit_sql`` for it."""
    raw_body = request.get_data()
    params = get_object_from_json_tmp(raw_body)
    return sql_manager.audit_sql(params)
def review_sql_work():
    """Convert the request body with ``get_object_from_json_tmp`` and return
    the result of ``sql_manager.audit_sql_work`` for it."""
    raw_body = request.get_data()
    params = get_object_from_json_tmp(raw_body)
    return sql_manager.audit_sql_work(params)
def add_sql_work():
    """Convert the request body with ``get_object_from_json_tmp`` and return
    the result of ``sql_manager.add_sql_work`` for it."""
    raw_body = request.get_data()
    params = get_object_from_json_tmp(raw_body)
    return sql_manager.add_sql_work(params)
def add_user():
    """Convert the request body with ``get_object_from_json_tmp`` and return
    the result of ``user_manager.add_user`` for it."""
    raw_body = request.get_data()
    params = get_object_from_json_tmp(raw_body)
    return user_manager.add_user(params)
def add_group():
    """Convert the request body with ``get_object_from_json_tmp`` and return
    the result of ``user_manager.add_group_info`` for it."""
    raw_body = request.get_data()
    params = get_object_from_json_tmp(raw_body)
    return user_manager.add_group_info(params)
def update_group():
    """Convert the request body with ``get_object_from_json_tmp`` and return
    the result of ``user_manager.update_user_group_info`` for it."""
    raw_body = request.get_data()
    params = get_object_from_json_tmp(raw_body)
    return user_manager.update_user_group_info(params)
def update_sql_work():
    """Convert the request body with ``get_object_from_json_tmp`` and return
    the result of ``sql_manager.update_sql_work`` for it."""
    raw_body = request.get_data()
    params = get_object_from_json_tmp(raw_body)
    return sql_manager.update_sql_work(params)
# endregion
# region login api
def webhook():
    """Print the request body (read as text) and pass it to
    ``page.handle_webhook``.

    Returns:
        The literal string ``"ok"``.
    """
    body_text = request.get_data(as_text=True)
    print(body_text)
    page.handle_webhook(body_text)
    return "ok"
def get_form():
    """Return the form data of the current request, cached on the request.

    The parsed form is stored as ``request._covador_form`` so later calls
    for the same request reuse it.

    Returns:
        * ``request.form.to_dict(False)`` for ``multipart/form-data``;
        * ``parse_qs`` of the raw body for
          ``application/x-www-form-urlencoded``;
        * an empty dict for any other, or a missing, content type.
    """
    try:
        return request._covador_form
    except AttributeError:
        pass  # not cached yet; build it below

    # content_type is None when the request has no Content-Type header;
    # fall back to '' so the prefix checks below do not raise.
    content_type = request.content_type or ''
    if content_type.startswith('multipart/form-data'):
        form = request.form.to_dict(False)
    elif content_type.startswith('application/x-www-form-urlencoded'):
        form = parse_qs(request.get_data(parse_form_data=False))
    else:
        form = {}
    request._covador_form = form
    return form
def get_request_log():
    """Build a dictionary describing the current request for logging.

    The result always has ``route`` and ``method``. When the first path
    segment of the route is ``virtual_card``, the JSON-decoded body (with a
    top-level ``password`` key removed) and the flattened request values are
    added if they are non-empty.

    Returns:
        dict with ``route`` and ``method``, plus optional ``body`` and
        ``values`` entries.

    Raises:
        ValueError: if a non-empty body is not valid JSON (raised by
            ``json.loads``).
    """
    # parse args and forms
    # NOTE(review): request.values is deliberately read before get_data(),
    # as in the original; the order may matter for form bodies -- confirm
    # before reordering.
    values = ''.join(
        key + ': ' + request.values[key] + ', ' for key in request.values
    )

    route = '/' + request.base_url.replace(request.host_url, '')
    request_log = {
        'route': route,
        'method': request.method,
    }

    # Only routes under "virtual_card" get their body and values logged.
    category = route.split('/')[1]
    if category != "virtual_card":
        return request_log

    raw_body = request.get_data()
    if raw_body:
        body = json.loads(raw_body)
        # Only a JSON object can hold a "password" key; a JSON string or
        # list has no usable pop("password") and would raise here.
        if isinstance(body, dict):
            body.pop("password", None)
        request_log['body'] = body

    if values:
        request_log['values'] = values
    return request_log
def saveFile(name):
    """
    Save plugin code. The code is provided via the HTTP body.

    :param name: the plugin name
    :return: empty HTTP response (204); an empty 400 response when the name
        is not a plain directory name
    """
    # name is spliced into a filesystem path, so refuse anything that could
    # resolve outside ./modules/plugins/ (path separators, "." or "..").
    if name in ("", ".", "..") or "/" in name or "\\" in name:
        return ('', 400)

    with open("./modules/plugins/" + name + "/__init__.py", "wb") as fo:
        fo.write(request.get_data())
    cbpi.emit_message("PLUGIN %s SAVED" % (name))
    return ('', 204)
def post(self):
    """Trigger a new run.

    The JSON request body is decoded, passed through ``utils.uni_to_str``,
    given a fresh UUID4 string under ``'id'`` and handed to
    ``self.manager.create_run``.

    Returns:
        The created run formatted by ``run_model.format_response``.
    """
    decoded_body = json.loads(request.get_data())
    run_payload = utils.uni_to_str(decoded_body)

    run_id = str(uuid.uuid4())
    run_payload['id'] = run_id
    LOG.info('Triggering new ansible run %s', run_id)

    created_run = self.manager.create_run(run_payload)
    return run_model.format_response(created_run)