def zmirror_enter(input_path='/'):
    """Entry-point shell: run main_function() behind a catch-all error handler.

    After main_function() returns, the extra response headers and the extra
    cookies collected on ``parse`` are attached to the response.

    :param input_path: path forwarded unchanged to main_function()
    :return: the response built by main_function(), or the page produced by
        ``generate_error_page(is_traceback=True)`` if anything raised
    """
    try:
        resp = main_function(input_path=input_path)

        # Attach the extra response headers.
        for name, value in parse.extra_resp_headers.items():
            resp.headers.set(name, value)

        # Attach the extra cookies, one Set-Cookie header per cookie string.
        for cookie_string in parse.extra_cookies.values():
            resp.headers.add("Set-Cookie", cookie_string)
    except Exception:  # coverage: exclude
        # ``Exception`` rather than a bare ``except`` so that SystemExit and
        # KeyboardInterrupt are not turned into an error page.
        return generate_error_page(is_traceback=True)
    else:
        return resp
# noinspection PyUnusedLocal
# Example source code using Python's cookies()
def create_dishwasher(name: str, brand: str, cost: int, cve: str) -> str:
    """Insert a new dishwasher row and return its id.

    The ``inserted_by`` column is taken from the base64-encoded ``user``
    cookie of the current request (``"no one :("`` when the cookie is absent).
    The decoded value is rejected when it is longer than 255 characters,
    contains a character outside ``string.printable[:-2]``, or contains
    ``sleep``, ``benchmark`` or ``wait`` (case-insensitive).

    :return: the new id on success; ``""`` when the cookie value is rejected,
        the insert reports failure, or any exception is raised (the exception
        is printed to stderr)
    """
    # NOTE(review): SECURITY - the statement is assembled with str.format()
    # from a client-controlled cookie and from yaml.dump() output. The
    # blacklist below is not a substitute for a parameterized query; insert()
    # only accepts a finished string here, so this is flagged, not changed.
    allowed_chars = frozenset(string.printable[:-2])
    try:
        query = "INSERT INTO dishwashers VALUES ('{inserted_by}', '{id}', '{object}')"
        new_id = get_new_id()
        new_dishwasher = DishWasher(new_id, name, brand, cost, cve)

        if "user" in request.cookies:
            inserted_by = base64.b64decode(request.cookies["user"]).decode('utf-8')
        else:
            inserted_by = "no one :("

        if len(inserted_by) > 255:
            return ""
        if not allowed_chars.issuperset(inserted_by):
            return ""
        if re.search(r"sleep|benchmark|wait", inserted_by, flags=re.IGNORECASE):
            return ""

        statement = query.format(
            id=new_id,
            object=yaml.dump(new_dishwasher),
            inserted_by=inserted_by,
        )
        if insert(statement):
            return new_id
    except Exception as e:
        print(e, file=sys.stderr)
    # Every failure path (rejected insert or exception) reports the empty id.
    return ""
def response_cookies_deep_copy():
    """
    Collect the raw ``Set-Cookie`` header values of the remote response.

    It's a BAD hack to get RAW cookies headers, but so far, we don't have better way.
    We reach into private attributes of the underlying response object to get
    the raw headers.

    raw_headers example:
    [('Cache-Control', 'private'),
    ('Content-Length', '48234'),
    ('Content-Type', 'text/html; Charset=utf-8'),
    ('Server', 'Microsoft-IIS/8.5'),
    ('Set-Cookie','BoardList=BoardID=Show; expires=Mon, 02-May-2016 16:00:00 GMT; path=/'),
    ('Set-Cookie','aspsky=abcefgh; expires=Sun, 24-Apr-2016 16:00:00 GMT; path=/; HttpOnly'),
    ('Set-Cookie', 'ASPSESSIONIDSCSSDSSQ=OGKMLAHDHBFDJCDMGBOAGOMJ; path=/'),
    ('X-Powered-By', 'ASP.NET'),
    ('Date', 'Tue, 26 Apr 2016 12:32:40 GMT')]

    :return: the (possibly rewritten) Set-Cookie values, in header order
    :rtype: list
    """
    raw_headers = parse.remote_response.raw._original_response.headers._headers
    header_cookies_string_list = []
    for name, value in raw_headers:
        if name.lower() != 'set-cookie':
            continue

        if my_host_scheme == 'http://':
            # The mirror is served over plain http: drop the Secure flag.
            value = value.replace('Secure;', '')
            value = value.replace(';Secure', ';')
            value = value.replace('; Secure', ';')

        if 'httponly' in value.lower():
            if enable_aggressive_cookies_path_rewrite:
                # Aggressive cookie path rewrite: every path becomes /
                value = regex_cookie_path_rewriter.sub('path=/;', value)
            elif enable_aggressive_cookies_path_rewrite is not None:
                # Rewrite the path of HttpOnly cookies below the mirrored url
                # eg(/extdomains/a.foobar.com): path=/verify; -> path=/extdomains/a.foobar.com/verify
                if parse.remote_domain not in domain_alias_to_target_set:  # do not rewrite main domains
                    # Raw strings: ``\g`` is a regex group reference, not a
                    # string escape sequence.
                    value = regex_cookie_path_rewriter.sub(
                        r'\g<prefix>=/extdomains/' + parse.remote_domain + r'\g<path>', value)

        header_cookies_string_list.append(value)
    return header_cookies_string_list
