def twittercallback():
verification = request.args["oauth_verifier"]
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
try:
auth.request_token = session["request_token"]
except KeyError:
flash("Please login again", "danger")
return redirect(url_for("bp.home"))
try:
auth.get_access_token(verification)
except tweepy.TweepError:
flash("Failed to get access token", "danger")
return redirect(url_for("bp.home"))
session["access_token"] = auth.access_token
session["access_token_secret"] = auth.access_token_secret
return render_template("twittercallback.html", form=HashtagForm())
python类session()的实例源码
def get(self):
masterip = self.masterip
data = {
"user": session['username'],
}
result = dockletRequest.post('/monitor/hosts/%s/containerslist/'%(self.com_ip), data, masterip)
containers = result.get('monitor').get('containerslist')
containerslist = []
for container in containers:
result = dockletRequest.post('/monitor/vnodes/%s/basic_info/'%(container), data, masterip)
basic_info = result.get('monitor').get('basic_info')
result = dockletRequest.post('/monitor/vnodes/%s/owner/'%(container), data, masterip)
owner = result.get('monitor')
basic_info['owner'] = owner
containerslist.append(basic_info)
return self.render(self.template_path, containerslist = containerslist, com_ip = self.com_ip, user = session['username'], masterip = masterip)
def get(self):
data = {
"user": session['username'],
}
allresult = dockletRequest.post_to_all('/monitor/listphynodes/', data)
allmachines = {}
for master in allresult:
allmachines[master] = []
iplist = allresult[master].get('monitor').get('allnodes')
for ip in iplist:
containers = {}
result = dockletRequest.post('/monitor/hosts/%s/containers/'%(ip), data, master.split("@")[0])
containers = result.get('monitor').get('containers')
result = dockletRequest.post('/monitor/hosts/%s/status/'%(ip), data, master.split("@")[0])
status = result.get('monitor').get('status')
allmachines[master].append({'ip':ip,'containers':containers, 'status':status})
#print(machines)
return self.render(self.template_path, allmachines = allmachines, user = session['username'])
def post_to_all(self, url = '/', data={}):
if (url == '/'):
res = []
for masterip in masterips:
try:
requests.post("http://"+getip(masterip)+":"+master_port+"/isalive/",data=data)
except Exception as e:
logger.debug(e)
continue
res.append(masterip)
return res
data = dict(data)
data['token'] = session['token']
logger.info("Docklet Request: user = %s data = %s, url = %s"%(session['username'], data, url))
result = {}
for masterip in masterips:
try:
res = requests.post("http://"+getip(masterip)+":"+master_port+url,data=data).json()
except Exception as e:
logger.debug(e)
continue
result[masterip] = res
logger.debug("get result from " + getip(masterip))
return result
def post(self):
masterip = self.masterip
index1 = self.image.rindex("_")
index2 = self.image[:index1].rindex("_")
checkname(self.clustername)
data = {
"clustername": self.clustername,
'imagename': self.image[:index2],
'imageowner': self.image[index2+1:index1],
'imagetype': self.image[index1+1:],
}
result = dockletRequest.post("/cluster/create/", dict(data, **(request.form)), masterip)
if(result.get('success', None) == "true"):
return redirect("/dashboard/")
#return self.render(self.template_path, user = session['username'])
else:
return self.render(self.error_path, message = result.get('message'))
def post(self):
masterip = self.masterip
data = {
"clustername": self.clustername,
"image": self.imagename,
"containername": self.containername,
"description": self.description,
"isforce": self.isforce
}
result = dockletRequest.post("/cluster/save/", data, masterip)
if(result):
if result.get('success') == 'true':
#return self.render(self.success_path, user = session['username'])
return redirect("/config/")
#res = detailClusterView()
#res.clustername = self.clustername
#return res.as_view()
else:
if result.get('reason') == "exists":
return self.render(self.template_path, containername = self.containername, clustername = self.clustername, image = self.imagename, user = session['username'], description = self.description, masterip=masterip)
else:
return self.render(self.error_path, message = result.get('message'))
else:
self.error()
def after_request(response):
response.headers.add('Access-Control-Allow-Methods', 'GET, POST')
response.headers.add('Access-Control-Allow-Credentials', 'true')
response.headers.add('Access-Control-Allow-Headers', 'Content-Type, *')
response.headers.add('Cache-Control', 'no-cache')
response.headers.add('Cache-Control', 'no-store')
if api.auth.is_logged_in():
if 'token' in session:
response.set_cookie('token', session['token'], domain=app.config['SESSION_COOKIE_DOMAIN'])
else:
csrf_token = api.common.token()
session['token'] = csrf_token
response.set_cookie('token', csrf_token, domain=app.config['SESSION_COOKIE_DOMAIN'])
# JB: This is a hack. We need a better solution
if request.path[0:19] != "/api/autogen/serve/":
response.mimetype = 'application/json'
return response
def main_teacher():
tm = teachers_model.Teachers(flask.session['id'])
if request.method == 'POST':
cm = courses_model.Courses()
if "close" in request.form.keys():
cid = request.form["close"]
cm.cid = cid
cm.close_session(cm.get_active_session())
elif "open" in request.form.keys():
cid = request.form["open"]
cm.cid = cid
cm.open_session()
courses = tm.get_courses_with_session()
empty = True if len(courses) == 0 else False
context = dict(data=courses)
return render_template('main_teacher.html', empty=empty, **context)
def test_session_transactions(self):
app = flask.Flask(__name__)
app.testing = True
app.secret_key = 'testing'
@app.route('/')
def index():
return text_type(flask.session['foo'])
with app.test_client() as c:
with c.session_transaction() as sess:
self.assert_equal(len(sess), 0)
sess['foo'] = [42]
self.assert_equal(len(sess), 1)
rv = c.get('/')
self.assert_equal(rv.data, b'[42]')
with c.session_transaction() as sess:
self.assert_equal(len(sess), 1)
self.assert_equal(sess['foo'], [42])
def test_session_using_application_root(self):
class PrefixPathMiddleware(object):
def __init__(self, app, prefix):
self.app = app
self.prefix = prefix
def __call__(self, environ, start_response):
environ['SCRIPT_NAME'] = self.prefix
return self.app(environ, start_response)
app = flask.Flask(__name__)
app.wsgi_app = PrefixPathMiddleware(app.wsgi_app, '/bar')
app.config.update(
SECRET_KEY='foo',
APPLICATION_ROOT='/bar'
)
@app.route('/')
def index():
flask.session['testing'] = 42
return 'Hello World'
rv = app.test_client().get('/', 'http://example.com:8080/')
self.assert_in('path=/bar', rv.headers['set-cookie'].lower())
def test_session_using_session_settings(self):
app = flask.Flask(__name__)
app.config.update(
SECRET_KEY='foo',
SERVER_NAME='www.example.com:8080',
APPLICATION_ROOT='/test',
SESSION_COOKIE_DOMAIN='.example.com',
SESSION_COOKIE_HTTPONLY=False,
SESSION_COOKIE_SECURE=True,
SESSION_COOKIE_PATH='/'
)
@app.route('/')
def index():
flask.session['testing'] = 42
return 'Hello World'
rv = app.test_client().get('/', 'http://www.example.com:8080/test/')
cookie = rv.headers['set-cookie'].lower()
self.assert_in('domain=.example.com', cookie)
self.assert_in('path=/', cookie)
self.assert_in('secure', cookie)
self.assert_not_in('httponly', cookie)
def signup():
from forms import SignupForm
form = SignupForm()
if form.validate_on_submit():
user = User.query.filter_by(email=form.email.data.lower()).first()
if user is not None:
form.email.errors.append("The Email address is already taken.")
return render_template('signup.html', form=form)
newuser = User(form.firstname.data,form.lastname.data,form.email.data,form.password.data)
db.session.add(newuser)
db.session.commit()
session['email'] = newuser.email
return redirect(url_for('login'))
return render_template('signup.html', form=form)
def login():
if g.user is not None and g.user.is_authenticated:
return redirect(url_for('index'))
from app.forms import LoginForm
form = LoginForm()
if form.validate_on_submit():
session['remember_me'] = form.remember_me.data
user = User.query.filter_by(email=form.email.data.lower()).first()
if user and user.check_password(form.password.data):
session['email'] = form.email.data
login_user(user,remember=session['remember_me'])
return redirect(url_for('index'))
else:
return render_template('login.html',form=form,failed_auth=True)
return render_template('login.html',form=form)
def ping(service_id):
from app import db, models
from models import Service
finding = Service.query.filter_by(serviceid=service_id).first()
if finding is not None:
image_name = finding.imagename
uploadn = finding.uploadname
usern = finding.username
firstcreatetime = finding.firstcreatetime
u = Service(serviceid = service_id, createdtime = str(time.time()), imagename = image_name, uploadname = uploadn, username = usern, firstcreatetime = firstcreatetime)
db.session.add(u)
db.session.commit()
db.session.delete(finding)
db.session.commit()
else:
return "The service "+service_id+" has been removed!"
return "There are existing service:"+service_id
def test_oauth_authorized_saves_token_and_user_to_session(self, oauth):
fake_resp = {'oauth_token_secret': u'secret',
'username': u'palotespaco',
'fullname': u'paco palotes',
'oauth_token':u'token',
'user_nsid': u'user'}
oauth.authorized_response.return_value = fake_resp
expected_token = {
'oauth_token_secret': u'secret',
'oauth_token': u'token'
}
expected_user = {'username': u'palotespaco', 'user_nsid': u'user'}
with flask_app.test_client() as c:
c.get('/flickr/oauth-authorized')
assert session['flickr_token'] == expected_token, session['flickr_token']
assert session['flickr_user'] == expected_user, session['flickr_user']
def new_item(category):
if request.method == 'GET':
return render('newitem.html', category=category)
elif request.method == 'POST':
name = request.form['name']
highlight = request.form['highlight']
url = request.form['url']
if valid_item(name, url, highlight):
user_id = login_session['user_id']
item = Item(name=name, highlight=highlight, url=url, user_id=user_id, category_id=category.id)
session.add(item)
session.commit()
flash("Newed item %s!" % item.name)
return redirect(url_for('show_item', category_id=category.id, item_id=item.id))
else:
error = "Complete info please!"
return render('newitem.html', category=category, name=name, highlight=highlight, url=url,
error=error)
def check_auth(func):
"""
This decorator for routes checks that the user is authorized (or that no login is required).
If they haven't, their intended destination is stored and they're sent to get authorized.
It has to be placed AFTER @app.route() so that it can capture `request.path`.
"""
if 'login' not in conf:
return func
# inspired by <https://flask-login.readthedocs.org/en/latest/_modules/flask_login.html#login_required>
@functools.wraps(func)
def decorated_view(*args, **kwargs):
if current_user.is_anonymous:
print('unauthorized user visited {!r}'.format(request.path))
session['original_destination'] = request.path
return redirect(url_for('get_authorized'))
print('{} visited {!r}'.format(current_user.email, request.path))
assert current_user.email.lower() in conf.login['whitelist'], current_user
return func(*args, **kwargs)
return decorated_view
def test_session_transactions(self):
app = flask.Flask(__name__)
app.testing = True
app.secret_key = 'testing'
@app.route('/')
def index():
return text_type(flask.session['foo'])
with app.test_client() as c:
with c.session_transaction() as sess:
self.assert_equal(len(sess), 0)
sess['foo'] = [42]
self.assert_equal(len(sess), 1)
rv = c.get('/')
self.assert_equal(rv.data, b'[42]')
with c.session_transaction() as sess:
self.assert_equal(len(sess), 1)
self.assert_equal(sess['foo'], [42])
def test_session_using_application_root(self):
class PrefixPathMiddleware(object):
def __init__(self, app, prefix):
self.app = app
self.prefix = prefix
def __call__(self, environ, start_response):
environ['SCRIPT_NAME'] = self.prefix
return self.app(environ, start_response)
app = flask.Flask(__name__)
app.wsgi_app = PrefixPathMiddleware(app.wsgi_app, '/bar')
app.config.update(
SECRET_KEY='foo',
APPLICATION_ROOT='/bar'
)
@app.route('/')
def index():
flask.session['testing'] = 42
return 'Hello World'
rv = app.test_client().get('/', 'http://example.com:8080/')
self.assert_in('path=/bar', rv.headers['set-cookie'].lower())
def test_session_using_session_settings(self):
app = flask.Flask(__name__)
app.config.update(
SECRET_KEY='foo',
SERVER_NAME='www.example.com:8080',
APPLICATION_ROOT='/test',
SESSION_COOKIE_DOMAIN='.example.com',
SESSION_COOKIE_HTTPONLY=False,
SESSION_COOKIE_SECURE=True,
SESSION_COOKIE_PATH='/'
)
@app.route('/')
def index():
flask.session['testing'] = 42
return 'Hello World'
rv = app.test_client().get('/', 'http://www.example.com:8080/test/')
cookie = rv.headers['set-cookie'].lower()
self.assert_in('domain=.example.com', cookie)
self.assert_in('path=/', cookie)
self.assert_in('secure', cookie)
self.assert_not_in('httponly', cookie)
def after_request(response):
response.headers.add('Access-Control-Allow-Methods', 'GET, POST')
response.headers.add('Access-Control-Allow-Credentials', 'true')
response.headers.add('Access-Control-Allow-Headers', 'Content-Type, *')
response.headers.add('Cache-Control', 'no-cache')
response.headers.add('Cache-Control', 'no-store')
if api.auth.is_logged_in():
if 'token' in session:
response.set_cookie('token', session['token'])
else:
csrf_token = api.common.token()
session['token'] = csrf_token
response.set_cookie('token', csrf_token)
# JB: This is a hack. We need a better solution
if request.path[0:19] != "/api/autogen/serve/":
response.mimetype = 'appication/json'
return response
def info(id):
info = g.plex.getInfo(id)
views = None
parent = None
episodes = None
cur_el = info.getchildren()[0]
cur_type = cur_el.get("type")
if cur_type == "movie":
views = db.session.query(models.Processed).filter(models.Processed.title == info.find("Video").get("title")).all()
elif cur_type == "season":
parent = g.plex.getInfo(info.getchildren()[0].get("parentRatingKey")).getchildren()[0]
episodes = g.plex.episodes(id)
elif cur_type == "episode":
views = db.session.query(models.Processed).filter(models.Processed.session_id.like("%/metadata/" + cur_el.get("ratingKey") + "_%")).all()
elif cur_type == "show":
episodes = db.session.query(db.func.count(models.Processed.title), models.Processed).filter(models.Processed.orig_title.like(cur_el.get("title"))).group_by(models.Processed.title).having(db.func.count(models.Processed.orig_title) > 0).order_by(db.func.count(models.Processed.orig_title).desc(), models.Processed.time.desc()).all()
return render_template('info.html', info=info, history=views, parent=parent, episodes=episodes, title=_('Info'))
def chals():
if not is_admin():
if not ctftime():
if view_after_ctf():
pass
else:
return redirect('/')
if can_view_challenges():
chals = Challenges.query.add_columns('id', 'name', 'value', 'description', 'category').order_by(Challenges.value).all()
json = {'game':[]}
for x in chals:
files = [ str(f.location) for f in Files.query.filter_by(chal=x.id).all() ]
json['game'].append({'id':x[1], 'name':x[2], 'value':x[3], 'description':x[4], 'category':x[5], 'files':files})
db.session.close()
return jsonify(json)
else:
db.session.close()
return redirect('/login')
def after_request(response):
response.headers.add('Access-Control-Allow-Methods', 'GET, POST')
response.headers.add('Access-Control-Allow-Credentials', 'true')
response.headers.add('Access-Control-Allow-Headers', 'Content-Type, *')
response.headers.add('Cache-Control', 'no-cache')
response.headers.add('Cache-Control', 'no-store')
if api.auth.is_logged_in():
if 'token' in session:
response.set_cookie('token', session['token'])
else:
csrf_token = api.common.token()
session['token'] = csrf_token
response.set_cookie('token', csrf_token)
# JB: This is a hack. We need a better solution
if request.path[0:19] != "/api/autogen/serve/":
response.mimetype = 'appication/json'
return response
def test_session_transactions(self):
app = flask.Flask(__name__)
app.testing = True
app.secret_key = 'testing'
@app.route('/')
def index():
return text_type(flask.session['foo'])
with app.test_client() as c:
with c.session_transaction() as sess:
self.assert_equal(len(sess), 0)
sess['foo'] = [42]
self.assert_equal(len(sess), 1)
rv = c.get('/')
self.assert_equal(rv.data, b'[42]')
with c.session_transaction() as sess:
self.assert_equal(len(sess), 1)
self.assert_equal(sess['foo'], [42])
def test_session_using_application_root(self):
class PrefixPathMiddleware(object):
def __init__(self, app, prefix):
self.app = app
self.prefix = prefix
def __call__(self, environ, start_response):
environ['SCRIPT_NAME'] = self.prefix
return self.app(environ, start_response)
app = flask.Flask(__name__)
app.wsgi_app = PrefixPathMiddleware(app.wsgi_app, '/bar')
app.config.update(
SECRET_KEY='foo',
APPLICATION_ROOT='/bar'
)
@app.route('/')
def index():
flask.session['testing'] = 42
return 'Hello World'
rv = app.test_client().get('/', 'http://example.com:8080/')
self.assert_in('path=/bar', rv.headers['set-cookie'].lower())
def test_session_using_session_settings(self):
app = flask.Flask(__name__)
app.config.update(
SECRET_KEY='foo',
SERVER_NAME='www.example.com:8080',
APPLICATION_ROOT='/test',
SESSION_COOKIE_DOMAIN='.example.com',
SESSION_COOKIE_HTTPONLY=False,
SESSION_COOKIE_SECURE=True,
SESSION_COOKIE_PATH='/'
)
@app.route('/')
def index():
flask.session['testing'] = 42
return 'Hello World'
rv = app.test_client().get('/', 'http://www.example.com:8080/test/')
cookie = rv.headers['set-cookie'].lower()
self.assert_in('domain=.example.com', cookie)
self.assert_in('path=/', cookie)
self.assert_in('secure', cookie)
self.assert_not_in('httponly', cookie)
def _dispatch(self, route, **kwargs):
user = None
if 'shared_id' in session:
if session['shared_id'] in self.user_tracker.user_refs:
user = self.user_tracker.users[session['shared_id']]
if user is None:
user = self.user_tracker.connect_user()
session['shared_id'] = user.id_
user.open_page(route, kwargs)
ctx = CallCTX(abort=abort)
user.active_controller.before_connect(ctx)
ctx.deactivate()
possible = user.active_controller.render_page()
if user.active_controller.special_return_handler is not None:
return user.active_controller.special_return_handler()
return possible
def sidebar_data():
"""Set the sidebar function."""
# Get post of recent
recent = db.session.query(Post).order_by(
Post.publish_date.desc()
).limit(5).all()
# Get the tags and sort by count of posts.
top_tags = db.session.query(
Tag, func.count(posts_tags.c.post_id).label('total')
).join(
posts_tags
).group_by(Tag).order_by('total DESC').limit(5).all()
return recent, top_tags
# Use the Blueprint object to set the Route URL
# Register the view function into blueprint
def new_post():
"""View function for new_port."""
form = PostForm()
# Ensure the user logged in.
# Flask-Login.current_user can be access current user.
if not current_user:
return redirect(url_for('main.login'))
# Will be execute when click the submit in the create a new post page.
if form.validate_on_submit():
new_post = Post()
new_post.title = form.title.data
new_post.text = form.text.data
new_post.publish_date = datetime.now()
new_post.user = current_user
db.session.add(new_post)
db.session.commit()
return redirect(url_for('blog.home'))
return render_template('new_post.html',
form=form)