博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
从hdfs中插入数据到hbase中
阅读量:4160 次
发布时间:2019-05-26

本文共 3556 字,大约阅读时间需要 11 分钟。

package mr.hdfstoHbase;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.*;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/** * 从hdfs中插入数据到hbase中,批量导入 * 从hdfs中map阶段获取相关数据,保存为hfile格式 * 然后在hfile插入hbase */public class HdfsToHbaseBulk {    public static class HdfsToHbaseBulkMapper extends            Mapper
{ private ImmutableBytesWritable mapKey = new ImmutableBytesWritable(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split(","); mapKey.set(Bytes.toBytes(split[0])); Put put = new Put(Bytes.toBytes(split[0])); put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1])); put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(split[2])); context.write(mapKey, put); } } public static void main(String[] args) throws Exception { System.setProperty("hadoop.home.dir", "E:\\software\\bigdate\\hadoop-2.6.0-cdh5.15.0\\hadoop-2.6.0-cdh5.15.0"); //在调试阶段 可以在window下本地执行 //和hdfs 连接 Configuration conf = new Configuration(); //hdfs入口 conf.set("fs.defaultFS", "hdfs://wang:9000"); //和hbase连接 conf.set("zookeeper.znode.parent", "/hbase"); conf.set("hbase.zookeeper.quorum", "wang"); conf.set("hbase.zookeeper.property.clientPort", "2181"); Job job = Job.getInstance(conf); job.setJobName("HdfsToHbaseBulkJob"); job.setJarByClass(HdfsToHbaseBulk.class); //设置input hdfs路径 Path inputPath = new Path("/user/wang/hbase_data/human.txt"); FileInputFormat.addInputPath(job, inputPath); //map 集群中执行任务 遵循 移动计算 而不移动数据 //Mapper类 job.setMapperClass(HdfsToHbaseBulkMapper.class); //key类 job.setMapOutputKeyClass(ImmutableBytesWritable.class); //value类 job.setMapOutputValueClass(Put.class); //output Path outputPath = new Path("/user/wang/hbase_data/BlukOut5"); Connection connection = ConnectionFactory.createConnection(conf); Table table = connection.getTable(TableName.valueOf(Bytes.toBytes("hadoop:human"))); RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(Bytes.toBytes("hadoop:human"))); HFileOutputFormat2.configureIncrementalLoad(job,table,regionLocator); job.setOutputFormatClass(HFileOutputFormat2.class); //执行任务 FileOutputFormat.setOutputPath(job, outputPath); boolean flag = job.waitForCompletion(true); //mr任务正常执行后,会在对应目录下生成hfile文件 if(flag){ //hbase 导入hfile格式文件使用LoadIncrementalHFiles LoadIncrementalHFiles loadIncrementalHFiles = new LoadIncrementalHFiles(conf); //解析hile文件,将hfile 上传到hbase loadIncrementalHFiles.doBulkLoad(outputPath,connection.getAdmin(),table,regionLocator); } }}

 

转载地址:http://rrjxi.baihongyu.com/

你可能感兴趣的文章
向其他软件窗口、控件发送消息的方法
查看>>
word或者pdf文件全部保存为图片的方法
查看>>
VS2010下SQLite3生成lib库文件
查看>>
sqlite3的helloworld
查看>>
MFC下支持中文的SQLite3封装类使用
查看>>
简单高效的多线程日志类
查看>>
研华USB4711A采集卡高速中断模式采集总结
查看>>
从零起步CMFCToolBar用法详解
查看>>
CMFCRibbonStatusBar用法
查看>>
CMFCControlRendererInfo类的参数
查看>>
史上最详细MFC调用mapX5.02.26步骤(附地图测试GST文件)
查看>>
CMFCShellListCtrl使用方法
查看>>
mapnik的demo运行
查看>>
python支持下的mapnik安装
查看>>
milvus手册
查看>>
多目标跟踪的简单理解
查看>>
Near-Online Multi-target Tracking with Aggregated Local Flow Descriptor
查看>>
Joint Tracking and Segmentation of Multiple Targets
查看>>
Subgraph Decomposition for Multi-Target Tracking
查看>>
JOTS: Joint Online Tracking and Segmentation
查看>>