以太坊源码深入分析(8)-- 以太坊核心BlockChain源码分析

前面几节都在分析以太坊的通信协议,怎么广播,怎么同步,怎么下载。这一节讲讲以太坊的核心模块BlockChain,也就是以太坊的区块链。

一,BlockChain的初始化
Ethereum服务初始化的时候会调用core.SetupGenesisBlock来加载创始区块。顾名思义,创始区块就是以太坊区块链中的第一个区块,number值为0。紧接着调用core.NewBlockChain来加载以太坊的区块链。

func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) {
    if cacheConfig == nil {
        cacheConfig = &CacheConfig{
            TrieNodeLimit: 256 * 1024 * 1024,
            TrieTimeLimit: 5 * time.Minute,
        }
    }
    bodyCache, _ := lru.New(bodyCacheLimit)
    bodyRLPCache, _ := lru.New(bodyCacheLimit)
    blockCache, _ := lru.New(blockCacheLimit)
    futureBlocks, _ := lru.New(maxFutureBlocks)
    badBlocks, _ := lru.New(badBlockLimit)

    bc := &BlockChain{
        chainConfig:  chainConfig,
        cacheConfig:  cacheConfig,
        db:           db,
        triegc:       prque.New(),
        stateCache:   state.NewDatabase(db),
        quit:         make(chan struct{}),
        bodyCache:    bodyCache,
        bodyRLPCache: bodyRLPCache,
        blockCache:   blockCache,
        futureBlocks: futureBlocks,
        engine:       engine,
        vmConfig:     vmConfig,
        badBlocks:    badBlocks,
    }
    bc.SetValidator(NewBlockValidator(chainConfig, bc, engine))
    bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine))

    var err error
    bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt)
    if err != nil {
        return nil, err
    }
    bc.genesisBlock = bc.GetBlockByNumber(0)
    if bc.genesisBlock == nil {
        return nil, ErrNoGenesis
    }
    if err := bc.loadLastState(); err != nil {
        return nil, err
    }
    // Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain
    for hash := range BadHashes {
        if header := bc.GetHeaderByHash(hash); header != nil {
            // get the canonical block corresponding to the offending header's number
            headerByNumber := bc.GetHeaderByNumber(header.Number.Uint64())
            // make sure the headerByNumber (if present) is in our current canonical chain
            if headerByNumber != nil && headerByNumber.Hash() == header.Hash() {
                log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash)
                bc.SetHead(header.Number.Uint64() - 1)
                log.Error("Chain rewind was successful, resuming normal operation")
            }
        }
    }
    // Take ownership of this particular state
    go bc.update()
    return bc, nil
}

初始化方法做了这么几件事:
1,创建各种lru缓存(最近最少使用的算法)
2,初始化triegc(用于垃圾回收的区块number 对应的优先级队列),初始化stateDb,NewBlockValidator()初始化区块和状态验证器,NewStateProcessor()初始化区块状态处理器
3,NewHeaderChain()初始化区块头部链
4,bc.genesisBlock = bc.GetBlockByNumber(0) 拿到第0个区块,也就是创世区块
5,bc.loadLastState() 加载最新的状态数据
6,查找本地区块链上时候有硬分叉的区块,如果有调用bc.SetHead回到硬分叉之前的区块头
7,go bc.update() 定时处理future block

二,看看bc.loadLastState()方法

