本文共 3556 字,大约阅读时间需要 11 分钟。
package mr.hdfstoHbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Bulk-loads data from HDFS into HBase.
 *
 * <p>The map phase reads CSV lines from HDFS and emits {@link Put}s; the job writes
 * them out as HFiles via {@link HFileOutputFormat2}, and after the job succeeds the
 * HFiles are handed to the region servers with {@link LoadIncrementalHFiles}
 * (bypassing the normal write path for fast ingestion).
 */
public class HdfsToHbaseBulk {

    /**
     * Parses one input line of the form {@code rowKey,name,age} and emits a
     * {@link Put} with columns {@code info:name} and {@code info:age}.
     *
     * <p>NOTE: the original declared {@code extends Mapper} with a raw type, so this
     * {@code map} method did not actually override the framework callback and the
     * identity mapper would have run. The generic parameters below fix that.
     */
    public static class HdfsToHbaseBulkMapper
            extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        // Reused across calls to avoid per-record allocation of the output key.
        private final ImmutableBytesWritable mapKey = new ImmutableBytesWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Expected line format: rowKey,name,age — malformed lines will throw
            // ArrayIndexOutOfBoundsException and fail the task, surfacing bad input.
            String[] split = value.toString().split(",");
            mapKey.set(Bytes.toBytes(split[0]));
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(split[2]));
            context.write(mapKey, put);
        }
    }

    /**
     * Configures and runs the bulk-load job, then imports the generated HFiles.
     *
     * @param args unused
     * @throws Exception if the MapReduce job or the bulk load fails
     */
    public static void main(String[] args) throws Exception {
        // Allows local execution on Windows during development/debugging.
        System.setProperty("hadoop.home.dir", "E:\\software\\bigdate\\hadoop-2.6.0-cdh5.15.0\\hadoop-2.6.0-cdh5.15.0");

        Configuration conf = new Configuration();
        // HDFS entry point.
        conf.set("fs.defaultFS", "hdfs://wang:9000");
        // HBase connection via ZooKeeper.
        conf.set("zookeeper.znode.parent", "/hbase");
        conf.set("hbase.zookeeper.quorum", "wang");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Job job = Job.getInstance(conf);
        job.setJobName("HdfsToHbaseBulkJob");
        job.setJarByClass(HdfsToHbaseBulk.class);

        // Input path on HDFS; computation moves to the data, not vice versa.
        Path inputPath = new Path("/user/wang/hbase_data/human.txt");
        FileInputFormat.addInputPath(job, inputPath);

        job.setMapperClass(HdfsToHbaseBulkMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        // Output directory for the generated HFiles (must not already exist).
        Path outputPath = new Path("/user/wang/hbase_data/BlukOut5");
        TableName tableName = TableName.valueOf(Bytes.toBytes("hadoop:human"));

        // try-with-resources: the original leaked the Connection, Table and
        // RegionLocator; all three are AutoCloseable and are now closed on exit.
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(tableName);
             RegionLocator regionLocator = connection.getRegionLocator(tableName)) {

            // Sets reducer, partitioner and output format to produce HFiles
            // aligned with the table's current region boundaries.
            HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
            job.setOutputFormatClass(HFileOutputFormat2.class);
            FileOutputFormat.setOutputPath(job, outputPath);

            boolean flag = job.waitForCompletion(true);
            // After a successful run the output directory contains HFiles.
            if (flag) {
                // LoadIncrementalHFiles moves the HFiles into the table's regions.
                LoadIncrementalHFiles loadIncrementalHFiles = new LoadIncrementalHFiles(conf);
                loadIncrementalHFiles.doBulkLoad(outputPath, connection.getAdmin(), table, regionLocator);
            }
        }
    }
}
转载地址:http://rrjxi.baihongyu.com/