// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package bloombits
import (
"bytes"
"context"
"errors"
"math"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/ava-labs/go-ethereum/common/bitutil"
"github.com/ava-labs/go-ethereum/crypto"
)
// bloomIndexes represents the bit indexes inside the bloom filter that belong
// to some key.
type bloomIndexes [3]uint
// calcBloomIndexes returns the bloom filter bit indexes belonging to the given key.
func calcBloomIndexes(b []byte) bloomIndexes {
b = crypto.Keccak256(b)
var idxs bloomIndexes
for i := 0; i < len(idxs); i++ {
idxs[i] = (uint(b[2*i])<<8)&2047 + uint(b[2*i+1])
}
return idxs
}
// partialMatches with a non-nil vector represents a section in which some sub-
// matchers have already found potential matches. Subsequent sub-matchers will
// binary AND their matches with this vector. If vector is nil, it represents a
// section to be processed by the first sub-matcher.
type partialMatches struct {
section uint64
bitset []byte
}
// Retrieval represents a request for retrieval task assignments for a given
// bit with the given number of fetch elements, or a response for such a request.
// It can also have the actual results set to be used as a delivery data struct.
//
// The contest and error fields are used by the light client to terminate matching
// early if an error is encountered on some path of the pipeline.
type Retrieval struct {
Bit uint
Sections []uint64
Bitsets [][]byte
Context context.Context
Error error
}
// Matcher is a pipelined system of schedulers and logic matchers which perform
// binary AND/OR operations on the bit-streams, creating a stream of potential
// blocks to inspect for data content.
type Matcher struct {
sectionSize uint64 // Size of the data batches to filter on
filters [][]bloomIndexes // Filter the system is matching for
schedulers map[uint]*scheduler // Retrieval schedulers for loading bloom bits
retrievers chan chan uint // Retriever processes waiting for bit allocations
counters chan chan uint // Retriever processes waiting for task count reports
retrievals chan chan *Retrieval // Retriever processes waiting for task allocations
deliveries chan *Retrieval // Retriever processes waiting for task response deliveries
running uint32 // Atomic flag whether a session is live or not
}
// NewMatcher creates a new pipeline for retrieving bloom bit streams and doing
// address and topic filtering on them. Setting a filter component to `nil` is
// allowed and will result in that filter rule being skipped (OR 0x11...1).
func NewMatcher(sectionSize uint64, filters [][][]byte) *Matcher {
// Create the matcher instance
m := &Matcher{
sectionSize: sectionSize,
schedulers: make(map[uint]*scheduler),
retrievers: make(chan chan uint),
counters: make(chan chan uint),
retrievals: make(chan chan *Retrieval),
deliveries: make(chan *Retrieval),
}
// Calculate the bloom bit indexes for the groups we're interested in
m.filters = nil
for _, filter := range filters {
// Gather the bit indexes of the filter rule, special casing the nil filter
if len(filter) == 0 {
continue
}
bloomBits := make([]bloomIndexes, len(filter))
for i, clause := range filter {
if clause == nil {
bloomBits = nil
break
}
bloomBits[i] = calcBloomIndexes(clause)
}
// Accumulate the filter rules if no nil rule was within
if bloomBits != nil {
m.filters = append(m.filters, bloomBits)
}
}
// For every bit, create a scheduler to load/download the bit vectors
for _, bloomIndexLists := range m.filters {
for _, bloomIndexList := range bloomIndexLists {
for _, bloomIndex := range bloomIndexList {
m.addScheduler(bloomIndex)
}
}
}
return m
}
// addScheduler adds a bit stream retrieval scheduler for the given bit index if
// it has not existed before. If the bit is already selected for filtering, the
// existing scheduler can be used.
func (m *Matcher) addScheduler(idx uint) {
if _, ok := m.schedulers[idx]; ok {
return
}
m.schedulers[idx] = newScheduler(idx)
}
// Start starts the matching process and returns a stream of bloom matches in
// a given range of blocks. If there are no more matches in the range, the result
// channel is closed.
func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) {
// Make sure we're not creating concurrent sessions
if atomic.SwapUint32(&m.running, 1) == 1 {
return nil, errors.New("matcher already running")
}
defer atomic.StoreUint32(&m.running, 0)
// Initiate a new matching round
session := &MatcherSession{
matcher: m,
quit: make(chan struct{}),
kill: make(chan struct{}),
ctx: ctx,
}
for _, scheduler := range m.schedulers {
scheduler.reset()
}
sink := m.run(begin, end, cap(results), session)
// Read the output from the result sink and deliver to the user
session.pend.Add(1)
go func() {
defer session.pend.Done()
defer close(results)
for {
select {
case <-session.quit:
return
case res, ok := <-sink:
// New match result found
if !ok {
return
}
// Calculate the first and last blocks of the section
sectionStart := res.section * m.sectionSize
first := sectionStart
if begin > first {
first = begin
}
last := sectionStart + m.sectionSize - 1
if end < last {
last = end
}
// Iterate over all the blocks in the section and return the matching ones
for i := first; i <= last; i++ {
// Skip the entire byte if no matches are found inside (and we're processing an entire byte!)
next := res.bitset[(i-sectionStart)/8]
if next == 0 {
if i%8 == 0 {
i += 7
}
continue
}
// Some bit it set, do the actual submatching
if bit := 7 - i%8; next&(1<<bit) != 0 {
select {
case <-session.quit:
return
case results <- i:
}
}
}
}
}
}()
return session, nil
}
// run creates a daisy-chain of sub-matchers, one for the address set and one
// for each topic set, each sub-matcher receiving a section only if the previous
// ones have all found a potential match in one of the blocks of the section,
// then binary AND-ing its own matches and forwarding the result to the next one.
//
// The method starts feeding the section indexes into the first sub-matcher on a
// new goroutine and returns a sink channel receiving the results.
func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) chan *partialMatches {
// Create the source channel and feed section indexes into
source := make(chan *partialMatches, buffer)
session.pend.Add(1)
go fu