func (bc *BlockChain) loadLastState() error {
    // Restore the last known head block
    head := GetHeadBlockHash(bc.db)
    if head == (common.Hash{}) {
        // Corrupt or empty database, init from scratch
        log.Warn("Empty database, resetting chain")
        return bc.Reset()
    }
    // Make sure the entire head block is available
    currentBlock := bc.GetBlockByHash(head)
    if currentBlock == nil {
        // Corrupt or empty database, init from scratch
        log.Warn("Head block missing, resetting chain", "hash", head)
        return bc.Reset()
    }
    // Make sure the state associated with the block is available
    if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil {
        // Dangling block without a state associated, init from scratch
        log.Warn("Head state missing, repairing chain", "number", currentBlock.Number(), "hash", currentBlock.Hash())
        if err := bc.repair(&currentBlock); err != nil {
            return err
        }
    }
    // Everything seems to be fine, set as the head block
    bc.currentBlock.Store(currentBlock)

    // Restore the last known head header
    currentHeader := currentBlock.Header()
    if head := GetHeadHeaderHash(bc.db); head != (common.Hash{}) {
        if header := bc.GetHeaderByHash(head); header != nil {
            currentHeader = header
        }
    }
    bc.hc.SetCurrentHeader(currentHeader)

    // Restore the last known head fast block
    bc.currentFastBlock.Store(currentBlock)
    if head := GetHeadFastBlockHash(bc.db); head != (common.Hash{}) {
        if block := bc.GetBlockByHash(head); block != nil {
            bc.currentFastBlock.Store(block)
        }
    }

    // Issue a status log for the user
    currentFastBlock := bc.CurrentFastBlock()

    headerTd := bc.GetTd(currentHeader.Hash(), currentHeader.Number.Uint64())
    blockTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
    fastTd := bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64())

    log.Info("Loaded most recent local header", "number", currentHeader.Number, "hash", currentHeader.Hash(), "td", headerTd)
    log.Info("Loaded most recent local full block", "number", currentBlock.Number(), "hash", currentBlock.Hash(), "td", blockTd)
    log.Info("Loaded most recent local fast block", "number", currentFastBlock.Number(), "hash", currentFastBlock.Hash(), "td", fastTd)

    return nil
}

1,获取到最新区块以及它的hash
2,从stateDb中打开最新区块的状态trie,如果打开失败调用bc.repair(&currentBlock)方法进行修复。修复方法就是从当前区块一个个的往前面找,直到找到好的区块,然后赋值给currentBlock。
3,获取到最新的区块头
4,找到最新的fast模式下的block,并设置bc.currentFastBlock

三,再看看用来回滚区块的bc.SetHead()方法

func (bc *BlockChain) SetHead(head uint64) error {
    log.Warn("Rewinding blockchain", "target", head)

    bc.mu.Lock()
    defer bc.mu.Unlock()

    // Rewind the header chain, deleting all block bodies until then
    delFn := func(hash common.Hash, num uint64) {
        DeleteBody(bc.db, hash, num)
    }
    bc.hc.SetHead(head, delFn)
    currentHeader := bc.hc.CurrentHeader()

    // Clear out any stale content from the caches
    bc.bodyCache.Purge()
    bc.bodyRLPCache.Purge()
    bc.blockCache.Purge()
    bc.futureBlocks.Purge()

    // Rewind the block chain, ensuring we don't end up with a stateless head block
    if currentBlock := bc.CurrentBlock(); currentBlock != nil && currentHeader.Number.Uint64() < currentBlock.NumberU64() {
        bc.currentBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64()))
    }
    if currentBlock := bc.CurrentBlock(); currentBlock != nil {
        if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil {
            // Rewound state missing, rolled back to before pivot, reset to genesis
            bc.currentBlock.Store(bc.genesisBlock)
        }
    }
    // Rewind the fast block in a simpleton way to the target head
    if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && currentHeader.Number.Uint64() < currentFastBlock.NumberU64() {
        bc.currentFastBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64()))
    }
    // If either blocks reached nil, reset to the genesis state
    if currentBlock := bc.CurrentBlock(); currentBlock == nil {
        bc.currentBlock.Store(bc.genesisBlock)
    }
    if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock == nil {
        bc.currentFastBlock.Store(bc.genesisBlock)
    }
    currentBlock := bc.CurrentBlock()
    currentFastBlock := bc.CurrentFastBlock()
    if err := WriteHeadBlockHash(bc.db, currentBlock.Hash()); err != nil {
        log.Crit("Failed to reset head full block", "err", err)
    }
    if err := WriteHeadFastBlockHash(bc.db, currentFastBlock.Hash()); err != nil {
        log.Crit("Failed to reset head fast block", "err", err)
    }
    return bc.loadLastState()
}

1,首先调用bc.hc.SetHead(head, delFn),回滚head对应的区块头。并清除中间区块头所有的数据和缓存。设置head为新的currentHeadr。
2,重新设置bc.currentBlock,bc.currentFastBlock
3,调用bc.loadLastState(),重新加载状态

四,之前分析Downloader和Fetcher的时候,在同步完区块后会调用InsertChain方法插入到本地BlockChain中。我们看看InsertChain怎么工作的

