def _make_foreign_key(params, conn_table):
tname = params['referred_table']
if params['referred_schema']:
tname = "%s.%s" % (params['referred_schema'], tname)
options = params.get('options', {})
const = sa_schema.ForeignKeyConstraint(
[conn_table.c[cname] for cname in params['constrained_columns']],
["%s.%s" % (tname, n) for n in params['referred_columns']],
onupdate=options.get('onupdate'),
ondelete=options.get('ondelete'),
deferrable=options.get('deferrable'),
initially=options.get('initially'),
name=params['name']
)
# needed by 0.7
conn_table.append_constraint(const)
return const
python类ForeignKeyConstraint()的实例源码
def generic_constraint(self, name, table_name, type_, schema=None, **kw):
t = self.table(table_name, schema=schema)
types = {
'foreignkey': lambda name: sa_schema.ForeignKeyConstraint(
[], [], name=name),
'primary': sa_schema.PrimaryKeyConstraint,
'unique': sa_schema.UniqueConstraint,
'check': lambda name: sa_schema.CheckConstraint("", name=name),
None: sa_schema.Constraint
}
try:
const = types[type_]
except KeyError:
raise TypeError("'type' can be one of %s" %
", ".join(sorted(repr(x) for x in types)))
else:
const = const(name=name)
t.append_constraint(const)
return const
def visit_drop_constraint(self, drop):
constraint = drop.element
if isinstance(constraint, sa_schema.ForeignKeyConstraint):
qual = "FOREIGN KEY "
const = self.preparer.format_constraint(constraint)
elif isinstance(constraint, sa_schema.PrimaryKeyConstraint):
qual = "PRIMARY KEY "
const = ""
elif isinstance(constraint, sa_schema.UniqueConstraint):
qual = "INDEX "
const = self.preparer.format_constraint(constraint)
else:
qual = ""
const = self.preparer.format_constraint(constraint)
return "ALTER TABLE %s DROP %s%s" % \
(self.preparer.format_table(constraint.table),
qual, const)
def _mysql_drop_constraint(element, compiler, **kw):
"""Redefine SQLAlchemy's drop constraint to
raise errors for invalid constraint type."""
constraint = element.element
if isinstance(constraint, (schema.ForeignKeyConstraint,
schema.PrimaryKeyConstraint,
schema.UniqueConstraint)
):
return compiler.visit_drop_constraint(element, **kw)
elif isinstance(constraint, schema.CheckConstraint):
raise NotImplementedError(
"MySQL does not support CHECK constraints.")
else:
raise NotImplementedError(
"No generic 'DROP CONSTRAINT' in MySQL - "
"please specify constraint type")
def _make_foreign_key(params, conn_table):
tname = params['referred_table']
if params['referred_schema']:
tname = "%s.%s" % (params['referred_schema'], tname)
options = params.get('options', {})
const = sa_schema.ForeignKeyConstraint(
[conn_table.c[cname] for cname in params['constrained_columns']],
["%s.%s" % (tname, n) for n in params['referred_columns']],
onupdate=options.get('onupdate'),
ondelete=options.get('ondelete'),
deferrable=options.get('deferrable'),
initially=options.get('initially'),
name=params['name']
)
# needed by 0.7
conn_table.append_constraint(const)
return const
def generic_constraint(self, name, table_name, type_, schema=None, **kw):
t = self.table(table_name, schema=schema)
types = {
'foreignkey': lambda name: sa_schema.ForeignKeyConstraint(
[], [], name=name),
'primary': sa_schema.PrimaryKeyConstraint,
'unique': sa_schema.UniqueConstraint,
'check': lambda name: sa_schema.CheckConstraint("", name=name),
None: sa_schema.Constraint
}
try:
const = types[type_]
except KeyError:
raise TypeError("'type' can be one of %s" %
", ".join(sorted(repr(x) for x in types)))
else:
const = const(name=name)
t.append_constraint(const)
return const
def visit_drop_constraint(self, drop):
constraint = drop.element
if isinstance(constraint, sa_schema.ForeignKeyConstraint):
qual = "FOREIGN KEY "
const = self.preparer.format_constraint(constraint)
elif isinstance(constraint, sa_schema.PrimaryKeyConstraint):
qual = "PRIMARY KEY "
const = ""
elif isinstance(constraint, sa_schema.UniqueConstraint):
qual = "INDEX "
const = self.preparer.format_constraint(constraint)
else:
qual = ""
const = self.preparer.format_constraint(constraint)
return "ALTER TABLE %s DROP %s%s" % \
(self.preparer.format_table(constraint.table),
qual, const)
def _mysql_drop_constraint(element, compiler, **kw):
"""Redefine SQLAlchemy's drop constraint to
raise errors for invalid constraint type."""
constraint = element.element
if isinstance(constraint, (schema.ForeignKeyConstraint,
schema.PrimaryKeyConstraint,
schema.UniqueConstraint)
):
return compiler.visit_drop_constraint(element, **kw)
elif isinstance(constraint, schema.CheckConstraint):
raise NotImplementedError(
"MySQL does not support CHECK constraints.")
else:
raise NotImplementedError(
"No generic 'DROP CONSTRAINT' in MySQL - "
"please specify constraint type")
def _make_foreign_key(params, conn_table):
tname = params['referred_table']
if params['referred_schema']:
tname = "%s.%s" % (params['referred_schema'], tname)
options = params.get('options', {})
const = sa_schema.ForeignKeyConstraint(
[conn_table.c[cname] for cname in params['constrained_columns']],
["%s.%s" % (tname, n) for n in params['referred_columns']],
onupdate=options.get('onupdate'),
ondelete=options.get('ondelete'),
deferrable=options.get('deferrable'),
initially=options.get('initially'),
name=params['name']
)
# needed by 0.7
conn_table.append_constraint(const)
return const
def generic_constraint(self, name, table_name, type_, schema=None, **kw):
t = self.table(table_name, schema=schema)
types = {
'foreignkey': lambda name: sa_schema.ForeignKeyConstraint(
[], [], name=name),
'primary': sa_schema.PrimaryKeyConstraint,
'unique': sa_schema.UniqueConstraint,
'check': lambda name: sa_schema.CheckConstraint("", name=name),
None: sa_schema.Constraint
}
try:
const = types[type_]
except KeyError:
raise TypeError("'type' can be one of %s" %
", ".join(sorted(repr(x) for x in types)))
else:
const = const(name=name)
t.append_constraint(const)
return const
def visit_drop_constraint(self, drop):
constraint = drop.element
if isinstance(constraint, sa_schema.ForeignKeyConstraint):
qual = "FOREIGN KEY "
const = self.preparer.format_constraint(constraint)
elif isinstance(constraint, sa_schema.PrimaryKeyConstraint):
qual = "PRIMARY KEY "
const = ""
elif isinstance(constraint, sa_schema.UniqueConstraint):
qual = "INDEX "
const = self.preparer.format_constraint(constraint)
else:
qual = ""
const = self.preparer.format_constraint(constraint)
return "ALTER TABLE %s DROP %s%s" % \
(self.preparer.format_table(constraint.table),
qual, const)
def _mysql_drop_constraint(element, compiler, **kw):
"""Redefine SQLAlchemy's drop constraint to
raise errors for invalid constraint type."""
constraint = element.element
if isinstance(constraint, (schema.ForeignKeyConstraint,
schema.PrimaryKeyConstraint,
schema.UniqueConstraint)
):
return compiler.visit_drop_constraint(element, **kw)
elif isinstance(constraint, schema.CheckConstraint):
raise NotImplementedError(
"MySQL does not support CHECK constraints.")
else:
raise NotImplementedError(
"No generic 'DROP CONSTRAINT' in MySQL - "
"please specify constraint type")
def _make_foreign_key(params, conn_table):
tname = params['referred_table']
if params['referred_schema']:
tname = "%s.%s" % (params['referred_schema'], tname)
options = params.get('options', {})
const = sa_schema.ForeignKeyConstraint(
[conn_table.c[cname] for cname in params['constrained_columns']],
["%s.%s" % (tname, n) for n in params['referred_columns']],
onupdate=options.get('onupdate'),
ondelete=options.get('ondelete'),
deferrable=options.get('deferrable'),
initially=options.get('initially'),
name=params['name']
)
# needed by 0.7
conn_table.append_constraint(const)
return const
def generic_constraint(self, name, table_name, type_, schema=None, **kw):
t = self.table(table_name, schema=schema)
types = {
'foreignkey': lambda name: sa_schema.ForeignKeyConstraint(
[], [], name=name),
'primary': sa_schema.PrimaryKeyConstraint,
'unique': sa_schema.UniqueConstraint,
'check': lambda name: sa_schema.CheckConstraint("", name=name),
None: sa_schema.Constraint
}
try:
const = types[type_]
except KeyError:
raise TypeError("'type' can be one of %s" %
", ".join(sorted(repr(x) for x in types)))
else:
const = const(name=name)
t.append_constraint(const)
return const
def visit_drop_constraint(self, drop):
constraint = drop.element
if isinstance(constraint, sa_schema.ForeignKeyConstraint):
qual = "FOREIGN KEY "
const = self.preparer.format_constraint(constraint)
elif isinstance(constraint, sa_schema.PrimaryKeyConstraint):
qual = "PRIMARY KEY "
const = ""
elif isinstance(constraint, sa_schema.UniqueConstraint):
qual = "INDEX "
const = self.preparer.format_constraint(constraint)
else:
qual = ""
const = self.preparer.format_constraint(constraint)
return "ALTER TABLE %s DROP %s%s" % \
(self.preparer.format_table(constraint.table),
qual, const)
def visit_drop_constraint(self, drop):
constraint = drop.element
if isinstance(constraint, sa_schema.ForeignKeyConstraint):
qual = "FOREIGN KEY "
const = self.preparer.format_constraint(constraint)
elif isinstance(constraint, sa_schema.PrimaryKeyConstraint):
qual = "PRIMARY KEY "
const = ""
elif isinstance(constraint, sa_schema.UniqueConstraint):
qual = "INDEX "
const = self.preparer.format_constraint(constraint)
else:
qual = ""
const = self.preparer.format_constraint(constraint)
return "ALTER TABLE %s DROP %s%s" % \
(self.preparer.format_table(constraint.table),
qual, const)
def visit_drop_constraint(self, drop):
constraint = drop.element
if isinstance(constraint, sa_schema.ForeignKeyConstraint):
qual = "FOREIGN KEY "
const = self.preparer.format_constraint(constraint)
elif isinstance(constraint, sa_schema.PrimaryKeyConstraint):
qual = "PRIMARY KEY "
const = ""
elif isinstance(constraint, sa_schema.UniqueConstraint):
qual = "INDEX "
const = self.preparer.format_constraint(constraint)
else:
qual = ""
const = self.preparer.format_constraint(constraint)
return "ALTER TABLE %s DROP %s%s" % \
(self.preparer.format_table(constraint.table),
qual, const)
def visit_drop_constraint(self, drop):
constraint = drop.element
if isinstance(constraint, sa_schema.ForeignKeyConstraint):
qual = "FOREIGN KEY "
const = self.preparer.format_constraint(constraint)
elif isinstance(constraint, sa_schema.PrimaryKeyConstraint):
qual = "PRIMARY KEY "
const = ""
elif isinstance(constraint, sa_schema.UniqueConstraint):
qual = "INDEX "
const = self.preparer.format_constraint(constraint)
else:
qual = ""
const = self.preparer.format_constraint(constraint)
return "ALTER TABLE %s DROP %s%s" % \
(self.preparer.format_table(constraint.table),
qual, const)
def visit_drop_constraint(self, drop):
constraint = drop.element
if isinstance(constraint, sa_schema.ForeignKeyConstraint):
qual = "FOREIGN KEY "
const = self.preparer.format_constraint(constraint)
elif isinstance(constraint, sa_schema.PrimaryKeyConstraint):
qual = "PRIMARY KEY "
const = ""
elif isinstance(constraint, sa_schema.UniqueConstraint):
qual = "INDEX "
const = self.preparer.format_constraint(constraint)
else:
qual = ""
const = self.preparer.format_constraint(constraint)
return "ALTER TABLE %s DROP %s%s" % \
(self.preparer.format_table(constraint.table),
qual, const)
def visit_drop_constraint(self, drop):
constraint = drop.element
if isinstance(constraint, sa_schema.ForeignKeyConstraint):
qual = "FOREIGN KEY "
const = self.preparer.format_constraint(constraint)
elif isinstance(constraint, sa_schema.PrimaryKeyConstraint):
qual = "PRIMARY KEY "
const = ""
elif isinstance(constraint, sa_schema.UniqueConstraint):
qual = "INDEX "
const = self.preparer.format_constraint(constraint)
else:
qual = ""
const = self.preparer.format_constraint(constraint)
return "ALTER TABLE %s DROP %s%s" % \
(self.preparer.format_table(constraint.table),
qual, const)