/**
 * Builds a {@link SimpleConsumer} for the broker registered in ZooKeeper under
 * {@code ZkUtils.BrokerIdsPath()/<brokerId>}.
 *
 * <p>The broker's registration data is parsed as a JSON object and its {@code host} and
 * {@code port} entries are used to open the consumer. Any failure is logged and reported to the
 * caller as {@code null}; this method does not throw.
 *
 * @param brokerId id of the broker to connect to
 * @return a new consumer, or {@code null} if the broker is not registered, its registration data
 *         cannot be parsed, or the data lacks a usable {@code host}/{@code port}
 */
private SimpleConsumer createSimpleConsumer(Integer brokerId) {
    try {
        // NOTE(review): the 'true' flag presumably makes readData return null for a missing node - confirm.
        String brokerInfo = zkClient.readData(ZkUtils.BrokerIdsPath() + "/" + brokerId, true);
        if (brokerInfo == null) {
            log.error("Broker clientId %d does not exist", brokerId);
            return null;
        }
        Map<String, Object> map = Resources.jsonMapper.readValue(
            brokerInfo, new TypeReference<Map<String, Object>>() {
            }
        );
        Object host = map.get("host");
        Object port = map.get("port");
        // Validate before constructing: a missing host would otherwise yield a consumer with a
        // null host that callers may cache, and a non-Integer numeric port would fail the cast.
        if (!(host instanceof String) || !(port instanceof Number)) {
            log.error("Broker[%d] info has no usable host/port: %s", brokerId, brokerInfo);
            return null;
        }
        // NOTE(review): 10000 / 100000 look like socket timeout (ms) and buffer size (bytes) - confirm.
        return new SimpleConsumer((String) host, ((Number) port).intValue(), 10000, 100000, "KafkaConsumerInfos");
    } catch (Exception e) {
        log.error(e, "Could not parse broker[%d] info", brokerId);
        return null;
    }
}
java类kafka.consumer.SimpleConsumer的实例源码
KafkaInfos.java 文件源码
项目:DCMonitor
阅读 21
收藏 0
点赞 0
评论 0
KafkaInfos.java 文件源码
项目:DCMonitor
阅读 21
收藏 0
点赞 0
评论 0
/**
 * Returns the latest offset of one topic partition, as reported by the partition's leader broker.
 *
 * <p>The leader is looked up in ZooKeeper; the consumer for that broker is taken from
 * {@code consumerMap} or created and cached on first use. A single offset at
 * {@code OffsetRequest.LatestTime()} is then requested.
 *
 * @param topic topic name
 * @param pid   partition id
 * @return the first offset returned by the leader, or {@code 0} when no leader is known, no
 *         consumer could be created, or the response carries no offset for the partition
 */
private long getTopicLogSize(String topic, int pid) {
    Option<Object> o = ZkUtils.getLeaderForPartition(zkClient, topic, pid);
    if (o.isEmpty() || o.get() == null) {
        log.error("No broker for partition %s - %s", topic, pid);
        return 0;
    }
    Integer leaderId = Int.unbox(o.get());
    SimpleConsumer consumer = consumerMap.get(leaderId);
    if (consumer == null) {
        consumer = createSimpleConsumer(leaderId);
        // createSimpleConsumer may fail.
        if (consumer == null) {
            return 0;
        }
        consumerMap.put(leaderId, consumer);
    }
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, pid);
    PartitionOffsetRequestInfo requestInfo = new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1);
    OffsetRequest request = new OffsetRequest(
        new Map1<TopicAndPartition, PartitionOffsetRequestInfo>(topicAndPartition, requestInfo),
        0,
        Request.OrdinaryConsumerId()
    );
    OffsetResponse response = consumer.getOffsetsBefore(request);
    // The response map may lack this partition; calling get() on an empty Option would throw.
    Option<PartitionOffsetsResponse> partitionResponse = response.partitionErrorAndOffsets().get(topicAndPartition);
    if (partitionResponse.isEmpty()) {
        log.error("No offset response for partition %s - %s", topic, pid);
        return 0;
    }
    PartitionOffsetsResponse offsetsResponse = partitionResponse.get();
    // head() on an empty sequence throws, so treat "no offsets" as a failed lookup instead.
    if (offsetsResponse.offsets().isEmpty()) {
        log.error("No offsets for partition %s - %s, error code %s", topic, pid, offsetsResponse.error());
        return 0;
    }
    return scala.Long.unbox(offsetsResponse.offsets().head());
}
KafkaInfos.java 文件源码
项目:DCMonitor
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Describes every broker that currently has a consumer in {@code consumerMap}.
 *
 * @return one {@code BrokerInfo} per map entry, carrying the entry's key as id and the
 *         consumer's host and port
 */
