def query():
    """Handle the organisation query form.

    On POST the submitted organisation is looked up among the stored users.
    If nothing is stored, the ``main.save_info`` job is enqueued and a notice
    naming the submitted e-mail address is flashed. Otherwise the stored
    (github_username, name) pairs are handed to ``get_nodes.creating_objs``
    and the organisation's own template is rendered.

    Every path that does not render the organisation template renders
    ``query.html``.
    """
    # NOTE(review): the source's indentation was lost; the organisation
    # template is assumed to be rendered only when stored users exist.
    if request.method != 'POST':
        return render_template('query.html')

    organisation = request.form['organisation']
    email_address = request.form['email_address']
    filename = organisation + ".html"
    info = (
        db.session.query(User.github_username, User.name)
        .filter_by(organisation=organisation)
        .all()
    )

    if info == []:
        # Nothing stored for this organisation yet: collect it in the
        # background and tell the requester where the notice will go.
        q.enqueue_call(
            func="main.save_info",
            args=(organisation, email_address),
            result_ttl=5000,
            timeout=600,
        )
        flash("We shall notify you at " + email_address + " when the processing is complete")
        return render_template('query.html')

    lists = [[str(row.github_username), str(row.name)] for row in info]
    get_nodes.creating_objs(lists, organisation)
    return render_template(filename, organisation=str(organisation) + '.json')
# Python: example source code for method()
def is_hot_dog():
    """Run the hot-dog check on an uploaded image.

    Only POST requests are handled; any other method falls through and
    returns ``None``, exactly as before.

    Returns:
        ``({'error': 'no file'}, 400)`` as JSON when no usable ``file`` part
        was uploaded, ``{'error': 'bad-type'}`` as JSON when the content type
        is not in ``valid_mimetypes``, otherwise a JSON object with
        ``is_hot_dog`` (``'true'``/``'false'``) and ``confidence``.
    """
    if request.method == 'POST':
        if 'file' not in request.files:
            return jsonify({'error': 'no file'}), 400

        # Image info
        img_file = request.files.get('file')
        # The filename is chosen by the client: keep only its last path
        # component so it cannot escape UPLOAD_FOLDER (e.g. "../../x").
        img_name = os.path.basename(img_file.filename or '')
        if img_name in ('', '.', '..'):
            return jsonify({'error': 'no file'}), 400
        mimetype = img_file.content_type

        # Return an error if not a valid mimetype
        if mimetype not in valid_mimetypes:
            return jsonify({'error': 'bad-type'})

        # Write image to the upload directory and do the hot dog check
        img_path = os.path.join(app.config['UPLOAD_FOLDER'], img_name)
        img_file.save(img_path)
        try:
            hot_dog_conf = rekognizer.get_confidence(img_name)
        finally:
            # Delete the image even when the analysis raises, so failed
            # requests do not leave uploads behind.
            os.remove(img_path)

        return jsonify({
            'is_hot_dog': 'false' if hot_dog_conf == 0 else 'true',
            'confidence': hot_dog_conf,
        })
def add():
    """Add an address to, or remove one from, ``blastlist.txt``.

    Only POST requests are handled. The ``submit`` form field selects the
    action (``'Add'`` or ``'Remove'``) and ``addr`` carries the address,
    which is stripped of surrounding whitespace. Any other method or
    ``submit`` value falls through and returns ``None``, as before.

    Returns:
        The rendered ``listadded.html`` or ``listremoved.html`` template.
    """
    if request.method == 'POST':
        action = request.form['submit']
        if action == 'Add':
            addr = request.form['addr'].strip()
            # Form values are already text; the file object does the UTF-8
            # encoding, so no explicit decode is needed (or valid).
            with io.open('blastlist.txt', 'a', encoding="utf-8") as f:
                f.write(addr + u'\r\n')
            return render_template('listadded.html', addr=addr)
        elif action == 'Remove':
            addr = request.form['addr'].strip()
            with io.open('blastlist.txt', 'r', encoding="utf-8") as f:
                lines = f.readlines()
            # Drop only lines that ARE the address. A substring test would
            # also delete longer addresses containing it, and an empty
            # address would wipe the whole list.
            with io.open('blastlist.txt', 'w', encoding="utf-8") as f:
                f.writelines(line for line in lines if line.strip() != addr)
            return render_template('listremoved.html', addr=addr)
