aboutsummaryrefslogtreecommitdiff
path: root/core/bloombits
diff options
context:
space:
mode:
Diffstat (limited to 'core/bloombits')
-rw-r--r--core/bloombits/doc.go18
-rw-r--r--core/bloombits/generator.go93
-rw-r--r--core/bloombits/matcher.go650
-rw-r--r--core/bloombits/scheduler.go181
4 files changed, 942 insertions, 0 deletions
diff --git a/core/bloombits/doc.go b/core/bloombits/doc.go
new file mode 100644
index 0000000..3d159e7
--- /dev/null
+++ b/core/bloombits/doc.go
@@ -0,0 +1,18 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package bloombits implements bloom filtering on batches of data.
+package bloombits
diff --git a/core/bloombits/generator.go b/core/bloombits/generator.go
new file mode 100644
index 0000000..3dd54b6
--- /dev/null
+++ b/core/bloombits/generator.go
@@ -0,0 +1,93 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package bloombits
+
+import (
+ "errors"
+
+ "github.com/ava-labs/coreth/core/types"
+)
+
+var (
+ // errSectionOutOfBounds is returned if the user tried to add more bloom filters
+ // to the batch than available space, or if tries to retrieve above the capacity.
+ errSectionOutOfBounds = errors.New("section out of bounds")
+
+ // errBloomBitOutOfBounds is returned if the user tried to retrieve specified
+ // bit bloom above the capacity.
+ errBloomBitOutOfBounds = errors.New("bloom bit out of bounds")
+)
+
+// Generator takes a number of bloom filters and generates the rotated bloom bits
+// to be used for batched filtering.
+type Generator struct {
+ blooms [types.BloomBitLength][]byte // Rotated blooms for per-bit matching
+ sections uint // Number of sections to batch together
+ nextSec uint // Next section to set when adding a bloom
+}
+
+// NewGenerator creates a rotated bloom generator that can iteratively fill a
+// batched bloom filter's bits.
+func NewGenerator(sections uint) (*Generator, error) {
+ if sections%8 != 0 {
+ return nil, errors.New("section count not multiple of 8")
+ }
+ b := &Generator{sections: sections}
+ for i := 0; i < types.BloomBitLength; i++ {
+ b.blooms[i] = make([]byte, sections/8)
+ }
+ return b, nil
+}
+
+// AddBloom takes a single bloom filter and sets the corresponding bit column
+// in memory accordingly.
+func (b *Generator) AddBloom(index uint, bloom types.Bloom) error {
+ // Make sure we're not adding more bloom filters than our capacity
+ if b.nextSec >= b.sections {
+ return errSectionOutOfBounds
+ }
+ if b.nextSec != index {
+ return errors.New("bloom filter with unexpected index")
+ }
+ // Rotate the bloom and insert into our collection
+ byteIndex := b.nextSec / 8
+ bitMask := byte(1) << byte(7-b.nextSec%8)
+
+ for i := 0; i < types.BloomBitLength; i++ {
+ bloomByteIndex := types.BloomByteLength - 1 - i/8
+ bloomBitMask := byte(1) << byte(i%8)
+
+ if (bloom[bloomByteIndex] & bloomBitMask) != 0 {
+ b.blooms[i][byteIndex] |= bitMask
+ }
+ }
+ b.nextSec++
+
+ return nil
+}
+
+// Bitset returns the bit vector belonging to the given bit index after all
+// blooms have been added.
+func (b *Generator) Bitset(idx uint) ([]byte, error) {
+ if b.nextSec != b.sections {
+ return nil, errors.New("bloom not fully generated yet")
+ }
+ if idx >= types.BloomBitLength {
+ return nil, errBloomBitOutOfBounds
+ }
+ return b.blooms[idx], nil
+}
diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go
new file mode 100644
index 0000000..fdf296a
--- /dev/null
+++ b/core/bloombits/matcher.go
@@ -0,0 +1,650 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package bloombits
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "math"
+ "sort"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/ava-labs/go-ethereum/common/bitutil"
+ "github.com/ava-labs/go-ethereum/crypto"
+)
+
+// bloomIndexes represents the bit indexes inside the bloom filter that belong
+// to some key.
+type bloomIndexes [3]uint
+
+// calcBloomIndexes returns the bloom filter bit indexes belonging to the given key.
+func calcBloomIndexes(b []byte) bloomIndexes {
+ b = crypto.Keccak256(b)
+
+ var idxs bloomIndexes
+ for i := 0; i < len(idxs); i++ {
+ idxs[i] = (uint(b[2*i])<<8)&2047 + uint(b[2*i+1])
+ }
+ return idxs
+}
+
+// partialMatches with a non-nil vector represents a section in which some sub-
+// matchers have already found potential matches. Subsequent sub-matchers will
+// binary AND their matches with this vector. If vector is nil, it represents a
+// section to be processed by the first sub-matcher.
+type partialMatches struct {
+ section uint64
+ bitset []byte
+}
+
+// Retrieval represents a request for retrieval task assignments for a given
+// bit with the given number of fetch elements, or a response for such a request.
+// It can also have the actual results set to be used as a delivery data struct.
+//
+// The contest and error fields are used by the light client to terminate matching
+// early if an error is encountered on some path of the pipeline.
+type Retrieval struct {
+ Bit uint
+ Sections []uint64
+ Bitsets [][]byte
+
+ Context context.Context
+ Error error
+}
+
+// Matcher is a pipelined system of schedulers and logic matchers which perform
+// binary AND/OR operations on the bit-streams, creating a stream of potential
+// blocks to inspect for data content.
+type Matcher struct {
+ sectionSize uint64 // Size of the data batches to filter on
+
+ filters [][]bloomIndexes // Filter the system is matching for
+ schedulers map[uint]*scheduler // Retrieval schedulers for loading bloom bits
+
+ retrievers chan chan uint // Retriever processes waiting for bit allocations
+ counters chan chan uint // Retriever processes waiting for task count reports
+ retrievals chan chan *Retrieval // Retriever processes waiting for task allocations
+ deliveries chan *Retrieval // Retriever processes waiting for task response deliveries
+
+ running uint32 // Atomic flag whether a session is live or not
+}
+
+// NewMatcher creates a new pipeline for retrieving bloom bit streams and doing
+// address and topic filtering on them. Setting a filter component to `nil` is
+// allowed and will result in that filter rule being skipped (OR 0x11...1).
+func NewMatcher(sectionSize uint64, filters [][][]byte) *Matcher {
+ // Create the matcher instance
+ m := &Matcher{
+ sectionSize: sectionSize,
+ schedulers: make(map[uint]*scheduler),
+ retrievers: make(chan chan uint),
+ counters: make(chan chan uint),
+ retrievals: make(chan chan *Retrieval),
+ deliveries: make(chan *Retrieval),
+ }
+ // Calculate the bloom bit indexes for the groups we're interested in
+ m.filters = nil
+
+ for _, filter := range filters {
+ // Gather the bit indexes of the filter rule, special casing the nil filter
+ if len(filter) == 0 {
+ continue
+ }
+ bloomBits := make([]bloomIndexes, len(filter))
+ for i, clause := range filter {
+ if clause == nil {
+ bloomBits = nil
+ break
+ }
+ bloomBits[i] = calcBloomIndexes(clause)
+ }
+ // Accumulate the filter rules if no nil rule was within
+ if bloomBits != nil {
+ m.filters = append(m.filters, bloomBits)
+ }
+ }
+ // For every bit, create a scheduler to load/download the bit vectors
+ for _, bloomIndexLists := range m.filters {
+ for _, bloomIndexList := range bloomIndexLists {
+ for _, bloomIndex := range bloomIndexList {
+ m.addScheduler(bloomIndex)
+ }
+ }
+ }
+ return m
+}
+
+// addScheduler adds a bit stream retrieval scheduler for the given bit index if
+// it has not existed before. If the bit is already selected for filtering, the
+// existing scheduler can be used.
+func (m *Matcher) addScheduler(idx uint) {
+ if _, ok := m.schedulers[idx]; ok {
+ return
+ }
+ m.schedulers[idx] = newScheduler(idx)
+}
+
+// Start starts the matching process and returns a stream of bloom matches in
+// a given range of blocks. If there are no more matches in the range, the result
+// channel is closed.
+func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) {
+ // Make sure we're not creating concurrent sessions
+ if atomic.SwapUint32(&m.running, 1) == 1 {
+ return nil, errors.New("matcher already running")
+ }
+ defer atomic.StoreUint32(&m.running, 0)
+
+ // Initiate a new matching round
+ session := &MatcherSession{
+ matcher: m,
+ quit: make(chan struct{}),
+ kill: make(chan struct{}),
+ ctx: ctx,
+ }
+ for _, scheduler := range m.schedulers {
+ scheduler.reset()
+ }
+ sink := m.run(begin, end, cap(results), session)
+
+ // Read the output from the result sink and deliver to the user
+ session.pend.Add(1)
+ go func() {
+ defer session.pend.Done()
+ defer close(results)
+
+ for {
+ select {
+ case <-session.quit:
+ return
+
+ case res, ok := <-sink:
+ // New match result found
+ if !ok {
+ return
+ }
+ // Calculate the first and last blocks of the section
+ sectionStart := res.section * m.sectionSize
+
+ first := sectionStart
+ if begin > first {
+ first = begin
+ }
+ last := sectionStart + m.sectionSize - 1
+ if end < last {
+ last = end
+ }
+ // Iterate over all the blocks in the section and return the matching ones
+ for i := first; i <= last; i++ {
+ // Skip the entire byte if no matches are found inside (and we're processing an entire byte!)
+ next := res.bitset[(i-sectionStart)/8]
+ if next == 0 {
+ if i%8 == 0 {
+ i += 7
+ }
+ continue
+ }
+ // Some bit it set, do the actual submatching
+ if bit := 7 - i%8; next&(1<<bit) != 0 {
+ select {
+ case <-session.quit:
+ return
+ case results <- i:
+ }
+ }
+ }
+ }
+ }
+ }()
+ return session, nil
+}
+
+// run creates a daisy-chain of sub-matchers, one for the address set and one
+// for each topic set, each sub-matcher receiving a section only if the previous
+// ones have all found a potential match in one of the blocks of the section,
+// then binary AND-ing its own matches and forwarding the result to the next one.
+//
+// The method starts feeding the section indexes into the first sub-matcher on a
+// new goroutine and returns a sink channel receiving the results.
+func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) chan *partialMatches {
+ // Create the source channel and feed section indexes into
+ source := make(chan *partialMatches, buffer)
+
+ session.pend.Add(1)
+ go func() {
+ defer session.pend.Done()
+ defer close(source)
+
+ for i := begin / m.sectionSize; i <= end/m.sectionSize; i++ {
+ select {
+ case <-session.quit:
+ return
+ case source <- &partialMatches{i, bytes.Repeat([]byte{0xff}, int(m.sectionSize/8))}:
+ }
+ }
+ }()
+ // Assemble the daisy-chained filtering pipeline
+ next := source
+ dist := make(chan *request, buffer)
+
+ for _, bloom := range m.filters {
+ next = m.subMatch(next, dist, bloom, session)
+ }
+ // Start the request distribution
+ session.pend.Add(1)
+ go m.distributor(dist, session)
+
+ return next
+}
+
+// subMatch creates a sub-matcher that filters for a set of addresses or topics, binary OR-s those matches, then
+// binary AND-s the result to the daisy-chain input (source) and forwards it to the daisy-chain output.
+// The matches of each address/topic are calculated by fetching the given sections of the three bloom bit indexes belonging to
+// that address/topic, and binary AND-ing those vectors together.
+func (m *Matcher) subMatch(source chan *partialMatches, dist chan *request, bloom []bloomIndexes, session *MatcherSession) chan *partialMatches {
+ // Start the concurrent schedulers for each bit required by the bloom filter
+ sectionSources := make([][3]chan uint64, len(bloom))
+ sectionSinks := make([][3]chan []byte, len(bloom))
+ for i, bits := range bloom {
+ for j, bit := range bits {
+ sectionSources[i][j] = make(chan uint64, cap(source))
+ sectionSinks[i][j] = make(chan []byte, cap(source))
+
+ m.schedulers[bit].run(sectionSources[i][j], dist, sectionSinks[i][j], session.quit, &session.pend)
+ }
+ }
+
+ process := make(chan *partialMatches, cap(source)) // entries from source are forwarded here after fetches have been initiated
+ results := make(chan *partialMatches, cap(source))
+
+ session.pend.Add(2)
+ go func() {
+ // Tear down the goroutine and terminate all source channels
+ defer session.pend.Done()
+ defer close(process)
+
+ defer func() {
+ for _, bloomSources := range sectionSources {
+ for _, bitSource := range bloomSources {
+ close(bitSource)
+ }
+ }
+ }()
+ // Read sections from the source channel and multiplex into all bit-schedulers
+ for {
+ select {
+ case <-session.quit:
+ return
+
+ case subres, ok := <-source:
+ // New subresult from previous link
+ if !ok {
+ return
+ }
+ // Multiplex the section index to all bit-schedulers
+ for _, bloomSources := range sectionSources {
+ for _, bitSource := range bloomSources {
+ select {
+ case <-session.quit:
+ return
+ case bitSource <- subres.section:
+ }
+ }
+ }
+ // Notify the processor that this section will become available
+ select {
+ case <-session.quit:
+ return
+ case process <- subres:
+ }
+ }
+ }
+ }()
+
+ go func() {
+ // Tear down the goroutine and terminate the final sink channel
+ defer session.pend.Done()
+ defer close(results)
+
+ // Read the source notifications and collect the delivered results
+ for {
+ select {
+ case <-session.quit:
+ return
+
+ case subres, ok := <-process:
+ // Notified of a section being retrieved
+ if !ok {
+ return
+ }
+ // Gather all the sub-results and merge them together
+ var orVector []byte
+ for _, bloomSinks := range sectionSinks {
+ var andVector []byte
+ for _, bitSink := range bloomSinks {
+ var data []byte
+ select {
+ case <-session.quit:
+ return
+ case data = <-bitSink:
+ }
+ if andVector == nil {
+ andVector = make([]byte, int(m.sectionSize/8))
+ copy(andVector, data)
+ } else {
+ bitutil.ANDBytes(andVector, andVector, data)
+ }
+ }
+ if orVector == nil {
+ orVector = andVector
+ } else {
+ bitutil.ORBytes(orVector, orVector, andVector)
+ }
+ }
+
+ if orVector == nil {
+ orVector = make([]byte, int(m.sectionSize/8))
+ }
+ if subres.bitset != nil {
+ bitutil.ANDBytes(orVector, orVector, subres.bitset)
+ }
+ if bitutil.TestBytes(orVector) {
+ select {
+ case <-session.quit:
+ return
+ case results <- &partialMatches{subres.section, orVector}:
+ }
+ }
+ }
+ }
+ }()
+ return results
+}
+
+// distributor receives requests from the schedulers and queues them into a set
+// of pending requests, which are assigned to retrievers wanting to fulfil them.
+func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
+ defer session.pend.Done()
+
+ var (
+ requests = make(map[uint][]uint64) // Per-bit list of section requests, ordered by section number
+ unallocs = make(map[uint]struct{}) // Bits with pending requests but not allocated to any retriever
+ retrievers chan chan uint // Waiting retrievers (toggled to nil if unallocs is empty)
+ )
+ var (
+ allocs int // Number of active allocations to handle graceful shutdown requests
+ shutdown = session.quit // Shutdown request channel, will gracefully wait for pending requests
+ )
+
+ // assign is a helper method fo try to assign a pending bit an actively
+ // listening servicer, or schedule it up for later when one arrives.
+ assign := func(bit uint) {
+ select {
+ case fetcher := <-m.retrievers:
+ allocs++
+ fetcher <- bit
+ default:
+ // No retrievers active, start listening for new ones
+ retrievers = m.retrievers
+ unallocs[bit] = struct{}{}
+ }
+ }
+
+ for {
+ select {
+ case <-shutdown:
+ // Graceful shutdown requested, wait until all pending requests are honoured
+ if allocs == 0 {
+ return
+ }
+ shutdown = nil
+
+ case <-session.kill:
+ // Pending requests not honoured in time, hard terminate
+ return
+
+ case req := <-dist:
+ // New retrieval request arrived to be distributed to some fetcher process
+ queue := requests[req.bit]
+ index := sort.Search(len(queue), func(i int) bool { return queue[i] >= req.section })
+ requests[req.bit] = append(queue[:index], append([]uint64{req.section}, queue[index:]...)...)
+
+ // If it's a new bit and we have waiting fetchers, allocate to them
+ if len(queue) == 0 {
+ assign(req.bit)
+ }
+
+ case fetcher := <-retrievers:
+ // New retriever arrived, find the lowest section-ed bit to assign
+ bit, best := uint(0), uint64(math.MaxUint64)
+ for idx := range unallocs {
+ if requests[idx][0] < best {
+ bit, best = idx, requests[idx][0]
+ }
+ }
+ // Stop tracking this bit (and alloc notifications if no more work is available)
+ delete(unallocs, bit)
+ if len(unallocs) == 0 {
+ retrievers = nil
+ }
+ allocs++
+ fetcher <- bit
+
+ case fetcher := <-m.counters:
+ // New task count request arrives, return number of items
+ fetcher <- uint(len(requests[<-fetcher]))
+
+ case fetcher := <-m.retrievals:
+ // New fetcher waiting for tasks to retrieve, assign
+ task := <-fetcher
+ if want := len(task.Sections); want >= len(requests[task.Bit]) {
+ task.Sections = requests[task.Bit]
+ delete(requests, task.Bit)
+ } else {
+ task.Sections = append(task.Sections[:0], requests[task.Bit][:want]...)
+ requests[task.Bit] = append(requests[task.Bit][:0], requests[task.Bit][want:]...)
+ }
+ fetcher <- task
+
+ // If anything was left unallocated, try to assign to someone else
+ if len(requests[task.Bit]) > 0 {
+ assign(task.Bit)
+ }
+
+ case result := <-m.deliveries:
+ // New retrieval task response from fetcher, split out missing sections and
+ // deliver complete ones
+ var (
+ sections = make([]uint64, 0, len(result.Sections))
+ bitsets = make([][]byte, 0, len(result.Bitsets))
+ missing = make([]uint64, 0, len(result.Sections))
+ )
+ for i, bitset := range result.Bitsets {
+ if len(bitset) == 0 {
+ missing = append(missing, result.Sections[i])
+ continue
+ }
+ sections = append(sections, result.Sections[i])
+ bitsets = append(bitsets, bitset)
+ }
+ m.schedulers[result.Bit].deliver(sections, bitsets)
+ allocs--
+
+ // Reschedule missing sections and allocate bit if newly available
+ if len(missing) > 0 {
+ queue := requests[result.Bit]
+ for _, section := range missing {
+ index := sort.Search(len(queue), func(i int) bool { return queue[i] >= section })
+ queue = append(queue[:index], append([]uint64{section}, queue[index:]...)...)
+ }
+ requests[result.Bit] = queue
+
+ if len(queue) == len(missing) {
+ assign(result.Bit)
+ }
+ }
+ // If we're in the process of shutting down, terminate
+ if allocs == 0 && shutdown == nil {
+ return
+ }
+ }
+ }
+}
+
+// MatcherSession is returned by a started matcher to be used as a terminator
+// for the actively running matching operation.
+type MatcherSession struct {
+ matcher *Matcher
+
+ closer sync.Once // Sync object to ensure we only ever close once
+ quit chan struct{} // Quit channel to request pipeline termination
+ kill chan struct{} // Term channel to signal non-graceful forced shutdown
+
+ ctx context.Context // Context used by the light client to abort filtering
+ err atomic.Value // Global error to track retrieval failures deep in the chain
+
+ pend sync.WaitGroup
+}
+
+// Close stops the matching process and waits for all subprocesses to terminate
+// before returning. The timeout may be used for graceful shutdown, allowing the
+// currently running retrievals to complete before this time.
+func (s *MatcherSession) Close() {
+ s.closer.Do(func() {
+ // Signal termination and wait for all goroutines to tear down
+ close(s.quit)
+ time.AfterFunc(time.Second, func() { close(s.kill) })
+ s.pend.Wait()
+ })
+}
+
+// Error returns any failure encountered during the matching session.
+func (s *MatcherSession) Error() error {
+ if err := s.err.Load(); err != nil {
+ return err.(error)
+ }
+ return nil
+}
+
+// AllocateRetrieval assigns a bloom bit index to a client process that can either
+// immediately request and fetch the section contents assigned to this bit or wait
+// a little while for more sections to be requested.
+func (s *MatcherSession) AllocateRetrieval() (uint, bool) {
+ fetcher := make(chan uint)
+
+ select {
+ case <-s.quit:
+ return 0, false
+ case s.matcher.retrievers <- fetcher:
+ bit, ok := <-fetcher
+ return bit, ok
+ }
+}
+
+// PendingSections returns the number of pending section retrievals belonging to
+// the given bloom bit index.
+func (s *MatcherSession) PendingSections(bit uint) int {
+ fetcher := make(chan uint)
+
+ select {
+ case <-s.quit:
+ return 0
+ case s.matcher.counters <- fetcher:
+ fetcher <- bit
+ return int(<-fetcher)
+ }
+}
+
+// AllocateSections assigns all or part of an already allocated bit-task queue
+// to the requesting process.
+func (s *MatcherSession) AllocateSections(bit uint, count int) []uint64 {
+ fetcher := make(chan *Retrieval)
+
+ select {
+ case <-s.quit:
+ return nil
+ case s.matcher.retrievals <- fetcher:
+ task := &Retrieval{
+ Bit: bit,
+ Sections: make([]uint64, count),
+ }
+ fetcher <- task
+ return (<-fetcher).Sections
+ }
+}
+
+// DeliverSections delivers a batch of section bit-vectors for a specific bloom
+// bit index to be injected into the processing pipeline.
+func (s *MatcherSession) DeliverSections(bit uint, sections []uint64, bitsets [][]byte) {
+ select {
+ case <-s.kill:
+ return
+ case s.matcher.deliveries <- &Retrieval{Bit: bit, Sections: sections, Bitsets: bitsets}:
+ }
+}
+
+// Multiplex polls the matcher session for retrieval tasks and multiplexes it into
+// the requested retrieval queue to be serviced together with other sessions.
+//
+// This method will block for the lifetime of the session. Even after termination
+// of the session, any request in-flight need to be responded to! Empty responses
+// are fine though in that case.
+func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
+ for {
+ // Allocate a new bloom bit index to retrieve data for, stopping when done
+ bit, ok := s.AllocateRetrieval()
+ if !ok {
+ return
+ }
+ // Bit allocated, throttle a bit if we're below our batch limit
+ if s.PendingSections(bit) < batch {
+ select {
+ case <-s.quit:
+ // Session terminating, we can't meaningfully service, abort
+ s.AllocateSections(bit, 0)
+ s.DeliverSections(bit, []uint64{}, [][]byte{})
+ return
+
+ case <-time.After(wait):
+ // Throttling up, fetch whatever's available
+ }
+ }
+ // Allocate as much as we can handle and request servicing
+ sections := s.AllocateSections(bit, batch)
+ request := make(chan *Retrieval)
+
+ select {
+ case <-s.quit:
+ // Session terminating, we can't meaningfully service, abort
+ s.DeliverSections(bit, sections, make([][]byte, len(sections)))
+ return
+
+ case mux <- request:
+ // Retrieval accepted, something must arrive before we're aborting
+ request <- &Retrieval{Bit: bit, Sections: sections, Context: s.ctx}
+
+ result := <-request
+ if result.Error != nil {
+ s.err.Store(result.Error)
+ s.Close()
+ }
+ s.DeliverSections(result.Bit, result.Sections, result.Bitsets)
+ }
+ }
+}
diff --git a/core/bloombits/scheduler.go b/core/bloombits/scheduler.go
new file mode 100644
index 0000000..6449c74
--- /dev/null
+++ b/core/bloombits/scheduler.go
@@ -0,0 +1,181 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package bloombits
+
+import (
+ "sync"
+)
+
+// request represents a bloom retrieval task to prioritize and pull from the local
+// database or remotely from the network.
+type request struct {
+ section uint64 // Section index to retrieve the a bit-vector from
+ bit uint // Bit index within the section to retrieve the vector of
+}
+
+// response represents the state of a requested bit-vector through a scheduler.
+type response struct {
+ cached []byte // Cached bits to dedup multiple requests
+ done chan struct{} // Channel to allow waiting for completion
+}
+
+// scheduler handles the scheduling of bloom-filter retrieval operations for
+// entire section-batches belonging to a single bloom bit. Beside scheduling the
+// retrieval operations, this struct also deduplicates the requests and caches
+// the results to minimize network/database overhead even in complex filtering
+// scenarios.
+type scheduler struct {
+ bit uint // Index of the bit in the bloom filter this scheduler is responsible for
+ responses map[uint64]*response // Currently pending retrieval requests or already cached responses
+ lock sync.Mutex // Lock protecting the responses from concurrent access
+}
+
+// newScheduler creates a new bloom-filter retrieval scheduler for a specific
+// bit index.
+func newScheduler(idx uint) *scheduler {
+ return &scheduler{
+ bit: idx,
+ responses: make(map[uint64]*response),
+ }
+}
+
+// run creates a retrieval pipeline, receiving section indexes from sections and
+// returning the results in the same order through the done channel. Concurrent
+// runs of the same scheduler are allowed, leading to retrieval task deduplication.
+func (s *scheduler) run(sections chan uint64, dist chan *request, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) {
+ // Create a forwarder channel between requests and responses of the same size as
+ // the distribution channel (since that will block the pipeline anyway).
+ pend := make(chan uint64, cap(dist))
+
+ // Start the pipeline schedulers to forward between user -> distributor -> user
+ wg.Add(2)
+ go s.scheduleRequests(sections, dist, pend, quit, wg)
+ go s.scheduleDeliveries(pend, done, quit, wg)
+}
+
+// reset cleans up any leftovers from previous runs. This is required before a
+// restart to ensure the no previously requested but never delivered state will
+// cause a lockup.
+func (s *scheduler) reset() {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ for section, res := range s.responses {
+ if res.cached == nil {
+ delete(s.responses, section)
+ }
+ }
+}
+
+// scheduleRequests reads section retrieval requests from the input channel,
+// deduplicates the stream and pushes unique retrieval tasks into the distribution
+// channel for a database or network layer to honour.
+func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}, wg *sync.WaitGroup) {
+ // Clean up the goroutine and pipeline when done
+ defer wg.Done()
+ defer close(pend)
+
+ // Keep reading and scheduling section requests
+ for {
+ select {
+ case <-quit:
+ return
+
+ case section, ok := <-reqs:
+ // New section retrieval requested
+ if !ok {
+ return
+ }
+ // Deduplicate retrieval requests
+ unique := false
+
+ s.lock.Lock()
+ if s.responses[section] == nil {
+ s.responses[section] = &response{
+ done: make(chan struct{}),
+ }
+ unique = true
+ }
+ s.lock.Unlock()
+
+ // Schedule the section for retrieval and notify the deliverer to expect this section
+ if unique {
+ select {
+ case <-quit:
+ return
+ case dist <- &request{bit: s.bit, section: section}:
+ }
+ }
+ select {
+ case <-quit:
+ return
+ case pend <- section:
+ }
+ }
+ }
+}
+
+// scheduleDeliveries reads section acceptance notifications and waits for them
+// to be delivered, pushing them into the output data buffer.
+func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) {
+ // Clean up the goroutine and pipeline when done
+ defer wg.Done()
+ defer close(done)
+
+ // Keep reading notifications and scheduling deliveries
+ for {
+ select {
+ case <-quit:
+ return
+
+ case idx, ok := <-pend:
+ // New section retrieval pending
+ if !ok {
+ return
+ }
+ // Wait until the request is honoured
+ s.lock.Lock()
+ res := s.responses[idx]
+ s.lock.Unlock()
+
+ select {
+ case <-quit:
+ return
+ case <-res.done:
+ }
+ // Deliver the result
+ select {
+ case <-quit:
+ return
+ case done <- res.cached:
+ }
+ }
+ }
+}
+
+// deliver is called by the request distributor when a reply to a request arrives.
+func (s *scheduler) deliver(sections []uint64, data [][]byte) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ for i, section := range sections {
+ if res := s.responses[section]; res != nil && res.cached == nil { // Avoid non-requests and double deliveries
+ res.cached = data[i]
+ close(res.done)
+ }
+ }
+}