def response_text_rewrite(resp_text):
    """
    Rewrite the urls found in text-like content (html, css, js).

    Runs, in order: the plain domain-alias replacement (when enabled), the
    advanced regex url rewriter, the basic mirrorlization pass, and finally
    the replacement of quoted / ``domain=`` occurrences of the target root
    domain with this mirror's host name.

    :type resp_text: str
    :rtype: str
    """
    # v0.20.6+ plain replace domain alias, support json/urlencoded/json-urlencoded/plain
    if url_custom_redirect_enable:
        alias_pairs = plain_replace_domain_alias + parse.temporary_domain_alias
        for alias_from, alias_to in alias_pairs:
            resp_text = resp_text.replace(alias_from, alias_to)

    # v0.9.2+: advanced url rewrite engine
    resp_text = regex_adv_url_rewriter.sub(regex_url_reassemble, resp_text)

    # Debug-only trace; it does not modify resp_text.
    if developer_string_trace is not None and developer_string_trace in resp_text:
        infoprint('StringTrace: appears after advanced rewrite, code line no. ', current_line_number())

    # Basic mirrorlization pass (see the v0.28.0 / v0.28.3 notes in the project history).
    resp_text = response_text_basic_mirrorlization(resp_text)

    # Debug-only trace; it does not modify resp_text.
    if developer_string_trace is not None and developer_string_trace in resp_text:
        infoprint('StringTrace: appears after basic mirrorlization, code line no. ', current_line_number())

    # for cookies set string (in js) replace
    # eg: ".twitter.com" --> "foo.com"
    dotted_root = '.' + target_domain_root
    cookie_domain_rewrites = (
        ('"' + dotted_root + '"', '"' + my_host_name_no_port + '"'),
        ("'" + dotted_root + "'", "'" + my_host_name_no_port + "'"),
        ("domain=" + dotted_root, "domain=" + my_host_name_no_port),
        ('"' + target_domain_root + '"', '"' + my_host_name_no_port + '"'),
        ("'" + target_domain_root + "'", "'" + my_host_name_no_port + "'"),
    )
    for needle, replacement in cookie_domain_rewrites:
        resp_text = resp_text.replace(needle, replacement)

    # Debug-only trace; it does not modify resp_text.
    if developer_string_trace is not None and developer_string_trace in resp_text:
        infoprint('StringTrace: appears after js cookies string rewrite, code line no. ', current_line_number())

    # resp_text = resp_text.replace('lang="zh-Hans"', '', 1)
    return resp_text
def index():
    """Serve index.html to clients that carry a ``username`` cookie.

    The request cookies are logged first. Clients without a (non-empty)
    ``username`` cookie get an inline login form that posts to the ``login``
    endpoint.
    """
    app.logger.info(request.cookies)

    if not request.cookies.get("username"):
        login_form = """<form action="%s" method='post'>
<input type="text" name="username" required>
<input type="password" name="password" required>
<input type="submit" value="??">
</form>"""
        return login_form % url_for("login")

    return render_template("index.html")
