def _peewee_model():
    """Return peewee's ``BaseModel`` and ``Database`` objects as a list.

    The import is performed inside the function body, so peewee is only
    imported when this helper is actually called.

    :return: ``[BaseModel, Database]``, in that order.
    """
    from peewee import BaseModel as model_base
    from peewee import Database as database_base

    return [model_base, database_base]
# Example source code using the Python class Database()
def load_database(self, database):
    """
    Load the given database, whatever it might be.

    Accepted forms:

    * A connection string: ``sqlite:///database.sqlite``
    * A dictionary:
      ``{'engine': 'peewee.SqliteDatabase', 'name': 'database.sqlite'}``.
      ``engine`` is resolved with ``pydoc.locate``, so it has to be a
      dotted path to the class; any extra keys are passed to the engine
      class as keyword arguments.
    * A peewee.Database (or peewee.Proxy) instance:
      ``peewee.SqliteDatabase('database.sqlite')``

    The dictionary passed in is never modified.

    :param database: Connection string, dict, or peewee.Database instance to use.
    :raises: peewee.DatabaseError if a configuration dict lacks the
        ``name`` or ``engine`` key, or if the engine cannot be located.
    :return: Database connection.
    :rtype: peewee.Database instance.
    """
    # It could be an actual instance...
    if isinstance(database, (peewee.Proxy, peewee.Database)):
        return database

    # It could be a dictionary...
    if isinstance(database, dict):
        # Pop from a shallow copy: the caller's configuration dict must stay
        # intact so it can be reused for a second call.
        options = dict(database)
        try:
            name = options.pop('name')
            engine = options.pop('engine')
        except KeyError:
            error_msg = 'Configuration dict must specify "name" and "engine" keys.'
            raise peewee.DatabaseError(error_msg)

        db_class = pydoc.locate(engine)
        if not db_class:
            raise peewee.DatabaseError('Unable to import engine class: {}'.format(engine))
        return db_class(name, **options)

    # Or it could be a database URL.
    return url_connect(database)
def _load_from_config_dict(self, config_dict):
    """Instantiate a database from a configuration dictionary.

    ``config_dict`` must contain ``name`` (passed as the first positional
    argument to the database class) and ``engine``.  ``engine`` is either
    a dotted path (``package.module.ClassName``) or a bare class name,
    which is looked up in the ``peewee`` module.  All remaining items are
    forwarded to the database class as keyword arguments.

    Note: ``name`` and ``engine`` are popped from ``config_dict`` itself;
    pass a copy if the original mapping has to be preserved.

    :param config_dict: mutable mapping describing the database.
    :raises RuntimeError: if a required key is missing, the module cannot
        be imported, the attribute does not exist, or the resolved object
        is not a subclass of ``_Database``.
    :return: a new instance of the resolved database class.
    """
    try:
        name = config_dict.pop('name')
        engine = config_dict.pop('engine')
    except KeyError:
        raise RuntimeError('DATABASE configuration must specify a '
                           '`name` and `engine`.')

    if '.' in engine:
        path, class_name = engine.rsplit('.', 1)
    else:
        path, class_name = 'peewee', engine

    try:
        __import__(path)
        module = sys.modules[path]
        database_class = getattr(module, class_name)
    except ImportError:
        raise RuntimeError('Unable to import %s' % engine)
    except AttributeError:
        raise RuntimeError('Database engine not found %s' % engine)

    # Explicit validation rather than ``assert``: assertions are removed
    # under ``python -O``, and ``issubclass`` raises TypeError when handed
    # something that is not a class (e.g. a module or a function).
    if not (isinstance(database_class, type)
            and issubclass(database_class, _Database)):
        raise RuntimeError('Database engine not a subclass of '
                           'peewee.Database: %s' % engine)

    return database_class(name, **config_dict)
