From 78745551c077bf54151202138c2629f288769561 Mon Sep 17 00:00:00 2001
From: Determinant <tederminant@gmail.com>
Date: Tue, 15 Sep 2020 23:55:34 -0400
Subject: WIP: geth-tavum

---
 core/bloombits/matcher.go | 63 +++++++++++++++++++----------------------------
 1 file changed, 26 insertions(+), 37 deletions(-)

(limited to 'core/bloombits')

diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go
index fdf296a..927232b 100644
--- a/core/bloombits/matcher.go
+++ b/core/bloombits/matcher.go
@@ -26,8 +26,8 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/ava-labs/go-ethereum/common/bitutil"
-	"github.com/ava-labs/go-ethereum/crypto"
+	"github.com/ethereum/go-ethereum/common/bitutil"
+	"github.com/ethereum/go-ethereum/crypto"
 )
 
 // bloomIndexes represents the bit indexes inside the bloom filter that belong
@@ -155,7 +155,6 @@ func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uin
 	session := &MatcherSession{
 		matcher: m,
 		quit:    make(chan struct{}),
-		kill:    make(chan struct{}),
 		ctx:     ctx,
 	}
 	for _, scheduler := range m.schedulers {
@@ -386,10 +385,8 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
 		requests   = make(map[uint][]uint64) // Per-bit list of section requests, ordered by section number
 		unallocs   = make(map[uint]struct{}) // Bits with pending requests but not allocated to any retriever
 		retrievers chan chan uint            // Waiting retrievers (toggled to nil if unallocs is empty)
-	)
-	var (
-		allocs   int            // Number of active allocations to handle graceful shutdown requests
-		shutdown = session.quit // Shutdown request channel, will gracefully wait for pending requests
+		allocs     int                       // Number of active allocations to handle graceful shutdown requests
+		shutdown   = session.quit            // Shutdown request channel, will gracefully wait for pending requests
 	)
 
 	// assign is a helper method fo try to assign a pending bit an actively
@@ -409,15 +406,12 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
 	for {
 		select {
 		case <-shutdown:
-			// Graceful shutdown requested, wait until all pending requests are honoured
+			// Shutdown requested. No more retrievers can be allocated,
+			// but we still need to wait until all pending requests have returned.
+			shutdown = nil
 			if allocs == 0 {
 				return
 			}
-			shutdown = nil
-
-		case <-session.kill:
-			// Pending requests not honoured in time, hard terminate
-			return
 
 		case req := <-dist:
 			// New retrieval request arrived to be distributed to some fetcher process
@@ -499,8 +493,9 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
 					assign(result.Bit)
 				}
 			}
-			// If we're in the process of shutting down, terminate
-			if allocs == 0 && shutdown == nil {
+
+			// End the session when all pending deliveries have arrived.
+			if shutdown == nil && allocs == 0 {
 				return
 			}
 		}
