Example usages of the Java class kafka.consumer.Consumer

DefaultConsumerFactory.java (project: bootique-kafka-client)

@Override
public ConsumerConnector newConsumerConnector(String name, ConsumerConfig configOverrides) {
    Properties mergedProps = new Properties();
    Map<String, String> config = configs.get(name);
    if (config != null) {
        mergedProps.putAll(config);
    }
    if (configOverrides != null) {
        mergedProps.putAll(configOverrides.createConsumerConfig());
    }
    return Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(mergedProps));
}
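All of the snippets on this page use the legacy "high-level" consumer API (kafka.consumer.Consumer and ConsumerConnector), which is configured through ZooKeeper and was deprecated in Kafka 0.11 and removed in Kafka 2.0. They follow the same life cycle as the factory above: build a ConsumerConfig from Properties, create a connector, request one or more KafkaStreams per topic, and iterate. A minimal, self-contained sketch of that life cycle (the topic name and connection strings are placeholders):

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MinimalHighLevelConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder
        props.put("group.id", "example-group");           // placeholder
        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        try {
            // The map value is the number of streams (consuming threads) per topic.
            List<KafkaStream<byte[], byte[]>> streams = connector
                    .createMessageStreams(Collections.singletonMap("example-topic", 1))
                    .get("example-topic");
            ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
            while (it.hasNext()) { // blocks unless consumer.timeout.ms is set
                System.out.println(new String(it.next().message()));
            }
        } finally {
            connector.shutdown(); // commits offsets and releases the ZooKeeper registration
        }
    }
}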
OSMKafkaSpout.java (project: geomesa-tutorials)
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
    _collector = spoutOutputCollector;
    Properties props = new Properties();
    props.put("zookeeper.connect", conf.get(OSMIngest.ZOOKEEPERS));
    props.put("group.id", groupId);
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);
    Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(
            topicCountMap, new StringDecoder(new VerifiableProperties()), new StringDecoder(new VerifiableProperties()));
    List<KafkaStream<String, String>> streams = consumerMap.get(topic);
    if (streams.size() != 1) {
        // Requesting one stream must yield exactly one stream; failing fast here avoids the
        // NullPointerException the original hit when it logged the error but kept going.
        throw new IllegalStateException("Streams should be of size 1, but got " + streams.size());
    }
    KafkaStream<String, String> stream = streams.get(0);
    kafkaIterator = stream.iterator();
}
BlockingKafkaMessageConsumerCoordinator.java (project: benchmarkio)
@Override
public CompletionService<Histogram> startConsumers() {
    final ConsumerConfig consumerConfig = new ConsumerConfig(props);
    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    // Create message streams
    final Map<String, Integer> topicMap = new HashMap<>();
    topicMap.put(topic, numThreads);
    final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicMap);
    final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    // Pass each stream to a consumer that will read from the stream in its own thread.
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executorCompletionService.submit(new BlockingKafkaMessageConsumer(stream));
    }
    return executorCompletionService;
}
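BlockingKafkaMessageConsumer itself is not part of this excerpt. Purely as an illustration of the one-stream-per-thread pattern (with the project's Histogram bookkeeping swapped for a plain message counter), such a worker might look like this hypothetical Callable:

import java.util.concurrent.Callable;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

// Hypothetical stand-in for BlockingKafkaMessageConsumer: drains one KafkaStream
// on its own thread and reports how many messages it saw.
public class CountingStreamWorker implements Callable<Long> {
    private final KafkaStream<byte[], byte[]> stream;

    public CountingStreamWorker(KafkaStream<byte[], byte[]> stream) {
        this.stream = stream;
    }

    @Override
    public Long call() {
        long count = 0;
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) { // returns false once the connector is shut down
            it.next();
            count++;
        }
        return count;
    }
}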
KafkaSourceOp.java (project: StreamCQL)
/**
 * {@inheritDoc}
 */
@Override
public void initialize() throws StreamingException {
    ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, Integer> topicCountMap = Maps.newHashMap();
    topicCountMap.put(topic, TOPIC_COUNT);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            consumerConnector.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    consumerIterator = stream.iterator();
}
KafkaConsumerModule.java (project: heroic)
@Provides
@KafkaScope
ConsumerSchema.Consumer consumer(final IngestionManager ingestionManager) {
    // XXX: make target group configurable?
    final IngestionGroup ingestion = ingestionManager.useDefaultGroup();
    if (ingestion.isEmpty()) {
        throw new IllegalStateException("No backends are part of the ingestion group");
    }
    final ConsumerSchema.Depends d = DaggerConsumerSchema_Depends
        .builder()
        .primaryComponent(primary)
        .depends(depends)
        .dependsModule(new ConsumerSchema.DependsModule(ingestion))
        .build();
    final ConsumerSchema.Exposed exposed = schema.setup(d);
    return exposed.consumer();
}
KafkaHelper.java (project: easyframe-msg)
public static ConsumerConnector getConsumer(String groupId) {
    // The thread name is part of the key so that each thread holds at most one Consumer,
    // while every thread still gets its own independent Consumer and can therefore
    // consume a different partition.
    String consumerKey = groupId + "|" + Thread.currentThread().getName();
    ConsumerConnector msgConnector = groupConsumers.get(consumerKey);
    if (msgConnector == null) {
        // Acquire the lock outside the try block so the finally clause never
        // unlocks a lock this thread failed to acquire.
        consumerLock.lock();
        try {
            msgConnector = groupConsumers.get(consumerKey);
            if (msgConnector == null) {
                msgConnector = Consumer.createJavaConsumerConnector(getConsumerRealConfig(groupId));
                groupConsumers.put(consumerKey, msgConnector);
            }
        } finally {
            consumerLock.unlock();
        }
    }
    return msgConnector;
}
HttpClient.java (project: punxsutawney)
public static void main(String[] args) throws Exception {
    if (id == null) throw new IllegalStateException("Undefined HC_ID");
    if (zk == null) throw new IllegalStateException("Undefined HC_ZK");
    out.println("Starting " + HttpClient.class.getSimpleName());
    out.println("Using zk:" + zk + ", id:" + id);
    Properties props = new Properties();
    props.put("zookeeper.connect", zk);
    props.put("group.id", id);
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    KafkaStream<byte[], byte[]> stream = consumer.createMessageStreams(Collections.singletonMap(id, 1)).get(id).get(0);
    consume(consumer, stream);
}
KafkaConfiguration.java (project: space-shuttle-demo)
@Bean
protected KafkaStream<String, float[]> kafkaStream() {
    final String topicName = retrieveTopicNameFromGatewayAddress(gatewayUrl());
    ConsumerConnector consumerConnector =
            Consumer.createJavaConsumerConnector(consumerConfig());
    Map<String, Integer> topicCounts = new HashMap<>();
    topicCounts.put(topicName, 1);
    VerifiableProperties emptyProps = new VerifiableProperties();
    StringDecoder keyDecoder = new StringDecoder(emptyProps);
    FeatureVectorDecoder valueDecoder = new FeatureVectorDecoder();
    Map<String, List<KafkaStream<String, float[]>>> streams =
            consumerConnector.createMessageStreams(topicCounts, keyDecoder, valueDecoder);
    List<KafkaStream<String, float[]>> streamsByTopic = streams.get(topicName);
    Preconditions.checkNotNull(streamsByTopic, String.format("Topic %s not found in streams map.", topicName));
    Preconditions.checkElementIndex(0, streamsByTopic.size(),
            String.format("List of streams of topic %s is empty.", topicName));
    return streamsByTopic.get(0);
}
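FeatureVectorDecoder is the project's custom value decoder; its wire format is not shown in this excerpt. Assuming, purely for illustration, a payload of packed little-endian IEEE-754 floats, a kafka.serializer.Decoder implementation would look like:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import kafka.serializer.Decoder;

// Illustrative sketch only: assumes the payload is a packed array of little-endian floats.
public class FloatArrayDecoder implements Decoder<float[]> {
    @Override
    public float[] fromBytes(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
        float[] values = new float[bytes.length / Float.BYTES];
        for (int i = 0; i < values.length; i++) {
            values[i] = buffer.getFloat();
        }
        return values;
    }
}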
SdcKafkaTestUtil.java (project: datacollector)
public List<KafkaStream<byte[], byte[]>> createKafkaStream(
        String zookeeperConnectString,
        String topic,
        int partitions
) {
    // create consumer
    Properties consumerProps = new Properties();
    consumerProps.put("zookeeper.connect", zookeeperConnectString);
    consumerProps.put("group.id", "testClient");
    consumerProps.put("zookeeper.session.timeout.ms", "6000");
    consumerProps.put("zookeeper.sync.time.ms", "200");
    consumerProps.put("auto.commit.interval.ms", "1000");
    consumerProps.put("consumer.timeout.ms", "500");
    ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put(topic, partitions);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    return consumerMap.get(topic);
}
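This helper (and the near-identical KafkaTestUtil below) sets consumer.timeout.ms to 500, so ConsumerIterator.hasNext()/next() throws kafka.consumer.ConsumerTimeoutException after 500 ms of silence instead of blocking forever, which is what a test wants. A small sketch of draining such a stream:

import java.util.ArrayList;
import java.util.List;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;

// Reads until consumer.timeout.ms elapses with no new message.
public static List<byte[]> drain(KafkaStream<byte[], byte[]> stream) {
    List<byte[]> messages = new ArrayList<>();
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    try {
        while (it.hasNext()) {
            messages.add(it.next().message());
        }
    } catch (ConsumerTimeoutException expected) {
        // No message within consumer.timeout.ms; treat the topic as drained.
    }
    return messages;
}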
KafkaTestUtil.java (project: datacollector)
public static List<KafkaStream<byte[], byte[]>> createKafkaStream(String zookeeperConnectString, String topic, int partitions) {
    // create consumer
    Properties consumerProps = new Properties();
    consumerProps.put("zookeeper.connect", zookeeperConnectString);
    consumerProps.put("group.id", "testClient");
    consumerProps.put("zookeeper.session.timeout.ms", "6000");
    consumerProps.put("zookeeper.sync.time.ms", "200");
    consumerProps.put("auto.commit.interval.ms", "1000");
    consumerProps.put("consumer.timeout.ms", "500");
    ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put(topic, partitions);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    return consumerMap.get(topic);
}
AbstractMultiBrokerTest.java (project: netty-kafka-producer)
public List<KafkaStream<byte[], byte[]>> consume(String topic) {
    Properties consumerProperties = TestUtils.createConsumerProperties(
            ZK_HOST,
            UUID.randomUUID().toString(),
            "client",
            TIMEOUT);
    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put(topic, 1); // one stream is enough: the test reads the topic from a single thread
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties)).createMessageStreams(topicCountMap);
    return consumerMap.get(topic);
}
KafkaLogAppenderTest.java (project: java-kafka)
@Test
public void testKafkaLogAppender() {
    Properties consumerProps = new Properties();
    consumerProps.put("zookeeper.connect", zookeeper);
    consumerProps.put("group.id", "kafka-log-appender-test");
    consumerProps.put("auto.offset.reset", "smallest");
    consumerProps.put("schema.registry.url", schemaRegistry);
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(topic, 1);
    ConsumerIterator<String, Object> iterator = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps))
            .createMessageStreams(topicMap, new StringDecoder(null), new KafkaAvroDecoder(new VerifiableProperties(consumerProps)))
            .get(topic).get(0).iterator();
    String testMessage = "I am a test message";
    logger.info(testMessage);
    MessageAndMetadata<String, Object> messageAndMetadata = iterator.next();
    GenericRecord logLine = (GenericRecord) messageAndMetadata.message();
    assertEquals(logLine.get("line").toString(), testMessage);
    assertEquals(logLine.get("logtypeid"), KafkaLogAppender.InfoLogTypeId);
    assertNotNull(logLine.get("source"));
    assertEquals(((Map<CharSequence, Object>) logLine.get("timings")).size(), 1);
    assertEquals(((Map<CharSequence, Object>) logLine.get("tag")).size(), 2);
}
KtGroup.java (project: kt)
public KtGroup(Config config) {
    // Because we are not pushing names to ZooKeeper, random group names are fine
    String groupId = config.getGroupId();
    if (groupId == null) {
        // default to a unique group id
        groupId = "Kt-" + UUID.randomUUID();
    }
    String offset = "largest";
    if (config.getLocation().equals("tail")) {
        offset = "smallest";
    }
    log.info("Starting consumer at '{}' offset", offset);
    consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(config.getZookeeper(), groupId, offset));
    this.topic = config.getTopic();
}
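createConsumerConfig(...) is not part of this excerpt. A plausible shape for it, sketched here as an assumption rather than the kt project's actual code, simply wires the three arguments into the usual properties:

import java.util.Properties;
import kafka.consumer.ConsumerConfig;

// Assumed implementation of the helper used above; the real project may differ.
private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId, String offset) {
    Properties props = new Properties();
    props.put("zookeeper.connect", zookeeper);
    props.put("group.id", groupId);
    props.put("auto.offset.reset", offset); // "smallest" or "largest"
    return new ConsumerConfig(props);
}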
KafkaSpout.java (project: monasca-thresh)
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    logger.info("Opened");
    this.collector = collector;
    logger.info("topic = " + kafkaSpoutConfig.kafkaConsumerConfiguration.getTopic());
    this.spoutName = String.format("%s-%d", context.getThisComponentId(), context.getThisTaskId());
    Properties kafkaProperties =
            KafkaConsumerProperties.createKafkaProperties(kafkaSpoutConfig.kafkaConsumerConfiguration);
    // Have to use a different consumer.id for each spout so use the storm taskId. Otherwise,
    // zookeeper complains about a conflicted ephemeral node when there is more than one spout
    // reading from a topic
    kafkaProperties.setProperty("consumer.id", String.valueOf(context.getThisTaskId()));
    ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
    this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
}
KafkaTestBase.java (project: incubator-gobblin)
KafkaConsumerSuite(String zkConnectString, String topic) {
    _topic = topic;
    Properties consumeProps = new Properties();
    consumeProps.put("zookeeper.connect", zkConnectString);
    consumeProps.put("group.id", _topic + "-" + System.nanoTime());
    consumeProps.put("zookeeper.session.timeout.ms", "10000");
    consumeProps.put("zookeeper.sync.time.ms", "10000");
    consumeProps.put("auto.commit.interval.ms", "10000");
    // The leading underscore makes this key unrecognized by Kafka, apparently to
    // disable the consumer timeout while keeping the intended value around.
    consumeProps.put("_consumer.timeout.ms", "10000");
    _consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            _consumer.createMessageStreams(ImmutableMap.of(this._topic, 1));
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this._topic);
    _stream = streams.get(0);
    _iterator = _stream.iterator();
}
MessageReader.java (project: secor)
public MessageReader(SecorConfig config, OffsetTracker offsetTracker) throws UnknownHostException {
    mConfig = config;
    mOffsetTracker = offsetTracker;
    mConsumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig());
    if (!mConfig.getKafkaTopicBlacklist().isEmpty() && !mConfig.getKafkaTopicFilter().isEmpty()) {
        throw new RuntimeException("Topic filter and blacklist cannot be both specified.");
    }
    TopicFilter topicFilter = !mConfig.getKafkaTopicBlacklist().isEmpty()
            ? new Blacklist(mConfig.getKafkaTopicBlacklist())
            : new Whitelist(mConfig.getKafkaTopicFilter());
    LOG.debug("Use TopicFilter {}({})", topicFilter.getClass(), topicFilter);
    List<KafkaStream<byte[], byte[]>> streams =
            mConsumerConnector.createMessageStreamsByFilter(topicFilter);
    KafkaStream<byte[], byte[]> stream = streams.get(0);
    mIterator = stream.iterator();
    mLastAccessTime = new HashMap<TopicPartition, Long>();
    StatsUtil.setLabel("secor.kafka.consumer.id", IdUtil.getConsumerId());
    mTopicPartitionForgetSeconds = mConfig.getTopicPartitionForgetSeconds();
    mCheckMessagesPerSecond = mConfig.getMessagesPerSecond() / mConfig.getConsumerThreads();
    mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass());
}
KafkaMessageConsumer.java (project: haogrgr-test)
/**
 * Initialize the Kafka consumer client and obtain the stream for the topic.
 */
private void openKafkaStream() {
    logger.info("Initializing Kafka consumer client");
    this.consumer = Consumer.createJavaConsumerConnector(getConsumerConfig());
    StringDecoder decoder = new StringDecoder(null);
    Map<String, Integer> topicCountMap = Maps.of(topic, 1); // Maps.of appears to be a project utility, not Guava
    Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,
            decoder, decoder);
    List<KafkaStream<String, String>> streams = consumerMap.get(topic);
    this.stream = streams.get(0);
    Assert.notNull(stream);
}
KafkaSpout.java (project: CadalWorkspace)
/**
 * Create a Kafka consumer.
 */
@Override
public void open() {
    // these consumers use ZooKeeper for commit, offset and segment consumption tracking
    // TODO: consider using SimpleConsumer the same way the Hadoop consumer job does to avoid ZK dependency
    // TODO: use the task details from TopologyContext in the normal open method
    ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    // consumer with just one thread since the real parallelism is handled by Storm already
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(1));
    // KafkaMessageStream is from the pre-0.8 consumer API (later renamed KafkaStream)
    Map<String, List<KafkaMessageStream>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
    KafkaMessageStream stream = consumerMap.get(topic).get(0);
    consumerIterator = stream.iterator();
}
KafkaConsumerTest.java (project: pentaho-kafka-consumer)
@Before
public void setUp() {
    data = new KafkaConsumerData();
    meta = new KafkaConsumerMeta();
    meta.setKafkaProperties(getDefaultKafkaProperties());
    meta.setLimit(STEP_LIMIT);
    stepMeta = new StepMeta("KafkaConsumer", meta);
    transMeta = new TransMeta();
    transMeta.addStep(stepMeta);
    trans = new Trans(transMeta);
    PowerMockito.mockStatic(Consumer.class);
    when(Consumer.createJavaConsumerConnector(any(ConsumerConfig.class))).thenReturn(zookeeperConsumerConnector);
    when(zookeeperConsumerConnector.createMessageStreams(anyMapOf(String.class, Integer.class))).thenReturn(streamsMap);
    when(streamsMap.get(anyObject())).thenReturn(stream);
    when(stream.get(anyInt())).thenReturn(kafkaStream);
    when(kafkaStream.iterator()).thenReturn(streamIterator);
    when(streamIterator.next()).thenReturn(generateKafkaMessage());
}
VehicleDataGeneration.java (project: Practical-Real-time-Processing-and-Analytics)
private static Map<String, Location> getVehicleStartPoints() {
    Map<String, Location> vehicleStartPoint = new HashMap<String, Location>();
    Properties props = new Properties();
    props.put("zookeeper.connect", ZOOKEEPER_CONNECTION_STRING);
    props.put("group.id", "DataLoader" + r.nextInt(100));
    // key.deserializer/value.deserializer are new-consumer settings; the old
    // high-level consumer ignores unrecognized properties like these.
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("auto.offset.reset", "smallest");
    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(KAFKA_TOPIC_STATIC_DATA, new Integer(1));
    KafkaStream<byte[], byte[]> stream = consumer.createMessageStreams(topicCountMap).get(KAFKA_TOPIC_STATIC_DATA)
            .get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    // Only the first message is needed: it carries the whole start-point map.
    while (it.hasNext()) {
        String message = new String(it.next().message());
        try {
            vehicleStartPoint = objectMapper.readValue(message, new TypeReference<Map<String, Location>>() {
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
        break;
    }
    consumer.shutdown();
    return vehicleStartPoint;
}
Original.java (project: open-kilda)
private ConsumerIterator<String, String> buildConsumer(String topic) {
    Properties props = consumerProperties();
    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put(topic, 1);
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, List<KafkaStream<String, String>>> consumers = consumerConnector.createMessageStreams(
            topicCountMap, new StringDecoder(null), new StringDecoder(null));
    KafkaStream<String, String> stream = consumers.get(topic).get(0);
    return stream.iterator();
}
SimpleKafkaTest.java (project: open-kilda)
private ConsumerIterator<String, String> buildConsumer(String topic) {
    Properties props = consumerProperties();
    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put(topic, 1);
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, List<KafkaStream<String, String>>> consumers = consumerConnector.createMessageStreams(
            topicCountMap, new StringDecoder(null), new StringDecoder(null));
    KafkaStream<String, String> stream = consumers.get(topic).get(0);
    return stream.iterator();
}
KafkaUtils.java (project: Kafka-Insight)
public static ConsumerConnector createConsumerConnector(String zkAddr, String group) {
    Properties props = new Properties();
    // Note: these constants come from the new consumer's org.apache.kafka.clients.consumer.ConsumerConfig
    // and the broker-side KafkaConfig; only their string values matter to the old consumer (see below).
    props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
    props.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false");
    props.put(KafkaConfig.ZkConnectProp(), zkAddr);
    return Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(props));
}
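Spelled out with plain strings, and assuming the usual constant values from those classes, the same configuration reads:

Properties props = new Properties();
props.put("group.id", group);                  // ConsumerConfig.GROUP_ID_CONFIG
props.put("exclude.internal.topics", "false"); // ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG
props.put("zookeeper.connect", zkAddr);        // KafkaConfig.ZkConnectProp()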
DeprecatedConsumer.java (project: wngn-jms-kafka)
public DeprecatedConsumer(String topic, Properties props) {
    this.props.put(ConsumerConstants.ZK_CONNECT, ConsumerConstants.ZK_CLUSTER_LIST);
    this.props.put(ConsumerConstants.GROUP_ID, ConsumerConstants.GROUPID_KAFKA_TEST);
    this.props.put(ConsumerConstants.ZK_SESSION_TIMEOUT_MS, "40000");
    this.props.put(ConsumerConstants.ZK_SYNC_TIME_MS, "200");
    this.props.put(ConsumerConstants.AUTO_COMMIT_INTERVAL_MS, "1000");
    this.props.putAll(props);
    // Use the merged this.props here; the original passed the raw parameter,
    // silently dropping the defaults set above.
    this.consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(this.props));
    this.topic = topic;
}
KafkaAppenderIT.java (project: wngn-jms-kafka)
@Test
public void testLogging() throws InterruptedException {
    final Logger logger = loggerContext.getLogger("ROOT");
    unit.start();
    assertTrue("appender is started", unit.isStarted());
    for (int i = 0; i < 1000; ++i) {
        final LoggingEvent loggingEvent = new LoggingEvent("a.b.c.d", logger, Level.INFO, "message" + i, null, new Object[0]);
        unit.append(loggingEvent);
    }
    final Properties consumerProperties = new Properties();
    consumerProperties.put("metadata.broker.list", kafka.getBrokerList());
    consumerProperties.put("group.id", "simple-consumer-" + new Random().nextInt());
    consumerProperties.put("auto.commit.enable", "false");
    consumerProperties.put("auto.offset.reset", "smallest");
    consumerProperties.put("zookeeper.connect", kafka.getZookeeperConnection());
    final kafka.consumer.ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(consumerProperties);
    final ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    final KafkaStream<byte[], byte[]> log = javaConsumerConnector.createMessageStreamsByFilter(new Whitelist("logs"), 1).get(0);
    final ConsumerIterator<byte[], byte[]> iterator = log.iterator();
    for (int i = 0; i < 1000; ++i) {
        final String messageFromKafka = new String(iterator.next().message(), UTF8);
        assertThat(messageFromKafka, Matchers.equalTo("message" + i));
    }
}
TestKafka.java (project: wngn-jms-kafka)
public ConsumerConnector createClient(Properties consumerProperties) {
    consumerProperties.put("metadata.broker.list", getBrokerList());
    consumerProperties.put("group.id", "simple-consumer-" + new Random().nextInt());
    consumerProperties.put("auto.commit.enable", "false");
    consumerProperties.put("auto.offset.reset", "smallest");
    consumerProperties.put("zookeeper.connect", getZookeeperConnection());
    final kafka.consumer.ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(consumerProperties);
    return Consumer.createJavaConsumerConnector(consumerConfig);
}
KafkaDemoClient.java (project: iotplatform)
private static ConsumerIterator<String, String> buildConsumer(String topic) {
    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put(topic, 1);
    ConsumerConfig consumerConfig = new ConsumerConfig(consumerProperties());
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, List<KafkaStream<String, String>>> consumers = consumerConnector.createMessageStreams(
            topicCountMap, new StringDecoder(null), new StringDecoder(null));
    KafkaStream<String, String> stream = consumers.get(topic).get(0);
    return stream.iterator();
}
HighLevelConsumerDemo.java (project: jaf-examples)
public static void main(String[] args) {
    if (args == null || args.length != 4) {
        // Fall back to built-in defaults for local testing; the original overwrote args
        // unconditionally, which made its usage check unreachable.
        args = new String[] { Constants.ZK_SERVER, Constants.TOPIC_NAME, "group1", "consumer1" };
    }
    String zk = args[0];
    String topic = args[1];
    String groupid = args[2];
    String consumerid = args[3];
    Properties props = new Properties();
    props.put("zookeeper.connect", zk);
    props.put("group.id", groupid);
    props.put("client.id", "test");
    props.put("consumer.id", consumerid);
    props.put("auto.offset.reset", "smallest");
    props.put("auto.commit.enable", "true");
    props.put("auto.commit.interval.ms", "60000");
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
    while (iterator.hasNext()) {
        MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
        String key = messageAndMetadata.key() == null ? "" : new String(messageAndMetadata.key());
        String message = String.format(
                "Topic:%s, GroupID:%s, Consumer ID:%s, PartitionID:%s, Offset:%s, Message Key:%s, Message Payload: %s",
                messageAndMetadata.topic(), groupid, consumerid, messageAndMetadata.partition(),
                messageAndMetadata.offset(), key, new String(messageAndMetadata.message()));
        System.out.println(message);
    }
}
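Like most of the snippets above, this demo never closes its connector. When a high-level consumer should exit cleanly, committing offsets and releasing its ZooKeeper registration, a shutdown hook of roughly this shape, sketched here as a suggestion rather than project code, does the job:

// Sketch: ensure the connector commits and deregisters on JVM exit.
final ConsumerConnector connector = consumerConnector; // from the demo above
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    @Override
    public void run() {
        connector.shutdown(); // stops the streams; iterator.hasNext() then returns false
    }
}));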