/**
 * Sends one message with {@code Acknowledgment.WAIT_FOR_NO_ONE} configured on the
 * producer and checks that the same payload can be read back through a consumer stream.
 *
 * @throws Exception if any of the topic setup, producer, or consumer calls fails
 */
@Test
public void test_no_acks_send_message() throws Exception {
    String topic = "test_no_acks_send_message";
    // NOTE(review): second argument is presumably the partition count — confirm against createTopic.
    createTopic(topic, 1);

    ProducerProperties properties = new ProducerProperties();
    properties.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);
    properties.override(ProducerProperties.DATA_ACK, Acknowledgment.WAIT_FOR_NO_ONE);

    KafkaProducer producer = new KafkaProducer("localhost", START_PORT, topic, properties);
    producer.connect().sync();
    try {
        KafkaTopic kafkaTopic = producer.topic();
        // Encode and decode with the same explicit charset so the round trip does not
        // depend on the platform default.
        kafkaTopic.send(null,
                freeLaterBuffer(TEST_MESSAGE.getBytes(java.nio.charset.StandardCharsets.UTF_8)));

        final List<KafkaStream<byte[], byte[]>> streams = consume(topic);
        final KafkaStream<byte[], byte[]> stream = streams.get(0);
        final ConsumerIterator<byte[], byte[]> messages = stream.iterator();

        String received =
                new String(messages.next().message(), java.nio.charset.StandardCharsets.UTF_8);
        // assertThat takes (actual, matcher): the consumed payload is the value under test.
        Assert.assertThat(received, is(TEST_MESSAGE));
    } finally {
        // Disconnect even when sending, consuming, or the assertion fails, so a failing
        // test does not leave the producer connected.
        producer.disconnect().sync();
    }
}
ListenerTest.java 文件源码
java
阅读 19
收藏 0
点赞 0
评论 0
项目:netty-kafka-producer
作者:
评论列表
文章目录