码迷,mamicode.com
首页 > 其他好文 > 详细

flink 自定义ProcessFunction方法

时间:2021-06-25 16:38:37      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:timer   err   out   sso   object   sele   parameter   删除   script   

import it.bigdata.flink.study.SensorReding
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object ProcessFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputStream = env.socketTextStream("10.18.35.155", 777)

    val dataStream = inputStream.map(data => {
      val agg = data.split(",")
      SensorReding(agg(0), agg(1).toLong, agg(2).toDouble)
    })
//      .keyBy(_.id)
//      .process(new MykeyedProcessFunction)

    val warningSteam = dataStream
      .keyBy(_.id)
      .process(new TempIncreWaring(10000L))
    warningSteam.print()

    env.execute("process function test")
  }
}


class TempIncreWaring(interval: Long) extends KeyedProcessFunction[String,SensorReding,String]{
  //定义状态:保存上一个温度值进行比较,保存注册定时器的时间戳用于删除
  lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-temp", classOf[Double]))
  lazy val timerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("time-ts", classOf[Long]))


  override def processElement(value: SensorReding, ctx: KeyedProcessFunction[String, SensorReding, String]#Context, out: Collector[String]): Unit = {
    //先取出状态
    val lastTemp = lastTempState.value()
    val timerTs = timerTsState.value()

    //更新温度
    lastTempState.update(value.temperature)

    //当前温度值和上次温度进行比较
    if(value.temperature>lastTemp && timerTs == 0){
      //如果温度上升,且没有定时器,那么注册当前数据时间10s之后的定时器
      val ts = ctx.timerService().currentProcessingTime() + interval
      ctx.timerService().registerProcessingTimeTimer(ts) //设置10s之后的定时器
      timerTsState.update(ts)  //更新状态
    }else if (value.temperature<lastTemp){
      //如果温度下降,那么删除定时器
      ctx.timerService().deleteProcessingTimeTimer(timerTs)
      timerTsState.clear()   //清空状态
    }
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReding, String]#OnTimerContext, out: Collector[String]): Unit = {
    out.collect("传感器"+ctx.getCurrentKey+"的温度,连续"+interval/1000+"秒,连续上升")
    timerTsState.clear()
  }
}


//keyedProcessFunction功能测试
class MykeyedProcessFunction extends KeyedProcessFunction[String,SensorReding,String]{
   var myState: ValueState[Int]= _

  //生命周期
  override def open(parameters: Configuration): Unit = {
     myState = getRuntimeContext.getState(new ValueStateDescriptor[Int]("mystate", classOf[Int]))
  }

  override def processElement(value: SensorReding, ctx: KeyedProcessFunction[String, SensorReding, String]#Context, out: Collector[String]): Unit ={
    ctx.getCurrentKey   //当前key
    ctx.timestamp()   //当前时间戳
//    ctx.output()            //侧输出流
    ctx.timerService().currentWatermark()  //当前事件事件
    ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60000L)   //注册定时器(当前事件之后60秒的定时器)
    ctx.timerService().deleteEventTimeTimer(ctx.timestamp() + 60000L)  //删除定时器

  }

  //定时器之后需要做的操作
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReding, String]#OnTimerContext, out: Collector[String]): Unit = super.onTimer(timestamp, ctx, out)
}

数据

sensor_1,1547718101,35.8
sensor_1,1547718102,22.2
sensor_1,1547718101,55.3
sensor_1,1547718102,24.1
sensor_1,1547718103,57
sensor_1,1547718103,58
sensor_1,1547718103,59
sensor_6,1547718101,15.4
sensor_7,1547718102,6.7
sensor_10,1547718205,38.1

  

flink 自定义ProcessFunction方法

标签:timer   err   out   sso   object   sele   parameter   删除   script   

原文地址:https://www.cnblogs.com/gzgBlog/p/14928295.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!