Flink Broadcast State and Spark Broadcast Variables

Basic Concepts

Flink Broadcast State

// Define the broadcast state descriptor, then read and update the state through it.
Before accessing the state, a state descriptor (StateDescriptor) must be defined. The descriptor for BroadcastState is a MapStateDescriptor.
The value type of the MapStateDescriptor is the type stored for each broadcast element; in the example below the state stores TestStat values.
// 01. The data stream (the non-broadcast side)
   KeyedStream<Tuple2<String, String>, String> keyedStream = ……
// 02. The broadcast stream
// 001. Define the MapStateDescriptor
    MapStateDescriptor<String, TestStat> testStateDescriptor = new MapStateDescriptor<>("testStat", String.class, TestStat.class);
// 002. Broadcast the test data
    BroadcastStream<String> testBroadcastStream = testInfoLogSource.map(new TestInfoParseFunction())
                                                                   .name("TestInfoParseMap")
                                                                   .broadcast(testStateDescriptor);
// 03. Connect the streams: dataStream.connect(broadcastStream)
    DataStream<TestResultBean> matchStream = keyedStream.connect(testBroadcastStream)
                                                        .process(new TestMatchFunctionNew())
                                                        .setParallelism(1)
                                                        .name("TestMatchFunction");
where:
   public class TestMatchFunctionNew extends KeyedBroadcastProcessFunction<String, Tuple2<String, String>, String, TestResultBean> {}
 KeyedStream
     .connect(BroadcastStream)
     .process(new KeyedBroadcastProcessFunction<>(…))
   When a KeyedStream is connected to a BroadcastStream,
   only a KeyedBroadcastProcessFunction can implement the connect logic.
   Compared with BroadcastProcessFunction, KeyedBroadcastProcessFunction
   additionally provides the timer service and access to the current key.

 nonKeyedStream
    .connect(BroadcastStream)
    .process(new BroadcastProcessFunction<>(…))
   When a non-keyed stream is connected to a BroadcastStream,
   only a BroadcastProcessFunction can implement the connect logic.
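
A minimal sketch of what TestMatchFunctionNew could look like; TestStat, TestResultBean and the parse/match logic are placeholders carried over from the example above, not real classes:

    import org.apache.flink.api.common.state.BroadcastState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    public class TestMatchFunctionNew
            extends KeyedBroadcastProcessFunction<String, Tuple2<String, String>, String, TestResultBean> {

        // Must describe the same state that was passed to broadcast().
        private final MapStateDescriptor<String, TestStat> testStateDescriptor =
                new MapStateDescriptor<>("testStat", String.class, TestStat.class);

        @Override
        public void processElement(Tuple2<String, String> value,
                                   ReadOnlyContext ctx,
                                   Collector<TestResultBean> out) throws Exception {
            // The data-stream side only gets read-only access to the broadcast state.
            ReadOnlyBroadcastState<String, TestStat> state = ctx.getBroadcastState(testStateDescriptor);
            TestStat stat = state.get(value.f0);
            if (stat != null) {
                out.collect(new TestResultBean(/* match of value and stat */));
            }
        }

        @Override
        public void processBroadcastElement(String value,
                                            Context ctx,
                                            Collector<TestResultBean> out) throws Exception {
            // The broadcast side may update the state; the same update is applied
            // on every parallel instance, keeping the state identical everywhere.
            BroadcastState<String, TestStat> state = ctx.getBroadcastState(testStateDescriptor);
            state.put(value, TestStat.parse(value)); // TestStat.parse is a placeholder
        }
    }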

Flink Source: KeyedBroadcastProcessFunction

    KS  is the type of the key in the KeyedStream;
    IN1 is the element type of the data stream (the non-broadcast side);
    IN2 is the element type of the broadcast stream;
    OUT is the element type of the output stream after the two streams are connected.
   public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>
      extends org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction {
         processElement          // processes the data-stream elements
            value is one element of the data stream;
            ctx is the context, providing the timer service, the current key and read-only BroadcastState via ctx.getBroadcastState;
            out is the output collector.
         processBroadcastElement // processes the broadcast-stream elements
         onTimer                 // called when a registered timer fires
         public abstract class Context extends BaseBroadcastProcessFunction.Context
         public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext
         public abstract class OnTimerContext extends ReadOnlyContext
      }
      public abstract class BaseBroadcastProcessFunction extends AbstractRichFunction {
         getBroadcastState
      }
      public abstract class AbstractRichFunction implements RichFunction, Serializable {
         getRuntimeContext
         getIterationRuntimeContext
         open
         close
      }
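
Since ReadOnlyContext exposes the timer service and the current key, a common pattern is to register a per-key timer while processing a data-stream element and react in onTimer. A fragment of such a function as a sketch (the 60-second timeout is illustrative):

    @Override
    public void processElement(Tuple2<String, String> value,
                               ReadOnlyContext ctx,
                               Collector<TestResultBean> out) throws Exception {
        // Register an event-time timer 60s after this element's timestamp
        // (assumes event-time timestamps are assigned); timers are scoped to the current key.
        long ts = ctx.timestamp() + 60_000L;
        ctx.timerService().registerEventTimeTimer(ts);
    }

    @Override
    public void onTimer(long timestamp,
                        OnTimerContext ctx,
                        Collector<TestResultBean> out) throws Exception {
        // Fires per key; ctx.getCurrentKey() tells which key's timer fired.
        String key = ctx.getCurrentKey();
        // ... emit a timeout result for this key ...
    }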

Flink Source: BroadcastProcessFunction

   public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
      processElement
      processBroadcastElement
      public abstract class Context extends BaseBroadcastProcessFunction.Context
      public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext
   }
Flink offers two ways to make data available to all parallel instances of a function:
   via function parameters/closures, or via broadcast sets.
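
The broadcast-set path belongs to the DataSet (batch) API. A minimal sketch, following the pattern from the wiki page referenced at the end, assuming env is an ExecutionEnvironment and data a DataSet<String>:

    DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);

    data.map(new RichMapFunction<String, String>() {
        private Collection<Integer> broadcastSet;

        @Override
        public void open(Configuration parameters) {
            // Fetch the broadcast set by the name it was registered under.
            broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
        }

        @Override
        public String map(String value) {
            return value + "/" + broadcastSet.size();
        }
    }).withBroadcastSet(toBroadcast, "broadcastSetName"); // register the DataSet as a broadcast set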

Spark Broadcast Variables

  // Broadcast the label list.
  List<String> labelColums = Arrays.asList(labels);
  Broadcast<List<String>> broadcastColumnList = spark.sparkContext().broadcast(labelColums, ClassManifestFactory.classType(List.class));
  // Usage via a named function class:
  JavaRDD<Row> TestRdd = TestSet.javaRDD().flatMap(new TestMatchFuntion(broadcastColumnList));

   where the function is
     public class TestMatchFuntion implements FlatMapFunction<Row, Row> {
          // Keep the Broadcast handle and read value() inside call(), so the data is
          // shipped via the broadcast mechanism instead of closure serialization.
          private final Broadcast<List<String>> broadcastColumnList;

          public TestMatchFuntion(Broadcast<List<String>> broadcastColumnList) {
              this.broadcastColumnList = broadcastColumnList;
          }

          @Override
          public Iterator<Row> call(Row r) throws Exception {
              List<String> cityList = broadcastColumnList.value();
              // ... matching logic ...
          }
     }

Spark Source: broadcast

  org.apache.spark.broadcast
  Spark's broadcast variables, used to broadcast immutable datasets to all nodes.
  Purpose: a broadcast ships a dataset from the driver node to all other nodes.
    abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Logging
    private[spark] trait BroadcastFactory
    private[spark] class BroadcastManager
    private[spark] class TorrentBroadcastFactory extends BroadcastFactory 
    private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) extends Broadcast[T](id) with Logging with Serializable
where BroadcastManager.newBroadcast() creates a new broadcast variable.
Calling getValue evaluates the lazy _value field defined at instance initialization, which in turn runs readBroadcastBlock to fetch and cache the blocks.
Broadcast has had two implementations:
   HttpBroadcast: all executors fetch the data from the driver over HTTP, making the driver a single point of failure and a network I/O bottleneck (removed in Spark 2.0)
   TorrentBroadcast: BitTorrent-like block distribution among executors, the default implementation
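
From the user side, the lifecycle is simple. A minimal Java sketch (jsc is an existing JavaSparkContext):

    Broadcast<List<String>> b = jsc.broadcast(Arrays.asList("a", "b", "c"));

    // The first read on an executor triggers the lazy readBroadcastBlock:
    // blocks are fetched torrent-style and cached locally.
    List<String> v = b.value();

    b.unpersist(); // drop the cached copies on the executors (they can be re-fetched)
    b.destroy();   // remove all state; the broadcast is unusable afterwards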

Spark Broadcast Usage Example

package com.letour.main;   
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.*;

public class DimenBroad {
public static void main(String[] args) {
    SparkSession spark = SparkSession
            .builder()
            .appName("Invalid")
            .enableHiveSupport()
            .config("spark.sql.shuffle.partitions", 200)
            .getOrCreate();
    /**
     * The table to be matched: grade name and the corresponding grade code.
     */
    Dataset<Row> gradeDF = spark.sql("select grade_nm, grade_cd from tmp_test.tmp_date_ed where grade_cd is not null");
    JavaRDD<Row> origGradeRDD = gradeDF.javaRDD();

    /**
     * The table to broadcast.
     */
    List<Row> grades = spark.sql("select grade from tmp_test.tmp_data_td").collectAsList();
    // Collect the grade values to broadcast.
    List<String> gradeColums = new ArrayList<>();
    if (grades != null && !grades.isEmpty()) {
        for (Row b : grades) {
            String grade = b.getAs("grade");
            gradeColums.add(grade);
        }
    }
    // Build the broadcast variable.
    JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
    final Broadcast<List<String>> dimGradeBroad = sparkContext.broadcast(gradeColums);
    JavaRDD<Row> memRDD = origGradeRDD.flatMap(new FlatMapFunction<Row, Row>() {
        @Override
        public Iterator<Row> call(Row t) throws Exception {
            String grade_nm = t.getAs("grade_nm");
            String grade_cd = t.getAs("grade_cd");
            List<Row> results = new ArrayList<>();
            // Read the broadcast value on the executor.
            List<String> iList = dimGradeBroad.value();
            if (iList.contains(grade_nm)) {
                results.add(RowFactory.create(grade_nm, grade_cd));
            }
            return results.iterator();
        }
    }).repartition(50);
   // Convert the RDD back to a Dataset.
    Dataset<Row> matchDF = spark.createDataFrame(memRDD, getSchema());

    /**
     * Write to the result table.
     */
     matchDF.write().mode(SaveMode.Overwrite).insertInto("tmp_test.result_date_ed");
    //  matchDF.createOrReplaceTempView("tmp_chase_d");
    //  spark.sql("insert overwrite table tmp_test.result_date_d partition(stat='2019') select * from tmp_chase_d");
    // Done.
    spark.stop();
}
 /**
  * Schema needed to convert the RDD back to a Dataset.
  */
  private static StructType getSchema() {
      ArrayList<StructField> fields = new ArrayList<StructField>();
      StructField gradeNm = DataTypes.createStructField("grade_nm", DataTypes.StringType, true);
      StructField gradeCd = DataTypes.createStructField("grade_cd", DataTypes.StringType, true);
      fields.add(gradeNm);
      fields.add(gradeCd);
      return DataTypes.createStructType(fields);
  }
}

Row in Spark

To create a new Row, use RowFactory.create() in Java or Row.apply() in Scala
A Row object can be constructed by providing field values
01.RowFactory
  public class RowFactory {
  public static Row create(Object ... values) {
    return new GenericRow(values);
  }
}
02.GenericRow
  class GenericRow(protected[sql] val values: Array[Any]) extends Row{}
03.Row
  trait Row extends Serializable {
  def size: Int = length
  def length: Int
  def schema: StructType = null
  def apply(i: Int): Any = get(i)
  def get(i: Int): Any
      getBoolean getByte getShort getInt getLong getFloat getDouble getString getDecimal
      getDate getTimestamp
      getSeq getList getMap getJavaMap getStruct getAs getAnyValAs
      getValuesMap
   /** Checks whether the value at position i is null. */
    def isNullAt(i: Int): Boolean = get(i) == null
   anyNull
   fieldIndex
   toSeq  mkString
   equals  copy toString hashCode
}
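
A short Java sketch of the accessors above. Note that rows created via RowFactory.create carry no schema, so name-based access (getAs(String), fieldIndex) only works on rows that come with one, e.g. rows read from a Dataset:

    Row row = RowFactory.create("Alice", 30);

    // Positional access works on any Row.
    String name = row.getString(0);
    int age = row.getInt(1);
    boolean hasAge = !row.isNullAt(1);

    // Name-based access needs a schema:
    // String name2 = datasetRow.getAs("name");
    // int idx = datasetRow.fieldIndex("age");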

BooleanType -> java.lang.Boolean
ByteType -> java.lang.Byte
ShortType -> java.lang.Short
IntegerType -> java.lang.Integer
LongType -> java.lang.Long
FloatType -> java.lang.Float
DoubleType -> java.lang.Double
StringType -> String
DecimalType -> java.math.BigDecimal

DateType -> java.sql.Date
TimestampType -> java.sql.Timestamp

BinaryType -> byte array
ArrayType -> scala.collection.Seq (use getList for java.util.List)
MapType -> scala.collection.Map (use getJavaMap for java.util.Map)
StructType -> org.apache.spark.sql.Row
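
For the complex types at the bottom of the list, the Java-friendly getters look like this (the column names are illustrative):

    // Suppose df has columns: tags ARRAY<STRING>, attrs MAP<STRING,STRING>,
    // price DECIMAL(10,2), address STRUCT<...>
    Row r = df.first();

    List<String> tags = r.getList(r.fieldIndex("tags"));             // ArrayType
    Map<String, String> attrs = r.getJavaMap(r.fieldIndex("attrs")); // MapType
    BigDecimal price = r.getDecimal(r.fieldIndex("price"));          // DecimalType
    Row address = r.getStruct(r.fieldIndex("address"));              // StructType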

References:

 Variables Closures vs. Broadcast Variables
    https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables
