def get_unique_constraints(self, connection, table_name, schema=None, **kw):
    """
    Reflect the unique constraints of a table (overrides the base class method).

    Selects ``ColumnName`` / ``IndexName`` rows from ``dbc.Indices`` for the
    given schema and table, restricted to index type ``'U'``, and folds the
    rows into one entry per index name.

    :param connection: object whose ``execute()`` runs the statement.
    :param table_name: value bound to the ``TableName`` filter.
    :param schema: value bound to the ``DatabaseName`` filter; when ``None``,
        ``self.default_schema_name`` is used instead.
    :param kw: accepted but not used by this method.
    :return: list of dicts, each holding a normalized ``'name'`` and a
        ``'column_names'`` list of normalized column names, in the order the
        rows were returned.
    """
    effective_schema = self.default_schema_name if schema is None else schema

    index_columns = select(
        [column('ColumnName'), column('IndexName')],
        from_obj=[text('dbc.Indices')],
    )
    row_filter = and_(
        text('DatabaseName = :schema'),
        text('TableName=:table'),
        text('IndexType=:indextype'),
    )
    # Rows are sorted by index name so that groupby() below sees every row of
    # an index as one consecutive run.
    query = index_columns.where(row_filter).order_by(asc(column('IndexName')))

    # Index type 'U' selects unique indexes.
    rows = connection.execute(
        query,
        schema=effective_schema,
        table=table_name,
        indextype='U',
    ).fetchall()

    def index_name_of(row):
        # Grouping key: the normalized index name of the row.
        return self.normalize_name(row['IndexName'])

    constraints = []
    for index_name, members in groupby(rows, index_name_of):
        # The group key is passed through normalize_name() once more before
        # being stored as the constraint name.
        constraint_name = self.normalize_name(index_name)
        member_columns = [
            self.normalize_name(member['ColumnName']) for member in members
        ]
        constraints.append({
            'name': constraint_name,
            'column_names': member_columns,
        })
    return constraints
评论列表
文章目录