athena_cli.py 文件源码

python
阅读 31 收藏 0 点赞 0 评论 0

项目:athena-cli 作者: guardian 项目源码 文件源码
def execute(self, statement):
        execution_id = self.athena.start_query_execution(self.dbname, statement)
        if not execution_id:
            return

        while True:
            stats = self.athena.get_query_execution(execution_id)
            status = stats['QueryExecution']['Status']['State']
            if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
                break
            time.sleep(0.2)  # 200ms

        if status == 'SUCCEEDED':
            results = self.athena.get_query_results(execution_id)
            headers = [h['Name'].encode("utf-8") for h in results['ResultSet']['ResultSetMetadata']['ColumnInfo']]

            if self.format in ['CSV', 'CSV_HEADER']:
                csv_writer = csv.writer(sys.stdout, quoting=csv.QUOTE_ALL)
                if self.format == 'CSV_HEADER':
                    csv_writer.writerow(headers)
                csv_writer.writerows([[text.encode("utf-8") for text in row] for row in self.athena.yield_rows(results, headers)])
            elif self.format == 'TSV':
                print(tabulate([row for row in self.athena.yield_rows(results, headers)], tablefmt='tsv'))
            elif self.format == 'TSV_HEADER':
                print(tabulate([row for row in self.athena.yield_rows(results, headers)], headers=headers, tablefmt='tsv'))
            elif self.format == 'VERTICAL':
                for num, row in enumerate(self.athena.yield_rows(results, headers)):
                    print('--[RECORD {}]--'.format(num+1))
                    print(tabulate(zip(*[headers, row]), tablefmt='presto'))
            else:  # ALIGNED
                print(tabulate([x for x in self.athena.yield_rows(results, headers)], headers=headers, tablefmt='presto'))

        if status == 'FAILED':
            print(stats['QueryExecution']['Status']['StateChangeReason'])
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号