码迷,mamicode.com
首页 > 其他好文 > 详细

Spark Shuffle 详解(1)

时间:2016-07-15 21:24:19      阅读:168      评论:0      收藏:0      [点我收藏+]

标签:

版本:1.6.2

不管是hadoop中map/reduce还是spark中各种算子,shuffle过程都是其中核心过程,shuffle的设计是否高效,基本确定了整个计算过程是否高效。 设计难点在于shuffle过程涉及到大数据的IO操作(包括本地临时文件IO和网络IO),以及可能存在的cpu密集型排序计算操作。
在spark1.6.2版本,spark针对大型数据有三种shuffle 机制,即“sort-based shuffle”,”hash-based shuffle”,”tungsten-sort shuffle"
 


下面 是官方对其的描述:

/**
 * In sort-based shuffle, incoming records are sorted according to their target partition ids, then
 * written to a single map output file. Reducers fetch contiguous regions of this file in order to
 * read their portion of the map output. In cases where the map output data is too large to fit in
 * memory, sorted subsets of the output can are spilled to disk and those on-disk files are merged
 * to produce the final output file.
 *
 * Sort-based shuffle has two different write paths for producing its map output files:
 *
 *  - Serialized sorting: used when all three of the following conditions hold:
 *    1. The shuffle dependency specifies no aggregation or output ordering.
 *    2. The shuffle serializer supports relocation of serialized values (this is currently
 *       supported by KryoSerializer and Spark SQL‘s custom serializers).
 *    3. The shuffle produces fewer than 16777216 output partitions.
 *  - Deserialized sorting: used to handle all other cases.
 *
 * -----------------------
 * Serialized sorting mode
 * -----------------------
 *
 * In the serialized sorting mode, incoming records are serialized as soon as they are passed to the
 * shuffle writer and are buffered in a serialized form during sorting. This write path implements
 * several optimizations:
 *
 *  - Its sort operates on serialized binary data rather than Java objects, which reduces memory
 *    consumption and GC overheads. This optimization requires the record serializer to have certain
 *    properties to allow serialized records to be re-ordered without requiring deserialization.
 *    See SPARK-4550, where this optimization was first proposed and implemented, for more details.
 *
 *  - It uses a specialized cache-efficient sorter ([[ShuffleExternalSorter]]) that sorts
 *    arrays of compressed record pointers and partition ids. By using only 8 bytes of space per
 *    record in the sorting array, this fits more of the array into cache.
 *
 *  - The spill merging procedure operates on blocks of serialized records that belong to the same
 *    partition and does not need to deserialize records during the merge.
 *
 *  - When the spill compression codec supports concatenation of compressed data, the spill merge
 *    simply concatenates the serialized and compressed spill partitions to produce the final output
 *    partition.  This allows efficient data copying methods, like NIO‘s `transferTo`, to be used
 *    and avoids the need to allocate decompression or copying buffers during the merge.
 *
 * For more details on these optimizations, see SPARK-7081.
 */

 本文针对shuffle相关的代码逻辑做一次串读,其中包括shuffle的原理,以及shuffle代码级别的实现。 


Job,Stage,Task, Dependency

在Spark中,RDD是操作对象的单位,其中操作可以分为转换(transformation)和动作(actions),只有动作操作才会触发一个spark计算操作。
以rdd.map操作和rdd.count操作做比较


/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[UT](this(contextpiditer) => iter.map(cleanF))
}

/**
 * Return the number of elements in the RDD.
 */
def count(): Long = sc.runJob(thisUtils.getIteratorSize _).sum

map是一个转换操作,它只是在当前的rdd的基础上创建一个MapPartitionsRDD对象,而count是一个动作操作,它会调用sc.runJob向spark提交一个Job

Job是一组rdd的转换以及最后动作的操作集合,它是Spark里面计算最大最虚的概念,甚至在spark的任务页面中都无法看到job这个单位。 但是不管怎么样,在spark用户的角度,job是我们计算目标的单位,每次在一个rdd上做一个动作操作(acions)时,都会触发一个job,完成计算并返回我们想要的数据。

Job是由一组RDD上转换和动作组成,这组RDD之间的转换关系表现为一个有向无环图(DAG),每个RDD的生成依赖于前面1个或多个RDD。
在Spark中,两个RDD之间的依赖关系是Spark的核心。站在RDD的角度,两者依赖表现为点对点依赖, 但是在Spark中,RDD存在分区(partition)的概念,两个RDD之间的转换会被细化为两个RDD分区之间的转换。