def login():
    """Log a user in from the submitted username and password.

    On a POST carrying both ``username`` and ``password``, the user is
    fetched with ``User.get`` and the hash of the submitted password is
    compared with the stored ``user.password``. On a match the user is
    passed to ``login_user`` and redirected to ``index``; otherwise a
    message is flashed. All other paths render ``login.html``.
    """
    has_credentials = (
        request.method == "POST"
        and "username" in request.form
        and "password" in request.form
    )
    if has_credentials:
        username = request.form["username"]
        password = request.form["password"]
        account = User.get(username)
        # The stored password is compared against hash_pass(password), so
        # the submitted value has to be hashed before the comparison.
        if account and hash_pass(password) == account.password:
            login_user(account, remember=True)
            # TODO: honour request.args.get("next") once that works
            # properly; for now always go to the index page.
            return redirect(url_for("index"))
        flash(u"Invalid username, please try again.")
    return render_template("login.html")
def main_teacher():
    """Render the teacher's main page, opening or closing a session on POST.

    A POST with a ``close`` field closes the active session of that course
    id; a POST with an ``open`` field opens a session for that course id.
    The page is then rendered with the teacher's courses (``data``) and an
    ``empty`` flag that is true when there are none.
    """
    teacher = teachers_model.Teachers(flask.session['id'])

    if request.method == 'POST':
        course = courses_model.Courses()
        if "close" in request.form.keys():
            course.cid = request.form["close"]
            course.close_session(course.get_active_session())
        elif "open" in request.form.keys():
            course.cid = request.form["open"]
            course.open_session()

    courses = teacher.get_courses_with_session()
    return render_template(
        'main_teacher.html',
        empty=len(courses) == 0,
        data=courses,
    )
def add_backnode():
    """Create a backup-node record from the submitted form.

    Non-POST requests just render ``addbacknode.html``. On POST a new
    ``backhosts`` row is stored unless a row with the same node name or the
    same FTP IP already exists. Either way a message is flashed and the
    same template is rendered.
    """
    if request.method != 'POST':
        return render_template('addbacknode.html')

    form = request.form
    node_name = form['node_name']
    ftp_ip = form['ftp_ip']
    ftp_port = form['ftp_port']
    ftp_user = form['ftp_user']
    ftp_pass = form['ftp_pass']

    duplicates = backhosts.query.filter(
        or_(backhosts.host_node == node_name, backhosts.ftp_ip == ftp_ip)
    ).all()

    if duplicates:
        flash(u'%s ?????????????!' % node_name)
    else:
        db.session.add(
            backhosts(
                host_node=node_name,
                ftp_ip=ftp_ip,
                ftp_port=ftp_port,
                ftp_user=ftp_user,
                ftp_pass=ftp_pass,
            )
        )
        db.session.commit()
        flash(u'%s ??????!' % node_name)

    return render_template('addbacknode.html')
def customer():
    """List customers, or update one customer's status on POST.

    On POST the customer identified by ``customer_id`` gets
    ``customers_status`` set to 1 when ``customer_oper`` is ``'stop_back'``
    and to 0 otherwise, and the change is committed. Any failure (missing
    form field, unknown customer, database error) is printed and reported
    through the returned text instead of raising.

    Returns:
        A short status string for POST, or the rendered ``customers.html``
        listing for any other method.
    """
    if request.method == 'POST':
        try:
            customer_id = request.form['customer_id']
            customer_oper = request.form['customer_oper']
            record = customers.query.filter_by(id=customer_id).first()
            record.customers_status = 1 if customer_oper == 'stop_back' else 0
            db.session.add(record)
            db.session.commit()
            return u"???????"
        except Exception as e:  # deliberate catch-all at the view boundary
            # Leave the session usable after a failed flush/commit.
            db.session.rollback()
            print(e)
            return u"???????"
    else:
        customer_all = customers.query.all()
        return render_template('customers.html', customers=customer_all)
