Published: 2019-06-08 18:21:00
By ytwan
In Big Data .
tags: things
1.SparkContext 以及 SparkSession
SparkSession和SparkContext之间的关系,以便使用RDD和Dataset(row)
命名习惯:
使用Spark-Shell的时候,Spark会自动帮助我们建立好了
一个名字为spark的SparkSesson和
一个名字为sc 的SparkContext
01.基于SparkSession创建 SparkSession
* Start a new session with isolated SQL configurations, temporary tables, registered functions are isolated,
* but sharing the underlying `SparkContext` and cached data.
spark.cloneSession()
spark.newSession()
spark.getDefaultSession()
02.基于 SparkSession 创建 SparkContext
SparkSession可以通过建造者模式创建。
如果SparkContext存在,那么SparkSession将会重用它;
但是如果SparkContext不存在,则创建它
val spark = SparkSession
.builder()
.appName("SQL-JSON")
.master("local[4]")
// .enableHiveSupport()
.getOrCreate()
// 它创建RDD或者是管理集群资源:
val sc = spark.sparkContext
sc.broadcast()
03.基于 SparkContext 创建 SparkSession
显式地创建SparkConf, SparkContext
val conf = new SparkConf().setMaster("master").setAppName("appName1")
val sc = new SparkContext(conf)
2 数据类型变换
01.比较:RDD 以及Dataset、 DataFrame 区别与联系
DataFrame 比 RDD 多了一个表头信息(Schema)-SchemaRDD
organized as columns with column name and types info
dataframe APIs does not support compile-time error.
Dataset 提供了强类型支持,也是在RDD的每行数据加了类型约束。
相比DataFrame,Dataset提供了编译时类型检查 offers compile-time type safety
RDD转换DataFrame后不可逆,但RDD转换Dataset是可逆的
DataSet 和 DataFrame 都使用 Spark’s Catalyst optimizer
02.RDD和DataSet的相互转换
I - A . 把 RDD 变为 DataSet 的方法 - converting RDDs into Datasets
Inferring the Schema Using Reflection 和 Programmatically Specifying the Schema
01. automatically converting an RDD containing case classes to a DataFrame
import spark.implicits._ ( case class 最多支持 22 个字段 ) 本方法的 schema 其实定义在了 case class 里面
. toDF ()
toDF ()
使用 toDF () 方法将 RDD 转换为 DataFrame
本地 seq + toDF 创建 DataFrame 如果直接用 toDF () 而不指定列名字,那么默认列名为 "_1" , "_2" ,
02. 步骤 --
( 1 ) 根据需求从源 RDD 转化成 RDD of rows .
( 2 ) 创建由符合在骤 1 中创建的 RDD 中的 Rows 结构的 StructType 表示的模式。
( 3 ) 通过 SparkSession 提供的 createDataFrame 方法将模式应用于行的 RDD .
03. 直接产生数据
create DataFrames from an existing RDD ,
from a Hive table ,
or from Spark data sources. based on the content of a JSON file
II - B . 把 DataSet 变为 RDD
// spark 不是某个包下面的东西,而是 SparkSession . builder () 对应的变量值
import spark.implicits._
val sourceDF : Dataset [ Row ] = spark . sql ( querySQL )
val sourceRDD = sourceDF . rdd . cache ()
03.不同类型RDD的转换
3.DataSet中视图
View的类型
判断了 viewType 的类型val viewType = if (global) GlobalTempView else LocalTempView
LocalTempView的意思是会话范围的本地临时视图。
*它的生命周期是创建它的会话的生命周期
* 当会话终止时它将被自动删除。
* 它没有绑定到任何数据库,即我们不能使用db1.view1引用一个本地临时视图
GlobalTempView意味着跨会话全局临时视图。
*它的生命周期是Spark应用程序的生命周期,
* 即当应用程序终止时,它将自动删除。
*它绑定到一个保存数据库“global_temp”的系统,我们必须使用限定名来引用全局临时视图,
例如 SELECT * FROM global_temp.view1。
即:GlobalTempView :跨会话 LocalTempView :不跨会话
org.apache.spark.sql.execution.command.views.scala
case class CreateViewCommand
停止使用
spark.catalog.dropTempView("tempViewName")
spark.catalog.dropGlobalTempView("tempViewName")
不同sessions 之间共享数据并保持活动直到application结束时,此功能非常有用
应用:
01.写入到Hive表中--全量表-更新-采用
resultDF
.coalesce(1)
.write
.mode("overwrite")
.saveAsTable(TableName)
02.分区表写入-采用视图-然后执行SQL的insert overwrite
resultDF.createOrReplaceTempView("table1")
val sqlText = "insert overwrite table databasename.table2 partition(partitionfield = "2019-06-15") select * from table1"
spark.sql(sqlText)
4.RDD中的的共享变量
共享变量 - 广播变量 -- import org.apache.spark.broadcast.Broadcast
broadcast
条件:只读比较大的值,广播出去的变量没法再修改
scala 中一切可序列化的对象都是可以进行广播
每个 task 是一个线程,而且同在一个进程运行 tasks 都属于同一个 application 。
因此每个节点( executor )上放一份就可以被所有 task 共享
// 要广播的数据
val data = List ( 1 , 2 , 3 , 4 , 5 , 6 )
// 声明为广播变量
val sc = spark . sparkContext
val bdata = sc . broadcast ( data )
// 其他的 RDD 使用广播变量
// bdata . value bdata . value . size
广播变量:底层有两种广播机制: HttpBroadast 和 TorrentBroadcast
HttpBroadast
HttpBroadcast 是通过传统的 http 协议和 httpServer 去传 data
driver 单点网络瓶颈的问题
TorrentBroadcast
基本思想就是将 data 分块成 data blocks ,然后假设有 executor fetch 到了一些 data blocks ,
那么这个 executor 就可以被当作 data server 了,随着 fetch 的 executor 越来越多,
有更多的 data server 加入, data 就很快能传播到全部的 executor 那里去
更新广播变量的基本思路:
广播变量是在 driver 端初始化 , 在 excetors 端获取这个变量 , 但是不能修改 , 所以 , 我们可以在 driver 端进行更新这个变量
将老的广播变量删除( unpersist ),
然后重新广播一遍新的广播变量
共享变量 -- 累加器
累加器
自定义累加器
累加器 - org . apache . spark . util . AccumulatorV2 }
5相关的一些内容
01.存储级别
MEMORY_ONLY : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中
cache() 方法是使用默认存储级别的快捷设置方法,默认的存储级别是 StorageLevel.MEMORY_ONLY
StorageLevel 对象给 persist() 方法
MEMORY_ONLY_SER : 将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。
这种方式会比反序列化对象的方式节省很多空间,尤其是在使用 fast serializer时会节省更多的空间,
但是在读取时会增加 CPU 的计算负担。
MEMORY_AND_DISK : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,将未缓存的数据分区存储到磁盘
DISK_ONLY
02.缓存机制:
SparkCore 的 CacheManager
Spark 的存储级别的选择,核心问题是在内存使用率和 CPU 效率之间进行权衡-直接连续调用cache()或者persist()
RDD 可以使用 persist() 方法或 cache() 方法进行持久化,
persist 会把数据以序列化的形式在JVM的 堆空间
cache()调用的persist(),是使用默认存储级别的快捷设置方法
缓存数据较多的时候,采用LRU-最近最少使用的缓存策略将最老的分区从内存中移除
unpersist
Spark的 checkpoint 机制(保存在hdfs中)
(checkpoint和cache都属于transformation 需要action才能执行
SparkSQL 的 CacheManager
Spark SQL使用内存式列式存储
SparkSQL 的缓存管理最终依靠RDD的Storage.Level来缓存数据
03.故障恢复:
01.RDD出现问题可以由它的依赖也就是Lineage信息可以用来故障恢复
02.对RDD做Checkpoint处理,检查RDD是否被物化或计算,并将结果持久化到磁盘或HDFS。
SparkContext def setCheckpointDir(directory: String)
StreamingContext def checkpoint(directory: String)
1. Checkpoint会把当前RDD保存到一个目录中。
2. Checkpoint的时候,会把所有依赖的父级rdd信息清除掉。
3. Checkpoint不会马上执行,要触发action操作的时候才会执行。
4. 因为 Checkpoint会清除父级RDD的信息,所以在Checkpoint应该先做persist(持久化)操作,否则就要重新计算一遍。
5. 一般来说,Lineage链较长、宽依赖的RDD需要采用检查点机制。
区别与联系
cache缓存数据由executor管理,当executor消失了,被cache的数据将被清除,RDD重新计算,
而checkpoint将数据保存到磁盘或HDFS,job可以从checkpoint点继续计算
适用场景:
01.一块是在spark core中对RDD做checkpoint,可以切断做checkpoint RDD的依赖关系,
将RDD数据保存到可靠存储(如HDFS)以便数据恢复;
02.另外一块是应用在spark streaming中,使用checkpoint用来保存DStreamGraph以及相关配置信息
DataFrame和SQL在底层的执行过程
逻辑计划和物理计划两个阶段
逻辑计划阶段
01. Parser 解析 SQL ,生成 AST ,并将 AST 最终转化成 Unresolved Logical Plan
02. 由 Analyzer 结合 Catalog 信息生成 Resolved Logical Plan
03. Optimizer 根据预先定义好的规则对 Resolved Logical Plan 进行优化并生成 Optimized Logical Plan
物理计划阶段
04. Query Planner 将 Optimized Logical Plan 转换成多个 Iterator [ Physical Plan ]
05. CBO 根据 Cost Model 算出每个 Physical Plan 的代价并选取代价最小的 Physical Plan 作为最终的 Physical Plan
06. Spark 对选取的物理算子树进行提交前的准备并以 DAG 的方法执行上述 Physical Plan
转换过程在集群的 Driver 端进行,不涉及分布式环境
07. 在执行 DAG 的过程中, Adaptive Execution 根据运行时信息动态调整执行计划从而提高执行效率
自适应执行引擎: 执行阶段 - 自动设置 shuffle partition 数,动态调整执行计划,动态处理数据倾斜等等。
源码:
SparkSession源码
SparkSession.builder 来创建一个 SparkSession 的实例 spark, 并通过 stop 或者close() 函数来停止
org.apache.spark.sql.SparkSession
object SparkSession {
def newSession(): SparkSession = {new SparkSession(sparkContext, Some(sharedState), parentSessionState = None, extensions)}
}
SparkSession override def close(): Unit = stop()
View源码
org.apache.spark.sql.DataSet
createOrReplaceTempView
def createOrReplaceTempView(viewName: String): Unit = withPlan {
createTempViewCommand(viewName, replace = true, global = false)
}
createTempView
def createTempView(viewName: String): Unit = withPlan {
createTempViewCommand(viewName, replace = false, global = false)
}
如果视图已经存在,则更新或者抛出异常
replace if true, and if the view already exists, updates it;
if false, and if the view already exists, throws analysis exception.
createGlobalTempView
def createGlobalTempView(viewName: String): Unit = withPlan {
createTempViewCommand(viewName, replace = false, global = true)
}
参考:
Spark踩坑记:共享变量 https://www.cnblogs.com/liuliliuli2017/p/6782687.html
spark中动态广播变量的使用 https://blog.csdn.net/xianpanjia4616/article/details/82914443
Spark SQL / Catalyst 内部原理 与 RBO http://www.jasongj.com/spark/rbo/