Example source code for the Java class org.apache.hadoop.io.WritableUtils

Source file: FSImageSerialization.java
Project: hadoop

/**
* Write an array of blocks as compactly as possible. This uses
* delta-encoding for the generation stamp and size, following
* the principle that genstamp increases relatively slowly,
* and size is equal for all but the last block of a file.
*/
public static void writeCompactBlockArray(
Block[] blocks, DataOutputStream out) throws IOException {
WritableUtils.writeVInt(out, blocks.length);
Block prev = null;
for (Block b : blocks) {
long szDelta = b.getNumBytes() -
(prev != null ? prev.getNumBytes() : 0);
long gsDelta = b.getGenerationStamp() -
(prev != null ? prev.getGenerationStamp() : 0);
out.writeLong(b.getBlockId()); // blockid is random
WritableUtils.writeVLong(out, szDelta);
WritableUtils.writeVLong(out, gsDelta);
prev = b;
}
}
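
A decoder has to undo the same deltas in order. The sketch below is a minimal reconstruction of the matching read path, not the verbatim FSImageSerialization method; it assumes a Block(id, numBytes, genStamp) constructor and the accessors used above.

public static Block[] readCompactBlockArray(DataInput in) throws IOException {
  int num = WritableUtils.readVInt(in);
  Block[] ret = new Block[num];
  Block prev = null;
  for (int i = 0; i < num; i++) {
    long id = in.readLong();                               // block id was written raw
    long sz = WritableUtils.readVLong(in)
        + ((prev != null) ? prev.getNumBytes() : 0);       // undo the size delta
    long gs = WritableUtils.readVLong(in)
        + ((prev != null) ? prev.getGenerationStamp() : 0); // undo the genstamp delta
    ret[i] = new Block(id, sz, gs);
    prev = ret[i];
  }
  return ret;
}
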
Source file: TestTFileStreams.java
Project: hadoop-oss
private long writeRecords(int count, boolean knownKeyLength,
boolean knownValueLength, boolean close) throws IOException {
long rawDataSize = 0;
for (int nx = 0; nx < count; nx++) {
String key = TestTFileByteArrays.composeSortedKey("key", nx);
DataOutputStream outKey =
writer.prepareAppendKey(knownKeyLength ? key.length() : -1);
outKey.write(key.getBytes());
outKey.close();
String value = "value" + nx;
DataOutputStream outValue =
writer.prepareAppendValue(knownValueLength ? value.length() : -1);
outValue.write(value.getBytes());
outValue.close();
rawDataSize +=
WritableUtils.getVIntSize(key.getBytes().length)
+ key.getBytes().length
+ WritableUtils.getVIntSize(value.getBytes().length)
+ value.getBytes().length;
}
if (close) {
closeOutput();
}
return rawDataSize;
}
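
The rawDataSize bookkeeping above matches the on-stream layout: a vint length prefix followed by the raw bytes, for both the key and the value. The hypothetical helper below illustrates the contract that getVIntSize reports exactly the number of bytes writeVInt would emit.

static void checkVIntSize(int value) throws IOException {
  ByteArrayOutputStream buf = new ByteArrayOutputStream();
  DataOutputStream out = new DataOutputStream(buf);
  WritableUtils.writeVInt(out, value);   // variable-length encoding: 1 to 5 bytes
  out.flush();
  if (buf.size() != WritableUtils.getVIntSize(value)) {
    throw new AssertionError("vint size mismatch for " + value);
  }
}
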
Source file: Server.java
Project: hadoop
/**
* Setup response for the IPC Call on Fatal Error from a
* client that is using an old version of Hadoop.
* The response is serialized using the previous protocol's response
* layout.
*
* @param response buffer to serialize the response into
* @param call {@link Call} to which we are setting up the response
* @param rv return value for the IPC Call, if the call was successful
* @param errorClass error class, if the call failed
* @param error error message, if the call failed
* @throws IOException
*/
private void setupResponseOldVersionFatal(ByteArrayOutputStream response,
Call call,
Writable rv, String errorClass, String error)
throws IOException {
final int OLD_VERSION_FATAL_STATUS = -1;
response.reset();
DataOutputStream out = new DataOutputStream(response);
out.writeInt(call.callId); // write call id
out.writeInt(OLD_VERSION_FATAL_STATUS); // write FATAL_STATUS
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
if (call.connection.useWrap) {
wrapWithSasl(response, call);
}
call.setResponse(ByteBuffer.wrap(response.toByteArray()));
}
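
On the wire this fallback response is just the call id, a status int of -1, and two WritableUtils strings. A hypothetical client-side decoder for such a buffer might look like this (this is not the actual Hadoop client code):

static void readOldVersionFatal(DataInput in) throws IOException {
  int callId = in.readInt();                        // echoes the call id
  int status = in.readInt();                        // -1 == OLD_VERSION_FATAL_STATUS
  String errorClass = WritableUtils.readString(in);
  String error = WritableUtils.readString(in);
  throw new IOException("Call " + callId + " failed with status " + status
      + ": " + errorClass + ": " + error);
}
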
Source file: HFileReaderV2.java
Project: ditb
/**
* Actually do the mvcc read. Does no checks.
* @param position offset in the block buffer at which the vint-encoded mvcc version starts
*/
private void _readMvccVersion(final int position) {
// This is Bytes#bytesToVint inlined so we can save a few instructions in this hot method;
// i.e. previously, even for a one-byte vint, we would redo the vint call to find the int size.
// The method is also kept small so it can be inlined.
byte firstByte = blockBuffer.array()[position];
int len = WritableUtils.decodeVIntSize(firstByte);
if (len == 1) {
this.currMemstoreTS = firstByte;
} else {
long i = 0;
for (int idx = 0; idx < len - 1; idx++) {
byte b = blockBuffer.array()[position + 1 + idx];
i = i << 8;
i = i | (b & 0xFF);
}
currMemstoreTS = (WritableUtils.isNegativeVInt(firstByte) ? ~i : i);
}
this.currMemstoreTSLen = len;
}
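
For comparison, the generic decoder in WritableUtils reads the same vlong format; a hypothetical, non-inlined equivalent of the hot-path loop above (assuming the same blockBuffer field and a heap-backed buffer) would be:

private long readMvccVersionViaWritableUtils(final int position) throws IOException {
  DataInput in = new DataInputStream(new ByteArrayInputStream(
      blockBuffer.array(), position, blockBuffer.array().length - position));
  return WritableUtils.readVLong(in);  // decodes the same vlong as the inlined loop
}
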
Source file: Server.java
Project: spark_deep
/**
* Setup response for the IPC Call.
*
* @param response buffer to serialize the response into
* @param call {@link Call} to which we are setting up the response
* @param status {@link Status} of the IPC call
* @param rv return value for the IPC Call, if the call was successful
* @param errorClass error class, if the call failed
* @param error error message, if the call failed
* @throws IOException
*/
private void setupResponse(ByteArrayOutputStream response,
Call call, Status status,
Writable rv, String errorClass, String error)
throws IOException {
response.reset();
DataOutputStream out = new DataOutputStream(response);
out.writeInt(call.id); // write call id
out.writeInt(status.state); // write status
if (status == Status.SUCCESS) {
rv.write(out);
} else {
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
}
/*if (call.connection.useWrap) {
wrapWithSasl(response, call);
}*/
call.setResponse(ByteBuffer.wrap(response.toByteArray()));
}
Source file: ColumnInfo.java
Project: ditb
@Override public void readFields(DataInput in) throws IOException {
family = WritableUtils.readCompressedByteArray(in);
qualifier = WritableUtils.readCompressedByteArray(in);
dataType = DataType.valueOf(WritableUtils.readString(in));
isIndex = WritableUtils.readVInt(in) == 1;
hashCode = calHashCode();
}
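
A readFields like this implies a mirror-image serializer. A minimal sketch of that write method, assuming the same family, qualifier, dataType, and isIndex fields as above:

@Override public void write(DataOutput out) throws IOException {
  WritableUtils.writeCompressedByteArray(out, family);
  WritableUtils.writeCompressedByteArray(out, qualifier);
  WritableUtils.writeString(out, dataType.toString());
  WritableUtils.writeVInt(out, isIndex ? 1 : 0);
}
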
Source file: LoadSplit.java
Project: hadoop
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
WritableUtils.writeVInt(out, id);
WritableUtils.writeVInt(out, maps);
WritableUtils.writeVLong(out, inputRecords);
WritableUtils.writeVLong(out, outputBytes);
WritableUtils.writeVLong(out, outputRecords);
WritableUtils.writeVLong(out, maxMemory);
WritableUtils.writeVInt(out, reduces);
for (int i = 0; i < reduces; ++i) {
out.writeDouble(reduceBytes[i]);
out.writeDouble(reduceRecords[i]);
}
WritableUtils.writeVInt(out, nSpec);
for (int i = 0; i < nSpec; ++i) {
WritableUtils.writeVLong(out, reduceOutputBytes[i]);
WritableUtils.writeVLong(out, reduceOutputRecords[i]);
}
mapMetrics.write(out);
int numReduceMetrics = (reduceMetrics == null) ? 0 : reduceMetrics.length;
WritableUtils.writeVInt(out, numReduceMetrics);
for (int i = 0; i < numReduceMetrics; ++i) {
reduceMetrics[i].write(out);
}
}
Source file: HFileBlockIndex.java
Project: ditb
/**
* Adds a new entry to this block index chunk.
*
* @param firstKey the first key in the block pointed to by this entry
* @param blockOffset the offset of the next-level block pointed to by this
* entry
* @param onDiskDataSize the on-disk data size of the block pointed to by this
* entry, including header size
* @param curTotalNumSubEntries if this chunk is the root index chunk under
* construction, this specifies the current total number of
* sub-entries in all leaf-level chunks, including the one
* corresponding to the second-level entry being added.
*/
void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
long curTotalNumSubEntries) {
// Record the offset for the secondary index
secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
+ firstKey.length;
curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
+ WritableUtils.getVIntSize(firstKey.length) + firstKey.length;
blockKeys.add(firstKey);
blockOffsets.add(blockOffset);
onDiskDataSizes.add(onDiskDataSize);
if (curTotalNumSubEntries != -1) {
numSubEntriesAt.add(curTotalNumSubEntries);
// Make sure the parallel arrays are in sync.
if (numSubEntriesAt.size() != blockKeys.size()) {
throw new IllegalStateException("Only have key/value count " +
"stats for " + numSubEntriesAt.size() + " block index " +
"entries out of " + blockKeys.size());
}
}
}
Source file: LobFile.java
Project: aliyun-maxcompute-data-collectors
public void readFields(DataInput in) throws IOException {
// After the RecordStartMark, we expect to get a SEGMENT_HEADER_ID (-1).
long segmentId = WritableUtils.readVLong(in);
if (SEGMENT_HEADER_ID != segmentId) {
throw new IOException("Expected segment header id " + SEGMENT_HEADER_ID
+ "; got " + segmentId);
}
// Get the length of the rest of the segment, in bytes.
long length = WritableUtils.readVLong(in);
// Now read the actual main byte array.
if (length > Integer.MAX_VALUE) {
throw new IOException("Unexpected oversize data array length: "
+ length);
} else if (length < 0) {
throw new IOException("Unexpected undersize data array length: "
+ length);
}
byte [] segmentData = new byte[(int) length];
in.readFully(segmentData);
recordLenBytes = new BytesWritable(segmentData);
reset(); // Reset the iterator allowing the user to yield offset/lengths.
}
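
The segment layout consumed here is symmetric on the write side: a vlong SEGMENT_HEADER_ID marker, a vlong byte length, then the raw bytes. A hedged sketch of that writer, assuming the same SEGMENT_HEADER_ID constant and recordLenBytes field:

public void write(DataOutput out) throws IOException {
  WritableUtils.writeVLong(out, SEGMENT_HEADER_ID);           // segment marker
  WritableUtils.writeVLong(out, recordLenBytes.getLength());  // payload length
  out.write(recordLenBytes.getBytes(), 0, recordLenBytes.getLength());
}
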
Source file: HbaseObjectWritableFor96Migration.java
Project: ditb
/** Reads and returns the class as written by {@link #writeClass(DataOutput, Class)} */
static Class<?> readClass(Configuration conf, DataInput in) throws IOException {
Class<?> instanceClass = null;
int b = (byte)WritableUtils.readVInt(in);
if (b == NOT_ENCODED) {
String className = Text.readString(in);
try {
instanceClass = getClassByName(conf, className);
} catch (ClassNotFoundException e) {
LOG.error("Can't find class " + className, e);
throw new IOException("Can't find class " + className, e);
}
} else {
instanceClass = CODE_TO_CLASS.get(b);
}
return instanceClass;
}
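
The writeClass counterpart referenced in the javadoc encodes either a known class code or the NOT_ENCODED marker followed by the class name. A minimal sketch, assuming a CLASS_TO_CODE map that pairs with the CODE_TO_CLASS map used above:

static void writeClass(DataOutput out, Class<?> c) throws IOException {
  Integer code = CLASS_TO_CODE.get(c);
  if (code == null) {
    WritableUtils.writeVInt(out, NOT_ENCODED);  // unknown class: marker + name
    Text.writeString(out, c.getName());
  } else {
    WritableUtils.writeVInt(out, code);
  }
}
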
Source file: InMemoryWriter.java
Project: hadoop
public void append(DataInputBuffer key, DataInputBuffer value)
throws IOException {
int keyLength = key.getLength() - key.getPosition();
if (keyLength < 0) {
throw new IOException("Negative key-length not allowed: " + keyLength +
" for " + key);
}
int valueLength = value.getLength() - value.getPosition();
if (valueLength < 0) {
throw new IOException("Negative value-length not allowed: " +
valueLength + " for " + value);
}
WritableUtils.writeVInt(out, keyLength);
WritableUtils.writeVInt(out, valueLength);
out.write(key.getData(), key.getPosition(), keyLength);
out.write(value.getData(), value.getPosition(), valueLength);
}
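
A reader of this stream has to consume the same framing: two vints for the key and value lengths, then the raw bytes. A hypothetical helper (not the actual IFile reader):

static byte[][] readKeyValue(DataInput in) throws IOException {
  byte[] key = new byte[WritableUtils.readVInt(in)];    // key length prefix
  byte[] value = new byte[WritableUtils.readVInt(in)];  // value length prefix
  in.readFully(key);
  in.readFully(value);
  return new byte[][] { key, value };
}
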
Source file: TaskStatus.java
Project: hadoop
public void readFields(DataInput in) throws IOException {
this.taskid.readFields(in);
setProgress(in.readFloat());
this.numSlots = in.readInt();
this.runState = WritableUtils.readEnum(in, State.class);
setDiagnosticInfo(StringInterner.weakIntern(Text.readString(in)));
setStateString(StringInterner.weakIntern(Text.readString(in)));
this.phase = WritableUtils.readEnum(in, Phase.class);
this.startTime = in.readLong();
this.finishTime = in.readLong();
counters = new Counters();
this.includeAllCounters = in.readBoolean();
this.outputSize = in.readLong();
counters.readFields(in);
nextRecordRange.readFields(in);
}
Source file: Compressor.java
Project: ditb
/**
* Compresses and writes an array to a DataOutput
*
* @param data the array to write
* @param offset offset in <code>data</code> of the first byte to write
* @param length number of bytes to write
* @param out the DataOutput to write into
* @param dict the dictionary to use for compression
*/
@Deprecated
static void writeCompressed(byte[] data, int offset, int length,
DataOutput out, Dictionary dict)
throws IOException {
short dictIdx = Dictionary.NOT_IN_DICTIONARY;
if (dict != null) {
dictIdx = dict.findEntry(data, offset, length);
}
if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
// not in dict
out.writeByte(Dictionary.NOT_IN_DICTIONARY);
WritableUtils.writeVInt(out, length);
out.write(data, offset, length);
} else {
out.writeShort(dictIdx);
}
}
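
Decoding reverses the branch above: a leading NOT_IN_DICTIONARY byte means a vint length and literal bytes follow, otherwise the byte is the high half of a two-byte dictionary index. A hedged sketch of that read side, assuming the Dictionary interface offers addEntry and getEntry as in HBase:

static byte[] readCompressed(DataInput in, Dictionary dict) throws IOException {
  byte status = in.readByte();
  if (status == Dictionary.NOT_IN_DICTIONARY) {
    int length = WritableUtils.readVInt(in);   // literal: vint length + raw bytes
    byte[] arr = new byte[length];
    in.readFully(arr);
    if (dict != null) {
      dict.addEntry(arr, 0, length);           // prime the dictionary for next time
    }
    return arr;
  } else {
    short dictIdx = (short) (((status & 0xFF) << 8) | (in.readByte() & 0xFF));
    byte[] entry = dict.getEntry(dictIdx);
    if (entry == null) {
      throw new IOException("Missing dictionary entry for index " + dictIdx);
    }
    return entry;
  }
}
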
Source file: GridmixKey.java
Project: hadoop
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
di.reset(b1, s1, l1);
final int x1 = WritableUtils.readVInt(di);
di.reset(b2, s2, l2);
final int x2 = WritableUtils.readVInt(di);
final int ret = (b1[s1 + x1] != b2[s2 + x2])
? b1[s1 + x1] - b2[s2 + x2]
: super.compare(b1, s1, x1, b2, s2, x2);
di.reset(reset, 0, 0);
return ret;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
Source file: CompositeInputSplit.java
Project: hadoop
/**
* {@inheritDoc}
* @throws IOException If the child InputSplit cannot be read, typically
* for failing access checks.
*/
@SuppressWarnings("unchecked") // Generic array assignment
public void readFields(DataInput in) throws IOException {
int card = WritableUtils.readVInt(in);
if (splits == null || splits.length != card) {
splits = new InputSplit[card];
}
Class<? extends InputSplit>[] cls = new Class[card];
try {
for (int i = 0; i < card; ++i) {
cls[i] =
Class.forName(Text.readString(in)).asSubclass(InputSplit.class);
}
for (int i = 0; i < card; ++i) {
splits[i] = ReflectionUtils.newInstance(cls[i], null);
SerializationFactory factory = new SerializationFactory(conf);
Deserializer deserializer = factory.getDeserializer(cls[i]);
deserializer.open((DataInputStream)in);
splits[i] = (InputSplit)deserializer.deserialize(splits[i]);
}
} catch (ClassNotFoundException e) {
throw new IOException("Failed split init", e);
}
}
Source file: ReduceContextImpl.java
Project: hadoop
@Override
public void mark() throws IOException {
if (getBackupStore() == null) {
backupStore = new BackupStore<KEYIN,VALUEIN>(conf, taskid);
}
isMarked = true;
if (!inReset) {
backupStore.reinitialize();
if (currentKeyLength == -1) {
// The user has not called next() for this iterator yet, so
// there is no current record to mark and copy to backup store.
return;
}
assert (currentValueLength != -1);
int requestedSize = currentKeyLength + currentValueLength +
WritableUtils.getVIntSize(currentKeyLength) +
WritableUtils.getVIntSize(currentValueLength);
DataOutputStream out = backupStore.getOutputStream(requestedSize);
writeFirstKeyValueBytes(out);
backupStore.updateCounters(requestedSize);
} else {
backupStore.mark();
}
}
Source file: FieldEncryptor.java
Project: PACE
/**
* Retrieve a field encryption key to use in <strong>decrypting</strong> the field.
* <p>
* Metadata can be read from the DataInput object. All metadata that was written to the stream should be read out, regardless of whether it is used.
*
* @param visibility
* Visibility expression for the field.
* @param in
* Stream from which metadata is read.
* @return Field encryption key.
* @throws IOException
* Not actually thrown.
*/
private byte[] getKey(ColumnVisibility visibility, DataInput in) throws IOException {
if (config.encryptUsingVisibility) {
if (visibility.getParseTree().getType() != NodeType.EMPTY) {
// Rebuild the key from the shares created based on the visibility expression.
byte[] key = readVisibilityShare(visibility.getParseTree(), visibility.getExpression(), in, false);
if (key == null) {
throw new IllegalKeyRequestException();
}
return key;
} else {
return new byte[config.keyLength];
}
} else {
int version = WritableUtils.readVInt(in);
return keys.getKey(config.keyId, version, config.keyLength);
}
}
Source file: GridmixSplit.java
Project: hadoop
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
WritableUtils.writeVInt(out, id);
WritableUtils.writeVInt(out, maps);
WritableUtils.writeVLong(out, inputRecords);
WritableUtils.writeVLong(out, outputBytes);
WritableUtils.writeVLong(out, outputRecords);
WritableUtils.writeVLong(out, maxMemory);
WritableUtils.writeVInt(out, reduces);
for (int i = 0; i < reduces; ++i) {
out.writeDouble(reduceBytes[i]);
out.writeDouble(reduceRecords[i]);
}
WritableUtils.writeVInt(out, nSpec);
for (int i = 0; i < nSpec; ++i) {
WritableUtils.writeVLong(out, reduceOutputBytes[i]);
WritableUtils.writeVLong(out, reduceOutputRecords[i]);
}
}
Source file: QueueInfo.java
Project: hadoop
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, queueName);
WritableUtils.writeEnum(out, queueState);
if(schedulingInfo!= null) {
Text.writeString(out, schedulingInfo);
}else {
Text.writeString(out, "N/A");
}
out.writeInt(stats.length);
for (JobStatus stat : stats) {
stat.write(out);
}
out.writeInt(children.size());
for(QueueInfo childQueueInfo : children) {
childQueueInfo.write(out);
}
}
Source file: FileSystemCounterGroup.java
Project: hadoop
/**
* FileSystemGroup ::= #scheme (scheme #counter (key value)*)*
*/
@Override
public void write(DataOutput out) throws IOException {
WritableUtils.writeVInt(out, map.size()); // #scheme
for (Map.Entry<String, Object[]> entry : map.entrySet()) {
WritableUtils.writeString(out, entry.getKey()); // scheme
// #counter for the above scheme
WritableUtils.writeVInt(out, numSetCounters(entry.getValue()));
for (Object counter : entry.getValue()) {
if (counter == null) continue;
@SuppressWarnings("unchecked")
FSCounter c = (FSCounter) ((Counter)counter).getUnderlyingCounter();
WritableUtils.writeVInt(out, c.key.ordinal()); // key
WritableUtils.writeVLong(out, c.getValue()); // value
}
}
}
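
The grammar in the comment is read back symmetrically: a scheme count, then for each scheme its name, a counter count, and (key ordinal, value) pairs. A hedged sketch of that decoder, assuming a findCounter(scheme, enum) lookup like the one the real group provides:

@Override
public void readFields(DataInput in) throws IOException {
  int numSchemes = WritableUtils.readVInt(in);        // #scheme
  FileSystemCounter[] enums = FileSystemCounter.values();
  for (int i = 0; i < numSchemes; ++i) {
    String scheme = WritableUtils.readString(in);     // scheme
    int numCounters = WritableUtils.readVInt(in);     // #counter for this scheme
    for (int j = 0; j < numCounters; ++j) {
      findCounter(scheme, enums[WritableUtils.readVInt(in)])  // key
          .setValue(WritableUtils.readVLong(in));             // value
    }
  }
}
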
Source file: AbstractDelegationTokenIdentifier.java
Project: hadoop-oss
@VisibleForTesting
void writeImpl(DataOutput out) throws IOException {
out.writeByte(VERSION);
owner.write(out);
renewer.write(out);
realUser.write(out);
WritableUtils.writeVLong(out, issueDate);
WritableUtils.writeVLong(out, maxDate);
WritableUtils.writeVInt(out, sequenceNumber);
WritableUtils.writeVInt(out, masterKeyId);
}
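
The read path mirrors this layout field for field, rejecting unknown versions first. A minimal sketch (method name hypothetical), assuming the same fields and VERSION constant:

void readFieldsImpl(DataInput in) throws IOException {
  byte version = in.readByte();
  if (version != VERSION) {
    throw new IOException("Unknown delegation token version " + version);
  }
  owner.readFields(in);
  renewer.readFields(in);
  realUser.readFields(in);
  issueDate = WritableUtils.readVLong(in);
  maxDate = WritableUtils.readVLong(in);
  sequenceNumber = WritableUtils.readVInt(in);
  masterKeyId = WritableUtils.readVInt(in);
}
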
Source file: DelegationKey.java
Project: hadoop-oss
/**
*/
@Override
public void readFields(DataInput in) throws IOException {
keyId = WritableUtils.readVInt(in);
expiryDate = WritableUtils.readVLong(in);
int len = WritableUtils.readVIntInRange(in, -1, MAX_KEY_LEN);
if (len == -1) {
keyBytes = null;
} else {
keyBytes = new byte[len];
in.readFully(keyBytes);
}
}
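
The matching writer has to preserve the -1 convention for a null key. A minimal sketch, assuming the same keyId, expiryDate, and keyBytes fields:

@Override
public void write(DataOutput out) throws IOException {
  WritableUtils.writeVInt(out, keyId);
  WritableUtils.writeVLong(out, expiryDate);
  if (keyBytes == null) {
    WritableUtils.writeVInt(out, -1);               // null key marker
  } else {
    WritableUtils.writeVInt(out, keyBytes.length);
    out.write(keyBytes);
  }
}
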
Source file: PrefixTreeEncoder.java
Project: ditb
/***************** internal add methods ************************/
private void addAfterRowFamilyQualifier(Cell cell){
// timestamps
timestamps[totalCells] = cell.getTimestamp();
timestampEncoder.add(cell.getTimestamp());
// memstore timestamps
if (includeMvccVersion) {
mvccVersions[totalCells] = cell.getMvccVersion();
mvccVersionEncoder.add(cell.getMvccVersion());
totalUnencodedBytes += WritableUtils.getVIntSize(cell.getMvccVersion());
}else{
//must overwrite in case there was a previous version in this array slot
mvccVersions[totalCells] = 0L;
if(totalCells == 0){//only need to do this for the first cell added
mvccVersionEncoder.add(0L);
}
//totalUncompressedBytes += 0;//mvccVersion takes zero bytes when disabled
}
// types
typeBytes[totalCells] = cell.getTypeByte();
cellTypeEncoder.add(cell.getTypeByte());
// values
totalValueBytes += cell.getValueLength();
// double the array each time we run out of space
values = ArrayUtils.growIfNecessary(values, totalValueBytes, 2 * totalValueBytes);
CellUtil.copyValueTo(cell, values, valueOffsets[totalCells]);
if (cell.getValueLength() > maxValueLength) {
maxValueLength = cell.getValueLength();
}
valueOffsets[totalCells + 1] = totalValueBytes;
// general
totalUnencodedBytes += KeyValueUtil.length(cell);
++totalCells;
}
Source file: Bytes.java
Project: ditb
/**
* Read a byte array written with a WritableUtils.vint prefix.
* @param in Input to read from.
* @return byte array read off <code>in</code>
* @throws IOException e
*/
public static byte [] readByteArray(final DataInput in)
throws IOException {
int len = WritableUtils.readVInt(in);
if (len < 0) {
throw new NegativeArraySizeException(Integer.toString(len));
}
byte [] result = new byte[len];
in.readFully(result, 0, len);
return result;
}
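
The inverse operation is simply a vint length prefix followed by the payload; a hedged sketch of that writer:

public static void writeByteArray(final DataOutput out, final byte[] b)
    throws IOException {
  WritableUtils.writeVInt(out, b.length);  // vint length prefix
  out.write(b, 0, b.length);
}
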
Source file: Configuration.java
Project: hadoop-oss
@Override
public void readFields(DataInput in) throws IOException {
clear();
int size = WritableUtils.readVInt(in);
for(int i=0; i < size; ++i) {
String key = org.apache.hadoop.io.Text.readString(in);
String value = org.apache.hadoop.io.Text.readString(in);
set(key, value);
String sources[] = WritableUtils.readCompressedStringArray(in);
if(sources != null) {
updatingResource.put(key, sources);
}
}
}
Source file: Server.java
Project: hadoop-oss
/**
* Setup response for the IPC Call on Fatal Error from a
* client that is using an old version of Hadoop.
* The response is serialized using the previous protocol's response
* layout.
*
* @param response buffer to serialize the response into
* @param call {@link Call} to which we are setting up the response
* @param rv return value for the IPC Call, if the call was successful
* @param errorClass error class, if the call failed
* @param error error message, if the call failed
* @throws IOException
*/
private void setupResponseOldVersionFatal(ByteArrayOutputStream response,
Call call,
Writable rv, String errorClass, String error)
throws IOException {
final int OLD_VERSION_FATAL_STATUS = -1;
response.reset();
DataOutputStream out = new DataOutputStream(response);
out.writeInt(call.callId); // write call id
out.writeInt(OLD_VERSION_FATAL_STATUS); // write FATAL_STATUS
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
call.setResponse(ByteBuffer.wrap(response.toByteArray()));
}
Source file: SplitMetaInfoReader.java
Project: hadoop
public static JobSplit.TaskSplitMetaInfo[] readSplitMetaInfo(
JobID jobId, FileSystem fs, Configuration conf, Path jobSubmitDir)
throws IOException {
long maxMetaInfoSize = conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE,
MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
Path metaSplitFile = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir);
String jobSplitFile = JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString();
FileStatus fStatus = fs.getFileStatus(metaSplitFile);
if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
throw new IOException("Split metadata size exceeded " +
maxMetaInfoSize +". Aborting job " + jobId);
}
FSDataInputStream in = fs.open(metaSplitFile);
byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
in.readFully(header);
if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
throw new IOException("Invalid header on split file");
}
int vers = WritableUtils.readVInt(in);
if (vers != JobSplit.META_SPLIT_VERSION) {
in.close();
throw new IOException("Unsupported split version " + vers);
}
int numSplits = WritableUtils.readVInt(in); //TODO: check for insane values
JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo =
new JobSplit.TaskSplitMetaInfo[numSplits];
for (int i = 0; i < numSplits; i++) {
JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
splitMetaInfo.readFields(in);
JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
jobSplitFile,
splitMetaInfo.getStartOffset());
allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
splitMetaInfo.getLocations(),
splitMetaInfo.getInputDataLength());
}
in.close();
return allSplitMetaInfo;
}
Source file: CopyListingFileStatus.java
Project: circus-train
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
byte aclEntriesSize = in.readByte();
if (aclEntriesSize != NO_ACL_ENTRIES) {
aclEntries = Lists.newArrayListWithCapacity(aclEntriesSize);
for (int i = 0; i < aclEntriesSize; ++i) {
aclEntries.add(new AclEntry.Builder()
.setScope(ACL_ENTRY_SCOPES[in.readByte()])
.setType(ACL_ENTRY_TYPES[in.readByte()])
.setName(WritableUtils.readString(in))
.setPermission(FS_ACTIONS[in.readByte()])
.build());
}
} else {
aclEntries = null;
}
int xAttrsSize = in.readInt();
if (xAttrsSize != NO_XATTRS) {
xAttrs = Maps.newHashMap();
for (int i = 0; i < xAttrsSize; ++i) {
final String name = WritableUtils.readString(in);
final int valueLen = in.readInt();
byte[] value = null;
if (valueLen > -1) {
value = new byte[valueLen];
if (valueLen > 0) {
in.readFully(value);
}
}
xAttrs.put(name, value);
}
} else {
xAttrs = null;
}
}
Source file: Configuration.java
Project: spark_deep
@Override
public void readFields(DataInput in) throws IOException {
clear();
int size = WritableUtils.readVInt(in);
for(int i=0; i < size; ++i) {
set(org.apache.hadoop.io.Text.readString(in),
org.apache.hadoop.io.Text.readString(in));
}
}
Source file: Configuration.java
Project: spark_deep
public void write(DataOutput out) throws IOException {
Properties props = getProps();
WritableUtils.writeVInt(out, props.size());
for(Map.Entry<Object, Object> item: props.entrySet()) {
org.apache.hadoop.io.Text.writeString(out, (String) item.getKey());
org.apache.hadoop.io.Text.writeString(out, (String) item.getValue());
}
}
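
Together with the readFields shown earlier for the same class, this gives Configuration a complete Writable round trip. A small usage sketch (helper name hypothetical):

static Configuration roundTrip(Configuration conf) throws IOException {
  ByteArrayOutputStream buf = new ByteArrayOutputStream();
  conf.write(new DataOutputStream(buf));

  Configuration copy = new Configuration(false);  // do not load default resources
  copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
  return copy;
}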