Example source code for the Java class org.apache.hadoop.io.MapFile

TestCodec.java source (project: hadoop-oss)

private void codecTestMapFile(Class<? extends CompressionCodec> clazz,
    CompressionType type, int records) throws Exception {
  FileSystem fs = FileSystem.get(conf);
  LOG.info("Creating MapFiles with " + records +
      " records using codec " + clazz.getSimpleName());
  Path path = new Path(new Path(
      System.getProperty("test.build.data", "/tmp")),
      clazz.getSimpleName() + "-" + type + "-" + records);
  LOG.info("Writing " + path);
  createMapFile(conf, fs, path, clazz.newInstance(), type, records);
  MapFile.Reader reader = new MapFile.Reader(path, conf);
  Text key1 = new Text("002");
  assertNotNull(reader.get(key1, new Text()));
  Text key2 = new Text("004");
  assertNotNull(reader.get(key2, new Text()));
}
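For orientation before the per-project snippets below, here is a minimal sketch of writing and then looking up a MapFile outside of a test harness; the path, key, and value choices are illustrative assumptions, not taken from any of the projects on this page.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class MapFileSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path dir = new Path("/tmp/mapfile-sketch"); // a MapFile is a directory, not a single file

    // Keys must be appended in sorted order; MapFile.Writer enforces this.
    try (MapFile.Writer writer = new MapFile.Writer(conf, dir,
        MapFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(Text.class))) {
      writer.append(new Text("002"), new Text("value-002"));
      writer.append(new Text("004"), new Text("value-004"));
    }

    // Random access by key, served through the in-memory index over the data file.
    try (MapFile.Reader reader = new MapFile.Reader(dir, conf)) {
      Text value = new Text();
      System.out.println(reader.get(new Text("004"), value)); // value-004, or null if absent
    }
  }
}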
SequenceFileInputFormat.java source (project: hadoop)
@Override
protected List<FileStatus> listStatus(JobContext job) throws IOException {
  List<FileStatus> files = super.listStatus(job);
  int len = files.size();
  for (int i = 0; i < len; ++i) {
    FileStatus file = files.get(i);
    if (file.isDirectory()) { // it's a MapFile
      Path p = file.getPath();
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      // use the data file
      files.set(i, fs.getFileStatus(new Path(p, MapFile.DATA_FILE_NAME)));
    }
  }
  return files;
}
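The branch above substitutes a MapFile directory with its data file because, on disk, a MapFile is a directory holding two SequenceFiles; a rough sketch of the layout (the part name is illustrative):

part-r-00000/        <- one MapFile (a directory)
    data             <- MapFile.DATA_FILE_NAME: all key/value records, keys in sorted order
    index            <- MapFile.INDEX_FILE_NAME: every Nth key plus its byte offset into data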
TestFileOutputCommitter.java source (project: hadoop)
private void validateMapFileOutputContent(
    FileSystem fs, Path dir) throws IOException {
  // map output is a directory with index and data files
  Path expectedMapDir = new Path(dir, partFile);
  assert(fs.getFileStatus(expectedMapDir).isDirectory());
  FileStatus[] files = fs.listStatus(expectedMapDir);
  int fileCount = 0;
  boolean dataFileFound = false;
  boolean indexFileFound = false;
  for (FileStatus f : files) {
    if (f.isFile()) {
      ++fileCount;
      if (f.getPath().getName().equals(MapFile.INDEX_FILE_NAME)) {
        indexFileFound = true;
      } else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) {
        dataFileFound = true;
      }
    }
  }
  assert(fileCount > 0);
  assert(dataFileFound && indexFileFound);
}
TestFileOutputCommitter.java source (project: hadoop)
private void validateMapFileOutputContent(
    FileSystem fs, Path dir) throws IOException {
  // map output is a directory with index and data files
  Path expectedMapDir = new Path(dir, partFile);
  assert(fs.getFileStatus(expectedMapDir).isDirectory());
  FileStatus[] files = fs.listStatus(expectedMapDir);
  int fileCount = 0;
  boolean dataFileFound = false;
  boolean indexFileFound = false;
  for (FileStatus f : files) {
    if (f.isFile()) {
      ++fileCount;
      if (f.getPath().getName().equals(MapFile.INDEX_FILE_NAME)) {
        indexFileFound = true;
      } else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) {
        dataFileFound = true;
      }
    }
  }
  assert(fileCount > 0);
  assert(dataFileFound && indexFileFound);
}
TestCodec.java source (project: hadoop)
private void codecTestMapFile(Class<? extends CompressionCodec> clazz,
    CompressionType type, int records) throws Exception {
  FileSystem fs = FileSystem.get(conf);
  LOG.info("Creating MapFiles with " + records +
      " records using codec " + clazz.getSimpleName());
  Path path = new Path(new Path(
      System.getProperty("test.build.data", "/tmp")),
      clazz.getSimpleName() + "-" + type + "-" + records);
  LOG.info("Writing " + path);
  createMapFile(conf, fs, path, clazz.newInstance(), type, records);
  MapFile.Reader reader = new MapFile.Reader(path, conf);
  Text key1 = new Text("002");
  assertNotNull(reader.get(key1, new Text()));
  Text key2 = new Text("004");
  assertNotNull(reader.get(key2, new Text()));
}
MapFileOutputFormat.java source (project: aliyun-oss-hadoop-fs)
/** Open the output generated by this format. */
public static MapFile.Reader[] getReaders(Path dir,
    Configuration conf) throws IOException {
  FileSystem fs = dir.getFileSystem(conf);
  PathFilter filter = new PathFilter() {
    @Override
    public boolean accept(Path path) {
      String name = path.getName();
      if (name.startsWith("_") || name.startsWith("."))
        return false;
      return true;
    }
  };
  Path[] names = FileUtil.stat2Paths(fs.listStatus(dir, filter));
  // sort names, so that hash partitioning works
  Arrays.sort(names);
  MapFile.Reader[] parts = new MapFile.Reader[names.length];
  for (int i = 0; i < names.length; i++) {
    parts[i] = new MapFile.Reader(fs, names[i].toString(), conf);
  }
  return parts;
}
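The sort above matters because callers typically index the returned array with the same arithmetic the job's HashPartitioner used, so a key is looked up in exactly one part. A minimal sketch of such a lookup, assuming Text keys and values and a job output directory of /out (both assumptions):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;

public class PartitionedLookupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    MapFile.Reader[] readers = MapFileOutputFormat.getReaders(new Path("/out"), conf);

    Text key = new Text("some-key");
    Text value = new Text();
    // Same arithmetic as HashPartitioner: this key can only live in this one part file.
    int part = (key.hashCode() & Integer.MAX_VALUE) % readers.length;
    Writable found = readers[part].get(key, value);
    System.out.println(found == null ? "not found" : value.toString());
  }
}

MapFileOutputFormat also ships a helper, getEntry(readers, partitioner, key, value), that wraps the same pattern.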
SequenceFileInputFormat.java source (project: aliyun-oss-hadoop-fs)
@Override
protected List<FileStatus> listStatus(JobContext job) throws IOException {
  List<FileStatus> files = super.listStatus(job);
  int len = files.size();
  for (int i = 0; i < len; ++i) {
    FileStatus file = files.get(i);
    if (file.isDirectory()) { // it's a MapFile
      Path p = file.getPath();
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      // use the data file
      files.set(i, fs.getFileStatus(new Path(p, MapFile.DATA_FILE_NAME)));
    }
  }
  return files;
}
TestFileOutputCommitter.java source (project: aliyun-oss-hadoop-fs)
private void validateMapFileOutputContent(
    FileSystem fs, Path dir) throws IOException {
  // map output is a directory with index and data files
  Path expectedMapDir = new Path(dir, partFile);
  assert(fs.getFileStatus(expectedMapDir).isDirectory());
  FileStatus[] files = fs.listStatus(expectedMapDir);
  int fileCount = 0;
  boolean dataFileFound = false;
  boolean indexFileFound = false;
  for (FileStatus f : files) {
    if (f.isFile()) {
      ++fileCount;
      if (f.getPath().getName().equals(MapFile.INDEX_FILE_NAME)) {
        indexFileFound = true;
      } else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) {
        dataFileFound = true;
      }
    }
  }
  assert(fileCount > 0);
  assert(dataFileFound && indexFileFound);
}
TestFileOutputCommitter.java source (project: aliyun-oss-hadoop-fs)
private void validateMapFileOutputContent(
    FileSystem fs, Path dir) throws IOException {
  // map output is a directory with index and data files
  Path expectedMapDir = new Path(dir, partFile);
  assert(fs.getFileStatus(expectedMapDir).isDirectory());
  FileStatus[] files = fs.listStatus(expectedMapDir);
  int fileCount = 0;
  boolean dataFileFound = false;
  boolean indexFileFound = false;
  for (FileStatus f : files) {
    if (f.isFile()) {
      ++fileCount;
      if (f.getPath().getName().equals(MapFile.INDEX_FILE_NAME)) {
        indexFileFound = true;
      } else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) {
        dataFileFound = true;
      }
    }
  }
  assert(fileCount > 0);
  assert(dataFileFound && indexFileFound);
}
TestCodec.java source (project: aliyun-oss-hadoop-fs)
private void codecTestMapFile(Class<? extends CompressionCodec> clazz,
    CompressionType type, int records) throws Exception {
  FileSystem fs = FileSystem.get(conf);
  LOG.info("Creating MapFiles with " + records +
      " records using codec " + clazz.getSimpleName());
  Path path = new Path(new Path(
      System.getProperty("test.build.data", "/tmp")),
      clazz.getSimpleName() + "-" + type + "-" + records);
  LOG.info("Writing " + path);
  createMapFile(conf, fs, path, clazz.newInstance(), type, records);
  MapFile.Reader reader = new MapFile.Reader(path, conf);
  Text key1 = new Text("002");
  assertNotNull(reader.get(key1, new Text()));
  Text key2 = new Text("004");
  assertNotNull(reader.get(key2, new Text()));
}
Utils.java source (project: fst-bench)
public static final IndexedMapFile getSharedMapFile(String symbol, JobConf job) throws IOException {
  int slots = job.getInt(symbol, 0);
  if (slots <= 0) {
    log.error("slots number should be no less than 1 !!!");
    System.exit(-1);
  }
  FileSystem fs = FileSystem.getLocal(job);
  MapFile.Reader[] readers = new MapFile.Reader[slots];
  for (int i = 0; i < slots; i++) {
    String symbfile = fs.getWorkingDirectory().toString() + "/" + symbol + "-" + Integer.toString(i);
    readers[i] = new MapFile.Reader(fs, symbfile, job);
  }
  return new IndexedMapFile(slots, readers);
}
SegmentReader.java source (project: GeoCrawler)
private List<Writable> getMapRecords(Path dir, Text key) throws Exception {
  MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir,
      getConf());
  ArrayList<Writable> res = new ArrayList<Writable>();
  Class<?> keyClass = readers[0].getKeyClass();
  Class<?> valueClass = readers[0].getValueClass();
  if (!keyClass.getName().equals("org.apache.hadoop.io.Text"))
    throw new IOException("Incompatible key (" + keyClass.getName() + ")");
  Writable value = (Writable) valueClass.newInstance();
  // we don't know the partitioning schema
  for (int i = 0; i < readers.length; i++) {
    if (readers[i].get(key, value) != null) {
      res.add(value);
      value = (Writable) valueClass.newInstance();
      Text aKey = (Text) keyClass.newInstance();
      while (readers[i].next(aKey, value) && aKey.equals(key)) {
        res.add(value);
        value = (Writable) valueClass.newInstance();
      }
    }
    readers[i].close();
  }
  return res;
}
TestCrawlDbMerger.java source (project: GeoCrawler)
private void createCrawlDb(Configuration config, FileSystem fs, Path crawldb,
    TreeSet<String> init, CrawlDatum cd) throws Exception {
  LOG.fine("* creating crawldb: " + crawldb);
  Path dir = new Path(crawldb, CrawlDb.CURRENT_NAME);
  Option wKeyOpt = MapFile.Writer.keyClass(Text.class);
  org.apache.hadoop.io.SequenceFile.Writer.Option wValueOpt =
      SequenceFile.Writer.valueClass(CrawlDatum.class);
  MapFile.Writer writer = new MapFile.Writer(config,
      new Path(dir, "part-00000"), wKeyOpt, wValueOpt);
  Iterator<String> it = init.iterator();
  while (it.hasNext()) {
    String key = it.next();
    writer.append(new Text(key), cd);
  }
  writer.close();
}
CrawlDBTestUtil.java source (project: GeoCrawler)
/**
 * Creates a synthetic crawldb.
 *
 * @param conf
 *          configuration to use
 * @param fs
 *          filesystem where the db will be created
 * @param crawldb
 *          path where the db will be created
 * @param init
 *          urls to be inserted, objects are of type URLCrawlDatum
 * @throws Exception
 */
public static void createCrawlDb(Configuration conf, FileSystem fs,
    Path crawldb, List<URLCrawlDatum> init) throws Exception {
  LOG.trace("* creating crawldb: " + crawldb);
  Path dir = new Path(crawldb, CrawlDb.CURRENT_NAME);
  Option wKeyOpt = MapFile.Writer.keyClass(Text.class);
  org.apache.hadoop.io.SequenceFile.Writer.Option wValueOpt =
      SequenceFile.Writer.valueClass(CrawlDatum.class);
  MapFile.Writer writer = new MapFile.Writer(conf,
      new Path(dir, "part-00000"), wKeyOpt, wValueOpt);
  Iterator<URLCrawlDatum> it = init.iterator();
  while (it.hasNext()) {
    URLCrawlDatum row = it.next();
    LOG.info("adding:" + row.url.toString());
    writer.append(new Text(row.url), row.datum);
  }
  writer.close();
}
TestLinkDbMerger.java source (project: GeoCrawler)
private void createLinkDb(Configuration config, FileSystem fs, Path linkdb,
    TreeMap<String, String[]> init) throws Exception {
  LOG.fine("* creating linkdb: " + linkdb);
  Path dir = new Path(linkdb, LinkDb.CURRENT_NAME);
  Option wKeyOpt = MapFile.Writer.keyClass(Text.class);
  org.apache.hadoop.io.SequenceFile.Writer.Option wValueOpt =
      SequenceFile.Writer.valueClass(Inlinks.class);
  MapFile.Writer writer = new MapFile.Writer(config,
      new Path(dir, "part-00000"), wKeyOpt, wValueOpt);
  Iterator<String> it = init.keySet().iterator();
  while (it.hasNext()) {
    String key = it.next();
    Inlinks inlinks = new Inlinks();
    String[] vals = init.get(key);
    for (int i = 0; i < vals.length; i++) {
      Inlink in = new Inlink(vals[i], vals[i]);
      inlinks.add(in);
    }
    writer.append(new Text(key), inlinks);
  }
  writer.close();
}
SegmentHandler.java source (project: GeoCrawler)
/** Open the output generated by this format. */
private MapFile.Reader[] getReaders(String subDir) throws IOException {
  Path dir = new Path(segmentDir, subDir);
  FileSystem fs = dir.getFileSystem(conf);
  Path[] names = FileUtil.stat2Paths(fs.listStatus(dir,
      SegmentPathFilter.INSTANCE));
  // sort names, so that hash partitioning works
  Arrays.sort(names);
  MapFile.Reader[] parts = new MapFile.Reader[names.length];
  for (int i = 0; i < names.length; i++) {
    parts[i] = new MapFile.Reader(names[i], conf);
  }
  return parts;
}
SequenceFileInputFormat.java source (project: big-c)
@Override
protected List<FileStatus> listStatus(JobContext job) throws IOException {
  List<FileStatus> files = super.listStatus(job);
  int len = files.size();
  for (int i = 0; i < len; ++i) {
    FileStatus file = files.get(i);
    if (file.isDirectory()) { // it's a MapFile
      Path p = file.getPath();
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      // use the data file
      files.set(i, fs.getFileStatus(new Path(p, MapFile.DATA_FILE_NAME)));
    }
  }
  return files;
}
TestFileOutputCommitter.java source (project: big-c)
private void validateMapFileOutputContent(
    FileSystem fs, Path dir) throws IOException {
  // map output is a directory with index and data files
  Path expectedMapDir = new Path(dir, partFile);
  assert(fs.getFileStatus(expectedMapDir).isDirectory());
  FileStatus[] files = fs.listStatus(expectedMapDir);
  int fileCount = 0;
  boolean dataFileFound = false;
  boolean indexFileFound = false;
  for (FileStatus f : files) {
    if (f.isFile()) {
      ++fileCount;
      if (f.getPath().getName().equals(MapFile.INDEX_FILE_NAME)) {
        indexFileFound = true;
      } else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) {
        dataFileFound = true;
      }
    }
  }
  assert(fileCount > 0);
  assert(dataFileFound && indexFileFound);
}
TestFileOutputCommitter.java source (project: big-c)
private void validateMapFileOutputContent(
    FileSystem fs, Path dir) throws IOException {
  // map output is a directory with index and data files
  Path expectedMapDir = new Path(dir, partFile);
  assert(fs.getFileStatus(expectedMapDir).isDirectory());
  FileStatus[] files = fs.listStatus(expectedMapDir);
  int fileCount = 0;
  boolean dataFileFound = false;
  boolean indexFileFound = false;
  for (FileStatus f : files) {
    if (f.isFile()) {
      ++fileCount;
      if (f.getPath().getName().equals(MapFile.INDEX_FILE_NAME)) {
        indexFileFound = true;
      } else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) {
        dataFileFound = true;
      }
    }
  }
  assert(fileCount > 0);
  assert(dataFileFound && indexFileFound);
}
TestCodec.java source (project: big-c)
private void codecTestMapFile(Class<? extends CompressionCodec> clazz,
    CompressionType type, int records) throws Exception {
  FileSystem fs = FileSystem.get(conf);
  LOG.info("Creating MapFiles with " + records +
      " records using codec " + clazz.getSimpleName());
  Path path = new Path(new Path(
      System.getProperty("test.build.data", "/tmp")),
      clazz.getSimpleName() + "-" + type + "-" + records);
  LOG.info("Writing " + path);
  createMapFile(conf, fs, path, clazz.newInstance(), type, records);
  MapFile.Reader reader = new MapFile.Reader(path, conf);
  Text key1 = new Text("002");
  assertNotNull(reader.get(key1, new Text()));
  Text key2 = new Text("004");
  assertNotNull(reader.get(key2, new Text()));
}
AbstractMapFileWriter.java source (project: DataVec)
/**
 * @param outputDir Output directory for the map file(s)
 * @param mapFileSplitSize Split size for the map file: if 0, use a single map file for all output. If > 0,
 *                         multiple map files will be used: each will contain a maximum of mapFileSplitSize records.
 *                         This can be used to avoid having a single multi-gigabyte map file, which may be
 *                         undesirable in some cases (transfer across the network, for example)
 * @param convertTextTo If null: make no changes to Text writable objects. If non-null, Text writable instances
 *                      will be converted to this type. This is useful when you would rather store numerical values
 *                      even if the original record reader produces strings/text.
 * @param indexInterval Index interval for the Map file. Defaults to 1, which is suitable for most cases
 * @param filenamePattern The naming pattern for the map files. Used with String.format(pattern, int)
 * @param hadoopConfiguration Hadoop configuration.
 */
public AbstractMapFileWriter(@NonNull File outputDir, int mapFileSplitSize, WritableType convertTextTo,
                             int indexInterval, String filenamePattern,
                             org.apache.hadoop.conf.Configuration hadoopConfiguration) {
  if (indexInterval <= 0) {
    throw new UnsupportedOperationException("Index interval must be > 0 (got: " + indexInterval + ")");
  }
  this.outputDir = outputDir;
  this.mapFileSplitSize = mapFileSplitSize;
  if (convertTextTo == WritableType.Text) {
    convertTextTo = null;
  }
  this.convertTextTo = convertTextTo;
  this.indexInterval = indexInterval;
  this.filenamePattern = filenamePattern;
  this.hadoopConfiguration = hadoopConfiguration;
  if (this.hadoopConfiguration.get(MAP_FILE_INDEX_INTERVAL_KEY) != null) {
    this.hadoopConfiguration.set(MAP_FILE_INDEX_INTERVAL_KEY, String.valueOf(indexInterval));
  }
  opts = new SequenceFile.Writer.Option[]{MapFile.Writer.keyClass(KEY_CLASS),
      SequenceFile.Writer.valueClass(getValueClass())};
}
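For context on the indexInterval parameter documented above: it controls how sparse the MapFile index is, which in stock Hadoop is governed by the io.map.index.interval setting (default 128). A minimal sketch under those assumptions; the path, key and value types are illustrative and unrelated to DataVec:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class IndexIntervalSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Only every Nth key is kept in the index: a larger interval means a smaller
    // in-memory index but a longer sequential scan inside the data file per get().
    conf.setInt("io.map.index.interval", 1); // index every key, matching the default described above
    try (MapFile.Writer w = new MapFile.Writer(conf, new Path("/tmp/indexed-mapfile"),
        MapFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(IntWritable.class))) {
      w.append(new Text("00000"), new IntWritable(0));
      w.append(new Text("00001"), new IntWritable(1));
    }
  }
}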
MapFileReader.java source (project: DataVec)
public MapFileReader(List<String> paths, IndexToKey indexToKey, Class<? extends Writable> recordClass)
    throws IOException {
  this.indexToKey = indexToKey;
  this.recordClass = recordClass;
  this.readers = new MapFile.Reader[paths.size()];
  SequenceFile.Reader.Option[] opts = new SequenceFile.Reader.Option[0];
  Configuration config = new Configuration();
  for (int i = 0; i < paths.size(); i++) {
    readers[i] = new MapFile.Reader(new Path(paths.get(i)), config, opts);
    if (readers[i].getValueClass() != recordClass) {
      throw new UnsupportedOperationException("MapFile record class: " + readers[i].getValueClass()
          + ", but got class " + recordClass + ", path = " + paths.get(i));
    }
  }
  recordIndexesEachReader = indexToKey.initialize(readers, recordClass);
}
HdfsProducerTest.java source (project: Camel)
@Test
public void testMapWriteTextWithKey() throws Exception {
  if (!canTest()) {
    return;
  }
  String txtKey = "THEKEY";
  String txtValue = "CIAO MONDO !";
  template.sendBodyAndHeader("direct:write_text3", txtValue, "KEY", txtKey);
  Configuration conf = new Configuration();
  MapFile.Reader reader = new MapFile.Reader(new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-text3"), conf);
  Text key = (Text) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
  Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
  reader.next(key, value);
  assertEquals(key.toString(), txtKey);
  assertEquals(value.toString(), txtValue);
  IOHelper.close(reader);
}
HdfsProducerTest.java source (project: Camel)
@Test
public void testMapWriteTextWithKey() throws Exception {
  if (!canTest()) {
    return;
  }
  String txtKey = "THEKEY";
  String txtValue = "CIAO MONDO !";
  template.sendBodyAndHeader("direct:write_text3", txtValue, "KEY", txtKey);
  Configuration conf = new Configuration();
  Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-text3");
  FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
  MapFile.Reader reader = new MapFile.Reader(fs1, "file:///" + TEMP_DIR.toUri() + "/test-camel-text3", conf);
  Text key = (Text) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
  Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
  reader.next(key, value);
  assertEquals(key.toString(), txtKey);
  assertEquals(value.toString(), txtValue);
  IOHelper.close(reader);
}
TestFileOutputCommitter.java source (project: hadoop-2.6.0-cdh5.4.3)
private void validateMapFileOutputContent(
    FileSystem fs, Path dir) throws IOException {
  // map output is a directory with index and data files
  Path expectedMapDir = new Path(dir, partFile);
  assert(fs.getFileStatus(expectedMapDir).isDirectory());
  FileStatus[] files = fs.listStatus(expectedMapDir);
  int fileCount = 0;
  boolean dataFileFound = false;
  boolean indexFileFound = false;
  for (FileStatus f : files) {
    if (f.isFile()) {
      ++fileCount;
      if (f.getPath().getName().equals(MapFile.INDEX_FILE_NAME)) {
        indexFileFound = true;
      } else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) {
        dataFileFound = true;
      }
    }
  }
  assert(fileCount > 0);
  assert(dataFileFound && indexFileFound);
}
TestFileOutputCommitter.java source (project: hadoop-2.6.0-cdh5.4.3)
private void validateMapFileOutputContent(
    FileSystem fs, Path dir) throws IOException {
  // map output is a directory with index and data files
  Path expectedMapDir = new Path(dir, partFile);
  assert(fs.getFileStatus(expectedMapDir).isDirectory());
  FileStatus[] files = fs.listStatus(expectedMapDir);
  int fileCount = 0;
  boolean dataFileFound = false;
  boolean indexFileFound = false;
  for (FileStatus f : files) {
    if (f.isFile()) {
      ++fileCount;
      if (f.getPath().getName().equals(MapFile.INDEX_FILE_NAME)) {
        indexFileFound = true;
      } else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) {
        dataFileFound = true;
      }
    }
  }
  assert(fileCount > 0);
  assert(dataFileFound && indexFileFound);
}
TestCodec.java source (project: hadoop-2.6.0-cdh5.4.3)
private void codecTestMapFile(Class<? extends CompressionCodec> clazz,
    CompressionType type, int records) throws Exception {
  FileSystem fs = FileSystem.get(conf);
  LOG.info("Creating MapFiles with " + records +
      " records using codec " + clazz.getSimpleName());
  Path path = new Path(new Path(
      System.getProperty("test.build.data", "/tmp")),
      clazz.getSimpleName() + "-" + type + "-" + records);
  LOG.info("Writing " + path);
  createMapFile(conf, fs, path, clazz.newInstance(), type, records);
  MapFile.Reader reader = new MapFile.Reader(path, conf);
  Text key1 = new Text("002");
  assertNotNull(reader.get(key1, new Text()));
  Text key2 = new Text("004");
  assertNotNull(reader.get(key2, new Text()));
}
MapFileOutputFormat.java source (project: hadoop-2.6.0-cdh5.4.3)
/** Open the output generated by this format. */
public static MapFile.Reader[] getReaders(FileSystem ignored, Path dir,
    Configuration conf) throws IOException {
  FileSystem fs = dir.getFileSystem(conf);
  Path[] names = FileUtil.stat2Paths(fs.listStatus(dir));
  // sort names, so that hash partitioning works
  Arrays.sort(names);
  MapFile.Reader[] parts = new MapFile.Reader[names.length];
  for (int i = 0; i < names.length; i++) {
    parts[i] = new MapFile.Reader(fs, names[i].toString(), conf);
  }
  return parts;
}
SequenceFileInputFormat.java source (project: hadoop-2.6.0-cdh5.4.3)
@Override
protected List<FileStatus> listStatus(JobContext job) throws IOException {
  List<FileStatus> files = super.listStatus(job);
  int len = files.size();
  for (int i = 0; i < len; ++i) {
    FileStatus file = files.get(i);
    if (file.isDir()) { // it's a MapFile
      Path p = file.getPath();
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      // use the data file
      files.set(i, fs.getFileStatus(new Path(p, MapFile.DATA_FILE_NAME)));
    }
  }
  return files;
}
MapFileOutputFormat.java source (project: hadoop-EAR)
/** Open the output generated by this format. */
public static MapFile.Reader[] getReaders(FileSystem ignored, Path dir,
    Configuration conf) throws IOException {
  FileSystem fs = dir.getFileSystem(conf);
  Path[] names = FileUtil.stat2Paths(fs.listStatus(dir));
  // sort names, so that hash partitioning works
  Arrays.sort(names);
  MapFile.Reader[] parts = new MapFile.Reader[names.length];
  for (int i = 0; i < names.length; i++) {
    parts[i] = new MapFile.Reader(fs, names[i].toString(), conf);
  }
  return parts;
}