
MapReduce Rocks (2): Simple MR Implementations for Importing Data into HBase

 
Two examples follow. The first writes to HBase directly from the map phase with HTable.put and has no reduce step; the second goes through a TableReducer and TableOutputFormat.
package cmd;

/**
 * MapReduce job that reads a file from HDFS and writes each record to HBase
 * with HTable.put(put) directly in the map phase; there is no reduce phase.
 */
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HBaseImport extends Configured implements Tool {
	static final Log LOG = LogFactory.getLog(HBaseImport.class);
	public static final String JOBNAME = "MRImport";

	public static class Map extends
			Mapper<LongWritable, Text, NullWritable, NullWritable> {
		Configuration configuration = null;
		HTable xTable = null;
		private boolean wal = true;
		static long count = 0;

		@Override
		protected void cleanup(Context context) throws IOException,
				InterruptedException {
			super.cleanup(context);
			xTable.flushCommits();
			xTable.close();
		}

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// Input lines are expected as "rowkey<TAB>value" ("\t", not "/t").
			String all[] = value.toString().split("\t");
			if (all.length != 2) {
				return; // skip malformed lines instead of hitting a NullPointerException on a null Put
			}
			Put put = new Put(Bytes.toBytes(all[0]));
			put.add(Bytes.toBytes("xxx"), Bytes.toBytes("20110313"),
					Bytes.toBytes(all[1]));
			if (!wal) {
				put.setWriteToWAL(false);
			}
			xTable.put(put);
			if ((++count % 100) == 0) {
				context.setStatus(count + " DOCUMENTS done!");
				context.progress();
				System.out.println(count + " DOCUMENTS done!");
			}
		}

		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			super.setup(context);
			configuration = context.getConfiguration();
			// Write to the hard-coded table "testKang": buffer puts client-side
			// with auto-flush disabled and a 12 MB write buffer.
			xTable = new HTable(configuration, "testKang");
			xTable.setAutoFlush(false);
			xTable.setWriteBufferSize(12 * 1024 * 1024);
			wal = true;
		}
	}

	@Override
	public int run(String[] args) throws Exception {
		String input = args[0];
		Configuration conf = HBaseConfiguration.create(getConf());
		// hard-coded HMaster address; adjust for your cluster
		conf.set("hbase.master", "m0:60000");
		Job job = new Job(conf, JOBNAME);
		job.setJarByClass(HBaseImport.class);
		job.setMapperClass(Map.class);
		job.setNumReduceTasks(0);
		job.setInputFormatClass(TextInputFormat.class);
		TextInputFormat.setInputPaths(job, input);
		job.setOutputFormatClass(NullOutputFormat.class);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws IOException {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();
		int res = 1;
		try {
			res = ToolRunner.run(conf, new HBaseImport(), otherArgs);
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.exit(res);
	}

}
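
HTable, setAutoFlush, flushCommits and setWriteToWAL above come from the old 0.9x-era client API and were removed in later HBase releases. On a 1.x or newer client the same buffered, map-only write pattern is usually expressed with Connection and BufferedMutator. The mapper below is only a sketch of that idea, written against the same hypothetical table name "testKang" and 12 MB write buffer as the code above; it is not part of the original post.

import java.io.IOException;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch only: the map-only import rewritten for the HBase 1.x+ client API.
public class NewApiMap extends
		Mapper<LongWritable, Text, NullWritable, NullWritable> {
	private Connection connection = null;
	private BufferedMutator mutator = null;

	@Override
	protected void setup(Context context) throws IOException {
		connection = ConnectionFactory.createConnection(context.getConfiguration());
		// BufferedMutator replaces HTable.setAutoFlush(false) + setWriteBufferSize().
		mutator = connection.getBufferedMutator(
				new BufferedMutatorParams(TableName.valueOf("testKang"))
						.writeBufferSize(12 * 1024 * 1024));
	}

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String[] all = value.toString().split("\t");
		if (all.length != 2) {
			return; // skip malformed lines
		}
		Put put = new Put(Bytes.toBytes(all[0]));
		put.addColumn(Bytes.toBytes("xxx"), Bytes.toBytes("20110313"),
				Bytes.toBytes(all[1]));
		// Optional, off by default as in the original:
		// put.setDurability(Durability.SKIP_WAL); // replaces Put.setWriteToWAL(false)
		mutator.mutate(put);
	}

	@Override
	protected void cleanup(Context context) throws IOException {
		mutator.flush(); // replaces HTable.flushCommits()
		mutator.close();
		connection.close();
	}
}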

package data2hbase;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class DataToHBase {
	private static final String TABLE_NAME = "TrafficInfo";
	private static final String FAMILY_NAME = "cf";
	private static final String INPUT_PATH = "hdfs://hadoop.master:9000/traffic_in.dat";
//	private static final String OUT_PATH = "hdfs://hadoop.master:/9000/traffic_out.dat";

	public static void main(String[] args) throws Exception {
		// Create the target HBase table, dropping it first if it already exists
		Configuration conf = new Configuration();
		
//		conf.set("hbase.rootdir", "hdfs://hadoop.master:9000/hbase");
//		// must be set when running from Eclipse, otherwise the client cannot locate the cluster
//		conf.set("hbase.zookeeper.quorum",
//		"hadoop.master,hadoop.slave0,hadoop.slave1,hadoop.slave2");
		Configuration cf = HBaseConfiguration.create(conf);
		HBaseAdmin hbaseAdmin = new HBaseAdmin(cf);

		boolean tableExists = hbaseAdmin.tableExists(TABLE_NAME);
		if (tableExists) {
			hbaseAdmin.disableTable(TABLE_NAME);
			hbaseAdmin.deleteTable(TABLE_NAME);
			System.err.println("............del table: " + TABLE_NAME);
		}

		HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
		HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME.getBytes());
		desc.addFamily(family);
		hbaseAdmin.createTable(desc);
		System.err.println(".................create table: " + TABLE_NAME);

		// 1.1
		conf = new Configuration();
//		// set the ZooKeeper quorum
//		conf.set("hbase.zookeeper.quorum",
//				"hadoop.master,hadoop.slave0,hadoop.slave1,hadoop.slave2");
		// set the HBase table that TableOutputFormat will write to
		conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);
		conf.set("dfs.socket.timeout", "180000");
		Job job = new Job(conf, DataToHBase.class.getName());
		job.setJarByClass(DataToHBase.class);
		job.setInputFormatClass(TextInputFormat.class);
		FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
		// 1.2
		job.setMapperClass(BatchImportMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		// 1.3
		job.setPartitionerClass(HashPartitioner.class);
		job.setNumReduceTasks(1);
		// 1.4
		// job.setGroupingComparatorClass(cls);
		// 1.5
		// job.setCombinerClass(cls)

		// 2.1
		// 2.2
		job.setReducerClass(BatchImportReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setOutputFormatClass(TableOutputFormat.class);

		// 2.3
//		FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

		job.waitForCompletion(true);

		// we only want the batch import into HBase, so no file output path is set

	}

	// static class BatchImportMapper extends TableMapper<Text, Text> {
	// protected void map(
	// org.apache.hadoop.hbase.io.ImmutableBytesWritable key,
	// org.apache.hadoop.hbase.client.Result value,
	// org.apache.hadoop.mapreduce.Mapper<org.apache.hadoop.hbase.io.ImmutableBytesWritable,
	// org.apache.hadoop.hbase.client.Result, Text, Text>.Context context)
	// throws java.io.IOException, InterruptedException {
	// };
	// }

	static class BatchImportMapper extends
			Mapper<LongWritable, Text, Text, Text> {

		// Row keys will be "phoneNumber:yyyyMMddHHmmss".
		SimpleDateFormat simpleDateFormat = new SimpleDateFormat(
				"yyyyMMddHHmmss");

		protected void map(
				LongWritable key,
				Text value,
				org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text>.Context context)
				throws java.io.IOException, InterruptedException {

			// Input lines are tab separated: field 0 is a timestamp in
			// milliseconds, field 1 is the phone number, and the remaining
			// fields are the traffic counters used by the reducer.
			String[] split = value.toString().split("\t");
			String time = split[0].trim();
			System.err.println("==" + time + "==");
			String formatDate = simpleDateFormat.format(new Date(Long
					.parseLong(time)));
			context.write(new Text(split[1]), new Text(split[1] + ":"
					+ formatDate + "\t" + value.toString()));
		};
	}

	static class BatchImportReducer extends TableReducer<Text, Text, Text> {
		protected void reduce(
				Text k2,
				java.lang.Iterable<Text> v2s,
				org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, org.apache.hadoop.io.Writable>.Context context)
				throws java.io.IOException, InterruptedException {

			for (Text text : v2s) {
				// The mapper prepended "phone:date" as the row key, so the
				// original record's fields are shifted right by one position.
				String[] split = text.toString().split("\t");
				String tableRowKey = split[0].trim();
				String phoneNum = split[2];
				String upPackNum = split[7];
				String downPackNum = split[8];
				String upPayLoad = split[9];
				String downPayLoad = split[10];
				String host = split[5];

				Put put = new Put(tableRowKey.getBytes());
				put.add(FAMILY_NAME.getBytes(), "phoneNum".getBytes(),
						phoneNum.getBytes());
				put.add(FAMILY_NAME.getBytes(), "upPackNum".getBytes(),
						upPackNum.getBytes());
				put.add(FAMILY_NAME.getBytes(), "downPackNum".getBytes(),
						downPackNum.getBytes());
				put.add(FAMILY_NAME.getBytes(), "upPayLoad".getBytes(),
						upPayLoad.getBytes());
				put.add(FAMILY_NAME.getBytes(), "downPayLoad".getBytes(),
						downPayLoad.getBytes());
				put.add(FAMILY_NAME.getBytes(), "host".getBytes(),
						host.getBytes());

				context.write(new Text(tableRowKey), put);

				// HTable htable=new HTable(new Configuration(),TABLE_NAME);
				// htable.put(put);
				//
				// context.write(new Text(tableRowKey), new Text(text));
			}

		};
	}

}
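
To spot-check the result of the second job, you can scan a few rows back out of TrafficInfo with the same old-style client API used above. This is only an illustrative sketch and not part of the original post; the table and column family names are the constants from the code above, everything else is made up for the example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanTrafficInfo {
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create(new Configuration());
		HTable table = new HTable(conf, "TrafficInfo");
		Scan scan = new Scan();
		scan.setCaching(100); // fetch rows from the region server in batches of 100
		ResultScanner scanner = table.getScanner(scan);
		int shown = 0;
		for (Result result : scanner) {
			String rowKey = Bytes.toString(result.getRow());
			String phoneNum = Bytes.toString(result.getValue(
					Bytes.toBytes("cf"), Bytes.toBytes("phoneNum")));
			System.out.println(rowKey + " -> phoneNum=" + phoneNum);
			if (++shown >= 10) {
				break; // print only the first 10 rows
			}
		}
		scanner.close();
		table.close();
	}
}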