java类kafka.consumer.Consumer的实例源码

DefaultConsumerFactory.java 文件源码 项目:bootique-kafka-client 阅读 17 收藏 0 点赞 0 评论 0
/**
 * Builds a consumer connector from the configuration registered under {@code name},
 * with {@code configOverrides} layered on top.
 *
 * <p>Either source may be missing: an unknown name contributes no properties and a
 * {@code null} override object is skipped. Override values win on key collisions because
 * they are copied in last.
 *
 * @param name            key looked up in {@code configs}
 * @param configOverrides optional overrides, may be {@code null}
 * @return the connector created from the merged properties
 */
@Override
public ConsumerConnector newConsumerConnector(String name, ConsumerConfig configOverrides) {
    final Properties effective = new Properties();

    // Base layer: the named configuration, when one exists.
    final Map<String, String> named = configs.get(name);
    if (named != null) {
        effective.putAll(named);
    }

    // Top layer: caller-supplied overrides.
    if (configOverrides != null) {
        effective.putAll(configOverrides.createConsumerConfig());
    }

    final kafka.consumer.ConsumerConfig kafkaConfig = new kafka.consumer.ConsumerConfig(effective);
    return Consumer.createJavaConsumerConnector(kafkaConfig);
}
OSMKafkaSpout.java 文件源码 项目:geomesa-tutorials 阅读 18 收藏 0 点赞 0 评论 0
/**
 * Connects to Kafka through ZooKeeper and stores the iterator of the single stream
 * requested for {@code topic} in {@code kafkaIterator}.
 *
 * @param map                  unused by this method
 * @param topologyContext      unused by this method
 * @param spoutOutputCollector collector stored in {@code _collector}
 * @throws IllegalStateException if Kafka does not hand back exactly one stream for the topic
 */
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
    _collector = spoutOutputCollector;
    Properties props = new Properties();
    props.put("zookeeper.connect", conf.get(OSMIngest.ZOOKEEPERS));
    props.put("group.id", groupId);
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);
    Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap, new StringDecoder(new VerifiableProperties()), new StringDecoder(new VerifiableProperties()));
    List<KafkaStream<String, String>> streams = consumerMap.get(topic);
    if (streams == null || streams.size() != 1) {
        log.error("Streams should be of size 1");
        // The connector is only reachable from this method, so release it before failing;
        // previously this path fell through and died with a bare NullPointerException.
        consumer.shutdown();
        throw new IllegalStateException("Expected exactly one Kafka stream for topic " + topic
                + " but got " + (streams == null ? "none" : String.valueOf(streams.size())));
    }
    kafkaIterator = streams.get(0).iterator();
}
BlockingKafkaMessageConsumerCoordinator.java 文件源码 项目:benchmarkio 阅读 20 收藏 0 点赞 0 评论 0
/**
 * Creates the consumer connector, opens {@code numThreads} streams on {@code topic} and
 * submits one {@code BlockingKafkaMessageConsumer} per stream to the completion service.
 *
 * @return the completion service the consumers were submitted to
 */
@Override
public CompletionService<Histogram> startConsumers() {
    consumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

    // Ask Kafka for numThreads streams on the single topic.
    final Map<String, Integer> streamsPerTopic = new HashMap<>();
    streamsPerTopic.put(topic, numThreads);

    final List<KafkaStream<byte[], byte[]>> topicStreams =
        consumerConnector.createMessageStreams(streamsPerTopic).get(topic);

    // Each stream gets its own consumer task.
    for (final KafkaStream<byte[], byte[]> topicStream : topicStreams) {
        final BlockingKafkaMessageConsumer worker = new BlockingKafkaMessageConsumer(topicStream);
        executorCompletionService.submit(worker);
    }

    return executorCompletionService;
}
KafkaSourceOp.java 文件源码 项目:StreamCQL 阅读 15 收藏 0 点赞 0 评论 0
/**
 * {@inheritDoc}
 *
 * <p>Creates the consumer connector from {@code kafkaProperties}, requests
 * {@code TOPIC_COUNT} streams for {@code topic} and keeps the iterator of the first one.
 */
@Override
public void initialize()
    throws StreamingException
{
    consumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(kafkaProperties));

    Map<String, Integer> streamsPerTopic = Maps.newHashMap();
    streamsPerTopic.put(topic, TOPIC_COUNT);

    // Only the first stream returned for the topic is consumed.
    List<KafkaStream<byte[], byte[]>> topicStreams =
        consumerConnector.createMessageStreams(streamsPerTopic).get(topic);
    consumerIterator = topicStreams.get(0).iterator();
}
KafkaConsumerModule.java 文件源码 项目:heroic 阅读 18 收藏 0 点赞 0 评论 0
/**
 * Provides the schema's consumer, wired against the default ingestion group.
 *
 * @param ingestionManager source of the default ingestion group
 * @return the consumer exposed by the configured schema
 * @throws IllegalStateException if the default ingestion group is empty
 */
