aboutsummaryrefslogtreecommitdiff
path: root/eth/filters/filter_system.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/filters/filter_system.go')
-rw-r--r--eth/filters/filter_system.go175
1 files changed, 83 insertions, 92 deletions
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index 41427c7..1c0f1bf 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -20,7 +20,6 @@ package filters
import (
"context"
- "errors"
"fmt"
"sync"
"time"
@@ -29,10 +28,10 @@ import (
"github.com/ava-labs/coreth/core/rawdb"
"github.com/ava-labs/coreth/core/types"
"github.com/ava-labs/coreth/rpc"
- ethereum "github.com/ava-labs/go-ethereum"
- "github.com/ava-labs/go-ethereum/common"
- "github.com/ava-labs/go-ethereum/event"
- "github.com/ava-labs/go-ethereum/log"
+ ethereum "github.com/ethereum/go-ethereum"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/log"
)
// Type determines the kind of filter and is used to put the filter in to
@@ -58,7 +57,6 @@ const (
)
const (
-
// txChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
txChanSize = 4096
@@ -70,10 +68,6 @@ const (
chainEvChanSize = 10
)
-var (
- ErrInvalidSubscriptionID = errors.New("invalid id")
-)
-
type subscription struct {
id rpc.ID
typ Type
@@ -89,25 +83,25 @@ type subscription struct {
// EventSystem creates subscriptions, processes events and broadcasts them to the
// subscription which match the subscription criteria.
type EventSystem struct {
- mux *event.TypeMux
backend Backend
lightMode bool
lastHead *types.Header
// Subscriptions
- txsSub event.Subscription // Subscription for new transaction event
- logsSub event.Subscription // Subscription for new log event
- rmLogsSub event.Subscription // Subscription for removed log event
- chainSub event.Subscription // Subscription for new chain event
- pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event
+ txsSub event.Subscription // Subscription for new transaction event
+ logsSub event.Subscription // Subscription for new log event
+ rmLogsSub event.Subscription // Subscription for removed log event
+ pendingLogsSub event.Subscription // Subscription for pending log event
+ chainSub event.Subscription // Subscription for new chain event
// Channels
- install chan *subscription // install filter for event notification
- uninstall chan *subscription // remove filter for event notification
- txsCh chan core.NewTxsEvent // Channel to receive new transactions event
- logsCh chan []*types.Log // Channel to receive new log event
- rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
- chainCh chan core.ChainEvent // Channel to receive new chain event
+ install chan *subscription // install filter for event notification
+ uninstall chan *subscription // remove filter for event notification
+ txsCh chan core.NewTxsEvent // Channel to receive new transactions event
+ logsCh chan []*types.Log // Channel to receive new log event
+ pendingLogsCh chan []*types.Log // Channel to receive new log event
+ rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
+ chainCh chan core.ChainEvent // Channel to receive new chain event
}
// NewEventSystem creates a new manager that listens for event on the given mux,
@@ -116,17 +110,17 @@ type EventSystem struct {
//
// The returned manager has a loop that needs to be stopped with the Stop function
// or by stopping the given mux.
-func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem {
+func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
m := &EventSystem{
- mux: mux,
- backend: backend,
- lightMode: lightMode,
- install: make(chan *subscription),
- uninstall: make(chan *subscription),
- txsCh: make(chan core.NewTxsEvent, txChanSize),
- logsCh: make(chan []*types.Log, logsChanSize),
- rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
- chainCh: make(chan core.ChainEvent, chainEvChanSize),
+ backend: backend,
+ lightMode: lightMode,
+ install: make(chan *subscription),
+ uninstall: make(chan *subscription),
+ txsCh: make(chan core.NewTxsEvent, txChanSize),
+ logsCh: make(chan []*types.Log, logsChanSize),
+ rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
+ pendingLogsCh: make(chan []*types.Log, logsChanSize),
+ chainCh: make(chan core.ChainEvent, chainEvChanSize),
}
// Subscribe events
@@ -134,12 +128,10 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
- // TODO(rjl493456442): use feed to subscribe pending log event
- m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})
+ m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)
// Make sure none of the subscriptions are empty
- if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil ||
- m.pendingLogSub.Closed() {
+ if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
log.Crit("Subscribe for event system failed")
}
@@ -316,58 +308,61 @@ func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscript
type filterIndex map[Type]map[rpc.ID]*subscription
-// broadcast event to filters that match criteria.
-func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
- if ev == nil {
+func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) {
+ if len(ev) == 0 {
return
}
+ for _, f := range filters[LogsSubscription] {
+ matchedLogs := filterLogs(ev, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
+ if len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
+ }
+ }
+}
- switch e := ev.(type) {
- case []*types.Log:
- if len(e) > 0 {
- for _, f := range filters[LogsSubscription] {
- if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
- }
+func (es *EventSystem) handlePendingLogs(filters filterIndex, ev []*types.Log) {
+ if len(ev) == 0 {
+ return
+ }
+ for _, f := range filters[PendingLogsSubscription] {
+ matchedLogs := filterLogs(ev, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
+ if len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
- case core.RemovedLogsEvent:
- for _, f := range filters[LogsSubscription] {
- if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
+ }
+}
+
+func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLogsEvent) {
+ for _, f := range filters[LogsSubscription] {
+ matchedLogs := filterLogs(ev.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
+ if len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
- case *event.TypeMuxEvent:
- if muxe, ok := e.Data.(core.PendingLogsEvent); ok {
- for _, f := range filters[PendingLogsSubscription] {
- if e.Time.After(f.created) {
- if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
+ }
+}
+
+func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
+ hashes := make([]common.Hash, 0, len(ev.Txs))
+ for _, tx := range ev.Txs {
+ hashes = append(hashes, tx.Hash())
+ }
+ for _, f := range filters[PendingTransactionsSubscription] {
+ f.hashes <- hashes
+ }
+}
+
+func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) {
+ for _, f := range filters[BlocksSubscription] {
+ f.headers <- ev.Block.Header()
+ }
+ if es.lightMode && len(filters[LogsSubscription]) > 0 {
+ es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) {
+ for _, f := range filters[LogsSubscription] {
+ if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
- }
- case core.NewTxsEvent:
- hashes := make([]common.Hash, 0, len(e.Txs))
- for _, tx := range e.Txs {
- hashes = append(hashes, tx.Hash())
- }
- for _, f := range filters[PendingTransactionsSubscription] {
- f.hashes <- hashes
- }
- case core.ChainEvent:
- for _, f := range filters[BlocksSubscription] {
- f.headers <- e.Block.Header()
- }
- if es.lightMode && len(filters[LogsSubscription]) > 0 {
- es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
- for _, f := range filters[LogsSubscription] {
- if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
- }
- })
- }
+ })
}
}
@@ -448,10 +443,10 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
func (es *EventSystem) eventLoop() {
// Ensure all subscriptions get cleaned up
defer func() {
- es.pendingLogSub.Unsubscribe()
es.txsSub.Unsubscribe()
es.logsSub.Unsubscribe()
es.rmLogsSub.Unsubscribe()
+ es.pendingLogsSub.Unsubscribe()
es.chainSub.Unsubscribe()
}()
@@ -462,20 +457,16 @@ func (es *EventSystem) eventLoop() {
for {
select {
- // Handle subscribed events
case ev := <-es.txsCh:
- es.broadcast(index, ev)
+ es.handleTxsEvent(index, ev)
case ev := <-es.logsCh:
- es.broadcast(index, ev)
+ es.handleLogs(index, ev)
case ev := <-es.rmLogsCh:
- es.broadcast(index, ev)
+ es.handleRemovedLogs(index, ev)
+ case ev := <-es.pendingLogsCh:
+ es.handlePendingLogs(index, ev)
case ev := <-es.chainCh:
- es.broadcast(index, ev)
- case ev, active := <-es.pendingLogSub.Chan():
- if !active { // system stopped
- return
- }
- es.broadcast(index, ev)
+ es.handleChainEvent(index, ev)
case f := <-es.install:
if f.typ == MinedAndPendingLogsSubscription {