以太坊源码阅读-eth-filter

首先看 bloombits.go

// Retrieval is the message exchanged for a bloom bit retrieval. Depending on
// the stage of the exchange it is a request for retrieval task assignments for
// a given bit with a given number of fetch elements, the response to such a
// request, or - once the results are set - the delivery data struct.
//
// The context and error fields are used by the light client to terminate
// matching early if an error is encountered on some path of the pipeline.
type Retrieval struct {
	// Bit is the bloom bit index the retrieval refers to.
	Bit uint
	// Sections lists the section numbers whose bit vectors are requested.
	Sections []uint64
	// Bitsets holds the retrieved bit vectors, one entry per element of Sections.
	Bitsets [][]byte

	// Context is the context associated with the request.
	Context context.Context
	// Error records a failure encountered while serving the request.
	Error error
}

// startBloomHandlers launches bloomServiceThreads goroutines that serve bloom
// bit database retrievals. Each goroutine takes a request channel from
// eth.bloomRequests, receives the task from it, reads and decompresses the bit
// vector of every requested section, and sends the completed task back on the
// same channel. A goroutine returns once a receive on eth.shutdownChan succeeds.
func (eth *Ethereum) startBloomHandlers() {
	serve := func() {
		for {
			select {
			case <-eth.shutdownChan:
				return

			case request := <-eth.bloomRequests:
				task := <-request
				task.Bitsets = make([][]byte, len(task.Sections))
				for idx, section := range task.Sections {
					// The bits are looked up under the canonical hash of block
					// (section+1)*BloomBitsBlocks-1.
					head := rawdb.ReadCanonicalHash(eth.chainDb, (section+1)*params.BloomBitsBlocks-1)

					compVector, err := rawdb.ReadBloomBits(eth.chainDb, task.Bit, section, head)
					if err != nil {
						// Record the failure and move on to the next section.
						task.Error = err
						continue
					}
					blob, err := bitutil.DecompressBytes(compVector, int(params.BloomBitsBlocks)/8)
					if err != nil {
						task.Error = err
						continue
					}
					task.Bitsets[idx] = blob
				}
				request <- task
			}
		}
	}
	for i := 0; i < bloomServiceThreads; i++ {
		go serve()
	}
}

再看数据结构
// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits
// index for the Ethereum header bloom filters, permitting blazing fast filtering.
type BloomIndexer struct {
	// size is the section size to generate bloombits for.
	size uint64
	// db is the database instance to write index data and metadata into.
	db ethdb.Database
	// gen is the generator that rotates the bloom bits, creating the bloom index.
	gen *bloombits.Generator
	// section is the section number being processed currently.
	section uint64
	// head is the hash of the last header processed.
	head common.Hash
}

Reset 实现了 ChainIndexerBackend 接口的方法,用于开始一个新的 section。
// Reset implements core.ChainIndexerBackend. It begins a new bloombits index
// section: a fresh generator sized to b.size is installed, the current section
// number is set to section and the last processed head hash is cleared. The
// error from creating the generator, if any, is returned.
func (b *BloomIndexer) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error {
	generator, err := bloombits.NewGenerator(uint(b.size))

	// The fields are updated even when the generator could not be created.
	b.gen = generator
	b.section = section
	b.head = common.Hash{}

	return err
}

Process实现ChainIndexerBackend, 增加一个新的区块头布隆过滤器到index中。
// Process implements core.ChainIndexerBackend, adding a new header's bloom into the index.
func (b *BloomIndexer) Process(header types.Header) {
b.gen.AddBloom(uint(header.Number.Uint64()-b.section
b.size), header.Bloom)
b.head = header.Hash()
}

