数据分析应该分别起到助力(Empower)、优化(Optimize)、创新(Innovate)的三大作用。
Apache Flink
Table API
面向流处理对应DataStream API,
面向批处理对应DataSet API
I、 Flink组件
3.封装的计算框架
基于DataSet: 01.MRQL 类似于Spark SQL;02.Flink ML、04.Table API
03.图计算Spargel(基础)和Gelly(库) 类似于Spark中的 GraphX
基于DataStream: Table API、 Complex Event Processing (CEP API):复杂事件处理
2. API层:面向无界Stream 的流处理层DataStream以及面向Batch的批处理层API-DataSet
1.核心层:Runtime层--支持Flink计算的全部核心计算
0.部署层: 部署模式-本地模式-集群模式-云模式
3.说明:目前历史:API的层级
01.Stateful streaming-- The ProcessFunctions
02. Core API :Flink有2套基础的API,The DataStream API
一套是面向流处理的DataStream,一套是面向批处理的DataSet。
03.Table API and SQL.
最新:01.统一 API Stack
让开发者在不同的节点,不同的边上定义不同的属性,来规划数据是流属性还是批属性
02.统一 SQL方案
II、流程
Client、 JobGraph、JobManager 、TaskManager
TaskManager 负责执行的计算的Worker Task Slot
III、Flink开发模型:
基础构建模块是 streams 以及 transformation
输入--source
输出--sink Hbase、Redis -- addSink
时间窗口:滚动窗口、 滑动窗口、会话窗口
transformation的算子
DataSet API
map flatMap MapPartition Filter
keyBy timeWindow
reduce fold
DataStream API
Blink
I、Flink SQL 作为核心的API。传统 SQL 是为传统批处理设计的,不是为流处理设计的,
1.Flink SQL 的核心概念:流与表的二象性(duality)
动态表(Dynamic Table) -- 流 Stream。 流可以看做动态表,动态表可以看做流
一个流可以看做对表的一系列更新操作(changelog)
依据动态表--在流上定义 SQL- 流式 SQL 可以想象成连续查询(Continuous Query)
2. Flink 5种源表插件、4种维表插件、11种结果表插件
源表--流式数据存储 -- 每个实时计算子作业必须提供至少一个流式数据存储
01.源表: 数据总线(DataHub)源表、日志服务(Log Service)源表、
消息队列(MQ)源表 、消息队列(Kafka)源表、
02.结果表 --仅支持INSERT操作
数据总线(DataHub)结果表、日志服务(Log Service)结果表、
消息队列(MQ)结果表、 Kafka结果表
表格存储(Table Store)结果表、分析型数据库(AnalyticDB)结果表
云数据库(RDS/DRDS)结果表、cloudhbase结果表
高性能时间序列数据库(HiTSDB)结果表、云数据库HybridDB for MySQL结果表
03.维度表-维表是一张不断变化的表
云数据库(RDS/DRDS)维表、表格存储(Table Store)维表、cloudhbase维表
PERIOD FOR SYSTEM_TIME的声明。这行声明定义了维表的变化周期,
即表明该表是一张会变化的表。
3.Connector 是连接外部数据和blink计算框架的桥梁,也是流计算的入口和出口
中间状态-拍快照 snapshot
Watermark 是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性
WATERMARK FOR ts AS withOffset(ts, 1000)
DataHub本身是流数据存储,实时计算只能将其作为流式数据输入
II、Flink SQL中的窗口函数
支持的窗口聚合-- 窗口聚合主要是两种:window aggregate和over aggregate
窗口函数:
01.Window aggregate是Group By Window,支持两种时间类型做窗口:
Event Time和Processing Time
滚动窗口(TUMBLE)、滑动窗口(HOP)和会话窗口(SESSION)
滚动窗口(TUMBLE)将每个元素分配到一个指定大小的窗口中,窗口函数用在 GROUP BY 子句
TUMBLE_START(ts, INTERVAL '1' MINUTE),
滑动窗口(HOP),也被称作 Sliding Window。不同于滚动窗口,滑动窗口的窗口可以重叠。
滑动窗口有两个参数:size 和 slide。size 为窗口的大小,slide 为每次滑动的步长
如果 slide < size,则窗口会重叠,每个元素会被分配到多个窗口
如果 slide = size,则等同于滚动窗口(TUMBLE)
如果 slide > size,则为跳跃窗口,窗口之间不重叠且有间隙
会话窗口
会话窗口(SESSION)通过session活动来对元素进行分组 。即会话断开时,这个窗口就会关闭
02.over窗口
OVER窗口(OVER Window)是传统数据库的标准开窗,OVER Window不同于Group By Window
每一个元素都对应一个OVER Window,每一个元素都触发一次数据计算
ROWS OVER Window - 每一行元素都视为新的计算行,即每一行都是一个新的窗口,
也有Unbounded和Bounded的两种情况
RANGE OVER Window - 具有相同时间值的所有元素行视为同一计算行,即具有相同时间值的所有行都是同一个窗口。
ROWS BETWEEN 2 preceding AND CURRENT ROW
RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW
使用:
001.用户需要显式定义一个processing time列,这个定义需要在source的DDL中显式指明
示例命令: filedName as PROCTIME()
实际示例: d AS PROCTIME()
002.Event Time是用户的原始数据,用户不需要显式重新定义一个event time列,
但必须指定watermark的计算方法
III、时间说明:
Event Time和Processing Time的声明只能在源表上声明
Processing Time:系统对事件进行处理的本地系统时间
processing time是系统为流记录增加的时间属性, 是系统产生的
Event Time:用户提供的事件时间,event time一定是用户提供在Schema里的数据
通常是数据的最原始的创建时间,流记录本身携带的时间属性
IV、 Watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性,
watermark的定义是source表DDL定义的一部分
WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset)
实际示例:
WATERMARK FOR rowtime AS withOffset(rowtime, 4000)
Flink何时触发window?
watermark与timestamp的时间,并通过数据来看看window的触发时机
window的触发要符合以下几个条件
1、watermark时间 >= window_end_time
2、在[window_start_time,window_end_time)中有数据存在
或者:
1、Event Time < watermark时间(对于late element太多的数据而言)
V、 Flink如何处理乱序
watermark+window处理乱序
流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的
理论上数据都是按照事件产生的时间顺序来的,但
但是也不排除由于网络、背压等原因,导致乱序的产生延迟的事件
有个机制来保证一个特定的时间后,必须触发window去进行计算
接收到watermark数据的operator以此不断调整自己管理的window event time clock
首先,eventTime计算意味着flink必须有一个地方用于抽取每条消息中自带的时间戳
其次,在数据进入window前,需要有一个Watermarker生成当前的event time对应的水位线
flink支持两种后置的Watermarker:Periodic和Punctuated,
With Periodic Watermarks
是定期产生watermark(即使没有消息产生), Watermark需要实现接口为Watermark getCurrentWatermark()
可以定义一个最大允许乱序的时间,这种情况应用较多
watermark、Event Time和window的关系
标量函数(Scalar-Valued Function)和表值函数(Table-Valued Function)。
其中表值函数又分为Inline table-valued functions和Multistatement table-valued functions。
应用案例
案例1、
-- udf str.length()
CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';
-- udtf str.split("\\|");
CREATE FUNCTION splitUdtf AS 'com.hjc.test.blink.sql.udx.SplitUdtf';
-- udaf 计算count
CREATE FUNCTION countUdaf AS 'com.hjc.test.blink.sql.udx.CountUdaf';
create table sls_stream( a int, b bigint, c varchar) with ( type='sls', endPoint='xxxxxxx', accessKeyId='xxxxxxx', accessKeySecret='yF6xxxxxxxx',
startTime = '2017-07-04 00:00:00', project='ali-cloud-streamtest', logStore='stream-test2', consumerGroup='consumerGroupTest3' );
create table rds_output( len1 bigint, len2 bigint ) with ( type='rds', url='jdbc:mysql:xxxxxxxx',tableName='xxxxx', userName='xxxxx', password='xxxxx' );
insert into rds_output select count(a), countUdaf(a) from sls_stream
无限流的双流 JOIN 和带窗口的双流 JOIN
窗口函数
支持滚动窗口(Tumble)、滑动窗口(Hop)、会话窗口(Session)以及
传统数据库中的OVER窗口
Retraction 撤回机制、 checkpoint 机制、failover 的策略
流处理会不断产生结果而不会结束,批处理往往只返回一个最终结果并且结束
流计算需要做checkpoint并保留状态,这样在failover的时候能够快速续跑
流数据处理是对最终结果的一个提前观测,往往需要把提前计算的结果撤回(Retraction)做更改而批计算则不会
create table kafka_stream(
messageKey VARBINARY,
`message` VARBINARY, topic varchar, `partition` int, `offset` bigint ) with (
type ='kafka010', topic = 'xxx', `group.id` = 'xxx', bootstrap.servers = 'ip:端口,ip:端口,ip:端口' );
案例2、flink中UDF开发
开发过程:
依赖:org.apache.flink
flink-core-blink-2.0
flink-streaming-java_2.11
flink-table_2.11
介绍:
01.标量函数(UDF)将0个、1个或多个标量值映射到一个新的标量值。
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
需要继承类 extends ScalarFunction
@Override eval
@Override open方法和 close方法可以不写
02.用户自定义聚合函数(UDAF)将多条记录聚合成一条值
需要继承抽象类 extends AggregateFunction<Long, CountUdaf.CountAccum>
import org.apache.flink.table.functions.AggregateFunction;
001.createAccumulator、getValue 和 accumulate三个方法一起使用,就能设计出一个最基本的UDAF
createAccumulator和getValue的输入输出是确定的,可以定义在AggregateFunction抽象类内。
除了这两个方法,要设计一个最基本的
UDAF必须要有一个accumulate方法-- 用户需要实现一个accumulate方法,
来描述如何计算用户的输入的数据,并更新到accumulator中
002.实时计算有一些特殊的场景需要用户提供retract和merge两个方法才能完成
03.用户定义的表函数(UDTF)将0个、1个或多个标量值作为输入参数
返回的行可以由一个或多个列组成,UDTF返回多列,只需要将返回值声明成Tuple或Row即可
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
extends TableFunction<String>
extends TableFunction<Tuple3<String, Long, Integer>>
注意:
UDTF 支持cross join 和 left join,
在使用UDTF时需要带上 LATERAL和TABLE两个关键字
LEFT JOIN UDTF 语句后面必须接 ON TRUE参数
参考:
UDX概述 https://help.aliyun.com/document_detail/69463.html?spm=a2c4g.11186623.4.3.64ce40fcxbzHYx
窗口函数概述 https://help.aliyun.com/document_detail/62510.html?spm=a2c4g.11186631.6.638.7ade34ecmkR5xJ
Flink流计算编程--watermark(水位线)简介 https://blog.csdn.net/lmalds/article/details/52704170
实时计算 Flink SQL 核心功能解密 https://yq.aliyun.com/articles/457438
创建消息队列(Kafka)源表 https://help.aliyun.com/knowledge_detail/86824.html