码迷,mamicode.com
首页 > 其他好文 > 详细

Flink 操作示例 —— 输入与输出

时间:2020-04-08 10:03:08      阅读:78      评论:0      收藏:0      [点我收藏+]

标签:ring   parallel   style   reading   val   conf   extends   over   mes   

输入

实现 SourceFunction[...]

/** Example job: prints every value emitted by a [[CountSource]]. */
object SourceFunctionExample {

  /**
   * Builds a single-source streaming job and runs it.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // Attach the print sink directly to the custom source.
    streamEnv.addSource(new CountSource).print()

    streamEnv.execute()
  }
}

/**
 * Source that emits an increasing sequence of `Long` values, starting at 0,
 * until it is cancelled or the counter reaches `Long.MaxValue`.
 */
class CountSource extends SourceFunction[Long] {

  // Flink may call cancel() from a thread other than the one executing run();
  // @volatile guarantees the emitting loop observes the updated flag.
  @volatile var isRunning: Boolean = true

  /**
   * Emits the counter values through `ctx` until the source is cancelled or
   * the counter reaches `Long.MaxValue`.
   *
   * @param ctx context used to emit elements
   */
  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {

    // Starts at -1 so that the first emitted value is 0.
    var cnt: Long = -1
    while (isRunning && cnt < Long.MaxValue) {
      cnt += 1
      ctx.collect(cnt)
    }
  }

  /** Requests termination of the loop in `run` by clearing the running flag. */
  override def cancel(): Unit = isRunning = false
}

 

输出

实现 RichSinkFunction[...]

/** Example job: forwards sensor readings to a [[SimpleSocketSink]] on localhost:9191. */
object SinkFunctionExample {

  /**
   * Builds the source -> timestamp assigner -> socket sink pipeline and runs it.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.getConfig.setAutoWatermarkInterval(1000L)

    // SensorSource / SensorTimeAssigner are defined outside this snippet.
    val timestampedReadings = streamEnv
      .addSource(new SensorSource)
      .assignTimestampsAndWatermarks(new SensorTimeAssigner)

    // The sink runs with parallelism 1.
    val socketSink = timestampedReadings.addSink(new SimpleSocketSink("localhost", 9191))
    socketSink.setParallelism(1)

    streamEnv.execute()
  }
}

/**
 * Sink that writes every [[SensorReading]] as one text line to a TCP socket.
 *
 * @param host host name to connect to
 * @param port TCP port to connect to
 */
class SimpleSocketSink(val host: String, val port: Int) extends RichSinkFunction[SensorReading] {

  // Both are assigned in open(); they stay null until then.
  var socket: Socket = _
  var writer: PrintStream = _

  /** Connects to `host:port` and wraps the socket's output stream in a PrintStream. */
  override def open(parameters: Configuration): Unit = {
    socket = new Socket(host, port)
    writer = new PrintStream(socket.getOutputStream)
  }

  /** Releases the writer and the socket; safe to call even if `open` failed part-way. */
  override def close(): Unit = {
    // Either field may still be null when open() threw or never ran; guard
    // against that so close() does not mask the original failure with an NPE.
    try Option(writer).foreach(_.close())
    finally Option(socket).foreach(_.close())
  }

  /**
   * Writes the string form of `value` followed by a line terminator.
   *
   * @param value   the record to write
   * @param context sink context (unused)
   * @throws java.io.IOException if the underlying stream reported a write error
   */
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    writer.println(value.toString)
    // PrintStream never throws on I/O failure. checkError() flushes the stream
    // and reports whether an error occurred, so a broken connection is
    // surfaced instead of silently dropping records.
    if (writer.checkError()) {
      throw new java.io.IOException(s"Failed to write record to $host:$port")
    }
  }
}

 

233

Flink 操作示例 —— 输入与输出

标签:ring   parallel   style   reading   val   conf   extends   over   mes   

原文地址:https://www.cnblogs.com/lemos/p/12657673.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!