一、Hadoop的数据压缩
1.概述
在进行MR程序的过程中,在Mapper和Reducer端会发生大量的数据传输和磁盘IO,如果在这个过程中对数据进行压缩处理,可以有效的减少底层存储(HDFS)读写的字节数,,并且通过减少Map和Reduce阶段数据的输入输出来提升MR程序的速度,提高了网络带宽和磁盘空间的效率;
数据压缩可以有效的节省资源,它是MR程序的优化策略之一;
数据压缩会增加cpu的计算负担,但是能很大程度较少磁盘的IO。由于数据压缩占用cpu资源很小,总体还是利大于弊的。
2.数据压缩使用原则:
运算密集型的任务尽量少用压缩、IO密集型的任务多用压缩。
3.MapReduce支持的压缩编码
hadoop中的压缩格式 |
是否自带 |
文件的拓展名 |
是否可以切分 |
DEFAULT |
是 |
.default |
否 |
Gzip |
是 |
.gz |
否 |
bzip2 |
是 |
.bz2 |
是 |
LZO |
否 |
.lzo |
是 |
Snappy |
否 |
.snappy |
否 |
4.编码解码器
DEFAULT |
org.apache.hadoop.io.compress.DefaultCodeC |
Gzip |
org.apahce.hadoop.io.compress.GzioCodeC |
bzop2 |
org.apache.hadoop.io.compress.bzio2CodeC |
LZO |
com.apache.hadoop.compression.lzoCodeC |
Snappy |
org.apache.hadoop.io.compress.SnappyCodeC |
5.压缩性能
压缩格式 |
原始文件 |
压缩后文件 |
压缩速度 |
解压速度 |
Gzip |
8.3G |
1.8G |
17.5MB/s |
58MB/s |
bzip2 |
8.3G |
1.1G |
2.4MB/s |
9.5MB/s |
LZO |
8.3G |
2.9G |
49MB/s |
74.6MB/s |
二 、Hadoop压缩的使用
1.应用在WordCount程序中
1)在map端对数据进行压缩
在Driver类中的获取job对象后加入配置信息:
- //开启map端的输入压缩
- conf.setBoolean("mapreduce.map.output.compress",true);
-
- //设置压缩方法
- //默认
- conf.setClass("mapreduce.map.output.compress.codec",DefaultCodeC.class,CompressionCodec.class);
- //Bzip2
- conf.setClass("mapreduce.mapt.output.compress.codec",Bzip2Codec.class,CompressionCode.class);
- //LZO
- conf.setClass("mapreduce.map.output.compress.codec".LZOCodec.class,CompressionCodec.class);
注意:在map端开启压缩并不能从结果文件中看到结果的改变,只要程序运行成功就代表设置没问题!
2)在reduce端对数据进行压缩
在设置reduce输出数据类型之后加入配置信息:
- //开启reduce端的输出压缩
- FileOutputFormat.setCompressOutput(job,true);
- //设置压缩方法
- //默认
- FileOutputFormat.setOutputCompressorClass(job,DefaultCodec.class);
- //Bzip2
- FileOutputFormat.setOutputCompressorClass(job,Bzip2Codec.class);
- //Gzip
- FileOutputFormat.setOutputCompressorClass(job,GzipCodec.class);
三种选择一种即可,可以看到对应的结果文件看到被压缩的结果文件。
2.自定义压缩方法
- /**
- * @author: PrincessHug
- * @date: 2019/4/8, 9:49
- * @Blog: https://www.cnblogs.com/HelloBigTable/
- */
- public class TestCompress {
- public static void main(String[] args) throws IOException, ClassNotFoundException {
- Compress("G:\\weblog.log","org.apache.hadoop.io.compress.BZip2Codec");
- }
-
- //自定义压缩方法
- private static void Compress(String fileName,String method) throws IOException, ClassNotFoundException {
- //获取输入流
- FileInputStream fis = new FileInputStream(new File(fileName));
-
- //通过反射获取压缩方法并初始化
- Class cName = Class.forName(method);
- CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(cName, new Configuration());
-
- //定义输出流
- FileOutputStream fos = new FileOutputStream(new File(fileName + codec.getDefaultExtension()));
-
- //创建压缩输出流
- CompressionOutputStream cos = codec.createOutputStream(fos);
-
- //流的拷贝
- IOUtils.copyBytes(fis,cos,2*1024*1024,false);
-
- //关闭资源
- fis.close();
- cos.close();
- fos.close();
- }
- }