标签:local env window link adt count() imp code 运行
1.运行环境
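有以下三种方式获取执行环境（ExecutionEnvironment）：
getExecutionEnvironment()：根据程序运行的上下文自动返回本地环境或集群环境，一般使用这种方式即可
createLocalEnvironment()：在当前 JVM 中创建本地执行环境
createRemoteEnvironment(host, port, jarFiles...)：创建连接远程集群的执行环境，需要指定 JobManager 的地址、端口以及要提交的 jar 包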
2.批处理
// Obtain the batch (DataSet API) execution environment for the current context.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Read the file line by line: each element of the DataSet is one line of text.
DataSet<String> text = env.readTextFile("file:///o.txt");
// Print the elements on the client. In the DataSet API, print() also triggers
// execution of the job, so no separate env.execute() call is needed here.
text.print();
3.map操作
text.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
return new Tuple2<>(s, 1);
}
});
4.table操作
import org.apache.flink.table.api.* import static org.apache.flink.table.api.Expressions.* // environment configuration ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); // register Orders table in table environment // ... // specify table program Table orders = tEnv.from("Orders"); // schema (a, b, c, rowtime) Table counts = orders .groupBy($("a")) .select($("a"), $("b").count().as("cnt")); // conversion to DataSet DataSet<Row> result = tEnv.toDataSet(counts, Row.class); result.print();
Table orders = tEnv.from("Orders"); // schema (a, b, c, rowtime)

// Step 1: keep only rows in which a, b and c are all non-null.
Table nonNullOrders = orders.filter(
        and(
                $("a").isNotNull(),
                $("b").isNotNull(),
                $("c").isNotNull()));

// Step 2: lower-case a and keep just the columns the aggregation needs.
Table projectedOrders = nonNullOrders.select(
        $("a").lowerCase().as("a"),
        $("b"),
        $("rowtime"));

// Step 3: one-hour tumbling windows on rowtime, grouped by window and a.
// Each group yields a, the window end time, and the average of b.
Table result = projectedOrders
        .window(Tumble.over(lit(1).hours()).on($("rowtime")).as("hourlyWindow"))
        .groupBy($("hourlyWindow"), $("a"))
        .select(
                $("a"),
                $("hourlyWindow").end().as("hour"),
                $("b").avg().as("avgBillingAmount"));
5. Distinct 聚合（batch 和 stream 均支持）
Table orders = tableEnv.from("Orders");
// Distinct aggregation on group by: for each a, sum only the distinct values of b.
GroupedTable ordersByA = orders.groupBy($("a"));
Table groupByDistinctResult = ordersByA.select(
        $("a"),
        $("b").sum().distinct().as("d"));
// Distinct aggregation on time window group by
Table groupByWindowDistinctResult = orders
.window(Tumble
.over(lit(5).minutes()))
.on($("rowtime"))
.as("w")
)
.groupBy($("a"), $("w"))
.select($("a"), $("b").sum().distinct().as("d"));
// Distinct aggregation on over window: per row, aggregate b over an unbounded
// range of preceding rows that share the same a, ordered by rowtime.
OverWindow unboundedPerA = Over
        .partitionBy($("a"))
        .orderBy($("rowtime"))
        .preceding(UNBOUNDED_RANGE)
        .as("w");
Table result = orders
        .window(unboundedPerA)
        .select(
                $("a"),
                $("b").avg().distinct().over($("w")), // average of the distinct b values
                $("b").max().over($("w")),
                $("b").min().over($("w")));
标签:local env window link adt count() imp code 运行
原文地址:https://www.cnblogs.com/yangyang12138/p/13352682.html