经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Hadoop » 查看文章
Mapreduce的序列化和流量统计程序开发
来源:cnblogs  作者:给你一个公主抱  时间:2019/3/25 9:11:01  对本文有异议

一、Hadoop数据序列化的数据类型

  Java数据类型 => Hadoop数据类型

  int         IntWritable

  float        FloatWritable

  long        LongWritable

  double         DoubleWritable

  String       Text

  boolean      BooleanWritable

  byte        ByteWritable

  map          MapWritable

  array        ArrayWritable

二、Hadoop的序列化

  1.什么是序列化?

   在java中,序列化接口是Serializable,它下面又实现了很多的序列化接口,所以java的序列化是一个重量级的序列化框架,一个对象被java序列化之后会附带很多额外的信息(校验信息、header、继承体系等),不便于在网络中进行高效的传输,所以Hadoop开发了一套自己的序列化框架——Writable。

      序列化就是把内存当中的对象,转化为字节序列以便于存储和网络传输;

   反序列化是将收到的字节序列或硬盘当中的持续化数据,转换成内存中的对象。

  2.序列化的理解方法(自己悟的,不对勿喷~~)

    比如下面流量统计案例中,流量的封装类FlowBean实现了Writable接口,其中定义了变量upFlow、dwFlow、flowSum;

    在Mapper和Reducer类中初始化封装类FlowBean时,内存会分配空间加载这些对象,而这些对象不便于在网络中高效的传输,这是封装类FlowBean中的序列化方法将这些对象转换为字节序列,方便了存储和传输;

    当Mapper或Reducer需要将这些对象的字节序列写出到磁盘时,封装类FlowBean中的反序列化方法将字节序列转换为对象,然后写道磁盘中。

  3.序列化特点

   序列化与反序列化时分布式数据处理当中经常会出现的,比如hadoop通信是通过远程调用(rpc)实现的,这个过程就需要序列化。

  特点:1)紧凑;

     2)快速

     3)可扩展

     4)可互操作

三、Mapreduce的流量统计程序案例

  1.代码

  1. /**
  2. * @author: PrincessHug
  3. * @date: 2019/3/23, 23:38
  4. * @Blog: https://www.cnblogs.com/HelloBigTable/
  5. */
  6. public class FlowBean implements Writable {
  7. private long upFlow;
  8. private long dwFlow;
  9. private long flowSum;
  10.  
  11. public long getUpFlow() {
  12. return upFlow;
  13. }
  14.  
  15. public void setUpFlow(long upFlow) {
  16. this.upFlow = upFlow;
  17. }
  18.  
  19. public long getDwFlow() {
  20. return dwFlow;
  21. }
  22.  
  23. public void setDwFlow(long dwFlow) {
  24. this.dwFlow = dwFlow;
  25. }
  26.  
  27. public long getFlowSum() {
  28. return flowSum;
  29. }
  30.  
  31. public void setFlowSum(long flowSum) {
  32. this.flowSum = flowSum;
  33. }
  34.  
  35. public FlowBean() {
  36. }
  37.  
  38. public FlowBean(long upFlow, long dwFlow) {
  39. this.upFlow = upFlow;
  40. this.dwFlow = dwFlow;
  41. this.flowSum = upFlow + dwFlow;
  42. }
  43.  
  44. /**
  45. * 序列化
  46. * @param out 输出流
  47. * @throws IOException
  48. */
  49. @Override
  50. public void write(DataOutput out) throws IOException {
  51. out.writeLong(upFlow);
  52. out.writeLong(dwFlow);
  53. out.writeLong(flowSum);
  54. }
  55.  
  56. /**
  57. * 反序列化
  58. * @param in
  59. * @throws IOException
  60. */
  61. @Override
  62. public void readFields(DataInput in) throws IOException {
  63. upFlow = in.readLong();
  64. dwFlow = in.readLong();
  65. flowSum = in.readLong();
  66. }
  67.  
  68. @Override
  69. public String toString() {
  70. return upFlow + "\t" + dwFlow + "\t" + flowSum;
  71. }
  72. }
  73.  
  74. public class FlowCountMapper extends Mapper<LongWritable, Text,Text,FlowBean> {
  75. @Override
  76. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  77. //获取数据
  78. String line = value.toString();
  79.  
  80. //切分数据
  81. String[] fields = line.split("\t");
  82.  
  83. //封装数据
  84. String phoneNum = fields[1];
  85. long upFlow = Long.parseLong(fields[fields.length - 3]);
  86. long dwFlow = Long.parseLong(fields[fields.length - 2]);
  87.  
  88. //发送数据
  89. context.write(new Text(phoneNum),new FlowBean(upFlow,dwFlow));
  90. }
  91. }
  92.  
  93. public class FlowCountReducer extends Reducer<Text,FlowBean,Text,FlowBean> {
  94. @Override
  95. protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
  96. //聚合数据
  97. long upFlow_sum = 0;
  98. long dwFlow_sum = 0;
  99. for (FlowBean f:values){
  100. upFlow_sum += f.getUpFlow();
  101. dwFlow_sum += f.getDwFlow();
  102. }
  103. //发送数据
  104. context.write(key,new FlowBean(upFlow_sum,dwFlow_sum));
  105. }
  106. }
  107.  
  108.  
  109. public class FlowPartitioner extends Partitioner<Text,FlowBean> {
  110. @Override
  111. public int getPartition(Text key, FlowBean value, int i) {
  112. //获取用来分区的电话号码前三位
  113. String phoneNum = key.toString().substring(0, 3);
  114. //设置分区逻辑
  115. int partitionNum = 4;
  116. if ("135".equals(phoneNum)){
  117. return 0;
  118. }else if ("137".equals(phoneNum)){
  119. return 1;
  120. }else if ("138".equals(phoneNum)){
  121. return 2;
  122. }else if ("139".equals(phoneNum)){
  123. return 3;
  124. }
  125. return partitionNum;
  126. }
  127. }
  128. public class FlowCountDriver {
  129. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  130. //获取配置,定义工具
  131. Configuration conf = new Configuration();
  132. Job job = Job.getInstance();
  133.  
  134. //设置运行类
  135. job.setJarByClass(FlowCountDriver.class);
  136.  
  137. //设置Mapper类及Mapper输出数据类型
  138. job.setMapperClass(FlowCountMapper.class);
  139. job.setMapOutputKeyClass(Text.class);
  140. job.setMapOutputValueClass(FlowBean.class);
  141.  
  142. //设置Reducer类及其输出数据类型
  143. job.setReducerClass(FlowCountReducer.class);
  144. job.setOutputKeyClass(Text.class);
  145. job.setOutputValueClass(FlowBean.class);
  146.  
  147. //设置自定义分区
  148. job.setPartitionerClass(FlowPartitioner.class);
  149. job.setNumReduceTasks(5);
  150.  
  151. //设置文件输入输出流
  152. FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\flow\\in"));
  153. FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\flow\\inpartitionout"));
  154.  
  155. //返回运行完成
  156. if (job.waitForCompletion(true)){
  157. System.out.println("运行完毕!");
  158. }else {
  159. System.out.println("运行出错!");
  160. }
  161. }
  162. }

 

 

 

  

原文链接:http://www.cnblogs.com/HelloBigTable/p/10590705.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号