经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » Spark » 查看文章
spark RDD,reduceByKey vs groupByKey
来源:cnblogs  作者:zzzzMing  时间:2018/10/29 9:46:33  对本文有异议

Spark 中有两个类似的api,分别是 reduceByKey 和 groupByKey 。这两个的功能类似,但底层实现却有些不同,那么为什么要这样设计呢?我们来从源码的角度分析一下。

先看两者的调用顺序(都是使用默认的Partitioner,即defaultPartitioner)

所用 spark 版本:spark 2.1.0

先看reduceByKey

Step1

  1. def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  2. reduceByKey(defaultPartitioner(self), func)
  3. }

Setp2

  1. def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  2. combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  3. }

Setp3

  1. def combineByKeyWithClassTag[C](
  2. createCombiner: V => C,
  3. mergeValue: (C, V) => C,
  4. mergeCombiners: (C, C) => C,
  5. partitioner: Partitioner,
  6. mapSideCombine: Boolean = true,
  7. serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  8. require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  9. if (keyClass.isArray) {
  10. if (mapSideCombine) {
  11. throw new SparkException("Cannot use map-side combining with array keys.")
  12. }
  13. if (partitioner.isInstanceOf[HashPartitioner]) {
  14. throw new SparkException("HashPartitioner cannot partition array keys.")
  15. }
  16. }
  17. val aggregator = new Aggregator[K, V, C](
  18. self.context.clean(createCombiner),
  19. self.context.clean(mergeValue),
  20. self.context.clean(mergeCombiners))
  21. if (self.partitioner == Some(partitioner)) {
  22. self.mapPartitions(iter => {
  23. val context = TaskContext.get()
  24. new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
  25. }, preservesPartitioning = true)
  26. } else {
  27. new ShuffledRDD[K, V, C](self, partitioner)
  28. .setSerializer(serializer)
  29. .setAggregator(aggregator)
  30. .setMapSideCombine(mapSideCombine)
  31. }
  32. }

姑且不去看方法里面的细节,我们会只要知道最后调用的是 combineByKeyWithClassTag 这个方法。这个方法有两个参数我们来重点看一下,

  1. def combineByKeyWithClassTag[C](
  2. createCombiner: V => C,
  3. mergeValue: (C, V) => C,
  4. mergeCombiners: (C, C) => C,
  5. partitioner: Partitioner,
  6. mapSideCombine: Boolean = true,
  7. serializer: Serializer = null)

首先是 partitioner 参数 ,这个即是 RDD 的分区设置。除了默认的 defaultPartitioner,Spark 还提供了 RangePartitioner 和 HashPartitioner 外,此外用户也可以自定义 partitioner 。通过源码可以发现如果是 HashPartitioner 的话,那么是会抛出一个错误的。

然后是 mapSideCombine 参数 ,这个参数正是 reduceByKey 和 groupByKey 最大不同的地方,它决定是是否会先在节点上进行一次 Combine 操作,下面会有更具体的例子来介绍。

然后是groupByKey

Step1

  1. def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
  2. groupByKey(defaultPartitioner(self))
  3. }

Step2

  1. def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  2. // groupByKey shouldn't use map side combine because map side combine does not
  3. // reduce the amount of data shuffled and requires all map side data be inserted
  4. // into a hash table, leading to more objects in the old gen.
  5. val createCombiner = (v: V) => CompactBuffer(v)
  6. val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  7. val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  8. val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
  9. createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  10. bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  11. }

Setp3

  1. def combineByKeyWithClassTag[C](
  2. createCombiner: V => C,
  3. mergeValue: (C, V) => C,
  4. mergeCombiners: (C, C) => C,
  5. partitioner: Partitioner,
  6. mapSideCombine: Boolean = true,
  7. serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  8. require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  9. if (keyClass.isArray) {
  10. if (mapSideCombine) {
  11. throw new SparkException("Cannot use map-side combining with array keys.")
  12. }
  13. if (partitioner.isInstanceOf[HashPartitioner]) {
  14. throw new SparkException("HashPartitioner cannot partition array keys.")
  15. }
  16. }
  17. val aggregator = new Aggregator[K, V, C](
  18. self.context.clean(createCombiner),
  19. self.context.clean(mergeValue),
  20. self.context.clean(mergeCombiners))
  21. if (self.partitioner == Some(partitioner)) {
  22. self.mapPartitions(iter => {
  23. val context = TaskContext.get()
  24. new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
  25. }, preservesPartitioning = true)
  26. } else {
  27. new ShuffledRDD[K, V, C](self, partitioner)
  28. .setSerializer(serializer)
  29. .setAggregator(aggregator)
  30. .setMapSideCombine(mapSideCombine)
  31. }
  32. }

结合上面 reduceByKey 的调用链,可以发现最终其实都是调用 combineByKeyWithClassTag 这个方法的,但调用的参数不同。
reduceByKey的调用

  1. combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)

groupByKey的调用

  1. combineByKeyWithClassTag[CompactBuffer[V]](
  2. createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)

正是两者不同的调用方式导致了两个方法的差别,我们分别来看

  • reduceByKey的泛型参数直接是[V],而groupByKey的泛型参数是[CompactBuffer[V]]。这直接导致了 reduceByKey 和 groupByKey 的返回值不同,前者是RDD[(K, V)],而后者是RDD[(K, Iterable[V])]

  • 然后就是mapSideCombine = false 了,这个mapSideCombine 参数的默认是true的。这个值有什么用呢,上面也说了,这个参数的作用是控制要不要在map端进行初步合并(Combine)。可以看看下面具体的例子。

从功能上来说,可以发现 ReduceByKey 其实就是会在每个节点先进行一次合并的操作,而 groupByKey 没有。

这么来看 ReduceByKey 的性能会比 groupByKey 好很多,因为有些工作在节点已经处理了。那么 groupByKey 为什么存在,它的应用场景是什么呢?我也不清楚,如果观看这篇文章的读者知道的话不妨在评论里说出来吧。非常感谢!

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号