/**
 * Opens a SequenceFile reader over {@code f} and creates the key/value instances and the
 * input/output buffers used to render its records.
 *
 * <p>If creating the key or value instance fails after the reader has been opened, the reader is
 * closed before the exception propagates, so a failed construction does not leak the file handle.
 *
 * @param f status of the SequenceFile to read
 * @throws IOException if the SequenceFile reader cannot be opened
 */
public TextRecordInputStream(FileStatus f) throws IOException {
  final Path fpath = f.getPath();
  final Configuration lconf = getConf();
  r = new SequenceFile.Reader(lconf, SequenceFile.Reader.file(fpath));
  try {
    // The key/value classes reported by the reader must be Writable; asSubclass throws
    // ClassCastException otherwise, and class lookup/instantiation may also fail at runtime.
    key = ReflectionUtils.newInstance(r.getKeyClass().asSubclass(Writable.class), lconf);
    val = ReflectionUtils.newInstance(r.getValueClass().asSubclass(Writable.class), lconf);
  } catch (RuntimeException e) {
    // The object will never be fully constructed, so nobody else can close the reader.
    try {
      r.close();
    } catch (IOException closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw e;
  }
  inbuf = new DataInputBuffer();
  outbuf = new DataOutputBuffer();
}
java类org.apache.hadoop.io.SequenceFile的实例源码
Display.java 文件源码
项目:hadoop-oss
阅读 28
收藏 0
点赞 0
评论 0
HashTable.java 文件源码
项目:ditb
阅读 24
收藏 0
点赞 0
评论 0
/**
 * Reads the partition keys stored in the SequenceFile at {@code path} into {@code partitions}
 * and verifies that they are in natural (ascending) order.
 *
 * <p>The reader is always closed, even if reading a record fails.
 *
 * @param fs file system holding the partition file
 * @param conf configuration used to open the reader
 * @param path location of the partition file
 * @throws IOException if the file cannot be read, or if the keys are not ordered
 */
private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
    throws IOException {
  @SuppressWarnings("deprecation")
  SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
  try {
    ImmutableBytesWritable key = new ImmutableBytesWritable();
    partitions = new ArrayList<ImmutableBytesWritable>();
    while (reader.next(key)) {
      // The same key object is refilled on every iteration, so store a copy of its bytes.
      partitions.add(new ImmutableBytesWritable(key.copyBytes()));
    }
  } finally {
    reader.close();
  }
  if (!Ordering.natural().isOrdered(partitions)) {
    throw new IOException("Partitions are not ordered!");
  }
}
SimpleCopyListing.java 文件源码
项目:circus-train
阅读 31
收藏 0
点赞 0
评论 0
/**
 * Writes one listing entry per source file found under each of the globbed paths.
 *
 * <p>For every path, its direct children are listed. Files are recorded directly, and non-empty
 * directories are handed to {@code traverseNonEmptyDirectory}. The writer is closed on success,
 * and closed silently if an exception escapes.
 *
 * @param fileListWriter destination for the listing entries; closed by this method
 * @param options copy options used to compute source roots and write entries
 * @param globbedPaths source paths to list
 * @throws IOException if listing the sources or writing the listing fails
 */
@VisibleForTesting
public void doBuildListing(SequenceFile.Writer fileListWriter, S3MapReduceCpOptions options, List<Path> globbedPaths)
    throws IOException {
  try {
    for (Path globbedPath : globbedPaths) {
      FileSystem sourceFS = globbedPath.getFileSystem(getConf());
      Path path = makeQualified(globbedPath);
      FileStatus rootStatus = sourceFS.getFileStatus(path);
      Path sourcePathRoot = computeSourceRootPath(rootStatus, options);
      LOG.info("Root source path is {}", sourcePathRoot);
      FileStatus[] sourceFiles = sourceFS.listStatus(path);
      // A null or empty listing (e.g. an empty directory) has nothing to record. Skipping it also
      // avoids iterating a null array when the root is a directory.
      if (sourceFiles == null || sourceFiles.length == 0) {
        continue;
      }
      for (FileStatus sourceStatus : sourceFiles) {
        if (sourceStatus.isFile()) {
          LOG.debug("Recording source-path: {} for copy.", sourceStatus.getPath());
          CopyListingFileStatus sourceCopyListingStatus = new CopyListingFileStatus(sourceStatus);
          writeToFileListing(fileListWriter, sourceCopyListingStatus, sourcePathRoot, options);
        }
        if (isDirectoryAndNotEmpty(sourceFS, sourceStatus)) {
          LOG.debug("Traversing non-empty source dir: {}", sourceStatus.getPath());
          traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot, options);
        }
      }
    }
    fileListWriter.close();
    // Prevent the finally block from closing the writer a second time.
    fileListWriter = null;
  } finally {
    IoUtil.closeSilently(LOG, fileListWriter);
  }
}
SimpleCopyListing.java 文件源码
项目:circus-train
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Creates an uncompressed SequenceFile writer for the copy listing. Keys are {@code Text} and
 * values are {@code CopyListingFileStatus}.
 *
 * <p>Any file already present at {@code pathToListFile} is deleted first.
 *
 * @param pathToListFile where the listing will be written
 * @return a new writer positioned at the start of an empty listing
 * @throws IOException if the old file cannot be removed or the writer cannot be created
 */
