java类kafka.consumer.ConsumerConfig的实例源码

OldApiTopicConsumer.java 文件源码 项目:azeroth 阅读 17 收藏 0 点赞 0 评论 0
/**
 * 
 * @param connector
 * @param topics
 * @param processThreads 
 */
@SuppressWarnings("unchecked")
public OldApiTopicConsumer(ConsumerContext context) {

    this.consumerContext = context;
    try {
        Class<?> deserializerClass = Class
            .forName(context.getProperties().getProperty("value.deserializer"));
        deserializer = (Deserializer<Object>) deserializerClass.newInstance();
    } catch (Exception e) {
    }
    this.connector = kafka.consumer.Consumer
        .createJavaConsumerConnector(new ConsumerConfig(context.getProperties()));

    int poolSize = consumerContext.getMessageHandlers().size();
    this.fetchExecutor = new StandardThreadExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS,
        poolSize, new StandardThreadFactory("KafkaFetcher"));

    this.defaultProcessExecutor = new StandardThreadExecutor(1, context.getMaxProcessThreads(),
        30, TimeUnit.SECONDS, context.getMaxProcessThreads(),
        new StandardThreadFactory("KafkaProcessor"), new PoolFullRunsPolicy());

    logger.info(
        "Kafka Conumer ThreadPool initialized,fetchPool Size:{},defalutProcessPool Size:{} ",
        poolSize, context.getMaxProcessThreads());
}
KafkaDataProvider.java 文件源码 项目:linden 阅读 78 收藏 0 点赞 0 评论 0
public KafkaDataProvider(String zookeeper, String topic, String groupId) {
  super(MessageAndMetadata.class);
  Properties props = new Properties();
  props.put("zookeeper.connect", zookeeper);
  props.put("group.id", groupId);
  props.put("zookeeper.session.timeout.ms", "30000");
  props.put("auto.commit.interval.ms", "1000");
  props.put("fetch.message.max.bytes", "4194304");
  consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
  Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  topicCountMap.put(topic, 1);
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
  KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);

  iter = stream.iterator();
}
KafkaDataSpout.java 文件源码 项目:storm-demos 阅读 17 收藏 0 点赞 0 评论 0
private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {

/**
 * this method used to set kafka-consumer configuration
 * 
 * Args :
 *  m_zookeeper: zookeeper address with port
 *  m_groupId  : kafka-consumer consumer group
 * 
 * Return :
 *  an object of ConnsumerConfig 
 * 
 */

      Properties props = new Properties();
      props.put("zookeeper.connect", zookeeper);
      props.put("group.id", groupId);
      props.put("zookeeper.session.timeout.ms", "400");
      props.put("zookeeper.sync.time.ms", "200");
      props.put("auto.commit.interval.ms", "1000");
      return new ConsumerConfig(props);
  }
