KS is the type of the key in the KeyedStream;
IN1 is the element type of the (non-broadcast) data stream;
IN2 is the element type of the broadcast stream;
OUT is the element type of the output stream produced after the two streams are connected.
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>
        extends org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction {

    // Processes one element of the keyed (non-broadcast) data stream.
    // value is an element of the data stream;
    // ctx is the context, which provides the timer service, the current key,
    //   and read-only broadcast state via ctx.getBroadcastState(...);
    // out is the collector for the output stream.
    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    // Processes one element of the broadcast stream; here the broadcast state is writable.
    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    // Called when a previously registered timer fires.
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception { }

    public abstract class Context extends BaseBroadcastProcessFunction.Context { ... }
    public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext { ... }
    public abstract class OnTimerContext extends ReadOnlyContext { ... }
}
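Putting the pieces together, a minimal usage sketch; the Event type, the userId/type fields, and the "rules" descriptor are illustrative names I made up, not from the Flink source. The low-volume rule stream is broadcast with a MapStateDescriptor, connected to the keyed event stream, and processed by a KeyedBroadcastProcessFunction:
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastJoinSketch {
    // Hypothetical event type; only the userId and type fields are used below.
    public static class Event {
        public String userId;
        public String type;
    }

    public static DataStream<String> apply(DataStream<Event> events, DataStream<String> rules) {
        // Descriptor for the broadcast ("rules") state: event type -> rule string
        final MapStateDescriptor<String, String> ruleDesc = new MapStateDescriptor<>(
                "rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        BroadcastStream<String> ruleStream = rules.broadcast(ruleDesc);

        return events
                .keyBy(e -> e.userId)
                .connect(ruleStream)
                .process(new KeyedBroadcastProcessFunction<String, Event, String, String>() {
                    @Override
                    public void processElement(Event value, ReadOnlyContext ctx, Collector<String> out)
                            throws Exception {
                        // Data-stream side: read-only view of the broadcast state
                        String rule = ctx.getBroadcastState(ruleDesc).get(value.type);
                        if (rule != null) {
                            out.collect(value.userId + " matched " + rule);
                        }
                    }

                    @Override
                    public void processBroadcastElement(String value, Context ctx, Collector<String> out)
                            throws Exception {
                        // Broadcast side: the state is writable here; rules arrive as "type=rule"
                        String[] kv = value.split("=", 2);
                        ctx.getBroadcastState(ruleDesc).put(kv[0], kv[1]);
                    }
                });
    }
}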
public abstract class BaseBroadcastProcessFunction extends AbstractRichFunction {
    getBroadcastState  // exposed through the nested Context / ReadOnlyContext classes
}
public abstract class AbstractRichFunction implements RichFunction, Serializable {
getRuntimeContext
getIterationRuntimeContext
open
close
}
Flink source: BroadcastProcessFunction
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
    public abstract class Context extends BaseBroadcastProcessFunction.Context { ... }
    public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext { ... }
}
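This non-keyed variant is used when the main stream is not keyed; compared with the keyed version it has no timers and no per-key state. Reusing the hypothetical events, ruleStream, and ruleDesc from the sketch above:
// Any (non-keyed) DataStream can be connected to the broadcast stream
events.connect(ruleStream)
      .process(new BroadcastProcessFunction<Event, String, String>() {
          @Override
          public void processElement(Event value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
              String rule = ctx.getBroadcastState(ruleDesc).get(value.type);
              if (rule != null) {
                  out.collect(value.type + " -> " + rule);
              }
          }

          @Override
          public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
              String[] kv = value.split("=", 2);
              ctx.getBroadcastState(ruleDesc).put(kv[0], kv[1]);
          }
      });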
Flink offers two ways to make data available to all parallel instances of a function:
via function parameters/closures, or via broadcast sets (sketched below).
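A minimal broadcast-set sketch using the DataSet API; the element values and the name "numbers" are illustrative. The set is registered with withBroadcastSet and fetched in open() through the runtime context, which is where the open/getRuntimeContext members of AbstractRichFunction listed above come into play:
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

import java.util.Collection;

public class BroadcastSetDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);

        env.fromElements("a", "b")
           .map(new RichMapFunction<String, String>() {
               private Collection<Integer> numbers;

               @Override
               public void open(Configuration parameters) {
                   // every parallel instance receives the full broadcast set
                   numbers = getRuntimeContext().getBroadcastVariable("numbers");
               }

               @Override
               public String map(String value) {
                   return value + " saw " + numbers.size() + " numbers";
               }
           })
           .withBroadcastSet(toBroadcast, "numbers")  // register under the name "numbers"
           .print();
    }
}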
Spark Broadcast variables
// Broadcast the labels
List<String> labelColumns = Arrays.asList(labels);
Broadcast<List<String>> broadcastColumnList =
        spark.sparkContext().broadcast(labelColumns, ClassManifestFactory.classType(List.class));
// Usage: pass the broadcast handle into a named function
JavaRDD<Row> testRdd = testSet.javaRDD().flatMap(new TestMatchFunction(broadcastColumnList));
where the function is:
public class TestMatchFunction implements FlatMapFunction<Row, Row> {
    private final Broadcast<List<String>> broadcastColumnList;

    public TestMatchFunction(Broadcast<List<String>> broadcastColumnList) {
        // keep only the broadcast handle; its value is read lazily on the executors
        this.broadcastColumnList = broadcastColumnList;
    }

    @Override
    public Iterator<Row> call(Row r) throws Exception {
        // illustrative body: keep the row only if its first column is a known label
        List<String> labels = broadcastColumnList.getValue();
        return labels.contains(r.getString(0))
                ? Collections.singletonList(r).iterator()
                : Collections.<Row>emptyIterator();
    }
}
Spark source: broadcast
org.apache.spark.broadcast
Spark's broadcast variables, used to broadcast immutable datasets to all nodes.
Purpose: a broadcast ships data from one node (the driver) to all other nodes (the executors).
abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Logging
private[spark] trait BroadcastFactory
private[spark] class BroadcastManager
private[spark] class TorrentBroadcastFactory extends BroadcastFactory
private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) extends Broadcast[T](id) with Logging with Serializable
Here, BroadcastManager.newBroadcast() is what actually creates a new broadcast variable.
When getValue is called, the lazy value defined at instance initialization is computed on first access by readBroadcastBlock().
Broadcast has two implementations:
HttpBroadcast: the driver serves the data over HTTP, so it suffers from a single point of failure and a network I/O bottleneck at the driver (removed in Spark 2.0).
TorrentBroadcast: BitTorrent-like; executors fetch blocks from the driver and from each other, spreading the load.
Spark broadcast usage example
package com.letour.main;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.*;

public class DimenBroad {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Invalid")
                .enableHiveSupport()
                .config("spark.sql.shuffle.partitions", 200)
                .getOrCreate();

        // Table to be matched: grade name and the corresponding grade code
        Dataset<Row> gradeDF = spark.sql(
                "select grade_nm, grade_cd from tmp_test.tmp_date_ed where grade_cd is not null");
        JavaRDD<Row> origGradeRDD = gradeDF.javaRDD();

        // Table to be broadcast
        List<Row> grades = spark.sql("select grade from tmp_test.tmp_data_td").collectAsList();

        // Collect the grades into a plain list for broadcasting
        List<String> gradeColumns = new ArrayList<>();
        for (Row b : grades) {
            String grade = b.getAs("grade");
            gradeColumns.add(grade);
        }

        // Build the broadcast variable
        JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
        final Broadcast<List<String>> dimGradeBroad = sparkContext.broadcast(gradeColumns);

        JavaRDD<Row> memRDD = origGradeRDD.flatMap(new FlatMapFunction<Row, Row>() {
            @Override
            public Iterator<Row> call(Row t) throws Exception {
                String gradeNm = t.getAs("grade_nm");
                String gradeCd = t.getAs("grade_cd");
                List<Row> result = new ArrayList<>();
                // keep only the rows whose grade name appears in the broadcast list
                if (dimGradeBroad.value().contains(gradeNm)) {
                    result.add(RowFactory.create(gradeNm, gradeCd));
                }
                return result.iterator();
            }
        }).repartition(50);

        // Convert back to a Dataset
        Dataset<Row> matchDF = spark.createDataFrame(memRDD, getSchema());

        // Write into the result table
        matchDF.write().mode(SaveMode.Overwrite).insertInto("tmp_test.result_date_ed");
        // matchDF.createOrReplaceTempView("tmp_chase_d");
        // spark.sql("insert overwrite table tmp_test.result_date_d partition(stat='2019') select * from tmp_chase_d");

        spark.stop();
    }

    // Schema needed to convert the RDD back to a Dataset
    private static StructType getSchema() {
        ArrayList<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("grade_nm", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("grade_cd", DataTypes.StringType, true));
        return DataTypes.createStructType(fields);
    }
}
Row in Spark
To create a new Row, use RowFactory.create() in Java or Row.apply() in Scala
A Row object can be constructed by providing field values
01.RowFactory
public class RowFactory {
public static Row create(Object ... values) {
return new GenericRow(values);
}
}
02.GenericRow
class GenericRow(protected[sql] val values: Array[Any]) extends Row{}
03.Row
trait Row extends Serializable {
def size: Int = length
def length: Int
def schema: StructType = null
def apply(i: Int): Any = get(i)
def get(i: Int): Any
getBoolean getByte getShort getInt getLong getFloat getDouble getString getDecimal
getDate getTimestamp
getSeq getList getMap getJavaMap getStruct getAs getAnyValAs
getValuesMap
/** Checks whether the value at position i is null. */
def isNullAt(i: Int): Boolean = get(i) == null
anyNull
fieldIndex
toSeq mkString
equals copy toString hashCode
}
BooleanType -> java.lang.Boolean
ByteType -> java.lang.Byte
ShortType -> java.lang.Short
IntegerType -> java.lang.Integer
FloatType -> java.lang.Float
DoubleType -> java.lang.Double
StringType -> String
DecimalType -> java.math.BigDecimal
DateType -> java.sql.Date
TimestampType -> java.sql.Timestamp
BinaryType -> byte array
ArrayType -> scala.collection.Seq (use getList for java.util.List)
MapType -> scala.collection.Map (use getJavaMap for java.util.Map)
StructType -> org.apache.spark.sql.Row
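A small sketch tying the accessors and the type mapping together; the field names and values are made up:
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

public class RowDemo {
    public static void main(String[] args) {
        // field values in positional order: StringType, IntegerType
        Row row = RowFactory.create("Alice", 30);

        String name = row.getString(0);   // StringType -> String
        int age = row.getInt(1);          // IntegerType -> java.lang.Integer
        String also = row.getAs(0);       // getAs infers the target type
        boolean anyNull = row.anyNull();  // true if any field is null

        System.out.println(name + " / " + age + " / " + also + " / " + anyNull);
        System.out.println(row.mkString("[", ", ", "]"));  // "[Alice, 30]"
    }
}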
References:
Variables Closures vs. Broadcast Variables
https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables