深度解密Go语言之map(1)

map 的底层如何实现

首先声明我用的 Go 版本:
go version go1.9.2 darwin/amd64
前面说了 map 实现的几种方案,Go 语言采用的是哈希查找表,并且使用链表解决哈希冲突。

接下来我们要探索 map 的核心原理,一窥它的内部结构。

1.map 内存模型

在源码中,表示 map 的结构体是 hmap,它是 hashmap 的“缩写”:

// A header for a Go map.
type hmap struct{
    // 元素个数,调用 len(map) 时,直接返回此值
    count     int
    flags     uint8
    // buckets 的对数 log_2
    B         uint8
    // overflow 的 bucket 近似数
    noverflow uint16
    // 计算 key 的哈希的时候会传入哈希函数
    hash0     uint32
    // 指向 buckets 数组,大小为 2^B
    // 如果元素个数为0,就为 nil
    buckets    unsafe.Pointer
    // 扩容的时候,buckets 长度会是 oldbuckets 的两倍
    oldbuckets unsafe.Pointer
    // 指示扩容进度,小于此地址的 buckets 迁移完成
    nevacuate  uintptr
    extra *mapextra 
// optional fields
}

说明一下, B 是 buckets 数组的长度的对数,也就是说 buckets 数组的长度就是 2^B。bucket 里面存储了 key 和 value,后面会再讲。

buckets 是一个指针,最终它指向的是一个结构体:

type bmap struct {
    tophash [bucketCnt]uint8
}

但这只是表面(src/runtime/hashmap.go)的结构,编译期间会给它加料,动态地创建一个新的结构:

type bmap struct{
    topbits  [8]uint8
    keys     [8]keytype
    values   [8]valuetype
    pad      uintptr
    overflow uintptr
}

bmap 就是我们常说的“桶”,桶里面会最多装 8 个 key,这些 key 之所以会落入同一个桶,是因为它们经过哈希计算后,哈希结果是“一类”的。在桶内,又会根据 key 计算出来的 hash 值的高 8 位来决定 key 到底落入桶内的哪个位置(一个桶内最多有8个位置)。

来一个整体的图:

当 map 的 key 和 value 都不是指针,并且 size 都小于 128 字节的情况下,会把 bmap 标记为不含指针,这样可以避免 gc 时扫描整个 hmap。但是,我们看 bmap 其实有一个 overflow 的字段,是指针类型的,破坏了 bmap 不含指针的设想,这时会把 overflow 移动到 extra 字段来。

type mapextra struct{
    // overflow[0] contains overflow buckets for hmap.buckets.
    // overflow[1] contains overflow buckets for hmap.oldbuckets.
    overflow [2]*[]*bmap
    // nextOverflow 包含空闲的 overflow bucket,这是预分配的 bucket
    nextOverflow *bmap
}

bmap 是存放 k-v 的地方,我们把视角拉近,仔细看 bmap 的内部组成。



上图就是 bucket 的内存模型, HOBHash 指的就是 top hash。注意到 key 和 value 是各自放在一起的,并不是 key/value/key/value/... 这样的形式。源码里说明这样的好处是在某些情况下可以省略掉 padding 字段,节省内存空间。

例如,有这样一个类型的 map:
map[int64]int8
如果按照 key/value/key/value/... 这样的模式存储,那在每一个 key/value 对之后都要额外 padding 7 个字节;而将所有的 key,value 分别绑定到一起,这种形式 key/key/.../value/value/...,则只需要在最后添加 padding。
每个 bucket 设计成最多只能放 8 个 key-value 对,如果有第 9 个 key-value 落入当前的 bucket,那就需要再构建一个 bucket ,通过 overflow 指针连接起来。

创建 map

从语法层面上来说,创建 map 很简单:

ageMp := make(map[string]int)
// 指定 map 长度
ageMp := make(map[string]int, 8)
// ageMp 为 nil,不能向其添加元素,会直接panic
var ageMp map[string]int

通过汇编语言可以看到,实际上底层调用的是 makemap 函数,主要做的工作就是初始化 hmap 结构体的各种字段,例如计算 B 的大小,设置哈希种子 hash0 等等。
func makemap(t *maptype, hint int64, h *hmap, bucket unsafe.Pointer) *hmap
注意,这个函数返回的结果:*hmap,它是一个指针,而我们之前讲过的 makeslice 函数返回的是 Slice结构体:
func makeslice(et *_type, len, cap int) slice
回顾一下 slice 的结构体定义:

