Big Data - Hadoop Ecosystem (18) - MapReduce Framework Principles: WritableComparable Sorting and GroupingComparator Grouping
Source: cnblogs | Author: nty杨 | Date: 2018/12/13 9:20:17

1. Sorting Overview

2. Sorting Classification

 

3. WritableComparable Example

This file is the output of Big Data - Hadoop Ecosystem (12) - Hadoop Serialization and Source Tracing. As you can see, the file is sorted lexicographically by key, i.e., by phone number:

  13470253144 180 180 360
  13509468723 7335 110349 117684
  13560439638 918 4938 5856
  13568436656 3597 25635 29232
  13590439668 1116 954 2070
  13630577991 6960 690 7650
  13682846555 1938 2910 4848
  13729199489 240 0 240
  13736230513 2481 24681 27162
  13768778790 120 120 240
  13846544121 264 0 264
  13956435636 132 1512 1644
  13966251146 240 0 240
  13975057813 11058 48243 59301
  13992314666 3008 3720 6728
  15043685818 3659 3538 7197
  15910133277 3156 2936 6092
  15959002129 1938 180 2118
  18271575951 1527 2106 3633
  18390173782 9531 2412 11943
  84188413 4116 1432 5548

The fields are, in order: phone number, upstream traffic, downstream traffic, and total traffic.

The requirement is to re-sort the records by total traffic (in descending order, as implemented below).

 

The bean object needs to support serialization and deserialization and also implement the Comparable interface:

package com.nty.writableComparable;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 16:33
 */

/**
 * Implement the WritableComparable interface.
 * Serializing the bean used to require only the Writable interface;
 * now we implement Comparable as well.
 *
 * public interface WritableComparable<T> extends Writable, Comparable<T>
 *
 * So we can either implement Writable and Comparable separately,
 * or implement WritableComparable in one go.
 */
public class Flow implements WritableComparable<Flow> {

    private long upflow;
    private long downflow;
    private long total;

    public long getUpflow() {
        return upflow;
    }

    public void setUpflow(long upflow) {
        this.upflow = upflow;
    }

    public long getDownflow() {
        return downflow;
    }

    public void setDownflow(long downflow) {
        this.downflow = downflow;
    }

    public long getTotal() {
        return total;
    }

    public void setTotal(long total) {
        this.total = total;
    }

    // convenience setter for all three fields
    public void setFlow(long upflow, long downflow) {
        this.upflow = upflow;
        this.downflow = downflow;
        this.total = upflow + downflow;
    }

    @Override
    public String toString() {
        return upflow + "\t" + downflow + "\t" + total;
    }

    // override compareTo: descending by total traffic (arguments reversed on purpose)
    @Override
    public int compareTo(Flow o) {
        return Long.compare(o.total, this.total);
    }

    // serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upflow);
        out.writeLong(downflow);
        out.writeLong(total);
    }

    // deserialization
    @Override
    public void readFields(DataInput in) throws IOException {
        upflow = in.readLong();
        downflow = in.readLong();
        total = in.readLong();
    }
}
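Before wiring the bean into a job, a minimal sketch can verify the descending order locally. FlowSortCheck is a hypothetical helper class, not part of the original post; it only relies on the fact that WritableComparable extends Comparable:

package com.nty.writableComparable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not part of the original post): sorting a plain
// List<Flow> uses Flow.compareTo and should give descending total traffic.
public class FlowSortCheck {

    public static void main(String[] args) {
        List<Flow> flows = new ArrayList<>();
        Flow a = new Flow();
        a.setFlow(180, 180);       // total 360
        Flow b = new Flow();
        b.setFlow(7335, 110349);   // total 117684
        Flow c = new Flow();
        c.setFlow(918, 4938);      // total 5856
        flows.add(a);
        flows.add(b);
        flows.add(c);

        Collections.sort(flows);   // uses Flow.compareTo
        for (Flow f : flows) {
            System.out.println(f); // totals print as 117684, 5856, 360
        }
    }
}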

Mapper class. Note that Flow is the map output key and the phone number the value: MapReduce sorts records by key, so the field we sort on must live in the key.

package com.nty.writableComparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 16:47
 */
public class FlowMapper extends Mapper<LongWritable, Text, Flow, Text> {

    private Text phone = new Text();
    private Flow flow = new Flow();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 13470253144	180	180	360
        // split the input line
        String[] fields = value.toString().split("\t");
        // populate the key and value
        phone.set(fields[0]);
        flow.setFlow(Long.parseLong(fields[1]), Long.parseLong(fields[2]));
        // emit
        context.write(flow, phone);
    }
}

Reducer class. Note the output types: the key and value are swapped back, so each output line starts with the phone number again.

package com.nty.writableComparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 16:47
 */
public class FlowReducer extends Reducer<Flow, Text, Text, Flow> {

    @Override
    protected void reduce(Flow key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            // swap key and value back and emit
            context.write(value, key);
        }
    }
}

Driver class

package com.nty.writableComparable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * author nty
 * date time 2018-12-12 16:47
 */
public class FlowDriver {

    public static void main(String[] args) throws Exception {
        // 1. get a Job instance
        Configuration configuration = new Configuration();
        Job instance = Job.getInstance(configuration);

        // 2. set the jar class
        instance.setJarByClass(FlowDriver.class);

        // 3. set the Mapper and Reducer
        instance.setMapperClass(FlowMapper.class);
        instance.setReducerClass(FlowReducer.class);

        // 4. set the output types
        instance.setMapOutputKeyClass(Flow.class);
        instance.setMapOutputValueClass(Text.class);
        instance.setOutputKeyClass(Text.class);
        instance.setOutputValueClass(Flow.class);

        // 5. set the input and output paths
        FileInputFormat.setInputPaths(instance, new Path("d:\\Hadoop_test"));
        FileOutputFormat.setOutputPath(instance, new Path("d:\\Hadoop_test_out"));

        // 6. submit
        boolean b = instance.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
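The paths here are hard-coded, so the job runs locally straight from the IDE. As a hedged alternative, a hypothetical FlowDriverTool variant (not part of the original post) could take the paths from the command line via Hadoop's standard ToolRunner:

package com.nty.writableComparable;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical variant of FlowDriver (not part of the original post):
// input and output paths come from args[0] and args[1].
public class FlowDriverTool extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(FlowDriverTool.class);
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        job.setMapOutputKeyClass(Flow.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Flow.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new FlowDriverTool(), args));
    }
}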

 

Result

The original result screenshot is missing. Given the input above, the output should contain the same 21 records reordered by total traffic in descending order, starting with 13509468723 (total 117684) and ending with the three records whose total is 240.

4. GroupingComparator Example

Order ID    Product ID    Price

  0000001 Pdt_01 222.8
  0000002 Pdt_05 722.4
  0000001 Pdt_02 33.8
  0000003 Pdt_06 232.8
  0000003 Pdt_02 33.8
  0000002 Pdt_03 522.8
  0000002 Pdt_04 122.4

Find the most expensive product in each order.

Requirement analysis:

1) Use the order id and product price together as the key. In the map phase, sort by order id ascending; when order ids are equal, sort by price descending.

2) In the reduce phase, use a GroupingComparator to group records by order id. The first record of each group is then the most expensive product, as illustrated below.
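Working this through on the sample data: after the sort, the keys reach the reducer in the order below, the grouping comparator cuts them into three groups, and the reducer emits the first record of each group:

  0000001 Pdt_01 222.8   <- emitted (group 0000001)
  0000001 Pdt_02 33.8
  0000002 Pdt_05 722.4   <- emitted (group 0000002)
  0000002 Pdt_03 522.8
  0000002 Pdt_04 122.4
  0000003 Pdt_06 232.8   <- emitted (group 0000003)
  0000003 Pdt_02 33.8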

 

First define the bean object, overriding the serialization, deserialization, and comparison methods:

package com.nty.groupingComparator;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 18:07
 */
public class Order implements WritableComparable<Order> {

    private String orderId;
    private String productId;
    private double price;

    public String getOrderId() {
        return orderId;
    }

    public Order setOrderId(String orderId) {
        this.orderId = orderId;
        return this;
    }

    public String getProductId() {
        return productId;
    }

    public Order setProductId(String productId) {
        this.productId = productId;
        return this;
    }

    public double getPrice() {
        return price;
    }

    public Order setPrice(double price) {
        this.price = price;
        return this;
    }

    @Override
    public String toString() {
        return orderId + "\t" + productId + "\t" + price;
    }

    @Override
    public int compareTo(Order o) {
        // sort by order id first, ascending
        int compare = this.orderId.compareTo(o.getOrderId());
        if (0 == compare) {
            // same order id: compare prices, descending
            return Double.compare(o.getPrice(), this.price);
        }
        return compare;
    }

    // serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeUTF(productId);
        out.writeDouble(price);
    }

    // deserialization
    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.productId = in.readUTF();
        this.price = in.readDouble();
    }
}
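As with Flow, the composite sort rule can be checked locally before running a job. OrderSortCheck is a hypothetical helper class, not part of the original post:

package com.nty.groupingComparator;

import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of the original post): sorting a plain list
// uses Order.compareTo and should give ascending order id, with descending
// price within the same order id.
public class OrderSortCheck {

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
                new Order().setOrderId("0000002").setProductId("Pdt_03").setPrice(522.8),
                new Order().setOrderId("0000001").setProductId("Pdt_02").setPrice(33.8),
                new Order().setOrderId("0000001").setProductId("Pdt_01").setPrice(222.8));

        orders.sort(null); // null comparator -> natural ordering, i.e. Order.compareTo
        orders.forEach(System.out::println);
        // expected output:
        // 0000001	Pdt_01	222.8
        // 0000001	Pdt_02	33.8
        // 0000002	Pdt_03	522.8
    }
}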

Mapper class

package com.nty.groupingComparator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 18:07
 */
public class OrderMapper extends Mapper<LongWritable, Text, Order, NullWritable> {

    private Order order = new Order();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 0000001	Pdt_01	222.8
        // split the input line
        String[] fields = value.toString().split("\t");
        // populate the order
        order.setOrderId(fields[0]).setProductId(fields[1]).setPrice(Double.parseDouble(fields[2]));
        // emit
        context.write(order, NullWritable.get());
    }
}

GroupingComparator class

package com.nty.groupingComparator;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * author nty
 * date time 2018-12-12 18:08
 */
public class OrderGroupingComparator extends WritableComparator {

    // register the concrete key type; the second argument (true) tells the
    // parent class to create Order instances for comparison
    public OrderGroupingComparator() {
        super(Order.class, true);
    }

    // Be careful to override the right method: there are three compare()
    // overloads, and we want the one whose parameters are WritableComparable.
    // The default implementation delegates to the keys' own compareTo, but our
    // sort rule and grouping rule differ, so the grouping rule is defined here.
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Order oa = (Order) a;
        Order ob = (Order) b;
        // group by order id only
        return oa.getOrderId().compareTo(ob.getOrderId());
    }
}
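To see the two rules side by side, a minimal sketch (a hypothetical GroupingCheck class, not part of the original post) compares two records from the same order:

package com.nty.groupingComparator;

// Hypothetical helper (not part of the original post): two Orders with the
// same order id but different prices compare as equal under the grouping
// comparator, so at reduce time they land in the same reduce() call.
public class GroupingCheck {

    public static void main(String[] args) {
        OrderGroupingComparator grouping = new OrderGroupingComparator();
        Order a = new Order().setOrderId("0000001").setProductId("Pdt_01").setPrice(222.8);
        Order b = new Order().setOrderId("0000001").setProductId("Pdt_02").setPrice(33.8);

        System.out.println(grouping.compare(a, b)); // 0 -> same group
        System.out.println(a.compareTo(b));         // negative -> a sorts before b
    }
}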

Reducer class

package com.nty.groupingComparator;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 18:07
 */
public class OrderReducer extends Reducer<Order, NullWritable, Order, NullWritable> {

    @Override
    protected void reduce(Order key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // the first record of each group is the most expensive product,
        // so there is no need to iterate over the values
        context.write(key, NullWritable.get());
    }
}
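If the requirement were instead the two most expensive products per order, the reducer would iterate and stop early. A hedged sketch of such a reduce() (a hypothetical variant, not part of the original post), relying on the fact that Hadoop deserializes into the same reused key object as the values iterator advances:

// Hypothetical variant of reduce() for a top-2 requirement. Hadoop reuses the
// key object while iterating over values, so inside the loop the key always
// mirrors the current record.
@Override
protected void reduce(Order key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
    int n = 0;
    for (NullWritable value : values) {
        context.write(key, value);
        if (++n == 2) {
            break;
        }
    }
}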

Driver class

package com.nty.groupingComparator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 18:07
 */
public class OrderDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. get a Job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2. set the jar class
        job.setJarByClass(OrderDriver.class);

        // 3. set the Mapper and Reducer
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);

        // 4. set the custom grouping comparator
        job.setGroupingComparatorClass(OrderGroupingComparator.class);

        // 5. set the output types
        job.setMapOutputKeyClass(Order.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Order.class);
        job.setOutputValueClass(NullWritable.class);

        // 6. set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("d:\\Hadoop_test"));
        FileOutputFormat.setOutputPath(job, new Path("d:\\Hadoop_test_out"));

        // 7. submit
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

Output

The original screenshot is missing. Given the input above, the output should be:

  0000001	Pdt_01	222.8
  0000002	Pdt_05	722.4
  0000003	Pdt_06	232.8