技术分享

如上图所示,站在job角度,RDD_B由RDD_A转换而成,RDD_D由RDD_C转换而成,最后RDD_E由RDD_B和RDD_D转换,最后输出RDD_E上做了一个动作,将结果输出。 但是细化到RDD内分区之间依赖,RDD_B对RDD_A的依赖,RDD_D对RDD_C的依赖是不一样,他们的区别用专业词汇来描述即为窄依赖和宽依赖

所谓的窄依赖是说子RDD中的每一个数据分区只依赖于父RDD中的对应的有限个固定的数据分区,而宽依赖是指子RDD中的每个数据分区依赖于父RDD中的所有数据分区。

宽依赖很好理解,但是对于窄依赖比较绕口,特别是定义中有限与固定两个要求,宽依赖也满足有限和固定这两个要求?难道他们俩个之间区别也仅仅在于“有限”这个数字的大小? 其实就是这样的理解,“有限”就表现为所依赖的分区数目相比完整分区数相差很大,而且spark靠窄依赖来实现的RDD基本上都大部分都是一对一的依赖,所以就不需要纠结这个有限的关键字


这里还有一个问题,count操作是依赖父RDD的所有分区进行计算而得到,那么它是宽依赖吗?这么疑问,答案肯定就是否定的,首先这里依赖是父RDD和子RDD之间的关系描述,count操作只有输出, 没有子rdd的概念,就不要把依赖的关系硬套上给你带来麻烦。看上面的实现,count只是把sc.runJob计算返回的Array[U]做一次sum操作而已。

窄依赖和宽依赖的分类是Spark中很重要的特性,不同依赖在实现,任务调度机制,容错恢复上都有不同的机制。

  • 实现上:对于窄依赖,rdd之间的转换可以直接pipe化而宽依赖需要采用shuffle过程来实现
  • 任务调度上:窄依赖意味着可以在某一个计算节点上直接通过父RDD的某几块数据(通常是一块)计算得到子RDD某一块的数据; 而相对的,宽依赖意味着子RDD某一块数据的计算必须等到它的父RDD所有数据都计算完成之后才可以进行,而且需要对父RDD的计算结果需要经过shuffle才能被下一个rdd所操作。
  • 容错恢复上:窄依赖的错误恢复会比宽依赖的错误恢复要快很多,因为对于窄依赖来说,只有丢失的那一块数据需要被重新计算, 而宽依赖意味着所有的祖先RDD中所有的数据块都需要被重新计算一遍,这也是我们建议在长“血统”链条特别是有宽依赖的时候,需要在适当的时机设置一个数据检查点以避免过长的容错恢复。

在这边 可以使用:RDD.checkpoint的方法来实现检查点
/**
 * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
 * directory set with `SparkContext#setCheckpointDirand all references to its parent
 * RDDs will be removed. This function must be called before any job has been
 * executed on this RDD. It is strongly recommended that this RDD is persisted in
 * memory, otherwise saving it on a file will require recomputation.
 */
def checkpoint(): Unit = RDDCheckpointData.synchronized {
  // NOTE: we use a global lock here due to complexities downstream with ensuring
  // children RDD partitions point to the correct parent partitions. In the future
  // we should revisit this consideration.
  if (context.checkpointDir.isEmpty) {
    throw new SparkException("Checkpoint directory has not been set in the SparkContext")
  } else if (checkpointData.isEmpty) {
    checkpointData Some(new ReliableRDDCheckpointData(this))
  }
}

理清楚了Job层面RDD之间的关系,RDD层面分区之间的关系,那么下面讲述一下Stage概念。

Stage的划分是对一个Job里面一系列RDD转换和动作进行划分。

  • 首先job是因动作而产生,因此每个job肯定都有一个ResultStage,否则job就不会启动。
  • 其次,如果Job内部RDD之间存在宽依赖,Spark会针对它产生一个中间Stage,即为ShuffleStage,严格来说应该是ShuffleMapStage,这个stage是针对父RDD而产生的, 相当于在父RDD上做一个父rdd.map().collect()的操作。ShuffleMapStage生成的map输入,对于子RDD,如果检测到所自己所“宽依赖”的stage完成计算,就可以启动一个shuffleFectch, 从而将父RDD输出的数据拉取过程,进行后续的计算。

