- HDFS 与 HBase 的交互和编写普通 MapReduce 程序类似：只需修改输入/输出数据源，并使用 HBase 的 Java API 对数据进行读取和处理即可。
- public class HBaseToHdfs extends ToolRunner implements Tool {
- private Configuration configuration;
- //配置文件需要配置的属性
- private static final String HDFS_NAME = "fs.defaultFS";
- private static final String HDFS_VALUE = "hdfs://mycluster";
- private static final String MAPREDUCE_NAME = "mapreduce.framework.name";
- private static final String MAPREDUCE_VALUE = "yarn";
- private static final String HBASE_NAME = "hbase.zookeeper.quorum";
- private static final String HBASE_VALUE = "qiaojunlong3:2181,qiaojunlong4:2181,qiaojunlong5:2181";
- //获取hbase表的扫描对象
- private Scan getscan() {
- return new Scan();
- }
- @Override
- public int run(String[] args) throws Exception {
- getConf();
- //获取job实例对象
- Job job = Job.getInstance(configuration, "copy_move");
- //map/reduce的class链接
- job.setMapperClass(hbase_To_Hdfs.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(NullWritable.class);
- //设置输入输出
- //由hbase导数据到hdfs故输入端需要使用TableMapReduceUtil类
- TableMapReduceUtil.initTableMapperJob("ns3:t5", getscan(), hbase_To_Hdfs.class, Text.class, NullWritable.class, job);
- FileOutputFormat.setOutputPath(job, new Path(args[0]));
- //设置jar包
- job.setJarByClass(HBaseToHdfs.class);
- //提交作业
- int b = job.waitForCompletion(true) ? 0 : 1;
- return b;
- }
- @Override
- public void setConf(Configuration configuration) {
- configuration.set(HDFS_NAME, HDFS_VALUE);
- configuration.set(MAPREDUCE_NAME, MAPREDUCE_VALUE);
- configuration.set(HBASE_NAME, HBASE_VALUE);
- this.configuration = configuration;
- }
- @Override
- public Configuration getConf() {
- return configuration;
- }
- public static void main(String[] args) throws Exception {
- ToolRunner.run(HBaseConfiguration.create(),new HBaseToHdfs() , args);
- }
- // 创建map程序
- private static Text mkey = new Text();
- static class hbase_To_Hdfs extends TableMapper<Text, NullWritable> {
- @Override
- protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
- //定义字符串拼接
- StringBuffer stringBuffer = new StringBuffer();
- /**
- * 使用value获取扫描器,获取hbase表的列名/列值等信息
- * 使用StringBuffer来对需要的信息进行字符串拼接
- */
- CellScanner cellScanner = value.cellScanner();
- while (cellScanner.advance()) {
- Cell cell = cellScanner.current();
- stringBuffer.append(new String(CellUtil.cloneValue(cell))).append("\t");
- }
- mkey.set(stringBuffer.toString());
- context.write(mkey, NullWritable.get());
- }
- }
- }