经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Hadoop » 查看文章
hadoop(二MapReduce)
来源:cnblogs  作者:不穿格子衫的徍爺  时间:2019/8/12 8:39:31  对本文有异议

hadoop(二MapReduce)


介绍

MapReduce:其实就是把数据分开处理后再将数据合在一起.

  • Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
  • Reduce负责“合”,即对map阶段的结果进行全局汇总。
  • MapReduce运行在yarn集群

1565535533568

MapReduce中定义了如下的MapReduce两个抽象的编程接口,由用户去编程实现.Map和Reduce,

MapReduce处理的数据类型是键值对

1565535661117

1565535712603

1565535722812


代码处理

MapReduce 的开发一共有八个步骤, 其中 Map 阶段分为 2 个步骤,Shuwle 阶段 4 个步 
骤,Reduce 阶段分为 2 个步骤

​ Map 阶段 2 个步骤

  1. 设置 InputFormat 类, 将数据切分为 Key-Value(K1和V1) 对, 输入到第二步
  2. 自定义 Map 逻辑, 将第一步的结果转换成另外的 Key-Value(K2和V2) 对, 输出结果 
    Shuwle 阶段 4 个步骤
  3. 对输出的 Key-Value 对进行分区
  4. 对不同分区的数据按照相同的 Key 排序
  5. (可选) 对分组过的数据初步规约, 降低数据的网络拷贝
  6. 对数据进行分组, 相同 Key 的 Value 放入一个集合中 
    Reduce 阶段 2 个步骤
  7. 对多个 Map 任务的结果进行排序以及合并, 编写 Reduce 函数实现自己的逻辑, 对输入的 
    Key-Value 进行处理, 转为新的 Key-Value(K3和V3)输出
  8. 设置 OutputFormat 处理并保存 Reduce 输出的 Key-Value 数据

常用Maven依赖

    1. <packaging>jar</packaging>
    2. <dependencies>
    3. <dependency>
    4. <groupId>org.apache.hadoop</groupId>
    5. <artifactId>hadoop-common</artifactId>
    6. <version>2.7.5</version>
    7. </dependency>
    8. <dependency>
    9. <groupId>org.apache.hadoop</groupId>
    10. <artifactId>hadoop-client</artifactId>
    11. <version>2.7.5</version>
    12. </dependency>
    13. <dependency>
    14. <groupId>org.apache.hadoop</groupId>
    15. <artifactId>hadoop-hdfs</artifactId>
    16. <version>2.7.5</version>
    17. </dependency>
    18. <dependency>
    19. <groupId>org.apache.hadoop</groupId>
    20. <artifactId>hadoop-mapreduce-client-core</artifactId>
    21. <version>2.7.5</version>
    22. </dependency>
    23. <dependency>
    24. <groupId>junit</groupId>
    25. <artifactId>junit</artifactId>
    26. <version>RELEASE</version>
    27. </dependency>
    28. </dependencies>
    29. <build>
    30. <plugins>
    31. <plugin>
    32. <groupId>org.apache.maven.plugins</groupId>
    33. <artifactId>maven-compiler-plugin</artifactId>
    34. <version>3.1</version>
    35. <configuration>
    36. <source>1.8</source>
    37. <target>1.8</target>
    38. <encoding>UTF-8</encoding>
    39. <!-- <verbal>true</verbal>-->
    40. </configuration>
    41. </plugin>
    42. <plugin>
    43. <groupId>org.apache.maven.plugins</groupId>
    44. <artifactId>maven-shade-plugin</artifactId>
    45. <version>2.4.3</version>
    46. <executions>
    47. <execution>
    48. <phase>package</phase>
    49. <goals>
    50. <goal>shade</goal>
    51. </goals>
    52. <configuration>
    53. <minimizeJar>true</minimizeJar>
    54. </configuration>
    55. </execution>
    56. </executions>
    57. </plugin>
    58. </plugins>
    59. </build>

入门---统计

结构

