Type Encoding in Flink and Spark

Encoder for Row Type Spark Datasets

When transforming a Dataset, an Encoder for the result type must be specified.
In Scala this is supplied through implicit conversion; in Java it has to be passed
explicitly in the code. Flink has a similar system (TypeInformation, covered below).
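
A minimal self-contained Java sketch of that contrast (the SparkSession setup and
sample strings are illustrative assumptions):

  import java.util.Arrays;
  import org.apache.spark.api.java.function.MapFunction;
  import org.apache.spark.sql.*;

  SparkSession spark = SparkSession.builder().master("local[*]").appName("encoders").getOrCreate();
  Dataset<String> ds = spark.createDataset(Arrays.asList("ab", "abc"), Encoders.STRING());
  // In Java the result encoder is passed explicitly; in Scala,
  // `import spark.implicits._` would supply it implicitly.
  Dataset<Integer> lengths = ds.map((MapFunction<String, Integer>) v -> v.length(), Encoders.INT());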

Encoders in Spark

org.apache.spark.sql.Encoders

1. Encoders.STRING()
   Encoder<String> stringEncoder = Encoders.STRING();
   def STRING: Encoder[java.lang.String] = ExpressionEncoder()

2. Encoders.bean()
   // Encoders are created for Java beans (user-defined bean classes)
   Encoder<Person> personEncoder = Encoders.bean(Person.class);
   def bean[T](beanClass: Class[T]): Encoder[T] = ExpressionEncoder.javaBean(beanClass)
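
   A short usage sketch (the Person bean with name/age properties is an assumed example class):

     Person person = new Person();
     person.setName("Andy");
     person.setAge(32);
     // createDataset accepts a java.util.List plus the bean encoder
     Dataset<Person> people = spark.createDataset(java.util.Collections.singletonList(person), personEncoder);
     people.printSchema();  // columns are derived from the bean's getters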

3. Encoders.product()
   // An encoder for Scala's product types (tuples, case classes, etc.)
   def product[T <: Product : TypeTag]: Encoder[T] = ExpressionEncoder()

4. Encoders.tuple()
   /** An encoder for 5-ary tuples. */
   def tuple[T1, T2, T3, T4, T5](
       e1: Encoder[T1],
       e2: Encoder[T2],
       e3: Encoder[T3],
       e4: Encoder[T4],
       e5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = {
     ExpressionEncoder.tuple(
       encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4), encoderFor(e5))
   }
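   In Java, tuple encoders are composed from the element encoders (a sketch reusing
   the Dataset<String> ds from the intro; note that Java code works with scala.Tuple2):

     import scala.Tuple2;

     Encoder<Tuple2<String, Integer>> tupleEncoder = Encoders.tuple(Encoders.STRING(), Encoders.INT());
     Dataset<Tuple2<String, Integer>> pairs = ds.map(
             (MapFunction<String, Tuple2<String, Integer>>) v -> new Tuple2<>(v, v.length()),
             tupleEncoder);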

5. Encoders.javaSerialization() / Encoders.kryo()
   def javaSerialization[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = false)
   def kryo             [T: ClassTag]: Encoder[T] = genericSerializer(useKryo = true)
   def javaSerialization[T](clazz: Class[T]): Encoder[T] = javaSerialization(ClassTag[T](clazz))
   def kryo             [T](clazz: Class[T]): Encoder[T] = kryo(ClassTag[T](clazz))

   Here Row is the output type:
   // Encoder<Row> rowEncoder = Encoders.kryo(Row.class);
   Encoder<Row> rowEncoder = Encoders.javaSerialization(Row.class);
   Dataset<Row> dsString2Row = ds.map(
           (MapFunction<String, Row>) value -> RowFactory.create(value, value.length()), rowEncoder);
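
   These generic serializer encoders do not preserve column structure: the whole
   object lands in a single binary column, which printSchema makes visible:

     dsString2Row.printSchema();
     // root
     //  |-- value: binary (nullable = true)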
6. Dataset<Row> with an explicit RowEncoder
   // When creating the Dataset, declare that the rows follow a schema (RowEncoder)
   List<Row> gradeData = Arrays.asList(
           RowFactory.create("abc", "3", "年级人群"),
           RowFactory.create("abc", "4", "年级人群"),
           RowFactory.create("abc", "9", "年级人群")
   );
   StructType gradeSchema = new StructType(new StructField[]{
           new StructField("class_cd", DataTypes.StringType, false, Metadata.empty()),
           new StructField("grade_cd", DataTypes.StringType, false, Metadata.empty()),
           new StructField("grade_nm", DataTypes.StringType, false, Metadata.empty())
   });
   // Build the Dataset
   Dataset<Row> dimensionDF = spark.createDataFrame(gradeData, gradeSchema);

   // Build a RowEncoder for the output Row type
   StructType monitorSchema = new StructType(new StructField[]{
           new StructField("class_cd", DataTypes.StringType, false, Metadata.empty()),
           new StructField("grade_cd", DataTypes.StringType, false, Metadata.empty()),
           new StructField("grade_nm", DataTypes.StringType, false, Metadata.empty())
   });
   ExpressionEncoder<Row> data2Encoder = RowEncoder.apply(dimensionDF.schema());
   Encoder<Row> dataEncoder = RowEncoder.apply(monitorSchema);

   // Dataset's map method takes the encoder as its second argument
   Dataset<Row> monitorDF = dimensionDF.map(
           new MapFunction<Row, Row>() {
               @Override
               public Row call(Row monitorRow) throws Exception {
                   String class_cd = monitorRow.getAs("class_cd").toString().toLowerCase();
                   String grade_cd = monitorRow.getAs("grade_cd");
                   String grade_nm = monitorRow.getAs("grade_nm");
                   return RowFactory.create(class_cd, grade_cd, grade_nm);
               }
           },
           dataEncoder);

Encoder for Row Type Spark Datasets: https://stackoverflow.com/questions/43238693/encoder-for-row-type-spark-datasets
org.apache.spark.sql.catalyst.encoders.RowEncoder
From the Dataset source code:
  new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
  def toDF(): DataFrame = new Dataset[Row](sparkSession, queryExecution, RowEncoder(schema))

Spark ExpressionEncoder

  A factory for constructing encoders that convert objects and primitives to and
  from the internal row format using catalyst expressions and code generation.
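
A small sketch of what that means in practice, assuming the Spark 2.x internal API
(toRow was later replaced by createSerializer in Spark 3.x):

  import org.apache.spark.sql.catalyst.InternalRow;
  import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;

  // The public Encoders factories return ExpressionEncoder instances under the hood
  ExpressionEncoder<String> expEncoder = (ExpressionEncoder<String>) Encoders.STRING();
  InternalRow internal = expEncoder.toRow("hello");  // object -> internal row format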

Flink's TypeInformation

org.apache.flink.api.common.typeinfo.Types
org.apache.flink.api.common.typeinfo.TypeInformation
org.apache.flink.api.common.typeinfo.TypeHint
org.apache.flink.api.common.typeutils.TypeSerializer

Creating a TypeInformation or TypeSerializer:
  TypeInformation<String> info = TypeInformation.of(String.class);
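
A TypeSerializer can then be derived from the TypeInformation (a sketch; the
default ExecutionConfig is an assumption):

  import org.apache.flink.api.common.ExecutionConfig;
  import org.apache.flink.api.common.typeutils.TypeSerializer;

  TypeSerializer<String> serializer = info.createSerializer(new ExecutionConfig());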

This class gives access to the type information of the most common types for which
Flink has built-in serializers and comparators.

For generic types, you need to "capture" the generic type information via a TypeHint:
  TypeInformation<Tuple2<String, Double>> info =
      TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});
When a function's return type cannot be inferred, declare it with returns():
  DataSet<SomeType> result = dataSet
      .map(new MyGenericNonInferrableFunction<Long, SomeType>())
      .returns(SomeType.class);  // or returns(new TypeHint<Tuple2<Integer, SomeType>>(){})

Flink's Table API Types

org.apache.flink.table.api.Types
  In Flink, both DataSet and DataStream can be converted to and from a Table.
  This class enumerates all supported types of the Table API & SQL.
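
A small sketch of declaring a named row type with this class, mirroring the Spark
StructType examples above (the field names are illustrative):

  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.table.api.Types;
  import org.apache.flink.types.Row;

  TypeInformation<Row> rowType = Types.ROW(
          new String[]{"class_cd", "grade_cd", "grade_nm"},
          new TypeInformation<?>[]{Types.STRING(), Types.STRING(), Types.STRING()});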

DataType in Flink

1. Data Types in the Table API
   org.apache.flink.table.types.DataType
   Reworked type system: Flink 1.9 introduced a brand-new data type system that is
   fully aligned with the SQL standard and can support much richer types. It removes
   the Table API's dependency on Flink's TypeInformation, improves compliance with
   the SQL standard, and serves the planning for code generation and serialization
   of runtime operators. The redesign went through the Flink Improvement Proposal
   (FLIP) process:
   https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
   The logical types live in org.apache.flink.table.types.logical:
     public abstract class LogicalType implements Serializable { ... }
   A DataType can be bridged to a specific conversion class:
   DataType t = DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class);
   On the difference between data type and type information:
   org.apache.flink.table.api.types.TypeConverters provides conversion between DataType
   and TypeInformation via createExternalTypeInfoFromDataType and
   createInternalTypeInfoFromDataType.
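
   A sketch of declaring the same structured type with the new DataTypes factory
   (the field names are illustrative):

     import org.apache.flink.table.api.DataTypes;
     import org.apache.flink.table.types.DataType;

     DataType rowType = DataTypes.ROW(
             DataTypes.FIELD("class_cd", DataTypes.STRING()),
             DataTypes.FIELD("grade_cd", DataTypes.STRING()),
             DataTypes.FIELD("grade_nm", DataTypes.STRING()));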

2. The SQL standard's data types and Java expressions

3. Row - org.apache.spark.sql.Row vs. org.apache.flink.types.Row
   In Spark, org.apache.spark.sql.Row appears as:
     RDD[Row]
     JavaRDD<Row>
     Dataset<Row>
     List<Row>   // createDataFrame(rows: java.util.List[Row], schema: StructType)
     List<Row> data = Arrays.asList(
             RowFactory.create("L5", "a1群"),
             RowFactory.create("L9", "a2群")
     );
     type DataFrame = Dataset[Row]
   In Flink, org.apache.flink.types.Row plays the same role; its static project method
   "creates a new Row with projected fields from another row."
   java.sql.ResultSet is the analogous row abstraction in JDBC.
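
   A minimal sketch contrasting the two Row classes (values mirror the sample data
   above; fully qualified names avoid the import clash):

     // Spark rows are built through RowFactory
     org.apache.spark.sql.Row sparkRow = org.apache.spark.sql.RowFactory.create("L5", "a1群");

     // Flink rows are built with Row.of; Row.project copies selected fields into a new Row
     org.apache.flink.types.Row flinkRow = org.apache.flink.types.Row.of("L5", "a1群");
     org.apache.flink.types.Row projected = org.apache.flink.types.Row.project(flinkRow, new int[]{0});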

References

  Spark SQL 中的 Encoder: https://www.jianshu.com/p/d3c35e18af44
  Flink Data Types: https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/table/types.html
  Data Types in the Table API: https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/table/types.html#data-types-in-the-table-api
