一、辅助排序
需求:先有一个订单数据文件,包含了订单id、商品id、商品价格,要求将订单id正序,商品价格倒序,且生成结果文件个数为订单id的数量,每个结果文件中只要一条该订单最贵商品的数据。
思路:1.封装订单类OrderBean,实现WritableComparable接口;
2.自定义Mapper类,确定输入输出数据类型,写业务逻辑;
3.自定义分区,根据不同的订单id返回不同的分区值;
4.自定义Reducer类;
5.辅助排序类OrderGroupingComparator继承WritableComparator类,并定义无参构成方法、重写compare方法;
6.书写Driver类;
代码如下:
- /**
- * @author: PrincessHug
- * @date: 2019/3/25, 21:42
- * @Blog: https://www.cnblogs.com/HelloBigTable/
- */
- public class OrderBean implements WritableComparable<OrderBean> {
- private int orderId;
- private double orderPrice;
-
- public OrderBean() {
- }
-
- public OrderBean(int orderId, double orderPrice) {
- this.orderId = orderId;
- this.orderPrice = orderPrice;
- }
-
- public int getOrderId() {
- return orderId;
- }
-
- public void setOrderId(int orderId) {
- this.orderId = orderId;
- }
-
- public double getOrderPrice() {
- return orderPrice;
- }
-
- public void setOrderPrice(double orderPrice) {
- this.orderPrice = orderPrice;
- }
-
- @Override
- public String toString() {
- return orderId + "\t" + orderPrice;
- }
-
- @Override
- public int compareTo(OrderBean o) {
- int rs ;
- if (this.orderId > o.getOrderId()){
- rs = 1;
- }else if (this.orderId < o.getOrderId()){
- rs = -1;
- }else {
- rs = (this.orderPrice > o.getOrderPrice()) ? -1:1;
- }
- return rs;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(orderId);
- out.writeDouble(orderPrice);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- orderId = in.readInt();
- orderPrice = in.readDouble();
- }
- }
-
- public class OrderMapper extends Mapper<LongWritable, Text,OrderBean, NullWritable> {
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- //获取数据
- String line = value.toString();
-
- //切割数据
- String[] fields = line.split("\t");
-
- //封装数据
- int orderId = Integer.parseInt(fields[0]);
- double orderPrice = Double.parseDouble(fields[2]);
- OrderBean orderBean = new OrderBean(orderId, orderPrice);
-
- //发送数据
- context.write(orderBean,NullWritable.get());
- }
- }
-
- public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {
- @Override
- public int getPartition(OrderBean orderBean, NullWritable nullWritable, int i) {
- //构造参数中i的值为reducetask的个数
- return (orderBean.getOrderId() & Integer.MAX_VALUE ) % i;
- }
- }
-
- public class OrderReducer extends Reducer<OrderBean, NullWritable,OrderBean,NullWritable> {
- @Override
- protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
- context.write(key,NullWritable.get());
- }
- }
-
- public class OrderGrouptingComparator extends WritableComparator {
- //必须使用super调用父类的构造方法来定义对比的类为OrderBean
- protected OrderGrouptingComparator(){
- super(OrderBean.class,true);
- }
-
- @Override
- public int compare(WritableComparable a, WritableComparable b) {
- OrderBean aBean = (OrderBean)a;
- OrderBean bBean = (OrderBean)b;
-
- int rs ;
- if (aBean.getOrderId() > bBean.getOrderId()){
- rs = 1;
- }else if (aBean.getOrderId() < bBean.getOrderId()){
- rs = -1;
- }else {
- rs = 0;
- }
- return rs;
- }
- }
-
- public class OrderDriver {
- public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- //配置信息,Job对象
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf);
-
- //执行类
- job.setJarByClass(OrderBean.class);
-
- //设置Mapper、Reducer类
- job.setMapperClass(OrderMapper.class);
- job.setReducerClass(OrderReducer.class);
-
- //设置Mapper输出数据类型
- job.setMapOutputKeyClass(OrderBean.class);
- job.setMapOutputValueClass(NullWritable.class);
-
- //设置Reducer输出数据类型
- job.setOutputKeyClass(OrderBean.class);
- job.setOutputValueClass(NullWritable.class);
-
- //设置辅助排序
- job.setGroupingComparatorClass(OrderGrouptingComparator.class);
-
- //设置分区类
- job.setPartitionerClass(OrderPartitioner.class);
-
- //设置reducetask数量
- job.setNumReduceTasks(3);
-
- //设置文件输入输出流
- FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\order\\in"));
- FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\order\\out"));
-
- //提交任务
- if (job.waitForCompletion(true)){
- System.out.println("运行完成!");
- }else {
- System.out.println("运行失败!");
- }
- }
- }
由于这是敲了很多次的代码,没有加太多注释,请谅解!
二、Mapreduce整体的流程
1.有一块200M的文本文件,首先将待处理的数据提交客户端;
2.客户端会向Yarn平台提交切片信息,然后Yarn计算出所需要的maptask的数量为2;
3.程序默认使用FileInputFormat的TextInputFormat方法将文件数据读到maptask;
4.maptask运行业务逻辑,然后将数据通过InputOutputContext写入到环形缓冲区;
5.环形缓冲区其实是内存开辟的一块空间,就是内存,当环形缓冲区内数据达到默认大小100M的80%时,发生溢写;
6.溢写出的数据会进行多次的分区排序(shuffle机制,下一个随笔详细解释);
7.分区排序后的数据块可以选择进行Combiner合并,然后写入本地磁盘;
8.reducetask等maptask完全运行完毕后,开始从磁盘中读取maptask产出写出的数据,然后进行合并文件,归并排序(这时就是进行上面辅助排序的时候);
9.Reducer一次读取一组数据,然后使用默认的TextOutputFormat方法将数据写出到结果文件。