1565536157181

    1. /*
    2. 四个泛型解释:
    3. KEYIN :K1的类型
    4. VALUEIN: V1的类型
    5. KEYOUT: K2的类型
    6. VALUEOUT: V2的类型
    7. */
    8. public class WordCountMapper extends Mapper<LongWritable,Text, Text , LongWritable> {
    9. //map方法就是将K1和V1 转为 K2和V2
    10. /*
    11. 参数:
    12. key : K1 行偏移量(默认几乎一直固定为LongWritable)
    13. value : V1 每一行的文本数据
    14. context :表示上下文对象
    15. */
    16. /*
    17. 如何将K1和V1 转为 K2和V2
    18. K1 V1
    19. 0 hello,world,hadoop
    20. 15 hdfs,hive,hello
    21. ---------------------------
    22. K2 V2
    23. hello 1
    24. world 1
    25. hdfs 1
    26. hadoop 1
    27. hello 1
    28. */
    29. @Override
    30. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    31. Text text = new Text();
    32. LongWritable longWritable = new LongWritable();
    33. //1:将一行的文本数据进行拆分
    34. String[] split = value.toString().split(",");
    35. //2:遍历数组,组装 K2 和 V2
    36. for (String word : split) {
    37. //3:将K2和V2写入上下文
    38. text.set(word);
    39. longWritable.set(1);
    40. context.write(text, longWritable);
    41. }
    42. }
    43. }

1565536309305


    1. /*
    2. 四个泛型解释:
    3. KEYIN: K2类型
    4. VALULEIN: V2类型
    5. KEYOUT: K3类型
    6. VALUEOUT:V3类型
    7. */
    8. public class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
    9. //reduce方法作用: 将新的K2和V2转为 K3和V3 ,将K3和V3写入上下文中
    10. /*
    11. 参数:
    12. key : 新K2
    13. values: 集合 新 V2
    14. context :表示上下文对象
    15. ----------------------
    16. 如何将新的K2和V2转为 K3和V3
    17. 新 K2 V2
    18. hello <1,1,1>
    19. world <1,1>
    20. hadoop <1>
    21. ------------------------
    22. K3 V3
    23. hello 3
    24. world 2
    25. hadoop 1
    26. */
    27. @Override
    28. protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
    29. long count = 0;
    30. //1:遍历集合,将集合中的数字相加,得到 V3
    31. for (LongWritable value : values) {
    32. count += value.get();
    33. }
    34. //2:将K3和V3写入上下文中
    35. context.write(key, new LongWritable(count));
    36. }
    37. }

1565536420326

    1. public class JobMain extends Configured implements Tool {
    2. //该方法用于指定一个job任务
    3. @Override
    4. public int run(String[] args) throws Exception {
    5. //1:创建一个job任务对象
    6. Job job = Job.getInstance(super.getConf(), "wordcount");
    7. //如果打包运行出错,则需要加该配置
    8. job.setJarByClass(JobMain.class);
    9. //2:配置job任务对象(八个步骤)
    10. //第一步:指定文件的读取方式和读取路径
    11. job.setInputFormatClass(TextInputFormat.class);
    12. TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/wordcount"));
    13. //TextInputFormat.addInputPath(job, new Path("file:///D:\\mapreduce\\input"));
    14. //第二步:指定Map阶段的处理方式和数据类型
    15. job.setMapperClass(WordCountMapper.class);
    16. //设置Map阶段K2的类型
    17. job.setMapOutputKeyClass(Text.class);
    18. //设置Map阶段V2的类型
    19. job.setMapOutputValueClass(LongWritable.class);
    20. //第三,四,五,六 采用默认的方式
    21. //第七步:指定Reduce阶段的处理方式和数据类型
    22. job.setReducerClass(WordCountReducer.class);
    23. //设置K3的类型
    24. job.setOutputKeyClass(Text.class);
    25. //设置V3的类型
    26. job.setOutputValueClass(LongWritable.class);
    27. //第八步: 设置输出类型
    28. job.setOutputFormatClass(TextOutputFormat.class);
    29. //设置输出的路径
    30. Path path = new Path("hdfs://node01:8020/wordcount_out");
    31. TextOutputFormat.setOutputPath(job, path);
    32. //TextOutputFormat.setOutputPath(job, new Path("file:///D:\\mapreduce\\output"));
    33. //获取FileSystem
    34. FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
    35. //判断目录是否存在
    36. boolean bl2 = fileSystem.exists(path);
    37. if(bl2){
    38. //删除目标目录
    39. fileSystem.delete(path, true);
    40. }
    41. //等待任务结束
    42. boolean bl = job.waitForCompletion(true);
    43. return bl ? 0:1;
    44. }
    45. public static void main(String[] args) throws Exception {
    46. Configuration configuration = new Configuration();
    47. //启动job任务
    48. int run = ToolRunner.run(configuration, new JobMain(), args);
    49. System.exit(run);
    50. }
    51. }

