def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'false', 'no')
python类environ()的实例源码
def after_request(resp):
"""
Adds HTTP Headers to the response
Arguments
----------
resp : flask.Response object
the Flask response object
"""
resp.headers['Cache-Control'] = 'no-cache'
if app.config['DEV']:
resp.headers['Access-Control-Allow-Origin'] = '*'
resp.headers['Access-Control-Allow-Methods'] = 'GET, POST'
resp.headers['Access-Control-Allow-Headers'] = 'Origin, X-Requested-With, Content-Type, Accept'
source_ip = get_real_source_ip()
structured_log(
level='info',
msg="HTTP Request",
method=request.method,
uri=request.path,
status=resp.status_code,
src_ip=source_ip,
protocol=request.environ['SERVER_PROTOCOL'],
user_agent=request.environ.get('HTTP_USER_AGENT', '-')
)
return resp
def get_real_source_ip():
"""
Returns the real source IP address of the HTTP request.
"""
if 'X-Forwarded-For' in request.headers:
return request.headers.getlist("X-Forwarded-For")[0].rpartition(' ')[-1]
else:
return request.environ['REMOTE_ADDR']
def register():
with open('users.pkl', 'r') as f:
userdict = load(f)
if request.method == 'GET':
logdate = datetime.strftime(date.today(), '%Y-%m-%d')
logfn = './logs/activity-'+logdate
user = 'Anonymous' # Username is Anonymous by default
if 'token' in session:
token = session['token']
tokenfilename = 'registered/'+token
with open(tokenfilename, 'r') as f: # for getting username associated with the set token
user = f.readline()[:-1]
with open(logfn, 'a') as f: # logging username, IP addr, end-point, request type
log = user+' '+request.environ['REMOTE_ADDR']+' register'+' GET\n'
f.write(log)
return render_template('register.html', userdict=userdict) # inputs for name, email, timezone, phone, aboutme
elif request.method == 'POST':
uname = str(request.form['uname']) # mandatory
email = str(request.form['email']) # mandatory
timezone = request.form['timezone'] # optional
phone = request.form['phone'] # optional
aboutme = request.form['aboutme'] # optional
token = pbkdf2_sha512.encrypt(email) # setting the token as a salted and hashed email
token = token.replace('/', '+') # filenames can't contain '/'
session['token'] = token # set token in session key on successful registration
fn = 'registered' + path.sep + token
with open(fn, 'w') as f: # write reviewer info into a file named by the token
f.write(uname+'\n'+email+'\n'+timezone+'\n'+phone+'\n'+aboutme+'\n')
f.write('--files--\n')
userdict[uname] = email
with open('users.pkl', 'w') as f: # dictionary with keys as usernames and values as emails
dump(userdict, f)
send_email(email, 'Welcome to AROWF!', 'registration_mail', name=uname, token=token) # send welcome email with token
logdate = datetime.strftime(date.today(), '%Y-%m-%d')
logfn = './logs/activity-'+logdate
with open(logfn, 'a') as f: # logging username, IP addr, end-point, request type
log = uname+' '+request.environ['REMOTE_ADDR']+' register'+' POST\n'
f.write(log)
return redirect(url_for('index'))
def token():
if request.method == 'GET':
logdate = datetime.strftime(date.today(), '%Y-%m-%d')
logfn = './logs/activity-'+logdate
user = 'Anonymous' # Username is Anonymous by default
if 'token' in session:
token = session['token']
tokenfilename = 'registered/'+token
with open(tokenfilename, 'r') as f: # for getting username associated with the set token
user = f.readline()[:-1]
with open(logfn, 'a') as f: # logging username, IP addr, end-point, request type
log = user+' '+request.environ['REMOTE_ADDR']+' token'+' GET\n'
f.write(log)
tokenNames = listdir('registered/') # get list of all tokens
return render_template('token.html', user=user, tokenNames=tokenNames) # displays links to help docs for each end-point
elif request.method == 'POST': # if token not set in session key
if request.form['tokeninput'] != 'null':
session['token'] = request.form['tokeninput'] # obtain from form and set it
else:
session.pop('token', None)
logdate = datetime.strftime(date.today(), '%Y-%m-%d')
logfn = './logs/activity-'+logdate
user = 'Anonymous' # Username is Anonymous by default
if 'token' in session:
token = session['token']
tokenfilename = 'registered/'+token
with open(tokenfilename, 'r') as f: # for getting username associated with the set token
user = f.readline()[:-1]
with open(logfn, 'a') as f: # logging username, IP addr, end-point, request type
log = user+' '+request.environ['REMOTE_ADDR']+' token'+' POST\n'
f.write(log)
return redirect(url_for('index'))
def page_not_found(error):
logdate = datetime.strftime(date.today(), '%Y-%m-%d')
logfn = './logs/activity-'+logdate
user = 'Anonymous' # Username is Anonymous by default
if 'token' in session:
token = session['token']
tokenfilename = 'registered/'+token
with open(tokenfilename, 'r') as f: # for getting username associated with the set token
user = f.readline()[:-1]
with open(logfn, 'a') as f: # logging username, IP addr, error code
log = user+' '+request.environ['REMOTE_ADDR']+' 404\n'
f.write(log)
return render_template('404.html'), 404
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'false', 'no')
def update_entry():
"""Add a new entry to the bibliography."""
form = BiblioForm()
article_name = request.environ["HTTP_REFERER"].split("=")[-1]
if form.validate_on_submit():
article = BiblioEntry.query.filter_by(ID=form.ID.data).first()
article.ID = form.ID.data
article.ENTRYTYPE = form.typ.data
article.authors = form.author.data
article.title = form.title.data
article.year = form.year.data
article.journal = form.journal.data
article.school = form.school.data
article.url = form.url.data
article.keywords = form.keywords.data
article.tag = form.tag.data
db.session.add(article)
user = current_user.name
event = Event(author=user, article=form.ID.data,
event="UPDATE", time=time.time())
db.session.add(event)
db.session.commit()
return redirect("/biblio/article=" + article_name)
return redirect("/biblio")
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'false', 'no')
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'false', 'no')
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'false', 'no')
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'false', 'no')
def ensure_secure_request():
is_request_secure = request.environ['wsgi.url_scheme'] == 'https'
if not is_request_secure and not config.allow_insecure_transport:
if request.method in ('POST', 'PUT', 'PATCH'):
# request body already sent in insecure manner
# return error in this case to notify cluster admin
return abort(400)
else:
return redirect(request.url.replace('http://', 'https://', 1))
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'false', 'no')
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'false', 'no')
def swagger(file_name):
with open('/usr/local/share/ari/api-docs/{file_name}'.format(file_name=file_name), 'r') as swagger_file:
swagger_spec = swagger_file.read()
swagger_spec = swagger_spec.replace('localhost:8088', 'ari:{port}'.format(port=request.environ['SERVER_PORT']))
return make_response(swagger_spec, 200, {'Content-Type': 'application/json'})
def swagger(file_name):
with open('/usr/local/share/ari/api-docs/{file_name}'.format(file_name=file_name), 'r') as swagger_file:
swagger_spec = swagger_file.read()
swagger_spec = swagger_spec.replace('localhost:8088', 'ari:{port}'.format(port=request.environ['SERVER_PORT']))
return make_response(swagger_spec, 200, {'Content-Type': 'application/json'})
def swagger(file_name):
with open('/usr/local/share/ari/api-docs/{file_name}'.format(file_name=file_name), 'r') as swagger_file:
swagger_spec = swagger_file.read()
swagger_spec = swagger_spec.replace('localhost:8088', 'ari:{port}'.format(port=request.environ['SERVER_PORT']))
return make_response(swagger_spec, 200, {'Content-Type': 'application/json'})
def swagger(file_name):
with open('/usr/local/share/ari/api-docs/{file_name}'.format(file_name=file_name), 'r') as swagger_file:
swagger_spec = swagger_file.read()
swagger_spec = swagger_spec.replace('localhost:8088', 'ari:{port}'.format(port=request.environ['SERVER_PORT']))
return make_response(swagger_spec, 200, {'Content-Type': 'application/json'})
def swagger(file_name):
with open('/usr/local/share/ari/api-docs/{file_name}'.format(file_name=file_name), 'r') as swagger_file:
swagger_spec = swagger_file.read()
swagger_spec = swagger_spec.replace('localhost:8088', 'ari:{port}'.format(port=request.environ['SERVER_PORT']))
return make_response(swagger_spec, 200, {'Content-Type': 'application/json'})