/**
 * Creates (overwriting) {@code path} and opens a SequenceFile writer on it whose keys and values
 * are both {@link BytesWritable}.
 *
 * @param fs file system on which the file is created
 * @param path destination file
 * @param osBufferSize buffer size passed to {@code fs.create}
 * @param compress {@code "lzo"} or {@code "gz"} for BLOCK compression with that codec, or
 *     {@code "none"} for an uncompressed file
 * @param minBlkSize not used by this constructor
 * @throws IOException if {@code compress} is any other value (including {@code null}), or if the
 *     file or the writer cannot be created
 */
public SeqFileAppendable(FileSystem fs, Path path, int osBufferSize,
    String compress, int minBlkSize) throws IOException {
  Configuration conf = new Configuration();
  CompressionCodec selectedCodec;
  SequenceFile.CompressionType compressionType;
  if ("lzo".equals(compress)) {
    selectedCodec = Compression.Algorithm.LZO.getCodec();
    compressionType = SequenceFile.CompressionType.BLOCK;
  } else if ("gz".equals(compress)) {
    selectedCodec = Compression.Algorithm.GZ.getCodec();
    compressionType = SequenceFile.CompressionType.BLOCK;
  } else if ("none".equals(compress)) {
    selectedCodec = null;
    compressionType = SequenceFile.CompressionType.NONE;
  } else {
    // Reject unknown values before anything is created on the file system.
    throw new IOException("Codec not supported.");
  }
  this.fsdos = fs.create(path, true, osBufferSize);
  writer = SequenceFile.createWriter(conf, fsdos, BytesWritable.class,
      BytesWritable.class, compressionType, selectedCodec);
}
java类org.apache.hadoop.io.SequenceFile的实例源码
TestTFileSeqFileComparison.java 文件源码
项目:hadoop-oss
阅读 29
收藏 0
点赞 0
评论 0
SimpleCopyListing.java 文件源码
项目:circus-train
阅读 28
收藏 0
点赞 0
评论 0
/**
 * Resolves the source paths in {@code options} into concrete paths and hands them to the
 * three-argument {@code doBuildListing} overload, which writes the actual listing entries.
 *
 * <p>Each source is first looked up with {@code getFileStatus}. If it is a file, its path is used
 * as-is. Otherwise it is expanded with {@code globStatus}, and every match is added.
 *
 * @param fileListWriter writer that receives the listing entries
 * @param options copy options supplying the source paths
 * @throws InvalidInputException if a non-file source's glob matches nothing
 * @throws IOException if a source cannot be stat-ed (e.g. it does not exist literally), or the
 *     listing cannot be written
 */
@VisibleForTesting
public void doBuildListing(SequenceFile.Writer fileListWriter, S3MapReduceCpOptions options) throws IOException {
  List<Path> globbedPaths = new ArrayList<>(options.getSources().size());
  for (Path sourcePath : options.getSources()) {
    FileSystem fs = sourcePath.getFileSystem(getConf());
    FileStatus status = fs.getFileStatus(sourcePath);
    if (status.isFile()) {
      LOG.debug("Adding path {}", status.getPath());
      globbedPaths.add(status.getPath());
      continue;
    }
    FileStatus[] matches = fs.globStatus(sourcePath);
    if (matches == null || matches.length == 0) {
      throw new InvalidInputException("Source path " + sourcePath + " doesn't exist");
    }
    for (FileStatus match : matches) {
      LOG.debug("Adding path {}", match.getPath());
      globbedPaths.add(match.getPath());
    }
  }
  doBuildListing(fileListWriter, options, globbedPaths);
}
SimpleCopyListing.java 文件源码
项目:circus-train
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Walks the tree below {@code sourceStatus} and writes every file found to the listing.
 *
 * <p>Only files are written here. A child directory is descended into only when
 * {@code isDirectoryAndNotEmpty} accepts it. Traversal uses an explicit LIFO stack rather than
 * recursion.
 *
 * @param fileListWriter writer receiving the listing entries
 * @param sourceStatus status of the directory where traversal starts
 * @param sourcePathRoot root against which relative listing paths are computed
 * @param options copy options forwarded to {@code writeToFileListing}
 * @throws IOException if listing a directory or writing an entry fails
 */
private void traverseNonEmptyDirectory(
    SequenceFile.Writer fileListWriter,
    FileStatus sourceStatus,
    Path sourcePathRoot,
    S3MapReduceCpOptions options)
    throws IOException {
  FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
  Stack<FileStatus> pathStack = new Stack<>();
  pathStack.push(sourceStatus);
  while (!pathStack.isEmpty()) {
    for (FileStatus child : getChildren(sourceFS, pathStack.pop())) {
      if (child.isFile()) {
        // Log the entry actually being recorded, not the traversal root.
        LOG.debug("Recording source-path: {} for copy.", child.getPath());
        CopyListingFileStatus childCopyListingStatus = new CopyListingFileStatus(child);
        writeToFileListing(fileListWriter, childCopyListingStatus, sourcePathRoot, options);
      }
      if (isDirectoryAndNotEmpty(sourceFS, child)) {
        LOG.debug("Traversing non-empty source dir: {}", child.getPath());
        pathStack.push(child);
      }
    }
  }
}
SimpleCopyListing.java 文件源码
项目:circus-train
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Appends one entry to the copy listing unless {@code shouldCopy} rejects its path.
 *
 * <p>The key is the path relative to {@code sourcePathRoot}, and the value is the status itself.
 * After each append the writer is synced. {@code totalPaths} is always incremented, while
 * {@code totalBytesToCopy} grows only for non-directories.
 *
 * @param fileListWriter writer receiving the entry
 * @param fileStatus status of the path to record
 * @param sourcePathRoot root used to compute the relative key
 * @param options copy options consulted by {@code shouldCopy}
 * @throws IOException if the append or sync fails
 */
