@Test
public void testGetOffsets() throws Exception {
    // Arguments shared by every stub and every getOffsets call in this test.
    final String topic = "topic";
    final int partition = 1;
    final int unlimited = Integer.MAX_VALUE;

    // Offsets handed back by the mocked consumer for the "latest time" request, in descending order.
    final long[] allOffsets = { 101, 91, 81, 71, 61, 51, 41, 31, 21, 11 };

    final SimpleConsumer mockConsumer = mock(SimpleConsumer.class);
    when(mockConsumer.getOffsetsBefore(topic, partition, OffsetRequest.LatestTime(), unlimited))
            .thenReturn(allOffsets);
    // A lookup for time 0 (single offset requested) yields nothing.
    when(mockConsumer.getOffsetsBefore(topic, partition, 0, 1)).thenReturn(new long[] {});

    final KafkaInputFormat format = new KafkaInputFormat();

    // Case 0: lastCommit -1, asOfTime 0, no split limit -> every stubbed offset is expected back.
    long[] expected = allOffsets;
    List<Long> result = format.getOffsets(mockConsumer, topic, partition, -1, 0, unlimited);
    compareArrayContents(expected, result);

    // Case 1: lastCommit 52 -> the five leading offsets (101..61) followed by lastCommit itself.
    final int lastCommit = 52;
    expected = Arrays.copyOfRange(allOffsets, 0, 6);
    expected[5] = lastCommit; // slot 5 held 51; it is replaced by the commit point
    result = format.getOffsets(mockConsumer, topic, partition, lastCommit, 0, unlimited);
    compareArrayContents(expected, result);

    // Case 2: asOfTime 999 is stubbed to resolve to offset 51 (below lastCommit 52)
    // -> same expectation as case 1, lastCommit entry still present.
    final int asOfTime = 999;
    when(mockConsumer.getOffsetsBefore(topic, partition, asOfTime, 1)).thenReturn(new long[] { 51 });
    result = format.getOffsets(mockConsumer, topic, partition, lastCommit, asOfTime, unlimited);
    compareArrayContents(expected, result);

    // Case 3: asOfTime re-stubbed to resolve to offset 52 (equal to lastCommit)
    // -> only the five leading offsets are expected, without the trailing lastCommit entry.
    when(mockConsumer.getOffsetsBefore(topic, partition, asOfTime, 1)).thenReturn(new long[] { 52 });
    expected = Arrays.copyOfRange(allOffsets, 0, 5);
    result = format.getOffsets(mockConsumer, topic, partition, lastCommit, asOfTime, unlimited);
    compareArrayContents(expected, result);

    // Case 4: maxSplitsPerPartition 5 (same as the number of offsets) -> all five offsets expected.
    result = format.getOffsets(mockConsumer, topic, partition, lastCommit, asOfTime, 5);
    compareArrayContents(expected, result);

    // Case 5: maxSplitsPerPartition 4 (one fewer than the number of offsets) -> all five STILL expected.
    result = format.getOffsets(mockConsumer, topic, partition, lastCommit, asOfTime, 4);
    compareArrayContents(expected, result);

    // Case 6: maxSplitsPerPartition 3 -> the first (largest) offset is dropped, leaving four.
    expected = Arrays.copyOfRange(allOffsets, 1, 5);
    result = format.getOffsets(mockConsumer, topic, partition, lastCommit, asOfTime, 3);
    compareArrayContents(expected, result);

    // Case 7: maxSplitsPerPartition 1 -> just two offsets expected (71 and 61).
    expected = Arrays.copyOfRange(allOffsets, 3, 5);
    result = format.getOffsets(mockConsumer, topic, partition, lastCommit, asOfTime, 1);
    compareArrayContents(expected, result);
}
KafkaInputFormatTest.java 文件源码
java
阅读 23
收藏 0
点赞 0
评论 0
项目:kangaroo
作者:
评论列表
文章目录