package com.structureStreaming

import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import scala.collection.mutable
import com.util.CommonUtils

object kafkaInput extends CommonUtils {

  def getSparkSession = {
    SparkSession.builder()
      .config(new SparkConf().setMaster(getProperties("mr.master")))
      .appName(getProperties("mr.appName"))
      .getOrCreate()
  }

  def createOrReplaceTempViewname(spark: SparkSession, kafkaTopic: String, sourceName: String): Unit = {
    import spark.implicits._
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", getProperties("kafka.brokers"))
      .option("subscribe", kafkaTopic)
      .option("startingOffsets", "latest")
      .load()

    // Build the JSON schema: predefine the JSON layout so the Kafka payload
    // can be parsed into a DataSet with an explicit table structure.
    val schema = StructType(mutable.Seq(
      StructField("CT", StringType),
      StructField("LT", StringType),
      StructField("TA", StringType)))

    if (schema != null) {
      // createOrReplaceTempView("tablename") creates or replaces a temporary
      // view named tablename; spark.sql can then run SQL statements against it.
      val jsonDf = df
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as jsonFormatValue")
        .select(from_json($"jsonFormatValue", schema = schema).as("parsedData"))
      jsonDf.select("parsedData.*").createOrReplaceTempView(sourceName)
    } else {
      println("error, schema is null")
    }
  }

  def createNormal(spark: SparkSession, kafkaTopic: String, sourceName: String): Unit = {
    import spark.implicits._
    // .trigger(ProcessingTime("25 seconds"))
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", getProperties("kafka.brokers"))
      .option("subscribe", kafkaTopic)
      .option("startingOffsets", "latest")
      .load()
    val jsonDf = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
    jsonDf.createOrReplaceTempView(sourceName)
  }

  def sqlWriteStream(spark: SparkSession, sql: String): StreamingQuery = {
    val query = spark.sql(sql)
      .writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", "false")
      .option("numRows", 25)
      .start()
    query
  }

  def main(args: Array[String]): Unit = {
    // Set up the session.
    val spark = getSparkSession
    // Read the Kafka topic and register the data as the temporary table dy_test.
    val topicName = "data-people"
    println(topicName)
    import spark.implicits._
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", getProperties("kafka.brokers"))
      .option("subscribe", topicName)
      .load()
    // value is a byte array; the CAST converts it to a string.
    val jsonDf = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
    jsonDf.createOrReplaceTempView("dy_test")
    // createNormal(spark, topicName, "dy_test")

    val querysql = "select * from dy_test"
    println(querysql)
    val querys = sqlWriteStream(spark, querysql)
    spark.streams.awaitAnyTermination()

    // Parse individual fields out of the string, register the result as a
    // temporary table, then run SQL on that table:
    // createOrReplaceTempViewname(spark, topicName, "dy_test")
    /*
    val sqls = Array("select * from dyl_test", "select *, '2' as e from dy_test")
    val querys = mutable.ListBuffer[StreamingQuery]()
    for (sql <- sqls) {
      println(sql)
      querys += sqlWriteStream(spark, sql)
      spark.streams.awaitAnyTermination()
    }
    */
  }
}
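The from_json parsing used in createOrReplaceTempViewname can be exercised locally on a static DataFrame before Kafka is wired in. A minimal sketch, assuming CT/LT/TA are plain string fields; the object name and the JSON payload below are hypothetical illustrations, not part of the job above:

package com.structureStreaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object fromJsonCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("fromJsonCheck").getOrCreate()
    import spark.implicits._

    // Same schema as in kafkaInput.createOrReplaceTempViewname above.
    val schema = StructType(Seq(
      StructField("CT", StringType),
      StructField("LT", StringType),
      StructField("TA", StringType)))

    // Hypothetical payload; a real Kafka value would carry the same JSON shape.
    val df = Seq("""{"CT":"a","LT":"b","TA":"c"}""").toDF("jsonFormatValue")
    df.select(from_json($"jsonFormatValue", schema).as("parsedData"))
      .select("parsedData.*")
      .show(false)
  }
}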
Troubleshooting common errors
Error 1:
Caused by: java.lang.ClassNotFoundException: Failed to find data source: kafka.
Cause:
The kafka data source is an external module and is not available to Spark applications by default.
There are two ways to fix this:
One is to bundle the dependency into the application jar.
The other is to add it with --packages on the submit command. (See the example coordinates below.)
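For the Spark 2.1.1 / Scala 2.11 installation referenced further down, the coordinates would look like this; adjust versions to match your cluster:

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.1"   (sbt, bundled into the jar)

spark2-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1 --master yarn --deploy-mode cluster <app.jar>   (resolved at submit time; <app.jar> is a placeholder)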
Reference:
https://stackoverflow.com/questions/48011941/why-does-formatkafka-fail-with-failed-to-find-data-source-kafka-with-ube
Error 2 (2018/6/29):
Error message:
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.SparkSession
This can happen when two Spark versions are installed on the same machine and the job is submitted with the wrong launcher (SparkSession only exists in Spark 2.x).
Fix: submit with the Spark 2.x launcher, not the default Spark 1.x one:
spark2-submit --master yarn --deploy-mode cluster   (Spark 2.x, works)
spark-submit --master yarn-cluster   (Spark 1.x, fails with the error above)
where the two launchers resolve to different installations:
spark-submit -> /Users/test/sparks/spark-1.6.2-bin-hadoop2.6/bin/spark-submit
spark2-submit -> /Users/test/sparks/spark-2.1.1-bin-hadoop2.7/bin/spark-submit
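A quick way to confirm which installation a launcher resolves to before submitting (standard shell and spark-submit usage, shown as an illustration):

ls -l $(which spark-submit)
spark2-submit --version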