def _foreign_key_constraint(self, name, source, referent,
                            local_cols, remote_cols,
                            onupdate=None, ondelete=None,
                            deferrable=None, source_schema=None,
                            referent_schema=None):
    """Build a ``ForeignKeyConstraint`` attached to a stand-in source table.

    Stand-in ``Table`` objects with ``NULLTYPE`` columns are created in a
    private ``MetaData`` so the constraint has tables to belong to and to
    point at.

    :param name: name given to the constraint.
    :param source: name of the table that owns the constraint.
    :param referent: name of the table the constraint points at.
    :param local_cols: column names on the source table.
    :param remote_cols: column names on the referent table.
    :param onupdate: passed through to ``ForeignKeyConstraint``.
    :param ondelete: passed through to ``ForeignKeyConstraint``.
    :param deferrable: passed through to ``ForeignKeyConstraint``.
    :param source_schema: schema of the source table, if any.
    :param referent_schema: schema of the referent table, if any.
    :return: the constraint, already appended to the stand-in source table.
    """
    m = sa_schema.MetaData()
    # The constraint is self-referential only when BOTH the table name and
    # the schema match.  A same-named table in a different schema is a
    # separate table and still needs its own stand-in; comparing names
    # alone would skip it and merge its columns into the source table.
    if source == referent and source_schema == referent_schema:
        t1_cols = local_cols + remote_cols
    else:
        t1_cols = local_cols
        sa_schema.Table(referent, m,
                        *[sa_schema.Column(n, NULLTYPE) for n in remote_cols],
                        schema=referent_schema)

    t1 = sa_schema.Table(source, m,
                         *[sa_schema.Column(n, NULLTYPE) for n in t1_cols],
                         schema=source_schema)

    # Target column specs are "<schema>.<table>.<column>" when a referent
    # schema is given, "<table>.<column>" otherwise.
    tname = "%s.%s" % (referent_schema, referent) if referent_schema \
        else referent
    f = sa_schema.ForeignKeyConstraint(local_cols,
                                       ["%s.%s" % (tname, n)
                                        for n in remote_cols],
                                       name=name,
                                       onupdate=onupdate,
                                       ondelete=ondelete,
                                       deferrable=deferrable
                                       )
    t1.append_constraint(f)
    return f
# python类MetaData()的实例源码 -- example source code using the Python class MetaData()
def _unique_constraint(self, name, source, local_cols, schema=None, **kw):
    """Return a named ``UniqueConstraint`` bound to a stand-in table.

    A ``Table`` called ``source`` is created in its own ``MetaData`` with
    one ``NULLTYPE`` column per entry of ``local_cols``.  The constraint is
    built from those columns, with ``name`` and any extra keyword arguments
    passed to ``UniqueConstraint``, then appended to the table and returned.
    """
    stub_columns = [
        sa_schema.Column(col_name, NULLTYPE) for col_name in local_cols
    ]
    stub = sa_schema.Table(
        source, sa_schema.MetaData(), *stub_columns, schema=schema)

    kw['name'] = name
    members = [stub.c[col_name] for col_name in local_cols]
    constraint = sa_schema.UniqueConstraint(*members, **kw)
    # TODO: need event tests to ensure the event
    # is fired off here
    stub.append_constraint(constraint)
    return constraint
def _table(self, name, *columns, **kw):
    """Create a ``Table`` in a fresh ``MetaData`` and return it.

    After construction, ``self._ensure_table_for_fk`` is called once for
    every foreign key found on the new table, with the same ``MetaData``.
    """
    metadata = sa_schema.MetaData()
    table = sa_schema.Table(name, metadata, *columns, **kw)
    for fk in table.foreign_keys:
        self._ensure_table_for_fk(metadata, fk)
    return table
