Spark on Yarn分yarn-cluster和yarn-client两种模式。 
本文通过Cluster模式的TaskScheduler实现入手,梳理一遍spark on yarn的大致实现逻辑。
前提我对两种模式以及yarn任务的整体运行逻辑不是很清楚。
cluster模式中,使用的TaskScheduler是YarnClusterScheduler。 
它继承了默认使用的TaskSchedulerImpl类,额外在postStartHook方法里,唤醒了ApplicationMaster类的设置sparkcontext的方法。 
ApplicationMaster相当于是spark在yarn上的AM,内部的YarnRMClient类,负责向RM注册和注销AM,以及拿到attemptId。注册AM之后,得到一个可以申请/释放资源的YarnAllocationHandler类,从而可以维护container与executor之间的关系。
下节具体介绍几个主要类的实现逻辑。
ApplicationMaster,通过YarnRMClient来完成自己的注册和注销。
AM的启动方式
/**
 * This object does not provide any special functionality. It exists so that it‘s easy to tell
 * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps.
 */
object ExecutorLauncher {
  def main(args: Array[String]) = {
    ApplicationMaster.main(args)
  }
}
main里面调用AM的run方法:
  def main(args: Array[String]) = {
    SignalLogger.register(log)
    val amArgs = new ApplicationMasterArguments(args)
    SparkHadoopUtil.get.runAsSparkUser { () =>
      master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs))
      System.exit(master.run())
    }
  }
如果AM的启动参数里有用户自己定义的类,则是Driver模式,即cluster模式。
run方法里 
1. 如果不是Driver模式,执行runExecutorLauncher逻辑: 
启动后,执行registerAM,里面new了YarnAllocator的实现,调用allocateResources,申请并执行container。同时,启动一个reporter线程,每隔一段时间调用YarnAllocator的allocateResources方法,或汇报有太多executor fail了。 
2. 如果是Driver模式,执行runDriver逻辑: 
也是执行registerAM,但是之前需要反射执行jar包里用户定义的driver类。
YarnAllocator负责向yarn申请和释放containers,维护containe、executor相关关系,有一个线程池。申请到container之后,在container里执行ExecutorRunnable。需要子类实现的是申请和释放这两个方法:
protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse
protected def releaseContainer(container: Container): UnitYarnAllocationHandler继承了YarnAllocator。
ExecutorRunnable与Yarn的关系: 
1.  向ContainerManager建立连接,让cm来startContainer。 
2. ContainerLaunchContext包含了yarn的NodeManager启动一个container需要的所有信息。ExecutorRunnable会构建这个container申请信息。 
可以参考这段启动逻辑:
def startContainer = {
    logInfo("Setting up ContainerLaunchContext")
    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
      .asInstanceOf[ContainerLaunchContext]
    ctx.setContainerId(container.getId())
    ctx.setResource(container.getResource())
    val localResources = prepareLocalResources
    ctx.setLocalResources(localResources)
    val env = prepareEnvironment
    ctx.setEnvironment(env)
    ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
    val dob = new DataOutputBuffer()
    credentials.writeTokenStorageToStream(dob)
    ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
    val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
      appAttemptId, localResources)
    logInfo("Setting up executor with commands: " + commands)
    ctx.setCommands(commands)
    ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
    // If external shuffle service is enabled, register with the Yarn shuffle service already
    // started on the NodeManager and, if authentication is enabled, provide it with our secret
    // key for fetching shuffle files later
    if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
      val secretString = securityMgr.getSecretKey()
      val secretBytes =
        if (secretString != null) {
          // This conversion must match how the YarnShuffleService decodes our secret
          JavaUtils.stringToBytes(secretString)
        } else {
          // Authentication is not enabled, so just provide dummy metadata
          ByteBuffer.allocate(0)
        }
      ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes))
    }
    // Send the start request to the ContainerManager
    val startReq = Records.newRecord(classOf[StartContainerRequest])
    .asInstanceOf[StartContainerRequest]
    startReq.setContainerLaunchContext(ctx)
    cm.startContainer(startReq)
  }
值得注意的是setServiceData方法,如果在node manager上启动了external shuffle service。Yarn的AuxiliaryService支持在NodeManager上启动辅助服务。spark有一个参数spark.shuffle.service.enabled来设置该服务是否被启用,我看的1.2.0版本里貌似没有服务的实现代码。
此外,从ExecutorRunnableUtil的prepareCommand方法可以得知,ExecutorRunnable通过命令行启动了CoarseGrainedExecutorBackend进程,与粗粒度的mesos模式和standalone模式一致,task最终落到CoarseGrainedExecutorBackend里面执行。
全文完:)
Spark on Yarn: Cluster模式Scheduler实现
原文地址:http://blog.csdn.net/pelick/article/details/43836563