def changepass():
    """Change the logged-in user's password.

    On POST, when ``pass1`` and ``pass2`` match, the password of
    ``session['username']`` is changed, the action is logged, the login
    keys are removed from the session and the user is redirected to
    ``home``. A mismatch flashes an error. Every other path renders
    ``changepass.html``.
    """
    if request.method == 'POST':
        # process password change
        if request.form['pass1'] != request.form['pass2']:
            flash('The passwords you entered do not match. Please try again.')
            return render_template('changepass.html')

        change_password(session['username'], request.form['pass1'])
        log_action(session['uid'], 8)
        # Drop the login state so the user signs in with the new password.
        for key in ('logged_in', 'uid', 'priv', 'username'):
            session.pop(key, None)
        flash('Your password has been changed. Please login using your new password.')
        return redirect(url_for('home'))

    return render_template('changepass.html')
#
# EDIT USER PAGE
#
def adduser():
    """Create a user from the submitted form and render ``adduser.html``.

    On POST a user is created when ``pass1`` equals ``pass2`` and the
    username is not taken; the ``status`` field value ``'admin'`` sets the
    admin flag to 1, anything else leaves it at 0. The outcome is flashed.
    Every path ends by rendering the same template with the session's
    ``priv`` and ``username``.
    """
    if request.method == 'POST':
        if request.form['pass1'] != request.form['pass2']:
            flash('The passwords you entered do not match. Please try again.')
        # Explicit comparison preserved: only a result equal to False
        # leads to user creation.
        elif user_exists(request.form['username']) == False:
            # create the user
            admin = 1 if request.form['status'] == 'admin' else 0
            create_user(request.form['username'], request.form['pass1'], admin)
            log_action(session['uid'], 10)
            flash(request.form['username'] + ' has been created.')
        else:
            flash('The username you entered is already in use.')

    return render_template(
        'adduser.html', acp=session['priv'], username=session['username']
    )
def login():
    """Login POST handler.

    Reads ``signin_username`` and ``signin_password`` from the form and
    checks them with ``fe.check_auth``. On success the id returned by
    ``fe.set_login_id`` is stored in ``session['logged_in']``. On failure
    the status is set to ``401``.

    Returns:
        JSON formatted output describing success or failure.
    """
    log.debug("Entering login, attempting to authenticate user.")
    username = request.form['signin_username']
    password = request.form['signin_password']
    log.debug("Username: {0}".format(username))

    if not fe.check_auth(username, password):
        log.debug("Username or password not recognized, sending 401.")
        # NOTE(review): `response` is not defined inside this block;
        # confirm it is a module-level object that carries the status.
        response.status = 401
        return jsonify({"login": "failed"})

    log.debug("User authenticated. Trying to set session.")
    session_id = fe.set_login_id()
    session['logged_in'] = session_id
    log.debug("Session ID: {0}, returning to user".format(session_id))
    return jsonify({"login": "success"})
def json_query_simple(query, query_args=None, empty_ok=False):
    """Do a SQL query that selects one column and dump those values as
    a JSON array.

    Args:
        query: SQL text passed to ``dbcursor_query``.
        query_args: Arguments for the query; ``None`` (the default) means
            no arguments.
        empty_ok: When true, an empty result yields an empty JSON array
            instead of a not-found response.

    Returns:
        ``not_allowed()`` for non-GET requests, ``error(...)`` when the
        query raises, ``not_found()`` or ``json_response([])`` for an empty
        result, otherwise ``json_response`` of the first column's values.
    """
    if request.method != 'GET':
        return not_allowed()

    # A fresh list per call avoids sharing one mutable default object.
    args = [] if query_args is None else query_args
    try:
        cursor = dbcursor_query(query, args)
    except Exception as ex:
        return error(str(ex))

    # Close the cursor even if fetching a row raises.
    try:
        empty = cursor.rowcount == 0
        result = [] if empty else [row[0] for row in cursor]
    finally:
        cursor.close()

    if empty:
        return json_response([]) if empty_ok else not_found()
    return json_response(result)
