java类org.apache.hadoop.io.WritableUtils的实例源码

FSImageSerialization.java 文件源码 项目:hadoop 阅读 23 收藏 0 点赞 0 评论 0
/**
 * Write an array of blocks as compactly as possible. This uses
 * delta-encoding for the generation stamp and size, following
 * the principle that genstamp increases relatively slowly,
 * and size is equal for all but the last block of a file.
 */
public static void writeCompactBlockArray(
    Block[] blocks, DataOutputStream out) throws IOException {
  WritableUtils.writeVInt(out, blocks.length);
  Block prev = null;
  for (Block b : blocks) {
    long szDelta = b.getNumBytes() -
        (prev != null ? prev.getNumBytes() : 0);
    long gsDelta = b.getGenerationStamp() -
        (prev != null ? prev.getGenerationStamp() : 0);
    out.writeLong(b.getBlockId()); // blockid is random
    WritableUtils.writeVLong(out, szDelta);
    WritableUtils.writeVLong(out, gsDelta);
    prev = b;
  }
}
TestTFileStreams.java 文件源码 项目:hadoop-oss 阅读 24 收藏 0 点赞 0 评论 0
private long writeRecords(int count, boolean knownKeyLength,
    boolean knownValueLength, boolean close) throws IOException {
  long rawDataSize = 0;
  for (int nx = 0; nx < count; nx++) {
    String key = TestTFileByteArrays.composeSortedKey("key", nx);
    DataOutputStream outKey =
        writer.prepareAppendKey(knownKeyLength ? key.length() : -1);
    outKey.write(key.getBytes());
    outKey.close();
    String value = "value" + nx;
    DataOutputStream outValue =
        writer.prepareAppendValue(knownValueLength ? value.length() : -1);
    outValue.write(value.getBytes());
    outValue.close();
    rawDataSize +=
        WritableUtils.getVIntSize(key.getBytes().length)
            + key.getBytes().length
            + WritableUtils.getVIntSize(value.getBytes().length)
            + value.getBytes().length;
  }
  if (close) {
    closeOutput();
  }
  return rawDataSize;
}
Server.java 文件源码 项目:hadoop 阅读 32 收藏 0 点赞 0 评论 0
/**
 * Setup response for the IPC Call on Fatal Error from a 
 * client that is using old version of Hadoop.
 * The response is serialized using the previous protocol's response
 * layout.
 * 
 * @param response buffer to serialize the response into
 * @param call {@link Call} to which we are setting up the response
 * @param rv return value for the IPC Call, if the call was successful
 * @param errorClass error class, if the the call failed
 * @param error error message, if the call failed
 * @throws IOException
 */
private void setupResponseOldVersionFatal(ByteArrayOutputStream response, 
                           Call call,
                           Writable rv, String errorClass, String error) 
throws IOException {
  final int OLD_VERSION_FATAL_STATUS = -1;
  response.reset();
  DataOutputStream out = new DataOutputStream(response);
  out.writeInt(call.callId);                // write call id
  out.writeInt(OLD_VERSION_FATAL_STATUS);   // write FATAL_STATUS
  WritableUtils.writeString(out, errorClass);
  WritableUtils.writeString(out, error);

  if (call.connection.useWrap) {
    wrapWithSasl(response, call);
  }
  call.setResponse(ByteBuffer.wrap(response.toByteArray()));
}
HFileReaderV2.java 文件源码 项目:ditb 阅读 18 收藏 0 点赞 0 评论 0
/**
 * Actually do the mvcc read. Does no checks.
 * @param position
 */