private void writeToFileListing(
    SequenceFile.Writer fileListWriter,
    CopyListingFileStatus fileStatus,
    Path sourcePathRoot,
    S3MapReduceCpOptions options)
    throws IOException {
  Path fullPath = fileStatus.getPath();
  LOG.debug("REL PATH: {}, FULL PATH: {}", PathUtil.getRelativePath(sourcePathRoot, fullPath),
      fullPath);
  if (shouldCopy(fullPath, options)) {
    fileListWriter.append(new Text(PathUtil.getRelativePath(sourcePathRoot, fullPath)), fileStatus);
    fileListWriter.sync();
    if (!fileStatus.isDirectory()) {
      totalBytesToCopy += fileStatus.getLen();
    }
    totalPaths++;
  }
}
SimpleCopyListing.java 文件源码
项目:hadoop
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Appends one entry (relative path to status) to the copy listing, unless {@code shouldCopy}
 * filters the path out, and updates the running path/byte totals.
 *
 * @param fileListWriter writer receiving the entry; synced after every append
 * @param fileStatus status of the path to record
 * @param sourcePathRoot root used to compute the relative key
 * @param options copy options consulted by {@code shouldCopy}
 * @throws IOException if the append or sync fails
 */
private void writeToFileListing(SequenceFile.Writer fileListWriter,
                                CopyListingFileStatus fileStatus,
                                Path sourcePathRoot,
                                DistCpOptions options) throws IOException {
  Path fullPath = fileStatus.getPath();
  if (LOG.isDebugEnabled()) {
    LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot,
        fullPath) + ", FULL PATH: " + fullPath);
  }
  if (shouldCopy(fullPath, options)) {
    Text relativeKey = new Text(DistCpUtils.getRelativePath(sourcePathRoot, fullPath));
    fileListWriter.append(relativeKey, fileStatus);
    fileListWriter.sync();
    // Directories count as paths but contribute no bytes.
    if (!fileStatus.isDirectory()) {
      totalBytesToCopy += fileStatus.getLen();
    }
    totalPaths++;
  }
}
SimpleCopyListingTest.java 文件源码
项目:circus-train
阅读 17
收藏 0
点赞 0
评论 0
/**
 * Builds a listing over a tree containing a {@code _SUCCESS} flag file and checks that only the
 * two regular files are listed, in order.
 */
@Test(timeout = 10000)
public void skipFlagFiles() throws Exception {
  FileSystem fs = cluster.getFileSystem();
  Path source = new Path("/tmp/in4");
  URI target = URI.create("s3://bucket/tmp/out4/");
  for (String relative : new String[] { "1/_SUCCESS", "1/file", "2" }) {
    createFile(fs, new Path(source, relative));
  }

  Path listingFile = new Path("/tmp/list4");
  listing.buildListing(listingFile, options(source, target));
  assertThat(listing.getNumberOfPaths(), is(2L));

  try (SequenceFile.Reader reader = new SequenceFile.Reader(CONFIG, SequenceFile.Reader.file(listingFile))) {
    Text relativePath = new Text();
    CopyListingFileStatus fileStatus = new CopyListingFileStatus();
    for (String expected : new String[] { "/1/file", "/2" }) {
      assertThat(reader.next(relativePath, fileStatus), is(true));
      assertThat(relativePath.toString(), is(expected));
    }
    assertThat(reader.next(relativePath, fileStatus), is(false));
  }
}
SimpleCopyListingTest.java 文件源码
项目:circus-train
阅读 18
收藏 0
点赞 0
评论 0
/**
 * Checks that an exception thrown by the listing writer's {@code close()} propagates out of
 * {@code doBuildListing} as the very same instance.
 */
@Test
public void failOnCloseError() throws IOException {
  File inFile = File.createTempFile("TestCopyListingIn", null);
  inFile.deleteOnExit();
  File outFile = File.createTempFile("TestCopyListingOut", null);
  outFile.deleteOnExit();
  Path source = new Path(inFile.toURI());

  // Writer whose close() always fails.
  Exception expectedEx = new IOException("boom");
  SequenceFile.Writer failingWriter = mock(SequenceFile.Writer.class);
  doThrow(expectedEx).when(failingWriter).close();

  SimpleCopyListing closeFailingListing = new SimpleCopyListing(CONFIG, CREDENTIALS);
  Exception actualEx;
  try {
    closeFailingListing.doBuildListing(failingWriter, options(source, outFile.toURI()));
    actualEx = null;
  } catch (Exception e) {
    actualEx = e;
  }
  Assert.assertNotNull("close writer didn't fail", actualEx);
  Assert.assertEquals(expectedEx, actualEx);
}
HDFSSequenceFile.java 文件源码
项目:flume-release-1.7.0
阅读 20
收藏 0
点赞 0
评论 0
/**
 * Opens the output stream for {@code dstPath} and wraps it in a SequenceFile writer.
 *
 * <p>If {@code useRawLocalFileSystem} is set and {@code hdfs} is a {@link LocalFileSystem}, its
 * raw file system ({@code getRaw()}) is used instead. Otherwise a warning is logged and
 * {@code hdfs} is kept. When {@code hdfs.append.support} is true and {@code dstPath} already
 * exists as a file, the stream appends to it. Otherwise the file is created.
 *
 * @param dstPath file to write
 * @param codeC compression codec for the writer
 * @param compType SequenceFile compression type
 * @param conf configuration for the writer and the append flag
 * @param hdfs file system to write to
 * @throws IOException if the stream or writer cannot be opened
 */
