KafkaInputFormatTest.java 文件源码

java
阅读 23 收藏 0 点赞 0 评论 0

项目:kangaroo 作者:
/**
 * Exercises {@code KafkaInputFormat.getOffsets} against a mocked {@code SimpleConsumer}.
 *
 * <p>The mock reports ten offsets (101 down to 11) for partition 1 of {@code "topic"} when asked
 * for everything before the latest time. Each case then varies the last three arguments of
 * {@code getOffsets} (referred to below as lastCommit, asOfTime and maxSplitsPerPartition,
 * following this test's own comments) and checks which offsets come back.
 *
 * @throws Exception if the call under test throws
 */
@Test
public void testGetOffsets() throws Exception {
    final SimpleConsumer consumer = mock(SimpleConsumer.class);

    final long[] offsets = { 101, 91, 81, 71, 61, 51, 41, 31, 21, 11 };
    when(consumer.getOffsetsBefore("topic", 1, OffsetRequest.LatestTime(), Integer.MAX_VALUE)).thenReturn(offsets);
    // With asOfTime == 0 the single-offset lookup is stubbed to return nothing.
    when(consumer.getOffsetsBefore("topic", 1, 0, 1)).thenReturn(new long[] {});

    final KafkaInputFormat inputFormat = new KafkaInputFormat();

    // case 0: get everything (lastCommit -1, asOfTime 0, maxSplitsPerPartition unbounded) -> all offsets
    long[] expected = offsets;
    List<Long> actual = inputFormat.getOffsets(consumer, "topic", 1, -1, 0, Integer.MAX_VALUE);
    compareArrayContents(expected, actual);

    // case 1: lastCommit of 52 -> we should only get back the first 5 offsets + the lastCommit
    final int lastCommit = 52;
    expected = new long[6];
    // Only the five offsets above lastCommit are taken from the source; slot 5 holds lastCommit itself.
    System.arraycopy(offsets, 0, expected, 0, 5);
    expected[5] = lastCommit;
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, 0, Integer.MAX_VALUE);
    compareArrayContents(expected, actual);

    // case 2: lastCommit of 52, offset at asOfTime is 51 -> same result as case 1 (lastCommit still included)
    final int asOfTime = 999;
    when(consumer.getOffsetsBefore("topic", 1, asOfTime, 1)).thenReturn(new long[] { 51 });
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, Integer.MAX_VALUE);
    compareArrayContents(expected, actual);

    // case 3: lastCommit of 52, offset at asOfTime is 52 -> only the first 5 offsets, lastCommit not included
    when(consumer.getOffsetsBefore("topic", 1, asOfTime, 1)).thenReturn(new long[] { 52 });
    expected = Arrays.copyOfRange(offsets, 0, 5);
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, Integer.MAX_VALUE);
    compareArrayContents(expected, actual);

    // case 4: maxSplitsPerPartition == number of commits (5) -> should include all 5 offsets
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, 5);
    compareArrayContents(expected, actual);

    // case 5: maxSplitsPerPartition = number of commits - 1 (4) -> should STILL include all 5 offsets
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, 4);
    compareArrayContents(expected, actual);

    // case 6: maxSplitsPerPartition = number of commits - 2 (3) -> should exclude the first (largest) offset
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, 3);
    expected = Arrays.copyOfRange(offsets, 1, 5);
    compareArrayContents(expected, actual);

    // case 7: maxSplitsPerPartition = 1 -> should include just 2 commits (71 and 61)
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, 1);
    expected = Arrays.copyOfRange(offsets, 3, 5);
    compareArrayContents(expected, actual);
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号