KafkaRpcPluginThread.java 文件源码

java
阅读 21 收藏 0 点赞 0 评论 0

项目:opentsdb-rpc-kafka 作者:
/**
 * Default ctor
 * @param group The group object this writer belongs to
 * @param threadID The ID of the thread, an index from 0 to max int
 * @param topics The topic list to subscribe to
 */
public KafkaRpcPluginThread(final KafkaRpcPluginGroup group, 
    final int threadID, final String topics) {
  if (topics == null || topics.isEmpty()) {
    throw new IllegalArgumentException("Missing topics");
  }
  if (threadID < 0) {
    throw new IllegalArgumentException("Cannot have a negative thread ID: " 
        + threadID);
  }
  if (group.getParent().getTSDB() == null) {
    throw new IllegalArgumentException("Missing TSDB in the group");
  }
  if (group.getRateLimiter() == null) {
    throw new IllegalArgumentException("Missing rate limiter in the group");
  }
  if (group.getGroupID() == null || group.getGroupID().isEmpty()) {
    throw new IllegalArgumentException("Missing group ID");
  }
  if (group.getParent().getHost() == null || 
      group.getParent().getHost().isEmpty()) {
    throw new IllegalArgumentException("Missing host name");
  }

  namespace_counters = group.getParent().getNamespaceCounters();
  track_metric_prefix = group.getParent().trackMetricPrefix();
  this.thread_id = threadID;
  this.group = group;
  this.tsdb = group.getParent().getTSDB();
  this.rate_limiter = group.getRateLimiter();
  this.consumer_type = group.getConsumerType();
  thread_running.set(false);

  topic_filter = new Whitelist(topics);
  consumer_id = threadID + "_" + group.getParent().getHost();
  if (consumer_type == TsdbConsumerType.REQUEUE_RAW) {
    if (group.getParent().getConfig().hasProperty(
        KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE + "requeueDelay")) {
      requeue_delay = group.getParent().getConfig().getLong(
          KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE + "requeueDelay");
    } else {
      requeue_delay = KafkaRpcPluginConfig.DEFAULT_REQUEUE_DELAY_MS;
    }
  } else {
    requeue_delay = 0;
  }
  deserializer = group.getDeserializer();
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号