java类org.apache.hadoop.io.ObjectWritable的实例源码

ScoreUpdater.java 文件源码 项目:GeoCrawler 阅读 24 收藏 0 点赞 0 评论 0
/**
 * Emits the input value, wrapped in an {@link ObjectWritable}, under the
 * unchanged key.
 */
public void map(Text key, Writable value,
    OutputCollector<Text, ObjectWritable> output, Reporter reporter)
    throws IOException {
  output.collect(key, new ObjectWritable(value));
}
LinkRank.java 文件源码 项目:GeoCrawler 阅读 27 收藏 0 点赞 0 评论 0
/**
 * Configures and runs the inverter job, which turns outlinks into inlinks for
 * the subsequent analysis job.
 *
 * <p>The node database and the outlink database are both read as sequence
 * files, and {@code Inverter} serves as both mapper and reducer. The result is
 * written as a sequence file of {@code Text} to {@code LinkDatum}.
 *
 * @param nodeDb
 *          the node database to read
 * @param outlinkDb
 *          the outlink database to read
 * @param output
 *          the directory the inverted links are written to
 *
 * @throws IOException
 *           if the inverter job fails; the failure is logged before rethrow
 */
private void runInverter(Path nodeDb, Path outlinkDb, Path output)
    throws IOException {

  JobConf job = new NutchJob(getConf());
  job.setJobName("LinkAnalysis Inverter");

  // inputs: node db first, then outlink db, both as sequence files
  job.setInputFormat(SequenceFileInputFormat.class);
  FileInputFormat.addInputPath(job, nodeDb);
  FileInputFormat.addInputPath(job, outlinkDb);

  // the same class handles both phases
  job.setMapperClass(Inverter.class);
  job.setReducerClass(Inverter.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(ObjectWritable.class);

  // outputs
  job.setOutputFormat(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(LinkDatum.class);
  FileOutputFormat.setOutputPath(job, output);
  job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);

  LOG.info("Starting inverter job");
  try {
    JobClient.runJob(job);
  } catch (IOException failure) {
    LOG.error(StringUtils.stringifyException(failure));
    throw failure;
  }
  LOG.info("Finished inverter job.");
}
LinkRank.java 文件源码 项目:GeoCrawler 阅读 27 收藏 0 点赞 0 评论 0
/**
 * Runs the link analysis job. The link analysis job applies the link rank
 * formula to create a score per url and stores that score in the NodeDb.
 *
 * <p>Typically the link analysis job is run a number of times to allow the
 * link rank scores to converge.
 *
 * @param nodeDb
 *          The node database from which we are getting previous link rank
 *          scores.
 * @param inverted
 *          The inverted inlinks.
 * @param output
 *          The link analysis output, written with {@code MapFileOutputFormat}.
 * @param iteration
 *          The current iteration number. It appears to be zero-based: the job
 *          is configured and labelled with {@code iteration + 1}.
 * @param numIterations
 *          The total number of link analysis iterations, used only in the
 *          job name.
 * @param rankOne
 *          Value handed to the Analyzer through the
 *          {@code link.analyze.rank.one} configuration key.
 *          NOTE(review): presumably the base rank term of the formula; confirm
 *          against the Analyzer.
 *
 * @throws IOException
 *           If an error occurs during link analysis; the failure is logged
 *           before being rethrown.
 */
private void runAnalysis(Path nodeDb, Path inverted, Path output,
    int iteration, int numIterations, float rankOne) throws IOException {

  // 1-based iteration number, shared by the job config and the job name
  int iterationNumber = iteration + 1;

  JobConf analyzer = new NutchJob(getConf());
  analyzer.set("link.analyze.iteration", String.valueOf(iterationNumber));
  analyzer.setJobName("LinkAnalysis Analyzer, iteration " + iterationNumber
      + " of " + numIterations);
  FileInputFormat.addInputPath(analyzer, nodeDb);
  FileInputFormat.addInputPath(analyzer, inverted);
  FileOutputFormat.setOutputPath(analyzer, output);
  analyzer.set("link.analyze.rank.one", String.valueOf(rankOne));
  analyzer.setMapOutputKeyClass(Text.class);
  analyzer.setMapOutputValueClass(ObjectWritable.class);
  analyzer.setInputFormat(SequenceFileInputFormat.class);
  analyzer.setMapperClass(Analyzer.class);
  analyzer.setReducerClass(Analyzer.class);
  analyzer.setOutputKeyClass(Text.class);
  analyzer.setOutputValueClass(Node.class);
  analyzer.setOutputFormat(MapFileOutputFormat.class);
  analyzer.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
      false);

  LOG.info("Starting analysis job");
  try {
    JobClient.runJob(analyzer);
  } catch (IOException e) {
    LOG.error(StringUtils.stringifyException(e));
    throw e;
  }
  LOG.info("Finished analysis job.");
}
LinkRank.java 文件源码 项目:GeoCrawler 阅读 27 收藏 0 点赞 0 评论 0
/**
 * Re-emits the value under the same key after wrapping it in an
 * {@link ObjectWritable}.
 */