DMaaPKafkaConsumerFactory.java 文件源码 项目:dmaap-framework 阅读 16 收藏 0 点赞 0 评论 0
private ConsumerConfig createConsumerConfig(String groupId,
        String consumerId) {
    final Properties props = new Properties();
    props.put("zookeeper.connect", fZooKeeper);
    props.put("group.id", groupId);
    props.put("consumer.id", consumerId);
    //props.put("auto.commit.enable", "false");
    // additional settings: start with our defaults, then pull in configured
    // overrides
    props.putAll(KafkaInternalDefaults);
    for (String key : KafkaConsumerKeys) {
        transferSettingIfProvided(props, key, "kafka");
    }

    return new ConsumerConfig(props);
}
OSMKafkaSpout.java 文件源码 项目:geomesa-tutorials 阅读 20 收藏 0 点赞 0 评论 0
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
    _collector = spoutOutputCollector;
    Properties props = new Properties();
    props.put("zookeeper.connect", conf.get(OSMIngest.ZOOKEEPERS));
    props.put("group.id", groupId);
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);
    Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap, new StringDecoder(new VerifiableProperties()), new StringDecoder(new VerifiableProperties()));
    List<KafkaStream<String, String>> streams = consumerMap.get(topic);
    KafkaStream<String, String> stream = null;
    if (streams.size() == 1) {
        stream = streams.get(0);
    } else {
        log.error("Streams should be of size 1");
    }
    kafkaIterator = stream.iterator();
}
KafkaDistributed.java 文件源码 项目:jlogstash-input-plugin 阅读 21 收藏 0 点赞 0 评论 0
@SuppressWarnings("unchecked")
public void prepare() {
    Properties props = geneConsumerProp();

    for(String topicName : topic.keySet()){
        ConsumerConnector consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(new ConsumerConfig(props));

        consumerConnMap.put(topicName, consumer);
    }
    if(distributed!=null){
        try {
            logger.warn("zkDistributed is start...");
            zkDistributed = ZkDistributed.getSingleZkDistributed(distributed);
            zkDistributed.zkRegistration();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            logger.error("zkRegistration fail:{}",ExceptionUtil.getErrorMessage(e));
        }
    }
}
KafkaDistributed.java 文件源码 项目:jlogstash-input-plugin 阅读 19 收藏 0 点赞 0 评论 0
public void reconnConsumer(String topicName){

        //停止topic 对应的conn
        ConsumerConnector consumerConn = consumerConnMap.get(topicName);
        consumerConn.commitOffsets(true);
        consumerConn.shutdown();
        consumerConnMap.remove(topicName);

        //停止topic 对应的stream消耗线程
        ExecutorService es = executorMap.get(topicName);
        es.shutdownNow();
        executorMap.remove(topicName);

        Properties prop = geneConsumerProp();
        ConsumerConnector newConsumerConn = kafka.consumer.Consumer
                .createJavaConsumerConnector(new ConsumerConfig(prop));
        consumerConnMap.put(topicName, newConsumerConn);

        addNewConsumer(topicName, topic.get(topicName));
}
Kafka.java 文件源码 项目:jlogstash-input-plugin 阅读 19 收藏 0 点赞 0 评论 0
public void reconnConsumer(String topicName){

    //停止topic 对应的conn
    ConsumerConnector consumerConn = consumerConnMap.get(topicName);
    consumerConn.commitOffsets(true);
    consumerConn.shutdown();
    consumerConnMap.remove(topicName);

    //停止topic 对应的stream消耗线程
    ExecutorService es = executorMap.get(topicName);
    es.shutdownNow();   
    executorMap.remove(topicName);

    Properties prop = geneConsumerProp();
    ConsumerConnector newConsumerConn = kafka.consumer.Consumer
            .createJavaConsumerConnector(new ConsumerConfig(prop));
    consumerConnMap.put(topicName, newConsumerConn);

    addNewConsumer(topicName, topic.get(topicName));
}
KafkaLoader.java 文件源码 项目:VoltDB 阅读 21 收藏 0 点赞 0 评论 0
public KafkaConsumerConnector(String zk, String groupName) {
    //Get group id which should be unique for table so as to keep offsets clean for multiple runs.
    String groupId = "voltdb-" + groupName;
    //TODO: Should get this from properties file or something as override?
    Properties props = new Properties();
    props.put("zookeeper.connect", zk);
    props.put("group.id", groupId);
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.commit.enable", "true");
    props.put("auto.offset.reset", "smallest");
    props.put("rebalance.backoff.ms", "10000");

    m_consumerConfig = new ConsumerConfig(props);

    m_consumer = kafka.consumer.Consumer.createJavaConsumerConnector(m_consumerConfig);
}
BlockingKafkaMessageConsumerCoordinator.java 文件源码 项目:benchmarkio 阅读 24 收藏 0 点赞 0 评论 0
@Override
public CompletionService<Histogram> startConsumers() {
    final ConsumerConfig consumerConfig = new ConsumerConfig(props);

    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

    // Create message streams
    final Map<String, Integer> topicMap = new HashMap<>();
    topicMap.put(topic, numThreads);

    final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicMap);
    final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // Pass each stream to a consumer that will read from the stream in its own thread.
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executorCompletionService.submit(new BlockingKafkaMessageConsumer(stream));
    }

    return executorCompletionService;
}
KafkaSourceOp.java 文件源码 项目:StreamCQL 阅读 16 收藏 0 点赞 0 评论 0
/**
 * {@inheritDoc}
 */