def __init__(self, app=None, database=None):
    """Remember the app and database configuration; bind to ``app`` if given.

    :param app: optional application object.  When it is not ``None``,
        ``self.init_app(app)`` is called immediately.
    :param database: dict, url, Database, or None (default).
    """
    # Reference to the actual Peewee database instance.
    self.database = None
    self._app, self._db = app, database

    if app is None:
        return
    self.init_app(app)
def _load_database(self, app, config_value):
    """Resolve ``config_value`` to a database and attach it to ``self``.

    ``config_value`` may be a ``Database`` instance (used as-is), a dict
    (a copy is handed to ``self._load_from_config_dict``), or anything
    else, which is treated as a connection URL for ``db_url_connect``.

    If ``self.database`` is a ``Proxy`` it is initialized with the
    resolved database; otherwise ``self.database`` is replaced by it.

    :param app: not used by this method.
    :param config_value: Database instance, dict, or connection URL.
    """
    if isinstance(config_value, Database):
        resolved = config_value
    elif isinstance(config_value, dict):
        # Copy first: the dict loader pops keys from what it receives.
        resolved = self._load_from_config_dict(dict(config_value))
    else:
        # Assume a database connection URL.
        resolved = db_url_connect(config_value)

    if not isinstance(self.database, Proxy):
        self.database = resolved
        return
    self.database.initialize(resolved)
def _load_from_config_dict(self, config_dict):
    """Instantiate a database from a configuration dictionary.

    ``config_dict`` must contain ``name`` (passed as the first positional
    argument to the database class) and ``engine``.  ``engine`` is either
    a dotted path (``package.module.ClassName``) or a bare class name,
    which is looked up in the ``peewee`` module.  All remaining items are
    forwarded to the database class as keyword arguments.

    Note: ``name`` and ``engine`` are popped from ``config_dict`` itself;
    pass a copy if the original mapping has to be preserved.

    :param config_dict: mutable mapping describing the database.
    :raises RuntimeError: if a required key is missing, the module cannot
        be imported, the attribute does not exist, or the resolved object
        is not a subclass of ``Database``.
    :return: a new instance of the resolved database class.
    """
    try:
        name = config_dict.pop('name')
        engine = config_dict.pop('engine')
    except KeyError:
        raise RuntimeError('DATABASE configuration must specify a '
                           '`name` and `engine`.')

    if '.' in engine:
        path, class_name = engine.rsplit('.', 1)
    else:
        path, class_name = 'peewee', engine

    try:
        __import__(path)
        module = sys.modules[path]
        database_class = getattr(module, class_name)
    except ImportError:
        raise RuntimeError('Unable to import %s' % engine)
    except AttributeError:
        raise RuntimeError('Database engine not found %s' % engine)

    # Explicit validation rather than ``assert``: assertions are removed
    # under ``python -O``, and ``issubclass`` raises TypeError when handed
    # something that is not a class (e.g. a module or a function).
    if not (isinstance(database_class, type)
            and issubclass(database_class, Database)):
        raise RuntimeError('Database engine not a subclass of '
                           'peewee.Database: %s' % engine)

    return database_class(name, **config_dict)
def get_model_class(self):
    """Build a ``Model`` subclass bound to ``self.database``.

    :raises RuntimeError: if ``self.database`` is ``None``.
    :return: a new ``BaseModel`` class whose ``Meta.database`` is
        ``self.database``.
    """
    bound_db = self.database
    if bound_db is None:
        raise RuntimeError('Database must be initialized.')

    class BaseModel(Model):
        class Meta:
            database = bound_db

    return BaseModel
