def get_workspace(ws_id: int) -> Workspace:
    """
    Look up a workspace by its ID.

    :param ws_id: The workspace ID
    :return: The first ``Workspace`` row whose ``id`` equals ``ws_id``
    :raises NotFound: If the query yields no result
    """
    session = db_session()
    match = session.query(Workspace).filter(Workspace.id == ws_id).first()
    if match:
        return match
    raise NotFound("Could not find workspace with id {}".format(ws_id))
# Example source code for Python's query() method
def _get_stats_users(session):
"""return total registered users"""
try:
User = models.User
num_users = session.query(User).count()
except Exception as e:
msg = "unkown error when handling request: %s" % e
raise Exception(e)
stats = {
'total_users': num_users,
}
return stats
def _get_stats_jobs(session):
    """
    Count jobs per state.

    :param session: Database session used to run the grouped count query
    :return: Dict mapping each known ``JobState`` to its job count, plus
        ``'TOTAL'`` with the sum over every returned row (rows whose state
        is not a known key are counted in the total and logged as a
        warning, but get no entry of their own)
    :raises Exception: Any error raised while querying is logged and
        re-raised
    """
    known_states = (
        JobState.PENDING,
        JobState.SCHEDULING,
        JobState.RUNNING,
        JobState.SUCCESS,
        JobState.ERROR,
        JobState.FAILED,
    )
    jobs_stats = {state: 0 for state in known_states}
    jobs_stats['TOTAL'] = 0

    try:
        Job = models.Job
        per_state = session.query(
            Job.state, func.count(Job.serial_id)
        ).group_by(Job.state)
        for state, amount in per_state:
            jobs_stats['TOTAL'] += amount
            if state in jobs_stats:
                jobs_stats[state] = amount
            else:
                LOG.warning("unkown job type: %s" % state)
    except Exception as err:
        LOG.error("handling request: %s" % err)
        raise err
    return jobs_stats
def _get_stats_tasks(session):
    """
    Count tasks per state.

    Errors raised while querying are logged and not propagated; in that
    case the counters gathered so far (zeros if the query never ran) are
    returned.

    :param session: Database session used to run the grouped count query
    :return: Dict mapping each known ``TaskState`` to its task count, plus
        ``'TOTAL'`` with the sum over every row that was processed
    """
    tasks_stats = dict.fromkeys(
        [
            TaskState.PENDING,
            TaskState.RUNNING,
            TaskState.SUCCESS,
            TaskState.ERROR,
            TaskState.FAILED,
            'TOTAL',
        ],
        0,
    )

    try:
        Task = models.Task
        grouped = session.query(Task.state, func.count(Task.serial_id))
        grouped = grouped.group_by(Task.state)
        for state, amount in grouped:
            tasks_stats['TOTAL'] += amount
            if state not in tasks_stats:
                # Unknown states only contribute to the total.
                LOG.warning("unkown task type: %s" % state)
            else:
                tasks_stats[state] = amount
    except Exception as err:
        LOG.error("handling request: %s" % err)
    return tasks_stats
def get_stats_files(session):
    """
    Sum up the number, size and duration of all files.

    A size or duration that cannot be converted to a number is logged as a
    warning and counted as ``0``. Any other error is logged and the totals
    gathered up to that point are returned.

    :param session: Database session used to read ``File.size`` and
        ``File.duration``
    :return: Dict with ``'total_files'``, ``'total_size_bytes'`` and
        ``'total_duration_seconds'``
    """
    def _coerce(raw, cast):
        # Convert one column value, falling back to 0 for ill-formed records.
        try:
            return cast(raw)
        except Exception as e:
            LOG.warning("%s" % e)
            return 0

    totals = {
        'total_files': 0,
        'total_size_bytes': 0,
        'total_duration_seconds': 0,
    }
    try:
        File = models.File
        rows = session.query(File.size, File.duration).all()
        for raw_size, raw_duration in rows:
            size = _coerce(raw_size, int)
            duration = _coerce(raw_duration, float)
            totals['total_files'] += 1
            totals['total_size_bytes'] += size
            totals['total_duration_seconds'] += duration
    except Exception as e:
        LOG.error("handling request: %s" % e)
    return totals
