From bc43122f54ed8de21f74ee6393549c9554d732e8 Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 1 Oct 2019 13:28:55 -0400 Subject: support "accepted" as block number in JSON-RPC --- coreth.go | 10 +- eth/api.go | 6 +- eth/api_backend.go | 14 +- eth/api_tracer.go | 25 +- eth/backend.go | 24 +- eth/config.go | 62 ++--- eth/filters/api.go | 576 +++++++++++++++++++++++++++++++++++++++++++ eth/filters/filter.go | 348 ++++++++++++++++++++++++++ eth/filters/filter_system.go | 512 ++++++++++++++++++++++++++++++++++++++ eth/gasprice/gasprice.go | 189 ++++++++++++++ eth/gen_config.go | 4 +- internal/ethapi/api.go | 4 +- internal/ethapi/backend.go | 8 +- params/denomination.go | 28 +++ rpc/types.go | 77 ++++++ 15 files changed, 1831 insertions(+), 56 deletions(-) create mode 100644 eth/filters/api.go create mode 100644 eth/filters/filter.go create mode 100644 eth/filters/filter_system.go create mode 100644 eth/gasprice/gasprice.go create mode 100644 params/denomination.go create mode 100644 rpc/types.go diff --git a/coreth.go b/coreth.go index 04d782f..fe9e6d6 100644 --- a/coreth.go +++ b/coreth.go @@ -29,6 +29,7 @@ type ETHChain struct { backend *eth.Ethereum cb *dummy.ConsensusCallbacks mcb *miner.MinerCallbacks + bcb *eth.BackendCallbacks } func isLocalBlock(block *types.Block) bool { @@ -52,8 +53,9 @@ func NewETHChain(config *eth.Config, nodecfg *node.Config, etherBase *common.Add } cb := new(dummy.ConsensusCallbacks) mcb := new(miner.MinerCallbacks) - backend, _ := eth.New(&ctx, config, cb, mcb, chainDB) - chain := ÐChain{backend: backend, cb: cb, mcb: mcb} + bcb := new(eth.BackendCallbacks) + backend, _ := eth.New(&ctx, config, cb, mcb, bcb, chainDB) + chain := ÐChain{backend: backend, cb: cb, mcb: mcb, bcb: bcb} if etherBase == nil { etherBase = &common.Address{ 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, @@ -125,6 +127,10 @@ func (self *ETHChain) SetOnFinalizeAndAssemble(cb dummy.OnFinalizeAndAssembleCal self.cb.OnFinalizeAndAssemble = cb } +func (self *ETHChain) SetOnQueryAcceptedBlock(cb func() *types.Block) { + self.bcb.OnQueryAcceptedBlock = cb +} + // Returns a new mutable state based on the current HEAD block. 
func (self *ETHChain) CurrentState() (*state.StateDB, error) { return self.backend.BlockChain().State() diff --git a/eth/api.go b/eth/api.go index 10cf01d..59750ce 100644 --- a/eth/api.go +++ b/eth/api.go @@ -28,15 +28,15 @@ import ( "strings" "time" + "github.com/ava-labs/coreth/internal/ethapi" + "github.com/ava-labs/coreth/rpc" "github.com/ava-labs/go-ethereum/common" "github.com/ava-labs/go-ethereum/common/hexutil" "github.com/ava-labs/go-ethereum/core" "github.com/ava-labs/go-ethereum/core/rawdb" "github.com/ava-labs/go-ethereum/core/state" "github.com/ava-labs/go-ethereum/core/types" - "github.com/ava-labs/coreth/internal/ethapi" "github.com/ava-labs/go-ethereum/rlp" - "github.com/ava-labs/go-ethereum/rpc" "github.com/ava-labs/go-ethereum/trie" ) @@ -271,6 +271,8 @@ func (api *PublicDebugAPI) DumpBlock(blockNr rpc.BlockNumber) (state.Dump, error var block *types.Block if blockNr == rpc.LatestBlockNumber { block = api.eth.blockchain.CurrentBlock() + } else if blockNr == rpc.AcceptedBlockNumber { + block = api.eth.AcceptedBlock() } else { block = api.eth.blockchain.GetBlockByNumber(uint64(blockNr)) } diff --git a/eth/api_backend.go b/eth/api_backend.go index f4c0101..99556c9 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -21,6 +21,8 @@ import ( "errors" "math/big" + "github.com/ava-labs/coreth/eth/gasprice" + "github.com/ava-labs/coreth/rpc" "github.com/ava-labs/go-ethereum/accounts" "github.com/ava-labs/go-ethereum/common" "github.com/ava-labs/go-ethereum/common/math" @@ -31,11 +33,9 @@ import ( "github.com/ava-labs/go-ethereum/core/types" "github.com/ava-labs/go-ethereum/core/vm" "github.com/ava-labs/go-ethereum/eth/downloader" - "github.com/ava-labs/go-ethereum/eth/gasprice" "github.com/ava-labs/go-ethereum/ethdb" "github.com/ava-labs/go-ethereum/event" "github.com/ava-labs/go-ethereum/params" - "github.com/ava-labs/go-ethereum/rpc" ) // EthAPIBackend implements ethapi.Backend for full nodes @@ -54,6 +54,10 @@ func (b *EthAPIBackend) CurrentBlock() *types.Block { return b.eth.blockchain.CurrentBlock() } +func (b *EthAPIBackend) AcceptedBlock() *types.Block { + return b.eth.AcceptedBlock() +} + func (b *EthAPIBackend) SetHead(number uint64) { b.eth.protocolManager.downloader.Cancel() b.eth.blockchain.SetHead(number) @@ -69,6 +73,9 @@ func (b *EthAPIBackend) HeaderByNumber(ctx context.Context, number rpc.BlockNumb if number == rpc.LatestBlockNumber { return b.eth.blockchain.CurrentBlock().Header(), nil } + if number == rpc.AcceptedBlockNumber { + return b.eth.AcceptedBlock().Header(), nil + } return b.eth.blockchain.GetHeaderByNumber(uint64(number)), nil } @@ -86,6 +93,9 @@ func (b *EthAPIBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumbe if number == rpc.LatestBlockNumber { return b.eth.blockchain.CurrentBlock(), nil } + if number == rpc.AcceptedBlockNumber { + return b.eth.AcceptedBlock(), nil + } return b.eth.blockchain.GetBlockByNumber(uint64(number)), nil } diff --git a/eth/api_tracer.go b/eth/api_tracer.go index 618e7c8..b87a9fe 100644 --- a/eth/api_tracer.go +++ b/eth/api_tracer.go @@ -28,6 +28,8 @@ import ( "sync" "time" + "github.com/ava-labs/coreth/internal/ethapi" + myrpc "github.com/ava-labs/coreth/rpc" "github.com/ava-labs/go-ethereum/common" "github.com/ava-labs/go-ethereum/common/hexutil" "github.com/ava-labs/go-ethereum/core" @@ -36,7 +38,6 @@ import ( "github.com/ava-labs/go-ethereum/core/types" "github.com/ava-labs/go-ethereum/core/vm" "github.com/ava-labs/go-ethereum/eth/tracers" - "github.com/ava-labs/coreth/internal/ethapi" 
"github.com/ava-labs/go-ethereum/log" "github.com/ava-labs/go-ethereum/rlp" "github.com/ava-labs/go-ethereum/rpc" @@ -101,23 +102,27 @@ type txTraceTask struct { // TraceChain returns the structured logs created during the execution of EVM // between two blocks (excluding start) and returns them as a JSON object. -func (api *PrivateDebugAPI) TraceChain(ctx context.Context, start, end rpc.BlockNumber, config *TraceConfig) (*rpc.Subscription, error) { +func (api *PrivateDebugAPI) TraceChain(ctx context.Context, start, end myrpc.BlockNumber, config *TraceConfig) (*rpc.Subscription, error) { // Fetch the block interval that we want to trace var from, to *types.Block switch start { - case rpc.PendingBlockNumber: + case myrpc.PendingBlockNumber: from = api.eth.miner.PendingBlock() - case rpc.LatestBlockNumber: + case myrpc.LatestBlockNumber: from = api.eth.blockchain.CurrentBlock() + case myrpc.AcceptedBlockNumber: + from = api.eth.AcceptedBlock() default: from = api.eth.blockchain.GetBlockByNumber(uint64(start)) } switch end { - case rpc.PendingBlockNumber: + case myrpc.PendingBlockNumber: to = api.eth.miner.PendingBlock() - case rpc.LatestBlockNumber: + case myrpc.LatestBlockNumber: to = api.eth.blockchain.CurrentBlock() + case myrpc.AcceptedBlockNumber: + from = api.eth.AcceptedBlock() default: to = api.eth.blockchain.GetBlockByNumber(uint64(end)) } @@ -352,15 +357,17 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl // TraceBlockByNumber returns the structured logs created during the execution of // EVM and returns them as a JSON object. -func (api *PrivateDebugAPI) TraceBlockByNumber(ctx context.Context, number rpc.BlockNumber, config *TraceConfig) ([]*txTraceResult, error) { +func (api *PrivateDebugAPI) TraceBlockByNumber(ctx context.Context, number myrpc.BlockNumber, config *TraceConfig) ([]*txTraceResult, error) { // Fetch the block that we want to trace var block *types.Block switch number { - case rpc.PendingBlockNumber: + case myrpc.PendingBlockNumber: block = api.eth.miner.PendingBlock() - case rpc.LatestBlockNumber: + case myrpc.LatestBlockNumber: block = api.eth.blockchain.CurrentBlock() + case myrpc.AcceptedBlockNumber: + block = api.eth.AcceptedBlock() default: block = api.eth.blockchain.GetBlockByNumber(uint64(number)) } diff --git a/eth/backend.go b/eth/backend.go index d119e89..95d4c24 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -27,6 +27,8 @@ import ( "github.com/ava-labs/coreth/consensus/dummy" mycore "github.com/ava-labs/coreth/core" + "github.com/ava-labs/coreth/eth/filters" + "github.com/ava-labs/coreth/eth/gasprice" "github.com/ava-labs/coreth/internal/ethapi" "github.com/ava-labs/coreth/miner" "github.com/ava-labs/coreth/node" @@ -44,8 +46,6 @@ import ( "github.com/ava-labs/go-ethereum/core/types" "github.com/ava-labs/go-ethereum/core/vm" "github.com/ava-labs/go-ethereum/eth/downloader" - "github.com/ava-labs/go-ethereum/eth/filters" - "github.com/ava-labs/go-ethereum/eth/gasprice" "github.com/ava-labs/go-ethereum/ethdb" "github.com/ava-labs/go-ethereum/event" "github.com/ava-labs/go-ethereum/log" @@ -65,6 +65,10 @@ type LesServer interface { SetContractBackend(bind.ContractBackend) } +type BackendCallbacks struct { + OnQueryAcceptedBlock func() *types.Block +} + // Ethereum implements the Ethereum full node service. type Ethereum struct { config *Config @@ -102,6 +106,7 @@ type Ethereum struct { lock sync.RWMutex // Protects the variadic fields (e.g. 
gas price and etherbase) txSubmitChan chan struct{} + bcb *BackendCallbacks } func (s *Ethereum) AddLesServer(ls LesServer) { @@ -119,7 +124,11 @@ func (s *Ethereum) SetContractBackend(backend bind.ContractBackend) { // New creates a new Ethereum object (including the // initialisation of the common Ethereum object) -func New(ctx *node.ServiceContext, config *Config, cb *dummy.ConsensusCallbacks, mcb *miner.MinerCallbacks, chainDb ethdb.Database) (*Ethereum, error) { +func New(ctx *node.ServiceContext, config *Config, + cb *dummy.ConsensusCallbacks, + mcb *miner.MinerCallbacks, + bcb *BackendCallbacks, + chainDb ethdb.Database) (*Ethereum, error) { // Ensure configuration values are compatible and sane if config.SyncMode == downloader.LightSync { return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum") @@ -164,6 +173,7 @@ func New(ctx *node.ServiceContext, config *Config, cb *dummy.ConsensusCallbacks, bloomRequests: make(chan chan *bloombits.Retrieval), bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms), txSubmitChan: make(chan struct{}, 1), + bcb: bcb, } bcVersion := rawdb.ReadDatabaseVersion(chainDb) @@ -557,3 +567,11 @@ func (s *Ethereum) StopPart() error { func (s *Ethereum) GetTxSubmitCh() <-chan struct{} { return s.txSubmitChan } + +func (s *Ethereum) AcceptedBlock() *types.Block { + cb := s.bcb.OnQueryAcceptedBlock + if cb != nil { + return cb() + } + return s.blockchain.CurrentBlock() +} diff --git a/eth/config.go b/eth/config.go index a84f5af..8aee0eb 100644 --- a/eth/config.go +++ b/eth/config.go @@ -24,13 +24,13 @@ import ( "runtime" "time" - "github.com/ava-labs/go-ethereum/common/hexutil" + "github.com/ava-labs/coreth/eth/gasprice" + "github.com/ava-labs/coreth/miner" "github.com/ava-labs/go-ethereum/common" + "github.com/ava-labs/go-ethereum/common/hexutil" "github.com/ava-labs/go-ethereum/consensus/ethash" "github.com/ava-labs/go-ethereum/core" "github.com/ava-labs/go-ethereum/eth/downloader" - "github.com/ava-labs/go-ethereum/eth/gasprice" - "github.com/ava-labs/coreth/miner" "github.com/ava-labs/go-ethereum/params" ) @@ -158,32 +158,32 @@ type Config struct { } func MyDefaultConfig() Config { - config := DefaultConfig - chainConfig := ¶ms.ChainConfig { - ChainID: big.NewInt(42222), - HomesteadBlock: big.NewInt(0), - DAOForkBlock: big.NewInt(0), - DAOForkSupport: true, - EIP150Block: big.NewInt(0), - EIP150Hash: common.HexToHash("0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0"), - EIP155Block: big.NewInt(0), - EIP158Block: big.NewInt(0), - ByzantiumBlock: big.NewInt(0), - ConstantinopleBlock: big.NewInt(0), - PetersburgBlock: big.NewInt(0), - IstanbulBlock: nil, - Ethash: nil, - } - genBalance := big.NewInt(1000000000000000000) - - config.Genesis = &core.Genesis{ - Config: chainConfig, - Nonce: 0, - Number: 0, - ExtraData: hexutil.MustDecode("0x00"), - GasLimit: 100000000, - Difficulty: big.NewInt(0), - Alloc: core.GenesisAlloc{ common.Address{}: { Balance: genBalance }}, - } - return config + config := DefaultConfig + chainConfig := ¶ms.ChainConfig{ + ChainID: big.NewInt(42222), + HomesteadBlock: big.NewInt(0), + DAOForkBlock: big.NewInt(0), + DAOForkSupport: true, + EIP150Block: big.NewInt(0), + EIP150Hash: common.HexToHash("0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0"), + EIP155Block: big.NewInt(0), + EIP158Block: big.NewInt(0), + ByzantiumBlock: big.NewInt(0), + ConstantinopleBlock: big.NewInt(0), + PetersburgBlock: big.NewInt(0), + IstanbulBlock: nil, + 
Ethash: nil, + } + genBalance := big.NewInt(1000000000000000000) + + config.Genesis = &core.Genesis{ + Config: chainConfig, + Nonce: 0, + Number: 0, + ExtraData: hexutil.MustDecode("0x00"), + GasLimit: 100000000, + Difficulty: big.NewInt(0), + Alloc: core.GenesisAlloc{common.Address{}: {Balance: genBalance}}, + } + return config } diff --git a/eth/filters/api.go b/eth/filters/api.go new file mode 100644 index 0000000..9d9e757 --- /dev/null +++ b/eth/filters/api.go @@ -0,0 +1,576 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package filters + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math/big" + "sync" + "time" + + ethereum "github.com/ava-labs/go-ethereum" + "github.com/ava-labs/go-ethereum/common" + "github.com/ava-labs/go-ethereum/common/hexutil" + "github.com/ava-labs/go-ethereum/core/types" + "github.com/ava-labs/go-ethereum/ethdb" + "github.com/ava-labs/go-ethereum/event" + "github.com/ava-labs/go-ethereum/rpc" +) + +var ( + deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline +) + +// filter is a helper struct that holds meta information over the filter type +// and associated subscription in the event system. +type filter struct { + typ Type + deadline *time.Timer // filter is inactive when deadline triggers + hashes []common.Hash + crit FilterCriteria + logs []*types.Log + s *Subscription // associated subscription in event system +} + +// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various +// information related to the Ethereum protocol such as blocks, transactions and logs. +type PublicFilterAPI struct { + backend Backend + mux *event.TypeMux + quit chan struct{} + chainDb ethdb.Database + events *EventSystem + filtersMu sync.Mutex + filters map[rpc.ID]*filter +} + +// NewPublicFilterAPI returns a new PublicFilterAPI instance. +func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI { + api := &PublicFilterAPI{ + backend: backend, + mux: backend.EventMux(), + chainDb: backend.ChainDb(), + events: NewEventSystem(backend.EventMux(), backend, lightMode), + filters: make(map[rpc.ID]*filter), + } + go api.timeoutLoop() + + return api +} + +// timeoutLoop runs every 5 minutes and deletes filters that have not been recently used. +// It is started when the API is created. +func (api *PublicFilterAPI) timeoutLoop() { + ticker := time.NewTicker(5 * time.Minute) + for { + <-ticker.C + api.filtersMu.Lock() + for id, f := range api.filters { + select { + case <-f.deadline.C: + f.s.Unsubscribe() + delete(api.filters, id) + default: + continue + } + } + api.filtersMu.Unlock() + } +} + +// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes +// as transactions enter the pending state.
+// +// It is part of the filter package because this filter can be used through the +// `eth_getFilterChanges` polling method that is also used for log filters. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter +func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { + var ( + pendingTxs = make(chan []common.Hash) + pendingTxSub = api.events.SubscribePendingTxs(pendingTxs) + ) + + api.filtersMu.Lock() + api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub} + api.filtersMu.Unlock() + + go func() { + for { + select { + case ph := <-pendingTxs: + api.filtersMu.Lock() + if f, found := api.filters[pendingTxSub.ID]; found { + f.hashes = append(f.hashes, ph...) + } + api.filtersMu.Unlock() + case <-pendingTxSub.Err(): + api.filtersMu.Lock() + delete(api.filters, pendingTxSub.ID) + api.filtersMu.Unlock() + return + } + } + }() + + return pendingTxSub.ID +} + +// NewPendingTransactions creates a subscription that is triggered each time a transaction +// enters the transaction pool and was signed from one of the transactions this nodes manages. +func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + txHashes := make(chan []common.Hash, 128) + pendingTxSub := api.events.SubscribePendingTxs(txHashes) + + for { + select { + case hashes := <-txHashes: + // To keep the original behaviour, send a single tx hash in one notification. + // TODO(rjl493456442) Send a batch of tx hashes in one notification + for _, h := range hashes { + notifier.Notify(rpcSub.ID, h) + } + case <-rpcSub.Err(): + pendingTxSub.Unsubscribe() + return + case <-notifier.Closed(): + pendingTxSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + +// NewBlockFilter creates a filter that fetches blocks that are imported into the chain. +// It is part of the filter package since polling goes with eth_getFilterChanges. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter +func (api *PublicFilterAPI) NewBlockFilter() rpc.ID { + var ( + headers = make(chan *types.Header) + headerSub = api.events.SubscribeNewHeads(headers) + ) + + api.filtersMu.Lock() + api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub} + api.filtersMu.Unlock() + + go func() { + for { + select { + case h := <-headers: + api.filtersMu.Lock() + if f, found := api.filters[headerSub.ID]; found { + f.hashes = append(f.hashes, h.Hash()) + } + api.filtersMu.Unlock() + case <-headerSub.Err(): + api.filtersMu.Lock() + delete(api.filters, headerSub.ID) + api.filtersMu.Unlock() + return + } + } + }() + + return headerSub.ID +} + +// NewHeads send a notification each time a new (header) block is appended to the chain. 
+func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + headers := make(chan *types.Header) + headersSub := api.events.SubscribeNewHeads(headers) + + for { + select { + case h := <-headers: + notifier.Notify(rpcSub.ID, h) + case <-rpcSub.Err(): + headersSub.Unsubscribe() + return + case <-notifier.Closed(): + headersSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + +// Logs creates a subscription that fires for all new log that match the given filter criteria. +func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + var ( + rpcSub = notifier.CreateSubscription() + matchedLogs = make(chan []*types.Log) + ) + + logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs) + if err != nil { + return nil, err + } + + go func() { + + for { + select { + case logs := <-matchedLogs: + for _, log := range logs { + notifier.Notify(rpcSub.ID, &log) + } + case <-rpcSub.Err(): // client send an unsubscribe request + logsSub.Unsubscribe() + return + case <-notifier.Closed(): // connection dropped + logsSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + +// FilterCriteria represents a request to create a new filter. +// Same as ethereum.FilterQuery but with UnmarshalJSON() method. +type FilterCriteria ethereum.FilterQuery + +// NewFilter creates a new filter and returns the filter id. It can be +// used to retrieve logs when the state changes. This method cannot be +// used to fetch logs that are already stored in the state. +// +// Default criteria for the from and to block are "latest". +// Using "latest" as block number will return logs for mined blocks. +// Using "pending" as block number returns logs for not yet mined (pending) blocks. +// In case logs are removed (chain reorg) previously returned logs are returned +// again but with the removed property set to true. +// +// In case "fromBlock" > "toBlock" an error is returned. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter +func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { + logs := make(chan []*types.Log) + logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), logs) + if err != nil { + return rpc.ID(""), err + } + + api.filtersMu.Lock() + api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]*types.Log, 0), s: logsSub} + api.filtersMu.Unlock() + + go func() { + for { + select { + case l := <-logs: + api.filtersMu.Lock() + if f, found := api.filters[logsSub.ID]; found { + f.logs = append(f.logs, l...) + } + api.filtersMu.Unlock() + case <-logsSub.Err(): + api.filtersMu.Lock() + delete(api.filters, logsSub.ID) + api.filtersMu.Unlock() + return + } + } + }() + + return logsSub.ID, nil +} + +// GetLogs returns logs matching the given argument that are stored within the state. 
+// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs +func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) { + var filter *Filter + if crit.BlockHash != nil { + // Block filter requested, construct a single-shot filter + filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics) + } else { + // Convert the RPC block numbers into internal representations + begin := rpc.LatestBlockNumber.Int64() + if crit.FromBlock != nil { + begin = crit.FromBlock.Int64() + } + end := rpc.LatestBlockNumber.Int64() + if crit.ToBlock != nil { + end = crit.ToBlock.Int64() + } + // Construct the range filter + filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics) + } + // Run the filter and return all the logs + logs, err := filter.Logs(ctx) + if err != nil { + return nil, err + } + return returnLogs(logs), err +} + +// UninstallFilter removes the filter with the given filter id. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter +func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool { + api.filtersMu.Lock() + f, found := api.filters[id] + if found { + delete(api.filters, id) + } + api.filtersMu.Unlock() + if found { + f.s.Unsubscribe() + } + + return found +} + +// GetFilterLogs returns the logs for the filter with the given id. +// If the filter could not be found an empty array of logs is returned. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs +func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Log, error) { + api.filtersMu.Lock() + f, found := api.filters[id] + api.filtersMu.Unlock() + + if !found || f.typ != LogsSubscription { + return nil, fmt.Errorf("filter not found") + } + + var filter *Filter + if f.crit.BlockHash != nil { + // Block filter requested, construct a single-shot filter + filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics) + } else { + // Convert the RPC block numbers into internal representations + begin := rpc.LatestBlockNumber.Int64() + if f.crit.FromBlock != nil { + begin = f.crit.FromBlock.Int64() + } + end := rpc.LatestBlockNumber.Int64() + if f.crit.ToBlock != nil { + end = f.crit.ToBlock.Int64() + } + // Construct the range filter + filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics) + } + // Run the filter and return all the logs + logs, err := filter.Logs(ctx) + if err != nil { + return nil, err + } + return returnLogs(logs), nil +} + +// GetFilterChanges returns the logs for the filter with the given id since +// last time it was called. This can be used for polling. +// +// For pending transaction and block filters the result is []common.Hash. +// (pending)Log filters return []Log. 
+// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges +func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { + api.filtersMu.Lock() + defer api.filtersMu.Unlock() + + if f, found := api.filters[id]; found { + if !f.deadline.Stop() { + // timer expired but filter is not yet removed in timeout loop + // receive timer value and reset timer + <-f.deadline.C + } + f.deadline.Reset(deadline) + + switch f.typ { + case PendingTransactionsSubscription, BlocksSubscription: + hashes := f.hashes + f.hashes = nil + return returnHashes(hashes), nil + case LogsSubscription: + logs := f.logs + f.logs = nil + return returnLogs(logs), nil + } + } + + return []interface{}{}, fmt.Errorf("filter not found") +} + +// returnHashes is a helper that will return an empty hash array case the given hash array is nil, +// otherwise the given hashes array is returned. +func returnHashes(hashes []common.Hash) []common.Hash { + if hashes == nil { + return []common.Hash{} + } + return hashes +} + +// returnLogs is a helper that will return an empty log array in case the given logs array is nil, +// otherwise the given logs array is returned. +func returnLogs(logs []*types.Log) []*types.Log { + if logs == nil { + return []*types.Log{} + } + return logs +} + +// UnmarshalJSON sets *args fields with given data. +func (args *FilterCriteria) UnmarshalJSON(data []byte) error { + type input struct { + BlockHash *common.Hash `json:"blockHash"` + FromBlock *rpc.BlockNumber `json:"fromBlock"` + ToBlock *rpc.BlockNumber `json:"toBlock"` + Addresses interface{} `json:"address"` + Topics []interface{} `json:"topics"` + } + + var raw input + if err := json.Unmarshal(data, &raw); err != nil { + return err + } + + if raw.BlockHash != nil { + if raw.FromBlock != nil || raw.ToBlock != nil { + // BlockHash is mutually exclusive with FromBlock/ToBlock criteria + return fmt.Errorf("cannot specify both BlockHash and FromBlock/ToBlock, choose one or the other") + } + args.BlockHash = raw.BlockHash + } else { + if raw.FromBlock != nil { + args.FromBlock = big.NewInt(raw.FromBlock.Int64()) + } + + if raw.ToBlock != nil { + args.ToBlock = big.NewInt(raw.ToBlock.Int64()) + } + } + + args.Addresses = []common.Address{} + + if raw.Addresses != nil { + // raw.Address can contain a single address or an array of addresses + switch rawAddr := raw.Addresses.(type) { + case []interface{}: + for i, addr := range rawAddr { + if strAddr, ok := addr.(string); ok { + addr, err := decodeAddress(strAddr) + if err != nil { + return fmt.Errorf("invalid address at index %d: %v", i, err) + } + args.Addresses = append(args.Addresses, addr) + } else { + return fmt.Errorf("non-string address at index %d", i) + } + } + case string: + addr, err := decodeAddress(rawAddr) + if err != nil { + return fmt.Errorf("invalid address: %v", err) + } + args.Addresses = []common.Address{addr} + default: + return errors.New("invalid addresses in query") + } + } + + // topics is an array consisting of strings and/or arrays of strings. + // JSON null values are converted to common.Hash{} and ignored by the filter manager. + if len(raw.Topics) > 0 { + args.Topics = make([][]common.Hash, len(raw.Topics)) + for i, t := range raw.Topics { + switch topic := t.(type) { + case nil: + // ignore topic when matching logs + + case string: + // match specific topic + top, err := decodeTopic(topic) + if err != nil { + return err + } + args.Topics[i] = []common.Hash{top} + + case []interface{}: + // or case e.g. 
[null, "topic0", "topic1"] + for _, rawTopic := range topic { + if rawTopic == nil { + // null component, match all + args.Topics[i] = nil + break + } + if topic, ok := rawTopic.(string); ok { + parsed, err := decodeTopic(topic) + if err != nil { + return err + } + args.Topics[i] = append(args.Topics[i], parsed) + } else { + return fmt.Errorf("invalid topic(s)") + } + } + default: + return fmt.Errorf("invalid topic(s)") + } + } + } + + return nil +} + +func decodeAddress(s string) (common.Address, error) { + b, err := hexutil.Decode(s) + if err == nil && len(b) != common.AddressLength { + err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for address", len(b), common.AddressLength) + } + return common.BytesToAddress(b), err +} + +func decodeTopic(s string) (common.Hash, error) { + b, err := hexutil.Decode(s) + if err == nil && len(b) != common.HashLength { + err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for topic", len(b), common.HashLength) + } + return common.BytesToHash(b), err +} diff --git a/eth/filters/filter.go b/eth/filters/filter.go new file mode 100644 index 0000000..fa3d985 --- /dev/null +++ b/eth/filters/filter.go @@ -0,0 +1,348 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package filters + +import ( + "context" + "errors" + "math/big" + + "github.com/ava-labs/coreth/rpc" + "github.com/ava-labs/go-ethereum/common" + "github.com/ava-labs/go-ethereum/core" + "github.com/ava-labs/go-ethereum/core/bloombits" + "github.com/ava-labs/go-ethereum/core/types" + "github.com/ava-labs/go-ethereum/ethdb" + "github.com/ava-labs/go-ethereum/event" +) + +type Backend interface { + ChainDb() ethdb.Database + EventMux() *event.TypeMux + HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) + HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) + GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) + GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) + + SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription + SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription + SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription + SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription + + BloomStatus() (uint64, uint64) + ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) +} + +// Filter can be used to retrieve and filter logs. 
+type Filter struct { + backend Backend + + db ethdb.Database + addresses []common.Address + topics [][]common.Hash + + block common.Hash // Block hash if filtering a single block + begin, end int64 // Range interval if filtering multiple blocks + + matcher *bloombits.Matcher +} + +// NewRangeFilter creates a new filter which uses a bloom filter on blocks to +// figure out whether a particular block is interesting or not. +func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { + // Flatten the address and topic filter clauses into a single bloombits filter + // system. Since the bloombits are not positional, nil topics are permitted, + // which get flattened into a nil byte slice. + var filters [][][]byte + if len(addresses) > 0 { + filter := make([][]byte, len(addresses)) + for i, address := range addresses { + filter[i] = address.Bytes() + } + filters = append(filters, filter) + } + for _, topicList := range topics { + filter := make([][]byte, len(topicList)) + for i, topic := range topicList { + filter[i] = topic.Bytes() + } + filters = append(filters, filter) + } + size, _ := backend.BloomStatus() + + // Create a generic filter and convert it into a range filter + filter := newFilter(backend, addresses, topics) + + filter.matcher = bloombits.NewMatcher(size, filters) + filter.begin = begin + filter.end = end + + return filter +} + +// NewBlockFilter creates a new filter which directly inspects the contents of +// a block to figure out whether it is interesting or not. +func NewBlockFilter(backend Backend, block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter { + // Create a generic filter and convert it into a block filter + filter := newFilter(backend, addresses, topics) + filter.block = block + return filter +} + +// newFilter creates a generic filter that can either filter based on a block hash, +// or based on range queries. The search criteria needs to be explicitly set. +func newFilter(backend Backend, addresses []common.Address, topics [][]common.Hash) *Filter { + return &Filter{ + backend: backend, + addresses: addresses, + topics: topics, + db: backend.ChainDb(), + } +} + +// Logs searches the blockchain for matching log entries, returning all from the +// first block that contains matches, updating the start of the filter accordingly. +func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { + // If we're doing singleton block filtering, execute and return + if f.block != (common.Hash{}) { + header, err := f.backend.HeaderByHash(ctx, f.block) + if err != nil { + return nil, err + } + if header == nil { + return nil, errors.New("unknown block") + } + return f.blockLogs(ctx, header) + } + // Figure out the limits of the filter range + header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) + if header == nil { + return nil, nil + } + head := header.Number.Uint64() + + if f.begin == -1 { + f.begin = int64(head) + } + end := uint64(f.end) + if f.end == -1 { + end = head + } + // Gather all indexed logs, and finish with non indexed ones + var ( + logs []*types.Log + err error + ) + size, sections := f.backend.BloomStatus() + if indexed := sections * size; indexed > uint64(f.begin) { + if indexed > end { + logs, err = f.indexedLogs(ctx, end) + } else { + logs, err = f.indexedLogs(ctx, indexed-1) + } + if err != nil { + return logs, err + } + } + rest, err := f.unindexedLogs(ctx, end) + logs = append(logs, rest...) 
+ return logs, err +} + +// indexedLogs returns the logs matching the filter criteria based on the bloom +// bits indexed available locally or via the network. +func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) { + // Create a matcher session and request servicing from the backend + matches := make(chan uint64, 64) + + session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches) + if err != nil { + return nil, err + } + defer session.Close() + + f.backend.ServiceFilter(ctx, session) + + // Iterate over the matches until exhausted or context closed + var logs []*types.Log + + for { + select { + case number, ok := <-matches: + // Abort if all matches have been fulfilled + if !ok { + err := session.Error() + if err == nil { + f.begin = int64(end) + 1 + } + return logs, err + } + f.begin = int64(number) + 1 + + // Retrieve the suggested block and pull any truly matching logs + header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number)) + if header == nil || err != nil { + return logs, err + } + found, err := f.checkMatches(ctx, header) + if err != nil { + return logs, err + } + logs = append(logs, found...) + + case <-ctx.Done(): + return logs, ctx.Err() + } + } +} + +// unindexedLogs returns the logs matching the filter criteria based on raw block +// iteration and bloom matching. +func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) { + var logs []*types.Log + + for ; f.begin <= int64(end); f.begin++ { + header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin)) + if header == nil || err != nil { + return logs, err + } + found, err := f.blockLogs(ctx, header) + if err != nil { + return logs, err + } + logs = append(logs, found...) + } + return logs, nil +} + +// blockLogs returns the logs matching the filter criteria within a single block. +func (f *Filter) blockLogs(ctx context.Context, header *types.Header) (logs []*types.Log, err error) { + if bloomFilter(header.Bloom, f.addresses, f.topics) { + found, err := f.checkMatches(ctx, header) + if err != nil { + return logs, err + } + logs = append(logs, found...) + } + return logs, nil +} + +// checkMatches checks if the receipts belonging to the given header contain any log events that +// match the filter criteria. This function is called when the bloom filter signals a potential match. +func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) { + // Get the logs of the block + logsList, err := f.backend.GetLogs(ctx, header.Hash()) + if err != nil { + return nil, err + } + var unfiltered []*types.Log + for _, logs := range logsList { + unfiltered = append(unfiltered, logs...) + } + logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics) + if len(logs) > 0 { + // We have matching logs, check if we need to resolve full logs via the light client + if logs[0].TxHash == (common.Hash{}) { + receipts, err := f.backend.GetReceipts(ctx, header.Hash()) + if err != nil { + return nil, err + } + unfiltered = unfiltered[:0] + for _, receipt := range receipts { + unfiltered = append(unfiltered, receipt.Logs...) + } + logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics) + } + return logs, nil + } + return nil, nil +} + +func includes(addresses []common.Address, a common.Address) bool { + for _, addr := range addresses { + if addr == a { + return true + } + } + + return false +} + +// filterLogs creates a slice of logs matching the given criteria.
+func filterLogs(logs []*types.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*types.Log { + var ret []*types.Log +Logs: + for _, log := range logs { + if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber { + continue + } + if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber { + continue + } + + if len(addresses) > 0 && !includes(addresses, log.Address) { + continue + } + // If the to filtered topics is greater than the amount of topics in logs, skip. + if len(topics) > len(log.Topics) { + continue Logs + } + for i, sub := range topics { + match := len(sub) == 0 // empty rule set == wildcard + for _, topic := range sub { + if log.Topics[i] == topic { + match = true + break + } + } + if !match { + continue Logs + } + } + ret = append(ret, log) + } + return ret +} + +func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]common.Hash) bool { + if len(addresses) > 0 { + var included bool + for _, addr := range addresses { + if types.BloomLookup(bloom, addr) { + included = true + break + } + } + if !included { + return false + } + } + + for _, sub := range topics { + included := len(sub) == 0 // empty rule set == wildcard + for _, topic := range sub { + if types.BloomLookup(bloom, topic) { + included = true + break + } + } + if !included { + return false + } + } + return true +} diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go new file mode 100644 index 0000000..a27859f --- /dev/null +++ b/eth/filters/filter_system.go @@ -0,0 +1,512 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package filters implements an ethereum filtering system for block, +// transactions and log events. +package filters + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + myrpc "github.com/ava-labs/coreth/rpc" + ethereum "github.com/ava-labs/go-ethereum" + "github.com/ava-labs/go-ethereum/common" + "github.com/ava-labs/go-ethereum/core" + "github.com/ava-labs/go-ethereum/core/rawdb" + "github.com/ava-labs/go-ethereum/core/types" + "github.com/ava-labs/go-ethereum/event" + "github.com/ava-labs/go-ethereum/log" + "github.com/ava-labs/go-ethereum/rpc" +) + +// Type determines the kind of filter and is used to put the filter in to +// the correct bucket when added. +type Type byte + +const ( + // UnknownSubscription indicates an unknown subscription type + UnknownSubscription Type = iota + // LogsSubscription queries for new or removed (chain reorg) logs + LogsSubscription + // PendingLogsSubscription queries for logs in pending blocks + PendingLogsSubscription + // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks. 
+ MinedAndPendingLogsSubscription + // PendingTransactionsSubscription queries tx hashes for pending + // transactions entering the pending state + PendingTransactionsSubscription + // BlocksSubscription queries hashes for blocks that are imported + BlocksSubscription + // LastSubscription keeps track of the last index + LastIndexSubscription +) + +const ( + + // txChanSize is the size of channel listening to NewTxsEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 + // rmLogsChanSize is the size of channel listening to RemovedLogsEvent. + rmLogsChanSize = 10 + // logsChanSize is the size of channel listening to LogsEvent. + logsChanSize = 10 + // chainEvChanSize is the size of channel listening to ChainEvent. + chainEvChanSize = 10 +) + +var ( + ErrInvalidSubscriptionID = errors.New("invalid id") +) + +type subscription struct { + id rpc.ID + typ Type + created time.Time + logsCrit ethereum.FilterQuery + logs chan []*types.Log + hashes chan []common.Hash + headers chan *types.Header + installed chan struct{} // closed when the filter is installed + err chan error // closed when the filter is uninstalled +} + +// EventSystem creates subscriptions, processes events and broadcasts them to the +// subscription which match the subscription criteria. +type EventSystem struct { + mux *event.TypeMux + backend Backend + lightMode bool + lastHead *types.Header + + // Subscriptions + txsSub event.Subscription // Subscription for new transaction event + logsSub event.Subscription // Subscription for new log event + rmLogsSub event.Subscription // Subscription for removed log event + chainSub event.Subscription // Subscription for new chain event + pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event + + // Channels + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification + txsCh chan core.NewTxsEvent // Channel to receive new transactions event + logsCh chan []*types.Log // Channel to receive new log event + rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event + chainCh chan core.ChainEvent // Channel to receive new chain event +} + +// NewEventSystem creates a new manager that listens for event on the given mux, +// parses and filters them. It uses the all map to retrieve filter changes. The +// work loop holds its own index that is used to forward events to filters. +// +// The returned manager has a loop that needs to be stopped with the Stop function +// or by stopping the given mux. 
+func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem { + m := &EventSystem{ + mux: mux, + backend: backend, + lightMode: lightMode, + install: make(chan *subscription), + uninstall: make(chan *subscription), + txsCh: make(chan core.NewTxsEvent, txChanSize), + logsCh: make(chan []*types.Log, logsChanSize), + rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), + chainCh: make(chan core.ChainEvent, chainEvChanSize), + } + + // Subscribe events + m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh) + m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) + m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) + m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) + // TODO(rjl493456442): use feed to subscribe pending log event + m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) + + // Make sure none of the subscriptions are empty + if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || + m.pendingLogSub.Closed() { + log.Crit("Subscribe for event system failed") + } + + go m.eventLoop() + return m +} + +// Subscription is created when the client registers itself for a particular event. +type Subscription struct { + ID rpc.ID + f *subscription + es *EventSystem + unsubOnce sync.Once +} + +// Err returns a channel that is closed when unsubscribed. +func (sub *Subscription) Err() <-chan error { + return sub.f.err +} + +// Unsubscribe uninstalls the subscription from the event broadcast loop. +func (sub *Subscription) Unsubscribe() { + sub.unsubOnce.Do(func() { + uninstallLoop: + for { + // write uninstall request and consume logs/hashes. This prevents + // the eventLoop broadcast method to deadlock when writing to the + // filter event channel while the subscription loop is waiting for + // this method to return (and thus not reading these events). + select { + case sub.es.uninstall <- sub.f: + break uninstallLoop + case <-sub.f.logs: + case <-sub.f.hashes: + case <-sub.f.headers: + } + } + + // wait for filter to be uninstalled in work loop before returning + // this ensures that the manager won't use the event channel which + // will probably be closed by the client asap after this method returns. + <-sub.Err() + }) +} + +// subscribe installs the subscription in the event broadcast loop. +func (es *EventSystem) subscribe(sub *subscription) *Subscription { + es.install <- sub + <-sub.installed + return &Subscription{ID: sub.id, f: sub, es: es} +} + +// SubscribeLogs creates a subscription that will write all logs matching the +// given criteria to the given logs channel. Default value for the from and to +// block is "latest". If the fromBlock > toBlock an error is returned. 
+func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) (*Subscription, error) { + var from, to myrpc.BlockNumber + if crit.FromBlock == nil { + from = myrpc.LatestBlockNumber + } else { + from = myrpc.BlockNumber(crit.FromBlock.Int64()) + } + if crit.ToBlock == nil { + to = myrpc.LatestBlockNumber + } else { + to = myrpc.BlockNumber(crit.ToBlock.Int64()) + } + + // only interested in pending logs + if from == myrpc.PendingBlockNumber && to == myrpc.PendingBlockNumber { + return es.subscribePendingLogs(crit, logs), nil + } + // only interested in new mined logs + if from == myrpc.LatestBlockNumber && to == myrpc.LatestBlockNumber { + return es.subscribeLogs(crit, logs), nil + } + // only interested in mined logs within a specific block range + if from >= 0 && to >= 0 && to >= from { + return es.subscribeLogs(crit, logs), nil + } + // interested in mined logs from a specific block number, new logs and pending logs + if from >= myrpc.LatestBlockNumber && to == myrpc.PendingBlockNumber { + return es.subscribeMinedPendingLogs(crit, logs), nil + } + // interested in logs from a specific block number to new mined blocks + if from >= 0 && to == myrpc.LatestBlockNumber { + return es.subscribeLogs(crit, logs), nil + } + return nil, fmt.Errorf("invalid from and to block combination: from > to") +} + +// subscribeMinedPendingLogs creates a subscription that returned mined and +// pending logs that match the given criteria. +func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: MinedAndPendingLogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +// subscribeLogs creates a subscription that will write all logs matching the +// given criteria to the given logs channel. +func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: LogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +// subscribePendingLogs creates a subscription that writes transaction hashes for +// transactions that enter the transaction pool. +func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: PendingLogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +// SubscribeNewHeads creates a subscription that writes the header of a block that is +// imported in the chain. 
+func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: BlocksSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: make(chan []common.Hash), + headers: headers, + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +// SubscribePendingTxs creates a subscription that writes transaction hashes for +// transactions that enter the transaction pool. +func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: PendingTransactionsSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: hashes, + headers: make(chan *types.Header), + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +type filterIndex map[Type]map[rpc.ID]*subscription + +// broadcast event to filters that match criteria. +func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { + if ev == nil { + return + } + + switch e := ev.(type) { + case []*types.Log: + if len(e) > 0 { + for _, f := range filters[LogsSubscription] { + if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs + } + } + } + case core.RemovedLogsEvent: + for _, f := range filters[LogsSubscription] { + if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs + } + } + case *event.TypeMuxEvent: + if muxe, ok := e.Data.(core.PendingLogsEvent); ok { + for _, f := range filters[PendingLogsSubscription] { + if e.Time.After(f.created) { + if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs + } + } + } + } + case core.NewTxsEvent: + hashes := make([]common.Hash, 0, len(e.Txs)) + for _, tx := range e.Txs { + hashes = append(hashes, tx.Hash()) + } + for _, f := range filters[PendingTransactionsSubscription] { + f.hashes <- hashes + } + case core.ChainEvent: + for _, f := range filters[BlocksSubscription] { + f.headers <- e.Block.Header() + } + if es.lightMode && len(filters[LogsSubscription]) > 0 { + es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) { + for _, f := range filters[LogsSubscription] { + if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { + f.logs <- matchedLogs + } + } + }) + } + } +} + +func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) { + oldh := es.lastHead + es.lastHead = newHeader + if oldh == nil { + return + } + newh := newHeader + // find common ancestor, create list of rolled back and new block hashes + var oldHeaders, newHeaders []*types.Header + for oldh.Hash() != newh.Hash() { + if oldh.Number.Uint64() >= newh.Number.Uint64() { + oldHeaders = append(oldHeaders, oldh) + oldh = rawdb.ReadHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1) + } + if oldh.Number.Uint64() < newh.Number.Uint64() { + newHeaders = append(newHeaders, newh) + newh = rawdb.ReadHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1) + if newh == nil { + // happens when CHT syncing, nothing to do + newh = oldh + } + } + } + // roll back old blocks + for _, h := range oldHeaders { + 
callBack(h, true) + } + // check new blocks (array is in reverse order) + for i := len(newHeaders) - 1; i >= 0; i-- { + callBack(newHeaders[i], false) + } +} + +// filter logs of a single header in light client mode +func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log { + if bloomFilter(header.Bloom, addresses, topics) { + // Get the logs of the block + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + logsList, err := es.backend.GetLogs(ctx, header.Hash()) + if err != nil { + return nil + } + var unfiltered []*types.Log + for _, logs := range logsList { + for _, log := range logs { + logcopy := *log + logcopy.Removed = remove + unfiltered = append(unfiltered, &logcopy) + } + } + logs := filterLogs(unfiltered, nil, nil, addresses, topics) + if len(logs) > 0 && logs[0].TxHash == (common.Hash{}) { + // We have matching but non-derived logs + receipts, err := es.backend.GetReceipts(ctx, header.Hash()) + if err != nil { + return nil + } + unfiltered = unfiltered[:0] + for _, receipt := range receipts { + for _, log := range receipt.Logs { + logcopy := *log + logcopy.Removed = remove + unfiltered = append(unfiltered, &logcopy) + } + } + logs = filterLogs(unfiltered, nil, nil, addresses, topics) + } + return logs + } + return nil +} + +// eventLoop (un)installs filters and processes mux events. +func (es *EventSystem) eventLoop() { + // Ensure all subscriptions get cleaned up + defer func() { + es.pendingLogSub.Unsubscribe() + es.txsSub.Unsubscribe() + es.logsSub.Unsubscribe() + es.rmLogsSub.Unsubscribe() + es.chainSub.Unsubscribe() + }() + + index := make(filterIndex) + for i := UnknownSubscription; i < LastIndexSubscription; i++ { + index[i] = make(map[rpc.ID]*subscription) + } + + for { + select { + // Handle subscribed events + case ev := <-es.txsCh: + es.broadcast(index, ev) + case ev := <-es.logsCh: + es.broadcast(index, ev) + case ev := <-es.rmLogsCh: + es.broadcast(index, ev) + case ev := <-es.chainCh: + es.broadcast(index, ev) + case ev, active := <-es.pendingLogSub.Chan(): + if !active { // system stopped + return + } + es.broadcast(index, ev) + + case f := <-es.install: + if f.typ == MinedAndPendingLogsSubscription { + // the type are logs and pending logs subscriptions + index[LogsSubscription][f.id] = f + index[PendingLogsSubscription][f.id] = f + } else { + index[f.typ][f.id] = f + } + close(f.installed) + + case f := <-es.uninstall: + if f.typ == MinedAndPendingLogsSubscription { + // the type are logs and pending logs subscriptions + delete(index[LogsSubscription], f.id) + delete(index[PendingLogsSubscription], f.id) + } else { + delete(index[f.typ], f.id) + } + close(f.err) + + // System stopped + case <-es.txsSub.Err(): + return + case <-es.logsSub.Err(): + return + case <-es.rmLogsSub.Err(): + return + case <-es.chainSub.Err(): + return + } + } +} diff --git a/eth/gasprice/gasprice.go b/eth/gasprice/gasprice.go new file mode 100644 index 0000000..9b632c2 --- /dev/null +++ b/eth/gasprice/gasprice.go @@ -0,0 +1,189 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package gasprice + +import ( + "context" + "math/big" + "sort" + "sync" + + "github.com/ava-labs/coreth/internal/ethapi" + "github.com/ava-labs/coreth/params" + "github.com/ava-labs/coreth/rpc" + "github.com/ava-labs/go-ethereum/common" + "github.com/ava-labs/go-ethereum/core/types" +) + +var maxPrice = big.NewInt(500 * params.GWei) + +type Config struct { + Blocks int + Percentile int + Default *big.Int `toml:",omitempty"` +} + +// Oracle recommends gas prices based on the content of recent +// blocks. Suitable for both light and full clients. +type Oracle struct { + backend ethapi.Backend + lastHead common.Hash + lastPrice *big.Int + cacheLock sync.RWMutex + fetchLock sync.Mutex + + checkBlocks, maxEmpty, maxBlocks int + percentile int +} + +// NewOracle returns a new oracle. +func NewOracle(backend ethapi.Backend, params Config) *Oracle { + blocks := params.Blocks + if blocks < 1 { + blocks = 1 + } + percent := params.Percentile + if percent < 0 { + percent = 0 + } + if percent > 100 { + percent = 100 + } + return &Oracle{ + backend: backend, + lastPrice: params.Default, + checkBlocks: blocks, + maxEmpty: blocks / 2, + maxBlocks: blocks * 5, + percentile: percent, + } +} + +// SuggestPrice returns the recommended gas price. +func (gpo *Oracle) SuggestPrice(ctx context.Context) (*big.Int, error) { + gpo.cacheLock.RLock() + lastHead := gpo.lastHead + lastPrice := gpo.lastPrice + gpo.cacheLock.RUnlock() + + head, _ := gpo.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) + headHash := head.Hash() + if headHash == lastHead { + return lastPrice, nil + } + + gpo.fetchLock.Lock() + defer gpo.fetchLock.Unlock() + + // try checking the cache again, maybe the last fetch fetched what we need + gpo.cacheLock.RLock() + lastHead = gpo.lastHead + lastPrice = gpo.lastPrice + gpo.cacheLock.RUnlock() + if headHash == lastHead { + return lastPrice, nil + } + + blockNum := head.Number.Uint64() + ch := make(chan getBlockPricesResult, gpo.checkBlocks) + sent := 0 + exp := 0 + var blockPrices []*big.Int + for sent < gpo.checkBlocks && blockNum > 0 { + go gpo.getBlockPrices(ctx, types.MakeSigner(gpo.backend.ChainConfig(), big.NewInt(int64(blockNum))), blockNum, ch) + sent++ + exp++ + blockNum-- + } + maxEmpty := gpo.maxEmpty + for exp > 0 { + res := <-ch + if res.err != nil { + return lastPrice, res.err + } + exp-- + if res.price != nil { + blockPrices = append(blockPrices, res.price) + continue + } + if maxEmpty > 0 { + maxEmpty-- + continue + } + if blockNum > 0 && sent < gpo.maxBlocks { + go gpo.getBlockPrices(ctx, types.MakeSigner(gpo.backend.ChainConfig(), big.NewInt(int64(blockNum))), blockNum, ch) + sent++ + exp++ + blockNum-- + } + } + price := lastPrice + if len(blockPrices) > 0 { + sort.Sort(bigIntArray(blockPrices)) + price = blockPrices[(len(blockPrices)-1)*gpo.percentile/100] + } + if price.Cmp(maxPrice) > 0 { + price = new(big.Int).Set(maxPrice) + } + + gpo.cacheLock.Lock() + gpo.lastHead = headHash + gpo.lastPrice = price + gpo.cacheLock.Unlock() + return price, nil +} + +type getBlockPricesResult struct { + price *big.Int + err error +} + +type transactionsByGasPrice []*types.Transaction + 
+func (t transactionsByGasPrice) Len() int { return len(t) } +func (t transactionsByGasPrice) Swap(i, j int) { t[i], t[j] = t[j], t[i] } +func (t transactionsByGasPrice) Less(i, j int) bool { return t[i].GasPrice().Cmp(t[j].GasPrice()) < 0 } + +// getBlockPrices calculates the lowest transaction gas price in a given block +// and sends it to the result channel. If the block is empty, price is nil. +func (gpo *Oracle) getBlockPrices(ctx context.Context, signer types.Signer, blockNum uint64, ch chan getBlockPricesResult) { + block, err := gpo.backend.BlockByNumber(ctx, rpc.BlockNumber(blockNum)) + if block == nil { + ch <- getBlockPricesResult{nil, err} + return + } + + blockTxs := block.Transactions() + txs := make([]*types.Transaction, len(blockTxs)) + copy(txs, blockTxs) + sort.Sort(transactionsByGasPrice(txs)) + + for _, tx := range txs { + sender, err := types.Sender(signer, tx) + if err == nil && sender != block