def diff_jobs():
    """Render a line-by-line diff of the input files of the selected jobs.

    The job ids arrive colon-separated in the ``selected_diff_cases`` query
    parameter.  Each job's input file is read from the authorized user's
    directory and the first two files are compared with ``difflib.Differ``.

    :return: the rendered 'more' template holding the diff, or the rendered
        'error' template when fewer than two jobs were selected.
    """
    user = root.authorized()
    selected_cases = request.query.selected_diff_cases
    # Ignore empty ids (empty selection, leading/trailing/double ':') so they
    # are never passed to the jobs lookup.
    cases = [jid for jid in selected_cases.split(':') if jid]
    if len(cases) < 2:
        # Two files are needed for a diff; indexing contents[1] would
        # otherwise raise IndexError.
        return template('error', err="select two cases to diff")
    cids = []
    contents = []
    for jid in cases:
        job = jobs(jid)  # fetch the job once and read both fields from it
        cid = job.cid
        app = job.app
        cids.append(cid)
        base_dir = os.path.join(user_dir, user, root.myapps[app].appname)
        fn = os.path.join(base_dir, cid, root.myapps[app].simfn)
        # keep line endings so Differ output stays line-oriented
        contents.append(slurp_file(fn).splitlines(True))
    import difflib
    result = list(difflib.Differ().compare(contents[0], contents[1]))
    title = "diff " + cids[0] + " " + cids[1]
    # 'cid' and 'app' are those of the last selected job, as before
    params = {'cid': cid, 'contents': ' '.join(result), 'app': app,
              'user': user, 'fn': title}
    return template('more', params)
# python类template()的实例源码 (example source code for Python's template())
def chart(num_bars):
    """Build a bar chart with ``num_bars`` bars and render it.

    A non-positive ``num_bars`` is replaced by 1.  Each bar gets a day
    number (1..num_bars), a random integer bug count in [1, 100] and a
    random cost in [1.00, 1000.00].

    :param num_bars: requested number of bars.
    :return: TEMPLATE_STRING rendered with the bar count and the plot's
        script/div components.
    """
    num_bars = num_bars if num_bars > 0 else 1

    days, bugs, costs = [], [], []
    for day in range(1, num_bars + 1):
        days.append(day)
        # the two random draws stay interleaved per bar, as in the original
        bugs.append(random.randint(1, 100))
        costs.append(random.uniform(1.00, 1000.00))
    data = {"days": days, "bugs": bugs, "costs": costs}

    hover = create_hover_tool()
    plot = create_bar_chart(data, "Bugs found per day", "days", "bugs", hover)
    script, div = components(plot)
    return template(TEMPLATE_STRING, bars_count=num_bars,
                    the_div=div, the_script=script)
def get_main():
    """Render the index page listing every configured service.

    For each section in ``config`` the service status is queried and mapped
    to a CSS class ('not-found' -> 'active', 'inactive'/'failed' -> 'danger',
    'active' -> 'success', anything else -> 'warning').  The class also
    decides which of the start/stop/restart flags are set to disabled.

    :return: the rendered 'index' template.
    """
    services = []
    for service in config.sections():
        state = get_service_action(service, 'status')['status']
        if state == 'not-found':
            cls = 'active'
        elif state in ('inactive', 'failed'):
            cls = 'danger'
        elif state == 'active':
            cls = 'success'
        else:
            cls = 'warning'

        # the 'active' class disables all three buttons
        all_disabled = cls == 'active'
        entry = {
            'class': cls,
            'disabled_start': all_disabled or cls == 'success',
            'disabled_stop': all_disabled or cls == 'danger',
            'disabled_restart': all_disabled or cls == 'danger',
            'title': config.get(service, 'title'),
            'service': service,
        }
        services.append(entry)
    return template('index', hostname=gethostname(), services=services)