Commit实现了ChainIndexerBackend,持久化并写入数据库。
// Commit implements core.ChainIndexerBackend. It finalizes the bloom section:
// the bit vector of every bloom bit is fetched from the generator, compressed
// and queued in a database batch, which is then written out. The first error
// from the generator aborts the commit before anything is written.
func (b *BloomIndexer) Commit() error {
	batch := b.db.NewBatch()

	for bit := 0; bit < types.BloomBitLength; bit++ {
		idx := uint(bit)

		bits, err := b.gen.Bitset(idx)
		if err != nil {
			return err
		}
		compressed := bitutil.CompressBytes(bits)
		core.WriteBloomBits(batch, idx, b.section, b.head, compressed)
	}
	return batch.Write()
}

filter/api.go 提供了过滤功能:外部通过调用这些 API 对交易或者区块进行过滤并获得结果;如果一个过滤器在 5 分钟内无操作,则删除这个过滤器。
// filter is a helper struct that holds meta information over the filter type
// and associated subscription in the event system.
type filter struct {
typ Type
deadline time.Timer // filter is inactiv when deadline triggers
hashes []common.Hash
crit FilterCriteria
logs []
types.Log
s *Subscription // associated subscription in event system
}

// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various information related to the Ethereum protocol such als blocks, transactions and logs.
type PublicFilterAPI struct {
backend Backend
mux *event.TypeMux
quit chan struct{}
chainDb ethdb.Database
events EventSystem
filtersMu sync.Mutex
filters map[rpc.ID]
filter
}

// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(backend Backend, lightMode bool) PublicFilterAPI {
api := &PublicFilterAPI{
backend: backend,
mux: backend.EventMux(),
chainDb: backend.ChainDb(),
events: NewEventSystem(backend.EventMux(), backend, lightMode),
filters: make(map[rpc.ID]
filter),
}
go api.timeoutLoop()

return api

}

超时检查
// timeoutLoop wakes up every 5 minutes and deletes filters that have not been
// recently used, i.e. whose deadline timer has fired. It is started when the
// api is created and never returns.
func (api *PublicFilterAPI) timeoutLoop() {
	ticker := time.NewTicker(5 * time.Minute)

	for range ticker.C {
		api.filtersMu.Lock()
		for id, f := range api.filters {
			select {
			case <-f.deadline.C:
				// The deadline fired: drop the subscription and the filter.
				f.s.Unsubscribe()
				delete(api.filters, id)
			default:
				// Still within its deadline, keep the filter.
			}
		}
		api.filtersMu.Unlock()
	}
}

创建待处理交易(pending transaction)过滤器有两种方式,分别对应 http 和 websocket:NewPendingTransactionFilter 用于 http 方式,客户端通过 eth_getFilterChanges 轮询获取结果;后面的 NewPendingTransactions 用于 websocket 方式。
// NewPendingTransactionFilter creates a filter that fetches pending transaction
// hashes as transactions enter the pending state, and returns the ID under
// which the filter was registered.
//
// It is part of the filter package because this filter can be used through the
// eth_getFilterChanges polling method that is also used for log filters.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter
func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
	hashCh := make(chan common.Hash)
	sub := api.events.SubscribePendingTxEvents(hashCh)

	api.filtersMu.Lock()
	api.filters[sub.ID] = &filter{
		typ:      PendingTransactionsSubscription,
		deadline: time.NewTimer(deadline),
		hashes:   make([]common.Hash, 0),
		s:        sub,
	}
	api.filtersMu.Unlock()

	// Collect incoming hashes into the filter until the subscription ends.
	go func() {
		for {
			select {
			case hash := <-hashCh:
				api.filtersMu.Lock()
				if f, ok := api.filters[sub.ID]; ok {
					f.hashes = append(f.hashes, hash)
				}
				api.filtersMu.Unlock()

			case <-sub.Err():
				api.filtersMu.Lock()
				delete(api.filters, sub.ID)
				api.filtersMu.Unlock()
				return
			}
		}
	}()

	return sub.ID
}