def dashboard():
    """
    Render the dashboard page.

    The template receives all repositories ordered by name, the requesting
    user, the value of ``get_public_key()`` and the list of all users.

    :return: The rendered ``dashboard.html`` template
    """
    db_session = request.db_session
    return render_template(
        'dashboard.html',
        repositories=db_session.query(Repository).order_by(Repository.name).all(),
        user=request.user,
        public_key=get_public_key(),
        all_users=db_session.query(User).all(),
    )
def edit_repo(repo_id):
    """
    Create a new repository or edit an existing one.

    A POST request validates the submitted form fields and, when they are
    valid, saves the repository and redirects to its page. Any other
    request, or a POST with validation errors, renders the edit form.

    :param repo_id: ID of the repository to edit, or ``None`` to create one
    :return: A redirect after a successful save, otherwise the rendered
        ``edit_repo.html`` template; aborts with 403 when the requesting
        user lacks ``can_manage`` and with 404 when ``repo_id`` is given
        but no such repository exists
    """
    session = request.db_session
    if not request.user.can_manage:
        return abort(403)

    repo = None
    if repo_id is not None:
        repo = session.query(Repository).get(repo_id)
        if not repo:
            # An explicit but unknown ID must not silently fall through to
            # the "create new repository" path below.
            return abort(404)

    errors = []
    if request.method == 'POST':
        secret = request.form.get('repo_secret', '')
        clone_url = request.form.get('repo_clone_url', '')
        repo_name = request.form.get('repo_name', '').strip()
        ref_whitelist = request.form.get('repo_ref_whitelist', '')

        if len(repo_name) < 3 or repo_name.count('/') != 1:
            errors.append('Invalid repository name. Format must be owner/repo')
        if not clone_url:
            errors.append('No clone URL specified')

        # The name is taken if another repository (not the one being
        # edited) already uses it.
        other = session.query(Repository).filter_by(name=repo_name).one_or_none()
        if other and (not repo or other.id != repo.id):
            errors.append('Repository {!r} already exists'.format(repo_name))

        if not errors:
            if not repo:
                repo = Repository(name=repo_name, clone_url=clone_url, secret=secret,
                                  build_count=0, ref_whitelist=ref_whitelist)
            else:
                repo.name = repo_name
                repo.clone_url = clone_url
                repo.secret = secret
                repo.ref_whitelist = ref_whitelist
            session.add(repo)
            session.commit()
            return redirect(repo.url())

    return render_template('edit_repo.html', user=request.user, repo=repo, errors=errors)
def download(build_id, data):
    """
    Serve the artifact or the log of a build.

    :param build_id: ID of the build to look up
    :param data: Either ``Build.Data_Artifact`` or ``Build.Data_Log``
    :return: The result of ``utils.stream_file`` for the requested file
        (``application/zip`` for artifacts, ``text/plain`` for logs);
        aborts with 404 for an unknown ``data`` kind, an unknown build or
        missing data, and with 403 when the user may not download it
    """
    wants_artifact = data == Build.Data_Artifact
    if not (wants_artifact or data == Build.Data_Log):
        return abort(404)

    build = request.db_session.query(Build).get(build_id)
    if not build:
        return abort(404)
    if not build.check_download_permission(data, request.user):
        return abort(403)
    if not build.exists(data):
        return abort(404)

    if wants_artifact:
        mime = 'application/zip'
    else:
        mime = 'text/plain'
    return utils.stream_file(build.path(data), mime=mime)
