1. Startup Script
sbin/start-master.sh
"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
(1) SPARK_MASTER_IP
(2) SPARK_MASTER_PORT
(3) SPARK_MASTER_WEBUI_PORT
The Master class is ultimately launched through the bin/spark-class script.
The argument "1" is the master instance number; it is only used when naming the log and pid files and is not passed to the Master class:
spark-xxx-org.apache.spark.deploy.master.Master-1-CentOS-01.out
spark-xxx-org.apache.spark.deploy.master.Master-1.pid
def main(argStrings: Array[String]) {
  SignalLogger.register(log)
  val conf = new SparkConf
  val args = new MasterArguments(argStrings, conf)
  val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
  actorSystem.awaitTermination()
}
(1) Create a MasterArguments object and initialize its members;
(2) Call startSystemAndActor to create the ActorSystem and start the Master actor.
parse(args.toList)
// This mutates the SparkConf, so all accesses to it must be made after this line
propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
(1) parse parses the command-line arguments passed in by the startup script (a simplified parsing sketch follows this list);
(2) loadDefaultSparkProperties loads Spark runtime properties from the properties file, which defaults to spark-defaults.conf.
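For intuition, here is a minimal, self-contained sketch of how such a recursive parser consumes flags like --ip/--host, --port and --webui-port. This is not the actual MasterArguments source; the default values and the set of accepted flags below are illustrative assumptions.

object MasterArgumentsSketch {
  var host: String = "127.0.0.1"   // illustrative default only
  var port: Int = 7077
  var webUiPort: Int = 8080

  @annotation.tailrec
  private def parse(args: List[String]): Unit = args match {
    case ("--ip" | "--host") :: value :: tail =>
      host = value
      parse(tail)
    case ("--port" | "-p") :: value :: tail =>
      port = value.toInt
      parse(tail)
    case "--webui-port" :: value :: tail =>
      webUiPort = value.toInt
      parse(tail)
    case Nil =>                    // all arguments consumed
    case other =>
      sys.error("Unrecognized argument: " + other.head)
  }

  def main(argStrings: Array[String]): Unit = {
    parse(argStrings.toList)
    println(s"host=$host port=$port webUiPort=$webUiPort")
  }
}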
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
  securityManager = securityMgr)
val actor = actorSystem.actorOf(
  Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
(1) Create the ActorSystem via AkkaUtils.createActorSystem;
(2) Create and start the Master actor.
val workers = new HashSet[WorkerInfo]
val idToWorker = new HashMap[String, WorkerInfo]
val addressToWorker = new HashMap[Address, WorkerInfo]
val apps = new HashSet[ApplicationInfo]
val idToApp = new HashMap[String, ApplicationInfo]
val actorToApp = new HashMap[ActorRef, ApplicationInfo]
val addressToApp = new HashMap[Address, ApplicationInfo]
val waitingApps = new ArrayBuffer[ApplicationInfo]
val completedApps = new ArrayBuffer[ApplicationInfo]
var nextAppNumber = 0
val appIdToUI = new HashMap[String, SparkUI]
val drivers = new HashSet[DriverInfo]
val completedDrivers = new ArrayBuffer[DriverInfo]
val waitingDrivers = new ArrayBuffer[DriverInfo] // Drivers currently spooled for scheduling
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
This subscribes to RemotingLifecycleEvent, which is a sealed trait:
sealed trait RemotingLifecycleEvent extends Serializable {
  def logLevel: Logging.LogLevel
}
Of these events, the Master only handles the DisassociatedEvent message.
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
A timer is started to check for Worker timeouts: at an interval equal to the worker timeout, a CheckForWorkerTimeOut message is sent to the Master itself. The default timeout is 60 seconds and can be set via the spark.worker.timeout property.
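A minimal sketch of how the timeout constant can be derived from that property, assuming the standard SparkConf getters (the field name WORKER_TIMEOUT is taken from the scheduling call above):

import org.apache.spark.SparkConf

val conf = new SparkConf()
// 60 seconds by default, converted to milliseconds for the schedule() call above
val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000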
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {case "ZOOKEEPER" =>logInfo("Persisting recovery state to ZooKeeper")val zkFactory =new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))case "FILESYSTEM" =>val fsFactory =new FileSystemRecoveryModeFactory(conf, SerializationExtension(context.system))(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))case "CUSTOM" =>val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))val factory = clazz.getConstructor(conf.getClass, Serialization.getClass).newInstance(conf, SerializationExtension(context.system)).asInstanceOf[StandaloneRecoveryModeFactory](factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))case _ =>(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))}
The persistence engine and leader election agent are created according to RECOVERY_MODE, which defaults to NONE and is configured via the spark.deploy.recoveryMode property.
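A one-line sketch of how that property would be read from the Master's SparkConf (the "NONE" default is stated above; this is an assumption about the surrounding code, not a verbatim quote):

// conf is the Master's SparkConf
val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")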
Assume RECOVERY_MODE is NONE in what follows.
(1) A BlackHolePersistenceEngine object is created, which performs no persistence at all;
(2) A MonarchyLeaderAgent object is created; its primary constructor sends the ElectedLeader message to the Master (a simplified sketch follows this list).
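A simplified sketch of MonarchyLeaderAgent, with the constructor parameter type approximated for this Spark version: there is no real election, so the single Master is immediately told that it is the leader.

// Not the verbatim source: parameter type and trait name are approximations
private[spark] class MonarchyLeaderAgent(val master: LeaderElectable)
  extends LeaderElectionAgent {
  // Runs in the primary constructor; on the Master side this ends up as self ! ElectedLeader
  master.electedLeader()
}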
case ElectedLeader => {
  val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
  state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
    RecoveryState.ALIVE
  } else {
    RecoveryState.RECOVERING
  }
  logInfo("I have been elected leader! New state: " + state)
  if (state == RecoveryState.RECOVERING) {
    beginRecovery(storedApps, storedDrivers, storedWorkers)
    recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
      CompleteRecovery)
  }
}
Since RECOVERY_MODE was assumed to be NONE above, no recovery is performed and state is set directly to RecoveryState.ALIVE.
case CheckForWorkerTimeOut => {
  timeOutDeadWorkers()
}
This checks for timed-out Worker nodes. The Worker timeout defaults to 60 seconds and is set via the spark.worker.timeout property.
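A simplified sketch of what timeOutDeadWorkers does (approximated, not the verbatim source): any worker whose last heartbeat is older than WORKER_TIMEOUT is removed from the registered set.

private def timeOutDeadWorkers() {
  val currentTime = System.currentTimeMillis()
  // Copy first, because removeWorker mutates the workers set while we iterate
  val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
  for (worker <- toRemove) {
    logWarning("Removing %s because we got no heartbeat in %d seconds".format(
      worker.id, WORKER_TIMEOUT / 1000))
    removeWorker(worker)
  }
}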
case DisassociatedEvent(_, address, _) => {
  // The disconnected client could've been either a worker or an app; remove whichever it was
  logInfo(s"$address got disassociated, removing it.")
  addressToWorker.get(address).foreach(removeWorker)
  addressToApp.get(address).foreach(finishApplication)
  if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
This is the registration message sent from a Worker to the Master.
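The approximate shape of that message, with the field list inferred from the WorkerInfo constructor arguments used below (not necessarily the exact DeployMessages definition):

case class RegisterWorker(
    id: String,
    host: String,
    port: Int,
    cores: Int,
    memory: Int,
    webUiPort: Int,
    publicAddress: String)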
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
  sender, workerUiPort, publicAddress)
if (registerWorker(worker)) {
  persistenceEngine.addWorker(worker)
  sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
  schedule()
}
(1) Create a WorkerInfo object;
(2) Call registerWorker to record the Worker's information;
(3) Send a RegisteredWorker message back to the Worker;
(4) Call schedule, which is responsible for allocating resources to Drivers and Apps (see the sketch after this list).
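A heavily simplified sketch of schedule(), approximated from the fields listed earlier (workers, waitingDrivers, waitingApps). The real method also supports a "spread out" allocation mode controlled by spark.deploy.spreadOut, which is omitted here; method and field names are assumptions based on this Spark version.

// Simplified sketch, not the verbatim Spark source
private def schedule() {
  if (state != RecoveryState.ALIVE) { return }
  val aliveWorkers = workers.filter(_.state == WorkerState.ALIVE).toSeq

  // 1. Drivers take precedence: place each waiting driver on a worker with enough room
  for (driver <- waitingDrivers.toList) {
    aliveWorkers.find(w => w.memoryFree >= driver.desc.mem && w.coresFree >= driver.desc.cores)
      .foreach { worker =>
        launchDriver(worker, driver)
        waitingDrivers -= driver
      }
  }

  // 2. Then hand free cores to waiting applications, one worker at a time
  for (app <- waitingApps if app.coresLeft > 0) {
    for (worker <- aliveWorkers if worker.coresFree > 0 && app.coresLeft > 0) {
      val coresToUse = math.min(worker.coresFree, app.coresLeft)
      val exec = app.addExecutor(worker, coresToUse)
      launchExecutor(worker, exec)
      app.state = ApplicationState.RUNNING
    }
  }
}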
private[spark] class WorkerInfo(
    val id: String,
    val host: String,
    val port: Int,
    val cores: Int,
    val memory: Int,
    val actor: ActorRef,
    val webUiPort: Int,
    val publicAddress: String)
  extends Serializable {
  ...
  init()
  ...
  private def init() {
    executors = new mutable.HashMap
    drivers = new mutable.HashMap
    state = WorkerState.ALIVE
    coresUsed = 0
    memoryUsed = 0
    lastHeartbeat = System.currentTimeMillis()
  }
A WorkerInfo object is created, and init is called to initialize it.
workers.filter { w =>
  (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
}.foreach { w =>
  workers -= w
}
Remove any WorkerInfo whose state is DEAD.
val workerAddress = worker.actor.path.address
if (addressToWorker.contains(workerAddress)) {
  val oldWorker = addressToWorker(workerAddress)
  if (oldWorker.state == WorkerState.UNKNOWN) {
    // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
    // The old worker must thus be dead, so we will remove it and accept the new worker.
    removeWorker(oldWorker)
  } else {
    logInfo("Attempted to re-register worker at same address: " + workerAddress)
    return false
  }
}
workers += worker
idToWorker(worker.id) = worker
addressToWorker(workerAddress) = worker
Record the WorkerInfo into workers, idToWorker and addressToWorker.
At this point, the startup process is complete.
The Master then waits for message requests from Workers and Drivers.
Original article: http://www.cnblogs.com/linker1119/p/4441118.html