GetFilterChanges
// GetFilterChanges returns what the filter with the given id has collected
// since the last time it was called, and clears that buffer. This can be used
// for polling. Every call on an existing filter also restarts its deadline.
//
// For pending transaction and block filters the result is []common.Hash.
// (pending)Log filters return []Log. An unknown id, or a filter of any other
// type, yields an empty slice and a "filter not found" error.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
	api.filtersMu.Lock()
	defer api.filtersMu.Unlock()

	f, ok := api.filters[id]
	if !ok {
		return []interface{}{}, fmt.Errorf("filter not found")
	}
	if stopped := f.deadline.Stop(); !stopped {
		// The timer expired but the filter is not yet removed by the timeout
		// loop: receive the timer value before resetting the timer.
		<-f.deadline.C
	}
	f.deadline.Reset(deadline)

	switch f.typ {
	case PendingTransactionsSubscription, BlocksSubscription:
		pending := f.hashes
		f.hashes = nil
		return returnHashes(pending), nil

	case LogsSubscription:
		pending := f.logs
		f.logs = nil
		return returnLogs(pending), nil

	default:
		return []interface{}{}, fmt.Errorf("filter not found")
	}
}

websocket,可以直接使用rpc的发送订阅模式。
// NewPendingTransactions creates a subscription that is triggered each time a
// transaction enters the transaction pool and was signed from one of the
// transactions this nodes manages.
//
// It returns rpc.ErrNotificationsUnsupported when ctx carries no notifier.
// Otherwise a goroutine forwards every pending transaction hash to the
// notifier until the RPC subscription reports an error or the notifier is
// closed, at which point the event subscription is released.
//
// The receiver and the returned subscription are pointers: the API struct
// carries a mutex that must not be copied, and the body returns
// &rpc.Subscription{}.
func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
	notifier, supported := rpc.NotifierFromContext(ctx)
	if !supported {
		return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
	}

	rpcSub := notifier.CreateSubscription()

	go func() {
		txHashes := make(chan common.Hash)
		pendingTxSub := api.events.SubscribePendingTxEvents(txHashes)

		for {
			select {
			case h := <-txHashes:
				notifier.Notify(rpcSub.ID, h)
			case <-rpcSub.Err():
				pendingTxSub.Unsubscribe()
				return
			case <-notifier.Closed():
				pendingTxSub.Unsubscribe()
				return
			}
		}
	}()

	return rpcSub, nil
}

日志过滤功能
// FilterCriteria represents a request to create a new filter.
type FilterCriteria struct {
	// FromBlock is the first block of the requested range; GetLogs replaces a
	// nil value with rpc.LatestBlockNumber.
	FromBlock *big.Int
	// ToBlock is the last block of the requested range; GetLogs replaces a
	// nil value with rpc.LatestBlockNumber.
	ToBlock *big.Int
	// Addresses lists the addresses to filter by.
	Addresses []common.Address
	// Topics lists the topics to filter by, one list of alternatives per
	// topic position.
	Topics [][]common.Hash
}

// GetLogs returns logs matching the given argument that are stored within the state.
//
// A nil FromBlock or ToBlock in crit is replaced by rpc.LatestBlockNumber.
// The receiver is a pointer, like the other PublicFilterAPI methods, because
// the struct carries a mutex that must not be copied.
//
// NOTE(review): upstream go-ethereum returns []*types.Log here; the pointer
// marker in the result type may have been lost in extraction - confirm
// against returnLogs.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]types.Log, error) {
	// Convert the RPC block numbers into internal representations
	if crit.FromBlock == nil {
		crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
	}
	if crit.ToBlock == nil {
		crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
	}
	// Create and run the filter to get all the logs
	logFilter := New(api.backend, crit.FromBlock.Int64(), crit.ToBlock.Int64(), crit.Addresses, crit.Topics)
	logs, err := logFilter.Logs(ctx)
	if err != nil {
		return nil, err
	}
	return returnLogs(logs), nil
}