def _index(self, name, tablename, columns, schema=None, **kw):
    """Return an ``Index`` called ``name`` over columns of a stand-in table.

    The stand-in table is named ``tablename``, or ``'no_table'`` when
    ``tablename`` is falsy, and holds one ``NULLTYPE`` column per entry of
    ``columns``.  Extra keyword arguments are passed to ``Index``.
    """
    stub_name = tablename or 'no_table'
    stub_columns = [sa_schema.Column(col_name, NULLTYPE)
                    for col_name in columns]
    stub = sa_schema.Table(
        stub_name, sa_schema.MetaData(), *stub_columns, schema=schema)
    indexed = [stub.c[col_name] for col_name in columns]
    return sa_schema.Index(name, *indexed, **kw)
def metadata_from_session(s):
    """
    Get the metadata associated with the schema.

    Args:
        s: an SQLAlchemy :class:`Session`

    Returns:
        A new ``MetaData`` on which ``reflect(bind=s.bind)`` has been
        called.
    """
    reflected = MetaData()
    reflected.reflect(bind=s.bind)
    return reflected
def provide_metadata(fn, *args, **kw):
    """Provide bound MetaData for a single test, dropping afterwards.

    ``args[0]`` is treated as the test instance.  Its ``metadata``
    attribute is set to a new ``MetaData`` built from ``config.db`` while
    ``fn(*args, **kw)`` runs.  Afterwards ``drop_all()`` is called on that
    metadata and the attribute is reset to its previous value (``None`` if
    it was not set).
    """
    from . import config
    from sqlalchemy import schema

    fresh_metadata = schema.MetaData(config.db)
    test_instance = args[0]
    saved_metadata = getattr(test_instance, 'metadata', None)
    test_instance.metadata = fresh_metadata
    try:
        outcome = fn(*args, **kw)
    finally:
        # Runs on success and on failure alike.
        fresh_metadata.drop_all()
        test_instance.metadata = saved_metadata
    return outcome
def drop_all_tables(engine, inspector, schema=None, include_names=None):
    """Drop tables, and foreign key constraints, reported by ``inspector``.

    The ``(table name, fk constraints)`` pairs returned by
    ``inspector.get_sorted_table_and_fkc_names(schema=schema)`` are walked
    in reverse.  An entry with a table name emits ``DropTable``.  An entry
    without one but with constraints emits one ``DropConstraint`` per
    constraint, unless ``engine.dialect.supports_alter`` is false.

    When ``include_names`` is given, tables (and constraints on tables)
    whose name is not in it are left alone.
    """
    from sqlalchemy import Column, Table, Integer, MetaData, \
        ForeignKeyConstraint
    from sqlalchemy.schema import DropTable, DropConstraint

    wanted = None if include_names is None else set(include_names)

    def _excluded(table_name):
        # True when a filter is active and this table is not part of it.
        return wanted is not None and table_name not in wanted

    with engine.connect() as conn:
        ordered = inspector.get_sorted_table_and_fkc_names(schema=schema)
        for table_name, constraints in reversed(ordered):
            if table_name:
                if not _excluded(table_name):
                    stub = Table(table_name, MetaData(), schema=schema)
                    conn.execute(DropTable(stub))
                continue

            if not constraints or not engine.dialect.supports_alter:
                continue

            for owner_name, constraint_name in constraints:
                if _excluded(owner_name):
                    continue
                # Placeholder columns: the constraint is built only so it
                # can be dropped by name on the owning table.
                owner = Table(
                    owner_name, MetaData(),
                    Column('x', Integer),
                    Column('y', Integer),
                    schema=schema
                )
                placeholder_fk = ForeignKeyConstraint(
                    [owner.c.x], [owner.c.y], name=constraint_name)
                conn.execute(DropConstraint(placeholder_fk))
