码迷,mamicode.com
首页 > 其他好文 > 详细

4、flink自定义source、sink

时间:2020-04-24 21:42:02      阅读:85      评论:0      收藏:0      [点我收藏+]

标签:bigdata   contex   bsp   color   spark   output   算子   sock   open   

一、Source

代码地址:https://gitee.com/nltxwz_xxd/abc_bigdata

1.1、flink内置数据源

1、基于文件

env.readTextFile("file://path")
env.readFile(inputFormat, "file://path");

2、基于socket数据源

env.socketTextStream("localhost", 6666, ‘\n‘)

3. 基于Collection 

import org.apache.flink.api.scala._
env.fromCollection(List(1,2,3))
env.fromElements(1,2,3)
env.generateSequence(0, 1000) 

1.2、自定义数据源

1、实现SourceFunction

SourceFunction 是非并行的,所以不能指定并行度,即不能用setParallelism(num) 算子;

SocketTextStreamFunction就是实现的SourceFunction ,源码中也有详细的用例;

技术图片

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
// 需要加上这一行隐式转换 否则在调用flatmap方法的时候会报错
import org.apache.flink.api.scala._

// SourceFunction 是非并行的,所以不能指定并行度 即 不能 用 setParallelism(num) 算子
class MySourceFunction extends SourceFunction[String]{
  var num: Long = 0
  var isCancel: Boolean = true

   //在cancel的时候被执行,传递变量用于控制run方法中的执行
  override def cancel(): Unit = {
    println("cancel")
    isCancel = false
  }

  // 调用run 方法向下游产生数据
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (isCancel){
      ctx.collect(s"xxd\t${num}")
      Thread.sleep(1000)
      num += 1
    }
  }

}

object SourceFunctionWordCount{
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    var conf: Configuration = new Configuration()
    // 开启spark-webui
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    //配置webui的日志文件
    conf.setString("web.log.path", logPath)
    // 配置 taskManager 的日志文件,否则打印日志到控制台
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY,logPath)
    // 配置有多少个solor
    conf.setString("taskmanager.numberOfTaskSlots","3")
    // 获取本地运行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // 定义数据源
    val sourceDataStream: DataStream[String] = env.addSource(new MySourceFunction)
    // 增加 setParallelism就会报错
    // val sourceDataStream: DataStream[String] = env.addSource(new MySourceFunction).setParallelism(2)
    // 定义 operators,作用是解析数据,分组,窗口化,并且聚合就SUM
    val wordCountData: DataStream[(String, Int)] = sourceDataStream.flatMap(new FlatMapFunction[String, (String, Int)] {
      override def flatMap(value: String, out: Collector[(String, Int)]): Unit = {
        val strings: Array[String] = value.split(" ")
        for (f <- strings) {
          out.collect((f, 1))
        }
      }
    }).setParallelism(2).keyBy(_._1).sum(1).setParallelism(2)
    // 定义sink打印输出
    wordCountData.print().setParallelism(2)
    // 打印任务执行计划
    println(env.getExecutionPlan)
    // 运行
    env.execute("Socket Window WordCount")

  }
}

2、实现ParallelSourceFunction

ParallelSourceFunction是并行化的source所以能指定并行度

技术图片

 

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._
//ParallelSourceFunction是并行化的source所以能指定并行度
class MyParallelSource extends ParallelSourceFunction[String] {
  var num = 0
  var isCancel = true

  override def cancel(): Unit = {
    println("cancel")
    isCancel = false
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (isCancel) {
      ctx.collect(s"xxd\t${num}")
      Thread.sleep(1000)
      num += 1
    }
  }
}

