经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Hadoop » 查看文章
Mapreduce的排序(全局排序、分区加排序、Combiner优化)
来源:cnblogs  作者:给你一个公主抱  时间:2019/3/25 9:11:00  对本文有异议

一、MR排序的分类

  1.部分排序:MR会根据自己输出记录的KV对数据进行排序,保证输出到每一个文件内存都是经过排序的;

  2.全局排序;

  3.辅助排序:再第一次排序后经过分区再排序一次;

  4.二次排序:经过一次排序后又根据业务逻辑再次进行排序。

 

二、MR排序的接口——WritableComparable

  该接口继承了Hadoop的Writable接口和Java的Comparable接口,实现该接口要重写write、readFields、compareTo三个方法。

 

三、流量统计案例的排序与分区

  1. /**
  2. * @author: PrincessHug
  3. * @date: 2019/3/24, 15:36
  4. * @Blog: https://www.cnblogs.com/HelloBigTable/
  5. */
  6. public class FlowSortBean implements WritableComparable<FlowSortBean> {
  7. private long upFlow;
  8. private long dwFlow;
  9. private long flowSum;
  10.  
  11. public FlowSortBean() {
  12. }
  13.  
  14. public FlowSortBean(long upFlow, long dwFlow) {
  15. this.upFlow = upFlow;
  16. this.dwFlow = dwFlow;
  17. this.flowSum = upFlow + dwFlow;
  18. }
  19.  
  20. public long getUpFlow() {
  21. return upFlow;
  22. }
  23.  
  24. public void setUpFlow(long upFlow) {
  25. this.upFlow = upFlow;
  26. }
  27.  
  28. public long getDwFlow() {
  29. return dwFlow;
  30. }
  31.  
  32. public void setDwFlow(long dwFlow) {
  33. this.dwFlow = dwFlow;
  34. }
  35.  
  36. public long getFlowSum() {
  37. return flowSum;
  38. }
  39.  
  40. public void setFlowSum(long flowSum) {
  41. this.flowSum = flowSum;
  42. }
  43.  
  44. @Override
  45. public void write(DataOutput out) throws IOException {
  46. out.writeLong(upFlow);
  47. out.writeLong(dwFlow);
  48. out.writeLong(flowSum);
  49. }
  50.  
  51. @Override
  52. public void readFields(DataInput in) throws IOException {
  53. upFlow = in.readLong();
  54. dwFlow = in.readLong();
  55. flowSum = in.readLong();
  56. }
  57.  
  58. @Override
  59. public String toString() {
  60. return upFlow + "\t" + dwFlow + "\t" + flowSum;
  61. }
  62.  
  63. @Override
  64. public int compareTo(FlowSortBean o) {
  65. return this.flowSum > o.getFlowSum() ? -1:1;
  66. }
  67. }
  68.  
  69. public class FlowSortMapper extends Mapper<LongWritable, Text,FlowSortBean,Text> {
  70. @Override
  71. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  72. //获取数据
  73. String line = value.toString();
  74.  
  75. //切分数据
  76. String[] fields = line.split("\t");
  77.  
  78. //封装数据
  79. long upFlow = Long.parseLong(fields[1]);
  80. long dwFlow = Long.parseLong(fields[2]);
  81.  
  82. //传输数据
  83. context.write(new FlowSortBean(upFlow,dwFlow),new Text(fields[0]));
  84. }
  85. }
  86.  
  87. public class FlowSortReducer extends Reducer<FlowSortBean,Text,Text,FlowSortBean> {
  88. @Override
  89. protected void reduce(FlowSortBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  90. context.write(values.iterator().next(),key);
  91. }
  92. }
  93.  
  94. public class FlowSortPartitioner extends Partitioner<FlowSortBean, Text> {
  95. @Override
  96. public int getPartition(FlowSortBean key, Text value, int i) {
  97. String phoneNum = value.toString().substring(0, 3);
  98.  
  99. int partition = 4;
  100. if ("135".equals(phoneNum)){
  101. return 0;
  102. }else if ("137".equals(phoneNum)){
  103. return 1;
  104. }else if ("138".equals(phoneNum)){
  105. return 2;
  106. }else if ("139".equals(phoneNum)){
  107. return 3;
  108. }
  109. return partition;
  110. }
  111. }
  112.  
  113. public class FlowSortDriver {
  114. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  115. //设置配置,初始化Job类
  116. Configuration conf = new Configuration();
  117. Job job = Job.getInstance(conf);
  118.  
  119. //设置执行类
  120. job.setJarByClass(FlowSortDriver.class);
  121.  
  122. //设置Mapper、Reducer类
  123. job.setMapperClass(FlowSortMapper.class);
  124. job.setReducerClass(FlowSortReducer.class);
  125.  
  126. //设置Mapper输出数据类型
  127. job.setMapOutputKeyClass(FlowSortBean.class);
  128. job.setMapOutputValueClass(Text.class);
  129.  
  130. //设置Reducer输出数据类型
  131. job.setOutputKeyClass(Text.class);
  132. job.setOutputValueClass(FlowSortBean.class);
  133.  
  134. //设置自定义分区
  135. job.setPartitionerClass(FlowSortPartitioner.class);
  136. job.setNumReduceTasks(5);
  137.  
  138. //设置文件输入输出类型
  139. FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\flow\\flowsort\\in"));
  140. FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\flow\\flowsort\\partitionout"));
  141.  
  142. //提交任务
  143. if (job.waitForCompletion(true)){
  144. System.out.println("运行完成!");
  145. }else {
  146. System.out.println("运行失败!");
  147. }
  148.  
  149. }
  150. }

  注意:再写Mapper类的时候,要注意KV对输出的数据类型,Key的类型一定要为FlowSortBean,因为在Mapper和Reducer之间进行的排序(只是排序)是通过Mapper输出的Key来进行排序的,而分区可以指定是通过Key或者Value。

 

四、Combiner合并

  Combiner是在MR之外的一个组件,可以用来在maptask输出到环形缓冲区溢写之后,分区排序完成时进行局部的汇总,可以减少网络传输量,进而优化MR程序。

  Combiner是用在当数据量到达一定规模之后的,小的数据量并不是很明显。

  例如WordCount程序,当单词文件的大小到达一定程度,可以使用自定义Combiner进行优化:

  1. public class WordCountCombiner extends Reducer<Text,IntWritable,Text,IntWritable>{
  2. protected void reduce(Text key,Iterable<IntWritable> values,Context context){
  3. //计数
  4. int count = 0;
  5. //累加求和
  6. for(IntWritable v:values){
  7. count += v.get();
  8. }
  9. //输出
  10. context.write(key,new IntWritable(count));
  11. }
  12. }

  然后再Driver类中设置使用Combiner类

  1. job.setCombinerClass(WordCountCombiner.class);

  如果仔细观察,WordCount的自定义Combiner类与Reducer类是完全相同的,因为他们的逻辑是相同的,即在maptask之后的分区内先进行一次累加求和,然后到reducer后再进行总的累加求和,所以在设置Combiner时也可以这样:

  1. job.setCombinerClass(WordCountReducer.class);

 

  注意:Combiner的应用一定要注意不能影响最终业务逻辑的情况下使用,比如在求平均值的时候:

  mapper输出两个分区:3,5,7  =>avg=5

            2,6    =>avg=4

  reducer合并输出:  5,4     =>avg=4.5  但是实际应该为4.6,错误!

  所以在使用Combiner时要注意其不会影响最中的结果!!!

 

原文链接:http://www.cnblogs.com/HelloBigTable/p/10591267.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号