def delete():
    """
    Delete a build, a repository or a user selected by query argument.

    The first non-empty argument of ``build_id``, ``repo_id`` and
    ``user_id`` (checked in that order) picks the object. Builds and
    repositories require ``can_manage``; a user may be deleted by
    themselves or by someone with ``can_manage``.

    :return: A redirect to the dashboard after a successful delete; a
        redirect to the ``Referer`` (dashboard as fallback) with the error
        flashed when ``Build.CanNotDelete`` is raised; aborts with 403 on
        missing permission and 404 when nothing was found
    """
    params = request.args
    repo_id = params.get('repo_id', '')
    build_id = params.get('build_id', '')
    user_id = params.get('user_id', '')

    session = request.db_session
    target = None
    if build_id or repo_id:
        model = Build if build_id else Repository
        target = session.query(model).get(build_id or repo_id)
        if not request.user.can_manage:
            return abort(403)
    elif user_id:
        target = session.query(User).get(user_id)
        is_other_user = target and target.id != request.user.id
        if is_other_user and not request.user.can_manage:
            return abort(403)

    if not target:
        return abort(404)

    try:
        session.delete(target)
        session.commit()
    except Build.CanNotDelete as exc:
        session.rollback()
        utils.flash(str(exc))
        return redirect(request.headers.get('Referer', url_for('dashboard')))

    utils.flash('{} deleted'.format(type(target).__name__))
    return redirect(url_for('dashboard'))
def validate_email(email, code):
    """
    Check that ``email`` may be registered with the given verification code.

    :param email: email address to validate
    :param code: verification code supplied for that address
    :return: ``"illegal"`` if ``not_email(email)`` is true, ``"double"`` if
        a ``User`` with that email already exists, ``"success"`` if the
        stored code matches (the ``Auth`` row is then marked for deletion),
        otherwise ``"verification error"``
    """
    if not_email(email):
        return "illegal"
    if db.session.query(User).filter_by(email=email).first():
        return "double"

    auth = db.session.query(Auth).filter_by(email=email).first()
    # No code was ever stored for this address: report a failed
    # verification instead of crashing on ``None.verification_code``.
    if auth is None or auth.verification_code != code:
        return "verification error"

    # NOTE(review): the delete is not committed here - presumably the
    # caller commits the session; confirm.
    db.session.delete(auth)
    return "success"
def query_user_info(email, password):
    """
    Check login credentials.

    :param email: login email
    :param password: login password
    :return: ``{"login_result": "success"}`` when a user with that email
        exists and ``verify_password`` accepts the password, otherwise
        ``{"login_result": "fail"}``
    """
    account = db.session.query(User).filter_by(email=email).first()
    if account and account.verify_password(password):
        outcome = "success"
    else:
        outcome = "fail"
    return {"login_result": outcome}
def generate_verification_code(email):
    """
    Issue an 8-digit verification code for ``email`` and send it by mail.

    The code replaces any code already stored for that address; otherwise
    a new ``Auth`` row is added. The change is committed before the mail
    is sent.

    :param email: address the code is issued for
    :return: ``"double"`` if a ``User`` with this email already exists (in
        which case nothing is stored or sent), otherwise ``"success"``
    """
    # Imported locally so the block stays self-contained. The code guards
    # account creation, so it is drawn from a CSPRNG rather than ``random``.
    import secrets

    if User.query.filter_by(email=email).first():
        return "double"

    # Same value range as before: 00000000 .. 99999999.
    code = str(secrets.randbelow(10 ** 8)).zfill(8)

    if Auth.query.filter_by(email=email).first():
        db.session.query(Auth).filter(Auth.email == email).update({"verification_code": code})
    else:
        # Only build the row when it is actually going to be stored.
        auth = Auth()
        auth.email = email
        auth.verification_code = code
        db.session.add(auth)
    db.session.commit()

    send_verification_code_email(email, code)
    return "success"
def update_count(winner, loser):
    """
    Add one vote to the winning post and flash the current standings.

    :param winner: ID of the ``Post`` that receives the vote
    :param loser: ID of the other ``Post``; only read for the flash message
    :return: A redirect to ``main.index``
    """
    engine = create_engine('sqlite:///' + os.path.join(basedir, 'data-dev.sqlite'))
    session = sessionmaker(bind=engine)()
    try:
        curr_usr = session.query(Post).get(winner)
        loser_guy = session.query(Post).get(loser)

        # Increment inside the UPDATE statement so that two concurrent
        # votes cannot overwrite each other with a stale Python-side value.
        session.query(Post).filter_by(id=winner).update(
            {'total_votes': Post.total_votes + 1},
            synchronize_session=False,
        )
        session.commit()

        # With the session's default expire-on-commit behaviour the
        # attributes below are reloaded, so the message shows the new count.
        flash("{} has {} votes and {} has {} votes".format(
            curr_usr.description, curr_usr.total_votes,
            loser_guy.description, loser_guy.total_votes))
    finally:
        # The session and engine are created per call; release them so
        # connections do not pile up.
        session.close()
        engine.dispose()
    return redirect(url_for('main.index'))
