def correct_for_autogen_constraints(self, conn_unique_constraints,
                                    conn_indexes,
                                    metadata_unique_constraints,
                                    metadata_indexes):
    """Adjust the reflected and metadata index collections in place.

    Two corrections are applied:

    1. Any reflected index whose name is also the name of a reflected
       unique constraint is removed from ``conn_indexes``.
    2. Any metadata index whose name is not among the reflected index
       names, and which contains an expression that is not a plain
       ``Column`` (after unwrapping ``UnaryExpression`` wrappers), is
       reported with a warning and discarded from ``metadata_indexes``.

    :param conn_unique_constraints: iterable of reflected unique
     constraints; each item must expose ``.name``.
    :param conn_indexes: mutable collection of reflected indexes; must
     support ``.remove()``.
    :param metadata_unique_constraints: not used by this method.
    :param metadata_indexes: mutable collection of metadata indexes; must
     support ``.discard()``.
    :return: ``None``; the collections are mutated in place.
    """
    conn_uniques_by_name = dict(
        (c.name, c) for c in conn_unique_constraints)
    # Snapshot taken *before* doubled indexes are removed below, so the
    # name check further down still sees those names.
    conn_indexes_by_name = dict(
        (c.name, c) for c in conn_indexes)

    # TODO: if SQLA 1.0, make use of "duplicates_constraint"
    # metadata
    doubled_constraints = dict(
        (name, (conn_uniques_by_name[name], conn_indexes_by_name[name]))
        for name in set(conn_uniques_by_name).intersection(
            conn_indexes_by_name)
    )
    for name, (uq, ix) in doubled_constraints.items():
        conn_indexes.remove(ix)

    # Iterate over a copy: metadata_indexes is mutated inside the loop.
    for idx in list(metadata_indexes):
        if idx.name in conn_indexes_by_name:
            continue
        if compat.sqla_08:
            exprs = idx.expressions
        else:
            exprs = idx.columns
        for expr in exprs:
            # Look through unary wrappers to the wrapped element; a
            # UnaryExpression around a non-column expression must still
            # be treated as functional rather than accepted outright.
            while isinstance(expr, UnaryExpression):
                expr = expr.element
            if not isinstance(expr, Column):
                util.warn(
                    "autogenerate skipping functional index %s; "
                    "not supported by SQLAlchemy reflection" % idx.name
                )
                metadata_indexes.discard(idx)
Python 类 UnaryExpression() 的实例源码
def correct_for_autogen_constraints(self, conn_unique_constraints,
                                    conn_indexes,
                                    metadata_unique_constraints,
                                    metadata_indexes):
    """Adjust the reflected and metadata index collections in place.

    Reflected indexes that share a name with a reflected unique
    constraint are removed from ``conn_indexes``.  Metadata indexes whose
    name is not among the reflected index names are then inspected: if
    any of their expressions is not a plain ``Column`` once
    ``UnaryExpression`` wrappers are unwrapped, a warning is emitted and
    the index is discarded from ``metadata_indexes``.

    :param conn_unique_constraints: iterable of reflected unique
     constraints; each item must expose ``.name``.
    :param conn_indexes: mutable collection of reflected indexes; must
     support ``.remove()``.
    :param metadata_unique_constraints: not used by this method.
    :param metadata_indexes: mutable collection of metadata indexes; must
     support ``.discard()``.
    :return: ``None``; the collections are mutated in place.
    """
    conn_uniques_by_name = dict(
        (c.name, c) for c in conn_unique_constraints)
    # Built before any removal, so names of doubled indexes stay visible
    # to the membership test in the second loop.
    conn_indexes_by_name = dict(
        (c.name, c) for c in conn_indexes)

    # TODO: if SQLA 1.0, make use of "duplicates_constraint"
    # metadata
    doubled_constraints = dict(
        (name, (conn_uniques_by_name[name], conn_indexes_by_name[name]))
        for name in set(conn_uniques_by_name).intersection(
            conn_indexes_by_name)
    )
    for name, (uq, ix) in doubled_constraints.items():
        conn_indexes.remove(ix)

    # A copy is iterated because entries are discarded during the loop.
    for idx in list(metadata_indexes):
        if idx.name in conn_indexes_by_name:
            continue
        if compat.sqla_08:
            exprs = idx.expressions
        else:
            exprs = idx.columns
        for expr in exprs:
            # Previously any UnaryExpression was accepted as-is, which let
            # a unary wrapper around a non-column expression slip through.
            # Unwrap to the innermost element before testing it.
            while isinstance(expr, UnaryExpression):
                expr = expr.element
            if not isinstance(expr, Column):
                util.warn(
                    "autogenerate skipping functional index %s; "
                    "not supported by SQLAlchemy reflection" % idx.name
                )
                metadata_indexes.discard(idx)
