def session_addressbook_update(session_name, address_name):
    """Update an address-book entry of a session from the JSON request body.

    The parsed JSON body (an empty dict when ``request.get_json()`` returns
    ``None``) is forwarded as keyword arguments to
    ``fw.sessions[session_name].addressbook.update``.

    Responses:
        200 -- the update call returned normally.
        204 -- ``AddressNotUpdated`` was raised.
        404 -- ``AddressNotFound`` was raised.
        400 -- ``AddressNotValid`` was raised.
        500 -- any other exception (including an unknown ``session_name``).
    """
    try:
        req = request.get_json()
        if req is None:
            req = {}
        fw.sessions[session_name].addressbook.update(name=address_name, **req)
        return make_response(jsonify(message='Address updated'), status.HTTP_200_OK)
    except AddressNotFound as e:
        failure, http_code = e, status.HTTP_404_NOT_FOUND
    except AddressNotValid as e:
        failure, http_code = e, status.HTTP_400_BAD_REQUEST
    except AddressNotUpdated:
        return make_response(jsonify(message='Address not updated'), status.HTTP_204_NO_CONTENT)
    except Exception as e:
        failure, http_code = e, status.HTTP_500_INTERNAL_SERVER_ERROR
    # Python 3 unbinds the ``as`` target when the except clause ends, so the
    # exception is kept under a separate name. ``.message`` only exists on
    # Python 2 exceptions or classes that define it; fall back to str().
    error = failure.message if hasattr(failure, 'message') else str(failure)
    return make_response(jsonify(message='Failed to update address', error=error), http_code)
# Example source code using Python's make_response()
def session_service_delete(session_name, service_name):
    """Delete a service from a session.

    Calls ``fw.sessions[session_name].services.delete(name=service_name)``.

    Responses:
        200 -- the delete call returned normally.
        404 -- ``ServiceNotFound`` was raised.
        406 -- ``ServiceInUse`` was raised; the exception's ``deps`` attribute
               is included in the body under the ``deps`` key.
        400 -- ``ServiceNotValid`` was raised.
        500 -- any other exception (including an unknown ``session_name``).
    """
    data = {}
    try:
        fw.sessions[session_name].services.delete(name=service_name)
        return make_response(jsonify(message='Service deleted'), status.HTTP_200_OK)
    except ServiceNotFound as e:
        failure, http_code = e, status.HTTP_404_NOT_FOUND
    except ServiceInUse as e:
        failure, http_code = e, status.HTTP_406_NOT_ACCEPTABLE
        data['deps'] = e.deps
    except ServiceNotValid as e:
        failure, http_code = e, status.HTTP_400_BAD_REQUEST
    except Exception as e:
        failure, http_code = e, status.HTTP_500_INTERNAL_SERVER_ERROR
    # Python 3 unbinds the ``as`` target when the except clause ends, so the
    # exception is kept under a separate name. ``.message`` only exists on
    # Python 2 exceptions or classes that define it; fall back to str().
    error = failure.message if hasattr(failure, 'message') else str(failure)
    return make_response(jsonify(message='Failed to delete service', error=error, **data), http_code)
