def register():
if request.method == 'GET':
return redirect(url_for('auth_view.authenticate'))
register_form = RegisterForm()
if register_form.validate() and register_form.email.data:
email = register_form.email.data
firstname = register_form.firstname.data
lastname = register_form.lastname.data
NodeDefender.db.user.create(email, firstname, lastname)
NodeDefender.db.user.enable(email)
NodeDefender.db.user.set_password(email, register_form.password.data)
NodeDefender.mail.user.confirm_user(email)
flash('Register Successful, please login', 'success')
else:
flash('Error doing register, please try again', 'error')
return redirect(url_for('auth_view.authenticate'))
flash('error', 'error')
return redirect(url_for('auth_view.authenticate'))
python类method()的实例源码
def register_token(token):
user = NodeDefender.db.user.get(serializer.loads_salted(token))
if user is None:
flash('Invalid Token', 'error')
return redirect(url_for('index'))
register_form = RegisterTokenForm()
if request.method == 'GET':
return render_template('frontend/auth/register.html', RegisterForm =
register_form, user = user)
if register_form.validate_on_submit():
user.firstname = register_form.firstname.data
user.lastname = register_form.lastname.data
user.enabled = True
user.confirmed_at = datetime.now()
NodeDefender.db.user.save_sql(user)
NodeDefender.db.user.set_password(user.email, register_form.password.data)
NodeDefender.mail.user.confirm_user(user.email)
flash('Register Successful, please login', 'success')
else:
flash('Error doing register, please try again', 'error')
return redirect(url_for('auth_view.login'))
return redirect(url_for('auth_view.login'))
def admin_server():
MQTTList = NodeDefender.db.mqtt.list(user = current_user.email)
MQTT = CreateMQTTForm()
if request.method == 'GET':
return render_template('frontend/admin/server.html',
MQTTList = MQTTList, MQTTForm = MQTT)
if MQTT.Submit.data and MQTT.validate_on_submit():
try:
NodeDefender.db.mqtt.create(MQTT.IPAddr.data, MQTT.Port.data)
NodeDefender.mqtt.connection.add(MQTT.IPAddr.data, MQTT.Port.data)
except ValueError as e:
flash('Error: {}'.format(e), 'danger')
return redirect(url_for('admin_view.admin_server'))
if General.Submit.data and General.validate_on_submit():
flash('Successfully updated General Settings', 'success')
return redirect(url_for('admin_server'))
else:
flash('Error when trying to update General Settings', 'danger')
return redirect(url_for('admin_view.admin_server'))
flash('{}'.format(e), 'success')
return redirect(url_for('admin_view.admin_server'))
def admin_groups():
GroupForm = CreateGroupForm()
groups = NodeDefender.db.group.list(user_mail = current_user.email)
if request.method == 'GET':
return render_template('frontend/admin/groups.html', groups = groups,
CreateGroupForm = GroupForm)
else:
if not GroupForm.validate_on_submit():
flash('Form not valid', 'danger')
return redirect(url_for('admin_view.admin_groups'))
try:
group = NodeDefender.db.group.create(GroupForm.Name.data)
NodeDefender.db.group.update(group.name, **\
{'email' : GroupForm.Email.data,
'description' :
GroupForm.description.data})
except ValueError as e:
flash('Error: {}'.format(e), 'danger')
return redirect(url_for('admin_view.admin_groups'))
flash('Successfully Created Group: {}'.format(group.name), 'success')
return redirect(url_for('admin_view.admin_group', name =
serializer.dumps(group.name)))
def admin_users():
UserForm = CreateUserForm()
if request.method == 'GET':
if current_user.superuser:
users = NodeDefender.db.user.list()
else:
groups = NodeDefender.db.group.list(current_user.email)
groups = [group.name for group in groups]
users = NodeDefender.db.user.list(*groups)
return render_template('frontend/admin/users.html', Users = users,\
CreateUserForm = UserForm)
if not UserForm.validate():
flash('Error adding user', 'danger')
return redirect(url_for('admin_view.admin_users'))
try:
user = NodeDefender.db.user.create(UserForm.Email.data,
UserForm.Firstname.data,
UserForm.Lastname.data)
except ValueError as e:
flash('Error: {}'.format(e), 'danger')
redirect(url_for('admin_view.admin_users'))
flash('Successfully added user {}'.format(user.firstname), 'success')
return redirect(url_for('admin_view.admin_user', email = user.email))
def admin_user(email):
email = serializer.loads(email)
usersettings = UserSettings()
userpassword = UserPassword()
usergroupadd = UserGroupAdd()
user = NodeDefender.db.user.get(email)
if request.method == 'GET':
if user is None:
flash('User {} not found'.format(id), 'danger')
return redirect(url_for('admin_view.admin_groups'))
return render_template('frontend/admin/user.html', User = user, UserSettings =
usersettings, UserPassword = userpassword,
UserGroupAdd = usergroupadd)
if usersettings.Email.data and usersettings.validate():
NodeDefender.db.user.update(usersettings.Email.data, **\
{'firstname' : usersettings.Firstname.data,
'lastname' : usersettings.Lastname.data})
return redirect(url_for('admin_view.admin_user', email =
serializer.dumps(usersettings.Email.data)))
def protect(self):
if request.method not in current_app.config['WTF_CSRF_METHODS']:
return
try:
validate_csrf(self._get_csrf_token())
except ValidationError as e:
logger.info(e.args[0])
self._error_response(e.args[0])
if request.is_secure and current_app.config['WTF_CSRF_SSL_STRICT']:
if not request.referrer:
self._error_response('The referrer header is missing.')
good_referrer = 'https://{0}/'.format(request.host)
if not same_origin(request.referrer, good_referrer):
self._error_response('The referrer does not match the host.')
g.csrf_valid = True # mark this request as CSRF valid
def register_extension(app):
@app.before_request
def add_correlation_id(*args, **kw):
correlation_id = request.headers.get(CORRELATION_ID)
log.debug("%s %s", request.method, request.url)
if not correlation_id:
correlation_id = str(uuid.uuid4())
if request.method != "GET":
"""
TODO: remove sensitive information such as username/password
"""
log.debug({
"message": "Tracking request",
"correlation_id": correlation_id,
"method": request.method,
"uri": request.url,
"data": request.data,
})
request.correlation_id = correlation_id
@app.after_request
def save_correlation_id(response):
if CORRELATION_ID not in response.headers:
response.headers[CORRELATION_ID] = getattr(request, "correlation_id", None)
return response
def admin_view():
if request.method == 'POST':
username = request.form.get('name')
password = request.form.get('password')
admin_user= Teams.query.filter_by(name=request.form['name'], admin=True).first()
if admin_user and bcrypt_sha256.verify(request.form['password'], admin_user.password):
try:
session.regenerate() # NO SESSION FIXATION FOR YOU
except:
pass # TODO: Some session objects dont implement regenerate :(
session['username'] = admin_user.name
session['id'] = admin_user.id
session['admin'] = True
session['nonce'] = sha512(os.urandom(10))
db.session.close()
return redirect('/admin/graphs')
if is_admin():
return redirect('/admin/graphs')
return render_template('admin/login.html')
def admin_keys(chalid):
if request.method == 'GET':
chal = Challenges.query.filter_by(id=chalid).first_or_404()
json_data = {'keys':[]}
flags = json.loads(chal.flags)
for i, x in enumerate(flags):
json_data['keys'].append({'id':i, 'key':x['flag'], 'type':x['type']})
return jsonify(json_data)
elif request.method == 'POST':
chal = Challenges.query.filter_by(id=chalid).first()
newkeys = request.form.getlist('keys[]')
newvals = request.form.getlist('vals[]')
print(list(zip(newkeys, newvals)))
flags = []
for flag, val in zip(newkeys, newvals):
flag_dict = {'flag':flag, 'type':int(val)}
flags.append(flag_dict)
json_data = json.dumps(flags)
chal.flags = json_data
db.session.commit()
db.session.close()
return '1'
def hello():
form = ReusableForm(request.form)
print(form.errors)
if request.method == 'POST':
name=request.form['name']
password=request.form['password']
email=request.form['email']
print(name, " ", email, " ", password)
if form.validate():
# Save the comment here.
flash('Thank you, ' + name)
else:
flash('Error: All the form fields are required. ')
return render_template('hello.html', form=form)
def login():
if request.method == "POST":
post_data = {
"username": request.form["username"],
"password": request.form["password"],
}
r = requests.post("http://127.0.0.1:8000/api/user/login", data=post_data)
resp = r.json()
if resp["status"] == 1:
session["token"] = r.cookies["token"]
session["api-session"] = r.cookies["flask"]
return redirect("/problems")
else:
flash(resp["message"])
return render_template("login.html")
else:
return render_template("login.html")
def problem(pid):
if "token" not in session:
return redirect("/")
if request.method == "POST":
r = requests.post("http://127.0.0.1:8000/api/problems/submit",
data={
"pid": pid,
"key": request.form["solution"],
},
cookies={
"token": session["token"],
"flask": session["api-session"]
})
app.logger.info(r.text)
app.logger.info(r.status_code)
resp = r.json()
if resp.get("status", 0) == 0:
flash(resp.get("message", "That's not the correct answer"))
return redirect("/problem/" + pid)
else:
return redirect("/problems")
else:
problem = api_get("/api/problems/" + pid)
return render_template("problem.html",
problem=problem["data"])
def __init__(self, options, func, request_context):
self.options = options
self.operation = request.endpoint
self.func = func.__name__
self.method = request.method
self.args = request.args
self.view_args = request.view_args
self.request_context = request_context
self.timing = dict()
self.error = None
self.stack_trace = None
self.request_body = None
self.response_body = None
self.response_headers = None
self.status_code = None
self.success = None
def newMenuItem(restaurant_id):
if request.method == 'POST':
newItem = MenuItem(name=request.form['name'], description=request.form[
'description'], price=request.form['price'], course=request.form['course'], restaurant_id=restaurant_id)
session.add(newItem)
session.commit()
return redirect(url_for('showMenu', restaurant_id=restaurant_id))
else:
return render_template('newMenuItem.html', restaurant_id=restaurant_id)
return render_template('newMenuItem.html', restaurant=restaurant)
# return 'This page is for making a new menu item for restaurant %s'
# %restaurant_id
# Edit a menu item
def editMenuItem(restaurant_id, menu_id):
editedItem = session.query(MenuItem).filter_by(id=menu_id).one()
if request.method == 'POST':
if request.form['name']:
editedItem.name = request.form['name']
if request.form['description']:
editedItem.description = request.form['name']
if request.form['price']:
editedItem.price = request.form['price']
if request.form['course']:
editedItem.course = request.form['course']
session.add(editedItem)
session.commit()
return redirect(url_for('showMenu', restaurant_id=restaurant_id))
else:
return render_template(
'editMenuItem.html', restaurant_id=restaurant_id, menu_id=menu_id, item=editedItem)
# return 'This page is for editing menu item %s' % menu_id
# Delete a menu item
def stop_instance():
res = {}
if request.method == 'POST':
req_data = json.loads(request.data)
db_session = db.Session()
instance_query_result = db_session.query(Instance).filter(\
Instance.container_serial == req_data['container_serial']).first()
if instance_query_result is not None:
policy = {}
policy['operate'] = app.config['STOP']
policy['container_serial'] = req_data['container_serial']
policy['container_name'] = instance_query_result.container_name
policy['user_name'] = req_data['user_name']
message = json.dumps(policy)
ui_mq = UiQueue()
worker_res = ui_mq.send(message)
worker_res_dict = json.loads(worker_res)
res['code'] = worker_res_dict['code']
res['message'] = worker_res_dict['message']
res['container_serial'] = worker_res_dict['container_serial']
eagle_logger.info(res['message'])
else:
res['code'] = '0x9'
res['message'] = 'container not exist'
return jsonify(**res)
def restart_instance():
res = {}
if request.method == 'POST':
req_data = json.loads(request.data)
db_session = db.Session()
instance_query_result = db_session.query(Instance).filter(\
Instance.container_serial == req_data['container_serial']).first()
if instance_query_result is not None:
policy = {}
policy['operate'] = app.config['RESTART']
policy['container_serial'] = req_data['container_serial']
policy['container_name'] = instance_query_result.container_name
policy['user_name'] = req_data['user_name']
message = json.dumps(policy)
ui_mq = UiQueue()
worker_res = ui_mq.send(message)
worker_res_dict = json.loads(worker_res)
res = worker_res_dict
eagle_logger.info(res['message'])
else:
res['code'] = '0x9'
res['message'] = 'container not exist'
return jsonify(**res)
def remove_instance():
res = {}
if request.method == 'POST':
req_data = json.loads(request.data)
db_session = db.Session()
instance_query_result = db_session.query(Instance).filter(\
Instance.container_serial == req_data['container_serial']).first()
if instance_query_result is not None:
policy = {}
policy['operate'] = app.config['REMOVE']
policy['container_serial'] = req_data['container_serial']
policy['user_name'] = req_data['user_name']
message = json.dumps(policy)
ui_mq = UiQueue()
worker_res = ui_mq.send(message)
worker_res_dict = json.loads(worker_res)
res['code'] = worker_res_dict['code']
res['message'] = worker_res_dict['message']
res['container_serial'] = worker_res_dict['container_serial']
eagle_logger.info(res['message'])
else:
res['code'] = '0x9'
res['message'] = 'container not exist'
return jsonify(**res)
def tail():
if request.method == 'POST':
fi = request.form['file']
if os.path.isfile(fi):
n = int(request.form['n'])
le = io.open(fi, 'r', encoding='utf-8')
taildata = le.read()[-n:]
le.close()
else:
taildata = "No such file."
return render_template('tail.html',taildata=taildata)