KafkaTopicSingleBrokerTest.java 文件源码

java
阅读 15 收藏 0 点赞 0 评论 0

项目:netty-kafka-producer 作者:
@Test
public void test_producer() throws Exception {

    String topic = "test";
    ProducerProperties properties = new ProducerProperties();
    properties.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);
    createTopic(topic);

    KafkaProducer producer = new KafkaProducer("localhost", START_PORT, topic, properties);
    producer.connect().sync();
    KafkaTopic kafkaTopic = producer.topic();

    kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + "01").getBytes()));
    kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + "02").getBytes()));
    kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + "03").getBytes()));

    final KafkaStream<byte[], byte[]> stream = consume(topic).get(0);
    final ConsumerIterator<byte[], byte[]> messages = stream.iterator();

    Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE + "01"));
    Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE + "02"));
    Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE + "03"));
    producer.disconnect().sync();
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号