def render_redberry(template_name, **kwargs):
    """Render ``template_name``, flagging AMP requests in the template context.

    Args:
        template_name: Name of the template handed to ``render_template``.
        **kwargs: Template context; ``render_amp=True`` is added when the
            current request path contains ``.amp``.

    Returns:
        Whatever ``render_template`` returns for the given context.
    """
    # Accelerated Mobile Pages (AMP, ref: https://www.ampproject.org) are
    # recognised by a ``.amp`` marker in the request path.
    is_amp_request = '.amp' in request.path
    if is_amp_request:
        kwargs['render_amp'] = True
    return render_template(template_name, **kwargs)
# Example source code for Python's path()
def login_required(func):
    """Decorator that only runs ``func`` for authenticated callers.

    Unauthenticated POST requests are rejected with ``abort(401)``; any other
    unauthenticated request is redirected to the login page with the current
    path passed along in the ``next`` query parameter.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        is_post = request.method == 'POST'
        if is_authenticated():
            return func(*args, **kwargs)
        if is_post:
            # Form submissions get a hard failure instead of a redirect.
            return abort(401)
        return redirect("/login/" + "?next=" + request.path)
    return wrapper
def image_list(masterip):
    """Request the image list for the session user and return it as JSON.

    Args:
        masterip: Passed through to ``dockletRequest.post`` as its third
            argument.

    Returns:
        The ``json.dumps`` serialisation of the response from
        ``/image/list/``.
    """
    payload = {"user": session['username']}
    response = dockletRequest.post("/image/list/", payload, masterip)
    logger.debug("image" + str(type(response)))
    return json.dumps(response)
def monitor_request(comid, infotype, masterip):
    """Forward a monitor query to ``/monitor/<rest of request path>``.

    The forwarded sub-path is the current request path with everything up to
    and including its third ``/`` removed (fewer slashes strip less).

    Args:
        comid: Unused in the body.
        infotype: Unused in the body.
        masterip: Passed through to ``dockletRequest.post``.

    Returns:
        The ``json.dumps`` serialisation of the response.
    """
    payload = {"user": session['username']}
    # Splitting at most three times and keeping the tail drops the text up to
    # and including each of the first three slashes.
    sub_path = request.path.split("/", 3)[-1]
    logger.debug(sub_path + "_____" + masterip)
    response = dockletRequest.post("/monitor/" + sub_path, payload, masterip)
    logger.debug("monitor" + str(type(response)))
    return json.dumps(response)
def monitor_user_request(issue, masterip):
    """Post a per-user monitor query for ``issue`` and return the JSON result.

    Args:
        issue: Value interpolated (via ``str``) into ``/monitor/user/<issue>/``.
        masterip: Passed through to ``dockletRequest.post``.

    Returns:
        The ``json.dumps`` serialisation of the response.
    """
    payload = {"user": session['username']}
    endpoint = "".join(("/monitor/user/", str(issue), "/"))
    logger.debug(endpoint + "_____" + masterip)
    response = dockletRequest.post(endpoint, payload, masterip)
    logger.debug("monitor" + str(type(response)))
    return json.dumps(response)
def jupyter_prefix():
    """Redirect to the login page, carrying over the ``next`` query argument.

    Returns:
        A redirect to ``/login/`` when the request has no ``next`` argument,
        otherwise a redirect to ``/login/?next=<next>``.
    """
    path = request.args.get('next')
    # Identity comparison is the correct test for the ``None`` singleton.
    if path is None:
        return redirect('/login/')
    return redirect('/login/' + '?next=' + path)
def login_required(func):
    """Decorator that authenticates a request by its form ``token``.

    The wrapped function is called as
    ``func(cur_user, cur_user.username, request.form, *args, **kwargs)`` once
    ``G_usermgr.auth_token`` accepts the token. Otherwise a JSON failure
    message is returned and ``func`` is not called.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # G_usermgr is only read here, so no ``global`` declaration is needed.
        logger.info("get request, path: %s" % request.path)
        token = request.form.get("token", None)
        if token is None:
            return json.dumps({'success': 'false', 'message': 'user or key is null'})
        cur_user = G_usermgr.auth_token(token)
        if cur_user is None:
            return json.dumps({'success': 'false', 'message': 'token failed or expired', 'Unauthorized': 'True'})
        return func(cur_user, cur_user.username, request.form, *args, **kwargs)
    return wrapper