def session_service_update(session_name, service_name):
    """Update a service of a session from the JSON request body.

    The parsed JSON body (an empty dict when ``request.get_json()`` returns
    ``None``) is forwarded as keyword arguments to
    ``fw.sessions[session_name].services.update``.

    Responses:
        200 -- the update call returned normally.
        204 -- ``ServiceNotUpdated`` was raised.
        404 -- ``ServiceNotFound`` was raised.
        400 -- ``ServiceNotValid`` was raised.
        500 -- any other exception (including an unknown ``session_name``).
    """
    try:
        req = request.get_json()
        if req is None:
            req = {}
        fw.sessions[session_name].services.update(name=service_name, **req)
        return make_response(jsonify(message='Service updated'), status.HTTP_200_OK)
    except ServiceNotFound as e:
        failure, http_code = e, status.HTTP_404_NOT_FOUND
    except ServiceNotValid as e:
        failure, http_code = e, status.HTTP_400_BAD_REQUEST
    except ServiceNotUpdated:
        return make_response(jsonify(message='Service not updated'), status.HTTP_204_NO_CONTENT)
    except Exception as e:
        failure, http_code = e, status.HTTP_500_INTERNAL_SERVER_ERROR
    # Python 3 unbinds the ``as`` target when the except clause ends, so the
    # exception is kept under a separate name. ``.message`` only exists on
    # Python 2 exceptions or classes that define it; fall back to str().
    error = failure.message if hasattr(failure, 'message') else str(failure)
    return make_response(jsonify(message='Failed to update service', error=error), http_code)
def session_chain_delete(session_name, table_name, chain_name):
    """Delete a chain of a table from a session.

    Calls ``fw.sessions[session_name].chains.delete(name=chain_name,
    table=table_name)``.

    Responses:
        200 -- the delete call returned normally.
        404 -- ``ChainNotFound`` was raised.
        406 -- ``ChainInUse`` was raised; the exception's ``deps`` attribute
               is included in the body under the ``deps`` key.
        400 -- ``ChainNotValid`` was raised.
        500 -- any other exception (including an unknown ``session_name``).
    """
    data = {}
    try:
        fw.sessions[session_name].chains.delete(name=chain_name, table=table_name)
        return make_response(jsonify(message='Chain deleted'), status.HTTP_200_OK)
    except ChainNotFound as e:
        failure, http_code = e, status.HTTP_404_NOT_FOUND
    except ChainInUse as e:
        failure, http_code = e, status.HTTP_406_NOT_ACCEPTABLE
        data['deps'] = e.deps
    except ChainNotValid as e:
        failure, http_code = e, status.HTTP_400_BAD_REQUEST
    except Exception as e:
        failure, http_code = e, status.HTTP_500_INTERNAL_SERVER_ERROR
    # Python 3 unbinds the ``as`` target when the except clause ends, so the
    # exception is kept under a separate name. ``.message`` only exists on
    # Python 2 exceptions or classes that define it; fall back to str().
    error = failure.message if hasattr(failure, 'message') else str(failure)
    return make_response(jsonify(message='Failed to delete chain', error=error, **data), http_code)
def session_chain_update(session_name, table_name, chain_name):
    """Update a chain of a table in a session from the JSON request body.

    The parsed JSON body (an empty dict when ``request.get_json()`` returns
    ``None``) is forwarded as keyword arguments to
    ``fw.sessions[session_name].chains.update`` together with the chain and
    table names.

    Responses:
        200 -- the update call returned normally.
        204 -- ``ChainNotUpdated`` was raised.
        404 -- ``ChainNotFound`` was raised.
        400 -- ``ChainNotValid`` was raised.
        500 -- any other exception (including an unknown ``session_name``).
    """
    try:
        req = request.get_json()
        if req is None:
            req = {}
        fw.sessions[session_name].chains.update(
            name=chain_name, table=table_name, **req)
        return make_response(jsonify(message='Chain updated'), status.HTTP_200_OK)
    except ChainNotFound as e:
        failure, http_code = e, status.HTTP_404_NOT_FOUND
    except ChainNotValid as e:
        failure, http_code = e, status.HTTP_400_BAD_REQUEST
    except ChainNotUpdated:
        return make_response(jsonify(message='Chain not updated'), status.HTTP_204_NO_CONTENT)
    except Exception as e:
        failure, http_code = e, status.HTTP_500_INTERNAL_SERVER_ERROR
    # Python 3 unbinds the ``as`` target when the except clause ends, so the
    # exception is kept under a separate name. ``.message`` only exists on
    # Python 2 exceptions or classes that define it; fall back to str().
    error = failure.message if hasattr(failure, 'message') else str(failure)
    return make_response(jsonify(message='Failed to update chain', error=error), http_code)
