def build_sitemap():
from redberry.models import RedPost, RedCategory
from apesmit import Sitemap
sm = Sitemap(changefreq='weekly')
for post in RedPost.all_published():
sm.add(url_for('redberry.show_post', slug=post.slug, _external=True), lastmod=post.updated_at.date())
for category in RedCategory.query.all():
sm.add(url_for('redberry.show_category', category_slug=category.slug, _external=True), lastmod=category.updated_at.date())
with open(os.path.join(REDBERRY_ROOT, 'static', 'redberry', 'sitemap.xml'), 'w') as f:
sm.write(f)
flash("Sitemap created.", 'success')
return redirect(url_for('redberry.home'))
##############
# ADMIN ROUTES
##############
python类path()的实例源码
def after_request(response):
response.headers.add('Access-Control-Allow-Methods', 'GET, POST')
response.headers.add('Access-Control-Allow-Credentials', 'true')
response.headers.add('Access-Control-Allow-Headers', 'Content-Type, *')
response.headers.add('Cache-Control', 'no-cache')
response.headers.add('Cache-Control', 'no-store')
if api.auth.is_logged_in():
if 'token' in session:
response.set_cookie('token', session['token'], domain=app.config['SESSION_COOKIE_DOMAIN'])
else:
csrf_token = api.common.token()
session['token'] = csrf_token
response.set_cookie('token', csrf_token, domain=app.config['SESSION_COOKIE_DOMAIN'])
# JB: This is a hack. We need a better solution
if request.path[0:19] != "/api/autogen/serve/":
response.mimetype = 'application/json'
return response
def make_pagination_headers(limit, curpage, total, link_header=True):
"""Return Link Hypermedia Header."""
lastpage = int(math.ceil(1.0 * total / limit) - 1)
headers = {'X-Total-Count': str(total), 'X-Limit': str(limit),
'X-Page-Last': str(lastpage), 'X-Page': str(curpage)}
if not link_header:
return headers
base = "{}?%s".format(request.path)
links = {}
links['first'] = base % urlencode(dict(request.args, **{PAGE_ARG: 0}))
links['last'] = base % urlencode(dict(request.args, **{PAGE_ARG: lastpage}))
if curpage:
links['prev'] = base % urlencode(dict(request.args, **{PAGE_ARG: curpage - 1}))
if curpage < lastpage:
links['next'] = base % urlencode(dict(request.args, **{PAGE_ARG: curpage + 1}))
headers['Link'] = ",".join(['<%s>; rel="%s"' % (v, n) for n, v in links.items()])
return headers
# pylama:ignore=R0201
def iam_sts_credentials(api_version, requested_role, junk=None):
if not _supports_iam(api_version):
return passthrough(request.path)
try:
role_params = roles.get_role_params_from_ip(
request.remote_addr,
requested_role=requested_role
)
except roles.UnexpectedRoleError:
msg = "Role name {0} doesn't match expected role for container"
log.error(msg.format(requested_role))
return '', 404
log.debug('Providing assumed role credentials for {0}'.format(role_params['name']))
assumed_role = roles.get_assumed_role_credentials(
role_params=role_params,
api_version=api_version
)
return jsonify(assumed_role)
def ip_whitelist_add(ip_to_allow, info_record_dict=None):
"""??ip????, ?????"""
if ip_to_allow in single_ip_allowed_set:
return
dbgprint('ip white added', ip_to_allow, 'info:', info_record_dict)
single_ip_allowed_set.add(ip_to_allow)
is_ip_not_in_allow_range.cache_clear()
append_ip_whitelist_file(ip_to_allow)
# dbgprint(single_ip_allowed_set)
try:
with open(zmirror_root(human_ip_verification_whitelist_log), 'a', encoding='utf-8') as fp:
fp.write(datetime.now().strftime('%Y-%m-%d %H:%M:%S') + " " + ip_to_allow
+ " " + str(request.user_agent)
+ " " + repr(info_record_dict) + "\n")
except: # coverage: exclude
errprint('Unable to write log file', os.path.abspath(human_ip_verification_whitelist_log))
traceback.print_exc()
def check_auth(func):
"""
This decorator for routes checks that the user is authorized (or that no login is required).
If they haven't, their intended destination is stored and they're sent to get authorized.
It has to be placed AFTER @app.route() so that it can capture `request.path`.
"""
if 'login' not in conf:
return func
# inspired by <https://flask-login.readthedocs.org/en/latest/_modules/flask_login.html#login_required>
@functools.wraps(func)
def decorated_view(*args, **kwargs):
if current_user.is_anonymous:
print('unauthorized user visited {!r}'.format(request.path))
session['original_destination'] = request.path
return redirect(url_for('get_authorized'))
print('{} visited {!r}'.format(current_user.email, request.path))
assert current_user.email.lower() in conf.login['whitelist'], current_user
return func(*args, **kwargs)
return decorated_view
def after_request(response):
response.headers.add('Access-Control-Allow-Methods', 'GET, POST')
response.headers.add('Access-Control-Allow-Credentials', 'true')
response.headers.add('Access-Control-Allow-Headers', 'Content-Type, *')
response.headers.add('Cache-Control', 'no-cache')
response.headers.add('Cache-Control', 'no-store')
if api.auth.is_logged_in():
if 'token' in session:
response.set_cookie('token', session['token'])
else:
csrf_token = api.common.token()
session['token'] = csrf_token
response.set_cookie('token', csrf_token)
# JB: This is a hack. We need a better solution
if request.path[0:19] != "/api/autogen/serve/":
response.mimetype = 'appication/json'
return response
def convert(content_type, content):
cachekey = request.path
cachevalue = cache.get(cachekey)
if cachevalue is None:
content = decode(content)
content_type = decode(content_type).decode()
if content_type.startswith('image/svg+xml'):
content = __svg2png(content)
content_type = 'image/png'
cache.set(cachekey, (content, content_type), timeout=CACHE_RETENTION)
else:
content = cachevalue[0]
content_type = cachevalue[1]
response = make_response(content)
response.content_type = content_type
response.cache_control.max_age = CACHE_RETENTION
return response
def init_utils(app):
app.jinja_env.filters['unix_time'] = unix_time
app.jinja_env.filters['unix_time_millis'] = unix_time_millis
app.jinja_env.filters['long2ip'] = long2ip
app.jinja_env.globals.update(pages=pages)
app.jinja_env.globals.update(can_register=can_register)
app.jinja_env.globals.update(mailserver=mailserver)
app.jinja_env.globals.update(ctf_name=ctf_name)
@app.context_processor
def inject_user():
if authed():
return dict(session)
return dict()
@app.before_request
def needs_setup():
if request.path == '/setup' or request.path.startswith('/static'):
return
if not is_setup():
return redirect('/setup')
def after_request(response):
response.headers.add('Access-Control-Allow-Methods', 'GET, POST')
response.headers.add('Access-Control-Allow-Credentials', 'true')
response.headers.add('Access-Control-Allow-Headers', 'Content-Type, *')
response.headers.add('Cache-Control', 'no-cache')
response.headers.add('Cache-Control', 'no-store')
if api.auth.is_logged_in():
if 'token' in session:
response.set_cookie('token', session['token'])
else:
csrf_token = api.common.token()
session['token'] = csrf_token
response.set_cookie('token', csrf_token)
# JB: This is a hack. We need a better solution
if request.path[0:19] != "/api/autogen/serve/":
response.mimetype = 'appication/json'
return response
def _atlassian_jwt_post_token(self):
if not getattr(g, 'ac_client', None):
return dict()
args = request.args.copy()
try:
del args['jwt']
except KeyError:
pass
signature = encode_token(
'POST',
request.path + '?' + urlencode(args),
g.ac_client.clientKey,
g.ac_client.sharedSecret)
args['jwt'] = signature
return dict(atlassian_jwt_post_url=request.path + '?' + urlencode(args))
def log_exception(self, exc_info):
self.logger.error("""
Path: %s
HTTP Method: %s
Client IP Address: %s
User Agent: %s
User Platform: %s
User Browser: %s
User Browser Version: %s
GET args: %s
view args: %s
URL: %s
""" % (
request.path,
request.method,
request.remote_addr,
request.user_agent.string,
request.user_agent.platform,
request.user_agent.browser,
request.user_agent.version,
dict(request.args),
request.view_args,
request.url
), exc_info=exc_info)
def log_request(code='-'):
proto = request.environ.get('SERVER_PROTOCOL')
msg = request.method + ' ' + request.path + ' ' + proto
code = str(code)
if code[0] == '1': # 1xx - Informational
msg = color(msg, attrs=['bold'])
if code[0] == '2': # 2xx - Success
msg = color(msg, color='white')
elif code == '304': # 304 - Resource Not Modified
msg = color(msg, color='cyan')
elif code[0] == '3': # 3xx - Redirection
msg = color(msg, color='green')
elif code == '404': # 404 - Resource Not Found
msg = color(msg, color='yellow')
elif code[0] == '4': # 4xx - Client Error
msg = color(msg, color='red', attrs=['bold'])
else: # 5xx, or any other response
msg = color(msg, color='magenta', attrs=['bold'])
logger.info('%s - - [%s] "%s" %s', request.remote_addr, log_date_time_string(), msg, code)
def authenticated(fn):
"""Mark a route as requiring authentication."""
@wraps(fn)
def decorated_function(*args, **kwargs):
if not session.get('is_authenticated'):
return redirect(url_for('login', next=request.url))
if request.path == '/logout':
return fn(*args, **kwargs)
if (not session.get('name') or
not session.get('email') or
not session.get('institution')) and request.path != '/profile':
return redirect(url_for('profile', next=request.url))
return fn(*args, **kwargs)
return decorated_function
def __call__(self, func):
@wraps(func)
def decorator(*args, **kwargs):
response = func(*args, **kwargs)
if not USE_CACHE:
return response
for path in self.routes:
route = '%s,%s,%s' % (path, json.dumps(self.args), None)
Cache.delete(unicode(route))
user = kwargs.get('user', None) if self.clear_for_user else None
Logger.debug(unicode('clear %s from cache' % (route)))
if user:
route = '%s,%s,%s' % (path, json.dumps(self.args), user)
Cache.delete(unicode(route))
Logger.debug(unicode('clear %s from cache' % (route)))
return response
return decorator
def sitemap():
conn,curr = sphinx_conn()
querysql='SELECT info_hash,create_time FROM film order by create_time desc limit 100'
curr.execute(querysql)
rows=curr.fetchall()
sphinx_close(curr,conn)
sitemaplist=[]
for row in rows:
info_hash = row['info_hash']
mtime = datetime.datetime.fromtimestamp(int(row['create_time'])).strftime('%Y-%m-%d')
url = domain+'hash/{}.html'.format(info_hash)
url_xml = '<url><loc>{}</loc><lastmod>{}</lastmod><changefreq>daily</changefreq><priority>0.8</priority></url>'.format(url, mtime)
sitemaplist.append(url_xml)
xml_content = '<?xml version="1.0" encoding="UTF-8"?><urlset>{}</urlset>'.format("".join(x for x in sitemaplist))
with open('static/sitemap.xml', 'wb') as f:
f.write(xml_content)
f.close()
return send_from_directory(app.static_folder, request.path[1:])
def chan_friends(channel):
"""
If you GET this endpoint, go to /api/v1/channel/<channel>/friend
with <channel> replaced for the channel of the friends you want to get
<channel> can either be an int that matches the channel, or a string
that matches the owner's username
"""
# model = request.path.split("/")[-1]
model = "Friend"
if channel.isdigit():
fields = {"channelId": int(channel)}
else:
fields = {"owner": channel.lower()}
packet, code = generate_response(
model,
request.path,
request.method,
request.values,
data=results,
fields=fields
)
return make_response(jsonify(packet), code)
# There was an error!
# if not str(code).startswith("2"):
# return make_response(jsonify(packet), code)
# NOTE: Not needed currently, but this is how you would check
# TODO: Fix this endpoint to remove timing elements (friends are forever)
# TODO: Use Object.update(**changes) instead of Object(**updated_object).save()
def chan_messages(channel):
"""
If you GET this endpoint, go to /api/v1/channel/<channel>/messages
with <channel> replaced for the messages you want to get
"""
model = "Message"
if channel.isdigit():
fields = {"channelId": int(channel)}
else:
fields = {"owner": channel.lower()}
packet, code = generate_response(
model,
request.path,
request.method,
request.values,
fields=fields
)
return make_response(jsonify(packet), code)
def user_quotes(channel):
"""
If you GET this endpoint, go to /api/v1/channel/<channel>/quote
with <channel> replaced for the channel you want to get quotes for
"""
model = "quote"
if channel.isdigit():
fields = {"channelId": int(channel), "deleted": False}
else:
fields = {"owner": channel.lower(), "deleted": False}
packet, code = generate_response(
model,
request.path,
request.method,
request.values,
fields=fields
)
return make_response(jsonify(packet), code)
def user_commands(channel):
"""
If you GET this endpoint, simply go to /api/v1/channel/<channel>/command
with <channel> replaced for the channel you want to get commands for
"""
model = "Command"
if channel.isdigit():
fields = {"channelId": int(channel), "deleted": False}
else:
fields = {"channelName": channel.lower(), "deleted": False}
packet, code = generate_response(
model,
request.path,
request.method,
request.values,
fields=fields
)
return make_response(jsonify(packet), code)
def after_request_log(response):
name = dns_resolve(request.remote_addr)
current_app.logger.warn(u"""[client {ip} {host}] {http} "{method} {path}" {status}
Request: {method} {path}
Version: {http}
Status: {status}
Url: {url}
IP: {ip}
Hostname: {host}
Agent: {agent_platform} | {agent_browser} | {agent_browser_version}
Raw Agent: {agent}
""".format(method=request.method,
path=request.path,
url=request.url,
ip=request.remote_addr,
host=name if name is not None else '?',
agent_platform=request.user_agent.platform,
agent_browser=request.user_agent.browser,
agent_browser_version=request.user_agent.version,
agent=request.user_agent.string,
http=request.environ.get('SERVER_PROTOCOL'),
status=response.status))
return response
def request_start():
content_type = request.headers.get('Accept') or ''
real_ip = request.headers.get('X-Real-Ip') or ''
Log.info(request.path+' '+format_args(request.args)\
+' '+real_ip\
+' '+content_type)
#Test content_type
# if content_type and content_type not in AVAILABLE_CONTENT_TYPES:
# results = {'message' : 'Content-Type not supported',
# 'message_code' : 8
# }
# return {'error' : 'content-type'}
# return self.render(results, status_code = 405)
def cached(key='view{path}'):
""" Sets up a function to have cached results """
def decorator(func):
""" Gets a cached value or calculates one """
@wraps(func)
def decorated_function(*args, **kwargs):
ckey = cache_key(key, *args, **kwargs)
value = cache.get(ckey)
if value is not None:
return value
print("DB HIT: \"{}\"".format(ckey))
value = func(*args, **kwargs)
cache.set(ckey, value)
return value
return decorated_function
return decorator
def post(year, month, day, post_name):
rel_url = request.path[len('/post/'):]
fixed_rel_url = storage.fix_post_relative_url(rel_url)
if rel_url != fixed_rel_url:
return redirect(request.url_root + 'post/' + fixed_rel_url) # it's not the correct relative url, so redirect
post_ = storage.get_post(rel_url, include_draft=False)
if post_ is None:
abort(404)
post_d = post_.to_dict()
del post_d['raw_content']
post_d['content'] = get_parser(post_.format).parse_whole(post_.raw_content)
post_d['content'], post_d['toc'], post_d['toc_html'] = parse_toc(post_d['content'])
post_d['url'] = make_abs_url(post_.unique_key)
post_ = post_d
return custom_render_template(post_['layout'] + '.html', entry=post_)
def entity(entity_id):
entity = load_entity(entity_id, request.auth)
if entity is None:
raise NotFound()
# load possible duplicates via fingerprint expansion
# fingerprints = expand_fingerprints(entity.fingerprints, request.auth)
query = Query(request.args, prefix='duplicates_', path=request.path,
limit=5)
# query.add_facet('countries', 'Countries', country_label)
duplicates = search_duplicates(entity.id, entity.fingerprints, query,
request.auth)
# load links
query = Query(request.args, prefix='links_', path=request.path,
limit=10)
query.add_facet('schemata', 'Types', link_schema_label)
query.add_facet('remote.countries', 'Countries', country)
links = search_links(entity, query, request.auth)
return render_template("entity.html", entity=entity, links=links,
duplicates=duplicates)
def store_form(form):
entry = FormEntry(form_id=g.page.id)
for f in form:
field = Field.query.filter_by(form_id=g.page.id).filter_by(name=f.name).one_or_none()
if field is None:
continue
field_entry = FieldEntry(field_id=field.id)
data = f.data
if field.type == 'file_input':
file_data = request.files[field.name]
filename = '%s-%s-%s.%s' %(field.name, date_stamp(), str(time.time()).replace('.', ''), os.path.splitext(file_data.filename)[-1])
path = os.path.join(app.config['FORM_UPLOADS_PATH'], secure_filename(filename))
file_data.save(path)
data = filename
field_entry.value = data
db.session.add(field_entry)
entry.fields.append(field_entry)
db.session.add(entry)
db.session.commit()
def route():
args = request.json
args = args if args else {}
cfg = args.get('cfg', None)
log_request(
args.get('user', 'unknown'),
args.get('hostname', 'unknown'),
request.remote_addr,
request.method,
request.path,
args)
try:
endpoint = create_endpoint(request.method, cfg, args)
json, status = endpoint.execute()
except AutocertError as ae:
status = 500
json = dict(errors={ae.name: ae.message})
return make_response(jsonify(json), status)
if not json:
raise EmptyJsonError(json)
return make_response(jsonify(json), status)
def _create_etag(path, recursive, lm=None):
if lm is None:
lm = _create_lastmodified(path, recursive)
if lm is None:
return None
hash = hashlib.sha1()
hash.update(str(lm))
hash.update(str(recursive))
if path.endswith("/files") or path.endswith("/files/sdcard"):
# include sd data in etag
hash.update(repr(sorted(printer.get_sd_files(), key=lambda x: x[0])))
return hash.hexdigest()
def get_full_path(ver):
""" L2-L4 Path Query, use onePath for single paths """
onepath = False
depth = '20'
error = version_chk(ver, ['v1.0', 'v1.1', 'v2'])
if error:
return error
if 'onepath' in request.args:
if request.args['onepath'] == "True":
onepath = True
if 'depth' in request.args:
depth = request.args['depth']
try:
return jsonify(upgrade_api(nglib.query.path.get_full_path(request.args['src'], \
request.args['dst'], {"onepath": onepath, "depth": depth}), ver))
except ResultError as e:
return jsonify(errors.json_error(e.expression, e.message))
def get_routed_path(ver):
""" Routed Paths, accepts vrf """
onepath = False
depth = '20'
vrf = 'default'
error = version_chk(ver, ['v1.0', 'v1.1', 'v2'])
if error:
return error
if 'onepath' in request.args:
if request.args['onepath'] == "True":
onepath = True
if 'depth' in request.args:
depth = request.args['depth']
if 'vrf' in request.args:
vrf = request.args['vrf']
try:
return jsonify(upgrade_api(nglib.query.path.get_routed_path(request.args['src'], \
request.args['dst'], {"onepath": onepath, "depth": depth, \
"VRF": vrf}), ver))
except ResultError as e:
return jsonify(errors.json_error(e.expression, e.message))