def status():
    """Render the status page with the cron entries and the task queues.

    Each cron row becomes ``[path, plan, last, message]`` where ``last`` is
    the text between the first character and the first ']' of the stored
    status string, and ``message`` is whatever follows the last ' - '.
    Each task-queue row is extended with the length reported by
    ``db.task.length`` for its first column.

    :return: ``stpl.status`` rendered with the collected context.
    """
    cron = []
    task = []

    for entry in db.cron.fetchall():
        plan, raw_status = db.status.fetch(entry['id'])
        closing = raw_status.find(']')
        last = raw_status[1:closing]
        message = raw_status.split(' - ')[-1]
        cron.append([entry['path'], plan, last, message])

    for row in db.taskq.fetchall():
        queue_length = db.task.length(row[0])
        task.append(list(row) + [queue_length])

    context = dict(title='????', cron=cron, task=task, conf=conf,
                   notice=flash())
    return template(stpl.status, context)
def hello2_name(name):
    """Render the "sample" template with the given name.

    The name is handed to the template as the ``display_name`` variable,
    e.g. ``display_name="TEST NAME"``.
    NOTE(review): the original (mis-encoded) docstring mentions bottle and a
    ``./views/`` directory; presumably that is where the template file is
    looked up -- confirm.

    :param name: value exposed to the template as ``display_name``.
    :return: the rendered template.
    """
    page = template("sample", display_name=name)
    return page
# ??????
def chat_room():
    """Show the chat room to a visitor that carries a ``username`` cookie.

    :return: the rendered "chat_room" template with the username and the
        talk list, or the result of redirecting to "/" when the cookie is
        missing or empty.
    """
    # the user name is taken from the request cookie
    username = request.get_cookie("username")
    if username:
        # fetch the conversation to display
        talk_list = get_talk()
        return template("chat_room", username=username, talk_list=talk_list)
    # no cookie value: send the visitor back to the top page
    return redirect("/")
def show_app(app):
    """Render the page of an installed app.

    Marks ``app`` as the active app, then renders ``apps/<app>`` with the
    app's default parameters plus 'cid', 'app', 'user' and 'apps'.

    :param app: name of the app to show.
    :return: the rendered app template, or the rendered 'error' template
        when the app is not installed.  If rendering fails the traceback is
        written to stderr and the request is redirected to '/app/<app>'.
    """
    # very similar to start_new_job() consider consolidating
    user = root.authorized()
    root.set_active(app)
    if app not in root.myapps:
        return template('error', err="app %s is not installed" % (app))
    try:
        # parameters for return template
        params = {}
        params.update(root.myapps[app].params)
        params['cid'] = ''
        params['app'] = app
        params['user'] = user
        params['apps'] = root.myapps
        return template(os.path.join('apps', app), params)
    except Exception:
        # print_exc() writes the active traceback to stderr; the previous
        # `print traceback.print_exception(...)` additionally printed "None"
        # and only parsed on Python 2.
        traceback.print_exc()
        redirect('/app/'+app)
def showapps():
    """List the apps, optionally filtered by the ``q`` query parameter.

    Without ``q`` all apps are selected ordered by name; with ``q`` the
    name, category and description columns are searched case-insensitively.
    The ids of apps already activated by the user are passed along so the
    page can prevent a second activation.

    :return: the rendered 'apps' template.
    """
    user = root.authorized()
    q = request.query.q
    if q:
        matches = (db.apps.name.contains(q, case_sensitive=False) |
                   db.apps.category.contains(q, case_sensitive=False) |
                   db.apps.description.contains(q, case_sensitive=False))
        result = db(matches).select()
    else:
        result = db().select(apps.ALL, orderby=apps.name)

    # find out what apps have already been activated so that a user can't
    # activate twice
    uid = users(user=user).id
    activated_apps = [row.appid for row in db(app_user.uid == uid).select()]

    # only the admin account may configure apps
    params = {'configurable': user == "admin", 'user': user}
    return template('apps', params, rows=result, activated=activated_apps)
def delete_app(appid):
    """Delete an app (admin only) and go back to the app list.

    The app name comes from the ``app`` form field; the ``del_app_dir``
    field ("on" when checked) decides whether the app's files are removed
    too.

    :param appid: id passed to the app object's ``delete`` method.
    :return: the rendered 'error' template when the user is not admin or the
        deletion fails; otherwise the request is redirected to '/apps'.
    """
    user = root.authorized()
    if user != 'admin':
        return template('error', err="must be admin to edit app")
    appname = request.forms.app
    del_files = request.forms.del_app_dir == "on"
    try:
        # The admin check above already returned for everyone else, so the
        # former second check inside this try block was unreachable.
        root.myapps[appname].delete(appid, del_files)
    except Exception:
        # print_exc() writes the traceback to stderr without the stray
        # "None" that `print traceback.print_exception(...)` produced.
        traceback.print_exc()
        return template("error", err="failed to delete app... did the app load properly?")
    redirect("/apps")