def get_request_information():
    """
    Collect contextual information about the current request and user.

    Returns:
        A dictionary. Outside a request context it is empty; inside one it
        has a "request" entry and, when a user is logged in, a "user" entry.
    """
    information = {}
    if not has_request_context():
        return information

    agent = request.user_agent
    information["request"] = {
        "api_endpoint_method": request.method,
        "api_endpoint": request.path,
        "ip": request.remote_addr,
        "platform": agent.platform,
        "browser": agent.browser,
        "browser_version": agent.version,
        "user_agent": agent.string
    }

    if not api.auth.is_logged_in():
        return information

    user = api.user.get_user()
    team = api.user.get_team()
    groups = api.team.get_groups()
    group_names = [group["name"] for group in groups]
    information["user"] = {
        "username": user["username"],
        "email": user["email"],
        "team_name": team["team_name"],
        "groups": group_names
    }
    return information
def teacher_session():
    """Guard ``/teacher/`` paths: require credentials and the teacher flag.

    Returns:
        A redirect to ``index`` when the session has no credentials, a
        redirect to ``register`` when ``is_teacher`` is falsy, else ``None``.
    """
    if '/teacher/' not in request.path:
        return None
    if 'credentials' not in flask.session:
        return flask.redirect(flask.url_for('index'))
    if not flask.session['is_teacher']:
        return flask.redirect(flask.url_for('register'))
    return None
def student_session():
    """Guard ``/student/`` paths: require credentials and the student flag.

    Returns:
        A redirect to ``index`` when the session has no credentials, a
        redirect to ``register`` when ``is_student`` is falsy, else ``None``.
    """
    if '/student/' not in request.path:
        return None
    if 'credentials' not in flask.session:
        return flask.redirect(flask.url_for('index'))
    if not flask.session['is_student']:
        return flask.redirect(flask.url_for('register'))
    return None
# make sure user is authenticated w/ live session on every request
def manage_session():
    """Send requests without stored credentials through the OAuth flow.

    Returns:
        ``None`` for the exempt paths or when credentials are present,
        otherwise a redirect to ``oauth2callback`` (after remembering the
        requested path in the session under ``redirect``).
    """
    # The OAuth callback must stay reachable to avoid a redirect loop; the
    # index and logout pages are open to visitors without a session.
    open_paths = ('/oauth/callback', '/', '/oauth/logout')
    if request.path in open_paths:
        return None
    if 'credentials' in flask.session:
        return None
    # Remember where the user was heading before starting the OAuth flow.
    flask.session['redirect'] = request.path
    return flask.redirect(flask.url_for('oauth2callback'))
def iam_role_info(api_version, junk=None):
    """Return IAM role info for the role mapped to the caller's IP address.

    Args:
        api_version: Checked with ``_supports_iam``; unsupported versions are
            handed to ``passthrough`` with the request path.
        junk: Unused in the body.

    Returns:
        A JSON response with the role info, or ``('', 404)`` when no role
        name is found for the remote address.
    """
    if not _supports_iam(api_version):
        return passthrough(request.path)
    role_params = roles.get_role_params_from_ip(request.remote_addr)
    role_name = role_params['name']
    if not role_name:
        log.error('Role name not found; returning 404.')
        return '', 404
    log.debug('Providing IAM role info for {0}'.format(role_name))
    return jsonify(roles.get_role_info_from_params(role_params))
