Flink数据类型之POJO和Table中的Row

Flink数据类型支持

TypeInformation主要是为Flink系统内有效的对数据结构类型进行管理,
对数据的类型进行管理和推断
   原生数据类型
   复合类型
     元组 tuple -Java和Scala
     样例类 -Scala
     POJOs类型
     Flink Value类型 : org.apache.flink.types.Value

POJO类型

一个POJO类型:
1、class是public的且是独立的(不是非static内部类)The class is public and standalone (no non-static inner class)
2、class有public 无参构造函数
3、所有 class 中的
   01.非静态的、非局部字段是private类型
   02.或者非静态的、非局部字段是private类型,且public类型的getter和setter方法
4.字段类型须是Flink支持的
 eg:本例中采用的是字段是private,且有public类型的getter和setter方法
 public class Person implements Serializable {
    private String name;
    private int age;

    /**
     * public constructor to make it a Flink POJO
    */
    public Person() {
    }

    public  String getName() {
        return name;
    }
    public int getAge() {
        return age;
    }

    public void setName(String name) {
        this.name= name;
    }
    public void setAge(int age) {
        this.age= age;
    }

    @Override
    public String toString() {
        return "Person{" + "name=" + name + ", age='" + age + '\''  + '}';
    }

  }
 在程序中的使用
   //DataStream 造数据
    Person userA = new Person();
    userA.setName("Happy");
    userA.setAge(19);
    Person userB = new Person();
    userB.setName("Lucky");
    userB.setAge(20);
    DataStream<Person> orderAStream = env.fromElements(
            userB,
            userA
         );
    orderAStream.keyBy("age")

Flink Table&SQL中

关系型API:Table和SQL
    Table API提供LINQ(:Language Integrated Quer)语言集成查询,是Scala和Java的LINQ
    SQL      提供结构化查询语言(Structured Query Language)
