序言 基于最新的v1.14.4 梳理下批流统一的用法cuiyaonan2000@163.com
官方的文档看的头晕,要把所有的都穿起来还是有难度.先基于其它码友的经验文章,在去看官网我觉得效果更好.
批流统一总的来说,使用上层的API以操作传统关系型数据库表的方式来进行计算
参考资料:
- 概览 | Apache Flink
- DataStream API Integration | Apache Flink
- 流式概念 | Apache Flink
- 概览 | Apache Flink--- 官方默认提供的source 和 sink
- User-defined Sources & Sinks | Apache Flink----自定义source和sink的方法
Planner的一些信息 Blink将批处理作业,视为流式处理的特殊情况 。所以,blink不支持表和DataSet之间的转换,批处理作业将不转换为DataSet应用程序,而是跟流处理一样,转换为DataStream程序来处理 。
因为批流统一,Blink planner也不支持BatchTableSource,而使用有界的StreamTableSource代替.
由上知道:
- Blink不支持 表 和 DataSet之间的转换,即批流统一不支持DataSet使用
- 批流统一处理有界流和无界流,最终都是转换成DataStream的Api进行处理
- 有界流使用StreamTableSource,无界流使用TableSource 作为批流统一的原始数据源.
创建表环境 创建表环境最简单的方式,就是基于流处理执行环境,调create方法直接创建:
//创建流式环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//创建表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); TableEnvironment 是 Table API 和 SQL 的核心概念 。它负责:- 在内部的 catalog 中注册 Table
- 注册外部的 catalog
- 加载可插拔模块
- 执行 SQL 查询
- 注册自定义函数 (scalar、table 或 aggregation)
- 将 DataStream 或 DataSet 转换成 Table----由此可见批流的操作对象是Table
- 持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
设置表环境环境变量 可以看到在创建表环境StreamTableEnvironment 时,可以设置表环境的配置信息
当前版本还是不稳定,之前很多可用的配置都被取消了,官方能别这么着急发版本么cuiyaonan2000@163.com
如下是创建环境信息的方法
表 TableEnvironment可以注册目录Catalog,并可以基于Catalog注册表. TableEnvironment会维护一个Catalog-Table表之间的map.
命名空间 表(Table)是由一个“标识符”来指定的,由3部分组成(如果没有指定目录或数据库,就使用当前的默认值):
- Catalog名---- 如果连接进入同一个catalog,数据库则可以共享该命名空间下的所有表cuiyaonan2000@163.com
- 数据库(database)名
- 对象名(表名)
表类型
- 常规的(Table,表): 一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream转换而来
- 虚拟的(View,视图): 可以从现有的表中创建,通常是table API或者SQL查询的一个结果
常规表 如上可知常规表,一般都是外部数据.来源可以是文件,kafka,mysql,mongodb,hive等
Flink的改变很大,很多历史博客的连接外部资源使用的是tableEnv.connect()这个方法,但是在最新的版本中该方法已经消失了. 参照官网的连接都是使用的sql方式.即以属性添加外部资源信息的方式,使用sql来注册外部资源cuiyaonan2000@163.com
表连接器&表格式 Filink提供了2个概念以连接外部资源(注入Kafka,Hive,Mongodb等),
- 表连接器(table connector): 表连接即通过类Sql的形式连接外部资源.
- 表格式(table format): 表格式是一种存储格式,定义了如何将外部资源的信息映射到Flink内部自定义的格式上.
表格式 Flink 提供了一套与表连接器(table connector)一起使用的表格式(table format) 。表格式是一种存储格式,定义了如何把二进制数据映射到表的列上
具体的使用其实也比较简单,但是可能对于使用SQL这种方式连接Kafka,Mysql的不习惯,第一次看比较陌生.
如下所示表格式的配置,在表连接配置下方
关于表格式的属性可以参考官网信息.这里紧列举Json的格式配置信息
表的查询 利用外部系统的连接器connector,我们可以读写数据,并在环境的Catalog中注册表 。接下来就可以对表做查询转换了 。
Flink给我们提供了两种查询方式:Table API和 SQL 。
Table API的调用 Table API是集成在Scala和Java语言内的查询API 。与SQL不同,Table API的查询不会用字符串表示,而是在宿主语言中一步一步调用完成的 。
Table API基于代表一张“表”的Table类,并提供一整套操作处理的方法API 。这些方法会返回一个新的Table对象,这个对象就表示对输入表应用转换操作的结果 。有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构 。例如table.select(…).filter(…),其中select(…)表示选择表中指定的字段,filter(…)表示筛选条件 。
val sensorTable: Table = tableEnv.from("inputTable")val resultTable: Table = senorTable.select("id, temperature").filter("id ='sensor_1'")SQL查询 Flink的SQL集成,基于的是ApacheCalcite,它实现了SQL标准 。在Flink中,用常规字符串来定义SQL查询语句 。SQL 查询的结果,是一个新的 Table 。
代码实现如下:
val resultSqlTable: Table = tableEnv.sqlQuery("select id, temperature from inputTable where id ='sensor_1'")
【Flink的批流统一 :Ⅳ】
- 春季老年人吃什么养肝?土豆、米饭换着吃
- 三八妇女节节日祝福分享 三八妇女节节日语录
- 老人谨慎!选好你的“第三只脚”
- 校方进行了深刻的反思 青岛一大学生坠亡校方整改校规
- 脸皮厚的人长寿!有这特征的老人最长寿
- 长寿秘诀:记住这10大妙招 100%增寿
- 春季老年人心血管病高发 3条保命要诀
- 眼睛花不花要看四十八 老年人怎样延缓老花眼
- 香槟然能防治老年痴呆症? 一天三杯它人到90不痴呆
- 老人手抖的原因 为什么老人手会抖
