Spark高效数据结构

Spark Streaming在状态管理时应用了一些高效的数据结构,本文我们就来看下这些数据结构的实现。

  • BitSet
  • OpenHashSet
  • OpenHashMap

BitSet

org.apache.spark.util.collection.BitSet是一个简单,大小不可变的bit set实现。

基本思想

我们先来看一个例子:假设我们要对3,5存储到一个byte(8个字节)当中。
我们先开辟一个Byte空间,并将这些bit位全都置为0。如下图:

我们需要将3存储到这个bit数组当中,那么我们只要将下标为3的空间置为1。
记整个数为s,欲存储的数3为i1。那么我们就可以通过下列代码来实现。

var s = 0
val i1 = 3
s |= 1 << i1
println(s"${s}的二进制表达式: [ ${Integer.toBinaryString(s)} ]")

输出结果:

8的二进制表达式: [ 1000 ]

同样,我们要存储5,只要将下标5的空间置为1.

代码实现:

val i2 = 5
s |= 1 << i2
println(s"${s}的二进制表达式: [ ${Integer.toBinaryString(s)} ]")

输出结果:

40的二进制表达式: [ 101000 ]

读取过程:

我们可以通过判断某index位的bit是否为0来判定该空间是否有值。
例如,我们可以判断index = 3位是否为0来判断3是否存储在该byte值当中。

println(s & (1 << 3))
8
println(s & (1 << 4))
0

BitSet实现

有上诉的知识基础只是之后我们再来看下org.apache.spark.util.collection.BitSet的具体实现。

BitSet利用一个Long型words数组来存储数值

我们先来看下他的构造函数:

private val words = new Array[Long](bit2words(numBits))
// Long型个数
private val numWords = words.length
// 计算存储numBits数值需要多少个Long型
private def bit2words(numBits: Int) = ((numBits - 1) >> 6) + 1

我们可以看到:

  • BitSet通过Long来存储正真的数据,原因是Java中,Long类型为8个字节,那么一个Long型可以存储(8 * 8 = 64)个数值。
  • bit2words方法是计算存储numBits个数值最少需要多少个Long型。
  • 所能容纳个数:((numBits - 1) / 64) + 1) * 64

再来看下其他一些重要方法的实现:

set方法

def set(index: Int) {
  val bitmask = 1L << (index & 0x3f)
  words(index >> 6) |= bitmask     
}
  • index & 0x3f等于保留index二进制的前六位。因为在Scala中,Long型为8个字节,一个字节8个bit,共64bit,而从上述知识我们可以知道,我们是利用bit下标来存储数值的,所以我们需要对index保留前六位的操作。
  • index >> 6等于index / 64。这是为了算出index在words数组中的位置。
  • words(index >> 6) |= bitmask是将index存储到words(index >> 6)所在的Long型当中。

get方法:

def get(index: Int): Boolean = {
  val bitmask = 1L << (index & 0x3f)  
  (words(index >> 6) & bitmask) != 0 
}

从上面我们就可以看到,我们可以这样理解,words是个大的二维数组,整个set过程就是将index置为1的过程,而get过程就是判断index是否为0。

iterator

  def iterator: Iterator[Int] = new Iterator[Int] {
    var ind = nextSetBit(0)
    override def hasNext: Boolean = ind >= 0
    override def next(): Int = {
      val tmp = ind
      ind = nextSetBit(ind + 1)
      tmp
    }
  }
  def nextSetBit(fromIndex: Int): Int = {
    var wordIndex = fromIndex >> 6
    if (wordIndex >= numWords) {
      return -1
    }

    val subIndex = fromIndex & 0x3f
    var word = words(wordIndex) >> subIndex
    // 如果当前word的剩余位还有值,说明下个bit在该word当中。
    if (word != 0) {
      // 下一个bit所在的下标 = wordIndex * 64 + 当前word已经移动的下标个数 + word剩余位第一个非0位下标
      return (wordIndex << 6) + subIndex + java.lang.Long.numberOfTrailingZeros(word)
    }

    // 下个bit位不和fromIndex在同一个word中的情况
    wordIndex += 1
    while (wordIndex < numWords) {
      word = words(wordIndex)
      if (word != 0) {
        // 下一个bit所在的下标 = wordIndex * 64 + word第一个非0位下标
        return (wordIndex << 6) + java.lang.Long.numberOfTrailingZeros(word)
      }
      wordIndex += 1
    }

    -1
  }

