def init(config: dict, engine: Optional[Engine] = None) -> None:
    """
    Merge ``config`` into the module configuration and set up persistence.

    When no engine is supplied one is created: a ``sqlite://`` engine using
    ``StaticPool`` if the ``dry_run`` config flag is truthy, otherwise an
    engine for the ``tradesv3.sqlite`` file. A scoped session bound to the
    engine is then attached to ``Trade`` and all tables known to
    ``_DECL_BASE`` are created.

    :param config: config to use
    :param engine: database engine for sqlalchemy (Optional)
    :return: None
    """
    _CONF.update(config)

    if not engine:
        use_dry_run_db = _CONF.get('dry_run', False)
        engine = (
            create_engine('sqlite://',
                          connect_args={'check_same_thread': False},
                          poolclass=StaticPool,
                          echo=False)
            if use_dry_run_db
            else create_engine('sqlite:///tradesv3.sqlite')
        )

    session_factory = sessionmaker(bind=engine, autoflush=True, autocommit=True)
    scoped = scoped_session(session_factory)
    # Trade gets both a concrete session and a query property from the registry.
    Trade.session = scoped()
    Trade.query = scoped.query_property()
    _DECL_BASE.metadata.create_all(engine)
python类sessionmaker()的实例源码
test_sql.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_temporary_table(self):
    """
    Round-trip one row through a TEMPORARY table and read it back as a frame.

    The table is created, populated and queried on the same connection,
    inside the session's transaction, and the resulting frame is compared
    with the expected single-row ``spam`` frame.
    """
    payload = u'Hello, World!'
    expected = DataFrame({'spam': [payload]})
    Base = declarative.declarative_base()

    class Temporary(Base):
        __tablename__ = 'temp_test'
        __table_args__ = {'prefixes': ['TEMPORARY']}
        id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
        spam = sqlalchemy.Column(sqlalchemy.Unicode(30), nullable=False)

    session_factory = sa_session.sessionmaker(bind=self.conn)
    session = session_factory()
    with session.transaction:
        conn = session.connection()
        Temporary.__table__.create(conn)
        session.add(Temporary(spam=payload))
        # Flush so the pending row is sent to the database before it is read.
        session.flush()
        spam_query = sqlalchemy.select([Temporary.spam])
        df = sql.read_sql_query(sql=spam_query, con=conn)

    tm.assert_frame_equal(df, expected)
def setUp(self):
    """
    Prepare an engine, the schema and a scoped session for a test.

    The database URL is read from the ``DB_URL`` environment variable and
    falls back to ``sqlite:///:memory:``. ``Test.cleanup`` is registered as
    a cleanup callback before anything else is set up.
    """
    self.addCleanup(Test.cleanup, self)

    # An earlier variant skipped the test when no URL was configured; the
    # in-memory fallback is used instead.
    self.dburl = os.getenv("DB_URL", "sqlite:///:memory:")

    session_registry = scoped_session(sessionmaker())
    self.engine = create_engine(self.dburl, echo=False)
    # remove() is called before configure(), as in the original sequence.
    session_registry.remove()
    session_registry.configure(
        bind=self.engine,
        autoflush=False,
        expire_on_commit=False,
    )
    # Tables are created but never dropped here.
    Base.metadata.create_all(self.engine)
    self.session = session_registry
def get_session(dbpath, scoped=False):  # , enable_fk_if_sqlite=True):
    """
    Build an sql alchemy session for IO db operations.

    An engine is created for ``dbpath`` and all tables of ``Base`` are
    created on it before the session is built.

    :param dbpath: the path to the database, e.g. sqlite:///path_to_my_dbase.sqlite
    :param scoped: boolean (False by default); when true a scoped session is
        returned instead of a plain session instance
    :return: a session instance, or a scoped session if ``scoped`` is true
    """
    engine = create_engine(dbpath)
    Base.metadata.create_all(engine)  # @UndefinedVariable

    # Enabling foreign keys for sqlite could be done with an event listener:
    # http://stackoverflow.com/questions/13712381/how-to-turn-on-pragma-foreign-keys-on-in-sqlalchemy-migration-script-or-conf
    # NOT implemented YET. See models.py

    # One configured factory serves both return shapes.
    session_factory = sessionmaker(bind=engine)
    if scoped:
        return scoped_session(session_factory)
    return session_factory()
def __init__(self, db_path: str, lang: str):
    """
    Open a session on the SQLite database at ``db_path`` and store ``lang``.

    :param db_path: path appended to the ``sqlite:///`` URL prefix
    :param lang: value stored unchanged on ``self.lang``
    """
    db_url = 'sqlite:///{}'.format(db_path)
    session_factory = sessionmaker(bind=create_engine(db_url))
    self.session = session_factory()
    self.lang = lang
def __init__(self, url='sqlite:///default.db'):
    """
    Create the engine and a scoped session factory.

    :param url: database URL used only when the ``DATABASE_URL`` environment
        variable is not set
    """
    # The environment variable takes precedence over the argument.
    database_url = os.environ.get('DATABASE_URL', url)
    self.engine = create_engine(database_url, echo=False)

    session_factory = sessionmaker(bind=self.engine, expire_on_commit=False)
    self.Session = scoped_session(session_factory)
