

1. Requirement
Merge several small files into one SequenceFile (a SequenceFile is the Hadoop file format for storing key-value pairs in binary form). The SequenceFile holds multiple files, stored with the file path + name as the key and the file content as the value (a minimal writer sketch of this layout follows the sample files below).
Three small files:
one.txt
- yongpeng weidong weinan
- sanfeng luozong xiaoming
two.txt
- shuaige changmo zhenqiang
- dongli lingu xuanxuan
three.txt
- longlong fanfan
- mazong kailun yuhang yixin
- longlong fanfan
- mazong kailun yuhang yixin
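To make the target layout concrete, here is a minimal, hedged sketch that writes the same path-as-key / content-as-value pairs directly with SequenceFile.Writer. It is not part of the MapReduce job; the class name SequenceFileWriteSketch and the local paths are assumptions for illustration only.
- package com.nty.inputformat;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.BytesWritable;
- import org.apache.hadoop.io.IOUtils;
- import org.apache.hadoop.io.SequenceFile;
- import org.apache.hadoop.io.Text;
- public class SequenceFileWriteSketch {
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(conf);
- // Assumed local paths; adjust to your environment
- Path[] inputs = {new Path("d:/Hadoop_test/one.txt"), new Path("d:/Hadoop_test/two.txt"), new Path("d:/Hadoop_test/three.txt")};
- Path output = new Path("d:/Hadoop_test_seq/merged.seq");
- try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
- SequenceFile.Writer.file(output),
- SequenceFile.Writer.keyClass(Text.class),
- SequenceFile.Writer.valueClass(BytesWritable.class))) {
- for (Path input : inputs) {
- byte[] bytes = new byte[(int) fs.getFileStatus(input).getLen()];
- try (java.io.InputStream in = fs.open(input)) {
- IOUtils.readFully(in, bytes, 0, bytes.length);
- }
- // Key is the file path + name, value is the raw file content
- writer.append(new Text(input.toString()), new BytesWritable(bytes));
- }
- }
- }
- }
The MapReduce job below produces the same kind of file, but lets the framework handle reading the inputs and writing the output.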
2. Requirement analysis
- Write a custom CustomInputFormat that extends FileInputFormat: override isSplitable() to return false so each small file stays in a single split, and override createRecordReader() to return a custom RecordReader.
- In the custom RecordReader, read the whole file of a split into one key-value pair: the key is the file path + name (Text), and the value is the file content (BytesWritable).
- The Mapper and Reducer simply forward the pairs; the Driver sets SequenceFileOutputFormat as the output format so the result is written as a SequenceFile.
3. Example code
1) Custom RecordReader
- package com.nty.inputformat;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.BytesWritable;
- import org.apache.hadoop.io.IOUtils;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.InputSplit;
- import org.apache.hadoop.mapreduce.RecordReader;
- import org.apache.hadoop.mapreduce.TaskAttemptContext;
- import org.apache.hadoop.mapreduce.lib.input.FileSplit;
- import java.io.IOException;
- /**
- * author nty
- * date time 2018-12-11 9:10
- */
- public class CustomRecordReader extends RecordReader<Text, BytesWritable> {
- /**
- * Because FileInputFormat-style input is used, the 3 input files produce 3 splits, so each RecordReader handles exactly one file and reads it in a single pass
- */
-
- // Flag marking whether the file has been read; true means it has not been read yet
- private boolean flag = true;
- private Text key = new Text();
- private BytesWritable value = new BytesWritable();
- // Input stream
- private FSDataInputStream fis;
- private FileSplit fs;
- /**
- * Initialization method, called only once
- * @param split
- * @param context
- * @throws IOException
- * @throws InterruptedException
- */
- @Override
- public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
- // FileSplit is a subclass of InputSplit
- fs = (FileSplit) split;
- // Get the file path
- Path path = fs.getPath();
- // Get the file system
- FileSystem fileSystem = FileSystem.get(context.getConfiguration());
- //FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
- // Open the stream
- fis = fileSystem.open(path);
- }
- /**
- * Read the next key-value pair
- * @return true if a pair was read, false otherwise
- * @throws IOException
- * @throws InterruptedException
- */
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- if(flag){
- // Read the file into key and value
- String path = fs.getPath().toString();
- key.set(path);
- // The file is read in one pass, so the buffer length must be the whole file length rather than the usual 1024; very large files would be a problem with this approach, which is not discussed here
- byte[] bytes = new byte[(int) fs.getLength()];
- // read() may return before the buffer is full, so use readFully to read the entire file
- IOUtils.readFully(fis, bytes, 0, bytes.length);
- value.set(bytes, 0, bytes.length);
- // Mark the file as read
- flag = false;
- return true;
- }
- return false;
- }
- /**
- * Get the current key
- * @return
- * @throws IOException
- * @throws InterruptedException
- */
- @Override
- public Text getCurrentKey() throws IOException, InterruptedException {
- return this.key;
- }
- /**
- * Get the current value
- * @return
- * @throws IOException
- * @throws InterruptedException
- */
- @Override
- public BytesWritable getCurrentValue() throws IOException, InterruptedException {
- return this.value;
- }
- /**
- * Get the current read progress
- * @return
- * @throws IOException
- * @throws InterruptedException
- */
- @Override
- public float getProgress() throws IOException, InterruptedException {
- // The file is read in one pass, so the progress is either 0 or 1, determined by the flag
- return flag ? 0f : 1f;
- }
- /**
- * Close resources
- * @throws IOException
- */
- @Override
- public void close() throws IOException {
- IOUtils.closeStream(fis);
- }
- }
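To sanity-check the RecordReader in isolation, the following is a minimal local sketch that builds a FileSplit by hand and drives the same initialize / nextKeyValue / getCurrentKey / getCurrentValue / close sequence the framework uses. The class name RecordReaderLocalTest, the local input path, and the use of TaskAttemptContextImpl are assumptions and are not part of the original example.
- package com.nty.inputformat;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.mapreduce.TaskAttemptID;
- import org.apache.hadoop.mapreduce.lib.input.FileSplit;
- import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
- public class RecordReaderLocalTest {
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- // Assumed local file; adjust to your environment
- Path file = new Path("d:/Hadoop_test/one.txt");
- long length = FileSystem.get(conf).getFileStatus(file).getLen();
- // One split covering the whole file, mirroring isSplitable() == false
- FileSplit split = new FileSplit(file, 0, length, null);
- CustomRecordReader reader = new CustomRecordReader();
- reader.initialize(split, new TaskAttemptContextImpl(conf, new TaskAttemptID()));
- while (reader.nextKeyValue()) {
- // Expect exactly one pair: the file path as key, the whole file as value
- System.out.println(reader.getCurrentKey() + " -> " + reader.getCurrentValue().getLength() + " bytes");
- }
- reader.close();
- }
- }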
2) Custom InputFormat
- package com.nty.inputformat;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.BytesWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.InputSplit;
- import org.apache.hadoop.mapreduce.JobContext;
- import org.apache.hadoop.mapreduce.RecordReader;
- import org.apache.hadoop.mapreduce.TaskAttemptContext;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import java.io.IOException;
- /**
- * author nty
- * date time 2018-12-11 9:09
- */
- // Per the requirement, the key is the file path + name, so its type is Text; the value is the file content, so BytesWritable is used
- public class CustomInputFormat extends FileInputFormat<Text, BytesWritable> {
- // Each final output value is a whole file, so files must not be split; return false
- @Override
- protected boolean isSplitable(JobContext context, Path filename) {
- return false;
- }
- // Return the custom RecordReader
- @Override
- public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
- return new CustomRecordReader();
- }
- }
3) Write the Mapper class
- package com.nty.inputformat;
- import org.apache.hadoop.io.BytesWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- import java.io.IOException;
- /**
- * author nty
- * date time 2018-12-11 9:10
- */
- public class CustomMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
- @Override
- protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
- context.write(key,value);
- }
- }
4) Write the Reducer class
- package com.nty.inputformat;
- import org.apache.hadoop.io.BytesWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
- import java.io.IOException;
- /**
- * author nty
- * date time 2018-12-11 9:10
- */
- public class CustomReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
- @Override
- protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
- for (BytesWritable value : values) {
- context.write(key, value);
- }
- }
- }
5) Write the Driver class
- package com.nty.inputformat;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.BytesWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
- /**
- * author nty
- * date time 2018-12-11 9:10
- */
- public class CustomDriver {
- public static void main(String[] args) throws Exception{
- // Get the job instance
- Configuration configuration = new Configuration();
- Job job = Job.getInstance(configuration);
- // Set the jar class
- job.setJarByClass(CustomDriver.class);
- // Set the InputFormat and OutputFormat
- job.setInputFormatClass(CustomInputFormat.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- // Set the Mapper and Reducer classes
- job.setMapperClass(CustomMapper.class);
- job.setReducerClass(CustomReducer.class);
- // Set the Mapper output and final output key/value types
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(BytesWritable.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(BytesWritable.class);
- // Set the input and output paths
- FileInputFormat.setInputPaths(job, new Path("d:\\Hadoop_test"));
- FileOutputFormat.setOutputPath(job, new Path("d:\\Hadoop_test_out"));
- // Submit the job and wait for completion
- boolean b = job.waitForCompletion(true);
- System.exit(b ? 0 : 1);
- }
- }
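After the job finishes, the merged result can be inspected. Below is a minimal, hedged sketch for reading the output SequenceFile back; the class name SequenceFileCheck and the part-r-00000 file name are assumptions (part-r-00000 is the usual name of a single reducer's output, adjust the path if yours differs).
- package com.nty.inputformat;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.BytesWritable;
- import org.apache.hadoop.io.SequenceFile;
- import org.apache.hadoop.io.Text;
- public class SequenceFileCheck {
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- // Assumed output file produced by the single reducer
- Path output = new Path("d:/Hadoop_test_out/part-r-00000");
- try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(output))) {
- Text key = new Text();
- BytesWritable value = new BytesWritable();
- // Each record should be one original small file: path + name as key, raw content as value
- while (reader.next(key, value)) {
- System.out.println(key + " : " + value.getLength() + " bytes");
- }
- }
- }
- }
Equivalently, the output can usually be viewed from the command line with hadoop fs -text on the output file, which decodes SequenceFiles.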