def create_user(user, email, password=None, role='user'):
if not password:
password = read_pwd()
data = dict(
user_name=user, email=email, password=hash_pwd(password), active=True)
errors = schema.UserSchema(
exclude=('version_id',)).validate(data, db.session)
if errors:
raise InvalidCommand('Validation Error: %s' % errors)
role = model.Role.query.filter_by(name=role).one()
u = model.User(**data)
u.roles.append(role)
db.session.add(u) # @UndefinedVariable
db.session.commit() # @UndefinedVariable
python类session()的实例源码
def cleanup(uploads_older_then=24, conversions_older_then=24 * 7, purge_empty_ebooks_dirs=False):
since = datetime.now() - timedelta(hours=float(uploads_older_then))
for upload in model.Upload.query.filter(model.Upload.created < since):
logic.delete_upload(upload)
db.session.commit()
purge_empty_dirs(settings.UPLOAD_DIR, delete_root=False)
since = datetime.now() - timedelta(hours=float(conversions_older_then))
for batch in model.ConversionBatch.query.filter(model.ConversionBatch.created < since):
logic.delete_conversion_batch(batch);
for conversion in model.Conversion.query.filter(model.Conversion.created < since):
logic.delete_conversion(conversion)
db.session.commit()
purge_empty_dirs(settings.BOOKS_CONVERTED_DIR, delete_root=False)
if purge_empty_ebooks_dirs:
purge_empty_dirs(settings.BOOKS_BASE_DIR, delete_root=False)
def api_delete():
uniqueid = request.args.get("uniqueid")
file = File.query.filter_by(unique_id=uniqueid).first()
if g.user.admin or (g.user.id == file.uploader_id):
upload_folder = myapp.config['UPLOAD_FOLDER']
try:
folder = os.path.join(upload_folder, file.unique_id)
cmpl_path = os.path.join(folder, file.name)
pubkey = file.publickey
db.session.delete(pubkey)
db.session.delete(file)
db.session.commit()
os.remove(cmpl_path)
os.rmdir(folder)
return jsonify(response=responds['FILE_DELETED'])
except Exception as e:
print(e)
return jsonify(response=responds['SOME_ERROR'])
else:
return jsonify(response=responds['FAILED_AUTHORIZATION'])
def _adduser():
if request.method == 'POST':
if g.user.admin:
form = AddUser()
if form.validate():
newuser = User(form.username.data, form.password.data, form.email.data)
if form.adminuser.data is None:
newuser.admin = False
else:
newuser.admin = form.adminuser.data
session = db.session()
try:
session.add(newuser)
session.commit()
except Exception as e:
session.rollback()
return ret_dbfail_response(e)
return jsonify(response=('New User ' + newuser.username + ' added.'))
else:
return jsonify(response=responds["FAILED_VALIDATION"])
def add_slack_game_post():
session = db.session
data = flask.request.form
token = data['token']
if token != slack_token:
return "You're not who you say you are. Wrong token {}".format(token)
winner_email, loser_email = data['text'].split(' beat ')
winner = session.query(User).filter(
User.email == winner_email,
).first()
loser = session.query(User).filter(
User.email == loser_email
).first()
if not winner:
return 'no player with email {}'.format(winner_email)
if not loser:
return 'no player with email {}'.format(loser_email)
add_game(session, winner, loser, slack_user_submitted_by=data['user_name'])
session.commit()
return "{} beat {}! {}'s score is now {} and {}'s score is now {}".format(winner.name, loser.name, winner.name,
round(winner.elo, 3), loser.name,
round(loser.elo, 3))
def start_game_post():
session = db.session
# todo make this read from json
in_progress_player_1_id = 1
in_progress_player_2_id = 2
game = Game(
in_progress_player_1_id=in_progress_player_1_id,
in_progress_player_2_id=in_progress_player_2_id
)
session.add(game)
session.commit()
return 'game id and maybe redirect to play game'
def load_user(user_id):
session = db.session
return session.query(User).get(user_id)
def dev_login(user_id):
if ENVIRONMENT == 'dev':
login_user(db.session.query(User).get(user_id))
return redirect(url_for('index'))
def index():
session = db.session
current_user = flask.g.user
active_users = get_active_users(session)
return flask.render_template('index.html',
title='Cratejoy Darts',
current_user=current_user,
active_users=active_users,
auth_url=get_google_authorization_url())
def db(request, app):
"""Create test database tables"""
_db.drop_all()
# Create the tables based on the current model
_db.create_all()
user = User.create_test_user()
TestClient.test_user = user
app.test_client_class = TestClient
app.response_class = TestResponse
_db.session.commit()
def session(request, monkeypatch):
"""Prevent the session from closing"""
# Roll back at the end of every test
request.addfinalizer(_db.session.remove)
# Prevent the session from closing (make it a no-op) and
# committing (redirect to flush() instead)
# https://alextechrants.blogspot.com/2014/01/unit-testing-sqlalchemy-apps-part-2.html
# monkeypatch.setattr(_db.session, 'commit', _db.session.flush)
monkeypatch.setattr(_db.session, 'remove', lambda: None)
def change_password(user, password=None):
try:
u = model.User.query.filter(
or_(model.User.user_name == user, model.User.email == user)).one() # @UndefinedVariable
except NoResultFound:
raise InvalidCommand('No such User')
if not password:
password = read_pwd()
u.password = hash_pwd(password)
db.session.commit() # @UndefinedVariable
def migrate_tables():
import psycopg2
print('This will migrate database to latest schema, you are advised to backup database before running this command')
if prompt_bool('Do you want to continue?'):
mdir = os.path.join(SQL_DIR, 'migration')
version_obj=model.Version.query.one_or_none()
if not version_obj:
version_obj=model.Version(version=0, version_id=1)
db.session.add(version_obj)
old_version = version_obj.version
if old_version == db_version:
print('DB is at correct version %d'% old_version)
scripts = []
for script in os.listdir(mdir):
m=re.match(r'v(\d+)\.sql', script)
if m:
version = int(m.group(1))
if version <= db_version and version > old_version:
scripts.append((version, os.path.join(mdir,script)))
scripts.sort()
connection = psycopg2.connect(database=settings.DB_NAME,
user = settings.DB_USER,
password = settings.DB_PASSWORD,
host = settings.DB_HOST,
port = settings.DB_PORT)
connection.autocommit = True
#connection = db.engine.raw_connection() # @UndefinedVariable
try:
c = connection.cursor()
for v,fname in scripts:
script = open(fname, 'rt', encoding='utf-8-sig').read()
print('Upgrading database to version %d'% v)
res = c.execute(script)
version_obj.version = v
db.session.commit()
#connection.commit()
finally:
connection.close()
def change_account(cuser, form):
if form.validate() and cuser.check_password(form.oldpassword.data):
session = db.session()
try:
if cuser.email is not form.email.data:
cuser.email = form.email.data
if cuser.username is not form.username.data:
cuser.username = form.username.data
cuser.password = form.password.data
session.commit()
return jsonify(response=responds['INFO_CHANGED'])
except IntegrityError as e:
session.rollback()
return ret_dbfail_response(e)
def deliver_file(file, request):
upload_folder = myapp.config['UPLOAD_FOLDER']
path = os.path.join(upload_folder, file.unique_id)
file.dl_count += 1
db.session.commit()
filename = file.name
return send_from_directory(path, filename, as_attachment=(request.referrer is not None))
def api_unpublish():
if request.method == 'GET':
uniqueid = request.args['uniqueid']
file = File.query.filter_by(unique_id=uniqueid).first()
if file is not None:
if g.user.admin or (g.user.id == file.uploader_id):
key = file.publickey
if key is not None:
file.publickey.public = False
db.session.commit()
url = request.host_url + "pub/dl/" + key.hash
return jsonify(response=responds['PUBLIC_KEY_UNPUBLISH'], url=url)
return jsonify(response=responds['SOME_ERROR'])
def api_publish():
if request.method == 'GET':
uniqueid = request.args['uniqueid']
file = File.query.filter_by(unique_id=uniqueid).first()
if file is not None:
if g.user.admin or (g.user.id == file.uploader_id):
key = file.publickey
if (key is not None) and (key.public is False):
file.publickey.public = True
db.session.commit()
url = request.host_url + "pub/dl/" + key.hash
return jsonify(response=responds['PUBLIC_KEY_PUBLISH'], url=url)
return jsonify(response=responds['SOME_ERROR'])
def createadminuser():
admin = User(email='{{ADMIN_MAIL}}', password='{{ADMIN_PASSWORD}}',
username='{{ADMIN_USER}}')
admin.admin = True
db.session().add(admin)
db.session().commit()
print("Admin User created.")
def test_file_publichash(self):
file = File('testfile.txt', helpers.unique_id(), 1337, 'text/html')
key = PublicKey()
key.public = True
file.publickey = key
db.session().add(file)
db.session().commit()
self.assertEqual(key, file.publickey)
def load_user(user_id):
session = db.session
return session.query(User).get(user_id)
def dev_login(user_id):
if ENVIRONMENT == 'dev':
login_user(db.session.query(User).get(user_id))
return redirect(url_for('index'))
def export_users():
session = db.session
return flask.Response(json.dumps([user.dict for user in session.query(User).all()]), mimetype=u'application/json')
def export_games():
session = db.session
return flask.Response(
json.dumps([game_dict(session, game) for game in session.query(Game).all()]),
mimetype=u'application/json')
def add_game_post():
session = db.session
data = flask.request.json
winner_id = data['winner_id']
winner = session.query(User).get(winner_id)
if not winner:
raise Exception('no player with id {}'.format(winner_id))
loser_id = data['loser_id']
loser = session.query(User).get(loser_id)
if not loser:
raise Exception('no player with id {}'.format(loser_id))
game = add_game(session, winner, loser, submitted_by_id=flask.g.user.id)
session.commit()
return flask.Response(json.dumps({
'id': game.id,
'success': True
}), mimetype=u'application/json')
def remove_game_get(game_id):
session = db.session
current_user = flask.g.user
if not current_user.admin:
return flask.Response(json.dumps({
'success': False,
'message': 'Access Denied',
'affected_player_ids': [],
'updated_game_ids': []
}), mimetype=u'application/json')
game = session.query(Game).get(game_id)
if not game:
return flask.Response(json.dumps({
'success': False,
'message': 'No Such Game',
'affected_player_ids': [],
'updated_game_ids': []
}), mimetype=u'application/json')
affected_player_ids, updated_game_ids = remove_game(session, game)
return flask.Response(json.dumps({
'affected_player_ids': list(affected_player_ids),
'updated_game_ids': list(updated_game_ids)
}), mimetype=u'application/json')
def callback():
current_user = flask.g.user
session = db.session
# Redirect user to home page if already logged in.
if current_user is not None and current_user.is_authenticated:
return redirect(url_for('index'))
if 'error' in request.args:
if request.args.get('error') == 'access_denied':
return 'You denied access.'
return 'Error encountered.'
if 'code' not in request.args and 'state' not in request.args:
return redirect(url_for('login'))
else:
# Execution reaches here when user has
# successfully authenticated our app.
google = get_google_auth(state=flask_session['oauth_state'])
try:
token = google.fetch_token(
Auth.TOKEN_URI,
client_secret=Auth.CLIENT_SECRET,
authorization_response=request.url.replace('http://', 'https://'),
)
except HTTPError:
return 'HTTPError occurred.'
google = get_google_auth(token=token)
# todo: cool stuff in here
resp = google.get(Auth.USER_INFO)
if resp.status_code == 200:
user_data = resp.json()
if user_data.get('hd') != 'cratejoy.com':
return 'Only cratejoy.com emails allowed'
email = user_data['email']
user = User.query.filter_by(email=email).first()
if user is None:
user = User(
name=user_data['name'] or user_data['email'].split('@')[0].capitalize(),
email=email,
avatar=user_data['picture']
)
user.tokens = json.dumps(token)
session.add(user)
session.commit()
login_user(user)
return redirect(url_for('index'))
return 'Could not fetch your information.'
def _upload():
if request.method == 'POST':
file = request.files['files[]']
# get filename and folders
file_name = secure_filename(file.filename)
directory = str(unique_id())
upload_folder = myapp.config['UPLOAD_FOLDER']
if file.filename == '':
return redirect(request.url)
if file:
#and allowed_file(file.filename)
save_dir = os.path.join(upload_folder, directory)
if not os.path.exists(save_dir):
os.makedirs(save_dir)
cmpl_path = os.path.join(save_dir, file_name)
file.save(cmpl_path)
size = os.stat(cmpl_path).st_size
# create our file from the model and add it to the database
dbfile = File(file_name, directory, size, file.mimetype)
g.user.uploads.append(dbfile)
db.session().add(dbfile)
db.session().commit()
if "image" in dbfile.mimetype:
get_thumbnail(cmpl_path, "100")
thumbnail_url = request.host_url + 'thumbs/' + directory
else:
thumbnail_url = ""
url = request.host_url + 'uploads/' + directory
delete_url = url
delete_type = "DELETE"
file = {"name": file_name, "url": url, "thumbnailUrl": thumbnail_url, "deleteUrl": delete_url,
"deleteType": delete_type, "uid": directory}
return jsonify(files=[file])
else:
return jsonify(files=[{"name": file_name, "error": responds['FILETYPE_NOT_ALLOWED']}])
def callback():
current_user = flask.g.user
session = db.session
# Redirect user to home page if already logged in.
if current_user is not None and current_user.is_authenticated:
return redirect(url_for('index'))
if 'error' in request.args:
if request.args.get('error') == 'access_denied':
return 'You are denied access.'
return 'Error encountered.'
if 'code' not in request.args and 'state' not in request.args:
return redirect(url_for('login'))
else:
# Execution reaches here when user has
# successfully authenticated our app.
google = get_google_auth(state=flask_session['oauth_state'])
try:
token = google.fetch_token(
Auth.TOKEN_URI,
client_secret=Auth.CLIENT_SECRET,
authorization_response=request.url.replace('http://', 'https://'),
)
except HTTPError:
return 'HTTPError occurred.'
google = get_google_auth(token=token)
# todo: cool stuff in here
resp = google.get(Auth.USER_INFO)
if resp.status_code == 200:
user_data = resp.json()
email = user_data['email']
user = User.query.filter_by(email=email).first()
if user is None:
user = User(
name=user_data['name'] or user_data['email'].split('@')[0].capitalize(),
email=email,
avatar=user_data['picture']
)
user.tokens = json.dumps(token)
session.add(user)
session.commit()
login_user(user)
return redirect(url_for('index'))
return 'Could not fetch your information.'