Example source code for the Java class kafka.consumer.Consumer

AvroOneM2MDataSubscribe.java (project: SDA)
public void collect() throws Exception {
    Properties properties = new Properties();
    // The class name is used as the user id / group id.
    properties.put("zookeeper.connect", Utils.ZOOKEEPER_LIST);
    properties.put("group.id", group_id);
    properties.put("zookeeper.session.timeout.ms", "6000");
    properties.put("zookeeper.sync.time.ms", "2000");
    properties.put("auto.commit.enable", "true");
    properties.put("auto.commit.interval.ms", "5000");
    properties.put("fetch.message.max.bytes", "31457280"); // 30MB
    // Start from the earliest available offset; use "largest" to start from the newest.
    properties.put("auto.offset.reset", "smallest");

    final ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(TOPIC, NUM_THREADS);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);

    ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
    log.debug("NUM_THREADS : " + NUM_THREADS);
    // Hand each returned stream to its own worker thread.
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.execute(new ConsumerT(stream));
    }
}
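The ConsumerT worker class is not shown in this snippet. Below is a minimal sketch of what such a worker might look like, assuming it simply drains its assigned stream; the class shape and the printed fields are assumptions, not the project's actual code.

// Hypothetical worker: drains one KafkaStream on a dedicated thread.
class ConsumerT implements Runnable {
    private final KafkaStream<byte[], byte[]> stream;

    ConsumerT(KafkaStream<byte[], byte[]> stream) {
        this.stream = stream;
    }

    @Override
    public void run() {
        // KafkaStream is Iterable; iteration blocks until a message arrives.
        for (MessageAndMetadata<byte[], byte[]> record : stream) {
            System.out.println("partition=" + record.partition()
                    + " offset=" + record.offset()
                    + " payload=" + new String(record.message()));
        }
    }
}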
KafkaMqCollect.java (project: light_drtc)
public void init() {
    Properties props = new Properties();
    props.put("zookeeper.connect", Constants.kfZkServers);
    props.put("group.id", Constants.kfGroupId);
    props.put("auto.offset.reset", Constants.kfAutoOffsetReset);
    props.put("zookeeper.session.timeout.ms", "4000");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    // Note: serializer.class is a producer-side setting and has no effect on a consumer.
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    ConsumerConfig config = new ConsumerConfig(props);
    consumer = Consumer.createJavaConsumerConnector(config);
}
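init() only builds the connector; consuming still requires creating streams from it. A minimal sketch of a possible follow-up, assuming a placeholder topic name (the project's actual consumption code is not shown here):

// Hypothetical usage after init(): one stream for a placeholder topic.
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("demo-topic", 1); // "demo-topic" is an assumed name
KafkaStream<byte[], byte[]> stream =
        consumer.createMessageStreams(topicCountMap).get("demo-topic").get(0);
for (MessageAndMetadata<byte[], byte[]> record : stream) {
    System.out.println(new String(record.message()));
}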
KafkaConsumer.java (project: change-data-capture)
public KafkaConsumer(final String topic, final String zkConnect, final String groupId, Decoder<Val> decoder) {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
            new ConsumerConfig(getConsumerConfig(zkConnect, groupId)));
    // Request a single stream for the topic.
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);
    Map<String, List<KafkaStream<byte[], Val>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, new DefaultDecoder(null), decoder);
    stream = consumerMap.get(topic).get(0);
}
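A sketch of how this constructor might be called, assuming the class is generic in Val, Val is String, and Kafka's StringDecoder is used; the topic, ZooKeeper address, and group id are placeholders:

// Hypothetical usage with String values.
KafkaConsumer<String> kafkaConsumer = new KafkaConsumer<String>(
        "cdc-topic", "localhost:2181", "cdc-group",
        new StringDecoder(null));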
OldConsumer.java (project: kafka-monitor)
public OldConsumer(String topic, Properties consumerProperties) {
    _connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));
    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put(topic, 1);
    Map<String, List<KafkaStream<String, String>>> kafkaStreams =
            _connector.createMessageStreams(topicCountMap, new StringDecoder(null), new StringDecoder(null));
    _iter = kafkaStreams.get(topic).get(0).iterator();
}
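A sketch of constructing an OldConsumer, assuming only the minimal properties the old high-level consumer requires; the address, group, and topic names are placeholders:

// Hypothetical usage with the minimal required properties.
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181"); // placeholder address
props.put("group.id", "monitor-group");           // placeholder group
OldConsumer consumer = new OldConsumer("monitor-topic", props);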
DemoHighLevelConsumer.java (project: KafkaExample)
public static void main(String[] args) {
    if (args == null || args.length == 0) {
        // Fall back to demo defaults when no arguments are supplied.
        args = new String[] { "zookeeper0:2181/kafka", "topic1", "group2", "consumer1" };
    } else if (args.length != 4) {
        System.err.println("Usage:\n\tjava -jar kafka_consumer.jar ${zookeeper_list} ${topic_name} ${group_name} ${consumer_id}");
        System.exit(1);
    }
    String zk = args[0];
    String topic = args[1];
    String groupid = args[2];
    String consumerid = args[3];
    Properties props = new Properties();
    props.put("zookeeper.connect", zk);
    props.put("group.id", groupid);
    props.put("client.id", "test");
    props.put("consumer.id", consumerid);
    props.put("auto.offset.reset", "largest");
    // Disable auto commit; offsets are committed manually after each message below.
    props.put("auto.commit.enable", "false");
    props.put("auto.commit.interval.ms", "60000");
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream1 = consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> iterator = stream1.iterator();
    while (iterator.hasNext()) {
        MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
        // key() may be null for messages produced without a key.
        String key = messageAndMetadata.key() == null ? "null" : new String(messageAndMetadata.key());
        String message = String.format(
                "Topic:%s, GroupID:%s, Consumer ID:%s, PartitionID:%s, Offset:%s, Message Key:%s, Message Payload: %s",
                messageAndMetadata.topic(), groupid, consumerid, messageAndMetadata.partition(),
                messageAndMetadata.offset(), key, new String(messageAndMetadata.message()));
        System.out.println(message);
        consumerConnector.commitOffsets();
    }
}
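Because auto.commit.enable is false here, the commitOffsets() call after each println is what advances the group's offsets; committing only after a message has been processed gives at-least-once delivery, at the cost of one commit round-trip per message.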
KafkaEventConsumer.java (project: EventStreamAnalytics)
public KafkaEventConsumer(ActorSystem system) {
    logger.debug("Initializing Consumer");
    hazelcastEventActor = system.actorOf(Props.create(HazelcastEventActor.class), "hazelcastEventHandler");
    mangoDBEventActor = system.actorOf(Props.create(MongoDBEventActor.class), "mongoDbEventHandler");
    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181");
    props.put("group.id", "eventProcessor");
    props.put("client.id", "workerEventProcessor");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "smallest");
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    logger.debug("Initialized Consumer");
}
KafkaDemoClient.java (project: thingsboard)
private static ConsumerIterator<String, String> buildConsumer(String topic) {
    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put(topic, 1);
    ConsumerConfig consumerConfig = new ConsumerConfig(consumerProperties());
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, List<KafkaStream<String, String>>> consumers =
            consumerConnector.createMessageStreams(topicCountMap, new StringDecoder(null), new StringDecoder(null));
    KafkaStream<String, String> stream = consumers.get(topic).get(0);
    return stream.iterator();
}
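The consumerProperties() helper is not shown in this snippet. A minimal sketch of what it might return, assuming only the settings a demo client would need; the address, group id, and offset policy are placeholders, not thingsboard's actual values:

// Hypothetical consumerProperties() body; values are placeholders.
private static Properties consumerProperties() {
    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181");
    props.put("group.id", "kafka-demo-client");
    props.put("auto.offset.reset", "smallest"); // read from the beginning for demos
    return props;
}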
KafkaConsumer.java (project: kclient)
protected void initKafka() {
    if (handler == null) {
        log.error("Executor can't be null!");
        throw new RuntimeException("Executor can't be null!");
    }
    log.info("Consumer properties:" + properties);
    ConsumerConfig config = new ConsumerConfig(properties);
    isAutoCommitOffset = config.autoCommitEnable();
    log.info("Auto commit: " + isAutoCommitOffset);
    consumerConnector = Consumer.createJavaConsumerConnector(config);
    Map<String, Integer> topics = new HashMap<String, Integer>();
    topics.put(topic, streamNum);
    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
    Map<String, List<KafkaStream<String, String>>> streamsMap =
            consumerConnector.createMessageStreams(topics, keyDecoder, valueDecoder);
    streams = streamsMap.get(topic);
    log.info("Streams:" + streams);
    if (streams == null || streams.isEmpty()) {
        log.error("Streams are empty.");
        throw new IllegalArgumentException("Streams are empty.");
    }
    streamThreadPool = Executors.newFixedThreadPool(streamNum);
}
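initKafka() creates streamThreadPool but does not start any workers; presumably that happens elsewhere in the class. A minimal sketch of how the streams might be wired to the pool, assuming the handler exposes an execute(String) method (that signature is an assumption, not kclient's actual API):

// Hypothetical wiring of streams to the thread pool.
for (final KafkaStream<String, String> stream : streams) {
    streamThreadPool.submit(new Runnable() {
        @Override
        public void run() {
            for (MessageAndMetadata<String, String> record : stream) {
                handler.execute(record.message()); // assumed handler method
            }
        }
    });
}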
KafkaConsumerTestBase.java (project: flink)
/**
 * Read topic to list, only using Kafka code.
 */
private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
    // We request only one stream per consumer instance. Kafka will make sure that each consumer group
    // will see each message only once.
    Map<String, Integer> topicCountMap = Collections.singletonMap(topicName, 1);
    Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
    if (streams.size() != 1) {
        throw new RuntimeException("Expected only one message stream but got " + streams.size());
    }
    List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
    if (kafkaStreams == null) {
        throw new RuntimeException("Requested stream not available. Available streams: " + streams.toString());
    }
    if (kafkaStreams.size() != 1) {
        throw new RuntimeException("Requested 1 stream from Kafka, but got " + kafkaStreams.size() + " streams");
    }
    LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
    ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
    List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<>();
    int read = 0;
    while (iteratorToRead.hasNext()) {
        read++;
        result.add(iteratorToRead.next());
        if (read == stopAfter) {
            LOG.info("Read " + read + " elements");
            return result;
        }
    }
    return result;
}
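A sketch of how readTopicToList might be invoked from a test, with placeholder connection settings; the real tests build their ConsumerConfig elsewhere:

// Hypothetical invocation with placeholder settings.
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "flink-test-reader");
props.put("auto.offset.reset", "smallest"); // read the topic from the beginning
List<MessageAndMetadata<byte[], byte[]>> records =
        readTopicToList("test-topic", new ConsumerConfig(props), 100);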