Example source code for the Java class kafka.consumer.SimpleConsumer

KafkaInfos.java source code, project: DCMonitor
private SimpleConsumer createSimpleConsumer(Integer brokerId) {

    try {
      String brokerInfo = zkClient.readData(ZkUtils.BrokerIdsPath() + "/" + brokerId, true);
      if (brokerInfo == null) {
        log.error("Broker clientId %d does not exist", brokerId);
        return null;
      }
      Map<String, Object> map = Resources.jsonMapper.readValue(
        brokerInfo, new TypeReference<Map<String, Object>>() {
        }
      );
      String host = (String) map.get("host");
      Integer port = (Integer) map.get("port");
      return new SimpleConsumer(host, port, 10000, 100000, "KafkaConsumerInfos");
    } catch (Exception e) {
      log.error(e, "Could not parse broker[%d] info", brokerId);
      return null;
    }
  }
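
For reference, the znode read above contains the broker's JSON registration. Below is a minimal standalone sketch of that parsing step, assuming Jackson is on the classpath; the payload values are hypothetical, and real Kafka 0.8.x registrations also carry fields such as jmx_port and timestamp.

import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class BrokerInfoParseSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical payload shaped like the /brokers/ids/<brokerId> znode content.
        String brokerInfo = "{\"host\":\"10.0.0.1\",\"port\":9092,\"version\":1}";
        Map<String, Object> map = new ObjectMapper().readValue(
            brokerInfo, new TypeReference<Map<String, Object>>() {});
        String host = (String) map.get("host");
        Integer port = (Integer) map.get("port");
        System.out.println(host + ":" + port); // 10.0.0.1:9092
    }
}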
KafkaInfos.java source code, project: DCMonitor
private long getTopicLogSize(String topic, int pid) {
  Option<Object> o = ZkUtils.getLeaderForPartition(zkClient, topic, pid);
  if (o.isEmpty() || o.get() == null) {
    log.error("No broker for partition %s - %s", topic, pid);
    return 0;
  }
  Integer leaderId = Int.unbox(o.get());
  SimpleConsumer consumer = consumerMap.get(leaderId);
  if (consumer == null) {
    consumer = createSimpleConsumer(leaderId);
  }
  // createSimpleConsumer may fail.
  if (consumer == null) {
    return 0;
  }
  consumerMap.put(leaderId, consumer);
  TopicAndPartition topicAndPartition = new TopicAndPartition(topic, pid);
  PartitionOffsetRequestInfo requestInfo = new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1);
  OffsetRequest request = new OffsetRequest(
    new Map1<TopicAndPartition, PartitionOffsetRequestInfo>(topicAndPartition, requestInfo),
    0,
    Request.OrdinaryConsumerId()
  );
  OffsetResponse response = consumer.getOffsetsBefore(request);
  PartitionOffsetsResponse offsetsResponse = response.partitionErrorAndOffsets().get(topicAndPartition).get();
  return scala.Long.unbox(offsetsResponse.offsets().head());
}
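
The same latest-offset lookup can also be written against the plain Java API (kafka.javaapi.consumer.SimpleConsumer) instead of the Scala collection types used above. A minimal sketch, assuming a Kafka 0.8.x client; the client id string is a placeholder.

import java.util.Collections;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LatestOffsetSketch {
    static long latestOffset(SimpleConsumer consumer, String topic, int partition) {
        TopicAndPartition tp = new TopicAndPartition(topic, partition);
        // Ask for a single offset at LatestTime, i.e. the current log end offset.
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = Collections.singletonMap(
            tp, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
        OffsetRequest request = new OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "LatestOffsetSketch");
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            throw new RuntimeException("Offset request failed with error code "
                + response.errorCode(topic, partition));
        }
        // Only one offset was requested, so the head of the array is the latest one.
        return response.offsets(topic, partition)[0];
    }
}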
KafkaInfos.java source code, project: DCMonitor
private List<BrokerInfo> getBrokerInfos() {
  List<BrokerInfo> infos = Lists.newArrayListWithExpectedSize(consumerMap.size());
  for (Map.Entry<Integer, SimpleConsumer> entry : consumerMap.entrySet()) {
    BrokerInfo info = new BrokerInfo();
    info.id = entry.getKey();
    info.host = entry.getValue().host();
    info.port = entry.getValue().port();
    infos.add(info);
  }
  return infos;
}
KafkaInfos.java source code, project: DCMonitor
@Override
public void close() {
  for (SimpleConsumer consumer : consumerMap.values()) {
    if (consumer != null) {
      consumer.close();
    }
  }
  if (zkClient != null) {
    zkClient.close();
  }
}
KafkaInputFormat.java source code, project: kangaroo
/**
 * Gets all of the input splits for the {@code topic}, filtering out any {@link InputSplit}s already consumed by the
 * {@code group}.
 * 
 * @param conf
 *            the job configuration.
 * @param topic
 *            the topic.
 * @param group
 *            the consumer group.
 * @return input splits for the job.
 * @throws IOException
 */
List<InputSplit> getInputSplits(final Configuration conf, final String topic, final String group)
        throws IOException {
    final List<InputSplit> splits = Lists.newArrayList();
    final ZkUtils zk = getZk(conf);
    final Map<Broker, SimpleConsumer> consumers = Maps.newHashMap();
    try {
        for (final Partition partition : zk.getPartitions(topic)) {

            // cache the consumer connections - each partition will make use of each broker consumer
            final Broker broker = partition.getBroker();
            if (!consumers.containsKey(broker)) {
                consumers.put(broker, getConsumer(broker));
            }

            // grab all valid offsets
            final List<Long> offsets = getOffsets(consumers.get(broker), topic, partition.getPartId(),
                    zk.getLastCommit(group, partition), getIncludeOffsetsAfterTimestamp(conf),
                    getMaxSplitsPerPartition(conf));
            for (int i = 0; i < offsets.size() - 1; i++) {
                // ( offsets in descending order )
                final long start = offsets.get(i + 1);
                final long end = offsets.get(i);
                // since the offsets are in descending order, the first offset in the list is the largest offset for
                // the current partition. This split will be in charge of committing the offset for this partition.
                final boolean partitionCommitter = (i == 0);
                final InputSplit split = new KafkaInputSplit(partition, start, end, partitionCommitter);
                LOG.debug("Created input split: " + split);
                splits.add(split);
            }
        }
    } finally {
        // close resources
        IOUtils.closeQuietly(zk);
        for (final SimpleConsumer consumer : consumers.values()) {
            consumer.close();
        }
    }
    return splits;
}
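
As a concrete walk-through of the split derivation above (the same numbers appear in the test further down): qualifying offsets of 30, 20, 0 for one partition yield a split from 20 to 30 that acts as the partition committer and a split from 0 to 20 that does not. A trimmed-down sketch of just that loop:

public class SplitDerivationSketch {
    public static void main(String[] args) {
        // Offsets in descending order, as returned by getOffsets(...).
        long[] offsets = {30L, 20L, 0L};
        for (int i = 0; i < offsets.length - 1; i++) {
            long start = offsets[i + 1];
            long end = offsets[i];
            // The pair containing the largest offset commits the partition offset.
            boolean partitionCommitter = (i == 0);
            System.out.printf("split start=%d end=%d committer=%b%n", start, end, partitionCommitter);
        }
    }
}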
KafkaInputFormat.java source code, project: kangaroo
@VisibleForTesting
List<Long> getOffsets(final SimpleConsumer consumer, final String topic, final int partitionNum,
        final long lastCommit, final long asOfTime, final int maxSplitsPerPartition) {
    // all offsets that exist for this partition (in descending order)
    final long[] allOffsets = consumer.getOffsetsBefore(topic, partitionNum, OffsetRequest.LatestTime(),
            Integer.MAX_VALUE);

    // this gets us an offset that is strictly before 'asOfTime', or zero if none exist before that time
    final long[] offsetsBeforeAsOf = consumer.getOffsetsBefore(topic, partitionNum, asOfTime, 1);
    final long includeAfter = offsetsBeforeAsOf.length == 1 ? offsetsBeforeAsOf[0] : 0;

    // note that the offsets are in descending order
    List<Long> result = Lists.newArrayList();
    for (final long offset : allOffsets) {
        if (offset > lastCommit && offset > includeAfter) {
            result.add(offset);
        } else {
            // we add "lastCommit" iff it is after "includeAfter"
            if (lastCommit > includeAfter) {
                result.add(lastCommit);
            }
            // we can break out of loop here bc offsets are in desc order, and we've hit the latest one to include
            break;
        }
    }
    // to get maxSplitsPerPartition number of splits, you need (maxSplitsPerPartition + 1) number of offsets.
    if (result.size() - 1 > maxSplitsPerPartition) {
        result = result.subList(result.size() - maxSplitsPerPartition - 1, result.size());
    }
    LOG.debug(String.format("Offsets for %s:%d:%d = %s", consumer.host(), consumer.port(), partitionNum, result));
    return result;
}
AbstractFetcherThread.java source code, project: buka
protected void init() {
    simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId);
    brokerInfo = String.format("host_%s-port_%s", sourceBroker.host, sourceBroker.port);
    metricId = new ClientIdAndBroker(clientId, brokerInfo);
    fetcherStats = new FetcherStats(metricId);
    fetcherLagStats = new FetcherLagStats(metricId);
    fetchRequestBuilder = new FetchRequestBuilder().
            clientId(clientId).
            replicaId(fetcherBrokerId).
            maxWait(maxWait).
            minBytes(minBytes);
}
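
The builder configured here is typically completed with per-partition addFetch calls and then sent through the consumer. Below is a minimal end-to-end fetch sketch using the standard Kafka 0.8.x kafka.javaapi classes rather than the buka port shown above; the broker address, topic, partition and offset are placeholders.

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class FetchSketch {
    public static void main(String[] args) {
        SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 10000, 64 * 1024, "FetchSketch");
        try {
            FetchRequest request = new FetchRequestBuilder()
                    .clientId("FetchSketch")
                    .maxWait(1000)
                    .minBytes(1)
                    .addFetch("topic", 0, 0L, 100000)
                    .build();
            FetchResponse response = consumer.fetch(request);
            // Iterate the message set for the requested topic/partition and print offsets.
            for (MessageAndOffset messageAndOffset : response.messageSet("topic", 0)) {
                System.out.println("offset: " + messageAndOffset.offset());
            }
        } finally {
            consumer.close();
        }
    }
}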
KafkaInputFormat.java source code, project: kangaroo
@VisibleForTesting
SimpleConsumer getConsumer(final Broker broker) {
    return new SimpleConsumer(broker.getHost(), broker.getPort(), DEFAULT_SOCKET_TIMEOUT_MS,
            DEFAULT_BUFFER_SIZE_BYTES);
}
KafkaRecordReader.java source code, project: kangaroo
@VisibleForTesting
SimpleConsumer getConsumer(final KafkaInputSplit split, final Configuration conf) {
    return new SimpleConsumer(split.getPartition().getBroker().getHost(), split.getPartition().getBroker()
            .getPort(), getKafkaSocketTimeoutMs(conf), getKafkaBufferSizeBytes(conf));
}
KafkaInputFormatTest.java source code, project: kangaroo
@Test
public void testGetInputSplits() throws Exception {
    final KafkaInputFormat inputFormat = spy(new KafkaInputFormat());
    final SimpleConsumer mockConsumer = mock(SimpleConsumer.class);
    final ZkUtils mockZk = mock(ZkUtils.class);
    final Configuration mockConf = new Configuration(false);

    final Broker broker = new Broker("127.0.0.1", 9092, 1);
    doReturn(mockConsumer).when(inputFormat).getConsumer(broker);
    doReturn(mockZk).when(inputFormat).getZk(mockConf);
    doReturn(Lists.newArrayList(20l, 10l)).when(inputFormat).getOffsets(mockConsumer, "topic", 0, -1, 0,
            Integer.MAX_VALUE);
    doReturn(Lists.newArrayList(30l, 20l, 0l)).when(inputFormat).getOffsets(mockConsumer, "topic", 1, 10, 0,
            Integer.MAX_VALUE);

    final Partition p1 = new Partition("topic", 0, broker);
    final Partition p2 = new Partition("topic", 1, broker);
    when(mockZk.getPartitions("topic")).thenReturn(Lists.newArrayList(p1, p2));
    when(mockZk.getBroker(1)).thenReturn(broker);
    when(mockZk.getLastCommit("group", p1)).thenReturn(-1l);
    when(mockZk.getLastCommit("group", p2)).thenReturn(10l);

    final List<InputSplit> result = inputFormat.getInputSplits(mockConf, "topic", "group");

    // assert the contents of each split
    Assert.assertEquals(3, result.size());
    final KafkaInputSplit split1 = (KafkaInputSplit) result.get(0);
    final Broker broker1 = split1.getPartition().getBroker();
    assertEquals(broker, broker1);
    assertEquals("127.0.0.1", broker1.getHost());
    assertEquals(9092, broker1.getPort());
    assertEquals(1, broker1.getId());
    assertEquals("1-0", split1.getPartition().getBrokerPartition());
    assertEquals(0, split1.getPartition().getPartId());
    assertEquals(10l, split1.getStartOffset());
    assertEquals(20l, split1.getEndOffset());
    assertEquals("topic", split1.getPartition().getTopic());

    final KafkaInputSplit split2 = (KafkaInputSplit) result.get(1);
    assertEquals(20l, split2.getStartOffset());
    assertEquals(30l, split2.getEndOffset());
    assertEquals("1-1", split2.getPartition().getBrokerPartition());

    final KafkaInputSplit split3 = (KafkaInputSplit) result.get(2);
    assertEquals(0l, split3.getStartOffset());
    assertEquals(20l, split3.getEndOffset());
    assertEquals("1-1", split3.getPartition().getBrokerPartition());

    // verify one and only one call to getConsumer - should get the cached consumer second time around
    verify(inputFormat, times(1)).getConsumer(broker);
    verify(inputFormat, times(1)).getConsumer(any(Broker.class));

    // verify the closeable components are closed
    verify(mockConsumer, times(1)).close();
    verify(mockZk, times(1)).close();
}
KafkaInputFormatTest.java source code, project: kangaroo
@Test
public void testGetOffsets() throws Exception {
    final SimpleConsumer consumer = mock(SimpleConsumer.class);

    final long[] offsets = { 101, 91, 81, 71, 61, 51, 41, 31, 21, 11 };
    when(consumer.getOffsetsBefore("topic", 1, OffsetRequest.LatestTime(), Integer.MAX_VALUE)).thenReturn(offsets);
    when(consumer.getOffsetsBefore("topic", 1, 0, 1)).thenReturn(new long[] {});

    final KafkaInputFormat inputFormat = new KafkaInputFormat();

    // case 0: get everything (-1 last commit, 0 asOfTime, as many splits per partition as possible) -> all offsets
    long[] expected = offsets;
    List<Long> actual = inputFormat.getOffsets(consumer, "topic", 1, -1, 0, Integer.MAX_VALUE);
    compareArrayContents(offsets, actual);

    // case 1: lastCommit of 52 -> we should only get back the first 5 offsets + the lastCommit
    final int lastCommit = 52;
    expected = new long[6];
    System.arraycopy(offsets, 0, expected, 0, 6);
    expected[5] = lastCommit;
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, 0, Integer.MAX_VALUE);
    compareArrayContents(expected, actual);

    // case 2: lastCommit of 52, asOfTime 51 -> still include last offsets
    final int asOfTime = 999;
    when(consumer.getOffsetsBefore("topic", 1, asOfTime, 1)).thenReturn(new long[] { 51 });
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, Integer.MAX_VALUE);
    compareArrayContents(expected, actual);

    // case 3: lastCommit of 52, asOfTime 52 -> don't include last offsets
    when(consumer.getOffsetsBefore("topic", 1, asOfTime, 1)).thenReturn(new long[] { 52 });
    expected = Arrays.copyOfRange(offsets, 0, 5);
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, Integer.MAX_VALUE);
    compareArrayContents(expected, actual);

    // case 4: maxSplitsPerPartition == number of commits (5) -> should include all 5 offsets
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, 5);
    compareArrayContents(expected, actual);

    // case 5: maxSplitsPerPartition = number of commits - 1 (4) -> should STILL include all 5 offsets
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, 4);
    compareArrayContents(expected, actual);

    // case 6: maxSplitsPerPartition = number of commits - 2 (3) -> should exclude the first (largest) offset
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, 3);
    expected = Arrays.copyOfRange(offsets, 1, 5);
    compareArrayContents(expected, actual);

    // case 7: maxSplitsPerPartition = 1 -> should include just 2 commits
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, 1);
    expected = Arrays.copyOfRange(offsets, 3, 5);
    compareArrayContents(expected, actual);
}