def get_user_id(request):
    """Return the record ID of the currently logged-in user.

    The lookup is delegated to ``invenio_binding("get_user_id", ...)``, which
    receives the cookies of the given request.

    :param request: flask HTTP request object
    :type request: `flask.Request`
    :return:
        the Invenio record ID of the currently logged-in user, or ``None`` if
        this could not be detected
    :rtype: str or NoneType
    """
    session_cookies = request.cookies
    return invenio_binding("get_user_id", session_cookies)
def hello():
    """Print the raw ``Cookie`` header and the parsed cookies, then acknowledge.

    :return: the fixed body ``"Got it!\\n"``
    """
    # .get() instead of [...] so that a request sent without any Cookie header
    # is still answered (it prints ``None``) instead of failing on the lookup.
    print("Cookie header raw: {}".format(request.headers.get('Cookie')))
    print("cookies: {}".format(request.cookies))
    return "Got it!\n"
def index():
    """Render index.html and hand out a default ``user`` cookie when missing.

    The default cookie value is the base64 encoding of ``user with no name``.

    :return: the response wrapping the rendered template
    """
    resp = make_response(render_template("index.html"))
    if "user" not in request.cookies:
        # b64encode() returns bytes; set_cookie() expects text, so decode the
        # (pure ASCII) base64 output before handing it over.
        default_user = base64.b64encode(b'user with no name').decode('ascii')
        resp.set_cookie('user', default_user)
    return resp
def create_issue(content, author, location='Discord', repo='PennyDreadfulMTG/Penny-Dreadful-Tools'):
    """Log a report and, when GitHub credentials are configured, file it as an issue.

    The first line of ``content`` becomes the issue title and the remainder
    (if any) the start of the body. The body is extended with a
    "Reported on ... by ..." line and, when ``request`` is truthy, with
    details of the current request. Title and body are always printed.

    :param content: report text; ``None`` or ``''`` means nothing to report
    :param author: name used in the "Reported on" line
    :param location: where the report came from
    :param repo: name handed to ``get_repo()``
    :return: the object returned by ``repo.create_issue()``, or ``None`` when
        ``content`` is empty or the GitHub credentials are not configured
    """
    if content is None or content == '':
        return None

    # Everything up to the first newline is the title.
    title, newline, remainder = content.partition('\n')
    body = remainder + '\n\n' if newline else ''
    body += 'Reported on {location} by {author}'.format(location=location, author=author)

    if request:
        # NOTE(review): the request cookies are written into the issue body
        # and to stdout - confirm that this exposure is intended.
        request_details = """
--------------------------------------------------------------------------------
Request Method: {method}
Path: {full_path}
Cookies: {cookies}
Endpoint: {endpoint}
View Args: {view_args}
Person: {id}
User-Agent: {user_agent}
Referrer: {referrer}
""".format(
            method=request.method,
            full_path=request.full_path,
            cookies=request.cookies,
            endpoint=request.endpoint,
            view_args=request.view_args,
            id=session.get('id', 'logged_out'),
            user_agent=request.headers.get('User-Agent'),
            referrer=request.referrer,
        )
        body += textwrap.dedent(request_details)

    print(title + '\n' + body)

    # Only check for github details at the last second to get log output even if github not configured.
    if not (configuration.get('github_user') and configuration.get('github_password')):
        return None

    client = Github(configuration.get('github_user'), configuration.get('github_password'))
    target_repo = client.get_repo(repo)
    return target_repo.create_issue(title=title, body=body)
