def __init__(self, config):
log.debug('Starting KinesisProducer')
self.config = config
self._queue = queue.Queue()
self._closed = False
accumulator = RecordAccumulator(RawBuffer, config)
if config['kinesis_concurrency'] == 1:
client = Client(config)
else:
client = ThreadPoolClient(config)
self._sender = Sender(queue=self._queue,
accumulator=accumulator,
client=client,
partitioner=random_partitioner)
self._sender.daemon = True
self._sender.start()
评论列表
文章目录