一、Dependency

Spark RDD五大属性之一的:依赖列表(Dependency),不仅描述父子 RDD 的血缘关系,更关键描述了父子 RDD 的partitions之间的关系。同时也是判断是否需要划分stage的关键,而stage的划分一定伴随着shuffle

spark 的依赖通过抽象类定义

@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

其有两个实现类

  • ShuffleDependency:宽依赖
  • NarrowDependency:窄依赖

1.1 窄依赖

指的是父 RDD中的一个分区最多只会被子 RDD中的一个分区使用,同时意味着父子 RDD 的计算逻辑可以在同一个 task 中运行,不需要shuffle进行数据重组。

其中窄依赖包括两种:

  1. OneToOneDependency: 一对一依赖
  2. RangeDependency: 范围依赖

常用的mapflatmapflatmap产生的 RDD 中使用的依赖都是OneToOneDependency,其分区的依赖示意图如下

窄依赖

RangeDependency通常仅在union算子是被使用,其分区的依赖示意图如下

窄依赖 RangeDependency

注意:上述两种均属于窄依赖,所以一些博主所说的判断宽窄依赖的方式为一对一、多对一是不准确的

1.2 宽依赖

指的是父 RDD 的一个分区会被子 RDD 多个分区或所有分区所依赖,因为父 RDD 的一个分区数据需要被切分来形成子 RDD 的各个分区,因此宽依赖也意味着从父 RDD 计算出子 RDD 的过程需要经历 shuffle 过程。常见的算子如reduceByKeygroupByKeyjoin等,这类宽依赖算子往往伴随着 stage 切分以及 shuffle 过程,通常又被称为 shuffle 算子

这类 shuffle 算子产生的 RDD 一定是宽依赖吗?是否一定伴随着 shuffle 过程?spark 内部会有哪些不为人知的代码细节

二、不产生 shuffle 的 shuffle 算子

2.1 原理分析

在阅读源码时产生 shuffle 操作的算子返回的是ShuffledRDD,该算子Dependency属性被赋值为ShuffleDependency,但通常意义上的 shuffle 算子在源码层级并不是一味得返回ShuffleDependency,例如reduceByKey

def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  // key 为 Array 类型不允许使用 HashPartitioner 分区器以及不允许进行预聚合
  // 因为 Array 在计算 hash 值可能会产生不确定性
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw SparkCoreErrors.cannotUseMapSideCombiningWithArrayKeyError()
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
    }
  }
  // 定义累加器,用于预聚合
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  // 核心代码,判断是否需要 shuffle 
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}

核心逻辑是if (self.partitioner == Some(partitioner)),其中self.partitioner为调用者的分区器,而reduceByKey的调用者即为父 RDD,Some(partitioner)为传入的分区器可以理解为子 RDD 的分区器,需要注意的是这类 shuffle 算子一定需要一个分区器(这也是 RDD 五大属性之一),开发中通常都没有传入此时 spark 会通过一系列复杂的判断来赋默认值。因此上面代码的逻辑表明:如何父子 RDD 的分区器相同返回由mapPartitions产生的 RDD(窄依赖)否则创建一个ShuffledRDD(宽依赖)

分区器的源码如下:

abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

因此父子分区器相同意味着

  1. 分区数相同
  2. 分区逻辑相同

例如HashPartitionerequals方法

override def equals(other: Any): Boolean = other match {
  case h: HashPartitioner =>
    h.numPartitions == numPartitions // 同数据类型表示分区逻辑相同判断分区数
  case _ =>
    false // 数据类型不同视为不同分区器
}

2.2 实战演示

package fun.uhope.practise

import fun.uhope.util.InitSparkContext
import org.apache.spark.HashPartitioner

import java.util.concurrent.TimeUnit


object P2 extends Serializable {
  def main(args: Array[String]): Unit = {
    val sc = InitSparkContext.withLocal()
    sc.parallelize(seq = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9), numSlices = 4) // 四个分区
      .map((_, 1)) // 构造一个 kv 结构数据
      .partitionBy(new HashPartitioner(4))
      .reduceByKey(new HashPartitioner(4), _ + _)
      .map(_._1)
      .foreach(println)

    TimeUnit.DAYS.sleep(1)
    sc.stop()
  }
}

本地运行在 http://127.0.0.1:4040 查看任务 DAG(注意观察日志,4040 端口如果被占用 spark 会递增选取端口)

任务 DAG

因为reduceByKey与父 RDD 使用的是相同分区器,数据归属的分区已经在父 RDD 被确认好不需要再做一次 shuffle。因此这种情况的reduceByKey产生的依赖的窄依赖