Spark分区进一步探究

1.基本概念:

rdd.partitioner
rdd.partitions.length
分区的策略以及分区的数目
1.分区:是RDD内部并行计算的一个计算单元,RDD的数据集在逻辑上被划分为多个分片,每一个分片称为分区。并行任务task的个数,也是由 RDD分区partition的个数决定的
stage 的划分是根据宽依赖
2.分区器- partitioner:分区策略- Spark内部提供了HashPartitioner和RangePartitioner两种分区策,
Spark中分区器直接决定了RDD中分区 numPartitions的个数;也决定了RDD中每条数据经过Shuffle过程属于哪个分区;
也决定了Reduce的个数。RDD的分区方式主要包含两种(HashPartitioner和RangePartitioner),
这两种分区类型都是针对Key-Value类型的数据。如是非Key-Value类型,则分区为None。
Hash是以key作为分区条件的散列分布,分区数据不连续,极端情况也可能散列到少数几个分区上,导致数据不均等;
Range按Key的排序平衡分布,分区内数据连续,大小也相对均等。
分区器的使用:
  01.没有分区器
  02.自动为RDD生成分区器
  03.主动使用分区器
  04.自定义分区器:自定义分区策略

01.没有分区器

一般而言,对于初始读入的数据是不具有任何的数据分区方式的
对RDD进行查询,println(pairsRDD.partitioner) 返回None值,即没有使用分区器

02.自动使用分区器

没有显式指定分区器,则会调用org.apache.spark包下伴生对象Partitioner的defaultPartitioner静态方法返回的分区器作为默认分区器。
尝试利用父RDD的partitioner,如果父RDD没有partitioner,则会查看sparkConf中是否定义了 spark.default.parallelism 配置参数,
如果定义了就返回new HashPartitioner(rdd.context.defaultParallelism) 作为默认分区器,
如果没定义就返回new HashPartitioner(rdds.map(_.partitions.length).max)作为默认分区器

03.主动使用分区器

  提前对key根据某种规则来分配到相同的分区,减少后续操作的网络传输=RDD的分区方式主要包含两种(HashPartitioner和RangePartitioner),这两种分区类型都是针对Key-Value类型的数据。如是非Key-Value类型,则分区为None
使用场景:partitionBy
  rdd2.partitionBy(new org.apache.spark.HashPartitioner(10))
  partitionBy-partitionBy 只能用于 PairRdd
使用场景:reduceByKey
  执行reduceByKey任务,我们可以显式的指定分区器
使用场景
  repartitionAndSortWithinPartitions
 使用说明:
    HashPartitioner takes a single argument which defines number of partitions -- numPartitions 
 使用的分区器有两种HashPartitioner 和 RangePartitioner
HashPartitioner
* HashPartitioner 最大的弊端是:数据倾斜!!!极端情况下某(几)个分区拥有RDD的所有数据。 同时HashPartitioner存在 分区碰撞问题,即不同的值可能计算出来的分区是一样
实现原理: 对于给定的key,计算其hashCode,并除于分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的值就是这个key所属的分区ID。实现原理可以看出,其结果可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据
RangePartitioner
*RangePartitioner 最为重要的是尽量保证每个Partition中的数据量是均匀的。 同时分区与分区之间是有序的,也就是说一个分区中的元素肯定都是比另一个分区内的元素小或者大;
但是分区内的元素是不能保证顺序的即RangePartitioner主要是依赖的RDD的数据划分成不同的分区,且不同的分区之间是有序的。
应用:
// 元组的第一个元素是当前处理的分区的index,元组的第二个元素是当前处理的分区元素组成的Iterator
val filteRDD = shuffRDD.mapPartitionsWithIndex {
  (partid:Int, iter:Iterator[(String,String)]) => {
    iter.map(partValue => (partid ,partValue ))
  }
}//.filter(line => line._1 >= 2)
spark的mapPartitionsWithIndex中iterator尽量不要使用toList,
 原因iterator是流式的处理,处理完一条记录才会去读取下一条记录并且会丢弃已读的记录,无法重复使用;
 而iterator.toList会将所有的记录进行缓存,便于重复使用-toList相当于将迭代数据进行了缓存,容易导致OutOfMemory的异常
 scala中iterator只能执行一次迭代,如果需要多次执行同一个迭代体,建议调用iterator.toList等方法,将迭代体转化为集合
  val filteRDD = shuffRDD.mapPartitionsWithIndex {
  (partid:Int, iter:Iterator[(String,String)]) => {
    iter.toList.map(partValue => (partid ,partValue )).iterator
  }
 }.filter(line => line._1> 2)

