
MapReduce Is Awesome (1): An MR Word Count Example
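
A classic Hadoop word count job wired together in one class: a driver built on Configured and Tool, a Mapper that emits a (word, 1) pair for every tab-separated token, and a Reducer (also reused as the combiner) that sums the counts for each word.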

 

package cmd;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ConfiguredTest extends Configured implements Tool {

	@Override
	public int run(String[] args) throws Exception {
		String INPUT_PATH = args[0];
		String OUTPUT_PATH = args[1];

		// Use the configuration injected by ToolRunner instead of creating a
		// fresh one, so generic command-line options take effect.
		Job job = new Job(getConf(), ConfiguredTest.class.getName());
		job.setJarByClass(ConfiguredTest.class);
		// 1.1 Input
		FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
		job.setInputFormatClass(TextInputFormat.class);
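		// TextInputFormat delivers each line as the value, keyed by its byte offset.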
		// 1.2 Mapper
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);
		// 1.3 Partitioning
		job.setPartitionerClass(HashPartitioner.class);
		job.setNumReduceTasks(1);
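		// HashPartitioner routes each key by its hash code modulo the number
		// of reduce tasks; with a single reducer, all keys share one partition.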
		// 1.4 Sorting and grouping: the defaults work for word count, so no
		// custom comparators are registered here.
		// job.setGroupingComparatorClass(...);
		// job.setSortComparatorClass(...);
		// 1.5 Combine (local aggregation): the reducer doubles as the combiner
		// because summing counts is associative and commutative.
		job.setCombinerClass(MyReducer.class);

		// 2.1 Shuffle: map outputs are transferred over the network to the
		// reducer that owns each partition.
		// 2.2 reducer
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		// 2.3 Output
		job.setOutputFormatClass(TextOutputFormat.class);
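		// TextOutputFormat writes each record as key, a tab, then value.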
		FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

		// Propagate the job's success or failure as the exit code.
		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new Configuration(), new ConfiguredTest(),
				args);
		System.exit(exitCode);
	}

	static class MyMapper extends
			Mapper<LongWritable, Text, Text, LongWritable> {
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws java.io.IOException, InterruptedException {
			// Each input line is split on tabs; every token counts as one word.
			String[] split = value.toString().split("\t");
			for (String str : split) {
				context.write(new Text(str), new LongWritable(1));
			}
		}
	}

	static class MyReducer extends
			Reducer<Text, LongWritable, Text, LongWritable> {
		@Override
		protected void reduce(Text key, Iterable<LongWritable> values,
				Context context) throws java.io.IOException,
				InterruptedException {
			// Sum the partial counts emitted for this word by the mappers
			// (and, when it ran, the combiner).
			long sum = 0;
			for (LongWritable value : values) {
				sum += value.get();
			}
			context.write(key, new LongWritable(sum));
		}
	}

}
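
To try the job, package the class into a jar and launch it with the hadoop command. The jar name and HDFS paths below are illustrative, not from the original post:

hadoop jar wordcount.jar cmd.ConfiguredTest /input /output

Because the driver goes through ToolRunner, generic options such as -D property=value are parsed before run() is called. For a tab-separated input line like "hello	world	hello", the expected output file would contain:

hello	2
world	1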