KafkaInputFormatTest.java 文件源码

java
阅读 27 收藏 0 点赞 0 评论 0

项目:kangaroo 作者:
/**
 * Checks the splits that {@code getInputSplits} produces for a two-partition topic served by one broker.
 *
 * <p>Partition 0 is stubbed with a last commit of {@code -1} and offsets {@code [20, 10]}; partition 1 is
 * stubbed with a last commit of {@code 10} and offsets {@code [30, 20, 0]}. The test expects three splits, in
 * order: 10..20 on "1-0", then 20..30 and 0..20 on "1-1". It also verifies that the consumer is requested only
 * once and that both the consumer mock and the {@code ZkUtils} mock are closed exactly once.
 *
 * @throws Exception if the input format under test throws
 */
@Test
public void testGetInputSplits() throws Exception {
    // Spy on the real input format so its collaborators' factory methods can be stubbed out.
    final KafkaInputFormat inputFormat = spy(new KafkaInputFormat());
    final SimpleConsumer mockConsumer = mock(SimpleConsumer.class);
    final ZkUtils mockZk = mock(ZkUtils.class);
    final Configuration mockConf = new Configuration(false);

    final Broker broker = new Broker("127.0.0.1", 9092, 1);
    doReturn(mockConsumer).when(inputFormat).getConsumer(broker);
    doReturn(mockZk).when(inputFormat).getZk(mockConf);
    // The fourth getOffsets argument matches the last-commit value stubbed below for the same partition.
    doReturn(Lists.newArrayList(20L, 10L)).when(inputFormat).getOffsets(mockConsumer, "topic", 0, -1, 0,
            Integer.MAX_VALUE);
    doReturn(Lists.newArrayList(30L, 20L, 0L)).when(inputFormat).getOffsets(mockConsumer, "topic", 1, 10, 0,
            Integer.MAX_VALUE);

    final Partition p1 = new Partition("topic", 0, broker);
    final Partition p2 = new Partition("topic", 1, broker);
    when(mockZk.getPartitions("topic")).thenReturn(Lists.newArrayList(p1, p2));
    when(mockZk.getBroker(1)).thenReturn(broker);
    when(mockZk.getLastCommit("group", p1)).thenReturn(-1L);
    when(mockZk.getLastCommit("group", p2)).thenReturn(10L);

    final List<InputSplit> result = inputFormat.getInputSplits(mockConf, "topic", "group");

    // assert the contents of each split
    assertEquals(3, result.size());
    final KafkaInputSplit split1 = (KafkaInputSplit) result.get(0);
    final Broker broker1 = split1.getPartition().getBroker();
    assertEquals(broker, broker1);
    assertEquals("127.0.0.1", broker1.getHost());
    assertEquals(9092, broker1.getPort());
    assertEquals(1, broker1.getId());
    assertEquals("1-0", split1.getPartition().getBrokerPartition());
    assertEquals(0, split1.getPartition().getPartId());
    assertEquals(10L, split1.getStartOffset());
    assertEquals(20L, split1.getEndOffset());
    assertEquals("topic", split1.getPartition().getTopic());

    final KafkaInputSplit split2 = (KafkaInputSplit) result.get(1);
    assertEquals(20L, split2.getStartOffset());
    assertEquals(30L, split2.getEndOffset());
    assertEquals("1-1", split2.getPartition().getBrokerPartition());

    final KafkaInputSplit split3 = (KafkaInputSplit) result.get(2);
    assertEquals(0L, split3.getStartOffset());
    assertEquals(20L, split3.getEndOffset());
    assertEquals("1-1", split3.getPartition().getBrokerPartition());

    // verify one and only one call to getConsumer - should get the cached consumer second time around
    verify(inputFormat, times(1)).getConsumer(broker);
    verify(inputFormat, times(1)).getConsumer(any(Broker.class));

    // verify the closeable components are closed
    verify(mockConsumer, times(1)).close();
    verify(mockZk, times(1)).close();
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号