Example source code for the Java class kafka.consumer.ConsumerTimeoutException

KafkaConsumer.java source code
Project: flume-release-1.7.0

public MessageAndMetadata getNextMessage(String topic) {
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// it has only a single stream, because there is only one consumer
KafkaStream stream = streams.get(0);
final ConsumerIterator<byte[], byte[]> it = stream.iterator();
int counter = 0;
try {
if (it.hasNext()) {
return it.next();
} else {
return null;
}
} catch (ConsumerTimeoutException e) {
logger.error("0 messages available to fetch for the topic " + topic);
return null;
}
}
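All of the snippets on this page use the old high-level consumer API, where ConsumerTimeoutException is only thrown when consumer.timeout.ms is set to a non-negative value; with the default of -1, hasNext() simply blocks forever. A minimal, self-contained sketch of that setup (the ZooKeeper address, group id and topic name are placeholders, not taken from any of the listed projects):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class TimeoutAwareConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder address
        props.put("group.id", "timeout-demo");            // placeholder group id
        props.put("consumer.timeout.ms", "5000");         // without this, hasNext() never times out

        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("demo-topic", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("demo-topic").get(0).iterator();

        try {
            while (it.hasNext()) {
                System.out.println(new String(it.next().message()));
            }
        } catch (ConsumerTimeoutException e) {
            // thrown once no message arrives within consumer.timeout.ms
        } finally {
            connector.shutdown();
        }
    }
}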
KafkaConsumerRunnable.java source code
Project: watchtower-automation
public void run() {
logger.debug("KafkaChannel {} has stream", this.threadNumber);
final ConsumerIterator<byte[], byte[]> streamIterator = stream.iterator();
running = true;
while (running) {
try {
if (streamIterator.hasNext()) {
final byte[] message = streamIterator.next().message();
logger.debug("Thread {}: {}", threadNumber, message.toString());
consumeMessage(message);
}
} catch (ConsumerTimeoutException cte) {
logger.debug("Timed out when consuming from Kafka", cte);
KafkaHealthCheck.getInstance().heartAttack(cte.getMessage());
}
}
}
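The consumer.timeout.ms setting is also what makes a loop like this stoppable: each timeout drops control out of hasNext() so the while (running) condition can be re-checked. A generic, self-contained variant of the same pattern (the class, field and method names below are mine, not watchtower-automation's):

import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;

class StreamWorkerSketch implements Runnable {
    private final KafkaStream<byte[], byte[]> stream;
    private volatile boolean running = true;

    StreamWorkerSketch(KafkaStream<byte[], byte[]> stream) {
        this.stream = stream;
    }

    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (running) {
            try {
                if (it.hasNext()) {
                    System.out.println(new String(it.next().message()));
                }
            } catch (ConsumerTimeoutException e) {
                // nothing arrived within consumer.timeout.ms; loop again and re-check the flag
            }
        }
    }

    void stop() {
        running = false; // takes effect at the next timeout or message boundary
    }
}

Without the timeout, a call to stop() would have no visible effect until the next message happened to arrive.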
KafkaConsumer.java source code
Project: sqoop-on-spark
public MessageAndMetadata getNextMessage(String topic) {
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// it has only a single stream, because there is only one consumer
KafkaStream stream = streams.get(0);
final ConsumerIterator<byte[], byte[]> it = stream.iterator();
int counter = 0;
try {
if (it.hasNext()) {
return it.next();
} else {
return null;
}
} catch (ConsumerTimeoutException e) {
logger.error("0 messages available to fetch for the topic " + topic);
return null;
}
}
KafkaConsumerRunnable.java source code
Project: watchtower-workflow
public void run() {
logger.info("KafkaChannel {} has stream", this.threadNumber);
final ConsumerIterator<byte[], byte[]> streamIterator = stream.iterator();
running = true;
while (running) {
try {
if (streamIterator.hasNext()) {
MessageAndMetadata<byte[], byte[]> messageAndMetadata = streamIterator.next();
byte[] key = messageAndMetadata.key();
byte[] message = messageAndMetadata.message();
consumeMessage(key, message);
}
} catch (ConsumerTimeoutException cte) {
logger.debug("Timed out when consuming from Kafka", cte);
KafkaHealthCheck.getInstance().heartAttack(cte.getMessage());
}
}
}
KafkaConsumer08.java source code
Project: datacollector
@Override
public MessageAndOffset read() throws StageException {
try {
//hasNext() blocks indefinitely if consumer.timeout.ms is set to -1,
//but if consumer.timeout.ms is set to a value, such as 6000, a ConsumerTimeoutException is thrown
//when no message is written to the kafka topic within that time.
if(consumerIterator.hasNext()) {
MessageAndMetadata<byte[], byte[]> messageAndMetadata = consumerIterator.next();
byte[] message = messageAndMetadata.message();
long offset = messageAndMetadata.offset();
int partition = messageAndMetadata.partition();
return new MessageAndOffset(message, offset, partition);
}
return null;
} catch (ConsumerTimeoutException e) {
/*For the high-level consumer the fetching logic is handled by a background
fetcher thread and is hidden from the user. In either case of
1) the broker being down or
2) no message being available,
the fetcher thread keeps retrying while the user thread waits on the fetcher thread to put some
data into the buffer, until the timeout expires. In short, the high-level consumer is designed so that
users do not have to worry about connect / reconnect issues.*/
return null;
}
}
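Because read() swallows the timeout and returns null, a caller has to treat null as "nothing available right now" rather than end of stream. A hypothetical batching helper built on the method above, meant to sit in the same class (the method name and batch-size parameter are assumptions; MessageAndOffset and StageException come from the snippet itself):

// Hypothetical: drain up to maxRecords, stopping early when read() times out and returns null.
private List<MessageAndOffset> readBatch(int maxRecords) throws StageException {
    List<MessageAndOffset> batch = new ArrayList<>(maxRecords);
    while (batch.size() < maxRecords) {
        MessageAndOffset record = read();  // null means the consumer hit consumer.timeout.ms
        if (record == null) {
            break;                         // nothing more for now; the caller can poll again later
        }
        batch.add(record);
    }
    return batch;
}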
RunnableConsumer.java source code
Project: csc8101
public void run() {
try {
ConsumerIterator<String, String> it = stream.iterator();
while (it.hasNext()) {
MessageAndMetadata<String, String> messageAndMetadata = it.next();
String message = messageAndMetadata.message();
messageHandler.handle(message);
meter.mark();
}
messageHandler.flush();
} catch (ConsumerTimeoutException e) {
messageHandler.flush();
}
}
KafkaSource.java source code
Project: flume-ng-extends
/**
* Check if there are messages waiting in Kafka, waiting up to the timeout
* (10 ms by default) for messages to arrive, and catching the timeout
* exception to return a boolean.
*/
boolean hasNext() {
try {
it.hasNext();
return true;
} catch (ConsumerTimeoutException e) {
return false;
}
}
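In the Flume-style sources this wrapper turns the consumer timeout into a natural batch boundary: the caller keeps draining only while hasNext() returns true. A hypothetical drain method using it (the field name it mirrors the snippet; the method name and batch-size parameter are assumptions):

// Hypothetical: collect at most maxBatchSize payloads, stopping at the first timeout.
private List<byte[]> drain(int maxBatchSize) {
    List<byte[]> batch = new ArrayList<>(maxBatchSize);
    while (batch.size() < maxBatchSize && hasNext()) {
        // hasNext() just returned true, so this next() call does not block
        batch.add(it.next().message());
    }
    return batch;
}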
KafkaSource.java source code
Project: apple-ums-server
/**
* Check if there are messages waiting in Kafka, waiting up to the timeout
* (10 ms by default) for messages to arrive, and catching the timeout
* exception to return a boolean.
*/
boolean hasNext() {
try {
it.hasNext();
return true;
} catch (ConsumerTimeoutException e) {
return false;
}
}
KafkaSource.java source code
Project: flume-ng-extends-source
/**
* Check if there are messages waiting in Kafka, waiting up to the timeout
* (10 ms by default) for messages to arrive, and catching the timeout
* exception to return a boolean.
*/
boolean hasNext() {
try {
it.hasNext();
return true;
} catch (ConsumerTimeoutException e) {
return false;
}
}
MetricsToKafkaTest.java source code
Project: support-metrics-client
/**
* Helper function that consumes messages from a topic with a timeout.
*/
private static void verifyMetricsSubmittedToTopic(
String zkConnect,
String topic,
int expNumMetricSubmissions) throws IOException {
int timeoutMs = 10 * 1000;
KafkaMetricsToFile kafkaMetricsToFile = new KafkaMetricsToFile(zkConnect, timeoutMs);
List<KafkaStream<byte[], byte[]>> streams = kafkaMetricsToFile.getStreams(topic);
int numRecords = 0;
AvroDeserializer decoder = new AvroDeserializer();
try {
for (final KafkaStream<byte[], byte[]> stream : streams) {
for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
SupportKafkaMetricsBasic[] container = decoder.deserialize(SupportKafkaMetricsBasic.class,
messageAndMetadata.message());
assertThat(container.length).isEqualTo(1);
verifyBasicMetrics(container[0]);
numRecords++;
}
}
} catch (ConsumerTimeoutException e) {
// do nothing, this is the expected success case since we consume with a timeout
}
assertThat(numRecords).isEqualTo(expNumMetricSubmissions);
// Cleanup
kafkaMetricsToFile.shutdown();
}
MsgIterator.java source code
Project: easyframe-msg
/** Check whether there is a next message.
* @return true if a next message is available */
public boolean hasNext() throws MsgTimeoutException {
try {
return iter.hasNext();
} catch (ConsumerTimeoutException ex) {
// A timeout does not mean an error; there is simply no message available at the moment
throw new MsgTimeoutException(ex);
}
}
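Unlike the Flume-style wrappers above, this helper re-throws the timeout as the project's own MsgTimeoutException, leaving the caller to decide whether a quiet topic matters. A hypothetical caller (MsgIterator's next() accessor and its return type are assumptions, not taken from easyframe-msg):

// Hypothetical: drain whatever is currently available, treating the wrapped timeout as "no message for now".
void drainOnce(MsgIterator it) {
    try {
        while (it.hasNext()) {
            Object msg = it.next(); // next() accessor assumed; the real easyframe-msg API may differ
            System.out.println(msg);
        }
    } catch (MsgTimeoutException e) {
        // the topic was quiet for consumer.timeout.ms; try again on the next scheduling tick
    }
}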
ConsumingTask.java source code
Project: ingestion-ws-kafka-hdfs
private void loopProcessing(Iterator<MessageAndMetadata<byte[], byte[]>> it) throws Exception {
LOGGER.info("Starting processing {}", this);
while (canRun) {
try {
while (canRun && it.hasNext() && canRun) {
// ^^^ I know that it looks stupid but I think it makes sense :)
processMessage(it.next());
}
} catch (ConsumerTimeoutException ex) {
// We don't do anything here on purpose
}
}
LOGGER.info("Done processing {}", this);
}
FastKafkaSource.java source code
Project: fraud-detection-tutorial
boolean hasNext() {
try {
this.it.hasNext();
return true;
} catch (ConsumerTimeoutException var2) {
return false;
}
}
MessageResource.java source code
Project: dropwizard-kafka-http
@GET
@Timed
public Response consume(
@QueryParam("topic") String topic,
@QueryParam("timeout") Integer timeout
) {
if (Strings.isNullOrEmpty(topic))
return Response.status(400)
.entity(new String[]{"Undefined topic"})
.build();
Properties props = (Properties) consumerCfg.clone();
if (timeout != null) props.put("consumer.timeout.ms", "" + timeout);
ConsumerConfig config = new ConsumerConfig(props);
ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);
Map<String, Integer> streamCounts = Collections.singletonMap(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(streamCounts);
KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0);
List<Message> messages = new ArrayList<>();
try {
for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream)
messages.add(new Message(messageAndMetadata));
} catch (ConsumerTimeoutException ignore) {
} finally {
connector.commitOffsets();
connector.shutdown();
}
return Response.ok(messages).build();
}
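The endpoint above builds a throwaway consumer per request and relies entirely on consumer.timeout.ms (the timeout query parameter) to end the read before committing offsets and shutting the connector down. A hypothetical client call, assuming the resource is mounted at /message, which the snippet does not show (readAllBytes requires Java 9+):

// Hypothetical client; host, port and the /message path are assumptions, not from dropwizard-kafka-http.
URL url = new URL("http://localhost:8080/message?topic=events&timeout=500");
try (InputStream in = url.openStream()) {
    System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
}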
KafkaConsumerCallable.java source code
Project: pentaho-kafka-consumer
public Object call() throws KettleException {
try {
long limit;
String strData = meta.getLimit();
limit = getLimit(strData);
if (limit > 0) {
step.logDebug("Collecting up to " + limit + " messages");
} else {
step.logDebug("Collecting unlimited messages");
}
while (data.streamIterator.hasNext() && !data.canceled && (limit <= 0 || data.processed < limit)) {
MessageAndMetadata<byte[], byte[]> messageAndMetadata = data.streamIterator.next();
messageReceived(messageAndMetadata.key(), messageAndMetadata.message());
++data.processed;
}
} catch (ConsumerTimeoutException cte) {
step.logDebug("Received a consumer timeout after " + data.processed + " messages");
if (!meta.isStopOnEmptyTopic()) {
// Because we're not set to stop on empty, this is an abnormal
// timeout
throw new KettleException("Unexpected consumer timeout!", cte);
}
}
// Notify that all messages were read successfully
data.consumer.commitOffsets();
step.setOutputDone();
return null;
}
KafkaMessageBus.java source code
Project: debezium-proto
@Override
public <KeyType, MessageType> void subscribe(String groupId, TopicFilter topicFilter, int numThreads, Deserializer<KeyType> keyDecoder,
Deserializer<MessageType> messageDecoder, MessageConsumer<KeyType, MessageType> consumer) {
if (!running) throw new IllegalStateException("Kafka client is no longer running");
if (numThreads < 1) return;
// Create the config for this consumer ...
final boolean autoCommit = true;
final boolean debug = logger.isDebugEnabled();
Properties props = new Properties();
props.putAll(this.consumerConfig);
props.put("group.id", groupId);
// Create the consumer and iterate over the streams and create a thread to process each one ...
ConsumerConnector connector = getOrCreateConnector(props);
connector.createMessageStreamsByFilter(topicFilter, numThreads, DEFAULT_DECODER, DEFAULT_DECODER)
.forEach(stream -> {
this.executor.get().execute(() -> {
final ConsumerIterator<byte[], byte[]> iter = stream.iterator();
boolean success = false;
while (true) {
try {
while (running && iter.hasNext()) {
// Determine if we're still running after we've received a message ...
if ( running ) {
MessageAndMetadata<byte[], byte[]> msg = iter.next();
if (debug) {
logger.debug("Consuming next message on topic '{}', partition {}, offset {}",
msg.topic(), msg.partition(), msg.offset());
}
success = consumer.consume(msg.topic(), msg.partition(), msg.offset(),
keyDecoder.deserialize(msg.topic(),msg.key()),
messageDecoder.deserialize(msg.topic(),msg.message()));
logger.debug("Consume message: {}", success);
if (success && autoCommit) {
logger.debug("Committing offsets");
connector.commitOffsets();
}
}
}
} catch (ConsumerTimeoutException e) {
logger.debug("Consumer timed out and continuing");
// Keep going ...
}
}
});
});
}
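A hypothetical call to the subscribe(...) method above. It assumes MessageConsumer is a functional interface matching the consume(topic, partition, offset, key, message) call in the loop, and that Kafka's Whitelist topic filter and StringDeserializer are available; none of this is confirmed by the debezium-proto snippet:

// Hypothetical usage; 'bus' is an already-configured KafkaMessageBus instance.
bus.subscribe("demo-group", new Whitelist("events.*"), 2,
        new StringDeserializer(), new StringDeserializer(),
        (topic, partition, offset, key, value) -> {
            System.out.printf("%s[%d]@%d %s=%s%n", topic, partition, offset, key, value);
            return true; // report success so offsets are committed
        });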
TestKafkaSinkV2.java source code
Project: suro
@Test
public void testMultithread() throws IOException {
TopicCommand.createTopic(zk.getZkClient(),
new TopicCommand.TopicCommandOptions(new String[]{
"--zookeeper", "dummy", "--create", "--topic", TOPIC_NAME_MULTITHREAD,
"--replication-factor", "2", "--partitions", "1"}));
String description = "{\n" +
" \"type\": \"kafka\",\n" +
" \"client.id\": \"kafkasink\",\n" +
" \"metadata.broker.list\": \"" + kafkaServer.getBrokerListStr() + "\",\n" +
" \"request.required.acks\": 1,\n" +
" \"batchSize\": 10,\n" +
" \"jobQueueSize\": 3\n" +
"}";
ObjectMapper jsonMapper = new DefaultObjectMapper();
jsonMapper.registerSubtypes(new NamedType(KafkaSinkV2.class, "kafka"));
KafkaSinkV2 sink = jsonMapper.readValue(description, new TypeReference<Sink>(){});
sink.open();
int msgCount = 10000;
for (int i = 0; i < msgCount; ++i) {
Map<String, Object> msgMap = new ImmutableMap.Builder<String, Object>()
.put("key", Integer.toString(i))
.put("value", "message:" + i).build();
sink.writeTo(new DefaultMessageContainer(
new Message(TOPIC_NAME_MULTITHREAD, jsonMapper.writeValueAsBytes(msgMap)),
jsonMapper));
}
assertTrue(sink.getNumOfPendingMessages() > 0);
sink.close();
System.out.println(sink.getStat());
assertEquals(sink.getNumOfPendingMessages(), 0);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig("localhost:" + zk.getServerPort(), "gropuid_multhread"));
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(TOPIC_NAME_MULTITHREAD, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC_NAME_MULTITHREAD).get(0);
for (int i = 0; i < msgCount; ++i) {
stream.iterator().next();
}
try {
stream.iterator().next();
fail();
} catch (ConsumerTimeoutException e) {
//this is expected
consumer.shutdown();
}
}