经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Hadoop » 查看文章
Hadoop学习(4)-mapreduce的一些注意事项
来源:cnblogs  作者:两千个秘密  时间:2019/7/25 11:02:44  对本文有异议

关于mapreduce的一些注意细节

如果把mapreduce程序打包放到了liux下去运行,

命令java  –cp  xxx.jar 主类名

如果报错了,说明是缺少相关的依赖jar包

用命令hadoop jar xxx.jar 类名因为在集群机器上用 hadoop jar xx.jar mr.wc.JobSubmitter 命令来启动客户端main方法时,hadoop jar这个命令会将所在机器上的hadoop安装目录中的jar包和配置文件加入到运行时的classpath中

那么,我们的客户端main方法中的new Configuration()语句就会加载classpath中的配置文件,自然就有了

fs.defaultFS 和 mapreduce.framework.name 和 yarn.resourcemanager.hostname 这些参数配置

会把本地hadoop的相关的所有jar包都会引用

Mapreduce也有本地的job运行,就是可以不用提交到yarn上,可以以单机的模式跑一边以多个线程模拟也可以。

就是如果不管在Linux下还是windows下,提交job都会默认的提交到本地去运行,

如果在linux默认提交到yarn上运行,需要写配置文件hadoop/etc/mapred-site.xml文件

<configuration>

<property>

<name>mapreduce.framework.name</name>

<value>yarn</value>

</property>

</configuration>

 

Key,value对,如果是自己的类的话,那么这个类要实现Writable,同时要把你想序列化的数据转化成二进制,然后放到重写方法wirte参数的DataOutput里面,另一个readFields重写方法是用来反序列化用的,

注意反序列化的时候,会先拿这个类的无参构造方法构造出一个对象出来,然后再通过readFields方法来复原这个对象。

 

DataOutput也是一种流,只不过是hadoop的在封装,自己用的时候,里面需要加个FileOutputStream对象

DataOutput写字符串的时候要用writeUTF(“字符串”),他这样编码的时候,会在字符串的前面先加上字符串的长度,这是考虑到字符编码对其的问题,hadoop解析的时候就会先读前面两个字节,看一看这个字符串有多长,不然如果用write(字符串.getBytes())这样他不知道这个字符串到底有多少个字节。

 

在reduce阶段,如果把一个对象写到hdfs里面,那么会调用字符串的toString方法,你可以重写这个类的toString方法 

举例,下面这个类就可以在hadoop里序列化

  1. package mapreduce2;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write;
  6. import org.apache.hadoop.io.Writable;
  7. import org.apache.hadoop.util.Waitable;
  8. public class FlowBean implements Writable {
  9. private int up;//上行流量
  10. private int down;//下行流量
  11. private int sum;//总流量
  12. private String phone;//电话号
  13. public FlowBean(int up, int down, String phone) {
  14. this.up = up;
  15. this.down = down;
  16. this.sum = up + down;
  17. this.phone = phone;
  18. }
  19. public int getUp() {
  20. return up;
  21. }
  22. public void setUp(int up) {
  23. this.up = up;
  24. }
  25. public int getDown() {
  26. return down;
  27. }
  28. public void setDown(int down) {
  29. this.down = down;
  30. }
  31. public int getSum() {
  32. return sum;
  33. }
  34. public void setSum(int sum) {
  35. this.sum = sum;
  36. }
  37. public String getPhone() {
  38. return phone;
  39. }
  40. public void setPhone(String phone) {
  41. this.phone = phone;
  42. }
  43. @Override
  44. public void readFields(DataInput di) throws IOException {
  45. //注意这里读的顺序要和写的顺序是一样的
  46. this.up = di.readInt();
  47. this.down = di.readInt();
  48. this.sum = this.up + this.down;
  49. this.phone = di.readUTF();
  50. }
  51. @Override
  52. public void write(DataOutput Do) throws IOException {
  53. Do.writeInt(this.up);
  54. Do.writeInt(this.down);
  55. Do.writeInt(this.sum);
  56. Do.writeUTF(this.phone);
  57. }
  58. @Override
  59. public String toString() {
  60. return "电话号"+this.phone+" 总流量"+this.sum;
  61. }
  62. }

 

 

 当所有的reduceTask都运行完之后,还会调用一个cleanup方法