private List<BrokerInfo> getBrokerInfos() {
    final List<BrokerInfo> brokers = Lists.newArrayListWithExpectedSize(consumerMap.size());
    for (Map.Entry<Integer, SimpleConsumer> cached : consumerMap.entrySet()) {
        final BrokerInfo broker = new BrokerInfo();
        broker.id = cached.getKey();
        final SimpleConsumer consumer = cached.getValue();
        broker.host = consumer.host();
        broker.port = consumer.port();
        brokers.add(broker);
    }
    return brokers;
}
KafkaInfos.java 文件源码
项目:DCMonitor
阅读 21
收藏 0
点赞 0
评论 0
/**
 * Closes every consumer held in {@code consumerMap} and then the ZooKeeper client.
 *
 * <p>Each resource is closed even if closing an earlier one fails; the first runtime failure
 * encountered is rethrown once all resources have been attempted.
 */
@Override
public void close() {
    RuntimeException firstFailure = null;
    for (SimpleConsumer consumer : consumerMap.values()) {
        if (consumer == null) {
            continue;
        }
        try {
            consumer.close();
        } catch (RuntimeException e) {
            // Keep going so one bad connection does not leak the remaining ones.
            if (firstFailure == null) {
                firstFailure = e;
            }
        }
    }
    try {
        if (zkClient != null) {
            zkClient.close();
        }
    } catch (RuntimeException e) {
        if (firstFailure == null) {
            firstFailure = e;
        }
    }
    if (firstFailure != null) {
        throw firstFailure;
    }
}
KafkaInputFormat.java 文件源码
项目:kangaroo
阅读 25
收藏 0
点赞 0
评论 0
/**
 * Builds the input splits for {@code topic}. The last offset committed by {@code group} for each
 * partition is passed to {@code getOffsets}, which decides which offsets remain to be read.
 *
 * <p>One {@link SimpleConsumer} is opened per broker and shared by all of that broker's
 * partitions. The ZooKeeper handle and the consumers are closed before this method returns.
 *
 * @param conf
 *            the job configuration.
 * @param topic
 *            the topic.
 * @param group
 *            the consumer group.
 * @return input splits for the job.
 * @throws IOException
 *             declared by the signature; the visible body has no explicit throw.
 */