func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
    n, events, logs, err := bc.insertChain(chain)
    bc.PostChainEvents(events, logs)
    return n, err
}

调用bc.insertChain(chain),并将插入的结果广播给订阅了blockChain事件的对象。

func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*types.Log, error) {
    // Do a sanity check that the provided chain is actually ordered and linked
    for i := 1; i < len(chain); i++ {
        if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() {
            // Chain broke ancestry, log a messge (programming error) and skip insertion
            log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(),
                "parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash())

            return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(),
                chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4])
        }
    }
    // Pre-checks passed, start the full block imports
    bc.wg.Add(1)
    defer bc.wg.Done()

    bc.chainmu.Lock()
    defer bc.chainmu.Unlock()

    // A queued approach to delivering events. This is generally
    // faster than direct delivery and requires much less mutex
    // acquiring.
    var (
        stats         = insertStats{startTime: mclock.Now()}
        events        = make([]interface{}, 0, len(chain))
        lastCanon     *types.Block
        coalescedLogs []*types.Log
    )
    // Start the parallel header verifier
    headers := make([]*types.Header, len(chain))
    seals := make([]bool, len(chain))

    for i, block := range chain {
        headers[i] = block.Header()
        seals[i] = true
    }
    abort, results := bc.engine.VerifyHeaders(bc, headers, seals)
    defer close(abort)

    // Iterate over the blocks and insert when the verifier permits
    for i, block := range chain {
        // If the chain is terminating, stop processing blocks
        if atomic.LoadInt32(&bc.procInterrupt) == 1 {
            log.Debug("Premature abort during blocks processing")
            break
        }
        // If the header is a banned one, straight out abort
        if BadHashes[block.Hash()] {
            bc.reportBlock(block, nil, ErrBlacklistedHash)
            return i, events, coalescedLogs, ErrBlacklistedHash
        }
        // Wait for the block's verification to complete
        bstart := time.Now()

        err := <-results
        if err == nil {
            err = bc.Validator().ValidateBody(block)
        }
        switch {
        case err == ErrKnownBlock:
            // Block and state both already known. However if the current block is below
            // this number we did a rollback and we should reimport it nonetheless.
            if bc.CurrentBlock().NumberU64() >= block.NumberU64() {
                stats.ignored++
                continue
            }

        case err == consensus.ErrFutureBlock:
            // Allow up to MaxFuture second in the future blocks. If this limit is exceeded
            // the chain is discarded and processed at a later time if given.
            max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
            if block.Time().Cmp(max) > 0 {
                return i, events, coalescedLogs, fmt.Errorf("future block: %v > %v", block.Time(), max)
            }
            bc.futureBlocks.Add(block.Hash(), block)
            stats.queued++
            continue

        case err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()):
            bc.futureBlocks.Add(block.Hash(), block)
            stats.queued++
            continue

        case err == consensus.ErrPrunedAncestor:
            // Block competing with the canonical chain, store in the db, but don't process
            // until the competitor TD goes above the canonical TD
            currentBlock := bc.CurrentBlock()
            localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
            externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty())
            if localTd.Cmp(externTd) > 0 {
                if err = bc.WriteBlockWithoutState(block, externTd); err != nil {
                    return i, events, coalescedLogs, err
                }
                continue
            }
            // Competitor chain beat canonical, gather all blocks from the common ancestor
            var winner []*types.Block

            parent := bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
            for !bc.HasState(parent.Root()) {
                winner = append(winner, parent)
                parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1)
            }
            for j := 0; j < len(winner)/2; j++ {
                winner[j], winner[len(winner)-1-j] = winner[len(winner)-1-j], winner[j]
            }
            // Import all the pruned blocks to make the state available
            bc.chainmu.Unlock()
            _, evs, logs, err := bc.insertChain(winner)
            bc.chainmu.Lock()
            events, coalescedLogs = evs, logs

            if err != nil {
                return i, events, coalescedLogs, err
            }

        case err != nil:
            bc.reportBlock(block, nil, err)
            return i, events, coalescedLogs, err
        }
        // Create a new statedb using the parent block and report an
        // error if it fails.
        var parent *types.Block
        if i == 0 {
            parent = bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
        } else {
            parent = chain[i-1]
        }
        state, err := state.New(parent.Root(), bc.stateCache)
        if err != nil {
            return i, events, coalescedLogs, err
        }
        // Process block using the parent state as reference point.
        receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig)
        if err != nil {
            bc.reportBlock(block, receipts, err)
            return i, events, coalescedLogs, err
        }
        // Validate the state using the default validator
        err = bc.Validator().ValidateState(block, parent, state, receipts, usedGas)
        if err != nil {
            bc.reportBlock(block, receipts, err)
            return i, events, coalescedLogs, err
        }
        proctime := time.Since(bstart)

        // Write the block to the chain and get the status.
        status, err := bc.WriteBlockWithState(block, receipts, state)
        if err != nil {
            return i, events, coalescedLogs, err
        }
        switch status {
        case CanonStatTy:
            log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()),
                "txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)))

            coalescedLogs = append(coalescedLogs, logs...)
            blockInsertTimer.UpdateSince(bstart)
            events = append(events, ChainEvent{block, block.Hash(), logs})
            lastCanon = block

            // Only count canonical blocks for GC processing time
            bc.gcproc += proctime

        case SideStatTy:
            log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed",
                common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()))

            blockInsertTimer.UpdateSince(bstart)
            events = append(events, ChainSideEvent{block})
        }
        stats.processed++
        stats.usedGas += usedGas
        stats.report(chain, i, bc.stateCache.TrieDB().Size())
    }
    // Append a single chain head event if we've progressed the chain
    if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
        events = append(events, ChainHeadEvent{lastCanon})
    }
    return 0, events, coalescedLogs, nil
}

