Spark程序入口以及不同层级的API

1.SparkContext 以及 SparkSession

SparkSession和SparkContext之间的关系,以便使用RDD和Dataset(row)
命名习惯:
使用Spark-Shell的时候,Spark会自动帮助我们建立好了
  一个名字为spark的SparkSesson和
  一个名字为sc   的SparkContext

01.基于SparkSession创建 SparkSession

 * Start a new session with isolated SQL configurations, temporary tables, registered functions are isolated, 
 * but sharing the underlying `SparkContext` and cached data.
spark.cloneSession()
spark.newSession()
spark.getDefaultSession()

02.基于 SparkSession 创建 SparkContext

 SparkSession可以通过建造者模式创建。
  如果SparkContext存在,那么SparkSession将会重用它;
  但是如果SparkContext不存在,则创建它
  val spark = SparkSession
   .builder()
   .appName("SQL-JSON")
   .master("local[4]")
  // .enableHiveSupport()
   .getOrCreate()
  // 它创建RDD或者是管理集群资源:
  val sc =   spark.sparkContext     
   sc.broadcast()

03.基于 SparkContext 创建 SparkSession

 显式地创建SparkConf, SparkContext 
  val conf = new SparkConf().setMaster("master").setAppName("appName1")
  val sc = new SparkContext(conf)

2 数据类型变换

01.比较:RDD 以及Dataset、 DataFrame 区别与联系

  DataFrame 比 RDD 多了一个表头信息(Schema)-SchemaRDD
      organized as columns with column name and types info
      dataframe APIs does not support compile-time error.
  Dataset 提供了强类型支持,也是在RDD的每行数据加了类型约束。
    相比DataFrame,Dataset提供了编译时类型检查 offers compile-time type safety
  RDD转换DataFrame后不可逆,但RDD转换Dataset是可逆的
  DataSet 和 DataFrame 都使用 Spark’s Catalyst optimizer

02.RDD和DataSet的相互转换

  I-A.RDD变为DataSet的方法-converting  RDDs into Datasets
      Inferring the Schema Using Reflection Programmatically Specifying the Schema
   01.automatically converting an RDD containing case classes to a DataFrame
     import spark.implicits._  (case class最多支持22个字段) 本方法的schema其实定义在了case class里面
         .toDF()
             toDF()
        使用toDF()方法将RDD转换为DataFrame
       本地seq + toDF创建DataFrame 如果直接用toDF()而不指定列名字,那么默认列名为"_1", "_2",
   02.步骤--
      (1)根据需求从源RDD转化成RDD of rows.
      (2)创建由符合在骤1中创建的RDD中的Rows结构的StructType表示的模式。
      (3)通过SparkSession提供的createDataFrame方法将模式应用于行的RDD.
 03.直接产生数据
     create DataFrames from an existing RDD, 
                       from a Hive table, 
                    or from Spark data sources.  based on the content of a JSON file

 II-B.DataSet变为RDD
      // spark不是某个包下面的东西,而是SparkSession.builder()对应的变量值
       import spark.implicits._
       val sourceDF:Dataset[Row] = spark.sql(querySQL)
       val sourceRDD  = sourceDF.rdd.cache()

03.不同类型RDD的转换

Scala中的隐式类型转换
显式转换

3.DataSet中视图

