def _setup_referent(self, metadata, constraint):
spec = constraint.elements[0]._get_colspec()
parts = spec.split(".")
tname = parts[-2]
if len(parts) == 3:
referent_schema = parts[0]
else:
referent_schema = None
if tname != '_alembic_batch_temp':
key = sql_schema._get_table_key(tname, referent_schema)
if key in metadata.tables:
t = metadata.tables[key]
for elem in constraint.elements:
colname = elem._get_colspec().split(".")[-1]
if not t.c.contains_column(colname):
t.append_column(
Column(colname, sqltypes.NULLTYPE)
)
else:
Table(
tname, metadata,
*[Column(n, sqltypes.NULLTYPE) for n in
[elem._get_colspec().split(".")[-1]
for elem in constraint.elements]],
schema=referent_schema)
python类_get_table_key()的实例源码
def _setup_referent(self, metadata, constraint):
spec = constraint.elements[0]._get_colspec()
parts = spec.split(".")
tname = parts[-2]
if len(parts) == 3:
referent_schema = parts[0]
else:
referent_schema = None
if tname != '_alembic_batch_temp':
key = sql_schema._get_table_key(tname, referent_schema)
if key in metadata.tables:
t = metadata.tables[key]
for elem in constraint.elements:
colname = elem._get_colspec().split(".")[-1]
if not t.c.contains_column(colname):
t.append_column(
Column(colname, sqltypes.NULLTYPE)
)
else:
Table(
tname, metadata,
*[Column(n, sqltypes.NULLTYPE) for n in
[elem._get_colspec().split(".")[-1]
for elem in constraint.elements]],
schema=referent_schema)
def _setup_referent(self, metadata, constraint):
spec = constraint.elements[0]._get_colspec()
parts = spec.split(".")
tname = parts[-2]
if len(parts) == 3:
referent_schema = parts[0]
else:
referent_schema = None
if tname != '_alembic_batch_temp':
key = sql_schema._get_table_key(tname, referent_schema)
if key in metadata.tables:
t = metadata.tables[key]
for elem in constraint.elements:
colname = elem._get_colspec().split(".")[-1]
if not t.c.contains_column(colname):
t.append_column(
Column(colname, sqltypes.NULLTYPE)
)
else:
Table(
tname, metadata,
*[Column(n, sqltypes.NULLTYPE) for n in
[elem._get_colspec().split(".")[-1]
for elem in constraint.elements]],
schema=referent_schema)
def _setup_referent(self, metadata, constraint):
spec = constraint.elements[0]._get_colspec()
parts = spec.split(".")
tname = parts[-2]
if len(parts) == 3:
referent_schema = parts[0]
else:
referent_schema = None
if tname != '_alembic_batch_temp':
key = sql_schema._get_table_key(tname, referent_schema)
if key in metadata.tables:
t = metadata.tables[key]
for elem in constraint.elements:
colname = elem._get_colspec().split(".")[-1]
if not t.c.contains_column(colname):
t.append_column(
Column(colname, sqltypes.NULLTYPE)
)
else:
Table(
tname, metadata,
*[Column(n, sqltypes.NULLTYPE) for n in
[elem._get_colspec().split(".")[-1]
for elem in constraint.elements]],
schema=referent_schema)
def _setup_referent(self, metadata, constraint):
spec = constraint.elements[0]._get_colspec()
parts = spec.split(".")
tname = parts[-2]
if len(parts) == 3:
referent_schema = parts[0]
else:
referent_schema = None
if tname != '_alembic_batch_temp':
key = sql_schema._get_table_key(tname, referent_schema)
if key in metadata.tables:
t = metadata.tables[key]
for elem in constraint.elements:
colname = elem._get_colspec().split(".")[-1]
if not t.c.contains_column(colname):
t.append_column(
Column(colname, sqltypes.NULLTYPE)
)
else:
Table(
tname, metadata,
*[Column(n, sqltypes.NULLTYPE) for n in
[elem._get_colspec().split(".")[-1]
for elem in constraint.elements]],
schema=referent_schema)
def _setup_referent(self, metadata, constraint):
spec = constraint.elements[0]._get_colspec()
parts = spec.split(".")
tname = parts[-2]
if len(parts) == 3:
referent_schema = parts[0]
else:
referent_schema = None
if tname != '_alembic_batch_temp':
key = sql_schema._get_table_key(tname, referent_schema)
if key in metadata.tables:
t = metadata.tables[key]
for elem in constraint.elements:
colname = elem._get_colspec().split(".")[-1]
if not t.c.contains_column(colname):
t.append_column(
Column(colname, sqltypes.NULLTYPE)
)
else:
Table(
tname, metadata,
*[Column(n, sqltypes.NULLTYPE) for n in
[elem._get_colspec().split(".")[-1]
for elem in constraint.elements]],
schema=referent_schema)
def _setup_referent(self, metadata, constraint):
spec = constraint.elements[0]._get_colspec()
parts = spec.split(".")
tname = parts[-2]
if len(parts) == 3:
referent_schema = parts[0]
else:
referent_schema = None
if tname != '_alembic_batch_temp':
key = sql_schema._get_table_key(tname, referent_schema)
if key in metadata.tables:
t = metadata.tables[key]
for elem in constraint.elements:
colname = elem._get_colspec().split(".")[-1]
if not t.c.contains_column(colname):
t.append_column(
Column(colname, sqltypes.NULLTYPE)
)
else:
Table(
tname, metadata,
*[Column(n, sqltypes.NULLTYPE) for n in
[elem._get_colspec().split(".")[-1]
for elem in constraint.elements]],
schema=referent_schema)
def _setup_referent(self, metadata, constraint):
spec = constraint.elements[0]._get_colspec()
parts = spec.split(".")
tname = parts[-2]
if len(parts) == 3:
referent_schema = parts[0]
else:
referent_schema = None
if tname != '_alembic_batch_temp':
key = sql_schema._get_table_key(tname, referent_schema)
if key in metadata.tables:
t = metadata.tables[key]
for elem in constraint.elements:
colname = elem._get_colspec().split(".")[-1]
if not t.c.contains_column(colname):
t.append_column(
Column(colname, sqltypes.NULLTYPE)
)
else:
Table(
tname, metadata,
*[Column(n, sqltypes.NULLTYPE) for n in
[elem._get_colspec().split(".")[-1]
for elem in constraint.elements]],
schema=referent_schema)
def _setup_referent(self, metadata, constraint):
spec = constraint.elements[0]._get_colspec()
parts = spec.split(".")
tname = parts[-2]
if len(parts) == 3:
referent_schema = parts[0]
else:
referent_schema = None
if tname != '_alembic_batch_temp':
key = sql_schema._get_table_key(tname, referent_schema)
if key in metadata.tables:
t = metadata.tables[key]
for elem in constraint.elements:
colname = elem._get_colspec().split(".")[-1]
if not t.c.contains_column(colname):
t.append_column(
Column(colname, sqltypes.NULLTYPE)
)
else:
Table(
tname, metadata,
*[Column(n, sqltypes.NULLTYPE) for n in
[elem._get_colspec().split(".")[-1]
for elem in constraint.elements]],
schema=referent_schema)
def _setup_referent(self, metadata, constraint):
spec = constraint.elements[0]._get_colspec()
parts = spec.split(".")
tname = parts[-2]
if len(parts) == 3:
referent_schema = parts[0]
else:
referent_schema = None
if tname != '_alembic_batch_temp':
key = sql_schema._get_table_key(tname, referent_schema)
if key in metadata.tables:
t = metadata.tables[key]
for elem in constraint.elements:
colname = elem._get_colspec().split(".")[-1]
if not t.c.contains_column(colname):
t.append_column(
Column(colname, sqltypes.NULLTYPE)
)
else:
Table(
tname, metadata,
*[Column(n, sqltypes.NULLTYPE) for n in
[elem._get_colspec().split(".")[-1]
for elem in constraint.elements]],
schema=referent_schema)
def _compare_tables(conn_table_names, metadata_table_names,
object_filters,
inspector, metadata, diffs, autogen_context):
for s, tname in metadata_table_names.difference(conn_table_names):
name = '%s.%s' % (s, tname) if s else tname
metadata_table = metadata.tables[sa_schema._get_table_key(tname, s)]
if _run_filters(metadata_table, tname, "table", False, None, object_filters):
diffs.append(("add_table", metadata.tables[name]))
log.info("Detected added table %r", name)
_compare_indexes_and_uniques(s, tname, object_filters,
None,
metadata_table,
diffs, autogen_context, inspector)
removal_metadata = sa_schema.MetaData()
for s, tname in conn_table_names.difference(metadata_table_names):
name = sa_schema._get_table_key(tname, s)
exists = name in removal_metadata.tables
t = sa_schema.Table(tname, removal_metadata, schema=s)
if not exists:
inspector.reflecttable(t, None)
if _run_filters(t, tname, "table", True, None, object_filters):
diffs.append(("remove_table", t))
log.info("Detected removed table %r", name)
existing_tables = conn_table_names.intersection(metadata_table_names)
existing_metadata = sa_schema.MetaData()
conn_column_info = {}
for s, tname in existing_tables:
name = sa_schema._get_table_key(tname, s)
exists = name in existing_metadata.tables
t = sa_schema.Table(tname, existing_metadata, schema=s)
if not exists:
inspector.reflecttable(t, None)
conn_column_info[(s, tname)] = t
for s, tname in sorted(existing_tables):
name = '%s.%s' % (s, tname) if s else tname
metadata_table = metadata.tables[name]
conn_table = existing_metadata.tables[name]
if _run_filters(metadata_table, tname, "table", False, conn_table, object_filters):
_compare_columns(s, tname, object_filters,
conn_table,
metadata_table,
diffs, autogen_context, inspector)
_compare_indexes_and_uniques(s, tname, object_filters,
conn_table,
metadata_table,
diffs, autogen_context, inspector)
# TODO:
# table constraints
# sequences
def _compare_tables(conn_table_names, metadata_table_names,
object_filters,
inspector, metadata, diffs, autogen_context):
for s, tname in metadata_table_names.difference(conn_table_names):
name = '%s.%s' % (s, tname) if s else tname
metadata_table = metadata.tables[sa_schema._get_table_key(tname, s)]
if _run_filters(metadata_table, tname, "table", False, None, object_filters):
diffs.append(("add_table", metadata.tables[name]))
log.info("Detected added table %r", name)
_compare_indexes_and_uniques(s, tname, object_filters,
None,
metadata_table,
diffs, autogen_context, inspector)
removal_metadata = sa_schema.MetaData()
for s, tname in conn_table_names.difference(metadata_table_names):
name = sa_schema._get_table_key(tname, s)
exists = name in removal_metadata.tables
t = sa_schema.Table(tname, removal_metadata, schema=s)
if not exists:
inspector.reflecttable(t, None)
if _run_filters(t, tname, "table", True, None, object_filters):
diffs.append(("remove_table", t))
log.info("Detected removed table %r", name)
existing_tables = conn_table_names.intersection(metadata_table_names)
existing_metadata = sa_schema.MetaData()
conn_column_info = {}
for s, tname in existing_tables:
name = sa_schema._get_table_key(tname, s)
exists = name in existing_metadata.tables
t = sa_schema.Table(tname, existing_metadata, schema=s)
if not exists:
inspector.reflecttable(t, None)
conn_column_info[(s, tname)] = t
for s, tname in sorted(existing_tables):
name = '%s.%s' % (s, tname) if s else tname
metadata_table = metadata.tables[name]
conn_table = existing_metadata.tables[name]
if _run_filters(metadata_table, tname, "table", False, conn_table, object_filters):
_compare_columns(s, tname, object_filters,
conn_table,
metadata_table,
diffs, autogen_context, inspector)
_compare_indexes_and_uniques(s, tname, object_filters,
conn_table,
metadata_table,
diffs, autogen_context, inspector)
# TODO:
# table constraints
# sequences