def execute(self, statement):
    """Run ``statement`` on Athena and print the outcome to stdout.

    The query is started through ``self.athena.start_query_execution``
    against ``self.dbname`` and polled every 200ms until its state is
    SUCCEEDED, FAILED or CANCELLED.

    * SUCCEEDED: the result set is fetched and printed in the layout chosen
      by ``self.format``: ``CSV`` / ``CSV_HEADER`` (every field quoted),
      ``TSV`` / ``TSV_HEADER``, ``VERTICAL`` (one small table per record),
      or an aligned table for any other value.
    * FAILED: the ``StateChangeReason`` reported for the query is printed.
    * CANCELLED: nothing is printed.

    Returns None in every case. Nothing happens at all when no execution id
    comes back from ``start_query_execution``.
    """

    def to_native(text):
        # Python 2's csv module and stdout want UTF-8 byte strings, so
        # unicode values are encoded there. On Python 3 the same encode
        # would yield bytes objects that print as b'...', so text is kept.
        if str is bytes and isinstance(text, type(u"")):
            return text.encode("utf-8")
        return text

    execution_id = self.athena.start_query_execution(self.dbname, statement)
    if not execution_id:
        return

    # Poll until the query reaches a terminal state.
    while True:
        stats = self.athena.get_query_execution(execution_id)
        status = stats['QueryExecution']['Status']['State']
        if status in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
            break
        time.sleep(0.2)  # 200ms

    if status == 'FAILED':
        print(stats['QueryExecution']['Status']['StateChangeReason'])
        return
    if status != 'SUCCEEDED':
        # CANCELLED: there is nothing to show.
        return

    results = self.athena.get_query_results(execution_id)
    column_info = results['ResultSet']['ResultSetMetadata']['ColumnInfo']
    headers = [to_native(column['Name']) for column in column_info]

    if self.format in ('CSV', 'CSV_HEADER'):
        csv_writer = csv.writer(sys.stdout, quoting=csv.QUOTE_ALL)
        if self.format == 'CSV_HEADER':
            csv_writer.writerow(headers)
        csv_writer.writerows([
            [to_native(text) for text in row]
            for row in self.athena.yield_rows(results, headers)
        ])
    elif self.format == 'TSV':
        print(tabulate(list(self.athena.yield_rows(results, headers)), tablefmt='tsv'))
    elif self.format == 'TSV_HEADER':
        print(tabulate(list(self.athena.yield_rows(results, headers)), headers=headers, tablefmt='tsv'))
    elif self.format == 'VERTICAL':
        for num, row in enumerate(self.athena.yield_rows(results, headers)):
            print('--[RECORD {}]--'.format(num + 1))
            # Pair each column name with its value: one line per column.
            print(tabulate(list(zip(headers, row)), tablefmt='presto'))
    else:  # ALIGNED
        print(tabulate(list(self.athena.yield_rows(results, headers)), headers=headers, tablefmt='presto'))
评论列表
文章目录