def iam_role_name(api_version):
    """Return the name of the IAM role mapped to the caller's IP address.

    Args:
        api_version: Checked with ``_supports_iam``; unsupported versions are
            handed to ``passthrough`` with the request path.

    Returns:
        The role name, or ``('', 404)`` when no name is found for the remote
        address.
    """
    if not _supports_iam(api_version):
        return passthrough(request.path)
    role_name = roles.get_role_params_from_ip(request.remote_addr)['name']
    if role_name:
        return role_name
    log.error('Role name not found; returning 404.')
    return '', 404
def load_ip_whitelist_file():
    """Load the IP whitelist from the whitelist file.

    Every non-blank line of the file is stripped and added to the result.
    Previously only the first line was read, so any further entries were
    silently ignored, and an empty file produced a set containing ``''``.

    Returns:
        set: The stripped, non-empty lines of the file; an empty set when the
        file does not exist.
    """
    whitelist_path = zmirror_root(human_ip_verification_whitelist_file_path)
    set_buff = set()
    if os.path.exists(whitelist_path):
        with open(whitelist_path, 'r', encoding='utf-8') as fp:
            for line in fp:
                entry = line.strip()
                if entry:
                    set_buff.add(entry)
    return set_buff
def response_cookies_deep_copy():
    """
    It's a BAD hack to get RAW cookies headers, but so far, we don't have better way.
    We'd go DEEP inside the urllib's private method to get raw headers

    raw_headers example:
    [('Cache-Control', 'private'),
    ('Content-Length', '48234'),
    ('Content-Type', 'text/html; Charset=utf-8'),
    ('Server', 'Microsoft-IIS/8.5'),
    ('Set-Cookie','BoardList=BoardID=Show; expires=Mon, 02-May-2016 16:00:00 GMT; path=/'),
    ('Set-Cookie','aspsky=abcefgh; expires=Sun, 24-Apr-2016 16:00:00 GMT; path=/; HttpOnly'),
    ('Set-Cookie', 'ASPSESSIONIDSCSSDSSQ=OGKMLAHDHBFDJCDMGBOAGOMJ; path=/'),
    ('X-Powered-By', 'ASP.NET'),
    ('Date', 'Tue, 26 Apr 2016 12:32:40 GMT')]

    Returns:
        list: The (possibly rewritten) value of every ``Set-Cookie`` header
        of the remote response, in header order.
    """
    raw_headers = parse.remote_response.raw._original_response.headers._headers
    header_cookies_string_list = []
    for name, value in raw_headers:
        if name.lower() == 'set-cookie':
            if my_host_scheme == 'http://':
                # Served over plain HTTP: drop the Secure attribute in its
                # common spellings.
                value = value.replace('Secure;', '')
                value = value.replace(';Secure', ';')
                value = value.replace('; Secure', ';')
            if 'httponly' in value.lower():
                if enable_aggressive_cookies_path_rewrite:
                    # Aggressive rewrite: force the cookie path to "/".
                    value = regex_cookie_path_rewriter.sub('path=/;', value)
                elif enable_aggressive_cookies_path_rewrite is not None:
                    # Rewrite the HttpOnly cookie's path under the mirrored URL.
                    # eg(/extdomains/a.foobar.com): path=/verify; -> path=/extdomains/a.foobar.com/verify
                    if parse.remote_domain not in domain_alias_to_target_set:  # do not rewrite main domains
                        # Raw strings keep the \g<...> group references
                        # without relying on invalid escape sequences.
                        value = regex_cookie_path_rewriter.sub(
                            r'\g<prefix>=/extdomains/' + parse.remote_domain + r'\g<path>', value)
            header_cookies_string_list.append(value)
    return header_cookies_string_list
