/**
 * Configures and submits the "Link Verifier" MapReduce job over the {@code CINode} data store.
 *
 * <p>The configured job is assigned to the {@code job} field and started with
 * {@link Job#submit()} (not {@code waitForCompletion}), so this method returns once the job has
 * been submitted.
 *
 * @param outputDir directory that {@link TextOutputFormat} writes the reducer output to
 * @param numReducers number of reduce tasks to run
 * @param concurrent if {@code false}, the store query is restricted to the {@code prev} field;
 *     if {@code true}, no field restriction is set and {@code readFlushed} is applied to the job
 *     configuration
 * @throws Exception if the data store cannot be created, or the job cannot be configured or
 *     submitted
 */
public void start(Path outputDir, int numReducers, boolean concurrent) throws Exception {
  LOG.info("Running Verify with outputDir=" + outputDir +", numReducers=" + numReducers);

  DataStore<Long, CINode> store =
      DataStoreFactory.getDataStore(Long.class, CINode.class, new Configuration());
  try {
    job = new Job(getConf());
    ensureJavaSerialization(job.getConfiguration());
    job.setJobName("Link Verifier");
    job.setNumReduceTasks(numReducers);
    job.setJarByClass(getClass());

    Query<Long, CINode> query = store.newQuery();
    if (!concurrent) {
      // no concurrency filtering, only need prev field
      query.setFields("prev");
    } else {
      readFlushed(job.getConfiguration());
    }

    GoraMapper.initMapperJob(job, query, store, LongWritable.class, VLongWritable.class,
        VerifyMapper.class, true);
    job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
    job.setReducerClass(VerifyReducer.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, outputDir);
  } finally {
    // The store is only used here to build the query and initialize the mapper configuration.
    // Close it even when one of the configuration steps above throws, so it does not leak.
    store.close();
  }
  job.submit();
}

/**
 * Appends Hadoop's {@code JavaSerialization} to the {@code io.serializations} property unless it
 * is already listed there.
 *
 * <p>Entries are compared as whole, trimmed, comma-separated class names. A different class whose
 * name merely contains the JavaSerialization class name as a substring is therefore not treated
 * as a match. If the property is unset or blank, {@code WritableSerialization} is written
 * alongside {@code JavaSerialization}, so the Writable map output types used by {@code start}
 * remain serializable.
 *
 * @param conf job configuration to update in place
 */
private void ensureJavaSerialization(Configuration conf) {
  final String key = "io.serializations";
  final String javaSerialization = "org.apache.hadoop.io.serializer.JavaSerialization";
  String current = conf.get(key);
  if (current == null || current.trim().isEmpty()) {
    conf.set(key, "org.apache.hadoop.io.serializer.WritableSerialization," + javaSerialization);
    return;
  }
  for (String entry : current.split(",")) {
    if (entry.trim().equals(javaSerialization)) {
      return;
    }
  }
  conf.set(key, current + "," + javaSerialization);
}
Verify.java 文件源码
java
阅读 21
收藏 0
点赞 0
评论 0
项目:gora-boot
作者:
评论列表
文章目录