def create_app(extra_config_settings={}):
"""
Initialize Flask applicaton
"""
app.config.from_object('app.startup.settings')
app.config.update(extra_config_settings)
# Load all blueprints with their manager commands, entities and views
from app import core
mongoengine.connect(
app.config['DATABASE_NAME'],
host=app.config['DATABASE_HOST'],
port=app.config['DATABASE_PORT'],
username=app.config['DATABASE_USER'],
password=app.config['DATABASE_PASSWORD']
)
return app
python类config()的实例源码
def encode_auth_token(self, user_id):
"""
Encode the Auth token
:param user_id: User's Id
:return:
"""
try:
payload = {
'exp': datetime.datetime.utcnow() + datetime.timedelta(days=app.config.get('AUTH_TOKEN_EXPIRY_DAYS'),
seconds=app.config.get(
'AUTH_TOKEN_EXPIRY_SECONDS')),
'iat': datetime.datetime.utcnow(),
'sub': user_id
}
return jwt.encode(
payload,
app.config['SECRET_KEY'],
algorithm='HS256'
)
except Exception as e:
return e
def create_invitation_notification(self, invitations):
target_users = list()
for each in invitations:
target_users.append({'field': 'tag', 'key': 'email', 'relation': '=', 'value': each.invitee.email})
target_users.append({'operator': 'OR'})
payload = {
'app_id': app.config['ONE_SIGNAL_SETTINGS']['API_ID'],
'filters': target_users,
'contents': {'en': '{} sent you a invitation'.format(invitations[0].inviter.first_name)}
}
req = requests.post('https://onesignal.com/api/v1/notifications',
headers=self.header,
data=json.dumps(payload))
if req.status_code == 200:
app.logger.info('Push notification sent')
def unique(fun):
global r
if not r:
r = redis.StrictRedis.from_url(flaskapp.config['REDIS_URI'])
@wraps(fun)
def wrapper(*args, **kwargs):
key = 'celery_unique_lock:{}'.format(pickle.dumps((fun.__name__, args, kwargs)))
has_lock = False
try:
if r.set(key, 1, nx=True, ex=60*5):
has_lock = True
return fun(*args, **kwargs)
finally:
if has_lock:
r.delete(key)
return wrapper
def parse_and_run(args=None):
sslBaseDir = path.join(BASE_DIR, 'ssl')
p = ArgumentParser()
p.add_argument('--bind', '-b', action='store', help='the address to bind to', default='127.0.0.1')
p.add_argument('--port', '-p', action='store', type=int, help='the port to listen on', default=8080)
p.add_argument('--debug', '-d', action='store_true', help='enable debugging (use with caution)', default=False)
p.add_argument('--ssl', '-s', action='store_true', help='enable ssl', default=False)
args = p.parse_args(args)
if args.ssl:
ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ctx.load_cert_chain(path.join(sslBaseDir, 'server.crt'), path.join(sslBaseDir, 'server.key'))
app.config['SESSION_TYPE'] = 'filesystem'
app.run(host=args.bind, port=args.port, debug=args.debug, ssl_context=ctx)
else:
app.run(host=args.bind, port=args.port, debug=args.debug)
def decode_auth_token(token):
"""
Decoding the token to get the payload and then return the user Id in 'sub'
:param token: Auth Token
:return:
"""
try:
payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms='HS256')
is_token_blacklisted = BlackListToken.check_blacklist(token)
if is_token_blacklisted:
return 'Token was Blacklisted, Please login In'
return payload['sub']
except jwt.ExpiredSignatureError:
return 'Signature expired, Please sign in again'
except jwt.InvalidTokenError:
return 'Invalid token. Please sign in again'
def log( cls, level, message, caller = None ):
if not cls.logger:
cls.instantiate( logLevel = app.config['LEVELOFLOG'] )
try:
if level not in logging._levelNames:
cls.log( "ERROR", 'Invalid file level \'%s\''%( level ) )
logLevel = logging._levelNames[level]
if not caller:
callers = Log.getCallers( inspect.stack() )
else:
callers = caller
message = '%s.%s - %s'%( callers[0], callers[1] , message )
cls.logger.log( logLevel, message )
except Exception, e:
print 'Unable to record the log. Error: %s'%( e )
def send_mail(recipients, subject, body, html=None, sender=None):
from app import app
msg = {
'to': check_email(recipients),
'subj': subject,
'body': body,
}
if sender:
msg['from'] = sender
if html:
msg['html'] = html
redis = StrictRedis(
app.config['SMTP_BROKER_HOST'],
app.config['SMTP_BROKER_PORT'],
app.config['SMTP_BROKER_DB']
)
redis.publish(app.config['SMTP_BROKER_CHANNEL'], json.dumps(msg))
def send_ws(message, signal=consts.WS_NONE, tags=[]):
from app import app
_tags = tags if isinstance(tags, list) else [tags]
msg = {
'message': {
'message': message,
'signal': signal,
'tags': _tags,
},
'tags': _tags,
}
redis = StrictRedis(
app.config['WS_BROKER_HOST'],
app.config['WS_BROKER_PORT'],
app.config['WS_BROKER_DB']
)
redis.publish(app.config['WS_BROKER_CHANNEL'], json.dumps(msg))
def linkFacebook():
"""Links a Facebook account to an existing Google account.
"""
form = LinkFacebookForm()
# Check if the Service-Provider is Google
if form.validate_on_submit() and g.loginWith == 'Google' and g.currentUser['facebookId'] is None:
facebookToken = FacebookModel.getTokenValidation(app.config['FACEBOOK_ACCESS_TOKEN'], form.token.data)
if facebookToken['is_valid'] and facebookToken['user_id'] == form.facebookId.data:
# Continue only if the account doesn't exist yet.
if not FacebookModel.doesUserExist(form.facebookId.data):
if FacebookModel.linkToUserId(g.currentUser['_id'], form.facebookId.data):
return json.dumps({'result':'OK'}), 200
else:
return abort(403)
else:
return abort(401)
return abort(400)
def linkGoogle():
"""Links a Google account to an existing Facebook account.
"""
form = LinkGoogleForm()
# Check if the Service-Provider is Facebook
if form.validate_on_submit() and g.loginWith == 'Facebook' and g.currentUser['googleId'] is None:
googleToken = GoogleModel.getTokenValidation(app.config['GOOGLE_CLIENT_ID'], form.token.data)
if googleToken and googleToken['sub'] == form.googleId.data:
# Continue only if the account doesn't exist yet.
if not GoogleModel.doesUserExist(form.googleId.data):
if GoogleModel.linkToUserId(g.currentUser['_id'], form.googleId.data):
return json.dumps({'result':'OK'}), 200
else:
return abort(403)
else:
return abort(401)
return abort(400)
def mergeFacebook():
"""Merges an existing Facebook account to an existing Google account.
"""
form = MergeFacebookForm()
# Check if the Service-Provider is Google
if form.validate_on_submit() and g.loginWith == 'Google' and g.currentUser['facebookId'] is None:
facebookToken = FacebookModel.getTokenValidation(app.config['FACEBOOK_ACCESS_TOKEN'], form.token.data)
if facebookToken['is_valid'] and facebookToken['user_id'] == form.facebookId.data:
# Continue only if the account does exist.
if FacebookModel.doesUserExist(form.facebookId.data):
facebookUser = FacebookModel.getUser(form.facebookId.data)
if UserModel.mergeUsers(g.currentUser['_id'], facebookUser['_id']):
return json.dumps({'result':'OK'}), 200
else:
return abort(404)
else:
return abort(401)
return abort(400)
def mergeGoogle():
"""Merges an existing Google account to an existing Facebook account.
"""
form = MergeGoogleForm()
# Check if the Service-Provider is Facebook
if form.validate_on_submit() and g.loginWith == 'Facebook' and g.currentUser['googleId'] is None:
googleToken = GoogleModel.getTokenValidation(app.config['GOOGLE_CLIENT_ID'], form.token.data)
if googleToken and googleToken['sub'] == form.googleId.data:
# Continue only if the account does exist.
if GoogleModel.doesUserExist(form.googleId.data):
googleUser = GoogleModel.getUser(form.googleId.data)
if UserModel.mergeUsers(g.currentUser['_id'], googleUser['_id']):
return json.dumps({'result':'OK'}), 200
else:
return abort(404)
else:
return abort(401)
return abort(400)
def validate(self):
if not Form.validate(self):
return False
if self.uploadfile.name not in request.files:
self.uploadfile.errors.append("Please select a valid file.")
return False
self.file_ = request.files[self.uploadfile.name]
self.filename = secure_filename(self.file_.filename)
uploads = os.listdir(os.path.join(app.root_path, app.config['UPLOAD_FOLDER']))
if self.filename in uploads:
self.uploadfile.errors.append("A file with this name already exists.")
return False
if not self.filename:
self.uploadfile.errors.append("Invalid file name.")
return False
return True
def setUp(self):
"""Setup method for spinning up a test instance of app"""
app.config['TESTING'] = True
app.config['WTF_CSRF_ENABLED'] = False
self.app = app.test_client()
self.app.testing = True
self.authorization = {
'Authorization': "Basic {user}".format(
user=base64.b64encode(b"test:asdf").decode("ascii")
)
}
self.content_type = 'application/json'
self.dummy_name = 'dummy'
self.dummy_user = json.dumps(
{'username': 'dummy', 'languages': ['testLang']}
)
self.dummy_lang = json.dumps(
{'name': 'dummy', 'users': ['No one']}
)
def get_game_description(game_name, steam_description):
'''
gets description of game from IGDB, otherwise uses steam description if
possible.
'''
url = f'{app.config["IGDB_API_URL"]}/games/?fields=*&limit=1&search={game_name}'
headers = {
'Accept': 'application/json',
'user-key': app.config['IGDB_API_KEY']
}
res = cache.get(url, headers=headers).json()[0]
# check if we actually found something
if 'summary' in res.keys():
return shorten_description(res['summary'])
# otherwise use steam description
elif len(steam_description) > 0:
return shorten_description(steam_description)
# if no steam description, just tell the user
else:
return "Sorry, we couldn't find a description for this game ??"
def course_summary():
if request.method == 'POST' and 'course_results' in request.files:
pdf_file = request.files['course_results']
if pdf_file and allowed_file(pdf_file.filename):
original_filename = secure_filename(pdf_file.filename)
unique_filename = create_unique_filename(app.config['UPLOAD_FOLDER'],
original_filename)
pdf_file.save(os.path.join(app.config['UPLOAD_FOLDER'], unique_filename))
pdf_abs_path = os.path.abspath(os.path.join(app.config['UPLOAD_FOLDER'],
unique_filename))
student_course_summary = None
try:
student_course_summary = course_statistics.get_course_statistics(
pdf_abs_path)
except course_statistics.ReadPDFException as e:
return render_template('failure.html',
title='Failure',
redirect=True,
message='It seems the file you provided cound not be read as a PDF.')
return render_template('student_summary_%s.html' % student_course_summary.language,
title='Summary',
student_summary=student_course_summary)
return redirect(url_for('index'))
def app(request):
flask_app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
flask_app.config['TESTING'] = True
flask_app.config['WTF_CSRF_ENABLED'] = False
flask_app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
flask_app.config['SERVER_NAME'] = 'localhost'
with flask_app.app_context():
yield flask_app
def login():
form = LoginForm()
if form.validate_on_submit():
flash('Login requested for OpenID="%s", remember_me=%s, %s, %s' %
(form.openid.data, str(form.remember_me.data), form.input_data.data, form.first_layer.data))
load_model()
return redirect('/index')
return render_template('login.html',
title='Sign In',
form=form,
providers=app.config['OPENID_PROVIDERS'])
def run_gunicorn_app(host, port, debug, **settings):
"""Serve Flask application using Gunicorn.
The Flask application and respective resources and endpoints should
defined in `app.py` in this same directory.
"""
logging.basicConfig(level='DEBUG' if debug else 'INFO')
# Set a global flag that indicates that we were invoked from the
# command line interface provided server command. This is detected
# by Flask.run to make the call into a no-op. This is necessary to
# avoid ugly errors when the script that is loaded here also attempts
# to start a server.
os.environ['FLASK_RUN_FROM_CLI_SERVER'] = '1'
settings['bind'] = '{}:{}'.format(host, port)
if debug:
app.jinja_env.auto_reload = True
app.config['TEMPLATES_AUTO_RELOAD'] = True
settings.update({'loglevel': 'debug',
'reload': True,
'threads': 1,
'workers': 1,
'worker_class': 'sync'})
app.wsgi_app = DebuggedApplication(app.wsgi_app, True)
logging.info(" * Launching in Debug mode.")
logging.info(" * Serving application using a single worker.")
else:
logging.info(" * Launching in Production Mode.")
logging.info(" * Serving application with {} worker(s)."
.format(settings["workers"]))
server = GunicornApp(app, settings=settings)
server.run()
def get_auth_info():
"""
Get user's identity
Verify the interity of token from the OAuth provider,
then look up the database check if user exist or not.
If the user does not exist, create a new user instead.
:return: user instance
"""
if 'Access-Token' not in request.headers:
return None
access_token = request.headers['Access-Token']
if access_token is not None:
# For development purpose only
if access_token in app.config['TEST_TOKEN'].keys():
user = MongoUtil.find_user(app.config['TEST_TOKEN'][access_token])
else:
# Check that the Access Token is valid.
url = ('https://www.googleapis.com/oauth2/v3/tokeninfo?access_token=%s'
% access_token)
result = requests.get(url).json()
if result.get('error_description') is not None:
app.logger.debug('User {} failed to access the server'.format(access_token))
return None
user = MongoUtil.find_user(result['email'])
if user is None:
# if user does not exist, create a new user instead
app.logger.info('Create User: {}'.format(user))
first_name, last_name = get_user_profile(access_token)
user = MongoUtil.create_user(result['email'], first_name, last_name)
return user
def __init__(self):
self.header = {
"Content-Type": "application/json; charset=utf-8",
"Authorization": "Basic {}".format(app.config['ONE_SIGNAL_SETTINGS']['API_KEY'])
}
def get_twitter_for_acc(account):
consumer_key = app.config['TWITTER_CONSUMER_KEY']
consumer_secret = app.config['TWITTER_CONSUMER_SECRET']
tokens = (OAuthToken.query.with_parent(account)
.order_by(db.desc(OAuthToken.created_at)).all())
for token in tokens:
t = Twitter(
auth=OAuth(token.token, token.token_secret,
consumer_key, consumer_secret))
try:
t.account.verify_credentials()
return t
except TwitterHTTPError as e:
if e.e.code == 401:
# token revoked
if sentry:
sentry.captureMessage(
'Twitter auth revoked', extra=locals())
db.session.delete(token)
db.session.commit()
else:
raise TemporaryError(e)
except URLError as e:
raise TemporaryError(e)
return None
def url_for_version(ver):
match = version_re.match(ver)
if not match:
return app.config['REPO_URL']
return app.config['COMMIT_URL'].format(**match.groupdict())
def add_post():
form = CreatePost()
if request.method == 'POST' and form.validate_on_submit():
""" Get image info """
if (form.image.data):
image = form.image.data
""" Create new file name """
old_filename, extension = os.path.splitext(image.filename)
filename = str(int(calendar.timegm(time.gmtime()))) + extension
""" Check directory """
directory = os.path.join(app.config['UPLOAD_FOLDER'], str(g.user.get_id()))
if not os.path.exists(directory):
os.makedirs(directory)
""" Save image """
image.save(os.path.join(directory, filename))
else:
filename = None
""" Add post to DB """
new_post = Posts(user_id=g.user.get_id(),
title=form.title.data,
text=form.text.data,
pub_date=form.date.data,
img=filename,
public=form.public.data)
db.session.add(new_post)
db.session.commit()
""" Success message """
flash('Done')
return (redirect(url_for("index")))
return render_template('addpost.html',
title='Create new post',
form=form)
def edit_post():
form = CreatePost()
if request.method == 'POST':
post = Posts.query.filter_by(id=request.args['id']).first()
""" UPDATE DATA """
post.title = form.title.data
post.text = form.text.data
post.pub_date = form.date.data
""" Get image info """
if (form.image.data):
image = form.image.data
""" Create new file name """
old_filename, extension = os.path.splitext(image.filename)
filename = str(int(calendar.timegm(time.gmtime()))) + extension
""" Check directory """
directory = os.path.join(app.config['UPLOAD_FOLDER'], str(g.user.get_id()))
if not os.path.exists(directory):
os.makedirs(directory)
""" Save image """
image.save(os.path.join(directory, filename))
""" add new imagename """
post.img = filename
post.public = form.public.data
""" UPDATE DATA IN DB """
db.session.commit()
""" Succes message """
flash('Done')
return (redirect(url_for("index")))
elif request.method == 'GET':
""" Get info for post """
post = Posts.query.filter_by(id=request.args['id']).first()
if post is not None and post.user_id == g.user.get_id():
return render_template('edit.html',
post=post,
title='Update post',
form=form)
flash('Something is wrong')
return redirect(url_for("index"))
def _connect(self):
client = MongoClient(app.config['MONGODB_URI'],
connect=False)
db = client[self._name]
db.authenticate(app.config['MONGODB_USER'],
app.config['MONGODB_PWD'])
self._db = db
def setUp(self):
app.config['TESTING'] = True
self.app = app.test_client()
self.app_context = app.app_context()
self.app_context.push()
def get_paginated_items(bucket, bucket_id, page, q):
"""
Get the items from the bucket and then paginate the results.
Items can also be search when the query parameter is set.
Construct the previous and next urls.
:param q: Query parameter
:param bucket: Bucket
:param bucket_id: Bucket Id
:param page: Page number
:return:
"""
if q:
pagination = BucketItem.query.filter(BucketItem.name.like("%" + q.lower().strip() + "%")) \
.order_by(BucketItem.create_at.desc()) \
.filter_by(bucket_id=bucket_id) \
.paginate(page=page, per_page=app.config['BUCKET_AND_ITEMS_PER_PAGE'], error_out=False)
else:
pagination = bucket.items.order_by(BucketItem.create_at.desc()).paginate(page=page, per_page=app.config[
'BUCKET_AND_ITEMS_PER_PAGE'], error_out=False)
previous = None
if pagination.has_prev:
if q:
previous = url_for('items.get_items', q=q, bucket_id=bucket_id, page=page - 1, _external=True)
else:
previous = url_for('items.get_items', bucket_id=bucket_id, page=page - 1, _external=True)
nex = None
if pagination.has_next:
if q:
nex = url_for('items.get_items', q=q, bucket_id=bucket_id, page=page + 1, _external=True)
else:
nex = url_for('items.get_items', bucket_id=bucket_id, page=page + 1, _external=True)
return pagination.items, nex, pagination, previous
def paginate_buckets(user_id, page, q, user):
"""
Get a user by Id, then get hold of their buckets and also paginate the results.
There is also an option to search for a bucket name if the query param is set.
Generate previous and next pagination urls
:param q: Query parameter
:param user_id: User Id
:param user: Current User
:param page: Page number
:return: Pagination next url, previous url and the user buckets.
"""
if q:
pagination = Bucket.query.filter(Bucket.name.like("%" + q.lower().strip() + "%")).filter_by(user_id=user_id) \
.paginate(page=page, per_page=app.config['BUCKET_AND_ITEMS_PER_PAGE'], error_out=False)
else:
pagination = user.buckets.paginate(page=page, per_page=app.config['BUCKET_AND_ITEMS_PER_PAGE'],
error_out=False)
previous = None
if pagination.has_prev:
if q:
previous = url_for('bucket.bucketlist', q=q, page=page - 1, _external=True)
else:
previous = url_for('bucket.bucketlist', page=page - 1, _external=True)
nex = None
if pagination.has_next:
if q:
nex = url_for('bucket.bucketlist', q=q, page=page + 1, _external=True)
else:
nex = url_for('bucket.bucketlist', page=page + 1, _external=True)
items = pagination.items
return items, nex, pagination, previous