def validate(self):
    """Validate the login form, upgrading a legacy password hash on the way.

    The user matching the submitted e-mail is stored on ``self.user``. When
    that user's stored hash starts with ``pbkdf2:sha1`` and the submitted
    password matches it, the password is re-encrypted, the account is marked
    active, the "admin" role is appended, the session is committed and the
    form is accepted immediately. In every other case the result of the
    parent class validation decides.

    Returns:
        True when the form is accepted, False otherwise.
    """
    # Check for an old password hash and update the password if needed.
    self.user = (
        db.session.query(models.User)
        .filter(models.User.email == self.email.data)
        .first()
    )
    account = self.user
    if account and account.password.startswith("pbkdf2:sha1"):
        submitted = self.password.data
        if check_password_hash(account.password, submitted):
            account.password = encrypt_password(submitted)
            account.active = 1
            admin_role = (
                db.session.query(models.Role)
                .filter(models.Role.name == "admin")
                .first()
            )
            account.roles.append(admin_role)
            db.session.commit()
            return True

    # Fall through to the flask-security checks.
    return True if super(Login, self).validate() else False
# Example source code for the Python class Role()
def post(self):
    """Assign a scheduling task.

    Picks the schedule in state "mobius-queue" with the smallest
    ``last_update`` among active organizations, calls its
    ``transition_to_mobius_processing()`` and returns the ids describing it.
    Aborts with 404 when no such schedule exists.
    """
    candidates = (
        Schedule2.query
        .join(Role)
        .join(Location)
        .join(Organization)
        # "== True" is intentional: it builds a SQL expression, not a Python bool.
        .filter(Schedule2.state == "mobius-queue", Organization.active == True)
        .order_by(asc(Schedule2.last_update))
    )
    schedule = candidates.first()
    if schedule is None:
        abort(404)

    schedule.transition_to_mobius_processing()

    role = Role.query.get(schedule.role_id)
    location = Location.query.get(role.location_id)

    task = {}
    task["schedule_id"] = schedule.id
    task["role_id"] = role.id
    task["location_id"] = location.id
    task["organization_id"] = location.organization_id
    return task
def get(self):
    """Return all queued calculations.

    For each of the states "chomp-queue" and "chomp-processing", collects
    the schedules of active organizations and marshals them with
    ``tasking_schedule_fields``.

    Returns:
        A dict with a single ``API_ENVELOPE`` key mapping each state name to
        a list of marshalled schedules.
    """
    by_state = {}
    for state in ['chomp-queue', 'chomp-processing']:
        schedules = (
            Schedule2.query
            .join(Role)
            .join(Location)
            .join(Organization)
            # "== True" is intentional: it builds a SQL expression.
            .filter(Schedule2.state == state, Organization.active == True)
            .all()
        )
        # A real list rather than map(): on Python 3 map() returns a lazy
        # iterator, which is not a serializable sequence.
        by_state[state] = [
            marshal(schedule, tasking_schedule_fields)
            for schedule in schedules
        ]
    return {API_ENVELOPE: by_state}
def post(self):
    """Assign a scheduling task.

    Picks the schedule in state "chomp-queue" with the smallest
    ``last_update`` among active organizations, calls its
    ``transition_to_chomp_processing()`` and returns the ids describing it.
    Aborts with 404 when no such schedule exists.
    """
    candidates = (
        Schedule2.query
        .join(Role)
        .join(Location)
        .join(Organization)
        # "== True" is intentional: it builds a SQL expression, not a Python bool.
        .filter(Schedule2.state == "chomp-queue", Organization.active == True)
        .order_by(asc(Schedule2.last_update))
    )
    schedule = candidates.first()
    if schedule is None:
        abort(404)

    schedule.transition_to_chomp_processing()

    role = Role.query.get(schedule.role_id)
    location = Location.query.get(role.location_id)

    task = {}
    task["schedule_id"] = schedule.id
    task["role_id"] = role.id
    task["location_id"] = location.id
    task["organization_id"] = location.organization_id
    return task
def create_user(
        name=None, email=None,
        password=None, role=None, app=None):
    """Create a user.

    The role named *role* is fetched or created first. Inside the
    application context of *app*, a user is built and added to the session
    only when *name*, *email* and *password* are all truthy; otherwise an
    error string takes the user's place. The session is committed and the
    result printed in both cases.
    """
    role, _created = get_or_create(Role, name=role)
    with app.app_context():
        if not all([name, email, password]):
            user = "Cant create the user"
        else:
            user = User(first_name=name, roles=[role], active=True, email=email)
            user.set_password(password)
            db.session.add(user)
        db.session.commit()
        print(user)