def addapp():
    """Create a new app from the submitted form (admin only).

    Reads the app description fields from the form, stores the app through
    ``apprw.App().create`` and reloads the app registry before redirecting
    to the new app's page.

    :return: the rendered 'error' template for non-admin users; otherwise
        the request is redirected to '/app/<appname>'.
    """
    user = root.authorized()
    if user != 'admin':
        return template('error', err="must be admin to add app")

    form = request.forms
    appname = form.appname
    # remaining fields, listed in the positional order create() expects
    field_names = ('description', 'category', 'language', 'input_format',
                   'command', 'preprocess', 'postprocess')
    field_values = [getattr(form, field) for field in field_names]

    # put in db
    new_app = apprw.App()
    new_app.create(appname, *field_values)

    # load_apps() needs to be called here in case a user wants to delete
    # this app just after it has been created... it is called again after
    # the user uploads a sample input file
    root.load_apps()
    redirect('/app/'+appname)
def change_password():
    """Change the password of the authorized user.

    The old password must check out and the two new password fields must be
    equal and non-empty; otherwise an error page is returned.

    :return: the rendered 'account' template with a success alert, or the
        rendered 'error' template when validation fails.
    """
    # this is basically the same coding as the register function
    # needs to be DRY'ed out in the future
    user = root.authorized()
    if config.auth and not root.authorized():
        redirect('/login')

    form = request.forms
    opasswd = form.opasswd
    pw1, pw2 = form.npasswd1, form.npasswd2

    # check old passwd first, then that the new one is confirmed and non-empty
    if not _check_user_passwd(user, opasswd) or pw1 != pw2 or len(pw1) == 0:
        return template('error', err="problem with password")

    users(user=user).update_record(passwd=_hash_pass(pw1))
    db.commit()

    params = {'user': user, 'alert': "SUCCESS: password changed"}
    return template('account', params)
def get_docker():
    """Render the Docker page with the known images and containers.

    If the Docker client cannot be created or queried, empty lists are shown
    together with an error alert.  An ``alert`` query parameter, when
    present, overrides the alert text.

    :return: the rendered 'docker' template.
    """
    user = root.authorized()
    params = {}
    try:
        cli = docker.Client(base_url=base_url)
        images = cli.images()
        conts = cli.containers(all=True)
    except Exception:
        # Best effort: still show the page when the daemon is unreachable.
        # `except Exception` (instead of a bare except) lets KeyboardInterrupt
        # and SystemExit propagate.
        images = []
        conts = []
        params['alert'] = "ERROR: there was a problem talking to the Docker daemon..."
    params['user'] = user
    params['app'] = root.active_app()
    if request.query.alert:
        params['alert'] = request.query.alert
    return template('docker', params, images=images, containers=conts)
def get_all_jobs():
    """Show the most recent jobs of all users (admin only).

    The ``n`` query parameter limits the number of rows; when it is missing
    or not an integer, ``config.jobs_num_rows`` is used.  The admin's
    ``new_shared_jobs`` counter is reset as a side effect.

    :return: the rendered 'shared' template, or the rendered 'error'
        template for non-admin users.
    """
    user = root.authorized()
    if user != "admin":
        return template("error", err="must be admin to use this feature")
    cid = request.query.cid
    app = request.query.app or root.active_app()
    n = request.query.n
    try:
        n = int(n) if n else config.jobs_num_rows
    except ValueError:
        # a non-numeric ?n= value falls back to the default instead of
        # raising out of the handler
        n = config.jobs_num_rows
    # sort by descending order of jobs.id
    result = db((db.jobs.uid==users.id)).select(orderby=~jobs.id)[:n]
    # clear notifications
    users(user=user).update_record(new_shared_jobs=0)
    db.commit()
    params = {
        'cid': cid,
        'app': app,
        'user': user,
        'n': n,
        'num_rows': config.jobs_num_rows,
    }
    return template('shared', params, rows=result)
