Java code examples for the class org.apache.hadoop.io.NullWritable
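NullWritable is a zero-length, singleton Writable: NullWritable.get() always returns the same shared instance, and no bytes are serialized for it. It is the conventional placeholder wherever Hadoop MapReduce requires a key or value type but nothing actually needs to travel in that slot. The snippets below are collected from several open-source projects. As a quick orientation, here is a minimal sketch, using illustrative names that do not come from any of those projects, of a mapper that emits NullWritable values:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative only: emit each input line as the key, with NullWritable as the value.
public class LineKeyMapper
    extends Mapper<LongWritable, Text, Text, NullWritable> {
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // NullWritable.get() returns the shared singleton; nothing is written for it.
    context.write(value, NullWritable.get());
  }
}

The driver declares the type with job.setOutputValueClass(NullWritable.class) (and job.setMapOutputValueClass(NullWritable.class) when there is a reduce phase), as several of the examples below do.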

MDSHiveLineReader.java (project: multiple-dimension-spread)
@Override
public boolean next( final NullWritable key, final ColumnAndIndex value ) throws IOException {
  if( currentSpread == null || currentIndex == currentIndexList.size() ){
    if( ! nextReader() ){
      updateCounter( reader.getReadStats() );
      isEnd = true;
      return false;
    }
  }

  spreadColumn.setSpread( currentSpread );
  value.column = spreadColumn;
  value.index =  currentIndexList.get( currentIndex );
  value.columnIndex = spreadCounter.get();
  currentIndex++;
  return true;
}
DistSum.java (project: hadoop)
/** {@inheritDoc} */
@Override
public void init(Job job) {
  // setup mapper
  job.setMapperClass(PartitionMapper.class);
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(SummationWritable.class);

  // setup partitioner
  job.setPartitionerClass(IndexPartitioner.class);

  // setup reducer
  job.setReducerClass(SummingReducer.class);
  job.setOutputKeyClass(NullWritable.class);
  job.setOutputValueClass(TaskResult.class);
  final Configuration conf = job.getConfiguration();
  final int nParts = conf.getInt(N_PARTS, 1);
  job.setNumReduceTasks(nParts);

  // setup input
  job.setInputFormatClass(SummationInputFormat.class);
}
TestJoinDatamerge.java (project: hadoop)
public void testEmptyJoin() throws Exception {
  Configuration conf = new Configuration();
  Path base = cluster.getFileSystem().makeQualified(new Path("/empty"));
  Path[] src = { new Path(base,"i0"), new Path("i1"), new Path("i2") };
  conf.set(CompositeInputFormat.JOIN_EXPR, CompositeInputFormat.compose("outer",
      MapReduceTestUtil.Fake_IF.class, src));
  MapReduceTestUtil.Fake_IF.setKeyClass(conf, 
    MapReduceTestUtil.IncomparableKey.class);
  Job job = Job.getInstance(conf);
  job.setInputFormatClass(CompositeInputFormat.class);
  FileOutputFormat.setOutputPath(job, new Path(base, "out"));

  job.setMapperClass(Mapper.class);
  job.setReducerClass(Reducer.class);
  job.setOutputKeyClass(MapReduceTestUtil.IncomparableKey.class);
  job.setOutputValueClass(NullWritable.class);

  job.waitForCompletion(true);
  assertTrue(job.isSuccessful());
  base.getFileSystem(conf).delete(base, true);
}
AvroImportMapper.java (project: aliyun-maxcompute-data-collectors)
@Override
protected void map(LongWritable key, SqoopRecord val, Context context)
    throws IOException, InterruptedException {

  try {
    // Loading of LOBs was delayed until we have a Context.
    val.loadLargeObjects(lobLoader);
  } catch (SQLException sqlE) {
    throw new IOException(sqlE);
  }

  GenericRecord outKey = AvroUtil.toGenericRecord(val.getFieldMap(),
      schema, bigDecimalFormatString);
  wrapper.datum(outKey);
  context.write(wrapper, NullWritable.get());
}
PostgreSQLCopyExportJob.java (project: aliyun-maxcompute-data-collectors)
@Override
protected void configureMapper(Job job, String tableName,
    String tableClassName) throws ClassNotFoundException, IOException {
  if (isHCatJob) {
    throw new IOException("Sqoop-HCatalog Integration is not supported.");
  }
  switch (getInputFileType()) {
    case AVRO_DATA_FILE:
      throw new IOException("Avro data file is not supported.");
    case SEQUENCE_FILE:
    case UNKNOWN:
    default:
      job.setMapperClass(getMapperClass());
  }

  // Concurrent writes of the same records would be problematic.
  ConfigurationHelper.setJobMapSpeculativeExecution(job, false);
  job.setMapOutputKeyClass(NullWritable.class);
  job.setMapOutputValueClass(NullWritable.class);
}
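In the snippet above both the map output key and value classes are NullWritable, and speculative execution is disabled, which suggests the mapper writes the exported rows to the database itself and emits nothing meaningful to the framework. A minimal sketch of that general pattern, with an illustrative class name that is not part of Sqoop:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative only: a mapper that pushes each record to an external sink on its own
// and therefore declares NullWritable for both of its output types.
public class SideEffectMapper
    extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Write `value` to the external sink here (omitted); nothing is emitted downstream.
  }
}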
TotalOrderPartitioner.java (project: hadoop)
/**
 * Read the cut points from the given IFile.
 * @param fs The file system
 * @param p The path to read
 * @param keyClass The map output key class
 * @param conf The job config
 * @throws IOException
 */
                               // matching key types enforced by passing in
@SuppressWarnings("unchecked") // map output key class
private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
    Configuration conf) throws IOException {
  SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
  ArrayList<K> parts = new ArrayList<K>();
  K key = ReflectionUtils.newInstance(keyClass, conf);
  NullWritable value = NullWritable.get();
  try {
    while (reader.next(key, value)) {
      parts.add(key);
      key = ReflectionUtils.newInstance(keyClass, conf);
    }
    reader.close();
    reader = null;
  } finally {
    IOUtils.cleanup(LOG, reader);
  }
  return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
}
OdpsExportMapper.java (project: aliyun-maxcompute-data-collectors)
public void map(LongWritable key, Record val, Context context) 
    throws IOException, InterruptedException{
  try {
    odpsImpl.parse(val);
    context.write(odpsImpl, NullWritable.get());
  } catch (Exception e) {
    LOG.error("Exception raised during data export");
    LOG.error("Exception: ", e);

    LOG.error("On input: " + val);
    LOG.error("At position " + key);
    InputSplit is = context.getInputSplit();
    LOG.error("");
    LOG.error("Currently processing split:");
    LOG.error(is);
    LOG.error("");
    LOG.error("This issue might not necessarily be caused by current input");
    LOG.error("due to the batching nature of export.");
    LOG.error("");
    throw new IOException("Can't export data, please check failed map task logs", e);
  }
}
TestFileOutputCommitter.java (project: hadoop)
private void writeOutput(RecordWriter theRecordWriter,
    TaskAttemptContext context) throws IOException, InterruptedException {
  NullWritable nullWritable = NullWritable.get();

  try {
    theRecordWriter.write(key1, val1);
    theRecordWriter.write(null, nullWritable);
    theRecordWriter.write(null, val1);
    theRecordWriter.write(nullWritable, val2);
    theRecordWriter.write(key2, nullWritable);
    theRecordWriter.write(key1, null);
    theRecordWriter.write(null, null);
    theRecordWriter.write(key2, val2);
  } finally {
    theRecordWriter.close(context);
  }
}
AvroRecordReader.java (project: aliyun-maxcompute-data-collectors)
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  if (!reader.hasNext() || reader.pastSync(end)) {
    key = null;
    value = null;
    return false;
  }
  if (key == null) {
    key = new AvroWrapper<T>();
  }
  if (value == null) {
    value = NullWritable.get();
  }
  key.datum(reader.next(key.datum()));
  return true;
}
OrcCompactionReducer.java (project: dataSqueeze)
/**
 * {@inheritDoc}
 */
protected void reduce(final Text key, final Iterable<OrcValue> values, final Context context) throws IOException, InterruptedException {
    final Configuration configuration = context.getConfiguration();
    final String sourcePath = configuration.get("compactionSourcePath");
    final String targetPath = configuration.get("compactionTargetPath");

    // Reducer stores data at the target directory retaining the directory structure of files
    String filePath = key.toString().replace(sourcePath, targetPath);
    if (key.toString().endsWith("/")) {
        filePath = filePath.concat("file");
    }

    log.info("Compaction output path {}", filePath);
    final URI uri = URI.create(filePath);
    final MultipleOutputs<NullWritable, OrcValue> multipleOutputs = new MultipleOutputs<NullWritable, OrcValue>(context);
    try {
        for (final OrcValue text : values) {
            multipleOutputs.write(NullWritable.get(), text, uri.toString());
        }
    } finally {
        multipleOutputs.close();
    }
}
GenerateDistCacheData.java (project: hadoop)
@Override
public Job call() throws IOException, InterruptedException,
                         ClassNotFoundException {
  UserGroupInformation ugi = UserGroupInformation.getLoginUser();
  ugi.doAs( new PrivilegedExceptionAction <Job>() {
     public Job run() throws IOException, ClassNotFoundException,
                             InterruptedException {
      job.setMapperClass(GenDCDataMapper.class);
      job.setNumReduceTasks(0);
      job.setMapOutputKeyClass(NullWritable.class);
      job.setMapOutputValueClass(BytesWritable.class);
      job.setInputFormatClass(GenDCDataFormat.class);
      job.setOutputFormatClass(NullOutputFormat.class);
      job.setJarByClass(GenerateDistCacheData.class);
      try {
        FileInputFormat.addInputPath(job, new Path("ignored"));
      } catch (IOException e) {
        LOG.error("Error while adding input path ", e);
      }
      job.submit();
      return job;
    }
  });
  return job;
}
TestFileOutputCommitter.java (project: hadoop)
private void writeOutput(RecordWriter theRecordWriter,
    TaskAttemptContext context) throws IOException, InterruptedException {
  NullWritable nullWritable = NullWritable.get();

  try {
    theRecordWriter.write(key1, val1);
    theRecordWriter.write(null, nullWritable);
    theRecordWriter.write(null, val1);
    theRecordWriter.write(nullWritable, val2);
    theRecordWriter.write(key2, nullWritable);
    theRecordWriter.write(key1, null);
    theRecordWriter.write(null, null);
    theRecordWriter.write(key2, val2);
  } finally {
    theRecordWriter.close(null);
  }
}
VPMapOnlyMapper.java (project: PigSPARQL)
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String[] parsedTriple = rdfParser.parseTriple(value.toString());
    if (parsedTriple != null) {
        // Convert literals to Pig types, if possible
        parsedTriple[2] = Util.toPigTypes(parsedTriple[2]);
        // Use Predicate for Vertical Partitioning
        multipleOutputs.write(NullWritable.get(), new Text(parsedTriple[0] + "\t" + parsedTriple[2]),
                Util.generateFileName(parsedTriple[1]));
        // Write all parsed triples also to "inputData" for queries where Predicate is not known
        multipleOutputs.write(NullWritable.get(), new Text(parsedTriple[0] + "\t" + parsedTriple[1] + "\t" + parsedTriple[2]),
                Util.generateFileName("inputData"));
        context.getCounter("RDF Dataset Properties", VALID_TRIPLES).increment(1);
    } else {
        if (value.getLength() == 0 || value.toString().startsWith("@")) {
            System.out.println("IGNORING: " + value);
            context.getCounter("RDF Dataset Properties", IGNORED_LINES).increment(1);
        } else {
            System.out.println("DISCARDED: " + value);
            context.getCounter("RDF Dataset Properties", INVALID_TRIPLES).increment(1);
        }
    }
}
TestDatamerge.java (project: hadoop)
public void testEmptyJoin() throws Exception {
  JobConf job = new JobConf();
  Path base = cluster.getFileSystem().makeQualified(new Path("/empty"));
  Path[] src = { new Path(base,"i0"), new Path("i1"), new Path("i2") };
  job.set("mapreduce.join.expr", CompositeInputFormat.compose("outer",
      Fake_IF.class, src));
  job.setInputFormat(CompositeInputFormat.class);
  FileOutputFormat.setOutputPath(job, new Path(base, "out"));

  job.setMapperClass(IdentityMapper.class);
  job.setReducerClass(IdentityReducer.class);
  job.setOutputKeyClass(IncomparableKey.class);
  job.setOutputValueClass(NullWritable.class);

  JobClient.runJob(job);
  base.getFileSystem(job).delete(base, true);
}
TestReduceFetchFromPartialMem.java (project: hadoop)
public RecordReader<NullWritable,NullWritable> getRecordReader(
    InputSplit ignored, JobConf conf, Reporter reporter) {
  return new RecordReader<NullWritable,NullWritable>() {
    private boolean done = false;
    public boolean next(NullWritable key, NullWritable value)
        throws IOException {
      if (done)
        return false;
      done = true;
      return true;
    }
    public NullWritable createKey() { return NullWritable.get(); }
    public NullWritable createValue() { return NullWritable.get(); }
    public long getPos() throws IOException { return 0L; }
    public void close() throws IOException { }
    public float getProgress() throws IOException { return 0.0f; }
  };
}
CredentialsTestJob.java (project: hadoop)
public Job createJob() 
throws IOException {
  Configuration conf = getConf();
  conf.setInt(MRJobConfig.NUM_MAPS, 1);
  Job job = Job.getInstance(conf, "test");
  job.setNumReduceTasks(1);
  job.setJarByClass(CredentialsTestJob.class);
  job.setNumReduceTasks(1);
  job.setMapperClass(CredentialsTestJob.CredentialsTestMapper.class);
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(NullWritable.class);
  job.setReducerClass(CredentialsTestJob.CredentialsTestReducer.class);
  job.setInputFormatClass(SleepJob.SleepInputFormat.class);
  job.setPartitionerClass(SleepJob.SleepJobPartitioner.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setSpeculativeExecution(false);
  job.setJobName("test job");
  FileInputFormat.addInputPath(job, new Path("ignored"));
  return job;
}
TestTotalOrderPartitioner.java (project: hadoop)
public void testTotalOrderBinarySearch() throws Exception {
  TotalOrderPartitioner<Text,NullWritable> partitioner =
    new TotalOrderPartitioner<Text,NullWritable>();
  Configuration conf = new Configuration();
  Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
      "totalorderbinarysearch", conf, splitStrings);
  conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
  conf.setClass(MRJobConfig.MAP_OUTPUT_KEY_CLASS, Text.class, Object.class);
  try {
    partitioner.setConf(conf);
    NullWritable nw = NullWritable.get();
    for (Check<Text> chk : testStrings) {
      assertEquals(chk.data.toString(), chk.part,
          partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
    }
  } finally {
    p.getFileSystem(conf).delete(p, true);
  }
}
TestTableSnapshotInputFormat.java (project: ditb)
@Override
protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
    int numRegions, int expectedNumSplits) throws Exception {
  setupCluster();
  TableName tableName = TableName.valueOf("testWithMockedMapReduce");
  try {
    createTableAndSnapshot(
      util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

    JobConf job = new JobConf(util.getConfiguration());
    Path tmpTableDir = util.getRandomDir();

    TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
      COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
      NullWritable.class, job, false, tmpTableDir);

    // mapred doesn't support start and end keys? o.O
    verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());

  } finally {
    util.getHBaseAdmin().deleteSnapshot(snapshotName);
    util.deleteTable(tableName);
    tearDownCluster();
  }
}
TestInputSampler.java (project: hadoop)
/**
 * Verify IntervalSampler contract, that samples are taken at regular
 * intervals from the given splits.
 */
@Test
@SuppressWarnings("unchecked") // IntWritable comparator not typesafe
public void testIntervalSampler() throws Exception {
  final int TOT_SPLITS = 16;
  final int PER_SPLIT_SAMPLE = 4;
  final int NUM_SAMPLES = TOT_SPLITS * PER_SPLIT_SAMPLE;
  final double FREQ = 1.0 / TOT_SPLITS;
  InputSampler.Sampler<IntWritable,NullWritable> sampler =
    new InputSampler.IntervalSampler<IntWritable,NullWritable>(
        FREQ, NUM_SAMPLES);
  int inits[] = new int[TOT_SPLITS];
  for (int i = 0; i < TOT_SPLITS; ++i) {
    inits[i] = i;
  }
  Job ignored = Job.getInstance();
  Object[] samples = sampler.getSample(new TestInputSamplerIF(
        NUM_SAMPLES, TOT_SPLITS, inits), ignored);
  assertEquals(NUM_SAMPLES, samples.length);
  Arrays.sort(samples, new IntWritable.Comparator());
  for (int i = 0; i < NUM_SAMPLES; ++i) {
    assertEquals(i, ((IntWritable)samples[i]).get());
  }
}
TestInputSampler.java (project: hadoop)
/**
 * Verify IntervalSampler in mapred.lib.InputSampler, which is added back
 * for binary compatibility of M/R 1.x
 */
@Test (timeout = 30000)
@SuppressWarnings("unchecked") // IntWritable comparator not typesafe
public void testMapredIntervalSampler() throws Exception {
  final int TOT_SPLITS = 16;
  final int PER_SPLIT_SAMPLE = 4;
  final int NUM_SAMPLES = TOT_SPLITS * PER_SPLIT_SAMPLE;
  final double FREQ = 1.0 / TOT_SPLITS;
  org.apache.hadoop.mapred.lib.InputSampler.Sampler<IntWritable,NullWritable>
      sampler = new org.apache.hadoop.mapred.lib.InputSampler.IntervalSampler
          <IntWritable,NullWritable>(FREQ, NUM_SAMPLES);
  int inits[] = new int[TOT_SPLITS];
  for (int i = 0; i < TOT_SPLITS; ++i) {
    inits[i] = i;
  }
  Job ignored = Job.getInstance();
  Object[] samples = sampler.getSample(new TestInputSamplerIF(
        NUM_SAMPLES, TOT_SPLITS, inits), ignored);
  assertEquals(NUM_SAMPLES, samples.length);
  Arrays.sort(samples, new IntWritable.Comparator());
  for (int i = 0; i < NUM_SAMPLES; ++i) {
    assertEquals(i,
        ((IntWritable)samples[i]).get());
  }
}
TestTotalOrderPartitioner.java (project: hadoop)
private static <T extends WritableComparable<?>> Path writePartitionFile(
    String testname, Configuration conf, T[] splits) throws IOException {
  final FileSystem fs = FileSystem.getLocal(conf);
  final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
                               ).makeQualified(fs);
  Path p = new Path(testdir, testname + "/_partition.lst");
  TotalOrderPartitioner.setPartitionFile(conf, p);
  conf.setInt(MRJobConfig.NUM_REDUCES, splits.length + 1);
  SequenceFile.Writer w = null;
  try {
    w = SequenceFile.createWriter(fs, conf, p,
        splits[0].getClass(), NullWritable.class,
        SequenceFile.CompressionType.NONE);
    for (int i = 0; i < splits.length; ++i) {
      w.append(splits[i], NullWritable.get());
    }
  } finally {
    if (null != w)
      w.close();
  }
  return p;
}
TestTableSnapshotInputFormat.java (project: ditb)
@Override
public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
    String snapshotName, Path tmpTableDir) throws Exception {
  Job job = new Job(UTIL.getConfiguration());
  TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
    new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
    NullWritable.class, job, false, tmpTableDir);
}
DistSum.java (project: hadoop)
/** {@inheritDoc} */
@Override
public void init(Job job) {
  // setup mapper
  job.setMapperClass(SummingMapper.class);
  job.setMapOutputKeyClass(NullWritable.class);
  job.setMapOutputValueClass(TaskResult.class);

  // zero reducer
  job.setNumReduceTasks(0);

  // setup input
  job.setInputFormatClass(PartitionInputFormat.class);
}
MDSHiveDirectVectorizedReader.java (project: multiple-dimension-spread)
@Override
public boolean next( final NullWritable key, final VectorizedRowBatch outputBatch ) throws IOException {
  outputBatch.reset();
  setting.setPartitionValues( outputBatch );

  if( indexSize <= currentIndex ){
    if( ! currentReader.hasNext() ){
      updateCounter( currentReader.getReadStats() );
      outputBatch.endOfFile = true;
      isEnd = true;
      return false;
    }
    while( ! setSpread() ){
      if( ! currentReader.hasNext() ){
        updateCounter( currentReader.getReadStats() );
        outputBatch.endOfFile = true;
        isEnd = true;
        return false;
      }
    }
  }
  int maxSize = outputBatch.getMaxSize();
  if( indexSize < currentIndex + maxSize ){
    maxSize = indexSize - currentIndex;
  }

  for( int colIndex : needColumnIds ){
    assignors[colIndex].setColumnVector( outputBatch.cols[colIndex] , currentIndexList , currentIndex , maxSize );
  }
  outputBatch.size = maxSize;

  currentIndex += maxSize;
  if( indexSize <= currentIndex && ! currentReader.hasNext() ){
    outputBatch.endOfFile = true;
  }

  return outputBatch.size > 0;
}
SleepJob.java (project: hadoop)
@Override
public Job call()
  throws IOException, InterruptedException, ClassNotFoundException {
  ugi.doAs(
    new PrivilegedExceptionAction<Job>() {
      public Job run()
        throws IOException, ClassNotFoundException, InterruptedException {
        job.setMapperClass(SleepMapper.class);
        job.setReducerClass(SleepReducer.class);
        job.setNumReduceTasks((mapTasksOnly) ? 0 : jobdesc.getNumberReduces());
        job.setMapOutputKeyClass(GridmixKey.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setSortComparatorClass(GridmixKey.Comparator.class);
        job.setGroupingComparatorClass(SpecGroupingComparator.class);
        job.setInputFormatClass(SleepInputFormat.class);
        job.setOutputFormatClass(NullOutputFormat.class);
        job.setPartitionerClass(DraftPartitioner.class);
        job.setJarByClass(SleepJob.class);
        job.getConfiguration().setBoolean(Job.USED_GENERIC_PARSER, true);
        job.submit();
        return job;

      }
    });

  return job;
}
TextCompactionReducerTest.java (project: dataSqueeze)
@Test
public void testReduceParentKey() throws IOException {
    configuration.set("compactionSourcePath", "/src/path");
    configuration.set("compactionTargetPath", "/target/path");
    values.add(value1);
    values.add(value2);
    reduceDriver.withInput(inputParentKey, values);
    final BytesWritable bytesWritable = new BytesWritable("value/src/path".getBytes());
    reduceDriver.withPathOutput(NullWritable.get(), value1, "value/target/path");
    reduceDriver.withPathOutput(NullWritable.get(), value2, "value/target/path");
    reduceDriver.runTest();
}
OutputHandler.java (project: hadoop)
/**
 * Create a handler that will handle any records output from the application.
 * @param collector the "real" collector that takes the output
 * @param reporter the reporter for reporting progress
 */
public OutputHandler(OutputCollector<K, V> collector, Reporter reporter, 
                     RecordReader<FloatWritable,NullWritable> recordReader,
                     String expectedDigest) {
  this.reporter = reporter;
  this.collector = collector;
  this.recordReader = recordReader;
  this.expectedDigest = expectedDigest;
}
TestMDSHiveDirectVectorizedReader.java (project: multiple-dimension-spread)
@Test
public void T_allTest_1() throws IOException{
  String dirName = this.getClass().getClassLoader().getResource( "io/out" ).getPath();
  String outPath = String.format( "%s/TestMDSHiveDirectVectorizedReader_T_allTest_1.mds" , dirName );
  createFile( outPath );

  HiveVectorizedReaderSetting setting = getHiveVectorizedReaderSetting( new HiveReaderSetting( new Configuration() , new OrExpressionNode() , true , false , false ) );
  File inFile = new File( outPath );
  MDSHiveDirectVectorizedReader reader = new MDSHiveDirectVectorizedReader( new FileInputStream( inFile ) , inFile.length() , 0 , inFile.length() , setting , new DummyJobReporter() );
  NullWritable key = reader.createKey();
  VectorizedRowBatch value = reader.createValue();
  int colCount = 0;
  while( reader.next( key , value ) ){
    BytesColumnVector str = (BytesColumnVector)value.cols[0];
    LongColumnVector num2 = (LongColumnVector)value.cols[2];
    LongColumnVector p = (LongColumnVector)value.cols[4];
    assertEquals( null , value.cols[1] );
    assertEquals( null , value.cols[3] );
    for( int i = 0 ; i < value.size ; i++,colCount++ ){
      assertEquals( new String( str.vector[i] , str.start[i] , str.length[i] ) , "a-" + colCount );
      assertEquals( num2.vector[i] , colCount * 2 );
      assertEquals( p.vector[0] , 100 );
    }
  }
  reader.getPos();
  reader.getProgress();
  reader.close();
}
LanguageModel.java (project: mapreduce-samples)
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            //this -> <is=1000, is book=10>

            TreeMap<Integer, List<String>> tm = new TreeMap<Integer, List<String>>(Collections.reverseOrder());
            for (Text val : values) {
                String cur_val = val.toString().trim();
                String word = cur_val.split("=")[0].trim();
                int count = Integer.parseInt(cur_val.split("=")[1].trim());
                if(tm.containsKey(count)) {
                    tm.get(count).add(word);
                }
                else {
                    List<String> list = new ArrayList<>();
                    list.add(word);
                    tm.put(count, list);
                }
            }

            Iterator<Integer> iter = tm.keySet().iterator();

            for(int j=0 ; iter.hasNext() && j < n; j++) {
                int keyCount = iter.next();
                List<String> words = tm.get(keyCount);
                for(String curWord: words) {
                    context.write(new DBOutputWritable(key.toString(), curWord, keyCount), NullWritable.get());
                    j++;
                }
            }
        }
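In the reducer above the key (the project's own DBOutputWritable) carries the whole output row and NullWritable fills the unused value slot, which is the usual shape for jobs that write through DBOutputFormat. A hedged driver sketch; the table and column names are invented for illustration, and DBOutputWritable is the project's class, not a Hadoop one:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

// Illustrative only: route reduce output keys to a database table; the
// NullWritable values are ignored by DBOutputFormat.
public static Job configureDbOutput(Configuration conf) throws IOException {
  Job job = Job.getInstance(conf, "language-model");
  job.setOutputFormatClass(DBOutputFormat.class);
  job.setOutputKeyClass(DBOutputWritable.class);      // the project's DBWritable implementation
  job.setOutputValueClass(NullWritable.class);
  DBOutputFormat.setOutput(job, "ngram_output",       // hypothetical table name
      "starting_phrase", "following_word", "count");  // hypothetical column names
  return job;
}

A real job would also call DBConfiguration.configureDB(conf, driverClass, dbUrl, user, password) before building the Job so that DBOutputFormat knows where to connect.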
TestTotalOrderPartitioner.java (project: hadoop)
public void testTotalOrderCustomComparator() throws Exception {
  TotalOrderPartitioner<Text,NullWritable> partitioner =
    new TotalOrderPartitioner<Text,NullWritable>();
  Configuration conf = new Configuration();
  Text[] revSplitStrings = Arrays.copyOf(splitStrings, splitStrings.length);
  Arrays.sort(revSplitStrings, new ReverseStringComparator());
  Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
      "totalordercustomcomparator", conf, revSplitStrings);
  conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
  conf.setClass(MRJobConfig.MAP_OUTPUT_KEY_CLASS, Text.class, Object.class);
  conf.setClass(MRJobConfig.KEY_COMPARATOR,
    ReverseStringComparator.class, RawComparator.class);
  ArrayList<Check<Text>> revCheck = new ArrayList<Check<Text>>();
  revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
  revCheck.add(new Check<Text>(new Text("aaabb"), 9));
  revCheck.add(new Check<Text>(new Text("aabbb"), 9));
  revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
  revCheck.add(new Check<Text>(new Text("babbb"), 8));
  revCheck.add(new Check<Text>(new Text("baabb"), 8));
  revCheck.add(new Check<Text>(new Text("yai"), 1));
  revCheck.add(new Check<Text>(new Text("yak"), 1));
  revCheck.add(new Check<Text>(new Text("z"), 0));
  revCheck.add(new Check<Text>(new Text("ddngo"), 4));
  revCheck.add(new Check<Text>(new Text("hi"), 3));
  try {
    partitioner.setConf(conf);
    NullWritable nw = NullWritable.get();
    for (Check<Text> chk : revCheck) {
      assertEquals(chk.data.toString(), chk.part,
          partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
    }
  } finally {
    p.getFileSystem(conf).delete(p, true);
  }
}

