/**
 * Sends three messages through a {@code KafkaProducer} connected to
 * {@code localhost:START_PORT} and verifies that the consumer stream for the
 * same topic yields them in the order they were sent.
 *
 * <p>The producer is disconnected in a {@code finally} block so that a failed
 * assertion (or an exception from send/consume) does not leave the connection
 * open for the remainder of the test run.
 *
 * @throws Exception if topic creation, connecting, sending, consuming or
 *                   disconnecting fails
 */
@Test
public void test_producer() throws Exception {
    // Encode and decode with one explicit charset so the comparison does not
    // depend on the platform default. Fully qualified because the file's
    // import block is not visible from this method.
    final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;

    String topic = "test";
    ProducerProperties properties = new ProducerProperties();
    properties.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);
    createTopic(topic);

    KafkaProducer producer = new KafkaProducer("localhost", START_PORT, topic, properties);
    producer.connect().sync();
    try {
        KafkaTopic kafkaTopic = producer.topic();

        // First argument is passed as null for every message
        // (presumably the message key -- confirm against KafkaTopic.send).
        kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + "01").getBytes(utf8)));
        kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + "02").getBytes(utf8)));
        kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + "03").getBytes(utf8)));

        // Only the first stream returned for the topic is inspected.
        final KafkaStream<byte[], byte[]> stream = consume(topic).get(0);
        final ConsumerIterator<byte[], byte[]> messages = stream.iterator();

        Assert.assertThat(new String(messages.next().message(), utf8), is(TEST_MESSAGE + "01"));
        Assert.assertThat(new String(messages.next().message(), utf8), is(TEST_MESSAGE + "02"));
        Assert.assertThat(new String(messages.next().message(), utf8), is(TEST_MESSAGE + "03"));
    } finally {
        // Always release the connection, even when an assertion above fails.
        producer.disconnect().sync();
    }
}
KafkaTopicSingleBrokerTest.java 文件源码
java
阅读 15
收藏 0
点赞 0
评论 0
项目:netty-kafka-producer
作者:
评论列表
文章目录