def get_aws():
    """Render the AWS page with the current user's credentials and instances.

    :return: the rendered 'aws' template; a ``status`` query parameter, when
        present, is forwarded to the template.
    """
    user = root.authorized()
    query = request.query
    params = {
        'cid': query.cid,
        'app': query.app or root.active_app(),
        'user': user,
    }

    # the uid is needed first to find what the current user registered
    uid = db(users.user==user).select(users.id).first()
    creds = db(aws_creds.uid==uid).select()
    # look for aws instances registered by the current user
    instances = db(aws_instances.uid==uid).select()

    status = query.status
    if status:
        params['status'] = status
    return template('aws', params, creds=creds, instances=instances)
def get_user_data(filepath):
    """Serve a file from the user data directory, enforcing ownership.

    The owner is the first path segment and the case id the third, e.g.
    "wes/mendel/y23022/file.dat".  A file is served when the requester owns
    it, is admin, or the job for that case id is marked shared ("True").

    :param filepath: path of the requested file, relative to ``user_dir``.
    :return: the static file response, or the rendered 'error' template when
        access is forbidden.
    """
    user = root.authorized()
    path_list = filepath.split("/")
    owner = path_list[0]
    # Paths with fewer than three segments carry no case id, and a case id
    # may not match any job; both count as "not shared" instead of raising
    # IndexError / AttributeError.
    shared = None
    if len(path_list) > 2:
        job = jobs(cid=path_list[2])
        if job is not None:
            shared = job.shared
    # only allow admin to see other user's cases that have not been shared
    if owner != user and shared != "True" and user != "admin":
        return template('error', err="access forbidden")
    return static_file(filepath, root=user_dir)
def admin_delete_user():
    """Delete a user record, and optionally the user's files (admin only).

    The ``uid`` form field selects the user; uid 0 is refused.  When the
    ``del_files`` form field equals "True" the user's directory under
    ``user_dir`` is removed as well.

    :return: the rendered 'error' template when refused; otherwise the
        request is redirected to '/admin/show_users'.
    """
    user = root.authorized()
    if user != "admin":
        return template("error", err="must be admin to delete")

    uid = request.forms.uid
    if int(uid) == 0:
        return template("error", err="can't delete admin user")

    remove_files = request.forms.del_files == "True"
    if remove_files:
        path = os.path.join(user_dir, users(uid).user)
        print(" ".join(["deleting files in path:", path]))
        if os.path.isdir(path):
            shutil.rmtree(path)

    del db.users[uid]
    db.commit()
    redirect("/admin/show_users")
def get_stats():
    """Render the stats page with job counts and machine utilisation.

    :return: the rendered "stats" template with the number of jobs in states
        'Q', 'R' and 'C', CPU / virtual memory / root-disk usage percentages
        from psutil, and the ``cid`` and ``app`` query parameters.
    """
    root.authorized()
    params = {}
    # number of jobs in queued, running, and completed states
    for key, state in (('nq', 'Q'), ('nr', 'R'), ('nc', 'C')):
        params[key] = db(jobs.state == state).count()
    params.update({
        'cpu': psutil.cpu_percent(),
        'vm': psutil.virtual_memory().percent,
        'disk': psutil.disk_usage('/').percent,
        'cid': request.query.cid,
        'app': request.query.app,
    })
    return template("stats", params)
def main_form():
    """Main page.

    Renders 'form.tpl' with the driver families, the stored profiles and
    the saved profile name.  When no profile is saved yet, the module-level
    ``saved_profile`` is set from the 'indiserver_profile' cookie, falling
    back to 'Simulators'.
    """
    global saved_profile
    drivers = collection.get_families()

    saved_profile = (saved_profile
                     or request.get_cookie('indiserver_profile')
                     or 'Simulators')

    profiles = db.get_profiles()
    form_template = os.path.join(views_path, 'form.tpl')
    return template(form_template, profiles=profiles,
                    drivers=drivers, saved_profile=saved_profile)
