@Test
public void testGetInputSplits() throws Exception {
final KafkaInputFormat inputFormat = spy(new KafkaInputFormat());
final SimpleConsumer mockConsumer = mock(SimpleConsumer.class);
final ZkUtils mockZk = mock(ZkUtils.class);
final Configuration mockConf = new Configuration(false);
final Broker broker = new Broker("127.0.0.1", 9092, 1);
doReturn(mockConsumer).when(inputFormat).getConsumer(broker);
doReturn(mockZk).when(inputFormat).getZk(mockConf);
doReturn(Lists.newArrayList(20l, 10l)).when(inputFormat).getOffsets(mockConsumer, "topic", 0, -1, 0,
Integer.MAX_VALUE);
doReturn(Lists.newArrayList(30l, 20l, 0l)).when(inputFormat).getOffsets(mockConsumer, "topic", 1, 10, 0,
Integer.MAX_VALUE);
final Partition p1 = new Partition("topic", 0, broker);
final Partition p2 = new Partition("topic", 1, broker);
when(mockZk.getPartitions("topic")).thenReturn(Lists.newArrayList(p1, p2));
when(mockZk.getBroker(1)).thenReturn(broker);
when(mockZk.getLastCommit("group", p1)).thenReturn(-1l);
when(mockZk.getLastCommit("group", p2)).thenReturn(10l);
final List<InputSplit> result = inputFormat.getInputSplits(mockConf, "topic", "group");
// assert the contents of each split
Assert.assertEquals(3, result.size());
final KafkaInputSplit split1 = (KafkaInputSplit) result.get(0);
final Broker broker1 = split1.getPartition().getBroker();
assertEquals(broker, broker1);
assertEquals("127.0.0.1", broker1.getHost());
assertEquals(9092, broker1.getPort());
assertEquals(1, broker1.getId());
assertEquals("1-0", split1.getPartition().getBrokerPartition());
assertEquals(0, split1.getPartition().getPartId());
assertEquals(10l, split1.getStartOffset());
assertEquals(20l, split1.getEndOffset());
assertEquals("topic", split1.getPartition().getTopic());
final KafkaInputSplit split2 = (KafkaInputSplit) result.get(1);
assertEquals(20l, split2.getStartOffset());
assertEquals(30l, split2.getEndOffset());
assertEquals("1-1", split2.getPartition().getBrokerPartition());
final KafkaInputSplit split3 = (KafkaInputSplit) result.get(2);
assertEquals(0l, split3.getStartOffset());
assertEquals(20l, split3.getEndOffset());
assertEquals("1-1", split3.getPartition().getBrokerPartition());
// verify one and only one call to getConsumer - should get the cached consumer second time around
verify(inputFormat, times(1)).getConsumer(broker);
verify(inputFormat, times(1)).getConsumer(any(Broker.class));
// verify the closeable components are closed
verify(mockConsumer, times(1)).close();
verify(mockZk, times(1)).close();
}
KafkaInputFormatTest.java 文件源码
java
阅读 27
收藏 0
点赞 0
评论 0
项目:kangaroo
作者:
评论列表
文章目录