Java code examples for org.apache.hadoop.io.WritableUtils

Bytes.java — project: ditb
/**
 * Reads a zero-compressed encoded long from input buffer and returns it.
 * @param buffer Binary array
 * @param offset Offset into array at which vint begins.
 * @return deserialized long from buffer.
 */
public static long readAsVLong(final byte [] buffer, final int offset) {
  byte firstByte = buffer[offset];
  int len = WritableUtils.decodeVIntSize(firstByte);
  if (len == 1) {
    return firstByte;
  }
  long i = 0;
  for (int idx = 0; idx < len-1; idx++) {
    byte b = buffer[offset + 1 + idx];
    i = i << 8;
    i = i | (b & 0xFF);
  }
  return (WritableUtils.isNegativeVInt(firstByte) ? ~i : i);
}
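
The decoder above pairs with WritableUtils.writeVLong, which produces the zero-compressed encoding it consumes. A minimal round-trip sketch, assuming the readAsVLong method above is in scope (the helper name and stream setup are illustrative, not from the original source):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.WritableUtils;

static long roundTrip(long value) throws IOException {
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  WritableUtils.writeVLong(new DataOutputStream(baos), value);
  // Decode straight from the backing array; e.g. roundTrip(-42L) returns -42.
  return readAsVLong(baos.toByteArray(), 0);
}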
LobFile.java — project: aliyun-maxcompute-data-collectors
public void readFields(DataInput in) throws IOException {
  int numEntries = WritableUtils.readVInt(in);
  entries.clear();
  for (int i = 0; i < numEntries; i++) {
    String key = Text.readString(in);
    BytesWritable val = new BytesWritable();
    val.readFields(in);
    entries.put(key, val);
  }
}
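
The reader implies a symmetric writer. A hedged sketch of that counterpart, assuming entries is a Map<String, BytesWritable> as the read path suggests:

public void write(DataOutput out) throws IOException {
  WritableUtils.writeVInt(out, entries.size());        // entry count first
  for (Map.Entry<String, BytesWritable> e : entries.entrySet()) {
    Text.writeString(out, e.getKey());                 // key, matching Text.readString above
    e.getValue().write(out);                           // value serializes itself
  }
}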
ByteBufferUtils.java — project: ditb
/**
 * Similar to {@link WritableUtils#readVLong(DataInput)} but reads from a
 * {@link ByteBuffer}.
 */
public static long readVLong(ByteBuffer in) {
  byte firstByte = in.get();
  int len = WritableUtils.decodeVIntSize(firstByte);
  if (len == 1) {
    return firstByte;
  }
  long i = 0;
  for (int idx = 0; idx < len-1; idx++) {
    byte b = in.get();
    i = i << 8;
    i = i | (b & 0xFF);
  }
  return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
}
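
Unlike the DataInput variant, this one advances the buffer's position past the encoded value, so consecutive calls walk a buffer of packed vlongs. An illustrative use, assuming encoded holds values written by WritableUtils.writeVLong:

ByteBuffer buf = ByteBuffer.wrap(encoded);  // encoded: assumed vlong-encoded bytes
long first = readVLong(buf);                // position now points at the next value
long second = readVLong(buf);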
LobFile.java — project: aliyun-maxcompute-data-collectors
public void write(DataOutput out) throws IOException {
  // Write the SEGMENT_HEADER_ID to distinguish this from a LobRecord.
  WritableUtils.writeVLong(out, SEGMENT_HEADER_ID);

  // The length of the main body of the segment is the length of the
  // data byte array.
  int segmentBytesLen = recordLenBytes.getLength();
  WritableUtils.writeVLong(out, segmentBytesLen);

  // Write the body of the segment.
  out.write(recordLenBytes.getBytes(), 0, segmentBytesLen);
}
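
A hedged sketch of the matching reader, assuming recordLenBytes is a BytesWritable (the getBytes()/getLength() calls above suggest as much); the header-id check is illustrative:

public void readFields(DataInput in) throws IOException {
  long id = WritableUtils.readVLong(in);
  if (id != SEGMENT_HEADER_ID) {
    throw new IOException("Expected segment header, got id " + id);
  }
  int segmentBytesLen = (int) WritableUtils.readVLong(in);
  byte[] body = new byte[segmentBytesLen];
  in.readFully(body);
  recordLenBytes = new BytesWritable(body);
}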
KeyValueCompression.java — project: ditb
/**
 * Uncompresses a KeyValue from a DataInput and returns it.
 * 
 * @param in the DataInput
 * @param readContext the CompressionContext to use
 * @return an uncompressed KeyValue
 * @throws IOException if the stream is corrupt or truncated
 */
public static KeyValue readKV(DataInput in, CompressionContext readContext)
    throws IOException {
  int keylength = WritableUtils.readVInt(in);
  int vlength = WritableUtils.readVInt(in);
  int tagsLength = WritableUtils.readVInt(in);
  int length = (int) KeyValue.getKeyValueDataStructureSize(keylength, vlength, tagsLength);

  byte[] backingArray = new byte[length];
  int pos = 0;
  pos = Bytes.putInt(backingArray, pos, keylength);
  pos = Bytes.putInt(backingArray, pos, vlength);

  // the row
  int elemLen = Compressor.uncompressIntoArray(backingArray,
      pos + Bytes.SIZEOF_SHORT, in, readContext.rowDict);
  checkLength(elemLen, Short.MAX_VALUE);
  pos = Bytes.putShort(backingArray, pos, (short)elemLen);
  pos += elemLen;

  // family
  elemLen = Compressor.uncompressIntoArray(backingArray,
      pos + Bytes.SIZEOF_BYTE, in, readContext.familyDict);
  checkLength(elemLen, Byte.MAX_VALUE);
  pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
  pos += elemLen;

  // qualifier
  elemLen = Compressor.uncompressIntoArray(backingArray, pos, in,
      readContext.qualifierDict);
  pos += elemLen;

  // the rest
  in.readFully(backingArray, pos, length - pos);

  return new KeyValue(backingArray, 0, length);
}
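
checkLength is referenced above but not shown; a plausible sketch follows. It guards the narrow length fields (a short for the row, a byte for the family) against values that would not round-trip:

private static void checkLength(int len, int max) throws IOException {
  if (len < 0 || len > max) {
    throw new IOException("Invalid length " + len + ", expected 0.." + max);
  }
}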
AbstractDelegationTokenIdentifier.java — project: hadoop
@VisibleForTesting
void writeImpl(DataOutput out) throws IOException {
  out.writeByte(VERSION);
  owner.write(out);
  renewer.write(out);
  realUser.write(out);
  WritableUtils.writeVLong(out, issueDate);
  WritableUtils.writeVLong(out, maxDate);
  WritableUtils.writeVInt(out, sequenceNumber);
  WritableUtils.writeVInt(out, masterKeyId);
}
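
Deserialization mirrors the field order exactly. A hedged sketch of the symmetric read path (the method name is assumed; the real class may additionally bound the Text reads):

void readFieldsImpl(DataInput in) throws IOException {
  byte version = in.readByte();
  if (version != VERSION) {
    throw new IOException("Unknown delegation token version " + version);
  }
  owner.readFields(in);
  renewer.readFields(in);
  realUser.readFields(in);
  issueDate = WritableUtils.readVLong(in);
  maxDate = WritableUtils.readVLong(in);
  sequenceNumber = WritableUtils.readVInt(in);
  masterKeyId = WritableUtils.readVInt(in);
}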
LobFile.java — project: aliyun-maxcompute-data-collectors
public void write(DataOutput out) throws IOException {
  // Start with the record type id.
  WritableUtils.writeVLong(out, INDEX_TABLE_ID);

  // Then the count of the records.
  WritableUtils.writeVInt(out, tableEntries.size());

  // Followed by the table itself.
  for (IndexTableEntry entry : tableEntries) {
    entry.write(out);
  }
}
TaskCompletionEvent.java — project: hadoop
public void write(DataOutput out) throws IOException {
  taskId.write(out); 
  WritableUtils.writeVInt(out, idWithinJob);
  out.writeBoolean(isMap);
  WritableUtils.writeEnum(out, status); 
  WritableUtils.writeString(out, taskTrackerHttp);
  WritableUtils.writeVInt(out, taskRunTime);
  WritableUtils.writeVInt(out, eventId);
}
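
The matching reader consumes the same fields in order; a hedged sketch, assuming status is the class's Status enum:

public void readFields(DataInput in) throws IOException {
  taskId.readFields(in);
  idWithinJob = WritableUtils.readVInt(in);
  isMap = in.readBoolean();
  status = WritableUtils.readEnum(in, Status.class);  // enums round-trip by name
  taskTrackerHttp = WritableUtils.readString(in);
  taskRunTime = WritableUtils.readVInt(in);
  eventId = WritableUtils.readVInt(in);
}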
TestGridMixClasses.java — project: hadoop
@Test (timeout=3000)
public void testLoadJobLoadSortComparator() throws Exception {
  LoadJob.LoadSortComparator test = new LoadJob.LoadSortComparator();

  ByteArrayOutputStream data = new ByteArrayOutputStream();
  DataOutputStream dos = new DataOutputStream(data);
  WritableUtils.writeVInt(dos, 2);
  WritableUtils.writeVInt(dos, 1);
  WritableUtils.writeVInt(dos, 4);
  WritableUtils.writeVInt(dos, 7);
  WritableUtils.writeVInt(dos, 4);

  byte[] b1 = data.toByteArray();

  byte[] b2 = data.toByteArray();

  // identical data should compare as equal
  assertEquals(0, test.compare(b1, 0, 1, b2, 0, 1));
  b2[2] = 5;
  // compares like a GridMixKey on the first data byte: 4 - 5 = -1
  assertEquals(-1, test.compare(b1, 0, 1, b2, 0, 1));
  b2[2] = 2;
  // compares like a GridMixKey on the first data byte: 4 - 2 = 2
  assertEquals(2, test.compare(b1, 0, 1, b2, 0, 1));
  // with b2 read at offset 1 the compared bytes tie (4 == 4), so the first bytes decide: 2 - 1 = 1
  b2[2] = 4;
  assertEquals(1, test.compare(b1, 0, 1, b2, 1, 1));

}
LobFile.java — project: aliyun-maxcompute-data-collectors
/** {@inheritDoc} */
@Override
public InputStream readBlobRecord() throws IOException {
  if (!isRecordAvailable()) {
    // we're not currently aligned on a record-start.
    // Try to get the next one.
    if (!next()) {
      // No more records available.
      throw new EOFException("End of file reached.");
    }
  }

  // Ensure any previously-open user record stream is closed.
  closeUserStream();

  // Mark this record as consumed.
  this.isAligned = false;

  // The length of the stream we can return to the user is
  // the indexRecordLen minus the length of any per-record headers.
  // That includes the RecordStartMark, the entryId, and the claimedLen.
  long streamLen = this.indexRecordLen - RecordStartMark.START_MARK_LENGTH
      - WritableUtils.getVIntSize(this.curEntryId)
      - WritableUtils.getVIntSize(this.claimedRecordLen);
  LOG.debug("Yielding stream to user with length " + streamLen);
  this.userInputStream = new FixedLengthInputStream(this.dataIn, streamLen);
  if (this.codec != null) {
    // The user needs to decompress the data; wrap the InputStream.
    decompressor.reset();
    this.userInputStream = new DecompressorStream(
        this.userInputStream, decompressor);
  }
  return this.userInputStream;
}
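
The header arithmetic leans on WritableUtils.getVIntSize, which reports how many bytes the variable-length encoding of a given value occupies. Two illustrative calls:

int oneByte = WritableUtils.getVIntSize(100L);     // 1: values in [-112, 127] use a single byte
int threeBytes = WritableUtils.getVIntSize(1000L); // 3: one length byte plus two payload bytes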

