码迷,mamicode.com
首页 > 其他好文 > 详细

深入理解Spark Streaming

时间:2020-07-20 13:13:18      阅读:64      评论:0      收藏:0      [点我收藏+]

标签:dep   unit   gis   length   依赖   play   new   方法   list   

一.DStream的两类操作

  DStream内部其实是RDD序列,所有的DStream操作最终都转换为RDD操作。通过分析源码,可以进一步窥探这种转换是如何进行的。

  DStream有一些与RDD类似的基础属性:

  • 依赖的其它DStream列表。
  • 生成RDD的时间间隔。
  • 一个名为compute的计算函数,用于生成RDD,类似于RDD的compute。

  DStream的操作分为两类,一类是Transformation操作,对应RDD的Transformation操作。以flatMap为例,DStream中的flatMap不过是返回一个新的DStream派生类FlatMappedDStream,这一点跟RDD的flatMap非常类似。DStream的flatMap定义如下:

  /**
   * Return a new DStream by applying a function to all elements of this DStream,
   * and then flattening the results
   */
  def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
    new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
  }

  而FlatMappedDStream的实现也很简单,主要作用是像RDD一样维护计算关系链,完整定义如下:

private[streaming]
class FlatMappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    flatMapFunc: T => TraversableOnce[U]
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
  }
}

  其中compute调用DStream的getOrCompute方法用于读取RDD的内存,要么放到缓存中,要么调用接口函数compute计算生成。

  DStream另外一类操作是OutPut操作,Output操作才会触发DStream的实际执行,作用非常类似于RDD的Action操作,类如print操作,定义如下:

 /**
   * Print the first ten elements of each RDD generated in this DStream. This is an output
   * operator, so this DStream will be registered as an output stream and there materialized.
   */
  def print(): Unit = ssc.withScope {
    print(10)
  }

  /**
   * Print the first num elements of each RDD generated in this DStream. This is an output
   * operator, so this DStream will be registered as an output stream and there materialized.
   */
  def print(num: Int): Unit = ssc.withScope {
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum = rdd.take(num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println(s"Time: $time")
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }

  DStream.print调用了RDD.take方法,而后者是一个Action操作,是不是所有的DStream输出操作最后都调用一个RDD的Action操作呢,看看saveAsTextFiles和saveAsObjectFiles,它们没有直接调用RDD Action操作【而是先调用一下rdd.saveAsTextFile】,然后通过foreachRDD来实现的,传入的函数中调用了RDD的Action。saveAsTextFiles的定义如下:

  /**
   * Save each RDD in this DStream as at text file, using string representation
   * of elements. The file name at each batch interval is generated based on
   * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
   */
  def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
    val saveFunc = (rdd: RDD[T], time: Time) => {
      val file = rddToFileName(prefix, suffix, time)
      rdd.saveAsTextFile(file)
    }
    this.foreachRDD(saveFunc, displayInnerRDDOps = false)
  }

  相比之下,另外一个最灵活的Output操作foreachRDD完全依赖传入的函数来实现功能,所以对于foreachRDD的使用至少要包含一个RDD Action调用。因为Spark Streaming的调度是由Output方法触发的,每个周期调用一次所有定义的Output方法。Output内部再调用RDD Action最终完成计算,否则程序只会接收数据,然后丢弃,不执行计算。

深入理解Spark Streaming

标签:dep   unit   gis   length   依赖   play   new   方法   list   

原文地址:https://www.cnblogs.com/yszd/p/13344251.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!