1,首先确保插入的blocks是安次序排列的
2,调用共识引擎bc.engine.VerifyHeaders,验证区块链的这些headers。(共识验证是个复杂的事情,在讲共识机制的时候再分析)
3,如果共识验证没有问题,再调用bc.Validator().ValidateBody(block)验证block的Body,这个方法只验证block的叔区块hash和区块交易列表的hash。
4,根据ValidateBody验证结果,如果是还没有插入本地的区块,但是其父区块在bc.futureBlocks就加入bc.futureBlocks。如果父区块是本地区块,但是没有状态,就递归调用bc.insertChain(winner),直到有状态才插入。
5,获得父区块的状态,调用processor.Process()处理block的交易数据,并生成收据,日志等信息,产生本区块的状态。Process()方法,执行了Block里面包含的的所有交易,根据交易的过程和结果生成所有交易的收据和日志信息。(fast模式下收据数据是同步过来的,full模式下是本地重现了交易并生成了收据数据)
6,调用bc.Validator().ValidateState,对产生的区块交易收据数据,和消费的gas于收到的block相关数据进行对比验证。对比消费的gas是否一样,对比bloom是否一致,根据收据生成hash是否一致,对比header.root和stateDb的merkle树的根hash是否一致。
7,调用bc.WriteBlockWithState(block, receipts, state),将block写入到本地的区块链中,并返回status。根据status判断插入的是主链还是side链,如果是主链bc.gcproc需要加上验证花费的时间。
8,返回结果事件和日志信息,用于通知给订阅了区块插入事件的对象

五, 再分析一下bc.WriteBlockWithState是怎么工作的:

