ExternalSorter:
1.可以对数据进行聚合。
2.使用分区计数器首先将key分组到各个分区中,然后使用自定义的比较器对每个分区中的key进行可选的排序;
3.将每个分区输出到单个文件的不同字节范围中,以便方便reduce的获取,文件数量减少。
AppendOnlyMap的实现分析
1 内部结构
用数组存储数据:
initialCapacity:初始容量为64
capacity:数组当前容量(kv对的个数,数组长度为该值的二倍),创建时,取初始容量的最高位,其余为0,然后左移1.例如64(1000000),capacity为10000000,128。之后扩容时都是乘以2.
mask:capacity-1,这样取值的主要原因,例如capacity为128(10000000),mask为127(01111111)。在计算插入位置时是pos=hash&mask,因为mask的后7为都为1,前面都是0,也就是取hash值的后7位数,127由是当前数组的最大索引,所以pos是在0-127之间的。这个和求余的计算方原理相似。
扩容:
1.新建一个二倍容量的数组。
2.老的数据拷贝在新的数组中,重新hash。
3.如果计算的位置相同,那么就找下一个位置(第一次加1,第二次加2,第三次加3),以此类推。
4.更新增长阈值,growThreshold = (LOAD_FACTOR * newCapacity).toInt。LOAD_FACTOR默认是0.7。意思就是元素个数大于当前容量的0.7时就进行扩容。
2 数据的插入
def update(key: K, value: V): Unit = {
assert(!destroyed, destructionMessage)
val k = key.asInstanceOf[AnyRef]
if (k.eq(null)) {
if (!haveNullValue) {
incrementSize()
}
nullValue = value
haveNullValue = true
return
}
var pos = rehash(key.hashCode) & mask
var i = 1
while (true) {
val curKey = data(2 * pos)
if (curKey.eq(null)) {
data(2 * pos) = k
data(2 * pos + 1) = value.asInstanceOf[AnyRef]
incrementSize() // Since we added a new key
return
} else if (k.eq(curKey) || k.equals(curKey)) {
data(2 * pos + 1) = value.asInstanceOf[AnyRef]
return
} else {
val delta = i
pos = (pos + delta) & mask
i += 1
}
}
}
将key对应的值更新到数组中。
1.AppendOnlyMap可以保存key为null的值,保存方式为用一个变量保存nullValue,haveNullValue表示是否存在null的key。如果存在则size+1,然后更新值。
2.根据key的hash值和mask掩码计算元素放入的位置pos。
3.如果2*pos位置为null,说明该位置没有值,那么2*pos放入key,2*pos+1位置放入value。
4.如果2*pos位置不为null,但是该位置的key和当前要插入的key相同,那么更新2*pos+1位置的值。
5.如果2*pos位置不为null,而且位置的key和当前要插入的key不同,那么pos = (pos + delta) & mask。例如,pos=2,发生冲突,那么pos+1,如果在冲突,pos+2,pos+3,......,位置就是,3,5,8,......。mask的作用其实就是相当于求余运行,因此,这里只有不超过当前最大长度,可以看作是加的操作。这里使用的是“线性探测法”解决hash冲突。
3 缓存聚合算法:
def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
assert(!destroyed, destructionMessage)
val k = key.asInstanceOf[AnyRef]
if (k.eq(null)) {
if (!haveNullValue) {
incrementSize()
}
nullValue = updateFunc(haveNullValue, nullValue)
haveNullValue = true
return nullValue
}
var pos = rehash(k.hashCode) & mask
var i = 1
while (true) {
val curKey = data(2 * pos)
if (curKey.eq(null)) {
val newValue = updateFunc(false, null.asInstanceOf[V])
data(2 * pos) = k
data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
incrementSize()
return newValue
} else if (k.eq(curKey) || k.equals(curKey)) {
val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
return newValue
} else {
val delta = i
pos = (pos + delta) & mask
i += 1
}
}
null.asInstanceOf[V] // Never reached but needed to keep compiler happy
}
- key待聚合的key
- updateFunc是聚合函数,有两个参数,第一参数false表示没有聚合过,true表示已经聚合过。V表示,聚合过的值。
- 如果是null,则对null进行聚合。
- 非null,计算位置pos,如果该位置为null说明k是首次聚合到数组中,所以updateFunc的参数是false和null,在该位置放入key和value。
- 如果pos位置不为null并且key值和当前key相同,那么进行聚合,updateFunc参数为true和当前位置的value。之后更新value为聚合后的值。
- 如果pos位置不为null并且key值和当前key不同,采用线性探测法向后移动。
4 排序
def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
destroyed = true
// Pack KV pairs into the front of the underlying array
var keyIndex, newIndex = 0
while (keyIndex < capacity) {
if (data(2 * keyIndex) != null) {
data(2 * newIndex) = data(2 * keyIndex)
data(2 * newIndex + 1) = data(2 * keyIndex + 1)
newIndex += 1
}
keyIndex += 1
}
assert(curSize == newIndex + (if (haveNullValue) 1 else 0))
new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)
new Iterator[(K, V)] {
var i = 0
var nullValueReady = haveNullValue
def hasNext: Boolean = (i < newIndex || nullValueReady)
def next(): (K, V) = {
if (nullValueReady) {
nullValueReady = false
(null.asInstanceOf[K], nullValue)
} else {
val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
i += 1
item
}
}
}
}
- 把数据向前移动,从0开始。
2.排序。
3.生成迭代器。
5 SizeTrackingAppendOnlyMap
private[spark] class SizeTrackingAppendOnlyMap[K, V]
extends AppendOnlyMap[K, V] with SizeTracker
{
override def update(key: K, value: V): Unit = {
super.update(key, value)
super.afterUpdate()
}
override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
val newValue = super.changeValue(key, updateFunc)
super.afterUpdate()
newValue
}
override protected def growTable(): Unit = {
super.growTable()
resetSamples()
}
}
主要是重写了AppendOnlyMap的update、changeValue、growTable。但是实际调用的还是AppendOnlyMap的方法,只是在结束之后加入了采样估计大小。
6 PartitionedAppendOnlyMap
private[spark] class PartitionedAppendOnlyMap[K, V]
extends SizeTrackingAppendOnlyMap[(Int, K), V] with WritablePartitionedPairCollection[K, V] {
def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
: Iterator[((Int, K), V)] = {
val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
destructiveSortedIterator(comparator)
}
def insert(partition: Int, key: K, value: V): Unit = {
update((partition, key), value)
}
}
其中insert方法,调用的是AppendOnlyMap的update方法,输入的key是分区id和记录的key组成的元组。
partitionedDestructiveSortedIterator方法调用的是AppendOnlyMap的destructiveSortedIterator方法,比较器comparator的生成逻辑是:
(1) 如果没有指定比较器,即keyComparator里面没有值,那么用WritablePartitionedPairCollection伴生对象中的partitionComparator(相当与java的静态方法)方法,该方法的逻辑是只按照分区id排序。
def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] {
override def compare(a: (Int, K), b: (Int, K)): Int = {
a._1 - b._1
}
}
(2)如果指定了比较器,那么把指定的比较器作为参数调用WritablePartitionedPairCollection伴生对象中的partitionKeyComparator生成比较器。该比较器的逻辑是先按照分区排序,相同分区的在按照key排序。
7 PartitionedPairBuffer
存储结构是一个数组,从0开始存储数据,key,value,key,value的顺序存储。
主要方法partitionedDestructiveSortedIterator:
对数组中的数据进行排序,比较器的逻辑和AppendOnlyMap相同。即没有指定比较器,则只按照分区排序,否则按照分区和key排序。
返回迭代器,迭代器返回的值是((partitionid,key),value)