filter.go 里面定义了一个 Filter 对象。这个对象根据区块的 BloomIndexer 和布隆过滤器等来过滤日志。
后端的数据结构。
// Backend is the set of chain, database and event services the filter system
// is built on.
type Backend interface {
	ChainDb() ethdb.Database
	// EventMux returns a pointer; PublicFilterAPI stores it in a *event.TypeMux field.
	EventMux() *event.TypeMux
	// HeaderByNumber returns a pointer; callers treat a nil header as "not available".
	HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
	GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)

	SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
	SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
	SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
	SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription

	// BloomStatus returns the bloom section size and the number of sections,
	// in that order.
	BloomStatus() (uint64, uint64)
	ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
}

// Filter can be used to retrieve and filter logs.
type Filter struct {
	// backend provides the headers, receipts and bloom status.
	backend Backend

	// db is the chain database obtained from the backend.
	db ethdb.Database
	// begin and end delimit the block range to search; -1 is replaced by the
	// current head when the filter runs.
	begin, end int64
	// addresses and topics are the criteria logs are matched against.
	addresses []common.Address
	topics    [][]common.Hash

	// matcher is the bloombits matcher built from addresses and topics.
	matcher *bloombits.Matcher
}

构造函数
// New creates a new filter which uses a bloom filter on blocks to figure out
// whether a particular block is interesting or not. begin and end delimit the
// block range, addresses and topics are the criteria to match.
func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
	// Flatten the address and topic filter clauses into a single bloombits
	// filter system. Since the bloombits are not positional, nil topics are
	// permitted, which get flattened into a nil byte slice.
	var clauses [][][]byte

	if count := len(addresses); count > 0 {
		clause := make([][]byte, 0, count)
		for _, addr := range addresses {
			clause = append(clause, addr.Bytes())
		}
		clauses = append(clauses, clause)
	}
	for _, alternatives := range topics {
		clause := make([][]byte, 0, len(alternatives))
		for _, topic := range alternatives {
			clause = append(clause, topic.Bytes())
		}
		clauses = append(clauses, clause)
	}

	// Assemble and return the filter; only the section size is needed here.
	sectionSize, _ := backend.BloomStatus()
	chainDb := backend.ChainDb()
	matcher := bloombits.NewMatcher(sectionSize, clauses)

	return &Filter{
		backend:   backend,
		db:        chainDb,
		begin:     begin,
		end:       end,
		addresses: addresses,
		topics:    topics,
		matcher:   matcher,
	}
}

// Logs searches the blockchain for matching log entries, returning all from the
// first block that contains matches, updating the start of the filter accordingly.
//
// A begin or end of -1 is replaced by the current head block number. The part
// of the range covered by the bloom index is searched through indexedLogs, the
// remainder through unindexedLogs. When no head header is available the result
// is nil, nil.
//
// The receiver is a pointer because f.begin is advanced while searching (here
// and in indexedLogs/unindexedLogs) and those updates must be shared.
func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
	// Figure out the limits of the filter range.
	// NOTE(review): the lookup error is discarded; a nil header is treated as
	// "nothing to filter" - confirm this is intended.
	header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
	if header == nil {
		return nil, nil
	}
	head := header.Number.Uint64()

	if f.begin == -1 {
		f.begin = int64(head)
	}
	end := uint64(f.end)
	if f.end == -1 {
		end = head
	}
	// Gather all indexed logs, and finish with non indexed ones
	var (
		logs []*types.Log
		err  error
	)
	size, sections := f.backend.BloomStatus()
	if indexed := sections * size; indexed > uint64(f.begin) {
		// Never ask the index for blocks beyond the requested end.
		if indexed > end {
			logs, err = f.indexedLogs(ctx, end)
		} else {
			logs, err = f.indexedLogs(ctx, indexed-1)
		}
		if err != nil {
			return logs, err
		}
	}
	rest, err := f.unindexedLogs(ctx, end)
	logs = append(logs, rest...)
	return logs, err
}