func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
    bc.wg.Add(1)
    defer bc.wg.Done()

    // Calculate the total difficulty of the block
    ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
    if ptd == nil {
        return NonStatTy, consensus.ErrUnknownAncestor
    }
    // Make sure no inconsistent state is leaked during insertion
    bc.mu.Lock()
    defer bc.mu.Unlock()

    currentBlock := bc.CurrentBlock()
    localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
    externTd := new(big.Int).Add(block.Difficulty(), ptd)

    // Irrelevant of the canonical status, write the block itself to the database
    if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil {
        return NonStatTy, err
    }
    // Write other block data using a batch.
    batch := bc.db.NewBatch()
    if err := WriteBlock(batch, block); err != nil {
        return NonStatTy, err
    }
    root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
    if err != nil {
        return NonStatTy, err
    }
    triedb := bc.stateCache.TrieDB()

    // If we're running an archive node, always flush
    if bc.cacheConfig.Disabled {
        if err := triedb.Commit(root, false); err != nil {
            return NonStatTy, err
        }
    } else {
        // Full but not archive node, do proper garbage collection
        triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
        bc.triegc.Push(root, -float32(block.NumberU64()))

        if current := block.NumberU64(); current > triesInMemory {
            // Find the next state trie we need to commit
            header := bc.GetHeaderByNumber(current - triesInMemory)
            chosen := header.Number.Uint64()

            // Only write to disk if we exceeded our memory allowance *and* also have at
            // least a given number of tries gapped.
            var (
                size  = triedb.Size()
                limit = common.StorageSize(bc.cacheConfig.TrieNodeLimit) * 1024 * 1024
            )
            if size > limit || bc.gcproc > bc.cacheConfig.TrieTimeLimit {
                // If we're exceeding limits but haven't reached a large enough memory gap,
                // warn the user that the system is becoming unstable.
                if chosen < lastWrite+triesInMemory {
                    switch {
                    case size >= 2*limit:
                        log.Warn("State memory usage too high, committing", "size", size, "limit", limit, "optimum", float64(chosen-lastWrite)/triesInMemory)
                    case bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit:
                        log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/triesInMemory)
                    }
                }
                // If optimum or critical limits reached, write to disk
                if chosen >= lastWrite+triesInMemory || size >= 2*limit || bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit {
                    triedb.Commit(header.Root, true)
                    lastWrite = chosen
                    bc.gcproc = 0
                }
            }
            // Garbage collect anything below our required write retention
            for !bc.triegc.Empty() {
                root, number := bc.triegc.Pop()
                if uint64(-number) > chosen {
                    bc.triegc.Push(root, number)
                    break
                }
                triedb.Dereference(root.(common.Hash), common.Hash{})
            }
        }
    }
    if err := WriteBlockReceipts(batch, block.Hash(), block.NumberU64(), receipts); err != nil {
        return NonStatTy, err
    }
    // If the total difficulty is higher than our known, add it to the canonical chain
    // Second clause in the if statement reduces the vulnerability to selfish mining.
    // Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
    reorg := externTd.Cmp(localTd) > 0
    currentBlock = bc.CurrentBlock()
    if !reorg && externTd.Cmp(localTd) == 0 {
        // Split same-difficulty blocks by number, then at random
        reorg = block.NumberU64() < currentBlock.NumberU64() || (block.NumberU64() == currentBlock.NumberU64() && mrand.Float64() < 0.5)
    }
    if reorg {
        // Reorganise the chain if the parent is not the head block
        if block.ParentHash() != currentBlock.Hash() {
            if err := bc.reorg(currentBlock, block); err != nil {
                return NonStatTy, err
            }
        }
        // Write the positional metadata for transaction and receipt lookups
        if err := WriteTxLookupEntries(batch, block); err != nil {
            return NonStatTy, err
        }
        // Write hash preimages
        if err := WritePreimages(bc.db, block.NumberU64(), state.Preimages()); err != nil {
            return NonStatTy, err
        }
        status = CanonStatTy
    } else {
        status = SideStatTy
    }
    if err := batch.Write(); err != nil {
        return NonStatTy, err
    }

    // Set new head.
    if status == CanonStatTy {
        bc.insert(block)
    }
    bc.futureBlocks.Remove(block.Hash())
    return status, nil
}

1,从数据库中获取到parent的td。加上block 的difficulty,计算新的total difficulty值,并写入数据库。
2,调用WriteBlock(batch, block) 把block的body和header都写到数据库
3,调用state.Commit(bc.chainConfig.IsEIP158(block.Number()))把状态写入数据库并获取到状态root。
4,按规则处理bc.stateCache缓存,并清理垃圾回收器
5,调用WriteBlockReceipts,把收据数据写入数据库
6,如果发现block的父区块不是本地当前最新区块,调用bc.reorg(currentBlock, block),如果新区块比老区块td高,则把高出来的区块一一insert进blockChain。
7,调用WriteTxLookupEntries,根据交易hash建立数据库的索引。调用WritePreimages,把Preimages写入数据库,Preimages在evm执行sha3指令时产生。
8,如果新的td大于或等于本地td,说明是主链区块,调用bc.insert(block),更新blockChain的currentBlock,currentHeader,currentFastBolck等信息。如果不是主链区块则不会。

六,在分析Downloader的时候,只有full模式才会调用InsertChain()方法,而fast模式是InsertReceiptChain()方法。我们来看看InsertReceiptChain()方法做了什么,它和InsertChain()方法有什么区别。

