
Flink: Sinking Data to Kafka

Date: 2021-06-25 16:39:11


import it.bigdata.flink.study.SensorReding
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer


object KafkaSink {
  def main(args: Array[String]): Unit = {
    // create the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // read the input data from a local text file
    val inputPath="D:\\ideaDemo\\maven_flink\\src\\main\\resources\\sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // simple transformation: parse each CSV line into a SensorReding
    // and emit its string form (the original used arr(1) for both the
    // timestamp and the temperature; the temperature is arr(2))
    val dataStream = inputStream.map(data => {
      val arr = data.split(",")
      SensorReding(arr(0), arr(1).toLong, arr(2).toDouble).toString
    })

//    dataStream.print()

    // sink the stream to the Kafka topic "flink_sink_test" on the given broker
    dataStream.addSink(new FlinkKafkaProducer[String](
      "192.168.0.20:9092",
      "flink_sink_test",
      new SimpleStringSchema()
       )
    )



    env.execute("Kafka sink test")
  }
}
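The post imports `it.bigdata.flink.study.SensorReding` but never shows its definition. A minimal sketch of what that class and the line-parsing step might look like, assuming `sensor.txt` holds lines of the form `id,timestamp,temperature` (the field names and file layout are assumptions, not taken from the original):

```scala
// Hypothetical definition of SensorReding; the original post does not show it.
case class SensorReding(id: String, timestamp: Long, temperature: Double)

object SensorParse {
  // Parse one CSV line of sensor.txt into a SensorReding.
  // Assumed layout: arr(0)=id, arr(1)=timestamp, arr(2)=temperature.
  def parse(line: String): SensorReding = {
    val arr = line.split(",")
    SensorReding(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
  }
}
```

With this layout, a line like `sensor_1,1547718199,35.8` parses to `SensorReding(sensor_1,1547718199,35.8)`, which is the string the job writes to the Kafka topic.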


Original article: https://www.cnblogs.com/gzgBlog/p/14928289.html