protected void open(Path dstPath, CompressionCodec codeC,
    CompressionType compType, Configuration conf, FileSystem hdfs)
    throws IOException {
  FileSystem targetFs = hdfs;
  if (useRawLocalFileSystem) {
    if (targetFs instanceof LocalFileSystem) {
      targetFs = ((LocalFileSystem) targetFs).getRaw();
    } else {
      logger.warn("useRawLocalFileSystem is set to true but file system " +
          "is not of type LocalFileSystem: " + hdfs.getClass().getName());
    }
  }
  boolean appendSupported = conf.getBoolean("hdfs.append.support", false);
  // isFile is only consulted when append support is enabled.
  outStream = (appendSupported && targetFs.isFile(dstPath))
      ? targetFs.append(dstPath)
      : targetFs.create(dstPath);
  writer = SequenceFile.createWriter(conf, outStream,
      serializer.getKeyClass(), serializer.getValueClass(), compType, codeC);
  registerCurrentStream(outStream, targetFs, dstPath);
}
TestBucketWriter.java 文件源码
项目:flume-release-1.7.0
阅读 27
收藏 0
点赞 0
评论 0
/**
 * With a BucketWriter rolling every 100 events, writing 1000 three-byte events should yield
 * 1000 events, 3000 bytes and 10 opened files.
 */
@Test
public void testEventCountingRoller() throws IOException, InterruptedException {
  int maxEvents = 100;
  MockHDFSWriter hdfsWriter = new MockHDFSWriter();
  SinkCounter sinkCounter =
      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis());
  BucketWriter bucketWriter = new BucketWriter(
      0, 0, maxEvents, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
      SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
      sinkCounter, 0, null, null, 30000,
      Executors.newSingleThreadExecutor(), 0, 0);

  Event event = EventBuilder.withBody("foo", Charsets.UTF_8);
  for (int written = 0; written < 1000; written++) {
    bucketWriter.append(event);
  }

  logger.info("Number of events written: {}", hdfsWriter.getEventsWritten());
  logger.info("Number of bytes written: {}", hdfsWriter.getBytesWritten());
  logger.info("Number of files opened: {}", hdfsWriter.getFilesOpened());

  Assert.assertEquals("events written", 1000, hdfsWriter.getEventsWritten());
  Assert.assertEquals("bytes written", 3000, hdfsWriter.getBytesWritten());
  Assert.assertEquals("files opened", 10, hdfsWriter.getFilesOpened());
}
TestBucketWriter.java 文件源码
项目:flume-release-1.7.0
阅读 21
收藏 0
点赞 0
评论 0
/**
 * With a BucketWriter rolling every 300 bytes, writing 1000 three-byte events should yield
 * 1000 events, 3000 bytes and 10 opened files.
 */
@Test
public void testSizeRoller() throws IOException, InterruptedException {
  int maxBytes = 300;
  MockHDFSWriter hdfsWriter = new MockHDFSWriter();
  SinkCounter sinkCounter =
      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis());
  BucketWriter bucketWriter = new BucketWriter(
      0, maxBytes, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
      SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
      sinkCounter, 0, null, null, 30000,
      Executors.newSingleThreadExecutor(), 0, 0);

  Event event = EventBuilder.withBody("foo", Charsets.UTF_8);
  for (int written = 0; written < 1000; written++) {
    bucketWriter.append(event);
  }

  logger.info("Number of events written: {}", hdfsWriter.getEventsWritten());
  logger.info("Number of bytes written: {}", hdfsWriter.getBytesWritten());
  logger.info("Number of files opened: {}", hdfsWriter.getFilesOpened());

  Assert.assertEquals("events written", 1000, hdfsWriter.getEventsWritten());
  Assert.assertEquals("bytes written", 3000, hdfsWriter.getBytesWritten());
  Assert.assertEquals("files opened", 10, hdfsWriter.getFilesOpened());
}
TestGlobbedCopyListing.java 文件源码
项目:hadoop
阅读 21
收藏 0
点赞 0
评论 0
/**
 * Reads the copy listing at {@code listingPath} and asserts that it matches
 * {@code expectedValues} (full source path to relative path).
 *
 * <p>A directory entry with an empty relative path (the root entry) is skipped.
 *
 * @param listingPath SequenceFile written by the copy listing
 * @throws Exception if the listing cannot be read
 */
private void verifyContents(Path listingPath) throws Exception {
  Map<String, String> actualValues = new HashMap<String, String>();
  // try-with-resources so the reader is closed even when reading fails.
  try (SequenceFile.Reader reader = new SequenceFile.Reader(
      cluster.getFileSystem(), listingPath, new Configuration())) {
    Text key = new Text();
    CopyListingFileStatus value = new CopyListingFileStatus();
    while (reader.next(key, value)) {
      if (value.isDirectory() && key.toString().isEmpty()) {
        // ignore root with empty relPath, which is an entry to be
        // used for preserving root attributes etc.
        continue;
      }
      actualValues.put(value.getPath().toString(), key.toString());
    }
  }
  Assert.assertEquals(expectedValues.size(), actualValues.size());
  for (Map.Entry<String, String> entry : actualValues.entrySet()) {
    // JUnit's contract is assertEquals(expected, actual).
    Assert.assertEquals(expectedValues.get(entry.getKey()), entry.getValue());
  }
}
TestBucketWriter.java 文件源码
项目:flume-release-1.7.0
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Checks that the file path opened by the HDFS writer contains the configured in-use prefix.
 */