@Provides
@KafkaScope
ConsumerSchema.Consumer consumer(final IngestionManager ingestionManager) {
    // XXX: the target group is not configurable yet; the default group is always used.
    final IngestionGroup defaultGroup = ingestionManager.useDefaultGroup();

    if (defaultGroup.isEmpty()) {
        throw new IllegalStateException("No backends are part of the ingestion group");
    }

    final ConsumerSchema.Depends schemaDepends = DaggerConsumerSchema_Depends
        .builder()
        .primaryComponent(primary)
        .depends(depends)
        .dependsModule(new ConsumerSchema.DependsModule(defaultGroup))
        .build();

    return schema.setup(schemaDepends).consumer();
}
KafkaHelper.java 文件源码 项目:easyframe-msg 阅读 15 收藏 0 点赞 0 评论 0
/**
 * Returns the cached consumer connector for the given group and the calling thread,
 * creating and caching it on first use.
 *
 * @param groupId consumer group id; combined with the current thread name to form the cache key
 * @return the connector cached under {@code groupId + "|" + threadName}
 */
public static ConsumerConnector getConsumer(String groupId) {
    // The thread name is part of the key so that each thread has exactly one Consumer, yet
    // every thread can own an independent Consumer and therefore consume different partitions.
    String consumerKey = groupId + "|" + Thread.currentThread().getName();
    // NOTE(review): this first read happens without the lock; that is only safe if
    // groupConsumers is a concurrent map — confirm against its declaration.
    ConsumerConnector msgConnector = groupConsumers.get(consumerKey);
    if (msgConnector == null) {
        // Acquire before entering the try block: if lock() itself fails, the finally clause
        // must not call unlock() on a lock this thread does not hold.
        consumerLock.lock();
        try {
            // Re-check under the lock in case another caller created it in the meantime.
            msgConnector = groupConsumers.get(consumerKey);
            if (msgConnector == null) {
                msgConnector = Consumer.createJavaConsumerConnector(getConsumerRealConfig(groupId));
                groupConsumers.put(consumerKey, msgConnector);
            }
        } finally {
            consumerLock.unlock();
        }
    }

    return msgConnector;
}
HttpClient.java 文件源码 项目:punxsutawney 阅读 27 收藏 0 点赞 0 评论 0
/**
 * Entry point: verifies that {@code id} and {@code zk} are set, opens a single stream on
 * the topic named {@code id} and passes it to {@code consume}.
 *
 * @param args not used
 * @throws IllegalStateException if {@code id} or {@code zk} is {@code null}
 */
public static void main(String[] args) throws Exception {
    if (id == null) {
        throw new IllegalStateException("Undefined HC_ID");
    }
    if (zk == null) {
        throw new IllegalStateException("Undefined HC_ZK");
    }

    out.println("Starting " + HttpClient.class.getSimpleName());
    out.println("Using zk:" + zk + ", id:" + id);

    // The id doubles as both the consumer group and the topic name.
    Properties consumerProps = new Properties();
    consumerProps.put("zookeeper.connect", zk);
    consumerProps.put("group.id", id);
    consumerProps.put("zookeeper.session.timeout.ms", "400");
    consumerProps.put("zookeeper.sync.time.ms", "200");

    ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
    ConsumerConnector connector = Consumer.createJavaConsumerConnector(consumerConfig);
    KafkaStream<byte[],byte[]> onlyStream = connector
            .createMessageStreams(Collections.singletonMap(id, 1))
            .get(id)
            .get(0);

    consume(connector, onlyStream);
}
KafkaConfiguration.java 文件源码 项目:space-shuttle-demo 阅读 16 收藏 0 点赞 0 评论 0
/**
 * Exposes the first Kafka stream of the topic derived from the gateway URL. Keys are
 * decoded with {@code StringDecoder}, values with {@code FeatureVectorDecoder}.
 *
 * @return the first stream Kafka returned for the topic
 */
@Bean
protected KafkaStream<String, float[]> kafkaStream() {

    final String topicName = retrieveTopicNameFromGatewayAddress(gatewayUrl());

    ConsumerConnector connector = Consumer.createJavaConsumerConnector(consumerConfig());

    // One stream for the single topic.
    Map<String, Integer> streamsPerTopic = new HashMap<>();
    streamsPerTopic.put(topicName, 1);

    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    FeatureVectorDecoder valueDecoder = new FeatureVectorDecoder();

    List<KafkaStream<String, float[]>> topicStreams =
            connector.createMessageStreams(streamsPerTopic, keyDecoder, valueDecoder).get(topicName);

    // Fail with a descriptive message when the topic is missing or has no stream.
    Preconditions.checkNotNull(topicStreams, String.format("Topic %s not found in streams map.", topicName));
    Preconditions.checkElementIndex(0, topicStreams.size(),
            String.format("List of streams of topic %s is empty.", topicName));
    return topicStreams.get(0);
}
SdcKafkaTestUtil.java 文件源码 项目:datacollector 阅读 18 收藏 0 点赞 0 评论 0
/**
 * Creates a ZooKeeper-based consumer in group {@code testClient} and opens
 * {@code partitions} streams on {@code topic}.
 *
 * @param zookeeperConnectString value for {@code zookeeper.connect}
 * @param topic                  topic to read from
 * @param partitions             number of streams requested for the topic
 * @return the streams Kafka returned for the topic
 */
