def setUp(self):
    """Create a fresh Flask app, SQLAlchemy handle and Search extension.

    The SQLAlchemy handle is published through the module-level ``db`` name
    so that other objects in the module can share it, and it is rebuilt on
    every call so each test starts from a new instance.
    """
    # Must be declared before ``db`` is assigned below.
    global db

    class SearchTestSettings(object):
        # Settings object consumed by ``Flask.config.from_object``.
        DEBUG = True
        TESTING = True
        SQLALCHEMY_DATABASE_URI = 'sqlite://'
        SQLALCHEMY_TRACK_MODIFICATIONS = True
        # A new temporary directory is created each time setUp runs.
        MSEARCH_INDEX_NAME = mkdtemp()
        # MSEARCH_BACKEND = 'whoosh'

    flask_app = Flask(__name__)
    self.app = flask_app
    flask_app.config.from_object(SearchTestSettings())

    db = SQLAlchemy(flask_app)
    self.search = Search(flask_app, db=db)
    # No model class yet; set to None here.
    self.Post = None
# Example source code for the Python class SQLAlchemy()
def github_callback_get_account(db, gh_api):
    """Resolve the account for the GitHub user behind a callback.

    Fetches ``/user`` through the API client and looks up a stored ``User``
    with the same ``github_id``. When none exists, a new ``UserAccount`` and
    ``User`` are added to the session and committed.

    :param db: Database for storing GitHub user info
    :type db: ``flask_sqlalchemy.SQLAlchemy``
    :param gh_api: GitHub API client ready for the communication
    :type gh_api: ``repocribro.github.GitHubAPI``
    :return: User account and flag telling whether it was just created
    :rtype: tuple of ``repocribro.models.UserAccount``, bool
    """
    profile = gh_api.get('/user').data
    session = db.session

    known_user = session.query(User).filter(
        User.github_id == profile['id']
    ).first()
    if known_user is not None:
        # Already stored: nothing to write.
        return known_user.user_account, False

    # First visit: create the account, then the GitHub user tied to it.
    account = UserAccount()
    session.add(account)
    fresh_user = User.create_from_dict(profile, account)
    session.add(fresh_user)
    session.commit()
    return fresh_user.user_account, True
def connect(app):
    """Attach a SQLAlchemy handle whose declarative base derives from ``MyModel``.

    Points ``SQLALCHEMY_DATABASE_URI`` at the SQLite file returned by
    ``_db_file_location()`` and returns the constructed handle.

    :param app: Flask application to configure and bind.
    :return: The ``SQLAlchemy`` subclass instance bound to ``app``.
    """
    from flask_sqlalchemy import SQLAlchemy
    from sqlalchemy.ext.declarative import declarative_base
    from generic_social_network.app.models.tables.my_model import MyModel

    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + _db_file_location()

    class MySQLAlchemy(SQLAlchemy):
        def make_declarative_base(self):
            # Import from the real package: the ``flask.ext.*`` redirect
            # namespace was removed in Flask 1.0, and this function already
            # imports ``SQLAlchemy`` from ``flask_sqlalchemy``.
            from flask_sqlalchemy import _BoundDeclarativeMeta, _QueryProperty
            base = declarative_base(cls=MyModel, name='MyModel',
                                    metaclass=_BoundDeclarativeMeta)
            base.query = _QueryProperty(self)
            return base

    db = MySQLAlchemy(app)
    # db.engine.echo = True
    return db
def __init__(self, app, db, table, key_prefix):
    """Set up the SQLAlchemy-backed session model.

    :param app: Flask application; used only to build a ``SQLAlchemy``
                handle when ``db`` is ``None``.
    :param db: Existing ``SQLAlchemy`` handle, or ``None`` to create one.
    :param table: Table name given to the generated ``Session`` model.
    :param key_prefix: Stored on the instance as ``key_prefix``.
    """
    if db is None:
        from flask_sqlalchemy import SQLAlchemy
        db = SQLAlchemy(app)

    self.db = db
    self.key_prefix = key_prefix

    # Short alias used while declaring the model below.
    orm = db

    class Session(orm.Model):
        __tablename__ = table

        id = orm.Column(orm.Integer, primary_key=True)
        session_id = orm.Column(orm.String(256), unique=True)
        data = orm.Column(orm.LargeBinary)
        expiry = orm.Column(orm.DateTime)

        def __init__(self, session_id, data, expiry):
            self.session_id, self.data, self.expiry = session_id, data, expiry

        def __repr__(self):
            return '<Session data %s>' % self.data

    # Create tables (including the one just declared) before exposing the model.
    orm.create_all()
    self.sql_session_model = Session
