@Test
public void runNoData() throws Exception {
when(iterator.hasNext()).thenReturn(false);
final KafkaRpcPluginThread writer = Mockito.spy(
new KafkaRpcPluginThread(group, 1, TOPICS));
writer.run();
verify(tsdb, never()).addPoint(anyString(), anyLong(), anyLong(), anyMap());
verify(tsdb, never()).addHistogramPoint(anyString(), anyLong(),
any(byte[].class), anyMap());
verify(tsdb, never()).addAggregatePoint(anyString(), anyLong(), anyLong(),
anyMap(), anyBoolean(), anyString(), anyString(), anyString());
verify(consumer_connector, times(1))
.createMessageStreamsByFilter(any(TopicFilter.class), anyInt());
verify(writer, times(1)).shutdown();
verify(consumer_connector, times(1)).shutdown();
}
Example usages of the Java class kafka.consumer.TopicFilter, collected from open-source projects. The test above is from TestKafkaRpcPluginThread.java (project: opentsdb-rpc-kafka).

Source: TestKafkaRpcPluginThread.java (project: opentsdb-rpc-kafka)
@Test
public void runNoDataRestart() throws Exception {
when(iterator.hasNext()).thenReturn(false);
final KafkaRpcPluginThread writer = Mockito.spy(
new KafkaRpcPluginThread(group, 1, TOPICS));
writer.run();
writer.run();
verify(tsdb, never()).addPoint(anyString(), anyLong(), anyLong(), anyMap());
verify(tsdb, never()).addHistogramPoint(anyString(), anyLong(),
any(byte[].class), anyMap());
verify(tsdb, never()).addAggregatePoint(anyString(), anyLong(), anyLong(),
anyMap(), anyBoolean(), anyString(), anyString(), anyString());
verify(consumer_connector, times(2))
.createMessageStreamsByFilter(any(TopicFilter.class), anyInt());
verify(writer, times(2)).shutdown();
verify(consumer_connector, times(2)).shutdown();
}
Source: TestKafkaRpcPluginThread.java (project: opentsdb-rpc-kafka)
@Test
public void runNoStreams() throws Exception {
when(stream_list.get(0))
.thenThrow(new ArrayIndexOutOfBoundsException());
KafkaRpcPluginThread writer = Mockito.spy(
new KafkaRpcPluginThread(group, 1, TOPICS));
writer.run();
verify(tsdb, never()).addPoint(anyString(), anyLong(), anyLong(), anyMap());
verify(tsdb, never()).addHistogramPoint(anyString(), anyLong(),
any(byte[].class), anyMap());
verify(tsdb, never()).addAggregatePoint(anyString(), anyLong(), anyLong(),
anyMap(), anyBoolean(), anyString(), anyString(), anyString());
verify(consumer_connector, times(1))
.createMessageStreamsByFilter(any(TopicFilter.class), anyInt());
verify(writer, times(1)).shutdown();
verify(consumer_connector, times(1)).shutdown();
}
Source: TestKafkaRpcPluginThread.java (project: opentsdb-rpc-kafka)
private void verifyMessageRead(final KafkaRpcPluginThread writer,
final boolean requeued) {
verify(writer, times(1)).shutdown();
verify(consumer_connector, times(1)).shutdown();
verify(consumer_connector, times(1))
.createMessageStreamsByFilter(any(TopicFilter.class), anyInt());
verify(iterator, times(2)).hasNext();
if (requeued) {
verify(requeue, times(1)).handleError(
any(IncomingDataPoint.class), any(Exception.class));
} else {
verify(requeue, never()).handleError(
any(IncomingDataPoint.class), any(Exception.class));
}
if (data != null) {
verify(rate_limiter, times(1)).acquire();
}
}
Source: DbzDatabases.java (project: debezium-proto)
@Override
protected void onStart(DbzNode node) {
logger.debug("DATABASES: Starting and subscribing to '{}'...", Topic.SCHEMA_UPDATES);
// Add a single-threaded consumer that will read the "schema-updates" topic to get all database schema updates.
// We use a unique group ID so that we get *all* the messages on this topic.
int numThreads = 1;
String groupId = "databases-" + node.id(); // unique so that all clients see all messages
TopicFilter topicFilter = Topics.anyOf(Topic.SCHEMA_UPDATES);
node.subscribe(groupId, topicFilter, numThreads, (topic, partition, offset, key, msg) -> {
Document updatedSchema = Message.getAfter(msg);
DatabaseId dbId = Identifier.parseDatabaseId(key);
activeDatabases.put(dbId.asString(), new ActiveDatabase(dbId, updatedSchema));
logger.debug("DATABASES: Cached active database '{}'...", dbId);
return true;
});
}
Source: MessageReader.java (project: secor)
public MessageReader(SecorConfig config, OffsetTracker offsetTracker) throws
UnknownHostException {
mConfig = config;
mOffsetTracker = offsetTracker;
mConsumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig());
    if (!mConfig.getKafkaTopicBlacklist().isEmpty() && !mConfig.getKafkaTopicFilter().isEmpty()) {
        throw new RuntimeException("Topic filter and blacklist cannot both be specified.");
    }
    TopicFilter topicFilter = !mConfig.getKafkaTopicBlacklist().isEmpty()
        ? new Blacklist(mConfig.getKafkaTopicBlacklist())
        : new Whitelist(mConfig.getKafkaTopicFilter());
    LOG.debug("Using TopicFilter {}({})", topicFilter.getClass(), topicFilter);
List<KafkaStream<byte[], byte[]>> streams =
mConsumerConnector.createMessageStreamsByFilter(topicFilter);
KafkaStream<byte[], byte[]> stream = streams.get(0);
mIterator = stream.iterator();
mLastAccessTime = new HashMap<TopicPartition, Long>();
StatsUtil.setLabel("secor.kafka.consumer.id", IdUtil.getConsumerId());
mTopicPartitionForgetSeconds = mConfig.getTopicPartitionForgetSeconds();
mCheckMessagesPerSecond = mConfig.getMessagesPerSecond() / mConfig.getConsumerThreads();
mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass());
}
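The constructor above relies on a private createConsumerConfig() that this snippet does not show. As a rough, hypothetical sketch (the property keys are the standard legacy high-level consumer settings; the values are placeholders, not Secor's actual configuration), such a helper might look like:

// Hypothetical sketch only -- not Secor's implementation.
// Requires java.util.Properties and kafka.consumer.ConsumerConfig.
private ConsumerConfig createConsumerConfig() {
    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181"); // ZooKeeper quorum
    props.put("group.id", "example-group");           // consumer group ID
    props.put("auto.offset.reset", "smallest");       // start from the earliest offset
    props.put("consumer.timeout.ms", "500");          // make hasNext() throw ConsumerTimeoutException when idle
    return new ConsumerConfig(props);
}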
Source: TestKafkaRpcPluginThread.java (project: opentsdb-rpc-kafka)
@Test
public void runConsumerRuntimeException() throws Exception {
when(consumer_connector.createMessageStreamsByFilter(
(TopicFilter) any(), anyInt())).thenThrow(
new RuntimeException("Foobar"));
KafkaRpcPluginThread writer = Mockito.spy(
new KafkaRpcPluginThread(group, 1, TOPICS));
writer.run();
verify(writer, times(1)).shutdown();
verify(consumer_connector, times(1)).shutdown();
}
Source: TestKafkaRpcPluginThread.java (project: opentsdb-rpc-kafka)
@Test(expected = Exception.class)
public void runConsumerException() throws Exception {
when(consumer_connector.createMessageStreamsByFilter(
(TopicFilter) any(), anyInt())).thenThrow(
new Exception("Foobar"));
KafkaRpcPluginThread writer = Mockito.spy(
new KafkaRpcPluginThread(group, 1, TOPICS));
writer.run();
verify(writer, times(1)).shutdown();
verify(consumer_connector, times(1)).shutdown();
}
Source: KafkaIngesterConsumer.java (project: chaperone)
private void init() {
// Register Kafka offset-lag metrics; one Gauge per consumer gives consumer-level granularity.
MetricRegistry registry = Metrics.getRegistry();
try {
fetchedMsgCounter = registry.meter("kafkaIngesterConsumer." + this.getName() + "-msgFetchRate");
failedToIngestCounter = registry.meter("kafkaIngesterConsumer." + this.getName() + "-failedToIngest");
kafkaOffsetLagGauge =
registry.register("kafkaIngesterConsumer." + this.getName() + "-kafkaOffsetLag", new JmxAttributeGauge(
new ObjectName(maxLagMetricName), "Value"));
} catch (MalformedObjectNameException | IllegalArgumentException e) {
logger.error("Register failure for metrics of KafkaIngesterConsumer", e);
}
TopicFilter topicFilter = new Whitelist(AuditConfig.AUDIT_TOPIC_NAME);
logger.info("{}: Topic filter is {}", getName(), AuditConfig.AUDIT_TOPIC_NAME);
this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
KafkaStream<byte[], byte[]> stream = consumer.createMessageStreamsByFilter(topicFilter, 1).get(0);
iterator = stream.iterator();
logger.info("KafkaIngesterConsumer thread {} is initialized successfully", getName());
if (AuditConfig.INGESTER_ENABLE_DEDUP) {
deduplicator =
new Deduplicator(threadId, AuditConfig.INGESTER_REDIS_HOST, AuditConfig.INGESTER_REDIS_PORT,
AuditConfig.INGESTER_REDIS_KEY_TTL_SEC, AuditConfig.INGESTER_DUP_HOST_PREFIX,
AuditConfig.INGESTER_HOSTS_WITH_DUP);
deduplicator.open();
} else {
deduplicator = null;
}
}
Source: TopicCount.java (project: druid-kafka-ext)
public WildcardTopicCount(ZKConnector<?> zkClient, String consumerIdString,
TopicFilter topicFilter, Integer numStreams) {
this.consumerIdString = consumerIdString;
this.numStreams = numStreams;
this.topicFilter = topicFilter;
this.zkClient = zkClient;
}
Source: Topics.java (project: debezium-proto)
public static TopicFilter anyOf(String... topics) {
    StringJoiner joiner = new StringJoiner(",");
    for (String topic : topics) {
        joiner.add(topic);
    }
    return new Whitelist(joiner.toString());
}
Source: Topics.java (project: debezium-proto)
public static TopicFilter noneOf(String... topics) {
    StringJoiner joiner = new StringJoiner(",");
    for (String topic : topics) {
        joiner.add(topic);
    }
    return new Blacklist(joiner.toString());
}
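A quick usage sketch of the two helpers above (the topic names are invented; the legacy TopicFilter treats commas in the joined string as alternation):

// Illustrative only.
TopicFilter updates = Topics.anyOf("schema-updates", "ddl-changes"); // whitelist: either topic
TopicFilter everythingElse = Topics.noneOf("__consumer_offsets");    // blacklist: all other topics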
Source: InMemorySyncMessageBus.java (project: debezium-proto)
public Subscriber(String groupId, TopicFilter topicFilter, int numThreads, Deserializer<KeyType> keyDecoder,
Deserializer<MessageType> messageDecoder, MessageConsumer<KeyType, MessageType> consumer) {
this.topicFilter = topicFilter;
this.keyDecoder = keyDecoder;
this.messageDecoder = messageDecoder;
this.consumer = consumer;
}
Source: TestKafkaRpcPluginThread.java (project: opentsdb-rpc-kafka)
@SuppressWarnings("unchecked")
@Before
public void before() throws Exception {
tsdb = PowerMockito.mock(TSDB.class);
config = new KafkaRpcPluginConfig(new Config(false));
group = mock(KafkaRpcPluginGroup.class);
message = mock(MessageAndMetadata.class);
rate_limiter = mock(RateLimiter.class);
requeue = mock(KafkaStorageExceptionHandler.class);
counters = new ConcurrentHashMap<String, Map<String, AtomicLong>>();
deserializer = new JSONDeserializer();
consumer_connector = mock(ConsumerConnector.class);
mockStatic(Consumer.class);
when(Consumer.createJavaConsumerConnector((ConsumerConfig) any()))
.thenReturn(consumer_connector);
when(tsdb.getConfig()).thenReturn(config);
when(tsdb.getStorageExceptionHandler()).thenReturn(requeue);
parent = mock(KafkaRpcPlugin.class);
when(parent.getHost()).thenReturn(LOCALHOST);
when(parent.getTSDB()).thenReturn(tsdb);
when(parent.getConfig()).thenReturn(config);
when(parent.getNamespaceCounters()).thenReturn(counters);
when(parent.trackMetricPrefix()).thenReturn(true);
when(group.getParent()).thenReturn(parent);
when(group.getRateLimiter()).thenReturn(rate_limiter);
when(group.getGroupID()).thenReturn(GROUPID);
when(group.getConsumerType()).thenReturn(TsdbConsumerType.RAW);
when(group.getDeserializer()).thenReturn(deserializer);
config.overrideConfig(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX
+ "zookeeper.connect", ZKS);
stream_list = mock(List.class);
when(consumer_connector.createMessageStreamsByFilter(
(TopicFilter) any(), anyInt())).thenReturn(stream_list);
final KafkaStream<byte[], byte[]> stream = mock(KafkaStream.class);
when(stream_list.get(0)).thenReturn(stream);
iterator = mock(ConsumerIterator.class);
when(stream.iterator()).thenReturn(iterator);
when(iterator.hasNext()).thenReturn(true).thenReturn(false);
when(iterator.next()).thenReturn(message);
PowerMockito.mockStatic(ConsumerConfig.class);
PowerMockito.whenNew(ConsumerConfig.class).withAnyArguments()
.thenReturn(mock(ConsumerConfig.class));
}
Source: TopicCount.java (project: druid-kafka-ext)
@SuppressWarnings("unchecked")
public static TopicCount constructTopicCount(ZKConnector<?> zkClient, String group,
String consumerId) {
KafkaZKData.ZKGroupDirs dirs = new KafkaZKData.ZKGroupDirs(group);
String subscriptionPattern = null;
Map<String, Integer> topMap = null;
try {
String topicCountString = zkClient.readData(dirs.consumerRegistryDir() + "/" + consumerId);
ObjectMapper mapper = new ObjectMapper();
TypeReference<Map<String, Object>> typeMap = new TypeReference<Map<String, Object>>() {
};
Map<String, Object> jsonObj = mapper.reader(typeMap).readValue(
topicCountString);
if (jsonObj == null)
throw new KafkaZKException("error constructing TopicCount : "
+ topicCountString);
Object pattern = jsonObj.get("pattern");
if (pattern == null)
throw new KafkaZKException("error constructing TopicCount : "
+ topicCountString);
subscriptionPattern = (String) pattern;
Object sub = jsonObj.get("subscription");
if (sub == null)
throw new KafkaZKException("error constructing TopicCount : "
+ topicCountString);
topMap = (Map<String, Integer>) sub;
} catch (Throwable t) {
throw new KafkaZKException(t);
}
boolean hasWhiteList = whiteListPattern.equals(subscriptionPattern);
boolean hasBlackList = blackListPattern.equals(subscriptionPattern);
if (topMap.isEmpty() || !(hasWhiteList || hasBlackList)) {
return new StaticTopicCount(consumerId, topMap);
} else {
String regex = null;
Integer numStreams = -1;
for (Entry<String, Integer> entity : topMap.entrySet()) {
regex = entity.getKey();
numStreams = entity.getValue();
break;
}
TopicFilter filter = hasWhiteList ? new Whitelist(regex)
: new Blacklist(regex);
return new WildcardTopicCount(zkClient, consumerId, filter,
numStreams);
}
}
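For orientation, the registry value parsed above appears to be JSON of the following shape; this is inferred from the "pattern" and "subscription" lookups in the method, not taken from the project:

// Inferred shape of the consumer-registry entry (an assumption):
//   wildcard consumer: {"pattern":"white_list","subscription":{"metrics\\..*":2}}
//   static consumer:   {"pattern":"static","subscription":{"topicA":1,"topicB":2}}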
Source: TopicCount.java (project: druid-kafka-ext)
public static TopicCount constructTopicCount(ZKConnector<?> zkClient,
String consumerIdString, TopicFilter filter, int numStreams) {
return new WildcardTopicCount(zkClient, consumerIdString, filter,
numStreams);
}
Source: Topics.java (project: debezium-proto)
public static TopicFilter of(String topic) {
return new Whitelist(topic);
}
Source: KafkaMessageBus.java (project: debezium-proto)
@Override
public <KeyType, MessageType> void subscribe(String groupId, TopicFilter topicFilter, int numThreads, Deserializer<KeyType> keyDecoder,
Deserializer<MessageType> messageDecoder, MessageConsumer<KeyType, MessageType> consumer) {
if (!running) throw new IllegalStateException("Kafka client is no longer running");
if (numThreads < 1) return;
// Create the config for this consumer ...
final boolean autoCommit = true;
final boolean debug = logger.isDebugEnabled();
Properties props = new Properties();
props.putAll(this.consumerConfig);
props.put("group.id", groupId);
// Create the consumer and iterate over the streams and create a thread to process each one ...
ConsumerConnector connector = getOrCreateConnector(props);
connector.createMessageStreamsByFilter(topicFilter, numThreads, DEFAULT_DECODER, DEFAULT_DECODER)
.forEach(stream -> {
this.executor.get().execute(() -> {
final ConsumerIterator<byte[], byte[]> iter = stream.iterator();
boolean success = false;
while (true) {
try {
while (running && iter.hasNext()) {
// Determine if we're still running after we've received a message ...
if ( running ) {
MessageAndMetadata<byte[], byte[]> msg = iter.next();
if (debug) {
logger.debug("Consuming next message on topic '{}', partition {}, offset {}",
msg.topic(), msg.partition(), msg.offset());
}
success = consumer.consume(msg.topic(), msg.partition(), msg.offset(),
keyDecoder.deserialize(msg.topic(),msg.key()),
messageDecoder.deserialize(msg.topic(),msg.message()));
logger.debug("Consume message: {}", success);
if (success && autoCommit) {
logger.debug("Committing offsets");
connector.commitOffsets();
}
}
}
} catch (ConsumerTimeoutException e) {
logger.debug("Consumer timed out and continuing");
// Keep going ...
}
if (!running) break; // exit once the bus stops running instead of spinning on a closed stream
}
});
});
}
Source: InMemorySyncMessageBus.java (project: debezium-proto)
@Override
public <KeyType, MessageType> void subscribe(String groupId, TopicFilter topicFilter, int numThreads, Deserializer<KeyType> keyDecoder,
Deserializer<MessageType> messageDecoder, MessageConsumer<KeyType, MessageType> consumer) {
subscribers.add(new Subscriber<>(groupId, topicFilter, numThreads, keyDecoder, messageDecoder, consumer));
}
Source: ConsumerConnector.java (project: kafka-0.11.0.0-src-with-comment)
/**
* Create a list of MessageAndTopicStreams containing messages of type T.
*
* @param topicFilter a TopicFilter that specifies which topics to
* subscribe to (encapsulates a whitelist or a blacklist).
* @param numStreams the number of message streams to return.
* @param keyDecoder a decoder that decodes the message key
* @param valueDecoder a decoder that decodes the message itself
* @return a list of KafkaStream. Each stream supports an
* iterator over its MessageAndMetadata elements.
*/
public <K, V> List<KafkaStream<K, V>>
createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
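Tying the page together, here is a minimal, hypothetical sketch of consuming through a TopicFilter with this legacy API; the regex, settings, and topic names are illustrative, and error handling is omitted:

// Sketch only: consume raw byte[] messages from all topics matching a regex.
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "filter-example");
ConsumerConnector connector =
    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
List<KafkaStream<byte[], byte[]>> streams =
    connector.createMessageStreamsByFilter(new Whitelist("metrics\\..*"), 1);
ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
while (it.hasNext()) { // blocks indefinitely unless consumer.timeout.ms is set
    MessageAndMetadata<byte[], byte[]> msg = it.next();
    System.out.printf("topic=%s partition=%d offset=%d%n",
        msg.topic(), msg.partition(), msg.offset());
}
connector.shutdown();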
Source: DbzNode.java (project: debezium-proto)
/**
* Subscribe to one or more topics.
*
* @param groupId the identifier of the consumer's group; may not be null
* @param topicFilter the filter for the topics; may not be null
* @param numThreads the number of threads on which consumers should be called
* @param messageDecoder the decoder that should be used to convert the {@code byte[]} message into an object form expected by
* the consumer
* @param consumer the consumer, which should be threadsafe if {@code numThreads} is more than 1
*/
public <MessageType> void subscribe(String groupId, TopicFilter topicFilter, int numThreads, Deserializer<MessageType> messageDecoder,
MessageConsumer<String, MessageType> consumer) {
subscribe(groupId, topicFilter, numThreads, Serdes.stringDeserializer(), messageDecoder, consumer);
}
Source: DbzNode.java (project: debezium-proto)
/**
* Subscribe to one or more topics.
*
* @param groupId the identifier of the consumer's group; may not be null
* @param topicFilter the filter for the topics; may not be null
* @param numThreads the number of threads on which consumers should be called
* @param consumer the consumer, which should be threadsafe if {@code numThreads} is more than 1
*/
public void subscribe(String groupId, TopicFilter topicFilter, int numThreads, MessageConsumer<String, Document> consumer) {
subscribe(groupId, topicFilter, numThreads, Serdes.stringDeserializer(), Serdes.document(), consumer);
}
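A hypothetical usage sketch of this convenience overload, mirroring the pattern in DbzDatabases.onStart above (group and topic names are invented):

// Illustrative only; assumes a DbzNode instance named `node`.
node.subscribe("example-" + node.id(),     // unique group ID: every client sees every message
    Topics.of("schema-updates"),           // single-topic whitelist
    1,                                     // one consumer thread
    (topic, partition, offset, key, doc) -> {
        // `doc` arrives deserialized as a Document
        return true;                       // true signals the message was handled
    });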
Source: DbzNode.java (project: debezium-proto)
/**
* Subscribe to one or more topics.
*
* @param groupId the identifier of the consumer's group; may not be null
* @param topicFilter the filter for the topics; may not be null
* @param numThreads the number of threads on which consumers should be called
* @param keyDecoder the decoder that should be used to convert the {@code byte[]} key into an object form expected by the
* consumer
* @param messageDecoder the decoder that should be used to convert the {@code byte[]} message into an object form expected by
* the consumer
* @param consumer the consumer, which should be threadsafe if {@code numThreads} is more than 1
*/
public <KeyType, MessageType> void subscribe(String groupId, TopicFilter topicFilter, int numThreads, Deserializer<KeyType> keyDecoder,
Deserializer<MessageType> messageDecoder, MessageConsumer<KeyType, MessageType> consumer) {
logger.debug("NODE: subscribing {} in group '{}' to topics {}",consumer,groupId,topicFilter);
messageBus.get().subscribe(groupId, topicFilter, numThreads, keyDecoder, messageDecoder, consumer);
}
Source: MessageBus.java (project: debezium-proto)
/**
* Subscribe to one or more topics.
*
* @param groupId the identifier of the consumer's group; may not be null
* @param topicFilter the filter for the topics; may not be null
* @param numThreads the number of threads on which consumers should be called
* @param keyDeserializer the deserializer that should be used to convert the {@code byte[]} key into an object form expected
* by the consumer
* @param messageDeserializer the deserializer that should be used to convert the {@code byte[]} message into an object form
* expected by the consumer
* @param consumer the consumer, which should be threadsafe if {@code numThreads} is more than 1
*/
public <KeyType, MessageType> void subscribe(String groupId, TopicFilter topicFilter, int numThreads,
Deserializer<KeyType> keyDeserializer,
Deserializer<MessageType> messageDeserializer,
MessageConsumer<KeyType, MessageType> consumer);
Source: ConsumerConnector.java (project: kafka-0.11.0.0-src-with-comment)
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
Source: ConsumerConnector.java (project: kafka-0.11.0.0-src-with-comment)
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);