public List<KafkaStream<byte[], byte[]>> createKafkaStream(
    String zookeeperConnectString,
    String topic,
    int partitions
) {
  // Consumer settings.
  Properties settings = new Properties();
  settings.put("zookeeper.connect", zookeeperConnectString);
  settings.put("group.id", "testClient");
  settings.put("zookeeper.session.timeout.ms", "6000");
  settings.put("zookeeper.sync.time.ms", "200");
  settings.put("auto.commit.interval.ms", "1000");
  settings.put("consumer.timeout.ms", "500");

  ConsumerConnector connector =
      Consumer.createJavaConsumerConnector(new ConsumerConfig(settings));

  Map<String, Integer> streamsPerTopic = new HashMap<>();
  streamsPerTopic.put(topic, partitions);

  return connector.createMessageStreams(streamsPerTopic).get(topic);
}
KafkaTestUtil.java 文件源码 项目:datacollector 阅读 19 收藏 0 点赞 0 评论 0
/**
 * Creates a ZooKeeper-based consumer in group {@code testClient} and opens
 * {@code partitions} streams on {@code topic}.
 *
 * @param zookeeperConnectString value for {@code zookeeper.connect}
 * @param topic                  topic to read from
 * @param partitions             number of streams requested for the topic
 * @return the streams Kafka returned for the topic
 */
public static List<KafkaStream<byte[], byte[]>> createKafkaStream(String zookeeperConnectString, String topic, int partitions) {
  // Consumer settings.
  Properties settings = new Properties();
  settings.put("zookeeper.connect", zookeeperConnectString);
  settings.put("group.id", "testClient");
  settings.put("zookeeper.session.timeout.ms", "6000");
  settings.put("zookeeper.sync.time.ms", "200");
  settings.put("auto.commit.interval.ms", "1000");
  settings.put("consumer.timeout.ms", "500");

  ConsumerConnector connector =
      Consumer.createJavaConsumerConnector(new ConsumerConfig(settings));

  Map<String, Integer> streamsPerTopic = new HashMap<>();
  streamsPerTopic.put(topic, partitions);

  return connector.createMessageStreams(streamsPerTopic).get(topic);
}
AbstractMultiBrokerTest.java 文件源码 项目:netty-kafka-producer 阅读 17 收藏 0 点赞 0 评论 0
/**
 * Opens a consumer with a random group id against {@code ZK_HOST} and returns the streams
 * Kafka hands back for {@code topic}.
 *
 * @param topic topic to consume
 * @return the streams returned for the topic
 */
public List<KafkaStream<byte[], byte[]>> consume(String topic) {
    Properties props = TestUtils.createConsumerProperties(
            ZK_HOST,
            UUID.randomUUID().toString(),
            "client",
            TIMEOUT);

    // A single stream is requested for the topic.
    Map<String, Integer> streamsPerTopic = new HashMap<>();
    streamsPerTopic.put(topic, 1);

    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    return Consumer.createJavaConsumerConnector(consumerConfig)
            .createMessageStreams(streamsPerTopic)
            .get(topic);
}
KafkaLogAppenderTest.java 文件源码 项目:java-kafka 阅读 19 收藏 0 点赞 0 评论 0
/**
 * Logs one message through {@code logger} and checks the record that arrives on
 * {@code topic}: line text, log type id, presence of a source, and the sizes of the
 * {@code timings} and {@code tag} maps.
 */