// runtime/slice.go
type slice struct{
    array unsafe.Pointer// 元素指针
    len   int// 长度
    cap   int// 容量
}

结构体内部包含底层的数据指针。
makemap 和 makeslice 的区别,带来一个不同点:当 map 和 slice 作为函数参数时,在函数参数内部对 map 的操作会影响 map 自身;而对 slice 却不会(之前讲 slice 的文章里有讲过)。

主要原因:一个是指针(*hmap),一个是结构体( slice)。Go 语言中的函数传参都是值传递,在函数内部,参数会被 copy 到本地。*hmap指针 copy 完之后,仍然指向同一个 map,因此函数内部对 map 的操作会影响实参。而 slice 被 copy 后,会成为一个新的 slice,对它进行的操作不会影响到实参。

哈希函数

map 的一个关键点在于,哈希函数的选择。在程序启动时,会检测 cpu 是否支持 aes,如果支持,则使用 aes hash,否则使用 memhash。这是在函数 alginit() 中完成,位于路径:src/runtime/alg.go 下。

hash 函数,有加密型和非加密型。加密型的一般用于加密数据、数字摘要等,典型代表就是 md5、sha1、sha256、aes256 这种;非加密型的一般就是查找。在 map 的应用场景中,用的是查找。选择 hash 函数主要考察的是两点:性能、碰撞概率。

之前我们讲过,表示类型的结构体:

type _type struct{
    size       uintptr
    ptrdata    uintptr 
// size of memory prefix holding all pointers
    hash       uint32
    tflag      tflag
    align      uint8
    fieldalign uint8
    kind       uint8
    alg        *typeAlg
    gcdata    *byte
    str       nameOff
    ptrToThis typeOff
}

其中 alg 字段就和哈希相关,它是指向如下结构体的指针:

// src/runtime/alg.go
type typeAlg struct{
    // (ptr to object, seed) -> hash
    hash func(unsafe.Pointer, uintptr) uintptr    
    // (ptr to object A, ptr to object B) -> ==?
    equal func(unsafe.Pointer,unsafe.Pointer) bool
}

typeAlg 包含两个函数,hash 函数计算类型的哈希值,而 equal 函数则计算两个类型是否“哈希相等”。
对于 string 类型,它的 hash、equal 函数如下:

func strhash(a unsafe.Pointer, h uintptr) uintptr {
    x := (*stringStruct)(a)
    return memhash(x.str, h, uintptr(x.len))
}
func strequal(p, q unsafe.Pointer)bool{
    return *(*string)(p)==*(*string)(q)
}

根据 key 的类型,_type 结构体的 alg 字段会被设置对应类型的 hash 和 equal 函数。

key 定位过程

key 经过哈希计算后得到哈希值,共 64 个 bit 位(64位机,32位机就不讨论了,现在主流都是64位机),计算它到底要落在哪个桶时,只会用到最后 B 个 bit 位。还记得前面提到过的 B 吗?如果 B = 5,那么桶的数量,也就是 buckets 数组的长度是 2^5 = 32。
例如,现在有一个 key 经过哈希函数计算后,得到的哈希结果是:
10010111|000011110110110010001111001010100010010110010101010│01010
用最后的 5 个 bit 位,也就是 01010,值为 10,也就是 10 号桶。这个操作实际上就是取余操作,但是取余开销太大,所以代码实现上用的位操作代替。

再用哈希值的高 8 位,找到此 key 在 bucket 中的位置,这是在寻找已有的 key。最开始桶内还没有 key,新加入的 key 会找到第一个空位,放入。

buckets 编号就是桶编号,当两个不同的 key 落在同一个桶中,也就是发生了哈希冲突。冲突的解决手段是用链表法:在 bucket 中,从前往后找到第一个空位。这样,在查找某个 key 时,先找到对应的桶,再去遍历 bucket 中的 key。

这里参考曹大 github 博客里的一张图,原图是 ascii 图,geek 味十足,可以从参考资料找到曹大的博客,推荐大家去看看。


