0. 架构:消息传输层-流处理层
消息传输层:
Kafka 的出现,直接解决了replayable的数据框架的问题。建造和分析分布式数据处理系统的思维体系
持久化:kafka文件存储
Pulsar 是pub-sub模式的分布式消息平台:为了解决吞吐等问题,Pulsar和Kafka一样,采用了分区(Partition)的机制
持久化-Pulsar通过 BookKeeper 来存储消息,保证消息不会丢失-服务和数据是分离的
Broker:提供发布和订阅的服务(Pulsar的组件)
Bookie:提供存储能力(BookKeeper的存储组件)
Time-to-Live(TTL)
流处理层:
Flink
Spark Streaming/ Structure Streaming
流处理层
1.Key Words:- Terminology
01.Streaming Data 流数据 Unbounded datasets - 无界数据集
Cardinality and Constitution:
基数 Cardinality: Bounded data and Unbounded data
constitution:Table and Stream
Time: 事件时间和处理时间,当然并不是所有应用场景都关注事件时间,但其仍然很重要
时间特性(base time characteristic)
处理时间 Processing Time
事件时间 Event Time
提取时间 Ingestion Time
Window 窗口 Tumbling 窗口长度
Sliding 滑动步长 窗口长度
Session 活动时间-长度是动态的
WaterMark 水印
可以在消息侧嵌入,也可以在流处理引擎侧嵌入
作用: 嵌入事件时间轴上,用于判断事件时间窗口内所有数据均已到达引擎的一种时间推理工具
种类:完美水印 和启发式水印
水印迟到:会拉长窗口生存期,水印早:到会导致数据处理结果不准确
迟到生存期(Allowed Lateness)
最大允许延迟(超过即可回收)
Triggers :触发器
提前,准时和延迟
在太慢的情况下 在太快的情况下
Accumulation
累加模式: Accumulation 解决: 定义同一个事件时间窗口多个聚合结果之间的关系:
丢弃 discarding
累加 accumulating
撤回 retracting
Excellent Performance
Latency : 延迟
Throughput 吞吐量
Guaranteed correctness
Exactley-once
Fault Tolerance 容错--
State: 有状态的计算 状态管理器 StateBackend 基于内存,基于文件 基于 RockDB
引擎定义的状态
应用程序自定义的状态
Checkpoint
SnapShot 快照
Copy On Write
Redirect On Write
SavePoint
Stream and Table -- 把relation algebra发展成为time-varying relation
Streams → tables 流表一体化-
动态表(Dynamic Table) -- 流 Stream。 流可以看做动态表,动态表可以看做流
Tables are data at rest -- Streams are data in motion
four what, where, when, how questions map onto a streams/tables world
What: Transformations
Where: Windowing
When: Triggers
How: Accumulation
Data: Keys, values, windows, and partitioning
Operations :
stream → stream: Nongrouping (element-wise) operations
stream → table: Grouping operations
table → stream: Ungrouping (triggering) operations
2.面对的问题
无序且事件时间偏差不固定
开始和结束的标志信息不存在--根据领域知识推定
idle
BackPressure:
常用的场景:短时负载高峰导致系统接收数据的速率远高于它处理数据的速率
反压机制就是指系统能够自己检测到被阻塞的Operator,然后系统自适应地降低源头或者上游的发送速率
理论四问:
What/Where/When/How
解决数据乱序的问题
方案一: 设定固定时间间隔的session窗口
方案二: 设定session的事件推进标志 --使用水印
即在source节点或者业务逻辑系统中定期插入时间推进控制信息
方案三: 提供触发器机制实时生成近似值,如果之后观察到这个节点,撤回之前的结果并生成新的数据
3.具体的开发
1.数据输入- DataSource
第三方数据源 数据源连接器
内置数据源:集合数据源 文件数据源 Socket数据源
2.DataStream 转换操作 Transformation Operator
01.Single-DataStream
filter map
02.Multi-DataStream
流合并
Union 需要保证两个数据集的格式一致,输出和输入格式一致
Connect 合并两种或多种不同类型的数据集-合并后混合数据类型保留了数据集的数据类型
Join
滑动窗口关联 滚动窗口关联 会话窗口关联 间隔关联
流拆分:
Split 算子 + select
03.物理分区操作
Random Partitioning
Roundobin Partitioning
Rescaling Partitioning
Custom Partitioning
BroadCasting
3.DataSinks数据输出
基本数据输出
第三方数据输出
具体案例:
使用用户自定义的分区来为每一个元素选择具体的task.
dataStream.partitionCustom(partitioner, "someKey")
第二个参数是指定分区器使用到的字段--分区字段 Tuple可以通过字段名称指定,其他类型的数据集则通过位置索引指定
4.物理执行过程
Flink会尽可能地将operator的subtask链接(chain)在一起形成task
并不是任意两个 operator 就能 chain 一起的。其条件还是很苛刻的:
上下游的并行度一致
下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
上下游节点都在同一个 slot group 中
下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
两个节点间数据分区方式是 forward(参考理解数据流的分区)
用户没有禁用 chain
Operator chain的行为可以通过编程API中进行指定。可以通过在DataStream的operator
这两个方法都是通过调整operator的 chain 策略(HEAD、NEVER)来实现的。
另外,也可以通过调用StreamExecutionEnvironment.disableOperatorChaining()来全局禁用chaining
5. 管理
管控——Manager
01.高可用:
故障恢复
主节点崩溃 JobManager
从节点崩溃
主从节点通信故障
通常 JobManager 负责处理 后面的来两种故障
保持 JobManager的高可用
YARN 负责保证集群的高可用
02.安全:
外部系统访问内部数据传输安全
节点通信安全
监控和日志
监控: 动态准实时或实时系统
01.层级:
系统级监控
应用级监控:(自定义和框架提供的)
02.构成:
Metrix 度量指标
Reporter: 向监控展示工具 实时 上传度量指标
03.监控方式:
DashBoard
Rest API 获取
04.监控指标的方式
A:定义指标
定义方式:
计数器 Counter
度量器 Gauge 没有内置实现,需要自己实现Gauge
直方图 Histogram 封装的类,没有提供默认实现
计量器 Meter
指标的作用域
System-Provided Scope
User-Defined Scope
B: 上传指标-Report
配置文件中配置
JMX Graphite Prometheus
StatsD DataLog Slf4j
自定义Report接口
说明:
JMX,是Java Management Extensions(Java管理扩展)的缩写,是一个为应用程序植入管理功能的框架。
用户可以在任何Java应用程序中使用这些代理和服务实现管理
Graphite Graphite是一个开源实时的、显示时间序列度量数据的图形系统,通常用于监控基础设施级别的度量
Prometheus是一个开源监控系统,它前身是SoundCloud的警告工具包
StatsD 是一个 NodeJs 的 daemon 程序,简单轻巧,使用 UDP 协议,专门用来收集数据,收集完数据就发送到其他服务器进行处理
思路:
监控不能只是各种数据的采集和罗列,不仅仅是弄若干个报表并进一步配置成仪表盘,而是有一定智能,
仿照我们日常的排查问题思路,建立一定规则,自动检查,深度检查,友情提示
日志: 事后静态分析方法
Flink中的
日志门面: 日志记录是使用 slf4j 日志记录界面实现的。
日志库: 作为底层日志记录框架,具体实现日志的相关功能。
Flink使用 log4j, 同时还提供了logback配置文件
附录:
日志门面:广泛使用的 slf4j 和 commons-logging
日志库: log4j logback log-jdk
日志适配器
日志门面适配器: sld4j-log4j 适配器解决接口不兼容的问题
日志库适配器
基本概念:
SLF4J代表Simple Logging Facade for Java,项目的logging与logging具体的实现分离,
将应用程序和日志记录框架分离
建了SLF4J作为Jakarta commons-logging框架的替代品-Apache Common Logging(即Jakarta Commons Logging,简称JCL
logger 被定义为 static 变量,是因为这个 logger 与当前类绑定 ,
避免每次都 new 一个新对象,造成资源浪费,甚至引发 OutOfMernoryError 问题。
Dashbord 页面介绍:
OverView
Running Jobs
Completed Jobs
Task Managers
Job Manager
Submit new Job
具体介绍
1.OverView
左边:显示这个job申请的Task Managers 个数,task slots个数,和(Available )可用的task slots数,
右边:显示了总共的job,正在运行的job,以及完成的,取消的,和失败的job的个数
下面:
Running Jobs
Start_Time End_Time Duration Job_Name Job_ID Tasks Status
Completed Jobs
2.Running Jobs
Overview
job graph
Subtasks Task_Metrics Watermarks Accumulators Checkpoints Back_Pressure
Timeline
Exceptions
COnfiguration
3.Completed Jobs
4.Task Managers
Path, ID Data_Port Last_Heartbeat All_Slots Free_Slots
CPU_Cores Physical_Memory JVM_Heap_Size Flink_Managed_Memory
点击进入:
Metrics
Overview
Memory: JVM (Heap/Non-Heap) --Committed -- Used --Maximum
Type: Heap Non-Heap
Outside JVM --Count --Used --Capacity
Direct Mapped
Network : Memory Segments
Garbage Collection
5.Job Manager:
configuration:会显示在conf里面配置的所有配置信息
checkpoint.mode at-least-once
checkpoint.interval
checkpoint.timeout
state.backend filesystem
state.checkpoints.dir
state.savepoints.dir
6.Submit new Job
Flink不仅支持用命令提交任务,也支持在页面提交任务,接下来我们就在页面上提交一个任务
点击add new按钮,会弹出选择jar包,我们还选择刚才那个.
然后右边有显示一个upload按钮,点击upload后,右边会显示一个进度条
参考:
Flink web UI的使用介绍 https://blog.csdn.net/xianpanjia4616/article/details/88958504