@Override
protected int getRecordsInTarget() {
    // Drains every consumer stream and counts the messages seen; the consumer
    // timeout marks the end of the available records for each stream.
    int recordCount = 0;
    for (KafkaStream<byte[], byte[]> stream : kafkaStreams) {
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        try {
            for (; iterator.hasNext(); iterator.next()) {
                recordCount++;
            }
        } catch (kafka.consumer.ConsumerTimeoutException e) {
            // Expected once the stream is exhausted — not an error.
        }
    }
    return recordCount;
}
Example source code for the Java class kafka.consumer.ConsumerIterator
Source file: KafkaDestinationMultiPartitionPipelineRunIT.java
Project: datacollector
Reads: 15
Favorites: 0
Likes: 0
Comments: 0
Source file: DataBrokerTest.java
Project: netty-kafka-producer
Reads: 19
Favorites: 0
Likes: 0
Comments: 0
@Test
public void test_sendMessage() throws Exception {
    // Verifies that a message sent through DataKafkaBroker round-trips to a consumer.
    // NOTE: the original declared a CountDownLatch that was never used; removed.
    createTopic(topic);
    ProducerProperties properties = new ProducerProperties();
    properties.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);
    DataKafkaBroker dataChannel =
            new DataKafkaBroker("localhost", START_PORT, 0, topic, new NioEventLoopGroup(), properties);
    dataChannel.connect().sync();
    // Key "1", partition 0, payload TEST_MESSAGE.
    dataChannel.send(freeLaterBuffer("1".getBytes()), 0, freeLaterBuffer(TEST_MESSAGE.getBytes()));
    final KafkaStream<byte[], byte[]> stream = consume(topic).get(0);
    final ConsumerIterator<byte[], byte[]> messages = stream.iterator();
    Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE));
    dataChannel.disconnect();
}
Source file: ListenerTest.java
Project: netty-kafka-producer
Reads: 19
Favorites: 0
Likes: 0
Comments: 0
@Test
public void test_no_acks_send_message() throws Exception {
    // Verifies delivery when the producer does not wait for broker acknowledgments.
    String topic = "test_no_acks_send_message";
    createTopic(topic, 1);
    ProducerProperties properties = new ProducerProperties();
    properties.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);
    properties.override(ProducerProperties.DATA_ACK, Acknowledgment.WAIT_FOR_NO_ONE);
    KafkaProducer producer = new KafkaProducer("localhost", START_PORT, topic, properties);
    producer.connect().sync();
    KafkaTopic kafkaTopic = producer.topic();
    kafkaTopic.send(null, freeLaterBuffer(TEST_MESSAGE.getBytes()));
    final List<KafkaStream<byte[], byte[]>> consume = consume(topic);
    final KafkaStream<byte[], byte[]> stream = consume.get(0);
    final ConsumerIterator<byte[], byte[]> messages = stream.iterator();
    // assertThat takes (actual, matcher); the original had the arguments reversed,
    // which produces misleading failure messages. Fixed to match the sibling tests.
    Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE));
    producer.disconnect().sync();
}
Source file: KafkaTopicSingleBrokerTest.java
Project: netty-kafka-producer
Reads: 15
Favorites: 0
Likes: 0
Comments: 0
@Test
public void test_producer() throws Exception {
    // Publishes three sequenced messages and verifies they are consumed in order.
    String topic = "test";
    ProducerProperties properties = new ProducerProperties();
    properties.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);
    createTopic(topic);
    KafkaProducer producer = new KafkaProducer("localhost", START_PORT, topic, properties);
    producer.connect().sync();
    KafkaTopic kafkaTopic = producer.topic();
    String[] suffixes = {"01", "02", "03"};
    for (String suffix : suffixes) {
        kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + suffix).getBytes()));
    }
    final KafkaStream<byte[], byte[]> stream = consume(topic).get(0);
    final ConsumerIterator<byte[], byte[]> messages = stream.iterator();
    for (String suffix : suffixes) {
        Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE + suffix));
    }
    producer.disconnect().sync();
}
Source file: ConsumerWorker.java
Project: yuzhouwan
Reads: 18
Favorites: 0
Likes: 0
Comments: 0
@Override
public void run() {
    // Consumes messages from this worker's stream, logging each payload together
    // with its partition/offset and running fail/success/total counters.
    ConsumerIterator<byte[], byte[]> iter = kafkaStream.iterator();
    MessageAndMetadata<byte[], byte[]> msg;
    int total = 0, fail = 0, success = 0;
    long start = System.currentTimeMillis();
    while (iter.hasNext()) {
        try {
            msg = iter.next();
            _log.info("Thread {}: {}", threadNum, new String(msg.message(), "utf-8"));
            _log.info("partition: {}, offset: {}", msg.partition(), msg.offset());
            success++;
        } catch (Exception e) {
            // Pass the exception as the trailing Throwable argument so SLF4J logs
            // the stack trace; the original `_log.error("{}", e)` formatted it as a
            // plain parameter and dropped the trace.
            _log.error("Failed to process message in thread " + threadNum, e);
            fail++;
        }
        total++;
        // Elapsed time is reported in whole seconds (integer division).
        _log.info("Count [fail/success/total]: [{}/{}/{}], Time: {}s", fail, success, total,
                (System.currentTimeMillis() - start) / 1000);
    }
}
Source file: KafkaLogAppenderTest.java
Project: java-kafka
Reads: 17
Favorites: 0
Likes: 0
Comments: 0
@Test
public void testKafkaLogAppender() {
    // Build a consumer for the topic the log appender publishes to, using a
    // String key decoder and an Avro value decoder backed by the schema registry.
    Properties consumerProps = new Properties();
    consumerProps.put("zookeeper.connect", zookeeper);
    consumerProps.put("group.id", "kafka-log-appender-test");
    consumerProps.put("auto.offset.reset", "smallest");
    consumerProps.put("schema.registry.url", schemaRegistry);

    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(topic, 1);

    ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
    StringDecoder keyDecoder = new StringDecoder(null);
    KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(new VerifiableProperties(consumerProps));
    ConsumerIterator<String, Object> iterator = Consumer.createJavaConsumerConnector(consumerConfig)
            .createMessageStreams(topicMap, keyDecoder, valueDecoder)
            .get(topic).get(0).iterator();

    // Emit one log line through the appender, then verify each field of the
    // consumed Avro record.
    String testMessage = "I am a test message";
    logger.info(testMessage);

    MessageAndMetadata<String, Object> record = iterator.next();
    GenericRecord logLine = (GenericRecord) record.message();
    assertEquals(logLine.get("line").toString(), testMessage);
    assertEquals(logLine.get("logtypeid"), KafkaLogAppender.InfoLogTypeId);
    assertNotNull(logLine.get("source"));
    assertEquals(((Map<CharSequence, Object>) logLine.get("timings")).size(), 1);
    assertEquals(((Map<CharSequence, Object>) logLine.get("tag")).size(), 2);
}
Source file: KafkaSpoutTest.java
Project: LogRTA
Reads: 21
Favorites: 0
Likes: 0
Comments: 0
public void activate() {
    // Opens a high-level Kafka consumer on the configured topic and emits each
    // received message (value, count 1, formatted receive time) into the topology.
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(topic, 1);
    System.out.println("*********Results********topic:" + topic);
    Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topicMap);
    KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    // SimpleDateFormat is relatively costly to construct and the pattern never
    // changes, so build it once instead of once per message (original built it
    // inside the loop). This method runs on a single thread, so reuse is safe.
    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd日 HH:mm:ss SSS");
    while (it.hasNext()) {
        String value = new String(it.next().message());
        Date curDate = new Date(System.currentTimeMillis());
        String str = formatter.format(curDate);
        System.out.println("storm接收到来自kafka的消息--->" + value);
        // The message value doubles as the tuple's message id for anchoring/acking.
        collector.emit(new Values(value, 1, str), value);
    }
}
Source file: ConsumerThread.java
Project: java-kafka-client-libs
Reads: 22
Favorites: 0
Likes: 0
Comments: 0
@Override
public void run() {
    // Pulls messages off this thread's stream, forwards each to the handler,
    // and reports per-message handling time to statsd.
    LOG.info( "Consuming thread started" );
    try {
        ConsumerIterator<byte[], byte[]> it = _stream.iterator();
        while ( it.hasNext() ) {
            long start = System.currentTimeMillis();
            byte[] message = it.next().message();
            LOG.debug( "message received: {}", ( new String( message ) ) );
            _handler.onMessage( message );
            long time = System.currentTimeMillis() - start;
            KruxStdLib.STATSD.time( "message_received." + _topic, time );
        }
    } catch ( Exception e ) {
        if ( e instanceof InterruptedException ) {
            // Restore the interrupt flag so the owner of this thread can still
            // observe the shutdown request (the catch clears it).
            Thread.currentThread().interrupt();
            LOG.warn( "Consumer group threads interrupted, shutting down" );
        } else {
            LOG.error( "no longer fetching messages", e );
        }
    }
}
Source file: SimpleHLConsumer.java
Project: java.study
Reads: 19
Favorites: 0
Likes: 0
Comments: 0
public void testConsumer() throws Exception {
    // Streams every message from the configured topic to stdout and to logX.txt.
    // Cleanup moved into finally: the original leaked the writer and left the
    // consumer running if iteration threw.
    String fileName = "logX.txt";
    BufferedWriter out = new BufferedWriter(new FileWriter(fileName));
    try {
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        // Parameterized stream type (original used the raw KafkaStream type here).
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                String msg = new String(it.next().message(), Charset.forName("UTF-8"));
                System.out.println("Message from Single Topic: " + msg);
                out.write(msg, 0, msg.length());
                out.write('\n');
                out.flush();
            }
        }
    } finally {
        if (consumer != null) {
            consumer.shutdown();
        }
        out.close();
    }
}
Source file: KafkaSpout.java
Project: monasca-thresh
Reads: 17
Favorites: 0
Likes: 0
Comments: 0
@Override
public void run() {
    // Reader loop: fetches one Kafka message at a time and hands it to the
    // consuming side through the shared `this.message` field, using this
    // object's monitor for the handoff. Runs until shouldContinue is cleared.
    while (this.shouldContinue) {
        // NOTE(review): the iterator is re-obtained from streams.get(0) on every
        // pass of the outer loop — presumably intentional, but worth confirming.
        final ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
        if (it.hasNext()) {
            final byte[] message = it.next().message();
            synchronized (this) {
                // Publish the message for getMessage() under the lock.
                this.message = message;
                // Wake up getMessage() if it is waiting
                if (this.waiting) {
                    notify();
                }
                // Block until the consumer has taken the message (it sets
                // this.message back to null) or shutdown is requested. The loop
                // guards against spurious wakeups.
                while (this.message != null && this.shouldContinue)
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        logger.info("Wait interrupted", e);
                    }
            }
        }
    }
    logger.info("readerThread {} exited", this.readerThread.getName());
    // Clearing the reference signals that the reader has fully stopped.
    this.readerThread = null;
}