private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
  FileSystem listFs = pathToListFile.getFileSystem(getConf());
  boolean stale = listFs.exists(pathToListFile);
  if (stale) {
    listFs.delete(pathToListFile, false);
  }
  SequenceFile.Writer.Option target = SequenceFile.Writer.file(pathToListFile);
  SequenceFile.Writer.Option keyType = SequenceFile.Writer.keyClass(Text.class);
  SequenceFile.Writer.Option valueType = SequenceFile.Writer.valueClass(CopyListingFileStatus.class);
  SequenceFile.Writer.Option uncompressed =
      SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE);
  return SequenceFile.createWriter(getConf(), target, keyType, valueType, uncompressed);
}
UniformSizeInputFormat.java 文件源码
项目:circus-train
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Opens a SequenceFile reader over the listing file located via
 * {@code getListingFilePath(configuration)}.
 *
 * @param configuration job configuration used to locate and open the listing
 * @return a reader over the listing file
 * @throws IllegalArgumentException if the listing file does not exist, or if an
 *     {@code IOException} occurs while opening it (that exception is logged and kept as the cause)
 */
private SequenceFile.Reader getListingFileReader(Configuration configuration) {
  final Path listing = getListingFilePath(configuration);
  try {
    FileSystem listingFs = listing.getFileSystem(configuration);
    if (listingFs.exists(listing)) {
      return new SequenceFile.Reader(configuration, SequenceFile.Reader.file(listing));
    }
  } catch (IOException ioFailure) {
    LOG.error("Couldn't find listing file at: " + listing, ioFailure);
    throw new IllegalArgumentException("Couldn't find listing-file at: " + listing, ioFailure);
  }
  throw new IllegalArgumentException("Listing file doesn't exist at: " + listing);
}
CopyListing.java 文件源码
项目:circus-train
阅读 16
收藏 0
点赞 0
评论 0
/**
 * Validates the final path listing by checking it for duplicate entries.
 *
 * <p>The listing is first sorted with {@code sortListing}, so entries with equal relative-path keys
 * become adjacent. Two consecutive equal keys mean two source files would map to the same target.
 *
 * @param pathToListFile path of the listing built by {@code doBuildListing}
 * @param options options passed to S3MapReduceCp (not consulted by this check)
 * @throws IOException if sorting or reading the listing fails
 * @throws DuplicateFileException if two entries share the same relative path
 */
private void validateFinalListing(Path pathToListFile, S3MapReduceCpOptions options)
    throws DuplicateFileException, IOException {
  Configuration config = getConf();
  FileSystem fs = pathToListFile.getFileSystem(config);
  Path sortedList = sortListing(fs, config, pathToListFile);
  SequenceFile.Reader reader = new SequenceFile.Reader(config, SequenceFile.Reader.file(sortedList));
  try {
    Text lastKey = new Text("*"); // source relative path can never hold *
    CopyListingFileStatus lastFileStatus = new CopyListingFileStatus();
    Text currentKey = new Text();
    while (reader.next(currentKey)) {
      if (currentKey.equals(lastKey)) {
        CopyListingFileStatus currentFileStatus = new CopyListingFileStatus();
        reader.getCurrentValue(currentFileStatus);
        throw new DuplicateFileException("File "
            + lastFileStatus.getPath()
            + " and "
            + currentFileStatus.getPath()
            + " would cause duplicates. Aborting");
      }
      // Remember this entry so a duplicate on the next iteration can report both paths.
      reader.getCurrentValue(lastFileStatus);
      lastKey.set(currentKey);
    }
  } finally {
    IOUtils.closeStream(reader);
  }
}
CopyListing.java 文件源码
项目:circus-train
阅读 20
收藏 0
点赞 0
评论 0
/**
 * Sorts a listing SequenceFile whose keys are {@code Text} and whose values are
 * {@code CopyListingFileStatus}.
 *
 * <p>The sorted output is written next to the source listing, with "_sorted" appended to its
 * name. Any existing file at that output path is deleted first.
 *
 * @param fs file system holding the listing
 * @param conf configuration used by the sorter
 * @param sourceListing listing file to sort
 * @return path of the sorted file
 * @throws IOException if deleting the old output or sorting fails
 */
