def verify_route():
"""
Verify if the route is in a list of routes that need have the authorization header
"""
for route in routes:
if route[0] == str(request.url_rule) and request.method in route[1] and 'Authorization' not in request.headers:
abort(403, 'Authorization header missing')
# TODO: Implementar a validação dos modelos de entrada
python类url_rule()的实例源码
def encrypt_password():
"""
Verify if the route is for password reset or user create, then encrypts the password.
"""
if request.json is None or not 'password' in request.json:
return
if str(request.url_rule) == '/auth/user/' and request.method == 'POST' \
or str(request.url_rule) == '/auth/user/resetpassword/' and request.method == 'PUT':
request.json['password'] = generate_password_hash(request.json['password'])
def get_url_rule():
rule = None
url_rule = request.url_rule
if url_rule is not None:
rule = url_rule.rule
return rule
def request_finished(self, app, response):
rule = request.url_rule.rule if request.url_rule is not None else ""
rule = build_name_with_http_method_prefix(rule, request)
request_data = get_data_from_request(request)
response_data = get_data_from_response(response)
elasticapm.set_transaction_data(request_data, 'request')
elasticapm.set_transaction_data(response_data, 'response')
if response.status_code:
result = 'HTTP {}xx'.format(response.status_code // 100)
else:
result = response.status
self.client.end_transaction(rule, result)
def build_url(self, **kwargs):
arg = request.args.copy()
view_args = request.view_args
arg.update(view_args)
for attr in kwargs.keys():
if attr in arg:
arg.pop(attr)
arg.update(kwargs.items())
rule = request.url_rule
result = rule.build(arg)
return result[1]
def inject_template_vars():
output = {}
output["baseurl"] = get_baseurl()
current_path = str(request.url_rule)[1:].split("/")
output["current_path"] = current_path[0]
return output
def requireAuthenticate(acceptGuest):
def requireAuth(f):
@wraps(f)
def decorated_function(*args, **kwargs):
auth = request.authorization
if auth:
if acceptGuest and request.headers['Service-Provider'] == 'Guest' and auth.username == app.config['GUEST_ID']:
if auth.password == app.config['GUEST_TOKEN']:
g.currentUser = None
g.loginWith = 'Guest'
return f(*args, **kwargs)
else:
g.loginWith = None
if request.headers['Service-Provider'] == 'Facebook':
g.facebookToken = FacebookModel.getTokenValidation(app.config['FACEBOOK_ACCESS_TOKEN'], auth.password)
if g.facebookToken['is_valid'] and g.facebookToken['user_id'] == auth.username:
g.currentUser = FacebookModel.getUser(auth.username)
g.loginWith = 'Facebook'
elif request.headers['Service-Provider'] == 'Google':
g.googleToken = GoogleModel.getTokenValidation(app.config['GOOGLE_CLIENT_ID'], auth.password)
if g.googleToken and g.googleToken['sub'] == auth.username:
g.currentUser = GoogleModel.getUser(auth.username)
g.loginWith = 'Google'
if g.loginWith and (str(request.url_rule) == '/v1/login' or g.currentUser):
if str(request.url_rule) == '/v1/login' or not g.currentUser['disabled']:
return f(*args, **kwargs)
return abort(410)
return abort(401)
return decorated_function
return requireAuth
def _has_fr_route(self):
"""Encapsulating the rules for whether the request was to a Flask endpoint"""
# 404's, 405's, which might not have a url_rule
if self._should_use_fr_error_handler():
return True
# for all other errors, just check if FR dispatched the route
if not request.url_rule:
return False
return self.owns_endpoint(request.url_rule.endpoint)
def load_user():
# Make sure not to run function for static files
if request.url_rule and '/static/' in request.url_rule.rule:
return
# Load user
session_id = request.cookies.get('session_id')
user_id = request.cookies.get('user_id')
if session_id and user_id:
try:
user_id = int(user_id)
except ValueError:
return
session = db.session.query(LoginSession).get((session_id, user_id))
if session:
user = session.user
if user and session.active:
g.logged_in = True
g.user = user
# Update user's last_action timestamp if it's been at least
# LAST_ACTION_OFFSET minutes.
current = datetime.datetime.now()
mins_ago = current - datetime.timedelta(
minutes=app.config['LAST_ACTION_OFFSET'])
if g.user and g.user.last_action < mins_ago:
g.user.last_action = current
db.session.commit()
return
g.logged_in = False
g.user = None
return
def submit_lifecycle_ignition(request_raw_query):
"""
Lifecycle Ignition
---
tags:
- lifecycle
responses:
200:
description: A JSON of the ignition status
"""
try:
machine_ignition = json.loads(request.get_data())
except ValueError:
app.logger.error("%s have incorrect content" % request.path)
return jsonify({"message": "FlaskValueError"}), 406
req = requests.get("%s/ignition?%s" % (EC.matchbox_uri, request_raw_query))
try:
matchbox_ignition = json.loads(req.content)
req.close()
except ValueError:
app.logger.error("%s have incorrect matchbox return" % request.path)
return jsonify({"message": "MatchboxValueError"}), 406
@smartdb.cockroach_transaction
def op(caller=request.url_rule):
with SMART.new_session() as session:
try:
inject = crud.InjectLifecycle(session, request_raw_query=request_raw_query)
if json.dumps(machine_ignition, sort_keys=True) == json.dumps(matchbox_ignition, sort_keys=True):
inject.refresh_lifecycle_ignition(True)
return jsonify({"message": "Up-to-date"}), 200
else:
inject.refresh_lifecycle_ignition(False)
return jsonify({"message": "Outdated"}), 210
except AttributeError:
return jsonify({"message": "Unknown"}), 406
return op(caller=request.url_rule)
def lifecycle_rolling_delete(request_raw_query):
"""
Lifecycle Rolling Update
Disable the current policy for a given machine by UUID or MAC
---
tags:
- lifecycle
parameters:
- name: request_raw_query
in: path
description: Pass the mac as 'mac=<mac>'
required: true
type: string
responses:
200:
description: Rolling Update is not enable
schema:
type: dict
"""
app.logger.info("%s %s" % (request.method, request.url))
@smartdb.cockroach_transaction
def op(caller=request.url_rule):
with SMART.new_session() as session:
life = crud.InjectLifecycle(session, request_raw_query)
life.apply_lifecycle_rolling(False, None)
return jsonify({"enable": False, "request_raw_query": request_raw_query}), 200
return op(caller=request.url_rule)
def report_lifecycle_coreos_install(status, request_raw_query):
"""
Lifecycle CoreOS Install
Report the status of a CoreOS install by MAC
---
tags:
- lifecycle
responses:
200:
description: CoreOS Install report
schema:
type: dict
"""
app.logger.info("%s %s" % (request.method, request.url))
if status.lower() == "success":
success = True
elif status.lower() == "fail":
success = False
else:
app.logger.error("%s %s" % (request.method, request.url))
return "success or fail != %s" % status.lower(), 403
@smartdb.cockroach_transaction
def op(caller=request.url_rule):
with SMART.new_session() as session:
inject = crud.InjectLifecycle(session, request_raw_query=request_raw_query)
inject.refresh_lifecycle_coreos_install(success)
op(caller=request.url_rule)
repositories.machine_state.update(
mac=tools.get_mac_from_raw_query(request_raw_query),
state=MachineStates.installation_succeed if success else MachineStates.installation_failed)
return jsonify({"success": success, "request_raw_query": request_raw_query}), 200
def before_request():
g.user = current_user
if should_redirect(current_user, request.url_rule):
redirect_to('home')
def _has_fr_route(self):
"""Encapsulating the rules for whether the request was to a Flask endpoint"""
# 404's, 405's, which might not have a url_rule
if self._should_use_fr_error_handler():
return True
# for all other errors, just check if FR dispatched the route
if not request.url_rule:
return False
return self.owns_endpoint(request.url_rule.endpoint)
def _has_fr_route(self):
"""Encapsulating the rules for whether the request was to a Flask endpoint"""
# 404's, 405's, which might not have a url_rule
if self._should_use_fr_error_handler():
return True
# for all other errors, just check if FR dispatched the route
if not request.url_rule:
return False
return self.owns_endpoint(request.url_rule.endpoint)
def before_request():
g.res = LunabitResponse()
# all routes must have a trailing slash, will append if doesn't exist
if not request.path.endswith('/'):
request.path = ''.join([request.path, '/'])
# removing api version from beginning of string for proper routing
request_path = str(request.url_rule) if request.url_rule else request.path
url_rules = [r.rule for r in app.url_map.iter_rules()]
if request_path in url_rules:
for r in app.url_map.iter_rules():
if r.rule == request_path and request.method in r.methods:
print(r.methods)
try:
g.req = LunabitRequest.from_data(request.method, request)
g.user = g.req.user
break
except GridlightException as e:
raise InvalidUsage(message=str(e))
else:
err = ''.join(["URL '", request_path, "' does not allow the method '", request.method,"'."])
print_log(err)
raise InvalidUsage(message=err, status_code=405)
else:
err = ''.join(["URL '", request_path, "' does not exist."])
print_log(err)
raise InvalidUsage(message=err, status_code=404)
def change_lifecycle_rolling(request_raw_query):
"""
Lifecycle Rolling Update
Change the current policy for a given machine by MAC
---
tags:
- lifecycle
parameters:
- name: request_raw_query
in: path
description: Pass the mac as 'mac=<mac>'
required: true
type: string
responses:
200:
description: Rolling Update is enable
schema:
type: dict
401:
description: Mac address is not in database
schema:
type: dict
"""
app.logger.info("%s %s" % (request.method, request.url))
try:
strategy = json.loads(request.get_data())["strategy"]
app.logger.info("%s %s rolling strategy: setting to %s" % (request.method, request.url, strategy))
except (KeyError, ValueError):
# JSONDecodeError is a subclass of ValueError
# Cannot use JSONDecodeError because the import is not consistent between python3.X
app.logger.info("%s %s rolling strategy: setting default to kexec" % (request.method, request.url))
strategy = "kexec"
@smartdb.cockroach_transaction
def op(caller=request.url_rule):
with SMART.new_session() as session:
try:
life = crud.InjectLifecycle(session, request_raw_query)
life.apply_lifecycle_rolling(True, strategy)
return jsonify({"enable": True, "request_raw_query": request_raw_query, "strategy": strategy}), 200
except AttributeError:
return jsonify({"enable": None, "request_raw_query": request_raw_query, "strategy": strategy}), 401
return op(caller=request.url_rule)
def __inject_flask_g(*args, **kwargs):
if str(request.url_rule) == '/static/<path:filename>':
return
homeworks = HwSet(app.config['HOMEWORK_DIR'],[''])
if current_user.is_authenticated():
mongouser = app.config['USERS_COLLECTION'].find_one({"_id": current_user.name})
if mongouser is None:
session['course'] = None
return
if len(mongouser['course']) != 0:
session['course'] = mongouser['course']
if session.get('course') is not None:
problem_dict = mongouser['problem_list']
course_name = session['course']
course = app.config['COURSE_COLLECTION'].find_one({"name": course_name})
if course == None or not(os.path.isdir(os.path.join(app.config['HOMEWORK_DIR_FOR_CLASS'],course_name))):
session['course'] = None
return
if not os.path.isdir(course["path"]):
session['course'] = None
if app.config['COURSE_COLLECTION'].count({"name":course}) > 0:
app.config['COURSE_COLLECTION'].remove({"name":course})
return
problem_list = problem_dict.get(course_name,'key_error')
if current_user.is_admin:
problem_list = course['problem_list']
if (not current_user.is_admin) and (problem_list == 'key_error' or (len(problem_list) == 0) or (not_int_list(problem_list,course['problem_list'])) or (not_cover_list(problem_list,course['problem_list']))) and (len(course['problem_list']) != 0):
problem_list = getproblemlist(course['problem_list'],app.config['HOMEWORK_NUM'])
problem_dict.update({course_name:problem_list})
app.config['USERS_COLLECTION'].remove({"_id":mongouser['_id']})
app.config['USERS_COLLECTION'].insert({"_id":mongouser['_id'],"password":mongouser['password'],"problem_list":problem_dict,"course":mongouser['course']})
string = str(problem_list)
course_path = os.path.join(app.config['COURSE_HOMEWORK_DIR'],course_name)
if string == "key_error":
homeworks = HwSet(course_path,[''])
else:
tmplist = string.split('@')
list = [item for item in tmplist]
homeworks = HwSet(course_path,list)
g.homeworks = HwSetProxy(homeworks)
# g.utcnow will be used in templates/homework.html to determine some
# visual styles
g.utcnow = utc_now()