def signup():
    """
    Show the signup form (GET) or register a new user (any other method).

    The submitted form is validated step by step; the first failing check
    re-renders ``signup.html`` with an explanatory ``params['message']``
    and the values entered so far. On success the user is stored with a
    salted SHA-512 password hash, and the user whose ``id`` is 1 is made
    admin.

    :return: The rendered ``signup.html`` template, or a redirect to
        ``login`` after a successful registration
    """
    if request.method == 'GET':
        return render_template('signup.html', params={})

    # The submitted form used to be print()-ed here for debugging. It
    # contains the password, so the print has been removed.
    form = request.form
    params = {
        'f_name': form['f_name'],
        'l_name': form['l_name'],
        'username': form['username'].strip(),
    }
    password = form['password']
    verify = form['verify']
    params['email'] = form['email']

    def reject(message):
        # Re-render the form with an error message and the entered values.
        params['message'] = message
        return render_template('signup.html', params=params)

    if not params['f_name'] or not params['l_name'] or not params['username']:
        return reject('Please enter your first name, last name, '
                      'and a username.')

    userQuery = session.query(User).filter(
        User.username == params['username']).first()
    if userQuery:
        return reject('That username is already in use. '
                      'Please choose a different one.')
    if not password:
        return reject('Please enter a valid password')
    if password != verify:
        return reject('Your passwords did not match. '
                      'Please try again.')
    if not params['email']:
        return reject('Please enter a valid email address.')

    salt = make_salt()
    # hashlib needs bytes; hashing the text directly raises TypeError on
    # Python 3. UTF-8 yields the same digest as before for ASCII input.
    # NOTE(review): assumes make_salt() returns text - confirm. A plain
    # salted SHA-512 is weak for password storage, but the scheme is kept
    # because the login code presumably verifies against it.
    hashed_password = hashlib.sha512((password + salt).encode('utf-8')).hexdigest()
    user = User(f_name=params['f_name'],
                l_name=params['l_name'],
                email=params['email'],
                username=params['username'],
                password=hashed_password,
                salt=salt,
                admin=False)
    session.add(user)
    session.commit()

    # The very first account becomes the administrator.
    if user.id == 1:
        user.admin = True
        session.commit()
    return redirect(url_for('login'))
def get_fac_data():
    """
    Return up to 50 facilities matching the JSON ``filter`` query argument.

    Recognised filter keys: ``group``, ``latMax``, ``latMin``, ``lonMax``,
    ``lonMin``, ``facility_type``, ``keywords`` (comma separated; every
    keyword has to match the facility name or description) and ``count``
    (number of leading results to skip).

    :return: JSON response with ``success=True`` and ``data``, a list
        holding each facility's attribute dict without the SQLAlchemy
        instance state
    """
    page_size = 50
    session = Session()
    try:
        # A JSON ``null`` filter is treated like an empty one.
        filter_ = json.loads(request.args.get('filter', '{}')) or {}
        query = session.query(Facility)

        if filter_.get('group') is not None:
            query = query.filter(Facility.groups.any(Group.name.like(filter_['group'])))
        # Keep facilities whose bounding box overlaps the requested box.
        if filter_.get('latMax') is not None:
            query = query.filter(Facility.lat_min < float(filter_['latMax']))
        if filter_.get('latMin') is not None:
            query = query.filter(Facility.lat_max > float(filter_['latMin']))
        if filter_.get('lonMax') is not None:
            query = query.filter(Facility.lon_min < float(filter_['lonMax']))
        if filter_.get('lonMin') is not None:
            query = query.filter(Facility.lon_max > float(filter_['lonMin']))
        if filter_.get('facility_type') is not None:
            query = query.filter(Facility.facility_type.like(filter_['facility_type']))
        if filter_.get('keywords') is not None:
            keys = [key.strip(' ') for key in filter_['keywords'].lower().split(',')]
            key_filter = [or_(literal(key).contains(func.lower(Facility.name)),
                              func.lower(Facility.name).contains(key),
                              func.lower(Facility.description).contains(key))
                          for key in keys]
            query = query.filter(and_(*key_filter))

        # Page in the database instead of loading every matching row and
        # slicing the list in Python. Negative offsets are clamped to 0.
        # NOTE(review): the query has no ORDER BY, so page contents depend
        # on the database's row order (as they did before).
        skip = filter_.get('count')
        skip = 0 if skip is None else max(0, int(skip))
        facs = query.offset(skip).limit(page_size).all()

        dicts = []
        for fac in facs:
            dict_ = fac.__dict__.copy()
            dict_.pop('_sa_instance_state', None)
            dicts.append(dict_)
    finally:
        # Always release the scoped session, even when the request fails.
        Session.remove()
    return jsonify(success=True, data=dicts)