###############################################################################
# Profile endpoints
###############################################################################
def edit_item(no):
    """Show or save the edit form of a todo item.

    Without a non-blank ``save`` query parameter the current task text is
    loaded and the edit form rendered.  With it, the item's task and status
    (1 for 'open', otherwise 0) are updated and a notice page is rendered.

    :param no: id of the todo item.
    :return: the rendered 'tpl/edit_task' or 'tpl/notice' template.
    """
    # NOTE(review): both statements match the id with LIKE, so '%' or '_'
    # in `no` would match several rows -- confirm the route restricts `no`
    # to integers.
    query = request.GET
    if not query.get('save', '').strip():
        cursor = conn.cursor()
        cursor.execute("SELECT task FROM todo WHERE id LIKE ?", [str(no)])
        return template('tpl/edit_task', old=cursor.fetchone(), no=no)

    edit = query.get('task', '').strip()
    status = 1 if query.get('status', '').strip() == 'open' else 0
    cursor = conn.cursor()
    cursor.execute("UPDATE todo SET task = ?, status = ? WHERE id LIKE ?",
                   (edit, status, no))
    conn.commit()
    return template("tpl/notice", msg='??????', no=no)
def edit_item(no):
    """Show or save the edit form of a comment.

    Without a non-blank ``save`` query parameter the current name and
    comment are loaded and the edit form rendered.  With it, the comment row
    is updated and a script snippet sending the browser to '/' is returned.

    :param no: id of the comment.
    :return: the rendered 'tpl/edit' template, or the redirect snippet.
    """
    # NOTE(review): the id is matched with LIKE, so '%' or '_' in `no`
    # would match several rows -- confirm the route restricts `no`.
    query = request.GET
    if not query.get('save', '').strip():
        cursor = conn.cursor()
        cursor.execute("SELECT name, comment FROM comments WHERE id LIKE ?",
                       [str(no)])
        return template('tpl/edit', old=cursor.fetchone(), no=no)

    name_r = query.get('name', '').strip()
    comment_r = query.get('comment', '').strip()
    cursor = conn.cursor()
    cursor.execute("UPDATE comments SET name = ?, comment = ? WHERE id LIKE ?",
                   (name_r, comment_r, no))
    conn.commit()
    return "<script>window.location.replace('/');</script>"
# ----------------------------------------------------
# ????
def cmd_toolkit():
    """Run the submitted CL statement through the toolkit and show the output.

    The ``cl`` form field is wrapped in an ``iCmd5250`` (used both as key
    and as command), executed via ``iLibCall`` and every output fragment is
    concatenated into one string for the 'cmd' template.

    :return: the rendered 'cmd' template.
    """
    cl_statement = request.forms.get('cl')
    # xmlservice
    itool = iToolKit()
    itransport = iLibCall()
    itool.add(iCmd5250(cl_statement, cl_statement))
    itool.call(itransport)
    # results from list: flatten the nested output into a single string
    data = ''.join(fragment
                   for group in itool.list_out()
                   for fragment in group)
    return template('cmd', data=data)
def edit():
    """Render the editor page, optionally with a file loaded.

    The file tree built from ROOT is always passed to the template.  When a
    ``file`` query parameter is given, that file is read and shown; content
    of '.pyui' and '.json' files is re-serialised as indented JSON first.

    The original author's sketch of the file tree shape (presumably what
    make_file_tree returns -- confirm):
        {'filename1': 'path1',
         'dirname1': {'filename3': 'path3',
                      'dirname2': {'filename4': 'path4'}}}

    :return: the rendered 'main.tpl' template.
    """
    file_list = make_file_tree(ROOT)
    main_tpl = os.path.join(IDE_REL_ROOT, 'main.tpl')

    selected = request.GET.get('file')
    if not selected:
        return template(main_tpl, files = file_list)

    # NOTE(review): `selected` comes straight from the query string and is
    # joined onto ROOT without validation -- confirm '..' segments cannot
    # escape ROOT.
    with open(os.path.join(ROOT, selected), 'r') as in_file:
        code = in_file.read()

    extension = selected.split('.')[-1]
    if extension in ('pyui', 'json'):
        code = json.dumps(json.loads(code), indent=4, separators=(',', ': '))
    return template(main_tpl, files = file_list, save_as = selected, code = code)