应用练习:统计一个页面访问总量为n条的数据

方案一:只用一个reducetask,利用cleanup方法,在reducetask阶段,先不直接放到hdfs里面,而是存到一个Treemap里面

再在reducetask结束后,在cleanup里面通过把Treemap里面前五输出到HDFS里面;

  1. package cn.edu360.mr.page.topn;
  2. public class PageCount implements Comparable<PageCount>{
  3. private String page;
  4. private int count;
  5. public void set(String page, int count) {
  6. this.page = page;
  7. this.count = count;
  8. }
  9. public String getPage() {
  10. return page;
  11. }
  12. public void setPage(String page) {
  13. this.page = page;
  14. }
  15. public int getCount() {
  16. return count;
  17. }
  18. public void setCount(int count) {
  19. this.count = count;
  20. }
  21. @Override
  22. public int compareTo(PageCount o) {
  23. return o.getCount()-this.count==0?this.page.compareTo(o.getPage()):o.getCount()-this.count;
  24. }
  25. }

 

map类

  1. import java.io.IOException;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. public class PageTopnMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
  7. @Override
  8. protected void map(LongWritable key, Text value, Context context)
  9. throws IOException, InterruptedException {
  10. String line = value.toString();
  11. String[] split = line.split(" ");
  12. context.write(new Text(split[1]), new IntWritable(1));
  13. }
  14. }

reduce类

  1. package cn.edu360.mr.page.topn;
  2. import java.io.IOException;
  3. import java.util.Map.Entry;
  4. import java.util.Set;
  5. import java.util.TreeMap;
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.io.IntWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Reducer;
  10. public class PageTopnReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
  11. TreeMap<PageCount, Object> treeMap = new TreeMap<>();
  12. @Override
  13. protected void reduce(Text key, Iterable<IntWritable> values,
  14. Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  15. int count = 0;
  16. for (IntWritable value : values) {
  17. count += value.get();
  18. }
  19. PageCount pageCount = new PageCount();
  20. pageCount.set(key.toString(), count);
  21. treeMap.put(pageCount,null);
  22. }
  23. @Override
  24. protected void cleanup(Context context)
  25. throws IOException, InterruptedException {
  26. Configuration conf = context.getConfiguration();
        //可以在cleanup里面拿到configuration,从里面读取要拿前几条数据
  27. int topn = conf.getInt("top.n", 5);
  28. Set<Entry<PageCount, Object>> entrySet = treeMap.entrySet();
  29. int i= 0;
  30. for (Entry<PageCount, Object> entry : entrySet) {
  31. context.write(new Text(entry.getKey().getPage()), new IntWritable(entry.getKey().getCount()));
  32. i++;
  33. if(i==topn) return;
  34. }
  35. }
  36. }

然后jobSubmit类,注意这个要设定Configuration,这里面有几种方法

第一种是加载配置文件

  1. Configuration conf = new Configuration();
  2. conf.addResource("xx-oo.xml");

然后再在xx-oo.xml文件里面写

  1. <configuration>
  2. <property>
  3. <name>top.n</name>
  4. <value>6</value>
  5. </property>
  6. </configuration>

第二种方式

  1.     //通过直接设定
  2. conf.setInt("top.n", 3);
  3. //通过对java主程序 直接传进来的参数
  4. conf.setInt("top.n", Integer.parseInt(args[0]));

第三种方式通过获取配置文件参数

  1.      Properties props = new Properties();
  2. props.load(JobSubmitter.class.getClassLoader().getResourceAsStream("topn.properties"));
  3. conf.setInt("top.n", Integer.parseInt(props.getProperty("top.n")));