func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
    bc.wg.Add(1)
    defer bc.wg.Done()

    // Do a sanity check that the provided chain is actually ordered and linked
    for i := 1; i < len(blockChain); i++ {
        if blockChain[i].NumberU64() != blockChain[i-1].NumberU64()+1 || blockChain[i].ParentHash() != blockChain[i-1].Hash() {
            log.Error("Non contiguous receipt insert", "number", blockChain[i].Number(), "hash", blockChain[i].Hash(), "parent", blockChain[i].ParentHash(),
                "prevnumber", blockChain[i-1].Number(), "prevhash", blockChain[i-1].Hash())
            return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, blockChain[i-1].NumberU64(),
                blockChain[i-1].Hash().Bytes()[:4], i, blockChain[i].NumberU64(), blockChain[i].Hash().Bytes()[:4], blockChain[i].ParentHash().Bytes()[:4])
        }
    }

    var (
        stats = struct{ processed, ignored int32 }{}
        start = time.Now()
        bytes = 0
        batch = bc.db.NewBatch()
    )
    for i, block := range blockChain {
        receipts := receiptChain[i]
        // Short circuit insertion if shutting down or processing failed
        if atomic.LoadInt32(&bc.procInterrupt) == 1 {
            return 0, nil
        }
        // Short circuit if the owner header is unknown
        if !bc.HasHeader(block.Hash(), block.NumberU64()) {
            return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
        }
        // Skip if the entire data is already known
        if bc.HasBlock(block.Hash(), block.NumberU64()) {
            stats.ignored++
            continue
        }
        // Compute all the non-consensus fields of the receipts
        SetReceiptsData(bc.chainConfig, block, receipts)
        // Write all the data out into the database
        if err := WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()); err != nil {
            return i, fmt.Errorf("failed to write block body: %v", err)
        }
        if err := WriteBlockReceipts(batch, block.Hash(), block.NumberU64(), receipts); err != nil {
            return i, fmt.Errorf("failed to write block receipts: %v", err)
        }
        if err := WriteTxLookupEntries(batch, block); err != nil {
            return i, fmt.Errorf("failed to write lookup metadata: %v", err)
        }
        stats.processed++

        if batch.ValueSize() >= ethdb.IdealBatchSize {
            if err := batch.Write(); err != nil {
                return 0, err
            }
            bytes += batch.ValueSize()
            batch.Reset()
        }
    }
    if batch.ValueSize() > 0 {
        bytes += batch.ValueSize()
        if err := batch.Write(); err != nil {
            return 0, err
        }
    }

    // Update the head fast sync block if better
    bc.mu.Lock()
    head := blockChain[len(blockChain)-1]
    if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case
        currentFastBlock := bc.CurrentFastBlock()
        if bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64()).Cmp(td) < 0 {
            if err := WriteHeadFastBlockHash(bc.db, head.Hash()); err != nil {
                log.Crit("Failed to update head fast block hash", "err", err)
            }
            bc.currentFastBlock.Store(head)
        }
    }
    bc.mu.Unlock()

    log.Info("Imported new block receipts",
        "count", stats.processed,
        "elapsed", common.PrettyDuration(time.Since(start)),
        "number", head.Number(),
        "hash", head.Hash(),
        "size", common.StorageSize(bytes),
        "ignored", stats.ignored)
    return 0, nil
}

1,首先确保插入的blocks是安次序排列的
2,调用SetReceiptsData(bc.chainConfig, block, receipts)把收到的交易收据数据加入到block中
3,调用WriteBody(),把blockbody数据写入数据库
4,调用WriteBlockReceipts(),把收据数据写入数据库
5,调用WriteTxLookupEntries,根据交易hash建立数据库的索引。
6,更新BlockChain的currentFastBlock信息

总结
BlockChain模块的代码不多,逻辑也不复杂,不像Downloader和Fetcher 那样一堆goroutine满天飞,很好读。BlockChain模块是以太坊所有数据的大集合,所有数据都在这里汇集,并在这里对数据进行校验,把结果写入数据库。这个模块最重要的是要区分Full模式同步数据处理和Fast模式处理的区别。我们发现Fast模式把最耗时的数据验证和交易回放都跳过了,大大的缩减了同步的时间,同时也节约了计算的能源消耗。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容