def provide_metadata(fn, *args, **kw):
    """Provide bound MetaData for a single test, dropping afterwards.

    The first positional argument is used as the test object: a new
    ``MetaData`` built from ``config.db`` is stored on it as ``metadata``
    for the duration of ``fn(*args, **kw)``.  In all cases ``drop_all()``
    is then called and the earlier ``metadata`` value (or ``None``) is put
    back.
    """
    from . import config
    from sqlalchemy import schema

    bound = schema.MetaData(config.db)
    owner = args[0]
    previous = getattr(owner, 'metadata', None)
    owner.metadata = bound
    try:
        result = fn(*args, **kw)
    finally:
        bound.drop_all()
        owner.metadata = previous
    return result
def drop_all_tables(engine, inspector, schema=None, include_names=None):
    """Emit DROP statements for what ``inspector`` reports in ``schema``.

    Entries from ``inspector.get_sorted_table_and_fkc_names`` are processed
    in reverse order.  Named tables are dropped with ``DropTable``.  For
    entries that carry only foreign key constraints, each constraint is
    dropped with ``DropConstraint``, provided
    ``engine.dialect.supports_alter`` is true.  ``include_names``, when not
    ``None``, restricts the work to tables with those names.
    """
    from sqlalchemy import Column, Table, Integer, MetaData, \
        ForeignKeyConstraint
    from sqlalchemy.schema import DropTable, DropConstraint

    if include_names is not None:
        include_names = set(include_names)

    def _selected(candidate):
        # No filter means everything is selected.
        return include_names is None or candidate in include_names

    with engine.connect() as conn:
        entries = inspector.get_sorted_table_and_fkc_names(schema=schema)
        for tname, fkcs in reversed(entries):
            if tname:
                if _selected(tname):
                    conn.execute(
                        DropTable(Table(tname, MetaData(), schema=schema)))
            elif fkcs and engine.dialect.supports_alter:
                for fk_table, fk_name in fkcs:
                    if not _selected(fk_table):
                        continue
                    # Two placeholder columns give the constraint
                    # something to hang on; only its name and table are
                    # supplied from the inspector.
                    tb = Table(
                        fk_table, MetaData(),
                        Column('x', Integer),
                        Column('y', Integer),
                        schema=schema
                    )
                    conn.execute(DropConstraint(
                        ForeignKeyConstraint(
                            [tb.c.x], [tb.c.y], name=fk_name)
                    ))
def provide_metadata(fn, *args, **kw):
    """Provide bound MetaData for a single test, dropping afterwards.

    Installs a new ``MetaData`` (constructed with ``config.db``) as the
    ``metadata`` attribute of ``args[0]``, calls ``fn(*args, **kw)`` and
    returns its result.  Whether or not ``fn`` raises, ``drop_all()`` is
    called on the new metadata and the attribute's previous value
    (``None`` when absent) is restored.
    """
    from . import config
    from sqlalchemy import schema

    temp_meta = schema.MetaData(config.db)
    target = args[0]
    original_meta = getattr(target, 'metadata', None)
    target.metadata = temp_meta
    try:
        value = fn(*args, **kw)
    finally:
        temp_meta.drop_all()
        target.metadata = original_meta
    return value
def _prep_testing_database(options, file_config):
    """Drop existing views and tables when ``options.dropfirst`` is set.

    Views are dropped first, then tables in reverse foreign-key order.
    Each pass covers the default schema and then ``"test_schema"``.  The
    engine obtained from ``engines.utf8_engine()`` is always disposed,
    including when one of the drop statements raises.

    :param options: object with a ``dropfirst`` attribute; nothing happens
        when it is falsy.
    :param file_config: not used by this function.
    """
    from sqlalchemy.testing import engines
    from sqlalchemy import schema, inspect

    # also create alt schemas etc. here?
    if not options.dropfirst:
        return

    e = engines.utf8_engine()
    try:
        inspector = inspect(e)

        # Empty kwargs for the default schema keep those calls identical
        # to calling without a ``schema`` argument at all.
        schema_kwargs = ({}, {"schema": "test_schema"})

        for kwargs in schema_kwargs:
            try:
                view_names = inspector.get_view_names(**kwargs)
            except NotImplementedError:
                # Presumably the dialect cannot list views; skip them.
                continue
            for vname in view_names:
                e.execute(schema._DropView(
                    schema.Table(vname, schema.MetaData(), **kwargs)))

        for kwargs in schema_kwargs:
            table_names = inspector.get_table_names(
                order_by="foreign_key", **kwargs)
            for tname in reversed(table_names):
                e.execute(schema.DropTable(
                    schema.Table(tname, schema.MetaData(), **kwargs)))
    finally:
        # Release pooled connections even if a DROP failed.
        e.dispose()