方式
var rdd2 = rdd1.mapPartitionsWithIndex{
    (x,iter) => {
      var result = List[String]()
        var i = 0
        while(iter.hasNext){
          i += iter.next()
        }
        result.::(x + "|" + i).iterator

    }
  }
 注释:
  迭代器(Iterator)不是一个集合,是构建了一种访问集合的方法。
  当构建一个集合需要很大的开销时(比如把文件得所有行都读取到内存),迭代器就发挥了很好的作用。
迭代器有两个操作,next 和hasNext。next返回迭代器的下一个元素,hasNext用于检查是否还有下一个元素。
 注意: range分区的数目与 pairRDD中key值得去重数目,如果大于的话,分区数目不一定是range的给的数目。
 sortByKey底层使用的数据分区器就是RangePartitioner分区器

04.自定义分区

自定义Partitioner也是解决数据倾斜问题的手段之一,分区数在shuffle操作会变化
Partitioner必须满足两个前提,1、RDD是k-v形式,如RDD[(K, V)],2、有shuffle操作。
自定义分区需要继承Partitioner,并实现三个方法
    numPartitions
    getPartition  
    equals

2.分区相关操作:

01.配置中的并行度:

spark.sql.shuffle.partitions 和 spark.default.parallelism
spark.default.parallelism   在处理RDD时 
spark.sql.shuffle.partitions 则是对Spark SQL的设置

02.一些API比较

一些常用的函数和方法
getPartition  获取分区
foreachPartition
mapPartitions
mapPartitionsWithIndex
mapPartitionsWithIndexInternal
zipPartitions
001.Map和 MapPartitions比较    
 map操作通常不会导致内存的OOM异常。
 但是MapPartitions操作,对于大量数据来说,比如甚至一个partition,100万数据,一次传入一个function以后,那么可能一下子内存不够,但是又没有办法去腾出内存空间来,可能就OOM,内存溢出。
002.foreachPartition 与mapPartitions 以及 mapPartitionsWithIndex()的比较
  01.从算子的类型:
    foreachPartition 属于action运算操作,而 
    mapPartitions 是在Transformation中
  02.返回值上:
    foreachPartition 没有返回值
    mapPartitions    返回值类型迭代器: Iterator[U]  该元素的迭代器
  03.应用场景上:
     mapPartitions   可以获取返回值,继续在返回RDD上做其他的操作
     foreachPartition 因为没有返回值并且是action操作,
     所以使用foreachPartition一般都是在程序末尾比如说要落地数据到存储系统中如mysql,es,clcikhouse或者hbase中
003.mapPartitionsWithIndex和mapPartitions
  mapPartitionsWithIndex 和 mapPartitions 类似,只是其参数多了个分区索引号
  mapPartitionsWithIndex 既可以拿到分区的迭代器,又可以拿到分区编号
  mapPartitionsWithContext
分区的其他操作:
def zipWithUniqueId(): RDD[(T, Long)]
   该函数将RDD中元素和一个唯一ID组合成键/值对,该唯一ID生成算法如下:
   每个分区中第一个元素的唯一ID值为:该分区索引号,
   每个分区中第N个元素的唯一ID值为:(前一个元素的唯一ID值) + (该RDD总的分区数)
zipPartitions
  zipPartitions函数将多个RDD按照partition组合成为新的RDD,
  该函数需要组合的RDD具有相同的分区数,但对于每个分区内的元素数量没有要求。 
def zipWithIndex(): RDD[(T, Long)]
   该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。
 mapPartitionsWithIndex
 宽依赖(发生shuffle)和窄依赖(不发生shuffle)

03.重分区repartition 和coalesce

repartition对数据进行重新分区,默认是使用 HashPartitioner
      repartition 使用了一个随机生成的数来当做 Key,而不是使用原来的 Key
repartition只是coalesce接口中shuffle为true的简易实现
 repartition(numPartitions:Int):RDD[T]
 coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]
