aboutsummaryrefslogtreecommitdiff
path: root/eth/filters
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2019-10-01 13:28:55 -0400
committerDeterminant <[email protected]>2019-10-01 13:28:55 -0400
commitbc43122f54ed8de21f74ee6393549c9554d732e8 (patch)
tree549fffb8771dd280b29eec1e8691e67eb4b24895 /eth/filters
parent841b2b7225a9318718c3c856a9debdf01bc4f061 (diff)
support "accepted" as block number in JSON-RPC
Diffstat (limited to 'eth/filters')
-rw-r--r--eth/filters/api.go576
-rw-r--r--eth/filters/filter.go348
-rw-r--r--eth/filters/filter_system.go512
3 files changed, 1436 insertions, 0 deletions
diff --git a/eth/filters/api.go b/eth/filters/api.go
new file mode 100644
index 0000000..9d9e757
--- /dev/null
+++ b/eth/filters/api.go
@@ -0,0 +1,576 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package filters
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "math/big"
+ "sync"
+ "time"
+
+ ethereum "github.com/ava-labs/go-ethereum"
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/common/hexutil"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/ethdb"
+ "github.com/ava-labs/go-ethereum/event"
+ "github.com/ava-labs/go-ethereum/rpc"
+)
+
+var (
+ deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline
+)
+
+// filter is a helper struct that holds meta information over the filter type
+// and associated subscription in the event system.
+type filter struct {
+ typ Type
+ deadline *time.Timer // filter is inactiv when deadline triggers
+ hashes []common.Hash
+ crit FilterCriteria
+ logs []*types.Log
+ s *Subscription // associated subscription in event system
+}
+
+// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
+// information related to the Ethereum protocol such als blocks, transactions and logs.
+type PublicFilterAPI struct {
+ backend Backend
+ mux *event.TypeMux
+ quit chan struct{}
+ chainDb ethdb.Database
+ events *EventSystem
+ filtersMu sync.Mutex
+ filters map[rpc.ID]*filter
+}
+
+// NewPublicFilterAPI returns a new PublicFilterAPI instance.
+func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
+ api := &PublicFilterAPI{
+ backend: backend,
+ mux: backend.EventMux(),
+ chainDb: backend.ChainDb(),
+ events: NewEventSystem(backend.EventMux(), backend, lightMode),
+ filters: make(map[rpc.ID]*filter),
+ }
+ go api.timeoutLoop()
+
+ return api
+}
+
+// timeoutLoop runs every 5 minutes and deletes filters that have not been recently used.
+// Tt is started when the api is created.
+func (api *PublicFilterAPI) timeoutLoop() {
+ ticker := time.NewTicker(5 * time.Minute)
+ for {
+ <-ticker.C
+ api.filtersMu.Lock()
+ for id, f := range api.filters {
+ select {
+ case <-f.deadline.C:
+ f.s.Unsubscribe()
+ delete(api.filters, id)
+ default:
+ continue
+ }
+ }
+ api.filtersMu.Unlock()
+ }
+}
+
+// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes
+// as transactions enter the pending state.
+//
+// It is part of the filter package because this filter can be used through the
+// `eth_getFilterChanges` polling method that is also used for log filters.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter
+func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
+ var (
+ pendingTxs = make(chan []common.Hash)
+ pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
+ )
+
+ api.filtersMu.Lock()
+ api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub}
+ api.filtersMu.Unlock()
+
+ go func() {
+ for {
+ select {
+ case ph := <-pendingTxs:
+ api.filtersMu.Lock()
+ if f, found := api.filters[pendingTxSub.ID]; found {
+ f.hashes = append(f.hashes, ph...)
+ }
+ api.filtersMu.Unlock()
+ case <-pendingTxSub.Err():
+ api.filtersMu.Lock()
+ delete(api.filters, pendingTxSub.ID)
+ api.filtersMu.Unlock()
+ return
+ }
+ }
+ }()
+
+ return pendingTxSub.ID
+}
+
+// NewPendingTransactions creates a subscription that is triggered each time a transaction
+// enters the transaction pool and was signed from one of the transactions this nodes manages.
+func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
+ }
+
+ rpcSub := notifier.CreateSubscription()
+
+ go func() {
+ txHashes := make(chan []common.Hash, 128)
+ pendingTxSub := api.events.SubscribePendingTxs(txHashes)
+
+ for {
+ select {
+ case hashes := <-txHashes:
+ // To keep the original behaviour, send a single tx hash in one notification.
+ // TODO(rjl493456442) Send a batch of tx hashes in one notification
+ for _, h := range hashes {
+ notifier.Notify(rpcSub.ID, h)
+ }
+ case <-rpcSub.Err():
+ pendingTxSub.Unsubscribe()
+ return
+ case <-notifier.Closed():
+ pendingTxSub.Unsubscribe()
+ return
+ }
+ }
+ }()
+
+ return rpcSub, nil
+}
+
+// NewBlockFilter creates a filter that fetches blocks that are imported into the chain.
+// It is part of the filter package since polling goes with eth_getFilterChanges.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter
+func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
+ var (
+ headers = make(chan *types.Header)
+ headerSub = api.events.SubscribeNewHeads(headers)
+ )
+
+ api.filtersMu.Lock()
+ api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub}
+ api.filtersMu.Unlock()
+
+ go func() {
+ for {
+ select {
+ case h := <-headers:
+ api.filtersMu.Lock()
+ if f, found := api.filters[headerSub.ID]; found {
+ f.hashes = append(f.hashes, h.Hash())
+ }
+ api.filtersMu.Unlock()
+ case <-headerSub.Err():
+ api.filtersMu.Lock()
+ delete(api.filters, headerSub.ID)
+ api.filtersMu.Unlock()
+ return
+ }
+ }
+ }()
+
+ return headerSub.ID
+}
+
+// NewHeads send a notification each time a new (header) block is appended to the chain.
+func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
+ }
+
+ rpcSub := notifier.CreateSubscription()
+
+ go func() {
+ headers := make(chan *types.Header)
+ headersSub := api.events.SubscribeNewHeads(headers)
+
+ for {
+ select {
+ case h := <-headers:
+ notifier.Notify(rpcSub.ID, h)
+ case <-rpcSub.Err():
+ headersSub.Unsubscribe()
+ return
+ case <-notifier.Closed():
+ headersSub.Unsubscribe()
+ return
+ }
+ }
+ }()
+
+ return rpcSub, nil
+}
+
+// Logs creates a subscription that fires for all new log that match the given filter criteria.
+func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
+ }
+
+ var (
+ rpcSub = notifier.CreateSubscription()
+ matchedLogs = make(chan []*types.Log)
+ )
+
+ logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs)
+ if err != nil {
+ return nil, err
+ }
+
+ go func() {
+
+ for {
+ select {
+ case logs := <-matchedLogs:
+ for _, log := range logs {
+ notifier.Notify(rpcSub.ID, &log)
+ }
+ case <-rpcSub.Err(): // client send an unsubscribe request
+ logsSub.Unsubscribe()
+ return
+ case <-notifier.Closed(): // connection dropped
+ logsSub.Unsubscribe()
+ return
+ }
+ }
+ }()
+
+ return rpcSub, nil
+}
+
+// FilterCriteria represents a request to create a new filter.
+// Same as ethereum.FilterQuery but with UnmarshalJSON() method.
+type FilterCriteria ethereum.FilterQuery
+
+// NewFilter creates a new filter and returns the filter id. It can be
+// used to retrieve logs when the state changes. This method cannot be
+// used to fetch logs that are already stored in the state.
+//
+// Default criteria for the from and to block are "latest".
+// Using "latest" as block number will return logs for mined blocks.
+// Using "pending" as block number returns logs for not yet mined (pending) blocks.
+// In case logs are removed (chain reorg) previously returned logs are returned
+// again but with the removed property set to true.
+//
+// In case "fromBlock" > "toBlock" an error is returned.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter
+func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
+ logs := make(chan []*types.Log)
+ logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), logs)
+ if err != nil {
+ return rpc.ID(""), err
+ }
+
+ api.filtersMu.Lock()
+ api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]*types.Log, 0), s: logsSub}
+ api.filtersMu.Unlock()
+
+ go func() {
+ for {
+ select {
+ case l := <-logs:
+ api.filtersMu.Lock()
+ if f, found := api.filters[logsSub.ID]; found {
+ f.logs = append(f.logs, l...)
+ }
+ api.filtersMu.Unlock()
+ case <-logsSub.Err():
+ api.filtersMu.Lock()
+ delete(api.filters, logsSub.ID)
+ api.filtersMu.Unlock()
+ return
+ }
+ }
+ }()
+
+ return logsSub.ID, nil
+}
+
+// GetLogs returns logs matching the given argument that are stored within the state.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
+func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) {
+ var filter *Filter
+ if crit.BlockHash != nil {
+ // Block filter requested, construct a single-shot filter
+ filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics)
+ } else {
+ // Convert the RPC block numbers into internal representations
+ begin := rpc.LatestBlockNumber.Int64()
+ if crit.FromBlock != nil {
+ begin = crit.FromBlock.Int64()
+ }
+ end := rpc.LatestBlockNumber.Int64()
+ if crit.ToBlock != nil {
+ end = crit.ToBlock.Int64()
+ }
+ // Construct the range filter
+ filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics)
+ }
+ // Run the filter and return all the logs
+ logs, err := filter.Logs(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return returnLogs(logs), err
+}
+
+// UninstallFilter removes the filter with the given filter id.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter
+func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool {
+ api.filtersMu.Lock()
+ f, found := api.filters[id]
+ if found {
+ delete(api.filters, id)
+ }
+ api.filtersMu.Unlock()
+ if found {
+ f.s.Unsubscribe()
+ }
+
+ return found
+}
+
+// GetFilterLogs returns the logs for the filter with the given id.
+// If the filter could not be found an empty array of logs is returned.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs
+func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Log, error) {
+ api.filtersMu.Lock()
+ f, found := api.filters[id]
+ api.filtersMu.Unlock()
+
+ if !found || f.typ != LogsSubscription {
+ return nil, fmt.Errorf("filter not found")
+ }
+
+ var filter *Filter
+ if f.crit.BlockHash != nil {
+ // Block filter requested, construct a single-shot filter
+ filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
+ } else {
+ // Convert the RPC block numbers into internal representations
+ begin := rpc.LatestBlockNumber.Int64()
+ if f.crit.FromBlock != nil {
+ begin = f.crit.FromBlock.Int64()
+ }
+ end := rpc.LatestBlockNumber.Int64()
+ if f.crit.ToBlock != nil {
+ end = f.crit.ToBlock.Int64()
+ }
+ // Construct the range filter
+ filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
+ }
+ // Run the filter and return all the logs
+ logs, err := filter.Logs(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return returnLogs(logs), nil
+}
+
+// GetFilterChanges returns the logs for the filter with the given id since
+// last time it was called. This can be used for polling.
+//
+// For pending transaction and block filters the result is []common.Hash.
+// (pending)Log filters return []Log.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
+func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
+ api.filtersMu.Lock()
+ defer api.filtersMu.Unlock()
+
+ if f, found := api.filters[id]; found {
+ if !f.deadline.Stop() {
+ // timer expired but filter is not yet removed in timeout loop
+ // receive timer value and reset timer
+ <-f.deadline.C
+ }
+ f.deadline.Reset(deadline)
+
+ switch f.typ {
+ case PendingTransactionsSubscription, BlocksSubscription:
+ hashes := f.hashes
+ f.hashes = nil
+ return returnHashes(hashes), nil
+ case LogsSubscription:
+ logs := f.logs
+ f.logs = nil
+ return returnLogs(logs), nil
+ }
+ }
+
+ return []interface{}{}, fmt.Errorf("filter not found")
+}
+
+// returnHashes is a helper that will return an empty hash array case the given hash array is nil,
+// otherwise the given hashes array is returned.
+func returnHashes(hashes []common.Hash) []common.Hash {
+ if hashes == nil {
+ return []common.Hash{}
+ }
+ return hashes
+}
+
+// returnLogs is a helper that will return an empty log array in case the given logs array is nil,
+// otherwise the given logs array is returned.
+func returnLogs(logs []*types.Log) []*types.Log {
+ if logs == nil {
+ return []*types.Log{}
+ }
+ return logs
+}
+
+// UnmarshalJSON sets *args fields with given data.
+func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
+ type input struct {
+ BlockHash *common.Hash `json:"blockHash"`
+ FromBlock *rpc.BlockNumber `json:"fromBlock"`
+ ToBlock *rpc.BlockNumber `json:"toBlock"`
+ Addresses interface{} `json:"address"`
+ Topics []interface{} `json:"topics"`
+ }
+
+ var raw input
+ if err := json.Unmarshal(data, &raw); err != nil {
+ return err
+ }
+
+ if raw.BlockHash != nil {
+ if raw.FromBlock != nil || raw.ToBlock != nil {
+ // BlockHash is mutually exclusive with FromBlock/ToBlock criteria
+ return fmt.Errorf("cannot specify both BlockHash and FromBlock/ToBlock, choose one or the other")
+ }
+ args.BlockHash = raw.BlockHash
+ } else {
+ if raw.FromBlock != nil {
+ args.FromBlock = big.NewInt(raw.FromBlock.Int64())
+ }
+
+ if raw.ToBlock != nil {
+ args.ToBlock = big.NewInt(raw.ToBlock.Int64())
+ }
+ }
+
+ args.Addresses = []common.Address{}
+
+ if raw.Addresses != nil {
+ // raw.Address can contain a single address or an array of addresses
+ switch rawAddr := raw.Addresses.(type) {
+ case []interface{}:
+ for i, addr := range rawAddr {
+ if strAddr, ok := addr.(string); ok {
+ addr, err := decodeAddress(strAddr)
+ if err != nil {
+ return fmt.Errorf("invalid address at index %d: %v", i, err)
+ }
+ args.Addresses = append(args.Addresses, addr)
+ } else {
+ return fmt.Errorf("non-string address at index %d", i)
+ }
+ }
+ case string:
+ addr, err := decodeAddress(rawAddr)
+ if err != nil {
+ return fmt.Errorf("invalid address: %v", err)
+ }
+ args.Addresses = []common.Address{addr}
+ default:
+ return errors.New("invalid addresses in query")
+ }
+ }
+
+ // topics is an array consisting of strings and/or arrays of strings.
+ // JSON null values are converted to common.Hash{} and ignored by the filter manager.
+ if len(raw.Topics) > 0 {
+ args.Topics = make([][]common.Hash, len(raw.Topics))
+ for i, t := range raw.Topics {
+ switch topic := t.(type) {
+ case nil:
+ // ignore topic when matching logs
+
+ case string:
+ // match specific topic
+ top, err := decodeTopic(topic)
+ if err != nil {
+ return err
+ }
+ args.Topics[i] = []common.Hash{top}
+
+ case []interface{}:
+ // or case e.g. [null, "topic0", "topic1"]
+ for _, rawTopic := range topic {
+ if rawTopic == nil {
+ // null component, match all
+ args.Topics[i] = nil
+ break
+ }
+ if topic, ok := rawTopic.(string); ok {
+ parsed, err := decodeTopic(topic)
+ if err != nil {
+ return err
+ }
+ args.Topics[i] = append(args.Topics[i], parsed)
+ } else {
+ return fmt.Errorf("invalid topic(s)")
+ }
+ }
+ default:
+ return fmt.Errorf("invalid topic(s)")
+ }
+ }
+ }
+
+ return nil
+}
+
+func decodeAddress(s string) (common.Address, error) {
+ b, err := hexutil.Decode(s)
+ if err == nil && len(b) != common.AddressLength {
+ err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for address", len(b), common.AddressLength)
+ }
+ return common.BytesToAddress(b), err
+}
+
+func decodeTopic(s string) (common.Hash, error) {
+ b, err := hexutil.Decode(s)
+ if err == nil && len(b) != common.HashLength {
+ err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for topic", len(b), common.HashLength)
+ }
+ return common.BytesToHash(b), err
+}
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
new file mode 100644
index 0000000..fa3d985
--- /dev/null
+++ b/eth/filters/filter.go
@@ -0,0 +1,348 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package filters
+
+import (
+ "context"
+ "errors"
+ "math/big"
+
+ "github.com/ava-labs/coreth/rpc"
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/core"
+ "github.com/ava-labs/go-ethereum/core/bloombits"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/ethdb"
+ "github.com/ava-labs/go-ethereum/event"
+)
+
+type Backend interface {
+ ChainDb() ethdb.Database
+ EventMux() *event.TypeMux
+ HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
+ HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
+ GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
+ GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error)
+
+ SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
+ SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
+ SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
+ SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
+
+ BloomStatus() (uint64, uint64)
+ ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
+}
+
+// Filter can be used to retrieve and filter logs.
+type Filter struct {
+ backend Backend
+
+ db ethdb.Database
+ addresses []common.Address
+ topics [][]common.Hash
+
+ block common.Hash // Block hash if filtering a single block
+ begin, end int64 // Range interval if filtering multiple blocks
+
+ matcher *bloombits.Matcher
+}
+
+// NewRangeFilter creates a new filter which uses a bloom filter on blocks to
+// figure out whether a particular block is interesting or not.
+func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
+ // Flatten the address and topic filter clauses into a single bloombits filter
+ // system. Since the bloombits are not positional, nil topics are permitted,
+ // which get flattened into a nil byte slice.
+ var filters [][][]byte
+ if len(addresses) > 0 {
+ filter := make([][]byte, len(addresses))
+ for i, address := range addresses {
+ filter[i] = address.Bytes()
+ }
+ filters = append(filters, filter)
+ }
+ for _, topicList := range topics {
+ filter := make([][]byte, len(topicList))
+ for i, topic := range topicList {
+ filter[i] = topic.Bytes()
+ }
+ filters = append(filters, filter)
+ }
+ size, _ := backend.BloomStatus()
+
+ // Create a generic filter and convert it into a range filter
+ filter := newFilter(backend, addresses, topics)
+
+ filter.matcher = bloombits.NewMatcher(size, filters)
+ filter.begin = begin
+ filter.end = end
+
+ return filter
+}
+
+// NewBlockFilter creates a new filter which directly inspects the contents of
+// a block to figure out whether it is interesting or not.
+func NewBlockFilter(backend Backend, block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter {
+ // Create a generic filter and convert it into a block filter
+ filter := newFilter(backend, addresses, topics)
+ filter.block = block
+ return filter
+}
+
+// newFilter creates a generic filter that can either filter based on a block hash,
+// or based on range queries. The search criteria needs to be explicitly set.
+func newFilter(backend Backend, addresses []common.Address, topics [][]common.Hash) *Filter {
+ return &Filter{
+ backend: backend,
+ addresses: addresses,
+ topics: topics,
+ db: backend.ChainDb(),
+ }
+}
+
+// Logs searches the blockchain for matching log entries, returning all from the
+// first block that contains matches, updating the start of the filter accordingly.
+func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
+ // If we're doing singleton block filtering, execute and return
+ if f.block != (common.Hash{}) {
+ header, err := f.backend.HeaderByHash(ctx, f.block)
+ if err != nil {
+ return nil, err
+ }
+ if header == nil {
+ return nil, errors.New("unknown block")
+ }
+ return f.blockLogs(ctx, header)
+ }
+ // Figure out the limits of the filter range
+ header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
+ if header == nil {
+ return nil, nil
+ }
+ head := header.Number.Uint64()
+
+ if f.begin == -1 {
+ f.begin = int64(head)
+ }
+ end := uint64(f.end)
+ if f.end == -1 {
+ end = head
+ }
+ // Gather all indexed logs, and finish with non indexed ones
+ var (
+ logs []*types.Log
+ err error
+ )
+ size, sections := f.backend.BloomStatus()
+ if indexed := sections * size; indexed > uint64(f.begin) {
+ if indexed > end {
+ logs, err = f.indexedLogs(ctx, end)
+ } else {
+ logs, err = f.indexedLogs(ctx, indexed-1)
+ }
+ if err != nil {
+ return logs, err
+ }
+ }
+ rest, err := f.unindexedLogs(ctx, end)
+ logs = append(logs, rest...)
+ return logs, err
+}
+
+// indexedLogs returns the logs matching the filter criteria based on the bloom
+// bits indexed available locally or via the network.
+func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
+ // Create a matcher session and request servicing from the backend
+ matches := make(chan uint64, 64)
+
+ session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
+ if err != nil {
+ return nil, err
+ }
+ defer session.Close()
+
+ f.backend.ServiceFilter(ctx, session)
+
+ // Iterate over the matches until exhausted or context closed
+ var logs []*types.Log
+
+ for {
+ select {
+ case number, ok := <-matches:
+ // Abort if all matches have been fulfilled
+ if !ok {
+ err := session.Error()
+ if err == nil {
+ f.begin = int64(end) + 1
+ }
+ return logs, err
+ }
+ f.begin = int64(number) + 1
+
+ // Retrieve the suggested block and pull any truly matching logs
+ header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
+ if header == nil || err != nil {
+ return logs, err
+ }
+ found, err := f.checkMatches(ctx, header)
+ if err != nil {
+ return logs, err
+ }
+ logs = append(logs, found...)
+
+ case <-ctx.Done():
+ return logs, ctx.Err()
+ }
+ }
+}
+
+// indexedLogs returns the logs matching the filter criteria based on raw block
+// iteration and bloom matching.
+func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
+ var logs []*types.Log
+
+ for ; f.begin <= int64(end); f.begin++ {
+ header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
+ if header == nil || err != nil {
+ return logs, err
+ }
+ found, err := f.blockLogs(ctx, header)
+ if err != nil {
+ return logs, err
+ }
+ logs = append(logs, found...)
+ }
+ return logs, nil
+}
+
+// blockLogs returns the logs matching the filter criteria within a single block.
+func (f *Filter) blockLogs(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
+ if bloomFilter(header.Bloom, f.addresses, f.topics) {
+ found, err := f.checkMatches(ctx, header)
+ if err != nil {
+ return logs, err
+ }
+ logs = append(logs, found...)
+ }
+ return logs, nil
+}
+
+// checkMatches checks if the receipts belonging to the given header contain any log events that
+// match the filter criteria. This function is called when the bloom filter signals a potential match.
+func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
+ // Get the logs of the block
+ logsList, err := f.backend.GetLogs(ctx, header.Hash())
+ if err != nil {
+ return nil, err
+ }
+ var unfiltered []*types.Log
+ for _, logs := range logsList {
+ unfiltered = append(unfiltered, logs...)
+ }
+ logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
+ if len(logs) > 0 {
+ // We have matching logs, check if we need to resolve full logs via the light client
+ if logs[0].TxHash == (common.Hash{}) {
+ receipts, err := f.backend.GetReceipts(ctx, header.Hash())
+ if err != nil {
+ return nil, err
+ }
+ unfiltered = unfiltered[:0]
+ for _, receipt := range receipts {
+ unfiltered = append(unfiltered, receipt.Logs...)
+ }
+ logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
+ }
+ return logs, nil
+ }
+ return nil, nil
+}
+
+func includes(addresses []common.Address, a common.Address) bool {
+ for _, addr := range addresses {
+ if addr == a {
+ return true
+ }
+ }
+
+ return false
+}
+
+// filterLogs creates a slice of logs matching the given criteria.
+func filterLogs(logs []*types.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*types.Log {
+ var ret []*types.Log
+Logs:
+ for _, log := range logs {
+ if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber {
+ continue
+ }
+ if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber {
+ continue
+ }
+
+ if len(addresses) > 0 && !includes(addresses, log.Address) {
+ continue
+ }
+ // If the to filtered topics is greater than the amount of topics in logs, skip.
+ if len(topics) > len(log.Topics) {
+ continue Logs
+ }
+ for i, sub := range topics {
+ match := len(sub) == 0 // empty rule set == wildcard
+ for _, topic := range sub {
+ if log.Topics[i] == topic {
+ match = true
+ break
+ }
+ }
+ if !match {
+ continue Logs
+ }
+ }
+ ret = append(ret, log)
+ }
+ return ret
+}
+
+func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]common.Hash) bool {
+ if len(addresses) > 0 {
+ var included bool
+ for _, addr := range addresses {
+ if types.BloomLookup(bloom, addr) {
+ included = true
+ break
+ }
+ }
+ if !included {
+ return false
+ }
+ }
+
+ for _, sub := range topics {
+ included := len(sub) == 0 // empty rule set == wildcard
+ for _, topic := range sub {
+ if types.BloomLookup(bloom, topic) {
+ included = true
+ break
+ }
+ }
+ if !included {
+ return false
+ }
+ }
+ return true
+}
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
new file mode 100644
index 0000000..a27859f
--- /dev/null
+++ b/eth/filters/filter_system.go
@@ -0,0 +1,512 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package filters implements an ethereum filtering system for block,
+// transactions and log events.
+package filters
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ myrpc "github.com/ava-labs/coreth/rpc"
+ ethereum "github.com/ava-labs/go-ethereum"
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/core"
+ "github.com/ava-labs/go-ethereum/core/rawdb"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/event"
+ "github.com/ava-labs/go-ethereum/log"
+ "github.com/ava-labs/go-ethereum/rpc"
+)
+
+// Type determines the kind of filter and is used to put the filter in to
+// the correct bucket when added.
+type Type byte
+
+const (
+ // UnknownSubscription indicates an unknown subscription type
+ UnknownSubscription Type = iota
+ // LogsSubscription queries for new or removed (chain reorg) logs
+ LogsSubscription
+ // PendingLogsSubscription queries for logs in pending blocks
+ PendingLogsSubscription
+ // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
+ MinedAndPendingLogsSubscription
+ // PendingTransactionsSubscription queries tx hashes for pending
+ // transactions entering the pending state
+ PendingTransactionsSubscription
+ // BlocksSubscription queries hashes for blocks that are imported
+ BlocksSubscription
+ // LastSubscription keeps track of the last index
+ LastIndexSubscription
+)
+
+const (
+
+ // txChanSize is the size of channel listening to NewTxsEvent.
+ // The number is referenced from the size of tx pool.
+ txChanSize = 4096
+ // rmLogsChanSize is the size of channel listening to RemovedLogsEvent.
+ rmLogsChanSize = 10
+ // logsChanSize is the size of channel listening to LogsEvent.
+ logsChanSize = 10
+ // chainEvChanSize is the size of channel listening to ChainEvent.
+ chainEvChanSize = 10
+)
+
+var (
+ ErrInvalidSubscriptionID = errors.New("invalid id")
+)
+
+type subscription struct {
+ id rpc.ID
+ typ Type
+ created time.Time
+ logsCrit ethereum.FilterQuery
+ logs chan []*types.Log
+ hashes chan []common.Hash
+ headers chan *types.Header
+ installed chan struct{} // closed when the filter is installed
+ err chan error // closed when the filter is uninstalled
+}
+
+// EventSystem creates subscriptions, processes events and broadcasts them to the
+// subscription which match the subscription criteria.
+type EventSystem struct {
+ mux *event.TypeMux
+ backend Backend
+ lightMode bool
+ lastHead *types.Header
+
+ // Subscriptions
+ txsSub event.Subscription // Subscription for new transaction event
+ logsSub event.Subscription // Subscription for new log event
+ rmLogsSub event.Subscription // Subscription for removed log event
+ chainSub event.Subscription // Subscription for new chain event
+ pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event
+
+ // Channels
+ install chan *subscription // install filter for event notification
+ uninstall chan *subscription // remove filter for event notification
+ txsCh chan core.NewTxsEvent // Channel to receive new transactions event
+ logsCh chan []*types.Log // Channel to receive new log event
+ rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
+ chainCh chan core.ChainEvent // Channel to receive new chain event
+}
+
+// NewEventSystem creates a new manager that listens for event on the given mux,
+// parses and filters them. It uses the all map to retrieve filter changes. The
+// work loop holds its own index that is used to forward events to filters.
+//
+// The returned manager has a loop that needs to be stopped with the Stop function
+// or by stopping the given mux.
+func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem {
+ m := &EventSystem{
+ mux: mux,
+ backend: backend,
+ lightMode: lightMode,
+ install: make(chan *subscription),
+ uninstall: make(chan *subscription),
+ txsCh: make(chan core.NewTxsEvent, txChanSize),
+ logsCh: make(chan []*types.Log, logsChanSize),
+ rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
+ chainCh: make(chan core.ChainEvent, chainEvChanSize),
+ }
+
+ // Subscribe events
+ m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)
+ m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
+ m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
+ m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
+ // TODO(rjl493456442): use feed to subscribe pending log event
+ m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})
+
+ // Make sure none of the subscriptions are empty
+ if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil ||
+ m.pendingLogSub.Closed() {
+ log.Crit("Subscribe for event system failed")
+ }
+
+ go m.eventLoop()
+ return m
+}
+
+// Subscription is created when the client registers itself for a particular event.
+type Subscription struct {
+ ID rpc.ID
+ f *subscription
+ es *EventSystem
+ unsubOnce sync.Once
+}
+
+// Err returns a channel that is closed when unsubscribed.
+func (sub *Subscription) Err() <-chan error {
+ return sub.f.err
+}
+
+// Unsubscribe uninstalls the subscription from the event broadcast loop.
+func (sub *Subscription) Unsubscribe() {
+ sub.unsubOnce.Do(func() {
+ uninstallLoop:
+ for {
+ // write uninstall request and consume logs/hashes. This prevents
+ // the eventLoop broadcast method to deadlock when writing to the
+ // filter event channel while the subscription loop is waiting for
+ // this method to return (and thus not reading these events).
+ select {
+ case sub.es.uninstall <- sub.f:
+ break uninstallLoop
+ case <-sub.f.logs:
+ case <-sub.f.hashes:
+ case <-sub.f.headers:
+ }
+ }
+
+ // wait for filter to be uninstalled in work loop before returning
+ // this ensures that the manager won't use the event channel which
+ // will probably be closed by the client asap after this method returns.
+ <-sub.Err()
+ })
+}
+
+// subscribe installs the subscription in the event broadcast loop.
+func (es *EventSystem) subscribe(sub *subscription) *Subscription {
+ es.install <- sub
+ <-sub.installed
+ return &Subscription{ID: sub.id, f: sub, es: es}
+}
+
+// SubscribeLogs creates a subscription that will write all logs matching the
+// given criteria to the given logs channel. Default value for the from and to
+// block is "latest". If the fromBlock > toBlock an error is returned.
+func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) (*Subscription, error) {
+ var from, to myrpc.BlockNumber
+ if crit.FromBlock == nil {
+ from = myrpc.LatestBlockNumber
+ } else {
+ from = myrpc.BlockNumber(crit.FromBlock.Int64())
+ }
+ if crit.ToBlock == nil {
+ to = myrpc.LatestBlockNumber
+ } else {
+ to = myrpc.BlockNumber(crit.ToBlock.Int64())
+ }
+
+ // only interested in pending logs
+ if from == myrpc.PendingBlockNumber && to == myrpc.PendingBlockNumber {
+ return es.subscribePendingLogs(crit, logs), nil
+ }
+ // only interested in new mined logs
+ if from == myrpc.LatestBlockNumber && to == myrpc.LatestBlockNumber {
+ return es.subscribeLogs(crit, logs), nil
+ }
+ // only interested in mined logs within a specific block range
+ if from >= 0 && to >= 0 && to >= from {
+ return es.subscribeLogs(crit, logs), nil
+ }
+ // interested in mined logs from a specific block number, new logs and pending logs
+ if from >= myrpc.LatestBlockNumber && to == myrpc.PendingBlockNumber {
+ return es.subscribeMinedPendingLogs(crit, logs), nil
+ }
+ // interested in logs from a specific block number to new mined blocks
+ if from >= 0 && to == myrpc.LatestBlockNumber {
+ return es.subscribeLogs(crit, logs), nil
+ }
+ return nil, fmt.Errorf("invalid from and to block combination: from > to")
+}
+
+// subscribeMinedPendingLogs creates a subscription that returned mined and
+// pending logs that match the given criteria.
+func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: MinedAndPendingLogsSubscription,
+ logsCrit: crit,
+ created: time.Now(),
+ logs: logs,
+ hashes: make(chan []common.Hash),
+ headers: make(chan *types.Header),
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
+ return es.subscribe(sub)
+}
+
+// subscribeLogs creates a subscription that will write all logs matching the
+// given criteria to the given logs channel.
+func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: LogsSubscription,
+ logsCrit: crit,
+ created: time.Now(),
+ logs: logs,
+ hashes: make(chan []common.Hash),
+ headers: make(chan *types.Header),
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
+ return es.subscribe(sub)
+}
+
+// subscribePendingLogs creates a subscription that writes transaction hashes for
+// transactions that enter the transaction pool.
+func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: PendingLogsSubscription,
+ logsCrit: crit,
+ created: time.Now(),
+ logs: logs,
+ hashes: make(chan []common.Hash),
+ headers: make(chan *types.Header),
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
+ return es.subscribe(sub)
+}
+
+// SubscribeNewHeads creates a subscription that writes the header of a block that is
+// imported in the chain.
+func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: BlocksSubscription,
+ created: time.Now(),
+ logs: make(chan []*types.Log),
+ hashes: make(chan []common.Hash),
+ headers: headers,
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
+ return es.subscribe(sub)
+}
+
+// SubscribePendingTxs creates a subscription that writes transaction hashes for
+// transactions that enter the transaction pool.
+func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: PendingTransactionsSubscription,
+ created: time.Now(),
+ logs: make(chan []*types.Log),
+ hashes: hashes,
+ headers: make(chan *types.Header),
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
+ return es.subscribe(sub)
+}
+
+type filterIndex map[Type]map[rpc.ID]*subscription
+
+// broadcast event to filters that match criteria.
+func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
+ if ev == nil {
+ return
+ }
+
+ switch e := ev.(type) {
+ case []*types.Log:
+ if len(e) > 0 {
+ for _, f := range filters[LogsSubscription] {
+ if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
+ }
+ }
+ }
+ case core.RemovedLogsEvent:
+ for _, f := range filters[LogsSubscription] {
+ if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
+ }
+ }
+ case *event.TypeMuxEvent:
+ if muxe, ok := e.Data.(core.PendingLogsEvent); ok {
+ for _, f := range filters[PendingLogsSubscription] {
+ if e.Time.After(f.created) {
+ if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
+ }
+ }
+ }
+ }
+ case core.NewTxsEvent:
+ hashes := make([]common.Hash, 0, len(e.Txs))
+ for _, tx := range e.Txs {
+ hashes = append(hashes, tx.Hash())
+ }
+ for _, f := range filters[PendingTransactionsSubscription] {
+ f.hashes <- hashes
+ }
+ case core.ChainEvent:
+ for _, f := range filters[BlocksSubscription] {
+ f.headers <- e.Block.Header()
+ }
+ if es.lightMode && len(filters[LogsSubscription]) > 0 {
+ es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
+ for _, f := range filters[LogsSubscription] {
+ if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
+ }
+ }
+ })
+ }
+ }
+}
+
+func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) {
+ oldh := es.lastHead
+ es.lastHead = newHeader
+ if oldh == nil {
+ return
+ }
+ newh := newHeader
+ // find common ancestor, create list of rolled back and new block hashes
+ var oldHeaders, newHeaders []*types.Header
+ for oldh.Hash() != newh.Hash() {
+ if oldh.Number.Uint64() >= newh.Number.Uint64() {
+ oldHeaders = append(oldHeaders, oldh)
+ oldh = rawdb.ReadHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1)
+ }
+ if oldh.Number.Uint64() < newh.Number.Uint64() {
+ newHeaders = append(newHeaders, newh)
+ newh = rawdb.ReadHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1)
+ if newh == nil {
+ // happens when CHT syncing, nothing to do
+ newh = oldh
+ }
+ }
+ }
+ // roll back old blocks
+ for _, h := range oldHeaders {
+ callBack(h, true)
+ }
+ // check new blocks (array is in reverse order)
+ for i := len(newHeaders) - 1; i >= 0; i-- {
+ callBack(newHeaders[i], false)
+ }
+}
+
+// filter logs of a single header in light client mode
+func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log {
+ if bloomFilter(header.Bloom, addresses, topics) {
+ // Get the logs of the block
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+ defer cancel()
+ logsList, err := es.backend.GetLogs(ctx, header.Hash())
+ if err != nil {
+ return nil
+ }
+ var unfiltered []*types.Log
+ for _, logs := range logsList {
+ for _, log := range logs {
+ logcopy := *log
+ logcopy.Removed = remove
+ unfiltered = append(unfiltered, &logcopy)
+ }
+ }
+ logs := filterLogs(unfiltered, nil, nil, addresses, topics)
+ if len(logs) > 0 && logs[0].TxHash == (common.Hash{}) {
+ // We have matching but non-derived logs
+ receipts, err := es.backend.GetReceipts(ctx, header.Hash())
+ if err != nil {
+ return nil
+ }
+ unfiltered = unfiltered[:0]
+ for _, receipt := range receipts {
+ for _, log := range receipt.Logs {
+ logcopy := *log
+ logcopy.Removed = remove
+ unfiltered = append(unfiltered, &logcopy)
+ }
+ }
+ logs = filterLogs(unfiltered, nil, nil, addresses, topics)
+ }
+ return logs
+ }
+ return nil
+}
+
+// eventLoop (un)installs filters and processes mux events.
+func (es *EventSystem) eventLoop() {
+ // Ensure all subscriptions get cleaned up
+ defer func() {
+ es.pendingLogSub.Unsubscribe()
+ es.txsSub.Unsubscribe()
+ es.logsSub.Unsubscribe()
+ es.rmLogsSub.Unsubscribe()
+ es.chainSub.Unsubscribe()
+ }()
+
+ index := make(filterIndex)
+ for i := UnknownSubscription; i < LastIndexSubscription; i++ {
+ index[i] = make(map[rpc.ID]*subscription)
+ }
+
+ for {
+ select {
+ // Handle subscribed events
+ case ev := <-es.txsCh:
+ es.broadcast(index, ev)
+ case ev := <-es.logsCh:
+ es.broadcast(index, ev)
+ case ev := <-es.rmLogsCh:
+ es.broadcast(index, ev)
+ case ev := <-es.chainCh:
+ es.broadcast(index, ev)
+ case ev, active := <-es.pendingLogSub.Chan():
+ if !active { // system stopped
+ return
+ }
+ es.broadcast(index, ev)
+
+ case f := <-es.install:
+ if f.typ == MinedAndPendingLogsSubscription {
+ // the type are logs and pending logs subscriptions
+ index[LogsSubscription][f.id] = f
+ index[PendingLogsSubscription][f.id] = f
+ } else {
+ index[f.typ][f.id] = f
+ }
+ close(f.installed)
+
+ case f := <-es.uninstall:
+ if f.typ == MinedAndPendingLogsSubscription {
+ // the type are logs and pending logs subscriptions
+ delete(index[LogsSubscription], f.id)
+ delete(index[PendingLogsSubscription], f.id)
+ } else {
+ delete(index[f.typ], f.id)
+ }
+ close(f.err)
+
+ // System stopped
+ case <-es.txsSub.Err():
+ return
+ case <-es.logsSub.Err():
+ return
+ case <-es.rmLogsSub.Err():
+ return
+ case <-es.chainSub.Err():
+ return
+ }
+ }
+}