def _find_db_ext():
    """Return the ORM Extension currently selected for this App.

    The ``db`` proxy resolves through this function. It simply reads the
    module-level ``_SELECTED_BACKEND`` global.

    Returns:
        flask_utils.FlaskDB|flask_sqlalchemy.SQLAlchemy: The ORM Extension
            in use for this App.

    Raises:
        RuntimeError: If no backend has been selected yet.
    """
    if _SELECTED_BACKEND is not MISSING:
        return _SELECTED_BACKEND

    # @TODO: Do like Flask does with the context error and have a single
    # leading line that sort of explains the issue and is easily searched?
    raise RuntimeError("You have attempted to use the Fleaker DB proxy "
                       "before it was initialized! Please ensure you are "
                       "in the right context or push it yourself! Please "
                       "see the documentation for more information!")
def create_app():
    """Build the Flask application and initialise its extensions.

    AWS credentials, region and bucket name are read from the environment.

    :return: The configured Flask application.
    """
    app = Flask(__name__)

    # SQLAlchemy config
    app.config['SECRET_KEY'] = '123456790'
    app.config['SQLALCHEMY_DATABASE_URI'] = \
        'postgresql+psycopg2://localhost:5432/test'

    # Flask-ImageAlchemy config
    app.config['AWS_ACCESS_KEY_ID'] = os.environ.get('AWS_ACCESS_KEY_ID')
    app.config['AWS_SECRET_ACCESS_KEY'] = os.environ.get('AWS_SECRET_ACCESS_KEY')
    app.config['AWS_REGION_NAME'] = os.environ.get('AWS_REGION_NAME', 'eu-central-1')
    # Read the bucket from its own variable; the original looked up
    # AWS_REGION_NAME here, so a configured region was used as the bucket name.
    app.config['S3_BUCKET_NAME'] = os.environ.get('S3_BUCKET_NAME', 'haraka-local')

    # init extensions
    db.init_app(app)
    s3_storage.init_app(app)

    return app
