1.排序概述


2.排序分类

3.WritableComparable案例
这个文件是《大数据-Hadoop生态(12)-Hadoop序列化和源码追踪》一文的输出文件。可以看到,文件已经根据key(也就是手机号)按字典序排好了序
- 13470253144 180 180 360
- 13509468723 7335 110349 117684
- 13560439638 918 4938 5856
- 13568436656 3597 25635 29232
- 13590439668 1116 954 2070
- 13630577991 6960 690 7650
- 13682846555 1938 2910 4848
- 13729199489 240 0 240
- 13736230513 2481 24681 27162
- 13768778790 120 120 240
- 13846544121 264 0 264
- 13956435636 132 1512 1644
- 13966251146 240 0 240
- 13975057813 11058 48243 59301
- 13992314666 3008 3720 6728
- 15043685818 3659 3538 7197
- 15910133277 3156 2936 6092
- 15959002129 1938 180 2118
- 18271575951 1527 2106 3633
- 18390173782 9531 2412 11943
- 84188413 4116 1432 5548
字段含义分别为手机号,上行流量,下行流量,总流量
需求是根据总流量进行排序
Bean对象,需要实现序列化,反序列化和Comparable接口
- package com.nty.writableComparable;
- import org.apache.hadoop.io.WritableComparable;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- /**
- * author nty
- * date time 2018-12-12 16:33
- */
-
- /**
- * 实现WritableComparable接口
- * 原先将bean序列化时,需要实现Writable接口,现在再实现Comparable接口
- *
- * public interface WritableComparable<T> extends Writable, Comparable<T>
- *
- * 所以我们可以实现Writable和Comparable两个接口,也可以实现WritableComparable接口
- */
- public class Flow implements WritableComparable<Flow> {
- private long upflow;
- private long downflow;
- private long total;
- public long getUpflow() {
- return upflow;
- }
- public void setUpflow(long upflow) {
- this.upflow = upflow;
- }
- public long getDownflow() {
- return downflow;
- }
- public void setDownflow(long downflow) {
- this.downflow = downflow;
- }
- public long getTotal() {
- return total;
- }
- public void setTotal(long total) {
- this.total = total;
- }
- //快速赋值
- public void setFlow(long upflow, long downflow){
- this.upflow = upflow;
- this.downflow = downflow;
- this.total = upflow + downflow;
- }
- @Override
- public String toString() {
- return upflow + "\t" + downflow + "\t" + total;
- }
- //重写compareTo方法
- @Override
- public int compareTo(Flow o) {
- return Long.compare(o.total, this.total);
- }
- //序列化方法
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeLong(upflow);
- out.writeLong(downflow);
- out.writeLong(total);
- }
- //反序列化方法
- @Override
- public void readFields(DataInput in) throws IOException {
- upflow = in.readLong();
- downflow = in.readLong();
- total = in.readLong();
- }
- }
Mapper类
- package com.nty.writableComparable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- import java.io.IOException;
- /**
-  * Mapper that turns each tab-separated traffic line into a ({@link Flow}, phone number) pair,
-  * making the traffic record the key so that the shuffle sorts by it.
-  *
-  * <p>Lines with fewer than three tab-separated fields (for example a blank trailing line) are
-  * skipped and counted in the {@code FlowMapper / MALFORMED_LINES} job counter instead of
-  * failing the task with an {@code ArrayIndexOutOfBoundsException}.
-  *
-  * author nty
-  * date time 2018-12-12 16:47
-  */
- public class FlowMapper extends Mapper<LongWritable, Text, Flow, Text> {
-     // Output objects are reused across map() calls instead of being allocated per record.
-     private Text phone = new Text();
-     private Flow flow = new Flow();
-
-     /**
-      * Parses one input line and emits the traffic record keyed pair.
-      *
-      * @param key     input key supplied by the framework (unused)
-      * @param value   one input line, e.g. {@code 13470253144 180 180 360} (tab-separated)
-      * @param context task context used for output and counters
-      * @throws NumberFormatException if the upstream or downstream field is not a valid long
-      */
-     @Override
-     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-         // Split the line into its columns: phone, upflow, downflow, (total).
-         String[] fields = value.toString().split("\t");
-         if (fields.length < 3) {
-             // Not enough columns to build a record: skip it, but keep it visible in the job counters.
-             context.getCounter("FlowMapper", "MALFORMED_LINES").increment(1);
-             return;
-         }
-         // Populate the reusable output objects; the total is recomputed from the two directions.
-         phone.set(fields[0]);
-         flow.setFlow(Long.parseLong(fields[1]), Long.parseLong(fields[2]));
-         // Emit with the traffic record as the key.
-         context.write(flow, phone);
-     }
- }
Reducer类
- package com.nty.writableComparable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
- import java.io.IOException;
- /**
-  * Reducer that swaps key and value back: it receives ({@link Flow}, phone numbers) groups
-  * and writes one (phone number, Flow) pair per incoming value.
-  *
-  * <p>Note the output types: they are the reverse of the map output types.
-  *
-  * author nty
-  * date time 2018-12-12 16:47
-  */
- public class FlowReducer extends Reducer<Flow, Text, Text, Flow> {
-     /**
-      * Writes every phone number of the group together with the group's traffic key.
-      *
-      * @param key     traffic record shared by the group
-      * @param values  phone numbers whose records compared equal to {@code key}
-      * @param context task context used for output
-      */
-     @Override
-     protected void reduce(Flow key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-         for (Text phone : values) {
-             // Phone number becomes the output key, the traffic record the output value.
-             context.write(phone, key);
-         }
-     }
- }
Driver类
- package com.nty.writableComparable;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- /**
-  * Driver that configures and submits the sort-by-total-traffic job.
-  *
-  * <p>Usage: {@code FlowDriver [inputPath [outputPath]]}. When an argument is omitted the
-  * original local test directories are used.
-  *
-  * author nty
-  * date time 2018-12-12 16:47
-  */
- public class FlowDriver {
-     private static final String DEFAULT_INPUT = "d:\\Hadoop_test";
-     private static final String DEFAULT_OUTPUT = "d:\\Hadoop_test_out";
-
-     /**
-      * Builds the job, waits for it to finish and exits with 0 on success or 1 on failure.
-      *
-      * @param args optional input path ({@code args[0]}) and output path ({@code args[1]})
-      * @throws Exception if the job cannot be configured or submitted
-      */
-     public static void main(String[] args) throws Exception {
-         // Paths may be overridden from the command line; otherwise fall back to the defaults.
-         String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT;
-         String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT;
-         // 1. Obtain the Job instance.
-         Configuration configuration = new Configuration();
-         Job instance = Job.getInstance(configuration);
-         // 2. Set the class used to locate the job jar.
-         instance.setJarByClass(FlowDriver.class);
-         // 3. Set the Mapper and Reducer.
-         instance.setMapperClass(FlowMapper.class);
-         instance.setReducerClass(FlowReducer.class);
-         // 4. Set the output types: the map emits (Flow, Text), the reduce emits (Text, Flow).
-         instance.setMapOutputKeyClass(Flow.class);
-         instance.setMapOutputValueClass(Text.class);
-         instance.setOutputKeyClass(Text.class);
-         instance.setOutputValueClass(Flow.class);
-         // 5. Set the input and output paths.
-         FileInputFormat.setInputPaths(instance, new Path(inputPath));
-         FileOutputFormat.setOutputPath(instance, new Path(outputPath));
-         // 6. Submit and wait for completion.
-         boolean succeeded = instance.waitForCompletion(true);
-         System.exit(succeeded ? 0 : 1);
-     }
- }
结果

