Flink状态计算概述

Flink状态基本轮廓

01.状态一致性
 at-most-once at-least-once  exactly-once
02.状态计算: 
  Key State 和 Operator state
03.状态存储和恢复
  checkpoint:checkpoint的机制
  savepoint
 恢复机制:    
04.状态管理
  state backkend(分类-配置等)
  State Processor API(Flink 1.9版本)
05.状态查看
  Query

Flink状态具体介绍

1.状态一致性

State Consistency
 at-most-once at-least-once  exactly-once
fault tolerance 容错

2.状态计算-stateful

Stateful operators may maintain information about the events they have received before
01.Key State  应用于 KeyStream
  Manged State
      使用 StateDescription 来获取相应的State的操作类
       RichFunction中可用的RuntimeContext具有访问状态的方
      定义状态的名称,数据类型信息,状态自定义函数
        ValueState          ValueStateDescription    value()/update()
        ListState           ListStateDescription     get()/ update()/add()/addAll()
        MapState            MapStateDescription      get()/put()/keys()/values()/entries()
        ReducingState       ReducingStateDescription  get()/add() 
        AggregatingState    AggregatingStateDescription get()/add() 
     eg: 在RichFlatmapFunction中使用ValueState
       步骤:01.创建ValueStateDescription,02.使用getRuntimeContext.getState() 03.使用value()方法
     生命周期TTL
  Raw State
     raw state是被应用程序自己管理,flink会调用相应的接口方法来实现状态的restore和snapshot
02.Non-key State == operator state
  Manged State
     Checkpointed-Function
     ListCheckpoint
    001. Broadcast State是Flink支持的一种Operator State
       01.首先会创建一个Keyed或Non-Keyed的Data Stream 
       02.然后再创建一个 Broadcasted Stream 
       03.最后通过Data Stream来连接(调用connect方法)到Broadcasted Stream上,
       这样实现将Broadcast State广播到Data Stream下游的每个Task中
  状态数据重分布说明:
    Event-Split Redistribute
    Union Redistribution
  Raw State
03其他:状态数据的划分和动态扩容
   keyed State 动态扩展 当并发度改变时,Group在Task之间重新分配

3.状态数据的备份和恢复

01.原理: Checkpoint的执行流程是按照Chandy-Lamport算法实现的
     Initiating a snapshot: 也就是开始创建 snapshot,可以由系统中的任意一个进程发起
     Propagating a snapshot: 系统中其他进程开始逐个创建 snapshot 的过程
     Terminating a snapshot: 算法结束条件
    Flink核心思想是在 input source 端插入 barrier 来替代 Chandy-Lamport 算法中的 Marker,
    通过控制 barrier 的同步来实现 snapshot 的备份和 exactly-once 语义

02.配置
  检查点 Checkpointing --CheckpointConfig
   启动检查点机制
       public StreamExecutionEnvironment enableCheckpointing(long interval)
       public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode)
       public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force)
      public class CheckpointConfig implements java.io.Serializable {}
    01.checkpointing mode 
        Checkpointing(检查点)模式(exactly-once vs. at-least-once)
       EXACTLY_ONCE, AT_LEAST_ONCE
       使用:  CheckpointingMode.EXACTLY_ONCE
               CheckpointingMode.AT_LEAST_ONCE
               其中: DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE
    02.checkpointInterval 缺省值 -1,不开启
       设置检查点的步长- interval  ms
          check.point.interval = 6000
    03.checkpointTimeout 超时时间  缺省值: 10 minutes
        如果超过这个时间checkpoint还没结束,就会被认为是失败的
    04.设置checkpoint的并发度
      并发度为1
         setMinPauseBetweenCheckpoints 缺省值为 0
      并发度:
         setMaxConcurrentCheckpoints  缺省值为 1
          默认情况下,在一个checkpoint正在执行的过程中,系统不会触发另一个checkpoint
          当定义了相邻checkpoint最小间隔时间时,不能使用此选项
    05.checkpoint执行过程中出现失败或者错误,
        FailOnCheckpointingErrors  任务是否同时关闭-默认值是True
        setFailOnCheckpointingErrors
        enableExternalizedCheckpoints
          deleteOnCancellation
          DELETE_ON_CANCELLATION
          RETAIN_ON_CANCELLATION               
    06.获取信息
       判断是否启用检查点
        getCheckpointInterval
          Returns the checkpointing interval or -1 if checkpointing is disabled.
        isCheckpointingEnabled
        isFailOnCheckpointingErrors
        isExternalizedCheckpointsEnabled
        getExternalizedCheckpointCleanup
    07. State recovery 恢复的机制
       RestartStrategies job失败后如何重启
         noRestart           无重启策略
         fallBackRestart
         fixedDelayRestart
         failureRateRestart  失败率重启策略
        配置说明:
           01.如果checkpoint未启动,就会采用no restart策略,
           02.如果启动了checkpoint机制,但是未指定重启策略的话,就会采用 fixed-delay 策略,重试Integer.MAX_VALUE 次
             可以在配置文件中配置设置: 固定延迟重启策略(Fixed Delay Restart Strategy)
            env.setRestartStrategy
               (RestartStrategies.fixedDelayRestart( 3, // 尝试重启次数
                              Time.of(10, TimeUnit.SECONDS) // 延迟时间间隔)
        );
Savepoint:
   用户手动触发Checkpoint,并将结果持久化到指定的存储路径中
   常用的场景: 升级中保存系统的状态数据,保证实现端到端的Excatly-Once语义保证
    Operator ID

4.状态管理 State management

作用: 存储和管理checkpoint过程的状态数据
三种类型的状态管理器
  基于内存的   MemoryStateBackend   存储在JVM对内存中特点:快速高效但容量有限制
  基于文件的       FsStateBackend
  基于RocksDB RocksDBStateBackend   增量checkpoint,通过JNI方式进行数据交互,有大小限制2G
StateBackend 来存储和管理Checkpoints过程的状态数据
    说明:
      默认情况下,状态保存在TaskManager的内存中,checkpoint存储在JobManager的内存中。
        为了适配大状态的持久化,Flink支持在其它后端状态中存储和checkpoint状态
         CHECKPOINTS_DIRECTORY
      Flink目前只对没有迭代的作业提供处理保证。在迭代作业中进行checkpointing会导致异常发生,
         如果用户强制要在迭代应用中启用checkpoint,则需要设置env.enableCheckpointing(interval, force = true)
         但这也不能保证处在循环边界上的数据和状态不会丢失。
    在内存级别的查看

5.状态查看

架构 Querable State
    QuerableStateServer
    QueryableStateClient
    QueryableStateClientProxy
 开启可查询的状态服务
   01.集群层面: 引入jar包 flink-queryable-state-runtime.jar
   02.代码层面: StateDescription 中调用 setQueryable() 方法 -配置可查询状态服务
   03.通过外部应用查询状态数据

参考:

Flink使用 Broadcast State 实现流处理配置实时更新 http://ju.outofmemory.cn/entry/371335
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/broadcast_state.html
Apache Flink 零基础入门教程(六):状态管理及容错机制 https://juejin.im/post/5d8367c36fb9a06b2a20785a

blogroll

social