public MockKafkaStream(int numStreams) {
this.queues = Lists.newArrayList();
this.mockStreams = Lists.newArrayList();
this.offsets = Lists.newArrayList();
for (int i = 0; i < numStreams; i++) {
BlockingQueue<FetchedDataChunk> queue = Queues.newLinkedBlockingQueue();
this.queues.add(queue);
this.mockStreams.add(createMockStream(queue));
this.offsets.add(new AtomicLong(0));
}
this.nextStream = new AtomicLong(-1);
}
java类kafka.consumer.FetchedDataChunk的实例源码
MockKafkaStream.java 文件源码
项目:incubator-gobblin
阅读 21
收藏 0
点赞 0
评论 0
MockKafkaStream.java 文件源码
项目:incubator-gobblin
阅读 24
收藏 0
点赞 0
评论 0
@SuppressWarnings("unchecked")
private static KafkaStream<byte[], byte[]> createMockStream(BlockingQueue<FetchedDataChunk> queue) {
KafkaStream<byte[], byte[]> stream = (KafkaStream<byte[], byte[]>) Mockito.mock(KafkaStream.class);
ConsumerIterator<byte[], byte[]> it =
new ConsumerIterator<>(queue, -1, new DefaultDecoder(new VerifiableProperties()), new DefaultDecoder(new VerifiableProperties()), "clientId");
Mockito.when(stream.iterator()).thenReturn(it);
return stream;
}
MockKafkaStream.java 文件源码
项目:incubator-gobblin
阅读 23
收藏 0
点赞 0
评论 0
public void pushToStream(String message) {
int streamNo = (int) this.nextStream.incrementAndGet() % this.queues.size();
AtomicLong offset = this.offsets.get(streamNo);
BlockingQueue<FetchedDataChunk> queue = this.queues.get(streamNo);
AtomicLong thisOffset = new AtomicLong(offset.incrementAndGet());
List<Message> seq = Lists.newArrayList();
seq.add(new Message(message.getBytes(Charsets.UTF_8)));
ByteBufferMessageSet messageSet = new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, offset, JavaConversions.asScalaBuffer(seq));
FetchedDataChunk chunk = new FetchedDataChunk(messageSet,
new PartitionTopicInfo("topic", streamNo, queue, thisOffset, thisOffset, new AtomicInteger(1), "clientId"),
thisOffset.get());
queue.add(chunk);
}
MockKafkaStream.java 文件源码
项目:incubator-gobblin
阅读 22
收藏 0
点赞 0
评论 0
public void shutdown() {
for (BlockingQueue<FetchedDataChunk> queue : this.queues) {
queue.add(ZookeeperConsumerConnector.shutdownCommand());
}
}