@Test
public void testInUsePrefix() throws IOException, InterruptedException {
  final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
  final String PREFIX = "BRNO_IS_CITY_IN_CZECH_REPUBLIC";
  MockHDFSWriter hdfsWriter = new MockHDFSWriter();
  BucketWriter bucketWriter = new BucketWriter(
      ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", PREFIX, ".tmp", null, null,
      SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
      Executors.newSingleThreadExecutor(), 0, 0);
  Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
  bucketWriter.append(e);
  Assert.assertTrue("Incorrect in use prefix", hdfsWriter.getOpenedFilePath().contains(PREFIX));
}
TestBucketWriter.java 文件源码
项目:flume-release-1.7.0
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Checks that the file path opened by the HDFS writer contains the configured in-use suffix.
 */
@Test
public void testInUseSuffix() throws IOException, InterruptedException {
  final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
  final String SUFFIX = "WELCOME_TO_THE_HELLMOUNTH";
  MockHDFSWriter hdfsWriter = new MockHDFSWriter();
  BucketWriter bucketWriter = new BucketWriter(
      ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", SUFFIX, null, null,
      SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
      Executors.newSingleThreadExecutor(), 0, 0);
  Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
  bucketWriter.append(e);
  Assert.assertTrue("Incorrect in use suffix", hdfsWriter.getOpenedFilePath().contains(SUFFIX));
}
TestBucketWriter.java 文件源码
项目:flume-release-1.7.0
阅读 25
收藏 0
点赞 0
评论 0
/**
 * Checks that the WriterCallback handed to the BucketWriter is invoked when the writer is closed.
 */
@Test
public void testCallbackOnClose() throws IOException, InterruptedException {
  final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
  final String SUFFIX = "WELCOME_TO_THE_EREBOR";
  final AtomicBoolean callbackCalled = new AtomicBoolean(false);
  HDFSEventSink.WriterCallback onClose = new HDFSEventSink.WriterCallback() {
    @Override
    public void run(String filePath) {
      callbackCalled.set(true);
    }
  };
  MockHDFSWriter hdfsWriter = new MockHDFSWriter();
  BucketWriter bucketWriter = new BucketWriter(
      ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", SUFFIX, null, null,
      SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
      onClose, "blah", 30000, Executors.newSingleThreadExecutor(), 0, 0);

  bucketWriter.append(EventBuilder.withBody("foo", Charsets.UTF_8));
  bucketWriter.close(true);

  Assert.assertTrue(callbackCalled.get());
}
TestHDFSCompressedDataStream.java 文件源码
项目:flume-release-1.7.0
阅读 20
收藏 0
点赞 0
评论 0
/**
 * Writes one body through a gzip-configured HDFSCompressedDataStream and checks that it can be
 * read back from the file with a plain {@link GZIPInputStream}.
 */
@Test
public void testGzipDurability() throws Exception {
  Context context = new Context();
  HDFSCompressedDataStream writer = new HDFSCompressedDataStream();
  writer.configure(context);
  writer.open(fileURI, factory.getCodec(new Path(fileURI)),
      SequenceFile.CompressionType.BLOCK);
  String[] bodies = { "yarf!" };
  writeBodies(writer, bodies);

  byte[] buf = new byte[256];
  int len;
  // Close both streams; the original leaked the file handle.
  try (FileInputStream fileIn = new FileInputStream(file);
       GZIPInputStream cmpIn = new GZIPInputStream(fileIn)) {
    len = cmpIn.read(buf);
  }
  // Fail clearly rather than with StringIndexOutOfBoundsException on len == -1.
  Assert.assertTrue("no decompressed data read", len > 0);
  String result = new String(buf, 0, len, Charsets.UTF_8);
  result = result.trim(); // BodyTextEventSerializer adds a newline
  Assert.assertEquals("input and output must match", bodies[0], result);
}
DistCpV1.java 文件源码
项目:hadoop
阅读 31
收藏 0
点赞 0
评论 0
/**
 * Post-copy pass over the destination directory list named by {@code DST_DIR_LIST_LABEL}.
 * Each recorded pair's destination status is passed to {@code updateDestStatus}, together with
 * the parsed preserve set.
 *
 * <p>Does nothing when {@code presevedAttributes} is null, or when it preserves none of USER,
 * GROUP or PERMISSION. Note: this static method only shares its name with
 * {@code Object.finalize()}; it does not override it.
 *
 * @param conf configuration used to resolve the destination file system
 * @param jobconf job configuration holding the directory-list path
 * @param destPath destination root; pair outputs are resolved against it
 * @param presevedAttributes encoded attribute set, parsed by {@code FileAttribute.parse}
 * @throws IOException if the list cannot be read or a destination cannot be stat-ed
 */