shuwle阶段

分区

分区实则目的是按照我们的需求,将不同类型的数据分开处理,最终分开获取

代码实现

结构

1565536771431

    1. public class MyPartitioner extends Partitioner<Text,NullWritable> {
    2. /*
    3. 1:定义分区规则
    4. 2:返回对应的分区编号
    5. */
    6. @Override
    7. public int getPartition(Text text, NullWritable nullWritable, int i) {
    8. //1:拆分行文本数据(K2),获取中奖字段的值
    9. String[] split = text.toString().split("\t");
    10. String numStr = split[5];
    11. //2:判断中奖字段的值和15的关系,然后返回对应的分区编号
    12. if(Integer.parseInt(numStr) > 15){
    13. return 1;
    14. }else{
    15. return 0;
    16. }
    17. }
    18. }

    1. //第三步,指定分区类
    2. job.setPartitionerClass(MyPartitioner.class);
    3. //第四, 五,六步
    4. //设置ReduceTask的个数
    5. job.setNumReduceTasks(2);

MapReduce 中的计数器

计数器是收集作业统计信息的有效手段之一,用于质量控制或应用级统计

可辅助诊断系统故障

看能否用一个计数器值来记录某一特定事件的发生 ,比分析一堆日志文件容易

1565537191724

通过enum枚举类型来定义计数器 统计reduce端数据的输入的key有多少个

    1. public class PartitionerReducer extends Reducer<Text,NullWritable,Text,NullWritable> {
    2. public static enum Counter{
    3. MY_INPUT_RECOREDS,MY_INPUT_BYTES
    4. }
    5. @Override
    6. protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
    7. //方式2:使用枚枚举来定义计数器
    8. context.getCounter(Counter.MY_INPUT_RECOREDS).increment(1L);
    9. context.write(key, NullWritable.get());
    10. }
    11. }

排序(包含序列化)

  • 序列化 (Serialization) 是指把结构化对象转化为字节流
  • 反序列化 (Deserialization) 是序列化的逆过程. 把字节流转为结构化对象. 当要在进程间传 
    递对象或持久化对象的时候, 就需要序列化对象成字节流, 反之当要将接收到或从磁盘读取 
    的字节流转换为对象, 就要进行反序列化
  • Java 的序列化 (Serializable) 是一个重量级序列化框架, 一个对象被序列化后, 会附带很多额 
    外的信息 (各种校验信息, header, 继承体系等), 不便于在网络中高效传输. 所以, Hadoop 
    自己开发了一套序列化机制(Writable), 精简高效. 不用像 Java 对象类一样传输多层的父子 
    关系, 需要哪个属性就传输哪个属性值, 大大的减少网络传输的开销
  • Writable 是 Hadoop 的序列化格式, Hadoop 定义了这样一个 Writable 接口. 一个类要支持可 
    序列化只需实现这个接口即可
  • 另外 Writable 有一个子接口是 WritableComparable, WritableComparable 是既可实现序列 
    化, 也可以对key进行比较, 我们这里可以通过自定义 Key 实现 WritableComparable 来实现 
    我们的排序功能

1565537717043