def provide_metadata(fn, *args, **kw):
    """Provide bound MetaData for a single test, dropping afterwards.

    A ``MetaData`` built from ``config.db`` temporarily replaces the
    ``metadata`` attribute of the first positional argument while ``fn``
    is called with the given arguments.  On the way out ``drop_all()`` is
    invoked on it and the former attribute value (``None`` if missing) is
    reinstated.
    """
    from . import config
    from sqlalchemy import schema

    scratch = schema.MetaData(config.db)
    test_case = args[0]
    before = getattr(test_case, 'metadata', None)
    test_case.metadata = scratch
    try:
        returned = fn(*args, **kw)
    finally:
        scratch.drop_all()
        test_case.metadata = before
    return returned
def _primary_key_constraint(self, name, table_name, cols, schema=None):
    """Return a named ``PrimaryKeyConstraint`` bound to a stand-in table.

    One ``NULLTYPE`` column is created per name in ``cols``.  The same
    column objects are given both to a new ``Table`` called ``table_name``
    and to the constraint, which is then appended to that table.
    """
    key_columns = [sa_schema.Column(col_name, NULLTYPE) for col_name in cols]
    stub = sa_schema.Table(
        table_name, sa_schema.MetaData(), *key_columns, schema=schema)
    constraint = sa_schema.PrimaryKeyConstraint(*key_columns, name=name)
    stub.append_constraint(constraint)
    return constraint
def _foreign_key_constraint(self, name, source, referent,
                            local_cols, remote_cols,
                            onupdate=None, ondelete=None,
                            deferrable=None, source_schema=None,
                            referent_schema=None):
    """Return a ``ForeignKeyConstraint`` appended to a stand-in table.

    The source table, and the referent table when it is a different table,
    are created as placeholders with ``NULLTYPE`` columns inside a private
    ``MetaData``.

    :param name: constraint name.
    :param source: name of the table owning the constraint.
    :param referent: name of the referenced table.
    :param local_cols: names of the constrained columns on ``source``.
    :param remote_cols: names of the referenced columns on ``referent``.
    :param onupdate: forwarded to ``ForeignKeyConstraint``.
    :param ondelete: forwarded to ``ForeignKeyConstraint``.
    :param deferrable: forwarded to ``ForeignKeyConstraint``.
    :param source_schema: optional schema of ``source``.
    :param referent_schema: optional schema of ``referent``.
    :return: the new constraint.
    """
    m = sa_schema.MetaData()
    # Source and referent are the same table only if name AND schema are
    # equal.  Checking the name alone treated "a.t -> b.t" as
    # self-referential, so no placeholder for the referent was created.
    same_table = source == referent and source_schema == referent_schema
    if same_table:
        t1_cols = local_cols + remote_cols
    else:
        t1_cols = local_cols
        sa_schema.Table(referent, m,
                        *[sa_schema.Column(n, NULLTYPE) for n in remote_cols],
                        schema=referent_schema)

    t1 = sa_schema.Table(source, m,
                         *[sa_schema.Column(n, NULLTYPE) for n in t1_cols],
                         schema=source_schema)

    # Schema-qualify the target table name only when a schema was given.
    tname = "%s.%s" % (referent_schema, referent) if referent_schema \
        else referent
    f = sa_schema.ForeignKeyConstraint(local_cols,
                                       ["%s.%s" % (tname, n)
                                        for n in remote_cols],
                                       name=name,
                                       onupdate=onupdate,
                                       ondelete=ondelete,
                                       deferrable=deferrable
                                       )
    t1.append_constraint(f)
    return f