def get_users():
    """
    List users (GET) or hand submitted users over for import (otherwise).

    GET returns up to 50 users whose ``shakecast_id`` is greater than the
    ``last_id`` query argument, optionally restricted to a group given in
    the ``filter`` argument; passwords are stripped from the result.

    Any other method reads ``users`` from the JSON body, drops empty
    ``password`` entries and sends an ``import_user_dicts`` command through
    ``ui.send``.

    :return: JSON string with the listed users (GET) or the submitted
        users (``null`` when none were submitted)
    """
    if request.method == 'GET':
        session = Session()
        try:
            # NOTE(review): the filter is parsed as a Python literal from
            # request input; literal_eval does not execute code, but JSON
            # (as used for facilities) would be the safer format.
            filter_ = literal_eval(request.args.get('filter', 'None'))

            query = session.query(User).filter(
                User.shakecast_id > request.args.get('last_id', 0))
            if filter_ and filter_.get('group', None):
                query = query.filter(User.groups.any(Group.name.like(filter_['group'])))
            users = query.limit(50).all()

            user_dicts = []
            for user in users:
                user_dict = user.__dict__.copy()
                user_dict.pop('_sa_instance_state', None)
                user_dict.pop('password', None)
                user_dicts.append(user_dict)
            user_json = json.dumps(user_dicts, cls=AlchemyEncoder)
        finally:
            # Always release the scoped session, even when the request fails.
            Session.remove()
    else:
        # Check for missing users *before* iterating; the old 'null' string
        # default was iterated character by character and crashed.
        users = (request.json or {}).get('users')
        if users is not None:
            for user in users:
                # An empty password is dropped from the entry; a missing
                # password key is fine as well.
                if user.get('password') == '':
                    user.pop('password')
            # NOTE(review): request data is interpolated into a command
            # string that is presumably evaluated by the receiver - this
            # deserves escaping or a structured message; confirm.
            ui.send("{'import_user_dicts': {'func': f.import_user_dicts, "
                    "'args_in': {'users': %s, '_user': %s}, "
                    "'db_use': True, 'loop': False}}" % (str(users),
                                                        current_user.shakecast_id))
        user_json = json.dumps(users)
    return user_json