def json_query(query, query_args=None, name='name', single=False):
    """Do a SQL query that selects one column containing JSON and dump
    the results.

    Each row's first column gets an ``href`` entry built with
    ``base_url``. When ``single`` is true or ``is_expanded()`` is true the
    whole item is returned; otherwise only its href is.

    Args:
        query: SQL text passed to ``dbcursor_query``.
        query_args: Arguments for the query; ``None`` (the default) means
            no arguments.
        name: Key in each item whose value is passed to ``base_url`` when
            ``single`` is false.
        single: When true, the first-returned row is returned as a single
            item instead of an array, and an empty result is a not-found.

    Returns:
        ``not_allowed()`` for non-GET requests, ``error(...)`` when the
        query raises, ``not_found()`` for an empty ``single`` result,
        otherwise ``json_response`` of the item or list.
    """
    if request.method != 'GET':
        return not_allowed()

    # A fresh list per call avoids sharing one mutable default object.
    args = [] if query_args is None else query_args
    try:
        cursor = dbcursor_query(query, args)
    except Exception as ex:
        return error(str(ex))

    result = []
    # Close the cursor even if processing a row raises.
    try:
        if not (single and cursor.rowcount == 0):
            for row in cursor:
                item = row[0]
                href = base_url(None if single else item[name])
                item['href'] = href
                result.append(item if single or is_expanded() else href)
    finally:
        cursor.close()

    if not single:
        return json_response(result)
    if not result:
        # Also covers a cursor that yields nothing without reporting
        # rowcount == 0, where result[0] would raise IndexError.
        return not_found()
    return json_response(result[0])
def new():
    """Create a project from ``ProjectForm``.

    On a valid POST, a project is stored and the user is redirected to its
    ``master`` branch index, unless a directory named after the project
    already exists under ``repos``. In that case, and for every other
    request, ``new.html`` is rendered with the form.
    """
    form = ProjectForm(request.form)
    if request.method != 'POST' or not form.validate():
        return render_template('new.html', form=form)

    project_name = form.name.data
    if os.path.isdir(join('repos', project_name)):
        flash(_('This project name already exists'), 'error')
        return render_template('new.html', form=form)

    db.session.add(Project(project_name, current_user))
    db.session.commit()
    # NOTE: project.create_project(...) is intentionally not called here;
    # it was commented out in the original.
    flash(_('Project created successfuly!'), 'info')
    return redirect(url_for('branches.view',
                            project=project_name,
                            branch='master', filename='index'))
def html2rst():
    """Convert submitted HTML with ``html2rest`` and render the result.

    On POST with a non-empty ``content`` field other than the literal
    ``'undefined'``, the content is converted. The template gets
    ``converted`` on success, or ``prefetch`` (the raw input) when the
    conversion raises. All other requests render the bare template.
    """
    # `in` replaces dict.has_key(), which exists only on Python 2 mappings.
    if request.method == 'POST' and 'content' in request.form:
        html = request.form.get('content')
        if html and html != 'undefined':
            try:
                converted, prefetch = html2rest(html), None
            except Exception:
                # Conversion failed: hand the raw input back to the page.
                # SystemExit/KeyboardInterrupt are no longer swallowed.
                converted, prefetch = None, html
            return render_template('html2rst.html', converted=converted,
                                   prefetch=prefetch)
    return render_template('html2rst.html')
def request_validate(view):
    """Decorate ``view`` so request data is validated before it runs.

    The schemas registered in ``validators`` for ``(endpoint, method)`` are
    applied per request location (HEAD is treated as GET). Each validated
    value is passed to the view as ``request_data`` (for ``json``) or
    ``request_args`` (for ``args``), unless its schema sets
    ``maxProperties`` to 0. The view also receives
    ``request.environ['context']`` as ``context``.
    """
    @wraps(view)
    def wrapper(*args, **kwargs):
        # Endpoint name without the part before the first dot.
        endpoint = request.endpoint.partition('.')[-1]
        method = 'GET' if request.method == 'HEAD' else request.method

        kwarg_for = {"json": "request_data", "args": "request_args"}
        schemas = validators.get((endpoint, method), {})
        for location, schema in schemas.items():
            value = getattr(request, location, MultiDict())
            result = FlaskValidatorAdaptor(schema).validate(value)
            LOG.info("Validated request %s: %s" % (location, result))
            if schema.get("maxProperties") != 0:
                kwargs[kwarg_for[location]] = result

        context = request.environ['context']
        return view(*args, context=context, **kwargs)
    return wrapper
