Published: 2019-03-24 20:50:00
By ytwan
In Big Data .
tags: things
Spark:
Spark有 Driver 和 Executor
大多是 Driver 和 Executor 之间的图,都不涉及物理机器
Spark中的Driver即运行Application的main()函数,并且创建SparkContext
##Client:
在Client模式下,Driver进程会在当前客户端启动,客户端进程一直存在直到应用程序运行结束
01.先有driver再用AM,此时driver负责RDD生成、task生成和分发,向AM申请资源等 ,AM负责向RM - ResourceManager 申请资源,
其他的都由driver来完成
##Cluster:
Cient将用户程序提交到到spark集群中就与spark集群断开联系了,
此时client将不会发挥其他任何作用,仅仅负责提交
Executor
Job--Stages-Task
根据表的大小决定并发数(DAG节点分裂个数)
01考虑的方面:
集群粒度的调优,包括CPU与内存分配,数据分布,shuffle等
代码层面的优化
02.checklist
1. CPU,内存资源分配问题
2. 数据本地性
3. 数据shuffle相关
4. 数据格式,cache level,序列化,压缩等问题
5. 计算并行度,straggler排查
03.性能分析与优化方式
性能分析
分析工具与参数
较长的 job/ stage 内部
03.资源调优
核心参数
driver-memory num-executors executor-memory executor-cores
spark.default.parallelizm spark.storage.memoryFraction spark.shuffle.memoryFraction
--master yarn --deploy-mode client
--master yarn --deploy-mode cluster
在Spark中,有Yarn-Client和Yarn-Cluster两种模式可以运行在Yarn上,通常Yarn-Cluster适用于生产环境,而Yarn-Clientr更适用于交互,调试模式
一般yarn-client用于测试环境调试程序;yarn-cluster用于生产环境
yarn-cluster是基于yarn集群,yarn集群上有ResourceManager(RM)和NodeManager(NM)
yarn-client 执行时可以在本地看到所有的log,便于调试。所以一般用于测试环境
yarn-cluster 只能通过yarn application-logs application id命令来查看
内存参数
--driver-cores NUM Driver的核数,默认是1。在yarn集群模式下使用
--driver-memory MEM Driver内存,默认1G
--num-executors NUM 启动的executor数量。默认为2。在yarn下使用
--executor-memory MEM 每个executor的内存,默认是1G
--executor-cores NUM 每个executor的核数。在yarn或者standalone下使用
网络设置
spark.core.connection.ack.wait.timeout
spark.akka.timeout
spark.storage.blockManagerSlaveTimeoutMs
spark.shuffle.io.connectionTimeout
spark.rpc.askTimeout or spark.rpc.lookupTimeout
并行度:
200是Spark sql的默认并行度-- spark.sql.shuffle.partitions
开发调优篇
原则一:避免创建重复的RDD
原则二:尽可能复用同一个RDD
原则三:对多次使用的RDD进行持久化
Spark通过persist()或cache()方法可以标记一个要被持久化的RDD
cache()方法使用了默认的存储级别—StorageLevel.MEMORY_ONLY
选择一种最合适的持久化策略
可以利用不同的存储级别存储每一个被持久化的RDD。
例如,它允许我们持久化集合到磁盘上、
将集合作为序列化的Java对象持久化到内存中、
在节点间复制集合或者存储集合到Tachyon中
原则四:尽量避免使用shuffle类算子
去重-排序-聚合-联结-集合运算-重分区
distinct sortBy sortBykey
reduceByKey aggregateByKey CombineByKey
groupBy groupByKey join intersection substract substractByKey
coalesce repartition
原则五:使用map-side预聚合的shuffle操作 -- 使用reduceByKey/aggregateByKey替代groupByKey
因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。
而groupByKey算子是不会进行预聚合的
原则六:广播大变量
配置调优-资源调度
驱动器
执行器
存储级别
设置RDD持久化数据在Executor内存中能占的比例,默认是0.6
shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个 节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中shuffle环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作
产生的原因:
key的分布不均匀或者说某些key太集中。
数据倾斜:
某个key对应的数据量特别大的话,就会发生数据倾斜
聚合类的shuffle操作
join类的shuffle操作
倾斜:
时间上spark中一个stage的执行时间受限于最后那个执行完的task
空间上: 过多的数据在同一个task中执行,将会把executor撑爆,造成OOM,程序终止运行
解决:
01.Stage
当前这个stage各个task分配的数据量
数据倾斜发生在哪一个stage之后,
接着我们就需要根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部
DAGScheduler会把DAG划分相互依赖的多个stage,划分stage的依据就是RDD之间的宽窄依赖
02.并行度:
stage是由一组并行的task组成
conf.set("spark.default.parallelism","4")//设置默认的并行度
spark.files.maxPartitionBytes = 128 M(默认
dataFrame和sparkSql可以设置spark.sql.shuffle.partitions参数控制shuffle的并发度,默认为200。
RDD操作可以设置spark.default.parallelism控制并发度
该参数用于设置每个stage的默认task数量
shuffle并行度
局限性: 只是让每个task执行更少的不同的key。无法解决个别key特别大的情况造成的倾斜,
如果某些key的大小非常大,即使一个task单独执行它,也会受到数据倾斜的困扰。
03.联结导致的
002. 使用map join 代替reduce join
01.reduce-side-join 的缺陷在于会将key相同的数据发送到同一个partition中进行运算
02.spark.sql.autoBroadcastJoinThreshold
04.对数据进行预处理
05.两阶段聚合(局部聚合+全局聚合)
第一次是局部聚合,先给每个key都打上限定范围的一个随机数,比如10以内的随机数,
此时原先一样的key就变成不一样的了 --接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合
局部聚合结果-- 然后将各个key的前缀给去掉,再次进行全局聚合操作
导致倾斜的
如果导致倾斜的key特别多的话
如果导致倾斜的key就只有几个
Spark SQL
Hive On Spark和SparkSQL都是一个翻译层,
把一个SQL翻译成分布式可执行的Spark程序。而且大家的引擎都是spark
两种方式使用SparkSQL,Spark SQL is a Spark module for structured data processing.
一种是直接写sql语句,这个需要有元数据库支持,例如Hive等
另一种是通过Dataset/DataFrame编写Spark应用程序
using
either SQL or a familiar DataFrame API.
In the Scala API, DataFrame is simply a type alias of Dataset[Row].
While, in Java API, users need to use Dataset<Row> to represent a DataFrame.
Spark SQL的运行机制--
数据库的常见模型中:表一般分为两种:事实表和维度表
加载SessionCatalog步骤
Spark SQL对SQL
SQL的语法解析器、分析器以及优化器
Catalyst 执行优化器-- Spark SQL 语句最终都能通过它来解析、优化-- catalyst 模块
Spark SQL会先将SQL语句解析成一棵树,然后使用规则(Rule)对Tree进行绑定、优化等处理过程
TreeNode Rule & RuleExecutor
分析器Analyzer绑定逻辑计划
分析阶段,分析逻辑树,解决引用
逻辑优化阶段
物理计划阶段,Catalyst 会生成多个计划,并基于成本进行对比
输入SQL语句到生成Dataset --使用execute()执行可执行物理计划--调用SparkPlan的execute执行计算RDD
Spark SQL由Core、Catalyst、Hive、Hive-ThriftServer四部分构成:
Core:负责处理数据的输入/输出,从不同的数据源获取数据(如RDD、Parquet文件),然后将查询结果输出成DataFrame
Catalyst:负责处理查询语句的整个过程,包括解析、绑定、优化、物理计划等
Hive on Spark
Hive使用SparkSQL引擎:Hive on Spark
优化的方面
SparkSQL并行度是SparkSQL的一个调优点,默认的并行度是200,
需要根据实际情况进行设置
示例:
SparkSession.builder()
.config("spark.sql.shuffle.partitions",100)//设置并行度100
3.调用hiveql解析器来解析SQL
01. 经过complier编译器 -- 逻辑方案(logical plan)
Parse 解析(anlr解析其生成语法树AST
Semantic Analyzer 将抽象语法树AST转换此查询块(query block tree)
Logic Plan Generator 将查询块query block tree转换成逻辑查询计划(logic plan Generator)
02.优化(optimization):
重写查询计划(logical optimizer)-
Rule-Based Optimization(基于规则的优化方式RBO)/Cost Based(基于代价CBO)的优化方法
CBO: 收集表分区等统计信息来计算每种执行方式的Cost
RBO:
HBO: 基于历史
03.Physical Plan Gernerator 将逻辑查询计划转成物理计划(physical plan generator)
04.Physical Optimizer
05.execution engine 执行引擎--执行逻辑计算