def web_node_button_action(name=None):
# Either way, name has no leading slash.
if 'unbind' in request.form:
delete_node_binding(name)
elif 'bind' in request.form:
manname = request.form['manifest_sel']
manifest = BP.manifest_lookup(manname)
build_node(manifest, name)
return redirect(request.base_url) # Eliminates browser caching of POST
python类base_url()的实例源码
def root():
return render_template(
'index.tpl',
api_version=mainapp.config['API_VERSION'],
base_url=request.base_url,
mirror=mainapp.config['L4TM_MIRROR'],
release=mainapp.config['L4TM_RELEASE'],
rules=mainapp.config['rules'],
url_root=request.url_root,
coordinate=mainapp.config['tmconfig'].racks[1]['coordinate'])
###########################################################################
# Networking stuff
def article(article_id):
comment_form = CommentForm()
if comment_form.validate_on_submit():
from MagicPress.utils.tasks import send_async_email
new_comment = Comment(username=comment_form.name.data)
new_comment.text = comment_form.text.data
new_comment.create_time = datetime.utcnow()
new_comment.site = comment_form.site.data
new_comment.email = comment_form.email.data
new_comment.ip = request.remote_addr
new_comment.language = request.accept_languages.best
new_comment.os = request.user_agent.platform
new_comment.browser = request.user_agent.browser
new_comment.article_id = str(request.base_url).split('/')[-1]
info = get_ip_info(request.remote_addr)
new_comment.location = info['country']+info['region']+info['city']
new_comment.network = info['isp']
if gfw.filter(comment_form.text.data) or gfw.filter(comment_form.name.data):
new_comment.hidden = False
flash(u'????????????')
else:
new_comment.hidden = True
message_details = {}
message_details['subject'] = 'New Comment'
message_details['recipients'] = [current_app.config['ADMIN_EMAIL']]
message_details['body'] = "Name: %s\nEmail: %s\nSite: %s\nLocation: %s\n" \
"Hihhen: %s\nText:\n\n %s" % (
comment_form.name.data, current_app.config['ADMIN_EMAIL'], comment_form.site.data,
info['country']+info['region']+info['city'], str(new_comment.hidden), comment_form.text.data)
send_async_email.delay(message_details)
db.session.add(new_comment)
db.session.commit()
the_article = Article.query.filter_by(id=article_id).first()
next_article = db.session.query(Article).filter(Article.id < article_id, Article.state == True).order_by(Article.id.desc()).first()
pre_article = db.session.query(Article).filter(Article.id > article_id, Article.state == True).order_by(Article.id.asc()).first()
comments = Comment.query.filter_by(article_id=article_id, hidden=True).all()
return render_template(get_theme() + '/article.html', article=the_article, next_article=next_article,
pre_article=pre_article, comment_form=comment_form, comments=comments)
def get_swaggerui_blueprint(base_url, api_url, config=None, oauth_config=None):
swagger_ui = Blueprint('swagger_ui',
__name__,
static_folder='dist',
template_folder='templates')
default_config = {
'app_name': 'Swagger UI',
'dom_id': '#swagger-ui',
'url': api_url,
'layout': 'StandaloneLayout'
}
if config:
default_config.update(config)
fields = {
# Some fields are used directly in template
'base_url': base_url,
'app_name': default_config.pop('app_name'),
# Rest are just serialized into json string for inclusion in the .js file
'config_json': json.dumps(default_config),
}
if oauth_config:
fields['oauth_config_json'] = json.dumps(oauth_config)
@swagger_ui.route('/')
@swagger_ui.route('/<path:path>')
def show(path=None):
if not path or path == 'index.html':
if not default_config.get('oauth2RedirectUrl', None):
default_config.update(
{"oauth2RedirectUrl": "%s/oauth2-redirect.html" % request.base_url}
)
fields['config_json'] = json.dumps(default_config)
return render_template('index.template.html', **fields)
else:
return send_from_directory(
# A bit of a hack to not pollute the default /static path with our files.
os.path.join(
swagger_ui.root_path,
swagger_ui._static_folder
),
path
)
return swagger_ui
def applications():
"""Mock of the YARN cluster apps REST resource."""
if 'last' in request.args:
return jsonify(redis.get(request.base_url))
d = st.fixed_dictionaries({
'allocatedMB': st.integers(-1),
'allocatedVCores': st.integers(-1),
'amContainerLogs': st.text(),
'amHostHttpAddress': st.text(),
'applicationTags': st.text(),
'applicationType': st.sampled_from(['MAPREDUCE', 'SPARK']),
'clusterId': st.integers(0),
'diagnostics': st.text(),
'elapsedTime': st.integers(0),
'finalStatus': st.sampled_from(['UNDEFINED', 'SUCCEEDED', 'FAILED', 'KILLED']),
'finishedTime': st.integers(0),
'id': st.text(string.ascii_letters, min_size=5, max_size=25),
'memorySeconds': st.integers(0),
'name': st.text(min_size=5),
'numAMContainerPreempted': st.integers(0),
'numNonAMContainerPreempted': st.integers(0),
'preemptedResourceMB': st.integers(0),
'preemptedResourceVCores': st.integers(0),
'progress': st.floats(0, 100),
'queue': st.text(),
'runningContainers': st.integers(-1),
'startedTime': st.integers(0),
'state': st.sampled_from(['NEW', 'NEW_SAVING', 'SUBMITTED', 'ACCEPTED', 'RUNNING', 'FINISHED', 'FAILED', 'KILLED']),
'trackingUI': st.text(),
'trackingUrl': st.just(os.environ['YARN_ENDPOINT']),
'user': st.text(),
'vcoreSeconds': st.integers(0)
})
result = json.dumps({
'apps': {
'app': st.lists(d, min_size=4, average_size=10).example()
}
})
redis.set(request.base_url, result)
return jsonify(result)
def mapreduce_application():
"""Mock of the mapreduce jobs REST resource."""
if 'last' in request.args:
return jsonify(redis.get(request.base_url))
d = st.fixed_dictionaries({
'startTime': st.integers(0),
'finishTime': st.integers(0),
'elapsedTime': st.integers(0),
'id': st.integers(0),
'name': st.text(),
'user': st.text(),
'state': st.sampled_from(['NEW', 'SUCCEEDED', 'RUNNING', 'FAILED', 'KILLED']),
'mapsTotal': st.integers(0),
'mapsCompleted': st.integers(0),
'reducesTotal': st.integers(0),
'reducesCompleted': st.integers(0),
'mapProgress': st.floats(0, 100),
'reduceProgress': st.floats(0, 100),
'mapsPending': st.integers(0),
'mapsRunning': st.integers(0),
'reducesPending': st.integers(0),
'reducesRunning': st.integers(0),
'uberized': st.booleans(),
'diagnostics': st.text(),
'newReduceAttempts': st.integers(0),
'runningReduceAttempts': st.integers(0),
'failedReduceAttempts': st.integers(0),
'killedReduceAttempts': st.integers(0),
'successfulReduceAttempts': st.integers(0),
'newMapAttempts': st.integers(0),
'runningMapAttempts': st.integers(0),
'failedMapAttempts': st.integers(0),
'killedMapAttempts': st.integers(0),
'successfulMapAttempts': st.integers(0)
})
result = json.dumps({
'jobs': {
'job': st.lists(d, average_size=3).example()
}
})
redis.set(request.base_url, result)
return jsonify(result)
def list(select, request):
'''Returns a styled list from the data passed. The pagination style
is defined in the request data. The default values of `number` and
`page` will be 10 and 1, respectively. If these parameters are passed
as data in the request, then the values will be updated accordingly.
Keyword arguments:
select -- A database query of data.
request -- A request of some type.
'''
number = 10
page = 1
for key in request.values:
if key == 'number':
number = int(request.values.get('number'))
elif key == 'page':
page = int(request.values.get('page'))
'''Call peewee paginate method on the query.'''
arr = []
for i in select.paginate(page, number):
arr.append(i.to_dict())
'''By default, `next_page_path` and `prev_page_path` are None.'''
next_page_path = None
prev_page_path = None
base_path = request.base_url + "?page="
end_path = "&number=" + str(number)
'''Update `next_page_path` and `prev_page_path` if there is data on
either a next or previous page from the pagination.
'''
if len(arr) == number:
next_page_path = base_path + str(page + 1) + end_path
if page > 1:
prev_page_path = base_path + str(page - 1) + end_path
'''Return an array of dicts, containing the data and pagination.'''
data = [dict(data=arr)]
data.append(dict(paging=dict(next=next_page_path,
previous=prev_page_path)))
return data
def in_cache():
url = request.base_url.replace("/cached.gif", "/")
path = request.path.replace("/cached.gif", "/")
base_url = request.url_root
# select view from plugins and fall back on default view if no plugin will handle it
ui_plugins = pluginManager.get_implementations(octoprint.plugin.UiPlugin,
sorting_context="UiPlugin.on_ui_render")
for plugin in ui_plugins:
if plugin.will_handle_ui(request):
ui = plugin._identifier
key = _cache_key(plugin._identifier,
url=url,
additional_key_data=plugin.get_ui_additional_key_data_for_cache)
unless = _preemptive_unless(url, additional_unless=plugin.get_ui_preemptive_caching_additional_unless)
data = _preemptive_data(plugin._identifier,
path=path,
base_url=base_url,
data=plugin.get_ui_data_for_preemptive_caching,
additional_request_data=plugin.get_ui_additional_request_data_for_preemptive_caching)
break
else:
ui = "_default"
key = _cache_key("_default", url=url)
unless = _preemptive_unless(url)
data = _preemptive_data("_default", path=path, base_url=base_url)
response = make_response(bytes(base64.b64decode("R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7")))
response.headers["Content-Type"] = "image/gif"
if unless or not preemptiveCache.has_record(data, root=path):
_logger.info("Preemptive cache not active for path {}, ui {} and data {!r}, signaling as cached".format(path, ui, data))
return response
elif util.flask.is_in_cache(key):
_logger.info("Found path {} in cache (key: {}), signaling as cached".format(path, key))
return response
elif util.flask.is_cache_bypassed(key):
_logger.info("Path {} was bypassed from cache (key: {}), signaling as cached".format(path, key))
return response
else:
_logger.debug("Path {} not yet cached (key: {}), signaling as missing".format(path, key))
return abort(404)
def main_page():
"""Renders the main page. When this page is shown, we create a new
channel to push asynchronous updates to the client."""
user = users.get_current_user()
game_key = request.args.get('g')
if not game_key:
game_key = user.user_id()
game = Game(id=game_key, userX=user, moveX=True, board=' '*9)
game.put()
else:
game = Game.get_by_id(game_key)
if not game:
return 'No such game', 404
if not game.userO:
game.userO = user
game.put()
# [START pass_token]
# choose a unique identifier for channel_id
channel_id = user.user_id() + game_key
# encrypt the channel_id and send it as a custom token to the
# client
# Firebase's data security rules will be able to decrypt the
# token and prevent unauthorized access
client_auth_token = create_custom_token(channel_id)
_send_firebase_message(channel_id, message=game.to_json())
# game_link is a url that you can open in another browser to play
# against this player
game_link = '{}?g={}'.format(request.base_url, game_key)
# push all the data to the html template so the client will
# have access
template_values = {
'token': client_auth_token,
'channel_id': channel_id,
'me': user.user_id(),
'game_key': game_key,
'game_link': game_link,
'initial_message': urllib.unquote(game.to_json())
}
return flask.render_template('fire_index.html', **template_values)
# [END pass_token]
def add_pillar_request_to_notification(notification):
"""Adds request metadata to the Bugsnag notifications.
This basically copies bugsnag.flask.add_flask_request_to_notification,
but is altered to include Pillar-specific metadata.
"""
from flask import request, session
from bugsnag.wsgi import request_path
import pillar.auth
if not request:
return
notification.context = "%s %s" % (request.method,
request_path(request.environ))
if 'id' not in notification.user:
user: pillar.auth.UserClass = pillar.auth.current_user._get_current_object()
notification.set_user(id=user.user_id,
email=user.email,
name=user.username)
notification.user['roles'] = sorted(user.roles)
notification.user['capabilities'] = sorted(user.capabilities)
session_dict = dict(session)
for key in SESSION_KEYS_TO_REMOVE:
try:
del session_dict[key]
except KeyError:
pass
notification.add_tab("session", session_dict)
notification.add_tab("environment", dict(request.environ))
remote_addr = request.remote_addr
forwarded_for = request.headers.get('X-Forwarded-For')
if forwarded_for:
remote_addr = f'{forwarded_for} (proxied via {remote_addr})'
notification.add_tab("request", {
"method": request.method,
"url": request.base_url,
"headers": dict(request.headers),
"params": dict(request.form),
"data": {'request.data': request.data,
'request.json': request.get_json()},
"endpoint": request.endpoint,
"remote_addr": remote_addr,
})
def web_node_status(name=None):
'''name will never have a leading / but now always needs one.'''
name = '/' + name
try:
node = BP.nodes[name][0]
ESPURL = None # testable value in Jinja2
ESPsizeMB = 0
installsh = installlog = None
status = get_node_status(name)
if status is not None:
if status['status'] == 'ready':
ESPpath = '%s/%s/%s.ESP' % (
BP.config['TFTP_IMAGES'], node.hostname, node.hostname)
if os.path.isfile(ESPpath):
prefix = request.url.split(_ERS_element)[0]
ESPURL = '%s%s/ESP/%s' % (
prefix, _ERS_element, node.hostname)
ESPsizeMB = os.stat(ESPpath).st_size >> 20
if status['status'] in ('building', 'ready'):
installpath = '%s/%s/untar/root' % (
BP.config['FILESYSTEM_IMAGES'], node.hostname)
try:
with open(installpath + '/install.sh') as f:
installsh = f.read()
with open(installpath + '/install.log') as f:
installlog = f.read()
except Exception as e:
installsh = installlog = None
pass
# all manifests' names with namespace
manifests = sorted(BP.blueprints['manifest'].get_all())
return render_template(
_ERS_element + '.tpl',
label=__doc__,
node=node,
manifests=manifests,
status=status,
base_url=request.url.split(name)[0],
ESPURL=ESPURL,
ESPsizeMB=ESPsizeMB,
installsh=installsh,
installlog=installlog
)
except Exception as e:
return make_response('Kaboom: %s' % str(e), 404)
return_styles.py 文件源码
项目:holbertonschool_airbnb_clone
作者: johndspence
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def list(select, request):
'''Returns a styled list from the data passed. The pagination style
is defined in the request data. The default values of `number` and
`page` will be 10 and 1, respectively. If these parameters are passed
as data in the request, then the values will be updated accordingly.
Keyword arguments:
select -- A database query of data.
request -- A request of some type.
'''
number = 10
page = 1
for key in request.values:
if key == 'number':
number = int(request.values.get('number'))
elif key == 'page':
page = int(request.values.get('page'))
'''Call peewee paginate method on the query.'''
arr = []
for i in select.paginate(page, number):
arr.append(i.to_dict())
'''By default, `next_page_path` and `prev_page_path` are None.'''
next_page_path = None
prev_page_path = None
base_path = request.base_url + "?page="
end_path = "&number=" + str(number)
'''Update `next_page_path` and `prev_page_path` if there is data on
either a next or previous page from the pagination.
'''
if len(arr) == number:
next_page_path = base_path + str(page + 1) + end_path
if page > 1:
prev_page_path = base_path + str(page - 1) + end_path
'''Return an array of dicts, containing the data and pagination.'''
data = [dict(data=arr)]
data.append(dict(paging=dict(next=next_page_path,
previous=prev_page_path)))
return data