@Override
protected void reduce(StatsUserDimension key, Iterable<TimeOutputValue> values, Context context)
throws IOException, InterruptedException {
this.unique.clear();
// Count the number of distinct UUIDs
for (TimeOutputValue value : values) {
this.unique.add(value.getId()); // uid: the user ID
}
MapWritable map = new MapWritable(); // equivalent to a HashMap in Java, but with Writable keys and values
map.put(new IntWritable(-1), new IntWritable(this.unique.size()));
outputValue.setValue(map);
// Set the KPI according to its name
String kpiName = key.getStatsCommon().getKpi().getKpiName();
if (KpiType.NEW_INSTALL_USER.name.equals(kpiName)) {
// Compute new install users for the stats_user table
outputValue.setKpi(KpiType.NEW_INSTALL_USER);
} else if (KpiType.BROWSER_NEW_INSTALL_USER.name.equals(kpiName)) {
// Compute new install users for the stats_device_browser table
outputValue.setKpi(KpiType.BROWSER_NEW_INSTALL_USER);
}
context.write(key, outputValue);
}
Example source code for the Java class org.apache.hadoop.io.MapWritable

NewInstallUserReducer.java (project: big_data) — source of the reduce() method shown above
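Before the project examples that follow, here is a minimal, self-contained sketch of the MapWritable basics those snippets rely on: both keys and values must be Writable types, and the map itself serializes with write() and restores with readFields(). The class name and the sample keys below are illustrative only, not taken from any of the listed projects.

import java.io.*;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;

public class MapWritableBasics {
    public static void main(String[] args) throws IOException {
        // Populate a MapWritable; both keys and values must implement Writable.
        MapWritable counts = new MapWritable();
        counts.put(new Text("clicks"), new IntWritable(42));
        counts.put(new Text("visits"), new IntWritable(7));

        // Serialize it, as the MapReduce framework does when shuffling values.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        counts.write(new DataOutputStream(bytes));

        // Read it back into a fresh instance.
        MapWritable restored = new MapWritable();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // get() returns Writable, so callers cast to the concrete value type.
        IntWritable clicks = (IntWritable) restored.get(new Text("clicks"));
        System.out.println("clicks = " + clicks.get()); // prints 42
    }
}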
JdbcExportJob.java (project: aliyun-maxcompute-data-collectors)
private void configureGenericRecordExportInputFormat(Job job, String tableName)
throws IOException {
ConnManager connManager = context.getConnManager();
Map<String, Integer> columnTypeInts;
if (options.getCall() == null) {
columnTypeInts = connManager.getColumnTypes(
tableName,
options.getSqlQuery());
} else {
columnTypeInts = connManager.getColumnTypesForProcedure(
options.getCall());
}
String[] specifiedColumns = options.getColumns();
MapWritable columnTypes = new MapWritable();
for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) {
String column = e.getKey();
column = (specifiedColumns == null) ? column : options.getColumnNameCaseInsensitive(column);
if (column != null) {
Text columnName = new Text(column);
Text columnType = new Text(connManager.toJavaType(tableName, column, e.getValue()));
columnTypes.put(columnName, columnType);
}
}
DefaultStringifier.store(job.getConfiguration(), columnTypes,
AvroExportMapper.AVRO_COLUMN_TYPES_MAP);
}
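The MapWritable built above is not handed to the map tasks directly: DefaultStringifier.store() encodes it into the job Configuration under AvroExportMapper.AVRO_COLUMN_TYPES_MAP, and the task side loads it back, as ParquetExportMapper.setup() further down this page does. A minimal sketch of that store/load round trip, using an illustrative configuration key rather than Sqoop's constant:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;

public class ColumnTypesRoundTrip {
    // Illustrative key name; Sqoop uses AvroExportMapper.AVRO_COLUMN_TYPES_MAP.
    private static final String KEY = "example.column.types.map";

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();

        // Driver side: store a column-name -> Java-type map in the Configuration.
        // DefaultStringifier relies on WritableSerialization, which is part of
        // Hadoop's default io.serializations setting.
        MapWritable columnTypes = new MapWritable();
        columnTypes.put(new Text("id"), new Text("Integer"));
        columnTypes.put(new Text("name"), new Text("String"));
        DefaultStringifier.store(conf, columnTypes, KEY);

        // Task side (e.g. a mapper's setup()): load it back from the Configuration.
        MapWritable loaded = DefaultStringifier.load(conf, KEY, MapWritable.class);
        System.out.println("id maps to " + loaded.get(new Text("id"))); // Integer
    }
}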
ParquetExportMapper.java (project: aliyun-maxcompute-data-collectors)
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
// Instantiate a copy of the user's class to hold and parse the record.
String recordClassName = conf.get(
ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
if (null == recordClassName) {
throw new IOException("Export table class name ("
+ ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
+ ") is not set!");
}
try {
Class cls = Class.forName(recordClassName, true,
Thread.currentThread().getContextClassLoader());
recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
if (null == recordImpl) {
throw new IOException("Could not instantiate object of type "
+ recordClassName);
}
columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP,
MapWritable.class);
}
HostDatum.java (project: GeoCrawler)
@Override
public void readFields(DataInput in) throws IOException {
score = in.readFloat();
lastCheck = new Date(in.readLong());
homepageUrl = Text.readString(in);
dnsFailures = in.readInt();
connectionFailures = in.readInt();
unfetched= in.readInt();
fetched= in.readInt();
notModified= in.readInt();
redirTemp= in.readInt();
redirPerm = in.readInt();
gone = in.readInt();
metaData = new org.apache.hadoop.io.MapWritable();
metaData.readFields(in);
}
QueryUtils.java (project: incubator-pirk)
/**
* Pulls the correct selector from the MapWritable data element given the queryType
* <p>
* Pulls first element of array if element is an array type
*/
public static String getSelectorByQueryType(MapWritable dataMap, QuerySchema qSchema, DataSchema dSchema)
{
String selector;
String fieldName = qSchema.getSelectorName();
if (dSchema.isArrayElement(fieldName))
{
if (dataMap.get(dSchema.getTextName(fieldName)) instanceof WritableArrayWritable)
{
String[] selectorArray = ((WritableArrayWritable) dataMap.get(dSchema.getTextName(fieldName))).toStrings();
selector = selectorArray[0];
}
else
{
String[] elementArray = ((ArrayWritable) dataMap.get(dSchema.getTextName(fieldName))).toStrings();
selector = elementArray[0];
}
}
else
{
selector = dataMap.get(dSchema.getTextName(fieldName)).toString();
}
return selector;
}
FilterData.java (project: incubator-pirk)
@Override
public Boolean call(MapWritable dataElement) throws Exception
{
accum.incNumRecordsReceived(1);
// Perform the filter
boolean passFilter = ((DataFilter) filter).filterDataElement(dataElement, dSchema);
if (passFilter)
{
accum.incNumRecordsAfterFilter(1);
}
else
// false, then we filter out the record
{
accum.incNumRecordsFiltered(1);
}
return passFilter;
}
ComputeResponse.java (project: incubator-pirk)
/**
* Method to read in data from an allowed input source/format and perform the query
*/
public void performQuery() throws IOException, PIRException
{
logger.info("Performing query: ");
JavaRDD<MapWritable> inputRDD;
switch (dataInputFormat)
{
case InputFormatConst.BASE_FORMAT:
inputRDD = readData();
break;
case InputFormatConst.ES:
inputRDD = readDataES();
break;
default:
throw new PIRException("Unknown data input format " + dataInputFormat);
}
performQuery(inputRDD);
}
ComputeStreamingResponse.java (project: incubator-pirk)
/**
* Method to read in data from an allowed input source/format and perform the query
*/
public void performQuery() throws IOException, PIRException
{
logger.info("Performing query: ");
JavaDStream<MapWritable> inputRDD = null;
if (dataInputFormat.equals(InputFormatConst.BASE_FORMAT))
{
inputRDD = readData();
}
else if (dataInputFormat.equals(InputFormatConst.ES))
{
inputRDD = readDataES();
}
else
{
throw new PIRException("Unknown data input format " + dataInputFormat);
}
performQuery(inputRDD);
}
JSONRecordReader.java (project: incubator-pirk)
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException
{
key = new Text();
value = new MapWritable();
jsonParser = new JSONParser();
lineReader = new LineRecordReader();
lineReader.initialize(inputSplit, context);
queryString = context.getConfiguration().get("query", "?q=*");
// Load the data schemas
FileSystem fs = FileSystem.get(context.getConfiguration());
try
{
SystemConfiguration.setProperty("data.schemas", context.getConfiguration().get("data.schemas"));
DataSchemaLoader.initialize(true, fs);
} catch (Exception e)
{
e.printStackTrace();
}
String dataSchemaName = context.getConfiguration().get("dataSchemaName");
dataSchema = DataSchemaRegistry.get(dataSchemaName);
}
AbstractMRNewApiSearchTest.java (project: es-hadoop-v2.2.0)
private Configuration createConf() throws IOException {
Configuration conf = HdpBootstrap.hadoopConfig();
HadoopCfgUtils.setGenericOptions(conf);
Job job = new Job(conf);
job.setInputFormatClass(EsInputFormat.class);
job.setOutputFormatClass(PrintStreamOutputFormat.class);
job.setOutputKeyClass(Text.class);
boolean type = random.nextBoolean();
Class<?> mapType = (type ? MapWritable.class : LinkedMapWritable.class);
job.setOutputValueClass(mapType);
conf.set(ConfigurationOptions.ES_QUERY, query);
conf.set(ConfigurationOptions.ES_READ_METADATA, String.valueOf(readMetadata));
conf.set(ConfigurationOptions.ES_OUTPUT_JSON, String.valueOf(readAsJson));
QueryTestParams.provisionQueries(conf);
job.setNumReduceTasks(0);
//PrintStreamOutputFormat.stream(conf, Stream.OUT);
Configuration cfg = job.getConfiguration();
HdpBootstrap.addProperties(cfg, TestSettings.TESTING_PROPS, false);
return cfg;
}
AbstractExtraMRTests.java (project: es-hadoop-v2.2.0)
private JobConf createReadJobConf() throws IOException {
JobConf conf = HdpBootstrap.hadoopConfig();
conf.setInputFormat(EsInputFormat.class);
conf.setOutputFormat(PrintStreamOutputFormat.class);
conf.setOutputKeyClass(Text.class);
boolean type = random.nextBoolean();
Class<?> mapType = (type ? MapWritable.class : LinkedMapWritable.class);
conf.setOutputValueClass(MapWritable.class);
HadoopCfgUtils.setGenericOptions(conf);
conf.setNumReduceTasks(0);
conf.set(ConfigurationOptions.ES_READ_METADATA, String.valueOf(random.nextBoolean()));
conf.set(ConfigurationOptions.ES_READ_METADATA_VERSION, String.valueOf(true));
conf.set(ConfigurationOptions.ES_OUTPUT_JSON, "true");
FileInputFormat.setInputPaths(conf, new Path(TestUtils.gibberishDat(conf)));
return conf;
}
AbstractMROldApiSearchTest.java (project: es-hadoop-v2.2.0)
private JobConf createJobConf() throws IOException {
JobConf conf = HdpBootstrap.hadoopConfig();
conf.setInputFormat(EsInputFormat.class);
conf.setOutputFormat(PrintStreamOutputFormat.class);
conf.setOutputKeyClass(Text.class);
boolean type = random.nextBoolean();
Class<?> mapType = (type ? MapWritable.class : LinkedMapWritable.class);
conf.setOutputValueClass(mapType);
HadoopCfgUtils.setGenericOptions(conf);
conf.set(ConfigurationOptions.ES_QUERY, query);
conf.setNumReduceTasks(0);
conf.set(ConfigurationOptions.ES_READ_METADATA, String.valueOf(readMetadata));
conf.set(ConfigurationOptions.ES_READ_METADATA_VERSION, String.valueOf(true));
conf.set(ConfigurationOptions.ES_OUTPUT_JSON, String.valueOf(readAsJson));
QueryTestParams.provisionQueries(conf);
FileInputFormat.setInputPaths(conf, new Path(TestUtils.sampleArtistsDat()));
HdpBootstrap.addProperties(conf, TestSettings.TESTING_PROPS, false);
return conf;
}
SolrReader.java (project: hive-solr-search)
@Override
public boolean next(LongWritable keyHolder, MapWritable valueHolder)
throws IOException {
if (StringUtils.isBlank(facetMapping)) {
SolrDocument doc = cursor.nextDocument();
if (doc == null) {
return false;
}
keyHolder.set(pos++);
Object[] values = new Object[solrColumns.length];
for (int i = 0; i < solrColumns.length; i++) {
values[i] = doc.getFieldValue(solrColumns[i]);
}
setValueHolder(valueHolder, values);
} else {
FacetEntry facetEntry = cursor.nextFacetEntry();
if (facetEntry == null) {
return false;
}
keyHolder.set(pos++);
setValueHolder(valueHolder, new Object[] { facetEntry.getValue(),
facetEntry.getCount() });
}
return true;
}
SolrWriter.java (project: hive-solr-search)
@Override
public void write(Writable w) throws IOException {
MapWritable map = (MapWritable) w;
SolrInputDocument doc = new SolrInputDocument();
for (final Map.Entry<Writable, Writable> entry : map.entrySet()) {
String key = entry.getKey().toString();
if (entry.getValue() instanceof TimestampWritable) {
Timestamp t = ((TimestampWritable)entry.getValue()).getTimestamp();
doc.setField(key, dateFormat.format( new Date(t.getTime()) ));
} else if (entry.getValue() instanceof ShortWritable) {
doc.setField(key, ((ShortWritable)entry.getValue()).get());
} else {
doc.setField(key, entry.getValue().toString());
}
}
log.debug("doc:"+doc.toString());
table.save(doc);
}
DataValidationReducer.java (project: jumbune)
private void processTupleViolation(MapWritable fieldMapWritable,
Map<String, Integer> fieldFileViolationsMap, StringBuffer wb,
DataViolationWritableBean fileViolationsWritable, String fileName)
throws IOException {
IntWritable fieldNumber = new IntWritable();
IntWritable fieldViolations = new IntWritable(0);
int violations;
fieldNumber = new IntWritable(fileViolationsWritable.getFieldNumber());
fieldViolations = (IntWritable) fieldMapWritable.get((fieldNumber));
fieldViolations = setFieldViolations(fieldViolations);
fieldMapWritable.put(fieldNumber, fieldViolations);
violations = extractViolationsFromMap(fieldFileViolationsMap, fileName);
violations += 1;
fieldFileViolationsMap.put(fileName, violations);
writeViolationsToBuffer(fileViolationsWritable, fileName, wb, violations);
}
KafkaKey.java (project: HiveKa)
@Override
public void readFields(DataInput in) throws IOException {
this.leaderId = UTF8.readString(in);
this.partition = in.readInt();
this.beginOffset = in.readLong();
this.offset = in.readLong();
this.checksum = in.readLong();
this.topic = in.readUTF();
this.time = in.readLong();
this.server = in.readUTF(); // left for legacy
this.service = in.readUTF(); // left for legacy
this.partitionMap = new MapWritable();
try {
this.partitionMap.readFields(in);
} catch (IOException e) {
this.setServer(this.server);
this.setService(this.service);
}
}
MapIdxWritable.java (project: piggybank-squeal)
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof MapWritable) {
Map map = (Map) obj;
if (size() != map.size()) {
return false;
}
return entrySet().equals(map.entrySet());
}
return false;
}
NutchMap.java (project: YarnExamples)
@Override
public void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
TrecOLParser document = new TrecOLParser(value.toString());
documentAnalyzed = new MapWritable();
if (document.isParsed()) {
this.tokenizer.tokenize(document.getDocContent());
while (this.tokenizer.hasMoreTokens()) {
IntWritable counter = CastingTypes.zero;
String newTerm = this.tokenizer.nextToken();
Text term = new Text(newTerm);
if (documentAnalyzed.containsKey(term)) {
counter = CastingTypes.strToIntWr(documentAnalyzed.get(term).toString());
}
documentAnalyzed.put(term, CastingTypes.intToIntWr(counter.get()+1));
}
if ( ! documentAnalyzed.isEmpty()) {
context.write(CastingTypes.strToIntWr(document.getDocId()), documentAnalyzed);
}
}
}
EsFeederMapper.java (project: datacentermr)
@Override
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
String prefix = conf.get("prefix");
MapWritable doc = new MapWritable();
String[] line = value.toString().split(",");
doc.put(new Text(prefix+"Id"),new Text(line[1]+"-"+line[2]+"-"+line[0]));
doc.put(new Text(prefix+"SiteName"), new Text(line[1]));
doc.put(new Text(prefix+"RoomName"), new Text(line[2]));
doc.put(new Text(prefix+"Fecha"), new Text(line[3].replace(' ','T')));
doc.put(new Text(prefix+"Power"), new FloatWritable(Float.parseFloat(line[4])));
doc.put(new Text(prefix+"Temp"), new FloatWritable(Float.parseFloat(line[5])));
doc.put(new Text(prefix+"Humidity"), new FloatWritable(Float.parseFloat(line[6])));
doc.put(new Text(prefix+"Timestamp"), new Text(line[6].replace(' ','T')));
context.write(NullWritable.get(), doc);
}
EsFeeder.java (project: datacentermr)
@Override
public int run(String[] args) throws Exception{
Configuration conf = super.getConf();
optParser(args);
conf.set("es.nodes", this.servers);
conf.set("prefix",this.prefix);
conf.set("es.resource", this.index + "/{"+this.prefix+"SiteName}");
conf.set("es.mapping.id",this.prefix+"Id");
Job job = Job.getInstance(conf,"Description");
job.setJarByClass(EsFeeder.class);
job.setMapperClass(datacentermr.EsFeederMapper.class);
job.setSpeculativeExecution(false);
job.setOutputFormatClass(EsOutputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(MapWritable.class);
job.setNumReduceTasks(0);
FileInputFormat.addInputPath(job, new Path(this.input));
System.exit(job.waitForCompletion(true) ? 0 : 1);
return 0;
}
CassandraHiveRecordReader.java (project: Hive-Cassandra)
private void populateMap(SortedMap<ByteBuffer, IColumn> cvalue, MapWritable value)
{
for (Map.Entry<ByteBuffer, IColumn> e : cvalue.entrySet())
{
ByteBuffer k = e.getKey();
IColumn v = e.getValue();
if (!v.isLive()) {
continue;
}
BytesWritable newKey = convertByteBuffer(k);
BytesWritable newValue = convertByteBuffer(v.value());
value.put(newKey, newValue);
}
}
AzureTablesRecordReader.java (project: hive-azuretables)
/**
* Grabs the next result and process the DynamicTableEntity into a Hive
* friendly MapWriteable
*
* @param key
* The RowID for the entity. Not that this is not really an Azure
* key, since the partition is implicit in the key
* @param value
* A MapWriteable which will be populated with values from the
* DynamicTableEntity returned by the Azure query.
*/
public boolean next(Text key, MapWritable value) throws IOException {
if (!results.hasNext())
return false;
DynamicTableEntity entity = results.next();
key.set(entity.getRowKey());
for (Entry<String, EntityProperty> entry : entity.getProperties()
.entrySet()) {
final EntityProperty property = entry.getValue();
// Note that azure table entity keys are forced to lower case for
// matching with hive column names
final String propertyKey = entry.getKey().toLowerCase();
final String propertyValue = property.getValueAsString();
final Writable writableValue = SERIALIZED_NULL
.equals(propertyValue) ? NullWritable.get() : new Text(
propertyValue);
value.put(new Text(propertyKey), writableValue);
}
pos++;
return true;
}
AzureTablesSerDe.java (project: hive-azuretables)
@Override
public Object deserialize(final Writable wr) throws SerDeException {
if (!(wr instanceof MapWritable)) {
throw new SerDeException("Expected MapWritable, received "
+ wr.getClass().getName());
}
final MapWritable input = (MapWritable) wr;
final Text t = new Text();
row.clear();
for (int i = 0; i < fieldCount; i++) {
t.set(columnNames.get(i));
final Writable value = input.get(t);
if (value != null && !NullWritable.get().equals(value)) {
row.add(value.toString());
} else {
row.add(null);
}
}
return row;
}
AbstractMRNewApiSearchTest.java (project: elasticsearch-hadoop)
private Configuration createConf() throws IOException {
Configuration conf = HdpBootstrap.hadoopConfig();
HadoopCfgUtils.setGenericOptions(conf);
Job job = new Job(conf);
job.setInputFormatClass(EsInputFormat.class);
job.setOutputFormatClass(PrintStreamOutputFormat.class);
job.setOutputKeyClass(Text.class);
boolean type = random.nextBoolean();
Class<?> mapType = (type ? MapWritable.class : LinkedMapWritable.class);
job.setOutputValueClass(mapType);
conf.set(ConfigurationOptions.ES_QUERY, query);
conf.set(ConfigurationOptions.ES_READ_METADATA, String.valueOf(readMetadata));
conf.set(ConfigurationOptions.ES_OUTPUT_JSON, String.valueOf(readAsJson));
QueryTestParams.provisionQueries(conf);
job.setNumReduceTasks(0);
//PrintStreamOutputFormat.stream(conf, Stream.OUT);
Configuration cfg = job.getConfiguration();
HdpBootstrap.addProperties(cfg, TestSettings.TESTING_PROPS, false);
return cfg;
}
AbstractExtraMRTests.java (project: elasticsearch-hadoop)
private JobConf createReadJobConf() throws IOException {
JobConf conf = HdpBootstrap.hadoopConfig();
conf.setInputFormat(EsInputFormat.class);
conf.setOutputFormat(PrintStreamOutputFormat.class);
conf.setOutputKeyClass(Text.class);
boolean type = random.nextBoolean();
Class<?> mapType = (type ? MapWritable.class : LinkedMapWritable.class);
conf.setOutputValueClass(MapWritable.class);
HadoopCfgUtils.setGenericOptions(conf);
conf.setNumReduceTasks(0);
conf.set(ConfigurationOptions.ES_READ_METADATA, String.valueOf(random.nextBoolean()));
conf.set(ConfigurationOptions.ES_READ_METADATA_VERSION, String.valueOf(true));
conf.set(ConfigurationOptions.ES_OUTPUT_JSON, "true");
FileInputFormat.setInputPaths(conf, new Path(TestUtils.gibberishDat(conf)));
return conf;
}
AbstractMROldApiSearchTest.java (project: elasticsearch-hadoop)
private JobConf createJobConf() throws IOException {
JobConf conf = HdpBootstrap.hadoopConfig();
conf.setInputFormat(EsInputFormat.class);
conf.setOutputFormat(PrintStreamOutputFormat.class);
conf.setOutputKeyClass(Text.class);
boolean type = random.nextBoolean();
Class<?> mapType = (type ? MapWritable.class : LinkedMapWritable.class);
conf.setOutputValueClass(mapType);
HadoopCfgUtils.setGenericOptions(conf);
conf.set(ConfigurationOptions.ES_QUERY, query);
conf.setNumReduceTasks(0);
conf.set(ConfigurationOptions.ES_READ_METADATA, String.valueOf(readMetadata));
conf.set(ConfigurationOptions.ES_READ_METADATA_VERSION, String.valueOf(true));
conf.set(ConfigurationOptions.ES_OUTPUT_JSON, String.valueOf(readAsJson));
QueryTestParams.provisionQueries(conf);
FileInputFormat.setInputPaths(conf, new Path(TestUtils.sampleArtistsDat()));
HdpBootstrap.addProperties(conf, TestSettings.TESTING_PROPS, false);
return conf;
}
JdbcExportJob.java (project: sqoop)
@Override
protected void configureInputFormat(Job job, String tableName,
String tableClassName, String splitByCol)
throws ClassNotFoundException, IOException {
fileType = getInputFileType();
super.configureInputFormat(job, tableName, tableClassName, splitByCol);
if (fileType == FileType.AVRO_DATA_FILE) {
LOG.debug("Configuring for Avro export");
ConnManager connManager = context.getConnManager();
Map<String, Integer> columnTypeInts =
connManager.getColumnTypes(tableName, options.getSqlQuery());
MapWritable columnTypes = new MapWritable();
for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) {
Text columnName = new Text(e.getKey());
Text columnText = new Text(
connManager.toJavaType(tableName, e.getKey(), e.getValue()));
columnTypes.put(columnName, columnText);
}
DefaultStringifier.store(job.getConfiguration(), columnTypes,
AvroExportMapper.AVRO_COLUMN_TYPES_MAP);
}
}
MapMultipleValuesReducer.java (project: HadoopHowTo)
@Override
public void reduce(Text key, Iterable<MapWritable> values, Context context)
throws IOException, InterruptedException {
for (MapWritable value : values) {
int i = ((IntWritable) value.get(new IntWritable(1))).get();
float f = ((FloatWritable) value.get(new IntWritable(2))).get();
String s = ((Text) value.get(new IntWritable(3))).toString();
mw.put(new IntWritable(1), new IntWritable(i * 2));
mw.put(new IntWritable(2), new FloatWritable(f * 2));
mw.put(new IntWritable(3), new Text(s));
context.write(key, mw);
}
}
XPathApplierTextReducer.java (project: alfred-mpi)
@Override
public void reduce(Text key, Iterable<MapWritable> listOfMaps, Context context) throws IOException, InterruptedException {
for (MapWritable partialResultMap : listOfMaps) {
for (Writable attributeText : partialResultMap.keySet()) {
MapWritable partialInsideMap = (MapWritable) partialResultMap.get(attributeText);
MapWritable partialOutputMap = new MapWritable();
for (Writable rule : partialInsideMap.keySet()) {
Text regola = (Text) rule;
Text valore = (Text) partialInsideMap.get(rule);
partialOutputMap.put(new Text(regola.toString()), new Text(valore.toString()));
}
result.put((Text)attributeText, partialOutputMap);
}
}
Text resultWrite = new Text(MapWritableConverter.toJsonText(result));
context.write(key,resultWrite);
}
MapWritableConverter.java (project: alfred-mpi)
private static Map<String, Map<String,String>> convertToMap(MapWritable inputMap) {
Map<String, Map<String,String>> mapResult = Maps.newHashMap();
for (Writable attributeText : inputMap.keySet()) {
MapWritable partialInsideMap = (MapWritable) inputMap.get(attributeText);
Map<String,String> partialOutputMap = Maps.newHashMap();
for (Writable rule : partialInsideMap.keySet()) {
Text regola = (Text) rule;
Text valore = (Text) partialInsideMap.get(rule);
partialOutputMap.put(regola.toString(), valore.toString());
}
mapResult.put(((Text)attributeText).toString(), partialOutputMap);
}
return mapResult;
}