码迷,mamicode.com
首页 > 其他好文 > 详细

SparkStreamingTest.scala

时间:2015-09-09 16:09:16      阅读:144      评论:0      收藏:0      [点我收藏+]

标签:

/**
 * Created by root on 9/8/15.
 */
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.sql.SQLContext

object SparkStreamingTest {
  /** Word-count demo over a socket text stream, reported two ways on every batch.
    *
    * Lines are read from a TCP socket and split on single spaces. For each batch the program:
    *  1. prints per-word counts computed with the DStream `reduceByKey` API, and
    *  2. prints the same counts computed with Spark SQL over a temp table named `words`.
    *
    * @param args optional `host` (args(0)) and `port` (args(1)) of the text source;
    *             they default to `localhost` and `9999`.
    *             A non-numeric port throws `NumberFormatException`.
    */
  def main(args: Array[String]): Unit = {
    val host = args.lift(0).getOrElse("localhost")
    val port = args.lift(1).map(_.toInt).getOrElse(9999)

    // Local StreamingContext with two worker threads and a 10-second batch interval.
    // local[2] (not local[1]) is used because the socket receiver occupies a thread of its own.
    val conf = new SparkConf().setAppName("Spark streaming test").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = ssc.socketTextStream(host, port)
    // split(" ") yields "" for leading or consecutive spaces; drop those so they are not counted.
    val words = lines.flatMap(_.split(" ").filter(_.nonEmpty))

    // Word count via the DStream API.
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()

    // Same word count via Spark SQL: turn each batch's RDD into a DataFrame and query it.
    words.foreachRDD((rdd: RDD[String], time: Time) => {
      // Reuse one SQLContext across batches instead of creating one per batch.
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()
      wordsDataFrame.registerTempTable("words")
      val wordsCountDataFrame = sqlContext.sql("select word, count(*) as total from words group by word")
      println(s"======================= $time =======================")
      wordsCountDataFrame.show()
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

/** One row of the `words` temp table: a single word from the input stream.
  * Spark SQL derives the DataFrame schema (one string column, `word`) from this case class.
  */
final case class Record(word: String)

/** Lazily created, process-wide `SQLContext` shared by all streaming batches. */
object SQLContextSingleton {
  // Not serialized with closures; it is rebuilt on first use in whatever JVM touches it.
  @transient private var instance: Option[SQLContext] = None

  /** Returns the shared `SQLContext`, creating it from `sparkContext` on the first call.
    *
    * Later calls ignore `sparkContext` and return the existing instance.
    * Initialization is guarded by this object's monitor. If output operations ever run
    * concurrently, this prevents two `SQLContext`s from being created.
    *
    * @param sparkContext context used only to build the instance on the first call
    * @return the single shared `SQLContext`
    */
  def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
    instance.getOrElse {
      val created = new SQLContext(sparkContext)
      instance = Some(created)
      created
    }
  }
}

 

SparkStreamingTest.scala

标签:

原文地址:http://www.cnblogs.com/sunflower627/p/4794675.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!