def _unique_constraint(self, name, source, local_cols, schema=None, **kw):
    """Build a ``UniqueConstraint`` named ``name`` on a placeholder table.

    The placeholder ``Table`` is called ``source``, lives in its own
    ``MetaData`` and has a ``NULLTYPE`` column for each name in
    ``local_cols``.  Remaining keyword arguments go to
    ``UniqueConstraint``.  The constraint is appended to the table before
    being returned.
    """
    placeholder = sa_schema.Table(
        source,
        sa_schema.MetaData(),
        *[sa_schema.Column(column_name, NULLTYPE)
          for column_name in local_cols],
        schema=schema
    )
    kw['name'] = name
    constrained = [placeholder.c[column_name] for column_name in local_cols]
    unique = sa_schema.UniqueConstraint(*constrained, **kw)
    # TODO: need event tests to ensure the event
    # is fired off here
    placeholder.append_constraint(unique)
    return unique
def _table(self, name, *columns, **kw):
    """Return a new ``Table`` built in its own ``MetaData``.

    Each foreign key present on the table is handed, together with that
    ``MetaData``, to ``self._ensure_table_for_fk`` before returning.
    """
    owning_metadata = sa_schema.MetaData()
    new_table = sa_schema.Table(name, owning_metadata, *columns, **kw)
    for foreign_key in new_table.foreign_keys:
        self._ensure_table_for_fk(owning_metadata, foreign_key)
    return new_table
def _index(self, name, tablename, columns, schema=None, **kw):
    """Build an ``Index`` named ``name`` on a placeholder table.

    The placeholder is named ``tablename`` (``'no_table'`` if that is
    falsy) and gets a ``NULLTYPE`` column per entry of ``columns``.  Other
    keyword arguments are forwarded to ``Index``.
    """
    if tablename:
        placeholder_name = tablename
    else:
        placeholder_name = 'no_table'
    placeholder = sa_schema.Table(
        placeholder_name,
        sa_schema.MetaData(),
        *[sa_schema.Column(column_name, NULLTYPE)
          for column_name in columns],
        schema=schema
    )
    index_columns = [placeholder.c[column_name] for column_name in columns]
    return sa_schema.Index(name, *index_columns, **kw)
def upgrade_database(config: Config, engine: Engine, metadata: MetaData,
                     *, revision: str='head',
                     module_name: typing.Optional[str]=None) -> None:
    """Upgrades the database schema to the chosen ``revision`` (default is
    head).

    When the migration context reports no current revision and the target
    is ``'head'``, the tables are created directly with
    ``metadata.create_all(engine)`` (after ``import_all_modules`` if
    ``module_name`` is given) and the version table is stamped with the
    target revision.  Otherwise the upgrade steps come from
    ``script._upgrade_revs``.

    """
    script = ScriptDirectory.from_config(config)

    def upgrade(rev, context):
        def update_current_rev(old, new):
            # Write the change from ``old`` to ``new`` to the version table.
            if old == new:
                return
            if new is None:
                statement = context._version.delete()
            else:
                stamp = literal_column("'%s'" % new)
                if old is None:
                    statement = context._version.insert().values(
                        version_num=stamp)
                else:
                    statement = context._version.update().values(
                        version_num=stamp)
            context.impl._exec(statement)

        if rev or revision != 'head':
            return script._upgrade_revs(revision, rev)

        # No current revision and the target is head: create everything
        # in one go and stamp the version table.
        if module_name:
            import_all_modules(module_name)
        metadata.create_all(engine)
        dest = script.get_revision(revision)
        update_current_rev(None, dest and dest.revision)
        return []

    environment = EnvironmentContext(config, script, fn=upgrade,
                                     as_sql=False,
                                     destination_rev=revision, tag=None)
    with environment:
        script.run_env()