def update_content_in_local_cache(url, content, method='GET'):
    """Replace the body of an already-cached response with ``content``.

    Nothing happens unless local caching is enabled, ``method`` is ``GET``
    and ``url`` is already cached. Otherwise the cached object's data is
    set to ``content``, its info is marked as having content, and the
    object is stored again.
    """
    if not (local_cache_enable and method == 'GET' and cache.is_cached(url)):
        return

    meta = cache.get_info(url)
    cached_resp = cache.get_obj(url)
    cached_resp.set_data(content)

    # The entry now carries a body, so clear the "without_content" marker.
    meta['without_content'] = False

    if verbose_level >= 4:
        dbgprint('LocalCache_UpdateCache', url, content[:30], len(content))

    cache.put_obj(
        url,
        cached_resp,
        obj_size=len(content),
        expires=get_expire_from_mime(parse.mime),
        last_modified=meta.get('last_modified'),
        info_dict=meta,
    )
def request_remote_site():
    """Fetch the remote resource and store it in ``parse.remote_response``.

    The request mirrors the current request's method, with headers and body
    taken from ``parse``. A warning is printed when the response URL
    differs from the URL requested. On a 4xx/5xx status,
    ``guess_correct_domain()`` is tried and a non-``None`` result replaces
    the stored response.
    """
    parse.remote_response = send_request(
        parse.remote_url,
        method=request.method,
        headers=parse.client_header,
        data=parse.request_data_encoded,
    )
    fetched = parse.remote_response

    if fetched.url != parse.remote_url:
        warnprint("requests's remote url", fetched.url,
                  'does no equals our rewrited url', parse.remote_url)

    if 400 <= fetched.status_code <= 599:
        # Error status: try guessing a different domain for this URL.
        dbgprint("Domain guessing for", request.url)
        guessed = guess_correct_domain()
        if guessed is not None:
            parse.remote_response = guessed
def reset_api_key(name):
    """Reset API-KEY for user.

    On POST the user named ``name`` gets a new API key (404 if no such
    user exists, and the caller must be authorized to update them). The
    cached user summary is dropped, a success message is flashed, and the
    response redirects to the user's profile. Any other method returns a
    JSON document carrying a fresh CSRF token.
    """
    if request.method != 'POST':
        return jsonify(dict(form=dict(csrf=generate_csrf())))

    user = user_repo.get_by_name(name)
    if not user:
        return abort(404)

    ensure_authorized_to('update', user)
    user.api_key = model.make_uuid()
    user_repo.update(user)
    cached_users.delete_user_summary(user.name)

    flash(gettext('New API-KEY generated'), 'success')
    return redirect_content_type(url_for('account.profile', name=name))
def password_required(short_name):
    """Ask for, and check, the password of the project ``short_name``.

    On a valid POST the submitted password is checked with
    ``ProjectPasswdManager``. A correct password redirects to the ``next``
    query parameter, with the response updated by the manager. A wrong one
    flashes an error. Every other path renders the password form.
    """
    (project, _owner, _n_tasks, _n_task_runs,
     _overall_progress, _last_activity,
     _n_results) = project_by_shortname(short_name)
    form = PasswordForm(request.form)

    if request.method == 'POST' and form.validate():
        submitted = request.form.get('password')
        timeout = current_app.config.get('PASSWD_COOKIE_TIMEOUT')
        handler = CookieHandler(request, signer, timeout)
        manager = ProjectPasswdManager(handler)
        if manager.validates(submitted, project):
            # NOTE(review): the redirect target is taken straight from the
            # `next` query parameter.
            target = redirect(request.args.get('next'))
            return manager.update_response(
                make_response(target), project, get_user_id_or_ip()
            )
        flash(gettext('Sorry, incorrect password'))

    return render_template('projects/password.html',
                           project=project,
                           form=form,
                           short_name=short_name,
                           next=request.args.get('next'))
