def get_indexes(self, connection, table_name, schema=None, **kw):
    """
    Overrides base class method.

    Return the indexes of ``table_name`` as read from ``dbc.Indices``.

    :param connection: connection used to execute the catalog query.
    :param table_name: name of the table whose indexes are listed.
    :param schema: database name to filter on; falls back to
        ``self.default_schema_name`` when ``None``.
    :param kw: accepted for interface compatibility; not used here.
    :return: list with one dict per index, each holding the keys
        ``name``, ``column_names`` and ``unique``.
    """
    if schema is None:
        schema = self.default_schema_name

    # itertools.groupby only merges *adjacent* rows with an equal key.
    # The grouping key below falls back to IndexNumber when IndexName is
    # NULL, so the rows must also be ordered by IndexNumber; otherwise the
    # rows of several unnamed indexes could interleave and be split into
    # fragments. ColumnPosition keeps the columns of each index in their
    # declared order.
    stmt = select(["*"], from_obj=[text('dbc.Indices')]) \
        .where(and_(text('DatabaseName = :schema'),
                    text('TableName=:table'))) \
        .order_by(asc(column('IndexName')),
                  asc(column('IndexNumber')),
                  asc(column('ColumnPosition')))
    rows = connection.execute(stmt, schema=schema, table=table_name).fetchall()

    def index_key(row):
        # Unnamed indexes are identified by their IndexNumber instead.
        # TODO: Check what to do when IndexName is None
        name = row.IndexName or row.IndexNumber
        unique = True if row.UniqueFlag == 'Y' else False
        return (name, unique)

    indices = []
    for (name, unique), index_rows in groupby(rows, index_key):
        indices.append({
            'name': name,
            'column_names': [
                self.normalize_name(index_row['ColumnName'])
                for index_row in index_rows
            ],
            'unique': unique,
        })
    return indices
# Web-page residue, not part of the code: "Comment list" / "Article table of contents"