def unsub_email(email, code):
    """Unsubscribe the user whose e-mail and auth link match, then log in.

    Users are looked up first through the 'identity_check' index with the
    key ["email", email], then by plain ``email`` field (xenforo users).
    The first user whose ``authlink`` equals ``code`` is marked as
    unsubscribed, stored, flashed a confirmation and logged in.

    :param email: e-mail address from the unsubscribe link.
    :param code: auth link code that must match the stored one.
    :return: None on success; the rendered "login.htm" template (after an
        error flash) when no user matches.
    """
    for user in r.table('users').get_all(["email", email], index = 'identity_check').run(conn()):
        if user['authlink'] != code:
            continue
        user['subscribed'] = False
        r.table('users').get_all(["email", email], index = 'identity_check').replace(user).run(conn())
        users.flash('info', 'You\'ve been unsubscribed from further emails.')
        users.login(user)
        return

    # for xenforo users
    for user in r.table('users').filter({'email': email}).run(conn()):
        if user['authlink'] != code:
            continue
        user['subscribed'] = False
        # NOTE(review): unlike the lookup above, this get_all passes no
        # index= argument -- confirm it selects the intended row.
        r.table('users').get_all(["xenforo", user['identity']]).replace(user).run(conn())
        users.flash('info', 'You\'ve been unsubscribed from further emails.')
        users.login(user)
        return

    users.flash('error', 'Authorization failed')
    return template("login.htm")
def do_login():
    """Check the submitted credentials and log the user in.

    On success the user id and the salted password are stored in the
    'ssl_uid' / 'ssl_pw' cookies and the request is redirected to '/'.
    Otherwise the login page is rendered again with a message telling
    whether the user was not found or the password did not match.

    :return: the redirect result, or the rendered 'login' template.
    """
    username = request.forms.get('username')
    password = get_salted_password()
    current_user = user.get_by_username(username)

    if current_user and current_user.salted_password == password:
        response.set_cookie('ssl_uid', str(current_user.id))
        response.set_cookie('ssl_pw', password)
        return redirect('/')

    if current_user:
        message = 'Password is incorrect.'
    else:
        message = 'User not found.'
    return template('login', username=username, message=message,
                    salt=config.USER_SALT)
def new_item():
    """Insert a new todo item, or show the form for creating one.

    With a non-blank ``save`` query parameter the ``task`` parameter is
    inserted into the ``todo`` table of 'todo.db' with status 1.

    :return: an HTML snippet with the new row id after an insert, otherwise
        the rendered 'new_task.html' template.
    """
    if request.GET.get('save', '').strip():
        new = request.GET.get('task', '').strip()
        conn = sqlite3.connect('todo.db')
        try:
            c = conn.cursor()
            c.execute('INSERT INTO todo (task,status) VALUES (?,?)', (new, 1))
            new_id = c.lastrowid
            conn.commit()
            c.close()
        finally:
            # closing only the cursor left the connection open on every call
            conn.close()
        return "<p> the new task was inserted into the database,the id is %s</p>" % new_id
    else:
        return template('new_task.html')
def edit_item(num):
    """Show or save the edit form of a todo item stored in 'todo.db'.

    With a ``save`` query parameter the item's task and status (1 for
    'open', otherwise 0) are updated; without it the current task text is
    loaded and the edit form rendered.

    :param num: id of the todo item.
    :return: an HTML confirmation snippet after an update, otherwise the
        rendered 'edit_task.html' template.
    """
    # NOTE(review): the id is matched with LIKE, so '%' or '_' in `num`
    # would match several rows -- confirm the route restricts `num`.
    if request.GET.get('save'):
        # default to '' so a missing field does not raise on .strip()
        edit = request.GET.get('task', '').strip()
        status = 1 if request.GET.get('status', '').strip() == 'open' else 0
        conn = sqlite3.connect('todo.db')
        try:
            c = conn.cursor()
            c.execute('UPDATE todo SET task=?,status=? WHERE id LIKE ?', (edit, status, num))
            conn.commit()
        finally:
            conn.close()
        return "<p>the item number %s was successfully update" % num
    else:
        conn = sqlite3.connect('todo.db')
        try:
            c = conn.cursor()
            # The parameters must be a one-element tuple: `(str(num))` is just
            # a string, which sqlite3 reads as one binding per character and
            # therefore rejects for ids of 10 or more.
            c.execute('SELECT task FROM todo WHERE id LIKE ?', (str(num),))
            cur_data = c.fetchone()
        finally:
            conn.close()
        return template('edit_task.html', old=cur_data, num=num)