def test_connection(
    ctx: object, metadata: MetaData, engine: Engine,
    real_transaction: bool=False
) -> typing.Generator:
    """Joining a SQLAlchemy session into an external transaction for test suite.

    With ``real_transaction`` the tables are created on ``engine``, the
    engine itself is yielded, and the tables are dropped afterwards.

    Otherwise a connection is opened, existing tables are dropped, and the
    tables are recreated inside a transaction.  The connection is yielded
    and also stored as ``ctx._test_fx_connection`` while the consumer
    runs.  Afterwards the attribute is removed, the transaction is rolled
    back, the connection is closed and the engine is disposed.

    .. seealso::
       Documentation of the SQLAlchemy session used in test suites.
          <http://docs.sqlalchemy.org/en/latest/orm/session_transaction.html#joining-a-session-into-an-external-transaction-such-as-for-test-suites>
    """  # noqa
    if not real_transaction:
        conn = engine.connect()
        try:
            metadata.drop_all(conn, checkfirst=True)
            outer_txn = conn.begin()
            try:
                metadata.create_all(bind=conn)
                ctx._test_fx_connection = conn
                try:
                    yield conn
                finally:
                    del ctx._test_fx_connection
            finally:
                # Discards everything done through the connection,
                # including the create_all above.
                outer_txn.rollback()
        finally:
            conn.close()
            engine.dispose()
        return

    metadata.create_all(engine)
    try:
        yield engine
    finally:
        metadata.drop_all(engine, checkfirst=True)
def main():
    """Command line entry point: reflect a database and render model code.

    Parses the command line, then:

    * with ``--version``, prints the installed ``sqlacodegen`` version and
      returns;
    * without a url, prints an error to stderr plus the usage text and
      returns;
    * otherwise reflects the database at ``url`` and renders the code to
      ``--outfile`` (UTF-8), or to stdout if no file was given.

    A file opened for ``--outfile`` is closed when rendering finishes or
    fails.
    """
    parser = argparse.ArgumentParser(description='Generates SQLAlchemy model code from an existing database.')
    parser.add_argument('url', nargs='?', help='SQLAlchemy url to the database')
    parser.add_argument('--version', action='store_true', help="print the version number and exit")
    parser.add_argument('--schema', help='load tables from an alternate schema')
    parser.add_argument('--tables', help='tables to process (comma-separated, default: all)')
    parser.add_argument('--noviews', action='store_true', help="ignore views")
    parser.add_argument('--noindexes', action='store_true', help='ignore indexes')
    parser.add_argument('--noconstraints', action='store_true', help='ignore constraints')
    parser.add_argument('--nojoined', action='store_true', help="don't autodetect joined table inheritance")
    parser.add_argument('--noinflect', action='store_true', help="don't try to convert tables names to singular form")
    parser.add_argument('--noclasses', action='store_true', help="don't generate classes, only tables")
    parser.add_argument('--outfile', help='file to write output to (default: stdout)')
    args = parser.parse_args()

    if args.version:
        version = pkg_resources.get_distribution('sqlacodegen').parsed_version
        print(version.public)
        return
    if not args.url:
        print('You must supply a url\n', file=sys.stderr)
        parser.print_help()
        return

    engine = create_engine(args.url)
    metadata = MetaData(engine)
    tables = args.tables.split(',') if args.tables else None
    metadata.reflect(engine, args.schema, not args.noviews, tables)

    if args.outfile:
        outfile = codecs.open(args.outfile, 'w', encoding='utf-8')
    else:
        outfile = sys.stdout
    try:
        generator = CodeGenerator(metadata, args.noindexes, args.noconstraints, args.nojoined, args.noinflect,
                                  args.noclasses)
        generator.render(outfile)
    finally:
        # Close only what was opened here; stdout belongs to the caller.
        if outfile is not sys.stdout:
            outfile.close()