BookKeeperClient(DistributedLogConfiguration conf,
String name,
String zkServers,
ZooKeeperClient zkc,
String ledgersPath,
ClientSocketChannelFactory channelFactory,
HashedWheelTimer requestTimer,
StatsLogger statsLogger,
Optional<FeatureProvider> featureProvider) {
this.conf = conf;
this.name = name;
this.zkServers = zkServers;
this.ledgersPath = ledgersPath;
this.passwd = conf.getBKDigestPW().getBytes(UTF_8);
this.channelFactory = channelFactory;
this.requestTimer = requestTimer;
this.statsLogger = statsLogger;
this.featureProvider = featureProvider;
this.ownZK = null == zkc;
if (null != zkc) {
// reference the passing zookeeper client
this.zkc = zkc;
}
}
java类org.jboss.netty.channel.socket.ClientSocketChannelFactory的实例源码
BookKeeperClient.java 文件源码
项目:distributedlog
阅读 24
收藏 0
点赞 0
评论 0
LoadBalancer.java 文件源码
项目:tightrope
阅读 25
收藏 0
点赞 0
评论 0
public synchronized void start() {
final Executor bossPool = Executors.newCachedThreadPool();
final Executor workerPool = Executors.newCachedThreadPool();
bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(bossPool, workerPool));
final ClientSocketChannelFactory clientSocketChannelFactory = new NioClientSocketChannelFactory(bossPool, workerPool);
bootstrap.setOption("child.tcpNoDelay", true);
allChannels = new DefaultChannelGroup("handler");
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new FrontendHandler(allChannels, clientSocketChannelFactory, serverPool, statistics));
}
});
log.info("Starting on port {}", port);
acceptor = bootstrap.bind(new InetSocketAddress(port));
if (acceptor.isBound()) {
log.info("Server started successfully");
}
}
Fetcher.java 文件源码
项目:incubator-tajo
阅读 30
收藏 0
点赞 0
评论 0
public Fetcher(URI uri, File file, ClientSocketChannelFactory factory) {
this.uri = uri;
this.file = file;
String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
this.host = uri.getHost() == null ? "localhost" : uri.getHost();
this.port = uri.getPort();
if (port == -1) {
if (scheme.equalsIgnoreCase("http")) {
this.port = 80;
} else if (scheme.equalsIgnoreCase("https")) {
this.port = 443;
}
}
bootstrap = new ClientBootstrap(factory);
bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec
bootstrap.setOption("receiveBufferSize", 1048576); // set 1M
bootstrap.setOption("tcpNoDelay", true);
ChannelPipelineFactory pipelineFactory = new HttpClientPipelineFactory(file);
bootstrap.setPipelineFactory(pipelineFactory);
}
BlockingRpcClient.java 文件源码
项目:incubator-tajo
阅读 25
收藏 0
点赞 0
评论 0
BlockingRpcClient(final Class<?> protocol,
final InetSocketAddress addr, ClientSocketChannelFactory factory)
throws Exception {
this.protocol = protocol;
String serviceClassName = protocol.getName() + "$"
+ protocol.getSimpleName() + "Service";
Class<?> serviceClass = Class.forName(serviceClassName);
stubMethod = serviceClass.getMethod("newBlockingStub",
BlockingRpcChannel.class);
this.handler = new ClientChannelUpstreamHandler();
pipeFactory = new ProtoPipelineFactory(handler,
RpcResponse.getDefaultInstance());
super.init(addr, pipeFactory, factory);
rpcChannel = new ProxyRpcChannel();
this.key = new RpcConnectionKey(addr, protocol, false);
}
AsyncRpcClient.java 文件源码
项目:incubator-tajo
阅读 32
收藏 0
点赞 0
评论 0
AsyncRpcClient(final Class<?> protocol,
final InetSocketAddress addr, ClientSocketChannelFactory factory)
throws Exception {
this.protocol = protocol;
String serviceClassName = protocol.getName() + "$"
+ protocol.getSimpleName() + "Service";
Class<?> serviceClass = Class.forName(serviceClassName);
stubMethod = serviceClass.getMethod("newStub", RpcChannel.class);
this.handler = new ClientChannelUpstreamHandler();
pipeFactory = new ProtoPipelineFactory(handler,
RpcResponse.getDefaultInstance());
super.init(addr, pipeFactory, factory);
rpcChannel = new ProxyRpcChannel();
this.key = new RpcConnectionKey(addr, protocol, true);
}
RpcChannelFactory.java 文件源码
项目:incubator-tajo
阅读 27
收藏 0
点赞 0
评论 0
public static synchronized ClientSocketChannelFactory createClientChannelFactory(String name, int workerNum) {
name = name + "-" + clientCount.incrementAndGet();
if(LOG.isDebugEnabled()){
LOG.debug("Create " + name + " ClientSocketChannelFactory. Worker:" + workerNum);
}
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
ThreadFactory bossFactory = builder.setNameFormat(name + " Boss #%d").build();
ThreadFactory workerFactory = builder.setNameFormat(name + " Worker #%d").build();
NioClientBossPool bossPool = new NioClientBossPool(Executors.newCachedThreadPool(bossFactory), 1,
new HashedWheelTimer(), ThreadNameDeterminer.CURRENT);
NioWorkerPool workerPool = new NioWorkerPool(Executors.newCachedThreadPool(workerFactory), workerNum,
ThreadNameDeterminer.CURRENT);
return new NioClientSocketChannelFactory(bossPool, workerPool);
}
NettyClientBase.java 文件源码
项目:incubator-tajo
阅读 20
收藏 0
点赞 0
评论 0
public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory, ClientSocketChannelFactory factory)
throws IOException {
try {
this.bootstrap = new ClientBootstrap(factory);
this.bootstrap.setPipelineFactory(pipeFactory);
// TODO - should be configurable
this.bootstrap.setOption("connectTimeoutMillis", 10000);
this.bootstrap.setOption("connectResponseTimeoutMillis", 10000);
this.bootstrap.setOption("receiveBufferSize", 1048576 * 10);
this.bootstrap.setOption("tcpNoDelay", true);
this.bootstrap.setOption("keepAlive", true);
connect(addr);
} catch (Throwable t) {
close();
throw new IOException(t.getCause());
}
}
Fetcher.java 文件源码
项目:tajo-cdh
阅读 29
收藏 0
点赞 0
评论 0
public Fetcher(URI uri, File file, ClientSocketChannelFactory factory) {
this.uri = uri;
this.file = file;
String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
this.host = uri.getHost() == null ? "localhost" : uri.getHost();
this.port = uri.getPort();
if (port == -1) {
if (scheme.equalsIgnoreCase("http")) {
this.port = 80;
} else if (scheme.equalsIgnoreCase("https")) {
this.port = 443;
}
}
bootstrap = new ClientBootstrap(factory);
bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec
bootstrap.setOption("receiveBufferSize", 1048576); // set 1M
bootstrap.setOption("tcpNoDelay", true);
ChannelPipelineFactory pipelineFactory = new HttpClientPipelineFactory(file);
bootstrap.setPipelineFactory(pipelineFactory);
}
BlockingRpcClient.java 文件源码
项目:tajo-cdh
阅读 24
收藏 0
点赞 0
评论 0
BlockingRpcClient(final Class<?> protocol,
final InetSocketAddress addr, ClientSocketChannelFactory factory)
throws Exception {
this.protocol = protocol;
String serviceClassName = protocol.getName() + "$"
+ protocol.getSimpleName() + "Service";
Class<?> serviceClass = Class.forName(serviceClassName);
stubMethod = serviceClass.getMethod("newBlockingStub",
BlockingRpcChannel.class);
this.handler = new ClientChannelUpstreamHandler();
pipeFactory = new ProtoPipelineFactory(handler,
RpcResponse.getDefaultInstance());
super.init(addr, pipeFactory, factory);
rpcChannel = new ProxyRpcChannel();
this.key = new RpcConnectionKey(addr, protocol, false);
}
AsyncRpcClient.java 文件源码
项目:tajo-cdh
阅读 28
收藏 0
点赞 0
评论 0
AsyncRpcClient(final Class<?> protocol,
final InetSocketAddress addr, ClientSocketChannelFactory factory)
throws Exception {
this.protocol = protocol;
String serviceClassName = protocol.getName() + "$"
+ protocol.getSimpleName() + "Service";
Class<?> serviceClass = Class.forName(serviceClassName);
stubMethod = serviceClass.getMethod("newStub", RpcChannel.class);
this.handler = new ClientChannelUpstreamHandler();
pipeFactory = new ProtoPipelineFactory(handler,
RpcResponse.getDefaultInstance());
super.init(addr, pipeFactory, factory);
rpcChannel = new ProxyRpcChannel();
this.key = new RpcConnectionKey(addr, protocol, true);
}
RpcChannelFactory.java 文件源码
项目:tajo-cdh
阅读 24
收藏 0
点赞 0
评论 0
public static synchronized ClientSocketChannelFactory createClientChannelFactory(String name, int workerNum) {
name = name + "-" + clientCount.incrementAndGet();
if(LOG.isDebugEnabled()){
LOG.debug("Create " + name + " ClientSocketChannelFactory. Worker:" + workerNum);
}
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
ThreadFactory bossFactory = builder.setNameFormat(name + " Boss #%d").build();
ThreadFactory workerFactory = builder.setNameFormat(name + " Worker #%d").build();
NioClientBossPool bossPool = new NioClientBossPool(Executors.newCachedThreadPool(bossFactory), 1,
new HashedWheelTimer(), ThreadNameDeterminer.CURRENT);
NioWorkerPool workerPool = new NioWorkerPool(Executors.newCachedThreadPool(workerFactory), workerNum,
ThreadNameDeterminer.CURRENT);
return new NioClientSocketChannelFactory(bossPool, workerPool);
}
NettyClientBase.java 文件源码
项目:tajo-cdh
阅读 20
收藏 0
点赞 0
评论 0
public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory, ClientSocketChannelFactory factory)
throws IOException {
try {
this.bootstrap = new ClientBootstrap(factory);
this.bootstrap.setPipelineFactory(pipeFactory);
// TODO - should be configurable
this.bootstrap.setOption("connectTimeoutMillis", 10000);
this.bootstrap.setOption("connectResponseTimeoutMillis", 10000);
this.bootstrap.setOption("receiveBufferSize", 1048576 * 10);
this.bootstrap.setOption("tcpNoDelay", true);
this.bootstrap.setOption("keepAlive", true);
connect(addr);
} catch (Throwable t) {
close();
throw new IOException("Connect error to " + addr + " cause " + t.getMessage(), t.getCause());
}
}
HttpClientFactory.java 文件源码
项目:rest4j
阅读 25
收藏 0
点赞 0
评论 0
/**
* Creates a new HttpClientFactory.
*
* @param filters the filter chain shared by all Clients created by this factory
* @param channelFactory the ClientSocketChannelFactory that all Clients created by this
* factory will share
* @param shutdownFactory if true, the channelFactory will be shut down when this
* factory is shut down
* @param executor an executor shared by all Clients created by this factory to schedule
* tasks
* @param shutdownExecutor if true, the executor will be shut down when this factory is
* shut down
* @param callbackExecutor an optional executor to invoke user callbacks that otherwise
* will be invoked by scheduler executor.
* @param shutdownCallbackExecutor if true, the callback executor will be shut down when
* this factory is shut down
*/
public HttpClientFactory(FilterChain filters,
ClientSocketChannelFactory channelFactory,
boolean shutdownFactory,
ScheduledExecutorService executor,
boolean shutdownExecutor,
ExecutorService callbackExecutor,
boolean shutdownCallbackExecutor)
{
this(filters,
channelFactory,
shutdownFactory,
executor,
shutdownExecutor,
callbackExecutor,
shutdownCallbackExecutor,
NULL_JMX_MANAGER);
}
HttpClientFactory.java 文件源码
项目:rest4j
阅读 21
收藏 0
点赞 0
评论 0
public HttpClientFactory(FilterChain filters,
ClientSocketChannelFactory channelFactory,
boolean shutdownFactory,
ScheduledExecutorService executor,
boolean shutdownExecutor,
ExecutorService callbackExecutor,
boolean shutdownCallbackExecutor,
AbstractJmxManager jmxManager)
{
_filters = filters;
_channelFactory = channelFactory;
_shutdownFactory = shutdownFactory;
_executor = executor;
_shutdownExecutor = shutdownExecutor;
_callbackExecutor = callbackExecutor;
_shutdownCallbackExecutor = shutdownCallbackExecutor;
_jmxManager = jmxManager;
}
HttpNettyClient.java 文件源码
项目:rest4j
阅读 21
收藏 0
点赞 0
评论 0
/**
* Creates a new HttpNettyClient with some default parameters
*
* @see #HttpNettyClient(ClientSocketChannelFactory,ScheduledExecutorService,int,int,int,int,int,SSLContext,SSLParameters,int,ExecutorService,int)
*/
public HttpNettyClient(ClientSocketChannelFactory factory,
ScheduledExecutorService executor,
int poolSize,
int requestTimeout,
int idleTimeout,
int shutdownTimeout,
int maxResponseSize)
{
this(factory,
executor,
poolSize,
requestTimeout,
idleTimeout,
shutdownTimeout,
maxResponseSize,
null,
null,
Integer.MAX_VALUE,
executor,
Integer.MAX_VALUE);
}
TestHttpClientFactory.java 文件源码
项目:rest4j
阅读 18
收藏 0
点赞 0
评论 0
/**
* Tests that even when the factory is shutdown with a long timeout, it does not occupy
* any executors with tasks that might prevent them shutting down properly.
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
@Test
public void testShutdownTimeoutDoesNotOccupyExecutors()
throws InterruptedException, ExecutionException, TimeoutException
{
ExecutorService boss = Executors.newCachedThreadPool();
ExecutorService worker = Executors.newCachedThreadPool();
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(boss, worker);
HttpClientFactory factory = new HttpClientFactory(FilterChains.empty(), channelFactory, false, scheduler, false);
FutureCallback<None> callback = new FutureCallback<None>();
factory.shutdown(callback, 60, TimeUnit.MINUTES);
callback.get(60, TimeUnit.SECONDS);
scheduler.shutdown();
channelFactory.releaseExternalResources();
Assert.assertTrue(scheduler.awaitTermination(60, TimeUnit.SECONDS));
Assert.assertTrue(boss.awaitTermination(60, TimeUnit.SECONDS));
Assert.assertTrue(worker.awaitTermination(60, TimeUnit.SECONDS));
}
RtmpProxy.java 文件源码
项目:flazr
阅读 23
收藏 0
点赞 0
评论 0
public static void main(String[] args) throws Exception {
Executor executor = Executors.newCachedThreadPool();
ChannelFactory factory = new NioServerSocketChannelFactory(executor, executor);
ServerBootstrap sb = new ServerBootstrap(factory);
ClientSocketChannelFactory cf = new NioClientSocketChannelFactory(executor, executor);
sb.setPipelineFactory(new ProxyPipelineFactory(cf,
RtmpConfig.PROXY_REMOTE_HOST, RtmpConfig.PROXY_REMOTE_PORT));
InetSocketAddress socketAddress = new InetSocketAddress(RtmpConfig.PROXY_PORT);
sb.bind(socketAddress);
logger.info("proxy server started, listening on {}", socketAddress);
Thread monitor = new StopMonitor(RtmpConfig.PROXY_STOP_PORT);
monitor.start();
monitor.join();
ChannelGroupFuture future = ALL_CHANNELS.close();
logger.info("closing channels");
future.awaitUninterruptibly();
logger.info("releasing resources");
factory.releaseExternalResources();
logger.info("server stopped");
}
RedisClient.java 文件源码
项目:ShankShock-Core
阅读 26
收藏 0
点赞 0
评论 0
/**
* Create a new client that connects to the supplied host and port. Connection
* attempts and non-blocking commands will {@link #setDefaultTimeout timeout}
* after 60 seconds.
*
* @param host Server hostname.
* @param port Server port.
*/
public RedisClient(String host, int port) {
ExecutorService connectors = Executors.newFixedThreadPool(1);
ExecutorService workers = Executors.newCachedThreadPool();
ClientSocketChannelFactory factory = new NioClientSocketChannelFactory(connectors, workers);
InetSocketAddress addr = new InetSocketAddress(host, port);
bootstrap = new ClientBootstrap(factory);
bootstrap.setOption("remoteAddress", addr);
setDefaultTimeout(60, TimeUnit.SECONDS);
channels = new DefaultChannelGroup();
timer = new HashedWheelTimer();
}
HexDumpProxy.java 文件源码
项目:uli-mini-tools
阅读 20
收藏 0
点赞 0
评论 0
public static void main(String[] args) throws Exception {
// Validate command line options.
if (args.length != 3) {
System.err.println("Usage: " + HexDumpProxy.class.getSimpleName() + " <local port> <remote host> <remote port>");
return;
}
// Parse command line options.
int localPort = Integer.parseInt(args[0]);
String remoteHost = args[1];
int remotePort = Integer.parseInt(args[2]);
System.err.println("Proxying *:" + localPort + " to " + remoteHost + ':' + remotePort + " ...");
// Configure the bootstrap.
Executor executor = Executors.newCachedThreadPool();
ServerBootstrap sb = new ServerBootstrap(new NioServerSocketChannelFactory(executor, executor));
// Set up the event pipeline factory.
ClientSocketChannelFactory cf = new NioClientSocketChannelFactory(executor, executor);
sb.setPipelineFactory(new HexDumpProxyPipelineFactory(cf, remoteHost, remotePort));
// Start up the server.
sb.bind(new InetSocketAddress(localPort));
}
TextDumpProxy.java 文件源码
项目:uli-mini-tools
阅读 17
收藏 0
点赞 0
评论 0
public static void main(String[] args) throws Exception {
// Validate command line options.
if (args.length != 3) {
System.err.println(
"Usage: " + TextDumpProxy.class.getSimpleName() +
" <local port> <remote host> <remote port>");
return;
}
// Parse command line options.
int localPort = Integer.parseInt(args[0]);
String remoteHost = args[1];
int remotePort = Integer.parseInt(args[2]);
System.err.println(
"Proxying *:" + localPort + " to " +
remoteHost + ':' + remotePort + " ...");
// Configure the bootstrap.
Executor executor = Executors.newCachedThreadPool();
ServerBootstrap sb = new ServerBootstrap(new NioServerSocketChannelFactory(executor, executor));
// Set up the event pipeline factory.
ClientSocketChannelFactory cf = new NioClientSocketChannelFactory(executor, executor);
sb.setPipelineFactory(new TextDumpProxyPipelineFactory(cf, remoteHost, remotePort));
// Start up the server.
sb.bind(new InetSocketAddress(localPort));
}
HttpTunnelSoakTester.java 文件源码
项目:httptunnel
阅读 26
收藏 0
点赞 0
评论 0
public HttpTunnelSoakTester() {
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
executor = Executors.newCachedThreadPool();
ServerSocketChannelFactory serverChannelFactory = new NioServerSocketChannelFactory(
executor, executor);
HttpTunnelServerChannelFactory serverTunnelFactory = new HttpTunnelServerChannelFactory(
serverChannelFactory);
serverBootstrap = new ServerBootstrap(serverTunnelFactory);
serverBootstrap.setPipelineFactory(createServerPipelineFactory());
ClientSocketChannelFactory clientChannelFactory = new NioClientSocketChannelFactory(
executor, executor);
HttpTunnelClientChannelFactory clientTunnelFactory = new HttpTunnelClientChannelFactory(
clientChannelFactory);
clientBootstrap = new ClientBootstrap(clientTunnelFactory);
clientBootstrap.setPipelineFactory(createClientPipelineFactory());
configureProxy();
channels = new DefaultChannelGroup();
}
TCPClient.java 文件源码
项目:CacheStore
阅读 27
收藏 0
点赞 0
评论 0
private static ClientSocketChannelFactory getClientSocketChannelFactory(boolean nio) {
if ( nio)
return new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
else
return new OioClientSocketChannelFactory(
Executors.newCachedThreadPool());
}
ClientConnectionImpl.java 文件源码
项目:bigstreams
阅读 23
收藏 0
点赞 0
评论 0
public ClientConnectionImpl(ExecutorService connectService,
ClientSocketChannelFactory socketChannelFactory, Timer timeoutTimer) {
super();
this.connectService = connectService;
this.socketChannelFactory = socketChannelFactory;
this.timeoutTimer = timeoutTimer;
}
ClientConnectionFactoryImpl.java 文件源码
项目:bigstreams
阅读 29
收藏 0
点赞 0
评论 0
public ClientConnectionFactoryImpl(Timer timeoutTimer,
ClientSocketChannelFactory socketChannelFactory,
long connectEstablishTimeout, long sendTimeOut, Protocol protocol) {
super();
this.timeoutTimer = timeoutTimer;
this.socketChannelFactory = socketChannelFactory;
this.connectEstablishTimeout = connectEstablishTimeout;
this.sendTimeOut = sendTimeOut;
this.protocol = protocol;
}
PerChannelBookieClient.java 文件源码
项目:bigstreams
阅读 23
收藏 0
点赞 0
评论 0
public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
InetSocketAddress addr, AtomicLong totalBytesOutstanding) {
this.addr = addr;
this.executor = executor;
this.totalBytesOutstanding = totalBytesOutstanding;
this.channelFactory = channelFactory;
connect(channelFactory);
}
NetworkFailureHandler.java 文件源码
项目:flink
阅读 21
收藏 0
点赞 0
评论 0
public NetworkFailureHandler(
AtomicBoolean blocked,
Consumer<NetworkFailureHandler> onClose,
ClientSocketChannelFactory channelFactory,
String remoteHost,
int remotePort) {
this.blocked = blocked;
this.onClose = onClose;
this.channelFactory = channelFactory;
this.remoteHost = remoteHost;
this.remotePort = remotePort;
}
NetworkFailuresProxy.java 文件源码
项目:flink
阅读 23
收藏 0
点赞 0
评论 0
public NetworkFailuresProxy(int localPort, String remoteHost, int remotePort) {
// Configure the bootstrap.
serverBootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(executor, executor));
// Set up the event pipeline factory.
ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(executor, executor);
serverBootstrap.setOption("child.tcpNoDelay", true);
serverBootstrap.setOption("child.keepAlive", true);
serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
// synchronized for a race between blocking and creating new handlers
synchronized (networkFailureHandlers) {
NetworkFailureHandler failureHandler = new NetworkFailureHandler(
blocked,
networkFailureHandler -> networkFailureHandlers.remove(networkFailureHandler),
channelFactory,
remoteHost,
remotePort);
networkFailureHandlers.add(failureHandler);
pipeline.addLast(NETWORK_FAILURE_HANDLER_NAME, failureHandler);
}
return pipeline;
}
});
channel = serverBootstrap.bind(new InetSocketAddress(localPort));
LOG.info("Proxying [*:{}] to [{}:{}]", getLocalPort(), remoteHost, remotePort);
}
TestFetcher.java 文件源码
项目:incubator-tajo
阅读 23
收藏 0
点赞 0
评论 0
@Test
public void testGet() throws IOException {
Random rnd = new Random();
FileWriter writer = new FileWriter(INPUT_DIR + "data");
String data;
for (int i = 0; i < 100; i++) {
data = ""+rnd.nextInt();
writer.write(data);
}
writer.flush();
writer.close();
DataRetriever ret = new DirectoryRetriever(INPUT_DIR);
HttpDataServer server = new HttpDataServer(
NetUtils.createSocketAddr("127.0.0.1:0"), ret);
server.start();
InetSocketAddress addr = server.getBindAddress();
URI uri = URI.create("http://127.0.0.1:"+addr.getPort() + "/data");
ClientSocketChannelFactory channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", 1);
Fetcher fetcher = new Fetcher(uri, new File(OUTPUT_DIR + "data"), channelFactory);
fetcher.get();
server.stop();
FileSystem fs = FileSystem.getLocal(new TajoConf());
FileStatus inStatus = fs.getFileStatus(new Path(INPUT_DIR, "data"));
FileStatus outStatus = fs.getFileStatus(new Path(OUTPUT_DIR, "data"));
assertEquals(inStatus.getLen(), outStatus.getLen());
}
RpcChannelFactory.java 文件源码
项目:incubator-tajo
阅读 26
收藏 0
点赞 0
评论 0
/**
* make this factory static thus all clients can share its thread pool.
* NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
*/
public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory(){
//shared woker and boss pool
if(factory == null){
TajoConf conf = new TajoConf();
int workerNum = conf.getIntVar(TajoConf.ConfVars.INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM);
factory = createClientChannelFactory("Internal-Client", workerNum);
}
return factory;
}
TestFetcher.java 文件源码
项目:tajo-cdh
阅读 22
收藏 0
点赞 0
评论 0
@Test
public void testGet() throws IOException {
Random rnd = new Random();
FileWriter writer = new FileWriter(INPUT_DIR + "data");
String data;
for (int i = 0; i < 100; i++) {
data = ""+rnd.nextInt();
writer.write(data);
}
writer.flush();
writer.close();
DataRetriever ret = new DirectoryRetriever(INPUT_DIR);
HttpDataServer server = new HttpDataServer(
NetUtils.createSocketAddr("127.0.0.1:0"), ret);
server.start();
InetSocketAddress addr = server.getBindAddress();
URI uri = URI.create("http://127.0.0.1:"+addr.getPort() + "/data");
ClientSocketChannelFactory channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", 1);
Fetcher fetcher = new Fetcher(uri, new File(OUTPUT_DIR + "data"), channelFactory);
fetcher.get();
server.stop();
FileSystem fs = FileSystem.getLocal(new TajoConf());
FileStatus inStatus = fs.getFileStatus(new Path(INPUT_DIR, "data"));
FileStatus outStatus = fs.getFileStatus(new Path(OUTPUT_DIR, "data"));
assertEquals(inStatus.getLen(), outStatus.getLen());
}