def configure_admin(app, db):
    """Initialise the admin extension on *app* and register its model views.

    Also registers a context processor on ``app.security`` that passes the
    admin base template, index view, helpers and ``url_for`` to templates.
    """
    admin.init_app(app)
    session = db.session

    # (view class, model, keyword options), registered in this order.
    registrations = (
        # Admin pages
        (BaseView, OperatingSystem, {'endpoint': 'operatingsystems'}),
        (BaseView, Project, {'endpoint': 'projects'}),
        (ReleaseView, Release, {'endpoint': 'releases'}),
        (TestRunView, TestRun, {'endpoint': 'testruns'}),
        # security
        (BaseView, User, {'endpoint': 'users', 'category': 'Auth'}),
        (BaseView, Role, {'endpoint': 'roles', 'category': 'Auth'}),
    )
    for view_class, model, options in registrations:
        admin.add_view(view_class(model, session, **options))

    @app.security.context_processor
    def security_context_processor():
        """Return the extra template variables for the security views."""
        return {
            'admin_base_template': admin.base_template,
            'admin_view': admin.index_view,
            'h': admin_helpers,
            'get_url': url_for,
        }
def test_delete_it_all(base):
    """Delete both users and both roles, then check none can be found again.

    *base* is not used in the body (presumably a fixture that prepares the
    database; confirm against the test setup).
    """
    def _lookup():
        # Same lookup order as before: user 1, user 2, 'User' role, 'Admin' role.
        return (
            User.query.first(email=user_dict['email']),
            User.query.first(email=user_dict2['email']),
            Role.query.first(name='User'),
            Role.query.first(name='Admin'),
        )

    first_user, second_user, user_role, admin_role = _lookup()
    for record in (first_user, user_role, second_user, admin_role):
        assert record.delete() is True

    first_user, second_user, user_role, admin_role = _lookup()
    for leftover in (first_user, user_role, second_user, admin_role):
        assert leftover is None
####################################################################################################
# Ensure we've closed all the RethinkDB connections
####################################################################################################
def startScheduler():
    """Create the tables and default roles, then start the background scheduler.

    Steps:
      1. ``db.create_all()``.
      2. If no role named "admin" exists, add the "admin" and "user" roles.
      3. Determine the local timezone with ``tzlocal``; fall back to UTC when
         it cannot be determined.
      4. Start a ``BackgroundScheduler`` that runs ``notify.task`` every
         ``config.SCAN_INTERVAL`` seconds, first run two seconds from now.
    """
    db.create_all()

    # Create the default roles; the presence of "admin" is the marker.
    has_admin = db.session.query(models.Role).filter(models.Role.name == "admin").first()
    if not has_admin:
        admin_role = models.Role(name='admin', description='Administrator Role')
        user_role = models.Role(name='user', description='User Role')
        db.session.add(admin_role)
        db.session.add(user_role)
        db.session.commit()

    try:
        import tzlocal
        tz = tzlocal.get_localzone()
        logger.info("local timezone: %s", tz)
    except Exception:
        # Best effort: a missing or failing tzlocal must not stop the
        # scheduler, but KeyboardInterrupt/SystemExit are no longer swallowed.
        tz = None

    # getattr: not every tzinfo implementation has a ``zone`` attribute.
    if not tz or getattr(tz, "zone", None) == "local":
        logger.error('Local timezone name could not be determined. Scheduler will display times in UTC for any log '
                     'messages. To resolve this set up /etc/timezone with correct time zone name.')
        tz = pytz.utc

    # In debug mode this is executed twice :(
    # Do NOT run flask in auto reload mode when testing this!
    scheduler = BackgroundScheduler(logger=sched_logger, timezone=tz)
    scheduler.add_job(notify.task, 'interval', seconds=config.SCAN_INTERVAL, max_instances=1,
                      start_date=datetime.datetime.now(tz) + datetime.timedelta(seconds=2))
    scheduler.start()
    # NOTE(review): this binds a local name only; if a module-level ``sched``
    # is meant to hold the scheduler, a ``global sched`` declaration is
    # missing. Confirm against the rest of the module.
    sched = scheduler