def publish(short_name):
    """Publish the project ``short_name``.

    When the project's ``sched`` info is ``"FRG"`` and the current user
    owns it but has not created a quiz, the user is sent to quiz creation.
    After the authorization check, a GET renders the confirmation page.
    Any other method marks the project published, deletes its task runs,
    results and webhook entries, logs the change and redirects to the
    project details.
    """
    (project, _owner, _n_tasks, _n_task_runs,
     _overall_progress, _last_activity,
     _n_results) = project_by_shortname(short_name)

    # Projects on the "FRG" scheduler need a quiz from their owner first.
    quiz_missing = (
        "sched" in project.info.keys()
        and project.info["sched"] == "FRG"
        and project.owner_id == current_user.id
        and not cached_users.is_quiz_created(current_user.id, project)
    )
    if quiz_missing:
        flash("You did not created quiz.Please create the quiz", "danger")
        return redirect(url_for('quiz.create_quiz', short_name=project.short_name))

    pro = pro_features()
    ensure_authorized_to('publish', project)
    if request.method == 'GET':
        return render_template('projects/publish.html',
                               project=project,
                               pro_features=pro)

    project.published = True
    project_repo.save(project)
    # Clear out data tied to the project before it goes live.
    task_repo.delete_taskruns_from_project(project)
    result_repo.delete_results_from_project(project)
    webhook_repo.delete_entries_from_project(project)
    auditlogger.log_event(project, current_user, 'update', 'published', False, True)
    flash(gettext('Project published! Volunteers will now be able to help you!'))
    return redirect(url_for('.details', short_name=project.short_name))
def users(user_id=None):
    """Show admin users and search users by name.

    The response always lists the admin users other than the current user.
    On a POST with a search term, users matching it (again excluding the
    current user) are looked up, each is checked with
    ``ensure_authorized_to('update', ...)``, and a message is flashed when
    nothing matched. ``user_id`` is not used by this function.
    """
    form = SearchForm(request.body)
    admins = [admin for admin in user_repo.filter_by(admin=True)
              if admin.id != current_user.id]

    found = []
    if request.method == 'POST' and form.user.data:
        query = form.user.data
        found = [match for match in user_repo.search_by_name(query)
                 if match.id != current_user.id]
        for found_user in found:
            ensure_authorized_to('update', found_user)
        if not found:
            flash("<strong>Ooops!</strong> We didn't find a user "
                  "matching your query: <strong>%s</strong>" % form.user.data)

    response = dict(template='/admin/users.html', found=found, users=admins,
                    title=gettext("Manage Admin Users"),
                    form=form)
    return handle_content_type(response)
def new_item(category):
    """Show the new-item form (GET) or create an item in ``category`` (POST).

    A valid POST stores the item for the logged-in user and redirects to
    it. An invalid one re-renders the form with the submitted values and
    an error. Other methods return ``None``.
    """
    if request.method == 'GET':
        return render('newitem.html', category=category)
    if request.method != 'POST':
        return None

    name = request.form['name']
    highlight = request.form['highlight']
    url = request.form['url']

    if not valid_item(name, url, highlight):
        return render('newitem.html', category=category, name=name,
                      highlight=highlight, url=url,
                      error="Complete info please!")

    item = Item(name=name, highlight=highlight, url=url,
                user_id=login_session['user_id'], category_id=category.id)
    session.add(item)
    session.commit()
    flash("Newed item %s!" % item.name)
    return redirect(url_for('show_item', category_id=category.id, item_id=item.id))
def edit_item(category, item):
    """Show the edit form for ``item`` (GET) or apply the edits (POST).

    A valid POST updates the item's name, highlight and url, commits, and
    redirects to the item. An invalid one re-renders the form with the
    submitted values and an error. Other methods return ``None``.
    """
    if request.method == 'GET':
        return render('edititem.html', category=category, item=item,
                      name=item.name, highlight=item.highlight, url=item.url)
    if request.method != 'POST':
        return None

    name = request.form['name']
    highlight = request.form['highlight']
    url = request.form['url']

    if not valid_item(name, url, highlight):
        return render('edititem.html', category=category, item=item,
                      name=name, highlight=highlight, url=url,
                      error="Complete info please!")

    item.name, item.highlight, item.url = name, highlight, url
    session.commit()
    flash("Edited item %s!" % item.name)
    return redirect(url_for('show_item', category_id=category.id, item_id=item.id))
