diff options
Diffstat (limited to 'eth/filters')
-rw-r--r-- | eth/filters/api.go | 16 | ||||
-rw-r--r-- | eth/filters/filter.go | 12 | ||||
-rw-r--r-- | eth/filters/filter_system.go | 175 |
3 files changed, 97 insertions, 106 deletions
diff --git a/eth/filters/api.go b/eth/filters/api.go index a1d0b21..71e6454 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -27,11 +27,11 @@ import ( "github.com/ava-labs/coreth/core/types" "github.com/ava-labs/coreth/rpc" - ethereum "github.com/ava-labs/go-ethereum" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/common/hexutil" - "github.com/ava-labs/go-ethereum/ethdb" - "github.com/ava-labs/go-ethereum/event" + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" ) var ( @@ -65,9 +65,8 @@ type PublicFilterAPI struct { func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI { api := &PublicFilterAPI{ backend: backend, - mux: backend.EventMux(), chainDb: backend.ChainDb(), - events: NewEventSystem(backend.EventMux(), backend, lightMode), + events: NewEventSystem(backend, lightMode), filters: make(map[rpc.ID]*filter), } go api.timeoutLoop() @@ -79,6 +78,7 @@ func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI { // Tt is started when the api is created. func (api *PublicFilterAPI) timeoutLoop() { ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() for { <-ticker.C api.filtersMu.Lock() @@ -428,7 +428,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { hashes := f.hashes f.hashes = nil return returnHashes(hashes), nil - case LogsSubscription: + case LogsSubscription, MinedAndPendingLogsSubscription: logs := f.logs f.logs = nil return returnLogs(logs), nil diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 6d4a74d..48a76c7 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -22,17 +22,16 @@ import ( "math/big" "github.com/ava-labs/coreth/core" + "github.com/ava-labs/coreth/core/bloombits" "github.com/ava-labs/coreth/core/types" "github.com/ava-labs/coreth/rpc" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/core/bloombits" - "github.com/ava-labs/go-ethereum/ethdb" - "github.com/ava-labs/go-ethereum/event" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" ) type Backend interface { ChainDb() ethdb.Database - EventMux() *event.TypeMux HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) @@ -42,6 +41,7 @@ type Backend interface { SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription + SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription BloomStatus() (uint64, uint64) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) @@ -210,7 +210,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err } } -// indexedLogs returns the logs matching the filter criteria based on raw block +// unindexedLogs returns the logs matching the filter criteria based on raw block // iteration and bloom matching. func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) { var logs []*types.Log diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 41427c7..1c0f1bf 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -20,7 +20,6 @@ package filters import ( "context" - "errors" "fmt" "sync" "time" @@ -29,10 +28,10 @@ import ( "github.com/ava-labs/coreth/core/rawdb" "github.com/ava-labs/coreth/core/types" "github.com/ava-labs/coreth/rpc" - ethereum "github.com/ava-labs/go-ethereum" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/event" - "github.com/ava-labs/go-ethereum/log" + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" ) // Type determines the kind of filter and is used to put the filter in to @@ -58,7 +57,6 @@ const ( ) const ( - // txChanSize is the size of channel listening to NewTxsEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 @@ -70,10 +68,6 @@ const ( chainEvChanSize = 10 ) -var ( - ErrInvalidSubscriptionID = errors.New("invalid id") -) - type subscription struct { id rpc.ID typ Type @@ -89,25 +83,25 @@ type subscription struct { // EventSystem creates subscriptions, processes events and broadcasts them to the // subscription which match the subscription criteria. type EventSystem struct { - mux *event.TypeMux backend Backend lightMode bool lastHead *types.Header // Subscriptions - txsSub event.Subscription // Subscription for new transaction event - logsSub event.Subscription // Subscription for new log event - rmLogsSub event.Subscription // Subscription for removed log event - chainSub event.Subscription // Subscription for new chain event - pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event + txsSub event.Subscription // Subscription for new transaction event + logsSub event.Subscription // Subscription for new log event + rmLogsSub event.Subscription // Subscription for removed log event + pendingLogsSub event.Subscription // Subscription for pending log event + chainSub event.Subscription // Subscription for new chain event // Channels - install chan *subscription // install filter for event notification - uninstall chan *subscription // remove filter for event notification - txsCh chan core.NewTxsEvent // Channel to receive new transactions event - logsCh chan []*types.Log // Channel to receive new log event - rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event - chainCh chan core.ChainEvent // Channel to receive new chain event + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification + txsCh chan core.NewTxsEvent // Channel to receive new transactions event + logsCh chan []*types.Log // Channel to receive new log event + pendingLogsCh chan []*types.Log // Channel to receive new log event + rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event + chainCh chan core.ChainEvent // Channel to receive new chain event } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -116,17 +110,17 @@ type EventSystem struct { // // The returned manager has a loop that needs to be stopped with the Stop function // or by stopping the given mux. -func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem { +func NewEventSystem(backend Backend, lightMode bool) *EventSystem { m := &EventSystem{ - mux: mux, - backend: backend, - lightMode: lightMode, - install: make(chan *subscription), - uninstall: make(chan *subscription), - txsCh: make(chan core.NewTxsEvent, txChanSize), - logsCh: make(chan []*types.Log, logsChanSize), - rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), - chainCh: make(chan core.ChainEvent, chainEvChanSize), + backend: backend, + lightMode: lightMode, + install: make(chan *subscription), + uninstall: make(chan *subscription), + txsCh: make(chan core.NewTxsEvent, txChanSize), + logsCh: make(chan []*types.Log, logsChanSize), + rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), + pendingLogsCh: make(chan []*types.Log, logsChanSize), + chainCh: make(chan core.ChainEvent, chainEvChanSize), } // Subscribe events @@ -134,12 +128,10 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) - // TODO(rjl493456442): use feed to subscribe pending log event - m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) + m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh) // Make sure none of the subscriptions are empty - if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || - m.pendingLogSub.Closed() { + if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil { log.Crit("Subscribe for event system failed") } @@ -316,58 +308,61 @@ func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscript type filterIndex map[Type]map[rpc.ID]*subscription -// broadcast event to filters that match criteria. -func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { - if ev == nil { +func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) { + if len(ev) == 0 { return } + for _, f := range filters[LogsSubscription] { + matchedLogs := filterLogs(ev, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics) + if len(matchedLogs) > 0 { + f.logs <- matchedLogs + } + } +} - switch e := ev.(type) { - case []*types.Log: - if len(e) > 0 { - for _, f := range filters[LogsSubscription] { - if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } - } +func (es *EventSystem) handlePendingLogs(filters filterIndex, ev []*types.Log) { + if len(ev) == 0 { + return + } + for _, f := range filters[PendingLogsSubscription] { + matchedLogs := filterLogs(ev, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics) + if len(matchedLogs) > 0 { + f.logs <- matchedLogs } - case core.RemovedLogsEvent: - for _, f := range filters[LogsSubscription] { - if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } + } +} + +func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLogsEvent) { + for _, f := range filters[LogsSubscription] { + matchedLogs := filterLogs(ev.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics) + if len(matchedLogs) > 0 { + f.logs <- matchedLogs } - case *event.TypeMuxEvent: - if muxe, ok := e.Data.(core.PendingLogsEvent); ok { - for _, f := range filters[PendingLogsSubscription] { - if e.Time.After(f.created) { - if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } + } +} + +func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) { + hashes := make([]common.Hash, 0, len(ev.Txs)) + for _, tx := range ev.Txs { + hashes = append(hashes, tx.Hash()) + } + for _, f := range filters[PendingTransactionsSubscription] { + f.hashes <- hashes + } +} + +func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) { + for _, f := range filters[BlocksSubscription] { + f.headers <- ev.Block.Header() + } + if es.lightMode && len(filters[LogsSubscription]) > 0 { + es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) { + for _, f := range filters[LogsSubscription] { + if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { + f.logs <- matchedLogs } } - } - case core.NewTxsEvent: - hashes := make([]common.Hash, 0, len(e.Txs)) - for _, tx := range e.Txs { - hashes = append(hashes, tx.Hash()) - } - for _, f := range filters[PendingTransactionsSubscription] { - f.hashes <- hashes - } - case core.ChainEvent: - for _, f := range filters[BlocksSubscription] { - f.headers <- e.Block.Header() - } - if es.lightMode && len(filters[LogsSubscription]) > 0 { - es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) { - for _, f := range filters[LogsSubscription] { - if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } - } - }) - } + }) } } @@ -448,10 +443,10 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. func (es *EventSystem) eventLoop() { // Ensure all subscriptions get cleaned up defer func() { - es.pendingLogSub.Unsubscribe() es.txsSub.Unsubscribe() es.logsSub.Unsubscribe() es.rmLogsSub.Unsubscribe() + es.pendingLogsSub.Unsubscribe() es.chainSub.Unsubscribe() }() @@ -462,20 +457,16 @@ func (es *EventSystem) eventLoop() { for { select { - // Handle subscribed events case ev := <-es.txsCh: - es.broadcast(index, ev) + es.handleTxsEvent(index, ev) case ev := <-es.logsCh: - es.broadcast(index, ev) + es.handleLogs(index, ev) case ev := <-es.rmLogsCh: - es.broadcast(index, ev) + es.handleRemovedLogs(index, ev) + case ev := <-es.pendingLogsCh: + es.handlePendingLogs(index, ev) case ev := <-es.chainCh: - es.broadcast(index, ev) - case ev, active := <-es.pendingLogSub.Chan(): - if !active { // system stopped - return - } - es.broadcast(index, ev) + es.handleChainEvent(index, ev) case f := <-es.install: if f.typ == MinedAndPendingLogsSubscription { |