def make_shell_context():
    """Return the shell-context dict mapping each exported name to its object."""
    return {
        'app': app,
        'db': db,
        'User': User,
        'Role': Role,
        'Comment': Comment,
    }
def make_shell_context():
    """Return the shell-context dict: app, db, User and Role keyed by name."""
    return {'app': app, 'db': db, 'User': User, 'Role': Role}
def make_shell_context():
    """Build the dict of objects (app, db, User, Role) keyed by their names."""
    context = {}
    context['app'] = app
    context['db'] = db
    context['User'] = User
    context['Role'] = Role
    return context
def make_shell_context():
    """Return the shell-context dict mapping each exported name to its object."""
    return {
        'app': app,
        'db': db,
        'User': User,
        'Role': Role,
        'Question': Question,
        'Category': Category,
        'Usertoca': Usertoca,
        'Potoca': Potoca,
        'Vote': Vote,
    }
def make_shell_context():
    """Return the shell-context dict mapping each exported name to its object."""
    return {
        'app': app,
        'db': db,
        'User': User,
        'Role': Role,
        'Tag': Tag,
        'Video': Video,
        'videotags': videotags,
    }
def test_role_model(self):
    """
    Test number of records in Role table
    """
    # Create one role and persist it.
    ceo = Role(name="CEO", description="Run the whole company")
    db.session.add(ceo)
    db.session.commit()

    # Exactly that one record is expected in the table.
    stored_roles = Role.query.count()
    self.assertEqual(stored_roles, 1)
def get(self):
    """Return all queued calculations.

    For each of the states "mobius-queue" and "mobius-processing", collects
    the schedules of active organizations and marshals them with
    ``tasking_schedule_fields``.

    Returns:
        A dict with a single ``API_ENVELOPE`` key mapping each state name to
        a list of marshalled schedules.
    """
    by_state = {}
    for state in ['mobius-queue', 'mobius-processing']:
        schedules = (
            Schedule2.query
            .join(Role)
            .join(Location)
            .join(Organization)
            # "== True" is intentional: it builds a SQL expression.
            .filter(Schedule2.state == state, Organization.active == True)
            .all()
        )
        # A real list rather than map(): on Python 3 map() returns a lazy
        # iterator, which is not a serializable sequence.
        by_state[state] = [
            marshal(schedule, tasking_schedule_fields)
            for schedule in schedules
        ]
    return {API_ENVELOPE: by_state}
def make_shell_context():
    """Return the shell-context dict mapping each exported name to its object."""
    return {
        'app': app,
        'db': db,
        'User': User,
        'Role': Role,
        'Permission': Permission,
    }
def make_shell_context():
    """Return the shell-context dict: app, db, User and Role keyed by name.

    The database object is exported as ``db``; it was previously exported
    under the misspelled key ``dn``.
    """
    return dict(app=app, db=db, User=User, Role=Role)
def make_shell_context():
    """Return the shell-context dict mapping each exported name to its object."""
    return {
        'app': app,
        'db': db,
        'User': User,
        'Role': Role,
        'Permission': Permission,
        'Post': Post,
    }
def create_app():
    """
    Build and return the configured Flask application.

    App factory pattern avoids circular imports, so instead of importing
    'app' directly you import its factory. If you need the current running app
    you can use 'from flask import current_app'

    :return: app
    """
    app = Flask(__name__)

    # Settings are read from config/<FLASK_CONFIG>.py under the current
    # working directory; 'development' is used when the variable is unset
    # or empty.
    settings_name = os.environ.get('FLASK_CONFIG') or 'development'
    settings_file = os.path.join(os.getcwd(), 'config', settings_name + '.py')
    app.config.from_pyfile(settings_file)

    # initialize extensions
    for extension in (bootstrap, db):
        extension.init_app(app)

    # security
    app.security = Security(app, SQLAlchemyUserDatastore(db, User, Role))

    # import blueprints (inside the factory, after the extensions are set up)
    from app.dashboard.views import dashboard_blueprint
    from app.operatingsystem.views import os_blueprint
    from app.project.views import project_blueprint
    from app.release.views import release_blueprint
    from app.testrun.views import run_blueprint

    blueprints = (
        dashboard_blueprint,
        os_blueprint,
        project_blueprint,
        release_blueprint,
        run_blueprint,
    )
    for blueprint in blueprints:
        app.register_blueprint(blueprint)

    configure_admin(app, db)
    db.create_all(app=app)
    return app