View的类型
判断了 viewType 的类型val viewType = if (global) GlobalTempView else LocalTempView
 LocalTempView的意思是会话范围的本地临时视图。
  *它的生命周期是创建它的会话的生命周期
  * 当会话终止时它将被自动删除。
  * 它没有绑定到任何数据库,即我们不能使用db1.view1引用一个本地临时视图
 GlobalTempView意味着跨会话全局临时视图。
  *它的生命周期是Spark应用程序的生命周期,
  * 即当应用程序终止时,它将自动删除。
  *它绑定到一个保存数据库“global_temp”的系统,我们必须使用限定名来引用全局临时视图,
   例如 SELECT * FROM global_temp.view1。
   即:GlobalTempView :跨会话 LocalTempView :不跨会话
    org.apache.spark.sql.execution.command.views.scala
       case class CreateViewCommand
  停止使用
     spark.catalog.dropTempView("tempViewName")
     spark.catalog.dropGlobalTempView("tempViewName")
   不同sessions 之间共享数据并保持活动直到application结束时,此功能非常有用
 应用:
  01.写入到Hive表中--全量表-更新-采用
   resultDF
  .coalesce(1)
  .write
  .mode("overwrite")
  .saveAsTable(TableName)
  02.分区表写入-采用视图-然后执行SQL的insert overwrite
   resultDF.createOrReplaceTempView("table1")
   val sqlText = "insert overwrite table  databasename.table2   partition(partitionfield = "2019-06-15") select * from table1"
   spark.sql(sqlText)

4.RDD中的的共享变量

共享变量-广播变量--import org.apache.spark.broadcast.Broadcast
  broadcast
   条件:只读比较大的值,广播出去的变量没法再修改
        scala中一切可序列化的对象都是可以进行广播
  每个 task 是一个线程,而且同在一个进程运行 tasks 都属于同一个 application
  因此每个节点(executor)上放一份就可以被所有 task 共享
  //要广播的数据
  val data = List(1, 2, 3, 4, 5, 6)
  //声明为广播变量
  val sc =   spark.sparkContext     
  val bdata = sc.broadcast(data)
  //其他的RDD使用广播变量
   // bdata.value  bdata.value.size

 广播变量:底层有两种广播机制: HttpBroadast  TorrentBroadcast
  HttpBroadast  
   HttpBroadcast 是通过传统的 http 协议和 httpServer 去传 data
   driver 单点网络瓶颈的问题
  TorrentBroadcast
    基本思想就是将 data 分块成 data blocks,然后假设有 executor fetch 到了一些 data blocks
       那么这个 executor 就可以被当作 data server 了,随着 fetch  executor 越来越多,
      有更多的 data server 加入,data 就很快能传播到全部的 executor 那里去
更新广播变量的基本思路:
  广播变量是在driver端初始化,excetors端获取这个变量,但是不能修改,所以,我们可以在driver端进行更新这个变量
    将老的广播变量删除(unpersist),
    然后重新广播一遍新的广播变量
共享变量--累加器
  累加器
  自定义累加器
  累加器-org.apache.spark.util.AccumulatorV2 }

5相关的一些内容

01.存储级别

MEMORY_ONLY : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中
  cache() 方法是使用默认存储级别的快捷设置方法,默认的存储级别是 StorageLevel.MEMORY_ONLY
  StorageLevel 对象给 persist() 方法
MEMORY_ONLY_SER : 将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。
  这种方式会比反序列化对象的方式节省很多空间,尤其是在使用 fast serializer时会节省更多的空间,
  但是在读取时会增加 CPU 的计算负担。
MEMORY_AND_DISK : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,将未缓存的数据分区存储到磁盘
DISK_ONLY

02.缓存机制:

