org.apache.spark.sql.Encoders
1. Encoders.STRING()
Encoder<String> stringEncoder = Encoders.STRING();
def STRING: Encoder[java.lang.String] = ExpressionEncoder()
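A minimal usage sketch, assuming an existing SparkSession named spark (sample values are illustrative):
import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

Dataset<String> words = spark.createDataset(Arrays.asList("spark", "encoder"), Encoders.STRING());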
2. Encoders.bean()
// Encoders can be created for Java beans (user-defined bean classes)
Encoder<Person> personEncoder = Encoders.bean(Person.class);
def bean[T](beanClass: Class[T]): Encoder[T] = ExpressionEncoder.javaBean(beanClass)
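A hedged sketch of using a bean encoder; Person here is an illustrative bean (the bean encoder expects a public class with a no-arg constructor and getters/setters):
public static class Person implements java.io.Serializable {
    private String name;
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}

Encoder<Person> personEncoder = Encoders.bean(Person.class);
Person p = new Person();
p.setName("Ann");
Dataset<Person> people = spark.createDataset(java.util.Arrays.asList(p), personEncoder);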
3. Encoders.product()
/** An encoder for Scala's product type (tuples, case classes, etc). */
def product[T <: Product : TypeTag]: Encoder[T] = ExpressionEncoder()
4. Encoders.tuple()
/** An encoder for 5-ary tuples. */
def tuple[T1, T2, T3, T4, T5](
    e1: Encoder[T1],
    e2: Encoder[T2],
    e3: Encoder[T3],
    e4: Encoder[T4],
    e5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = {
  ExpressionEncoder.tuple(
    encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4), encoderFor(e5))
}
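A hedged Java sketch of the two-argument Encoders.tuple overload (reusing the words Dataset from the STRING sketch above):
import scala.Tuple2;
import org.apache.spark.api.java.function.MapFunction;

Encoder<Tuple2<String, Integer>> pairEncoder = Encoders.tuple(Encoders.STRING(), Encoders.INT());
Dataset<Tuple2<String, Integer>> pairs = words.map(
    (MapFunction<String, Tuple2<String, Integer>>) w -> new Tuple2<>(w, w.length()),
    pairEncoder);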
5. Encoders.javaSerialization() and Encoders.kryo()
def javaSerialization[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = false)
def kryo[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = true)
def javaSerialization[T](clazz: Class[T]): Encoder[T] = javaSerialization(ClassTag[T](clazz))
def kryo[T](clazz: Class[T]): Encoder[T] = kryo(ClassTag[T](clazz))
// Row is the output type here. Note that these serializer-based encoders
// store the whole object as a single binary column rather than a structured schema.
// Encoder<Row> rowEncoder = Encoders.kryo(Row.class);
Encoder<Row> rowEncoder = Encoders.javaSerialization(Row.class);
Eg:
// assuming an existing SparkSession named spark
Dataset<String> ds = spark.createDataset(Arrays.asList("spark", "flink"), Encoders.STRING());
Encoder<Row> rowEncoder = Encoders.javaSerialization(Row.class);
Dataset<Row> dsString2Row = ds.map(
    (MapFunction<String, Row>) value -> RowFactory.create(value, value.length()), rowEncoder);
6. Dataset<Row>
// Specify a RowEncoder for the data when creating a Dataset of Row
List<Row> gradeData = Arrays.asList(
    RowFactory.create("abc", "3", "grade cohort"),
    RowFactory.create("abc", "4", "grade cohort"),
    RowFactory.create("abc", "9", "grade cohort")
);
StructType gradeSchema = new StructType(new StructField[]{
new StructField("class_cd", DataTypes.StringType, false, Metadata.empty()),
new StructField("grade_cd", DataTypes.StringType, false, Metadata.empty()),
new StructField("grade_nm", DataTypes.StringType, false, Metadata.empty())
});
// Construct the Dataset
Dataset<Row> dimensionDF = spark.createDataFrame(gradeData, gradeSchema);
// Build a RowEncoder for Row
StructType monitorSchema = new StructType(new StructField[]{
new StructField("class_cd", DataTypes.StringType, false, Metadata.empty()),
new StructField("grade_cd", DataTypes.StringType, false, Metadata.empty()),
new StructField("grade_nm", DataTypes.StringType, false, Metadata.empty())
});
// alternatively, an encoder can reuse the input Dataset's schema:
ExpressionEncoder<Row> data2Encoder = RowEncoder.apply(dimensionDF.schema());
Encoder<Row> dataEncoder = RowEncoder.apply(monitorSchema);
// Dataset's map method
Dataset<Row> monitorDF = dimensionDF.map(
    new MapFunction<Row, Row>() {
        @Override
        public Row call(Row monitorRow) throws Exception {
            String class_cd = monitorRow.getAs("class_cd").toString().toLowerCase();
            String grade_cd = monitorRow.getAs("grade_cd");
            String grade_nm = monitorRow.getAs("grade_nm");
            return RowFactory.create(class_cd, grade_cd, grade_nm);
        }
    },
    dataEncoder);
Encoder for Row Type Spark Datasets https://stackoverflow.com/questions/43238693/encoder-for-row-type-spark-datasets
org.apache.spark.sql.catalyst.encoders.RowEncoder
From the Dataset source:
new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
def toDF(): DataFrame = new Dataset[Row](sparkSession, queryExecution, RowEncoder(schema))
org.apache.flink.api.common.typeinfo.Types
org.apache.flink.api.common.typeinfo.TypeInformation
org.apache.flink.api.common.typeinfo.TypeHint
org.apache.flink.api.common.typeutils.TypeSerializer
Creating a TypeInformation or a TypeSerializer
TypeInformation<String> info = TypeInformation.of(String.class);
/** This class gives access to the type information of the most common types for which Flink has built-in serializers and comparators. */
For generic types, you need to "capture" the generic type information via a TypeHint:
TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});
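A short sketch of deriving a TypeSerializer from a TypeInformation (createSerializer(ExecutionConfig) is the classic API; newer Flink versions add a SerializerConfig variant):
import org.apache.flink.api.common.ExecutionConfig;

TypeInformation<String> stringInfo = TypeInformation.of(String.class);
TypeSerializer<String> serializer = stringInfo.createSerializer(new ExecutionConfig());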
DataSet<SomeType> result = dataSet
    .map(new MyGenericNonInferrableFunction<Long, SomeType>())
    .returns(SomeType.class); // or: returns(new TypeHint<Tuple2<Integer, SomeType>>(){})
1. Data Types in the Table API
org.apache.flink.table.types.DataType
Reworked type system: Flink 1.9 introduced a brand-new data type system that is fully aligned with the SQL standard and can express much richer types. It removes the Table API's dependency on Flink TypeInformation, improves the API's compliance with the SQL standard, and underpins the planning of code generation and serialization for runtime operators. The redesign was driven through the Flink Improvement Proposal (FLIP) process (see the wiki link below).
org.apache.flink.table.types.logical.LogicalType
org.apache.flink.table.types.logical
public abstract class LogicalType implements Serializable {
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
DataType t = DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class);
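A small sketch of composing a structured DataType with DataTypes.ROW (field names are illustrative):
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

DataType rowType = DataTypes.ROW(
    DataTypes.FIELD("class_cd", DataTypes.STRING()),
    DataTypes.FIELD("grade_cd", DataTypes.STRING()));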
The difference between DataType and TypeInformation:
org.apache.flink.table.api.types.TypeConverters provides conversion in both directions between DataType and TypeInformation:
createExternalTypeInfoFromDataType and createInternalTypeInfoFromDataType
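In the open-source Flink code base, a comparable bridge is org.apache.flink.table.types.utils.TypeConversions (a sketch; these legacy conversion helpers are deprecated in recent versions):
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;

DataType dt = DataTypes.STRING();
TypeInformation<?> legacyInfo = TypeConversions.fromDataTypeToLegacyInfo(dt);
DataType roundTripped = TypeConversions.fromLegacyInfoToDataType(legacyInfo);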
2. SQL standard's data type and Java Expression
3. Row - org.apache.spark.sql.Row
RDD[Row]
JavaRDD<Row>
Dataset<Row>
def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame
List<Row> data = Arrays.asList(
    RowFactory.create("L5", "group a1"),
    RowFactory.create("L9", "group a2")
);
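Completing the sketch above: pair the rows with a matching schema (field names are illustrative assumptions):
StructType schema = new StructType(new StructField[]{
    new StructField("level_cd", DataTypes.StringType, false, Metadata.empty()),
    new StructField("group_nm", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);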
type DataFrame = Dataset[Row]
org.apache.flink.types.Row
Row.project(): Creates a new Row with projected fields from another row.
java.sql.ResultSet
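A brief sketch of building and projecting a Flink Row via the static Row.of and Row.project helpers:
import org.apache.flink.types.Row;

Row row = Row.of("L5", "group a1", 42);
// keep only the first and third fields
Row projected = Row.project(row, new int[]{0, 2});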