然后再在topn.properties里面配置参数

  1. top.n=5

subsubmit类,默认在本机模拟运行

  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.IntWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  7. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  8. public class JobSubmitter {
  9. public static void main(String[] args) throws Exception {
  10. /**
  11. * 通过加载classpath下的*-site.xml文件解析参数
  12. */
  13. Configuration conf = new Configuration();
  14. conf.addResource("xx-oo.xml");
  15. /**
  16. * 通过代码设置参数
  17. */
  18. //conf.setInt("top.n", 3);
  19. //conf.setInt("top.n", Integer.parseInt(args[0]));
  20. /**
  21. * 通过属性配置文件获取参数
  22. */
  23. /*Properties props = new Properties();
  24. props.load(JobSubmitter.class.getClassLoader().getResourceAsStream("topn.properties"));
  25. conf.setInt("top.n", Integer.parseInt(props.getProperty("top.n")));*/
  26. Job job = Job.getInstance(conf);
  27. job.setJarByClass(JobSubmitter.class);
  28. job.setMapperClass(PageTopnMapper.class);
  29. job.setReducerClass(PageTopnReducer.class);
  30. job.setMapOutputKeyClass(Text.class);
  31. job.setMapOutputValueClass(IntWritable.class);
  32. job.setOutputKeyClass(Text.class);
  33. job.setOutputValueClass(IntWritable.class);
  34. FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\url\\input"));
  35. FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\url\\output"));
  36. job.waitForCompletion(true);
  37. }
  38. }

 

 

 

额外java知识点补充

Treemap,放进去的东西会自动排序

两种Treemap的自定义方法,第一种是传入一个Comparator

  1. public class TreeMapTest {
  2. public static void main(String[] args) {
  3. TreeMap<FlowBean, String> tm1 = new TreeMap<>(new Comparator<FlowBean>() {
  4. @Override
  5. public int compare(FlowBean o1, FlowBean o2) {
  6. //如果两个类总流量相同的会比较电话号
  7. if( o2.getAmountFlow()-o1.getAmountFlow()==0){
  8. return o1.getPhone().compareTo(o2.getPhone());
  9. }
  10. //如果流量不同,就按从小到大的顺序排序
  11. return o2.getAmountFlow()-o1.getAmountFlow();
  12. }
  13. });
  14. FlowBean b1 = new FlowBean("1367788", 500, 300);
  15. FlowBean b2 = new FlowBean("1367766", 400, 200);
  16. FlowBean b3 = new FlowBean("1367755", 600, 400);
  17. FlowBean b4 = new FlowBean("1367744", 300, 500);
  18. tm1.put(b1, null);
  19. tm1.put(b2, null);
  20. tm1.put(b3, null);
  21. tm1.put(b4, null);
  22. //treeset的遍历
  23. Set<Entry<FlowBean,String>> entrySet = tm1.entrySet();
  24. for (Entry<FlowBean,String> entry : entrySet) {
  25. System.out.println(entry.getKey() +"\t"+ entry.getValue());
  26. }
  27. }
  28. }

第二种是在这个类中,实现一个Comparable接口

  1. package cn.edu360.mr.page.topn;
  2. public class PageCount implements Comparable<PageCount>{
  3. private String page;
  4. private int count;
  5. public void set(String page, int count) {
  6. this.page = page;
  7. this.count = count;
  8. }
  9. public String getPage() {
  10. return page;
  11. }
  12. public void setPage(String page) {
  13. this.page = page;
  14. }
  15. public int getCount() {
  16. return count;
  17. }
  18. public void setCount(int count) {
  19. this.count = count;
  20. }
  21. @Override
  22. public int compareTo(PageCount o) {
  23. return o.getCount()-this.count==0?this.page.compareTo(o.getPage()):o.getCount()-this.count;
  24. }
  25. }

 

 

 

 

原文链接:http://www.cnblogs.com/wpbing/p/11242866.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号