def new_category():
    """Show the new-category form (GET) or create a category (POST).

    A valid POST stores the category for the logged-in user and redirects
    to its item list. An invalid one re-renders the form with the
    submitted values and an error. Other methods return ``None``.
    """
    if request.method == 'GET':
        return render('newcategory.html')
    if request.method != 'POST':
        return None

    name = request.form['name']
    description = request.form['description']

    if not valid_category(name, description):
        return render('newcategory.html', name=name, description=description,
                      error="Complete info please!")

    category = Category(name=name, description=description,
                        user_id=login_session['user_id'])
    session.add(category)
    session.commit()
    flash("Newed category %s!" % category.name)
    return redirect(url_for("show_items", category_id=category.id))
def edit_category(category):
    """Show the edit form for ``category`` (GET) or apply the edits (POST).

    A valid POST updates the name and description, commits, and redirects
    to the category's item list. An invalid one re-renders the form with
    the submitted values and an error. Other methods return ``None``.
    """
    if request.method == 'GET':
        return render('editcategory.html', category=category,
                      name=category.name, description=category.description)
    if request.method != 'POST':
        return None

    name = request.form['name']
    description = request.form['description']

    if not valid_category(name, description):
        return render('editcategory.html', category=category, name=name,
                      description=description, error="Complete info please!")

    category.name, category.description = name, description
    session.commit()
    flash("Edited category %s!" % category.name)
    return redirect(url_for("show_items", category_id=category.id))
def modify_blog_posts(id):
    """Fetch, update, or delete the blog post with the given id.

    Returns a 404 JSON message when no such post exists. GET returns the
    post object itself, PUT replaces its content with the ``blog_content``
    form field, and any other method deletes it. PUT and delete both
    commit and answer with a JSON success message and status 200.
    """
    post = BlogPost.query.get(id)
    if not post:
        return jsonify({"message": "No blog post found with id " + str(id)}), 404

    if request.method == 'GET':
        # NOTE(review): this returns the model instance as-is; confirm the
        # framework can turn it into a response.
        return post

    if request.method == 'PUT':
        post.content = request.form["blog_content"]
        db.session.add(post)
    else:
        db.session.delete(post)

    db.session.commit()
    return jsonify({"message": "Success!"}), 200
def userlist():
    """Admin page: list users, and edit, remove or add a user on POST.

    Non-administrators get a 403. For administrators:
    - a POST with a JSON body whose ``action`` is ``edit`` returns a JSON
      document pointing at the edit-profile URL; any other action removes
      the named user and redirects to ``userlist``;
    - a POST with a valid ``AddUserForm`` adds the user after
      ``verify_password`` accepts the submitted original password;
    - anything else renders ``userlist.html``.
    """
    # NOTE(review): the source's indentation was lost. The nesting below,
    # in particular of the redirect after AddUser and of the `else` that
    # renders the template, is a reconstruction; confirm against the
    # original file.
    u=User()
    form=AddUserForm()
    flag=current_user.is_administrator(g.user)
    if flag is True:
        userlist=u.GetUserList()
        jsondata=request.get_json()
        if request.method == 'POST' and jsondata:
            if jsondata['action'] == u'edit':
                username=jsondata['username']
                location=url_for('.admin_edit_profile',username=username)
                # The redirect is sent as JSON data (status 302 plus location).
                return jsonify({"status":302,"location":location})
            else:
                username=jsondata['username']
                u.RemUser(username)
                return redirect('userlist')
        elif request.method == 'POST' and form.validate():
            # `pwd` is not read again in this block; presumably the call
            # loads the stored password into `u` for verify_password. TODO confirm.
            pwd=u.GetPassword(g.user)
            if u.verify_password(form.oripassword.data):
                u.AddUser(form.username.data,form.password.data,form.role.data,form.email.data)
                return redirect('userlist')
        else:
            return render_template('userlist.html',userlist=userlist,form=form)
    else:
        abort(403)