def show_post(permalink="notfound"):
    """Display a single blog post looked up by its permalink.

    The user name is resolved from the "session" cookie and the permalink
    is HTML-escaped before the lookup.  When no post is found the request is
    redirected to "/post_not_found".

    :param permalink: permalink of the post to show.
    :return: the rendered "entry_template" with the post, the user name, an
        empty error string and a blank comment form.
    """
    session_cookie = bottle.request.get_cookie("session")
    username = sessions.get_username(session_cookie)
    permalink = cgi.escape(permalink)

    print(" ".join(["about to query on permalink = ", permalink]))
    post = posts.get_post_by_permalink(permalink)
    if post is None:
        bottle.redirect("/post_not_found")

    # init comment form fields for additional comment
    context = {
        'post': post,
        'username': username,
        'errors': "",
        'comment': {'name': "", 'body': "", 'email': ""},
    }
    return bottle.template("entry_template", context)
# used to process a comment on a blog post
def process_signup():
    """Handle the signup form: validate, create the user, start a session.

    On success a session is started, its id stored in the "session" cookie
    and the request redirected to "/welcome".  When validation fails or the
    user name is already taken, the signup page is rendered again with the
    ``errors`` dict (which also carries the escaped username and email).

    :return: the rendered "signup" template on failure.
    """
    form = bottle.request.forms
    email = form.get("email")
    username = form.get("username")
    password = form.get("password")
    verify = form.get("verify")

    # set these up in case we have an error case
    errors = {'username': cgi.escape(username), 'email': cgi.escape(email)}

    if not validate_signup(username, password, verify, email, errors):
        print("user did not validate")
        return bottle.template("signup", errors)

    if not users.add_user(username, password, email):
        # this was a duplicate
        errors['username_error'] = "Username already in use. Please choose another"
        return bottle.template("signup", errors)

    session_id = sessions.start_session(username)
    print(session_id)
    bottle.response.set_cookie("session", session_id)
    bottle.redirect("/welcome")
def present_welcome():
    """Show the welcome page to a logged-in user.

    The user name is resolved from the "session" cookie; when it cannot be
    determined the request is redirected to "/signup".

    :return: the rendered "welcome" template.
    """
    # check for a cookie, if present, then extract value
    session_cookie = bottle.request.get_cookie("session")
    # see if user is logged in
    username = sessions.get_username(session_cookie)
    if username is None:
        print("welcome: can't identify user...redirecting to signup")
        bottle.redirect("/signup")
    return bottle.template("welcome", dict(username=username))
# Helper Functions
# Extracts the tag from the tags form element. An experienced Python programmer could do this in fewer lines, no doubt.
def show_post(permalink="notfound"):
    """Display a single blog post looked up by its permalink.

    The user name is resolved from the "session" cookie and the permalink
    is HTML-escaped before the lookup.  When no post is found the request is
    redirected to "/post_not_found".

    :param permalink: permalink of the post to show.
    :return: the rendered "entry_template" with the post, the user name, an
        empty error string and a blank comment form.
    """
    username = sessions.get_username(bottle.request.get_cookie("session"))
    permalink = cgi.escape(permalink)
    print(" ".join(["about to query on permalink = ", permalink]))

    post = posts.get_post_by_permalink(permalink)
    if post is None:
        bottle.redirect("/post_not_found")

    # init comment form fields for additional comment
    blank_comment = dict(name="", body="", email="")
    template_vars = dict(post=post, username=username, errors="",
                         comment=blank_comment)
    return bottle.template("entry_template", template_vars)
# used to process a comment on a blog post