List<InputSplit> getInputSplits(final Configuration conf, final String topic, final String group)
        throws IOException {
    final List<InputSplit> result = Lists.newArrayList();
    final ZkUtils zk = getZk(conf);
    final Map<Broker, SimpleConsumer> consumersByBroker = Maps.newHashMap();
    try {
        for (final Partition partition : zk.getPartitions(topic)) {
            // open at most one connection per broker and reuse it for every partition it hosts
            final Broker broker = partition.getBroker();
            if (!consumersByBroker.containsKey(broker)) {
                consumersByBroker.put(broker, getConsumer(broker));
            }
            final SimpleConsumer consumer = consumersByBroker.get(broker);

            // collect the offsets that bound the splits of this partition
            final int partId = partition.getPartId();
            final long lastCommit = zk.getLastCommit(group, partition);
            final long includeAfter = getIncludeOffsetsAfterTimestamp(conf);
            final int maxSplits = getMaxSplitsPerPartition(conf);
            final List<Long> offsets = getOffsets(consumer, topic, partId, lastCommit, includeAfter, maxSplits);

            // offsets are in descending order: each adjacent pair (newer, older) bounds one split
            for (int newer = 0, older = 1; older < offsets.size(); newer++, older++) {
                final long start = offsets.get(older);
                final long end = offsets.get(newer);
                // the split ending at the largest offset (first pair) commits the offset for the partition
                final boolean partitionCommitter = (newer == 0);
                final InputSplit split = new KafkaInputSplit(partition, start, end, partitionCommitter);
                LOG.debug("Created input split: " + split);
                result.add(split);
            }
        }
    } finally {
        // release ZooKeeper first, then every broker connection
        IOUtils.closeQuietly(zk);
        for (final SimpleConsumer open : consumersByBroker.values()) {
            open.close();
        }
    }
    return result;
}
KafkaInputFormat.java 文件源码
项目:kangaroo
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Selects, in descending order, the offsets of one partition that still have to be read.
 *
 * <p>Offsets are taken from the broker newest-first for as long as they are greater than both
 * {@code lastCommit} and the offset the broker reports for {@code asOfTime} (0 if it reports
 * none). At the first offset that fails this test, {@code lastCommit} itself is appended if it is
 * greater than that lower bound, and scanning stops. If the result would produce more than
 * {@code maxSplitsPerPartition} splits, only the trailing (smallest)
 * {@code maxSplitsPerPartition + 1} offsets are kept.
 *
 * @param consumer              connection to the broker hosting the partition
 * @param topic                 the topic
 * @param partitionNum          the partition number
 * @param lastCommit            last committed offset
 * @param asOfTime              time passed to the broker to find the lower bound
 * @param maxSplitsPerPartition maximum number of splits wanted for this partition
 * @return the selected offsets, largest first
 */
@VisibleForTesting
List<Long> getOffsets(final SimpleConsumer consumer, final String topic, final int partitionNum,
        final long lastCommit, final long asOfTime, final int maxSplitsPerPartition) {
    // every offset the broker knows for this partition, newest first
    final long[] brokerOffsets = consumer.getOffsetsBefore(topic, partitionNum, OffsetRequest.LatestTime(),
            Integer.MAX_VALUE);
    // at most one offset located before 'asOfTime'; an empty answer means "no lower bound"
    final long[] beforeAsOf = consumer.getOffsetsBefore(topic, partitionNum, asOfTime, 1);
    final long lowerBound;
    if (beforeAsOf.length == 1) {
        lowerBound = beforeAsOf[0];
    } else {
        lowerBound = 0;
    }

    List<Long> selected = Lists.newArrayList();
    for (int idx = 0; idx < brokerOffsets.length; idx++) {
        final long candidate = brokerOffsets[idx];
        if (candidate <= lastCommit || candidate <= lowerBound) {
            // first offset that is already consumed or too old: close the range with
            // "lastCommit" when it lies above the lower bound, then stop (descending order)
            if (lastCommit > lowerBound) {
                selected.add(lastCommit);
            }
            break;
        }
        selected.add(candidate);
    }

    // N splits need N + 1 offsets; when there are more, keep only the tail of the list
    if (selected.size() - 1 > maxSplitsPerPartition) {
        final int from = selected.size() - maxSplitsPerPartition - 1;
        selected = selected.subList(from, selected.size());
    }
    LOG.debug(String.format("Offsets for %s:%d:%d = %s", consumer.host(), consumer.port(), partitionNum, selected));
    return selected;
}
AbstractFetcherThread.java 文件源码
项目:buka
阅读 33
收藏 0
点赞 0
评论 0
/**
 * Initialises the per-broker state of this fetcher: the {@link SimpleConsumer} connected to
 * {@code sourceBroker}, the metric id and stats objects keyed by client id and broker, and the
 * {@link FetchRequestBuilder} pre-configured with this fetcher's client id, replica id, max wait
 * and min bytes.
 */
protected void init() {
    simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId);
    // String.format must be called statically with the pattern as its first argument. The
    // instance-style call "pattern".format(host, port) resolves to String.format(host, port),
    // which drops the pattern and yields just the host.
    brokerInfo = String.format("host_%s-port_%s", sourceBroker.host, sourceBroker.port);
    metricId = new ClientIdAndBroker(clientId, brokerInfo);
    fetcherStats = new FetcherStats(metricId);
    fetcherLagStats = new FetcherLagStats(metricId);
    fetchRequestBuilder = new FetchRequestBuilder().
        clientId(clientId).
        replicaId(fetcherBrokerId).
        maxWait(maxWait).
        minBytes(minBytes);
}
KafkaInputFormat.java 文件源码
项目:kangaroo
阅读 19
收藏 0
点赞 0
评论 0
/**
 * Opens a consumer connection to the given broker using the default socket timeout and buffer
 * size.
 *
 * @param broker the broker to connect to
 * @return a new {@link SimpleConsumer} for the broker's host and port
 */