def make_shell_context():
    """Return the shell-context dict mapping each exported name to its object."""
    return {'app': app, 'db': db, 'User': User, 'Role': Role, 'Post': Post}
def set_role(id, role):
    """Append the role named *role* to the roles of the user fetched by *id*.

    Prints 'User not found' or 'Role not found' and exits with status 1 when
    the corresponding lookup returns nothing; otherwise commits the change
    and prints a confirmation.
    """
    user = User.query.get(id)
    role = Role.query.filter_by(name=role).first()
    # print(...) with a single argument behaves the same on Python 2 and 3;
    # the bare print statement is a syntax error on Python 3.
    if not user:
        print('User not found')
        # SystemExit directly: exit() is a site-module helper meant for
        # interactive use.
        raise SystemExit(1)
    if not role:
        print('Role not found')
        raise SystemExit(1)
    user.roles.append(role)
    db.session.add(user)
    db.session.commit()
    print('Role set successfully.')
def unset_role(id, role):
    """Remove the role named *role* from the roles of the user fetched by *id*.

    Prints 'User not found' or 'Role not found' and exits with status 1 when
    the corresponding lookup returns nothing; otherwise commits the change
    and prints a confirmation.
    """
    user = User.query.get(id)
    role = Role.query.filter_by(name=role).first()
    # print(...) with a single argument behaves the same on Python 2 and 3;
    # the bare print statement is a syntax error on Python 3.
    if not user:
        print('User not found')
        # SystemExit directly: exit() is a site-module helper meant for
        # interactive use.
        raise SystemExit(1)
    if not role:
        print('Role not found')
        raise SystemExit(1)
    user.roles.remove(role)
    db.session.add(user)
    db.session.commit()
    print('Role unset successfully.')
def add_role(name):
    """Create a role called *name*, commit it and print a confirmation."""
    role = Role(name=name)
    db.session.add(role)
    db.session.commit()
    # A unicode format string converts the role to text on both Python 2 and
    # Python 3; the unicode() builtin does not exist on Python 3.
    print(u'%s added' % role)
def delete_role(name):
    """Delete the role called *name*.

    Prints 'Role not found' and exits with status 1 when no such role
    exists; otherwise deletes it, commits and prints a confirmation.
    """
    role = Role.query.filter_by(name=name).first()
    # print(...) with a single argument behaves the same on Python 2 and 3;
    # the bare print statement is a syntax error on Python 3.
    if not role:
        print('Role not found')
        # SystemExit directly: exit() is a site-module helper meant for
        # interactive use.
        raise SystemExit(1)
    db.session.delete(role)
    db.session.commit()
    print('Role %s deleted' % name)
def make_shell_context():
    """Return the shell-context dict mapping each exported name to its object."""
    return {'app': app, 'db': db, 'User': User, 'Role': Role, 'Review': Review}
def make_shell_context():
    """Return the shell-context dict mapping each exported name to its object."""
    return {
        'app': app,
        'db': db,
        'User': User,
        'Role': Role,
        'BopGame': BopGame,
        'OwGame': OwGame,
        'OwGlobal': OwGlobal,
        'OwCross': OwCross,
    }
def make_shell_context():
    """Return the shell-context dict mapping each exported name to its object."""
    return {
        'app': app,
        'db': db,
        'User': User,
        'Role': Role,
        'Permission': Permission,
        'Word': Word,
        'Note': Note,
        'UserWord': UserWord,
    }
def make_shell_context():
    """Return the shell-context dict mapping each exported name to its object."""
    return {
        'app': app,
        'db': db,
        'User': User,
        'Follow': Follow,
        'Role': Role,
        'Permission': Permission,
        'Post': Post,
        'Comment': Comment,
    }
def test_create_role(base):
    """Build a Role from ``role_dict``, save it and check its fields.

    *base* is not used in the body (presumably a fixture; confirm against
    the test setup).
    """
    created = Role(role_dict)
    created.save()
    assert isinstance(created, Role)
    for field in ('name', 'description'):
        assert getattr(created, field) == role_dict[field]
def test_create_admin_role(base):
    """Build a Role from ``role_dict_admin``, save it and check its fields.

    *base* is not used in the body (presumably a fixture; confirm against
    the test setup).
    """
    created = Role(role_dict_admin)
    created.save()
    assert isinstance(created, Role)
    for field in ('name', 'description'):
        assert getattr(created, field) == role_dict_admin[field]