# create app
def __init__(self, flask_app, config):
    """Store the app and config, then bind a SQLAlchemy database to the app.

    :param flask_app: Flask application that receives the database settings.
    :param config: Stored on the instance as ``config``.
    """
    self._flask_app = flask_app
    self.config = config
    self._plugin_manager = None

    # The SQLite file lives in prism's configuration folder.
    db_file = os.path.join(prism.settings.CONFIG_FOLDER, 'prism.db')
    app_settings = self._flask_app.config
    app_settings['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///%s' % db_file
    app_settings['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

    self._database = SQLAlchemy(self._flask_app)
def test_unversioned_db_fixture(unversioned_db):
    """Check the types exposed by the unversioned SQLAlchemy fixture."""
    assert unversioned_db.__class__ == flask_sqlalchemy.SQLAlchemy

    scoped = unversioned_db.session
    assert scoped.__class__ == sqlalchemy.orm.scoping.scoped_session

    # Compare by class name only, as the session comes from the factory.
    produced = scoped.session_factory()
    expected_name = flask_sqlalchemy.SignallingSession.__name__
    assert (produced.__class__.__name__ == expected_name)
def unversioned_db(app, request):
    """Yield a plain ``flask_sqlalchemy.SQLAlchemy`` handle bound to ``app``.

    ``request`` is accepted but not used by this generator.
    """
    yield flask_sqlalchemy.SQLAlchemy(app)
def versions(self, before=None, after=None, return_query=False):
    """Fetch the history of this object from its history table.

    Rows are matched on every primary-key column of the history mapper
    except ``version``, and are ordered by ``version``.

    :param before: Keep only changes whose ``changed`` value is at or
                   before this ``DateTime``.
    :param after: Keep only changes whose ``changed`` value is at or
                  after this ``DateTime``.
    :param return_query: Return the query itself instead of a list of models.
    :return: List of history models for this object (or a query object).
    """
    mapper = self.__history_mapper__
    history_cls = mapper.class_

    # Identity of this object: its primary-key values, minus the version.
    identity = {
        column.key: getattr(self, column.key)
        for column in mapper.primary_key
        if column.key != 'version'
    }
    query = history_cls.query.filter_by(**identity)

    # Optional date window.
    if before is not None:
        query = query.filter(history_cls.changed <= before)
    if after is not None:
        query = query.filter(history_cls.changed >= after)

    query = query.order_by(history_cls.version)
    return query if return_query else query.all()
def init_app():
    """Create a throwaway Flask app and connect it to the database.

    Flask-SQLAlchemy needs an application to bind to, so one is built here
    and handed to ``connect_to_db``.
    """
    # So that we can use Flask-SQLAlchemy, we'll make a Flask app.
    from flask import Flask
    app = Flask(__name__)

    connect_to_db(app)
    # Call form works on Python 2 and 3; the bare ``print`` statement was a
    # syntax error on Python 3.
    print("Connected to DB.")
def sqlalchemy_db(app):
    """Return a new Flask-SQLAlchemy handle bound to ``app``."""
    from flask_sqlalchemy import SQLAlchemy
    return SQLAlchemy(app)
def get_all_latest_submissions(self) -> '_MyQuery[Work]':
    """Get the latest submission (:class:`Work`) of every :class:`User`
    who has submitted at least one work for this assignment.

    :returns: The latest submissions.
    """
    # get_from_latest_submissions uses SQLAlchemy magic that MyPy cannot
    # encode, hence the cast of ``Work`` to itself.
    work_entity = t.cast(Work, Work)
    return self.get_from_latest_submissions(work_entity)
def configure_app(config_object, app):
    """Load configuration into ``app`` and bind a SQLAlchemy handle to it.

    :param config_object: Object passed to ``app.config.from_object``.
    :param app: Flask application to configure.
    :return: The ``SQLAlchemy`` handle created for ``app``. The original
             bound it to an unused local and dropped it; callers that
             ignore the return value are unaffected.
    """
    app.config.from_object(config_object)
    return SQLAlchemy(app)
def new():
    """Show the comment form and store a submitted comment.

    Any non-POST request renders the form. A POST with an empty ``name``,
    ``email`` or ``comment`` flashes an error and renders the form again.
    A complete POST saves a ``Comments`` row and redirects to ``show_all``.
    """
    # Anything other than POST only needs the form.
    if request.method != 'POST':
        return render_template('new.html')

    # The submitted data is available in the request.form dictionary.
    form = request.form
    if not (form['name'] and form['email'] and form['comment']):
        # At least one field is empty: report it and show the form again.
        flash('Please enter all the fields', 'error')
        return render_template('new.html')

    # The data is valid, so build the 'Comments' object, add it to the
    # SQLAlchemy session and commit to save it to the database.
    comment = Comments(form['name'], form['email'], form['comment'])
    db.session.add(comment)
    db.session.commit()

    # Flash a success message, then go to the view showing all comments.
    flash('Comment was successfully submitted')
    return redirect(url_for('show_all'))
# This is the code that gets executed when the current python file is
# executed.
def _initialize():
    """Load the server configuration and build the logger, app and database.

    The JSON config file path comes from ``LIBRARIAN_CONFIG_PATH`` and
    defaults to ``server-config.json``. The process exits with status 1 when
    the config has no ``SECRET_KEY``.

    :return: Tuple ``(logger, app, db)``: the ``librarian`` logger, the Flask
             application and the ``SQLAlchemy`` handle bound to it.
    """
    import json
    import os.path
    import time
    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy

    config_path = os.environ.get('LIBRARIAN_CONFIG_PATH', 'server-config.json')
    with open(config_path) as f:
        config = json.load(f)

    if 'SECRET_KEY' not in config:
        print('cannot start server: must define the Flask "secret key" as the item '
              '"SECRET_KEY" in "server-config.json"', file=sys.stderr)
        sys.exit(1)

    # TODO: configurable logging parameters will likely be helpful. We use UTC
    # for timestamps using standard ISO-8601 formatting. The Python docs claim
    # that 8601 is the default format but this does not appear to be true.
    loglevel_cfg = config.get('log_level', 'info')
    loglevel = _log_level_names.get(loglevel_cfg)
    warn_loglevel = (loglevel is None)
    if warn_loglevel:
        # Unknown level name: fall back to INFO and report it once the
        # logger exists.
        loglevel = logging.INFO

    logging.basicConfig(
        level=loglevel,
        format='%(asctime)s %(levelname)s: %(message)s',
        datefmt='%Y-%m-%dT%H:%M:%SZ'
    )
    # The format ends in "Z", so render timestamps in UTC.
    logging.getLogger('').handlers[0].formatter.converter = time.gmtime
    logger = logging.getLogger('librarian')

    if warn_loglevel:
        # ``Logger.warn`` is a deprecated alias that newer Python versions
        # no longer provide; ``warning`` is the supported spelling.
        logger.warning('unrecognized value %r for "log_level" config item', loglevel_cfg)

    tf = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')
    app = Flask('librarian', template_folder=tf)
    app.config.update(config)
    db = SQLAlchemy(app)
    return logger, app, db
def __init__(self, app, db, table, key_prefix, use_signer=False,
             permanent=True):
    """Set up the SQLAlchemy-backed session model.

    :param app: Flask application; used only to build a ``SQLAlchemy``
                handle when ``db`` is ``None``.
    :param db: Existing ``SQLAlchemy`` handle, or ``None`` to create one.
    :param table: Table name given to the generated ``Session`` model.
    :param key_prefix: Stored on the instance as ``key_prefix``.
    :param use_signer: Stored on the instance as ``use_signer``.
    :param permanent: Stored on the instance as ``permanent``.
    """
    if db is None:
        from flask_sqlalchemy import SQLAlchemy
        db = SQLAlchemy(app)

    self.db = db
    self.key_prefix = key_prefix
    self.use_signer = use_signer
    self.permanent = permanent

    # Short alias used while declaring the model below.
    orm = db

    class Session(orm.Model):
        __tablename__ = table

        id = orm.Column(orm.Integer, primary_key=True)
        session_id = orm.Column(orm.String(255), unique=True)
        data = orm.Column(orm.Text)
        expiry = orm.Column(orm.DateTime)

        def __init__(self, session_id, data, expiry):
            self.session_id, self.data, self.expiry = session_id, data, expiry

        def __repr__(self):
            return '<Session data {0!s}>'.format(self.data)

    self.sql_session_model = Session
def from_sql(row):
    """
    Translates a SQLAlchemy model instance into a dictionary.

    The result is a copy of the instance's ``__dict__`` with ``id`` set from
    ``row.id`` and the ``_sa_instance_state`` entry removed. The instance
    itself is left unchanged.
    """
    fields = dict(row.__dict__)
    fields['id'] = row.id
    # Drop SQLAlchemy's bookkeeping entry; raises KeyError if it is absent.
    del fields['_sa_instance_state']
    return fields
def run_server():
    """Serve the module-level ``app`` on 0.0.0.0:80.

    Runs threaded, with debug mode and the reloader turned off.
    """
    # ``app`` is only read here, so no ``global`` declaration is needed.
    app.run(
        host='0.0.0.0',
        port=80,
        threaded=True,
        debug=False,
        use_reloader=False,
    )
# SQLAlchemy Models
def connect_db(**kwargs):
    """Link the app database to a postgres db with the supplied info.

    Recognised keyword arguments: ``username``, ``password``, ``host``,
    ``port``, ``database`` and ``app`` (the Flask application).

    :return: The ``SQLAlchemy`` handle bound to ``app``.
    """
    db_uri = 'postgresql://{0}:{1}@{2}:{3}/{4}'.format(kwargs.get('username'),
                                                       kwargs.get('password'),
                                                       kwargs.get('host'),
                                                       kwargs.get('port'),
                                                       kwargs.get('database'))
    app = kwargs.get('app')
    # Set the URI before constructing the extension: it reads the app
    # config when it binds to the app, and the original assigned the URI
    # only afterwards.
    app.config['SQLALCHEMY_DATABASE_URI'] = db_uri
    db = SQLAlchemy(app)
    return db
def test_model(self):
    """
    Test that the module-level ``db`` proxy and the re-exported SQLAlchemy
    helpers match what ``DatabaseManager`` and SQLAlchemy provide
    :return: void
    """
    manager_instance = DatabaseManager.get_sql_alchemy_instance()

    # ``db`` is a proxy that resolves to the manager's SQLAlchemy instance.
    self.assert_is_instance(db, LocalProxy)
    self.assert_is_instance(db._get_current_object(), SQLAlchemy)
    self.assert_equal_deep(manager_instance, db._get_current_object())

    # Each re-exported helper must equal the SQLAlchemy original.
    helper_pairs = (
        (sqlalchemy_mapper, mapper),
        (sqlalchemy_relationship, relationship),
        (sqlalchemy_backref, backref),
    )
    for original, exported in helper_pairs:
        self.assert_equal_deep(original, exported)
def get_sql_alchemy_instance():
    """
    Get the shared SQLAlchemy instance, creating it on first use
    :return: SQLAlchemy
    :rtype: flask_sqlalchemy.SQLAlchemy
    """
    shared = DatabaseManager._sql_alchemy_instance
    if shared is None:
        # First call: build the instance and cache it on the class.
        shared = DatabaseManager._sql_alchemy_instance = SQLAlchemy()
    return shared
def _create_my_sql(self, config):
    """
    Create my sql
    :param config: The config; its ``'name'`` entry selects the bind
    :return: SQLAlchemy Engine
    :rtype: sqlalchemy.engine.base.Engine
    """
    return self._get_engine_for_config(config)

def _create_postgre_sql(self, config):
    """
    Create PostgreSQL
    :param config: The config; its ``'name'`` entry selects the bind
    :return: SQLAlchemy Engine
    :rtype: sqlalchemy.engine.base.Engine
    """
    return self._get_engine_for_config(config)

def _create_sqlite(self, config):
    """
    Create SQLite
    :param config: The config; its ``'name'`` entry selects the bind
    :return: SQLAlchemy Engine
    :rtype: sqlalchemy.engine.base.Engine
    """
    return self._get_engine_for_config(config)

def _get_engine_for_config(self, config):
    """
    Look up the engine for one configured database instance; shared by
    the three ``_create_*`` methods, which previously repeated this body
    :param config: The config; its ``'name'`` entry selects the bind
    :return: SQLAlchemy Engine
    :rtype: sqlalchemy.engine.base.Engine
    """
    db = DatabaseManager.get_sql_alchemy_instance()

    bind = config['name']
    # The first entry of the instances config is requested with no bind
    # name (presumably it is the default database -- verify).
    if self._instances_config and self._instances_config[0]['name'] == config['name']:
        bind = None

    return db.get_engine(bind=bind)
def setupRestApi():
    """Build and run a REST API for menu items and their add-ons.

    Creates the Flask app on top of the ``dmb.db`` SQLite file, declares the
    two models, creates their tables, registers both with ``APIManager``
    under ``/dmb`` and then starts the server. ``app.run()`` is the last
    statement; nothing is returned.
    """
    app = Flask(__name__)
    app.config.update({
        'DEBUG': True,
        'SQLALCHEMY_DATABASE_URI': 'sqlite:///dmb.db',
        'SQLALCHEMY_TRACK_MODIFICATIONS': False,
    })
    db = SQLAlchemy(app)

    class MenuItem(db.Model):
        id = db.Column(db.Integer, primary_key=True)
        menuItemName = db.Column(db.Text)
        price = db.Column(db.Float)

    class ItemAddOn(db.Model):
        id = db.Column(db.Integer, primary_key=True)
        # References the ``id`` column of the ``menu_item`` table.
        menuItemId = db.Column(db.Integer, db.ForeignKey('menu_item.id'))
        addOnName = db.Column(db.Text)
        price = db.Column(db.Integer)

    db.create_all()

    api_manager = APIManager(app, flask_sqlalchemy_db=db)
    # Both models are exposed with the same verbs and URL prefix.
    api_options = {
        'methods': ['GET', 'POST', 'DELETE', 'PATCH'],
        'url_prefix': '/dmb',
    }
    for model in (MenuItem, ItemAddOn):
        api_manager.create_api(model, **api_options)

    app.run()
def init_app():
    """Create a throwaway Flask app and connect it to the database.

    Flask-SQLAlchemy needs an application to bind to, so one is built here
    and handed to ``connect_to_db``.
    """
    # So that we can use Flask-SQLAlchemy, we'll make a Flask app
    from flask import Flask
    app = Flask(__name__)

    connect_to_db(app)
    # Call form works on Python 2 and 3; the bare ``print`` statement was a
    # syntax error on Python 3.
    print("Connected to DB.")
def _discover_ideal_backend(orm_backend):
"""Auto-discover the ideal backend based on what is installed.
Right now, handles discovery of:
* PeeWee
* SQLAlchemy
Args:
orm_backend (str): The ``orm_backend`` value that was passed to the
``create_app`` function. That is, the ORM Backend the User
indicated they wanted to use.
Returns:
str|fleaker.missing.MissingSentinel: Returns a string for the ideal
backend if it found one, or :obj:`fleaker.MISSING` if we couldn't
find one.
Raises:
RuntimeError: Raised if no user provided ORM Backend is given and BOTH
PeeWee and SQLAlchemy are installed.
"""
if orm_backend:
return orm_backend
if peewee is not MISSING and sqlalchemy is not MISSING:
raise RuntimeError('Both PeeWee and SQLAlchemy detected as installed, '
'but no explicit backend provided! Please specify '
'one!')
if peewee is not MISSING:
return _PEEWEE_BACKEND
elif sqlalchemy is not MISSING:
return _SQLALCHEMY_BACKEND
else:
return MISSING
def _regenerate_database(db: SQLAlchemy):
    """Drop every table, recreate them, then commit. For testing purposes only.

    :param db: The ``SQLAlchemy`` handle whose tables are rebuilt.
    """
    # Order matters: drop first, then create.
    db.drop_all()
    db.create_all()

    db.session.commit()
def create_app(taskflow_instance, connection_string=None, secret_key=None):
    """Build the Flask application that serves the taskflow REST API.

    :param taskflow_instance: Object attached to every resource class as
                              its ``taskflow`` attribute.
    :param connection_string: Database URI; falls back to the
                              ``SQL_ALCHEMY_CONNECTION`` environment variable.
    :param secret_key: Flask secret key; falls back to the
                       ``FLASK_SESSION_SECRET_KEY`` environment variable.
    :return: The configured Flask application.
    """
    app = flask.Flask(__name__)

    # Environment values are strings, and any non-empty string is truthy,
    # so DEBUG=0 or DEBUG=false used to switch debug mode ON. Parse it.
    debug_env = os.getenv('DEBUG', '')
    app.config['DEBUG'] = debug_env.strip().lower() in ('1', 'true', 'yes', 'on')
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    app.config['SQLALCHEMY_DATABASE_URI'] = connection_string or os.getenv('SQL_ALCHEMY_CONNECTION')
    app.config['SESSION_COOKIE_NAME'] = 'taskflowsession'
    app.config['SESSION_COOKIE_HTTPONLY'] = True
    app.config['PERMANENT_SESSION_LIFETIME'] = 43200
    app.config['SECRET_KEY'] = secret_key or os.getenv('FLASK_SESSION_SECRET_KEY')

    db = SQLAlchemy(metadata=metadata, model_class=BaseModel)
    db.init_app(app)
    api = Api(app)
    CORS(app, supports_credentials=True)

    login_manager = LoginManager()
    login_manager.init_app(app)

    @login_manager.user_loader
    def load_user(user_id):
        return db.session.query(User).filter(User.id == user_id).first()

    def apply_attrs(class_def, attrs):
        # Sets each attribute on the class itself and returns the same class.
        for key, value in attrs.items():
            setattr(class_def, key, value)
        return class_def

    attrs = {
        'session': db.session,
        'taskflow': taskflow_instance
    }

    # (resource class, URL rule) pairs, registered in this order.
    routes = (
        (resources.LocalSessionResource, '/v1/session'),
        (resources.WorkflowListResource, '/v1/workflows'),
        (resources.WorkflowResource, '/v1/workflows/<workflow_name>'),
        (resources.TaskListResource, '/v1/tasks'),
        (resources.TaskResource, '/v1/tasks/<task_name>'),
        (resources.WorkflowInstanceListResource, '/v1/workflow-instances'),
        (resources.WorkflowInstanceResource, '/v1/workflow-instances/<int:instance_id>'),
        (resources.RecurringWorkflowLastestResource, '/v1/workflow-instances/recurring-latest'),
        (resources.TaskInstanceListResource, '/v1/task-instances'),
        (resources.TaskInstanceResource, '/v1/task-instances/<int:instance_id>'),
        (resources.RecurringTaskLastestResource, '/v1/task-instances/recurring-latest'),
    )

    with app.app_context():
        for resource_cls, url in routes:
            api.add_resource(apply_attrs(resource_cls, attrs), url)

    return app