Example source code for the Java class org.apache.hadoop.io.NullWritable

MDSHiveLineReader.java (project: multiple-dimension-spread)

@Override
public boolean next( final NullWritable key, final ColumnAndIndex value ) throws IOException {
if( currentSpread == null || currentIndex == currentIndexList.size() ){
if( ! nextReader() ){
updateCounter( reader.getReadStats() );
isEnd = true;
return false;
}
}
spreadColumn.setSpread( currentSpread );
value.column = spreadColumn;
value.index = currentIndexList.get( currentIndex );
value.columnIndex = spreadCounter.get();
currentIndex++;
return true;
}
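Before the remaining examples, here is a minimal sketch of the idiom these snippets share; it is not taken from any of the projects listed on this page, and the class name is chosen purely for illustration. NullWritable is a zero-byte singleton obtained via NullWritable.get(), used to fill a key or value slot that carries no data.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: emits each input line as the key and reuses the
// NullWritable singleton for the unused value slot instead of allocating
// an empty Writable per record.
public class PassThroughMapper
    extends Mapper<LongWritable, Text, Text, NullWritable> {

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    context.write(value, NullWritable.get());
  }
}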
DistSum.java (project: hadoop)
/** {@inheritDoc} */
@Override
public void init(Job job) {
// setup mapper
job.setMapperClass(PartitionMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(SummationWritable.class);
// setup partitioner
job.setPartitionerClass(IndexPartitioner.class);
// setup reducer
job.setReducerClass(SummingReducer.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(TaskResult.class);
final Configuration conf = job.getConfiguration();
final int nParts = conf.getInt(N_PARTS, 1);
job.setNumReduceTasks(nParts);
// setup input
job.setInputFormatClass(SummationInputFormat.class);
}
TestJoinDatamerge.java (project: hadoop)
public void testEmptyJoin() throws Exception {
Configuration conf = new Configuration();
Path base = cluster.getFileSystem().makeQualified(new Path("/empty"));
Path[] src = { new Path(base,"i0"), new Path("i1"), new Path("i2") };
conf.set(CompositeInputFormat.JOIN_EXPR, CompositeInputFormat.compose("outer",
MapReduceTestUtil.Fake_IF.class, src));
MapReduceTestUtil.Fake_IF.setKeyClass(conf,
MapReduceTestUtil.IncomparableKey.class);
Job job = Job.getInstance(conf);
job.setInputFormatClass(CompositeInputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(base, "out"));
job.setMapperClass(Mapper.class);
job.setReducerClass(Reducer.class);
job.setOutputKeyClass(MapReduceTestUtil.IncomparableKey.class);
job.setOutputValueClass(NullWritable.class);
job.waitForCompletion(true);
assertTrue(job.isSuccessful());
base.getFileSystem(conf).delete(base, true);
}
AvroImportMapper.java (project: aliyun-maxcompute-data-collectors)
@Override
protected void map(LongWritable key, SqoopRecord val, Context context)
throws IOException, InterruptedException {
try {
// Loading of LOBs was delayed until we have a Context.
val.loadLargeObjects(lobLoader);
} catch (SQLException sqlE) {
throw new IOException(sqlE);
}
GenericRecord outKey = AvroUtil.toGenericRecord(val.getFieldMap(),
schema, bigDecimalFormatString);
wrapper.datum(outKey);
context.write(wrapper, NullWritable.get());
}
PostgreSQLCopyExportJob.java (project: aliyun-maxcompute-data-collectors)
@Override
protected void configureMapper(Job job, String tableName,
String tableClassName) throws ClassNotFoundException, IOException {
if (isHCatJob) {
throw new IOException("Sqoop-HCatalog Integration is not supported.");
}
switch (getInputFileType()) {
case AVRO_DATA_FILE:
throw new IOException("Avro data file is not supported.");
case SEQUENCE_FILE:
case UNKNOWN:
default:
job.setMapperClass(getMapperClass());
}
// Concurrent writes of the same records would be problematic.
ConfigurationHelper.setJobMapSpeculativeExecution(job, false);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(NullWritable.class);
}
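The comment above explains why speculative map execution is turned off: duplicate speculative attempts could write the same records twice. ConfigurationHelper is Sqoop's own wrapper; assuming it simply toggles the standard MapReduce flag (an assumption, not verified against Sqoop's source), the equivalent with the stock Hadoop API would look like this:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class DisableMapSpeculation {
  // Assumption: ConfigurationHelper.setJobMapSpeculativeExecution(job, false)
  // reduces to setting mapreduce.map.speculative to false.
  public static void disable(Job job) {
    job.getConfiguration().setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
  }
}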
TotalOrderPartitioner.java (project: hadoop)
/**
* Read the cut points from the given IFile.
* @param fs The file system
* @param p The path to read
* @param keyClass The map output key class
* @param job The job config
* @throws IOException
*/
// matching key types enforced by passing in
@SuppressWarnings("unchecked") // map output key class
private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
Configuration conf) throws IOException {
SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
ArrayList<K> parts = new ArrayList<K>();
K key = ReflectionUtils.newInstance(keyClass, conf);
NullWritable value = NullWritable.get();
try {
while (reader.next(key, value)) {
parts.add(key);
key = ReflectionUtils.newInstance(keyClass, conf);
}
reader.close();
reader = null;
} finally {
IOUtils.cleanup(LOG, reader);
}
return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
}
OdpsExportMapper.java (project: aliyun-maxcompute-data-collectors)
public void map(LongWritable key, Record val, Context context)
throws IOException, InterruptedException{
try {
odpsImpl.parse(val);
context.write(odpsImpl, NullWritable.get());
} catch (Exception e) {
LOG.error("Exception raised during data export");
LOG.error("Exception: ", e);
LOG.error("On input: " + val);
LOG.error("At position " + key);
InputSplit is = context.getInputSplit();
LOG.error("");
LOG.error("Currently processing split:");
LOG.error(is);
LOG.error("");
LOG.error("This issue might not necessarily be caused by current input");
LOG.error("due to the batching nature of export.");
LOG.error("");
throw new IOException("Can't export data, please check failed map task logs", e);
}
}
TestFileOutputCommitter.java (project: hadoop)
private void writeOutput(RecordWriter theRecordWriter,
TaskAttemptContext context) throws IOException, InterruptedException {
NullWritable nullWritable = NullWritable.get();
try {
theRecordWriter.write(key1, val1);
theRecordWriter.write(null, nullWritable);
theRecordWriter.write(null, val1);
theRecordWriter.write(nullWritable, val2);
theRecordWriter.write(key2, nullWritable);
theRecordWriter.write(key1, null);
theRecordWriter.write(null, null);
theRecordWriter.write(key2, val2);
} finally {
theRecordWriter.close(context);
}
}
AvroRecordReader.java (project: aliyun-maxcompute-data-collectors)
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!reader.hasNext() || reader.pastSync(end)) {
key = null;
value = null;
return false;
}
if (key == null) {
key = new AvroWrapper<T>();
}
if (value == null) {
value = NullWritable.get();
}
key.datum(reader.next(key.datum()));
return true;
}
OrcCompactionReducer.java (project: dataSqueeze)
/**
* {@inheritDoc}
*/
protected void reduce(final Text key, final Iterable<OrcValue> values, final Context context) throws IOException, InterruptedException {
final Configuration configuration = context.getConfiguration();
final String sourcePath = configuration.get("compactionSourcePath");
final String targetPath = configuration.get("compactionTargetPath");
// Reducer stores data at the target directory retaining the directory structure of files
String filePath = key.toString().replace(sourcePath, targetPath);
if (key.toString().endsWith("/")) {
filePath = filePath.concat("file");
}
log.info("Compaction output path {}", filePath);
final URI uri = URI.create(filePath);
final MultipleOutputs multipleOutputs = new MultipleOutputs<NullWritable, OrcValue>(context);
try {
for (final OrcValue text : values) {
multipleOutputs.write(NullWritable.get(), text, uri.toString());
}
} finally {
multipleOutputs.close();
}
}
GenerateDistCacheData.java (project: hadoop)
@Override
public Job call() throws IOException, InterruptedException,
ClassNotFoundException {
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
ugi.doAs( new PrivilegedExceptionAction <Job>() {
public Job run() throws IOException, ClassNotFoundException,
InterruptedException {
job.setMapperClass(GenDCDataMapper.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(BytesWritable.class);
job.setInputFormatClass(GenDCDataFormat.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setJarByClass(GenerateDistCacheData.class);
try {
FileInputFormat.addInputPath(job, new Path("ignored"));
} catch (IOException e) {
LOG.error("Error while adding input path ", e);
}
job.submit();
return job;
}
});
return job;
}
TestFileOutputCommitter.java (project: hadoop)
private void writeOutput(RecordWriter theRecordWriter,
TaskAttemptContext context) throws IOException, InterruptedException {
NullWritable nullWritable = NullWritable.get();
try {
theRecordWriter.write(key1, val1);
theRecordWriter.write(null, nullWritable);
theRecordWriter.write(null, val1);
theRecordWriter.write(nullWritable, val2);
theRecordWriter.write(key2, nullWritable);
theRecordWriter.write(key1, null);
theRecordWriter.write(null, null);
theRecordWriter.write(key2, val2);
} finally {
theRecordWriter.close(null);
}
}
VPMapOnlyMapper.java (project: PigSPARQL)
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] parsedTriple = rdfParser.parseTriple(value.toString());
if (parsedTriple != null) {
// Convert literals to Pig types, if possible
parsedTriple[2] = Util.toPigTypes(parsedTriple[2]);
// Use Predicate for Vertical Partitioning
multipleOutputs.write(NullWritable.get(), new Text(parsedTriple[0] + "\t" + parsedTriple[2]),
Util.generateFileName(parsedTriple[1]));
// Write all parsed triples also to "inputData" for queries where Predicate is not known
multipleOutputs.write(NullWritable.get(), new Text(parsedTriple[0] + "\t" + parsedTriple[1] + "\t" + parsedTriple[2]),
Util.generateFileName("inputData"));
context.getCounter("RDF Dataset Properties", VALID_TRIPLES).increment(1);
} else {
if (value.getLength() == 0 || value.toString().startsWith("@")) {
System.out.println("IGNORING: " + value);
context.getCounter("RDF Dataset Properties", IGNORED_LINES).increment(1);
} else {
System.out.println("DISCARDED: " + value);
context.getCounter("RDF Dataset Properties", INVALID_TRIPLES).increment(1);
}
}
}
TestDatamerge.java (project: hadoop)
public void testEmptyJoin() throws Exception {
JobConf job = new JobConf();
Path base = cluster.getFileSystem().makeQualified(new Path("/empty"));
Path[] src = { new Path(base,"i0"), new Path("i1"), new Path("i2") };
job.set("mapreduce.join.expr", CompositeInputFormat.compose("outer",
Fake_IF.class, src));
job.setInputFormat(CompositeInputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(base, "out"));
job.setMapperClass(IdentityMapper.class);
job.setReducerClass(IdentityReducer.class);
job.setOutputKeyClass(IncomparableKey.class);
job.setOutputValueClass(NullWritable.class);
JobClient.runJob(job);
base.getFileSystem(job).delete(base, true);
}
TestReduceFetchFromPartialMem.java (project: hadoop)
public RecordReader<NullWritable,NullWritable> getRecordReader(
InputSplit ignored, JobConf conf, Reporter reporter) {
return new RecordReader<NullWritable,NullWritable>() {
private boolean done = false;
public boolean next(NullWritable key, NullWritable value)
throws IOException {
if (done)
return false;
done = true;
return true;
}
public NullWritable createKey() { return NullWritable.get(); }
public NullWritable createValue() { return NullWritable.get(); }
public long getPos() throws IOException { return 0L; }
public void close() throws IOException { }
public float getProgress() throws IOException { return 0.0f; }
};
}
CredentialsTestJob.java (project: hadoop)
public Job createJob()
throws IOException {
Configuration conf = getConf();
conf.setInt(MRJobConfig.NUM_MAPS, 1);
Job job = Job.getInstance(conf, "test");
job.setNumReduceTasks(1);
job.setJarByClass(CredentialsTestJob.class);
job.setNumReduceTasks(1);
job.setMapperClass(CredentialsTestJob.CredentialsTestMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(CredentialsTestJob.CredentialsTestReducer.class);
job.setInputFormatClass(SleepJob.SleepInputFormat.class);
job.setPartitionerClass(SleepJob.SleepJobPartitioner.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setSpeculativeExecution(false);
job.setJobName("test job");
FileInputFormat.addInputPath(job, new Path("ignored"));
return job;
}
TestTotalOrderPartitioner.java (project: hadoop)
public void testTotalOrderBinarySearch() throws Exception {
TotalOrderPartitioner<Text,NullWritable> partitioner =
new TotalOrderPartitioner<Text,NullWritable>();
Configuration conf = new Configuration();
Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
"totalorderbinarysearch", conf, splitStrings);
conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
conf.setClass(MRJobConfig.MAP_OUTPUT_KEY_CLASS, Text.class, Object.class);
try {
partitioner.setConf(conf);
NullWritable nw = NullWritable.get();
for (Check<Text> chk : testStrings) {
assertEquals(chk.data.toString(), chk.part,
partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
}
} finally {
p.getFileSystem(conf).delete(p, true);
}
}
TestTableSnapshotInputFormat.java (project: ditb)
@Override
protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
int numRegions, int expectedNumSplits) throws Exception {
setupCluster();
TableName tableName = TableName.valueOf("testWithMockedMapReduce");
try {
createTableAndSnapshot(
util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);
JobConf job = new JobConf(util.getConfiguration());
Path tmpTableDir = util.getRandomDir();
TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
NullWritable.class, job, false, tmpTableDir);
// mapred doesn't support start and end keys? o.O
verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
} finally {
util.getHBaseAdmin().deleteSnapshot(snapshotName);
util.deleteTable(tableName);
tearDownCluster();
}
}
TestInputSampler.java (project: hadoop)
/**
* Verify IntervalSampler contract, that samples are taken at regular
* intervals from the given splits.
*/
@Test
@SuppressWarnings("unchecked") // IntWritable comparator not typesafe
public void testIntervalSampler() throws Exception {
final int TOT_SPLITS = 16;
final int PER_SPLIT_SAMPLE = 4;
final int NUM_SAMPLES = TOT_SPLITS * PER_SPLIT_SAMPLE;
final double FREQ = 1.0 / TOT_SPLITS;
InputSampler.Sampler<IntWritable,NullWritable> sampler =
new InputSampler.IntervalSampler<IntWritable,NullWritable>(
FREQ, NUM_SAMPLES);
int inits[] = new int[TOT_SPLITS];
for (int i = 0; i < TOT_SPLITS; ++i) {
inits[i] = i;
}
Job ignored = Job.getInstance();
Object[] samples = sampler.getSample(new TestInputSamplerIF(
NUM_SAMPLES, TOT_SPLITS, inits), ignored);
assertEquals(NUM_SAMPLES, samples.length);
Arrays.sort(samples, new IntWritable.Comparator());
for (int i = 0; i < NUM_SAMPLES; ++i) {
assertEquals(i, ((IntWritable)samples[i]).get());
}
}
TestInputSampler.java (project: hadoop)
/**
* Verify IntervalSampler in mapred.lib.InputSampler, which is added back
* for binary compatibility of M/R 1.x
*/
@Test (timeout = 30000)
@SuppressWarnings("unchecked") // IntWritable comparator not typesafe
public void testMapredIntervalSampler() throws Exception {
final int TOT_SPLITS = 16;
final int PER_SPLIT_SAMPLE = 4;
final int NUM_SAMPLES = TOT_SPLITS * PER_SPLIT_SAMPLE;
final double FREQ = 1.0 / TOT_SPLITS;
org.apache.hadoop.mapred.lib.InputSampler.Sampler<IntWritable,NullWritable>
sampler = new org.apache.hadoop.mapred.lib.InputSampler.IntervalSampler
<IntWritable,NullWritable>(FREQ, NUM_SAMPLES);
int inits[] = new int[TOT_SPLITS];
for (int i = 0; i < TOT_SPLITS; ++i) {
inits[i] = i;
}
Job ignored = Job.getInstance();
Object[] samples = sampler.getSample(new TestInputSamplerIF(
NUM_SAMPLES, TOT_SPLITS, inits), ignored);
assertEquals(NUM_SAMPLES, samples.length);
Arrays.sort(samples, new IntWritable.Comparator());
for (int i = 0; i < NUM_SAMPLES; ++i) {
assertEquals(i,
((IntWritable)samples[i]).get());
}
}
TestTotalOrderPartitioner.java (project: hadoop)
private static <T extends WritableComparable<?>> Path writePartitionFile(
String testname, Configuration conf, T[] splits) throws IOException {
final FileSystem fs = FileSystem.getLocal(conf);
final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
).makeQualified(fs);
Path p = new Path(testdir, testname + "/_partition.lst");
TotalOrderPartitioner.setPartitionFile(conf, p);
conf.setInt(MRJobConfig.NUM_REDUCES, splits.length + 1);
SequenceFile.Writer w = null;
try {
w = SequenceFile.createWriter(fs, conf, p,
splits[0].getClass(), NullWritable.class,
SequenceFile.CompressionType.NONE);
for (int i = 0; i < splits.length; ++i) {
w.append(splits[i], NullWritable.get());
}
} finally {
if (null != w)
w.close();
}
return p;
}
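The partition file written above (one split-point key per record, with NullWritable values in a SequenceFile) is the same format that readPartitions earlier on this page reads back. As a minimal sketch, wiring such a file into a job normally looks like the following; the class and method names here are illustrative, not part of the test:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class TotalOrderSetup {
  // Point the partitioner at the precomputed cut points and use it for the shuffle.
  public static void configure(Job job, Path partitionFile) {
    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
    job.setPartitionerClass(TotalOrderPartitioner.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NullWritable.class);
  }
}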
TestTableSnapshotInputFormat.java (project: ditb)
@Override
public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
String snapshotName, Path tmpTableDir) throws Exception {
Job job = new Job(UTIL.getConfiguration());
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
NullWritable.class, job, false, tmpTableDir);
}
DistSum.java (project: hadoop)
/** {@inheritDoc} */
@Override
public void init(Job job) {
// setup mapper
job.setMapperClass(SummingMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(TaskResult.class);
// zero reducer
job.setNumReduceTasks(0);
// setup input
job.setInputFormatClass(PartitionInputFormat.class);
}
MDSHiveDirectVectorizedReader.java (project: multiple-dimension-spread)
@Override
public boolean next( final NullWritable key, final VectorizedRowBatch outputBatch ) throws IOException {
outputBatch.reset();
setting.setPartitionValues( outputBatch );
if( indexSize <= currentIndex ){
if( ! currentReader.hasNext() ){
updateCounter( currentReader.getReadStats() );
outputBatch.endOfFile = true;
isEnd = true;
return false;
}
while( ! setSpread() ){
if( ! currentReader.hasNext() ){
updateCounter( currentReader.getReadStats() );
outputBatch.endOfFile = true;
isEnd = true;
return false;
}
}
}
int maxSize = outputBatch.getMaxSize();
if( indexSize < currentIndex + maxSize ){
maxSize = indexSize - currentIndex;
}
for( int colIndex : needColumnIds ){
assignors[colIndex].setColumnVector( outputBatch.cols[colIndex] , currentIndexList , currentIndex , maxSize );
}
outputBatch.size = maxSize;
currentIndex += maxSize;
if( indexSize <= currentIndex && ! currentReader.hasNext() ){
outputBatch.endOfFile = true;
}
return outputBatch.size > 0;
}
SleepJob.java (project: hadoop)
@Override
public Job call()
throws IOException, InterruptedException, ClassNotFoundException {
ugi.doAs(
new PrivilegedExceptionAction<Job>() {
public Job run()
throws IOException, ClassNotFoundException, InterruptedException {
job.setMapperClass(SleepMapper.class);
job.setReducerClass(SleepReducer.class);
job.setNumReduceTasks((mapTasksOnly) ? 0 : jobdesc.getNumberReduces());
job.setMapOutputKeyClass(GridmixKey.class);
job.setMapOutputValueClass(NullWritable.class);
job.setSortComparatorClass(GridmixKey.Comparator.class);
job.setGroupingComparatorClass(SpecGroupingComparator.class);
job.setInputFormatClass(SleepInputFormat.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setPartitionerClass(DraftPartitioner.class);
job.setJarByClass(SleepJob.class);
job.getConfiguration().setBoolean(Job.USED_GENERIC_PARSER, true);
job.submit();
return job;
}
});
return job;
}
TextCompactionReducerTest.java (project: dataSqueeze)
@Test
public void testReduceParentKey() throws IOException {
configuration.set("compactionSourcePath", "/src/path");
configuration.set("compactionTargetPath", "/target/path");
values.add(value1);
values.add(value2);
reduceDriver.withInput(inputParentKey, values);
final BytesWritable bytesWritable = new BytesWritable("value/src/path".getBytes());
reduceDriver.withPathOutput(NullWritable.get(), value1, "value/target/path");
reduceDriver.withPathOutput(NullWritable.get(), value2, "value/target/path");
reduceDriver.runTest();
}
OutputHandler.java (project: hadoop)
/**
* Create a handler that will handle any records output from the application.
* @param collector the "real" collector that takes the output
* @param reporter the reporter for reporting progress
*/
public OutputHandler(OutputCollector<K, V> collector, Reporter reporter,
RecordReader<FloatWritable,NullWritable> recordReader,
String expectedDigest) {
this.reporter = reporter;
this.collector = collector;
this.recordReader = recordReader;
this.expectedDigest = expectedDigest;
}
TestMDSHiveDirectVectorizedReader.java (project: multiple-dimension-spread)
@Test
public void T_allTest_1() throws IOException{
String dirName = this.getClass().getClassLoader().getResource( "io/out" ).getPath();
String outPath = String.format( "%s/TestMDSHiveDirectVectorizedReader_T_allTest_1.mds" , dirName );
createFile( outPath );
HiveVectorizedReaderSetting setting = getHiveVectorizedReaderSetting( new HiveReaderSetting( new Configuration() , new OrExpressionNode() , true , false , false ) );
File inFile = new File( outPath );
MDSHiveDirectVectorizedReader reader = new MDSHiveDirectVectorizedReader( new FileInputStream( inFile ) , inFile.length() , 0 , inFile.length() , setting , new DummyJobReporter() );
NullWritable key = reader.createKey();
VectorizedRowBatch value = reader.createValue();
int colCount = 0;
while( reader.next( key , value ) ){
BytesColumnVector str = (BytesColumnVector)value.cols[0];
LongColumnVector num2 = (LongColumnVector)value.cols[2];
LongColumnVector p = (LongColumnVector)value.cols[4];
assertEquals( null , value.cols[1] );
assertEquals( null , value.cols[3] );
for( int i = 0 ; i < value.size ; i++,colCount++ ){
assertEquals( new String( str.vector[i] , str.start[i] , str.length[i] ) , "a-" + colCount );
assertEquals( num2.vector[i] , colCount * 2 );
assertEquals( p.vector[0] , 100 );
}
}
reader.getPos();
reader.getProgress();
reader.close();
}
LanguageModel.java (project: mapreduce-samples)
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//this -> <is=1000, is book=10>
TreeMap<Integer, List<String>> tm = new TreeMap<Integer, List<String>>(Collections.reverseOrder());
for (Text val : values) {
String cur_val = val.toString().trim();
String word = cur_val.split("=")[0].trim();
int count = Integer.parseInt(cur_val.split("=")[1].trim());
if(tm.containsKey(count)) {
tm.get(count).add(word);
}
else {
List<String> list = new ArrayList<>();
list.add(word);
tm.put(count, list);
}
}
Iterator<Integer> iter = tm.keySet().iterator();
for(int j=0 ; iter.hasNext() && j < n; j++) {
int keyCount = iter.next();
List<String> words = tm.get(keyCount);
for(String curWord: words) {
context.write(new DBOutputWritable(key.toString(), curWord, keyCount), NullWritable.get());
j++;
}
}
}
TestTotalOrderPartitioner.java (project: hadoop)
public void testTotalOrderCustomComparator() throws Exception {
TotalOrderPartitioner<Text,NullWritable> partitioner =
new TotalOrderPartitioner<Text,NullWritable>();
Configuration conf = new Configuration();
Text[] revSplitStrings = Arrays.copyOf(splitStrings, splitStrings.length);
Arrays.sort(revSplitStrings, new ReverseStringComparator());
Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
"totalordercustomcomparator", conf, revSplitStrings);
conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
conf.setClass(MRJobConfig.MAP_OUTPUT_KEY_CLASS, Text.class, Object.class);
conf.setClass(MRJobConfig.KEY_COMPARATOR,
ReverseStringComparator.class, RawComparator.class);
ArrayList<Check<Text>> revCheck = new ArrayList<Check<Text>>();
revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
revCheck.add(new Check<Text>(new Text("aaabb"), 9));
revCheck.add(new Check<Text>(new Text("aabbb"), 9));
revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
revCheck.add(new Check<Text>(new Text("babbb"), 8));
revCheck.add(new Check<Text>(new Text("baabb"), 8));
revCheck.add(new Check<Text>(new Text("yai"), 1));
revCheck.add(new Check<Text>(new Text("yak"), 1));
revCheck.add(new Check<Text>(new Text("z"), 0));
revCheck.add(new Check<Text>(new Text("ddngo"), 4));
revCheck.add(new Check<Text>(new Text("hi"), 3));
try {
partitioner.setConf(conf);
NullWritable nw = NullWritable.get();
for (Check<Text> chk : revCheck) {
assertEquals(chk.data.toString(), chk.part,
partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
}
} finally {
p.getFileSystem(conf).delete(p, true);
}
}