def filter_client_request():
    """Screen the incoming client request before it is handled any further.

    Checks, in order: crossdomain.xml requests, globally whitelisted user
    agents, denied spiders, and (when human IP verification is enabled) the
    ``zmirror_verify`` cookie of clients that have to be verified.

    :return: a ready-made response when the request is answered right here
        (crossdomain.xml, 403 for spiders, redirect to the verification
        page); ``None`` when nothing was filtered
    :rtype: Union[Response, None]
    """
    dbgprint('Client Request Url: ', request.url)

    # crossdomain.xml
    if os.path.basename(request.path) == 'crossdomain.xml':
        dbgprint('crossdomain.xml hit from', request.url)
        return crossdomain_xml()

    user_agent = str(request.user_agent)

    # Global whitelist ua
    if check_global_ua_pass(user_agent):
        return None

    if is_deny_spiders_by_403 and is_denied_because_of_spider(user_agent):
        return generate_simple_resp_page(b'Spiders Are Not Allowed To This Site', 403)

    if not human_ip_verification_enabled:
        return None

    cookie_check_forced = (
        (human_ip_verification_whitelist_from_cookies or enable_custom_access_cookie_generate_and_verify)
        and must_verify_cookies
    )
    # The IP range is only consulted when cookie verification is not forced.
    if not (cookie_check_forced or is_ip_not_in_allow_range(request.remote_addr)):
        return None

    dbgprint('ip', request.remote_addr, 'is verifying cookies')

    cookie_accepted = False
    verify_cookie = None
    if 'zmirror_verify' in request.cookies:
        verify_cookie = request.cookies.get('zmirror_verify')
        cookie_accepted = (
            (human_ip_verification_whitelist_from_cookies and verify_ip_hash_cookie(verify_cookie))
            or (enable_custom_access_cookie_generate_and_verify
                and custom_verify_access_cookie(verify_cookie, request))
        )

    if not cookie_accepted:
        # Send the client to the verification page, carrying the original
        # url as url-safe base64.
        encoded_origin = base64.urlsafe_b64encode(
            str(request.url).encode(encoding='utf-8')).decode(encoding='utf-8')
        return redirect("/ip_ban_verify_page?origin=" + encoded_origin, code=302)

    ip_whitelist_add(request.remote_addr, info_record_dict=verify_cookie)
    dbgprint('add to ip_whitelist because cookies:', request.remote_addr)
    return None
def get_current_user():
    """Populate ``g.user`` with the currently logged-in user, if any.

    A user already stored in Flask's session is reused as-is. Otherwise the
    request cookies are handed to get_user_from_cookie(); when that yields a
    result, the matching ``User`` row is looked up, created from the Graph API
    profile when missing, or given the new access token when it changed, and
    the user is stored in the session. ``g.user`` ends up as the session's
    ``user`` entry, or ``None`` when nobody is logged in.
    """
    # Fast path: the session already knows the user.
    session_user = session.get('user')
    if session_user:
        g.user = session_user
        return

    # Attempt to get the short term access token for the current user.
    fb_login = get_user_from_cookie(cookies=request.cookies, app_id=FB_APP_ID,
                                    app_secret=FB_APP_SECRET)

    # No result means the user is not logged in.
    if fb_login:
        user = User.query.filter(User.id == fb_login['uid']).first()
        fresh_token = fb_login['access_token']

        if user:
            # Existing user: refresh the stored access token when it changed.
            if user.access_token != fresh_token:
                user.access_token = fresh_token
        else:
            # Unknown user: fetch the profile and insert a new row.
            profile = GraphAPI(fresh_token).get_object('me')
            profile.setdefault('link', "")
            user = User(id=str(profile['id']), name=profile['name'],
                        profile_url=profile['link'],
                        access_token=fresh_token)
            db.session.add(user)

        # Remember the user for the following requests.
        session['user'] = dict(name=user.name, profile_url=user.profile_url,
                               id=user.id, access_token=user.access_token)

    # Commit changes to the database and expose the user as g.user.
    db.session.commit()
    g.user = session.get('user', None)
def _parse_session_token(raw_token):
    """Decode the ``token`` cookie into ``(user_id, deadline_ms)``; None if unusable."""
    try:
        # The cookie is written as str(dict), i.e. with single quotes.
        token = json.loads(raw_token.replace('\'', '"'))
        if not token.get('deadline') or not token.get('id'):
            return None
        return int(token['id']), int(token['deadline'])
    except (AttributeError, TypeError, ValueError, OverflowError):
        # Not JSON, not a JSON object, or non-numeric fields.
        return None