1565537764056

    1. public class SortBean implements WritableComparable<SortBean>{
    2. private String word;
    3. private int num;
    4. public String getWord() {
    5. return word;
    6. }
    7. public void setWord(String word) {
    8. this.word = word;
    9. }
    10. public int getNum() {
    11. return num;
    12. }
    13. public void setNum(int num) {
    14. this.num = num;
    15. }
    16. @Override
    17. public String toString() {
    18. return word + "\t"+ num ;
    19. }
    20. //实现比较器,指定排序的规则
    21. /*
    22. 规则:
    23. 第一列(word)按照字典顺序进行排列 // aac aad
    24. 第一列相同的时候, 第二列(num)按照升序进行排列
    25. */
    26. @Override
    27. public int compareTo(SortBean sortBean) {
    28. //先对第一列排序: Word排序
    29. int result = this.word.compareTo(sortBean.word);
    30. //如果第一列相同,则按照第二列进行排序
    31. if(result == 0){
    32. return this.num - sortBean.num;
    33. }
    34. return result;
    35. }
    36. //实现序列化
    37. @Override
    38. public void write(DataOutput out) throws IOException {
    39. out.writeUTF(word);
    40. out.writeInt(num);
    41. }
    42. //实现反序列
    43. @Override
    44. public void readFields(DataInput in) throws IOException {
    45. this.word = in.readUTF();
    46. this.num = in.readInt();
    47. }
    48. }
    1. public class SortMapper extends Mapper<LongWritable,Text,SortBean,NullWritable> {
    2. /*
    3. map方法将K1和V1转为K2和V2:
    4. K1 V1
    5. 0 a 3
    6. 5 b 7
    7. ----------------------
    8. K2 V2
    9. SortBean(a 3) NullWritable
    10. SortBean(b 7) NullWritable
    11. */
    12. @Override
    13. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    14. //1:将行文本数据(V1)拆分,并将数据封装到SortBean对象,就可以得到K2
    15. String[] split = value.toString().split("\t");
    16. SortBean sortBean = new SortBean();
    17. sortBean.setWord(split[0]);
    18. sortBean.setNum(Integer.parseInt(split[1]));
    19. //2:将K2和V2写入上下文中
    20. context.write(sortBean, NullWritable.get());
    21. }
    22. }
    1. public class SortReducer extends Reducer<SortBean,NullWritable,SortBean,NullWritable> {
    2. //reduce方法将新的K2和V2转为K3和V3
    3. @Override
    4. protected void reduce(SortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
    5. context.write(key, NullWritable.get());
    6. }
    7. }

job略


规约Combiner

在三大阶段的第一阶段map处理完后,可能数据过多,利用分布式思想,抢在reduce前先做一次合并,后再由reduce合并,目的是:提高网络IO 性能

实现步骤

1565538246526

1565538259324

    1. //第三(分区),四 (排序)
    2. //第五步: 规约(Combiner)
    3. job.setCombinerClass(MyCombiner.class);
    4. //第六步 分布

1565538317935


案例:流量统计(key相同则++++++++)