@Test
public void testKafkaLogAppender() {
    Properties props = new Properties();
    props.put("zookeeper.connect", zookeeper);
    props.put("group.id", "kafka-log-appender-test");
    props.put("auto.offset.reset", "smallest");
    props.put("schema.registry.url", schemaRegistry);

    Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
    streamsPerTopic.put(topic, 1);

    // The stream is opened before anything is logged.
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerIterator<String, Object> records = Consumer.createJavaConsumerConnector(consumerConfig)
            .createMessageStreams(streamsPerTopic, new StringDecoder(null), new KafkaAvroDecoder(new VerifiableProperties(props)))
            .get(topic)
            .get(0)
            .iterator();

    String testMessage = "I am a test message";
    logger.info(testMessage);

    MessageAndMetadata<String, Object> received = records.next();
    GenericRecord logLine = (GenericRecord) received.message();

    assertEquals(logLine.get("line").toString(), testMessage);
    assertEquals(logLine.get("logtypeid"), KafkaLogAppender.InfoLogTypeId);
    assertNotNull(logLine.get("source"));

    Map<CharSequence, Object> timings = (Map<CharSequence, Object>) logLine.get("timings");
    assertEquals(timings.size(), 1);
    Map<CharSequence, Object> tags = (Map<CharSequence, Object>) logLine.get("tag");
    assertEquals(tags.size(), 2);
}
KtGroup.java 文件源码 项目:kt 阅读 19 收藏 0 点赞 0 评论 0
/**
 * Creates the consumer for the configured topic.
 *
 * <p>When the config carries no group id, a unique one of the form {@code Kt-<uuid>} is
 * generated. The starting offset is {@code "smallest"} when the configured location is
 * {@code "tail"} and {@code "largest"} otherwise (including when no location is set).
 *
 * @param config source of the group id, location, ZooKeeper address and topic
 */
public KtGroup(Config config) {
    // Because we are not pushing names to zookeeper random names should be fine
    String groupId = config.getGroupId();
    if (groupId == null) {
        // default to a unique group id
        groupId = "Kt-" + UUID.randomUUID();
    }

    // Constant-first comparison so a missing location falls back to the default offset
    // instead of failing with a NullPointerException.
    // NOTE(review): "tail" selecting "smallest" looks inverted relative to the usual meaning
    // of tailing a log; left unchanged — confirm the intended mapping.
    String offset = "tail".equals(config.getLocation()) ? "smallest" : "largest";
    log.info("Starting consumer at '{}' offset", offset);
    consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(config.getZookeeper(), groupId, offset));
    this.topic = config.getTopic();
}
KafkaSpout.java 文件源码 项目:monasca-thresh 阅读 16 收藏 0 点赞 0 评论 0
/**
 * Stores the collector, derives the spout name from the component and task ids, and
 * creates the consumer connector with the task id as {@code consumer.id}.
 *
 * @param conf      not read by this method
 * @param context   supplies the component id and task id
 * @param collector collector stored on this spout
 */
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  logger.info("Opened");
  this.collector = collector;
  logger.info(" topic = " + kafkaSpoutConfig.kafkaConsumerConfiguration.getTopic());
  this.spoutName = String.format("%s-%d", context.getThisComponentId(), context.getThisTaskId());

  Properties consumerProps =
      KafkaConsumerProperties.createKafkaProperties(kafkaSpoutConfig.kafkaConsumerConfiguration);
  // Every spout needs its own consumer.id, so the storm taskId is used. Otherwise zookeeper
  // complains about a conflicted ephemeral node when more than one spout reads from a topic.
  String consumerId = String.valueOf(context.getThisTaskId());
  consumerProps.setProperty("consumer.id", consumerId);
  this.consumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
}
KafkaTestBase.java 文件源码 项目:incubator-gobblin 阅读 22 收藏 0 点赞 0 评论 0
/**
 * Creates a consumer in a fresh group (topic name plus {@code System.nanoTime()}) and
 * keeps the first stream of {@code topic} together with its iterator.
 *
 * @param zkConnectString value for {@code zookeeper.connect}
 * @param topic           topic to consume
 */
KafkaConsumerSuite(String zkConnectString, String topic)
{
  _topic = topic;
  Properties consumeProps = new Properties();
  consumeProps.put("zookeeper.connect", zkConnectString);
  consumeProps.put("group.id", _topic+"-"+System.nanoTime());
  consumeProps.put("zookeeper.session.timeout.ms", "10000");
  consumeProps.put("zookeeper.sync.time.ms", "10000");
  consumeProps.put("auto.commit.interval.ms", "10000");
  // The key used to be "_consumer.timeout.ms", which is not a Kafka consumer property, so
  // the timeout was never applied. With the correct key the iterator gives up after 10s
  // without a message instead of blocking indefinitely.
  consumeProps.put("consumer.timeout.ms", "10000");

  _consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps));

  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
      _consumer.createMessageStreams(ImmutableMap.of(this._topic, 1));
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this._topic);
  _stream = streams.get(0);
  _iterator = _stream.iterator();
}
KafkaTestBase.java 文件源码 项目:incubator-gobblin 阅读 27 收藏 0 点赞 0 评论 0
/**
 * Creates a consumer in a fresh group (topic name plus {@code System.nanoTime()}) and
 * keeps the first stream of {@code topic} together with its iterator.
 *
 * @param zkConnectString value for {@code zookeeper.connect}
 * @param topic           topic to consume
 */
