经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Hadoop » 查看文章
辅助排序和Mapreduce整体流程
来源:cnblogs  作者:给你一个公主抱  时间:2019/3/29 9:09:03  对本文有异议

一、辅助排序

  需求:先有一个订单数据文件,包含了订单id、商品id、商品价格,要求将订单id正序,商品价格倒序,且生成结果文件个数为订单id的数量,每个结果文件中只要一条该订单最贵商品的数据。

  思路:1.封装订单类OrderBean,实现WritableComparable接口;

     2.自定义Mapper类,确定输入输出数据类型,写业务逻辑;

     3.自定义分区,根据不同的订单id返回不同的分区值;

     4.自定义Reducer类;

     5.辅助排序类OrderGroupingComparator继承WritableComparator类,并定义无参构成方法、重写compare方法;

     6.书写Driver类;

  代码如下:

  1. /**
  2. * @author: PrincessHug
  3. * @date: 2019/3/25, 21:42
  4. * @Blog: https://www.cnblogs.com/HelloBigTable/
  5. */
  6. public class OrderBean implements WritableComparable<OrderBean> {
  7. private int orderId;
  8. private double orderPrice;
  9.  
  10. public OrderBean() {
  11. }
  12.  
  13. public OrderBean(int orderId, double orderPrice) {
  14. this.orderId = orderId;
  15. this.orderPrice = orderPrice;
  16. }
  17.  
  18. public int getOrderId() {
  19. return orderId;
  20. }
  21.  
  22. public void setOrderId(int orderId) {
  23. this.orderId = orderId;
  24. }
  25.  
  26. public double getOrderPrice() {
  27. return orderPrice;
  28. }
  29.  
  30. public void setOrderPrice(double orderPrice) {
  31. this.orderPrice = orderPrice;
  32. }
  33.  
  34. @Override
  35. public String toString() {
  36. return orderId + "\t" + orderPrice;
  37. }
  38.  
  39. @Override
  40. public int compareTo(OrderBean o) {
  41. int rs ;
  42. if (this.orderId > o.getOrderId()){
  43. rs = 1;
  44. }else if (this.orderId < o.getOrderId()){
  45. rs = -1;
  46. }else {
  47. rs = (this.orderPrice > o.getOrderPrice()) ? -1:1;
  48. }
  49. return rs;
  50. }
  51.  
  52. @Override
  53. public void write(DataOutput out) throws IOException {
  54. out.writeInt(orderId);
  55. out.writeDouble(orderPrice);
  56. }
  57.  
  58. @Override
  59. public void readFields(DataInput in) throws IOException {
  60. orderId = in.readInt();
  61. orderPrice = in.readDouble();
  62. }
  63. }
  64.  
  65. public class OrderMapper extends Mapper<LongWritable, Text,OrderBean, NullWritable> {
  66. @Override
  67. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  68. //获取数据
  69. String line = value.toString();
  70.  
  71. //切割数据
  72. String[] fields = line.split("\t");
  73.  
  74. //封装数据
  75. int orderId = Integer.parseInt(fields[0]);
  76. double orderPrice = Double.parseDouble(fields[2]);
  77. OrderBean orderBean = new OrderBean(orderId, orderPrice);
  78.  
  79. //发送数据
  80. context.write(orderBean,NullWritable.get());
  81. }
  82. }
  83.  
  84. public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {
  85. @Override
  86. public int getPartition(OrderBean orderBean, NullWritable nullWritable, int i) {
  87. //构造参数中i的值为reducetask的个数
  88. return (orderBean.getOrderId() & Integer.MAX_VALUE ) % i;
  89. }
  90. }
  91.  
  92. public class OrderReducer extends Reducer<OrderBean, NullWritable,OrderBean,NullWritable> {
  93. @Override
  94. protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
  95. context.write(key,NullWritable.get());
  96. }
  97. }
  98.  
  99. public class OrderGrouptingComparator extends WritableComparator {
  100. //必须使用super调用父类的构造方法来定义对比的类为OrderBean
  101. protected OrderGrouptingComparator(){
  102. super(OrderBean.class,true);
  103. }
  104.  
  105. @Override
  106. public int compare(WritableComparable a, WritableComparable b) {
  107. OrderBean aBean = (OrderBean)a;
  108. OrderBean bBean = (OrderBean)b;
  109.  
  110. int rs ;
  111. if (aBean.getOrderId() > bBean.getOrderId()){
  112. rs = 1;
  113. }else if (aBean.getOrderId() < bBean.getOrderId()){
  114. rs = -1;
  115. }else {
  116. rs = 0;
  117. }
  118. return rs;
  119. }
  120. }
  121.  
  122. public class OrderDriver {
  123. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  124. //配置信息,Job对象
  125. Configuration conf = new Configuration();
  126. Job job = Job.getInstance(conf);
  127.  
  128. //执行类
  129. job.setJarByClass(OrderBean.class);
  130.  
  131. //设置Mapper、Reducer类
  132. job.setMapperClass(OrderMapper.class);
  133. job.setReducerClass(OrderReducer.class);
  134.  
  135. //设置Mapper输出数据类型
  136. job.setMapOutputKeyClass(OrderBean.class);
  137. job.setMapOutputValueClass(NullWritable.class);
  138.  
  139. //设置Reducer输出数据类型
  140. job.setOutputKeyClass(OrderBean.class);
  141. job.setOutputValueClass(NullWritable.class);
  142.  
  143. //设置辅助排序
  144. job.setGroupingComparatorClass(OrderGrouptingComparator.class);
  145.  
  146. //设置分区类
  147. job.setPartitionerClass(OrderPartitioner.class);
  148.  
  149. //设置reducetask数量
  150. job.setNumReduceTasks(3);
  151.  
  152. //设置文件输入输出流
  153. FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\order\\in"));
  154. FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\order\\out"));
  155.  
  156. //提交任务
  157. if (job.waitForCompletion(true)){
  158. System.out.println("运行完成!");
  159. }else {
  160. System.out.println("运行失败!");
  161. }
  162. }
  163. }

  由于这是敲了很多次的代码,没有加太多注释,请谅解!

 

二、Mapreduce整体的流程

  1.有一块200M的文本文件,首先将待处理的数据提交客户端;

  2.客户端会向Yarn平台提交切片信息,然后Yarn计算出所需要的maptask的数量为2;

  3.程序默认使用FileInputFormat的TextInputFormat方法将文件数据读到maptask;

  4.maptask运行业务逻辑,然后将数据通过InputOutputContext写入到环形缓冲区;

  5.环形缓冲区其实是内存开辟的一块空间,就是内存,当环形缓冲区内数据达到默认大小100M的80%时,发生溢写;

  6.溢写出的数据会进行多次的分区排序(shuffle机制,下一个随笔详细解释)

  7.分区排序后的数据块可以选择进行Combiner合并,然后写入本地磁盘;

  8.reducetask等maptask完全运行完毕后,开始从磁盘中读取maptask产出写出的数据,然后进行合并文件,归并排序(这时就是进行上面辅助排序的时候);

  9.Reducer一次读取一组数据,然后使用默认的TextOutputFormat方法将数据写出到结果文件。

 

原文链接:http://www.cnblogs.com/HelloBigTable/p/10617937.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号