因此一个Job由一个ResultStage和多个ShuffleMapStage组成。

无Shuffle Job的执行过程

对一个无Shuffle的job执行过程的剖析可以知晓我们执行一个"动作"时,spark的处理流程. 下面我们就以一个简单例子进行讲解:

sc.textFile(“mahuacai").count
//def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

 

这个例子很简单就是统计这个文件的行数;上面一行代码,对应了下面三个过程中:

  • sc.textFile(“mahucai")会返回一个rdd,
  • 然后在这个rdd上做count动作,触发了一次Job的提交sc.runJob(this, Utils.getIteratorSize _)
  • 对runJob返回的Array结构进行sum操作;

核心过程就是第二步,下面我们以代码片段的方式来描述这个过程,这个过程肯定是线性的,就用step来标示每一步,以及相关的代码类:


//step1:SparkContext

/**
 * Run a function on a given set of partitions in an RDD and return the results as an array.
 */
def runJob[TU: ClassTag](
    rdd: RDD[T],
    func: (TaskContextIterator[T]) => U,
    partitions: Seq[Int]): Array[U] = {
  val results = new Array[U](partitions.size)
  runJob[TU](rddfuncpartitions(index, res) => results(index) = res)
  results
}

sc.runJob(this, Utils.getIteratorSize _)的过程会经过一组runJob的重载函数,进入上述step1中的runJob函数,相比原始的runJob,到达这边做的工作不多,比如设置partitions个数, Utils.getIteratorSize _到func转化等,以后像这样简单的过程就不再描述.

Step1做的一个很重要的工作是构造一个Array,并构造一个函数对象"(index, res) => results(index) = res"继续传递给runJob函数,然后等待runJob函数运行结束,将results返回; 对这里的解释相当在runJob添加一个回调函数,将runJob的运行结果保存到Array到, 回调函数,index表示mapindex, res为单个map的运行结果,对于我们这里例子.res就为每个分片的 文件行数.


//step2:SparkContext
/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark.
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD‘s recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}


Step2中runJob就有一个resultHandler参数,这就是Step1构造的回调函数,dagScheduler是Spark里面最外层调度器,通过调用它的runJob函数,将相关参见传入到Spark调度器中. 只有Step1中的runJob函数的返回值有返回值,这里的runJob,包括dagScheduler.runJob都是没有返回值的;返回是通过Step1的回调函数进行设置的.

为什么我要一再强调返回值是通过Step1的回调函数来设置的?这个很重要,否则你都不知道spark调度的job的运行结果是怎么样被我们自己的逻辑代码所获取的!!

还有一点很重要,Step2是Step1以后的直接步骤,所以Step2中的dagScheduler.runJob是堵塞的操作,即直到Spark完成Job的运行之前,rdd.doCheckpoint()是不会执行的;


//Step3:DAGScheduler
/**
 * Run an action job on the given RDD and pass all the results to the resultHandler function as
 * they arrive.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 *   partitions of the target RDD, e.g. for operations like first()
 * @param callSite where in the user program this job was called
 * @param resultHandler callback to pass each result to
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 *
 * @throws Exception when the job fails
 */
def runJob[TU](
    rdd: RDD[T],
    func: (TaskContextIterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rddfuncpartitionscallSiteresultHandlerproperties)
  waiter.awaitResult() match {
    case JobSucceeded =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobIdcallSite.shortForm(System.nanoTime - start) / 1e9))
    case JobFailed(exception: Exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobIdcallSite.shortForm(System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}

Step2中说了dagScheduler.runJob是堵塞的,堵塞就堵塞在Step3的waiter.awaitResult()操作,即submitJob会返回一个waiter对象,而我们的awaitResult()就堵塞了;

到目前为止,我们终于从runJob这个多处出现的函数名称跳到submitJob这个函数名称;继续下一步


//Step4:DAGScheduler
/**
 * Submit an action job to the scheduler.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 *   partitions of the target RDD, e.g. for operations like first()
 * @param callSite where in the user program this job was called
 * @param resultHandler callback to pass each result to
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 *
 * @return a JobWaiter object that can be used to block until the job finishes executing
 *         or can be used to cancel the job.
 *
 * @throws IllegalArgumentException when partitions ids are illegal
 */
def submitJob[TU](
    rdd: RDD[T],
    func: (TaskContextIterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](thisjobId0resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContextIterator[_]) => _]
  val waiter = new JobWaiter(thisjobIdpartitions.sizeresultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobIdrddfunc2partitions.toArraycallSitewaiter,
    SerializationUtils.clone(properties)))
  waiter
}

