码迷,mamicode.com
首页 > 数据库 > 详细

kafka -> structuredStreaming读取kafka日志 ->自定义输出到mysql

时间:2020-06-24 16:23:50      阅读:132      评论:0      收藏:0      [点我收藏+]

标签:connect   row   tst   ast   data   mod   write   owa   main   

package test
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
import java.sql.{Connection, DriverManager}
/** *
  *
  * @autor gaowei
  * @Date 2020-04-13 17:59 
  */
object kafkaToMysqlTest {
  class MysqlSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
    var conn: Connection = _


    override def open(partitionId: Long, epochId: Long): Boolean = {
      Class.forName("com.mysql.jdbc.Driver")
      conn = DriverManager.getConnection(url, user, pwd)
      true
    }


    override def process(value: Row): Unit = {
      val p = conn.prepareStatement("replace into test(pid,pv) values(?,?)")
      p.setString(1, value(0).toString)
      p.setLong(2, value(1).toString.toLong)
      p.execute()
    }


    override def close(errorOrNull: Throwable): Unit = {
      conn.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafkaToMysqlTest").getOrCreate()
    val brokers = "localhost:9092"
    val topics = "test1"
    val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", brokers).option("subscribe", topics).load()
    import spark.implicits._
    val kafkaDf = df.selectExpr("CAST(value AS STRING)").as[String]
    val dataFrame = kafkaDf.groupBy("value").count().
       toDF("pid","pv")


    //todo 将数据写到MYSQL
    val mysqlSink = new MysqlSink("jdbc:mysql://localhost:3306/warehouse", "root", "410410410")

    val query = dataFrame.writeStream.outputMode("complete").foreach(mysqlSink).start()

    query.awaitTermination()
  }

}

 

kafka -> structuredStreaming读取kafka日志 ->自定义输出到mysql

标签:connect   row   tst   ast   data   mod   write   owa   main   

原文地址:https://www.cnblogs.com/kwzblog/p/13187728.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!