KafkaConsumerSuite(String zkConnectString, String topic)
{
  _topic = topic;
  Properties consumeProps = new Properties();
  consumeProps.put("zookeeper.connect", zkConnectString);
  consumeProps.put("group.id", _topic+"-"+System.nanoTime());
  consumeProps.put("zookeeper.session.timeout.ms", "10000");
  consumeProps.put("zookeeper.sync.time.ms", "10000");
  consumeProps.put("auto.commit.interval.ms", "10000");
  // The key used to be "_consumer.timeout.ms", which is not a Kafka consumer property, so
  // the timeout was never applied. With the correct key the iterator gives up after 10s
  // without a message instead of blocking indefinitely.
  consumeProps.put("consumer.timeout.ms", "10000");

  _consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps));

  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
      _consumer.createMessageStreams(ImmutableMap.of(this._topic, 1));
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this._topic);
  _stream = streams.get(0);
  _iterator = _stream.iterator();
}
MessageReader.java 文件源码 项目:secor 阅读 24 收藏 0 点赞 0 评论 0
/**
 * Creates a reader over the first stream matching the configured topic filter.
 *
 * <p>A non-empty blacklist selects a {@code Blacklist} filter; otherwise a
 * {@code Whitelist} built from the topic filter setting is used. Specifying both is
 * rejected.
 *
 * @param config        Secor configuration
 * @param offsetTracker tracker stored on this reader
 * @throws UnknownHostException declared by this constructor; presumably raised while
 *                              building the consumer config — confirm
 * @throws RuntimeException     if both a topic filter and a blacklist are configured
 */
public MessageReader(SecorConfig config, OffsetTracker offsetTracker) throws
        UnknownHostException {
    mConfig = config;
    mOffsetTracker = offsetTracker;

    // Validate the filter settings BEFORE opening the connector, so that a configuration
    // error cannot leave an already-created connector behind with nobody to shut it down.
    final String blacklist = mConfig.getKafkaTopicBlacklist();
    final String whitelist = mConfig.getKafkaTopicFilter();
    if (!blacklist.isEmpty() && !whitelist.isEmpty()) {
        throw new RuntimeException("Topic filter and blacklist cannot be both specified.");
    }
    TopicFilter topicFilter = !blacklist.isEmpty() ? new Blacklist(blacklist) : new Whitelist(whitelist);
    LOG.debug("Use TopicFilter {}({})", topicFilter.getClass(), topicFilter);

    mConsumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig());

    List<KafkaStream<byte[], byte[]>> streams =
        mConsumerConnector.createMessageStreamsByFilter(topicFilter);
    KafkaStream<byte[], byte[]> stream = streams.get(0);
    mIterator = stream.iterator();
    mLastAccessTime = new HashMap<TopicPartition, Long>();
    StatsUtil.setLabel("secor.kafka.consumer.id", IdUtil.getConsumerId());
    mTopicPartitionForgetSeconds = mConfig.getTopicPartitionForgetSeconds();
    mCheckMessagesPerSecond = mConfig.getMessagesPerSecond() / mConfig.getConsumerThreads();
    mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass());
}
KafkaMessageConsumer.java 文件源码 项目:haogrgr-test 阅读 21 收藏 0 点赞 0 评论 0
/**
 * Initializes the Kafka consumer client and obtains the stream for the topic.
 */
private void openKafkaStream() {
    logger.info("开始初始化Kafka消费客户端");

    this.consumer = Consumer.createJavaConsumerConnector(getConsumerConfig());

    // The same String decoder instance serves both keys and values.
    StringDecoder stringDecoder = new StringDecoder(null);
    Map<String, Integer> streamsPerTopic = Maps.of(topic, 1);

    List<KafkaStream<String, String>> topicStreams =
            consumer.createMessageStreams(streamsPerTopic, stringDecoder, stringDecoder).get(topic);
    this.stream = topicStreams.get(0);

    Assert.notNull(stream);
}
KafkaSpout.java 文件源码 项目:CadalWorkspace 阅读 17 收藏 0 点赞 0 评论 0
/**
 * Create a Kafka consumer.
 *
 * <p>Builds the connector from {@code kafkaProperties}, requests one stream for
 * {@code topic} and keeps that stream's iterator in {@code consumerIterator}.
 */
@Override
public void open() {

    // these consumers use ZooKeeper for commit, offset and segment consumption tracking
    // TODO: consider using SimpleConsumer the same way the Hadoop consumer job does to avoid ZK dependency
    // TODO: use the task details from TopologyContext in the normal open method
    ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

    // consumer with just one thread since the real parallelism is handled by Storm already
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    // Autoboxing replaces the deprecated Integer(int) constructor.
    topicCountMap.put(topic, 1);

    Map<String, List<KafkaMessageStream>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
    KafkaMessageStream stream = consumerMap.get(topic).get(0);

    consumerIterator = stream.iterator();
}
KafkaSpout.java 文件源码 项目:CadalWorkspace 阅读 21 收藏 0 点赞 0 评论 0
/**
 * Create a Kafka consumer.
 *
 * <p>Builds the connector from {@code kafkaProperties}, requests one stream for
 * {@code topic} and keeps that stream's iterator in {@code consumerIterator}.
 */