static private void finalize(Configuration conf, JobConf jobconf,
    final Path destPath, String presevedAttributes) throws IOException {
  if (presevedAttributes == null) {
    return;
  }
  EnumSet<FileAttribute> preserved = FileAttribute.parse(presevedAttributes);
  boolean touchesOwnershipOrMode = preserved.contains(FileAttribute.USER)
      || preserved.contains(FileAttribute.GROUP)
      || preserved.contains(FileAttribute.PERMISSION);
  if (!touchesOwnershipOrMode) {
    return;
  }
  FileSystem dstFs = destPath.getFileSystem(conf);
  Path dirListPath = new Path(jobconf.get(DST_DIR_LIST_LABEL));
  try (SequenceFile.Reader in =
           new SequenceFile.Reader(jobconf, Reader.file(dirListPath))) {
    Text dstText = new Text();
    FilePair pair = new FilePair();
    while (in.next(dstText, pair)) {
      Path absoluteDst = new Path(destPath, pair.output);
      updateDestStatus(pair.input, dstFs.getFileStatus(absoluteDst),
          preserved, dstFs);
    }
  }
}
SequenceFileIterator.java 文件源码
项目:LDA
阅读 21
收藏 0
点赞 0
评论 0
/**
 * Opens a SequenceFile reader on {@code path}, after qualifying it against its file system, and
 * records the file's key and value classes.
 *
 * @param path sequence file to iterate
 * @param reuseKeyValueInstances stored for use during iteration
 * @param conf configuration used to resolve the file system and open the reader
 * @throws IOException if the file system cannot be resolved or the file cannot be opened
 */
public SequenceFileIterator(Path path, boolean reuseKeyValueInstances, Configuration conf) throws IOException {
  key = null;
  value = null;
  this.conf = conf;
  this.reuseKeyValueInstances = reuseKeyValueInstances;
  FileSystem fs = path.getFileSystem(conf);
  Path qualifiedPath = path.makeQualified(fs);
  reader = new SequenceFile.Reader(fs, qualifiedPath, conf);
  keyClass = (Class<K>) reader.getKeyClass();
  valueClass = (Class<V>) reader.getValueClass();
  // A NullWritable value class means records carry keys only.
  noValue = NullWritable.class.equals(valueClass);
}
GenerateDistCacheData.java 文件源码
项目:hadoop
阅读 57
收藏 0
点赞 0
评论 0
/**
 * Plans input splits over the distributed-cache file list, a SequenceFile of
 * (LongWritable, BytesWritable) records.
 *
 * <p>Records are grouped into a split until adding the next record's key would push the running
 * total past the per-map target size. The split is then cut at the reader position reached so
 * far. Whatever bytes remain after the last cut form the final split. Presumably each key is the
 * size of one cache file (per the inline comment); confirm against the list's writer.
 *
 * @param jobCtxt job context whose configuration carries the file count, the total byte count
 *     and the file-list path
 * @return the planned splits, each a {@link FileSplit} over the list file
 * @throws RuntimeException if any of the required metadata is missing or negative
 * @throws IOException if the cluster status or the list file cannot be read
 */
@Override
public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
  final JobConf jobConf = new JobConf(jobCtxt.getConfiguration());
  final JobClient client = new JobClient(jobConf);
  ClusterStatus stat = client.getClusterStatus(true);
  int numTrackers = stat.getTaskTrackers();
  final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1);

  // Total size of distributed cache files to be generated
  final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1);
  // Get the path of the special file
  String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST);
  if (fileCount < 0 || totalSize < 0 || distCacheFileList == null) {
    throw new RuntimeException("Invalid metadata: #files (" + fileCount
        + "), total_size (" + totalSize + "), filelisturi ("
        + distCacheFileList + ")");
  }

  Path sequenceFile = new Path(distCacheFileList);
  FileSystem fs = sequenceFile.getFileSystem(jobConf);
  FileStatus srcst = fs.getFileStatus(sequenceFile);
  // Consider the number of TTs * mapSlotsPerTracker as number of mappers.
  int numMapSlotsPerTracker = jobConf.getInt(TTConfig.TT_MAP_SLOTS, 2);
  int numSplits = numTrackers * numMapSlotsPerTracker;
  // With no live trackers (or a non-positive slot setting) the product is <= 0, which would make
  // the division below throw ArithmeticException. Plan for at least one split instead.
  if (numSplits <= 0) {
    numSplits = 1;
  }

  List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
  LongWritable key = new LongWritable();
  BytesWritable value = new BytesWritable();

  // Average size of data to be generated by each map task
  final long targetSize = Math.max(totalSize / numSplits,
      DistributedCacheEmulator.AVG_BYTES_PER_MAP);
  long splitStartPosition = 0L;
  long splitEndPosition = 0L;
  long acc = 0L;
  long bytesRemaining = srcst.getLen();
  SequenceFile.Reader reader = null;
  try {
    reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
    while (reader.next(key, value)) {

      // If adding this file would put this split past the target size,
      // cut the last split and put this file in the next split.
      if (acc + key.get() > targetSize && acc != 0) {
        long splitSize = splitEndPosition - splitStartPosition;
        splits.add(new FileSplit(
            sequenceFile, splitStartPosition, splitSize, (String[])null));
        bytesRemaining -= splitSize;
        splitStartPosition = splitEndPosition;
        acc = 0L;
      }
      acc += key.get();
      splitEndPosition = reader.getPosition();
    }
  } finally {
    if (reader != null) {
      reader.close();
    }
  }
  // Everything after the last cut (including any trailing bytes) goes into one final split.
  if (bytesRemaining != 0) {
    splits.add(new FileSplit(
        sequenceFile, splitStartPosition, bytesRemaining, (String[])null));
  }

  return splits;
}
JHLogAnalyzer.java 文件源码
项目:hadoop
阅读 18
收藏 0
点赞 0
评论 0
/**
 * Writes one control file per history-log index in {@code [start, end)}. Each control file is an
 * uncompressed SequenceFile holding a single record: the log file's path mapped to 0.
 *
 * <p>The first failure stops the loop and is logged. The finished-thread counter is incremented
 * either way. NOTE(review): {@code numFinishedThreads++} is not atomic if several daemons share
 * it; confirm the field's type and how it is read.
 */