object ParallelSourceWordCount {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    var conf: Configuration = new Configuration()
    // 开启spark-webui
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    //配置webui的日志文件
    conf.setString("web.log.path", logPath)
    // 配置 taskManager 的日志文件,否则打印日志到控制台
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // 配置有多少个solor
    conf.setString("taskmanager.numberOfTaskSlots", "8")
    // 获取本地运行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // 定义数据源
    val sourceDataStream: DataStream[String] = env.addSource(new MyParallelSource).setParallelism(4)
    // 定义 operators,作用是解析数据,分组,窗口化,并且聚合就SUM
    val wordCountData: DataStream[(String, Int)] = sourceDataStream.flatMap(new FlatMapFunction[String, (String, Int)] {
      override def flatMap(value: String, out: Collector[(String, Int)]): Unit = {
        val strings: Array[String] = value.split(" ")
        for (f <- strings) {
          out.collect((f, 1))
        }
      }
    }).setParallelism(2).keyBy(_._1).sum(1).setParallelism(2)
    // 定义sink打印输出
    wordCountData.print().setParallelism(2)
    // 打印任务执行计划
    println(env.getExecutionPlan)
    // 运行
    env.execute("Socket Window WordCount")

  }
}

3、继承RichParallelSourceFunction

RichParallelSourceFunction不仅实现了ParallelSourceFunction,还继承了AbstractRichFunction

所以RichParallelSourceFunction不仅能够并行化,还比ParallelSourceFunction增加了open和close方法、getRuntimeContext

技术图片

 

 技术图片

 

 

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._

//RichParallelSourceFunction不但能并行化
//还比ParallelSourceFunction增加了open和close方法、getRuntimeContext
class MyRichParallelSource extends RichParallelSourceFunction[String]{
  var num = 0
  var isCancel = true
  //初始化 在source开启的时候执行一次,比如可以在这里开启mysql的连接
  override def open(parameters: Configuration): Unit = {
    println("open")
    num = 100
  }
  //在source关闭的时候执行一次
  //比如mysql连接用完了,给还回连接池
  override def close(): Unit = {
    while (isMysql){
      Thread.sleep(1000)
      println("close sleep")
    }
    println("close")
    num = 0
  }

  //在输出的时候被执行,传递变量用于控制run方法中的执行
  //这个是被手动触发,在执行完cancel之后,会再执行close
  override def cancel(): Unit = {
    println("cancel")
    isCancel = false
  }

  //调用run方法向下游产生数据
  //手动cancel之后,不会等待run方法中处理结束而是强制执行close方法
  //这样就可能导致run方法中正在使用的连接被close了
  //所以此时需要加一个处理完成标识,用于判断是否可以进行close
  var isMysql = false
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    import scala.util.control.Breaks._
    breakable{
      while (isCancel){
        println(getRuntimeContext.getIndexOfThisSubtask) // 获取执行的taskid
        ctx.collect(s"xxd\t${num}")
        Thread.sleep(2000)
        num += 1
        if (num > 1200){
          break()
        }
      }
    }
    isMysql = true
  }

}

object RichParallelSourceWordCount{
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    var conf: Configuration = new Configuration()
    // 开启spark-webui
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    //配置webui的日志文件
    conf.setString("web.log.path", logPath)
    // 配置 taskManager 的日志文件,否则打印日志到控制台
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY,logPath)
    // 配置有多少个solor
    conf.setString("taskmanager.numberOfTaskSlots","8")
    // 获取本地运行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // 定义数据源
    val sourceDataStream: DataStream[String] = env.addSource(new MyRichParallelSource).setParallelism(4)
    // 定义 operators,作用是解析数据,分组,窗口化,并且聚合就SUM
    val wordCountData: DataStream[(String, Int)] = sourceDataStream.flatMap(new FlatMapFunction[String, (String, Int)] {
      override def flatMap(value: String, out: Collector[(String, Int)]): Unit = {
        val strings: Array[String] = value.split(" ")
        for (f <- strings) {
          out.collect((f, 1))
        }
      }
    }).setParallelism(2).keyBy(_._1).sum(1).setParallelism(2)
    // 定义sink打印输出
    wordCountData.slotSharingGroup("xxd").print().setParallelism(2)
    // 打印任务执行计划
    println(env.getExecutionPlan)
    // 运行
    env.execute("Socket Window WordCount")

  }
}

二、sink

2.1、内置数据输出源

1、基于文件

#使用TextOutputFormat
stream.writeAsText("/path/to/file")
#使用CsvOutputFormat
stream.writeAsCsv("/path/to/file")

2、基于socket

stream.writeToSocket(host, port, SerializationSchema)

3、基于标准/错误输出

#注: 线上应用杜绝使用,采用抽样打印或者日志的方式
stream.print()
stream.printToErr()

2.2、自定义输出源