@VisibleForTesting
SimpleConsumer getConsumer(final Broker broker) {
    final String host = broker.getHost();
    final int port = broker.getPort();
    return new SimpleConsumer(host, port, DEFAULT_SOCKET_TIMEOUT_MS, DEFAULT_BUFFER_SIZE_BYTES);
}
KafkaRecordReader.java 文件源码
项目:kangaroo
阅读 18
收藏 0
点赞 0
评论 0
/**
 * Opens a consumer connection to the broker that hosts the split's partition, with the socket
 * timeout and buffer size read from the job configuration.
 *
 * @param split the split whose partition's broker is contacted
 * @param conf  the job configuration
 * @return a new {@link SimpleConsumer} for that broker
 */
@VisibleForTesting
SimpleConsumer getConsumer(final KafkaInputSplit split, final Configuration conf) {
    final String host = split.getPartition().getBroker().getHost();
    final int port = split.getPartition().getBroker().getPort();
    final int socketTimeoutMs = getKafkaSocketTimeoutMs(conf);
    final int bufferSizeBytes = getKafkaBufferSizeBytes(conf);
    return new SimpleConsumer(host, port, socketTimeoutMs, bufferSizeBytes);
}
KafkaInputFormatTest.java 文件源码
项目:kangaroo
阅读 24
收藏 0
点赞 0
评论 0
/**
 * Checks that {@code getInputSplits} turns the stubbed offsets of two partitions on one broker
 * into three splits, opens a single consumer for the shared broker, and closes both the consumer
 * and the ZooKeeper handle.
 */
@Test
public void testGetInputSplits() throws Exception {
    // collaborators
    final KafkaInputFormat inputFormat = spy(new KafkaInputFormat());
    final SimpleConsumer mockConsumer = mock(SimpleConsumer.class);
    final ZkUtils mockZk = mock(ZkUtils.class);
    final Configuration mockConf = new Configuration(false);

    // one broker hosting both partitions of "topic"
    final Broker broker = new Broker("127.0.0.1", 9092, 1);
    final Partition partition0 = new Partition("topic", 0, broker);
    final Partition partition1 = new Partition("topic", 1, broker);

    // stub the spy: canned consumer, ZooKeeper handle and per-partition offsets
    doReturn(mockConsumer).when(inputFormat).getConsumer(broker);
    doReturn(mockZk).when(inputFormat).getZk(mockConf);
    doReturn(Lists.newArrayList(20L, 10L)).when(inputFormat).getOffsets(mockConsumer, "topic", 0, -1, 0,
            Integer.MAX_VALUE);
    doReturn(Lists.newArrayList(30L, 20L, 0L)).when(inputFormat).getOffsets(mockConsumer, "topic", 1, 10, 0,
            Integer.MAX_VALUE);

    // stub ZooKeeper: partition list, broker lookup and last commits
    when(mockZk.getPartitions("topic")).thenReturn(Lists.newArrayList(partition0, partition1));
    when(mockZk.getBroker(1)).thenReturn(broker);
    when(mockZk.getLastCommit("group", partition0)).thenReturn(-1L);
    when(mockZk.getLastCommit("group", partition1)).thenReturn(10L);

    final List<InputSplit> splits = inputFormat.getInputSplits(mockConf, "topic", "group");

    // two offsets for partition 0 -> one split; three offsets for partition 1 -> two splits
    assertEquals(3, splits.size());

    final KafkaInputSplit first = (KafkaInputSplit) splits.get(0);
    final Broker firstBroker = first.getPartition().getBroker();
    assertEquals(broker, firstBroker);
    assertEquals("127.0.0.1", firstBroker.getHost());
    assertEquals(9092, firstBroker.getPort());
    assertEquals(1, firstBroker.getId());
    assertEquals("1-0", first.getPartition().getBrokerPartition());
    assertEquals(0, first.getPartition().getPartId());
    assertEquals(10L, first.getStartOffset());
    assertEquals(20L, first.getEndOffset());
    assertEquals("topic", first.getPartition().getTopic());

    final KafkaInputSplit second = (KafkaInputSplit) splits.get(1);
    assertEquals(20L, second.getStartOffset());
    assertEquals(30L, second.getEndOffset());
    assertEquals("1-1", second.getPartition().getBrokerPartition());

    final KafkaInputSplit third = (KafkaInputSplit) splits.get(2);
    assertEquals(0L, third.getStartOffset());
    assertEquals(20L, third.getEndOffset());
    assertEquals("1-1", third.getPartition().getBrokerPartition());

    // the consumer must be created exactly once and then served from the cache
    verify(inputFormat, times(1)).getConsumer(broker);
    verify(inputFormat, times(1)).getConsumer(any(Broker.class));

    // every closeable collaborator must have been closed
    verify(mockConsumer, times(1)).close();
    verify(mockZk, times(1)).close();
}
KafkaInputFormatTest.java 文件源码
项目:kangaroo
阅读 21
收藏 0
点赞 0
评论 0
/**
 * Exercises {@code getOffsets} against a consumer stubbed with ten descending offsets, varying
 * the last commit, the offset reported for the as-of time, and the maximum number of splits.
 */
