Flink失败恢复重启和KeySelector

重启方式:

RestartStrategyConfigurations
    无重启           (No restart)
    固定间隔延迟重启 (Fixed delay)
    失败率           (Failure rate)    
    备选     (Fallback )
Fallback 后备重启策略,即所谓的默认重启策略;Cluster level default restart strategy
      集群中如果没有在配置文件(flink-conf.yaml)中显示的配置重启策略,
      也没有在编程中配置重启策略,在检查点机制开启的情况下
-- NoOrFixedIfCheckpointingEnabledRestartStrategyFactory
    Default restart strategy that resolves 
    either to {@link NoRestartStrategy} or {@link FixedDelayRestartStrategy}
    depending if checkpointing was enabled.
    如果没有启用 checkpointing,则使用无重启 (no restart) 策略。
    如果启用了 checkpointing,但没有配置重启策略,则使用固定间隔 (fixed-delay) 策略,其中 Integer.MAX_VALUE 参数是尝试重启次数

重启的源码:

RestartStrategyConfiguration是个抽象类,它定义了getDescription抽象方法,
它有NoRestartStrategyConfiguration、
    FallbackRestartStrategyConfiguration
    FixedDelayRestartStrategyConfiguration、
    FailureRateRestartStrategyConfiguration、
具体示例:
    NoRestartStrategyConfiguration extends RestartStrategyConfiguration
    00. RestartStrategyConfiguration   // Abstract configuration for restart strategies.
    01.
    02.FallbackRestartStrategyConfiguration
          Cluster level default restart strategy

使用案例:
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10L, TimeUnit.SECONDS)));
     //NANOSECONDS   纳秒,符号ns(英语: nanoseconds )
    // Time unit representing one thousandth of a millisecond   MICROSECONDS   
    // Time unit representing one thousandth of a second        MILLISECONDS  微秒,符号μs microsecond 
    //  SECONDS    MINUTES  HOURS  DAYS

重启级别

全局重启策略        全局配置文件flink-conf.yaml
单个job的重启策略   可以通过编程的方式指定 一般情况下,编程方式指定的单个job重启策略会将全局策略覆盖
api的方式是优先于配置文件的,作用范围仅限于当前任务执行的上下文

KeySelector

KeySelector接口继承了Function接口定义了getKey方法用于从IN类型中提取出KEY
  The KeySelector to be used for extracting the key for partitioning
实现KeySelector接口可以自定义key值选取
  public interface KeySelector<IN, KEY> extends Function, Serializable {
    KEY getKey(IN value) throws Exception;}

public class NullByteKeySelector<T> implements KeySelector<T, Byte> {
   private static final long serialVersionUID = 614256539098549020L;
   @Override
   public Byte getKey(T value) throws Exception {
    return 0;
   }
   }
 源码使用示例
 public class DataStream<T> {
        public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
        Preconditions.checkNotNull(key);
        return new KeyedStream<>(this, clean(key));
    }}

 public class AllWindowedStream<T, W extends Window> {
     public AllWindowedStream(DataStream<T> input,WindowAssigner<? super T, W> windowAssigner) {
        this.input = input.keyBy(new NullByteKeySelector<T>());
        this.windowAssigner = windowAssigner;
        this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
      }}
 public class KeyedStream<T, KEY> extends DataStream<T> {
   public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {}
   public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {}
   public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {}
   public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {}  
 }

blogroll

social