1、实现SinkFunction

import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

class MySinkFunction extends SinkFunction[(String, Int)] {
  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    println(s"value:${value}," +
      s"processTime:${context.currentProcessingTime()}," +
      s"waterMark:${context.currentWatermark()}")
  }
}


object SinkFunctionWordCount {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    var conf: Configuration = new Configuration()
    // 开启spark-webui
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    //配置webui的日志文件
    conf.setString("web.log.path", logPath)
    // 配置 taskManager 的日志文件,否则打印日志到控制台
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // 配置有多少个solor
    conf.setString("taskmanager.numberOfTaskSlots", "8")
    // 获取本地运行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // 定义数据源
    val input = env.fromElements("xxd xxd xxd")
    val output: DataStream[(String, Int)] = input.flatMap(f => f.split(" ")).map((_, 1))
    // 使用自定义的sink
    output.addSink(new MySinkFunction)
    env.execute()

  }
}

2、继承RichSinkFunction

package com.xxd.flink.sink
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

class MyRichSinkFunction extends RichSinkFunction[(String, Int)]{
  //在Sink开启的时候执行一次,比如可以在这里开启mysql的连接
  override def open(parameters: Configuration): Unit = {
    println("open")
  }

  //在Sink关闭的时候执行一次
  //比如mysql连接用完了,给还回连接池
  override def close(): Unit = {
    println("close")
  }

  //调用invoke方法,执行数据的输出
  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    //在rich方法中可以使用getRuntimeContext方法得到比如广播变量和累加?
    //getRuntimeContext.getBroadcastVariable("")
    println(s"value:${value}," +
      s"processTime:${context.currentProcessingTime()}," +
      s"waterMark:${context.currentWatermark()}")
  }
}

object RichSinkFunctionWordCount {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    var conf: Configuration = new Configuration()
    // 开启spark-webui
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    //配置webui的日志文件
    conf.setString("web.log.path", logPath)
    // 配置 taskManager 的日志文件,否则打印日志到控制台
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY,logPath)
    // 配置有多少个solor
    conf.setString("taskmanager.numberOfTaskSlots","8")
    // 获取本地运行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // 定义数据源
    val input = env.fromElements("xxd xxd xxd")
    val output: DataStream[(String, Int)] = input.flatMap(f => f.split(" ")).map((_, 1))
      // 使用自定义的sink
      output.addSink(new MyRichSinkFunction)
    env.execute()
  }
}

3、使用自定义OutputFormat,然后使用stream.writeUsingOutputFormat("自定义outputFormat") 

 

import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

class MyOutPutFormat extends OutputFormat[(String, Int)]{
  //配置outputformat
  override def configure(parameters: Configuration): Unit = {
    println("configure")
  }
  //在Sink开启的时候执行一次,比如可以在这里开启mysql的连接
  override def open(taskNumber: Int, numTasks: Int): Unit = {
    //taskNumber第几个tak,numTasks总任务数
    println(s"taskNumber:${taskNumber},numTasks:${numTasks}")
  }
  //调用writeRecord方法,执行数据的输出
  override def writeRecord(record: (String,Int)): Unit = {
    println(record)
  }
  //在Sink关闭的时候执行一次
  //比如mysql连接用完了,给还回连接池
  override def close(): Unit = {
    println("close")
  }

}

object OutputFormatWordCount {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    var conf: Configuration = new Configuration()
    // 开启spark-webui
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    //配置webui的日志文件
    conf.setString("web.log.path", logPath)
    // 配置 taskManager 的日志文件,否则打印日志到控制台
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY,logPath)
    // 配置有多少个solor
    conf.setString("taskmanager.numberOfTaskSlots","8")
    // 获取本地运行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // 定义数据源
    val input = env.fromElements("xxd xxd xxd")
    val output: DataStream[(String, Int)] = input.flatMap(f => f.split(" ")).map((_,1))
    //使用自定义的outputFormat
    output.writeUsingOutputFormat(new MyOutPutFormat)
    env.execute()
  }
}

 

4、flink自定义source、sink

标签:bigdata   contex   bsp   color   spark   output   算子   sock   open   

原文地址:https://www.cnblogs.com/xiexiandong/p/12770187.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!