Dgraph的Key解密
[if !supportLists]一. [endif]Dgrap的key有预定义有几种:
//key类型
ByteData = byte(0x00)
ByteIndex = byte(0x02)
ByteReverse = byte(0x04)
ByteCount = byte(0x08)
ByteCountRev= ByteCount | ByteReverse
//前缀
defaultPrefix= byte(0x00)
byteSchema = byte(0x01)
byteType = byte(0x02)
[if !supportLists]1. [endif]DataKey:
buf := make([]byte, 2+len(attr)+2+8)
00411097109101000000040
defaultPrefix Length n a m e ByteData Uid:1024从QL层的谓语值sid计算
uid的转换函数:
func (bigEndian) PutUint64(b []byte, vuint64) {
_= b[7] // early bounds check to guarantee safety of writes below
b[0]= byte(v >> 56)
b[1]= byte(v >> 48)
b[2]= byte(v >> 40)
b[3]= byte(v >> 32)
b[4]= byte(v >> 24)
b[5]= byte(v >> 16)
b[6]= byte(v >> 8)
b[7]= byte(v)
}
[if !supportLists]2. [endif]SchemaKey:
buf := make([]byte, 1+2+len(attr))
10397103101
byteSchema Length Length a g e
[if !supportLists]3. [endif]Typekey:
buf := make([]byte, 1+2+len(typeName))
103737884
byteType Length Length typeName:INT
[if !supportLists]4. [endif]IndexKey:
buf := make([]byte,2+len(attr)+2+len(term)) term---->
00 4110971091012 16510810599101
defaultPrefix length length attr:n a m e ByteIndex Identifier A l i c e
identifier:
const (
IdentNone = 0x0
IdentTerm = 0x1
IdentExact = 0x2
IdentYear = 0x4
IdentMonth = 0x41
IdentDay = 0x42
IdentHour = 0x43
IdentGeo = 0x5
IdentInt = 0x6
IdentFloat = 0x7
IdentFullText= 0x8
IdentBool = 0x9
IdentTrigram = 0xA
IdentHash = 0xB
IdentCustom = 0x80
)
[if !supportLists]5. [endif]ReverseKey
buf := make([]byte, 2+len(attr)+2+8)
00 4110971091014
00000040
defaultPrefix length length attr:n a m e ByteReverse uid:1024
[if !supportLists]6. [endif]CountKey
buf := make([]byte, 1+2+len(attr)+1+4)
00 4110971091014/8 003230
defaultPrefix length length attr:n a m e ByteCount/ByteCountRev count:998
count的转换:
func (bigEndian) PutUint32(b []byte, vuint32) {
_= b[3] // early bounds check to guarantee safety of writes below
b[0]= byte(v >> 24)
b[1]= byte(v >> 16)
b[2]= byte(v >> 8)
b[3]= byte(v)
}
[if !supportLists]二. [endif]Key的使用-写入data
[if !supportLists]1. [endif]最初Dgraph会把GraphQL解析的NQuad对象请求转换成DirectedEdge对象
type NQuad struct { // GraphQL中RDF NQuad数据格式
Subject string
Predicate string
ObjectId string
ObjectValue *Value
Label string
Lang string
Facets []*Facet
XXX_NoUnkeyedLiteralstruct{}
XXX_unrecognized []byte
XXX_sizecache int32
}
type DirectedEdge struct { //Represents the original uid -> value edge
Entity uint64 //实体 RDF里的subject,uid
Attr string //属性—谓语 predicate
Value []byte //值object
ValueType Posting_ValType //值类型 包括binaryint bool uid pasword类型
ValueId uint64 //值的id
Label string //label
Lang string //语言
Op DirectedEdge_Op //操作setor delete,默认是set
Facets []*api.Facet //predicate上的面
XXX_NoUnkeyedLiteralstruct{}
XXX_unrecognized []byte
XXX_sizecache int32
}
type Posting struct { //Protocol Buffers协议,提供序列化数据结构,用于raft协议通信和存储,取出数据
Uid uint64
Value []byte
ValType Posting_ValType //值类型 包括binaryint bool uid pasword类型
PostingTypePosting_PostingType //Posting_REF uid类型,Posting_VALUE值类型,Posting_VALUE_LANG语言类
LangTag []byte
Label string
Facets []*api.Facet
Op uint32
StartTs uint64
CommitTs uint64
XXX_NoUnkeyedLiteralstruct{}
XXX_unrecognized []byte
XXX_sizecache int32
}
============================================================================
[if !supportLists]2. [endif]验证并转换DirectedEdge对象的ValueType,Value列转换成Posting对象
执行mutation时,wokerserver执行worker/mutation.go下的runMutation函数
// runMutation goes through all the edgesand applies them.
在其中,ValidateAndConvert验证DirectedEdge对象的TypeID和Posting对象的Posting_ValType是否一致,并尝试把DirectedEdge对象的Value列的值转换成Posting_ValType类型,再转换成[]byte,验证是否成功
//验证的过程中Convert和Marshal的转换有重复
如果都成功,修改DirectedEdge对象的ValueType的值,Value改为刚才转换的[]byte
============================================================================
[if !supportLists]3. [endif]使用DataKey方法产生key,并在提交前使用Key来检查本地缓存是否有数据
加工的edge提交的Attr和Enity使用DataKey方法产生key
再根据key从事务中本地缓存中找到对应的Posting对象(四层的truct嵌套为List对象),如果缓存为空,从badgerdb中查询.
[if !supportLists]4. [endif]使用Posting对象提交事务并处理索引
使用List对象的AddMutationWithIndex方法来提交事务,如果谓语有索引,通过检查postlist是否有提交语句中的值的uid(通过hash64值获得),并检查当前提交与新提交的post中的 值和值的类型是否相同.
最后使用list.addMutation来应用mutation,通过检查是否是upsert属性,如果是,检查key冲突,如果不是只检查数据冲突
使用DirectedEdge的方法NewPosting,转换一个Posting对象:
t *pb.DirectedEdge
return&pb.Posting{
Uid: t.ValueId,
Value: t.Value,
ValType: t.ValueType,
PostingType:postingType,
LangTag: []byte(t.Lang),
Label: t.Label,
Op: op,
Facets: t.Facets,
}
[if !supportLists]5. [endif]处理提交
最终这个Posting对象放到ProtocolBuffs中PostingList对象中的mutationMap这个map,用Posting的StartTs值作为下标;增加,并把key放到txn的deltas中确保顺序
[if !supportLists]① [endif].当事务提交到磁盘的时候使用事务的CommitToDisk方法,调用Plist的GetMutation方法:
[if !supportLists]1. [endif]先从deltas中获取key并添加到keys切片中
[if !supportLists]2. [endif]按照顺序从keys取出key,再根据key从缓存中获取List,从而取出mutationMap中的Plist,进过Marsh(),这里取出的是value值
[if !supportLists]3. [endif]使用key和value值组成entry写入数据,调用badgerdb的sendToWriteCh方法写入Entry,
这里的key实际把key在末尾增加八个byte并把MaxUint64减去CommitTs的值写入后八位byte
[if !supportLists]② [endif]. 如果是提交到内存中, rebuild index的时候使用
第一步一样,但是不需要从缓存取出数据,而是直接使用list.CommitMutation方法提交,只需要传参txn.StartTs和本地提交的commitTs,但是只处理事务的PostingList的CommitTs值.再使用List.MarshalToKv获取key value并使用badger的TxnWriter.SetAt方法写入key value值,写入预写日志之后Flush
写入内存如果发生冲突5ms重试,写入磁盘10ms重试
[if !supportLists]三. [endif]其他key的使用
[if !supportLists]1. [endif]SchemaKey:
当发生schema改变或者predicate迁移组时使用txn.SetWithMeta方法把变更写入txn.writes和pendingWrites,最后使用txn.CommitWith或者txn.Commit提交
[if !supportLists]2. [endif]TypeKey:
在predicate的类型变更时使用,调用的方法和SchemaKey一样
[if !supportLists]3. [endif]IndexKey
通过Key获取list对象, key作为迭代器的PreFix
[if !supportLists]4. [endif]ReverseKey
[if !supportLists]5. [endif]CountKey
最终保存在badger中的数据的结构:
//Entry provides Key, Value, UserMeta and ExpiresAt. This struct can be used bythe user to set data.
typeEntry struct {
Key []byte
Value []byte
UserMeta byte
ExpiresAt uint64 // time.Unix
meta byte
// Fields maintained internally.
offset uint32
}