def correct_for_autogen_constraints(self, conn_unique_constraints,
                                    conn_indexes,
                                    metadata_unique_constraints,
                                    metadata_indexes):
    """Prune the reflected and metadata index collections in place.

    * Reflected indexes named identically to a reflected unique
      constraint are removed from ``conn_indexes``.
    * Metadata indexes whose name is not a reflected index name and that
      contain a non-``Column`` expression (after unwrapping
      ``UnaryExpression`` layers) trigger a warning and are discarded
      from ``metadata_indexes``.

    ``metadata_unique_constraints`` is accepted but not used.  Nothing is
    returned.
    """
    unique_lookup = {uq.name: uq for uq in conn_unique_constraints}
    # Captured before the removals below; the later name test relies on
    # this pre-removal view.
    index_lookup = {ix.name: ix for ix in conn_indexes}

    # TODO: if SQLA 1.0, make use of "duplicates_constraint"
    # metadata
    shared_names = set(unique_lookup).intersection(index_lookup)
    for dup_name in shared_names:
        conn_indexes.remove(index_lookup[dup_name])

    # Walk a snapshot, since members may be discarded along the way.
    for meta_ix in list(metadata_indexes):
        if meta_ix.name in index_lookup:
            continue

        members = meta_ix.expressions if util.sqla_08 else meta_ix.columns
        for member in members:
            inner = member
            while isinstance(inner, UnaryExpression):
                inner = inner.element
            if isinstance(inner, Column):
                continue
            util.warn(
                "autogenerate skipping functional index %s; "
                "not supported by SQLAlchemy reflection" % meta_ix.name
            )
            metadata_indexes.discard(meta_ix)
def correct_for_autogen_constraints(self, conn_unique_constraints,
                                    conn_indexes,
                                    metadata_unique_constraints,
                                    metadata_indexes):
    """Adjust the reflected and metadata index collections in place.

    Two corrections are applied:

    1. Any reflected index whose name is also the name of a reflected
       unique constraint is removed from ``conn_indexes``.
    2. Any metadata index whose name is not among the reflected index
       names, and which contains an expression that is not a plain
       ``Column`` (after unwrapping ``UnaryExpression`` wrappers), is
       reported with a warning and discarded from ``metadata_indexes``.

    :param conn_unique_constraints: iterable of reflected unique
     constraints; each item must expose ``.name``.
    :param conn_indexes: mutable collection of reflected indexes; must
     support ``.remove()``.
    :param metadata_unique_constraints: not used by this method.
    :param metadata_indexes: mutable collection of metadata indexes; must
     support ``.discard()``.
    :return: ``None``; the collections are mutated in place.
    """
    conn_uniques_by_name = dict(
        (c.name, c) for c in conn_unique_constraints)
    # Snapshot taken *before* doubled indexes are removed below, so the
    # name check further down still sees those names.
    conn_indexes_by_name = dict(
        (c.name, c) for c in conn_indexes)

    # TODO: if SQLA 1.0, make use of "duplicates_constraint"
    # metadata
    doubled_constraints = dict(
        (name, (conn_uniques_by_name[name], conn_indexes_by_name[name]))
        for name in set(conn_uniques_by_name).intersection(
            conn_indexes_by_name)
    )
    for name, (uq, ix) in doubled_constraints.items():
        conn_indexes.remove(ix)

    # Iterate over a copy: metadata_indexes is mutated inside the loop.
    for idx in list(metadata_indexes):
        if idx.name in conn_indexes_by_name:
            continue
        if compat.sqla_08:
            exprs = idx.expressions
        else:
            exprs = idx.columns
        for expr in exprs:
            # Look through unary wrappers to the wrapped element; a
            # UnaryExpression around a non-column expression must still
            # be treated as functional rather than accepted outright.
            while isinstance(expr, UnaryExpression):
                expr = expr.element
            if not isinstance(expr, Column):
                util.warn(
                    "autogenerate skipping functional index %s; "
                    "not supported by SQLAlchemy reflection" % idx.name
                )
                metadata_indexes.discard(idx)
