Example source code for the Java class kafka.consumer.ConsumerConfig

OldApiTopicConsumer.java (project: azeroth)

/**
 * Creates the consumer: loads the configured value deserializer and sets up
 * the fetch and process thread pools.
 *
 * @param context consumer context carrying the Kafka properties and message handlers
 */
@SuppressWarnings("unchecked")
public OldApiTopicConsumer(ConsumerContext context) {
this.consumerContext = context;
try {
Class<?> deserializerClass = Class
.forName(context.getProperties().getProperty("value.deserializer"));
deserializer = (Deserializer<Object>) deserializerClass.newInstance();
} catch (Exception e) {
logger.error("Failed to instantiate the configured value.deserializer", e);
}
this.connector = kafka.consumer.Consumer
.createJavaConsumerConnector(new ConsumerConfig(context.getProperties()));
int poolSize = consumerContext.getMessageHandlers().size();
this.fetchExecutor = new StandardThreadExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS,
poolSize, new StandardThreadFactory("KafkaFetcher"));
this.defaultProcessExecutor = new StandardThreadExecutor(1, context.getMaxProcessThreads(),
30, TimeUnit.SECONDS, context.getMaxProcessThreads(),
new StandardThreadFactory("KafkaProcessor"), new PoolFullRunsPolicy());
logger.info(
"Kafka consumer thread pools initialized, fetchPool size: {}, defaultProcessPool size: {}",
poolSize, context.getMaxProcessThreads());
}
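
Taken together, the snippets on this page all follow the same high-level consumer pattern: build a Properties object, wrap it in a ConsumerConfig, create a Java consumer connector, and iterate a KafkaStream. Below is a minimal, self-contained sketch of that pattern; the ZooKeeper address, group id, and topic name are placeholders.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MinimalOldApiConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder ZooKeeper address
        props.put("group.id", "example-group");           // placeholder consumer group

        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Ask for one stream (thread) for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("example-topic", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);

        // The iterator blocks until a message arrives (no consumer.timeout.ms is set).
        ConsumerIterator<byte[], byte[]> it = streams.get("example-topic").get(0).iterator();
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}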
KafkaDataProvider.java (project: linden)
public KafkaDataProvider(String zookeeper, String topic, String groupId) {
super(MessageAndMetadata.class);
Properties props = new Properties();
props.put("zookeeper.connect", zookeeper);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "30000");
props.put("auto.commit.interval.ms", "1000");
props.put("fetch.message.max.bytes", "4194304");
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
iter = stream.iterator();
}
KafkaDataSpout.java (project: storm-demos)
private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
/**
 * Builds the kafka-consumer configuration.
 *
 * Args:
 *   zookeeper : ZooKeeper address (host:port)
 *   groupId   : kafka consumer group id
 *
 * Return:
 *   a ConsumerConfig object
 */
Properties props = new Properties();
props.put("zookeeper.connect", zookeeper);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
DMaaPKafkaConsumerFactory.java (project: dmaap-framework)
private ConsumerConfig createConsumerConfig(String groupId,
String consumerId) {
final Properties props = new Properties();
props.put("zookeeper.connect", fZooKeeper);
props.put("group.id", groupId);
props.put("consumer.id", consumerId);
//props.put("auto.commit.enable", "false");
// additional settings: start with our defaults, then pull in configured
// overrides
props.putAll(KafkaInternalDefaults);
for (String key : KafkaConsumerKeys) {
transferSettingIfProvided(props, key, "kafka");
}
return new ConsumerConfig(props);
}
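
The transferSettingIfProvided helper and the settings store it reads are not part of this excerpt. The sketch below is only an assumption about its shape (copying a configured "<prefix>.<key>" override into the consumer properties when one is present); the fSettings accessor is a hypothetical stand-in.

// Hypothetical reconstruction; the real dmaap-framework helper may differ.
private void transferSettingIfProvided(Properties props, String key, String prefix) {
    final String value = fSettings.getString(prefix + "." + key, null); // fSettings is an assumed settings accessor
    if (value != null) {
        props.put(key, value);
    }
}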
OSMKafkaSpout.java (project: geomesa-tutorials)
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
_collector = spoutOutputCollector;
Properties props = new Properties();
props.put("zookeeper.connect", conf.get(OSMIngest.ZOOKEEPERS));
props.put("group.id", groupId);
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1);
Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap, new StringDecoder(new VerifiableProperties()), new StringDecoder(new VerifiableProperties()));
List<KafkaStream<String, String>> streams = consumerMap.get(topic);
if (streams.size() != 1) {
log.error("Streams should be of size 1, but was " + streams.size());
throw new IllegalStateException("Expected exactly one stream for topic " + topic);
}
KafkaStream<String, String> stream = streams.get(0);
kafkaIterator = stream.iterator();
}
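
The three-argument createMessageStreams overload used above is what produces KafkaStream<String, String> rather than the default KafkaStream<byte[], byte[]>. A minimal sketch of the same call outside of Storm; the address, group id, and topic are placeholders.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class StringDecoderDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder
        props.put("group.id", "decoder-demo");            // placeholder
        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Decoders are applied per stream, so keys and values arrive as Strings.
        Map<String, List<KafkaStream<String, String>>> streams = connector.createMessageStreams(
                Collections.singletonMap("example-topic", 1),
                new StringDecoder(new VerifiableProperties()),
                new StringDecoder(new VerifiableProperties()));

        System.out.println(streams.get("example-topic").get(0).iterator().next().message());
    }
}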
KafkaDistributed.java (project: jlogstash-input-plugin)
@SuppressWarnings("unchecked")
public void prepare() {
Properties props = geneConsumerProp();
for(String topicName : topic.keySet()){
ConsumerConnector consumer = kafka.consumer.Consumer
.createJavaConsumerConnector(new ConsumerConfig(props));
consumerConnMap.put(topicName, consumer);
}
if(distributed!=null){
try {
logger.warn("zkDistributed is starting...");
zkDistributed = ZkDistributed.getSingleZkDistributed(distributed);
zkDistributed.zkRegistration();
} catch (Exception e) {
logger.error("zkRegistration fail:{}",ExceptionUtil.getErrorMessage(e));
}
}
}
KafkaDistributed.java (project: jlogstash-input-plugin)
public void reconnConsumer(String topicName){
// stop the connector for this topic
ConsumerConnector consumerConn = consumerConnMap.get(topicName);
consumerConn.commitOffsets(true);
consumerConn.shutdown();
consumerConnMap.remove(topicName);
// stop the stream-consuming threads for this topic
ExecutorService es = executorMap.get(topicName);
es.shutdownNow();
executorMap.remove(topicName);
Properties prop = geneConsumerProp();
ConsumerConnector newConsumerConn = kafka.consumer.Consumer
.createJavaConsumerConnector(new ConsumerConfig(prop));
consumerConnMap.put(topicName, newConsumerConn);
addNewConsumer(topicName, topic.get(topicName));
}
Kafka.java (project: jlogstash-input-plugin)
public void reconnConsumer(String topicName){
// stop the connector for this topic
ConsumerConnector consumerConn = consumerConnMap.get(topicName);
consumerConn.commitOffsets(true);
consumerConn.shutdown();
consumerConnMap.remove(topicName);
// stop the stream-consuming threads for this topic
ExecutorService es = executorMap.get(topicName);
es.shutdownNow();
executorMap.remove(topicName);
Properties prop = geneConsumerProp();
ConsumerConnector newConsumerConn = kafka.consumer.Consumer
.createJavaConsumerConnector(new ConsumerConfig(prop));
consumerConnMap.put(topicName, newConsumerConn);
addNewConsumer(topicName, topic.get(topicName));
}
KafkaLoader.java (project: VoltDB)
public KafkaConsumerConnector(String zk, String groupName) {
//Get a group id that is unique per table, so offsets stay clean across multiple runs.
String groupId = "voltdb-" + groupName;
//TODO: Should get this from properties file or something as override?
Properties props = new Properties();
props.put("zookeeper.connect", zk);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.commit.enable", "true");
props.put("auto.offset.reset", "smallest");
props.put("rebalance.backoff.ms", "10000");
m_consumerConfig = new ConsumerConfig(props);
m_consumer = kafka.consumer.Consumer.createJavaConsumerConnector(m_consumerConfig);
}
BlockingKafkaMessageConsumerCoordinator.java (project: benchmarkio)
@Override
public CompletionService<Histogram> startConsumers() {
final ConsumerConfig consumerConfig = new ConsumerConfig(props);
consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
// Create message streams
final Map<String, Integer> topicMap = new HashMap<>();
topicMap.put(topic, numThreads);
final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicMap);
final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// Pass each stream to a consumer that will read from the stream in its own thread.
for (final KafkaStream<byte[], byte[]> stream : streams) {
executorCompletionService.submit(new BlockingKafkaMessageConsumer(stream));
}
return executorCompletionService;
}
KafkaSourceOp.java (project: StreamCQL)
/**
* {@inheritDoc}
*/
@Override
public void initialize()
throws StreamingException
{
ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = Maps.newHashMap();
topicCountMap.put(topic, TOPIC_COUNT);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumerConnector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
consumerIterator = stream.iterator();
}
HttpClient.java (project: punxsutawney)
public static void main(String[] args) throws Exception {
if (id == null) throw new IllegalStateException("Undefined HC_ID");
if (zk == null) throw new IllegalStateException("Undefined HC_ZK");
out.println("Starting " + HttpClient.class.getSimpleName());
out.println("Using zk:" + zk + ", id:" + id);
Properties props = new Properties();
props.put("zookeeper.connect", zk);
props.put("group.id", id);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
KafkaStream<byte[],byte[]> stream = consumer.createMessageStreams(Collections.singletonMap(id, 1)).get(id).get(0);
consume(consumer, stream);
}
KafkaIgniteStreamerSelfTest.java (project: ignite)
/**
* Creates default consumer config.
*
* @param zooKeeper ZooKeeper address <server:port>.
* @param grpId Group Id for kafka subscriber.
* @return Kafka consumer configuration.
*/
private ConsumerConfig createDefaultConsumerConfig(String zooKeeper, String grpId) {
A.notNull(zooKeeper, "zookeeper");
A.notNull(grpId, "groupId");
Properties props = new Properties();
props.put("zookeeper.connect", zooKeeper);
props.put("group.id", grpId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
return new ConsumerConfig(props);
}
SdcKafkaTestUtil.java (project: datacollector)
public List<KafkaStream<byte[], byte[]>> createKafkaStream(
String zookeeperConnectString,
String topic,
int partitions
) {
//create consumer
Properties consumerProps = new Properties();
consumerProps.put("zookeeper.connect", zookeeperConnectString);
consumerProps.put("group.id", "testClient");
consumerProps.put("zookeeper.session.timeout.ms", "6000");
consumerProps.put("zookeeper.sync.time.ms", "200");
consumerProps.put("auto.commit.interval.ms", "1000");
consumerProps.put("consumer.timeout.ms", "500");
ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topic, partitions);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
return consumerMap.get(topic);
}
KafkaTestUtil.java (project: datacollector)
public static List<KafkaStream<byte[], byte[]>> createKafkaStream(String zookeeperConnectString, String topic, int partitions) {
//create consumer
Properties consumerProps = new Properties();
consumerProps.put("zookeeper.connect", zookeeperConnectString);
consumerProps.put("group.id", "testClient");
consumerProps.put("zookeeper.session.timeout.ms", "6000");
consumerProps.put("zookeeper.sync.time.ms", "200");
consumerProps.put("auto.commit.interval.ms", "1000");
consumerProps.put("consumer.timeout.ms", "500");
ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topic, partitions);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
return consumerMap.get(topic);
}
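
Both datacollector helpers set consumer.timeout.ms, which changes how the returned streams behave: hasNext() no longer blocks indefinitely but throws kafka.consumer.ConsumerTimeoutException once 500 ms pass without data. A sketch of the drain loop callers typically need; the handleMessage hook is a hypothetical placeholder.

import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;

public final class StreamDrainer {
    // Reads messages until consumer.timeout.ms elapses with no data.
    public static void drain(KafkaStream<byte[], byte[]> stream) {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        try {
            while (it.hasNext()) {
                handleMessage(it.next().message()); // hypothetical processing hook
            }
        } catch (ConsumerTimeoutException timeout) {
            // Expected once the topic is drained; stop reading.
        }
    }

    private static void handleMessage(byte[] payload) {
        System.out.println(new String(payload));
    }
}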
AbstractMultiBrokerTest.java (project: netty-kafka-producer)
public List<KafkaStream<byte[], byte[]>> consume(String topic) {
Properties consumerProperties = TestUtils.createConsumerProperties(
ZK_HOST,
UUID.randomUUID().toString(),
"client",
TIMEOUT);
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topic, 1); // one stream per topic; the count is the number of streams requested
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
Consumer.createJavaConsumerConnector (new ConsumerConfig(consumerProperties)).createMessageStreams(topicCountMap);
return consumerMap.get(topic);
}
KafkaLogAppenderTest.java (project: java-kafka)
@Test
public void testKafkaLogAppender() {
Properties consumerProps = new Properties();
consumerProps.put("zookeeper.connect", zookeeper);
consumerProps.put("group.id", "kafka-log-appender-test");
consumerProps.put("auto.offset.reset", "smallest");
consumerProps.put("schema.registry.url", schemaRegistry);
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);
ConsumerIterator<String, Object> iterator = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps))
.createMessageStreams(topicMap, new StringDecoder(null), new KafkaAvroDecoder(new VerifiableProperties(consumerProps)))
.get(topic).get(0).iterator();
String testMessage = "I am a test message";
logger.info(testMessage);
MessageAndMetadata<String, Object> messageAndMetadata = iterator.next();
GenericRecord logLine = (GenericRecord) messageAndMetadata.message();
assertEquals(logLine.get("line").toString(), testMessage);
assertEquals(logLine.get("logtypeid"), KafkaLogAppender.InfoLogTypeId);
assertNotNull(logLine.get("source"));
assertEquals(((Map<CharSequence, Object>) logLine.get("timings")).size(), 1);
assertEquals(((Map<CharSequence, Object>) logLine.get("tag")).size(), 2);
}
KtGroup.java (project: kt)
private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId, String offset) {
// http://kafka.apache.org/08/configuration.html
Properties props = new Properties();
props.put("zookeeper.connect", zookeeper);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("group.id", groupId);
// Commit offsets to ZooKeeper automatically, once per 'auto.commit.interval.ms'
props.put("auto.commit.enable", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", offset);
return new ConsumerConfig(props);
}
KafkaConsumer.java (project: java-kafka-client-libs)
private void setUpConsumer( Map<String, Integer> topicMap, MessageHandler<?> handler, Properties consumerProps ) {
_executors = new HashMap<String, ExecutorService>();
_topicConsumers = new HashMap<String, ConsumerConnector>();
for ( String topic : topicMap.keySet() ) {
String normalizedTopic = topic.replace( ".", "_" );
String normalizedConsumerGroupId = getGroupId( consumerProps.getProperty( "group.id" ), normalizedTopic );
consumerProps.setProperty( "group.id", normalizedConsumerGroupId );
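// Note: on the next loop iteration, getProperty("group.id") returns this normalized value,
// so each topic's group id is derived from the previous one unless getGroupId resets it.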
LOG.warn( "Consuming topic '" + topic + "' with group.id '" + normalizedConsumerGroupId + "'" );
LOG.warn( consumerProps.toString() );
ConsumerConfig topicConfig = new ConsumerConfig( consumerProps );
_topicConsumers.put( topic, kafka.consumer.Consumer.createJavaConsumerConnector( topicConfig ) );
}
_topicMap = topicMap;
_handler = handler;
}
KafkaSpout.java (project: monasca-thresh)
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
logger.info("Opened");
this.collector = collector;
logger.info(" topic = " + kafkaSpoutConfig.kafkaConsumerConfiguration.getTopic());
this.spoutName = String.format("%s-%d", context.getThisComponentId(), context.getThisTaskId());
Properties kafkaProperties =
KafkaConsumerProperties.createKafkaProperties(kafkaSpoutConfig.kafkaConsumerConfiguration);
// Have to use a different consumer.id for each spout so use the storm taskId. Otherwise,
// zookeeper complains about a conflicted ephemeral node when there is more than one spout
// reading from a topic
kafkaProperties.setProperty("consumer.id", String.valueOf(context.getThisTaskId()));
ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
}
KafkaTestBase.java (project: incubator-gobblin)
KafkaConsumerSuite(String zkConnectString, String topic)
{
_topic = topic;
Properties consumeProps = new Properties();
consumeProps.put("zookeeper.connect", zkConnectString);
consumeProps.put("group.id", _topic+"-"+System.nanoTime());
consumeProps.put("zookeeper.session.timeout.ms", "10000");
consumeProps.put("zookeeper.sync.time.ms", "10000");
consumeProps.put("auto.commit.interval.ms", "10000");
consumeProps.put("_consumer.timeout.ms", "10000");
_consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
_consumer.createMessageStreams(ImmutableMap.of(this._topic, 1));
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this._topic);
_stream = streams.get(0);
_iterator = _stream.iterator();
}
KafkaConsumer.java (project: dataflux)
public KafkaConsumer(final String topic) {
this.topic = topic;
Properties props = null;
// read in the default configuration; try-with-resources closes the stream
try (InputStream in = getClass().getResourceAsStream("/consumer.properties")) {
props = new Properties();
props.load(in);
config = new ConsumerConfig(props);
} catch (IOException e) {
e.printStackTrace();
}
}
KafkaConsumer.java (project: dataflux)
public KafkaConsumer(final String topic, final String configFile) {
this.topic = topic;
Properties props = null;
// load configuration from the given file; try-with-resources closes the reader
try (FileReader reader = new FileReader(configFile)) {
props = new Properties();
props.load(reader);
config = new ConsumerConfig(props);
} catch (IOException e) {
e.printStackTrace();
}
}
KafkaConsumerService.java (project: basis)
KafkaConsumerService(MetricRegistry metrics,
ConsumerConfig consumerConfig,
Map<String, Integer> topics,
Decoder<K> keyDecoder,
Decoder<V> valueDecoder,
MessageHandler<K, V> messageHandler) {
MDC.put("group_id", consumerConfig.groupId());
this.metrics = metrics;
this.consumerConfig = consumerConfig;
this.topics = topics;
this.keyDecoder = keyDecoder;
this.valueDecoder = valueDecoder;
this.messageHandler = messageHandler;
}
myConsumer.java (project: CadalWorkspace)
public static void main(String[] args) {
Properties props = new Properties();
props.put("zookeeper.connect","10.15.62.76:2181");
props.put("group.id","mygroup001");
props.put("zookeeper.session.timeout.ms","40000");
props.put("zookeeper.sync.time.ms","200");
props.put("auto.commit.interval.ms","1000");
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
Map<String,Integer> topicCountMap = new HashMap<String,Integer>();
topicCountMap.put("my-topic",new Integer(1));
System.out.println("zzzzzzzzzzzzz");
Map<String,List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("my-topic");
KafkaStream<byte[], byte[]> stream = streams.get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
System.out.println("before while...");
while(it.hasNext()){
System.out.println(new String(it.next().message()));
}
}
KafkaSpout.java (project: CadalWorkspace)
/**
* Create a Kafka consumer.
*/
@Override
public void open() {
// these consumers use ZooKeeper for commit, offset and segment consumption tracking
// TODO: consider using SimpleConsumer the same way the Hadoop consumer job does to avoid ZK dependency
// TODO: use the task details from TopologyContext in the normal open method
ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
// consumer with just one thread since the real parallelism is handled by Storm already
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaMessageStream>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
KafkaMessageStream stream = consumerMap.get(topic).get(0);
consumerIterator = stream.iterator();
}
SpoutKafka.java (project: CadalWorkspace)
@SuppressWarnings("unchecked")
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this._collector = collector;
Properties props = new Properties();
props.put("zk.connect", "10.15.62.104:2181");
props.put("groupid", "group1");
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer
.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("newtopic", new Integer(1));
Map<String, List<KafkaMessageStream>> consumerMap = consumer
.createMessageStreams(topicCountMap);
KafkaMessageStream stream = consumerMap.get("newtopic").get(0);
this.it = stream.iterator();
}
SpoutKafka.java (project: CadalWorkspace)
@SuppressWarnings("unchecked")
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this._collector = collector;
this.logger = Logger.getLogger(BoltCassandra.class.getName());
// Construct kafka part
Properties props = new Properties();
props.put("zk.connect", "10.15.62.75:2181");
props.put("groupid", "sec-group-1"); //
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("sec-stream-one", new Integer(1)); //
Map<String, List<KafkaMessageStream>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaMessageStream stream = consumerMap.get("sec-stream-one").get(0); //
this.it = stream.iterator();
}
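
The two SpoutKafka examples above predate Kafka 0.8: zk.connect and groupid are the 0.7-era property names, and KafkaMessageStream was later replaced by KafkaStream. Against an 0.8+ broker, the same configuration fragment would use the renamed properties (same placeholder address and group as the snippet):

Properties props = new Properties();
props.put("zookeeper.connect", "10.15.62.104:2181"); // was "zk.connect"
props.put("group.id", "group1");                     // was "groupid"
ConsumerConfig consumerConfig = new ConsumerConfig(props);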