/**
 * Builds the asynchronous Netty client: delegates to the superclass constructor, reads the
 * send/flush settings from {@code storm_conf}, registers a periodic task on {@code scheduler}
 * that calls {@link #flush()}, and finally calls {@code start()}.
 *
 * @param storm_conf  configuration map; read here for the buffer threshold, the block-send and
 *                    direct-send flags and {@code Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS}
 *                    (falls back to 10 when absent), and also handed to the superclass
 * @param factory     channel factory; handed to the superclass and stored in
 *                    {@code clientChannelFactory}
 * @param scheduler   executor handed to the superclass and used to run the periodic flush task
 * @param host        passed through to the superclass constructor
 * @param port        passed through to the superclass constructor
 * @param reconnector passed through to the superclass constructor
 */
@SuppressWarnings("rawtypes")
NettyClientAsync(Map storm_conf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port, ReconnectRunnable reconnector) {
    super(storm_conf, factory, scheduler, host, port, reconnector);

    BATCH_THREASHOLD_WARN = ConfigExtension.getNettyBufferThresholdSize(storm_conf);
    blockSend = isBlockSend(storm_conf);
    directlySend = isDirectSend(storm_conf);
    flush_later = new AtomicBoolean(false);
    flushCheckInterval = Utils.getInt(storm_conf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10);

    // Assigned before the flusher is scheduled so the background task can never run while
    // this field is still unset (the initial delay below may legitimately be 0).
    clientChannelFactory = factory;

    Runnable flusher = new Runnable() {
        @Override
        public void run() {
            try {
                flush();
            } catch (RuntimeException e) {
                // scheduleAtFixedRate suppresses every subsequent execution once a run
                // throws; log and carry on so one failed flush does not stop flushing.
                LOG.error("Periodic flush failed", e);
            }
        }
    };

    // Widen before multiplying: if both operands are ints (their declarations are outside
    // this block), a large configured product would overflow before being capped at 1000 ms.
    long initialDelay = Math.min(1000L, (long) max_sleep_ms * max_retries);
    scheduler.scheduleAtFixedRate(flusher, initialDelay, flushCheckInterval, TimeUnit.MILLISECONDS);

    start();
    LOG.info(this.toString());
}
NettyClientAsync.java 文件源码
java
阅读 26
收藏 0
点赞 0
评论 0
项目:jstrom
作者:
评论列表
文章目录