def session_check_update(session_name, check_name):
    """Update a rollback check of a session from the JSON request body.

    The parsed JSON body (an empty dict when ``request.get_json()`` returns
    ``None``) is forwarded as keyword arguments to
    ``fw.sessions[session_name].checks.update``.

    Responses:
        200 -- the update call returned normally.
        204 -- ``CheckNotUpdated`` was raised.
        404 -- ``CheckNotFound`` was raised.
        400 -- ``CheckNotValid`` was raised.
        500 -- any other exception (including an unknown ``session_name``).
    """
    try:
        req = request.get_json()
        if req is None:
            req = {}
        fw.sessions[session_name].checks.update(name=check_name, **req)
        return make_response(jsonify(message='Rollback check updated'), status.HTTP_200_OK)
    except CheckNotFound as e:
        failure, http_code = e, status.HTTP_404_NOT_FOUND
    except CheckNotValid as e:
        failure, http_code = e, status.HTTP_400_BAD_REQUEST
    except CheckNotUpdated:
        return make_response(jsonify(message='Rollback check not updated'), status.HTTP_204_NO_CONTENT)
    except Exception as e:
        failure, http_code = e, status.HTTP_500_INTERNAL_SERVER_ERROR
    # Python 3 unbinds the ``as`` target when the except clause ends, so the
    # exception is kept under a separate name. ``.message`` only exists on
    # Python 2 exceptions or classes that define it; fall back to str().
    error = failure.message if hasattr(failure, 'message') else str(failure)
    return make_response(jsonify(message='Failed to update rollback check', error=error), http_code)
def update_payments(id):
    """Replace the payment identified by ``id`` with the JSON request body.

    A request that is not JSON is rejected with ``DataValidationError``.
    Otherwise the parsed body is handed to ``payment_service.update_payment``
    and whatever it returns becomes the 200 response body.

    Responses:
        200 -- ``update_payment`` returned normally.
        404 -- ``PaymentNotFoundError`` was raised; body is its ``message``.
        400 -- ``DataValidationError`` was raised; body is its ``message``.
    """
    try:
        if not request.is_json:
            raise DataValidationError('Invalid payment: Content Type is not json')
        replacement = request.get_json(silent=True)
        body = payment_service.update_payment(id, payment_replacement=replacement)
        http_code = status.HTTP_200_OK
    except PaymentNotFoundError as e:
        body, http_code = e.message, status.HTTP_404_NOT_FOUND
    except DataValidationError as e:
        body, http_code = e.message, status.HTTP_400_BAD_REQUEST
    return make_response(jsonify(body), http_code)
######################################################################
# UPDATE AN EXISTING PAYMENT PARTIALLY
######################################################################
def charge_payment(user_id):
    """Charge the amount given in the JSON request body to ``user_id``.

    The body must be a JSON object with an ``amount`` key. The whole object
    is passed to ``payment_service.perform_payment_action`` as
    ``payment_attributes``.

    Responses:
        200 -- ``perform_payment_action`` returned a value equal to True.
        400 -- empty body, non-JSON request, body that is not a JSON object,
               missing / zero / negative ``amount``, or a charge that did
               not report success.
    """
    try:
        if not request.data:
            raise DataValidationError('Invalid request: body of request contained no data')
        if not request.is_json:
            raise DataValidationError('Invalid request: request not json')
        data = request.get_json()
        if not isinstance(data, dict):
            # Indexing a list / None with 'amount' would raise TypeError.
            raise DataValidationError('Invalid request: body of request is not a json object')
        amount = data['amount']
        if not amount:
            # Previously a falsy amount (0, None) skipped every branch and
            # the final return failed on unbound ``message`` / ``rc``.
            raise DataValidationError('Invalid request: Order amount is missing or zero.')
        if amount < 0:
            raise DataValidationError('Invalid request: Order amount is negative.')
        resp = payment_service.perform_payment_action(user_id, payment_attributes=data)
        # ``== True`` (not ``is True``) is kept so truthy-equal values such
        # as 1 still count as success, matching the previous behaviour.
        if resp == True:
            message = {'success' : 'Default payment method for user_id: %s has been charged $%.2f' % (str(user_id), data['amount'])}
            rc = status.HTTP_200_OK
        else:
            # Previously this path left ``message`` / ``rc`` unbound.
            # NOTE(review): 400 is used because it is the only error status
            # this view returns; confirm whether another code is preferred.
            message = {'error' : 'Payment could not be charged for user_id: %s' % str(user_id)}
            rc = status.HTTP_400_BAD_REQUEST
    except DataValidationError as e:
        message = {'error' : e.message}
        rc = status.HTTP_400_BAD_REQUEST
    except KeyError as e:
        message = {'error' : 'Invalid request: body of request does not have the amount to be charged'}
        rc = status.HTTP_400_BAD_REQUEST
    return make_response(jsonify(message), rc)