def correct_for_autogen_constraints(self, conn_unique_constraints,
                                    conn_indexes,
                                    metadata_unique_constraints,
                                    metadata_indexes):
    """Adjust the reflected and metadata index collections in place.

    Reflected indexes that share a name with a reflected unique
    constraint are removed from ``conn_indexes``.  Metadata indexes whose
    name is not among the reflected index names are then inspected: if
    any of their expressions is not a plain ``Column`` once
    ``UnaryExpression`` wrappers are unwrapped, a warning is emitted and
    the index is discarded from ``metadata_indexes``.

    :param conn_unique_constraints: iterable of reflected unique
     constraints; each item must expose ``.name``.
    :param conn_indexes: mutable collection of reflected indexes; must
     support ``.remove()``.
    :param metadata_unique_constraints: not used by this method.
    :param metadata_indexes: mutable collection of metadata indexes; must
     support ``.discard()``.
    :return: ``None``; the collections are mutated in place.
    """
    conn_uniques_by_name = dict(
        (c.name, c) for c in conn_unique_constraints)
    # Built before any removal, so names of doubled indexes stay visible
    # to the membership test in the second loop.
    conn_indexes_by_name = dict(
        (c.name, c) for c in conn_indexes)

    # TODO: if SQLA 1.0, make use of "duplicates_constraint"
    # metadata
    doubled_constraints = dict(
        (name, (conn_uniques_by_name[name], conn_indexes_by_name[name]))
        for name in set(conn_uniques_by_name).intersection(
            conn_indexes_by_name)
    )
    for name, (uq, ix) in doubled_constraints.items():
        conn_indexes.remove(ix)

    # A copy is iterated because entries are discarded during the loop.
    for idx in list(metadata_indexes):
        if idx.name in conn_indexes_by_name:
            continue
        if compat.sqla_08:
            exprs = idx.expressions
        else:
            exprs = idx.columns
        for expr in exprs:
            # Previously any UnaryExpression was accepted as-is, which let
            # a unary wrapper around a non-column expression slip through.
            # Unwrap to the innermost element before testing it.
            while isinstance(expr, UnaryExpression):
                expr = expr.element
            if not isinstance(expr, Column):
                util.warn(
                    "autogenerate skipping functional index %s; "
                    "not supported by SQLAlchemy reflection" % idx.name
                )
                metadata_indexes.discard(idx)
def correct_for_autogen_constraints(self, conn_unique_constraints,
                                    conn_indexes,
                                    metadata_unique_constraints,
                                    metadata_indexes):
    """Prune the reflected and metadata index collections in place.

    * Reflected indexes named identically to a reflected unique
      constraint are removed from ``conn_indexes``.
    * Metadata indexes whose name is not a reflected index name and that
      contain a non-``Column`` expression (after unwrapping
      ``UnaryExpression`` layers) trigger a warning and are discarded
      from ``metadata_indexes``.

    ``metadata_unique_constraints`` is accepted but not used.  Nothing is
    returned.
    """
    unique_lookup = {uq.name: uq for uq in conn_unique_constraints}
    # Captured before the removals below; the later name test relies on
    # this pre-removal view.
    index_lookup = {ix.name: ix for ix in conn_indexes}

    # TODO: if SQLA 1.0, make use of "duplicates_constraint"
    # metadata
    shared_names = set(unique_lookup).intersection(index_lookup)
    for dup_name in shared_names:
        conn_indexes.remove(index_lookup[dup_name])

    # Walk a snapshot, since members may be discarded along the way.
    for meta_ix in list(metadata_indexes):
        if meta_ix.name in index_lookup:
            continue

        members = meta_ix.expressions if compat.sqla_08 else meta_ix.columns
        for member in members:
            inner = member
            while isinstance(inner, UnaryExpression):
                inner = inner.element
            if isinstance(inner, Column):
                continue
            util.warn(
                "autogenerate skipping functional index %s; "
                "not supported by SQLAlchemy reflection" % meta_ix.name
            )
            metadata_indexes.discard(meta_ix)