@Override
public void initialize()
    throws StreamingException
{
    ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = Maps.newHashMap();
    topicCountMap.put(topic, TOPIC_COUNT);

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumerConnector.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    consumerIterator = stream.iterator();
}
HttpClient.java 文件源码 项目:punxsutawney 阅读 28 收藏 0 点赞 0 评论 0
public static void main(String[] args) throws Exception {
    if (id == null) throw new IllegalStateException("Undefined HC_ID");
    if (zk == null) throw new IllegalStateException("Undefined HC_ZK");

    out.println("Starting " + HttpClient.class.getSimpleName());
    out.println("Using zk:" + zk + ", id:" + id);

    Properties props = new Properties();
    props.put("zookeeper.connect", zk);
    props.put("group.id", id);
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");

    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    KafkaStream<byte[],byte[]> stream = consumer.createMessageStreams(Collections.singletonMap(id, 1)).get(id).get(0);

    consume(consumer, stream);
}
KafkaIgniteStreamerSelfTest.java 文件源码 项目:ignite 阅读 16 收藏 0 点赞 0 评论 0
/**
 * Creates default consumer config.
 *
 * @param zooKeeper ZooKeeper address &lt;server:port&gt;.
 * @param grpId Group Id for kafka subscriber.
 * @return Kafka consumer configuration.
 */
private ConsumerConfig createDefaultConsumerConfig(String zooKeeper, String grpId) {
    A.notNull(zooKeeper, "zookeeper");
    A.notNull(grpId, "groupId");

    Properties props = new Properties();

    props.put("zookeeper.connect", zooKeeper);
    props.put("group.id", grpId);
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "smallest");

    return new ConsumerConfig(props);
}
SdcKafkaTestUtil.java 文件源码 项目:datacollector 阅读 17 收藏 0 点赞 0 评论 0
public List<KafkaStream<byte[], byte[]>> createKafkaStream(
    String zookeeperConnectString,
    String topic,
    int partitions
) {
  //create consumer
  Properties consumerProps = new Properties();
  consumerProps.put("zookeeper.connect", zookeeperConnectString);
  consumerProps.put("group.id", "testClient");
  consumerProps.put("zookeeper.session.timeout.ms", "6000");
  consumerProps.put("zookeeper.sync.time.ms", "200");
  consumerProps.put("auto.commit.interval.ms", "1000");
  consumerProps.put("consumer.timeout.ms", "500");
  ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
  ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
  Map<String, Integer> topicCountMap = new HashMap<>();
  topicCountMap.put(topic, partitions);
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
  return consumerMap.get(topic);
}
KafkaTestUtil.java 文件源码 项目:datacollector 阅读 19 收藏 0 点赞 0 评论 0
public static List<KafkaStream<byte[], byte[]>> createKafkaStream(String zookeeperConnectString, String topic, int partitions) {
  //create consumer
  Properties consumerProps = new Properties();
  consumerProps.put("zookeeper.connect", zookeeperConnectString);
  consumerProps.put("group.id", "testClient");
  consumerProps.put("zookeeper.session.timeout.ms", "6000");
  consumerProps.put("zookeeper.sync.time.ms", "200");
  consumerProps.put("auto.commit.interval.ms", "1000");
  consumerProps.put("consumer.timeout.ms", "500");
  ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
  ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
  Map<String, Integer> topicCountMap = new HashMap<>();
  topicCountMap.put(topic, partitions);
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
  return consumerMap.get(topic);

}
AbstractMultiBrokerTest.java 文件源码 项目:netty-kafka-producer 阅读 20 收藏 0 点赞 0 评论 0
public List<KafkaStream<byte[], byte[]>> consume(String topic) {

        Properties consumerProperties = TestUtils.createConsumerProperties(
                ZK_HOST,
                UUID.randomUUID().toString(),
                "client",
                TIMEOUT);

        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, 1); // not sure why is this 1

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                Consumer.createJavaConsumerConnector (new ConsumerConfig(consumerProperties)).createMessageStreams(topicCountMap);

        return consumerMap.get(topic);
    }