1565538587584

    1. public class FlowBean implements Writable {
    2. private Integer upFlow; //上行数据包数
    3. private Integer downFlow; //下行数据包数
    4. private Integer upCountFlow; //上行流量总和
    5. private Integer downCountFlow;//下行流量总和
    6. //下略get set 序列化 反序列化
    1. public class FlowCountMapper extends Mapper<LongWritable,Text,Text,FlowBean> {
    2. /*
    3. 将K1和V1转为K2和V2:
    4. K1 V1
    5. 0 1363157985059 13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 www.baidu.com 综合门户 19 128 1177 16852 200
    6. ------------------------------
    7. K2 V2
    8. 13600217502 FlowBean(19 128 1177 16852)
    9. */
    10. @Override
    11. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    12. //1:拆分行文本数据,得到手机号--->K2
    13. String[] split = value.toString().split("\t");
    14. String phoneNum = split[1];
    15. //2:创建FlowBean对象,并从行文本数据拆分出流量的四个四段,并将四个流量字段的值赋给FlowBean对象
    16. FlowBean flowBean = new FlowBean();
    17. flowBean.setUpFlow(Integer.parseInt(split[6]));
    18. flowBean.setDownFlow(Integer.parseInt(split[7]));
    19. flowBean.setUpCountFlow(Integer.parseInt(split[8]));
    20. flowBean.setDownCountFlow(Integer.parseInt(split[9]));
    21. //3:将K2和V2写入上下文中
    22. context.write(new Text(phoneNum), flowBean);
    23. }
    24. }
    1. public class FlowCountReducer extends Reducer<Text,FlowBean,Text,FlowBean> {
    2. @Override
    3. protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
    4. //1:遍历集合,并将集合中的对应的四个字段累计
    5. Integer upFlow = 0; //上行数据包数
    6. Integer downFlow = 0; //下行数据包数
    7. Integer upCountFlow = 0; //上行流量总和
    8. Integer downCountFlow = 0;//下行流量总和
    9. for (FlowBean value : values) {
    10. upFlow += value.getUpFlow();
    11. downFlow += value.getDownFlow();
    12. upCountFlow += value.getUpCountFlow();
    13. downCountFlow += value.getDownCountFlow();
    14. }
    15. //2:创建FlowBean对象,并给对象赋值 V3
    16. FlowBean flowBean = new FlowBean();
    17. flowBean.setUpFlow(upFlow);
    18. flowBean.setDownFlow(downFlow);
    19. flowBean.setUpCountFlow(upCountFlow);
    20. flowBean.setDownCountFlow(downCountFlow);
    21. //3:将K3和V3下入上下文中
    22. context.write(key, flowBean);
    23. }
    24. }
    1. public class JobMain extends Configured implements Tool {
    2. //该方法用于指定一个job任务
    3. @Override
    4. public int run(String[] args) throws Exception {
    5. //1:创建一个job任务对象
    6. Job job = Job.getInstance(super.getConf(), "mapreduce_flowcount");
    7. //如果打包运行出错,则需要加该配置
    8. job.setJarByClass(JobMain.class);
    9. //2:配置job任务对象(八个步骤)
    10. //第一步:指定文件的读取方式和读取路径
    11. job.setInputFormatClass(TextInputFormat.class);
    12. //TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/wordcount"));
    13. TextInputFormat.addInputPath(job, new Path("file:///D:\\input\\flowcount_input"));
    14. //第二步:指定Map阶段的处理方式和数据类型
    15. job.setMapperClass(FlowCountMapper.class);
    16. //设置Map阶段K2的类型
    17. job.setMapOutputKeyClass(Text.class);
    18. //设置Map阶段V2的类型
    19. job.setMapOutputValueClass(FlowBean.class);
    20. //第三(分区),四 (排序)
    21. //第五步: 规约(Combiner)
    22. //第六步 分组
    23. //第七步:指定Reduce阶段的处理方式和数据类型
    24. job.setReducerClass(FlowCountReducer.class);
    25. //设置K3的类型
    26. job.setOutputKeyClass(Text.class);
    27. //设置V3的类型
    28. job.setOutputValueClass(FlowBean.class);
    29. //第八步: 设置输出类型
    30. job.setOutputFormatClass(TextOutputFormat.class);
    31. //设置输出的路径
    32. TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\flowcount_out"));
    33. //等待任务结束
    34. boolean bl = job.waitForCompletion(true);
    35. return bl ? 0:1;
    36. }
    37. public static void main(String[] args) throws Exception {
    38. Configuration configuration = new Configuration();
    39. //启动job任务
    40. int run = ToolRunner.run(configuration, new JobMain(), args);
    41. System.exit(run);
    42. }
    43. }

如增加需求:

上行流量倒序排序

    1. public class FlowBean implements WritableComparable<FlowBean> {
    2. //指定排序的规则
    3. @Override
    4. public int compareTo(FlowBean flowBean) {
    5. // return this.upFlow.compareTo(flowBean.getUpFlow()) * -1;
    6. return flowBean.upFlow - this.upFlow ;
    7. }
    8. }

需求:手机号码分区

1565538916347

    1. public class FlowCountPartition extends Partitioner<Text,FlowBean> {
    2. /*
    3. 该方法用来指定分区的规则:
    4. 135 开头数据到一个分区文件
    5. 136 开头数据到一个分区文件
    6. 137 开头数据到一个分区文件
    7. 其他分区
    8. 参数:
    9. text : K2 手机号
    10. flowBean: V2
    11. i : ReduceTask的个数
    12. */
    13. @Override
    14. public int getPartition(Text text, FlowBean flowBean, int i) {
    15. //1:获取手机号
    16. String phoneNum = text.toString();
    17. //2:判断手机号以什么开头,返回对应的分区编号(0-3)
    18. if(phoneNum.startsWith("135")){
    19. return 0;
    20. }else if(phoneNum.startsWith("136")){
    21. return 1;
    22. }else if(phoneNum.startsWith("137")){
    23. return 2;
    24. }else{
    25. return 3;
    26. }
    27. }
    28. }
    1. //第三(分区),四 (排序)
    2. job.setPartitionerClass(FlowCountPartition.class);
    3. //第五步: 规约(Combiner)
    4. //第六步 分组
    5. //设置reduce个数
    6. job.setNumReduceTasks(4);

原文链接:http://www.cnblogs.com/leccoo/p/11337386.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号