Flink数据类型和Tuple

1.Flink Type_Information

DataType
  每一个数据类型都有其对应序列化器,flink有自身的序列化器和kryo序列化器   
类型推断
 类型自动推断
  函数调用的输入类型 通常可以由之前操作的结果类型来推断
    TypeExtractror 的类,可以利用方法签名、子类信息等蛛丝马迹,自动提取和恢复类型信息
 类型手动返回
  可以实现 ResultTypeQueryable 接口,通过输入格式和函数来告诉API它们确切的返回类型
 用到类型的情况:
   使用类型信息来生成序列化器和比较器,并提供语义检查
   函数调用的输入类型 通常可以由之前操作的结果类型来推断
   Mapping of Data Types to Table Schema       
    TypeConverters 提供了 DataType 和 Typeinformation 之间的相互转化。
   自定义的类型信息
     TypeSerializer
类型擦除解决方式:
  将数据类型传递给TypeInformation构造函数:
    对于非泛型数据类型,你可以传递Class:
    对于泛型数据类型,你需要通过TypeHint来捕获数据类型

2.Flink中数据

Java类型信息
1.Basci 基本类型:所有的Java基本类型和装箱类型以及void,String,Date,BigDecimal,和BigInteger。
The data type of the resulting DataStream or DataSet, 
   01.原生数据类型 Atomic Type
     Integer, String 和 Double等
   02.值类型
     Flink预定义的值类型与原生数据类型是一一对应的
2.BasciArray
3.Composite 复合类型
   03.POJO (Java and Scala)
   04.泛型类      
   05.Scala的case class类(Scala的Tuple也是一种特殊的case class),是一个复合类型,
   包含了固定数量的不同类型的字段
      Tuple字段用1到偏移位置坐标记
    也可以根据字段名称来获取
    Tuples (Scala and Java) and Case Classes (Scala only)
    Row
其他
   4.通用类型:这些不会被Flink本身序列化,而是由Kryo序列化
   5.辅助类型(选项,任一,列表,Map,......)

3.Flink的数据类型

不同层级之间的数据类型以及操作的不同,以及之间的相互转化
DataStreaming数据流类型:
    DataStream  
     KeyedStream 
     WindowedStream  AllWindowedStream
     JoinedStreams  CoGroupedStreams ConnectedStreams
Table层级 --// register a Table
    StreamTableEnvironment TableSource Table  TableSink
    操作:
      数据类型
        registerTable registerExternalCatalog
        registerTableSource registerTableSink
      操作:
        SQLQuery:   sqlQuery  sqlUpdate
        Table API:  scan  select insertInto filter groupBy  where unionAll explain                     
不同API之间的转换
   01.Convert a DataStream or DataSet into a Table:
   02.Convert a Table into a DataStream or DataSet
  操作:

  数据: Mapping of Data Types to Table Schema
      based on the field positions or 
      based on the field names.

4.类型

泛型 - 参数化类型,将类型当作参数传递给一个类或者是方法
  编程时:
  编译时:泛型又提供了一种类型检测的机制,只有相匹配的数据才能正常的赋值,否则编译器就不通过。
  所以说,它是一种类型安全检测机制
类型转换

类型擦除
  泛型信息只存在于代码编译阶段,在进入 JVM 之前,与泛型相关的信息会被擦除掉,专业术语叫做类型擦除。
  类型擦除是泛型能够与之前的 java 版本代码兼容共存的原因
类型推断:
  Java 需要通过上下文细节来推断该类型
  局部变量类型推断(local-variable type inference)
  推断的局限性

5.Flink Java Tuples

 (Flink Java API的一部分):最多25个字段,空字段不支持
构建者模式:
Builder:         为创建Product对象的各个部件指定抽象接口。
ConcreteBuilder: 实现Builder的接口以构造和装配该产品的各个部件,定义并明确它所创建的表示,并提供一个检索产品的接口。
Director:        构造一个使用Builer接口的对象。
Product:         表示被构造的复杂对象。
   ConcreteBuilder创建该产品的内部表示并定义它的装配过程,包含定义组成部件的类,以及将这些部件装配成最终产品的接口。

Tuple 的构建:

01. Tuple  --抽象Build类-构建与产品类相关的部件
public abstract class Tuple implements java.io.Serializable{
  public static final int
  public abstract <T> T getField(int pos);
  public <T> T getFieldNotNull(int pos){}
  public abstract <T> void setField(T value, int pos);
  public abstract int getArity();
  public abstract <T extends Tuple> T copy();
  public static Class<? extends Tuple> getTupleClass(int arity) {}
  public static Tuple newInstance(int arity) {}
  private static final Class<?>[] CLASSES = new Class<?>[] {}
  }