public void map(Text key, Writable value,
    OutputCollector<Text, ObjectWritable> output, Reporter reporter)
    throws IOException {
  ObjectWritable wrapped = new ObjectWritable(value);
  output.collect(key, wrapped);
}
LinkRank.java 文件源码 项目:GeoCrawler 阅读 23 收藏 0 点赞 0 评论 0
/**
 * Re-emits a copy of the value, wrapped in an {@link ObjectWritable}, under
 * the same key.
 */
public void map(Text key, Writable value,
    OutputCollector<Text, ObjectWritable> output, Reporter reporter)
    throws IOException {
  // wrap a clone rather than the instance handed to map()
  Writable copy = WritableUtils.clone(value, conf);
  output.collect(key, new ObjectWritable(copy));
}
LinkDumper.java 文件源码 项目:GeoCrawler 阅读 21 收藏 0 点赞 0 评论 0
/**
 * Passes the key through and wraps the value in an {@link ObjectWritable}.
 */
public void map(Text key, Writable value,
    OutputCollector<Text, ObjectWritable> output, Reporter reporter)
    throws IOException {
  output.collect(key, new ObjectWritable(value));
}
LinkDumper.java 文件源码 项目:GeoCrawler 阅读 26 收藏 0 点赞 0 评论 0
/**
 * Inverts outlinks to inlinks while attaching node information to each
 * emitted link.
 *
 * <p>All values for {@code key} (the source url) are scanned. The
 * {@link Node} value is remembered, and every {@link LinkDatum} is cloned and
 * buffered. If a node was seen and it reports more than zero outlinks, one
 * {@link LinkNode} pairing the source url with that node is collected for
 * each buffered outlink, keyed by the outlink's url.
 *
 * <p>Keys without a {@link Node} value emit nothing, because there is no node
 * information to attach.
 */
public void reduce(Text key, Iterator<ObjectWritable> values,
    OutputCollector<Text, LinkNode> output, Reporter reporter)
    throws IOException {

  String fromUrl = key.toString();
  List<LinkDatum> outlinks = new ArrayList<LinkDatum>();
  Node node = null;

  // loop through all values aggregating outlinks, saving node
  while (values.hasNext()) {
    Object obj = values.next().get();
    if (obj instanceof Node) {
      node = (Node) obj;
    } else if (obj instanceof LinkDatum) {
      outlinks.add(WritableUtils.clone((LinkDatum) obj, conf));
    }
  }

  // Guard against keys that only had LinkDatum values: dereferencing a null
  // node here would throw a NullPointerException. Also only collect when the
  // node has outlinks.
  if (node == null || node.getNumOutlinks() <= 0) {
    return;
  }

  for (LinkDatum outlink : outlinks) {
    // collect the outlink as an inlink with the node
    output.collect(new Text(outlink.getUrl()), new LinkNode(fromUrl, node));
  }
}
MapFieldValueFilter.java 文件源码 项目:gora-boot 阅读 30 收藏 0 点赞 0 评论 0
@Override
public void write(DataOutput out) throws IOException {
  // header: field name, map key, operator, operand count
  Text.writeString(out, fieldName);
  Text.writeString(out, mapKey.toString());
  WritableUtils.writeEnum(out, filterOp);
  WritableUtils.writeVInt(out, operands.size());

  for (Object candidate : operands) {
    if (candidate instanceof String) {
      throw new IllegalStateException("Use Utf8 instead of String for operands");
    }
    // Utf8 operands are written in their String form
    Object toWrite = (candidate instanceof Utf8) ? candidate.toString() : candidate;
    ObjectWritable.writeObject(out, toWrite, declaredClassOf(toWrite), conf);
  }

  out.writeBoolean(filterIfMissing);
}

