@Test
public void test_sendMessage() throws Exception {
createTopic(topic);
CountDownLatch latch = new CountDownLatch(1);
ProducerProperties properties = new ProducerProperties();
properties.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);
DataKafkaBroker dataChannel = new DataKafkaBroker("localhost", START_PORT, 0, topic,new NioEventLoopGroup(), properties);
dataChannel.connect().sync();
dataChannel.send(freeLaterBuffer("1".getBytes()), 0, freeLaterBuffer(TEST_MESSAGE.getBytes()));
final KafkaStream<byte[], byte[]> stream = consume(topic).get(0);
final ConsumerIterator<byte[], byte[]> messages = stream.iterator();
Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE));
dataChannel.disconnect();
}
DataBrokerTest.java 文件源码
java
阅读 19
收藏 0
点赞 0
评论 0
项目:netty-kafka-producer
作者:
评论列表
文章目录