Row :具有任意数量字段的元组并支持空字段
Row 和 BinaryRow
 org.apache.flink.types.Row
 说明: /**  
    * A Row can have arbitrary number of fields and contain a set of fields,
    * which may all be different types. 
    * The fields in Row can be null. Due to Row is not strongly typed, Flink's
    * type extraction mechanism can't extract correct field types. So that users should manually
    * tell Flink the type information via creating a {@link RowTypeInfo}.
    * The fields in the Row can be accessed by position (zero-based) {@link #getField(int)}. And can
    * set fields by {@link #setField(int, Object)}.
    * Row is in principle serializable. However, it may contain non-serializable fields,
    * in which case serialization will fail.
示例:
   *  Row.of("hello", true, 1L);}
   * Or
   *  Row row = new Row(3);
   *  row.setField(0, "hello");
   *  row.setField(1, true);
   *  row.setField(2, 1L);
Flink 1.9- 1.10的数据类型 BinaryRow
   org.apache.flink.table.dataformat.BinaryRow  
     A Row has two part: Fixed-length part and variable-length part.
     A special row which is backed by MemorySegment instead of Object
Table API and SQL 中的数据类型
     1.兼容性-  Time types, array types, and the decimal type need special attention
     JVM Types Java Expression String
     org.apache.flink.table.api.Types
     org.apache.flink.table.api.DataTypes
     SQL standard’s data type
     Flink 1.9
      import static org.apache.flink.table.api.DataTypes.*;
        DataType t = DataTypes.ARRAY(DataTypes.INT().notNull()).bridgedTo(int[].class);
    eg:
    SQL_DATE SQL_DATE()  DATE().bridgedTo(java.sql.Date.class)

StreamTableEnvironment和TableEnvironment

 public abstract class StreamExecutionEnvironment {
  fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)
  public <OUT> DataStreamSource<OUT> 
      fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo)

   public <OUT> DataStreamSource<OUT> 
       fromCollection(Iterator<OUT> data, Class<OUT> type) {
    return fromCollection(data, TypeExtractor.getForClass(type));
     }
      typeInfo.getTypeClass()
      typeInfo.createSerializer(getConfig())
   *Because the iterator will remain unmodified until the actual execution happens,
    * the type of data returned by the iterator must be given explicitly in the form of the type
    * class (this is due to the fact that the Java compiler erases the generic type information)
    }


    object StreamTableEnvironment {
      def fromDataStream[T](dataStream: DataStream[T], fields: String):
    }

各个类的之间区别和联系
      StreamTableEnvironment 和 TableEnvironment
        flink-table-planner
         org.apache.flink.table.api.java.StreamTableEnvironment
          abstract class StreamTableEnvironment(private[flink] val execEnv: StreamExecutionEnvironment,config: TableConfig)
                         extends TableEnvironment(config) {}
        org.apache.flink.table.api.TableEnvironment
          * The abstract base class for batch and stream TableEnvironments
          abstract class TableEnvironment(val config: TableConfig) 
          持有对ExecutionEnvironment 或StreamExecutionEnvironment的引用
        Blink规划器将多个接收器优化为一个DAG(仅在TableEnvironment上支持,而不在StreamTableEnvironment上支持)。
          旧规划器将始终将每个接收器优化为新的DAG,其中所有DAG彼此独立

类的方法的区别
  TableEnvironment
   getTableEnvironment @deprecated
   create
  Table 数据的元数据存储介质 CataLog

Data Types & Serialization

Flink 会试图去推断出在分布式计算过程中交换和存储的数据的类型信息
Flink的 TypeInformation 类,TypeInformation类是所有类型描述类的基类,
    它揭示了数据类型的一些基础属性,并产生序列化器在默写特殊情况下生成类型比较器
    Flink 内部实现了名为 TypeExtractror 的类,可以利用方法签名、子类信息等蛛丝马迹,自动提取和恢复类型信息
TypeHint 的原理
    是创建匿名子类, type hints
   运行时 TypeExtractor 可以通过 getGenericSuperclass(). getActualTypeArguments() 方法获取保存的实际类型
   TypeHints in the form of returns(new TypeHint<Tuple2<Integer, SomeType>>(){}). 
   The TypeHint class 
       can capture generic type information and preserve it for the runtime (via an anonymous subclass).
 Flink 自带了很多 TypeSerializer 子类,大多数情况下各种自定义类型都是常用类型的排列组合,因而可以直接复用
     如果不能满足,那么可以继承 TypeSerializer 及其子类以实现自己的序列化器。
 Flink应对类型擦除
  01.Flink通过Java反射机制尽可能重构类型信息
  02.借助类型提示来告诉系统函数传入的参数类型信息和输出参数信息
    //Java中
    // 对于非泛型数据类型,你可以传递Class:
        TypeInformation.of(String.class);
    // 对于泛型数据类型,你需要通过TypeHint来捕获数据累心:
       TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}

其他:

泛型
 Java泛型之类型参数:
  E              - Element (在集合中使用,因为集合中存放的是元素)
  T               - Type(Java 类)
  K               - Key(键)
  V               - Value(值)
  N               - Number(数值类型)
  ?               -  表示不确定的java类型
  < ? extends E>  -上界通配符
  < ? super E>    -下界通配符 
  S、U、V  - 2nd、3rd、4th types

 Flink
     T
     O
    IN1
    OUT
    ACC
 数据类型
    数组
    枚举  
     enum
     Enumeration
     Spliterator  * @since 1.8
     Arrays
    Properties
 Java集合框架 - Java Collections Framework 
  * @param <E> the type of elements in this collection
  *  Set
  *  List
  *  Map
  *  SortedSet
  *  SortedMap
  *  HashSet
  *  TreeSet
  *  ArrayList
  *  LinkedList
  *  Vector
  *  Collections
  *  Arrays
  *  AbstractCollection
  * @since 1.2

迭代器

1.JVM
 01.java.util.Iterator   
  hasNext next  remove
   @since 1.8
   import java.util.function.Consumer;
       default void forEachRemaining(Consumer<? super E> action) {
           Objects.requireNonNull(action);
           while (hasNext())
               action.accept(next());
       }
  foreachforEachRemaining是可以依托于迭代器通过Lamdba遍历的,
  其中forEachRemaining只能用一次
 02.java.lang.Iterable
   public interface Iterable<T> { Iterator<T> iterator();
   * @since 1.8
     */
    default void forEach(Consumer<? super T> action) {
        Objects.requireNonNull(action);
        for (T t : this) {
            action.accept(t);
        }
 java.util.ListIterator
  * @since 1.8 
 java.util.Spliterator
  除了支持顺序遍历之外,添加了Java 8对于并行的支持
   tryAdvance
 Stream  
     public interface Stream<T> extends BaseStream<T, Stream<T>> 
 Java8新功能
   行为参数化-Lambda表达式
   Java8 加入默认方法
   方法引用
java.util.function
   Function: 接收参数,并返回结果,主要方法 R apply(T t)
   Consumer: 接收参数,无返回结果, 主要方法为 void accept(T t)
  Supplier: 不接收参数,但返回结构,主要方法为 T get()
  Predicate: 接收参数,返回boolean值,主要方法为 boolean test(T t)
2.Flink
org.apache.flink.util
 SplittableIterator
  public abstract class SplittableIterator<T> implements Iterator<T>, Serializable {

参考

Flink原理优化

blogroll

social