def edit_profile():
    """Let the current user edit their e-mail, about-me text and password.

    On a valid POST whose original password is accepted by
    ``verify_password``, the profile is updated, the password is changed
    only if a new one was entered, and the user is redirected to their
    page. A rejected original password flashes an error. Every other path
    renders ``edit_profile.html`` with the form pre-filled from the stored
    user info.
    """
    form = EditProfileForm()
    u = User()
    if request.method == 'POST' and form.validate():
        # The return value was never used. The call is kept because it
        # presumably loads the stored password into `u` for
        # verify_password. TODO confirm.
        u.GetPassword(g.user)
        if u.verify_password(form.oripassword.data):
            email = form.email.data
            aboutme = form.about_me.data
            # Truthiness replaces `is not u''`, which compared identity
            # with a literal rather than value and let None through.
            if form.password.data:
                u.ChangePassword(g.user, form.password.data)
            u.ChangeProfile(g.user, email, aboutme)
            flash('??????')
            return redirect(url_for('.user', username=g.user))
        else:
            flash('????????')
    u.GetUserInfo(g.user)
    form.email.data = u.email
    form.about_me.data = u.aboutme
    return render_template('edit_profile.html', form=form, u=u)
def nodes_node(name):
    """Show a node page (GET) or store submitted form changes (other methods).

    ``name`` is decoded with ``serializer.loads`` and the node is looked
    up through ``NodeDefender.db.node.get``. Both paths render
    ``frontend/nodes/node.html`` with the node.
    """
    # NOTE(review): `icpeform`, `locationform`, `icpe`, `BasicForm`,
    # `AddressForm` and `db` are not defined inside this block; confirm
    # they exist at module level. The source's indentation was lost, so the
    # position of the add/commit after the if/elif is a reconstruction.
    name = serializer.loads(name)
    node = NodeDefender.db.node.get(name)
    if request.method == 'GET':
        return render_template('frontend/nodes/node.html', Node = node)

    # Whichever form was submitted and validates decides what is updated.
    if icpeform.Submit.data and icpeform.validate_on_submit():
        icpe.alias = BasicForm.alias.data
        icpe.comment = BasicForm.comment.data
    elif locationform.Submit.data and locationform.validate_on_submit():
        icpe.location.street = AddressForm.street.data
        icpe.location.city = AddressForm.city.data
        icpe.location.geolat = AddressForm.geolat.data
        icpe.location.geolong = AddressForm.geolong.data

    db.session.add(icpe)
    db.session.commit()
    return render_template('frontend/nodes/node.html', Node = node)
def login():
    """Log a user in from ``LoginForm``.

    GET requests are redirected to ``auth_view.authenticate``. For other
    methods, a validated form with an e-mail is checked step by step:
    unknown user, wrong password and disabled account each flash an error
    and redirect back to the login view. Otherwise the user is logged in
    and redirected to ``index``.
    """
    # NOTE(review): this is the last definition in the visible chunk, and
    # no return is visible for the case where form validation fails; the
    # function may continue past this point.
    if request.method == 'GET':
        return redirect(url_for('auth_view.authenticate'))

    login_form = LoginForm()
    if login_form.validate() and login_form.email.data:
        user = NodeDefender.db.user.get(login_form.email.data)
        # Unknown user and wrong password share one message.
        if user is None:
            flash('Email or Password Wrong', 'error')
            return redirect(url_for('auth_view.login'))

        if not user.verify_password(login_form.password.data):
            flash('Email or Password Wrong', 'error')
            return redirect(url_for('auth_view.login'))

        if not user.enabled:
            flash('Account Locked', 'error')
            return redirect(url_for('auth_view.login'))

        # NOTE(review): `remember` is called here, not read via `.data`;
        # confirm this is the intended way to read the checkbox.
        if login_form.remember():
            login_user(user, remember = True)
        else:
            login_user(user)

        return redirect(url_for('index'))