def test_batchsize_2_post_fill(self):
    """Check that items queued after start-up arrive as one batch of two.

    A ``QueueConsumer`` is created with ``batch_size=2`` and started while
    the queue is still empty. Two items are then put on the queue and the
    test waits for the handler to signal that it received a batch.

    The handler only sets the event after its size check passes, and the
    test asserts that the event was actually set. This way a handler that
    never runs, or whose assertion fails, fails the test rather than being
    silently ignored.
    """
    record_queue = Queue()
    batch_received = Event()

    def handler(record_batch):
        # NOTE(review): the handler presumably runs outside the test's own
        # thread, where a failing assert would not reach the test runner --
        # confirm against QueueConsumer. Either way, the event stays unset
        # on failure and the check after wait() below catches it.
        assert len(record_batch) == 2, \
            "Incorrect batch size (expected 2, but found {}).".format(len(record_batch))
        batch_received.set()

    consumer = QueueConsumer("Test Consumer", record_queue, handler, batch_size=2)
    consumer.start()
    try:
        record_queue.put("Item1")
        record_queue.put("Item2")
        # Event.wait() takes its timeout in seconds and returns False when
        # the timeout expires without the event being set.
        delivered = batch_received.wait(timeout=2)
    finally:
        # Always stop the consumer, even if queuing or waiting raised.
        consumer.stop()

    assert delivered, \
        "Handler did not receive a batch of 2 records within 2 seconds."
#
# With flush timeout
#
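# NOTE(review): the two lines below are web-page navigation text ("comment
# list" and "table of contents") left over from extraction; they are not
# part of the test module and are kept only as comments.
# 评论列表
# 文章目录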