private static Path sortListing(FileSystem fs, Configuration conf, Path sourceListing) throws IOException {
  SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class, CopyListingFileStatus.class, conf);
  Path output = new Path(sourceListing.toString() + "_sorted");
  if (fs.exists(output)) {
    fs.delete(output, false);
  }
  sorter.sort(sourceListing, output);
  return output;
}
SimpleCopyListingTest.java 文件源码
项目:circus-train
阅读 16
收藏 0
点赞 0
评论 0
/**
 * Builds a listing for a single source file and checks that the first entry is keyed by the
 * file's name relative to its parent ("/source.txt"). A decoy file is created elsewhere under the
 * test root.
 */
@Test(timeout = 10000)
public void buildListingForSingleFile() throws Exception {
  FileSystem dfs = cluster.getFileSystem();
  String rootDir = "/tmp/source";
  Path root = new Path(rootDir);
  // Start from a clean tree so only the files created below exist.
  if (dfs.exists(root)) {
    delete(dfs, rootDir);
  }
  Path source = new Path(root, "foo/bar/source.txt");
  Path decoy = new Path(root, "target/mooc");
  URI targetUri = URI.create("s3://bucket/target/moo/target.txt");
  createFile(dfs, source.toString());
  createFile(dfs, decoy.toString());

  final Path listFilePath = new Path(root, "/tmp/fileList.seq");
  listing.buildListing(listFilePath, options(source, targetUri));

  try (SequenceFile.Reader listReader = new SequenceFile.Reader(CONFIG, SequenceFile.Reader.file(listFilePath))) {
    CopyListingFileStatus entryStatus = new CopyListingFileStatus();
    Text entryKey = new Text();
    assertThat(listReader.next(entryKey, entryStatus), is(true));
    assertThat(entryKey.toString(), is("/source.txt"));
  }
}
SimpleCopyListingTest.java 文件源码
项目:circus-train
阅读 20
收藏 0
点赞 0
评论 0
/**
 * Builds a listing from one file and two directories and checks that the first four entries
 * carry the expected relative-path keys, in order.
 */
@Test(timeout = 10000)
public void buildListingForMultipleSources() throws Exception {
  FileSystem dfs = cluster.getFileSystem();
  String rootDir = "/tmp/source";
  Path root = new Path(rootDir);
  // Start from a clean tree so only the files created below exist.
  if (dfs.exists(root)) {
    delete(dfs, rootDir);
  }
  Path bazDir = new Path(root, "foo/baz/");
  Path bangDir = new Path(root, "foo/bang/");
  Path singleFile = new Path(root, "foo/bar/source.txt");
  URI targetUri = URI.create("s3://bucket/target/moo/");
  dfs.mkdirs(bazDir);
  dfs.mkdirs(bangDir);
  createFile(dfs, new Path(bazDir, "baz_1.dat"));
  createFile(dfs, new Path(bazDir, "baz_2.dat"));
  createFile(dfs, new Path(bangDir, "bang_0.dat"));
  createFile(dfs, singleFile.toString());

  final Path listFilePath = new Path(root, "/tmp/fileList.seq");
  listing.buildListing(listFilePath, options(Arrays.asList(singleFile, bazDir, bangDir), targetUri));

  String[] expectedKeys = { "/source.txt", "/baz_1.dat", "/baz_2.dat", "/bang_0.dat" };
  try (SequenceFile.Reader listReader = new SequenceFile.Reader(CONFIG, SequenceFile.Reader.file(listFilePath))) {
    CopyListingFileStatus entryStatus = new CopyListingFileStatus();
    Text entryKey = new Text();
    for (String expected : expectedKeys) {
      assertThat(listReader.next(entryKey, entryStatus), is(true));
      assertThat(entryKey.toString(), is(expected));
    }
  }
}
HashTable.java 文件源码
项目:ditb
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Writes every key in {@code partitions} to a SequenceFile at {@code path}. Values are
 * {@code NullWritable}.
 *
 * <p>The writer is always closed, even if appending a record fails.
 *
 * @param conf configuration used to resolve the file system and create the writer
 * @param path destination of the partition file
 * @throws IOException if the file cannot be created or written
 */
void writePartitionFile(Configuration conf, Path path) throws IOException {
  FileSystem fs = path.getFileSystem(conf);
  @SuppressWarnings("deprecation")
  SequenceFile.Writer writer = SequenceFile.createWriter(
      fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);
  try {
    for (ImmutableBytesWritable partition : partitions) {
      writer.append(partition, NullWritable.get());
    }
  } finally {
    writer.close();
  }
}