2008-05-08

hadoop的reducer输出多个文件

关键字: hadoop, mapreduce
有时候我们想到这样的功能: reducer能根据key(或value)值来输出多个文件,同一key(或value)处于同一个文件中。现在hadoop的0.17.x版本可以重写MultipleOutputFormat的generateFileNameForKeyValue就可以实现此功能。

比如:
package org.apache.hadoop.mapred.lib;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;

public class MultipleTextOutputFormat<K extends WritableComparable, V extends Writable>
    extends MultipleOutputFormat<K, V> {

  private TextOutputFormat<K, V> theTextOutputFormat = null;

  @Override
  protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,
      String name, Progressable arg3) throws IOException {
    if (theTextOutputFormat == null) {
      theTextOutputFormat = new TextOutputFormat<K, V>();
    }
    return theTextOutputFormat.getRecordWriter(fs, job, name, arg3);
  }

	@Override
	protected String generateFileNameForKeyValue(K key, V value, String name) {
		return name + "_" + value.toString();
	}
  
  
}


试一下wordcount这个例子,把WordCount.java的run函数加上一行
conf.setOutputFormat(org.apache.hadoop.mapred.lib.MultipleTextOutputFormat.class);

public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(), WordCount.class);
    conf.setJobName("wordcount");
 
    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);
    
    conf.setMapperClass(MapClass.class);        
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);
    
    conf.setOutputFormat(org.apache.hadoop.mapred.lib.MultipleTextOutputFormat.class);
    
    List<String> other_args = new ArrayList<String>();
    for(int i=0; i < args.length; ++i) {
      try {
        if ("-m".equals(args[i])) {
          conf.setNumMapTasks(Integer.parseInt(args[++i]));
        } else if ("-r".equals(args[i])) {
          conf.setNumReduceTasks(Integer.parseInt(args[++i]));
        } else {
          other_args.add(args[i]);
        }
      } catch (NumberFormatException except) {
        System.out.println("ERROR: Integer expected instead of " + args[i]);
        return printUsage();
      } catch (ArrayIndexOutOfBoundsException except) {
        System.out.println("ERROR: Required parameter missing from " +
                           args[i-1]);
        return printUsage();
      }
    }
    // Make sure there are exactly 2 parameters left.
    if (other_args.size() != 2) {
      System.out.println("ERROR: Wrong number of parameters: " +
                         other_args.size() + " instead of 2.");
      return printUsage();
    }
    FileInputFormat.setInputPaths(conf, other_args.get(0));
    FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
        
    JobClient.runJob(conf);
    return 0;
  }


则使用
bin/hadoop jar build/hadoop-*-examples.jar wordcount conf wordcount_output
可输出一个目录wordcount_output
$ls wordcount_output/
part-00000_1    part-00000_13   part-00000_16  part-00000_214  part-00000_28  part-00000_38  part-00000_5   part-00000_8
part-00000_10   part-00000_14   part-00000_17  part-00000_22   part-00000_29  part-00000_4   part-00000_6   part-00000_9
part-00000_102  part-00000_141  part-00000_19  part-00000_23   part-00000_3   part-00000_42  part-00000_62
part-00000_11   part-00000_143  part-00000_2   part-00000_24   part-00000_31  part-00000_44  part-00000_63
part-00000_117  part-00000_15   part-00000_20  part-00000_25   part-00000_35  part-00000_46  part-00000_7
part-00000_12   part-00000_152  part-00000_21  part-00000_26   part-00000_36  part-00000_47  part-00000_70
评论
发表评论

您还没有登录,请登录后发表评论

coderplay
搜索本博客
存档
最新评论