def home():
    """Return the plain-text greeting shown on the service's home page."""
    greeting = "Hello World, I'm FoodAId."
    return flask.make_response(greeting, 200)
def kill():
    """Set ``BOT.is_running`` to False and reply with a farewell message.

    The original docstring describes this as stopping the bot thread; the
    only action visible here is clearing the ``is_running`` flag.
    """
    BOT.is_running = False
    farewell = '\n'.join((
        '@FoodAId:',
        "I'm sad.",
        'Why do I have to die?',
        'Why does anyone have to die?',
    ))
    return flask.make_response(farewell, 200)
def get(self):
    """Clear the user's session data and redirect to the login page.

    Removes the user-related keys from ``session`` (missing keys are
    ignored), expires the ``docklet-jupyter-cookie`` cookie and returns a
    redirect response to ``/login/``.
    """
    resp = make_response(redirect('/login/'))
    session_keys = (
        'username',
        'nickname',
        'description',
        'avatar',
        'status',
        'usergroup',
        'token',
    )
    for key in session_keys:
        session.pop(key, None)
    resp.set_cookie('docklet-jupyter-cookie', '', expires=0)
    return resp
def logs_get(filename):
    """Fetch a log file through ``dockletRequest`` and return it as plain text.

    Posts ``{"filename": filename}`` to ``/logs/get/`` and uses the
    ``result`` entry of the reply (empty string when absent) as the body.
    """
    reply = dockletRequest.post('/logs/get/', {"filename": filename})
    response = make_response(reply.get('result', ''))
    response.headers["content-type"] = "text/plain"
    return response
def bytes(n):
    """Return a 200 response whose body is ``n`` random bytes.

    ``n`` is capped at the module-level ``max_size``. The body is served as
    ``application/octet-stream``.
    """
    # max_size is only read, so no ``global`` declaration is needed.
    size = min(n, max_size)
    response = make_response()
    response.data = bytearray(randint(0, 255) for _ in range(size))
    response.content_type = 'application/octet-stream'
    response.status_code = 200
    return response
def status_code(code):
    """Return an empty response whose HTTP status is ``int(code)``."""
    response = make_response()
    response.status_code = int(code)
    return response