@Test
public void testGetOffsets() throws Exception {
    final SimpleConsumer consumer = mock(SimpleConsumer.class);
    final long[] offsets = { 101, 91, 81, 71, 61, 51, 41, 31, 21, 11 };
    when(consumer.getOffsetsBefore("topic", 1, OffsetRequest.LatestTime(), Integer.MAX_VALUE)).thenReturn(offsets);
    when(consumer.getOffsetsBefore("topic", 1, 0, 1)).thenReturn(new long[] {});
    final KafkaInputFormat inputFormat = new KafkaInputFormat();

    // case 0: no restriction (lastCommit -1, asOfTime 0, unlimited splits) -> every offset
    long[] expected = offsets;
    List<Long> actual = inputFormat.getOffsets(consumer, "topic", 1, -1, 0, Integer.MAX_VALUE);
    compareArrayContents(expected, actual);

    // case 1: lastCommit of 52 -> the five offsets above 52, followed by the lastCommit itself
    final int lastCommit = 52;
    expected = Arrays.copyOf(offsets, 6);
    expected[5] = lastCommit;
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, 0, Integer.MAX_VALUE);
    compareArrayContents(expected, actual);

    // case 2: lastCommit of 52, offset before asOfTime is 51 -> lastCommit is still included
    final int asOfTime = 999;
    when(consumer.getOffsetsBefore("topic", 1, asOfTime, 1)).thenReturn(new long[] { 51 });
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, Integer.MAX_VALUE);
    compareArrayContents(expected, actual);

    // case 3: lastCommit of 52, offset before asOfTime is 52 -> lastCommit is no longer included
    when(consumer.getOffsetsBefore("topic", 1, asOfTime, 1)).thenReturn(new long[] { 52 });
    expected = Arrays.copyOfRange(offsets, 0, 5);
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, Integer.MAX_VALUE);
    compareArrayContents(expected, actual);

    // case 4: maxSplitsPerPartition == number of offsets (5) -> all 5 offsets
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, 5);
    compareArrayContents(expected, actual);

    // case 5: maxSplitsPerPartition == number of offsets - 1 (4) -> STILL all 5 offsets
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, 4);
    compareArrayContents(expected, actual);

    // case 6: maxSplitsPerPartition == number of offsets - 2 (3) -> the largest offset is dropped
    expected = Arrays.copyOfRange(offsets, 1, 5);
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, 3);
    compareArrayContents(expected, actual);

    // case 7: maxSplitsPerPartition == 1 -> only the two smallest remaining offsets
    expected = Arrays.copyOfRange(offsets, 3, 5);
    actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, 1);
    compareArrayContents(expected, actual);
}