02. Tuple2  --抽象Build的实现类-真正构建产品的类

public class Tuple2<T0, T1> extends Tuple{

    public T0 f0;
    public T1 f1;
    public Tuple2() {}
    public int getArity() {
        return 2;
    }
    public <T> T getField(int pos)
    public <T> void setField(T value, int pos)
    public void setFields(T0 value0, T1 value1) {}
    public String toString() {
    public boolean equals(Object o) {
    public int hashCode() {
    public static <T0, T1> Tuple2<T0, T1> of (T0 value0, T1 value1){
     return new Tuple2<>(value0,value1);
     }   
}

03.Tuple2Builder   Director --将builder构建的产品部件组装成产品,对外隐藏内部组装细节   
public class Tuple2Builder<T0, T1> {
    private List<Tuple2<T0, T1>> tuples = new ArrayList<>();
    public Tuple2Builder<T0, T1> add(T0 value0, T1 value1){
        tuples.add(new Tuple2<>(value0, value1));
        return this;
    }
    @SuppressWarnings("unchecked")
    public Tuple2<T0, T1>[] build(){
        return tuples.toArray(new Tuple2[tuples.size()]);
    }
}

04.main  使用
 List<Tuple2<String, Integer>> input1 = Arrays.asList(
                    new Tuple2Builder<String, Integer>()
                            .add("foo", 1)
                            .add("foobar", 1)
                            .add("foo", 1)
                            .add("bar", 1)
                            .add("foo", 1)
                            .add("foo", 1)
                            .build()    
updateEmpty = mapOf(Tuple2.of(3, "3"), Tuple2.of(5, "5"), Tuple2.of(23, null), Tuple2.of(10, "10"));

说明:
    面对的场景:
     各个子对象组合出这个复杂对象的过程相对来说比较稳定,但是子对象的创建过程各不相同并且可能面临变化
    解决的方式
      将配置从目标类中隔离出来,避免作为过多的setter方法
      Builder模式比较常见的实现形式是通过链式调用
    -- 复杂对象的 构建 与它的 表示 分离

TupleTypeInfo

      public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
        public TupleTypeInfo(TypeInformation<?>... types) {
         this((Class<T>) Tuple.getTupleClass(types.length), types);
          }
          getFieldNames
          getFieldIndex
          createSerializer
         private class TupleTypeComparatorBuilder
         getGenericParameters
         getBasicTupleTypeInfo
         getBasicAndBasicValueTupleTypeInfo
       }
    public abstract class TupleTypeInfoBase<T> extends CompositeType<T> 
    public abstract class CompositeType<T> extends TypeInformation<T> 
    public abstract class TypeInformation<T> implements Serializable 
TupleSerializer
     public class TupleSerializer<T extends Tuple> extends TupleSerializerBase<T> implements SelfResolvingTypeSerializer<T> {
TupleSerializerBase 
    public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {

Tuple:
     Flink tuples 是固定长度固定类型的Java Tuple实现
     TupleTypeInfo 任意的Flink tuple 类型(支持Tuple1 to Tuple25)
     package org.apache.flink.api.java.tuple;
     package org.apache.flink.api.java.typeutils;
     package org.apache.flink.api.common.typeutils;
     package org.apache.flink.api.common.typeinfo;
Java 和Scala
CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)
Flink的TypeInformation类
TypeInformation类是所有类型描述类的基类,它揭示了数据类型的一些基础属性,并产生序列化器在默写特殊情况下生成类型比较器

相关语法

三元运算符
 @Override
 public int hashCode() {
    int result = f0 != null ? f0.hashCode() : 0;
    return result;
 }

 @Override
 public int hashCode() {
    int result = f0 != null ? f0.hashCode() : 0;
    result = 31 * result + (f1 != null ? f1.hashCode() : 0);
    return result;
 }

 @Override
 public int hashCode() {
    int result = f0 != null ? f0.hashCode() : 0;
    result = 31 * result + (f1 != null ? f1.hashCode() : 0);
    result = 31 * result + (f2 != null ? f2.hashCode() : 0);
    return result;
 }

参考:

flink-annotations源码分析 https://blog.csdn.net/hxcaifly/article/details/84558346
http://wuchong.me/
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/common.html

blogroll

social