def __init__(self, db_or_model, file_or_name, fields=None,
             field_names=None, has_header=True, sample_size=10,
             converter=None, db_table=None, pk_in_csv=False,
             **reader_kwargs):
    """Record the CSV source and the database or model it is loaded into.

    :param db_or_model: a ``Database`` instance, or otherwise an object
        treated as a model (its ``_meta`` supplies database, table name
        and fields).
    :param file_or_name: a filename string, a ``StringIO`` (recorded
        under the filename ``'data.csv'``), or any object with a
        ``name`` attribute.
    :param fields: stored as ``self.fields``; replaced by the model's
        sorted fields when a model is given.
    :param field_names: stored as ``self.field_names``; replaced by the
        model's sorted field names when a model is given.
    :param has_header: stored as ``self.has_header``.
    :param sample_size: stored as ``self.sample_size``.
    :param converter: stored as ``self.converter``.
    :param db_table: table name used when a ``Database`` is given;
        defaults to the filename without directory or extension.
    :param pk_in_csv: when false and the model's ``_meta.auto_increment``
        is truthy, the first field and field name are dropped.
    :param reader_kwargs: stored as ``self.reader_kwargs``.
    """
    self.file_or_name = file_or_name
    self.fields, self.field_names = fields, field_names
    self.has_header = has_header
    self.sample_size = sample_size
    self.converter = converter
    self.reader_kwargs = reader_kwargs

    # Work out a filename for the source, whatever form it arrived in.
    if isinstance(file_or_name, basestring):
        self.filename = file_or_name
    elif isinstance(file_or_name, StringIO):
        self.filename = 'data.csv'
    else:
        self.filename = file_or_name.name

    if isinstance(db_or_model, Database):
        self.database = db_or_model
        self.model = None
        if db_table:
            self.db_table = db_table
        else:
            base_name = os.path.basename(self.filename)
            self.db_table = os.path.splitext(base_name)[0]
        return

    # Anything that is not a Database is treated as a model.
    self.model = db_or_model
    meta = self.model._meta
    self.database = meta.database
    self.db_table = meta.db_table
    self.fields = meta.sorted_fields
    self.field_names = meta.sorted_field_names

    # If using an auto-incrementing primary key, ignore it unless we
    # are told the primary key is included in the CSV.
    skip_primary_key = meta.auto_increment and not pk_in_csv
    if skip_primary_key:
        self.fields = self.fields[1:]
        self.field_names = self.field_names[1:]
def __init__(self, app=None, database=None):
    """Remember the app and database configuration; bind to ``app`` if given.

    :param app: optional application object.  When it is not ``None``,
        ``self.init_app(app)`` is called immediately.
    :param database: dict, url, Database, or None (default).
    """
    # Reference to the actual Peewee database instance.
    self.database = None
    self._app, self._db = app, database

    if app is None:
        return
    self.init_app(app)
def _load_database(self, app, config_value):
    """Resolve ``config_value`` to a database and attach it to ``self``.

    ``config_value`` may be a ``Database`` instance (used as-is), a dict
    (a copy is handed to ``self._load_from_config_dict``), or anything
    else, which is treated as a connection URL for ``db_url_connect``.

    If ``self.database`` is a ``Proxy`` it is initialized with the
    resolved database; otherwise ``self.database`` is replaced by it.

    :param app: not used by this method.
    :param config_value: Database instance, dict, or connection URL.
    """
    if isinstance(config_value, Database):
        resolved = config_value
    elif isinstance(config_value, dict):
        # Copy first: the dict loader pops keys from what it receives.
        resolved = self._load_from_config_dict(dict(config_value))
    else:
        # Assume a database connection URL.
        resolved = db_url_connect(config_value)

    if not isinstance(self.database, Proxy):
        self.database = resolved
        return
    self.database.initialize(resolved)
