Big Data - Hadoop Ecosystem (16) - MapReduce Framework Principles - Custom FileInputFormat
Source: cnblogs  Author: nt杨  Date: 2018/12/12 9:42:28

 

1. Requirements

Merge several small files into a single SequenceFile (SequenceFile is Hadoop's file format for storing key-value pairs in binary form). The SequenceFile holds all of the small files, using the file path + name as the key and the file contents as the value (a minimal standalone sketch of such a file follows the input listings below).

The three small input files:

one.txt

  yongpeng weidong weinan
  sanfeng luozong xiaoming

two.txt

  shuaige changmo zhenqiang
  dongli lingu xuanxuan

three.txt

  longlong fanfan
  mazong kailun yuhang yixin
  longlong fanfan
  mazong kailun yuhang yixin
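
Before looking at the MapReduce code, it may help to see the target format on its own. The following is a minimal standalone sketch (not part of the original case code) that writes one (file path, file bytes) record directly into a SequenceFile via Hadoop's SequenceFile.createWriter API; the output path and the hard-coded record are illustrative assumptions only. The job built in section 3 produces the same kind of file, with one record per input file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Minimal sketch: write one (file path, file content) pair into a SequenceFile.
public class SequenceFileSketch {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Illustrative output path, not taken from the article
        Path out = new Path("d:\\seqfile_demo\\merged.seq");

        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(out),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(BytesWritable.class))) {

            // key = file path + name, value = raw file content
            byte[] content = "yongpeng weidong weinan\nsanfeng luozong xiaoming\n".getBytes("UTF-8");
            writer.append(new Text("d:/Hadoop_test/one.txt"), new BytesWritable(content));
        }
    }
}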

 

2. Requirement Analysis

Since the key is the file path + name and the value is the entire file content, each input file has to be read as a single record rather than line by line. The plan is therefore: subclass FileInputFormat and mark files as non-splittable, return a custom RecordReader that reads the whole file into one BytesWritable in a single pass, let the Mapper and Reducer pass the (Text, BytesWritable) pairs through unchanged, and write the result with SequenceFileOutputFormat.

3. Example Code

1) Custom RecordReader

package com.nty.inputformat;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-11 9:10
 */
public class CustomRecordReader extends RecordReader<Text, BytesWritable> {

    /**
     * Because FileInputFormat is used as the input, the 3 source files become 3 splits,
     * so each RecordReader handles only one file and reads it in a single pass.
     */

    // Marks whether the file has been read; true means it has not been read yet
    private boolean flag = true;

    private Text key = new Text();

    private BytesWritable value = new BytesWritable();

    // Input stream
    private FSDataInputStream fis;

    private FileSplit fs;

    /**
     * Initialization method, called only once
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // FileSplit is a subclass of InputSplit
        fs = (FileSplit) split;
        // Get the file path
        Path path = fs.getPath();
        // Get the file system
        FileSystem fileSystem = FileSystem.get(context.getConfiguration());
        // FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
        // Open the stream
        fis = fileSystem.open(path);
    }

    /**
     * Read the next key-value pair
     * @return true if a pair was read, false otherwise
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (flag) {
            // Read the file into key and value
            String path = fs.getPath().toString();
            key.set(path);
            // The file is read in one pass, so the buffer is the whole file length rather than the
            // usual 1024 bytes; very large files would need a different approach, not covered here.
            byte[] bytes = new byte[(int) fs.getLength()];
            IOUtils.readFully(fis, bytes, 0, bytes.length);
            value.set(bytes, 0, bytes.length);
            // Mark the file as read
            flag = false;
            return true;
        }
        return false;
    }

    /**
     * Get the current key
     */
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    /**
     * Get the current value
     */
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    /**
     * Get the current progress
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        // The file is read in one pass, so progress is either 0 or 1, determined by flag
        return flag ? 0f : 1f;
    }

    /**
     * Release resources
     */
    @Override
    public void close() throws IOException {
        IOUtils.closeStream(fis);
    }
}

2) Custom InputFormat

package com.nty.inputformat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-11 9:09
 */
// Per the requirement, the key is the file path + name, so its type is Text;
// the value is the file content, so BytesWritable is used.
public class CustomInputFormat extends FileInputFormat<Text, BytesWritable> {

    // Each output value is an entire file, so files must not be split; return false
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    // Return the custom RecordReader
    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new CustomRecordReader();
    }
}

3) Write the Mapper class

package com.nty.inputformat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-11 9:10
 */
public class CustomMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // Pass the (file path, file content) pair through unchanged
        context.write(key, value);
    }
}

4) Write the Reducer class

package com.nty.inputformat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-11 9:10
 */
public class CustomReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        // Each key (file path) carries a single value (the file content); write them out unchanged
        for (BytesWritable value : values) {
            context.write(key, value);
        }
    }
}

5) Write the Driver class

package com.nty.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

/**
 * author nty
 * date time 2018-12-11 9:10
 */
public class CustomDriver {

    public static void main(String[] args) throws Exception {
        // Get the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // Set the jar class
        job.setJarByClass(CustomDriver.class);

        // Set the input and output formats
        job.setInputFormatClass(CustomInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // Set the Mapper and Reducer
        job.setMapperClass(CustomMapper.class);
        job.setReducerClass(CustomReducer.class);

        // Set the Mapper and Reducer output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        // Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("d:\\Hadoop_test"));
        FileOutputFormat.setOutputPath(job, new Path("d:\\Hadoop_test_out"));

        // Submit the job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
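
To check the result, the generated SequenceFile can be read back with SequenceFile.Reader. Below is a small hedged sketch, not part of the original article; the file name part-r-00000 is the usual name of a single reducer's output file and is assumed here.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Read the merged SequenceFile back and print each key (file path) and value size.
public class SequenceFileDump {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "part-r-00000" is the default reducer output file name (an assumption here)
        Path path = new Path("d:\\Hadoop_test_out\\part-r-00000");

        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        }
    }
}

Running it should print one line per input file, e.g. the path of one.txt followed by its size in bytes.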

 