private void _readMvccVersion(final int position) {
  // This is Bytes#bytesToVint inlined so can save a few instructions in this hot method; i.e.
  // previous if one-byte vint, we'd redo the vint call to find int size.
  // Also the method is kept small so can be inlined.
  byte firstByte = blockBuffer.array()[position];
  int len = WritableUtils.decodeVIntSize(firstByte);
  if (len == 1) {
    this.currMemstoreTS = firstByte;
  } else {
    long i = 0;
    for (int idx = 0; idx < len - 1; idx++) {
      byte b = blockBuffer.array()[position + 1 + idx];
      i = i << 8;
      i = i | (b & 0xFF);
    }
    currMemstoreTS = (WritableUtils.isNegativeVInt(firstByte) ? ~i : i);
  }
  this.currMemstoreTSLen = len;
}
Server.java 文件源码 项目:spark_deep 阅读 27 收藏 0 点赞 0 评论 0
/**
 * Setup response for the IPC Call.
 * 
 * @param response buffer to serialize the response into
 * @param call {@link Call} to which we are setting up the response
 * @param status {@link Status} of the IPC call
 * @param rv return value for the IPC Call, if the call was successful
 * @param errorClass error class, if the the call failed
 * @param error error message, if the call failed
 * @throws IOException
 */
private void setupResponse(ByteArrayOutputStream response, 
                           Call call, Status status, 
                           Writable rv, String errorClass, String error) 
throws IOException {
  response.reset();
  DataOutputStream out = new DataOutputStream(response);
  out.writeInt(call.id);                // write call id
  out.writeInt(status.state);           // write status

  if (status == Status.SUCCESS) {
    rv.write(out);
  } else {
    WritableUtils.writeString(out, errorClass);
    WritableUtils.writeString(out, error);
  }
  /*if (call.connection.useWrap) {
    wrapWithSasl(response, call);
  }*/
  call.setResponse(ByteBuffer.wrap(response.toByteArray()));
}
ColumnInfo.java 文件源码 项目:ditb 阅读 19 收藏 0 点赞 0 评论 0
@Override public void readFields(DataInput in) throws IOException {
  family = WritableUtils.readCompressedByteArray(in);
  qualifier = WritableUtils.readCompressedByteArray(in);
  dataType = DataType.valueOf(WritableUtils.readString(in));
  isIndex = WritableUtils.readVInt(in) == 1;
  hashCode = calHashCode();
}
LoadSplit.java 文件源码 项目:hadoop 阅读 23 收藏 0 点赞 0 评论 0
@Override
public void write(DataOutput out) throws IOException {
  super.write(out);
  WritableUtils.writeVInt(out, id);
  WritableUtils.writeVInt(out, maps);
  WritableUtils.writeVLong(out, inputRecords);
  WritableUtils.writeVLong(out, outputBytes);
  WritableUtils.writeVLong(out, outputRecords);
  WritableUtils.writeVLong(out, maxMemory);
  WritableUtils.writeVInt(out, reduces);
  for (int i = 0; i < reduces; ++i) {
    out.writeDouble(reduceBytes[i]);
    out.writeDouble(reduceRecords[i]);
  }
  WritableUtils.writeVInt(out, nSpec);
  for (int i = 0; i < nSpec; ++i) {
    WritableUtils.writeVLong(out, reduceOutputBytes[i]);
    WritableUtils.writeVLong(out, reduceOutputRecords[i]);
  }
  mapMetrics.write(out);
  int numReduceMetrics = (reduceMetrics == null) ? 0 : reduceMetrics.length;
  WritableUtils.writeVInt(out, numReduceMetrics);
  for (int i = 0; i < numReduceMetrics; ++i) {
    reduceMetrics[i].write(out);
  }
}
HFileBlockIndex.java 文件源码 项目:ditb 阅读 24 收藏 0 点赞 0 评论 0
/**
 * Adds a new entry to this block index chunk.
 *
 * @param firstKey the first key in the block pointed to by this entry
 * @param blockOffset the offset of the next-level block pointed to by this
 *          entry
 * @param onDiskDataSize the on-disk data of the block pointed to by this
 *          entry, including header size
 * @param curTotalNumSubEntries if this chunk is the root index chunk under
 *          construction, this specifies the current total number of
 *          sub-entries in all leaf-level chunks, including the one
 *          corresponding to the second-level entry being added.
 */
