TestDatasetSink.java 文件源码

java
阅读 40 收藏 0 点赞 0 评论 0

项目:flume-release-1.7.0 作者:
@Test
public void testSerializedWithIncompatibleSchemasWithSavePolicy()
    throws EventDeliveryException {
  if (Datasets.exists(ERROR_DATASET_URI)) {
    Datasets.delete(ERROR_DATASET_URI);
  }
  config.put(DatasetSinkConstants.CONFIG_FAILURE_POLICY,
      DatasetSinkConstants.SAVE_FAILURE_POLICY);
  config.put(DatasetSinkConstants.CONFIG_KITE_ERROR_DATASET_URI,
      ERROR_DATASET_URI);
  final DatasetSink sink = sink(in, config);

  GenericRecordBuilder builder = new GenericRecordBuilder(
      INCOMPATIBLE_SCHEMA);
  GenericData.Record rec = builder.set("username", "koala").build();

  // We pass in a valid schema in the header, but an incompatible schema
  // was used to serialize the record
  Event badEvent = event(rec, INCOMPATIBLE_SCHEMA, SCHEMA_FILE, true);
  putToChannel(in, badEvent);

  // run the sink
  sink.start();
  sink.process();
  sink.stop();

  Assert.assertEquals("Good records should have been written",
      Sets.newHashSet(expected),
      read(Datasets.load(FILE_DATASET_URI)));
  Assert.assertEquals("Should not have rolled back", 0, remaining(in));
  Assert.assertEquals("Should have saved the bad event",
      Sets.newHashSet(AvroFlumeEvent.newBuilder()
        .setBody(ByteBuffer.wrap(badEvent.getBody()))
        .setHeaders(toUtf8Map(badEvent.getHeaders()))
        .build()),
      read(Datasets.load(ERROR_DATASET_URI, AvroFlumeEvent.class)));
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号