6. Spark Streaming Internals: How Jobs Are Dynamically Generated (Principles and Source Code Analysis)
This is an original article. When reposting, please credit: reposted from Zhou Yuefei's blog (http://www.cnblogs.com/zhouyf/)

The entry point is StreamingContext.start(). It launches the JobScheduler on a dedicated "streaming-start" thread so that thread-local properties (call sites, job groups) can be reset without affecting the caller's thread, then marks the context ACTIVE:

try {
  validate()

  // Start the streaming scheduler in a new thread, so that thread local properties
  // like call sites and job groups can be reset without affecting those of the
  // current thread.
  ThreadUtils.runInNewThread("streaming-start") {
    sparkContext.setCallSite(startSite.get)
    sparkContext.clearJobGroup()
    sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
    savedProperties.set(SerializationUtils.clone(
      sparkContext.localProperties.get()).asInstanceOf[Properties])
    scheduler.start()
  }
  state = StreamingContextState.ACTIVE
} catch {
  case NonFatal(e) =>
    logError("Error starting the context, marking it as stopped", e)
    scheduler.stop(false)
    state = StreamingContextState.STOPPED
    throw e
}
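For context, here is roughly how user code reaches this point; a minimal runnable sketch, where the app name, host and port are illustrative rather than taken from the original post:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WordCountApp")
    // The 5-second batch interval is what will drive job generation below.
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()
    ssc.start()            // enters the try block above; scheduler.start() runs on "streaming-start"
    ssc.awaitTermination() // block until stop() is called or an error occurs
  }
}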
scheduler.start() is JobScheduler.start(). It creates the scheduler's event loop, attaches rate controllers and listeners, then starts the ReceiverTracker, the InputInfoTracker and, crucially for job generation, the JobGenerator:

eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
  override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
  override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
}
eventLoop.start()

// attach rate controllers of input streams to receive batch completion updates
for {
  inputDStream <- ssc.graph.getInputStreams
  rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)

listenerBus.start()
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
  ssc.sparkContext,
  receiverTracker,
  ssc.conf,
  ssc.graph.batchDuration.milliseconds,
  clock)
executorAllocationManager.foreach(ssc.addStreamingListener)
receiverTracker.start()
jobGenerator.start()
executorAllocationManager.foreach(_.start())
logInfo("Started JobScheduler")
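The EventLoop used here is essentially a daemon thread draining a blocking queue. Below is a minimal sketch of the pattern; SimpleEventLoop is a simplified stand-in written for illustration, not Spark's actual EventLoop class:

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean

// Callers post() events from any thread; one daemon thread dequeues
// them in order and invokes onReceive, routing failures to onError.
abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingQueue[E]()
  private val stopped = new AtomicBoolean(false)
  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!stopped.get) {
          val event = queue.take()
          try onReceive(event) catch { case t: Throwable => onError(t) }
        }
      } catch {
        case _: InterruptedException => // woken up by stop() while blocked on take()
      }
  }
  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)
  def stop(): Unit = { stopped.set(true); thread.interrupt() }
  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}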
Events posted to that loop are dispatched by JobScheduler.processEvent:

private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable =>
      reportError("Error in job scheduler", e)
  }
}
JobGenerator.start() creates its own event loop, then either restarts from a checkpoint or starts fresh:

/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
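What ultimately triggers job generation is a recurring timer inside JobGenerator that posts a GenerateJobs(time) event to this event loop once per batch interval. A minimal sketch of that timer pattern; SimpleRecurringTimer is an illustration, not Spark's RecurringTimer:

// Fires the callback at fixed period boundaries on a dedicated thread,
// the way JobGenerator's timer posts GenerateJobs(time) every batch interval.
class SimpleRecurringTimer(periodMs: Long, callback: Long => Unit, name: String) {
  @volatile private var running = true
  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        // Align the first tick to the next period boundary, so batch
        // times come out as multiples of the batch interval.
        var nextTime = (System.currentTimeMillis() / periodMs + 1) * periodMs
        while (running) {
          val sleepMs = nextTime - System.currentTimeMillis()
          if (sleepMs > 0) Thread.sleep(sleepMs)
          callback(nextTime) // e.g. time => eventLoop.post(GenerateJobs(new Time(time)))
          nextTime += periodMs
        }
      } catch {
        case _: InterruptedException => // woken up by stop() while sleeping
      }
  }
  def start(): Unit = thread.start()
  def stop(): Unit = { running = false; thread.interrupt() }
}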

Each time the GenerateJobs(time) event fires, JobGenerator.generateJobs allocates the received blocks to the new batch, asks the DStream graph to generate the batch's jobs, submits them, and schedules a checkpoint:

/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
  // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated blocks
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
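allocateBlocksToBatch pins every block the receivers have reported since the previous batch to this batch time. A minimal sketch of that bookkeeping; SketchBlockTracker is an illustration, not Spark's ReceivedBlockTracker:

import scala.collection.mutable

// Receivers report blocks as data arrives; when a batch is generated, all
// blocks reported so far are assigned to that batch time, so each block
// belongs to exactly one batch.
object SketchBlockTracker {
  private val unallocated = mutable.ArrayBuffer[String]()  // block ids from receivers
  private val byBatch = mutable.Map[Long, List[String]]()  // batch time -> its blocks

  def addBlock(blockId: String): Unit = synchronized { unallocated += blockId }

  def allocateBlocksToBatch(time: Long): Unit = synchronized {
    byBatch(time) = unallocated.toList
    unallocated.clear()
  }

  def getBlocksOfBatch(time: Long): List[String] =
    synchronized { byBatch.getOrElse(time, Nil) }
}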
graph.generateJobs is DStreamGraph.generateJobs, which asks every registered output stream for a job for this batch time:

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
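An output stream (for example the ForEachDStream behind print() or foreachRDD) wraps the batch's output action in a Job, which is little more than a (time, closure) pair; running the job executes the action, which in turn launches a core Spark job. A simplified sketch of that shape, where SketchJob and sketchGenerateJob are illustrative names rather than Spark's own:

// A streaming Job pairs a batch time with the closure to run for that batch.
class SketchJob(val time: Long, func: () => Unit) {
  def run(): Unit = func()
}

// Mirrors the shape of an output stream's generateJob: produce a job
// only if an RDD was materialized for this batch time.
def sketchGenerateJob[R](time: Long, rddForBatch: Option[R])(action: (R, Long) => Unit): Option[SketchJob] =
  rddForBatch.map(rdd => new SketchJob(time, () => action(rdd, time)))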
Finally, JobScheduler.submitJobSet records the JobSet and hands each job to the job executor pool:

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
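jobExecutor is a fixed-size daemon thread pool whose size comes from spark.streaming.concurrentJobs (default 1), so batches normally execute one at a time. A minimal sketch of the submit path; sketchSubmitJobSet is illustrative, and Spark's real JobHandler additionally posts JobStarted/JobCompleted events around the run:

import java.util.concurrent.Executors

// With a single-threaded pool (the default config), jobs from successive
// batches queue up and run serially.
val jobExecutor = Executors.newFixedThreadPool(1)

def sketchSubmitJobSet(jobs: Seq[() => Unit]): Unit =
  jobs.foreach { job =>
    jobExecutor.execute(new Runnable {
      override def run(): Unit = job() // the real JobHandler reports start/completion here
    })
  }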
Original post: http://www.cnblogs.com/zhouyf/p/5503682.html