def __init__(self, stream_name, back_off_limit=60, send_window=13):
    """Store settings, build the Kinesis helpers and ensure the stream exists.

    Attempts to create a single-shard stream called ``stream_name`` and
    then blocks until the service reports that the stream exists.

    Args:
        stream_name: Name of the Kinesis stream to create or reuse.
        back_off_limit: Saved as ``self.back_off_limit``; not used here.
        send_window: Saved as ``self._send_window``; not used here.

    Raises:
        ClientError: If ``create_stream`` fails with any error code other
            than ``ResourceInUseException``.
    """
    # Plain configuration and bookkeeping state.
    self.stream_name = stream_name
    self.back_off_limit = back_off_limit
    self._send_window = send_window
    self.last_send = 0
    self._sequence_number_for_ordering = '0'

    # Collaborators: the Kinesis client and the record aggregator.
    self._kinesis = boto3.client('kinesis')
    self._record_agg = aws_kinesis_agg.aggregator.RecordAggregator()

    try:
        self._kinesis.create_stream(StreamName=stream_name, ShardCount=1)
    except ClientError as err:
        error_code = err.response['Error']['Code']
        # ResourceInUseException means the stream already exists, which is
        # fine to reuse; every other error code is logged and propagated.
        if error_code != 'ResourceInUseException':
            logger.error(err)
            raise

    # Block until the stream exists (per the original note: up to 180 s).
    self._kinesis.get_waiter('stream_exists').wait(
        StreamName=self.stream_name
    )
评论列表
文章目录