@Override
public void open() {

    // these consumers use ZooKeeper for commit, offset and segment consumption tracking
    // TODO: consider using SimpleConsumer the same way the Hadoop consumer job does to avoid ZK dependency
    // TODO: use the task details from TopologyContext in the normal open method
    ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

    // consumer with just one thread since the real parallelism is handled by Storm already
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    // Autoboxing replaces the deprecated Integer(int) constructor.
    topicCountMap.put(topic, 1);

    Map<String, List<KafkaMessageStream>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
    KafkaMessageStream stream = consumerMap.get(topic).get(0);

    consumerIterator = stream.iterator();
}
KafkaConsumerTest.java 文件源码 项目:pentaho-kafka-consumer 阅读 31 收藏 0 点赞 0 评论 0
/**
 * Builds a fresh step/transformation fixture and stubs the Kafka consumer chain so that
 * no real connector is created.
 */
@Before
public void setUp() {
    // Step data and metadata, configured with the default Kafka properties and the step limit.
    data = new KafkaConsumerData();
    meta = new KafkaConsumerMeta();
    meta.setKafkaProperties(getDefaultKafkaProperties());
    meta.setLimit(STEP_LIMIT);

    // Register the step in a new transformation.
    stepMeta = new StepMeta("KafkaConsumer", meta);
    transMeta = new TransMeta();
    transMeta.addStep(stepMeta);
    trans = new Trans(transMeta);

    // Static methods of Consumer are mocked so the factory call below can be stubbed.
    PowerMockito.mockStatic(Consumer.class);

    // Stub each link: connector -> streams map -> stream list -> stream -> iterator -> message.
    when(Consumer.createJavaConsumerConnector(any(ConsumerConfig.class))).thenReturn(zookeeperConsumerConnector);
    when(zookeeperConsumerConnector.createMessageStreams(anyMapOf(String.class, Integer.class))).thenReturn(streamsMap);
    when(streamsMap.get(anyObject())).thenReturn(stream);
    when(stream.get(anyInt())).thenReturn(kafkaStream);
    when(kafkaStream.iterator()).thenReturn(streamIterator);
    when(streamIterator.next()).thenReturn(generateKafkaMessage());
}
VehicleDataGeneration.java 文件源码 项目:Practical-Real-time-Processing-and-Analytics 阅读 17 收藏 0 点赞 0 评论 0
/**
 * Reads the first message available on {@code KAFKA_TOPIC_STATIC_DATA} and parses it as a
 * JSON map of vehicle id to start {@code Location}.
 *
 * @return the parsed map, or an empty map when the iterator yields no message or the
 *         payload cannot be parsed
 */
private static Map<String, Location> getVehicleStartPoints() {
    Map<String, Location> vehicleStartPoint = new HashMap<String, Location>();
    Properties props = new Properties();
    props.put("zookeeper.connect", ZOOKEEPER_CONNECTION_STRING);
    props.put("group.id", "DataLoader" + r.nextInt(100));
    // NOTE(review): the two deserializer settings are presumably ignored by the
    // ZooKeeper-based consumer used here — confirm before removing.
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("auto.offset.reset", "smallest");

    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    try {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(KAFKA_TOPIC_STATIC_DATA, 1);

        KafkaStream<byte[], byte[]> stream = consumer.createMessageStreams(topicCountMap).get(KAFKA_TOPIC_STATIC_DATA)
                .get(0);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        // Only the first message is of interest.
        if (it.hasNext()) {
            // Decode explicitly as UTF-8 rather than with the platform default charset.
            String message = new String(it.next().message(), java.nio.charset.StandardCharsets.UTF_8);
            try {
                vehicleStartPoint = objectMapper.readValue(message, new TypeReference<Map<String, Location>>() {
                });
            } catch (IOException e) {
                // Best effort: an unparsable payload leaves the empty map in place.
                e.printStackTrace();
            }
        }
    } finally {
        // Always release the connector, even when stream creation or iteration throws.
        consumer.shutdown();
    }
    return vehicleStartPoint;
}
Original.java 文件源码 项目:open-kilda 阅读 25 收藏 0 点赞 0 评论 0
/**
 * Creates the consumer connector (stored in {@code consumerConnector}) and returns the
 * iterator of the single String/String stream opened on {@code topic}.
 *
 * @param topic topic to consume
 * @return iterator over the topic's first stream
 */