public void run() {
  try {
    for (int fileIndex = start; fileIndex < end; fileIndex++) {
      String name = getFileName(fileIndex);
      Path controlFile = new Path(INPUT_DIR, "in_file_" + name);
      SequenceFile.Writer writer = null;
      try {
        writer = SequenceFile.createWriter(fs, fs.getConf(), controlFile,
            Text.class, LongWritable.class, CompressionType.NONE);
        Text logFilePath = new Text(jhLogFiles[fileIndex].getPath().toString());
        writer.append(logFilePath, new LongWritable(0));
      } catch (Exception e) {
        throw new IOException(e);
      } finally {
        if (writer != null) {
          writer.close();
        }
      }
    }
  } catch (IOException ex) {
    LOG.error("FileCreateDaemon failed.", ex);
  }
  numFinishedThreads++;
}
TestDatamerge.java 文件源码
项目:hadoop
阅读 24
收藏 0
点赞 0
评论 0
/**
 * Fills {@code src} with {@code srcs} paths under {@code testdir}, then opens one
 * (IntWritable, IntWritable) SequenceFile writer per path.
 *
 * <p>Path names are {@code Integer.toString(i + 10, 36)}, i.e. "a", "b", ... for the first 26.
 *
 * @param testdir parent directory of the generated files
 * @param conf configuration used to open the writers
 * @param srcs number of files to create
 * @param src output array; its first {@code srcs} slots are overwritten
 * @return open writers, index-aligned with {@code src}; the caller must close them
 * @throws IOException if a writer cannot be created
 */
private static SequenceFile.Writer[] createWriters(Path testdir,
    Configuration conf, int srcs, Path[] src) throws IOException {
  for (int idx = 0; idx < srcs; idx++) {
    src[idx] = new Path(testdir, Integer.toString(idx + 10, 36));
  }
  SequenceFile.Writer[] writers = new SequenceFile.Writer[srcs];
  for (int idx = 0; idx < writers.length; idx++) {
    writers[idx] = new SequenceFile.Writer(testdir.getFileSystem(conf), conf,
        src[idx], IntWritable.class, IntWritable.class);
  }
  return writers;
}
GenericMRLoadGenerator.java 文件源码
项目:hadoop
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Builds one {@code IndirectSplit} per record of the SequenceFile named by
 * {@code INDIRECT_INPUT_FILE}.
 *
 * <p>Each record's Text value is used as the split's path, and its LongWritable key as the
 * second constructor argument. Presumably that argument is the split length; verify against
 * {@code IndirectSplit}.
 *
 * @param job job whose configuration names the indirect input file
 * @return the splits in file order
 * @throws IOException if the indirect file cannot be opened or read
 */
public List<InputSplit> getSplits(JobContext job)
    throws IOException {

  Configuration conf = job.getConfiguration();
  Path src = new Path(conf.get(INDIRECT_INPUT_FILE, null));
  FileSystem fs = src.getFileSystem(conf);

  List<InputSplit> splits = new ArrayList<InputSplit>();
  LongWritable key = new LongWritable();
  Text value = new Text();
  // The original never closed this reader; try-with-resources releases it on every path.
  try (SequenceFile.Reader sl = new SequenceFile.Reader(fs, src, conf)) {
    while (sl.next(key, value)) {
      splits.add(new IndirectSplit(new Path(value.toString()), key.get()));
    }
  }

  return splits;
}
TestTotalOrderPartitioner.java 文件源码
项目:hadoop
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Writes {@code splits} as the keys of an uncompressed SequenceFile (NullWritable values) under
 * the local test data directory, and registers it as the TotalOrderPartitioner partition file.
 *
 * <p>Also sets the reduce count to {@code splits.length + 1}. {@code splits} must be non-empty,
 * because its first element supplies the key class.
 *
 * @param testname sub-directory name for this test's partition file
 * @param conf configuration to update
 * @param splits cut points, written in array order
 * @return the qualified path of the written partition file
 * @throws IOException if the file cannot be written
 */
private static <T extends WritableComparable<?>> Path writePartitionFile(
    String testname, Configuration conf, T[] splits) throws IOException {
  final FileSystem fs = FileSystem.getLocal(conf);
  final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
                               ).makeQualified(fs);
  Path partitionFile = new Path(testdir, testname + "/_partition.lst");
  TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
  conf.setInt(MRJobConfig.NUM_REDUCES, splits.length + 1);
  final SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionFile,
      splits[0].getClass(), NullWritable.class,
      SequenceFile.CompressionType.NONE);
  try {
    for (T split : splits) {
      writer.append(split, NullWritable.get());
    }
  } finally {
    writer.close();
  }
  return partitionFile;
}
TestJoinDatamerge.java 文件源码
项目:hadoop
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Counts how often {@code key} occurs in each (IntWritable, IntWritable) SequenceFile in
 * {@code src} and returns the product of the non-zero counts.
 *
 * <p>Files in which {@code key} does not occur are ignored, so the result is 1 if it occurs
 * nowhere.
 *
 * @param key key to count
 * @param src files to scan
 * @param conf configuration used to open the readers
 * @return product of the per-file non-zero counts
 * @throws IOException if a file cannot be read
 */
