/**
 * Sends one mocked HTTP request through the shuffle handler and checks that
 * the number of entries in {@code listenerList} never exceeds the configured
 * {@link ShuffleHandler#SHUFFLE_MAX_SESSION_OPEN_FILES} limit, both right
 * after the request is received and after each pending listener is completed.
 *
 * <p>NOTE(review): {@code listenerList} is presumably filled by the future
 * returned from {@code createMockChannelFuture} as listeners are registered
 * on it; confirm against that helper.
 *
 * @throws Exception if the handler fails to start or to process the request
 */
@Test(timeout = 4000)
public void testSendMapCount() throws Exception {
  final List<ShuffleHandler.ReduceMapFileCount> listenerList =
      new ArrayList<ShuffleHandler.ReduceMapFileCount>();
  final ChannelHandlerContext mockCtx =
      Mockito.mock(ChannelHandlerContext.class);
  final MessageEvent mockEvt = Mockito.mock(MessageEvent.class);
  final Channel mockCh = Mockito.mock(AbstractChannel.class);

  // Mock HttpRequest and ChannelFuture
  final HttpRequest mockHttpRequest = createMockHttpRequest();
  final ChannelFuture mockFuture =
      createMockChannelFuture(mockCh, listenerList);

  // Mock Netty Channel Context and Channel behavior. Each call is stubbed
  // once; the matcher-based write stub already covers every argument.
  Mockito.doReturn(mockCh).when(mockCtx).getChannel();
  Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class));

  // Mock MessageEvent behavior
  Mockito.doReturn(mockCh).when(mockEvt).getChannel();
  Mockito.doReturn(mockHttpRequest).when(mockEvt).getMessage();

  final ShuffleHandler sh = new MockShuffleHandler();
  final Configuration conf = new Configuration();
  sh.init(conf);
  sh.start();
  try {
    final int maxOpenFiles = conf.getInt(
        ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES,
        ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
    sh.getShuffle(conf).messageReceived(mockCtx, mockEvt);
    assertTrue("Number of Open files should not exceed the configured " +
        "value!-Not Expected",
        listenerList.size() <= maxOpenFiles);
    // Complete the pending listeners one at a time and re-check the limit
    // after every completion.
    while (!listenerList.isEmpty()) {
      listenerList.remove(0).operationComplete(mockFuture);
      assertTrue("Number of Open files should not exceed the configured " +
          "value!-Not Expected",
          listenerList.size() <= maxOpenFiles);
    }
  } finally {
    // Close the started handler even when an assertion above fails, so a
    // failing run does not leave it open.
    sh.close();
  }
}
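// Example source code using the Java class org.jboss.netty.channel.AbstractChannel
// Source file: TestShuffleHandler.java
// Project: aliyun-oss-hadoop-fs
// Views: 32
// Favorites: 0
// Likes: 0
// Comments: 0
// Source file: TestShuffleHandler.java
// Project: hops
// Views: 34
// Favorites: 0
// Likes: 0
// Comments: 0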
/**
 * Sends one mocked HTTP request through the shuffle handler and checks that
 * the number of entries in {@code listenerList} never exceeds the configured
 * {@link ShuffleHandler#SHUFFLE_MAX_SESSION_OPEN_FILES} limit, both right
 * after the request is received and after each pending listener is completed.
 *
 * <p>The mocked channel exposes a mocked pipeline whose {@code get(String)}
 * returns a real {@link ShuffleHandler.TimeoutHandler}.
 *
 * <p>NOTE(review): {@code listenerList} is presumably filled by the future
 * returned from {@code createMockChannelFuture} as listeners are registered
 * on it; confirm against that helper.
 *
 * @throws Exception if the handler fails to start or to process the request
 */
@Test(timeout = 4000)
public void testSendMapCount() throws Exception {
  final List<ShuffleHandler.ReduceMapFileCount> listenerList =
      new ArrayList<ShuffleHandler.ReduceMapFileCount>();
  final ChannelHandlerContext mockCtx =
      Mockito.mock(ChannelHandlerContext.class);
  final MessageEvent mockEvt = Mockito.mock(MessageEvent.class);
  final Channel mockCh = Mockito.mock(AbstractChannel.class);
  final ChannelPipeline mockPipeline = Mockito.mock(ChannelPipeline.class);

  // Mock HttpRequest and ChannelFuture
  final HttpRequest mockHttpRequest = createMockHttpRequest();
  final ChannelFuture mockFuture =
      createMockChannelFuture(mockCh, listenerList);
  final ShuffleHandler.TimeoutHandler timerHandler =
      new ShuffleHandler.TimeoutHandler();

  // Mock Netty Channel Context and Channel behavior. Each call is stubbed
  // once; the matcher-based write stub already covers every argument.
  Mockito.doReturn(mockCh).when(mockCtx).getChannel();
  Mockito.when(mockCh.getPipeline()).thenReturn(mockPipeline);
  Mockito.when(mockPipeline.get(
      Mockito.any(String.class))).thenReturn(timerHandler);
  Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class));

  // Mock MessageEvent behavior
  Mockito.doReturn(mockCh).when(mockEvt).getChannel();
  Mockito.doReturn(mockHttpRequest).when(mockEvt).getMessage();

  final ShuffleHandler sh = new MockShuffleHandler();
  final Configuration conf = new Configuration();
  sh.init(conf);
  sh.start();
  try {
    final int maxOpenFiles = conf.getInt(
        ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES,
        ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
    sh.getShuffle(conf).messageReceived(mockCtx, mockEvt);
    assertTrue("Number of Open files should not exceed the configured " +
        "value!-Not Expected",
        listenerList.size() <= maxOpenFiles);
    // Complete the pending listeners one at a time and re-check the limit
    // after every completion.
    while (!listenerList.isEmpty()) {
      listenerList.remove(0).operationComplete(mockFuture);
      assertTrue("Number of Open files should not exceed the configured " +
          "value!-Not Expected",
          listenerList.size() <= maxOpenFiles);
    }
  } finally {
    // Close the started handler even when an assertion above fails, so a
    // failing run does not leave it open.
    sh.close();
  }
}
// Source file: TestShuffleHandler.java
// Project: tez
// Views: 36
// Favorites: 0
// Likes: 0
// Comments: 0
/**
 * Sends one mocked HTTP request through the shuffle handler and checks that
 * the number of entries in {@code listenerList} never exceeds the configured
 * {@link ShuffleHandler#SHUFFLE_MAX_SESSION_OPEN_FILES} limit, both right
 * after the request is received and after each pending listener is completed.
 *
 * <p>The mocked channel exposes a mocked pipeline whose {@code get(String)}
 * returns a real {@link ShuffleHandler.TimeoutHandler}. The shuffle port is
 * configured as 0 before the handler is initialised.
 *
 * <p>NOTE(review): {@code listenerList} is presumably filled by the future
 * returned from {@code createMockChannelFuture} as listeners are registered
 * on it; confirm against that helper.
 *
 * @throws Exception if the handler fails to start or to process the request
 */
@Test(timeout = 4000)
public void testSendMapCount() throws Exception {
  final List<ShuffleHandler.ReduceMapFileCount> listenerList =
      new ArrayList<ShuffleHandler.ReduceMapFileCount>();
  final ChannelHandlerContext mockCtx =
      mock(ChannelHandlerContext.class);
  final MessageEvent mockEvt = mock(MessageEvent.class);
  final Channel mockCh = mock(AbstractChannel.class);
  final ChannelPipeline mockPipeline = Mockito.mock(ChannelPipeline.class);

  // Mock HttpRequest and ChannelFuture
  final HttpRequest mockHttpRequest = createMockHttpRequest();
  final ChannelFuture mockFuture =
      createMockChannelFuture(mockCh, listenerList);
  final ShuffleHandler.TimeoutHandler timerHandler =
      new ShuffleHandler.TimeoutHandler();

  // Mock Netty Channel Context and Channel behavior. Each call is stubbed
  // once; the matcher-based write stub already covers every argument.
  Mockito.doReturn(mockCh).when(mockCtx).getChannel();
  Mockito.when(mockCh.getPipeline()).thenReturn(mockPipeline);
  Mockito.when(mockPipeline.get(Mockito.any(String.class)))
      .thenReturn(timerHandler);
  Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class));

  // Mock MessageEvent behavior
  Mockito.doReturn(mockCh).when(mockEvt).getChannel();
  Mockito.doReturn(mockHttpRequest).when(mockEvt).getMessage();

  final ShuffleHandler sh = new MockShuffleHandler();
  final Configuration conf = new Configuration();
  // The Shuffle handler port associated with the service is bound to but not used.
  conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
  sh.init(conf);
  sh.start();
  try {
    final int maxOpenFiles = conf.getInt(
        ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES,
        ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
    sh.getShuffle(conf).messageReceived(mockCtx, mockEvt);
    assertTrue("Number of Open files should not exceed the configured " +
        "value!-Not Expected",
        listenerList.size() <= maxOpenFiles);
    // Complete the pending listeners one at a time and re-check the limit
    // after every completion.
    while (!listenerList.isEmpty()) {
      listenerList.remove(0).operationComplete(mockFuture);
      assertTrue("Number of Open files should not exceed the configured " +
          "value!-Not Expected",
          listenerList.size() <= maxOpenFiles);
    }
  } finally {
    // Close the started handler even when an assertion above fails, so a
    // failing run does not leave it open.
    sh.close();
  }
}