Java source examples for the class kafka.consumer.Whitelist

From LogbackIntegrationIT.java (project: wngn-jms-kafka):

@Test
public void testLogging() throws InterruptedException {
    for (int i = 0; i < 1000; ++i) {
        logger.info("message" + i);
    }
    final KafkaStream<byte[], byte[]> log =
            kafka.createClient().createMessageStreamsByFilter(new Whitelist("logs"), 1).get(0);
    final ConsumerIterator<byte[], byte[]> iterator = log.iterator();
    for (int i = 0; i < 1000; ++i) {
        final String messageFromKafka = new String(iterator.next().message(), UTF8);
        assertThat(messageFromKafka, Matchers.equalTo("message" + i));
    }
}
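The test above, like every snippet on this page, relies on kafka.consumer.Whitelist from the old Scala consumer client (the 0.8.x/0.9.x "high-level" consumer): a TopicFilter built from a regular expression and passed to ConsumerConnector.createMessageStreamsByFilter to select the topics a connector subscribes to. A minimal, self-contained sketch of that pattern follows; the addresses, group id, and topic regex are placeholders, not values from any of the projects listed here.

import java.util.List;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;

public class WhitelistConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder address
        props.put("group.id", "whitelist-demo");          // placeholder group id
        props.put("auto.offset.reset", "smallest");
        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        // One stream over every topic matching the regex "logs-.*".
        List<KafkaStream<byte[], byte[]>> streams =
                connector.createMessageStreamsByFilter(new Whitelist("logs-.*"), 1);
        ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}

The second argument to createMessageStreamsByFilter is the number of streams (consuming threads) to create over the matched topics, a knob the koper and chaperone snippets below also use.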

From KafkaReceiver.java (project: koper):
/**
 * Start the MessageReceiver and begin listening for topic messages.
 */
@Override
public void start() {
    if (consumer == null) {
        // lazily initialize the consumer, guarded by the lock
        synchronized (lock) {
            init();
        }
    }
    String topicString = buildTopicsString();
    Whitelist topicFilter = new Whitelist(topicString);
    List<KafkaStream<byte[], byte[]>> streamList = consumer.createMessageStreamsByFilter(topicFilter, partitions);
    if (org.apache.commons.collections.CollectionUtils.isEmpty(streamList)) {
        // back off briefly if no streams have been assigned yet
        try {
            TimeUnit.MILLISECONDS.sleep(1);
        } catch (InterruptedException e) {
            log.warn(e.getMessage(), e);
        }
    }
    processStreamsByTopic(topicString, streamList);
}

From MessageReader.java (project: secor):
public MessageReader(SecorConfig config, OffsetTracker offsetTracker) throws UnknownHostException {
    mConfig = config;
    mOffsetTracker = offsetTracker;
    mConsumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig());
    if (!mConfig.getKafkaTopicBlacklist().isEmpty() && !mConfig.getKafkaTopicFilter().isEmpty()) {
        throw new RuntimeException("Topic filter and blacklist cannot be both specified.");
    }
    TopicFilter topicFilter = !mConfig.getKafkaTopicBlacklist().isEmpty()
            ? new Blacklist(mConfig.getKafkaTopicBlacklist())
            : new Whitelist(mConfig.getKafkaTopicFilter());
    LOG.debug("Use TopicFilter {}({})", topicFilter.getClass(), topicFilter);
    List<KafkaStream<byte[], byte[]>> streams =
            mConsumerConnector.createMessageStreamsByFilter(topicFilter);
    KafkaStream<byte[], byte[]> stream = streams.get(0);
    mIterator = stream.iterator();
    mLastAccessTime = new HashMap<TopicPartition, Long>();
    StatsUtil.setLabel("secor.kafka.consumer.id", IdUtil.getConsumerId());
    mTopicPartitionForgetSeconds = mConfig.getTopicPartitionForgetSeconds();
    mCheckMessagesPerSecond = mConfig.getMessagesPerSecond() / mConfig.getConsumerThreads();
    mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass());
}
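Blacklist is the complement of Whitelist: it excludes the topics matching its regex, which is why Secor rejects a configuration that sets both. A small sketch of the either/or choice above, with hypothetical pattern strings standing in for the configuration values:

    // Hypothetical patterns; Secor actually reads these from SecorConfig.
    String blacklistPattern = "";           // e.g. "internal_.*" to skip internal topics
    String whitelistPattern = "metrics_.*"; // consulted only when the blacklist is empty
    TopicFilter filter = !blacklistPattern.isEmpty()
            ? new Blacklist(blacklistPattern)
            : new Whitelist(whitelistPattern);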

From TopicCommand.java (project: buka):
private static List<String> getTopics(ZkClient zkClient, TopicCommandOptions opts) {
    String topicsSpec = opts.options.valueOf(opts.topicOpt);
    final Whitelist topicsFilter = new Whitelist(topicsSpec);
    Set<String> allTopics = ZkUtils.getAllTopics(zkClient);
    final List<String> result = Lists.newArrayList();
    Utils.foreach(allTopics, new Callable1<String>() {
        @Override
        public void apply(String topic) {
            if (topicsFilter.isTopicAllowed(topic)) {
                result.add(topic);
            }
        }
    });
    Collections.sort(result);
    return result;
}
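Whitelist.isTopicAllowed is a plain regex test against the topic name, which is what lets TopicCommand expand a pattern into a concrete topic list. A sketch of the matching behavior, assuming the single-argument form used above (some client versions instead take an additional excludeInternalTopics flag):

import kafka.consumer.Whitelist;

public class WhitelistMatchDemo {
    public static void main(String[] args) {
        Whitelist filter = new Whitelist("logs-.*");
        // Expected: logs-app and logs-db are allowed, metrics is not.
        for (String topic : new String[] {"logs-app", "logs-db", "metrics"}) {
            System.out.println(topic + " allowed: " + filter.isTopicAllowed(topic));
        }
    }
}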

From KafkaAppenderIT.java (project: wngn-jms-kafka):
@Test
public void testLogging() throws InterruptedException {
    final Logger logger = loggerContext.getLogger("ROOT");
    unit.start();
    assertTrue("appender is started", unit.isStarted());
    for (int i = 0; i < 1000; ++i) {
        final LoggingEvent loggingEvent = new LoggingEvent("a.b.c.d", logger, Level.INFO, "message" + i, null, new Object[0]);
        unit.append(loggingEvent);
    }
    final Properties consumerProperties = new Properties();
    consumerProperties.put("metadata.broker.list", kafka.getBrokerList());
    consumerProperties.put("group.id", "simple-consumer-" + new Random().nextInt());
    consumerProperties.put("auto.commit.enable", "false");
    consumerProperties.put("auto.offset.reset", "smallest");
    consumerProperties.put("zookeeper.connect", kafka.getZookeeperConnection());
    final kafka.consumer.ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(consumerProperties);
    final ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    final KafkaStream<byte[], byte[]> log =
            javaConsumerConnector.createMessageStreamsByFilter(new Whitelist("logs"), 1).get(0);
    final ConsumerIterator<byte[], byte[]> iterator = log.iterator();
    for (int i = 0; i < 1000; ++i) {
        final String messageFromKafka = new String(iterator.next().message(), UTF8);
        assertThat(messageFromKafka, Matchers.equalTo("message" + i));
    }
}

From KafkaIngesterConsumer.java (project: chaperone):
private void init() {
    // register kafka offset lag metrics, one Gauge per consumer for per-consumer granularity
    MetricRegistry registry = Metrics.getRegistry();
    try {
        fetchedMsgCounter = registry.meter("kafkaIngesterConsumer." + this.getName() + "-msgFetchRate");
        failedToIngestCounter = registry.meter("kafkaIngesterConsumer." + this.getName() + "-failedToIngest");
        kafkaOffsetLagGauge =
                registry.register("kafkaIngesterConsumer." + this.getName() + "-kafkaOffsetLag",
                        new JmxAttributeGauge(new ObjectName(maxLagMetricName), "Value"));
    } catch (MalformedObjectNameException | IllegalArgumentException e) {
        logger.error("Register failure for metrics of KafkaIngesterConsumer", e);
    }
    TopicFilter topicFilter = new Whitelist(AuditConfig.AUDIT_TOPIC_NAME);
    logger.info("{}: Topic filter is {}", getName(), AuditConfig.AUDIT_TOPIC_NAME);
    this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
    KafkaStream<byte[], byte[]> stream = consumer.createMessageStreamsByFilter(topicFilter, 1).get(0);
    iterator = stream.iterator();
    logger.info("KafkaIngesterConsumer thread {} is initialized successfully", getName());
    if (AuditConfig.INGESTER_ENABLE_DEDUP) {
        deduplicator = new Deduplicator(threadId, AuditConfig.INGESTER_REDIS_HOST, AuditConfig.INGESTER_REDIS_PORT,
                AuditConfig.INGESTER_REDIS_KEY_TTL_SEC, AuditConfig.INGESTER_DUP_HOST_PREFIX,
                AuditConfig.INGESTER_HOSTS_WITH_DUP);
        deduplicator.open();
    } else {
        deduplicator = null;
    }
}

From TopicCount.java (project: druid-kafka-ext):
public String pattern() {
    if (topicFilter instanceof Whitelist) {
        return whiteListPattern;
    } else if (topicFilter instanceof Blacklist) {
        return blackListPattern;
    } else {
        throw new KafkaZKException("Invalid topicFilter.");
    }
}

From KafkaReporterTest.java (project: java-kafka):
@Test
public void testCodahaleKafkaMetricsReporter() {
    registry = new MetricRegistry();
    registry.counter("test_counter").inc();
    kafkaReporter = KafkaReporter.builder(registry, kafkaConnect, topic, schemaRegistry).build();
    // ObjectMapper mapper = new ObjectMapper().registerModule(
    //         new MetricsModule(TimeUnit.SECONDS, TimeUnit.SECONDS, false));
    // StringWriter r = new StringWriter();
    // try {
    //     mapper.writeValue(r, registry);
    // } catch (IOException e) {
    //     e.printStackTrace();
    // }
    kafkaReporter.report();
    Properties props = new Properties();
    props.put("zookeeper.connect", zkConnect);
    props.put("group.id", UUID.randomUUID().toString());
    props.put("auto.offset.reset", "smallest");
    props.put("zookeeper.session.timeout.ms", "30000");
    props.put("consumer.timeout.ms", "30000");
    props.put("schema.registry.url", schemaRegistry);
    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    KafkaStream<String, Object> messageStream = consumer.createMessageStreamsByFilter(
            new Whitelist(topic), 1, new StringDecoder(null),
            new KafkaAvroDecoder(new VerifiableProperties(props))).get(0);
    GenericRecord message = (GenericRecord) messageStream.iterator().next().message();
    assertNotNull(message);
}

From KafkaReporterTest.java (project: java-kafka):
@Test
public void testTopicReporter() {
    MetricsRegistry registry = new MetricsRegistry();
    Counter counter = registry.newCounter(KafkaReporterTest.class, "test-counter");
    counter.inc();
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConnect);
    producerProps.put("schema.registry.url", schemaRegistry);
    KafkaReporter reporter = new KafkaReporter(registry, producerProps, topic);
    reporter.start(1, TimeUnit.SECONDS);
    Properties props = new Properties();
    props.put("zookeeper.connect", zkConnect);
    props.put("group.id", UUID.randomUUID().toString());
    props.put("auto.offset.reset", "smallest");
    props.put("zookeeper.session.timeout.ms", "30000");
    props.put("consumer.timeout.ms", "30000");
    props.put("schema.registry.url", schemaRegistry);
    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    KafkaStream<String, Object> messageStream = consumer.createMessageStreamsByFilter(
            new Whitelist(topic), 1, new StringDecoder(null),
            new KafkaAvroDecoder(new VerifiableProperties(props))).get(0);
    GenericRecord message = (GenericRecord) messageStream.iterator().next().message();
    assertNotNull(message);
    reporter.shutdown();
}

From Topics.java (project: debezium-proto):
public static TopicFilter anyOf(String... topics) {
    StringJoiner joiner = new StringJoiner(",");
    for (String topic : topics) {
        joiner.add(topic);
    }
    return new Whitelist(joiner.toString());
}
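This works because the old consumer client treats commas in a Whitelist pattern as regex alternation (the "," is rewritten to "|"), so joining plain topic names with commas yields a filter matching any of them. A hypothetical caller, reusing a ConsumerConnector named consumer like those in the snippets above:

    // Hypothetical usage of the helper above: one filter matching either topic.
    TopicFilter filter = Topics.anyOf("orders", "payments");
    List<KafkaStream<byte[], byte[]>> streams =
            consumer.createMessageStreamsByFilter(filter, 1);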

From KafkaConsumerHelper.java (project: fudanweixin):
public synchronized void start() {
    Config conf = Config.getInstance();
    Properties props = new Properties();
    props.put("zookeeper.connect", conf.get("kafka.servers"));
    props.put("group.id", conf.get("kafka.groupid"));
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    // join the registered callback topics into a comma-separated whitelist
    String topics = "";
    for (String s : callbacks.keySet()) {
        topics += "," + s;
    }
    if (topics.length() > 0) {
        topics = topics.substring(1); // drop the leading comma
        Decoder<String> sd = new StringDecoder(null);
        List<KafkaStream<String, String>> streams = consumer.createMessageStreamsByFilter(
                new Whitelist(topics),
                Integer.parseInt(conf.get("kafka.threads")), sd, sd);
        if (streams != null) {
            ExecutorService tph = ThreadPoolHelper.getInstance().getSchPool();
            for (KafkaStream<String, String> stream : streams) {
                tph.submit(new CallbackThread(callbacks, stream));
            }
        }
    }
}

From KafkaRpcPluginThread.java (project: opentsdb-rpc-kafka):
/**
 * Default ctor
 * @param group The group object this writer belongs to
 * @param threadID The ID of the thread, an index from 0 to max int
 * @param topics The topic list to subscribe to
 */
public KafkaRpcPluginThread(final KafkaRpcPluginGroup group,
                            final int threadID, final String topics) {
    if (topics == null || topics.isEmpty()) {
        throw new IllegalArgumentException("Missing topics");
    }
    if (threadID < 0) {
        throw new IllegalArgumentException("Cannot have a negative thread ID: " + threadID);
    }
    if (group.getParent().getTSDB() == null) {
        throw new IllegalArgumentException("Missing TSDB in the group");
    }
    if (group.getRateLimiter() == null) {
        throw new IllegalArgumentException("Missing rate limiter in the group");
    }
    if (group.getGroupID() == null || group.getGroupID().isEmpty()) {
        throw new IllegalArgumentException("Missing group ID");
    }
    if (group.getParent().getHost() == null || group.getParent().getHost().isEmpty()) {
        throw new IllegalArgumentException("Missing host name");
    }
    namespace_counters = group.getParent().getNamespaceCounters();
    track_metric_prefix = group.getParent().trackMetricPrefix();
    this.thread_id = threadID;
    this.group = group;
    this.tsdb = group.getParent().getTSDB();
    this.rate_limiter = group.getRateLimiter();
    this.consumer_type = group.getConsumerType();
    thread_running.set(false);
    topic_filter = new Whitelist(topics);
    consumer_id = threadID + "_" + group.getParent().getHost();
    if (consumer_type == TsdbConsumerType.REQUEUE_RAW) {
        if (group.getParent().getConfig().hasProperty(
                KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE + "requeueDelay")) {
            requeue_delay = group.getParent().getConfig().getLong(
                    KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE + "requeueDelay");
        } else {
            requeue_delay = KafkaRpcPluginConfig.DEFAULT_REQUEUE_DELAY_MS;
        }
    } else {
        requeue_delay = 0;
    }
    deserializer = group.getDeserializer();
}

From TopicCount.java (project: druid-kafka-ext):
@SuppressWarnings("unchecked")
public static TopicCount constructTopicCount(ZKConnector<?> zkClient, String group,
        String consumerId) {
    KafkaZKData.ZKGroupDirs dirs = new KafkaZKData.ZKGroupDirs(group);
    String subscriptionPattern = null;
    Map<String, Integer> topMap = null;
    try {
        String topicCountString = zkClient.readData(dirs.consumerRegistryDir() + "/" + consumerId);
        ObjectMapper mapper = new ObjectMapper();
        TypeReference<Map<String, Object>> typeMap = new TypeReference<Map<String, Object>>() {
        };
        Map<String, Object> jsonObj = mapper.reader(typeMap).readValue(topicCountString);
        if (jsonObj == null)
            throw new KafkaZKException("error constructing TopicCount : " + topicCountString);
        Object pattern = jsonObj.get("pattern");
        if (pattern == null)
            throw new KafkaZKException("error constructing TopicCount : " + topicCountString);
        subscriptionPattern = (String) pattern;
        Object sub = jsonObj.get("subscription");
        if (sub == null)
            throw new KafkaZKException("error constructing TopicCount : " + topicCountString);
        topMap = (Map<String, Integer>) sub;
    } catch (Throwable t) {
        throw new KafkaZKException(t);
    }
    boolean hasWhiteList = whiteListPattern.equals(subscriptionPattern);
    boolean hasBlackList = blackListPattern.equals(subscriptionPattern);
    if (topMap.isEmpty() || !(hasWhiteList || hasBlackList)) {
        return new StaticTopicCount(consumerId, topMap);
    } else {
        // a wildcard subscription stores a single regex -> stream-count entry
        String regex = null;
        Integer numStreams = -1;
        for (Entry<String, Integer> entry : topMap.entrySet()) {
            regex = entry.getKey();
            numStreams = entry.getValue();
            break;
        }
        TopicFilter filter = hasWhiteList ? new Whitelist(regex) : new Blacklist(regex);
        return new WildcardTopicCount(zkClient, consumerId, filter, numStreams);
    }
}

From Topics.java (project: debezium-proto):
public static TopicFilter of(String topic) {
    return new Whitelist(topic);
}
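The single-topic variant simply wraps the name in a Whitelist. Since the pattern is still interpreted as a regex, a topic name containing regex metacharacters would presumably need escaping; for ordinary names the helper can be used directly:

    // Hypothetical usage: subscribe to exactly one topic by name.
    TopicFilter customersOnly = Topics.of("customers");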