/**
* Default ctor
* @param group The group object this writer belongs to
* @param threadID The ID of the thread, an index from 0 to max int
* @param topics The topic list to subscribe to
*/
public KafkaRpcPluginThread(final KafkaRpcPluginGroup group,
final int threadID, final String topics) {
if (topics == null || topics.isEmpty()) {
throw new IllegalArgumentException("Missing topics");
}
if (threadID < 0) {
throw new IllegalArgumentException("Cannot have a negative thread ID: "
+ threadID);
}
if (group.getParent().getTSDB() == null) {
throw new IllegalArgumentException("Missing TSDB in the group");
}
if (group.getRateLimiter() == null) {
throw new IllegalArgumentException("Missing rate limiter in the group");
}
if (group.getGroupID() == null || group.getGroupID().isEmpty()) {
throw new IllegalArgumentException("Missing group ID");
}
if (group.getParent().getHost() == null ||
group.getParent().getHost().isEmpty()) {
throw new IllegalArgumentException("Missing host name");
}
namespace_counters = group.getParent().getNamespaceCounters();
track_metric_prefix = group.getParent().trackMetricPrefix();
this.thread_id = threadID;
this.group = group;
this.tsdb = group.getParent().getTSDB();
this.rate_limiter = group.getRateLimiter();
this.consumer_type = group.getConsumerType();
thread_running.set(false);
topic_filter = new Whitelist(topics);
consumer_id = threadID + "_" + group.getParent().getHost();
if (consumer_type == TsdbConsumerType.REQUEUE_RAW) {
if (group.getParent().getConfig().hasProperty(
KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE + "requeueDelay")) {
requeue_delay = group.getParent().getConfig().getLong(
KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE + "requeueDelay");
} else {
requeue_delay = KafkaRpcPluginConfig.DEFAULT_REQUEUE_DELAY_MS;
}
} else {
requeue_delay = 0;
}
deserializer = group.getDeserializer();
}
KafkaRpcPluginThread.java 文件源码
java
阅读 21
收藏 0
点赞 0
评论 0
项目:opentsdb-rpc-kafka
作者:
评论列表
文章目录