def session_service():
    """Login / logout / session-check endpoint driven by the ``token`` cookie.

    * ``DELETE`` - log out: the ``token`` cookie is expired.
    * ``POST`` - log in with the JSON fields ``username`` and ``password``;
      on success a ``token`` cookie holding the user id and a deadline is set.
      An unknown user name is answered exactly like a wrong password.
    * ``GET`` - report the user described by the ``token`` cookie. A missing,
      incomplete or undecodable cookie is answered as "not logged in".
    * any other method (including ``OPTIONS``) - a plain ``success=False`` body.

    Every response is passed through set_debug_response_header().
    """
    # NOTE(review): SECURITY - the token cookie is an unsigned str(dict) that
    # the client can edit freely (user id and deadline included). It should be
    # signed or replaced by a server-side session; flagged, not changed here.
    resp = jsonify(success=False)
    method = request.method

    if method == 'DELETE':
        resp = jsonify(success=True)
        resp.set_cookie(key="token", expires=0)
        return set_debug_response_header(resp)

    if method == 'POST':
        username, password = request.json['username'], request.json['password']
        user = user_datastore.get_user(username)
        # Guard against an unknown user name before touching user.password.
        if user is not None and verify_password(password, user.password):
            print('user login: %s' % user.user_name + ' verified')
            resp = jsonify(success=True, userID=user.id)
            resp.set_cookie(key="token",
                            value=str({"id": user.id, "deadline": (time.time() + 86400) * 1000}),
                            max_age=7200,
                            httponly=True)
            return set_debug_response_header(resp)
        resp = jsonify(success=False, loginError='????????')
        return set_debug_response_header(resp)

    if method == 'GET':
        raw_token = request.cookies.get('token')
        parsed = _parse_session_token(raw_token) if raw_token else None
        if parsed is None:
            resp = jsonify(success=False, loginError='???')
            return set_debug_response_header(resp)

        user_id, deadline = parsed
        user = app_models.User.query.get(user_id)
        # The deadline is stored in milliseconds.
        if time.time() > (deadline / 1000):
            resp = jsonify(success=False, loginError='????')
            return set_debug_response_header(resp)

        if user:
            result = {
                'success': True,
                'user': {
                    'userID': user.id,
                    'userName': user.user_name,
                    'permissions': [p.name for p in user.roles]
                }
            }
            resp = Response(json.dumps(result))
            return set_debug_response_header(resp)

    # OPTIONS, unsupported methods, and GET for a user id that no longer exists.
    return set_debug_response_header(resp)
def get_current_user():
    """Populate ``g.user`` with the currently logged-in user, if any.

    A user already stored in Flask's session is reused as-is. Otherwise the
    request cookies are handed to get_user_from_cookie(); when that yields a
    result, the Graph API profile is fetched once, the user is looked up via
    ``User.check_user()`` and created via ``User.create_user()`` when missing,
    and the user is stored in the session. ``g.user`` ends up as the
    session's ``user`` entry, or ``None`` when nobody is logged in.
    """
    # Set the user in the session dictionary as a global g.user and bail out
    # of this function early.
    if session.get('user'):
        g.user = session.get('user')
        return

    # Attempt to get the short term access token for the current user.
    result = get_user_from_cookie(cookies=request.cookies, app_id=FB_APP_ID,
                                  app_secret=FB_APP_SECRET)

    # If there is no result, we assume the user is not logged in.
    if result:
        graph = GraphAPI(result['access_token'])
        profile = graph.get_object('me')
        if 'link' not in profile:
            profile['link'] = ""

        # Check to see if this user is already in our database.
        user = User(result['uid'], name=profile['name'], profile_url=profile['link'],
                    access_token=result['access_token'])
        user = user.check_user()

        if not user:
            # Not an existing user: create it from the profile fetched above
            # (no second Graph API request is needed for the same data).
            user = User(result['uid'], profile['name'], profile['link'], result['access_token'])
            user.create_user()
        elif user['access_token'] != result['access_token']:
            # If an existing user, update the access token.
            # NOTE(review): only the record returned by check_user() is
            # updated here; no save call is visible - confirm it is persisted.
            user['access_token'] = result['access_token']

        # Add the user to the current session
        session['user'] = dict(name=profile['name'], profile_url=profile['link'],
                               id=result['uid'], access_token=result['access_token'])

    # Expose the session user (or None) as the global g.user
    g.user = session.get('user', None)