在Step4的submitJob中,我们给这次job分配了一个jobID, 通过创建了一个JobWaiter对象,返回给Step3;最重要的步骤就是调用eventProcessLoop.post(JobSubmitted(jobIdrddfunc2partitions.toArraycallSitewaiter,SerializationUtils.clone(properties)))向DAG调度器 发送一个JobSubmitted的消息;

/**
 * Put the event into the event queue. The event thread will process it later.
 */
def post(event: E): Unit = {
  eventQueue.put(event)
}

到目前为止我们都没有关系函数的参数,这里我们要分析一下发送的JobSubmitted的消息包:
  • jobId,rdd,func2,partitions.toArray这几个都比较好理解,就不阐述了
  • callSite/properties:个人不是很感兴趣,姑且理解为不重要的
  • waiter就是上面创建的JobWaiter对象,这个很重要,因为这个对象封装了几个重要的参数:
    • jobId:Job编号
    • partitions.size:分区编号
    • resultHandler:我们Step1设置的回调函数

为什么JobWaiter重要,这个对象包含了我们分区的个数.我们知道分区的个数和task个数是相同的,因此JobWaiter成功返回的前提是: 它接受到partitions.size个归属于jobid的task成功运行的结果,并通过resultHandler来将这些task运行结果回调给Step2的Array

这句话应该不难理解,其实这句话也包含了我们后面job调度的整体过程, 下面我们就一步一步来分析从job到Stage,到task以及直到task运行成功,调用我们的resultHandler回调的过程.


//Step5:DAGScheduler
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContextIterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobIde)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobIdfinalStagecallSitelistenerproperties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobIdcallSite.shortFormpartitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobIdjobSubmissionTimestageInfosproperties))
  submitStage(finalStage)

  submitWaitingStages()
}
Step4发送的消息最后被Step5中的handleJobSubmitted函数进行处理,我这里删除了handleJobSubmitted中很多我们不关心的代码,Step5的核心代码就是创建一个finalStage, 并调用 submitStage将stage提交给Dag进行调度;这里我们从Job单位层面进入Stage层;

这个Stage命名很好:
finalStage,它是整个DAG上的最后一个stage,它不是一个集合,而是单一的stage,这说明一个道理,runJob肯定只对应一个finalStage,即最终的输出肯定只有一个, 中间的stage就是我们传说中的shuffleStage,shuffleStage的生成就是在生成finalStage过程中生成的,即newStage.

那么我们就进入newResultStage这个函数,等一下我们还会回到submitStage,来分析怎么将Stage解析为Task提交给Spark进行运行;

//Step5.1:DAGScheduler
/**
 * Create a ResultStage associated with the provided jobId.
 */
private def newResultStage(
    rdd: RDD[_],
    func: (TaskContextIterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rddjobId)
  val stage = new ResultStage(idrddfuncpartitionsparentStagesjobIdcallSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobIdstage)
  stage
}

getParentStagesAndId 的具体的实现
/**
 * Helper function to eliminate some code re-use when creating new stages.
 */
private def getParentStagesAndId(rdd: RDD[_]firstJobId: Int): (List[Stage], Int) = {
  val parentStages = getParentStages(rdd, firstJobId)
  val id = nextStageId.getAndIncrement()
  (parentStagesid)
}

getParentStages 的具体的实现
从实现的代码里面 不难看出 这边使用的 是图算法里面的 广度遍历

/**
 * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
 * the provided firstJobId.
 */