void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
    long curTotalNumSubEntries) {
  // Record the offset for the secondary index
  secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
  curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
      + firstKey.length;

  curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
      + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;

  blockKeys.add(firstKey);
  blockOffsets.add(blockOffset);
  onDiskDataSizes.add(onDiskDataSize);

  if (curTotalNumSubEntries != -1) {
    numSubEntriesAt.add(curTotalNumSubEntries);

    // Make sure the parallel arrays are in sync.
    if (numSubEntriesAt.size() != blockKeys.size()) {
      throw new IllegalStateException("Only have key/value count " +
          "stats for " + numSubEntriesAt.size() + " block index " +
          "entries out of " + blockKeys.size());
    }
  }
}
LobFile.java 文件源码 项目:aliyun-maxcompute-data-collectors 阅读 23 收藏 0 点赞 0 评论 0
public void readFields(DataInput in) throws IOException {
  // After the RecordStartMark, we expect to get a SEGMENT_HEADER_ID (-1).
  long segmentId = WritableUtils.readVLong(in);
  if (SEGMENT_HEADER_ID != segmentId) {
    throw new IOException("Expected segment header id " + SEGMENT_HEADER_ID
        + "; got " + segmentId);
  }

  // Get the length of the rest of the segment, in bytes.
  long length = WritableUtils.readVLong(in);

  // Now read the actual main byte array.
  if (length > Integer.MAX_VALUE) {
    throw new IOException("Unexpected oversize data array length: "
        + length);
  } else if (length < 0) {
    throw new IOException("Unexpected undersize data array length: "
        + length);
  }
  byte [] segmentData = new byte[(int) length];
  in.readFully(segmentData);
  recordLenBytes = new BytesWritable(segmentData);

  reset(); // Reset the iterator allowing the user to yield offset/lengths.
}
HbaseObjectWritableFor96Migration.java 文件源码 项目:ditb 阅读 22 收藏 0 点赞 0 评论 0
/** Reads and returns the class as written by {@link #writeClass(DataOutput, Class)} */
static Class<?> readClass(Configuration conf, DataInput in) throws IOException {
  Class<?> instanceClass = null;
  int b = (byte)WritableUtils.readVInt(in);
  if (b == NOT_ENCODED) {
    String className = Text.readString(in);
    try {
      instanceClass = getClassByName(conf, className);
    } catch (ClassNotFoundException e) {
      LOG.error("Can't find class " + className, e);
      throw new IOException("Can't find class " + className, e);
    }
  } else {
    instanceClass = CODE_TO_CLASS.get(b);
  }
  return instanceClass;
}
InMemoryWriter.java 文件源码 项目:hadoop 阅读 23 收藏 0 点赞 0 评论 0
public void append(DataInputBuffer key, DataInputBuffer value)
throws IOException {
  int keyLength = key.getLength() - key.getPosition();
  if (keyLength < 0) {
    throw new IOException("Negative key-length not allowed: " + keyLength + 
                          " for " + key);
  }

  int valueLength = value.getLength() - value.getPosition();
  if (valueLength < 0) {
    throw new IOException("Negative value-length not allowed: " + 
                          valueLength + " for " + value);
  }

  WritableUtils.writeVInt(out, keyLength);
  WritableUtils.writeVInt(out, valueLength);
  out.write(key.getData(), key.getPosition(), keyLength); 
  out.write(value.getData(), value.getPosition(), valueLength); 
}
TaskStatus.java 文件源码 项目:hadoop 阅读 20 收藏 0 点赞 0 评论 0
public void readFields(DataInput in) throws IOException {
  this.taskid.readFields(in);
  setProgress(in.readFloat());
  this.numSlots = in.readInt();
  this.runState = WritableUtils.readEnum(in, State.class);
  setDiagnosticInfo(StringInterner.weakIntern(Text.readString(in)));
  setStateString(StringInterner.weakIntern(Text.readString(in)));
  this.phase = WritableUtils.readEnum(in, Phase.class); 
  this.startTime = in.readLong(); 
  this.finishTime = in.readLong(); 
  counters = new Counters();
  this.includeAllCounters = in.readBoolean();
  this.outputSize = in.readLong();
  counters.readFields(in);
  nextRecordRange.readFields(in);
}
Compressor.java 文件源码 项目:ditb 阅读 25 收藏 0 点赞 0 评论 0
/**
 * Compresses and writes an array to a DataOutput
 * 
 * @param data the array to write.
 * @param out the DataOutput to write into
 * @param dict the dictionary to use for compression
 */