def assemble_parse():
    """Populate the shared ``parse`` object from the decoded mirror URL.

    Copies the fields returned by ``decode_mirror_url`` onto ``parse``, then
    derives ``is_external_domain``, ``remote_url`` and ``url_no_scheme``, and
    marks the domain in ``recent_domains``.
    """
    decoded = decode_mirror_url()
    # (attribute on ``parse``, key in the decoded dict); the original
    # annotations were str, bool, str and str respectively.
    field_map = (
        ('remote_domain', 'domain'),
        ('is_https', 'is_https'),
        ('remote_path', 'path'),
        ('remote_path_query', 'path_query'),
    )
    for attribute, key in field_map:
        setattr(parse, attribute, decoded[key])

    parse.is_external_domain = is_external_domain(parse.remote_domain)
    parse.remote_url = assemble_remote_url()  # type: str

    # Everything after the first "//" is the URL without its scheme.
    remote_url = parse.remote_url
    scheme_end = remote_url.find('//') + 2
    parse.url_no_scheme = remote_url[scheme_end:]  # type: str

    recent_domains[parse.remote_domain] = True  # mark the domain as recently used
    dbgprint('after assemble_parse, url:', parse.remote_url, ' path_query:', parse.remote_path_query)
def delete(short_name):
    """Show the delete confirmation page (GET) or delete the project.

    Args:
        short_name: Short name used to look the project up.

    Returns:
        For GET, the ``handle_content_type`` response for the confirmation
        template. Otherwise a redirect to the current user's profile after
        the project has been deleted.
    """
    (project, owner, n_tasks,
     _n_task_runs, overall_progress, last_activity,
     _n_results) = project_by_shortname(short_name)
    title = project_title(project, "Delete")
    ensure_authorized_to('read', project)
    ensure_authorized_to('delete', project)
    pro = pro_features()
    project_sanitized, owner_sanitized = sanitize_project_owner(project, owner, current_user)

    if request.method == 'GET':
        response = {
            'template': '/projects/delete.html',
            'title': title,
            'project': project_sanitized,
            'owner': owner_sanitized,
            'n_tasks': n_tasks,
            'overall_progress': overall_progress,
            'last_activity': last_activity,
            'pro_features': pro,
            'csrf': generate_csrf(),
        }
        return handle_content_type(response)

    # Remove the on-disk directories recorded for the project before the
    # project itself is deleted.
    if "directory_names" in project.info:
        for directory in project.info["directory_names"]:
            if os.path.exists(directory):
                shutil.rmtree(directory)
    project_repo.delete(project)

    auditlogger.add_log_entry(project, None, current_user)
    flash(gettext('Project deleted!'), 'success')
    return redirect_content_type(url_for('account.profile', name=current_user.name))
def _check_if_redirect_to_password(project):
    """Return a redirect to the password page when ``project`` requires one.

    Returns:
        A redirect to ``.password_required`` (with the current path as
        ``next``) when the password manager reports that a password is
        needed, otherwise ``None``.
    """
    timeout = current_app.config.get('PASSWD_COOKIE_TIMEOUT')
    cookie_handler = CookieHandler(request, signer, timeout)
    manager = ProjectPasswdManager(cookie_handler)
    if not manager.password_needed(project, get_user_id_or_ip()):
        return None
    return redirect(url_for('.password_required',
                            short_name=project.short_name, next=request.path))
def ratelimit(limit, per, send_x_headers=True,
              scope_func=lambda: request.remote_addr,
              key_func=lambda: request.endpoint,
              path=lambda: request.path):
    """
    Decorator factory that rate-limits access to a route.

    The limiter key is built from ``key_func()`` and ``scope_func()``. The
    view runs when the limit has not been exceeded; otherwise
    ``TooManyRequests`` is raised. Any exception raised inside the wrapper,
    including that one and those from the view itself, is turned into the
    result of ``error.format_exception``.
    """
    def decorator(f):
        def rate_limited(*args, **kwargs):
            try:
                bucket = 'rate-limit/%s/%s/' % (key_func(), scope_func())
                limiter = RateLimit(bucket, limit, per, send_x_headers)
                # Stored on ``g`` under the name ``_view_rate_limit``.
                g._view_rate_limit = limiter
                if limiter.over_limit:
                    raise TooManyRequests
                return f(*args, **kwargs)
            except Exception as exc:
                return error.format_exception(exc, target=path(),
                                              action=f.__name__)
        # Copy the view's metadata (name, docstring, ...) onto the wrapper.
        return update_wrapper(rate_limited, f)
    return decorator