上图中,假定 B = 5,所以 bucket 总数就是 2^5 = 32。首先计算出待查找 key 的哈希,使用低 5 位 00110,找到对应的 6 号 bucket,使用高 8 位 10010111,对应十进制 151,在 6 号 bucket 中寻找 tophash 值(HOB hash)为 151 的 key,找到了 2 号槽位,这样整个查找过程就结束了。

如果在 bucket 中没找到,并且 overflow 不为空,还要继续去 overflow bucket 中寻找,直到找到或是所有的 key 槽位都找遍了,包括所有的 overflow bucket。

我们来看下源码吧,哈哈!通过汇编语言可以看到,查找某个 key 的底层函数是 mapacess系列函数,函数的作用类似,区别在下一节会讲到。这里我们直接看 mapacess1 函数:

func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer{
// ……
// 如果 h 什么都没有,返回零值   
if h == nil || h.count == 0 {        
return unsafe.Pointer(&zeroVal[0])   
 }
// 写和读冲突   
 if h.flags&hashWriting != 0 {      
  throw("concurrent map read and map write")    
}    
// 不同类型 key 使用的 hash 算法在编译期确定
    alg := t.key.alg    
// 计算哈希值,并且加入 hash0 引入随机性    
    hash := alg.hash(key, uintptr(h.hash0))    
// 比如 B=5,那 m 就是31,二进制是全 1
// 求 bucket num 时,将 hash 与 m 相与,
// 达到 bucket num 由 hash 的低 8 位决定的效果
    m := uintptr(1)<<h.B - 1    
// b 就是 bucket 的地址
    b := (*bmap)(add(h.buckets,(hash&m)*uintptr(t.bucketsize)))
    // oldbuckets 不为 nil,说明发生了扩容
    if c := h.oldbuckets; c !=nil{
        // 如果不是同 size 扩容(看后面扩容的内容)
        // 对应条件 1 的解决方案
      if !h.sameSizeGrow(){
            // 新 bucket 数量是老的 2 倍
            m >>=1
        }
        // 求出 key 在老的 map 中的 bucket 位置
        oldb :=(*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
        // 如果 oldb 没有搬迁到新的 bucket
        // 那就在老的 bucket 中寻找
        if !evacuated(oldb){
            b = oldb
        }
    }
    // 计算出高 8 位的 hash
    // 相当于右移 56 位,只取高8位
    top := uint8(hash >>(sys.PtrSize*8- 8))
    // 增加一个 minTopHash
    if top < minTopHash {        
          top += minTopHash
    }
    for {
        // 遍历 8 个 bucket
        for i := uintptr(0); i < bucketCnt; i++ {
            // tophash 不匹配,继续
            if b.tophash[i] != top {
                continue
            }
            // tophash 匹配,定位到 key 的位置
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            // key 是指针
            if t.indirectkey {
                // 解引用
               k = *((*unsafe.Pointer)(k))
            }
            // 如果 key 相等
            if alg.equal(key, k) {
                // 定位到 value 的位置
               v := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
                // value 解引用
                if t.indirectvalue {
                    v =*((*unsafe.Pointer)(v))
                }
                return v
            }
        }
        // bucket 找完(还没找到),继续到 overflow bucket 里找
        b = b.overflow(t)
        // overflow bucket 也找完了,说明没有目标 key
        // 返回零值
        if b ==nil {
            return unsafe.Pointer(&zeroVal[0])
        }
    }
}

函数返回 h[key] 的指针,如果 h 中没有此 key,那就会返回一个 key 相应类型的零值,不会返回 nil。
代码整体比较直接,没什么难懂的地方。跟着上面的注释一步步理解就好了。
这里,说一下定位 key 和 value 的方法以及整个循环的写法。

// key 定位公式
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
// value 定位公式
v := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))

b 是 bmap 的地址,这里 bmap 还是源码里定义的结构体,只包含一个 tophash 数组,经编译器扩充之后的结构体才包含 key,value,overflow 这些字段。dataOffset 是 key 相对于 bmap 起始地址的偏移:

dataOffset = unsafe.Offsetof(struct{
        b bmap
        v int64
    }{}.v)

因此 bucket 里 key 的起始地址就是 unsafe.Pointer(b)+dataOffset。第 i 个 key 的地址就要在此基础上跨过 i 个 key 的大小;而我们又知道,value 的地址是在所有 key 之后,因此第 i 个 value 的地址还需要加上所有 key 的偏移。理解了这些,上面 key 和 value 的定位公式就很好理解了。