// indexedLogs returns the logs matching the filter criteria based on the bloom
// bits indexed available locally or via the network. It searches blocks
// f.begin..end and, once the matcher has delivered every candidate, advances
// f.begin to end+1.
//
// On a header or receipt lookup failure, or when ctx is done, the logs
// gathered so far are returned together with the error.
//
// The receiver is a pointer so that the f.begin update reaches the caller.
func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
	// Create a matcher session and request servicing from the backend
	matches := make(chan uint64, 64)
	session, err := f.matcher.Start(uint64(f.begin), end, matches)
	if err != nil {
		return nil, err
	}
	defer session.Close(time.Second)
	f.backend.ServiceFilter(ctx, session)

	// Iterate over the matches until exhausted or context closed
	var logs []*types.Log

	for {
		select {
		case number, ok := <-matches:
			// Abort if all matches have been fulfilled
			if !ok {
				f.begin = int64(end) + 1
				return logs, nil
			}
			// Retrieve the suggested block and pull any truly matching logs
			header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
			if header == nil || err != nil {
				return logs, err
			}
			found, err := f.checkMatches(ctx, header)
			if err != nil {
				return logs, err
			}
			logs = append(logs, found...)

		case <-ctx.Done():
			return logs, ctx.Err()
		}
	}
}

checkMatches
// checkMatches checks if the receipts belonging to the given header contain any log events that
// match the filter criteria. This function is called when the bloom filter signals a potential match.
func (f Filter) checkMatches(ctx context.Context, header types.Header) (logs []types.Log, err error) {
// Get the logs of the block
receipts, err := f.backend.GetReceipts(ctx, header.Hash())
if err != nil {
return nil, err
}
var unfiltered []
types.Log
for _, receipt := range receipts {
unfiltered = append(unfiltered, ([]*types.Log)(receipt.Logs)...)
}
logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
if len(logs) > 0 {
return logs, nil
}
return nil, nil
}

filterLogs
// filterLogs creates a slice of logs matching the given criteria.
func filterLogs(logs []types.Log, fromBlock, toBlock big.Int, addresses []common.Address, topics [][]common.Hash) []types.Log {
var ret []
types.Log
Logs:
for _, log := range logs {
if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber {
continue
}
if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber {
continue
}

    if len(addresses) > 0 && !includes(addresses, log.Address) {
        continue
    }
    // If the to filtered topics is greater than the amount of topics in logs, skip.
    if len(topics) > len(log.Topics) {
        continue Logs
    }
    for i, topics := range topics {
        match := len(topics) == 0 // empty rule set == wildcard
        for _, topic := range topics {
            if log.Topics[i] == topic {
                match = true
                break
            }
        }
        if !match {
            continue Logs
        }
    }
    ret = append(ret, log)
}
return ret

}

unindexedLogs
// unindexedLogs returns the logs matching the filter criteria based on raw
// block iteration and bloom matching. It walks the blocks f.begin..end one by
// one, advancing f.begin as it goes, and only inspects the receipts of blocks
// whose header bloom passes bloomFilter.
//
// If a header is missing or a lookup fails, the logs gathered so far are
// returned together with the error (nil for a missing header).
//
// The receiver is a pointer so that the progress recorded in f.begin is kept.
func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
	var logs []*types.Log

	for ; f.begin <= int64(end); f.begin++ {
		header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
		if header == nil || err != nil {
			return logs, err
		}
		if bloomFilter(header.Bloom, f.addresses, f.topics) {
			found, err := f.checkMatches(ctx, header)
			if err != nil {
				return logs, err
			}
			logs = append(logs, found...)
		}
	}
	return logs, nil
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,576评论 6 515
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,515评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,017评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,626评论 1 296
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,625评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,255评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,825评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,729评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,271评论 1 320
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,363评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,498评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,183评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,867评论 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,338评论 0 24
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,458评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,906评论 3 376
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,507评论 2 359