一、Mapreduce概述
Mapreduce是分布式程序编程框架,也是分布式计算框架,它简化了开发!
Mapreduce将用户编写的业务逻辑代码和自带默认组合整合成一个完整的分布式运算程序,并发的运行在hadoop集群上。
二、Mapreduce优缺点
优点:1.易于编程:只用实现几个接口即可完成一个并发的程序。
2.良好的拓展性:再不行当前程序运行的情况下,可以通过增加节点来解决用户/数据扩展,计算量增加的问题。
3.高容错性:可以运行在廉价的集群机器上。
4.适合处理PB级别以上的离线处理。
缺点:1.不擅长做实时计算、流式计算。
2.不支持DAG(有向图)计算,有依赖的程序(spark支持)。
3.每次把计算结果写入磁盘当中,造成磁盘io,性能较低。
三、Mapreduce编程思想
需求:统计一个200M的单词文件,查询出每个单词出现的次数。
思想:1.将200M的文件切分为两块,128M和72M;
2.将两块文件分别交给两个maptask处理,对数据进行读取,切分,封装,然后传输到reducetask;
3.reducetask将数据再次整合,累加,输出到结果文件中。
注意:mapreduce中的所有maptask都是并行运行的,reducetask也是,
但是reducetask的运行要依赖maptask的输出。
四、WordCount程序
- /**
- * @author: PrincessHug
- * @date: 2019/3/24, 0:52
- * @Blog: https://www.cnblogs.com/HelloBigTable/
- */
- public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- //读取数据
- String line = value.toString();
-
- //切分数据
- String[] fields = line.split(" ");
-
- //传输数据
- for (String f:fields){
- context.write(new Text(f),new IntWritable(1));
- }
- }
- }
-
- public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
- //累加
- int sum = 0;
- for (IntWritable i:values){
- sum += i.get();
- }
- //输出
- context.write(key,new IntWritable(sum));
- }
- }
-
- public class WordCountDriver {
- public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- //配置,job对象
- Configuration conf = new Configuration();
- Job job = Job.getInstance();
-
- //设置运行类
- job.setJarByClass(WordCountDriver.class);
-
- //设置Mapper,Reducer类
- job.setMapperClass(WordCountMapper.class);
- job.setReducerClass(WordCountReducer.class);
-
- //设置Mapper输出数据类型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(IntWritable.class);
-
- //设置Reducer输出数据类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- //设置输入输出流
- FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\wordcount\\in"));
- FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\wordcount\\out"));
-
- //提交任务
- if (job.waitForCompletion(true)){
- System.out.println("运行完成!");
- }else {
- System.out.println("运行失败!");
- }
-
- }
-
- }