KafkaLogAppenderTest.java 文件源码 项目:java-kafka 阅读 15 收藏 0 点赞 0 评论 0
@Test
public void testKafkaLogAppender() {
    Properties consumerProps = new Properties();
    consumerProps.put("zookeeper.connect", zookeeper);
    consumerProps.put("group.id", "kafka-log-appender-test");
    consumerProps.put("auto.offset.reset", "smallest");
    consumerProps.put("schema.registry.url", schemaRegistry);

    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(topic, 1);

    ConsumerIterator<String, Object> iterator = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps))
            .createMessageStreams(topicMap, new StringDecoder(null), new KafkaAvroDecoder(new VerifiableProperties(consumerProps)))
            .get(topic).get(0).iterator();

    String testMessage = "I am a test message";
    logger.info(testMessage);

    MessageAndMetadata<String, Object> messageAndMetadata = iterator.next();
    GenericRecord logLine = (GenericRecord) messageAndMetadata.message();
    assertEquals(logLine.get("line").toString(), testMessage);
    assertEquals(logLine.get("logtypeid"), KafkaLogAppender.InfoLogTypeId);
    assertNotNull(logLine.get("source"));
    assertEquals(((Map<CharSequence, Object>) logLine.get("timings")).size(), 1);
    assertEquals(((Map<CharSequence, Object>) logLine.get("tag")).size(), 2);
}
KtGroup.java 文件源码 项目:kt 阅读 18 收藏 0 点赞 0 评论 0
private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId, String offset) {
    // http://kafka.apache.org/08/configuration.html
    Properties props = new Properties();
    props.put("zookeeper.connect", zookeeper);
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("group.id", groupId);

    // Turn off managing the offset in zookeeper and always start at the tail
    // if we enable this in the future make sure to set 'auto.commit.interval.ms'
    props.put("auto.commit.enable", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", offset);

    return new ConsumerConfig(props);
}
KafkaConsumer.java 文件源码 项目:java-kafka-client-libs 阅读 26 收藏 0 点赞 0 评论 0
private void setUpConsumer( Map<String, Integer> topicMap, MessageHandler<?> handler, Properties consumerProps ) {
    _executors = new HashMap<String, ExecutorService>();
    _topicConsumers = new HashMap<String, ConsumerConnector>();

    for ( String topic : topicMap.keySet() ) {
        String normalizedTopic = topic.replace( ".", "_" );
        String normalizedConsumerGroupId = getGroupId( consumerProps.getProperty( "group.id" ), normalizedTopic );
        consumerProps.setProperty( "group.id", normalizedConsumerGroupId );
        LOG.warn( "Consuming topic '" + topic + "' with group.id '" + normalizedConsumerGroupId + "'" );
        LOG.warn( consumerProps.toString() );
        ConsumerConfig topicConfig = new ConsumerConfig( consumerProps );
        _topicConsumers.put( topic, kafka.consumer.Consumer.createJavaConsumerConnector( topicConfig ) );
    }
    _topicMap = topicMap;
    _handler = handler;
}
KafkaSpout.java 文件源码 项目:monasca-thresh 阅读 17 收藏 0 点赞 0 评论 0
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  logger.info("Opened");
  this.collector = collector;
  logger.info(" topic = " + kafkaSpoutConfig.kafkaConsumerConfiguration.getTopic());
  this.spoutName = String.format("%s-%d", context.getThisComponentId(), context.getThisTaskId());

  Properties kafkaProperties =
      KafkaConsumerProperties.createKafkaProperties(kafkaSpoutConfig.kafkaConsumerConfiguration);
  // Have to use a different consumer.id for each spout so use the storm taskId. Otherwise,
  // zookeeper complains about a conflicted ephemeral node when there is more than one spout
  // reading from a topic
  kafkaProperties.setProperty("consumer.id", String.valueOf(context.getThisTaskId()));
  ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
  this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
}
KafkaTestBase.java 文件源码 项目:incubator-gobblin 阅读 21 收藏 0 点赞 0 评论 0
KafkaConsumerSuite(String zkConnectString, String topic)
{
  _topic = topic;
  Properties consumeProps = new Properties();
  consumeProps.put("zookeeper.connect", zkConnectString);
  consumeProps.put("group.id", _topic+"-"+System.nanoTime());
  consumeProps.put("zookeeper.session.timeout.ms", "10000");
  consumeProps.put("zookeeper.sync.time.ms", "10000");
  consumeProps.put("auto.commit.interval.ms", "10000");
  consumeProps.put("_consumer.timeout.ms", "10000");

  _consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps));

  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
      _consumer.createMessageStreams(ImmutableMap.of(this._topic, 1));
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this._topic);
  _stream = streams.get(0);
  _iterator = _stream.iterator();
}
KafkaTestBase.java 文件源码 项目:incubator-gobblin 阅读 16 收藏 0 点赞 0 评论 0
KafkaConsumerSuite(String zkConnectString, String topic)
{
  _topic = topic;
  Properties consumeProps = new Properties();
  consumeProps.put("zookeeper.connect", zkConnectString);
  consumeProps.put("group.id", _topic+"-"+System.nanoTime());
  consumeProps.put("zookeeper.session.timeout.ms", "10000");
  consumeProps.put("zookeeper.sync.time.ms", "10000");
  consumeProps.put("auto.commit.interval.ms", "10000");
  consumeProps.put("_consumer.timeout.ms", "10000");

  _consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps));

  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
      _consumer.createMessageStreams(ImmutableMap.of(this._topic, 1));
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this._topic);
  _stream = streams.get(0);
  _iterator = _stream.iterator();
}
KafkaConsumer.java 文件源码 项目:dataflux 阅读 27 收藏 0 点赞 0 评论 0
public KafkaConsumer(final String topic) {
    this.topic = topic;
    Properties props = null;
    try {

        //read in default configuration
        InputStream in = getClass().getResourceAsStream("/consumer.properties");

        props = new Properties();

        props.load(in);

        config = new ConsumerConfig(props);

        //in.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}
KafkaConsumer.java 文件源码 项目:dataflux 阅读 27 收藏 0 点赞 0 评论 0
public KafkaConsumer(final String topic, final String configFile) {
    this.topic = topic;
    Properties props = null;
    try {

        FileReader reader = new FileReader(configFile);

        props = new Properties();

        props.load(reader);

        config = new ConsumerConfig(props);

        //in.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}
KafkaConsumerService.java 文件源码 项目:basis 阅读 74 收藏 0 点赞 0 评论 0
KafkaConsumerService(MetricRegistry metrics,
                     ConsumerConfig consumerConfig,
                     Map<String, Integer> topics,
                     Decoder<K> keyDecoder,
                     Decoder<V> valueDecoder,
                     MessageHandler<K, V> messageHandler) {

    MDC.put("group_id", consumerConfig.groupId());

    this.metrics = metrics;
    this.consumerConfig = consumerConfig;
    this.topics = topics;
    this.keyDecoder = keyDecoder;
    this.valueDecoder = valueDecoder;
    this.messageHandler = messageHandler;
}
myConsumer.java 文件源码 项目:CadalWorkspace 阅读 26 收藏 0 点赞 0 评论 0
public static void main(String[] args) {
    Properties props = new Properties();
    props.put("zookeeper.connect","10.15.62.76:2181");
    props.put("group.id","mygroup001");
    props.put("zookeeper.session.timeout.ms","40000");
    props.put("zookeeper.sync.time.ms","200");
    props.put("auto.commit.interval.ms","1000");

    ConsumerConfig consumerConfig = new ConsumerConfig(props);

    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String,Integer> topicCountMap = new HashMap<String,Integer>();
    topicCountMap.put("my-topic",new Integer(1));
    System.out.println("zzzzzzzzzzzzz");
    Map<String,List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("my-topic");

    KafkaStream<byte[], byte[]> stream = streams.get(0);

    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    System.out.println("before while...");
    while(it.hasNext()){
        System.out.println(new String(it.next().message()));
    }
}
KafkaSpout.java 文件源码 项目:CadalWorkspace 阅读 17 收藏 0 点赞 0 评论 0
/**
 * Create a Kafka consumer.
 */
@Override
public void open() {

    // these consumers use ZooKeeper for commit, offset and segment consumption tracking
    // TODO: consider using SimpleConsumer the same way the Hadoop consumer job does to avoid ZK dependency
    // TODO: use the task details from TopologyContext in the normal open method
    ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

    // consumer with just one thread since the real parallelism is handled by Storm already
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(1));

    Map<String, List<KafkaMessageStream>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
    KafkaMessageStream stream = consumerMap.get(topic).get(0);

    consumerIterator = stream.iterator();
}
KafkaSpout.java 文件源码 项目:CadalWorkspace 阅读 19 收藏 0 点赞 0 评论 0
/**
 * Create a Kafka consumer.
 */
@Override
public void open() {

    // these consumers use ZooKeeper for commit, offset and segment consumption tracking
    // TODO: consider using SimpleConsumer the same way the Hadoop consumer job does to avoid ZK dependency
    // TODO: use the task details from TopologyContext in the normal open method
    ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

    // consumer with just one thread since the real parallelism is handled by Storm already
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(1));

    Map<String, List<KafkaMessageStream>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
    KafkaMessageStream stream = consumerMap.get(topic).get(0);

    consumerIterator = stream.iterator();
}
SpoutKafka.java 文件源码 项目:CadalWorkspace 阅读 16 收藏 0 点赞 0 评论 0
@SuppressWarnings("unchecked")
@Override
public void open(Map conf, TopologyContext context,
        SpoutOutputCollector collector) {
    this._collector = collector;


    Properties props = new Properties();
    props.put("zk.connect", "10.15.62.104:2181");
    props.put("groupid", "group1");

    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumer = kafka.consumer.Consumer
            .createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("newtopic", new Integer(1));

    Map<String, List<KafkaMessageStream>> consumerMap = consumer
            .createMessageStreams(topicCountMap);

    KafkaMessageStream stream = consumerMap.get("newtopic").get(0);

    this.it = stream.iterator();        
}
SpoutKafka.java 文件源码 项目:CadalWorkspace 阅读 21 收藏 0 点赞 0 评论 0
@SuppressWarnings("unchecked")
@Override
public void open(Map conf, TopologyContext context,
        SpoutOutputCollector collector) {
    this._collector = collector;
    this.logger = Logger.getLogger(BoltCassandra.class.getClass().getName());

    // Construct kafka part
    Properties props = new Properties();
    props.put("zk.connect", "10.15.62.75:2181");
    props.put("groupid", "sec-group-1");        // 

    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("sec-stream-one", new Integer(1));        // 

    Map<String, List<KafkaMessageStream>> consumerMap = consumer.createMessageStreams(topicCountMap);

    KafkaMessageStream stream = consumerMap.get("sec-stream-one").get(0);       //

    this.it = stream.iterator();
}


问题


面经


文章

微信
公众号

扫码关注公众号