private static int countProduct(IntWritable key, Path[] src,
    Configuration conf) throws IOException {
  int product = 1;
  for (Path p : src) {
    int count = 0;
    // try-with-resources so the reader is closed even if next() throws.
    try (SequenceFile.Reader r = new SequenceFile.Reader(
        cluster.getFileSystem(), p, conf)) {
      IntWritable k = new IntWritable();
      IntWritable v = new IntWritable();
      while (r.next(k, v)) {
        if (k.equals(key)) {
          count++;
        }
      }
    }
    if (count != 0) {
      product *= count;
    }
  }
  return product;
}
TestCombineSequenceFileInputFormat.java 文件源码
项目:hadoop
阅读 20
收藏 0
点赞 0
评论 0
/**
 * Creates {@code numFiles} SequenceFiles named {@code test_<i>.seq} under {@code workDir}.
 *
 * <p>File {@code i} holds IntWritable keys over its range from {@code createRanges}. Each key is
 * paired with a BytesWritable of 0-9 random bytes.
 *
 * @param length total key range passed to {@code createRanges}
 * @param numFiles number of files to create
 * @param random source of value lengths and contents
 * @param job job whose configuration opens the writers
 * @throws IOException if a file cannot be written
 */
private static void createFiles(int length, int numFiles, Random random,
    Job job) throws IOException {
  Range[] ranges = createRanges(length, numFiles, random);

  for (int fileNo = 0; fileNo < numFiles; fileNo++) {
    Path file = new Path(workDir, "test_" + fileNo + ".seq");
    @SuppressWarnings("deprecation")
    SequenceFile.Writer writer =
        SequenceFile.createWriter(localFs, job.getConfiguration(), file,
                                  IntWritable.class, BytesWritable.class);
    Range range = ranges[fileNo];
    try {
      for (int k = range.start; k < range.end; k++) {
        byte[] payload = new byte[random.nextInt(10)];
        random.nextBytes(payload);
        writer.append(new IntWritable(k), new BytesWritable(payload));
      }
    } finally {
      writer.close();
    }
  }
}
NNBench.java 文件源码
项目:hadoop
阅读 26
收藏 0
点赞 0
评论 0
/**
 * Creates one control file per map ({@code numberOfMaps} in total) under
 * {@code baseDir/CONTROL_DIR_NAME}.
 *
 * <p>Each file is an uncompressed SequenceFile with a single record: its own file name (Text)
 * mapped to 0 (LongWritable).
 *
 * @throws IOException if a control file cannot be written
 */
private static void createControlFiles() throws IOException {
  FileSystem tempFS = FileSystem.get(config);
  LOG.info("Creating " + numberOfMaps + " control files");

  for (int mapIndex = 0; mapIndex < numberOfMaps; mapIndex++) {
    String strFileName = "NNBench_Controlfile_" + mapIndex;
    Path filePath = new Path(new Path(baseDir, CONTROL_DIR_NAME),
        strFileName);
    SequenceFile.Writer writer = SequenceFile.createWriter(tempFS, config, filePath,
        Text.class, LongWritable.class, CompressionType.NONE);
    try {
      writer.append(new Text(strFileName), new LongWritable(0L));
    } finally {
      writer.close();
    }
  }
}
UniformSizeInputFormat.java 文件源码
项目:hadoop
阅读 21
收藏 0
点赞 0
评论 0
/**
 * Opens a SequenceFile reader on the copy-listing file configured in {@code configuration}.
 *
 * @param configuration job configuration naming the listing file
 * @return an open reader; the caller must close it
 * @throws IllegalArgumentException if the file does not exist, or if any IOException occurs
 *     while resolving or opening it (the IOException is logged and kept as the cause)
 */
private SequenceFile.Reader getListingFileReader(Configuration configuration) {
  final Path listingFilePath = getListingFilePath(configuration);
  try {
    final FileSystem fileSystem = listingFilePath.getFileSystem(configuration);
    if (fileSystem.exists(listingFilePath)) {
      return new SequenceFile.Reader(configuration,
          SequenceFile.Reader.file(listingFilePath));
    }
  } catch (IOException exception) {
    LOG.error("Couldn't find listing file at: " + listingFilePath, exception);
    throw new IllegalArgumentException("Couldn't find listing-file at: "
        + listingFilePath, exception);
  }
  throw new IllegalArgumentException("Listing file doesn't exist at: "
      + listingFilePath);
}
TotalOrderPartitioner.java 文件源码
项目:hadoop
阅读 17
收藏 0
点赞 0
评论 0
/**
 * Reads the partition cut points stored as keys (with NullWritable values) of the SequenceFile
 * at {@code p}.
 *
 * @param fs the file system holding {@code p}
 * @param p the partition file to read
 * @param keyClass the map output key class; one fresh instance is created per record
 * @param conf configuration used to open the file and instantiate keys
 * @return the keys, in file order
 * @throws IOException if the file cannot be opened or read
 */
