def add_imports(self, collector):
if self.table.columns:
collector.add_import(Column)
for column in self.table.columns:
collector.add_import(column.type)
if column.server_default:
collector.add_literal_import('sqlalchemy', 'text')
for constraint in sorted(self.table.constraints, key=_get_constraint_sort_key):
if isinstance(constraint, ForeignKeyConstraint):
if len(constraint.columns) > 1:
collector.add_literal_import('sqlalchemy', 'ForeignKeyConstraint')
else:
collector.add_literal_import('sqlalchemy', 'ForeignKey')
elif isinstance(constraint, UniqueConstraint):
if len(constraint.columns) > 1:
collector.add_literal_import('sqlalchemy', 'UniqueConstraint')
elif not isinstance(constraint, PrimaryKeyConstraint):
collector.add_import(constraint)
for index in self.table.indexes:
if len(index.columns) > 1:
collector.add_import(index)
python类ForeignKey()的实例源码
def render_constraint(constraint):
def render_fk_options(*opts):
opts = [repr(opt) for opt in opts]
for attr in 'ondelete', 'onupdate', 'deferrable', 'initially', 'match':
value = getattr(constraint, attr, None)
if value:
opts.append('{0}={1!r}'.format(attr, value))
return ', '.join(opts)
if isinstance(constraint, ForeignKey):
remote_column = '{0}.{1}'.format(constraint.column.table.fullname, constraint.column.name)
return 'ForeignKey({0})'.format(render_fk_options(remote_column))
elif isinstance(constraint, ForeignKeyConstraint):
local_columns = _get_column_names(constraint)
remote_columns = ['{0}.{1}'.format(fk.column.table.fullname, fk.column.name)
for fk in constraint.elements]
return 'ForeignKeyConstraint({0})'.format(render_fk_options(local_columns, remote_columns))
elif isinstance(constraint, CheckConstraint):
return 'CheckConstraint({0!r})'.format(_get_compiled_expression(constraint.sqltext))
elif isinstance(constraint, UniqueConstraint):
columns = [repr(col.name) for col in constraint.columns]
return 'UniqueConstraint({0})'.format(', '.join(columns))
def reflecttable(self, table):
# to use information_schema:
#ischema.reflecttable(self, table, ischema_names, use_mysql=True)
tabletype, foreignkeyD = self.moretableinfo(table=table)
table._impl.mysql_engine = tabletype
c = self.execute("describe " + table.name, {})
while True:
row = c.fetchone()
if row is None:
break
#print "row! " + repr(row)
(name, type, nullable, primary_key, default) = (row[0], row[1], row[2] == 'YES', row[3] == 'PRI', row[4])
match = re.match(r'(\w+)(\(.*?\))?', type)
coltype = match.group(1)
args = match.group(2)
#print "coltype: " + repr(coltype) + " args: " + repr(args)
coltype = ischema_names.get(coltype, MSString)
if args is not None:
args = re.findall(r'(\d+)', args)
#print "args! " +repr(args)
coltype = coltype(*[int(a) for a in args])
arglist = []
fkey = foreignkeyD.get(name)
if fkey is not None:
arglist.append(schema.ForeignKey(fkey))
table.append_item(schema.Column(name, coltype, *arglist,
**dict(primary_key=primary_key,
nullable=nullable,
default=default
)))
def reflecttable(self, table):
c = self.execute("PRAGMA table_info(" + table.name + ")", {})
while True:
row = c.fetchone()
if row is None:
break
#print "row! " + repr(row)
(name, type, nullable, primary_key) = (row[1], row[2].upper(), not row[3], row[5])
match = re.match(r'(\w+)(\(.*?\))?', type)
coltype = match.group(1)
args = match.group(2)
#print "coltype: " + repr(coltype) + " args: " + repr(args)
coltype = pragma_names.get(coltype, SLString)
if args is not None:
args = re.findall(r'(\d+)', args)
#print "args! " +repr(args)
coltype = coltype(*[int(a) for a in args])
table.append_item(schema.Column(name, coltype, primary_key = primary_key, nullable = nullable))
c = self.execute("PRAGMA foreign_key_list(" + table.name + ")", {})
while True:
row = c.fetchone()
if row is None:
break
(tablename, localcol, remotecol) = (row[2], row[3], row[4])
#print "row! " + repr(row)
remotetable = Table(tablename, self, autoload = True)
table.c[localcol].append_item(schema.ForeignKey(remotetable.c[remotecol]))
# check for UNIQUE indexes
c = self.execute("PRAGMA index_list(" + table.name + ")", {})
unique_indexes = []
while True:
row = c.fetchone()
if row is None:
break
if (row[2] == 1):
unique_indexes.append(row[1])
# loop thru unique indexes for one that includes the primary key
for idx in unique_indexes:
c = self.execute("PRAGMA index_info(" + idx + ")", {})
cols = []
while True:
row = c.fetchone()
if row is None:
break
cols.append(row[2])
col = table.columns[row[2]]
# unique index that includes the pk is considered a multiple primary key
for col in cols:
column = table.columns[col]
table.columns[col]._set_primary_key()