def request_validate(view):
@wraps(view)
def wrapper(*args, **kwargs):
endpoint = request.endpoint.partition('.')[-1]
# data
method = request.method
if method == 'HEAD':
method = 'GET'
locations = validators.get((endpoint, method), {})
data_type = {"json": "request_data", "args": "request_args"}
for location, schema in locations.items():
value = getattr(request, location, MultiDict())
validator = FlaskValidatorAdaptor(schema)
result = validator.validate(value)
LOG.info("Validated request %s: %s" % (location, result))
if schema.get("maxProperties") == 0:
continue
else:
kwargs[data_type[location]] = result
context = request.environ['context']
return view(*args, context=context, **kwargs)
return wrapper
python类environ()的实例源码
def auth():
"""
Check user/password and returns token if valid
"""
if settings.API.get('forwarded_host'):
try:
if not request.environ['HTTP_X_FORWARDED_HOST'] == settings.API['forwarded_host']:
raise BadRequest('Invalid HTTP_X_FORWARDED_HOST')
except KeyError:
raise BadRequest('Missing HTTP_X_FORWARDED_HOST')
body = request.get_json()
authenticated, ret = GeneralController.auth(body)
if authenticated:
return ret
else:
raise Unauthorized(ret)
def _setup_requests(app):
def _init_request():
session = request.environ['beaker.session']
session.save()
_setup_connector(
app=current_app,
app_config=current_app.config,
session=session
)
@app.before_request
def before_request():
init_request = _init_request()
return init_request
def createTeam():
""" """
if request.environ.get('HTTP_ORIGIN') == request.host_url[:-1]:
newTeam = Team.query.filter_by(team_name=request.args['team']).first()
dbEnv = AresSql.SqliteDB(request.args['report_name'])
if not newTeam:
team = Team(request.args['team'], request.args['team_email'])
db.session.add(team)
db.session.commit()
newTeam = Team.query.filter_by(team_name=request.args['team']).first()
team_id = newTeam.team_id
role = request.args.get('role', 'user')
dbEnv.modify("""INSERT INTO team_def (team_id, team_name, role) VALUES (%s, '%s', '%s');
INSERT INTO env_auth (env_id, team_id)
SELECT env_def.env_id, %s
FROM env_def
WHERE env_def.env_name = '%s' ;""" % (team_id, request.args['team'], role, team_id, request.args['report_name']))
return json.dumps('Success'), 200
return json.dumps('Forbidden', 403)
def page_not_found(error):
logdate = datetime.strftime(date.today(), '%Y-%m-%d')
logfn = './logs/activity-'+logdate
user = 'Anonymous' # Username is Anonymous by default
if 'token' in session:
token = session['token']
tokenfilename = 'registered/'+token
with open(tokenfilename, 'r') as f: # for getting username associated with the set token
user = f.readline()[:-1]
with open(logfn, 'a') as f: # logging username, IP addr, error code
log = user+' '+request.environ['REMOTE_ADDR']+' 500\n'
f.write(log)
return render_template('500.html'), 500
# No cache
def post_comment():
"""Add post to article."""
form = PostForm()
article = request.environ["HTTP_REFERER"].split("=")[-1]
tim = time.time()
user = current_user.name
post = Post(author=user, article=article,
message=form.message.data, time=tim)
db.session.add(post)
user = current_user.name
event = Event(author=user, article=article,
event="COMMENT", time=time.time())
db.session.add(event)
db.session.commit()
return redirect("/biblio/article=" + article)
def send_error_mail(exception):
"""Sends an error mail to the admin containing the traceback and configuration.
After that, a custom HTTP 500 page is shown.
:param exception: the exception raised
:type exception: ``Exception``
:return:
the HTML to send back in the response, and the HTTP code 500.
:rtype: str, int
"""
# Inspired from <https://github.com/jasonwyatt/Flask-ErrorMail>.
message = Message("Join2 ORCID exception: %s" % exception, sender=CFG_SITE_ADMIN_EMAIL,
recipients=[CFG_SITE_ADMIN_EMAIL])
message_contents = ["Traceback:", "=" * 80, traceback.format_exc(), "\n", "Request Information:", "=" * 80]
environ = request.environ
for key in sorted(environ.keys()):
message_contents.append("%s: %s" % (key, environ.get(key)))
message.body = "\n".join(message_contents) + "\n"
mailer.send(message)
return render_template("500.html"), 500
def run_flask_request(environ):
from .wsgi_aux import app
if '_wsgi.input' in environ:
environ['wsgi.input'] = BytesIO(environ['_wsgi.input'])
# Create a request context similar to that of the original request
# so that the task can have access to flask.g, flask.request, etc.
with app.request_context(environ):
# Record the fact that we are running in the Celery worker now
g.in_celery = True
# Run the route function and record the response
try:
rv = app.full_dispatch_request()
except:
# If we are in debug mode we want to see the exception
# Else, return a 500 error
if app.debug:
raise
rv = app.make_response(InternalServerError())
return (rv.get_data(), rv.status_code, rv.headers)
def wrapped(*args, **kwargs):
# If we are already running the request on the celery side, then we
# just call the wrapped function to allow the request to execute.
if getattr(g, 'in_celery', False):
return f(*args, **kwargs)
# If we are on the Flask side, we need to launch the Celery task,
# passing the request environment, which will be used to reconstruct
# the request object. The request body has to be handled as a special
# case, since WSGI requires it to be provided as a file-like object.
environ = {k: v for k, v in request.environ.items()
if isinstance(v, text_types)}
if 'wsgi.input' in request.environ:
environ['_wsgi.input'] = request.get_data()
t = run_flask_request.apply_async(args=(environ,))
# Return a 202 response, with a link that the client can use to
# obtain task status that is based on the Celery task id.
if t.state == states.PENDING or t.state == states.RECEIVED or \
t.state == states.STARTED:
return '', 202, {'Location': url_for('tasks.get_status', id=t.id)}
# If the task already finished, return its return value as response.
# This would be the case when CELERY_ALWAYS_EAGER is set to True.
return t.info
def error():
"""This endpoint is used by httpd, which redirects its errors to it."""
try:
status = int(request.environ['REDIRECT_STATUS'])
except Exception:
# if there's an exception, it means that a client accessed this directly;
# in this case, we want to make it look like the endpoint is not here
return api_404_handler()
msg = 'Unknown error'
# for now, we just provide specific error for stuff that already happened;
# before adding more, I'd like to see them actually happening with reproducers
if status == 401:
msg = 'Authentication failed'
elif status == 405:
msg = 'Method not allowed for this endpoint'
raise HTTPError(status, msg)
def flask_cache_key(*args, **kwargs):
"""Create a key for use by Flask-Caching.
Args:
None
Returns:
result: Key to be used
"""
# Use the request URI as part of the key
path = request.path
# This helps to differentiate between various instances of infoset
# each running on different ports
server_port = request.environ['SERVER_PORT']
# Use a hash of the request arguments as part of the key
args = str(hash(frozenset(request.args.items())))
# Return
result = (
'infoset_flask_{}_{}_{}'.format(
path, server_port, args)[:255].encode('utf-8'))
return result
def run_ctx_request(environ):
"""
run flask request context in celery worker
"""
from blueprints import app # wsgi.app
if '_wsgi.input' in environ:
# an input stream (file-like object) from which the HTTP request body can be read.
# detail: https://www.python.org/dev/peps/pep-0333/#environ-variables
environ['wsgi.input'] = BytesIO(environ['_wsgi.input'])
with app.request_context():
g.in_celery = True
try:
rv = app.full_dispatch_request()
except InternalServerError:
if app.debug:
raise
return app.make_response(InternalServerError())
return (rv.get_data(), rv.status_code, rv.headers)
def decorator(*args, **kwargs):
if getattr(g, 'in_celery', False):
return f(*args, **kwargs)
environ = {k: v for k, v in request.environ.items()
if isinstance(v, text_types)}
if 'wsgi.input' in request.environ:
environ['_wsgi.input'] = request.get_data() # request.body
task = run_ctx_request.apply_async(args=(environ,))
if task.state == states.PENDING or task.state == states.RECEIVED or \
task.state == states.STARTED:
return '', 202, {'Location': url_for('api.get_status', id=task.id)}
return task.info
def __init__(self, rate_limiter):
super(LogApiCallCount, self).__init__()
if self.stats:
tags = ['version:'+str(Config.API_VERSION_MINOR),
'application:rest-api',
'api-key:'+rate_limiter._auth_key.id,
'method:'+request.environ['REQUEST_METHOD'],
'endpoint:'+request.url_rule.rule.split('<')[0].split(request.blueprint)[1]]
ip = get_remote_addr()
self.stats.track(get_remote_addr(),
'api_call_count',
properties=dict(tags=tags,
ip=ip,
ignore_time = True,
))
# self.stats.increment('api.call.count',
# tags=tags,)
def ensure_project_exists(view):
@wraps(view)
def wrapper(*args, **kwargs):
context = request.environ['context']
if context.using_keystone:
find_or_create_project(request, context)
return view(*args, **kwargs)
return wrapper
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'false', 'no')
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'false', 'no')
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'false', 'no')
def twitter():
auth = tweepy.OAuthHandler("T4NRPcEtUrCEU58FesRmRtkdW", "zmpbytgPpSbro6RZcXsKgYQoz24zLH3vYZHOHAAs5j33P4eoRg", "http://"+ request.environ["HTTP_HOST"] + "/auth/twitter")
auth.set_access_token(config.TWITTER_ACCESS_TOKEN, config.TWITTER_ACCESS_TOKEN_SECRET)
api = tweepy.API(auth)
try:
if api.me().name:
return redirect(url_for('index'))
except tweepy.TweepError:
pass
redirect_url = auth.get_authorization_url()
session["request_token"] = auth.request_token
return redirect(redirect_url)
def make_response(self, rv):
status_or_headers = headers = None
if isinstance(rv, tuple):
rv, status_or_headers, headers = rv + (None,) * (3 - len(rv))
if rv is None:
raise ValueError('View function did not return a response')
if isinstance(status_or_headers, (dict, list)):
headers, status_or_headers = status_or_headers, None
if not isinstance(rv, self.response_class):
# When we create a response object directly, we let the constructor
# set the headers and status. We do this because there can be
# some extra logic involved when creating these objects with
# specific values (like default content type selection).
if isinstance(rv, (JSONRender, text_type, bytes, bytearray, list, dict)):
rv = self.response_class(rv, headers=headers, status=status_or_headers)
headers = status_or_headers = None
else:
rv = self.response_class.force_type(rv, request.environ)
if status_or_headers is not None:
if isinstance(status_or_headers, string_types):
rv.status = status_or_headers
else:
rv.status_code = status_or_headers
if headers:
rv.headers.extend(headers)
return rv
def force_type(cls, rv, environ=None):
if isinstance(rv, dict) or isinstance(rv, list):
rv = Response(
json.dumps(
rv,
cls=TimestampJSONEncoder,
),
content_type='application/json'
)
return super(CerberusResponse, cls).force_type(rv, environ)
def get_http_info_with_retriever(self, retriever=None):
"""
Exact method for getting http_info but with form data work around.
"""
if retriever is None:
retriever = self.get_form_data
url_parts = urlparse.urlsplit(request.url)
try:
data = retriever()
except ClientDisconnected:
data = {}
headers = dict(get_headers(request.environ))
if self.data_blacklist:
data = apply_blacklist(data, self.data_blacklist)
if self.headers_blacklist:
headers = apply_blacklist(headers, self.headers_blacklist)
return {
'url': '%s://%s%s' % (url_parts.scheme, url_parts.netloc, url_parts.path),
'query_string': url_parts.query,
'method': request.method,
'data': data,
'headers': headers,
'env': dict(get_environ(request.environ)),
}
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'false', 'no')
def get_script_url():
# Dynamically determine the script's url
# For production use, this is not a great idea. Instead, set it
# explicitly. Remember that for production, webhook urls must start with
# https!
my_url = rm_queryparameters(full_url(request.environ))
# See http://flask.pocoo.org/docs/0.10/api/#flask.request
return my_url
def url_origin(s, use_forwarded_host = False):
# testing if Heroku includes forwarding host
use_forwarded_host = True
include_protocol = True
ssl = (('HTTPS' in s) and s['HTTPS'] == 'on')
sp = s['SERVER_PROTOCOL'].lower()
protocol = sp[:sp.find('/')] + ('s' if ssl else '' )
port = s['SERVER_PORT']
port = '' if ((not ssl and port=='80') or (ssl and port=='443')) else (':' + port)
host = s['HTTP_X_FORWARDED_HOST'] if (use_forwarded_host and ('HTTP_X_FORWARDED_HOST' in s)) \
else (s['HTTP_HOST'] if ('HTTP_HOST' in s) else None)
host = host if (host != None) else (s['SERVER_NAME'] + port)
# The protocol can easily be wrong if we're frontended by a HTTPS proxy
# (Like the standard Heroku setup!)
on_heroku = heroku_env in os.environ
upgrade_insecure_request = request.headers.get('Upgrade-Insecure-Requests')
upgrade_insecure_request = upgrade_insecure_request and upgrade_insecure_request == 1
https_proto = request.headers.get('X-Forwarded-Proto')
https_proto = https_proto and https_proto == 'https'
use_https = on_heroku or upgrade_insecure_request or https_proto
if use_https: # Special handling
protocol = "https"
return protocol + '://' + host
def editScript():
""" """
scriptDtls = request.environ['HTTP_REFERER'].split('?')[0].split('/')
if scriptDtls[-2] == 'run':
script = open(os.path.join(config.ARES_USERS_LOCATION, scriptDtls[-1], "%s.py" % scriptDtls[-1]), 'w')
else:
script = open(os.path.join(config.ARES_USERS_LOCATION, scriptDtls[-2], "%s.py" % scriptDtls[-1]), 'w')
try:
for line in request.data.decode('utf-8').split('\n'):
script.write('%s\n' % line)
except Exception as e:
logging.debug(e)
finally:
script.close()
return 'OK'
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'false', 'no')
def on_connect():
addr = request.environ['REMOTE_ADDR']
if addr in blacklist:
print('{} was found in the blacklist and was rejected'.format(addr))
shard.update_connection(request)
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'false', 'no')
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'false', 'no')