def _load_from_config_dict(self, config_dict):
    """Instantiate a database from a configuration dictionary.

    ``config_dict`` must contain ``name`` (passed as the first positional
    argument to the database class) and ``engine``.  ``engine`` is either
    a dotted path (``package.module.ClassName``) or a bare class name,
    which is looked up in the ``peewee`` module.  All remaining items are
    forwarded to the database class as keyword arguments.

    Note: ``name`` and ``engine`` are popped from ``config_dict`` itself;
    pass a copy if the original mapping has to be preserved.

    :param config_dict: mutable mapping describing the database.
    :raises RuntimeError: if a required key is missing, the module cannot
        be imported, the attribute does not exist, or the resolved object
        is not a subclass of ``Database``.
    :return: a new instance of the resolved database class.
    """
    try:
        name = config_dict.pop('name')
        engine = config_dict.pop('engine')
    except KeyError:
        raise RuntimeError('DATABASE configuration must specify a '
                           '`name` and `engine`.')

    if '.' in engine:
        path, class_name = engine.rsplit('.', 1)
    else:
        path, class_name = 'peewee', engine

    try:
        __import__(path)
        module = sys.modules[path]
        database_class = getattr(module, class_name)
    except ImportError:
        raise RuntimeError('Unable to import %s' % engine)
    except AttributeError:
        raise RuntimeError('Database engine not found %s' % engine)

    # Explicit validation rather than ``assert``: assertions are removed
    # under ``python -O``, and ``issubclass`` raises TypeError when handed
    # something that is not a class (e.g. a module or a function).
    if not (isinstance(database_class, type)
            and issubclass(database_class, Database)):
        raise RuntimeError('Database engine not a subclass of '
                           'peewee.Database: %s' % engine)

    return database_class(name, **config_dict)
def get_model_class(self):
    """Build a ``Model`` subclass bound to ``self.database``.

    :raises RuntimeError: if ``self.database`` is ``None``.
    :return: a new ``BaseModel`` class whose ``Meta.database`` is
        ``self.database``.
    """
    bound_db = self.database
    if bound_db is None:
        raise RuntimeError('Database must be initialized.')

    class BaseModel(Model):
        class Meta:
            database = bound_db

    return BaseModel
def __init__(self, db_or_model, file_or_name, fields=None,
             field_names=None, has_header=True, sample_size=10,
             converter=None, db_table=None, pk_in_csv=False,
             **reader_kwargs):
    """Record the CSV source and the database or model it is loaded into.

    :param db_or_model: a ``Database`` instance, or otherwise an object
        treated as a model (its ``_meta`` supplies database, table name
        and fields).
    :param file_or_name: a filename string, a ``StringIO`` (recorded
        under the filename ``'data.csv'``), or any object with a
        ``name`` attribute.
    :param fields: stored as ``self.fields``; replaced by the model's
        sorted fields when a model is given.
    :param field_names: stored as ``self.field_names``; replaced by the
        model's sorted field names when a model is given.
    :param has_header: stored as ``self.has_header``.
    :param sample_size: stored as ``self.sample_size``.
    :param converter: stored as ``self.converter``.
    :param db_table: table name used when a ``Database`` is given;
        defaults to the filename without directory or extension.
    :param pk_in_csv: when false and the model's ``_meta.auto_increment``
        is truthy, the first field and field name are dropped.
    :param reader_kwargs: stored as ``self.reader_kwargs``.
    """
    self.file_or_name = file_or_name
    self.fields, self.field_names = fields, field_names
    self.has_header = has_header
    self.sample_size = sample_size
    self.converter = converter
    self.reader_kwargs = reader_kwargs

    # Work out a filename for the source, whatever form it arrived in.
    if isinstance(file_or_name, basestring):
        self.filename = file_or_name
    elif isinstance(file_or_name, StringIO):
        self.filename = 'data.csv'
    else:
        self.filename = file_or_name.name

    if isinstance(db_or_model, Database):
        self.database = db_or_model
        self.model = None
        if db_table:
            self.db_table = db_table
        else:
            base_name = os.path.basename(self.filename)
            self.db_table = os.path.splitext(base_name)[0]
        return

    # Anything that is not a Database is treated as a model.
    self.model = db_or_model
    meta = self.model._meta
    self.database = meta.database
    self.db_table = meta.db_table
    self.fields = meta.sorted_fields
    self.field_names = meta.sorted_field_names

    # If using an auto-incrementing primary key, ignore it unless we
    # are told the primary key is included in the CSV.
    skip_primary_key = meta.auto_increment and not pk_in_csv
    if skip_primary_key:
        self.fields = self.fields[1:]
        self.field_names = self.field_names[1:]