coalesce
 减少分区的时候,使用coalesce(),coalesce结合现有分区以避免完全洗牌。你将1000个分区转换成100个分区,这个过程不会发生shuffle,
 增加分区比如Rdd的分区是100,设置成1000,如果shuffle为false,并不会起作用。这时候就需要设置shuffle为true
repartition方法可用于增加或减少DataFrame中的分区数

过程

Map阶段  Shuffle阶段  Reduce阶段
  Shuffle阶段: 在Spark Shuffle阶段中,共分为Shuffle Write阶段和Shuffle Read阶段
     Shuffle Write阶段中,Shuffle 对Map Task数据进行处理产生中间数据,然后再根据数据分区方式对中间数据进行分区。
  最终Shffle Read阶段中的 Shuffle Read Task会拉取Shuffle Write阶段中产生的并已经分好区的中间数据

3.源码

01.分区器相关的源码位置

Spark\core\src\main\scala\org\apache\spark
trait Partition extends Serializable
abstract class Partitioner extends Serializable {
   def numPartitions: Int
   def getPartition(key: Any): Int
   }
object Partitioner
     defaultPartitioner
class HashPartitioner(partitions: Int) extends Partitioner
class RangePartitioner[K : Ordering : ClassTag, V]
private[spark] object RangePartitioner 
  Reservoir sampling(水塘抽样)

02.Repartition和coalesce源码位置

repartition()
位置:org.apache.spark.rdd
 def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
      coalesce(numPartitions, shuffle = true)
  }
 coalesce()
def coalesce(numPartitions: Int, shuffle: Boolean = false,
           partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
          (implicit ord: Ordering[T] = null)
  : RDD[T]
      new HashPartitioner(numPartitions)),
repartition对数据进行重新分区,默认是使用 HashPartitioner
repartition 使用了一个随机生成的数来当做 Key,而不是使用原来的 Key

03.DataSet 分区源码位置

.config("spark.default.parallelism", 20)
使用hash 分区
The resulting Dataset is hash partitioned.This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
org.apache.spark.sql.Dataset.scala  
 def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
    Repartition(numPartitions, shuffle = true, logicalPlan)
    }
def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
  RepartitionByExpression(
  partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
  }

 def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan {
   Repartition(numPartitions, shuffle = false, logicalPlan)
  }

org.apache.spark.sql.catalyst.plans.logical.basicLogicalOperators.scala
case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
extends RepartitionOperation {
   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  }
abstract class RepartitionOperation extends UnaryNode {
 def shuffle: Boolean
 def numPartitions: Int
 override def output: Seq[Attribute] = child.output
}

04.rangePartitioner

Google的面试题:如何在一个不确定数据规模的范围内进行排序? (rangePartitioner的水塘抽样算法)
总数不知道的情况下如何等概率地从中抽取一行
水塘抽样算法(Reservoir Sampling)
在不知道文件总行数的情况下,如何从文件中随机的抽取一行?
org.apache.spark.Partitioner.scala
  RangePartitioner.sketch
org.apache.spark.util.random.SamplingUtils.scala
  SamplingUtils.reservoirSampleAndCount
private var rangeBounds: Array[K] = {
   val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
   }
*源码中SampleSize 乘以3的目的是保证在数据量特别小的分区能够抽取到足够的数据,同时保证数据量特别大的分区能够二次采样。-------------------- 水塘抽样,它是一系列的随机算法
新建一个k大小的数组reservoir,如果元数据中数据少于k,直接返回原数据数组和原数据个数。
如果大于,则对接下来的元素进行比较,随机生成一个数i,
如果这个数小于k,则替换数组reservoir中第i个数,直至没有元素,则返回reservoir的copy数组。确定每个partition中要抽取的样本数量
依赖于数组变量rangeBounds: Array[K] ,数组存放的是排序好的(K类型进行的排序)一序列K值,根据这些值来确定RDD中每一个元素shuffle后的存放的partition

4.参考

Fast Filtering with Spark PartitionFilters and PushedFilters
https://www.mungingdata.com/apache-spark/partition-filters-pushed-filters
Spark分区方式详解  https://blog.csdn.net/dmy1115143060/article/details/82620715
https://stackoverflow.com/questions/41474175/spark-mappartitionswithindex-handling-empty-partitions
https://stackoverflow.com/questions/44501995/spark-mappartitionswithindex-identify-a-partition
Spark之深入理解RDD结构  https://blog.csdn.net/u011094454/article/details/78992293

blogroll

social