再说整个大循环的写法,最外层是一个无限循环,通过

b = b.overflow(t)

遍历所有的 bucket,这相当于是一个 bucket 链表。

当定位到一个具体的 bucket 时,里层循环就是遍历这个 bucket 里所有的 cell,或者说所有的槽位,也就是 bucketCnt=8 个槽位。整个循环过程:


再说一下 minTopHash,当一个 cell 的 tophash 值小于 minTopHash 时,标志这个 cell 的迁移状态。因为这个状态值是放在 tophash 数组里,为了和正常的哈希值区分开,会给 key 计算出来的哈希值一个增量:minTopHash。这样就能区分正常的 top hash 值和表示状态的哈希值。

下面的这几种状态就表征了 bucket 的情况:

// 空的 cell,也是初始时 bucket 的状态
empty =0
// 空的 cell,表示 cell 已经被迁移到新的 bucket
evacuatedEmpty =1
// key,value 已经搬迁完毕,但是 key 都在新 bucket 前半部分,
// 后面扩容部分会再讲到。
evacuatedX     =2
// 同上,key 在后半部分
evacuatedY     =3
// tophash 的最小正常值
minTopHash     =4

源码里判断这个 bucket 是否已经搬迁完毕,用到的函数:

func evacuated(b *bmap) bool{
    h := b.tophash[0]
    return h > empty &&h < minTopHash
}

只取了 tophash 数组的第一个值,判断它是否在 0-4 之间。对比上面的常量,当 top hash 是 evacuatedEmptyevacuatedXevacuatedY 这三个值之一,说明此 bucket 中的 key 全部被搬迁到了新 bucket。

map 的两种 get 操作

Go 语言中读取 map 有两种语法:带 comma 和 不带 comma。当要查询的 key 不在 map 里,带 comma 的用法会返回一个 bool 型变量提示 key 是否在 map 中;而不带 comma 的语句则会返回一个 key 类型的零值。如果 key 是 int 型就会返回 0,如果 key 是 string 类型,就会返回空字符串。

package main
import "fmt"
func main(){
    ageMap := make(map[string]int)
    ageMap["qcrao"] = 18
    // 不带 comma 用法
    age1 := ageMap["stefno"]
    fmt.Println(age1)
    // 带 comma 用法
    age2, ok := ageMap["stefno"]
    fmt.Println(age2, ok)
}

运行结果:

0
0 false

以前一直觉得好神奇,怎么实现的?这其实是编译器在背后做的工作:分析代码后,将两种语法对应到底层两个不同的函数。

// src/runtime/hashmap.go
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer
func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool)

源码里,函数命名不拘小节,直接带上后缀 1,2,完全不理会《代码大全》里的那一套命名的做法。从上面两个函数的声明也可以看出差别了, mapaccess2 函数返回值多了一个 bool 型变量,两者的代码也是完全一样的,只是在返回值后面多加了一个 false 或者 true。

另外,根据 key 的不同类型,编译器还会将查找、插入、删除的函数用更具体的函数替换,以优化效率:

key 类型 查找
uint32 mapaccess1_fast32(t *maptype, h *hmap, key uint32) unsafe.Pointer
uint32 mapaccess2_fast32(t *maptype, h *hmap, key uint32) (unsafe.Pointer, bool)
uint64 mapaccess1_fast64(t *maptype, h *hmap, key uint64) unsafe.Pointer
uint64 mapaccess2_fast64(t *maptype, h *hmap, key uint64) (unsafe.Pointer, bool)
string mapaccess1_faststr(t *maptype, h *hmap, ky string) unsafe.Pointer
string mapaccess2_faststr(t *maptype, h *hmap, ky string) (unsafe.Pointer, bool)

这些函数的参数类型直接是具体的 uint32、unt64、string,在函数内部由于提前知晓了 key 的类型,所以内存布局是很清楚的,因此能节省很多操作,提高效率。

上面这些函数都是在文件 src/runtime/hashmap_fast.go 里。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,347评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,435评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,509评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,611评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,837评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,987评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,730评论 0 267
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,194评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,525评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,664评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,334评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,944评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,764评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,997评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,389评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,554评论 2 349

推荐阅读更多精彩内容