aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <tederminant@gmail.com>2019-10-01 13:28:55 -0400
committerDeterminant <tederminant@gmail.com>2019-10-01 13:28:55 -0400
commitbc43122f54ed8de21f74ee6393549c9554d732e8 (patch)
tree549fffb8771dd280b29eec1e8691e67eb4b24895
parent841b2b7225a9318718c3c856a9debdf01bc4f061 (diff)
support "accepted" as block number in JSON-RPC
-rw-r--r--coreth.go10
-rw-r--r--eth/api.go6
-rw-r--r--eth/api_backend.go14
-rw-r--r--eth/api_tracer.go25
-rw-r--r--eth/backend.go24
-rw-r--r--eth/config.go62
-rw-r--r--eth/filters/api.go576
-rw-r--r--eth/filters/filter.go348
-rw-r--r--eth/filters/filter_system.go512
-rw-r--r--eth/gasprice/gasprice.go189
-rw-r--r--eth/gen_config.go4
-rw-r--r--internal/ethapi/api.go4
-rw-r--r--internal/ethapi/backend.go8
-rw-r--r--params/denomination.go28
-rw-r--r--rpc/types.go77
15 files changed, 1831 insertions, 56 deletions
diff --git a/coreth.go b/coreth.go
index 04d782f..fe9e6d6 100644
--- a/coreth.go
+++ b/coreth.go
@@ -29,6 +29,7 @@ type ETHChain struct {
backend *eth.Ethereum
cb *dummy.ConsensusCallbacks
mcb *miner.MinerCallbacks
+ bcb *eth.BackendCallbacks
}
func isLocalBlock(block *types.Block) bool {
@@ -52,8 +53,9 @@ func NewETHChain(config *eth.Config, nodecfg *node.Config, etherBase *common.Add
}
cb := new(dummy.ConsensusCallbacks)
mcb := new(miner.MinerCallbacks)
- backend, _ := eth.New(&ctx, config, cb, mcb, chainDB)
- chain := &ETHChain{backend: backend, cb: cb, mcb: mcb}
+ bcb := new(eth.BackendCallbacks)
+ backend, _ := eth.New(&ctx, config, cb, mcb, bcb, chainDB)
+ chain := &ETHChain{backend: backend, cb: cb, mcb: mcb, bcb: bcb}
if etherBase == nil {
etherBase = &common.Address{
1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
@@ -125,6 +127,10 @@ func (self *ETHChain) SetOnFinalizeAndAssemble(cb dummy.OnFinalizeAndAssembleCal
self.cb.OnFinalizeAndAssemble = cb
}
+func (self *ETHChain) SetOnQueryAcceptedBlock(cb func() *types.Block) {
+ self.bcb.OnQueryAcceptedBlock = cb
+}
+
// Returns a new mutable state based on the current HEAD block.
func (self *ETHChain) CurrentState() (*state.StateDB, error) {
return self.backend.BlockChain().State()
diff --git a/eth/api.go b/eth/api.go
index 10cf01d..59750ce 100644
--- a/eth/api.go
+++ b/eth/api.go
@@ -28,15 +28,15 @@ import (
"strings"
"time"
+ "github.com/ava-labs/coreth/internal/ethapi"
+ "github.com/ava-labs/coreth/rpc"
"github.com/ava-labs/go-ethereum/common"
"github.com/ava-labs/go-ethereum/common/hexutil"
"github.com/ava-labs/go-ethereum/core"
"github.com/ava-labs/go-ethereum/core/rawdb"
"github.com/ava-labs/go-ethereum/core/state"
"github.com/ava-labs/go-ethereum/core/types"
- "github.com/ava-labs/coreth/internal/ethapi"
"github.com/ava-labs/go-ethereum/rlp"
- "github.com/ava-labs/go-ethereum/rpc"
"github.com/ava-labs/go-ethereum/trie"
)
@@ -271,6 +271,8 @@ func (api *PublicDebugAPI) DumpBlock(blockNr rpc.BlockNumber) (state.Dump, error
var block *types.Block
if blockNr == rpc.LatestBlockNumber {
block = api.eth.blockchain.CurrentBlock()
+ } else if blockNr == rpc.AcceptedBlockNumber {
+ block = api.eth.AcceptedBlock()
} else {
block = api.eth.blockchain.GetBlockByNumber(uint64(blockNr))
}
diff --git a/eth/api_backend.go b/eth/api_backend.go
index f4c0101..99556c9 100644
--- a/eth/api_backend.go
+++ b/eth/api_backend.go
@@ -21,6 +21,8 @@ import (
"errors"
"math/big"
+ "github.com/ava-labs/coreth/eth/gasprice"
+ "github.com/ava-labs/coreth/rpc"
"github.com/ava-labs/go-ethereum/accounts"
"github.com/ava-labs/go-ethereum/common"
"github.com/ava-labs/go-ethereum/common/math"
@@ -31,11 +33,9 @@ import (
"github.com/ava-labs/go-ethereum/core/types"
"github.com/ava-labs/go-ethereum/core/vm"
"github.com/ava-labs/go-ethereum/eth/downloader"
- "github.com/ava-labs/go-ethereum/eth/gasprice"
"github.com/ava-labs/go-ethereum/ethdb"
"github.com/ava-labs/go-ethereum/event"
"github.com/ava-labs/go-ethereum/params"
- "github.com/ava-labs/go-ethereum/rpc"
)
// EthAPIBackend implements ethapi.Backend for full nodes
@@ -54,6 +54,10 @@ func (b *EthAPIBackend) CurrentBlock() *types.Block {
return b.eth.blockchain.CurrentBlock()
}
+func (b *EthAPIBackend) AcceptedBlock() *types.Block {
+ return b.eth.AcceptedBlock()
+}
+
func (b *EthAPIBackend) SetHead(number uint64) {
b.eth.protocolManager.downloader.Cancel()
b.eth.blockchain.SetHead(number)
@@ -69,6 +73,9 @@ func (b *EthAPIBackend) HeaderByNumber(ctx context.Context, number rpc.BlockNumb
if number == rpc.LatestBlockNumber {
return b.eth.blockchain.CurrentBlock().Header(), nil
}
+ if number == rpc.AcceptedBlockNumber {
+ return b.eth.AcceptedBlock().Header(), nil
+ }
return b.eth.blockchain.GetHeaderByNumber(uint64(number)), nil
}
@@ -86,6 +93,9 @@ func (b *EthAPIBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumbe
if number == rpc.LatestBlockNumber {
return b.eth.blockchain.CurrentBlock(), nil
}
+ if number == rpc.AcceptedBlockNumber {
+ return b.eth.AcceptedBlock(), nil
+ }
return b.eth.blockchain.GetBlockByNumber(uint64(number)), nil
}
diff --git a/eth/api_tracer.go b/eth/api_tracer.go
index 618e7c8..b87a9fe 100644
--- a/eth/api_tracer.go
+++ b/eth/api_tracer.go
@@ -28,6 +28,8 @@ import (
"sync"
"time"
+ "github.com/ava-labs/coreth/internal/ethapi"
+ myrpc "github.com/ava-labs/coreth/rpc"
"github.com/ava-labs/go-ethereum/common"
"github.com/ava-labs/go-ethereum/common/hexutil"
"github.com/ava-labs/go-ethereum/core"
@@ -36,7 +38,6 @@ import (
"github.com/ava-labs/go-ethereum/core/types"
"github.com/ava-labs/go-ethereum/core/vm"
"github.com/ava-labs/go-ethereum/eth/tracers"
- "github.com/ava-labs/coreth/internal/ethapi"
"github.com/ava-labs/go-ethereum/log"
"github.com/ava-labs/go-ethereum/rlp"
"github.com/ava-labs/go-ethereum/rpc"
@@ -101,23 +102,27 @@ type txTraceTask struct {
// TraceChain returns the structured logs created during the execution of EVM
// between two blocks (excluding start) and returns them as a JSON object.
-func (api *PrivateDebugAPI) TraceChain(ctx context.Context, start, end rpc.BlockNumber, config *TraceConfig) (*rpc.Subscription, error) {
+func (api *PrivateDebugAPI) TraceChain(ctx context.Context, start, end myrpc.BlockNumber, config *TraceConfig) (*rpc.Subscription, error) {
// Fetch the block interval that we want to trace
var from, to *types.Block
switch start {
- case rpc.PendingBlockNumber:
+ case myrpc.PendingBlockNumber:
from = api.eth.miner.PendingBlock()
- case rpc.LatestBlockNumber:
+ case myrpc.LatestBlockNumber:
from = api.eth.blockchain.CurrentBlock()
+ case myrpc.AcceptedBlockNumber:
+ from = api.eth.AcceptedBlock()
default:
from = api.eth.blockchain.GetBlockByNumber(uint64(start))
}
switch end {
- case rpc.PendingBlockNumber:
+ case myrpc.PendingBlockNumber:
to = api.eth.miner.PendingBlock()
- case rpc.LatestBlockNumber:
+ case myrpc.LatestBlockNumber:
to = api.eth.blockchain.CurrentBlock()
+ case myrpc.AcceptedBlockNumber:
+ from = api.eth.AcceptedBlock()
default:
to = api.eth.blockchain.GetBlockByNumber(uint64(end))
}
@@ -352,15 +357,17 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl
// TraceBlockByNumber returns the structured logs created during the execution of
// EVM and returns them as a JSON object.
-func (api *PrivateDebugAPI) TraceBlockByNumber(ctx context.Context, number rpc.BlockNumber, config *TraceConfig) ([]*txTraceResult, error) {
+func (api *PrivateDebugAPI) TraceBlockByNumber(ctx context.Context, number myrpc.BlockNumber, config *TraceConfig) ([]*txTraceResult, error) {
// Fetch the block that we want to trace
var block *types.Block
switch number {
- case rpc.PendingBlockNumber:
+ case myrpc.PendingBlockNumber:
block = api.eth.miner.PendingBlock()
- case rpc.LatestBlockNumber:
+ case myrpc.LatestBlockNumber:
block = api.eth.blockchain.CurrentBlock()
+ case myrpc.AcceptedBlockNumber:
+ block = api.eth.AcceptedBlock()
default:
block = api.eth.blockchain.GetBlockByNumber(uint64(number))
}
diff --git a/eth/backend.go b/eth/backend.go
index d119e89..95d4c24 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -27,6 +27,8 @@ import (
"github.com/ava-labs/coreth/consensus/dummy"
mycore "github.com/ava-labs/coreth/core"
+ "github.com/ava-labs/coreth/eth/filters"
+ "github.com/ava-labs/coreth/eth/gasprice"
"github.com/ava-labs/coreth/internal/ethapi"
"github.com/ava-labs/coreth/miner"
"github.com/ava-labs/coreth/node"
@@ -44,8 +46,6 @@ import (
"github.com/ava-labs/go-ethereum/core/types"
"github.com/ava-labs/go-ethereum/core/vm"
"github.com/ava-labs/go-ethereum/eth/downloader"
- "github.com/ava-labs/go-ethereum/eth/filters"
- "github.com/ava-labs/go-ethereum/eth/gasprice"
"github.com/ava-labs/go-ethereum/ethdb"
"github.com/ava-labs/go-ethereum/event"
"github.com/ava-labs/go-ethereum/log"
@@ -65,6 +65,10 @@ type LesServer interface {
SetContractBackend(bind.ContractBackend)
}
+type BackendCallbacks struct {
+ OnQueryAcceptedBlock func() *types.Block
+}
+
// Ethereum implements the Ethereum full node service.
type Ethereum struct {
config *Config
@@ -102,6 +106,7 @@ type Ethereum struct {
lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase)
txSubmitChan chan struct{}
+ bcb *BackendCallbacks
}
func (s *Ethereum) AddLesServer(ls LesServer) {
@@ -119,7 +124,11 @@ func (s *Ethereum) SetContractBackend(backend bind.ContractBackend) {
// New creates a new Ethereum object (including the
// initialisation of the common Ethereum object)
-func New(ctx *node.ServiceContext, config *Config, cb *dummy.ConsensusCallbacks, mcb *miner.MinerCallbacks, chainDb ethdb.Database) (*Ethereum, error) {
+func New(ctx *node.ServiceContext, config *Config,
+ cb *dummy.ConsensusCallbacks,
+ mcb *miner.MinerCallbacks,
+ bcb *BackendCallbacks,
+ chainDb ethdb.Database) (*Ethereum, error) {
// Ensure configuration values are compatible and sane
if config.SyncMode == downloader.LightSync {
return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum")
@@ -164,6 +173,7 @@ func New(ctx *node.ServiceContext, config *Config, cb *dummy.ConsensusCallbacks,
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
txSubmitChan: make(chan struct{}, 1),
+ bcb: bcb,
}
bcVersion := rawdb.ReadDatabaseVersion(chainDb)
@@ -557,3 +567,11 @@ func (s *Ethereum) StopPart() error {
func (s *Ethereum) GetTxSubmitCh() <-chan struct{} {
return s.txSubmitChan
}
+
+func (s *Ethereum) AcceptedBlock() *types.Block {
+ cb := s.bcb.OnQueryAcceptedBlock
+ if cb != nil {
+ return cb()
+ }
+ return s.blockchain.CurrentBlock()
+}
diff --git a/eth/config.go b/eth/config.go
index a84f5af..8aee0eb 100644
--- a/eth/config.go
+++ b/eth/config.go
@@ -24,13 +24,13 @@ import (
"runtime"
"time"
- "github.com/ava-labs/go-ethereum/common/hexutil"
+ "github.com/ava-labs/coreth/eth/gasprice"
+ "github.com/ava-labs/coreth/miner"
"github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/common/hexutil"
"github.com/ava-labs/go-ethereum/consensus/ethash"
"github.com/ava-labs/go-ethereum/core"
"github.com/ava-labs/go-ethereum/eth/downloader"
- "github.com/ava-labs/go-ethereum/eth/gasprice"
- "github.com/ava-labs/coreth/miner"
"github.com/ava-labs/go-ethereum/params"
)
@@ -158,32 +158,32 @@ type Config struct {
}
func MyDefaultConfig() Config {
- config := DefaultConfig
- chainConfig := &params.ChainConfig {
- ChainID: big.NewInt(42222),
- HomesteadBlock: big.NewInt(0),
- DAOForkBlock: big.NewInt(0),
- DAOForkSupport: true,
- EIP150Block: big.NewInt(0),
- EIP150Hash: common.HexToHash("0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0"),
- EIP155Block: big.NewInt(0),
- EIP158Block: big.NewInt(0),
- ByzantiumBlock: big.NewInt(0),
- ConstantinopleBlock: big.NewInt(0),
- PetersburgBlock: big.NewInt(0),
- IstanbulBlock: nil,
- Ethash: nil,
- }
- genBalance := big.NewInt(1000000000000000000)
-
- config.Genesis = &core.Genesis{
- Config: chainConfig,
- Nonce: 0,
- Number: 0,
- ExtraData: hexutil.MustDecode("0x00"),
- GasLimit: 100000000,
- Difficulty: big.NewInt(0),
- Alloc: core.GenesisAlloc{ common.Address{}: { Balance: genBalance }},
- }
- return config
+ config := DefaultConfig
+ chainConfig := &params.ChainConfig{
+ ChainID: big.NewInt(42222),
+ HomesteadBlock: big.NewInt(0),
+ DAOForkBlock: big.NewInt(0),
+ DAOForkSupport: true,
+ EIP150Block: big.NewInt(0),
+ EIP150Hash: common.HexToHash("0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0"),
+ EIP155Block: big.NewInt(0),
+ EIP158Block: big.NewInt(0),
+ ByzantiumBlock: big.NewInt(0),
+ ConstantinopleBlock: big.NewInt(0),
+ PetersburgBlock: big.NewInt(0),
+ IstanbulBlock: nil,
+ Ethash: nil,
+ }
+ genBalance := big.NewInt(1000000000000000000)
+
+ config.Genesis = &core.Genesis{
+ Config: chainConfig,
+ Nonce: 0,
+ Number: 0,
+ ExtraData: hexutil.MustDecode("0x00"),
+ GasLimit: 100000000,
+ Difficulty: big.NewInt(0),
+ Alloc: core.GenesisAlloc{common.Address{}: {Balance: genBalance}},
+ }
+ return config
}
diff --git a/eth/filters/api.go b/eth/filters/api.go
new file mode 100644
index 0000000..9d9e757
--- /dev/null
+++ b/eth/filters/api.go
@@ -0,0 +1,576 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package filters
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "math/big"
+ "sync"
+ "time"
+
+ ethereum "github.com/ava-labs/go-ethereum"
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/common/hexutil"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/ethdb"
+ "github.com/ava-labs/go-ethereum/event"
+ "github.com/ava-labs/go-ethereum/rpc"
+)
+
+var (
+ deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline
+)
+
+// filter is a helper struct that holds meta information over the filter type
+// and associated subscription in the event system.
+type filter struct {
+ typ Type
+ deadline *time.Timer // filter is inactiv when deadline triggers
+ hashes []common.Hash
+ crit FilterCriteria
+ logs []*types.Log
+ s *Subscription // associated subscription in event system
+}
+
+// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
+// information related to the Ethereum protocol such als blocks, transactions and logs.
+type PublicFilterAPI struct {
+ backend Backend
+ mux *event.TypeMux
+ quit chan struct{}
+ chainDb ethdb.Database
+ events *EventSystem
+ filtersMu sync.Mutex
+ filters map[rpc.ID]*filter
+}
+
+// NewPublicFilterAPI returns a new PublicFilterAPI instance.
+func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
+ api := &PublicFilterAPI{
+ backend: backend,
+ mux: backend.EventMux(),
+ chainDb: backend.ChainDb(),
+ events: NewEventSystem(backend.EventMux(), backend, lightMode),
+ filters: make(map[rpc.ID]*filter),
+ }
+ go api.timeoutLoop()
+
+ return api
+}
+
+// timeoutLoop runs every 5 minutes and deletes filters that have not been recently used.
+// Tt is started when the api is created.
+func (api *PublicFilterAPI) timeoutLoop() {
+ ticker := time.NewTicker(5 * time.Minute)
+ for {
+ <-ticker.C
+ api.filtersMu.Lock()
+ for id, f := range api.filters {
+ select {
+ case <-f.deadline.C:
+ f.s.Unsubscribe()
+ delete(api.filters, id)
+ default:
+ continue
+ }
+ }
+ api.filtersMu.Unlock()
+ }
+}
+
+// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes
+// as transactions enter the pending state.
+//
+// It is part of the filter package because this filter can be used through the
+// `eth_getFilterChanges` polling method that is also used for log filters.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter
+func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
+ var (
+ pendingTxs = make(chan []common.Hash)
+ pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
+ )
+
+ api.filtersMu.Lock()
+ api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub}
+ api.filtersMu.Unlock()
+
+ go func() {
+ for {
+ select {
+ case ph := <-pendingTxs:
+ api.filtersMu.Lock()
+ if f, found := api.filters[pendingTxSub.ID]; found {
+ f.hashes = append(f.hashes, ph...)
+ }
+ api.filtersMu.Unlock()
+ case <-pendingTxSub.Err():
+ api.filtersMu.Lock()
+ delete(api.filters, pendingTxSub.ID)
+ api.filtersMu.Unlock()
+ return
+ }
+ }
+ }()
+
+ return pendingTxSub.ID
+}
+
+// NewPendingTransactions creates a subscription that is triggered each time a transaction
+// enters the transaction pool and was signed from one of the transactions this nodes manages.
+func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
+ }
+
+ rpcSub := notifier.CreateSubscription()
+
+ go func() {
+ txHashes := make(chan []common.Hash, 128)
+ pendingTxSub := api.events.SubscribePendingTxs(txHashes)
+
+ for {
+ select {
+ case hashes := <-txHashes:
+ // To keep the original behaviour, send a single tx hash in one notification.
+ // TODO(rjl493456442) Send a batch of tx hashes in one notification
+ for _, h := range hashes {
+ notifier.Notify(rpcSub.ID, h)
+ }
+ case <-rpcSub.Err():
+ pendingTxSub.Unsubscribe()
+ return
+ case <-notifier.Closed():
+ pendingTxSub.Unsubscribe()
+ return
+ }
+ }
+ }()
+
+ return rpcSub, nil
+}
+
+// NewBlockFilter creates a filter that fetches blocks that are imported into the chain.
+// It is part of the filter package since polling goes with eth_getFilterChanges.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter
+func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
+ var (
+ headers = make(chan *types.Header)
+ headerSub = api.events.SubscribeNewHeads(headers)
+ )
+
+ api.filtersMu.Lock()
+ api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub}
+ api.filtersMu.Unlock()
+
+ go func() {
+ for {
+ select {
+ case h := <-headers:
+ api.filtersMu.Lock()
+ if f, found := api.filters[headerSub.ID]; found {
+ f.hashes = append(f.hashes, h.Hash())
+ }
+ api.filtersMu.Unlock()
+ case <-headerSub.Err():
+ api.filtersMu.Lock()
+ delete(api.filters, headerSub.ID)
+ api.filtersMu.Unlock()
+ return
+ }
+ }
+ }()
+
+ return headerSub.ID
+}
+
+// NewHeads send a notification each time a new (header) block is appended to the chain.
+func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
+ }
+
+ rpcSub := notifier.CreateSubscription()
+
+ go func() {
+ headers := make(chan *types.Header)
+ headersSub := api.events.SubscribeNewHeads(headers)
+
+ for {
+ select {
+ case h := <-headers:
+ notifier.Notify(rpcSub.ID, h)
+ case <-rpcSub.Err():
+ headersSub.Unsubscribe()
+ return
+ case <-notifier.Closed():
+ headersSub.Unsubscribe()
+ return
+ }
+ }
+ }()
+
+ return rpcSub, nil
+}
+
+// Logs creates a subscription that fires for all new log that match the given filter criteria.
+func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
+ }
+
+ var (
+ rpcSub = notifier.CreateSubscription()
+ matchedLogs = make(chan []*types.Log)
+ )
+
+ logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs)
+ if err != nil {
+ return nil, err
+ }
+
+ go func() {
+
+ for {
+ select {
+ case logs := <-matchedLogs:
+ for _, log := range logs {
+ notifier.Notify(rpcSub.ID, &log)
+ }
+ case <-rpcSub.Err(): // client send an unsubscribe request
+ logsSub.Unsubscribe()
+ return
+ case <-notifier.Closed(): // connection dropped
+ logsSub.Unsubscribe()
+ return
+ }
+ }
+ }()
+
+ return rpcSub, nil
+}
+
+// FilterCriteria represents a request to create a new filter.
+// Same as ethereum.FilterQuery but with UnmarshalJSON() method.
+type FilterCriteria ethereum.FilterQuery
+
+// NewFilter creates a new filter and returns the filter id. It can be
+// used to retrieve logs when the state changes. This method cannot be
+// used to fetch logs that are already stored in the state.
+//
+// Default criteria for the from and to block are "latest".
+// Using "latest" as block number will return logs for mined blocks.
+// Using "pending" as block number returns logs for not yet mined (pending) blocks.
+// In case logs are removed (chain reorg) previously returned logs are returned
+// again but with the removed property set to true.
+//
+// In case "fromBlock" > "toBlock" an error is returned.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter
+func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
+ logs := make(chan []*types.Log)
+ logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), logs)
+ if err != nil {
+ return rpc.ID(""), err
+ }
+
+ api.filtersMu.Lock()
+ api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]*types.Log, 0), s: logsSub}
+ api.filtersMu.Unlock()
+
+ go func() {
+ for {
+ select {
+ case l := <-logs:
+ api.filtersMu.Lock()
+ if f, found := api.filters[logsSub.ID]; found {
+ f.logs = append(f.logs, l...)
+ }
+ api.filtersMu.Unlock()
+ case <-logsSub.Err():
+ api.filtersMu.Lock()
+ delete(api.filters, logsSub.ID)
+ api.filtersMu.Unlock()
+ return
+ }
+ }
+ }()
+
+ return logsSub.ID, nil
+}
+
+// GetLogs returns logs matching the given argument that are stored within the state.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
+func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) {
+ var filter *Filter
+ if crit.BlockHash != nil {
+ // Block filter requested, construct a single-shot filter
+ filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics)
+ } else {
+ // Convert the RPC block numbers into internal representations
+ begin := rpc.LatestBlockNumber.Int64()
+ if crit.FromBlock != nil {
+ begin = crit.FromBlock.Int64()
+ }
+ end := rpc.LatestBlockNumber.Int64()
+ if crit.ToBlock != nil {
+ end = crit.ToBlock.Int64()
+ }
+ // Construct the range filter
+ filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics)
+ }
+ // Run the filter and return all the logs
+ logs, err := filter.Logs(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return returnLogs(logs), err
+}
+
+// UninstallFilter removes the filter with the given filter id.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter
+func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool {
+ api.filtersMu.Lock()
+ f, found := api.filters[id]
+ if found {
+ delete(api.filters, id)
+ }
+ api.filtersMu.Unlock()
+ if found {
+ f.s.Unsubscribe()
+ }
+
+ return found
+}
+
+// GetFilterLogs returns the logs for the filter with the given id.
+// If the filter could not be found an empty array of logs is returned.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs
+func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Log, error) {
+ api.filtersMu.Lock()
+ f, found := api.filters[id]
+ api.filtersMu.Unlock()
+
+ if !found || f.typ != LogsSubscription {
+ return nil, fmt.Errorf("filter not found")
+ }
+
+ var filter *Filter
+ if f.crit.BlockHash != nil {
+ // Block filter requested, construct a single-shot filter
+ filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
+ } else {
+ // Convert the RPC block numbers into internal representations
+ begin := rpc.LatestBlockNumber.Int64()
+ if f.crit.FromBlock != nil {
+ begin = f.crit.FromBlock.Int64()
+ }
+ end := rpc.LatestBlockNumber.Int64()
+ if f.crit.ToBlock != nil {
+ end = f.crit.ToBlock.Int64()
+ }
+ // Construct the range filter
+ filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
+ }
+ // Run the filter and return all the logs
+ logs, err := filter.Logs(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return returnLogs(logs), nil
+}
+
+// GetFilterChanges returns the logs for the filter with the given id since
+// last time it was called. This can be used for polling.
+//
+// For pending transaction and block filters the result is []common.Hash.
+// (pending)Log filters return []Log.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
+func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
+ api.filtersMu.Lock()
+ defer api.filtersMu.Unlock()
+
+ if f, found := api.filters[id]; found {
+ if !f.deadline.Stop() {
+ // timer expired but filter is not yet removed in timeout loop
+ // receive timer value and reset timer
+ <-f.deadline.C
+ }
+ f.deadline.Reset(deadline)
+
+ switch f.typ {
+ case PendingTransactionsSubscription, BlocksSubscription:
+ hashes := f.hashes
+ f.hashes = nil
+ return returnHashes(hashes), nil
+ case LogsSubscription:
+ logs := f.logs
+ f.logs = nil
+ return returnLogs(logs), nil
+ }
+ }
+
+ return []interface{}{}, fmt.Errorf("filter not found")
+}
+
+// returnHashes is a helper that will return an empty hash array case the given hash array is nil,
+// otherwise the given hashes array is returned.
+func returnHashes(hashes []common.Hash) []common.Hash {
+ if hashes == nil {
+ return []common.Hash{}
+ }
+ return hashes
+}
+
+// returnLogs is a helper that will return an empty log array in case the given logs array is nil,
+// otherwise the given logs array is returned.
+func returnLogs(logs []*types.Log) []*types.Log {
+ if logs == nil {
+ return []*types.Log{}
+ }
+ return logs
+}
+
+// UnmarshalJSON sets *args fields with given data.
+func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
+ type input struct {
+ BlockHash *common.Hash `json:"blockHash"`
+ FromBlock *rpc.BlockNumber `json:"fromBlock"`
+ ToBlock *rpc.BlockNumber `json:"toBlock"`
+ Addresses interface{} `json:"address"`
+ Topics []interface{} `json:"topics"`
+ }
+
+ var raw input
+ if err := json.Unmarshal(data, &raw); err != nil {
+ return err
+ }
+
+ if raw.BlockHash != nil {
+ if raw.FromBlock != nil || raw.ToBlock != nil {
+ // BlockHash is mutually exclusive with FromBlock/ToBlock criteria
+ return fmt.Errorf("cannot specify both BlockHash and FromBlock/ToBlock, choose one or the other")
+ }
+ args.BlockHash = raw.BlockHash
+ } else {
+ if raw.FromBlock != nil {
+ args.FromBlock = big.NewInt(raw.FromBlock.Int64())
+ }
+
+ if raw.ToBlock != nil {
+ args.ToBlock = big.NewInt(raw.ToBlock.Int64())
+ }
+ }
+
+ args.Addresses = []common.Address{}
+
+ if raw.Addresses != nil {
+ // raw.Address can contain a single address or an array of addresses
+ switch rawAddr := raw.Addresses.(type) {
+ case []interface{}:
+ for i, addr := range rawAddr {
+ if strAddr, ok := addr.(string); ok {
+ addr, err := decodeAddress(strAddr)
+ if err != nil {
+ return fmt.Errorf("invalid address at index %d: %v", i, err)
+ }
+ args.Addresses = append(args.Addresses, addr)
+ } else {
+ return fmt.Errorf("non-string address at index %d", i)
+ }
+ }
+ case string:
+ addr, err := decodeAddress(rawAddr)
+ if err != nil {
+ return fmt.Errorf("invalid address: %v", err)
+ }
+ args.Addresses = []common.Address{addr}
+ default:
+ return errors.New("invalid addresses in query")
+ }
+ }
+
+ // topics is an array consisting of strings and/or arrays of strings.
+ // JSON null values are converted to common.Hash{} and ignored by the filter manager.
+ if len(raw.Topics) > 0 {
+ args.Topics = make([][]common.Hash, len(raw.Topics))
+ for i, t := range raw.Topics {
+ switch topic := t.(type) {
+ case nil:
+ // ignore topic when matching logs
+
+ case string:
+ // match specific topic
+ top, err := decodeTopic(topic)
+ if err != nil {
+ return err
+ }
+ args.Topics[i] = []common.Hash{top}
+
+ case []interface{}:
+ // or case e.g. [null, "topic0", "topic1"]
+ for _, rawTopic := range topic {
+ if rawTopic == nil {
+ // null component, match all
+ args.Topics[i] = nil
+ break
+ }
+ if topic, ok := rawTopic.(string); ok {
+ parsed, err := decodeTopic(topic)
+ if err != nil {
+ return err
+ }
+ args.Topics[i] = append(args.Topics[i], parsed)
+ } else {
+ return fmt.Errorf("invalid topic(s)")
+ }
+ }
+ default:
+ return fmt.Errorf("invalid topic(s)")
+ }
+ }
+ }
+
+ return nil
+}
+
+func decodeAddress(s string) (common.Address, error) {
+ b, err := hexutil.Decode(s)
+ if err == nil && len(b) != common.AddressLength {
+ err = fmt.Errorf("hex has inval