0%

深入解读 Spark 宽依赖和窄依赖(ShuffleDependency & NarrowDependency)

目前,网上有关宽窄依赖的博客大多都使用下面这张图作为讲解:

Image for post

实际上,这幅图所表达的内容并不完善。其中,窄依赖的内容表达的不够全面,而宽依赖的部分容易让人产生误解。本文,我将用新的绘图带大家搞清楚究竟什么是宽依赖(ShuffleDependency),什么是窄依赖(NarrowDependency)。

为什么会有宽窄依赖?

我们知道,在 Spark 中,数据抽象表示为统一的数据模型 RDD。每一次对 RDD 进行转换(Transformation)操作,我们都会得到一个新的 RDD。例如,rdd2 = rdd1.map(func)。那么,前后的 RDD 自然就形成了某种联系,即新生成的 child RDD 会依赖旧的 parent RDD。而这其中的问题,实际上就是新生成的 RDD 的分区如何依赖 parent RDD 的分区。

对于某些一元操作,比如 map()filter() 等,child RDD 的各个分区分别只依赖 parent RDD 中的各个分区,是一一映射的关系。而对于某些聚合操作,比如reduceByKey()groupByKey() 等,在计算的时候需要对 parent RDD 的各个分区进行计算,child RDD 的各个分区可能都只依赖 parent RDD 各个分区中的一部分,是多对一映射的关系。所以,Spark 所做的就是抽象出可以通用的方法,来处理各种情况的依赖。

目前,Spark 将这些依赖关系分为两大类:宽依赖(ShuffleDependency)和窄依赖(NarrowDependency)。

窄依赖 NarrowDependency

在官方文档中 API-NarrowDependency,窄依赖的描述为:

Base class for dependencies where each partition of the child RDD depends on a small number of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.

即 child RDD 中的每个分区都依赖 parent RDD 中的一小部分分区。那么如何理解这句话呢,我们首先来看下面这张图。

1-NarrowDependency

本图囊括了有关窄依赖的各种依赖情况,我们一一来看。

  • OneToOneDependency:一对一依赖。从图中我们可以看出,child RDD 中的每个分区都只依赖 parent RDD 中的一个分区,并且 child RDD 的分区数和 parent RDD 的分区数相同。这种我们称之为 OneToOneDependency。属于这种依赖关系的转换算子有 map()flatMap()filter() 等。通过阅读 Spark 源码,我们可以发现,这些算子生成的 RDD 的依赖关系使用的就是 OneToOneDependency 这个类。

    1
    2
    3
    4
    5
    6
    7
    8
    /**
    * :: DeveloperApi ::
    * Represents a one-to-one dependency between partitions of the parent and child RDDs.
    */
    @DeveloperApi
    class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
    override def getParents(partitionId: Int): List[Int] = List(partitionId)
    }
  • RangeDependency:范围依赖。child RDD 和 parent RDD 的分区经过划分,每个范围内的父子 RDD 的分区都为一一对应的关系。属于这种依赖关系的转换算子有 union() 等。通过阅读源码,我们可以看到,在 UnionRDDgetDependencies() 方法中,创建了一个 RangeDependency 类。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    override def getDependencies: Seq[Dependency[_]] = {
    val deps = new ArrayBuffer[Dependency[_]]
    var pos = 0
    for (rdd <- rdds) {
    deps += new RangeDependency(rdd, 0, pos, rdd.partitions.length)
    pos += rdd.partitions.length
    }
    deps
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    /**
    * :: DeveloperApi ::
    * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
    * @param rdd the parent RDD
    * @param inStart the start of the range in the parent RDD
    * @param outStart the start of the range in the child RDD
    * @param length the length of the range
    */
    @DeveloperApi
    class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
    extends NarrowDependency[T](rdd) {

    override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
    List(partitionId - outStart + inStart)
    } else {
    Nil
    }
    }
    }
  • NarrowDependency: 窄依赖类。通过代码我们可以发现,上面的 OneToOneDependencyRangeDependency 都继承了 NarrowDependency 这个类。现在我们来看下上图的下半部分。

    • 左边我们可以看作是多对一的依赖,属于这种依赖关系的转换算子有特殊情形的 join()cogroup() 等。为什么说特殊情形呢,用 cogroup() 举例。cogroup() 可以聚合多个 RDD,其中如果某些 parent RDD 和 child RDD 的 partitioner 和分区数相同(比如,都为 HashPartitioner),那么这些 parent RDD 的分区就可以直接流入到 child RDD 的对应分区中,为 OneToOneDependency 情形。而其它不符合这种条件的分区,则为 ShuffleDependency

    • 右边我们可以看做是多对多的依赖,属于这种依赖关系的转换算子有 cartesian()。可能大家一不注意就把这种情形当成是 ShuffleDependency 了,但通过源码我们可以发现,CartesianRDD 中创建了两个 NarrowDependency 完成了笛卡尔乘积操作,属于窄依赖。

      1
      2
      3
      4
      5
      6
      7
      8
      override def getDependencies: Seq[Dependency[_]] = List(
      new NarrowDependency(rdd1) {
      def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
      },
      new NarrowDependency(rdd2) {
      def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
      }
      )

宽依赖 ShuffleDependency

接下来我们再来看宽依赖。在官方文档中 API-ShuffleDependency,宽依赖的描述为:

Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle, the RDD is transient since we don’t need it on the executor side.

官方这里并没有从 RDD 分区角度来解释什么是 ShuffleDependency ,只是说需要 shuffle 的两个 Stage 的依赖。那到底什么是 ShuffleDependency 呢?我们来看下图。

看到这可能有的同学会说,这不和 NarrowDependency 一样么?仔细看,NarrowDependency 虽然也有 child RDD 的一个分区依赖 parent RDD 的多个分区的情况,但都是依赖分区的全部。而 ShuffleDependency 中,child RDD 的一个分区依赖的是 parent RDD 中各个分区的某一部分。如上图左半部分,child RDD 的两个分区分别只依赖 parent RDD 中的 1 和 2 部分。而计算出 1 或者 2 部分的过程,以及 child RDD 分别读取 1 和 2 的过程,即为 shuffle write/shuffle read,这个过程正是 shuffle 开销所在。

总结

简单来说,NarrowDependency 为 parent RDD 的一个或多个分区的数据全部流入到 child RDD 的一个或多个分区,而 ShuffleDependency 则为 parent RDD 的每个分区的每一部分,分别流入到 child RDD 的不同分区。

Spark 之所以要将依赖关系分为 NarrowDependencyShuffleDependency ,是可以更好的将各种依赖类型进行分类,明确数据怎么流出流入,从而更容易生成对应的物理执行计划。NarrowDependency 不需要 shuffle 操作,并且可以用于流式操作(pipeline)。ShuffleDependency 则需要进行 shuffle 操作,有 shuffle 的地方需要划分不同的 stage。

以上就是有关 Spark 中 NarrowDependencyShuffleDependency 的介绍。


我是因特马,一个爱分享的斜杠程序员~

欢迎关注我的公众号:因特马

  • 本文作者: 因特马
  • 本文链接: https://www.interhorse.cn/a/2301943088/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!