def clone(ws_id: int, url: str, name: str = None):
    """
    Clones a repository by url into given workspace

    :param ws_id: Destination workspace to clone
    :param url: URL of the source repository
    :param name: Optional name of the local repository name, otherwise the remote name is taken
    :return: Info dict from ``create_info_dict``: built from git's output plus the new
        project's ``id`` on success, or from git's error output and exit code when
        ``git clone`` exits non-zero
    :raises NameConflict: If the workspace already has a project with that name
    :raises NotImplementedError: If the url is not a GitHub url
    :raises Exception: If creating or scanning the cloned project fails; the clone
        directory is removed and the original error is chained as ``__cause__``
    """
    workspace = get_workspace(ws_id)
    url_decode = parse.urlparse(url)

    if not is_github(url_decode.netloc):
        # ``NotImplemented`` is a constant, not an exception; calling it
        # raised a confusing TypeError instead of this message.
        raise NotImplementedError(
            "Cloning from other is not implemented yet. Only github is supported for now.")

    # Take the suffix of url as first name candidate
    github_project_name = name
    if github_project_name is None:
        github_project_name = _repo_name_from_url(url_decode)

    dbsession = db_session()
    pj = dbsession.query(Project).join(Workspace) \
        .filter(Workspace.id == workspace.id) \
        .filter(Project.name == github_project_name).first()
    dbsession.commit()

    # Error when the project name in given workspace already exists
    if pj is not None:
        raise NameConflict('A project with name {} already exists'.format(github_project_name))

    project_target_path = os.path.join(workspace.path, PROJECT_REL_PATH, github_project_name)

    logger.info('Cloning from github repo...')
    # If url in GitHub domain, access by token
    url_with_token = _get_repo_url(url_decode)
    out, err, exitcode = git_command(['clone', url_with_token, project_target_path])

    # Compare by value; ``is 0`` only worked through int caching.
    if exitcode != 0:
        return create_info_dict(err=err, exitcode=exitcode)

    setup_git_user_email(project_target_path)
    # Check if the project is a valid son project
    check_son_validity(project_target_path)
    # Create project and scan it.
    dbsession = db_session()
    try:
        pj = Project(github_project_name, github_project_name, workspace)
        pj.repo_url = url
        sync_project_descriptor(pj)
        dbsession.add(pj)
        scan_project_dir(project_target_path, pj)
        dbsession.commit()
        # Check if the project is valid
        result = create_info_dict(out=out)
        result["id"] = pj.id
        return result
    except Exception as exc:
        # Undo the half-created project: drop the pending DB changes and
        # the cloned directory, then report the failure with its cause.
        dbsession.rollback()
        shutil.rmtree(project_target_path)
        raise Exception("Scan project failed") from exc
def edit_user(user_id):
    """
    Create a new user or edit an existing one.

    Creating a user (``user_id`` is ``None``) requires ``can_manage``.
    Editing is allowed for the user themselves and for users with
    ``can_manage``; only the latter may change the privilege flags.

    :param user_id: ID of the user to edit, or ``None`` to create one
    :return: A redirect to the user's page after a successful POST,
        otherwise the rendered ``edit_user.html`` template; aborts with 404
        for an unknown ``user_id`` and with 403 on missing permission
    """
    session = request.db_session

    if user_id is None:
        cuser = None
        if not request.user.can_manage:
            return abort(403)
    else:
        cuser = session.query(User).get(user_id)
        if not cuser:
            return abort(404)
        if cuser.id != request.user.id and not request.user.can_manage:
            return abort(403)

    errors = []
    if request.method == 'POST':
        if not cuser and not request.user.can_manage:
            return abort(403)

        form = request.form
        user_name = form.get('user_name')
        password = form.get('user_password')
        # Checkbox values as submitted by the form.
        can_manage = form.get('user_can_manage') == 'on'
        can_view_buildlogs = form.get('user_can_view_buildlogs') == 'on'
        can_download_artifacts = form.get('user_can_download_artifacts') == 'on'

        if cuser:
            # Update the settings of an existing user.
            if password:
                cuser.passhash = utils.hash_pw(password)
            # Privileges may only be changed by a managing user.
            if request.user.can_manage:
                cuser.can_manage = can_manage
                cuser.can_view_buildlogs = can_view_buildlogs
                cuser.can_download_artifacts = can_download_artifacts
        else:
            # Create a new user, unless the name is already taken.
            assert request.user.can_manage
            existing = session.query(User).filter_by(name=user_name).one_or_none()
            if existing:
                errors.append('User {!r} already exists'.format(user_name))
            else:
                cuser = User(name=user_name, passhash=utils.hash_pw(password),
                             can_manage=can_manage,
                             can_view_buildlogs=can_view_buildlogs,
                             can_download_artifacts=can_download_artifacts)

        if not errors:
            session.add(cuser)
            session.commit()
            return redirect(cuser.url())

    return render_template('edit_user.html', user=request.user, cuser=cuser,
                           errors=errors)