def get_read_stream(self, dag_id, task_id, execution_date):
    """Return a readable binary stream holding the contents of an S3 key.

    The key name is derived from the arguments via ``self.get_key_name``
    and looked up in ``self.bucket``. The key's contents are copied into
    an anonymous temporary file, which is rewound to the start before it
    is returned. The caller owns the returned stream and should close it.

    :param dag_id: passed through to ``self.get_key_name``.
    :param task_id: passed through to ``self.get_key_name``.
    :param execution_date: passed through to ``self.get_key_name``.
    :return: temporary file opened in ``'w+b'`` mode, positioned at offset 0.
    :raises StorageDriverError: if ``self.bucket.get_key`` returns ``None``
        for the derived key name.
    """
    key_name = self.get_key_name(dag_id, task_id, execution_date)
    key = self.bucket.get_key(key_name)

    if key is None:
        message = \
            'S3 key named {key_name} in bucket {bucket_name} does not exist.'.format(
                key_name=key_name,
                bucket_name=self.bucket_name)
        raise StorageDriverError(message)

    import tempfile
    temp_file_stream = tempfile.TemporaryFile(mode='w+b')
    try:
        key.get_file(temp_file_stream)
        # The download leaves the position at the end of the file;
        # rewind so the caller reads from the beginning.
        temp_file_stream.seek(0)
    except BaseException:
        # Nobody else holds a reference to the temp file yet, so close it
        # here to avoid leaking the handle, then let the error propagate.
        temp_file_stream.close()
        raise
    return temp_file_stream
评论列表
文章目录