private ConsumerIterator<String, String> buildConsumer(String topic) {
    ConsumerConfig config = new ConsumerConfig(consumerProperties());

    Map<String, Integer> streamsPerTopic = new HashMap<>();
    streamsPerTopic.put(topic, 1);

    consumerConnector = Consumer.createJavaConsumerConnector(config);

    StringDecoder keyDecoder = new StringDecoder(null);
    StringDecoder valueDecoder = new StringDecoder(null);
    List<KafkaStream<String, String>> topicStreams =
            consumerConnector.createMessageStreams(streamsPerTopic, keyDecoder, valueDecoder).get(topic);
    return topicStreams.get(0).iterator();
}
SimpleKafkaTest.java 文件源码 项目:open-kilda 阅读 16 收藏 0 点赞 0 评论 0
/**
 * Creates the consumer connector (stored in {@code consumerConnector}) and returns the
 * iterator of the single String/String stream opened on {@code topic}.
 *
 * @param topic topic to consume
 * @return iterator over the topic's first stream
 */
private ConsumerIterator<String, String> buildConsumer(String topic) {
    ConsumerConfig config = new ConsumerConfig(consumerProperties());

    Map<String, Integer> streamsPerTopic = new HashMap<>();
    streamsPerTopic.put(topic, 1);

    consumerConnector = Consumer.createJavaConsumerConnector(config);

    StringDecoder keyDecoder = new StringDecoder(null);
    StringDecoder valueDecoder = new StringDecoder(null);
    List<KafkaStream<String, String>> topicStreams =
            consumerConnector.createMessageStreams(streamsPerTopic, keyDecoder, valueDecoder).get(topic);
    return topicStreams.get(0).iterator();
}
KafkaUtils.java 文件源码 项目:Kafka-Insight 阅读 18 收藏 0 点赞 0 评论 0
/**
 * Creates a consumer connector for the given group against the given ZooKeeper address,
 * with the exclude-internal-topics setting set to {@code "false"}.
 *
 * @param zkAddr ZooKeeper connect string
 * @param group  consumer group id
 * @return the newly created connector
 */
public static ConsumerConnector createConsumerConnector(String zkAddr, String group) {
    Properties settings = new Properties();
    settings.put(ConsumerConfig.GROUP_ID_CONFIG, group);
    settings.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false");
    settings.put(KafkaConfig.ZkConnectProp(), zkAddr);

    kafka.consumer.ConsumerConfig connectorConfig = new kafka.consumer.ConsumerConfig(settings);
    return Consumer.createJavaConsumerConnector(connectorConfig);
}
DeprecatedConsumer.java 文件源码 项目:wngn-jms-kafka 阅读 22 收藏 0 点赞 0 评论 0
/**
 * Creates a consumer for {@code topic}.
 *
 * <p>Built-in defaults (ZooKeeper cluster, group id, timeouts, commit interval) are
 * written into this instance's properties first; entries from {@code props} are applied
 * afterwards and therefore override the defaults.
 *
 * @param topic topic this consumer reads from
 * @param props caller-supplied overrides; {@code null} means "defaults only"
 */
public DeprecatedConsumer(String topic, Properties props) {
    this.props.put(ConsumerConstants.ZK_CONNECT, ConsumerConstants.ZK_CLUSTER_LIST);
    this.props.put(ConsumerConstants.GROUP_ID, ConsumerConstants.GROUPID_KAFKA_TEST);
    this.props.put(ConsumerConstants.ZK_SESSION_TIMEOUT_MS, "40000");
    this.props.put(ConsumerConstants.ZK_SYNC_TIME_MS, "200");
    this.props.put(ConsumerConstants.AUTO_COMMIT_INTERVAL_MS, "1000");
    if (props != null) {
        this.props.putAll(props);
    }
    // Build the connector from the MERGED properties. Using the raw parameter here dropped
    // every default set above, including the ZooKeeper address and the group id.
    this.consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(this.props));
    this.topic = topic;
}
KafkaAppenderIT.java 文件源码 项目:wngn-jms-kafka 阅读 21 收藏 0 点赞 0 评论 0
/**
 * Appends 1000 numbered events through the appender under test and verifies they are read
 * back, in order, from the topics matching the {@code logs} whitelist.
 */
@Test
public void testLogging() throws InterruptedException {

    final Logger rootLogger = loggerContext.getLogger("ROOT");

    unit.start();

    assertTrue("appender is started", unit.isStarted());

    // Produce: message0 .. message999.
    for (int sent = 0; sent < 1000; sent++) {
        final LoggingEvent event = new LoggingEvent("a.b.c.d", rootLogger, Level.INFO, "message" + sent, null, new Object[0]);
        unit.append(event);
    }

    // Consume from the smallest offset with a random group id.
    final Properties props = new Properties();
    props.put("metadata.broker.list", kafka.getBrokerList());
    props.put("group.id", "simple-consumer-" + new Random().nextInt());
    props.put("auto.commit.enable","false");
    props.put("auto.offset.reset","smallest");
    props.put("zookeeper.connect", kafka.getZookeeperConnection());

    final ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(props));
    final KafkaStream<byte[], byte[]> logStream =
            connector.createMessageStreamsByFilter(new Whitelist("logs"),1).get(0);
    final ConsumerIterator<byte[], byte[]> received = logStream.iterator();

    for (int expected = 0; expected < 1000; expected++) {
        final String payload = new String(received.next().message(), UTF8);
        assertThat(payload, Matchers.equalTo("message" + expected));
    }
}
TestKafka.java 文件源码 项目:wngn-jms-kafka 阅读 18 收藏 0 点赞 0 评论 0
/**
 * Fills the given properties with this test broker's connection settings and a random
 * group id, then creates a consumer connector from them.
 *
 * <p>The passed-in {@code Properties} object is modified in place.
 *
 * @param consumerProperties properties to extend and use
 * @return the newly created connector
 */