def correct_for_autogen_constraints(self, conn_unique_constraints,
                                    conn_indexes,
                                    metadata_unique_constraints,
                                    metadata_indexes):
    """Filter the reflected and metadata index collections in place.

    First, every reflected index whose name matches a reflected unique
    constraint is removed from ``conn_indexes``.  Second, each metadata
    index whose name is absent from the reflected index names is checked
    expression by expression; an expression that does not reduce to a
    ``Column`` after stripping ``UnaryExpression`` wrappers causes a
    warning and the index is discarded from ``metadata_indexes``.

    ``metadata_unique_constraints`` is not read.  Returns ``None``.
    """

    def _strip_unary(node):
        # Follow ``.element`` until the node is no longer a UnaryExpression.
        while isinstance(node, UnaryExpression):
            node = node.element
        return node

    uniques_by_name = {item.name: item for item in conn_unique_constraints}
    # Taken before conn_indexes is modified, so it still lists indexes
    # that are about to be removed.
    indexes_by_name = {item.name: item for item in conn_indexes}

    # TODO: if SQLA 1.0, make use of "duplicates_constraint"
    # metadata
    for overlap in set(uniques_by_name).intersection(indexes_by_name):
        conn_indexes.remove(indexes_by_name[overlap])

    # Copy first: the collection shrinks while we loop.
    for candidate in list(metadata_indexes):
        if candidate.name in indexes_by_name:
            continue

        if util.sqla_08:
            parts = candidate.expressions
        else:
            parts = candidate.columns

        for part in parts:
            if not isinstance(_strip_unary(part), Column):
                util.warn(
                    "autogenerate skipping functional index %s; "
                    "not supported by SQLAlchemy reflection"
                    % candidate.name
                )
                metadata_indexes.discard(candidate)
def correct_for_autogen_constraints(self, conn_unique_constraints,
                                    conn_indexes,
                                    metadata_unique_constraints,
                                    metadata_indexes):
    """Prune the reflected and metadata index collections in place.

    * Reflected indexes named identically to a reflected unique
      constraint are removed from ``conn_indexes``.
    * Metadata indexes whose name is not a reflected index name and that
      contain a non-``Column`` expression (after unwrapping
      ``UnaryExpression`` layers) trigger a warning and are discarded
      from ``metadata_indexes``.

    ``metadata_unique_constraints`` is accepted but not used.  Nothing is
    returned.
    """
    unique_lookup = {uq.name: uq for uq in conn_unique_constraints}
    # Captured before the removals below; the later name test relies on
    # this pre-removal view.
    index_lookup = {ix.name: ix for ix in conn_indexes}

    # TODO: if SQLA 1.0, make use of "duplicates_constraint"
    # metadata
    shared_names = set(unique_lookup).intersection(index_lookup)
    for dup_name in shared_names:
        conn_indexes.remove(index_lookup[dup_name])

    # Walk a snapshot, since members may be discarded along the way.
    for meta_ix in list(metadata_indexes):
        if meta_ix.name in index_lookup:
            continue

        members = meta_ix.expressions if util.sqla_08 else meta_ix.columns
        for member in members:
            inner = member
            while isinstance(inner, UnaryExpression):
                inner = inner.element
            if isinstance(inner, Column):
                continue
            util.warn(
                "autogenerate skipping functional index %s; "
                "not supported by SQLAlchemy reflection" % meta_ix.name
            )
            metadata_indexes.discard(meta_ix)