SparkCore 的 CacheManager
 Spark 的存储级别的选择,核心问题是在内存使用率和 CPU 效率之间进行权衡-直接连续调用cache()或者persist()
   RDD 可以使用 persist() 方法或 cache() 方法进行持久化,
     persist 会把数据以序列化的形式在JVM的 堆空间
     cache()调用的persist(),是使用默认存储级别的快捷设置方法
   缓存数据较多的时候,采用LRU-最近最少使用的缓存策略将最老的分区从内存中移除
   unpersist
 Spark的 checkpoint 机制(保存在hdfs中)
    (checkpoint和cache都属于transformation 需要action才能执行
 SparkSQL 的  CacheManager
    Spark SQL使用内存式列式存储
    SparkSQL 的缓存管理最终依靠RDD的Storage.Level来缓存数据

03.故障恢复:

 01.RDD出现问题可以由它的依赖也就是Lineage信息可以用来故障恢复

 02.对RDD做Checkpoint处理,检查RDD是否被物化或计算,并将结果持久化到磁盘或HDFS。
  SparkContext      def setCheckpointDir(directory: String)
  StreamingContext  def checkpoint(directory: String)
  1. Checkpoint会把当前RDD保存到一个目录中。 
  2. Checkpoint的时候,会把所有依赖的父级rdd信息清除掉。 
  3. Checkpoint不会马上执行,要触发action操作的时候才会执行。 
  4. 因为 Checkpoint会清除父级RDD的信息,所以在Checkpoint应该先做persist(持久化)操作,否则就要重新计算一遍。 
  5. 一般来说,Lineage链较长、宽依赖的RDD需要采用检查点机制。 
区别与联系
  cache缓存数据由executor管理,当executor消失了,被cache的数据将被清除,RDD重新计算,
  而checkpoint将数据保存到磁盘或HDFS,job可以从checkpoint点继续计算
适用场景:
  01.一块是在spark core中对RDD做checkpoint,可以切断做checkpoint RDD的依赖关系,
    将RDD数据保存到可靠存储(如HDFS)以便数据恢复;
  02.另外一块是应用在spark streaming中,使用checkpoint用来保存DStreamGraph以及相关配置信息

DataFrame和SQL在底层的执行过程

逻辑计划和物理计划两个阶段
 逻辑计划阶段
  01.Parser 解析 SQL,生成AST,并将AST最终转化成 Unresolved Logical Plan
  02. Analyzer 结合 Catalog 信息生成 Resolved Logical Plan
  03.Optimizer根据预先定义好的规则对 Resolved Logical Plan 进行优化并生成 Optimized Logical Plan
 物理计划阶段
  04.Query Planner  Optimized Logical Plan 转换成多个Iterator[Physical Plan]
  05.CBO 根据 Cost Model 算出每个 Physical Plan 的代价并选取代价最小的 Physical Plan 作为最终的 Physical Plan
  06.Spark 对选取的物理算子树进行提交前的准备并以 DAG 的方法执行上述 Physical Plan
 转换过程在集群的Driver端进行,不涉及分布式环境
  07.在执行 DAG 的过程中,Adaptive Execution 根据运行时信息动态调整执行计划从而提高执行效率
  自适应执行引擎: 执行阶段-自动设置shuffle partition数,动态调整执行计划,动态处理数据倾斜等等。

源码:

SparkSession源码

 SparkSession.builder 来创建一个 SparkSession 的实例 spark, 并通过 stop 或者close() 函数来停止 
 org.apache.spark.sql.SparkSession
 object SparkSession {
 def newSession(): SparkSession = {new SparkSession(sparkContext, Some(sharedState), parentSessionState = None, extensions)}
 }
 SparkSession  override def close(): Unit = stop()

View源码

 org.apache.spark.sql.DataSet
 createOrReplaceTempView
     def createOrReplaceTempView(viewName: String): Unit = withPlan {
     createTempViewCommand(viewName, replace = true, global = false)
      }
 createTempView
   def createTempView(viewName: String): Unit = withPlan {
     createTempViewCommand(viewName, replace = false, global = false)
   }
    如果视图已经存在,则更新或者抛出异常
    replace if true,  and if the view already exists, updates it; 
            if false, and if the view already exists, throws analysis exception.
 createGlobalTempView
   def createGlobalTempView(viewName: String): Unit = withPlan {
     createTempViewCommand(viewName, replace = false, global = true)
   }

参考:

Spark踩坑记:共享变量 https://www.cnblogs.com/liuliliuli2017/p/6782687.html
spark中动态广播变量的使用 https://blog.csdn.net/xianpanjia4616/article/details/82914443
Spark SQL / Catalyst 内部原理 与 RBO http://www.jasongj.com/spark/rbo/

blogroll

social