码迷,mamicode.com
首页 > 其他好文 > 详细

flink-demo2

时间:2021-06-29 15:38:43      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:res   bridge   one   blink   connect   set   iris   pac   pat   

package cn.irisz.steam

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, TableResult}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Demo2 {
  def main(args: Array[String]): Unit = {
    // 1. env
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)
    env.setParallelism(1)
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tEnv = StreamTableEnvironment.create(env, settings)

    // 2. source
    // val fileSource: DataStream[String] = env.readTextFile("data/aceess.log_20200914.csv")
    tEnv.executeSql(
      """
        |CREATE TABLE log (
        |   `id` Int,
        |   `i_city` String,
        |   `i_country` String,
        |   `i_isp` String,
        |   `i_province` String,
        |   `ip` String,
        |   `length` BigInt,
        |   `method` String,
        |   `referer` String,
        |   `status_code` Int,
        |   `t_hour` Int,
        |   `t_minute` Int,
        |   `t` TIMESTAMP,
        |   `ua` String,
        |   `url` String,
        |   `url_param` String,
        |   `url_path` String,
        |   `version` String,
        |   `xff` String
        |)WITH (
        |   ‘connector‘ = ‘filesystem‘,
        |   ‘path‘ = ‘data/aceess.log_20200914.csv‘,
        |   ‘format‘ = ‘csv‘
        |)
        |""".stripMargin)

    tEnv.executeSql(
      """
        |CREATE TABLE `result` (
        |   `t_hour` Int,
        |   `t_minute` Int,
        |   `cnt` BigInt
        |) WITH (
        |   ‘connector‘ = ‘print‘
        |)
        |""".stripMargin)

    // 3. transfer

    // 4. sink
//    logStream.print()
val result: TableResult = tEnv.sqlQuery(
  """
    |   SELECT t_hour, t_minute, COUNT(1) AS cnt
    |   FROM log
    |   WHERE status_code = 200
    |   GROUP BY t_hour, t_minute
    |""".stripMargin).execute()

    result.print()

    // 5. execute
    env.execute("calc log count for minute and hour").wait()
//    tEnv.execute("calc log count for minute and hour")
  }
}


flink-demo2

标签:res   bridge   one   blink   connect   set   iris   pac   pat   

原文地址:https://www.cnblogs.com/zpzhue/p/14948086.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!