def get_session(self):
    """
    Return the session stored on the instance, creating it on first use.

    A new engine and session are built from ``self.database`` only while
    ``self.session`` is ``None``; later calls return the stored session.
    """
    if self.session is not None:
        return self.session

    engine = create_engine(self.database)
    session_factory = sessionmaker(bind=engine)
    self.session = session_factory()
    return self.session
def persist_bundle(self):
    """
    Persist a ``DummyBundle`` and verify its database row and output files.

    After persisting, the ``product`` table is queried for the bundle's
    uuid and the count is asserted to be positive. The database object is
    then deleted and committed, and every file reported by the bundle is
    asserted to exist in its output directory before being removed.

    ``SETTINGS``, ``DummyBundle`` and ``os`` are expected to be available
    at module level.

    :raises: re-raises any error from the checks after rolling the session
        back; the session is closed in every case
    """
    from madmex.persistence.driver import persist_bundle
    from sqlalchemy import create_engine
    from sqlalchemy.orm.session import sessionmaker
    from madmex.util import remove_file

    dummy = DummyBundle()
    persist_bundle(dummy)

    my_database = getattr(SETTINGS, 'ANTARES_TEST_DATABASE')
    klass = sessionmaker(bind=create_engine(my_database))
    session = klass()
    query = 'SELECT count(*) FROM product WHERE uuid=\'%s\';' % dummy.uuid_id
    # Parenthesised form prints the same text on Python 2 and is valid on
    # Python 3, where the bare print statement is a SyntaxError.
    print(query)
    try:
        result_set = session.execute(query)
        for row in result_set:
            self.assertGreater(row['count'], 0)
        # Delete object from database.
        session.delete(dummy.get_database_object())
        session.commit()
        for file_name in dummy.get_files():
            full_path = os.path.join(dummy.get_output_directory(), os.path.basename(file_name))
            self.assertTrue(os.path.isfile(full_path))
            # Remove file from filesystem.
            remove_file(full_path)
    except:
        # Deliberately broad: roll back on any failure, then re-raise it.
        session.rollback()
        raise
    finally:
        session.close()
def persist_bundle_sensor(self):
    """
    Persist a SPOT5 bundle from a fixed staging folder and verify the result.

    The bundle's target is overridden with the ``TEST_FOLDER`` setting
    before persisting. Afterwards the ``product`` table is queried for the
    bundle's uuid and the count is asserted to be positive, the database
    object is deleted and committed, and every file reported by the bundle
    is asserted to exist in the target folder and then removed.

    ``SETTINGS`` and ``os`` are expected to be available at module level.

    :raises: re-raises any error from the checks after rolling the session
        back; the session is closed in every case
    """
    from madmex.persistence.driver import persist_bundle
    from sqlalchemy import create_engine
    from sqlalchemy.orm.session import sessionmaker
    from madmex.mapper.bundle.spot5 import Bundle

    folder = '/LUSTRE/MADMEX/staging/madmex_antares/test_ingest/556_297_041114_dim_img_spot'
    dummy = Bundle(folder)
    target_url = getattr(SETTINGS, 'TEST_FOLDER')
    # Parenthesised form prints the same text on Python 2 and is valid on
    # Python 3, where the bare print statement is a SyntaxError.
    print(target_url)
    #TODO please fix me, horrible hack
    dummy.target = target_url
    persist_bundle(dummy)

    my_database = getattr(SETTINGS, 'ANTARES_TEST_DATABASE')
    klass = sessionmaker(bind=create_engine(my_database))
    session = klass()
    query = 'SELECT count(*) FROM product WHERE uuid=\'%s\';' % dummy.uuid_id
    try:
        result_set = session.execute(query)
        for row in result_set:
            self.assertGreater(row['count'], 0)
        # Delete object from database.
        session.delete(dummy.get_database_object())
        session.commit()
        for file_name in dummy.get_files():
            full_path = os.path.join(target_url, os.path.basename(file_name))
            self.assertTrue(os.path.isfile(full_path))
            # Remove file from filesystem.
            os.remove(full_path)
    except:
        # Deliberately broad: roll back on any failure, then re-raise it.
        session.rollback()
        raise
    finally:
        session.close()
def create_vector_tables(path_query):
    """
    Execute the SQL stored in a file through a new session bound to ``ENGINE``.

    The file's lines are joined with a single space and executed as one
    statement. This function neither commits nor closes the session.

    :param path_query: path of the file containing the SQL to execute
    """
    klass = sessionmaker(bind=ENGINE)
    session = klass()
    # The context manager closes the file even if reading fails; previously
    # the handle was never closed.
    with open(path_query, 'r') as file_open:
        sql = " ".join(file_open.readlines())
    session.execute(sql)
def __init__(self, path, echo=True):
    """
    Connect to the database at ``path``, creating it and its tables if needed.

    :param path: database URL passed to ``create_engine``
    :param echo: forwarded to ``create_engine`` as its ``echo`` flag
    """
    from sqlalchemy.orm import scoped_session

    self.engine = create_engine(path, echo=echo)
    db_url = self.engine.url
    if not database_exists(db_url):
        create_database(db_url)
    # NOTE(review): create_all is assumed to run unconditionally, not only
    # when the database was just created - confirm against the original layout.
    Base.metadata.create_all(self.engine)

    session_factory = sessionmaker(bind=self.engine)
    self.Session = scoped_session(session_factory)