@SuppressWarnings("unchecked") // K[] cast of Array.newInstance; element type is keyClass
private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
    Configuration conf) throws IOException {
  SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
  ArrayList<K> cutPoints = new ArrayList<K>();
  K current = ReflectionUtils.newInstance(keyClass, conf);
  NullWritable ignoredValue = NullWritable.get();
  try {
    while (reader.next(current, ignoredValue)) {
      cutPoints.add(current);
      // next() fills the passed object in place, so each stored key needs its own instance.
      current = ReflectionUtils.newInstance(keyClass, conf);
    }
    reader.close();
    reader = null;
  } finally {
    // No-op after a successful close (reader is null); closes quietly on failure.
    IOUtils.cleanup(LOG, reader);
  }
  return cutPoints.toArray((K[])Array.newInstance(keyClass, cutPoints.size()));
}
TestClientDistributedCacheManager.java 文件源码
项目:hadoop
阅读 24
收藏 0
点赞 0
评论 0
/**
 * Writes a small uncompressed (Text, Text) SequenceFile with one record to {@code p}.
 *
 * @param p file to create
 * @param conf configuration used to create the writer
 * @throws IOException if creating or writing the file fails; the original exception is kept as
 *     the cause
 */
@SuppressWarnings("deprecation")
void createTempFile(Path p, Configuration conf) throws IOException {
  SequenceFile.Writer writer = null;
  try {
    writer = SequenceFile.createWriter(fs, conf, p,
                                       Text.class, Text.class,
                                       CompressionType.NONE);
    writer.append(new Text("text"), new Text("moretext"));
  } catch(Exception e) {
    // Preserve the cause; the original passed only the message and lost the stack trace.
    throw new IOException(e.getLocalizedMessage(), e);
  } finally {
    if (writer != null) {
      writer.close();
    }
  }
  LOG.info("created: " + p);
}
SequenceFileAnalyzer.java 文件源码
项目:wherehowsX
阅读 21
收藏 0
点赞 0
评论 0
/**
 * Builds a DatasetJsonRecord describing the key/value schema of the SequenceFile at
 * {@code path}, using its key and value class names plus the file's status metadata.
 *
 * @param path file to inspect
 * @return the record, or {@code null} if the file does not exist or cannot be read as a
 *     SequenceFile (both cases are logged)
 * @throws IOException if the existence check fails
 */
@Override
public DatasetJsonRecord getSchema(Path path) throws IOException {
  DatasetJsonRecord record = null;
  if (!fs.exists(path)) {
    LOG.error("sequencefileanalyzer file : " + path.toUri().getPath() + " is not exist on hdfs");
  } else {
    LOG.info("sequencefileanalyzer start parse schema for file path : {}", path.toUri().getPath());
    // try-with-resources: the original never closed this reader.
    try (SequenceFile.Reader reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(path))) {
      String keyName = "Key";
      String keyType = getWritableType(reader.getKeyClassName());
      String valueName = "Value";
      String valueType = getWritableType(reader.getValueClassName());
      FileStatus status = fs.getFileStatus(path);
      String storage = STORAGE_TYPE;
      String abstractPath = path.toUri().getPath();
      String codec = "sequence.codec";
      String schemaString = "{\"fields\": [{\"name\": \"" + keyName + "\", \"type\": \"" + keyType + "\"}, {\"name\": \"" + valueName + "\", \"type\": \"" + valueType + "\"}], \"name\": \"Result\", \"namespace\": \"com.tencent.lake\", \"type\": \"record\"}";
      record = new DatasetJsonRecord(schemaString, abstractPath, status.getModificationTime(), status.getOwner(), status.getGroup(),
              status.getPermission().toString(), codec, storage, "");
      LOG.info("sequencefileanalyzer parse path :{},schema is {}", path.toUri().getPath(), record.toCsvString());
    } catch (Exception e) {
      // Attach the exception itself; the original logged StackTraceElement[].toString(), an
      // array identity with no useful content.
      LOG.error("path : {} content " + " is not Sequence File format content ", path.toUri().getPath(), e);
    }
  }
  return record;
}
SequenceFileAnalyzer.java 文件源码
项目:wherehowsX
阅读 18
收藏 0
点赞 0
评论 0
/**
 * Reads up to 12 records from the SequenceFile at {@code path} and returns them as JSON-like
 * {@code {"Key": ..., "Value": ...}} strings built from each key's and value's
 * {@code toString()}.
 *
 * @param path file to sample
 * @return the sample record, or {@code null} if the file does not exist or cannot be read as a
 *     SequenceFile (both cases are logged)
 * @throws IOException if the existence check fails
 */
@Override
public SampleDataRecord getSampleData(Path path) throws IOException {
  final int maxSamples = 12;
  SampleDataRecord dataRecord = null;
  if (!fs.exists(path)) {
    LOG.error("sequence file : " + path.toUri().getPath() + " is not exist on hdfs");
  } else {
    LOG.info("sequencefileanalyzer start parse sampledata for file path : {}", path.toUri().getPath());
    // try-with-resources: the original never closed this reader.
    try (SequenceFile.Reader reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(path))) {
      List<Object> sampleValues = new ArrayList<Object>();
      Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf());
      Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf());
      int count = 0;
      String keyName = "Key";
      String valueName = "Value";
      // Test the limit first, so no record past the sample is read from the file.
      while (count < maxSamples && reader.next(key, value)) {
        sampleValues.add("{\"" + keyName + "\": \"" + key + "\", \"" + valueName + "\": \"" + value + "\"}");
        count++;
      }
      dataRecord = new SampleDataRecord(path.toUri().getPath(), sampleValues);
      LOG.info("sequence file path : {}, sample data is {}", path.toUri().getPath(), sampleValues);
    } catch (Exception e) {
      // Attach the exception itself instead of logging StackTraceElement[].toString().
      LOG.error("path : {} content " + " is not Sequence File format content ", path.toUri().getPath(), e);
    }
  }
  return dataRecord;
}