spark sortShuffle

ExternalSorter:
1.可以对数据进行聚合。
2.使用分区计数器首先将key分组到各个分区中,然后使用自定义的比较器对每个分区中的key进行可选的排序;
3.将每个分区输出到单个文件的不同字节范围中,以便方便reduce的获取,文件数量减少。

image.png

AppendOnlyMap的实现分析

1 内部结构

用数组存储数据:


image.png

initialCapacity:初始容量为64
capacity:数组当前容量(kv对的个数,数组长度为该值的二倍),创建时,取初始容量的最高位,其余为0,然后左移1.例如64(1000000),capacity为10000000,128。之后扩容时都是乘以2.
mask:capacity-1,这样取值的主要原因,例如capacity为128(10000000),mask为127(01111111)。在计算插入位置时是pos=hash&mask,因为mask的后7为都为1,前面都是0,也就是取hash值的后7位数,127由是当前数组的最大索引,所以pos是在0-127之间的。这个和求余的计算方原理相似。
扩容:
1.新建一个二倍容量的数组。
2.老的数据拷贝在新的数组中,重新hash。
3.如果计算的位置相同,那么就找下一个位置(第一次加1,第二次加2,第三次加3),以此类推。
4.更新增长阈值,growThreshold = (LOAD_FACTOR * newCapacity).toInt。LOAD_FACTOR默认是0.7。意思就是元素个数大于当前容量的0.7时就进行扩容。

2 数据的插入

def update(key: K, value: V): Unit = {
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      if (!haveNullValue) {
        incrementSize()
      }
      nullValue = value
      haveNullValue = true
      return
    }
    var pos = rehash(key.hashCode) & mask
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      if (curKey.eq(null)) {
        data(2 * pos) = k
        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
        incrementSize()  // Since we added a new key
        return
      } else if (k.eq(curKey) || k.equals(curKey)) {
        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
        return
      } else {
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
  }

将key对应的值更新到数组中。
1.AppendOnlyMap可以保存key为null的值,保存方式为用一个变量保存nullValue,haveNullValue表示是否存在null的key。如果存在则size+1,然后更新值。
2.根据key的hash值和mask掩码计算元素放入的位置pos。
3.如果2*pos位置为null,说明该位置没有值,那么2*pos放入key,2*pos+1位置放入value。
4.如果2*pos位置不为null,但是该位置的key和当前要插入的key相同,那么更新2*pos+1位置的值。
5.如果2*pos位置不为null,而且位置的key和当前要插入的key不同,那么pos = (pos + delta) & mask。例如,pos=2,发生冲突,那么pos+1,如果在冲突,pos+2,pos+3,......,位置就是,3,5,8,......。mask的作用其实就是相当于求余运行,因此,这里只有不超过当前最大长度,可以看作是加的操作。这里使用的是“线性探测法”解决hash冲突。

3 缓存聚合算法:

def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      if (!haveNullValue) {
        incrementSize()
      }
      nullValue = updateFunc(haveNullValue, nullValue)
      haveNullValue = true
      return nullValue
    }
    var pos = rehash(k.hashCode) & mask
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      if (curKey.eq(null)) {
        val newValue = updateFunc(false, null.asInstanceOf[V])
        data(2 * pos) = k
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        incrementSize()
        return newValue
      } else if (k.eq(curKey) || k.equals(curKey)) {
        val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        return newValue
      } else {
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
    null.asInstanceOf[V] // Never reached but needed to keep compiler happy
  }

  1. key待聚合的key
  2. updateFunc是聚合函数,有两个参数,第一参数false表示没有聚合过,true表示已经聚合过。V表示,聚合过的值。
  3. 如果是null,则对null进行聚合。
  4. 非null,计算位置pos,如果该位置为null说明k是首次聚合到数组中,所以updateFunc的参数是false和null,在该位置放入key和value。
  5. 如果pos位置不为null并且key值和当前key相同,那么进行聚合,updateFunc参数为true和当前位置的value。之后更新value为聚合后的值。
  6. 如果pos位置不为null并且key值和当前key不同,采用线性探测法向后移动。

4 排序

def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
    destroyed = true
    // Pack KV pairs into the front of the underlying array
    var keyIndex, newIndex = 0
    while (keyIndex < capacity) {
      if (data(2 * keyIndex) != null) {
        data(2 * newIndex) = data(2 * keyIndex)
        data(2 * newIndex + 1) = data(2 * keyIndex + 1)
        newIndex += 1
      }
      keyIndex += 1
    }
    assert(curSize == newIndex + (if (haveNullValue) 1 else 0))

    new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)

    new Iterator[(K, V)] {
      var i = 0
      var nullValueReady = haveNullValue
      def hasNext: Boolean = (i < newIndex || nullValueReady)
      def next(): (K, V) = {
        if (nullValueReady) {
          nullValueReady = false
          (null.asInstanceOf[K], nullValue)
        } else {
          val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
          i += 1
          item
        }
      }
    }
  }
  1. 把数据向前移动,从0开始。
    2.排序。
    3.生成迭代器。

5 SizeTrackingAppendOnlyMap

private[spark] class SizeTrackingAppendOnlyMap[K, V]
  extends AppendOnlyMap[K, V] with SizeTracker
{
  override def update(key: K, value: V): Unit = {
    super.update(key, value)
    super.afterUpdate()
  }

  override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    val newValue = super.changeValue(key, updateFunc)
    super.afterUpdate()
    newValue
  }

  override protected def growTable(): Unit = {
    super.growTable()
    resetSamples()
  }
}

主要是重写了AppendOnlyMap的update、changeValue、growTable。但是实际调用的还是AppendOnlyMap的方法,只是在结束之后加入了采样估计大小。

6 PartitionedAppendOnlyMap

private[spark] class PartitionedAppendOnlyMap[K, V]
  extends SizeTrackingAppendOnlyMap[(Int, K), V] with WritablePartitionedPairCollection[K, V] {

  def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
    : Iterator[((Int, K), V)] = {
    val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
    destructiveSortedIterator(comparator)
  }

  def insert(partition: Int, key: K, value: V): Unit = {
    update((partition, key), value)
  }
}

其中insert方法,调用的是AppendOnlyMap的update方法,输入的key是分区id和记录的key组成的元组。
partitionedDestructiveSortedIterator方法调用的是AppendOnlyMap的destructiveSortedIterator方法,比较器comparator的生成逻辑是:
(1) 如果没有指定比较器,即keyComparator里面没有值,那么用WritablePartitionedPairCollection伴生对象中的partitionComparator(相当与java的静态方法)方法,该方法的逻辑是只按照分区id排序。

def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] {
    override def compare(a: (Int, K), b: (Int, K)): Int = {
      a._1 - b._1
    }
  }

(2)如果指定了比较器,那么把指定的比较器作为参数调用WritablePartitionedPairCollection伴生对象中的partitionKeyComparator生成比较器。该比较器的逻辑是先按照分区排序,相同分区的在按照key排序。

7 PartitionedPairBuffer

存储结构是一个数组,从0开始存储数据,key,value,key,value的顺序存储。

主要方法partitionedDestructiveSortedIterator:
对数组中的数据进行排序,比较器的逻辑和AppendOnlyMap相同。即没有指定比较器,则只按照分区排序,否则按照分区和key排序。
返回迭代器,迭代器返回的值是((partitionid,key),value)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352