aboutsummaryrefslogtreecommitdiff
path: root/eth
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2019-10-01 13:28:55 -0400
committerDeterminant <[email protected]>2019-10-01 13:28:55 -0400
commitbc43122f54ed8de21f74ee6393549c9554d732e8 (patch)
tree549fffb8771dd280b29eec1e8691e67eb4b24895 /eth
parent841b2b7225a9318718c3c856a9debdf01bc4f061 (diff)
support "accepted" as block number in JSON-RPC
Diffstat (limited to 'eth')
-rw-r--r--eth/api.go6
-rw-r--r--eth/api_backend.go14
-rw-r--r--eth/api_tracer.go25
-rw-r--r--eth/backend.go24
-rw-r--r--eth/config.go62
-rw-r--r--eth/filters/api.go576
-rw-r--r--eth/filters/filter.go348
-rw-r--r--eth/filters/filter_system.go512
-rw-r--r--eth/gasprice/gasprice.go189
-rw-r--r--eth/gen_config.go4
10 files changed, 1711 insertions, 49 deletions
diff --git a/eth/api.go b/eth/api.go
index 10cf01d..59750ce 100644
--- a/eth/api.go
+++ b/eth/api.go
@@ -28,15 +28,15 @@ import (
"strings"
"time"
+ "github.com/ava-labs/coreth/internal/ethapi"
+ "github.com/ava-labs/coreth/rpc"
"github.com/ava-labs/go-ethereum/common"
"github.com/ava-labs/go-ethereum/common/hexutil"
"github.com/ava-labs/go-ethereum/core"
"github.com/ava-labs/go-ethereum/core/rawdb"
"github.com/ava-labs/go-ethereum/core/state"
"github.com/ava-labs/go-ethereum/core/types"
- "github.com/ava-labs/coreth/internal/ethapi"
"github.com/ava-labs/go-ethereum/rlp"
- "github.com/ava-labs/go-ethereum/rpc"
"github.com/ava-labs/go-ethereum/trie"
)
@@ -271,6 +271,8 @@ func (api *PublicDebugAPI) DumpBlock(blockNr rpc.BlockNumber) (state.Dump, error
var block *types.Block
if blockNr == rpc.LatestBlockNumber {
block = api.eth.blockchain.CurrentBlock()
+ } else if blockNr == rpc.AcceptedBlockNumber {
+ block = api.eth.AcceptedBlock()
} else {
block = api.eth.blockchain.GetBlockByNumber(uint64(blockNr))
}
diff --git a/eth/api_backend.go b/eth/api_backend.go
index f4c0101..99556c9 100644
--- a/eth/api_backend.go
+++ b/eth/api_backend.go
@@ -21,6 +21,8 @@ import (
"errors"
"math/big"
+ "github.com/ava-labs/coreth/eth/gasprice"
+ "github.com/ava-labs/coreth/rpc"
"github.com/ava-labs/go-ethereum/accounts"
"github.com/ava-labs/go-ethereum/common"
"github.com/ava-labs/go-ethereum/common/math"
@@ -31,11 +33,9 @@ import (
"github.com/ava-labs/go-ethereum/core/types"
"github.com/ava-labs/go-ethereum/core/vm"
"github.com/ava-labs/go-ethereum/eth/downloader"
- "github.com/ava-labs/go-ethereum/eth/gasprice"
"github.com/ava-labs/go-ethereum/ethdb"
"github.com/ava-labs/go-ethereum/event"
"github.com/ava-labs/go-ethereum/params"
- "github.com/ava-labs/go-ethereum/rpc"
)
// EthAPIBackend implements ethapi.Backend for full nodes
@@ -54,6 +54,10 @@ func (b *EthAPIBackend) CurrentBlock() *types.Block {
return b.eth.blockchain.CurrentBlock()
}
+func (b *EthAPIBackend) AcceptedBlock() *types.Block {
+ return b.eth.AcceptedBlock()
+}
+
func (b *EthAPIBackend) SetHead(number uint64) {
b.eth.protocolManager.downloader.Cancel()
b.eth.blockchain.SetHead(number)
@@ -69,6 +73,9 @@ func (b *EthAPIBackend) HeaderByNumber(ctx context.Context, number rpc.BlockNumb
if number == rpc.LatestBlockNumber {
return b.eth.blockchain.CurrentBlock().Header(), nil
}
+ if number == rpc.AcceptedBlockNumber {
+ return b.eth.AcceptedBlock().Header(), nil
+ }
return b.eth.blockchain.GetHeaderByNumber(uint64(number)), nil
}
@@ -86,6 +93,9 @@ func (b *EthAPIBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumbe
if number == rpc.LatestBlockNumber {
return b.eth.blockchain.CurrentBlock(), nil
}
+ if number == rpc.AcceptedBlockNumber {
+ return b.eth.AcceptedBlock(), nil
+ }
return b.eth.blockchain.GetBlockByNumber(uint64(number)), nil
}
diff --git a/eth/api_tracer.go b/eth/api_tracer.go
index 618e7c8..b87a9fe 100644
--- a/eth/api_tracer.go
+++ b/eth/api_tracer.go
@@ -28,6 +28,8 @@ import (
"sync"
"time"
+ "github.com/ava-labs/coreth/internal/ethapi"
+ myrpc "github.com/ava-labs/coreth/rpc"
"github.com/ava-labs/go-ethereum/common"
"github.com/ava-labs/go-ethereum/common/hexutil"
"github.com/ava-labs/go-ethereum/core"
@@ -36,7 +38,6 @@ import (
"github.com/ava-labs/go-ethereum/core/types"
"github.com/ava-labs/go-ethereum/core/vm"
"github.com/ava-labs/go-ethereum/eth/tracers"
- "github.com/ava-labs/coreth/internal/ethapi"
"github.com/ava-labs/go-ethereum/log"
"github.com/ava-labs/go-ethereum/rlp"
"github.com/ava-labs/go-ethereum/rpc"
@@ -101,23 +102,27 @@ type txTraceTask struct {
// TraceChain returns the structured logs created during the execution of EVM
// between two blocks (excluding start) and returns them as a JSON object.
-func (api *PrivateDebugAPI) TraceChain(ctx context.Context, start, end rpc.BlockNumber, config *TraceConfig) (*rpc.Subscription, error) {
+func (api *PrivateDebugAPI) TraceChain(ctx context.Context, start, end myrpc.BlockNumber, config *TraceConfig) (*rpc.Subscription, error) {
// Fetch the block interval that we want to trace
var from, to *types.Block
switch start {
- case rpc.PendingBlockNumber:
+ case myrpc.PendingBlockNumber:
from = api.eth.miner.PendingBlock()
- case rpc.LatestBlockNumber:
+ case myrpc.LatestBlockNumber:
from = api.eth.blockchain.CurrentBlock()
+ case myrpc.AcceptedBlockNumber:
+ from = api.eth.AcceptedBlock()
default:
from = api.eth.blockchain.GetBlockByNumber(uint64(start))
}
switch end {
- case rpc.PendingBlockNumber:
+ case myrpc.PendingBlockNumber:
to = api.eth.miner.PendingBlock()
- case rpc.LatestBlockNumber:
+ case myrpc.LatestBlockNumber:
to = api.eth.blockchain.CurrentBlock()
+ case myrpc.AcceptedBlockNumber:
+ from = api.eth.AcceptedBlock()
default:
to = api.eth.blockchain.GetBlockByNumber(uint64(end))
}
@@ -352,15 +357,17 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl
// TraceBlockByNumber returns the structured logs created during the execution of
// EVM and returns them as a JSON object.
-func (api *PrivateDebugAPI) TraceBlockByNumber(ctx context.Context, number rpc.BlockNumber, config *TraceConfig) ([]*txTraceResult, error) {
+func (api *PrivateDebugAPI) TraceBlockByNumber(ctx context.Context, number myrpc.BlockNumber, config *TraceConfig) ([]*txTraceResult, error) {
// Fetch the block that we want to trace
var block *types.Block
switch number {
- case rpc.PendingBlockNumber:
+ case myrpc.PendingBlockNumber:
block = api.eth.miner.PendingBlock()
- case rpc.LatestBlockNumber:
+ case myrpc.LatestBlockNumber:
block = api.eth.blockchain.CurrentBlock()
+ case myrpc.AcceptedBlockNumber:
+ block = api.eth.AcceptedBlock()
default:
block = api.eth.blockchain.GetBlockByNumber(uint64(number))
}
diff --git a/eth/backend.go b/eth/backend.go
index d119e89..95d4c24 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -27,6 +27,8 @@ import (
"github.com/ava-labs/coreth/consensus/dummy"
mycore "github.com/ava-labs/coreth/core"
+ "github.com/ava-labs/coreth/eth/filters"
+ "github.com/ava-labs/coreth/eth/gasprice"
"github.com/ava-labs/coreth/internal/ethapi"
"github.com/ava-labs/coreth/miner"
"github.com/ava-labs/coreth/node"
@@ -44,8 +46,6 @@ import (
"github.com/ava-labs/go-ethereum/core/types"
"github.com/ava-labs/go-ethereum/core/vm"
"github.com/ava-labs/go-ethereum/eth/downloader"
- "github.com/ava-labs/go-ethereum/eth/filters"
- "github.com/ava-labs/go-ethereum/eth/gasprice"
"github.com/ava-labs/go-ethereum/ethdb"
"github.com/ava-labs/go-ethereum/event"
"github.com/ava-labs/go-ethereum/log"
@@ -65,6 +65,10 @@ type LesServer interface {
SetContractBackend(bind.ContractBackend)
}
+type BackendCallbacks struct {
+ OnQueryAcceptedBlock func() *types.Block
+}
+
// Ethereum implements the Ethereum full node service.
type Ethereum struct {
config *Config
@@ -102,6 +106,7 @@ type Ethereum struct {
lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase)
txSubmitChan chan struct{}
+ bcb *BackendCallbacks
}
func (s *Ethereum) AddLesServer(ls LesServer) {
@@ -119,7 +124,11 @@ func (s *Ethereum) SetContractBackend(backend bind.ContractBackend) {
// New creates a new Ethereum object (including the
// initialisation of the common Ethereum object)
-func New(ctx *node.ServiceContext, config *Config, cb *dummy.ConsensusCallbacks, mcb *miner.MinerCallbacks, chainDb ethdb.Database) (*Ethereum, error) {
+func New(ctx *node.ServiceContext, config *Config,
+ cb *dummy.ConsensusCallbacks,
+ mcb *miner.MinerCallbacks,
+ bcb *BackendCallbacks,
+ chainDb ethdb.Database) (*Ethereum, error) {
// Ensure configuration values are compatible and sane
if config.SyncMode == downloader.LightSync {
return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum")
@@ -164,6 +173,7 @@ func New(ctx *node.ServiceContext, config *Config, cb *dummy.ConsensusCallbacks,
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
txSubmitChan: make(chan struct{}, 1),
+ bcb: bcb,
}
bcVersion := rawdb.ReadDatabaseVersion(chainDb)
@@ -557,3 +567,11 @@ func (s *Ethereum) StopPart() error {
func (s *Ethereum) GetTxSubmitCh() <-chan struct{} {
return s.txSubmitChan
}
+
+func (s *Ethereum) AcceptedBlock() *types.Block {
+ cb := s.bcb.OnQueryAcceptedBlock
+ if cb != nil {
+ return cb()
+ }
+ return s.blockchain.CurrentBlock()
+}
diff --git a/eth/config.go b/eth/config.go
index a84f5af..8aee0eb 100644
--- a/eth/config.go
+++ b/eth/config.go
@@ -24,13 +24,13 @@ import (
"runtime"
"time"
- "github.com/ava-labs/go-ethereum/common/hexutil"
+ "github.com/ava-labs/coreth/eth/gasprice"
+ "github.com/ava-labs/coreth/miner"
"github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/common/hexutil"
"github.com/ava-labs/go-ethereum/consensus/ethash"
"github.com/ava-labs/go-ethereum/core"
"github.com/ava-labs/go-ethereum/eth/downloader"
- "github.com/ava-labs/go-ethereum/eth/gasprice"
- "github.com/ava-labs/coreth/miner"
"github.com/ava-labs/go-ethereum/params"
)
@@ -158,32 +158,32 @@ type Config struct {
}
func MyDefaultConfig() Config {
- config := DefaultConfig
- chainConfig := &params.ChainConfig {
- ChainID: big.NewInt(42222),
- HomesteadBlock: big.NewInt(0),
- DAOForkBlock: big.NewInt(0),
- DAOForkSupport: true,
- EIP150Block: big.NewInt(0),
- EIP150Hash: common.HexToHash("0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0"),
- EIP155Block: big.NewInt(0),
- EIP158Block: big.NewInt(0),
- ByzantiumBlock: big.NewInt(0),
- ConstantinopleBlock: big.NewInt(0),
- PetersburgBlock: big.NewInt(0),
- IstanbulBlock: nil,
- Ethash: nil,
- }
- genBalance := big.NewInt(1000000000000000000)
-
- config.Genesis = &core.Genesis{
- Config: chainConfig,
- Nonce: 0,
- Number: 0,
- ExtraData: hexutil.MustDecode("0x00"),
- GasLimit: 100000000,
- Difficulty: big.NewInt(0),
- Alloc: core.GenesisAlloc{ common.Address{}: { Balance: genBalance }},
- }
- return config
+ config := DefaultConfig
+ chainConfig := &params.ChainConfig{
+ ChainID: big.NewInt(42222),
+ HomesteadBlock: big.NewInt(0),
+ DAOForkBlock: big.NewInt(0),
+ DAOForkSupport: true,
+ EIP150Block: big.NewInt(0),
+ EIP150Hash: common.HexToHash("0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0"),
+ EIP155Block: big.NewInt(0),
+ EIP158Block: big.NewInt(0),
+ ByzantiumBlock: big.NewInt(0),
+ ConstantinopleBlock: big.NewInt(0),
+ PetersburgBlock: big.NewInt(0),
+ IstanbulBlock: nil,
+ Ethash: nil,
+ }
+ genBalance := big.NewInt(1000000000000000000)
+
+ config.Genesis = &core.Genesis{
+ Config: chainConfig,
+ Nonce: 0,
+ Number: 0,
+ ExtraData: hexutil.MustDecode("0x00"),
+ GasLimit: 100000000,
+ Difficulty: big.NewInt(0),
+ Alloc: core.GenesisAlloc{common.Address{}: {Balance: genBalance}},
+ }
+ return config
}
diff --git a/eth/filters/api.go b/eth/filters/api.go
new file mode 100644
index 0000000..9d9e757
--- /dev/null
+++ b/eth/filters/api.go
@@ -0,0 +1,576 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package filters
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "math/big"
+ "sync"
+ "time"
+
+ ethereum "github.com/ava-labs/go-ethereum"
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/common/hexutil"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/ethdb"
+ "github.com/ava-labs/go-ethereum/event"
+ "github.com/ava-labs/go-ethereum/rpc"
+)
+
+var (
+ deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline
+)
+
+// filter is a helper struct that holds meta information over the filter type
+// and associated subscription in the event system.
+type filter struct {
+ typ Type
+ deadline *time.Timer // filter is inactiv when deadline triggers
+ hashes []common.Hash
+ crit FilterCriteria
+ logs []*types.Log
+ s *Subscription // associated subscription in event system
+}
+
+// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
+// information related to the Ethereum protocol such als blocks, transactions and logs.
+type PublicFilterAPI struct {
+ backend Backend
+ mux *event.TypeMux
+ quit chan struct{}
+ chainDb ethdb.Database
+ events *EventSystem
+ filtersMu sync.Mutex
+ filters map[rpc.ID]*filter
+}
+
+// NewPublicFilterAPI returns a new PublicFilterAPI instance.
+func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
+ api := &PublicFilterAPI{
+ backend: backend,
+ mux: backend.EventMux(),
+ chainDb: backend.ChainDb(),
+ events: NewEventSystem(backend.EventMux(), backend, lightMode),
+ filters: make(map[rpc.ID]*filter),
+ }
+ go api.timeoutLoop()
+
+ return api
+}
+
+// timeoutLoop runs every 5 minutes and deletes filters that have not been recently used.
+// Tt is started when the api is created.
+func (api *PublicFilterAPI) timeoutLoop() {
+ ticker := time.NewTicker(5 * time.Minute)
+ for {
+ <-ticker.C
+ api.filtersMu.Lock()
+ for id, f := range api.filters {
+ select {
+ case <-f.deadline.C:
+ f.s.Unsubscribe()
+ delete(api.filters, id)
+ default:
+ continue
+ }
+ }
+ api.filtersMu.Unlock()
+ }
+}
+
+// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes
+// as transactions enter the pending state.
+//
+// It is part of the filter package because this filter can be used through the
+// `eth_getFilterChanges` polling method that is also used for log filters.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter
+func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
+ var (
+ pendingTxs = make(chan []common.Hash)
+ pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
+ )
+
+ api.filtersMu.Lock()
+ api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub}
+ api.filtersMu.Unlock()
+
+ go func() {
+ for {
+ select {
+ case ph := <-pendingTxs:
+ api.filtersMu.Lock()
+ if f, found := api.filters[pendingTxSub.ID]; found {
+ f.hashes = append(f.hashes, ph...)
+ }
+ api.filtersMu.Unlock()
+ case <-pendingTxSub.Err():
+ api.filtersMu.Lock()
+ delete(api.filters, pendingTxSub.ID)
+ api.filtersMu.Unlock()
+ return
+ }
+ }
+ }()
+
+ return pendingTxSub.ID
+}
+
+// NewPendingTransactions creates a subscription that is triggered each time a transaction
+// enters the transaction pool and was signed from one of the transactions this nodes manages.
+func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
+ }
+
+ rpcSub := notifier.CreateSubscription()
+
+ go func() {
+ txHashes := make(chan []common.Hash, 128)
+ pendingTxSub := api.events.SubscribePendingTxs(txHashes)
+
+ for {
+ select {
+ case hashes := <-txHashes:
+ // To keep the original behaviour, send a single tx hash in one notification.
+ // TODO(rjl493456442) Send a batch of tx hashes in one notification
+ for _, h := range hashes {
+ notifier.Notify(rpcSub.ID, h)
+ }
+ case <-rpcSub.Err():
+ pendingTxSub.Unsubscribe()
+ return
+ case <-notifier.Closed():
+ pendingTxSub.Unsubscribe()
+ return
+ }
+ }
+ }()
+
+ return rpcSub, nil
+}
+
+// NewBlockFilter creates a filter that fetches blocks that are imported into the chain.
+// It is part of the filter package since polling goes with eth_getFilterChanges.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter
+func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
+ var (
+ headers = make(chan *types.Header)
+ headerSub = api.events.SubscribeNewHeads(headers)
+ )
+
+ api.filtersMu.Lock()
+ api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub}
+ api.filtersMu.Unlock()
+
+ go func() {
+ for {
+ select {
+ case h := <-headers:
+ api.filtersMu.Lock()
+ if f, found := api.filters[headerSub.ID]; found {
+ f.hashes = append(f.hashes, h.Hash())
+ }
+ api.filtersMu.Unlock()
+ case <-headerSub.Err():
+ api.filtersMu.Lock()
+ delete(api.filters, headerSub.ID)
+ api.filtersMu.Unlock()
+ return
+ }
+ }
+ }()
+
+ return headerSub.ID
+}
+
+// NewHeads send a notification each time a new (header) block is appended to the chain.
+func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
+ }
+
+ rpcSub := notifier.CreateSubscription()
+
+ go func() {
+ headers := make(chan *types.Header)
+ headersSub := api.events.SubscribeNewHeads(headers)
+
+ for {
+ select {
+ case h := <-headers:
+ notifier.Notify(rpcSub.ID, h)
+ case <-rpcSub.Err():
+ headersSub.Unsubscribe()
+ return
+ case <-notifier.Closed():
+ headersSub.Unsubscribe()
+ return
+ }
+ }
+ }()
+
+ return rpcSub, nil
+}
+
+// Logs creates a subscription that fires for all new log that match the given filter criteria.
+func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
+ }
+
+ var (
+ rpcSub = notifier.CreateSubscription()
+ matchedLogs = make(chan []*types.Log)
+ )
+
+ logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs)
+ if err != nil {
+ return nil, err
+ }
+
+ go func() {
+
+ for {
+ select {
+ case logs := <-matchedLogs:
+ for _, log := range logs {
+ notifier.Notify(rpcSub.ID, &log)
+ }
+ case <-rpcSub.Err(): // client send an unsubscribe request
+ logsSub.Unsubscribe()
+ return
+ case <-notifier.Closed(): // connection dropped
+ logsSub.Unsubscribe()
+ return
+ }
+ }
+ }()
+
+ return rpcSub, nil
+}
+
+// FilterCriteria represents a request to create a new filter.
+// Same as ethereum.FilterQuery but with UnmarshalJSON() method.
+type FilterCriteria ethereum.FilterQuery
+
+// NewFilter creates a new filter and returns the filter id. It can be
+// used to retrieve logs when the state changes. This method cannot be
+// used to fetch logs that are already stored in the state.
+//
+// Default criteria for the from and to block are "latest".
+// Using "latest" as block number will return logs for mined blocks.
+// Using "pending" as block number returns logs for not yet mined (pending) blocks.
+// In case logs are removed (chain reorg) previously returned logs are returned
+// again but with the removed property set to true.
+//
+// In case "fromBlock" > "toBlock" an error is returned.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter
+func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
+ logs := make(chan []*types.Log)
+ logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), logs)
+ if err != nil {
+ return rpc.ID(""), err
+ }
+
+ api.filtersMu.Lock()
+ api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]*types.Log, 0), s: logsSub}
+ api.filtersMu.Unlock()
+
+ go func() {
+ for {
+ select {
+ case l := <-logs:
+ api.filtersMu.Lock()
+ if f, found := api.filters[logsSub.ID]; found {
+ f.logs = append(f.logs, l...)
+ }
+ api.filtersMu.Unlock()
+ case <-logsSub.Err():
+ api.filtersMu.Lock()
+ delete(api.filters, logsSub.ID)
+ api.filtersMu.Unlock()
+ return
+ }
+ }
+ }()
+
+ return logsSub.ID, nil
+}
+
+// GetLogs returns logs matching the given argument that are stored within the state.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
+func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) {
+ var filter *Filter
+ if crit.BlockHash != nil {
+ // Block filter requested, construct a single-shot filter
+ filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics)
+ } else {
+ // Convert the RPC block numbers into internal representations
+ begin := rpc.LatestBlockNumber.Int64()
+ if crit.FromBlock != nil {
+ begin = crit.FromBlock.Int64()
+ }
+ end := rpc.LatestBlockNumber.Int64()
+ if crit.ToBlock != nil {
+ end = crit.ToBlock.Int64()
+ }
+ // Construct the range filter
+ filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics)
+ }
+ // Run the filter and return all the logs
+ logs, err := filter.Logs(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return returnLogs(logs), err
+}
+
+// UninstallFilter removes the filter with the given filter id.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter
+func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool {
+ api.filtersMu.Lock()
+ f, found := api.filters[id]
+ if found {
+ delete(api.filters, id)
+ }
+ api.filtersMu.Unlock()
+ if found {
+ f.s.Unsubscribe()
+ }
+
+ return found
+}
+
+// GetFilterLogs returns the logs for the filter with the given id.
+// If the filter could not be found an empty array of logs is returned.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs
+func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Log, error) {
+ api.filtersMu.Lock()
+ f, found := api.filters[id]
+ api.filtersMu.Unlock()
+
+ if !found || f.typ != LogsSubscription {
+ return nil, fmt.Errorf("filter not found")
+ }
+
+ var filter *Filter
+ if f.crit.BlockHash != nil {
+ // Block filter requested, construct a single-shot filter
+ filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
+ } else {
+ // Convert the RPC block numbers into internal representations
+ begin := rpc.LatestBlockNumber.Int64()
+ if f.crit.FromBlock != nil {
+ begin = f.crit.FromBlock.Int64()
+ }
+ end := rpc.LatestBlockNumber.Int64()
+ if f.crit.ToBlock != nil {
+ end = f.crit.ToBlock.Int64()
+ }
+ // Construct the range filter
+ filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
+ }
+ // Run the filter and return all the logs
+ logs, err := filter.Logs(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return returnLogs(logs), nil
+}
+
+// GetFilterChanges returns the logs for the filter with the given id since
+// last time it was called. This can be used for polling.
+//
+// For pending transaction and block filters the result is []common.Hash.
+// (pending)Log filters return []Log.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
+func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
+ api.filtersMu.Lock()
+ defer api.filtersMu.Unlock()
+
+ if f, found := api.filters[id]; found {
+ if !f.deadline.Stop() {
+ // timer expired but filter is not yet removed in timeout loop
+ // receive timer value and reset timer
+ <-f.deadline.C
+ }
+ f.deadline.Reset(deadline)
+
+ switch f.typ {
+ case PendingTransactionsSubscription, BlocksSubscription:
+ hashes := f.hashes
+ f.hashes = nil
+ return returnHashes(hashes), nil
+ case LogsSubscription:
+ logs := f.logs
+ f.logs = nil
+ return returnLogs(logs), nil
+ }
+ }
+
+ return []interface{}{}, fmt.Errorf("filter not found")
+}
+
+// returnHashes is a helper that will return an empty hash array case the given hash array is nil,
+// otherwise the given hashes array is returned.
+func returnHashes(hashes []common.Hash) []common.Hash {
+ if hashes == nil {
+ return []common.Hash{}
+ }
+ return hashes
+}
+
+// returnLogs is a helper that will return an empty log array in case the given logs array is nil,
+// otherwise the given logs array is returned.
+func returnLogs(logs []*types.Log) []*types.Log {
+ if logs == nil {
+ return []*types.Log{}
+ }
+ return logs
+}
+
+// UnmarshalJSON sets *args fields with given data.
+func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
+ type input struct {
+ BlockHash *common.Hash `json:"blockHash"`
+ FromBlock *rpc.BlockNumber `json:"fromBlock"`
+ ToBlock *rpc.BlockNumber `json:"toBlock"`
+ Addresses interface{} `json:"address"`
+ Topics []interface{} `json:"topics"`
+ }
+
+ var raw input
+ if err := json.Unmarshal(data, &raw); err != nil {
+ return err
+ }
+
+ if raw.BlockHash != nil {
+ if raw.FromBlock != nil || raw.ToBlock != nil {
+ // BlockHash is mutually exclusive with FromBlock/ToBlock criteria
+ return fmt.Errorf("cannot specify both BlockHash and FromBlock/ToBlock, choose one or the other")
+ }
+ args.BlockHash = raw.BlockHash
+ } else {
+ if raw.FromBlock != nil {
+ args.FromBlock = big.NewInt(raw.FromBlock.Int64())
+ }
+
+ if raw.ToBlock != nil {
+ args.ToBlock = big.NewInt(raw.ToBlock.Int64())
+ }
+ }
+
+ args.Addresses = []common.Address{}
+
+ if raw.Addresses != nil {
+ // raw.Address can contain a single address or an array of addresses
+ switch rawAddr := raw.Addresses.(type) {
+ case []interface{}:
+ for i, addr := range rawAddr {
+ if strAddr, ok := addr.(string); ok {
+ addr, err := decodeAddress(strAddr)
+ if err != nil {
+ return fmt.Errorf("invalid address at index %d: %v", i, err)
+ }
+ args.Addresses = append(args.Addresses, addr)
+ } else {
+ return fmt.Errorf("non-string address at index %d", i)
+ }
+ }
+ case string:
+ addr, err := decodeAddress(rawAddr)
+ if err != nil {
+ return fmt.Errorf("invalid address: %v", err)
+ }
+ args.Addresses = []common.Address{addr}
+ default:
+ return errors.New("invalid addresses in query")
+ }
+ }
+
+ // topics is an array consisting of strings and/or arrays of strings.
+ // JSON null values are converted to common.Hash{} and ignored by the filter manager.
+ if len(raw.Topics) > 0 {
+ args.Topics = make([][]common.Hash, len(raw.Topics))
+ for i, t := range raw.Topics {
+ switch topic := t.(type) {
+ case nil:
+ // ignore topic when matching logs
+
+ case string:
+ // match specific topic
+ top, err := decodeTopic(topic)
+ if err != nil {
+ return err
+ }
+ args.Topics[i] = []common.Hash{top}
+
+ case []interface{}:
+ // or case e.g. [null, "topic0", "topic1"]
+ for _, rawTopic := range topic {
+ if rawTopic == nil {
+ // null component, match all
+ args.Topics[i] = nil
+ break
+ }
+ if topic, ok := rawTopic.(string); ok {
+ parsed, err := decodeTopic(topic)
+ if err != nil {
+ return err
+ }
+ args.Topics[i] = append(args.Topics[i], parsed)
+ } else {
+ return fmt.Errorf("invalid topic(s)")
+ }
+ }
+ default:
+ return fmt.Errorf("invalid topic(s)")
+ }
+ }
+ }
+
+ return nil
+}
+
+func decodeAddress(s string) (common.Address, error) {
+ b, err := hexutil.Decode(s)
+ if err == nil && len(b) != common.AddressLength {
+ err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for address", len(b), common.AddressLength)
+ }
+ return common.BytesToAddress(b), err
+}
+
+func decodeTopic(s string) (common.Hash, error) {
+ b, err := hexutil.Decode(s)
+ if err == nil && len(b) != common.HashLength {
+ err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for topic", len(b), common.HashLength)
+ }
+ return common.BytesToHash(b), err
+}
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
new file mode 100644
index 0000000..fa3d985
--- /dev/null
+++ b/eth/filters/filter.go
@@ -0,0 +1,348 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package filters
+
+import (
+ "context"
+ "errors"
+ "math/big"
+
+ "github.com/ava-labs/coreth/rpc"
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/core"
+ "github.com/ava-labs/go-ethereum/core/bloombits"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/ethdb"
+ "github.com/ava-labs/go-ethereum/event"
+)
+
+type Backend interface {
+ ChainDb() ethdb.Database
+ EventMux() *event.TypeMux
+ HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
+ HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
+ GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
+ GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error)
+
+ SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
+ SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
+ SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
+ SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
+
+ BloomStatus() (uint64, uint64)
+ ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
+}
+
+// Filter can be used to retrieve and filter logs.
+type Filter struct {
+ backend Backend
+
+ db ethdb.Database
+ addresses []common.Address
+ topics [][]common.Hash
+
+ block common.Hash // Block hash if filtering a single block
+ begin, end int64 // Range interval if filtering multiple blocks
+
+ matcher *bloombits.Matcher
+}
+
+// NewRangeFilter creates a new filter which uses a bloom filter on blocks to
+// figure out whether a particular block is interesting or not.
+func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
+ // Flatten the address and topic filter clauses into a single bloombits filter
+ // system. Since the bloombits are not positional, nil topics are permitted,
+ // which get flattened into a nil byte slice.
+ var filters [][][]byte
+ if len(addresses) > 0 {
+ filter := make([][]byte, len(addresses))
+ for i, address := range addresses {
+ filter[i] = address.Bytes()
+ }
+ filters = append(filters, filter)
+ }
+ for _, topicList := range topics {
+ filter := make([][]byte, len(topicList))
+ for i, topic := range topicList {
+ filter[i] = topic.Bytes()
+ }
+ filters = append(filters, filter)
+ }
+ size, _ := backend.BloomStatus()
+
+ // Create a generic filter and convert it into a range filter
+ filter := newFilter(backend, addresses, topics)
+
+ filter.matcher = bloombits.NewMatcher(size, filters)
+ filter.begin = begin
+ filter.end = end
+
+ return filter
+}
+
+// NewBlockFilter creates a new filter which directly inspects the contents of
+// a block to figure out whether it is interesting or not.
+func NewBlockFilter(backend Backend, block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter {
+ // Create a generic filter and convert it into a block filter
+ filter := newFilter(backend, addresses, topics)
+ filter.block = block
+ return filter
+}
+
+// newFilter creates a generic filter that can either filter based on a block hash,
+// or based on range queries. The search criteria needs to be explicitly set.
+func newFilter(backend Backend, addresses []common.Address, topics [][]common.Hash) *Filter {
+ return &Filter{
+ backend: backend,
+ addresses: addresses,
+ topics: topics,
+ db: backend.ChainDb(),
+ }
+}
+
+// Logs searches the blockchain for matching log entries, returning all from the
+// first block that contains matches, updating the start of the filter accordingly.
+func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
+ // If we're doing singleton block filtering, execute and return
+ if f.block != (common.Hash{}) {
+ header, err := f.backend.HeaderByHash(ctx, f.block)
+ if err != nil {
+ return nil, err
+ }
+ if header == nil {
+ return nil, errors.New("unknown block")
+ }
+ return f.blockLogs(ctx, header)
+ }
+ // Figure out the limits of the filter range
+ header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
+ if header == nil {
+ return nil, nil
+ }
+ head := header.Number.Uint64()
+
+ if f.begin == -1 {
+ f.begin = int64(head)
+ }
+ end := uint64(f.end)
+ if f.end == -1 {
+ end = head
+ }
+ // Gather all indexed logs, and finish with non indexed ones
+ var (
+ logs []*types.Log
+ err error
+ )
+ size, sections := f.backend.BloomStatus()
+ if indexed := sections * size; indexed > uint64(f.begin) {
+ if indexed > end {
+ logs, err = f.indexedLogs(ctx, end)
+ } else {
+ logs, err = f.indexedLogs(ctx, indexed-1)
+ }
+ if err != nil {
+ return logs, err
+ }
+ }
+ rest, err := f.unindexedLogs(ctx, end)
+ logs = append(logs, rest...)
+ return logs, err
+}
+
+// indexedLogs returns the logs matching the filter criteria based on the bloom
+// bits indexed available locally or via the network.
+func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
+ // Create a matcher session and request servicing from the backend
+ matches := make(chan uint64, 64)
+
+ session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
+ if err != nil {
+ return nil, err
+ }
+ defer session.Close()
+
+ f.backend.ServiceFilter(ctx, session)
+
+ // Iterate over the matches until exhausted or context closed
+ var logs []*types.Log
+
+ for {
+ select {
+ case number, ok := <-matches:
+ // Abort if all matches have been fulfilled
+ if !ok {
+ err := session.Error()
+ if err == nil {
+ f.begin = int64(end) + 1
+ }
+ return logs, err
+ }
+ f.begin = int64(number) + 1
+
+ // Retrieve the suggested block and pull any truly matching logs
+ header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
+ if header == nil || err != nil {
+ return logs, err
+ }
+ found, err := f.checkMatches(ctx, header)
+ if err != nil {
+ return logs, err
+ }
+ logs = append(logs, found...)
+
+ case <-ctx.Done():
+ return logs, ctx.Err()
+ }
+ }
+}
+
+// indexedLogs returns the logs matching the filter criteria based on raw block
+// iteration and bloom matching.
+func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
+ var logs []*types.Log
+
+ for ; f.begin <= int64(end); f.begin++ {
+ header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
+ if header == nil || err != nil {
+ return logs, err
+ }
+ found, err := f.blockLogs(ctx, header)
+ if err != nil {
+ return logs, err
+ }
+ logs = append(logs, found...)
+ }
+ return logs, nil
+}
+
+// blockLogs returns the logs matching the filter criteria within a single block.
+func (f *Filter) blockLogs(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
+ if bloomFilter(header.Bloom, f.addresses, f.topics) {
+ found, err := f.checkMatches(ctx, header)
+ if err != nil {
+ return logs, err
+ }
+ logs = append(logs, found...)
+ }
+ return logs, nil
+}
+
+// checkMatches checks if the receipts belonging to the given header contain any log events that
+// match the filter criteria. This function is called when the bloom filter signals a potential match.
+func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
+ // Get the logs of the block
+ logsList, err := f.backend.GetLogs(ctx, header.Hash())
+ if err != nil {
+ return nil, err
+ }
+ var unfiltered []*types.Log
+ for _, logs := range logsList {
+ unfiltered = append(unfiltered, logs...)
+ }
+ logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
+ if len(logs) > 0 {
+ // We have matching logs, check if we need to resolve full logs via the light client
+ if logs[0].TxHash == (common.Hash{}) {
+ receipts, err := f.backend.GetReceipts(ctx, header.Hash())
+ if err != nil {
+ return nil, err
+ }
+ unfiltered = unfiltered[:0]
+ for _, receipt := range receipts {
+ unfiltered = append(unfiltered, receipt.Logs...)
+ }
+ logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
+ }
+ return logs, nil
+ }
+ return nil, nil
+}
+
+func includes(addresses []common.Address, a common.Address) bool {
+ for _, addr := range addresses {
+ if addr == a {
+ return true
+ }
+ }
+
+ return false
+}
+
+// filterLogs creates a slice of logs matching the given criteria.
+func filterLogs(logs []*types.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*types.Log {
+ var ret []*types.Log
+Logs:
+ for _, log := range logs {
+ if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber {
+ continue
+ }
+ if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber {
+ continue
+ }
+
+ if len(addresses) > 0 && !includes(addresses, log.Address) {
+ continue
+ }
+ // If the to filtered topics is greater than the amount of topics in logs, skip.
+ if len(topics) > len(log.Topics) {
+ continue Logs
+ }
+ for i, sub := range topics {
+ match := len(sub) == 0 // empty rule set == wildcard
+ for _, topic := range sub {
+ if log.Topics[i] == topic {
+ match = true
+ break
+ }
+ }
+ if !match {
+ continue Logs
+ }
+ }
+ ret = append(ret, log)
+ }
+ return ret
+}
+
+func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]common.Hash) bool {
+ if len(addresses) > 0 {
+ var included bool
+ for _, addr := range addresses {
+ if types.BloomLookup(bloom, addr) {
+ included = true
+ break
+ }
+ }
+ if !included {
+ return false
+ }
+ }
+
+ for _, sub := range topics {
+ included := len(sub) == 0 // empty rule set == wildcard
+ for _, topic := range sub {
+ if types.BloomLookup(bloom, topic) {
+ included = true
+ break
+ }
+ }
+ if !included {
+ return false
+ }
+ }
+ return true
+}
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
new file mode 100644
index 0000000..a27859f
--- /dev/null
+++ b/eth/filters/filter_system.go
@@ -0,0 +1,512 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package filters implements an ethereum filtering system for block,
+// transactions and log events.
+package filters
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ myrpc "github.com/ava-labs/coreth/rpc"
+ ethereum "github.com/ava-labs/go-ethereum"
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/core"
+ "github.com/ava-labs/go-ethereum/core/rawdb"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/event"
+ "github.com/ava-labs/go-ethereum/log"
+ "github.com/ava-labs/go-ethereum/rpc"
+)
+
+// Type determines the kind of filter and is used to put the filter in to
+// the correct bucket when added.
+type Type byte
+
+const (
+ // UnknownSubscription indicates an unknown subscription type
+ UnknownSubscription Type = iota
+ // LogsSubscription queries for new or removed (chain reorg) logs
+ LogsSubscription
+ // PendingLogsSubscription queries for logs in pending blocks
+ PendingLogsSubscription
+ // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
+ MinedAndPendingLogsSubscription
+ // PendingTransactionsSubscription queries tx hashes for pending
+ // transactions entering the pending state
+ PendingTransactionsSubscription
+ // BlocksSubscription queries hashes for blocks that are imported
+ BlocksSubscription
+ // LastSubscription keeps track of the last index
+ LastIndexSubscription
+)
+
+const (
+
+ // txChanSize is the size of channel listening to NewTxsEvent.
+ // The number is referenced from the size of tx pool.
+ txChanSize = 4096
+ // rmLogsChanSize is the size of channel listening to RemovedLogsEvent.
+ rmLogsChanSize = 10
+ // logsChanSize is the size of channel listening to LogsEvent.
+ logsChanSize = 10
+ // chainEvChanSize is the size of channel listening to ChainEvent.
+ chainEvChanSize = 10
+)
+
+var (
+ ErrInvalidSubscriptionID = errors.New("invalid id")
+)
+
+type subscription struct {
+ id rpc.ID
+ typ Type
+ created time.Time
+ logsCrit ethereum.FilterQuery
+ logs chan []*types.Log
+ hashes chan []common.Hash
+ headers chan *types.Header
+ installed chan struct{} // closed when the filter is installed
+ err chan error // closed when the filter is uninstalled
+}
+
+// EventSystem creates subscriptions, processes events and broadcasts them to the
+// subscription which match the subscription criteria.
+type EventSystem struct {
+ mux *event.TypeMux
+ backend Backend
+ lightMode bool
+ lastHead *types.Header
+
+ // Subscriptions
+ txsSub event.Subscription // Subscription for new transaction event
+ logsSub event.Subscription // Subscription for new log event
+ rmLogsSub event.Subscription // Subscription for removed log event
+ chainSub event.Subscription // Subscription for new chain event
+ pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event
+
+ // Channels
+ install chan *subscription // install filter for event notification
+ uninstall chan *subscription // remove filter for event notification
+ txsCh chan core.NewTxsEvent // Channel to receive new transactions event
+ logsCh chan []*types.Log // Channel to receive new log event
+ rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
+ chainCh chan core.ChainEvent // Channel to receive new chain event
+}
+
+// NewEventSystem creates a new manager that listens for event on the given mux,
+// parses and filters them. It uses the all map to retrieve filter changes. The
+// work loop holds its own index that is used to forward events to filters.
+//
+// The returned manager has a loop that needs to be stopped with the Stop function
+// or by stopping the given mux.
+func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem {
+ m := &EventSystem{
+ mux: mux,
+ backend: backend,
+ lightMode: lightMode,
+ install: make(chan *subscription),
+ uninstall: make(chan *subscription),
+ txsCh: make(chan core.NewTxsEvent, txChanSize),
+ logsCh: make(chan []*types.Log, logsChanSize),
+ rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
+ chainCh: make(chan core.ChainEvent, chainEvChanSize),
+ }
+
+ // Subscribe events
+ m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)
+ m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
+ m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
+ m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
+ // TODO(rjl493456442): use feed to subscribe pending log event
+ m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})
+
+ // Make sure none of the subscriptions are empty
+ if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil ||
+ m.pendingLogSub.Closed() {
+ log.Crit("Subscribe for event system failed")
+ }
+
+ go m.eventLoop()
+ return m
+}
+
+// Subscription is created when the client registers itself for a particular event.
+type Subscription struct {
+ ID rpc.ID
+ f *subscription
+ es *EventSystem
+ unsubOnce sync.Once
+}
+
+// Err returns a channel that is closed when unsubscribed.
+func (sub *Subscription) Err() <-chan error {
+ return sub.f.err
+}
+
+// Unsubscribe uninstalls the subscription from the event broadcast loop.
+func (sub *Subscription) Unsubscribe() {
+ sub.unsubOnce.Do(func() {
+ uninstallLoop:
+ for {
+ // write uninstall request and consume logs/hashes. This prevents
+ // the eventLoop broadcast method to deadlock when writing to the
+ // filter event channel while the subscription loop is waiting for
+ // this method to return (and thus not reading these events).
+ select {
+ case sub.es.uninstall <- sub.f:
+ break uninstallLoop
+ case <-sub.f.logs:
+ case <-sub.f.hashes:
+ case <-sub.f.headers:
+ }
+ }
+
+ // wait for filter to be uninstalled in work loop before returning
+ // this ensures that the manager won't use the event channel which
+ // will probably be closed by the client asap after this method returns.
+ <-sub.Err()
+ })
+}
+
+// subscribe installs the subscription in the event broadcast loop.
+func (es *EventSystem) subscribe(sub *subscription) *Subscription {
+ es.install <- sub
+ <-sub.installed
+ return &Subscription{ID: sub.id, f: sub, es: es}
+}
+
+// SubscribeLogs creates a subscription that will write all logs matching the
+// given criteria to the given logs channel. Default value for the from and to
+// block is "latest". If the fromBlock > toBlock an error is returned.
+func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) (*Subscription, error) {
+ var from, to myrpc.BlockNumber
+ if crit.FromBlock == nil {
+ from = myrpc.LatestBlockNumber
+ } else {
+ from = myrpc.BlockNumber(crit.FromBlock.Int64())
+ }
+ if crit.ToBlock == nil {
+ to = myrpc.LatestBlockNumber
+ } else {
+ to = myrpc.BlockNumber(crit.ToBlock.Int64())
+ }
+
+ // only interested in pending logs
+ if from == myrpc.PendingBlockNumber && to == myrpc.PendingBlockNumber {
+ return es.subscribePendingLogs(crit, logs), nil
+ }
+ // only interested in new mined logs
+ if from == myrpc.LatestBlockNumber && to == myrpc.LatestBlockNumber {
+ return es.subscribeLogs(crit, logs), nil
+ }
+ // only interested in mined logs within a specific block range
+ if from >= 0 && to >= 0 && to >= from {
+ return es.subscribeLogs(crit, logs), nil
+ }
+ // interested in mined logs from a specific block number, new logs and pending logs
+ if from >= myrpc.LatestBlockNumber && to == myrpc.PendingBlockNumber {
+ return es.subscribeMinedPendingLogs(crit, logs), nil
+ }
+ // interested in logs from a specific block number to new mined blocks
+ if from >= 0 && to == myrpc.LatestBlockNumber {
+ return es.subscribeLogs(crit, logs), nil
+ }
+ return nil, fmt.Errorf("invalid from and to block combination: from > to")
+}
+
+// subscribeMinedPendingLogs creates a subscription that returned mined and
+// pending logs that match the given criteria.
+func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: MinedAndPendingLogsSubscription,
+ logsCrit: crit,
+ created: time.Now(),
+ logs: logs,
+ hashes: make(chan []common.Hash),
+ headers: make(chan *types.Header),
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
+ return es.subscribe(sub)
+}
+
+// subscribeLogs creates a subscription that will write all logs matching the
+// given criteria to the given logs channel.
+func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: LogsSubscription,
+ logsCrit: crit,
+ created: time.Now(),
+ logs: logs,
+ hashes: make(chan []common.Hash),
+ headers: make(chan *types.Header),
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
+ return es.subscribe(sub)
+}
+
+// subscribePendingLogs creates a subscription that writes transaction hashes for
+// transactions that enter the transaction pool.
+func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: PendingLogsSubscription,
+ logsCrit: crit,
+ created: time.Now(),
+ logs: logs,
+ hashes: make(chan []common.Hash),
+ headers: make(chan *types.Header),
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
+ return es.subscribe(sub)
+}
+
+// SubscribeNewHeads creates a subscription that writes the header of a block that is
+// imported in the chain.
+func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: BlocksSubscription,
+ created: time.Now(),
+ logs: make(chan []*types.Log),
+ hashes: make(chan []common.Hash),
+ headers: headers,
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
+ return es.subscribe(sub)
+}
+
+// SubscribePendingTxs creates a subscription that writes transaction hashes for
+// transactions that enter the transaction pool.
+func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: PendingTransactionsSubscription,
+ created: time.Now(),
+ logs: make(chan []*types.Log),
+ hashes: hashes,
+ headers: make(chan *types.Header),
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
+ return es.subscribe(sub)
+}
+
+type filterIndex map[Type]map[rpc.ID]*subscription
+
+// broadcast event to filters that match criteria.
+func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
+ if ev == nil {
+ return
+ }
+
+ switch e := ev.(type) {
+ case []*types.Log:
+ if len(e) > 0 {
+ for _, f := range filters[LogsSubscription] {
+ if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
+ }
+ }
+ }
+ case core.RemovedLogsEvent:
+ for _, f := range filters[LogsSubscription] {
+ if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
+ }
+ }
+ case *event.TypeMuxEvent:
+ if muxe, ok := e.Data.(core.PendingLogsEvent); ok {
+ for _, f := range filters[PendingLogsSubscription] {
+ if e.Time.After(f.created) {
+ if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
+ }
+ }
+ }
+ }
+ case core.NewTxsEvent:
+ hashes := make([]common.Hash, 0, len(e.Txs))
+ for _, tx := range e.Txs {
+ hashes = append(hashes, tx.Hash())
+ }
+ for _, f := range filters[PendingTransactionsSubscription] {
+ f.hashes <- hashes
+ }
+ case core.ChainEvent:
+ for _, f := range filters[BlocksSubscription] {
+ f.headers <- e.Block.Header()
+ }
+ if es.lightMode && len(filters[LogsSubscription]) > 0 {
+ es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
+ for _, f := range filters[LogsSubscription] {
+ if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
+ }
+ }
+ })
+ }
+ }
+}
+
+func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) {
+ oldh := es.lastHead
+ es.lastHead = newHeader
+ if oldh == nil {
+ return
+ }
+ newh := newHeader
+ // find common ancestor, create list of rolled back and new block hashes
+ var oldHeaders, newHeaders []*types.Header
+ for oldh.Hash() != newh.Hash() {
+ if oldh.Number.Uint64() >= newh.Number.Uint64() {
+ oldHeaders = append(oldHeaders, oldh)
+ oldh = rawdb.ReadHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1)
+ }
+ if oldh.Number.Uint64() < newh.Number.Uint64() {
+ newHeaders = append(newHeaders, newh)
+ newh = rawdb.ReadHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1)
+ if newh == nil {
+ // happens when CHT syncing, nothing to do
+ newh = oldh
+ }
+ }
+ }
+ // roll back old blocks
+ for _, h := range oldHeaders {
+ callBack(h, true)
+ }
+ // check new blocks (array is in reverse order)
+ for i := len(newHeaders) - 1; i >= 0; i-- {
+ callBack(newHeaders[i], false)
+ }
+}
+
+// filter logs of a single header in light client mode
+func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log {
+ if bloomFilter(header.Bloom, addresses, topics) {
+ // Get the logs of the block
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+ defer cancel()
+ logsList, err := es.backend.GetLogs(ctx, header.Hash())
+ if err != nil {
+ return nil
+ }
+ var unfiltered []*types.Log
+ for _, logs := range logsList {
+ for _, log := range logs {
+ logcopy := *log
+ logcopy.Removed = remove
+ unfiltered = append(unfiltered, &logcopy)
+ }
+ }
+ logs := filterLogs(unfiltered, nil, nil, addresses, topics)
+ if len(logs) > 0 && logs[0].TxHash == (common.Hash{}) {
+ // We have matching but non-derived logs
+ receipts, err := es.backend.GetReceipts(ctx, header.Hash())
+ if err != nil {
+ return nil
+ }
+ unfiltered = unfiltered[:0]
+ for _, receipt := range receipts {
+ for _, log := range receipt.Logs {
+ logcopy := *log
+ logcopy.Removed = remove
+ unfiltered = append(unfiltered, &logcopy)
+ }
+ }
+ logs = filterLogs(unfiltered, nil, nil, addresses, topics)
+ }
+ return logs
+ }
+ return nil
+}
+
+// eventLoop (un)installs filters and processes mux events.
+func (es *EventSystem) eventLoop() {
+ // Ensure all subscriptions get cleaned up
+ defer func() {
+ es.pendingLogSub.Unsubscribe()
+ es.txsSub.Unsubscribe()
+ es.logsSub.Unsubscribe()
+ es.rmLogsSub.Unsubscribe()
+ es.chainSub.Unsubscribe()
+ }()
+
+ index := make(filterIndex)
+ for i := UnknownSubscription; i < LastIndexSubscription; i++ {
+ index[i] = make(map[rpc.ID]*subscription)
+ }
+
+ for {
+ select {
+ // Handle subscribed events
+ case ev := <-es.txsCh:
+ es.broadcast(index, ev)
+ case ev := <-es.logsCh:
+ es.broadcast(index, ev)
+ case ev := <-es.rmLogsCh:
+ es.broadcast(index, ev)
+ case ev := <-es.chainCh:
+ es.broadcast(index, ev)
+ case ev, active := <-es.pendingLogSub.Chan():
+ if !active { // system stopped
+ return
+ }
+ es.broadcast(index, ev)
+
+ case f := <-es.install:
+ if f.typ == MinedAndPendingLogsSubscription {
+ // the type are logs and pending logs subscriptions
+ index[LogsSubscription][f.id] = f
+ index[PendingLogsSubscription][f.id] = f
+ } else {
+ index[f.typ][f.id] = f
+ }
+ close(f.installed)
+
+ case f := <-es.uninstall:
+ if f.typ == MinedAndPendingLogsSubscription {
+ // the type are logs and pending logs subscriptions
+ delete(index[LogsSubscription], f.id)
+ delete(index[PendingLogsSubscription], f.id)
+ } else {
+ delete(index[f.typ], f.id)
+ }
+ close(f.err)
+
+ // System stopped
+ case <-es.txsSub.Err():
+ return
+ case <-es.logsSub.Err():
+ return
+ case <-es.rmLogsSub.Err():
+ return
+ case <-es.chainSub.Err():
+ return
+ }
+ }
+}
diff --git a/eth/gasprice/gasprice.go b/eth/gasprice/gasprice.go
new file mode 100644
index 0000000..9b632c2
--- /dev/null
+++ b/eth/gasprice/gasprice.go
@@ -0,0 +1,189 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package gasprice
+
+import (
+ "context"
+ "math/big"
+ "sort"
+ "sync"
+
+ "github.com/ava-labs/coreth/internal/ethapi"
+ "github.com/ava-labs/coreth/params"
+ "github.com/ava-labs/coreth/rpc"
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/core/types"
+)
+
+var maxPrice = big.NewInt(500 * params.GWei)
+
+type Config struct {
+ Blocks int
+ Percentile int
+ Default *big.Int `toml:",omitempty"`
+}
+
+// Oracle recommends gas prices based on the content of recent
+// blocks. Suitable for both light and full clients.
+type Oracle struct {
+ backend ethapi.Backend
+ lastHead common.Hash
+ lastPrice *big.Int
+ cacheLock sync.RWMutex
+ fetchLock sync.Mutex
+
+ checkBlocks, maxEmpty, maxBlocks int
+ percentile int
+}
+
+// NewOracle returns a new oracle.
+func NewOracle(backend ethapi.Backend, params Config) *Oracle {
+ blocks := params.Blocks
+ if blocks < 1 {
+ blocks = 1
+ }
+ percent := params.Percentile
+ if percent < 0 {
+ percent = 0
+ }
+ if percent > 100 {
+ percent = 100
+ }
+ return &Oracle{
+ backend: backend,
+ lastPrice: params.Default,
+ checkBlocks: blocks,
+ maxEmpty: blocks / 2,
+ maxBlocks: blocks * 5,
+ percentile: percent,
+ }
+}
+
+// SuggestPrice returns the recommended gas price.
+func (gpo *Oracle) SuggestPrice(ctx context.Context) (*big.Int, error) {
+ gpo.cacheLock.RLock()
+ lastHead := gpo.lastHead
+ lastPrice := gpo.lastPrice
+ gpo.cacheLock.RUnlock()
+
+ head, _ := gpo.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
+ headHash := head.Hash()
+ if headHash == lastHead {
+ return lastPrice, nil
+ }
+
+ gpo.fetchLock.Lock()
+ defer gpo.fetchLock.Unlock()
+
+ // try checking the cache again, maybe the last fetch fetched what we need
+ gpo.cacheLock.RLock()
+ lastHead = gpo.lastHead
+ lastPrice = gpo.lastPrice
+ gpo.cacheLock.RUnlock()
+ if headHash == lastHead {
+ return lastPrice, nil
+ }
+
+ blockNum := head.Number.Uint64()
+ ch := make(chan getBlockPricesResult, gpo.checkBlocks)
+ sent := 0
+ exp := 0
+ var blockPrices []*big.Int
+ for sent < gpo.checkBlocks && blockNum > 0 {
+ go gpo.getBlockPrices(ctx, types.MakeSigner(gpo.backend.ChainConfig(), big.NewInt(int64(blockNum))), blockNum, ch)
+ sent++
+ exp++
+ blockNum--
+ }
+ maxEmpty := gpo.maxEmpty
+ for exp > 0 {
+ res := <-ch
+ if res.err != nil {
+ return lastPrice, res.err
+ }
+ exp--
+ if res.price != nil {
+ blockPrices = append(blockPrices, res.price)
+ continue
+ }
+ if maxEmpty > 0 {
+ maxEmpty--
+ continue
+ }
+ if blockNum > 0 && sent < gpo.maxBlocks {
+ go gpo.getBlockPrices(ctx, types.MakeSigner(gpo.backend.ChainConfig(), big.NewInt(int64(blockNum))), blockNum, ch)
+ sent++
+ exp++
+ blockNum--
+ }
+ }
+ price := lastPrice
+ if len(blockPrices) > 0 {
+ sort.Sort(bigIntArray(blockPrices))
+ price = blockPrices[(len(blockPrices)-1)*gpo.percentile/100]
+ }
+ if price.Cmp(maxPrice) > 0 {
+ price = new(big.Int).Set(maxPrice)
+ }
+
+ gpo.cacheLock.Lock()
+ gpo.lastHead = headHash
+ gpo.lastPrice = price
+ gpo.cacheLock.Unlock()
+ return price, nil
+}
+
+type getBlockPricesResult struct {
+ price *big.Int
+ err error
+}
+
+type transactionsByGasPrice []*types.Transaction
+
+func (t transactionsByGasPrice) Len() int { return len(t) }
+func (t transactionsByGasPrice) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
+func (t transactionsByGasPrice) Less(i, j int) bool { return t[i].GasPrice().Cmp(t[j].GasPrice()) < 0 }
+
+// getBlockPrices calculates the lowest transaction gas price in a given block
+// and sends it to the result channel. If the block is empty, price is nil.
+func (gpo *Oracle) getBlockPrices(ctx context.Context, signer types.Signer, blockNum uint64, ch chan getBlockPricesResult) {
+ block, err := gpo.backend.BlockByNumber(ctx, rpc.BlockNumber(blockNum))
+ if block == nil {
+ ch <- getBlockPricesResult{nil, err}
+ return
+ }
+
+ blockTxs := block.Transactions()
+ txs := make([]*types.Transaction, len(blockTxs))
+ copy(txs, blockTxs)
+ sort.Sort(transactionsByGasPrice(txs))
+
+ for _, tx := range txs {
+ sender, err := types.Sender(signer, tx)
+ if err == nil && sender != block.Coinbase() {
+ ch <- getBlockPricesResult{tx.GasPrice(), nil}
+ return
+ }
+ }
+ ch <- getBlockPricesResult{nil, nil}
+}
+
+type bigIntArray []*big.Int
+
+func (s bigIntArray) Len() int { return len(s) }
+func (s bigIntArray) Less(i, j int) bool { return s[i].Cmp(s[j]) < 0 }
+func (s bigIntArray) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
diff --git a/eth/gen_config.go b/eth/gen_config.go
index 7d54b6b..a254f5d 100644
--- a/eth/gen_config.go
+++ b/eth/gen_config.go
@@ -6,12 +6,12 @@ import (
"math/big"
"time"
+ "github.com/ava-labs/coreth/eth/gasprice"
+ "github.com/ava-labs/coreth/miner"
"github.com/ava-labs/go-ethereum/common"
"github.com/ava-labs/go-ethereum/consensus/ethash"
"github.com/ava-labs/go-ethereum/core"
"github.com/ava-labs/go-ethereum/eth/downloader"
- "github.com/ava-labs/go-ethereum/eth/gasprice"
- "github.com/ava-labs/coreth/miner"
"github.com/ava-labs/go-ethereum/params"
)