def __get_top_for_tables(self, tables, top=30):
tables_information = {}
cursor = self.__conn.cursor()
for table in tables:
tables_information[table] = {'rows': []}
if top > 0:
try:
self.__logger.debug('Getting {top} rows for table {table}'.format(top=top, table=table))
cursor.execute('SELECT * FROM {schema}.{table} LIMIT {top}'.format(top=top, table=table, schema=self.__db_name))
for row in cursor.fetchall():
table_row = []
for column in row:
try:
if type(column) is unicode:
column = unicodedata.normalize('NFKD', column).encode('iso-8859-1', 'replace')
else:
column = str(column).decode('utf8', 'replace').encode('iso-8859-1', 'replace')
if self.__illegal_characters.search(column):
column = re.sub(self.__illegal_characters, '?', column)
if column == 'None':
column = 'NULL'
except:
column = 'Parse_error'
table_row.append(column)
tables_information[table]['rows'].append(table_row)
except pymysql.ProgrammingError:
tables_information[table]['rows'].append(
'Error getting table data {error}'.format(error=pymysql.ProgrammingError.message))
return tables_information
评论列表
文章目录