def _load_database(self, app, config_value):
    """Resolve ``config_value`` to a database and attach it to ``self``.

    ``config_value`` may be a ``Database`` instance (used as-is), a dict
    (a copy is handed to ``self._load_from_config_dict``), or anything
    else, which is treated as a connection URL for ``db_url_connect``.

    If ``self.database`` is a ``Proxy`` it is initialized with the
    resolved database; otherwise ``self.database`` is replaced by it.

    :param app: not used by this method.
    :param config_value: Database instance, dict, or connection URL.
    """
    if isinstance(config_value, Database):
        resolved = config_value
    elif isinstance(config_value, dict):
        # Copy first: the dict loader pops keys from what it receives.
        resolved = self._load_from_config_dict(dict(config_value))
    else:
        # Assume a database connection URL.
        resolved = db_url_connect(config_value)

    if not isinstance(self.database, Proxy):
        self.database = resolved
        return
    self.database.initialize(resolved)
def _load_from_config_dict(self, config_dict):
    """Instantiate a database from a configuration dictionary.

    ``config_dict`` must contain ``name`` (passed as the first positional
    argument to the database class) and ``engine``.  ``engine`` is either
    a dotted path (``package.module.ClassName``) or a bare class name,
    which is looked up in the ``peewee`` module.  All remaining items are
    forwarded to the database class as keyword arguments.

    Note: ``name`` and ``engine`` are popped from ``config_dict`` itself;
    pass a copy if the original mapping has to be preserved.

    :param config_dict: mutable mapping describing the database.
    :raises RuntimeError: if a required key is missing, the module cannot
        be imported, the attribute does not exist, or the resolved object
        is not a subclass of ``Database``.
    :return: a new instance of the resolved database class.
    """
    try:
        name = config_dict.pop('name')
        engine = config_dict.pop('engine')
    except KeyError:
        raise RuntimeError('DATABASE configuration must specify a '
                           '`name` and `engine`.')

    if '.' in engine:
        path, class_name = engine.rsplit('.', 1)
    else:
        path, class_name = 'peewee', engine

    try:
        __import__(path)
        module = sys.modules[path]
        database_class = getattr(module, class_name)
    except ImportError:
        raise RuntimeError('Unable to import %s' % engine)
    except AttributeError:
        raise RuntimeError('Database engine not found %s' % engine)

    # Explicit validation rather than ``assert``: assertions are removed
    # under ``python -O``, and ``issubclass`` raises TypeError when handed
    # something that is not a class (e.g. a module or a function).
    if not (isinstance(database_class, type)
            and issubclass(database_class, Database)):
        raise RuntimeError('Database engine not a subclass of '
                           'peewee.Database: %s' % engine)

    return database_class(name, **config_dict)
def get_model_class(self):
    """Build a ``Model`` subclass bound to ``self.database``.

    :raises RuntimeError: if ``self.database`` is ``None``.
    :return: a new ``BaseModel`` class whose ``Meta.database`` is
        ``self.database``.
    """
    bound_db = self.database
    if bound_db is None:
        raise RuntimeError('Database must be initialized.')

    class BaseModel(Model):
        class Meta:
            database = bound_db

    return BaseModel