/**
 * Chooses the declared class handed to {@link ObjectWritable#writeObject}
 * for one operand. Boxed primitives (and {@code Void}) map to their
 * primitive {@code TYPE}; any other operand maps to its runtime class.
 *
 * @throws NullPointerException if {@code operand} is null
 */
private static Class<?> declaredClassOf(Object operand) {
  if (operand instanceof Boolean) {
    return Boolean.TYPE;
  }
  if (operand instanceof Character) {
    return Character.TYPE;
  }
  if (operand instanceof Byte) {
    return Byte.TYPE;
  }
  if (operand instanceof Short) {
    return Short.TYPE;
  }
  if (operand instanceof Integer) {
    return Integer.TYPE;
  }
  if (operand instanceof Long) {
    return Long.TYPE;
  }
  if (operand instanceof Float) {
    return Float.TYPE;
  }
  if (operand instanceof Double) {
    return Double.TYPE;
  }
  if (operand instanceof Void) {
    return Void.TYPE;
  }
  return operand.getClass();
}
SingleFieldValueFilter.java 文件源码 项目:gora-boot 阅读 21 收藏 0 点赞 0 评论 0
@Override
public void write(DataOutput out) throws IOException {
  Text.writeString(out, fieldName);
  WritableUtils.writeEnum(out, filterOp);
  WritableUtils.writeVInt(out, operands.size());

  for (Object raw : operands) {
    if (raw instanceof String) {
      throw new IllegalStateException("Use Utf8 instead of String for operands");
    }
    Object current = raw;
    if (current instanceof Utf8) {
      current = current.toString();
    }

    // boxed primitives are declared with their primitive TYPE; everything
    // else is declared with its runtime class
    Class<?> declared;
    if (current instanceof Boolean) {
      declared = Boolean.TYPE;
    } else if (current instanceof Character) {
      declared = Character.TYPE;
    } else if (current instanceof Byte) {
      declared = Byte.TYPE;
    } else if (current instanceof Short) {
      declared = Short.TYPE;
    } else if (current instanceof Integer) {
      declared = Integer.TYPE;
    } else if (current instanceof Long) {
      declared = Long.TYPE;
    } else if (current instanceof Float) {
      declared = Float.TYPE;
    } else if (current instanceof Double) {
      declared = Double.TYPE;
    } else if (current instanceof Void) {
      declared = Void.TYPE;
    } else {
      declared = current.getClass();
    }
    ObjectWritable.writeObject(out, current, declared, conf);
  }

  out.writeBoolean(filterIfMissing);
}
ScoreUpdater.java 文件源码 项目:anthelion 阅读 23 收藏 0 点赞 0 评论 0
/**
 * Wraps the incoming value in an {@link ObjectWritable} and collects it under
 * the original key.
 */
public void map(Text key, Writable value,
  OutputCollector<Text, ObjectWritable> output, Reporter reporter)
  throws IOException {
  ObjectWritable boxed = new ObjectWritable(value);
  output.collect(key, boxed);
}


问题


面经


文章

微信
公众号

扫码关注公众号