优点

  • 运算效率高。 get和set过程都采用移位运算,提高运算效率。
  • 占有的存储空间少。 一个Long型可以存储64个数值。那么如果N = 10,000,000,需要N/8 = 10,000,000/8 Byte 约等 1.25M

完整代码请访问GitHub

OpenHashSet

  • A simple, fast hash set optimized for non-null insertion-only use case, where keys are never removed.

看完BitSet实现之后,我们再来看下org.apache.spark.util.collection.OpenHashSet的实现。

  protected val hasher: Hasher[T] = 
  protected var _capacity = nextPowerOf2(initialCapacity)
  protected var _mask = _capacity - 1
  protected var _size = 0
  protected var _growThreshold = (loadFactor * _capacity).toInt
  protected var _bitset = new BitSet(_capacity)
  protected var _data: Array[T] = _
  _data = new Array[T](_capacity)
  • OpenHashSet利用一个var _bitset = new BitSet(_capacity)来存储hash值,var _data = new Array[T] (_capacity)来存储真正的数。
def add(k: T) {
  // 将元素添加到set当中,并且不触发扩容
  addWithoutResize(k)
  // 检查是否需要扩容,如果是则进行扩容
  rehashIfNeeded(k, grow, move)
}

rehash的判断条件是当前set的size是否大于阀值_growThreshold

我们再来具体看下addWithoutResize的实现。

def addWithoutResize(k: T): Int = {
  // 获取元素k的hash值。采用对k的hashCode做murmur3_32,为了获取一个k的随机分布,减少hash冲突。
  var pos = hashcode(hasher.hash(k)) & _mask
  var delta = 1
  while (true) {
    if (!_bitset.get(pos)) {
      // 当前bitset没值
      _data(pos) = k
      _bitset.set(pos)
      _size += 1
      return pos | NONEXISTENCE_MASK
    } else if (_data(pos) == k) {
      // 当前bitset位有值,而且元素与k相等,则直接返回pos
      return pos
    } else {
      // 发生hash冲突,则每次递增1进行hash冲突解决。& _mask保证0 < pos + delta < _capacity
      pos = (pos + delta) & _mask
      delta += 1
    }
  }
  throw new RuntimeException("Should never reach here.")
}

rehash

  private def rehash(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
    // 扩容为原容量两倍
    val newCapacity = _capacity * 2
    allocateFunc(newCapacity)
    val newBitset = new BitSet(newCapacity)
    val newData = new Array[T](newCapacity)
    val newMask = newCapacity - 1

    var oldPos = 0
    while (oldPos < capacity) {
      if (_bitset.get(oldPos)) {
        val key = _data(oldPos)
        var newPos = hashcode(hasher.hash(key)) & newMask
        var i = 1
        var keepGoing = true
        while (keepGoing) {
          if (!newBitset.get(newPos)) {
            // 将key插入新的位置
            newData(newPos) = key
            newBitset.set(newPos)
            moveFunc(oldPos, newPos)
            keepGoing = false
          } else { // 无需检查key的相等性,无重复数据。
            // 哈希冲突解决
            val delta = i
            newPos = (newPos + delta) & newMask
            i += 1
          }
        }
      }
      oldPos += 1
    }
  • rehash的是将原来的容量扩大为两倍,并将bitset中的hash值和_data中的真正的元素都插入到新的newBitset和newData当中。
  • 提供了两个回调函数allocateFunc(newSize: Int)moveFunc(oldPos: Int, newPos: Int)。OpenHashSet的默认回调函数实现为空,即什么都不操作。

NOET: 完整代码访问GitHub

OpenHashMap

protected var _keySet = new OpenHashSet[K](initialCapacity)
private var _values: Array[V] = _
// 临时数组,用于扩容时_values的拷贝
private var _oldValues: Array[V] = null
  • var _keySet: OpenHashSet[K]存储key,var _value: Array[V]存储value

updata

  def update(k: K, v: V) {
    if (k == null) {
      haveNullValue = true
      nullValue = v
    } else {
      val pos = _keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
      _values(pos) = v
      _keySet.rehashIfNeeded(k, grow, move)
      _oldValues = null
    }
  }
  protected var grow = (newCapacity: Int) => {
    _oldValues = _values
    _values = new Array[V](newCapacity)
  }

  protected var move = (oldPos: Int, newPos: Int) => {
    _values(newPos) = _oldValues(oldPos)
  }

NOET: 完整代码访问GitHub

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,948评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,371评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,490评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,521评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,627评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,842评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,997评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,741评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,203评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,534评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,673评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,339评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,955评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,770评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,000评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,394评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,562评论 2 349

推荐阅读更多精彩内容