private def getParentStages(rdd: RDD[_]firstJobId: Int): List[Stage] = {
  val parents = new HashSet[Stage] // 存储parent stage
  val visited = new HashSet[RDD[_]] //存储已经被访问过的RDD
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting // 存储需要被处理的RDD。Stack中得RDD都需要被处理。
  val waitingForVisit = new Stack[RDD[_]]
  def visit(r: RDD[_]) {
    if (!visited(r)) {
      visited += r
      // Kind of ugly: need to register RDDs with the cache here since
      // we can‘t do it in its constructor because # of partitions is unknown
      for (dep <- r.dependencies) {
        dep match {
          case shufDep: ShuffleDependency[___] =>
            parents += getShuffleMapStage(shufDep, firstJobId) // 在ShuffleDependency时需要生成新的stage
          case _ =>
            waitingForVisit.push(dep.rdd)
        }
      }
    }
  }
  waitingForVisit.push(rdd) // 输入的rdd作为第一个需要处理的RDD。然后从该rdd开始,顺序访问其parent rdd
  while (waitingForVisit.nonEmpty) { //只要stack不为空,则一直处理。 
    visit(waitingForVisit.pop()) //每次visit如果遇到了ShuffleDependency,那么就会形成一个Stage,否则这些RDD属于同一个Stage 
  }
  parents.toList
}


RDD[_] 表示 是任何类型的 RDD


getShuffleMapStage 的逻辑
**
 * Get or create a shuffle map stage for the given shuffle dependency‘s map side.
 */
private def getShuffleMapStage(
    shuffleDep: ShuffleDependency[___],
    firstJobId: Int): ShuffleMapStage = {
  shuffleToMapStage.get(shuffleDep.shuffleIdmatch {
    case Some(stage) => stage
    case None =>
      // We are going to register ancestor shuffle dependencies
      getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(depfirstJobId)
      }
      // Then register current shuffleDep
      val stage = newOrUsedShuffleStage(shuffleDepfirstJobId)
      shuffleToMapStage(shuffleDep.shuffleId) = stage
      stage
  }
}


