def test_db():
"""Test to check db."""
_db = SqliteDatabase(":memory:")
with test_database(
_db, (
Organisation,
User,
UserOrg,
OrcidToken,
UserOrgAffiliation,
Task,
AffiliationRecord,
),
fail_silently=True) as _test_db:
yield _test_db
return
python类SqliteDatabase()的实例源码
def sql_select(explain=False):
statement, params = load_query(request.args['query'])
database = current_app.extensions.get('peewee').database.obj
if explain:
if isinstance(database, SqliteDatabase):
statement = 'EXPLAIN QUERY PLAN\n%s' % statement
else:
statement = 'EXPLAIN\n%s' % statement
result = database.execute_sql(statement, params)
headers = []
data = list(result.fetchall())
if data:
headers = ['' for _ in range(len(data[0]))]
return g.debug_toolbar.render('panels/sqlalchemy_select.html', {
'result': data,
'headers': headers,
'sql': format_sql(statement, params),
'duration': float(request.args['duration']),
})
def __get_migrator(self):
if isinstance(self.db.engine, peewee.SqliteDatabase) or isinstance(self.db.engine, SqliteExtDatabase):
return SqliteMigrator(self.db.engine)
elif isinstance(self.db.engine, peewee.MySQLDatabase):
return MySQLMigrator(self.db.engine)
elif isinstance(self.db.engine, peewee.PostgresqlDatabase):
return PostgresqlMigrator(self.db.engine)
raise ImproperlyConfigured('Database engine doesn\'t support Migrations!')
def __init__(self, *args, **kwargs):
super(APIFlask, self).__init__(*args, **kwargs)
self.config.update({
'DB_ENGINE': SqliteDatabase,
'DATABASE': {'database': 'flask_restapi.db'}
})
def get_db_conn():
import peewee
env = os.environ.get('SENTINEL_ENV', 'production')
# default values should be used unless you need a different config for development
db_host = sentinel_cfg.get('db_host', '127.0.0.1')
db_port = sentinel_cfg.get('db_port', None)
db_name = sentinel_cfg.get('db_name', 'sentinel')
db_user = sentinel_cfg.get('db_user', 'sentinel')
db_password = sentinel_cfg.get('db_password', 'sentinel')
db_charset = sentinel_cfg.get('db_charset', 'utf8mb4')
db_driver = sentinel_cfg.get('db_driver', 'sqlite')
if (env == 'test'):
if db_driver == 'sqlite':
db_name = sqlite_test_db_name(db_name)
else:
db_name = "%s_test" % db_name
peewee_drivers = {
'mysql': peewee.MySQLDatabase,
'postgres': peewee.PostgresqlDatabase,
'sqlite': peewee.SqliteDatabase,
}
driver = peewee_drivers.get(db_driver)
dbpfn = 'passwd' if db_driver == 'mysql' else 'password'
db_conn = {
'host': db_host,
'user': db_user,
dbpfn: db_password,
}
if db_port:
db_conn['port'] = int(db_port)
if driver == peewee.SqliteDatabase:
db_conn = {}
db = driver(db_name, **db_conn)
return db
def get_db_conn():
import peewee
env = os.environ.get('SENTINEL_ENV', 'production')
# default values should be used unless you need a different config for development
db_host = sentinel_cfg.get('db_host', '127.0.0.1')
db_port = sentinel_cfg.get('db_port', None)
db_name = sentinel_cfg.get('db_name', 'sentinel')
db_user = sentinel_cfg.get('db_user', 'sentinel')
db_password = sentinel_cfg.get('db_password', 'sentinel')
db_charset = sentinel_cfg.get('db_charset', 'utf8mb4')
db_driver = sentinel_cfg.get('db_driver', 'sqlite')
if (env == 'test'):
if db_driver == 'sqlite':
db_name = sqlite_test_db_name(db_name)
else:
db_name = "%s_test" % db_name
peewee_drivers = {
'mysql': peewee.MySQLDatabase,
'postgres': peewee.PostgresqlDatabase,
'sqlite': peewee.SqliteDatabase,
}
driver = peewee_drivers.get(db_driver)
dbpfn = 'passwd' if db_driver == 'mysql' else 'password'
db_conn = {
'host': db_host,
'user': db_user,
dbpfn: db_password,
}
if db_port:
db_conn['port'] = int(db_port)
if driver == peewee.SqliteDatabase:
db_conn = {}
db = driver(db_name, **db_conn)
return db
def setup_class(cls):
"""
When a Test is created override the ``database`` attribute for all
the tables with a SqliteDatabase in memory.
"""
for table in TABLES:
table._meta.database = cls.TEST_DB
table.create_table(fail_silently=True)
cls.app = app.test_client()
def test_database_creation(tmpdir):
db = peewee.SqliteDatabase(':memory:')
manager = DatabaseManager(db, directory=tmpdir)
assert isinstance(manager.database, peewee.SqliteDatabase)
db = {'engine': 'peewee.SqliteDatabase', 'name': ':memory:'}
manager = DatabaseManager(db, directory=tmpdir)
assert isinstance(manager.database, peewee.SqliteDatabase)
db = 'sqlite:///:memory:'
manager = DatabaseManager(db, directory=tmpdir)
assert isinstance(manager.database, peewee.SqliteDatabase)
def test_database_creation_error(tmpdir):
db = {'name': ':memory:'}
with pytest.raises(peewee.DatabaseError):
DatabaseManager(db, directory=tmpdir)
db = {'engine': 'peewee.SqliteDatabase'}
with pytest.raises(peewee.DatabaseError):
DatabaseManager(db, directory=tmpdir)
db = {'engine': 'unknown.FakeDatabase', 'name': ':memory:'}
with pytest.raises(peewee.DatabaseError):
DatabaseManager(db, directory=tmpdir)
def test_info(tmpdir, caplog):
manager = DatabaseManager('sqlite:///:memory:', directory=tmpdir)
manager.info()
assert 'driver: SqliteDatabase' in caplog.text
assert 'database: :memory:' in caplog.text
def load_database(self, database):
"""
Load the given database, whatever it might be.
A connection string: ``sqlite:///database.sqlite``
A dictionary: ``{'engine': 'SqliteDatabase', 'name': 'database.sqlite'}``
A peewee.Database instance: ``peewee.SqliteDatabase('database.sqlite')``
:param database: Connection string, dict, or peewee.Database instance to use.
:raises: peewee.DatabaseError if database connection cannot be established.
:return: Database connection.
:rtype: peewee.Database instance.
"""
# It could be an actual instance...
if isinstance(database, (peewee.Proxy, peewee.Database)):
return database
# It could be a dictionary...
if isinstance(database, dict):
try:
name = database.pop('name')
engine = database.pop('engine')
except KeyError:
error_msg = 'Configuration dict must specify "name" and "engine" keys.'
raise peewee.DatabaseError(error_msg)
db_class = pydoc.locate(engine)
if not db_class:
raise peewee.DatabaseError('Unable to import engine class: {}'.format(engine))
return db_class(name, **database)
# Or it could be a database URL.
return url_connect(database)
def startup_library_service():
wamp = autobahn_sync.AutobahnSync()
wamp.run()
db = pw.SqliteDatabase('books.db')
class Book(pw.Model):
title = pw.CharField()
author = pw.CharField()
class Meta:
database = db
try:
db.create_table(Book)
except pw.OperationalError:
pass
@wamp.register('com.library.get_book')
def get_book(id):
try:
b = Book.get(id=id)
except pw.DoesNotExist:
return {'_error': "Doesn't exist"}
return {'id': id, 'title': b.title, 'author': b.author}
@wamp.register('com.library.new_book')
def new_book(title, author):
book = Book(title=title, author=author)
book.save()
wamp.session.publish('com.library.book_created', book.id)
return {'id': book.id}
def __init__(self):
self.path = None
self.db = SqliteDatabase(None)
def generate_sqlite_db(variants_to_process, temp_sqlite_db_path, sqlite_db_path):
logging.info("populating sqlite database: " + temp_sqlite_db_path)
if os.path.isfile(temp_sqlite_db_path):
run("rm -f " + temp_sqlite_db_path)
sqlite_db = peewee.SqliteDatabase(temp_sqlite_db_path, autocommit=False)
class t(_SharedVariantPositionFields):
n_expected_samples = peewee.IntegerField(index=True, null=True)
n_available_samples = peewee.IntegerField(index=True, null=True)
class Meta:
database = sqlite_db
indexes = (
(('chrom', 'pos', 'ref', 'alt', 'het_or_hom_or_hemi'), True), # True means unique index
)
t.create_table(fail_silently=True)
# copy the records from the Variant table used by generate_HC_bams.py
sqlite_db.connect()
with sqlite_db.atomic():
for v in variants_to_process: #Variant.select().where(Variant.finished==1).dicts():
#shortcuts.model_to_dict(v)
d = {
'chrom': v.chrom, 'pos': v.pos, 'ref': v.ref, 'alt': v.alt, 'het_or_hom_or_hemi': v.het_or_hom_or_hemi,
'n_expected_samples': v.n_expected_samples,
'n_available_samples': v.n_available_samples,
}
# delete readviz_bam_paths as they're no longer relevant because the data from these is being combined into one bam file
#print("INSERTING " + str(d))
t.insert(**d).execute()
sqlite_db.close()
run("mv %s %s" % (temp_sqlite_db_path, sqlite_db_path))
def test_pd_field():
db = SqliteDatabase(":memory:")
class TestModel(Model):
pf = PartialDateField(null=True)
class Meta:
database = db
TestModel.create_table()
TestModel(pf=PartialDate()).save()
TestModel(pf=None).save()
res = [r[0] for r in db.execute_sql("SELECT pf FROM testmodel").fetchall()]
assert res[0] is None and res[1] is None
TestModel(pf=PartialDate(1997)).save()
TestModel(pf=PartialDate(1996, 4)).save()
TestModel(pf=PartialDate(1995, 5, 13)).save()
res = [r[0] for r in db.execute_sql("SELECT pf FROM testmodel").fetchall()]
assert '1995-05-13' in res
assert '1996-04-**' in res
assert '1997-**-**' in res
res = [r.pf for r in TestModel.select().order_by(TestModel.pf)]
assert res[0] is None
assert res[1] is None
assert res[2] == PartialDate(1995, 5, 13)
assert res[3] == PartialDate(1996, 4)
assert res[4] == PartialDate(1997)
def is_sqlite(db):
return isinstance(db, pw.SqliteDatabase)
def test_time_column(self):
class SomeModel(pw.Model):
some_field = pw.TimeField(null=True)
class Meta:
database = self.db
self.evolve_and_check_noop()
t = datetime.time(11,11,11)
SomeModel.create(some_field=t)
self.assertEqual(SomeModel.select().first().some_field, t)
## SQLite doesn't work
#class SQLite(PostgreSQL):
# @classmethod
# def setUpClass(cls):
# os.system('rm /tmp/peeweedbevolve_test.db')
#
# def setUp(self):
# self.db = pw.SqliteDatabase('/tmp/peeweedbevolve_test.db')
# self.db.connect()
# peeweedbevolve.clear()
#
# def tearDown(self):
# self.db.close()
# os.system('rm /tmp/peeweedbevolve_test.db')
def test_migrate(export_file, tmpdir):
runner = CliRunner()
memory_db = peewee.SqliteDatabase(':memory:')
migrate_mock = mock.MagicMock(spec=migrate.migrate, side_effect=migrate.migrate)
with mock.patch('tracboat.cli.peewee.SqliteDatabase', lambda uri: memory_db), \
mock.patch('tracboat.migrate.migrate', migrate_mock):
result = runner.invoke(
cli.migrate, obj={}, catch_exceptions=False,
args=['--from-export-file', export_file, '--mock', '--mock-path', str(tmpdir)]
)
migrate_mock.assert_called()
assert result.exit_code == 0
def test_init():
from peewee import SqliteDatabase
from erna.automatic_processing import database
import tempfile
with tempfile.NamedTemporaryFile() as f:
test_db = SqliteDatabase(
f.name,
fields={'night': 'INTEGER', 'longblob': 'BLOB'}
)
with test_database(test_db, database.MODELS):
database.setup_database(test_db)
def test_init_twice():
from peewee import SqliteDatabase
from erna.automatic_processing import database
import tempfile
with tempfile.NamedTemporaryFile() as f:
test_db = SqliteDatabase(
f.name,
fields={'night': 'INTEGER', 'longblob': 'BLOB'}
)
with test_database(test_db, database.MODELS):
database.setup_database(test_db)
database.setup_database(test_db)
def test_get_database_with_defined_database_type(self):
self.assertIsInstance(get_database(env('DB_TYPE')), SqliteDatabase)
def get_database(db_type):
"""
Get the database object that should be used.
Args:
db_type: string
Returns:
peewee database driver.
"""
db = False
if db_type == 'sql':
from peewee import SqliteDatabase
db = SqliteDatabase(env("DB"))
elif db_type == 'mysql':
from peewee import MySQLDatabase
db = MySQLDatabase(
env("DB"),
host=env("DB_HOST"),
port=int(env("DB_PORT")),
user=env("DB_USERNAME"),
passwd=env("DB_PASSWORD")
)
elif db_type == 'postgresql':
from peewee import PostgresqlDatabase
db = PostgresqlDatabase(
env('DB'),
host=env("DB_HOST"),
port=int(env("DB_PORT")),
user=env("DB_USERNAME"),
passwd=env("DB_PASSWORD")
)
elif db_type == 'berkeley':
from playhouse.berkeleydb import BerkeleyDatabase
db = BerkeleyDatabase(
env('DB'),
host=env("DB_HOST"),
port=int(env("DB_PORT")),
user=env("DB_USERNAME"),
passwd=env("DB_PASSWORD")
)
return db