A type is treated as a Flink POJO when:
1. The class is public and standalone (not a non-static inner class).
2. The class has a public no-argument constructor.
3. All non-static, non-transient fields in the class are either:
   01. public, or
   02. private (or protected) with public getter and setter methods.
4. The field types must be supported by Flink.
e.g.: in this example the fields are private and have public getter and setter methods.
public class Person implements Serializable {
    private String name;
    private int age;

    /** Public no-argument constructor to make it a Flink POJO. */
    public Person() {
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Person{" + "name='" + name + '\'' + ", age=" + age + '}';
    }
}
Usage in a program:
// Build some test data as a DataStream
Person userA = new Person();
userA.setName("Happy");
userA.setAge(19);
Person userB = new Person();
userB.setName("Lucky");
userB.setAge(20);
DataStream<Person> orderAStream = env.fromElements(
        userB,
        userA
);
orderAStream.keyBy("age");
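Because Person is a valid POJO, Flink accepts field-expression keys like the one above. A minimal end-to-end sketch, assuming the env and objects above; the keyed field, aggregation, and job name are illustrative:
orderAStream
        .keyBy("name")      // field-expression key, valid because Person is a Flink POJO
        .maxBy("age")       // keep the Person with the largest age per name
        .print();
env.execute("pojo-keyby-example");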
Flink Table & SQL
Relational APIs: the Table API and SQL
The Table API provides language-integrated queries (LINQ: Language Integrated Query) for Scala and Java.
SQL provides structured queries (Structured Query Language).
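A minimal sketch of the two relational APIs side by side, assuming a StreamTableEnvironment tEnv with a table named "People" already registered (both names are illustrative; Flink 1.9/1.10 string-expression syntax):
// Table API: language-integrated query
Table adults = tEnv.scan("People")
        .filter("age >= 18")
        .select("name, age");
// SQL: the equivalent structured query
Table adultsSql = tEnv.sqlQuery("SELECT name, age FROM People WHERE age >= 18");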
Row: a tuple with an arbitrary number of fields that supports null fields.
Row and BinaryRow
org.apache.flink.types.Row
Description (from the class Javadoc):
/**
 * A Row can have an arbitrary number of fields, which may all be of different types.
 * The fields in a Row can be null. Because Row is not strongly typed, Flink's
 * type extraction mechanism cannot extract correct field types, so users should manually
 * tell Flink the type information by creating a {@link RowTypeInfo}.
 * The fields in a Row can be accessed by position (zero-based) via {@link #getField(int)} and
 * set via {@link #setField(int, Object)}.
 * Row is in principle serializable. However, it may contain non-serializable fields,
 * in which case serialization will fail.
 */
Example:
Row.of("hello", true, 1L);
// Or
Row row = new Row(3);
row.setField(0, "hello");
row.setField(1, true);
row.setField(2, 1L);
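Because Row is not strongly typed, a Row-based stream needs explicit type information. A minimal sketch, assuming a StreamExecutionEnvironment env (variable names are illustrative):
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

// Tell Flink the field types explicitly via RowTypeInfo
RowTypeInfo rowType = new RowTypeInfo(Types.STRING, Types.BOOLEAN, Types.LONG);
DataStream<Row> rows = env.fromCollection(
        java.util.Collections.singletonList(Row.of("hello", true, 1L)),
        rowType);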
The BinaryRow data type in Flink 1.9 - 1.10
org.apache.flink.table.dataformat.BinaryRow
A BinaryRow has two parts: a fixed-length part and a variable-length part.
It is a special row backed by MemorySegments instead of Java objects.
Data types in the Table API and SQL
1. Compatibility: time types, array types, and the decimal type need special attention.
Old API: org.apache.flink.table.api.Types (based on TypeInformation, with JVM types and Java expression strings)
New API (since Flink 1.9): org.apache.flink.table.api.DataTypes (based on the SQL standard's data types)
import static org.apache.flink.table.api.DataTypes.*;
DataType t = DataTypes.ARRAY(DataTypes.INT().notNull()).bridgedTo(int[].class);
e.g. (old constant / old method -> new API):
SQL_DATE / SQL_DATE() -> DATE().bridgedTo(java.sql.Date.class)
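A small sketch of the same idea with a few more DataTypes calls (the type choices are illustrative, not prescribed by the original notes):
import static org.apache.flink.table.api.DataTypes.*;

DataType dateType    = DATE().bridgedTo(java.sql.Date.class); // new-API equivalent of SQL_DATE
DataType decimalType = DECIMAL(10, 2);                        // decimal needs explicit precision/scale
DataType arrayType   = ARRAY(STRING());                       // array of strings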
StreamTableEnvironment and TableEnvironment
public abstract class StreamExecutionEnvironment {

    // fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)
    public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo)

    public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type) {
        return fromCollection(data, TypeExtractor.getForClass(type));
    }

    // Internally the collection variant uses typeInfo.getTypeClass() and
    // typeInfo.createSerializer(getConfig()).
    // Because the iterator will remain unmodified until the actual execution happens,
    // the type of data returned by the iterator must be given explicitly in the form of the type
    // class (this is due to the fact that the Java compiler erases the generic type information).
}
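A minimal sketch of supplying the element type explicitly when it cannot be extracted from an erased generic (collection contents and names are illustrative; assumes a StreamExecutionEnvironment env):
import java.util.Arrays;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

DataStream<Tuple2<String, Integer>> pairs = env.fromCollection(
        Arrays.asList(Tuple2.of("a", 1), Tuple2.of("b", 2)),
        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));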
object StreamTableEnvironment {
  def fromDataStream[T](dataStream: DataStream[T], fields: String): Table
}
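A minimal sketch of turning the POJO stream from earlier into a Table with named fields, assuming a Java StreamTableEnvironment tEnv and the orderAStream from above (Flink 1.9/1.10 string-based field syntax; the registered table name is illustrative):
Table personTable = tEnv.fromDataStream(orderAStream, "name, age");
tEnv.registerTable("People", personTable);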
Differences and relationships between the classes
StreamTableEnvironment and TableEnvironment
flink-table-planner
org.apache.flink.table.api.java.StreamTableEnvironment
abstract class StreamTableEnvironment(
    private[flink] val execEnv: StreamExecutionEnvironment,
    config: TableConfig)
  extends TableEnvironment(config) {}

org.apache.flink.table.api.TableEnvironment
// The abstract base class for batch and stream TableEnvironments
abstract class TableEnvironment(val config: TableConfig)
Both hold a reference to an ExecutionEnvironment or StreamExecutionEnvironment.
The Blink planner optimizes multiple sinks into one DAG (supported only on TableEnvironment, not on StreamTableEnvironment).
The old planner always optimizes each sink into a new DAG, and all DAGs are independent of each other.
Differences in the classes' methods:
TableEnvironment
    getTableEnvironment -- @deprecated
    create -- the current factory method (see the sketch below)
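A minimal sketch of the non-deprecated create path, choosing the Blink planner explicitly (Flink 1.9/1.10 style; assumes env is a StreamExecutionEnvironment):
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()       // or .useOldPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);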
Catalog: the storage medium for the metadata of Table data.
Data Types & Serialization
Flink tries to infer the type information of the data that is exchanged and stored during distributed computation.
Flink's TypeInformation class is the base class of all type descriptor classes.
It exposes some basic properties of a data type, creates serializers, and in some special cases creates type comparators.
Internally, Flink implements a class named TypeExtractor that uses clues such as method signatures and subclass information to automatically extract and recover type information.
How TypeHint works
It creates an anonymous subclass, which serves as the type hint.
At runtime, TypeExtractor can read back the preserved actual type via getGenericSuperclass().getActualTypeArguments().
TypeHints in the form of returns(new TypeHint<Tuple2<Integer, SomeType>>(){}).
The TypeHint class
can capture generic type information and preserve it for the runtime (via an anonymous subclass).
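A minimal plain-Java sketch of the anonymous-subclass trick that TypeHint relies on (this is not Flink's actual implementation, only the reflection mechanism described above; all names are made up):
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class TypeCaptureDemo {
    // The generic superclass of an anonymous subclass keeps its actual type argument,
    // so it can be read back reflectively even though the JVM erases generics elsewhere.
    static abstract class TypeCapture<T> {
        Type captured() {
            return ((ParameterizedType) getClass().getGenericSuperclass())
                    .getActualTypeArguments()[0];
        }
    }

    public static void main(String[] args) {
        Type t = new TypeCapture<List<String>>() {}.captured();
        System.out.println(t); // prints: java.util.List<java.lang.String>
    }
}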
Flink ships with many TypeSerializer subclasses; in most cases custom types are just combinations of common types, so these serializers can be reused directly.
If that is not enough, you can extend TypeSerializer (or one of its subclasses) to implement your own serializer.
How Flink copes with type erasure:
01. Flink reconstructs as much type information as possible via Java reflection.
02. Type hints tell the system the types of a function's input parameters and output.
// In Java:
// For a non-generic data type, you can pass the Class directly:
TypeInformation.of(String.class);
// For a generic data type, you need to capture the type via a TypeHint:
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){});
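A minimal sketch of attaching such a hint to an operator whose output type is erased (e.g. a lambda); the stream `lines` is an assumed DataStream<String>:
DataStream<Tuple2<String, Integer>> counts = lines
        .map(s -> Tuple2.of(s, 1))
        .returns(new TypeHint<Tuple2<String, Integer>>() {});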
Other:
Generics
Java generic type parameter conventions:
E - Element (used in collections, since collections hold elements)
T - Type (a Java class)
K - Key
V - Value
N - Number (a numeric type)
? - an unspecified Java type
<? extends E> - upper-bounded wildcard
<? super E> - lower-bounded wildcard
S, U, V - 2nd, 3rd, 4th types
Type parameters commonly seen in Flink: T, O, IN1, OUT, ACC
Data types
Arrays
Enums
enum
Enumeration
Spliterator (@since 1.8)
Arrays
Properties
Java Collections Framework
* @param <E> the type of elements in this collection
* Set
* List
* Map
* SortedSet
* SortedMap
* HashSet
* TreeSet
* ArrayList
* LinkedList
* Vector
* Collections
* Arrays
* AbstractCollection
* @since 1.2