这边 可以简单的看一下 关于的 ShuffleDependency
@DeveloperApi
class ShuffleDependency[K: ClassTagV: ClassTagC: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[KV]],
    val partitioner: Partitioner,
    val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[KVC]] = None,
    val mapSideCombine: Boolean false)
  extends Dependency[Product2[KV]] {

  override def rdd: RDD[Product2[KV]] = _rdd.asInstanceOf[RDD[Product2[KV]]]

  private[spark] val keyClassNameString = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassNameString = reflect.classTag[V].runtimeClass.getName
  // Note: It‘s possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleIdInt = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId_rdd.partitions.sizethis)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}


Step5.1中首先是在当前的rdd上调用getParentStagesAndId来生成父Stage,父Stages是一个列表;我们这里分析的cache是没有Shuffle的,那么肯定就没有父Stage这个过程;我们就不深入 去分析这个过程;

然后就创建一个Stage对象,并更新Stage和job之间的关系.

/**
 * Helper function to eliminate some code re-use when creating new stages.
 */
private def getParentStagesAndId(rdd: RDD[_]firstJobId: Int): (List[Stage], Int) = {
  val parentStages = getParentStages(rddfirstJobId)
  val id = nextStageId.getAndIncrement()
  (parentStagesid)
}



下面我们要从维度5.1跳转到一个和执行流程无关的代码,即Stage类的实现,毕竟是Spark的核心对象,对它的理解还是很重要的;
官方对stage 的解释 说明

/**
 * A stage is a set of parallel tasks all computing the same function that need to run as part
 * of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
 * by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
 * DAGScheduler runs these stages in topological order.
 *
 * Each Stage can either be a shuffle map stage, in which case its tasks‘ results are input for
 * other stage(s), or a result stage, in which case its tasks directly compute a Spark action
 * (e.g. count(), save(), etc) by running a function on an RDD. For shuffle map stages, we also
 * track the nodes that each output partition is on.
 *
 * Each Stage also has a firstJobId, identifying the job that first submitted the stage.  When FIFO
 * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
 * faster on failure.
 *
 * Finally, a single stage can be re-executed in multiple attempts due to fault recovery. In that
 * case, the Stage object will track multiple StageInfo objects to pass to listeners or the web UI.
 * The latest one will be accessible through latestInfo.
 *
 * @param id Unique stage ID
 * @param rdd RDD that this stage runs on: for a shuffle map stage, it‘s the RDD we run map tasks
 *   on, while for a result stage, it‘s the target RDD that we ran an action on
 * @param numTasks Total number of tasks in stage; result stages in particular may not need to
 *   compute all partitions, e.g. for first(), lookup(), and take().
 * @param parents List of stages that this stage depends on (through shuffle dependencies).
 * @param firstJobId ID of the first job this stage was part of, for FIFO scheduling.
 * @param callSite Location in the user program associated with this stage: either where the target
 *   RDD was created, for a shuffle map stage, or where the action for a result stage was called.
 */



private[scheduler] abstract class Stage(
    val id: Int,
    val rdd: RDD[_],
    val numTasks: Int,
    val parents: List[Stage],
    val firstJobId: Int,
    val callSite: CallSite)
  extends Logging {

  val numPartitions = rdd.partitions.length

  /** Set of jobs that this stage belongs to. */
  val jobIds new HashSet[Int]

  val pendingPartitions new HashSet[Int]

  /** The ID to use for the next new attempt for this stage. */
  private var nextAttemptIdInt 0

  val nameString = callSite.shortForm
  val detailsString = callSite.longForm

  private var _internalAccumulatorsSeq[Accumulator[Long]] = Seq.empty

  /** Internal accumulators shared across all tasks in this stage. */
  def internalAccumulatorsSeq[Accumulator[Long]] = _internalAccumulators

  /**
   * Re-initialize the internal accumulators associated with this stage.
   *
   * This is called every time the stage is submitted, *except* when a subset of tasks
   * belonging to this stage has already finished. Otherwise, reinitializing the internal
   * accumulators here again will override partial values from the finished tasks.
   */
  def resetInternalAccumulators(): Unit = {
    _internalAccumulators = InternalAccumulator.create(rdd.sparkContext)
  }

  /**
   * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized
   * here, before any attempts have actually been created, because the DAGScheduler uses this
   * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
   * have been created).
   */
  private var _latestInfo: StageInfo = StageInfo.fromStage(thisnextAttemptId)

  /**
   * Set of stage attempt IDs that have failed with a FetchFailure. We keep track of these
   * failures in order to avoid endless retries if a stage keeps failing with a FetchFailure.
   * We keep track of each attempt ID that has failed to avoid recording duplicate failures if
   * multiple tasks from the same stage attempt fail (SPARK-5945).
   */
  private val fetchFailedAttemptIds new HashSet[Int]

  private[scheduler] def clearFailures() : Unit = {
    fetchFailedAttemptIds.clear()
  }

  /**
   * Check whether we should abort the failedStage due to multiple consecutive fetch failures.
   *
   * This method updates the running set of failed stage attempts and returns
   * true if the number of failures exceeds the allowable number of failures.
   */
  private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
    fetchFailedAttemptIds.add(stageAttemptId)
    fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES
  }

  /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
  def makeNewStageAttempt(
      numPartitionsToCompute: Int,
      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
    _latestInfo = StageInfo.fromStage(
      thisnextAttemptIdSome(numPartitionsToCompute)taskLocalityPreferences)
    nextAttemptId += 1
  }

  /** Returns the StageInfo for the most recent attempt for this stage. */
  def latestInfo: StageInfo = _latestInfo

  override final def hashCode(): Int = id

  override final def equals(other: Any): Boolean = other match {
    case stage: Stage => stage != null && stage.id == id
    case _ => false
  }

  /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
  def findMissingPartitions(): Seq[Int]
}

private[scheduler] object Stage {
  // The number of consecutive failures allowed before a stage is aborted
  val MAX_CONSECUTIVE_FETCH_FAILURES 4
}


官方的解释

/**
 * ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle.
 * They occur right before each shuffle operation, and might contain multiple pipelined operations
 * before that (e.g. map and filter). When executed, they save map output files that can later be
 * fetched by reduce tasks. The `shuffleDepfield describes the shuffle each stage is part of,
 * and variables like `outputLocsand `numAvailableOutputstrack how many map outputs are ready.
 *
 * ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage.
 * For such stages, the ActiveJobs that submitted them are tracked in `mapStageJobs`. Note that
 * there can be multiple ActiveJobs trying to compute the same shuffle map stage.
 */