@@ -514,7 +509,6 @@ type MatcherSession struct {
 
 	closer sync.Once     // Sync object to ensure we only ever close once
 	quit   chan struct{} // Quit channel to request pipeline termination
-	kill   chan struct{} // Term channel to signal non-graceful forced shutdown
 
 	ctx context.Context // Context used by the light client to abort filtering
 	err atomic.Value    // Global error to track retrieval failures deep in the chain
@@ -529,7 +523,6 @@ func (s *MatcherSession) Close() {
 	s.closer.Do(func() {
 		// Signal termination and wait for all goroutines to tear down
 		close(s.quit)
-		time.AfterFunc(time.Second, func() { close(s.kill) })
 		s.pend.Wait()
 	})
 }
@@ -542,10 +535,10 @@ func (s *MatcherSession) Error() error {
 	return nil
 }
 
-// AllocateRetrieval assigns a bloom bit index to a client process that can either
+// allocateRetrieval assigns a bloom bit index to a client process that can either
 // immediately request and fetch the section contents assigned to this bit or wait
 // a little while for more sections to be requested.
-func (s *MatcherSession) AllocateRetrieval() (uint, bool) {
+func (s *MatcherSession) allocateRetrieval() (uint, bool) {
 	fetcher := make(chan uint)
 
 	select {
@@ -557,9 +550,9 @@ func (s *MatcherSession) AllocateRetrieval() (uint, bool) {
 	}
 }
 
-// PendingSections returns the number of pending section retrievals belonging to
+// pendingSections returns the number of pending section retrievals belonging to
 // the given bloom bit index.
-func (s *MatcherSession) PendingSections(bit uint) int {
+func (s *MatcherSession) pendingSections(bit uint) int {
 	fetcher := make(chan uint)
 
 	select {
@@ -571,9 +564,9 @@ func (s *MatcherSession) PendingSections(bit uint) int {
 	}
 }
 
-// AllocateSections assigns all or part of an already allocated bit-task queue
+// allocateSections assigns all or part of an already allocated bit-task queue
 // to the requesting process.
-func (s *MatcherSession) AllocateSections(bit uint, count int) []uint64 {
+func (s *MatcherSession) allocateSections(bit uint, count int) []uint64 {
 	fetcher := make(chan *Retrieval)
 
 	select {
@@ -589,14 +582,10 @@ func (s *MatcherSession) AllocateSections(bit uint, count int) []uint64 {
 	}
 }
 
-// DeliverSections delivers a batch of section bit-vectors for a specific bloom
+// deliverSections delivers a batch of section bit-vectors for a specific bloom
 // bit index to be injected into the processing pipeline.
-func (s *MatcherSession) DeliverSections(bit uint, sections []uint64, bitsets [][]byte) {
-	select {
-	case <-s.kill:
-		return
-	case s.matcher.deliveries <- &Retrieval{Bit: bit, Sections: sections, Bitsets: bitsets}:
-	}
+func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets [][]byte) {
+	s.matcher.deliveries <- &Retrieval{Bit: bit, Sections: sections, Bitsets: bitsets}
 }
 
 // Multiplex polls the matcher session for retrieval tasks and multiplexes it into
@@ -608,17 +597,17 @@ func (s *MatcherSession) DeliverSections(bit uint, sections []uint64, bitsets []
 func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
 	for {
 		// Allocate a new bloom bit index to retrieve data for, stopping when done
-		bit, ok := s.AllocateRetrieval()
+		bit, ok := s.allocateRetrieval()
 		if !ok {
 			return
 		}
 		// Bit allocated, throttle a bit if we're below our batch limit
-		if s.PendingSections(bit) < batch {
+		if s.pendingSections(bit) < batch {
 			select {
 			case <-s.quit:
 				// Session terminating, we can't meaningfully service, abort
-				s.AllocateSections(bit, 0)
-				s.DeliverSections(bit, []uint64{}, [][]byte{})
+				s.allocateSections(bit, 0)
+				s.deliverSections(bit, []uint64{}, [][]byte{})
 				return
 
 			case <-time.After(wait):
@@ -626,13 +615,13 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan
 			}
 		}
 		// Allocate as much as we can handle and request servicing
-		sections := s.AllocateSections(bit, batch)
+		sections := s.allocateSections(bit, batch)
 		request := make(chan *Retrieval)
 
 		select {
 		case <-s.quit:
 			// Session terminating, we can't meaningfully service, abort
-			s.DeliverSections(bit, sections, make([][]byte, len(sections)))
+			s.deliverSections(bit, sections, make([][]byte, len(sections)))
 			return
 
 		case mux <- request:
@@ -644,7 +633,7 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan
 				s.err.Store(result.Error)
 				s.Close()
 			}
-			s.DeliverSections(result.Bit, result.Sections, result.Bitsets)
+			s.deliverSections(result.Bit, result.Sections, result.Bitsets)
 		}
 	}
 }
-- 
cgit v1.2.3-70-g09d2