@Deprecated
static void writeCompressed(byte[] data, int offset, int length,
    DataOutput out, Dictionary dict)
    throws IOException {
  short dictIdx = Dictionary.NOT_IN_DICTIONARY;
  if (dict != null) {
    dictIdx = dict.findEntry(data, offset, length);
  }
  if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
    // not in dict
    out.writeByte(Dictionary.NOT_IN_DICTIONARY);
    WritableUtils.writeVInt(out, length);
    out.write(data, offset, length);
  } else {
    out.writeShort(dictIdx);
  }
}
GridmixKey.java 文件源码 项目:hadoop 阅读 18 收藏 0 点赞 0 评论 0
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  try {
    di.reset(b1, s1, l1);
    final int x1 = WritableUtils.readVInt(di);
    di.reset(b2, s2, l2);
    final int x2 = WritableUtils.readVInt(di);
    final int ret = (b1[s1 + x1] != b2[s2 + x2])
      ? b1[s1 + x1] - b2[s2 + x2]
      : super.compare(b1, s1, x1, b2, s2, x2);
    di.reset(reset, 0, 0);
    return ret;
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
CompositeInputSplit.java 文件源码 项目:hadoop 阅读 21 收藏 0 点赞 0 评论 0
/**
 * {@inheritDoc}
 * @throws IOException If the child InputSplit cannot be read, typically
 *                     for failing access checks.
 */
@SuppressWarnings("unchecked")  // Generic array assignment
public void readFields(DataInput in) throws IOException {
  int card = WritableUtils.readVInt(in);
  if (splits == null || splits.length != card) {
    splits = new InputSplit[card];
  }
  Class<? extends InputSplit>[] cls = new Class[card];
  try {
    for (int i = 0; i < card; ++i) {
      cls[i] =
        Class.forName(Text.readString(in)).asSubclass(InputSplit.class);
    }
    for (int i = 0; i < card; ++i) {
      splits[i] = ReflectionUtils.newInstance(cls[i], null);
      SerializationFactory factory = new SerializationFactory(conf);
      Deserializer deserializer = factory.getDeserializer(cls[i]);
      deserializer.open((DataInputStream)in);
      splits[i] = (InputSplit)deserializer.deserialize(splits[i]);
    }
  } catch (ClassNotFoundException e) {
    throw new IOException("Failed split init", e);
  }
}
ReduceContextImpl.java 文件源码 项目:hadoop 阅读 24 收藏 0 点赞 0 评论 0
@Override
public void mark() throws IOException {
  if (getBackupStore() == null) {
    backupStore = new BackupStore<KEYIN,VALUEIN>(conf, taskid);
  }
  isMarked = true;
  if (!inReset) {
    backupStore.reinitialize();
    if (currentKeyLength == -1) {
      // The user has not called next() for this iterator yet, so
      // there is no current record to mark and copy to backup store.
      return;
    }
    assert (currentValueLength != -1);
    int requestedSize = currentKeyLength + currentValueLength + 
      WritableUtils.getVIntSize(currentKeyLength) +
      WritableUtils.getVIntSize(currentValueLength);
    DataOutputStream out = backupStore.getOutputStream(requestedSize);
    writeFirstKeyValueBytes(out);
    backupStore.updateCounters(requestedSize);
  } else {
    backupStore.mark();
  }
}
FieldEncryptor.java 文件源码 项目:PACE 阅读 19 收藏 0 点赞 0 评论 0
/**
 * Retrieve a field encryption key to use in <strong>decrypting</strong> the field.
 * <p>
 * Metadata can be read from the DataInput object. All meta-data that was written to the stream should be read out, regardless if it is used.
 *
 * @param visibility
 *          Visibility expression for the field.
 * @param in
 *          Stream from which metadata is read.
 * @return Field encryption key.
 * @throws IOException
 *           Not actually thrown.
 */
private byte[] getKey(ColumnVisibility visibility, DataInput in) throws IOException {
  if (config.encryptUsingVisibility) {
    if (visibility.getParseTree().getType() != NodeType.EMPTY) {
      // Rebuild the key from the shares created based on the visibility expression.
      byte[] key = readVisibilityShare(visibility.getParseTree(), visibility.getExpression(), in, false);

      if (key == null) {
        throw new IllegalKeyRequestException();
      }
      return key;
    } else {
      return new byte[config.keyLength];
    }
  } else {
    int version = WritableUtils.readVInt(in);
    return keys.getKey(config.keyId, version, config.keyLength);
  }
}
GridmixSplit.java 文件源码 项目:hadoop 阅读 25 收藏 0 点赞 0 评论 0
@Override
public void write(DataOutput out) throws IOException {
  super.write(out);
  WritableUtils.writeVInt(out, id);
  WritableUtils.writeVInt(out, maps);
  WritableUtils.writeVLong(out, inputRecords);
  WritableUtils.writeVLong(out, outputBytes);
  WritableUtils.writeVLong(out, outputRecords);
  WritableUtils.writeVLong(out, maxMemory);
  WritableUtils.writeVInt(out, reduces);
  for (int i = 0; i < reduces; ++i) {
    out.writeDouble(reduceBytes[i]);
    out.writeDouble(reduceRecords[i]);
  }
  WritableUtils.writeVInt(out, nSpec);
  for (int i = 0; i < nSpec; ++i) {
    WritableUtils.writeVLong(out, reduceOutputBytes[i]);
    WritableUtils.writeVLong(out, reduceOutputRecords[i]);
  }
}
QueueInfo.java 文件源码 项目:hadoop 阅读 17 收藏 0 点赞 0 评论 0
@Override
public void write(DataOutput out) throws IOException {
  Text.writeString(out, queueName);
  WritableUtils.writeEnum(out, queueState);

  if(schedulingInfo!= null) {
    Text.writeString(out, schedulingInfo);
  }else {
    Text.writeString(out, "N/A");
  }
  out.writeInt(stats.length);
  for (JobStatus stat : stats) {
    stat.write(out);
  }
  out.writeInt(children.size());
  for(QueueInfo childQueueInfo : children) {
    childQueueInfo.write(out);
  }
}
FileSystemCounterGroup.java 文件源码 项目:hadoop 阅读 27 收藏 0 点赞 0 评论 0
/**
 * FileSystemGroup ::= #scheme (scheme #counter (key value)*)*
 */
@Override
public void write(DataOutput out) throws IOException {
  WritableUtils.writeVInt(out, map.size()); // #scheme
  for (Map.Entry<String, Object[]> entry : map.entrySet()) {
    WritableUtils.writeString(out, entry.getKey()); // scheme
    // #counter for the above scheme
    WritableUtils.writeVInt(out, numSetCounters(entry.getValue()));
    for (Object counter : entry.getValue()) {
      if (counter == null) continue;
      @SuppressWarnings("unchecked")
      FSCounter c = (FSCounter) ((Counter)counter).getUnderlyingCounter();
      WritableUtils.writeVInt(out, c.key.ordinal());  // key
      WritableUtils.writeVLong(out, c.getValue());    // value
    }
  }
}
AbstractDelegationTokenIdentifier.java 文件源码 项目:hadoop-oss 阅读 22 收藏 0 点赞 0 评论 0
@VisibleForTesting
void writeImpl(DataOutput out) throws IOException {
  out.writeByte(VERSION);
  owner.write(out);
  renewer.write(out);
  realUser.write(out);
  WritableUtils.writeVLong(out, issueDate);
  WritableUtils.writeVLong(out, maxDate);
  WritableUtils.writeVInt(out, sequenceNumber);
  WritableUtils.writeVInt(out, masterKeyId);
}
DelegationKey.java 文件源码 项目:hadoop-oss 阅读 18 收藏 0 点赞 0 评论 0
/**
 */
@Override
public void readFields(DataInput in) throws IOException {
  keyId = WritableUtils.readVInt(in);
  expiryDate = WritableUtils.readVLong(in);
  int len = WritableUtils.readVIntInRange(in, -1, MAX_KEY_LEN);
  if (len == -1) {
    keyBytes = null;
  } else {
    keyBytes = new byte[len];
    in.readFully(keyBytes);
  }
}
PrefixTreeEncoder.java 文件源码 项目:ditb 阅读 19 收藏 0 点赞 0 评论 0
/***************** internal add methods ************************/

  private void addAfterRowFamilyQualifier(Cell cell){
    // timestamps
    timestamps[totalCells] = cell.getTimestamp();
    timestampEncoder.add(cell.getTimestamp());

    // memstore timestamps
    if (includeMvccVersion) {
      mvccVersions[totalCells] = cell.getMvccVersion();
      mvccVersionEncoder.add(cell.getMvccVersion());
      totalUnencodedBytes += WritableUtils.getVIntSize(cell.getMvccVersion());
    }else{
      //must overwrite in case there was a previous version in this array slot
      mvccVersions[totalCells] = 0L;
      if(totalCells == 0){//only need to do this for the first cell added
        mvccVersionEncoder.add(0L);
      }
      //totalUncompressedBytes += 0;//mvccVersion takes zero bytes when disabled
    }

    // types
    typeBytes[totalCells] = cell.getTypeByte();
    cellTypeEncoder.add(cell.getTypeByte());

    // values
    totalValueBytes += cell.getValueLength();
    // double the array each time we run out of space
    values = ArrayUtils.growIfNecessary(values, totalValueBytes, 2 * totalValueBytes);
    CellUtil.copyValueTo(cell, values, valueOffsets[totalCells]);
    if (cell.getValueLength() > maxValueLength) {
      maxValueLength = cell.getValueLength();
    }
    valueOffsets[totalCells + 1] = totalValueBytes;

    // general
    totalUnencodedBytes += KeyValueUtil.length(cell);
    ++totalCells;
  }
Bytes.java 文件源码 项目:ditb 阅读 27 收藏 0 点赞 0 评论 0
/**
 * Read byte-array written with a WritableableUtils.vint prefix.
 * @param in Input to read from.
 * @return byte array read off <code>in</code>
 * @throws IOException e
 */
public static byte [] readByteArray(final DataInput in)
throws IOException {
  int len = WritableUtils.readVInt(in);
  if (len < 0) {
    throw new NegativeArraySizeException(Integer.toString(len));
  }
  byte [] result = new byte[len];
  in.readFully(result, 0, len);
  return result;
}
Configuration.java 文件源码 项目:hadoop-oss 阅读 27 收藏 0 点赞 0 评论 0
@Override
public void readFields(DataInput in) throws IOException {
  clear();
  int size = WritableUtils.readVInt(in);
  for(int i=0; i < size; ++i) {
    String key = org.apache.hadoop.io.Text.readString(in);
    String value = org.apache.hadoop.io.Text.readString(in);
    set(key, value); 
    String sources[] = WritableUtils.readCompressedStringArray(in);
    if(sources != null) {
      updatingResource.put(key, sources);
    }
  }
}
Server.java 文件源码 项目:hadoop-oss 阅读 26 收藏 0 点赞 0 评论 0
/**
 * Setup response for the IPC Call on Fatal Error from a 
 * client that is using old version of Hadoop.
 * The response is serialized using the previous protocol's response
 * layout.
 * 
 * @param response buffer to serialize the response into
 * @param call {@link Call} to which we are setting up the response
 * @param rv return value for the IPC Call, if the call was successful
 * @param errorClass error class, if the the call failed
 * @param error error message, if the call failed
 * @throws IOException
 */
private void setupResponseOldVersionFatal(ByteArrayOutputStream response, 
                           Call call,
                           Writable rv, String errorClass, String error) 
throws IOException {
  final int OLD_VERSION_FATAL_STATUS = -1;
  response.reset();
  DataOutputStream out = new DataOutputStream(response);
  out.writeInt(call.callId);                // write call id
  out.writeInt(OLD_VERSION_FATAL_STATUS);   // write FATAL_STATUS
  WritableUtils.writeString(out, errorClass);
  WritableUtils.writeString(out, error);
  call.setResponse(ByteBuffer.wrap(response.toByteArray()));
}
SplitMetaInfoReader.java 文件源码 项目:hadoop 阅读 16 收藏 0 点赞 0 评论 0
public static JobSplit.TaskSplitMetaInfo[] readSplitMetaInfo(
    JobID jobId, FileSystem fs, Configuration conf, Path jobSubmitDir) 
throws IOException {
  long maxMetaInfoSize = conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE,
      MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
  Path metaSplitFile = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir);
  String jobSplitFile = JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString();
  FileStatus fStatus = fs.getFileStatus(metaSplitFile);
  if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
    throw new IOException("Split metadata size exceeded " +
        maxMetaInfoSize +". Aborting job " + jobId);
  }
  FSDataInputStream in = fs.open(metaSplitFile);
  byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
  in.readFully(header);
  if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
    throw new IOException("Invalid header on split file");
  }
  int vers = WritableUtils.readVInt(in);
  if (vers != JobSplit.META_SPLIT_VERSION) {
    in.close();
    throw new IOException("Unsupported split version " + vers);
  }
  int numSplits = WritableUtils.readVInt(in); //TODO: check for insane values
  JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo = 
    new JobSplit.TaskSplitMetaInfo[numSplits];
  for (int i = 0; i < numSplits; i++) {
    JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
    splitMetaInfo.readFields(in);
    JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
        jobSplitFile, 
        splitMetaInfo.getStartOffset());
    allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex, 
        splitMetaInfo.getLocations(), 
        splitMetaInfo.getInputDataLength());
  }
  in.close();
  return allSplitMetaInfo;
}
CopyListingFileStatus.java 文件源码 项目:circus-train 阅读 20 收藏 0 点赞 0 评论 0
@Override
public void readFields(DataInput in) throws IOException {
  super.readFields(in);
  byte aclEntriesSize = in.readByte();
  if (aclEntriesSize != NO_ACL_ENTRIES) {
    aclEntries = Lists.newArrayListWithCapacity(aclEntriesSize);
    for (int i = 0; i < aclEntriesSize; ++i) {
      aclEntries.add(new AclEntry.Builder()
          .setScope(ACL_ENTRY_SCOPES[in.readByte()])
          .setType(ACL_ENTRY_TYPES[in.readByte()])
          .setName(WritableUtils.readString(in))
          .setPermission(FS_ACTIONS[in.readByte()])
          .build());
    }
  } else {
    aclEntries = null;
  }

  int xAttrsSize = in.readInt();
  if (xAttrsSize != NO_XATTRS) {
    xAttrs = Maps.newHashMap();
    for (int i = 0; i < xAttrsSize; ++i) {
      final String name = WritableUtils.readString(in);
      final int valueLen = in.readInt();
      byte[] value = null;
      if (valueLen > -1) {
        value = new byte[valueLen];
        if (valueLen > 0) {
          in.readFully(value);
        }
      }
      xAttrs.put(name, value);
    }
  } else {
    xAttrs = null;
  }
}
Configuration.java 文件源码 项目:spark_deep 阅读 23 收藏 0 点赞 0 评论 0
@Override
public void readFields(DataInput in) throws IOException {
  clear();
  int size = WritableUtils.readVInt(in);
  for(int i=0; i < size; ++i) {
    set(org.apache.hadoop.io.Text.readString(in),
          org.apache.hadoop.io.Text.readString(in));
  }
}
Configuration.java 文件源码 项目:spark_deep 阅读 28 收藏 0 点赞 0 评论 0
public void write(DataOutput out) throws IOException {
  Properties props = getProps();
  WritableUtils.writeVInt(out, props.size());
  for(Map.Entry<Object, Object> item: props.entrySet()) {
    org.apache.hadoop.io.Text.writeString(out, (String) item.getKey());
    org.apache.hadoop.io.Text.writeString(out, (String) item.getValue());
  }
}


问题


面经


文章

微信
公众号

扫码关注公众号