def uploadFile(current_user):
format = "%Y-%m-%dT%H:%M:%S"
now = datetime.datetime.utcnow().strftime(format)
try:
file = request.files['file']
except:
file = None
try:
url = request.form['url']
except:
url = None
if file and allowed_file(file.filename):
filename = now + '_' +str(current_user) + '_' + file.filename
filename = secure_filename(filename)
file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
file_uploaded = True
elif url:
file = urllib.urlopen(url)
filename = url.split('/')[-1]
filename = now + '_' +str(current_user) + '_' + filename
filename = secure_filename(filename)
if file and allowed_file(filename):
open(os.path.join(app.config['UPLOAD_FOLDER'], filename),
'wb').write(file.read())
file_uploaded = True
else:
filename = None
file_uploaded = False
return file_uploaded, filename
python类form()的实例源码
def query():
if(request.method == 'POST'):
organisation = request.form['organisation']
email_address = request.form['email_address']
filename = organisation + ".html"
info = db.session.query(User.github_username, User.name).filter_by(organisation = organisation).all()
if(info == []):
job = q.enqueue_call(
func="main.save_info", args=(organisation, email_address, ), result_ttl=5000, timeout=600
)
flash("We shall notify you at " + email_address + " when the processing is complete")
else:
lists = []
for i in info:
lists.append([str(i.github_username), str(i.name)])
get_nodes.creating_objs(lists, organisation)
return render_template(filename, organisation=str(organisation)+'.json')
return render_template('query.html')
def twittercallback():
verification = request.args["oauth_verifier"]
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
try:
auth.request_token = session["request_token"]
except KeyError:
flash("Please login again", "danger")
return redirect(url_for("bp.home"))
try:
auth.get_access_token(verification)
except tweepy.TweepError:
flash("Failed to get access token", "danger")
return redirect(url_for("bp.home"))
session["access_token"] = auth.access_token
session["access_token_secret"] = auth.access_token_secret
return render_template("twittercallback.html", form=HashtagForm())
def add():
if request.method == 'POST':
if request.form['submit'] == 'Add':
addr = request.form['addr'].lstrip().rstrip()
f = io.open('blastlist.txt', 'a', encoding="utf-8")
f.write(addr.decode('utf-8') + u'\r\n')
f.close()
return render_template('listadded.html',addr=addr)
elif request.form['submit'] == 'Remove':
addr = request.form['addr'].lstrip().rstrip()
f = io.open('blastlist.txt', 'r', encoding="utf-8")
lines = f.readlines()
f.close()
f = io.open('blastlist.txt', 'w', encoding="utf-8")
for line in lines:
if addr not in line:
f.write(line.decode('utf-8'))
f.close()
return render_template('listremoved.html',addr=addr)
def post(self):
if (request.form['username']):
data = {"user": request.form['username'], "key": request.form['password']}
result = dockletRequest.unauthorizedpost('/login/', data)
ok = result and result.get('success', None)
if (ok and (ok == "true")):
# set cookie:docklet-jupyter-cookie for jupyter notebook
resp = make_response(redirect(request.args.get('next',None) or '/dashboard/'))
app_key = os.environ['APP_KEY']
resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(request.form['username'], app_key))
# set session for docklet
session['username'] = request.form['username']
session['nickname'] = result['data']['nickname']
session['description'] = result['data']['description']
session['avatar'] = '/static/avatar/'+ result['data']['avatar']
session['usergroup'] = result['data']['group']
session['status'] = result['data']['status']
session['token'] = result['data']['token']
return resp
else:
return redirect('/login/')
else:
return redirect('/login/')
def get(self):
form = external_generate.external_auth_generate_request()
result = dockletRequest.unauthorizedpost('/external_login/', form)
ok = result and result.get('success', None)
if (ok and (ok == "true")):
# set cookie:docklet-jupyter-cookie for jupyter notebook
resp = make_response(redirect(request.args.get('next',None) or '/dashboard/'))
app_key = os.environ['APP_KEY']
resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(result['data']['username'], app_key))
# set session for docklet
session['username'] = result['data']['username']
session['nickname'] = result['data']['nickname']
session['description'] = result['data']['description']
session['avatar'] = '/static/avatar/'+ result['data']['avatar']
session['usergroup'] = result['data']['group']
session['status'] = result['data']['status']
session['token'] = result['data']['token']
return resp
else:
return redirect('/login/')
def post(self):
form = external_generate.external_auth_generate_request()
result = dockletRequest.unauthorizedpost('/external_login/', form)
ok = result and result.get('success', None)
if (ok and (ok == "true")):
# set cookie:docklet-jupyter-cookie for jupyter notebook
resp = make_response(redirect(request.args.get('next',None) or '/dashboard/'))
app_key = os.environ['APP_KEY']
resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(result['data']['username'], app_key))
# set session for docklet
session['username'] = result['data']['username']
session['nickname'] = result['data']['nickname']
session['description'] = result['data']['description']
session['avatar'] = '/static/avatar/'+ result['data']['avatar']
session['usergroup'] = result['data']['group']
session['status'] = result['data']['status']
session['token'] = result['data']['token']
return resp
else:
return redirect('/login/')
def post(self):
masterip = self.masterip
index1 = self.image.rindex("_")
index2 = self.image[:index1].rindex("_")
checkname(self.clustername)
data = {
"clustername": self.clustername,
'imagename': self.image[:index2],
'imageowner': self.image[index2+1:index1],
'imagetype': self.image[index1+1:],
}
result = dockletRequest.post("/cluster/create/", dict(data, **(request.form)), masterip)
if(result.get('success', None) == "true"):
return redirect("/dashboard/")
#return self.render(self.template_path, user = session['username'])
else:
return self.render(self.error_path, message = result.get('message'))
def login_required(func):
@wraps(func)
def wrapper(*args, **kwargs):
logger.info ("get request, path: %s" % request.path)
token = request.form.get("token", None)
if (token == None):
logger.info ("get request without token, path: %s" % request.path)
return json.dumps({'success':'false', 'message':'user or key is null'})
result = post_to_user("/authtoken/", {'token':token})
if result.get('success') == 'true':
username = result.get('username')
beans = result.get('beans')
else:
return result
#if (cur_user == None):
# return json.dumps({'success':'false', 'message':'token failed or expired', 'Unauthorized': 'True'})
return func(username, beans, request.form, *args, **kwargs)
return wrapper
def delete_cluster(user, beans, form):
global G_vclustermgr
clustername = form.get('clustername', None)
if (clustername == None):
return json.dumps({'success':'false', 'message':'clustername is null'})
logger.info ("handle request : delete cluster %s" % clustername)
user_info = post_to_user("/user/selfQuery/" , {'token':form.get("token")})
user_info = json.dumps(user_info)
[status, usage_info] = G_vclustermgr.get_clustersetting(clustername, user, "all", True)
if status:
post_to_user("/user/usageRelease/", {'token':form.get('token'), 'cpu':usage_info['cpu'], 'memory':usage_info['memory'],'disk':usage_info['disk']})
[status, result] = G_vclustermgr.delete_cluster(clustername, user, user_info)
if status:
return json.dumps({'success':'true', 'action':'delete cluster', 'message':result})
else:
return json.dumps({'success':'false', 'action':'delete cluster', 'message':result})
def save_cluster(user, beans, form):
global G_vclustermgr
clustername = form.get('clustername', None)
if (clustername == None):
return json.dumps({'success':'false', 'message':'clustername is null'})
imagename = form.get("image", None)
description = form.get("description", None)
containername = form.get("containername", None)
isforce = form.get("isforce", None)
if not isforce == "true":
[status,message] = G_vclustermgr.image_check(user,imagename)
if not status:
return json.dumps({'success':'false','reason':'exists', 'message':message})
user_info = post_to_user("/user/selfQuery/", {'token':form.get("token")})
[status,message] = G_vclustermgr.create_image(user,clustername,containername,imagename,description,user_info["data"]["groupinfo"]["image"])
if status:
logger.info("image has been saved")
return json.dumps({'success':'true', 'action':'save'})
else:
logger.debug(message)
return json.dumps({'success':'false', 'reason':'exceed', 'message':message})
def user_quotainfo_monitor(user, beans, form, issue):
global G_historymgr
if issue == 'quotainfo':
logger.info("handle request: monitor/user/quotainfo/")
user_info = post_to_user("/user/selfQuery/", {'token':form.get("token")})
quotainfo = user_info['data']['groupinfo']
return json.dumps({'success':'true', 'quotainfo':quotainfo})
elif issue == 'createdvnodes':
logger.info("handle request: monitor/user/createdvnodes/")
res = G_historymgr.getCreatedVNodes(user)
return json.dumps({'success':'true', 'createdvnodes':res})
elif issue == 'net_stats':
logger.info("handle request: monitor/user/net_stats/")
res = G_historymgr.get_user_net_stats(user)
return json.dumps({'success':'true', 'net_stats':res})
else:
return json.dumps({'success':'false', 'message':"Unspported Method!"})
def beans_apply(cur_user,user,form,issue):
global G_applicationmgr
if issue == 'apply':
number = form.get("number",None)
reason = form.get("reason",None)
if number is None or reason is None:
return json.dumps({'success':'false', 'message':'Number and reason can\'t be null.'})
[success,message] = G_applicationmgr.apply(user,number,reason)
if not success:
return json.dumps({'success':'false', 'message':message})
else:
return json.dumps({'success':'true'})
elif issue == 'applymsgs':
applymsgs = G_applicationmgr.query(user)
return json.dumps({'success':'true','applymsgs':applymsgs})
else:
return json.dumps({'success':'false','message':'Unsupported URL!'})
simplifiedapp.py 文件源码
项目:nba-stats-twilio-sms-bot
作者: elizabethsiegle
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def send_sms():
msg = request.form['Body'].lower() # convert to lowercase
msg = "send 1st + last names of 2 players followed by a stat (GP,W,L,MIN,PTS,FG%,3P%,FT%,REB,AST,STL,BLK). Check for typos!"
player_and_stat = msg.split() #split
if len(player_and_stat) == 5: # check input: 2 players + stat
stat = player_and_stat.pop()
player1 = " ".join(player_and_stat[:2])
player1 = " ".join(player_and_stat[2:])
player_stat_map = dict(parse_data(stat))
player1_stats = player_stat_map.get(player1)
player2_stats = player_stat_map.get(player2)
if player1_stats and player2_stats:
msg = "{0}'s total, higher than {1}'s"
if player2_stats > player1_stats:
msg = "{1}'s total, higher than {0}'s"
msg = msg.format([player1_stats, player2_stats])
else: #check
msg = "check both players' names (first and last!)"
return MessagingResponse().message(msg)
def lookup():
icao_identifier = request.form['airport']
if request.form['date']:
try:
date = dateparser.parse(request.form['date']).date()
except ValueError:
return "Unable to understand date %s" % request.form['date'], 400
else:
date = datetime.date.today()
try:
result = do_lookup(icao_identifier, date)
except Exception as e:
return str(e), 400
except:
flask.abort(500)
return json.dumps(result)
def login():
""" This login function checks if the username & password
match the admin.db; if the authentication is successful,
it passes the id of the user into login_user() """
if request.method == "POST" and \
"username" in request.form and \
"password" in request.form:
username = request.form["username"]
password = request.form["password"]
user = User.get(username)
# If we found a user based on username then compare that the submitted
# password matches the password in the database. The password is stored
# is a slated hash format, so you must hash the password before comparing it.
if user and hash_pass(password) == user.password:
login_user(user, remember=True)
# FIXME! Get this to work properly...
# return redirect(request.args.get("next") or url_for("index"))
return redirect(url_for("index"))
else:
flash(u"Invalid username, please try again.")
return render_template("login.html")
def search_ili(q=None):
if q:
query = q
else:
query = request.form['query']
src_id = fetch_src()
kind_id = fetch_kind()
status_id = fetch_status()
ili = dict()
for c in query_omw("""SELECT * FROM ili WHERE def GLOB ?
""", [query]):
ili[c['id']] = (kind_id[c['kind_id']], c['def'],
src_id[c['origin_src_id']], c['src_key'],
status_id[c['status_id']], c['superseded_by_id'],
c['t'])
rsumm, up_who, down_who = f_rate_summary(list(ili.keys()))
return render_template('concept-list.html', ili=ili,
rsumm=rsumm, up_who=up_who, down_who=down_who)
def main_teacher():
tm = teachers_model.Teachers(flask.session['id'])
if request.method == 'POST':
cm = courses_model.Courses()
if "close" in request.form.keys():
cid = request.form["close"]
cm.cid = cid
cm.close_session(cm.get_active_session())
elif "open" in request.form.keys():
cid = request.form["open"]
cm.cid = cid
cm.open_session()
courses = tm.get_courses_with_session()
empty = True if len(courses) == 0 else False
context = dict(data=courses)
return render_template('main_teacher.html', empty=empty, **context)
def login():
db = UserDb(app.config['LOCAL_DB'])
form = request.form
user = form.get('user')
pwd = form.get('pwd')
password = db.login(user)
del db
if pwd == password:
# ??????
session.permanent = True
# session????
app.permanent_session_lifetime = timedelta(minutes=30)
session.update(dict(user=user))
return render_template('index.html')
elif password is None:
return render_template('login.html', info="??????!")
else:
return render_template('login.html', info="?????!")
def add_backnode():
if request.method == 'POST':
node_name = request.form['node_name']
ftp_ip = request.form['ftp_ip']
ftp_port = request.form['ftp_port']
ftp_user = request.form['ftp_user']
ftp_pass = request.form['ftp_pass']
back_node = backhosts.query.filter(or_(backhosts.host_node==node_name,backhosts.ftp_ip==ftp_ip)).all()
if back_node:
flash(u'%s ?????????????!' %node_name)
return render_template('addbacknode.html')
backhost = backhosts(host_node=node_name,ftp_ip=ftp_ip,ftp_port=ftp_port,ftp_user=ftp_user,ftp_pass=ftp_pass)
db.session.add(backhost)
db.session.commit()
flash(u'%s ??????!' %node_name)
return render_template('addbacknode.html')
else:
return render_template('addbacknode.html')
def customer():
if request.method == 'POST':
try:
customer_id = request.form['customer_id']
customer_oper = request.form['customer_oper']
customer = customers.query.filter_by(id=customer_id).first()
if customer_oper == 'stop_back':
customer.customers_status = 1
else:
customer.customers_status = 0
db.session.add(customer)
db.session.commit()
return u"???????"
except Exception, e:
print e
return u"???????"
else:
customer_all = customers.query.all()
return render_template('customers.html',customers=customer_all)
def changepass():
if request.method == 'POST':
# process password change
if request.form['pass1'] == request.form['pass2']:
change_password(session['username'], request.form['pass1'])
log_action(session['uid'], 8)
session.pop('logged_in', None)
session.pop('uid', None)
session.pop('priv', None)
session.pop('username', None)
flash('Your password has been changed. Please login using your new password.')
return redirect(url_for('home'))
else:
flash('The passwords you entered do not match. Please try again.')
return render_template('changepass.html')
return render_template('changepass.html')
#
# EDIT USER PAGE
#
def adduser():
if request.method == 'POST':
if request.form['pass1'] == request.form['pass2']:
if user_exists(request.form['username']) == False:
# create the user
admin = 0
if request.form['status'] == 'admin':
admin = 1
create_user(request.form['username'], request.form['pass1'], admin)
log_action(session['uid'], 10)
flash(request.form['username'] + ' has been created.')
return render_template('adduser.html', acp=session['priv'], username=session['username'])
else:
flash('The username you entered is already in use.')
return render_template('adduser.html', acp=session['priv'], username=session['username'])
else:
flash('The passwords you entered do not match. Please try again.')
return render_template('adduser.html', acp=session['priv'], username=session['username'])
return render_template('adduser.html', acp=session['priv'], username=session['username'])
def login():
"""Login POST handler.
Only runs when ``/login`` is hit with a POST method. There is no GET method
equivilent, as it is handled by the navigation template. Sets the status
code to ``401`` on login failure.
Returns:
JSON formatted output describing success or failure.
"""
log.debug("Entering login, attempting to authenticate user.")
username = request.form['signin_username']
password = request.form['signin_password']
log.debug("Username: {0}".format(username))
if fe.check_auth(username, password):
log.debug("User authenticated. Trying to set session.")
session_id = fe.set_login_id()
session['logged_in'] = session_id
log.debug("Session ID: {0}, returning to user".format(session_id))
return jsonify({ "login": "success" })
log.debug("Username or password not recognized, sending 401.")
response.status = 401
return jsonify({ "login": "failed" })
def add_fn(fn):
with open('/tmp/handler.py', 'w') as f:
f.write(request.form['code'])
with open('/tmp/Dockerfile', 'w') as f:
f.write('FROM scrappy-serverless\n')
f.write('WORKDIR /worker\n')
f.write('COPY handler.py /worker/handler.py\n')
f.write('CMD rq worker -u redis://redis -b ' + fn)
out = subprocess.check_output(['docker', 'build', '-t', 'serverless/'+fn, '/tmp'])
for line in out.split('\n'):
if line.startswith('Successfully built'):
image_id = line.split()[2]
break
else:
raise RuntimeError('bad build')
images[fn] = image_id
return 'OK ' + image_id
# PLAN: Call existing functions @ GET /api/<fn>
def new():
form = ProjectForm(request.form)
if request.method == 'POST' and form.validate():
user_repo_path = join('repos', form.name.data)
if os.path.isdir(user_repo_path):
flash(_('This project name already exists'), 'error')
else:
project = Project(form.name.data, current_user)
db.session.add(project)
db.session.commit()
#project.create_project(form.name.data, current_user)
flash(_('Project created successfuly!'), 'info')
return redirect(url_for('branches.view',
project=form.name.data,
branch='master', filename='index'))
return render_template('new.html', form=form)
def html2rst():
if request.method == 'POST':
if request.form.has_key('content'):
input = request.form.get('content')
if not input:
input = 'undefined'
if input != 'undefined':
try:
converted = html2rest(input)
prefetch = None
except:
converted = None
prefetch = input
return render_template('html2rst.html', converted=converted,
prefetch=prefetch)
return render_template('html2rst.html')
def password_required(short_name):
(project, owner, n_tasks, n_task_runs,
overall_progress, last_activity,
n_results) = project_by_shortname(short_name)
form = PasswordForm(request.form)
if request.method == 'POST' and form.validate():
password = request.form.get('password')
cookie_exp = current_app.config.get('PASSWD_COOKIE_TIMEOUT')
passwd_mngr = ProjectPasswdManager(CookieHandler(request, signer, cookie_exp))
if passwd_mngr.validates(password, project):
response = make_response(redirect(request.args.get('next')))
return passwd_mngr.update_response(response, project, get_user_id_or_ip())
flash(gettext('Sorry, incorrect password'))
return render_template('projects/password.html',
project=project,
form=form,
short_name=short_name,
next=request.args.get('next'))
def create_post():
post_data = {
'title': request.form.get('title'),
'content': request.form.get('content'),
}
post = Post()
post.set(post_data)
post = markdown(post)
upload_image = request.files.get('featured_image')
if upload_image.filename != '' and allowed_file(upload_image.filename):
f = Attachment(upload_image.filename, data=upload_image.stream)
post.set('featured_image', f)
post.save()
tag_names = request.form.get('tags').lower().strip()
tags = [get_tag_by_name(x) for x in split_tag_names(tag_names)]
map_tags_to_post(tags, post)
return redirect(url_for('show_post', post_id=post.id))
def authorize_travis(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
if 'payload' not in request.form:
abort(requests.codes.bad, 'Expected a payload field with the json payload')
payload = json.loads(request.form.get('payload'))
token = request.headers.get('Authorization', None)
repository = payload['repository']
logging.info('Handling notification for repo: {0}/{1} commit {2}'.format(
repository['owner_name'], repository['name'], payload['commit']))
if not token:
abort(requests.codes.forbidden, 'A token is required')
repository = '{0}/{1}'.format(payload['repository']['owner_name'], payload['repository']['name'])
expected_token = sha256((repository + TRAVIS_TOKEN).encode('utf-8')).hexdigest()
if token != expected_token:
abort(requests.codes.unauthorized, 'Invalid token')
return f(payload, *args, **kwargs)
return wrapper