def correct_for_autogen_constraints(self, conn_unique_constraints,
                                    conn_indexes,
                                    metadata_unique_constraints,
                                    metadata_indexes):
    """Filter the reflected and metadata index collections in place.

    First, every reflected index whose name matches a reflected unique
    constraint is removed from ``conn_indexes``.  Second, each metadata
    index whose name is absent from the reflected index names is checked
    expression by expression; an expression that does not reduce to a
    ``Column`` after stripping ``UnaryExpression`` wrappers causes a
    warning and the index is discarded from ``metadata_indexes``.

    ``metadata_unique_constraints`` is not read.  Returns ``None``.
    """

    def _strip_unary(node):
        # Follow ``.element`` until the node is no longer a UnaryExpression.
        while isinstance(node, UnaryExpression):
            node = node.element
        return node

    uniques_by_name = {item.name: item for item in conn_unique_constraints}
    # Taken before conn_indexes is modified, so it still lists indexes
    # that are about to be removed.
    indexes_by_name = {item.name: item for item in conn_indexes}

    # TODO: if SQLA 1.0, make use of "duplicates_constraint"
    # metadata
    for overlap in set(uniques_by_name).intersection(indexes_by_name):
        conn_indexes.remove(indexes_by_name[overlap])

    # Copy first: the collection shrinks while we loop.
    for candidate in list(metadata_indexes):
        if candidate.name in indexes_by_name:
            continue

        if compat.sqla_08:
            parts = candidate.expressions
        else:
            parts = candidate.columns

        for part in parts:
            if not isinstance(_strip_unary(part), Column):
                util.warn(
                    "autogenerate skipping functional index %s; "
                    "not supported by SQLAlchemy reflection"
                    % candidate.name
                )
                metadata_indexes.discard(candidate)
def correct_for_autogen_constraints(self, conn_unique_constraints,
                                    conn_indexes,
                                    metadata_unique_constraints,
                                    metadata_indexes):
    """Adjust the reflected and metadata index collections in place.

    Two corrections are applied:

    1. Any reflected index whose name is also the name of a reflected
       unique constraint is removed from ``conn_indexes``.
    2. Any metadata index whose name is not among the reflected index
       names, and which contains an expression that is not a plain
       ``Column`` (after unwrapping ``UnaryExpression`` wrappers), is
       reported with a warning and discarded from ``metadata_indexes``.

    :param conn_unique_constraints: iterable of reflected unique
     constraints; each item must expose ``.name``.
    :param conn_indexes: mutable collection of reflected indexes; must
     support ``.remove()``.
    :param metadata_unique_constraints: not used by this method.
    :param metadata_indexes: mutable collection of metadata indexes; must
     support ``.discard()``.
    :return: ``None``; the collections are mutated in place.
    """
    conn_uniques_by_name = dict(
        (c.name, c) for c in conn_unique_constraints)
    # Snapshot taken *before* doubled indexes are removed below, so the
    # name check further down still sees those names.
    conn_indexes_by_name = dict(
        (c.name, c) for c in conn_indexes)

    # TODO: if SQLA 1.0, make use of "duplicates_constraint"
    # metadata
    doubled_constraints = dict(
        (name, (conn_uniques_by_name[name], conn_indexes_by_name[name]))
        for name in set(conn_uniques_by_name).intersection(
            conn_indexes_by_name)
    )
    for name, (uq, ix) in doubled_constraints.items():
        conn_indexes.remove(ix)

    # Iterate over a copy: metadata_indexes is mutated inside the loop.
    for idx in list(metadata_indexes):
        if idx.name in conn_indexes_by_name:
            continue
        if compat.sqla_08:
            exprs = idx.expressions
        else:
            exprs = idx.columns
        for expr in exprs:
            # Look through unary wrappers to the wrapped element; a
            # UnaryExpression around a non-column expression must still
            # be treated as functional rather than accepted outright.
            while isinstance(expr, UnaryExpression):
                expr = expr.element
            if not isinstance(expr, Column):
                util.warn(
                    "autogenerate skipping functional index %s; "
                    "not supported by SQLAlchemy reflection" % idx.name
                )
                metadata_indexes.discard(idx)