public ConsumerConnector createClient(Properties consumerProperties) {
    final String randomGroup = "simple-consumer-" + new Random().nextInt();

    consumerProperties.put("metadata.broker.list", getBrokerList());
    consumerProperties.put("group.id", randomGroup);
    consumerProperties.put("auto.commit.enable","false");
    consumerProperties.put("auto.offset.reset","smallest");
    consumerProperties.put("zookeeper.connect", getZookeeperConnection());

    return Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(consumerProperties));
}
KafkaDemoClient.java 文件源码 项目:iotplatform 阅读 22 收藏 0 点赞 0 评论 0
/**
 * Creates a consumer connector and returns the iterator of the single String/String
 * stream opened on {@code topic}.
 *
 * @param topic topic to consume
 * @return iterator over the topic's first stream
 */
private static ConsumerIterator<String, String> buildConsumer(String topic) {
    Map<String, Integer> streamsPerTopic = new HashMap<>();
    streamsPerTopic.put(topic, 1);

    ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties()));

    StringDecoder keyDecoder = new StringDecoder(null);
    StringDecoder valueDecoder = new StringDecoder(null);
    List<KafkaStream<String, String>> topicStreams =
            connector.createMessageStreams(streamsPerTopic, keyDecoder, valueDecoder).get(topic);
    return topicStreams.get(0).iterator();
}
HighLevelConsumerDemo.java 文件源码 项目:jaf-examples 阅读 17 收藏 0 点赞 0 评论 0
/**
 * Demo of the high-level consumer: prints every message received on a topic.
 *
 * <p>Expects four arguments: ZooKeeper list, topic name, group name and consumer id. When
 * started without arguments, the demo defaults ({@code Constants.ZK_SERVER},
 * {@code Constants.TOPIC_NAME}, {@code "group1"}, {@code "consumer1"}) are used. Any other
 * argument count prints the usage text and exits with status 1.
 *
 * @param args command-line arguments as described above
 */
public static void main(String[] args) {
    // Fall back to the demo defaults only when nothing was passed. The defaults used to be
    // assigned unconditionally, which ignored the command line and made the check below dead.
    if (args == null || args.length == 0) {
        args = new String[] { Constants.ZK_SERVER, Constants.TOPIC_NAME, "group1", "consumer1" };
    }
    if (args.length != 4) {
        System.err.println("Usage:\n\tjava -jar kafka_consumer.jar ${zookeeper_list} ${topic_name} ${group_name} ${consumer_id}");
        System.exit(1);
    }
    String zk = args[0];
    String topic = args[1];
    String groupid = args[2];
    String consumerid = args[3];
    Properties props = new Properties();
    props.put("zookeeper.connect", zk);
    props.put("group.id", groupid);
    props.put("client.id", "test");
    props.put("consumer.id", consumerid);
    props.put("auto.offset.reset", "smallest");
    props.put("auto.commit.enable", "true");
    props.put("auto.commit.interval.ms", "60000");

    // Decode keys and payloads explicitly as UTF-8 instead of the platform default charset.
    final java.nio.charset.Charset utf8 = java.nio.charset.Charset.forName("UTF-8");

    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    try {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);

        KafkaStream<byte[], byte[]> stream1 = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream1.iterator();
        while (iterator.hasNext()) {
            MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
            byte[] rawKey = messageAndMetadata.key();
            // A message without a key is printed with an empty key.
            String key = rawKey != null ? new String(rawKey, utf8) : "";
            String message = String.format(
                    "Topic:%s, GroupID:%s, Consumer ID:%s, PartitionID:%s, Offset:%s, Message Key:%s, Message Payload: %s",
                    messageAndMetadata.topic(), groupid, consumerid, messageAndMetadata.partition(),
                    messageAndMetadata.offset(), key,
                    new String(messageAndMetadata.message(), utf8));
            System.out.println(message);
        }
    } finally {
        // Release the connector if the loop ends or an exception escapes.
        consumerConnector.shutdown();
    }
}


问题


面经


文章

微信
公众号

扫码关注公众号