def __init__(self, db_or_model, file_or_name, fields=None,
             field_names=None, has_header=True, sample_size=10,
             converter=None, db_table=None, pk_in_csv=False,
             **reader_kwargs):
    """Record the CSV source and the database or model it is loaded into.

    :param db_or_model: a ``Database`` instance, or otherwise an object
        treated as a model (its ``_meta`` supplies database, table name
        and fields).
    :param file_or_name: a filename string, a ``StringIO`` (recorded
        under the filename ``'data.csv'``), or any object with a
        ``name`` attribute.
    :param fields: stored as ``self.fields``; replaced by the model's
        sorted fields when a model is given.
    :param field_names: stored as ``self.field_names``; replaced by the
        model's sorted field names when a model is given.
    :param has_header: stored as ``self.has_header``.
    :param sample_size: stored as ``self.sample_size``.
    :param converter: stored as ``self.converter``.
    :param db_table: table name used when a ``Database`` is given;
        defaults to the filename without directory or extension.
    :param pk_in_csv: when false and the model's ``_meta.auto_increment``
        is truthy, the first field and field name are dropped.
    :param reader_kwargs: stored as ``self.reader_kwargs``.
    """
    self.file_or_name = file_or_name
    self.fields, self.field_names = fields, field_names
    self.has_header = has_header
    self.sample_size = sample_size
    self.converter = converter
    self.reader_kwargs = reader_kwargs

    # Work out a filename for the source, whatever form it arrived in.
    if isinstance(file_or_name, basestring):
        self.filename = file_or_name
    elif isinstance(file_or_name, StringIO):
        self.filename = 'data.csv'
    else:
        self.filename = file_or_name.name

    if isinstance(db_or_model, Database):
        self.database = db_or_model
        self.model = None
        if db_table:
            self.db_table = db_table
        else:
            base_name = os.path.basename(self.filename)
            self.db_table = os.path.splitext(base_name)[0]
        return

    # Anything that is not a Database is treated as a model.
    self.model = db_or_model
    meta = self.model._meta
    self.database = meta.database
    self.db_table = meta.db_table
    self.fields = meta.sorted_fields
    self.field_names = meta.sorted_field_names

    # If using an auto-incrementing primary key, ignore it unless we
    # are told the primary key is included in the CSV.
    skip_primary_key = meta.auto_increment and not pk_in_csv
    if skip_primary_key:
        self.fields = self.fields[1:]
        self.field_names = self.field_names[1:]
def get_conn(self):
    """Return an ``AioConnection`` wrapping ``self.pool.acquire()``.

    The connection is created with this object's ``autocommit``,
    ``autorollback`` and ``exception_wrapper`` settings.

    :raises OperationalError: if ``self.closed`` is truthy.
    """
    if self.closed:
        raise OperationalError('Database pool has not been initialized')

    acquired = self.pool.acquire()
    settings = {
        'autocommit': self.autocommit,
        'autorollback': self.autorollback,
        'exception_wrapper': self.exception_wrapper,
    }
    return AioConnection(acquired, **settings)
async def connect(self, safe=True):
    """Open the connection pool for this database.

    This is a coroutine: the body awaits ``self._connect``, which is only
    legal inside ``async def``.

    :param safe: when truthy (the default), calling this on a database
        that is already open returns silently; when falsy, it raises.
    :raises OperationalError: if the database is still deferred, or if it
        is already open and ``safe`` is falsy.
    """
    if self.deferred:
        raise OperationalError('Database has not been initialized')

    if not self.closed:
        if safe:
            return
        raise OperationalError('Connection already open')

    with self.exception_wrapper:
        self.pool = await self._connect(self.database,
                                        **self.connect_kwargs)
        # Only mark the database open once the pool was actually created.
        self.closed = False
async def drop_table(self, model_class, fail_silently=False, cascade=False):
    """Drop the table backing ``model_class``.

    This is a coroutine: the body uses ``async with`` and ``await``, which
    are only legal inside ``async def``.

    :param model_class: model whose table is dropped.
    :param fail_silently: forwarded to the query compiler's
        ``drop_table``.
    :param cascade: forwarded to the query compiler's ``drop_table``;
        only allowed when ``self.drop_cascade`` is truthy.
    :raises ValueError: if ``cascade`` is requested but
        ``self.drop_cascade`` is falsy.
    :return: the result of ``conn.execute_sql`` for the compiled statement.
    """
    qc = self.compiler()
    if cascade and not self.drop_cascade:
        raise ValueError('Database does not support DROP TABLE..CASCADE.')

    async with self.get_conn() as conn:
        args = qc.drop_table(model_class, fail_silently, cascade)
        return await conn.execute_sql(*args)