private[spark] class ShuffleMapStage(
    id: Int,
    rdd: RDD[_],
    numTasks: Int,
    parents: List[Stage],
    firstJobId: Int,
    callSite: CallSite,
    val shuffleDep: ShuffleDependency[___])
  extends Stage(idrddnumTasksparentsfirstJobIdcallSite) {

  private[thisvar _mapStageJobsList[ActiveJob] = Nil

  private[thisvar _numAvailableOutputsInt 0

  /**
   * List of [[MapStatus]] for each partition. The index of the array is the map partition id,
   * and each value in the array is the list of possible [[MapStatus]] for a partition
   * (a single task might run multiple times).
   */
  private[thisval outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)

  override def toStringString "ShuffleMapStage " + id

  /**
   * Returns the list of active jobs,
   * i.e. map-stage jobs that were submitted to execute this stage independently (if any).
   */
  def mapStageJobsSeq[ActiveJob] = _mapStageJobs

  /** Adds the job to the active job list. */
  def addActiveJob(job: ActiveJob): Unit = {
    _mapStageJobs = job :: _mapStageJobs
  }

  /** Removes the job from the active job list. */
  def removeActiveJob(job: ActiveJob): Unit = {
    _mapStageJobs = _mapStageJobs.filter(_ != job)
  }

  /**
   * Number of partitions that have shuffle outputs.
   * When this reaches [[numPartitions]], this map stage is ready.
   * This should be kept consistent as `outputLocs.filter(!_.isEmpty).size`.
   */
  def numAvailableOutputsInt _numAvailableOutputs

  /**
   * Returns true if the map stage is ready, i.e. all partitions have shuffle outputs.
   * This should be the same as `outputLocs.contains(Nil)`.
   */
 def isAvailable: Boolean = _numAvailableOutputs == numPartitions

  /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
  override def findMissingPartitions(): Seq[Int] = {
    val missing = (until numPartitions).filter(id => outputLocs(id).isEmpty)
    assert(missing.size == numPartitions _numAvailableOutputs,
      s"${missing.size} missing, expected ${numPartitions _numAvailableOutputs}")
    missing
  }

  def addOutputLoc(partition: Int, status: MapStatus): Unit = {
    val prevList = outputLocs(partition)
    outputLocs(partition) = status :: prevList
    if (prevList == Nil) {
      _numAvailableOutputs += 1
    }
  }

  def removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit = {
    val prevList = outputLocs(partition)
    val newList = prevList.filterNot(_.location == bmAddress)
    outputLocs(partition) = newList
    if (prevList != Nil && newList == Nil) {
      _numAvailableOutputs -= 1
    }
  }

  /**
   * Returns an array of [[MapStatus]] (index by partition id). For each partition, the returned
   * value contains only one (i.e. the first) [[MapStatus]]. If there is no entry for the partition,
   * that position is filled with null.
   */
  def outputLocInMapOutputTrackerFormat(): Array[MapStatus] = {
    outputLocs.map(_.headOption.orNull)
  }

  /**
   * Removes all shuffle outputs associated with this executor. Note that this will also remove
   * outputs which are served by an external shuffle server (if one exists), as they are still
   * registered with this execId.
   */
  def removeOutputsOnExecutor(execId: String): Unit = {
    var becameUnavailable = false
    for (partition <- until numPartitions) {
      val prevList = outputLocs(partition)
      val newList = prevList.filterNot(_.location.executorId == execId)
      outputLocs(partition) = newList
      if (prevList != Nil && newList == Nil) {
        becameUnavailable = true
        _numAvailableOutputs -= 1
      }
    }
    if (becameUnavailable) {
      logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(
        thisexecId_numAvailableOutputsnumPartitionsisAvailable))
    }
  }
}

首先我们看ShuffleMapStage(是 stage 的实现 )几个字段,其中shuffleDep和parents最为重要,首先如果一个Stage的shuffleDep不为空,那么当前的Stage是因为shuffleMap输出而生成的Stage;

怎么解释呢?shuffleDep就是该Stage的生成原因;因为下游rdd对当前的rdd有这个依赖而生成在当前rdd上生成一个Stage. 因此FinalStage,shuffleDep值为none

parents参数就是父Stage列表,当前rdd被调度的前提是所有的父Stage都调度完成;对于我们当前研究这个case来说,shuffleDep和parents都为none;

Stage这个类还有两个比较重要的函数:


//ShuffleMapStage.class
/**
 * Returns true if the map stage is ready, i.e. all partitions have shuffle outputs.
 * This should be the same as `outputLocs.contains(Nil)`.
 */
def isAvailableBoolean _numAvailableOutputs == numPartitions

def addOutputLoc(partition: Int, status: MapStatus): Unit = {
  val prevList = outputLocs(partition)
  outputLocs(partition) = status :: prevList
  if (prevList == Nil) {
    _numAvailableOutputs += 1
  }
}




Spark Shuffle 详解(1)

标签:

原文地址:http://blog.csdn.net/mahuacai/article/details/51916428

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!