Published: 2019-07-08 21:10:00
By ytwan
In Big Data .
tags: Flink
1.Flink Type_Information
DataType
每一个数据类型都有其对应序列化器,flink有自身的序列化器和kryo序列化器
类型推断
类型自动推断
函数调用的输入类型 通常可以由之前操作的结果类型来推断
TypeExtractror 的类,可以利用方法签名、子类信息等蛛丝马迹,自动提取和恢复类型信息
类型手动返回
可以实现 ResultTypeQueryable 接口,通过输入格式和函数来告诉API它们确切的返回类型
用到类型的情况:
使用类型信息来生成序列化器和比较器,并提供语义检查
函数调用的输入类型 通常可以由之前操作的结果类型来推断
Mapping of Data Types to Table Schema
TypeConverters 提供了 DataType 和 Typeinformation 之间的相互转化。
自定义的类型信息
TypeSerializer
类型擦除解决方式:
将数据类型传递给TypeInformation构造函数:
对于非泛型数据类型,你可以传递Class:
对于泛型数据类型,你需要通过TypeHint来捕获数据类型
2.Flink中数据
Java类型信息
1.Basci 基本类型:所有的Java基本类型和装箱类型以及void,String,Date,BigDecimal,和BigInteger。
The data type of the resulting DataStream or DataSet,
01.原生数据类型 Atomic Type
Integer, String 和 Double等
02.值类型
Flink预定义的值类型与原生数据类型是一一对应的
2.BasciArray
3.Composite 复合类型
03.POJO (Java and Scala)
04.泛型类
05.Scala的case class类(Scala的Tuple也是一种特殊的case class),是一个复合类型,
包含了固定数量的不同类型的字段
Tuple字段用1到偏移位置坐标记
也可以根据字段名称来获取
Tuples (Scala and Java) and Case Classes (Scala only)
Row
其他
4.通用类型:这些不会被Flink本身序列化,而是由Kryo序列化
5.辅助类型(选项,任一,列表,Map,......)
3.Flink的数据类型
不同层级之间的数据类型以及操作的不同,以及之间的相互转化
DataStreaming数据流类型:
DataStream
KeyedStream
WindowedStream AllWindowedStream
JoinedStreams CoGroupedStreams ConnectedStreams
Table层级 --// register a Table
StreamTableEnvironment TableSource Table TableSink
操作:
数据类型
registerTable registerExternalCatalog
registerTableSource registerTableSink
操作:
SQLQuery: sqlQuery sqlUpdate
Table API: scan select insertInto filter groupBy where unionAll explain
不同API之间的转换
01.Convert a DataStream or DataSet into a Table:
02.Convert a Table into a DataStream or DataSet
操作:
数据: Mapping of Data Types to Table Schema
based on the field positions or
based on the field names.
4.类型
泛型 - 参数化类型,将类型当作参数传递给一个类或者是方法
编程时:
编译时:泛型又提供了一种类型检测的机制,只有相匹配的数据才能正常的赋值,否则编译器就不通过。
所以说,它是一种类型安全检测机制
类型转换
类型擦除
泛型信息只存在于代码编译阶段,在进入 JVM 之前,与泛型相关的信息会被擦除掉,专业术语叫做类型擦除。
类型擦除是泛型能够与之前的 java 版本代码兼容共存的原因
类型推断:
Java 需要通过上下文细节来推断该类型
局部变量类型推断(local-variable type inference)
推断的局限性
5.Flink Java Tuples
(Flink Java API的一部分):最多25个字段,空字段不支持
构建者模式:
Builder: 为创建Product对象的各个部件指定抽象接口。
ConcreteBuilder: 实现Builder的接口以构造和装配该产品的各个部件,定义并明确它所创建的表示,并提供一个检索产品的接口。
Director: 构造一个使用Builer接口的对象。
Product: 表示被构造的复杂对象。
ConcreteBuilder创建该产品的内部表示并定义它的装配过程,包含定义组成部件的类,以及将这些部件装配成最终产品的接口。
Tuple 的构建:
01 . Tuple -- 抽象Build类 - 构建与产品类相关的部件
public abstract class Tuple implements java . io . Serializable {
public static final int
public abstract < T > T getField ( int pos );
public < T > T getFieldNotNull ( int pos ){}
public abstract < T > void setField ( T value , int pos );
public abstract int getArity ();
public abstract < T extends Tuple > T copy ();
public static Class <? extends Tuple > getTupleClass ( int arity ) {}
public static Tuple newInstance ( int arity ) {}
private static final Class < ?> [] CLASSES = new Class <? > [] {}
}
02. Tuple2 -- 抽象Build的实现类 - 真正构建产品的类
public class Tuple2 < T0 , T1 > extends Tuple {
public T0 f0 ;
public T1 f1 ;
public Tuple2 () {}
public int getArity () {
return 2 ;
}
public < T > T getField ( int pos )
public < T > void setField ( T value , int pos )
public void setFields ( T0 value0 , T1 value1 ) {}
public String toString () {
public boolean equals ( Object o ) {
public int hashCode () {
public static < T0 , T1 > Tuple2 < T0 , T1 > of ( T0 value0 , T1 value1 ){
return new Tuple2 <> ( value0 , value1 );
}
}
03. Tuple2Builder Director -- 将builder构建的产品部件组装成产品 , 对外隐藏内部组装细节
public class Tuple2Builder < T0 , T1 > {
private List < Tuple2 < T0 , T1 >> tuples = new ArrayList <> ();
public Tuple2Builder < T0 , T1 > add ( T0 value0 , T1 value1 ){
tuples . add ( new Tuple2 <> ( value0 , value1 ));
return this ;
}
@ SuppressWarnings ( "unchecked" )
public Tuple2 < T0 , T1 > [] build (){
return tuples . toArray ( new Tuple2 [ tuples . size ()]);
}
}
04. main 使用
List < Tuple2 < String , Integer >> input1 = Arrays . asList (
new Tuple2Builder < String , Integer > ()
. add ( "foo" , 1 )
. add ( "foobar" , 1 )
. add ( "foo" , 1 )
. add ( "bar" , 1 )
. add ( "foo" , 1 )
. add ( "foo" , 1 )
. build ()
updateEmpty = mapOf ( Tuple2 . of ( 3 , "3" ), Tuple2 . of ( 5 , "5" ), Tuple2 . of ( 23 , null ), Tuple2 . of ( 10 , "10" ));
说明:
面对的场景:
各个子对象组合出这个复杂对象的过程相对来说比较稳定,但是子对象的创建过程各不相同并且可能面临变化
解决的方式
将配置从目标类中隔离出来,避免作为过多的setter方法
Builder模式比较常见的实现形式是通过链式调用
-- 复杂对象的 构建 与它的 表示 分离
TupleTypeInfo
public final class TupleTypeInfo < T extends Tuple > extends TupleTypeInfoBase < T > {
public TupleTypeInfo ( TypeInformation <? >... types ) {
this (( Class < T > ) Tuple . getTupleClass ( types . length ), types );
}
getFieldNames
getFieldIndex
createSerializer
private class TupleTypeComparatorBuilder
getGenericParameters
getBasicTupleTypeInfo
getBasicAndBasicValueTupleTypeInfo
}
public abstract class TupleTypeInfoBase < T > extends CompositeType < T >
public abstract class CompositeType < T > extends TypeInformation < T >
public abstract class TypeInformation < T > implements Serializable
TupleSerializer
public class TupleSerializer < T extends Tuple > extends TupleSerializerBase < T > implements SelfResolvingTypeSerializer < T > {
TupleSerializerBase
public abstract class TupleSerializerBase < T > extends TypeSerializer < T > {
Tuple:
Flink tuples 是固定长度固定类型的Java Tuple实现
TupleTypeInfo 任意的Flink tuple 类型 ( 支持Tuple1 to Tuple25 )
package org . apache . flink . api . java . tuple ;
package org . apache . flink . api . java . typeutils ;
package org . apache . flink . api . common . typeutils ;
package org . apache . flink . api . common . typeinfo ;
Java 和Scala
CaseClassTypeInfo : 任意的 Scala CaseClass ( 包括 Scala tuples ) 。
Flink的TypeInformation类
TypeInformation类是所有类型描述类的基类,它揭示了数据类型的一些基础属性,并产生序列化器在默写特殊情况下生成类型比较器
相关语法
三元运算符
@Override
public int hashCode() {
int result = f0 != null ? f0.hashCode() : 0;
return result;
}
@Override
public int hashCode() {
int result = f0 != null ? f0.hashCode() : 0;
result = 31 * result + (f1 != null ? f1.hashCode() : 0);
return result;
}
@Override
public int hashCode() {
int result = f0 != null ? f0.hashCode() : 0;
result = 31 * result + (f1 != null ? f1.hashCode() : 0);
result = 31 * result + (f2 != null ? f2.hashCode() : 0);
return result;
}
参考:
flink-annotations源码分析 https://blog.csdn.net/hxcaifly/article/details/84558346
http://wuchong.me/
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/common.html