4.GroupingComparator案例
订单id 商品id 商品金额
- 0000001 Pdt_01 222.8
- 0000002 Pdt_05 722.4
- 0000001 Pdt_02 33.8
- 0000003 Pdt_06 232.8
- 0000003 Pdt_02 33.8
- 0000002 Pdt_03 522.8
- 0000002 Pdt_04 122.4
求出每一个订单中最贵的商品
需求分析:
1) 将订单id和商品金额作为key,在Map阶段先用订单id升序排序,如果订单id相同,再用商品金额降序排序
2) 在Reduce阶段,用groupingComparator按照订单分组,每一组的第一个即是最贵的商品
先定义bean对象,重写序列化、反序列化和排序方法
- package com.nty.groupingComparator;
- import org.apache.hadoop.io.WritableComparable;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- /**
- * author nty
- * date time 2018-12-12 18:07
- */
- public class Order implements WritableComparable<Order> {
- private String orderId;
- private String productId;
- private double price;
- public String getOrderId() {
- return orderId;
- }
- public Order setOrderId(String orderId) {
- this.orderId = orderId;
- return this;
- }
- public String getProductId() {
- return productId;
- }
- public Order setProductId(String productId) {
- this.productId = productId;
- return this;
- }
- public double getPrice() {
- return price;
- }
- public Order setPrice(double price) {
- this.price = price;
- return this;
- }
- @Override
- public String toString() {
- return orderId + "\t" + productId + "\t" + price;
- }
- @Override
- public int compareTo(Order o) {
- //先按照订单排序,正序
- int compare = this.orderId.compareTo(o.getOrderId());
- if(0 == compare){
- //订单相同,再比较价格,倒序
- return Double.compare( o.getPrice(),this.price);
- }
- return compare;
- }
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeUTF(orderId);
- out.writeUTF(productId);
- out.writeDouble(price);
- }
- @Override
- public void readFields(DataInput in) throws IOException {
- this.orderId = in.readUTF();
- this.productId = in.readUTF();
- this.price = in.readDouble();
- }
- }
Mapper类
- package com.nty.groupingComparator;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- import java.io.IOException;
- /**
-  * Mapper that parses each tab-separated order line into an {@link Order} key with an empty
-  * value; all information travels in the key so the shuffle can sort on it.
-  *
-  * <p>Lines with fewer than three tab-separated fields (for example a blank trailing line) are
-  * skipped and counted in the {@code OrderMapper / MALFORMED_LINES} job counter instead of
-  * failing the task with an {@code ArrayIndexOutOfBoundsException}.
-  *
-  * author nty
-  * date time 2018-12-12 18:07
-  */
- public class OrderMapper extends Mapper<LongWritable, Text, Order, NullWritable> {
-     // Output key is reused across map() calls instead of being allocated per record.
-     private Order order = new Order();
-
-     /**
-      * Parses one input line and emits it as an {@link Order} key.
-      *
-      * @param key     input key supplied by the framework (unused)
-      * @param value   one input line, e.g. {@code 0000001 Pdt_01 222.8} (tab-separated)
-      * @param context task context used for output and counters
-      * @throws NumberFormatException if the price field is not a valid double
-      */
-     @Override
-     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-         // Split the line into its columns: order id, product id, price.
-         String[] fields = value.toString().split("\t");
-         if (fields.length < 3) {
-             // Not enough columns to build an order line: skip it, but keep it visible in the job counters.
-             context.getCounter("OrderMapper", "MALFORMED_LINES").increment(1);
-             return;
-         }
-         // Populate the reusable key through the chained setters.
-         order.setOrderId(fields[0]).setProductId(fields[1]).setPrice(Double.parseDouble(fields[2]));
-         // Emit the key with an empty value.
-         context.write(order, NullWritable.get());
-     }
- }
GroupingComparator类
- package com.nty.groupingComparator;
- import org.apache.hadoop.io.WritableComparable;
- import org.apache.hadoop.io.WritableComparator;
- /**
-  * Grouping comparator that treats two {@link Order} keys as the same group whenever their
-  * order ids match, regardless of price or product.
-  *
-  * author nty
-  * date time 2018-12-12 18:08
-  */
- public class OrderGroupingComparator extends WritableComparator {
-     /** Registers {@link Order} as the concrete key type being compared. */
-     public OrderGroupingComparator() {
-         super(Order.class, true);
-     }
-
-     /**
-      * Compares two keys by order id only.
-      *
-      * <p>Make sure to override the right overload: of the three {@code compare} methods,
-      * this is the one taking {@code WritableComparable} arguments. The inherited version
-      * delegates to the keys' own comparison, but here the sort rule and the grouping rule
-      * differ, so the grouping rule has to be spelled out.
-      *
-      * @param a first key, expected to be an {@link Order}
-      * @param b second key, expected to be an {@link Order}
-      * @return the result of comparing the two order ids as strings
-      */
-     @Override
-     public int compare(WritableComparable a, WritableComparable b) {
-         String leftOrderId = ((Order) a).getOrderId();
-         String rightOrderId = ((Order) b).getOrderId();
-         // Group by order id.
-         return leftOrderId.compareTo(rightOrderId);
-     }
- }
Reducer类
- package com.nty.groupingComparator;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.mapreduce.Reducer;
- import java.io.IOException;
- /**
- * author nty
- * date time 2018-12-12 18:07
- */
- public class OrderReducer extends Reducer<Order, NullWritable,Order, NullWritable> {
- @Override
- protected void reduce(Order key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
- //每一组的第一个即是最高价商品,不需要遍历
- context.write(key, NullWritable.get());
- }
- }
Driver类
- package com.nty.groupingComparator;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import java.io.IOException;
- /**
-  * Driver that configures and submits the most-expensive-product-per-order job.
-  *
-  * <p>Usage: {@code OrderDriver [inputPath [outputPath]]}. When an argument is omitted the
-  * original local test directories are used.
-  *
-  * author nty
-  * date time 2018-12-12 18:07
-  */
- public class OrderDriver {
-     private static final String DEFAULT_INPUT = "d:\\Hadoop_test";
-     private static final String DEFAULT_OUTPUT = "d:\\Hadoop_test_out";
-
-     /**
-      * Builds the job, waits for it to finish and exits with 0 on success or 1 on failure.
-      *
-      * @param args optional input path ({@code args[0]}) and output path ({@code args[1]})
-      */
-     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
-         // Paths may be overridden from the command line; otherwise fall back to the defaults.
-         String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT;
-         String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT;
-         // 1. Obtain the Job instance.
-         Configuration configuration = new Configuration();
-         Job job = Job.getInstance(configuration);
-         // 2. Set the class used to locate the job jar.
-         job.setJarByClass(OrderDriver.class);
-         // 3. Set the Mapper and Reducer.
-         job.setMapperClass(OrderMapper.class);
-         job.setReducerClass(OrderReducer.class);
-         // 4. Set the custom grouping comparator (groups keys by order id).
-         job.setGroupingComparatorClass(OrderGroupingComparator.class);
-         // 5. Set the output types: both phases emit (Order, NullWritable).
-         job.setMapOutputKeyClass(Order.class);
-         job.setMapOutputValueClass(NullWritable.class);
-         job.setOutputKeyClass(Order.class);
-         job.setOutputValueClass(NullWritable.class);
-         // 6. Set the input and output paths.
-         FileInputFormat.setInputPaths(job, new Path(inputPath));
-         FileOutputFormat.setOutputPath(job, new Path(outputPath));
-         // 7. Submit and wait for completion.
-         boolean succeeded = job.waitForCompletion(true);
-         System.exit(succeeded ? 0 : 1);
-     }
- }
输出结果
