Spark Real-Time Computing: Integrating Kafka with Spark Structured Streaming

Real-time computation with Spark comes in two forms:
    one uses Spark Streaming, whose data abstraction is DStreams;
    the other uses Spark Structured Streaming, whose data abstraction is DataFrames.

Input and Output

Input sources: File, Socket, and Kafka
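As a quick sketch, each source is created through spark.readStream (a SparkSession named spark and the StructType/StringType imports from the code example further down are assumed; hosts, ports, paths, field names, and topic names are placeholders):

    // Socket source: lines of text from a TCP socket
    val socketDf = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // File source: a schema must be supplied up front
    val fileSchema = new StructType().add("name", StringType).add("age", StringType)
    val fileDf = spark.readStream
      .schema(fileSchema)
      .json("path/to/input/dir")

    // Kafka source: broker list and subscribed topic
    val kafkaDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("subscribe", "topic1")
      .load()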
I. Input sources:
 1. Reading Kafka data
        Both key and value arrive as byte arrays; the returned DataFrame also carries topic, partition, offset, and timestamp columns.
        When aggregations are used, the notion of a window applies, together with the watermark attribute.
    01. The sliding-window behavior is determined by three parameters: the timestamp column, the window duration, and the slide duration
      window
        timeColumn: the column carrying the event timestamp;
        windowDuration: the length of the window;
        slideDuration: the slide step;
    02. The return type is Column
        Passing that Column object to the DataFrame (typically inside groupBy) is what sets up the windowing; see the sketch below.
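A minimal sketch of a windowed aggregation with a watermark, assuming a parsed streaming DataFrame named events with hypothetical columns eventTime (timestamp) and word (string):

    import org.apache.spark.sql.functions.{col, window}

    val windowedCounts = events
      .withWatermark("eventTime", "10 minutes")              // drop state for data more than 10 minutes late
      .groupBy(
        window(col("eventTime"), "10 minutes", "5 minutes"), // windowDuration = 10 min, slideDuration = 5 min
        col("word"))
      .count()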
II. Output:
  Structured Streaming
   supports four output sinks:
        console, parquet (file), memory, and foreach
   supports three output modes:
        Complete mode: the entire updated result table is written out.
        Append mode: only rows newly appended to the result table are written out.
        Update mode: only rows updated in the result table are written out. (Not yet supported in the Spark version this was written against.)
    Other options
        "truncate" only affects console output: set it to "false" so wide columns are not automatically shortened.
        option("checkpointLocation", "path/to/HDFS/dir") sets the directory used for checkpointing; a writeStream sketch follows below.

Code example

package com.structureStreaming

import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import scala.collection.mutable
import com.util.CommonUtils

object kafkaInput extends CommonUtils {

  def getSparkSession: SparkSession = {
    SparkSession
      .builder()
      .config(new SparkConf().setMaster(getProperties("mr.master")))
      .appName(getProperties("mr.appName"))
      .getOrCreate()
  }

  def createOrReplaceTempViewname(spark: SparkSession, kafkaTopic: String, sourceName: String): Unit = {
    import spark.implicits._
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", getProperties("kafka.brokers"))
      .option("subscribe", kafkaTopic)
      .option("startingOffsets", "latest")
      .load()
    // Create the JSON schema: predefine the JSON structure so the Kafka message payload can be parsed into typed columns
    val schema = StructType(mutable.Seq(
      StructField("CT", StringType),
      StructField("LT", StringType),
      StructField("TA", StringType)
    ))
    if (schema != null) {
      // DataFrame.createOrReplaceTempView("tablename") creates or replaces a temporary view named tablename,
      // after which spark.sql can run SQL statements against that view
      val jsonDf = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) AS jsonFormatValue")
        .select(from_json($"jsonFormatValue", schema = schema).as("parsedData"))
      jsonDf.select("parsedData.*").createOrReplaceTempView(sourceName)
    } else {
      println("error, schema is null")
    }
  }

  def createNormal(spark: SparkSession, kafkaTopic: String, sourceName: String): Unit = {
    import spark.implicits._
    // .trigger(ProcessingTime("25 seconds"))
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", getProperties("kafka.brokers"))
      .option("subscribe", kafkaTopic)
      .option("startingOffsets", "latest")
      .load()
    val jsonDf = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
    jsonDf.createOrReplaceTempView(sourceName)
  }

  def sqlWriteStream(spark: SparkSession, sql: String): StreamingQuery = {
    val query = spark.sql(sql)
      .writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", "false")
      .option("numRows", 25)
      .start()
    query
  }

  def main(args: Array[String]): Unit = {
    // Establish the Spark connection
    val spark = getSparkSession

    // Read the Kafka topic and register its data as the temporary view dy_test
    val topicName = "data-people"
    println(topicName)
    import spark.implicits._

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", getProperties("kafka.brokers"))
      .option("subscribe", topicName)
      .load()
    // value is a byte array; the CAST converts it to a string
    val jsonDf = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
    jsonDf.createOrReplaceTempView("dy_test")
    //createNormal(spark, topicName, "dy_test")
    val querysql = "select * from dy_test"
    println(querysql)
    val querys = sqlWriteStream(spark, querysql)
    spark.streams.awaitAnyTermination()
    // Alternative: parse the JSON string into individual fields, register them as a temporary view, then run SQL on it
    //createOrReplaceTempViewname(spark, topicName, "dy_test")
    /* val sqls = Array("select * from dy_test", "select *, '2' as e from dy_test")
    val querys = mutable.ListBuffer[StreamingQuery]()
    for (sql <- sqls) {
      println(sql)
      querys += sqlWriteStream(spark, sql)
      spark.streams.awaitAnyTermination()
    } */
  }
}

Troubleshooting

Error one:
Caused by: java.lang.ClassNotFoundException: Failed to find data source: kafka.
Dependency:
   The Kafka data source is an external module and is not available to Spark applications by default.
   Two ways to resolve it:
    one is to package the dependency into the application jar (see the sbt sketch below);
    the other is to add it via --packages when submitting the job.
Reference:
https://stackoverflow.com/questions/48011941/why-does-formatkafka-fail-with-failed-to-find-data-source-kafka-with-ube
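For the first route, the external module can be declared in the build. The snippet below is a sketch for sbt; the artifact version and Scala suffix are assumptions that must match the cluster's Spark and Scala versions (the spark-2.1.1 / Scala 2.11 combination mentioned under error two is used here).

    // build.sbt sketch: version numbers are assumptions, align them with the cluster
    libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.1"

    // For the second route, the same coordinates go after --packages on spark-submit:
    //   --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1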

Error two: 2018/6/29
 Error message:
 Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.SparkSession
  This may happen because two Spark versions are installed on the same machine.
  Resolution: submit with the launcher that matches the Spark 2.x build, e.g.
  spark2-submit  --master yarn --deploy-mode cluster
  spark-submit   --master yarn-cluster
  where
  spark-submit  -> /Users/test/sparks/spark-1.6.2-bin-hadoop2.6/bin/spark-submit
  spark2-submit -> /Users/test/sparks/spark-2.1.1-bin-hadoop2.7/bin/spark-submit

References:

Structured Streaming integration with Kafka: https://blog.csdn.net/dongyunlon/article/details/79037366