def post(self):
    """Register a new user from the JSON request body.

    Expects a JSON object with ``email`` and ``password``.

    Returns a ``(response, status)`` tuple:
        201 -- user created; body carries ``auth_token``.
        202 -- a user with that email already exists.
        401 -- the body is not a JSON object, or creating the user or its
               token raised an exception.
    """
    generic_failure = {
        'status': 'fail',
        'message': 'Some error occurred. Please try again.'
    }
    # get the post data
    post_data = request.get_json()
    if not isinstance(post_data, dict):
        # A missing or non-object body used to crash on ``.get`` below.
        return make_response(jsonify(generic_failure)), 401
    # check if user already exists
    user = User.query.filter_by(email=post_data.get('email')).first()
    if user:
        responseObject = {
            'status': 'fail',
            'message': 'User already exists. Please Log in.',
        }
        return make_response(jsonify(responseObject)), 202
    try:
        user = User(
            email=post_data.get('email'),
            password=post_data.get('password')
        )
        # insert the user
        db.session.add(user)
        db.session.commit()
        # generate the auth token
        auth_token = user.encode_auth_token(user.id)
        # The encoder may return bytes or str depending on the JWT library
        # version; only bytes need decoding.
        if isinstance(auth_token, bytes):
            auth_token = auth_token.decode()
        responseObject = {
            'status': 'success',
            'message': 'Successfully registered.',
            'auth_token': auth_token
        }
        return make_response(jsonify(responseObject)), 201
    except Exception:
        # Leave the session usable for later requests after a failed
        # add/commit.
        db.session.rollback()
        return make_response(jsonify(generic_failure)), 401
def post(self):
    """Log a user in from the JSON request body and issue an auth token.

    Expects a JSON object with ``email`` and ``password``.

    Returns a ``(response, status)`` tuple:
        200 -- credentials matched; body carries ``auth_token``.
        404 -- no such user or the password hash did not match.
        500 -- an exception occurred or no token could be generated.
    """
    # get the post data
    post_data = request.get_json()
    try:
        # fetch the user data
        user = User.query.filter_by(
            email=post_data.get('email')
        ).first()
        credentials_ok = user and bcrypt.check_password_hash(
            user.password, post_data.get('password')
        )
        if not credentials_ok:
            responseObject = {
                'status': 'fail',
                'message': 'User does not exist.'
            }
            return make_response(jsonify(responseObject)), 404
        auth_token = user.encode_auth_token(user.id)
        if auth_token:
            # The encoder may return bytes or str depending on the JWT
            # library version; only bytes need decoding.
            if isinstance(auth_token, bytes):
                auth_token = auth_token.decode()
            responseObject = {
                'status': 'success',
                'message': 'Successfully logged in.',
                'auth_token': auth_token
            }
            return make_response(jsonify(responseObject)), 200
        # A falsy token used to fall off the end of the function and return
        # None; it now shares the generic failure response below.
    except Exception as e:
        print(e)
    responseObject = {
        'status': 'fail',
        'message': 'Try again'
    }
    return make_response(jsonify(responseObject)), 500
def get(self):
    """Return the profile of the user identified by the bearer token.

    Reads the ``Authorization`` header, takes the second space-separated
    field as the token and decodes it with ``User.decode_auth_token``. A
    ``str`` result is treated as an error message; anything else is used as
    the user id.

    Returns a ``(response, status)`` tuple:
        200 -- token valid; body carries the user's id, email, admin flag
               and registration date.
        401 -- header malformed, token missing, or token rejected.
        404 -- token valid but no user with that id exists.
    """
    # get the auth token
    auth_header = request.headers.get('Authorization')
    if auth_header:
        try:
            auth_token = auth_header.split(" ")[1]
        except IndexError:
            responseObject = {
                'status': 'fail',
                'message': 'Bearer token malformed.'
            }
            return make_response(jsonify(responseObject)), 401
    else:
        auth_token = ''
    if not auth_token:
        responseObject = {
            'status': 'fail',
            'message': 'Provide a valid auth token.'
        }
        return make_response(jsonify(responseObject)), 401
    resp = User.decode_auth_token(auth_token)
    if isinstance(resp, str):
        responseObject = {
            'status': 'fail',
            'message': resp
        }
        return make_response(jsonify(responseObject)), 401
    user = User.query.filter_by(id=resp).first()
    if user is None:
        # The token can outlive its user; previously this raised
        # AttributeError on ``user.id``.
        responseObject = {
            'status': 'fail',
            'message': 'User does not exist.'
        }
        return make_response(jsonify(responseObject)), 404
    responseObject = {
        'status': 'success',
        'data': {
            'user_id': user.id,
            'email': user.email,
            'admin': user.admin,
            'registered_on': user.registered_on
        }
    }
    return make_response(jsonify(responseObject)), 200
def post(self):
    """Log the user out by blacklisting the bearer token.

    Reads the ``Authorization`` header, takes the second space-separated
    field as the token and decodes it with ``User.decode_auth_token``. A
    ``str`` result is treated as an error message; otherwise the token is
    stored as a ``BlacklistToken``.

    Returns a ``(response, status)`` tuple:
        200 -- token blacklisted, or storing it failed (the body's
               ``status`` is then ``fail``; the 200 code is kept for
               backward compatibility).
        401 -- header malformed or token rejected.
        403 -- no token supplied.
    """
    # get auth token
    auth_header = request.headers.get('Authorization')
    if auth_header:
        try:
            auth_token = auth_header.split(" ")[1]
        except IndexError:
            # A header without a second field used to raise an unhandled
            # IndexError.
            responseObject = {
                'status': 'fail',
                'message': 'Bearer token malformed.'
            }
            return make_response(jsonify(responseObject)), 401
    else:
        auth_token = ''
    if not auth_token:
        responseObject = {
            'status': 'fail',
            'message': 'Provide a valid auth token.'
        }
        return make_response(jsonify(responseObject)), 403
    resp = User.decode_auth_token(auth_token)
    if isinstance(resp, str):
        responseObject = {
            'status': 'fail',
            'message': resp
        }
        return make_response(jsonify(responseObject)), 401
    # mark the token as blacklisted
    blacklist_token = BlacklistToken(token=auth_token)
    try:
        # insert the token
        db.session.add(blacklist_token)
        db.session.commit()
        responseObject = {
            'status': 'success',
            'message': 'Successfully logged out.'
        }
        return make_response(jsonify(responseObject)), 200
    except Exception as e:
        # Leave the session usable for later requests after a failed
        # add/commit.
        db.session.rollback()
        responseObject = {
            'status': 'fail',
            # The error text is sent instead of the exception object, which
            # is not a JSON value.
            'message': str(e)
        }
        return make_response(jsonify(responseObject)), 200
# define the API resources
def login(self):
    """Handle the configuration-site login page.

    * Already authenticated: redirect to ``get_config_site``.
    * ``GET``: render ``login.html``.
    * Other methods: compare the submitted ``password`` form field with
      ``config['CONFIG_PASSWORD']``. On a match, redirect to
      ``get_config_site`` and set the ``auth`` cookie to
      ``config['AUTH_KEY']``. Otherwise render ``login.html`` again with
      status 401.
    """
    if self.is_authenticated():
        return redirect(url_for('get_config_site'))
    if request.method == "GET":
        return render_template('login.html')
    supplied = request.form.get('password', None)
    expected = config.get('CONFIG_PASSWORD', None)
    # Require an actual submitted password: with no CONFIG_PASSWORD set, a
    # request without the form field used to match as ``None == None``.
    if supplied is not None and supplied == expected:
        resp = make_response(redirect(url_for('get_config_site')))
        resp.set_cookie('auth', config['AUTH_KEY'])
        return resp
    # A wrong password used to fall through and return None, which is not a
    # valid view return value.
    return make_response(render_template('login.html'), 401)
def api(file):
    """Return the object selected by ``file``, or a 404 response.

    ``'all'`` yields the name ``all`` and ``'now'`` yields the name ``now``
    as resolved at call time -- presumably module-level values defined
    elsewhere in the file; verify. Any other value produces a
    ``'404 Not Found'`` response with status 404.
    """
    if file == 'all':
        return all
    if file == 'now':
        return now
    return make_response('404 Not Found', 404)