diff options
Diffstat (limited to 'eth')
-rw-r--r-- | eth/api.go | 533 | ||||
-rw-r--r-- | eth/api_backend.go | 244 | ||||
-rw-r--r-- | eth/api_tracer.go | 823 | ||||
-rw-r--r-- | eth/backend.go | 566 | ||||
-rw-r--r-- | eth/bloombits.go | 138 | ||||
-rw-r--r-- | eth/config.go | 157 | ||||
-rw-r--r-- | eth/enr_entry.go | 61 | ||||
-rw-r--r-- | eth/gen_config.go | 221 | ||||
-rw-r--r-- | eth/handler.go | 844 | ||||
-rw-r--r-- | eth/metrics.go | 139 | ||||
-rw-r--r-- | eth/peer.go | 546 | ||||
-rw-r--r-- | eth/protocol.go | 196 | ||||
-rw-r--r-- | eth/sync.go | 216 |
13 files changed, 4684 insertions, 0 deletions
diff --git a/eth/api.go b/eth/api.go new file mode 100644 index 0000000..34f41db --- /dev/null +++ b/eth/api.go @@ -0,0 +1,533 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package eth + +import ( + "compress/gzip" + "context" + "errors" + "fmt" + "io" + "math/big" + "os" + "runtime" + "strings" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" + "github.com/Determinant/coreth/internal/ethapi" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/trie" +) + +// PublicEthereumAPI provides an API to access Ethereum full node-related +// information. +type PublicEthereumAPI struct { + e *Ethereum +} + +// NewPublicEthereumAPI creates a new Ethereum protocol API for full nodes. +func NewPublicEthereumAPI(e *Ethereum) *PublicEthereumAPI { + return &PublicEthereumAPI{e} +} + +// Etherbase is the address that mining rewards will be send to +func (api *PublicEthereumAPI) Etherbase() (common.Address, error) { + return api.e.Etherbase() +} + +// Coinbase is the address that mining rewards will be send to (alias for Etherbase) +func (api *PublicEthereumAPI) Coinbase() (common.Address, error) { + return api.Etherbase() +} + +// Hashrate returns the POW hashrate +func (api *PublicEthereumAPI) Hashrate() hexutil.Uint64 { + return hexutil.Uint64(api.e.Miner().HashRate()) +} + +// ChainId is the EIP-155 replay-protection chain id for the current ethereum chain config. +func (api *PublicEthereumAPI) ChainId() hexutil.Uint64 { + chainID := new(big.Int) + if config := api.e.blockchain.Config(); config.IsEIP155(api.e.blockchain.CurrentBlock().Number()) { + chainID = config.ChainID + } + return (hexutil.Uint64)(chainID.Uint64()) +} + +// PublicMinerAPI provides an API to control the miner. +// It offers only methods that operate on data that pose no security risk when it is publicly accessible. +type PublicMinerAPI struct { + e *Ethereum +} + +// NewPublicMinerAPI create a new PublicMinerAPI instance. +func NewPublicMinerAPI(e *Ethereum) *PublicMinerAPI { + return &PublicMinerAPI{e} +} + +// Mining returns an indication if this node is currently mining. +func (api *PublicMinerAPI) Mining() bool { + return api.e.IsMining() +} + +// PrivateMinerAPI provides private RPC methods to control the miner. +// These methods can be abused by external users and must be considered insecure for use by untrusted users. +type PrivateMinerAPI struct { + e *Ethereum +} + +// NewPrivateMinerAPI create a new RPC service which controls the miner of this node. +func NewPrivateMinerAPI(e *Ethereum) *PrivateMinerAPI { + return &PrivateMinerAPI{e: e} +} + +// Start starts the miner with the given number of threads. If threads is nil, +// the number of workers started is equal to the number of logical CPUs that are +// usable by this process. If mining is already running, this method adjust the +// number of threads allowed to use and updates the minimum price required by the +// transaction pool. +func (api *PrivateMinerAPI) Start(threads *int) error { + if threads == nil { + return api.e.StartMining(runtime.NumCPU()) + } + return api.e.StartMining(*threads) +} + +// Stop terminates the miner, both at the consensus engine level as well as at +// the block creation level. +func (api *PrivateMinerAPI) Stop() { + api.e.StopMining() +} + +// SetExtra sets the extra data string that is included when this miner mines a block. +func (api *PrivateMinerAPI) SetExtra(extra string) (bool, error) { + if err := api.e.Miner().SetExtra([]byte(extra)); err != nil { + return false, err + } + return true, nil +} + +// SetGasPrice sets the minimum accepted gas price for the miner. +func (api *PrivateMinerAPI) SetGasPrice(gasPrice hexutil.Big) bool { + api.e.lock.Lock() + api.e.gasPrice = (*big.Int)(&gasPrice) + api.e.lock.Unlock() + + api.e.txPool.SetGasPrice((*big.Int)(&gasPrice)) + return true +} + +// SetEtherbase sets the etherbase of the miner +func (api *PrivateMinerAPI) SetEtherbase(etherbase common.Address) bool { + api.e.SetEtherbase(etherbase) + return true +} + +// SetRecommitInterval updates the interval for miner sealing work recommitting. +func (api *PrivateMinerAPI) SetRecommitInterval(interval int) { + api.e.Miner().SetRecommitInterval(time.Duration(interval) * time.Millisecond) +} + +// GetHashrate returns the current hashrate of the miner. +func (api *PrivateMinerAPI) GetHashrate() uint64 { + return api.e.miner.HashRate() +} + +// PrivateAdminAPI is the collection of Ethereum full node-related APIs +// exposed over the private admin endpoint. +type PrivateAdminAPI struct { + eth *Ethereum +} + +// NewPrivateAdminAPI creates a new API definition for the full node private +// admin methods of the Ethereum service. +func NewPrivateAdminAPI(eth *Ethereum) *PrivateAdminAPI { + return &PrivateAdminAPI{eth: eth} +} + +// ExportChain exports the current blockchain into a local file. +func (api *PrivateAdminAPI) ExportChain(file string) (bool, error) { + // Make sure we can create the file to export into + out, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.ModePerm) + if err != nil { + return false, err + } + defer out.Close() + + var writer io.Writer = out + if strings.HasSuffix(file, ".gz") { + writer = gzip.NewWriter(writer) + defer writer.(*gzip.Writer).Close() + } + + // Export the blockchain + if err := api.eth.BlockChain().Export(writer); err != nil { + return false, err + } + return true, nil +} + +func hasAllBlocks(chain *core.BlockChain, bs []*types.Block) bool { + for _, b := range bs { + if !chain.HasBlock(b.Hash(), b.NumberU64()) { + return false + } + } + + return true +} + +// ImportChain imports a blockchain from a local file. +func (api *PrivateAdminAPI) ImportChain(file string) (bool, error) { + // Make sure the can access the file to import + in, err := os.Open(file) + if err != nil { + return false, err + } + defer in.Close() + + var reader io.Reader = in + if strings.HasSuffix(file, ".gz") { + if reader, err = gzip.NewReader(reader); err != nil { + return false, err + } + } + + // Run actual the import in pre-configured batches + stream := rlp.NewStream(reader, 0) + + blocks, index := make([]*types.Block, 0, 2500), 0 + for batch := 0; ; batch++ { + // Load a batch of blocks from the input file + for len(blocks) < cap(blocks) { + block := new(types.Block) + if err := stream.Decode(block); err == io.EOF { + break + } else if err != nil { + return false, fmt.Errorf("block %d: failed to parse: %v", index, err) + } + blocks = append(blocks, block) + index++ + } + if len(blocks) == 0 { + break + } + + if hasAllBlocks(api.eth.BlockChain(), blocks) { + blocks = blocks[:0] + continue + } + // Import the batch and reset the buffer + if _, err := api.eth.BlockChain().InsertChain(blocks); err != nil { + return false, fmt.Errorf("batch %d: failed to insert: %v", batch, err) + } + blocks = blocks[:0] + } + return true, nil +} + +// PublicDebugAPI is the collection of Ethereum full node APIs exposed +// over the public debugging endpoint. +type PublicDebugAPI struct { + eth *Ethereum +} + +// NewPublicDebugAPI creates a new API definition for the full node- +// related public debug methods of the Ethereum service. +func NewPublicDebugAPI(eth *Ethereum) *PublicDebugAPI { + return &PublicDebugAPI{eth: eth} +} + +// DumpBlock retrieves the entire state of the database at a given block. +func (api *PublicDebugAPI) DumpBlock(blockNr rpc.BlockNumber) (state.Dump, error) { + if blockNr == rpc.PendingBlockNumber { + // If we're dumping the pending state, we need to request + // both the pending block as well as the pending state from + // the miner and operate on those + _, stateDb := api.eth.miner.Pending() + return stateDb.RawDump(false, false, true), nil + } + var block *types.Block + if blockNr == rpc.LatestBlockNumber { + block = api.eth.blockchain.CurrentBlock() + } else { + block = api.eth.blockchain.GetBlockByNumber(uint64(blockNr)) + } + if block == nil { + return state.Dump{}, fmt.Errorf("block #%d not found", blockNr) + } + stateDb, err := api.eth.BlockChain().StateAt(block.Root()) + if err != nil { + return state.Dump{}, err + } + return stateDb.RawDump(false, false, true), nil +} + +// PrivateDebugAPI is the collection of Ethereum full node APIs exposed over +// the private debugging endpoint. +type PrivateDebugAPI struct { + eth *Ethereum +} + +// NewPrivateDebugAPI creates a new API definition for the full node-related +// private debug methods of the Ethereum service. +func NewPrivateDebugAPI(eth *Ethereum) *PrivateDebugAPI { + return &PrivateDebugAPI{eth: eth} +} + +// Preimage is a debug API function that returns the preimage for a sha3 hash, if known. +func (api *PrivateDebugAPI) Preimage(ctx context.Context, hash common.Hash) (hexutil.Bytes, error) { + if preimage := rawdb.ReadPreimage(api.eth.ChainDb(), hash); preimage != nil { + return preimage, nil + } + return nil, errors.New("unknown preimage") +} + +// BadBlockArgs represents the entries in the list returned when bad blocks are queried. +type BadBlockArgs struct { + Hash common.Hash `json:"hash"` + Block map[string]interface{} `json:"block"` + RLP string `json:"rlp"` +} + +// GetBadBlocks returns a list of the last 'bad blocks' that the client has seen on the network +// and returns them as a JSON list of block-hashes +func (api *PrivateDebugAPI) GetBadBlocks(ctx context.Context) ([]*BadBlockArgs, error) { + blocks := api.eth.BlockChain().BadBlocks() + results := make([]*BadBlockArgs, len(blocks)) + + var err error + for i, block := range blocks { + results[i] = &BadBlockArgs{ + Hash: block.Hash(), + } + if rlpBytes, err := rlp.EncodeToBytes(block); err != nil { + results[i].RLP = err.Error() // Hacky, but hey, it works + } else { + results[i].RLP = fmt.Sprintf("0x%x", rlpBytes) + } + if results[i].Block, err = ethapi.RPCMarshalBlock(block, true, true); err != nil { + results[i].Block = map[string]interface{}{"error": err.Error()} + } + } + return results, nil +} + +// AccountRangeResult returns a mapping from the hash of an account addresses +// to its preimage. It will return the JSON null if no preimage is found. +// Since a query can return a limited amount of results, a "next" field is +// also present for paging. +type AccountRangeResult struct { + Accounts map[common.Hash]*common.Address `json:"accounts"` + Next common.Hash `json:"next"` +} + +func accountRange(st state.Trie, start *common.Hash, maxResults int) (AccountRangeResult, error) { + if start == nil { + start = &common.Hash{0} + } + it := trie.NewIterator(st.NodeIterator(start.Bytes())) + result := AccountRangeResult{Accounts: make(map[common.Hash]*common.Address), Next: common.Hash{}} + + if maxResults > AccountRangeMaxResults { + maxResults = AccountRangeMaxResults + } + + for i := 0; i < maxResults && it.Next(); i++ { + if preimage := st.GetKey(it.Key); preimage != nil { + addr := &common.Address{} + addr.SetBytes(preimage) + result.Accounts[common.BytesToHash(it.Key)] = addr + } else { + result.Accounts[common.BytesToHash(it.Key)] = nil + } + } + + if it.Next() { + result.Next = common.BytesToHash(it.Key) + } + + return result, nil +} + +// AccountRangeMaxResults is the maximum number of results to be returned per call +const AccountRangeMaxResults = 256 + +// AccountRange enumerates all accounts in the latest state +func (api *PrivateDebugAPI) AccountRange(ctx context.Context, start *common.Hash, maxResults int) (AccountRangeResult, error) { + var statedb *state.StateDB + var err error + block := api.eth.blockchain.CurrentBlock() + + if len(block.Transactions()) == 0 { + statedb, err = api.computeStateDB(block, defaultTraceReexec) + if err != nil { + return AccountRangeResult{}, err + } + } else { + _, _, statedb, err = api.computeTxEnv(block.Hash(), len(block.Transactions())-1, 0) + if err != nil { + return AccountRangeResult{}, err + } + } + + trie, err := statedb.Database().OpenTrie(block.Header().Root) + if err != nil { + return AccountRangeResult{}, err + } + + return accountRange(trie, start, maxResults) +} + +// StorageRangeResult is the result of a debug_storageRangeAt API call. +type StorageRangeResult struct { + Storage storageMap `json:"storage"` + NextKey *common.Hash `json:"nextKey"` // nil if Storage includes the last key in the trie. +} + +type storageMap map[common.Hash]storageEntry + +type storageEntry struct { + Key *common.Hash `json:"key"` + Value common.Hash `json:"value"` +} + +// StorageRangeAt returns the storage at the given block height and transaction index. +func (api *PrivateDebugAPI) StorageRangeAt(ctx context.Context, blockHash common.Hash, txIndex int, contractAddress common.Address, keyStart hexutil.Bytes, maxResult int) (StorageRangeResult, error) { + _, _, statedb, err := api.computeTxEnv(blockHash, txIndex, 0) + if err != nil { + return StorageRangeResult{}, err + } + st := statedb.StorageTrie(contractAddress) + if st == nil { + return StorageRangeResult{}, fmt.Errorf("account %x doesn't exist", contractAddress) + } + return storageRangeAt(st, keyStart, maxResult) +} + +func storageRangeAt(st state.Trie, start []byte, maxResult int) (StorageRangeResult, error) { + it := trie.NewIterator(st.NodeIterator(start)) + result := StorageRangeResult{Storage: storageMap{}} + for i := 0; i < maxResult && it.Next(); i++ { + _, content, _, err := rlp.Split(it.Value) + if err != nil { + return StorageRangeResult{}, err + } + e := storageEntry{Value: common.BytesToHash(content)} + if preimage := st.GetKey(it.Key); preimage != nil { + preimage := common.BytesToHash(preimage) + e.Key = &preimage + } + result.Storage[common.BytesToHash(it.Key)] = e + } + // Add the 'next key' so clients can continue downloading. + if it.Next() { + next := common.BytesToHash(it.Key) + result.NextKey = &next + } + return result, nil +} + +// GetModifiedAccountsByNumber returns all accounts that have changed between the +// two blocks specified. A change is defined as a difference in nonce, balance, +// code hash, or storage hash. +// +// With one parameter, returns the list of accounts modified in the specified block. +func (api *PrivateDebugAPI) GetModifiedAccountsByNumber(startNum uint64, endNum *uint64) ([]common.Address, error) { + var startBlock, endBlock *types.Block + + startBlock = api.eth.blockchain.GetBlockByNumber(startNum) + if startBlock == nil { + return nil, fmt.Errorf("start block %x not found", startNum) + } + + if endNum == nil { + endBlock = startBlock + startBlock = api.eth.blockchain.GetBlockByHash(startBlock.ParentHash()) + if startBlock == nil { + return nil, fmt.Errorf("block %x has no parent", endBlock.Number()) + } + } else { + endBlock = api.eth.blockchain.GetBlockByNumber(*endNum) + if endBlock == nil { + return nil, fmt.Errorf("end block %d not found", *endNum) + } + } + return api.getModifiedAccounts(startBlock, endBlock) +} + +// GetModifiedAccountsByHash returns all accounts that have changed between the +// two blocks specified. A change is defined as a difference in nonce, balance, +// code hash, or storage hash. +// +// With one parameter, returns the list of accounts modified in the specified block. +func (api *PrivateDebugAPI) GetModifiedAccountsByHash(startHash common.Hash, endHash *common.Hash) ([]common.Address, error) { + var startBlock, endBlock *types.Block + startBlock = api.eth.blockchain.GetBlockByHash(startHash) + if startBlock == nil { + return nil, fmt.Errorf("start block %x not found", startHash) + } + + if endHash == nil { + endBlock = startBlock + startBlock = api.eth.blockchain.GetBlockByHash(startBlock.ParentHash()) + if startBlock == nil { + return nil, fmt.Errorf("block %x has no parent", endBlock.Number()) + } + } else { + endBlock = api.eth.blockchain.GetBlockByHash(*endHash) + if endBlock == nil { + return nil, fmt.Errorf("end block %x not found", *endHash) + } + } + return api.getModifiedAccounts(startBlock, endBlock) +} + +func (api *PrivateDebugAPI) getModifiedAccounts(startBlock, endBlock *types.Block) ([]common.Address, error) { + if startBlock.Number().Uint64() >= endBlock.Number().Uint64() { + return nil, fmt.Errorf("start block height (%d) must be less than end block height (%d)", startBlock.Number().Uint64(), endBlock.Number().Uint64()) + } + triedb := api.eth.BlockChain().StateCache().TrieDB() + + oldTrie, err := trie.NewSecure(startBlock.Root(), triedb) + if err != nil { + return nil, err + } + newTrie, err := trie.NewSecure(endBlock.Root(), triedb) + if err != nil { + return nil, err + } + diff, _ := trie.NewDifferenceIterator(oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{})) + iter := trie.NewIterator(diff) + + var dirty []common.Address + for iter.Next() { + key := newTrie.GetKey(iter.Key) + if key == nil { + return nil, fmt.Errorf("no preimage found for hash %x", iter.Key) + } + dirty = append(dirty, common.BytesToAddress(key)) + } + return dirty, nil +} diff --git a/eth/api_backend.go b/eth/api_backend.go new file mode 100644 index 0000000..69904a7 --- /dev/null +++ b/eth/api_backend.go @@ -0,0 +1,244 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package eth + +import ( + "context" + "errors" + "math/big" + + "github.com/ethereum/go-ethereum/accounts" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/math" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/eth/gasprice" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rpc" +) + +// EthAPIBackend implements ethapi.Backend for full nodes +type EthAPIBackend struct { + extRPCEnabled bool + eth *Ethereum + gpo *gasprice.Oracle +} + +// ChainConfig returns the active chain configuration. +func (b *EthAPIBackend) ChainConfig() *params.ChainConfig { + return b.eth.blockchain.Config() +} + +func (b *EthAPIBackend) CurrentBlock() *types.Block { + return b.eth.blockchain.CurrentBlock() +} + +func (b *EthAPIBackend) SetHead(number uint64) { + b.eth.protocolManager.downloader.Cancel() + b.eth.blockchain.SetHead(number) +} + +func (b *EthAPIBackend) HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error) { + // Pending block is only known by the miner + if number == rpc.PendingBlockNumber { + block := b.eth.miner.PendingBlock() + return block.Header(), nil + } + // Otherwise resolve and return the block + if number == rpc.LatestBlockNumber { + return b.eth.blockchain.CurrentBlock().Header(), nil + } + return b.eth.blockchain.GetHeaderByNumber(uint64(number)), nil +} + +func (b *EthAPIBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { + return b.eth.blockchain.GetHeaderByHash(hash), nil +} + +func (b *EthAPIBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error) { + // Pending block is only known by the miner + if number == rpc.PendingBlockNumber { + block := b.eth.miner.PendingBlock() + return block, nil + } + // Otherwise resolve and return the block + if number == rpc.LatestBlockNumber { + return b.eth.blockchain.CurrentBlock(), nil + } + return b.eth.blockchain.GetBlockByNumber(uint64(number)), nil +} + +func (b *EthAPIBackend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { + return b.eth.blockchain.GetBlockByHash(hash), nil +} + +func (b *EthAPIBackend) StateAndHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*state.StateDB, *types.Header, error) { + // Pending state is only known by the miner + if number == rpc.PendingBlockNumber { + block, state := b.eth.miner.Pending() + return state, block.Header(), nil + } + // Otherwise resolve the block number and return its state + header, err := b.HeaderByNumber(ctx, number) + if err != nil { + return nil, nil, err + } + if header == nil { + return nil, nil, errors.New("header not found") + } + stateDb, err := b.eth.BlockChain().StateAt(header.Root) + return stateDb, header, err +} + +func (b *EthAPIBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) { + return b.eth.blockchain.GetReceiptsByHash(hash), nil +} + +func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) { + receipts := b.eth.blockchain.GetReceiptsByHash(hash) + if receipts == nil { + return nil, nil + } + logs := make([][]*types.Log, len(receipts)) + for i, receipt := range receipts { + logs[i] = receipt.Logs + } + return logs, nil +} + +func (b *EthAPIBackend) GetTd(blockHash common.Hash) *big.Int { + return b.eth.blockchain.GetTdByHash(blockHash) +} + +func (b *EthAPIBackend) GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header) (*vm.EVM, func() error, error) { + state.SetBalance(msg.From(), math.MaxBig256) + vmError := func() error { return nil } + + context := core.NewEVMContext(msg, header, b.eth.BlockChain(), nil) + return vm.NewEVM(context, state, b.eth.blockchain.Config(), *b.eth.blockchain.GetVMConfig()), vmError, nil +} + +func (b *EthAPIBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { + return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch) +} + +func (b *EthAPIBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { + return b.eth.BlockChain().SubscribeChainEvent(ch) +} + +func (b *EthAPIBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { + return b.eth.BlockChain().SubscribeChainHeadEvent(ch) +} + +func (b *EthAPIBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription { + return b.eth.BlockChain().SubscribeChainSideEvent(ch) +} + +func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { + return b.eth.BlockChain().SubscribeLogsEvent(ch) +} + +func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { + return b.eth.txPool.AddLocal(signedTx) +} + +func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { + pending, err := b.eth.txPool.Pending() + if err != nil { + return nil, err + } + var txs types.Transactions + for _, batch := range pending { + txs = append(txs, batch...) + } + return txs, nil +} + +func (b *EthAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction { + return b.eth.txPool.Get(hash) +} + +func (b *EthAPIBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) { + tx, blockHash, blockNumber, index := rawdb.ReadTransaction(b.eth.ChainDb(), txHash) + return tx, blockHash, blockNumber, index, nil +} + +func (b *EthAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) { + return b.eth.txPool.Nonce(addr), nil +} + +func (b *EthAPIBackend) Stats() (pending int, queued int) { + return b.eth.txPool.Stats() +} + +func (b *EthAPIBackend) TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) { + return b.eth.TxPool().Content() +} + +func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { + return b.eth.TxPool().SubscribeNewTxsEvent(ch) +} + +func (b *EthAPIBackend) Downloader() *downloader.Downloader { + return b.eth.Downloader() +} + +func (b *EthAPIBackend) ProtocolVersion() int { + return b.eth.EthVersion() +} + +func (b *EthAPIBackend) SuggestPrice(ctx context.Context) (*big.Int, error) { + return b.gpo.SuggestPrice(ctx) +} + +func (b *EthAPIBackend) ChainDb() ethdb.Database { + return b.eth.ChainDb() +} + +func (b *EthAPIBackend) EventMux() *event.TypeMux { + return b.eth.EventMux() +} + +func (b *EthAPIBackend) AccountManager() *accounts.Manager { + return b.eth.AccountManager() +} + +func (b *EthAPIBackend) ExtRPCEnabled() bool { + return b.extRPCEnabled +} + +func (b *EthAPIBackend) RPCGasCap() *big.Int { + return b.eth.config.RPCGasCap +} + +func (b *EthAPIBackend) BloomStatus() (uint64, uint64) { + sections, _, _ := b.eth.bloomIndexer.Sections() + return params.BloomBitsBlocks, sections +} + +func (b *EthAPIBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) { + for i := 0; i < bloomFilterThreads; i++ { + go session.Multiplex(bloomRetrievalBatch, bloomRetrievalWait, b.eth.bloomRequests) + } +} diff --git a/eth/api_tracer.go b/eth/api_tracer.go new file mode 100644 index 0000000..7d983fa --- /dev/null +++ b/eth/api_tracer.go @@ -0,0 +1,823 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package eth + +import ( + "bufio" + "bytes" + "context" + "errors" + "fmt" + "io/ioutil" + "os" + "runtime" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/eth/tracers" + "github.com/Determinant/coreth/internal/ethapi" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/trie" +) + +const ( + // defaultTraceTimeout is the amount of time a single transaction can execute + // by default before being forcefully aborted. + defaultTraceTimeout = 5 * time.Second + + // defaultTraceReexec is the number of blocks the tracer is willing to go back + // and reexecute to produce missing historical state necessary to run a specific + // trace. + defaultTraceReexec = uint64(128) +) + +// TraceConfig holds extra parameters to trace functions. +type TraceConfig struct { + *vm.LogConfig + Tracer *string + Timeout *string + Reexec *uint64 +} + +// StdTraceConfig holds extra parameters to standard-json trace functions. +type StdTraceConfig struct { + *vm.LogConfig + Reexec *uint64 + TxHash common.Hash +} + +// txTraceResult is the result of a single transaction trace. +type txTraceResult struct { + Result interface{} `json:"result,omitempty"` // Trace results produced by the tracer + Error string `json:"error,omitempty"` // Trace failure produced by the tracer +} + +// blockTraceTask represents a single block trace task when an entire chain is +// being traced. +type blockTraceTask struct { + statedb *state.StateDB // Intermediate state prepped for tracing + block *types.Block // Block to trace the transactions from + rootref common.Hash // Trie root reference held for this task + results []*txTraceResult // Trace results procudes by the task +} + +// blockTraceResult represets the results of tracing a single block when an entire +// chain is being traced. +type blockTraceResult struct { + Block hexutil.Uint64 `json:"block"` // Block number corresponding to this trace + Hash common.Hash `json:"hash"` // Block hash corresponding to this trace + Traces []*txTraceResult `json:"traces"` // Trace results produced by the task +} + +// txTraceTask represents a single transaction trace task when an entire block +// is being traced. +type txTraceTask struct { + statedb *state.StateDB // Intermediate state prepped for tracing + index int // Transaction offset in the block +} + +// TraceChain returns the structured logs created during the execution of EVM +// between two blocks (excluding start) and returns them as a JSON object. +func (api *PrivateDebugAPI) TraceChain(ctx context.Context, start, end rpc.BlockNumber, config *TraceConfig) (*rpc.Subscription, error) { + // Fetch the block interval that we want to trace + var from, to *types.Block + + switch start { + case rpc.PendingBlockNumber: + from = api.eth.miner.PendingBlock() + case rpc.LatestBlockNumber: + from = api.eth.blockchain.CurrentBlock() + default: + from = api.eth.blockchain.GetBlockByNumber(uint64(start)) + } + switch end { + case rpc.PendingBlockNumber: + to = api.eth.miner.PendingBlock() + case rpc.LatestBlockNumber: + to = api.eth.blockchain.CurrentBlock() + default: + to = api.eth.blockchain.GetBlockByNumber(uint64(end)) + } + // Trace the chain if we've found all our blocks + if from == nil { + return nil, fmt.Errorf("starting block #%d not found", start) + } + if to == nil { + return nil, fmt.Errorf("end block #%d not found", end) + } + if from.Number().Cmp(to.Number()) >= 0 { + return nil, fmt.Errorf("end block (#%d) needs to come after start block (#%d)", end, start) + } + return api.traceChain(ctx, from, to, config) +} + +// traceChain configures a new tracer according to the provided configuration, and +// executes all the transactions contained within. The return value will be one item +// per transaction, dependent on the requested tracer. +func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Block, config *TraceConfig) (*rpc.Subscription, error) { + // Tracing a chain is a **long** operation, only do with subscriptions + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + sub := notifier.CreateSubscription() + + // Ensure we have a valid starting state before doing any work + origin := start.NumberU64() + database := state.NewDatabaseWithCache(api.eth.ChainDb(), 16) // Chain tracing will probably start at genesis + + if number := start.NumberU64(); number > 0 { + start = api.eth.blockchain.GetBlock(start.ParentHash(), start.NumberU64()-1) + if start == nil { + return nil, fmt.Errorf("parent block #%d not found", number-1) + } + } + statedb, err := state.New(start.Root(), database) + if err != nil { + // If the starting state is missing, allow some number of blocks to be reexecuted + reexec := defaultTraceReexec + if config != nil && config.Reexec != nil { + reexec = *config.Reexec + } + // Find the most recent block that has the state available + for i := uint64(0); i < reexec; i++ { + start = api.eth.blockchain.GetBlock(start.ParentHash(), start.NumberU64()-1) + if start == nil { + break + } + if statedb, err = state.New(start.Root(), database); err == nil { + break + } + } + // If we still don't have the state available, bail out + if err != nil { + switch err.(type) { + case *trie.MissingNodeError: + return nil, errors.New("required historical state unavailable") + default: + return nil, err + } + } + } + // Execute all the transaction contained within the chain concurrently for each block + blocks := int(end.NumberU64() - origin) + + threads := runtime.NumCPU() + if threads > blocks { + threads = blocks + } + var ( + pend = new(sync.WaitGroup) + tasks = make(chan *blockTraceTask, threads) + results = make(chan *blockTraceTask, threads) + ) + for th := 0; th < threads; th++ { + pend.Add(1) + go func() { + defer pend.Done() + + // Fetch and execute the next block trace tasks + for task := range tasks { + signer := types.MakeSigner(api.eth.blockchain.Config(), task.block.Number()) + + // Trace all the transactions contained within + for i, tx := range task.block.Transactions() { + msg, _ := tx.AsMessage(signer) + vmctx := core.NewEVMContext(msg, task.block.Header(), api.eth.blockchain, nil) + + res, err := api.traceTx(ctx, msg, vmctx, task.statedb, config) + if err != nil { + task.results[i] = &txTraceResult{Error: err.Error()} + log.Warn("Tracing failed", "hash", tx.Hash(), "block", task.block.NumberU64(), "err", err) + break + } + // Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect + task.statedb.Finalise(api.eth.blockchain.Config().IsEIP158(task.block.Number())) + task.results[i] = &txTraceResult{Result: res} + } + // Stream the result back to the user or abort on teardown + select { + case results <- task: + case <-notifier.Closed(): + return + } + } + }() + } + // Start a goroutine to feed all the blocks into the tracers + begin := time.Now() + + go func() { + var ( + logged time.Time + number uint64 + traced uint64 + failed error + proot common.Hash + ) + // Ensure everything is properly cleaned up on any exit path + defer func() { + close(tasks) + pend.Wait() + + switch { + case failed != nil: + log.Warn("Chain tracing failed", "start", start.NumberU64(), "end", end.NumberU64(), "transactions", traced, "elapsed", time.Since(begin), "err", failed) + case number < end.NumberU64(): + log.Warn("Chain tracing aborted", "start", start.NumberU64(), "end", end.NumberU64(), "abort", number, "transactions", traced, "elapsed", time.Since(begin)) + default: + log.Info("Chain tracing finished", "start", start.NumberU64(), "end", end.NumberU64(), "transactions", traced, "elapsed", time.Since(begin)) + } + close(results) + }() + // Feed all the blocks both into the tracer, as well as fast process concurrently + for number = start.NumberU64() + 1; number <= end.NumberU64(); number++ { + // Stop tracing if interruption was requested + select { + case <-notifier.Closed(): + return + default: + } + // Print progress logs if long enough time elapsed + if time.Since(logged) > 8*time.Second { + if number > origin { + nodes, imgs := database.TrieDB().Size() + log.Info("Tracing chain segment", "start", origin, "end", end.NumberU64(), "current", number, "transactions", traced, "elapsed", time.Since(begin), "memory", nodes+imgs) + } else { + log.Info("Preparing state for chain trace", "block", number, "start", origin, "elapsed", time.Since(begin)) + } + logged = time.Now() + } + // Retrieve the next block to trace + block := api.eth.blockchain.GetBlockByNumber(number) + if block == nil { + failed = fmt.Errorf("block #%d not found", number) + break + } + // Send the block over to the concurrent tracers (if not in the fast-forward phase) + if number > origin { + txs := block.Transactions() + + select { + case tasks <- &blockTraceTask{statedb: statedb.Copy(), block: block, rootref: proot, results: make([]*txTraceResult, len(txs))}: + case <-notifier.Closed(): + return + } + traced += uint64(len(txs)) + } + // Generate the next state snapshot fast without tracing + _, _, _, err := api.eth.blockchain.Processor().Process(block, statedb, vm.Config{}) + if err != nil { + failed = err + break + } + // Finalize the state so any modifications are written to the trie + root, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number())) + if err != nil { + failed = err + break + } + if err := statedb.Reset(root); err != nil { + failed = err + break + } + // Reference the trie twice, once for us, once for the tracer + database.TrieDB().Reference(root, common.Hash{}) + if number >= origin { + database.TrieDB().Reference(root, common.Hash{}) + } + // Dereference all past tries we ourselves are done working with + if proot != (common.Hash{}) { + database.TrieDB().Dereference(proot) + } + proot = root + + // TODO(karalabe): Do we need the preimages? Won't they accumulate too much? + } + }() + + // Keep reading the trace results and stream the to the user + go func() { + var ( + done = make(map[uint64]*blockTraceResult) + next = origin + 1 + ) + for res := range results { + // Queue up next received result + result := &blockTraceResult{ + Block: hexutil.Uint64(res.block.NumberU64()), + Hash: res.block.Hash(), + Traces: res.results, + } + done[uint64(result.Block)] = result + + // Dereference any paret tries held in memory by this task + database.TrieDB().Dereference(res.rootref) + + // Stream completed traces to the user, aborting on the first error + for result, ok := done[next]; ok; result, ok = done[next] { + if len(result.Traces) > 0 || next == end.NumberU64() { + notifier.Notify(sub.ID, result) + } + delete(done, next) + next++ + } + } + }() + return sub, nil +} + +// TraceBlockByNumber returns the structured logs created during the execution of +// EVM and returns them as a JSON object. +func (api *PrivateDebugAPI) TraceBlockByNumber(ctx context.Context, number rpc.BlockNumber, config *TraceConfig) ([]*txTraceResult, error) { + // Fetch the block that we want to trace + var block *types.Block + + switch number { + case rpc.PendingBlockNumber: + block = api.eth.miner.PendingBlock() + case rpc.LatestBlockNumber: + block = api.eth.blockchain.CurrentBlock() + default: + block = api.eth.blockchain.GetBlockByNumber(uint64(number)) + } + // Trace the block if it was found + if block == nil { + return nil, fmt.Errorf("block #%d not found", number) + } + return api.traceBlock(ctx, block, config) +} + +// TraceBlockByHash returns the structured logs created during the execution of +// EVM and returns them as a JSON object. +func (api *PrivateDebugAPI) TraceBlockByHash(ctx context.Context, hash common.Hash, config *TraceConfig) ([]*txTraceResult, error) { + block := api.eth.blockchain.GetBlockByHash(hash) + if block == nil { + return nil, fmt.Errorf("block %#x not found", hash) + } + return api.traceBlock(ctx, block, config) +} + +// TraceBlock returns the structured logs created during the execution of EVM +// and returns them as a JSON object. +func (api *PrivateDebugAPI) TraceBlock(ctx context.Context, blob []byte, config *TraceConfig) ([]*txTraceResult, error) { + block := new(types.Block) + if err := rlp.Decode(bytes.NewReader(blob), block); err != nil { + return nil, fmt.Errorf("could not decode block: %v", err) + } + return api.traceBlock(ctx, block, config) +} + +// TraceBlockFromFile returns the structured logs created during the execution of +// EVM and returns them as a JSON object. +func (api *PrivateDebugAPI) TraceBlockFromFile(ctx context.Context, file string, config *TraceConfig) ([]*txTraceResult, error) { + blob, err := ioutil.ReadFile(file) + if err != nil { + return nil, fmt.Errorf("could not read file: %v", err) + } + return api.TraceBlock(ctx, blob, config) +} + +// TraceBadBlockByHash returns the structured logs created during the execution of +// EVM against a block pulled from the pool of bad ones and returns them as a JSON +// object. +func (api *PrivateDebugAPI) TraceBadBlock(ctx context.Context, hash common.Hash, config *TraceConfig) ([]*txTraceResult, error) { + blocks := api.eth.blockchain.BadBlocks() + for _, block := range blocks { + if block.Hash() == hash { + return api.traceBlock(ctx, block, config) + } + } + return nil, fmt.Errorf("bad block %#x not found", hash) +} + +// StandardTraceBlockToFile dumps the structured logs created during the +// execution of EVM to the local file system and returns a list of files +// to the caller. +func (api *PrivateDebugAPI) StandardTraceBlockToFile(ctx context.Context, hash common.Hash, config *StdTraceConfig) ([]string, error) { + block := api.eth.blockchain.GetBlockByHash(hash) + if block == nil { + return nil, fmt.Errorf("block %#x not found", hash) + } + return api.standardTraceBlockToFile(ctx, block, config) +} + +// StandardTraceBadBlockToFile dumps the structured logs created during the +// execution of EVM against a block pulled from the pool of bad ones to the +// local file system and returns a list of files to the caller. +func (api *PrivateDebugAPI) StandardTraceBadBlockToFile(ctx context.Context, hash common.Hash, config *StdTraceConfig) ([]string, error) { + blocks := api.eth.blockchain.BadBlocks() + for _, block := range blocks { + if block.Hash() == hash { + return api.standardTraceBlockToFile(ctx, block, config) + } + } + return nil, fmt.Errorf("bad block %#x not found", hash) +} + +// traceBlock configures a new tracer according to the provided configuration, and +// executes all the transactions contained within. The return value will be one item +// per transaction, dependent on the requestd tracer. +func (api *PrivateDebugAPI) traceBlock(ctx context.Context, block *types.Block, config *TraceConfig) ([]*txTraceResult, error) { + // Create the parent state database + if err := api.eth.engine.VerifyHeader(api.eth.blockchain, block.Header(), true); err != nil { + return nil, err + } + parent := api.eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1) + if parent == nil { + return nil, fmt.Errorf("parent %#x not found", block.ParentHash()) + } + reexec := defaultTraceReexec + if config != nil && config.Reexec != nil { + reexec = *config.Reexec + } + statedb, err := api.computeStateDB(parent, reexec) + if err != nil { + return nil, err + } + // Execute all the transaction contained within the block concurrently + var ( + signer = types.MakeSigner(api.eth.blockchain.Config(), block.Number()) + + txs = block.Transactions() + results = make([]*txTraceResult, len(txs)) + + pend = new(sync.WaitGroup) + jobs = make(chan *txTraceTask, len(txs)) + ) + threads := runtime.NumCPU() + if threads > len(txs) { + threads = len(txs) + } + for th := 0; th < threads; th++ { + pend.Add(1) + go func() { + defer pend.Done() + + // Fetch and execute the next transaction trace tasks + for task := range jobs { + msg, _ := txs[task.index].AsMessage(signer) + vmctx := core.NewEVMContext(msg, block.Header(), api.eth.blockchain, nil) + + res, err := api.traceTx(ctx, msg, vmctx, task.statedb, config) + if err != nil { + results[task.index] = &txTraceResult{Error: err.Error()} + continue + } + results[task.index] = &txTraceResult{Result: res} + } + }() + } + // Feed the transactions into the tracers and return + var failed error + for i, tx := range txs { + // Send the trace task over for execution + jobs <- &txTraceTask{statedb: statedb.Copy(), index: i} + + // Generate the next state snapshot fast without tracing + msg, _ := tx.AsMessage(signer) + vmctx := core.NewEVMContext(msg, block.Header(), api.eth.blockchain, nil) + + vmenv := vm.NewEVM(vmctx, statedb, api.eth.blockchain.Config(), vm.Config{}) + if _, _, _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.Gas())); err != nil { + failed = err + break + } + // Finalize the state so any modifications are written to the trie + // Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect + statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number())) + } + close(jobs) + pend.Wait() + + // If execution failed in between, abort + if failed != nil { + return nil, failed + } + return results, nil +} + +// standardTraceBlockToFile configures a new tracer which uses standard JSON output, +// and traces either a full block or an individual transaction. The return value will +// be one filename per transaction traced. +func (api *PrivateDebugAPI) standardTraceBlockToFile(ctx context.Context, block *types.Block, config *StdTraceConfig) ([]string, error) { + // If we're tracing a single transaction, make sure it's present + if config != nil && config.TxHash != (common.Hash{}) { + if !containsTx(block, config.TxHash) { + return nil, fmt.Errorf("transaction %#x not found in block", config.TxHash) + } + } + // Create the parent state database + if err := api.eth.engine.VerifyHeader(api.eth.blockchain, block.Header(), true); err != nil { + return nil, err + } + parent := api.eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1) + if parent == nil { + return nil, fmt.Errorf("parent %#x not found", block.ParentHash()) + } + reexec := defaultTraceReexec + if config != nil && config.Reexec != nil { + reexec = *config.Reexec + } + statedb, err := api.computeStateDB(parent, reexec) + if err != nil { + return nil, err + } + // Retrieve the tracing configurations, or use default values + var ( + logConfig vm.LogConfig + txHash common.Hash + ) + if config != nil { + if config.LogConfig != nil { + logConfig = *config.LogConfig + } + txHash = config.TxHash + } + logConfig.Debug = true + + // Execute transaction, either tracing all or just the requested one + var ( + signer = types.MakeSigner(api.eth.blockchain.Config(), block.Number()) + dumps []string + ) + for i, tx := range block.Transactions() { + // Prepare the trasaction for un-traced execution + var ( + msg, _ = tx.AsMessage(signer) + vmctx = core.NewEVMContext(msg, block.Header(), api.eth.blockchain, nil) + + vmConf vm.Config + dump *os.File + writer *bufio.Writer + err error + ) + // If the transaction needs tracing, swap out the configs + if tx.Hash() == txHash || txHash == (common.Hash{}) { + // Generate a unique temporary file to dump it into + prefix := fmt.Sprintf("block_%#x-%d-%#x-", block.Hash().Bytes()[:4], i, tx.Hash().Bytes()[:4]) + + dump, err = ioutil.TempFile(os.TempDir(), prefix) + if err != nil { + return nil, err + } + dumps = append(dumps, dump.Name()) + + // Swap out the noop logger to the standard tracer + writer = bufio.NewWriter(dump) + vmConf = vm.Config{ + Debug: true, + Tracer: vm.NewJSONLogger(&logConfig, writer), + EnablePreimageRecording: true, + } + } + // Execute the transaction and flush any traces to disk + vmenv := vm.NewEVM(vmctx, statedb, api.eth.blockchain.Config(), vmConf) + _, _, _, err = core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.Gas())) + if writer != nil { + writer.Flush() + } + if dump != nil { + dump.Close() + log.Info("Wrote standard trace", "file", dump.Name()) + } + if err != nil { + return dumps, err + } + // Finalize the state so any modifications are written to the trie + // Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect + statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number())) + + // If we've traced the transaction we were looking for, abort + if tx.Hash() == txHash { + break + } + } + return dumps, nil +} + +// containsTx reports whether the transaction with a certain hash +// is contained within the specified block. +func containsTx(block *types.Block, hash common.Hash) bool { + for _, tx := range block.Transactions() { + if tx.Hash() == hash { + return true + } + } + return false +} + +// computeStateDB retrieves the state database associated with a certain block. +// If no state is locally available for the given block, a number of blocks are +// attempted to be reexecuted to generate the desired state. +func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (*state.StateDB, error) { + // If we have the state fully available, use that + statedb, err := api.eth.blockchain.StateAt(block.Root()) + if err == nil { + return statedb, nil + } + // Otherwise try to reexec blocks until we find a state or reach our limit + origin := block.NumberU64() + database := state.NewDatabaseWithCache(api.eth.ChainDb(), 16) + + for i := uint64(0); i < reexec; i++ { + block = api.eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1) + if block == nil { + break + } + if statedb, err = state.New(block.Root(), database); err == nil { + break + } + } + if err != nil { + switch err.(type) { + case *trie.MissingNodeError: + return nil, fmt.Errorf("required historical state unavailable (reexec=%d)", reexec) + default: + return nil, err + } + } + // State was available at historical point, regenerate + var ( + start = time.Now() + logged time.Time + proot common.Hash + ) + for block.NumberU64() < origin { + // Print progress logs if long enough time elapsed + if time.Since(logged) > 8*time.Second { + log.Info("Regenerating historical state", "block", block.NumberU64()+1, "target", origin, "remaining", origin-block.NumberU64()-1, "elapsed", time.Since(start)) + logged = time.Now() + } + // Retrieve the next block to regenerate and process it + if block = api.eth.blockchain.GetBlockByNumber(block.NumberU64() + 1); block == nil { + return nil, fmt.Errorf("block #%d not found", block.NumberU64()+1) + } + _, _, _, err := api.eth.blockchain.Processor().Process(block, statedb, vm.Config{}) + if err != nil { + return nil, fmt.Errorf("processing block %d failed: %v", block.NumberU64(), err) + } + // Finalize the state so any modifications are written to the trie + root, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number())) + if err != nil { + return nil, err + } + if err := statedb.Reset(root); err != nil { + return nil, fmt.Errorf("state reset after block %d failed: %v", block.NumberU64(), err) + } + database.TrieDB().Reference(root, common.Hash{}) + if proot != (common.Hash{}) { + database.TrieDB().Dereference(proot) + } + proot = root + } + nodes, imgs := database.TrieDB().Size() + log.Info("Historical state regenerated", "block", block.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "preimages", imgs) + return statedb, nil +} + +// TraceTransaction returns the structured logs created during the execution of EVM +// and returns them as a JSON object. +func (api *PrivateDebugAPI) TraceTransaction(ctx context.Context, hash common.Hash, config *TraceConfig) (interface{}, error) { + // Retrieve the transaction and assemble its EVM context + tx, blockHash, _, index := rawdb.ReadTransaction(api.eth.ChainDb(), hash) + if tx == nil { + return nil, fmt.Errorf("transaction %#x not found", hash) + } + reexec := defaultTraceReexec + if config != nil && config.Reexec != nil { + reexec = *config.Reexec + } + msg, vmctx, statedb, err := api.computeTxEnv(blockHash, int(index), reexec) + if err != nil { + return nil, err + } + // Trace the transaction and return + return api.traceTx(ctx, msg, vmctx, statedb, config) +} + +// traceTx configures a new tracer according to the provided configuration, and +// executes the given message in the provided environment. The return value will +// be tracer dependent. +func (api *PrivateDebugAPI) traceTx(ctx context.Context, message core.Message, vmctx vm.Context, statedb *state.StateDB, config *TraceConfig) (interface{}, error) { + // Assemble the structured logger or the JavaScript tracer + var ( + tracer vm.Tracer + err error + ) + switch { + case config != nil && config.Tracer != nil: + // Define a meaningful timeout of a single transaction trace + timeout := defaultTraceTimeout + if config.Timeout != nil { + if timeout, err = time.ParseDuration(*config.Timeout); err != nil { + return nil, err + } + } + // Constuct the JavaScript tracer to execute with + if tracer, err = tracers.New(*config.Tracer); err != nil { + return nil, err + } + // Handle timeouts and RPC cancellations + deadlineCtx, cancel := context.WithTimeout(ctx, timeout) + go func() { + <-deadlineCtx.Done() + tracer.(*tracers.Tracer).Stop(errors.New("execution timeout")) + }() + defer cancel() + + case config == nil: + tracer = vm.NewStructLogger(nil) + + default: + tracer = vm.NewStructLogger(config.LogConfig) + } + // Run the transaction with tracing enabled. + vmenv := vm.NewEVM(vmctx, statedb, api.eth.blockchain.Config(), vm.Config{Debug: true, Tracer: tracer}) + + ret, gas, failed, err := core.ApplyMessage(vmenv, message, new(core.GasPool).AddGas(message.Gas())) + if err != nil { + return nil, fmt.Errorf("tracing failed: %v", err) + } + // Depending on the tracer type, format and return the output + switch tracer := tracer.(type) { + case *vm.StructLogger: + return ðapi.ExecutionResult{ + Gas: gas, + Failed: failed, + ReturnValue: fmt.Sprintf("%x", ret), + StructLogs: ethapi.FormatLogs(tracer.StructLogs()), + }, nil + + case *tracers.Tracer: + return tracer.GetResult() + + default: + panic(fmt.Sprintf("bad tracer type %T", tracer)) + } +} + +// computeTxEnv returns the execution environment of a certain transaction. +func (api *PrivateDebugAPI) computeTxEnv(blockHash common.Hash, txIndex int, reexec uint64) (core.Message, vm.Context, *state.StateDB, error) { + // Create the parent state database + block := api.eth.blockchain.GetBlockByHash(blockHash) + if block == nil { + return nil, vm.Context{}, nil, fmt.Errorf("block %#x not found", blockHash) + } + parent := api.eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1) + if parent == nil { + return nil, vm.Context{}, nil, fmt.Errorf("parent %#x not found", block.ParentHash()) + } + statedb, err := api.computeStateDB(parent, reexec) + if err != nil { + return nil, vm.Context{}, nil, err + } + + if txIndex == 0 && len(block.Transactions()) == 0 { + return nil, vm.Context{}, statedb, nil + } + + // Recompute transactions up to the target index. + signer := types.MakeSigner(api.eth.blockchain.Config(), block.Number()) + + for idx, tx := range block.Transactions() { + // Assemble the transaction call message and return if the requested offset + msg, _ := tx.AsMessage(signer) + context := core.NewEVMContext(msg, block.Header(), api.eth.blockchain, nil) + if idx == txIndex { + return msg, context, statedb, nil + } + // Not yet the searched for transaction, execute on top of the current state + vmenv := vm.NewEVM(context, statedb, api.eth.blockchain.Config(), vm.Config{}) + if _, _, _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas())); err != nil { + return nil, vm.Context{}, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) + } + // Ensure any modifications are committed to the state + // Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect + statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number())) + } + return nil, vm.Context{}, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, blockHash) +} diff --git a/eth/backend.go b/eth/backend.go new file mode 100644 index 0000000..92dfa28 --- /dev/null +++ b/eth/backend.go @@ -0,0 +1,566 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package eth implements the Ethereum protocol. +package eth + +import ( + "errors" + "fmt" + "math/big" + "runtime" + "sync" + "sync/atomic" + + "github.com/ethereum/go-ethereum/accounts" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/consensus/clique" + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/eth/filters" + "github.com/ethereum/go-ethereum/eth/gasprice" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/Determinant/coreth/internal/ethapi" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/miner" + "github.com/Determinant/coreth/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" +) + +type LesServer interface { + Start(srvr *p2p.Server) + Stop() + APIs() []rpc.API + Protocols() []p2p.Protocol + SetBloomBitsIndexer(bbIndexer *core.ChainIndexer) + SetContractBackend(bind.ContractBackend) +} + +// Ethereum implements the Ethereum full node service. +type Ethereum struct { + config *Config + + // Channel for shutting down the service + shutdownChan chan bool + + server *p2p.Server + + // Handlers + txPool *core.TxPool + blockchain *core.BlockChain + protocolManager *ProtocolManager + lesServer LesServer + + // DB interfaces + chainDb ethdb.Database // Block chain database + + eventMux *event.TypeMux + engine consensus.Engine + accountManager *accounts.Manager + + bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests + bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports + + APIBackend *EthAPIBackend + + miner *miner.Miner + gasPrice *big.Int + etherbase common.Address + + networkID uint64 + netRPCService *ethapi.PublicNetAPI + + lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase) +} + +func (s *Ethereum) AddLesServer(ls LesServer) { + s.lesServer = ls + ls.SetBloomBitsIndexer(s.bloomIndexer) +} + +// SetClient sets a rpc client which connecting to our local node. +func (s *Ethereum) SetContractBackend(backend bind.ContractBackend) { + // Pass the rpc client to les server if it is enabled. + if s.lesServer != nil { + s.lesServer.SetContractBackend(backend) + } +} + +// New creates a new Ethereum object (including the +// initialisation of the common Ethereum object) +func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { + // Ensure configuration values are compatible and sane + if config.SyncMode == downloader.LightSync { + return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum") + } + if !config.SyncMode.IsValid() { + return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode) + } + if config.Miner.GasPrice == nil || config.Miner.GasPrice.Cmp(common.Big0) <= 0 { + log.Warn("Sanitizing invalid miner gas price", "provided", config.Miner.GasPrice, "updated", DefaultConfig.Miner.GasPrice) + config.Miner.GasPrice = new(big.Int).Set(DefaultConfig.Miner.GasPrice) + } + if config.NoPruning && config.TrieDirtyCache > 0 { + config.TrieCleanCache += config.TrieDirtyCache + config.TrieDirtyCache = 0 + } + log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024) + + // Assemble the Ethereum object + chainDb, err := ctx.OpenDatabaseWithFreezer("chaindata", config.DatabaseCache, config.DatabaseHandles, config.DatabaseFreezer, "eth/db/chaindata/") + if err != nil { + return nil, err + } + chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis) + if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok { + return nil, genesisErr + } + log.Info("Initialised chain configuration", "config", chainConfig) + + eth := &Ethereum{ + config: config, + chainDb: chainDb, + eventMux: ctx.EventMux, + accountManager: ctx.AccountManager, + engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.Miner.Notify, config.Miner.Noverify, chainDb), + shutdownChan: make(chan bool), + networkID: config.NetworkId, + gasPrice: config.Miner.GasPrice, + etherbase: config.Miner.Etherbase, + bloomRequests: make(chan chan *bloombits.Retrieval), + bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms), + } + + bcVersion := rawdb.ReadDatabaseVersion(chainDb) + var dbVer = "<nil>" + if bcVersion != nil { + dbVer = fmt.Sprintf("%d", *bcVersion) + } + log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId, "dbversion", dbVer) + + if !config.SkipBcVersionCheck { + if bcVersion != nil && *bcVersion > core.BlockChainVersion { + return nil, fmt.Errorf("database version is v%d, Geth %s only supports v%d", *bcVersion, params.VersionWithMeta, core.BlockChainVersion) + } else if bcVersion == nil || *bcVersion < core.BlockChainVersion { + log.Warn("Upgrade blockchain database version", "from", dbVer, "to", core.BlockChainVersion) + rawdb.WriteDatabaseVersion(chainDb, core.BlockChainVersion) + } + } + var ( + vmConfig = vm.Config{ + EnablePreimageRecording: config.EnablePreimageRecording, + EWASMInterpreter: config.EWASMInterpreter, + EVMInterpreter: config.EVMInterpreter, + } + cacheConfig = &core.CacheConfig{ + TrieCleanLimit: config.TrieCleanCache, + TrieCleanNoPrefetch: config.NoPrefetch, + TrieDirtyLimit: config.TrieDirtyCache, + TrieDirtyDisabled: config.NoPruning, + TrieTimeLimit: config.TrieTimeout, + } + ) + eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve) + if err != nil { + return nil, err + } + // Rewind the chain in case of an incompatible config upgrade. + if compat, ok := genesisErr.(*params.ConfigCompatError); ok { + log.Warn("Rewinding chain to upgrade configuration", "err", compat) + eth.blockchain.SetHead(compat.RewindTo) + rawdb.WriteChainConfig(chainDb, genesisHash, chainConfig) + } + eth.bloomIndexer.Start(eth.blockchain) + + if config.TxPool.Journal != "" { + config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal) + } + eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain) + + // Permit the downloader to use the trie cache allowance during fast sync + cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + checkpoint := config.Checkpoint + if checkpoint == nil { + checkpoint = params.TrustedCheckpoints[genesisHash] + } + if eth.protocolManager, err = NewProtocolManager(chainConfig, checkpoint, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, cacheLimit, config.Whitelist); err != nil { + return nil, err + } + eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock) + eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData)) + + eth.APIBackend = &EthAPIBackend{ctx.ExtRPCEnabled(), eth, nil} + gpoParams := config.GPO + if gpoParams.Default == nil { + gpoParams.Default = config.Miner.GasPrice + } + eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams) + + return eth, nil +} + +func makeExtraData(extra []byte) []byte { + if len(extra) == 0 { + // create default extradata + extra, _ = rlp.EncodeToBytes([]interface{}{ + uint(params.VersionMajor<<16 | params.VersionMinor<<8 | params.VersionPatch), + "geth", + runtime.Version(), + runtime.GOOS, + }) + } + if uint64(len(extra)) > params.MaximumExtraDataSize { + log.Warn("Miner extra data exceed limit", "extra", hexutil.Bytes(extra), "limit", params.MaximumExtraDataSize) + extra = nil + } + return extra +} + +// CreateConsensusEngine creates the required type of consensus engine instance for an Ethereum service +func CreateConsensusEngine(ctx *node.ServiceContext, chainConfig *params.ChainConfig, config *ethash.Config, notify []string, noverify bool, db ethdb.Database) consensus.Engine { + // If proof-of-authority is requested, set it up + if chainConfig.Clique != nil { + return clique.New(chainConfig.Clique, db) + } + // Otherwise assume proof-of-work + switch config.PowMode { + case ethash.ModeFake: + log.Warn("Ethash used in fake mode") + return ethash.NewFaker() + case ethash.ModeTest: + log.Warn("Ethash used in test mode") + return ethash.NewTester(nil, noverify) + case ethash.ModeShared: + log.Warn("Ethash used in shared mode") + return ethash.NewShared() + default: + engine := ethash.New(ethash.Config{ + CacheDir: ctx.ResolvePath(config.CacheDir), + CachesInMem: config.CachesInMem, + CachesOnDisk: config.CachesOnDisk, + DatasetDir: config.DatasetDir, + DatasetsInMem: config.DatasetsInMem, + DatasetsOnDisk: config.DatasetsOnDisk, + }, notify, noverify) + engine.SetThreads(-1) // Disable CPU mining + return engine + } +} + +// APIs return the collection of RPC services the ethereum package offers. +// NOTE, some of these services probably need to be moved to somewhere else. +func (s *Ethereum) APIs() []rpc.API { + apis := ethapi.GetAPIs(s.APIBackend) + + // Append any APIs exposed explicitly by the les server + if s.lesServer != nil { + apis = append(apis, s.lesServer.APIs()...) + } + // Append any APIs exposed explicitly by the consensus engine + apis = append(apis, s.engine.APIs(s.BlockChain())...) + + // Append any APIs exposed explicitly by the les server + if s.lesServer != nil { + apis = append(apis, s.lesServer.APIs()...) + } + + // Append all the local APIs and return + return append(apis, []rpc.API{ + { + Namespace: "eth", + Version: "1.0", + Service: NewPublicEthereumAPI(s), + Public: true, + }, { + Namespace: "eth", + Version: "1.0", + Service: NewPublicMinerAPI(s), + Public: true, + }, { + Namespace: "eth", + Version: "1.0", + Service: downloader.NewPublicDownloaderAPI(s.protocolManager.downloader, s.eventMux), + Public: true, + }, { + Namespace: "miner", + Version: "1.0", + Service: NewPrivateMinerAPI(s), + Public: false, + }, { + Namespace: "eth", + Version: "1.0", + Service: filters.NewPublicFilterAPI(s.APIBackend, false), + Public: true, + }, { + Namespace: "admin", + Version: "1.0", + Service: NewPrivateAdminAPI(s), + }, { + Namespace: "debug", + Version: "1.0", + Service: NewPublicDebugAPI(s), + Public: true, + }, { + Namespace: "debug", + Version: "1.0", + Service: NewPrivateDebugAPI(s), + }, { + Namespace: "net", + Version: "1.0", + Service: s.netRPCService, + Public: true, + }, + }...) +} + +func (s *Ethereum) ResetWithGenesisBlock(gb *types.Block) { + s.blockchain.ResetWithGenesisBlock(gb) +} + +func (s *Ethereum) Etherbase() (eb common.Address, err error) { + s.lock.RLock() + etherbase := s.etherbase + s.lock.RUnlock() + + if etherbase != (common.Address{}) { + return etherbase, nil + } + if wallets := s.AccountManager().Wallets(); len(wallets) > 0 { + if accounts := wallets[0].Accounts(); len(accounts) > 0 { + etherbase := accounts[0].Address + + s.lock.Lock() + s.etherbase = etherbase + s.lock.Unlock() + + log.Info("Etherbase automatically configured", "address", etherbase) + return etherbase, nil + } + } + return common.Address{}, fmt.Errorf("etherbase must be explicitly specified") +} + +// isLocalBlock checks whether the specified block is mined +// by local miner accounts. +// +// We regard two types of accounts as local miner account: etherbase +// and accounts specified via `txpool.locals` flag. +func (s *Ethereum) isLocalBlock(block *types.Block) bool { + author, err := s.engine.Author(block.Header()) + if err != nil { + log.Warn("Failed to retrieve block author", "number", block.NumberU64(), "hash", block.Hash(), "err", err) + return false + } + // Check whether the given address is etherbase. + s.lock.RLock() + etherbase := s.etherbase + s.lock.RUnlock() + if author == etherbase { + return true + } + // Check whether the given address is specified by `txpool.local` + // CLI flag. + for _, account := range s.config.TxPool.Locals { + if account == author { + return true + } + } + return false +} + +// shouldPreserve checks whether we should preserve the given block +// during the chain reorg depending on whether the author of block +// is a local account. +func (s *Ethereum) shouldPreserve(block *types.Block) bool { + // The reason we need to disable the self-reorg preserving for clique + // is it can be probable to introduce a deadlock. + // + // e.g. If there are 7 available signers + // + // r1 A + // r2 B + // r3 C + // r4 D + // r5 A [X] F G + // r6 [X] + // + // In the round5, the inturn signer E is offline, so the worst case + // is A, F and G sign the block of round5 and reject the block of opponents + // and in the round6, the last available signer B is offline, the whole + // network is stuck. + if _, ok := s.engine.(*clique.Clique); ok { + return false + } + return s.isLocalBlock(block) +} + +// SetEtherbase sets the mining reward address. +func (s *Ethereum) SetEtherbase(etherbase common.Address) { + s.lock.Lock() + s.etherbase = etherbase + s.lock.Unlock() + + s.miner.SetEtherbase(etherbase) +} + +// StartMining starts the miner with the given number of CPU threads. If mining +// is already running, this method adjust the number of threads allowed to use +// and updates the minimum price required by the transaction pool. +func (s *Ethereum) StartMining(threads int) error { + // Update the thread count within the consensus engine + type threaded interface { + SetThreads(threads int) + } + if th, ok := s.engine.(threaded); ok { + log.Info("Updated mining threads", "threads", threads) + if threads == 0 { + threads = -1 // Disable the miner from within + } + th.SetThreads(threads) + } + // If the miner was not running, initialize it + if !s.IsMining() { + // Propagate the initial price point to the transaction pool + s.lock.RLock() + price := s.gasPrice + s.lock.RUnlock() + s.txPool.SetGasPrice(price) + + // Configure the local mining address + eb, err := s.Etherbase() + if err != nil { + log.Error("Cannot start mining without etherbase", "err", err) + return fmt.Errorf("etherbase missing: %v", err) + } + if clique, ok := s.engine.(*clique.Clique); ok { + wallet, err := s.accountManager.Find(accounts.Account{Address: eb}) + if wallet == nil || err != nil { + log.Error("Etherbase account unavailable locally", "err", err) + return fmt.Errorf("signer missing: %v", err) + } + clique.Authorize(eb, wallet.SignData) + } + // If mining is started, we can disable the transaction rejection mechanism + // introduced to speed sync times. + atomic.StoreUint32(&s.protocolManager.acceptTxs, 1) + + go s.miner.Start(eb) + } + return nil +} + +// StopMining terminates the miner, both at the consensus engine level as well as +// at the block creation level. +func (s *Ethereum) StopMining() { + // Update the thread count within the consensus engine + type threaded interface { + SetThreads(threads int) + } + if th, ok := s.engine.(threaded); ok { + th.SetThreads(-1) + } + // Stop the block creating itself + s.miner.Stop() +} + +func (s *Ethereum) IsMining() bool { return s.miner.Mining() } +func (s *Ethereum) Miner() *miner.Miner { return s.miner } + +func (s *Ethereum) AccountManager() *accounts.Manager { return s.accountManager } +func (s *Ethereum) BlockChain() *core.BlockChain { return s.blockchain } +func (s *Ethereum) TxPool() *core.TxPool { return s.txPool } +func (s *Ethereum) EventMux() *event.TypeMux { return s.eventMux } +func (s *Ethereum) Engine() consensus.Engine { return s.engine } +func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb } +func (s *Ethereum) IsListening() bool { return true } // Always listening +func (s *Ethereum) EthVersion() int { return int(ProtocolVersions[0]) } +func (s *Ethereum) NetVersion() uint64 { return s.networkID } +func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolManager.downloader } +func (s *Ethereum) Synced() bool { return atomic.LoadUint32(&s.protocolManager.acceptTxs) == 1 } +func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning } + +// Protocols implements node.Service, returning all the currently configured +// network protocols to start. +func (s *Ethereum) Protocols() []p2p.Protocol { + protos := make([]p2p.Protocol, len(ProtocolVersions)) + for i, vsn := range ProtocolVersions { + protos[i] = s.protocolManager.makeProtocol(vsn) + protos[i].Attributes = []enr.Entry{s.currentEthEntry()} + } + if s.lesServer != nil { + protos = append(protos, s.lesServer.Protocols()...) + } + return protos +} + +// Start implements node.Service, starting all internal goroutines needed by the +// Ethereum protocol implementation. +func (s *Ethereum) Start(srvr *p2p.Server) error { + s.startEthEntryUpdate(srvr.LocalNode()) + + // Start the bloom bits servicing goroutines + s.startBloomHandlers(params.BloomBitsBlocks) + + // Start the RPC service + s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion()) + + // Figure out a max peers count based on the server limits + maxPeers := srvr.MaxPeers + if s.config.LightServ > 0 { + if s.config.LightPeers >= srvr.MaxPeers { + return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers) + } + maxPeers -= s.config.LightPeers + } + // Start the networking layer and the light server if requested + s.protocolManager.Start(maxPeers) + if s.lesServer != nil { + s.lesServer.Start(srvr) + } + return nil +} + +// Stop implements node.Service, terminating all internal goroutines used by the +// Ethereum protocol. +func (s *Ethereum) Stop() error { + s.bloomIndexer.Close() + s.blockchain.Stop() + s.engine.Close() + s.protocolManager.Stop() + if s.lesServer != nil { + s.lesServer.Stop() + } + s.txPool.Stop() + s.miner.Stop() + s.eventMux.Stop() + + s.chainDb.Close() + close(s.shutdownChan) + return nil +} diff --git a/eth/bloombits.go b/eth/bloombits.go new file mode 100644 index 0000000..9a31997 --- /dev/null +++ b/eth/bloombits.go @@ -0,0 +1,138 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package eth + +import ( + "context" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/bitutil" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" +) + +const ( + // bloomServiceThreads is the number of goroutines used globally by an Ethereum + // instance to service bloombits lookups for all running filters. + bloomServiceThreads = 16 + + // bloomFilterThreads is the number of goroutines used locally per filter to + // multiplex requests onto the global servicing goroutines. + bloomFilterThreads = 3 + + // bloomRetrievalBatch is the maximum number of bloom bit retrievals to service + // in a single batch. + bloomRetrievalBatch = 16 + + // bloomRetrievalWait is the maximum time to wait for enough bloom bit requests + // to accumulate request an entire batch (avoiding hysteresis). + bloomRetrievalWait = time.Duration(0) +) + +// startBloomHandlers starts a batch of goroutines to accept bloom bit database +// retrievals from possibly a range of filters and serving the data to satisfy. +func (eth *Ethereum) startBloomHandlers(sectionSize uint64) { + for i := 0; i < bloomServiceThreads; i++ { + go func() { + for { + select { + case <-eth.shutdownChan: + return + + case request := <-eth.bloomRequests: + task := <-request + task.Bitsets = make([][]byte, len(task.Sections)) + for i, section := range task.Sections { + head := rawdb.ReadCanonicalHash(eth.chainDb, (section+1)*sectionSize-1) + if compVector, err := rawdb.ReadBloomBits(eth.chainDb, task.Bit, section, head); err == nil { + if blob, err := bitutil.DecompressBytes(compVector, int(sectionSize/8)); err == nil { + task.Bitsets[i] = blob + } else { + task.Error = err + } + } else { + task.Error = err + } + } + request <- task + } + } + }() + } +} + +const ( + // bloomThrottling is the time to wait between processing two consecutive index + // sections. It's useful during chain upgrades to prevent disk overload. + bloomThrottling = 100 * time.Millisecond +) + +// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index +// for the Ethereum header bloom filters, permitting blazing fast filtering. +type BloomIndexer struct { + size uint64 // section size to generate bloombits for + db ethdb.Database // database instance to write index data and metadata into + gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index + section uint64 // Section is the section number being processed currently + head common.Hash // Head is the hash of the last header processed +} + +// NewBloomIndexer returns a chain indexer that generates bloom bits data for the +// canonical chain for fast logs filtering. +func NewBloomIndexer(db ethdb.Database, size, confirms uint64) *core.ChainIndexer { + backend := &BloomIndexer{ + db: db, + size: size, + } + table := rawdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix)) + + return core.NewChainIndexer(db, table, backend, size, confirms, bloomThrottling, "bloombits") +} + +// Reset implements core.ChainIndexerBackend, starting a new bloombits index +// section. +func (b *BloomIndexer) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error { + gen, err := bloombits.NewGenerator(uint(b.size)) + b.gen, b.section, b.head = gen, section, common.Hash{} + return err +} + +// Process implements core.ChainIndexerBackend, adding a new header's bloom into +// the index. +func (b *BloomIndexer) Process(ctx context.Context, header *types.Header) error { + b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom) + b.head = header.Hash() + return nil +} + +// Commit implements core.ChainIndexerBackend, finalizing the bloom section and +// writing it out into the database. +func (b *BloomIndexer) Commit() error { + batch := b.db.NewBatch() + for i := 0; i < types.BloomBitLength; i++ { + bits, err := b.gen.Bitset(uint(i)) + if err != nil { + return err + } + rawdb.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits)) + } + return batch.Write() +} diff --git a/eth/config.go b/eth/config.go new file mode 100644 index 0000000..6887872 --- /dev/null +++ b/eth/config.go @@ -0,0 +1,157 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package eth + +import ( + "math/big" + "os" + "os/user" + "path/filepath" + "runtime" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/eth/gasprice" + "github.com/ethereum/go-ethereum/miner" + "github.com/ethereum/go-ethereum/params" +) + +// DefaultConfig contains default settings for use on the Ethereum main net. +var DefaultConfig = Config{ + SyncMode: downloader.FastSync, + Ethash: ethash.Config{ + CacheDir: "ethash", + CachesInMem: 2, + CachesOnDisk: 3, + DatasetsInMem: 1, + DatasetsOnDisk: 2, + }, + NetworkId: 1, + LightPeers: 100, + UltraLightFraction: 75, + DatabaseCache: 512, + TrieCleanCache: 256, + TrieDirtyCache: 256, + TrieTimeout: 60 * time.Minute, + Miner: miner.Config{ + GasFloor: 8000000, + GasCeil: 8000000, + GasPrice: big.NewInt(params.GWei), + Recommit: 3 * time.Second, + }, + TxPool: core.DefaultTxPoolConfig, + GPO: gasprice.Config{ + Blocks: 20, + Percentile: 60, + }, +} + +func init() { + home := os.Getenv("HOME") + if home == "" { + if user, err := user.Current(); err == nil { + home = user.HomeDir + } + } + if runtime.GOOS == "darwin" { + DefaultConfig.Ethash.DatasetDir = filepath.Join(home, "Library", "Ethash") + } else if runtime.GOOS == "windows" { + localappdata := os.Getenv("LOCALAPPDATA") + if localappdata != "" { + DefaultConfig.Ethash.DatasetDir = filepath.Join(localappdata, "Ethash") + } else { + DefaultConfig.Ethash.DatasetDir = filepath.Join(home, "AppData", "Local", "Ethash") + } + } else { + DefaultConfig.Ethash.DatasetDir = filepath.Join(home, ".ethash") + } +} + +//go:generate gencodec -type Config -formats toml -out gen_config.go + +type Config struct { + // The genesis block, which is inserted if the database is empty. + // If nil, the Ethereum main net block is used. + Genesis *core.Genesis `toml:",omitempty"` + + // Protocol options + NetworkId uint64 // Network ID to use for selecting peers to connect to + SyncMode downloader.SyncMode + + NoPruning bool // Whether to disable pruning and flush everything to disk + NoPrefetch bool // Whether to disable prefetching and only load state on demand + + // Whitelist of required block number -> hash values to accept + Whitelist map[uint64]common.Hash `toml:"-"` + + // Light client options + LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests + LightIngress int `toml:",omitempty"` // Incoming bandwidth limit for light servers + LightEgress int `toml:",omitempty"` // Outgoing bandwidth limit for light servers + LightPeers int `toml:",omitempty"` // Maximum number of LES client peers + + // Ultra Light client options + UltraLightServers []string `toml:",omitempty"` // List of trusted ultra light servers + UltraLightFraction int `toml:",omitempty"` // Percentage of trusted servers to accept an announcement + UltraLightOnlyAnnounce bool `toml:",omitempty"` // Whether to only announce headers, or also serve them + + // Database options + SkipBcVersionCheck bool `toml:"-"` + DatabaseHandles int `toml:"-"` + DatabaseCache int + DatabaseFreezer string + + TrieCleanCache int + TrieDirtyCache int + TrieTimeout time.Duration + + // Mining options + Miner miner.Config + + // Ethash options + Ethash ethash.Config + + // Transaction pool options + TxPool core.TxPoolConfig + + // Gas Price Oracle options + GPO gasprice.Config + + // Enables tracking of SHA3 preimages in the VM + EnablePreimageRecording bool + + // Miscellaneous options + DocRoot string `toml:"-"` + + // Type of the EWASM interpreter ("" for default) + EWASMInterpreter string + + // Type of the EVM interpreter ("" for default) + EVMInterpreter string + + // RPCGasCap is the global gas cap for eth-call variants. + RPCGasCap *big.Int `toml:",omitempty"` + + // Checkpoint is a hardcoded checkpoint which can be nil. + Checkpoint *params.TrustedCheckpoint `toml:",omitempty"` + + // CheckpointOracle is the configuration for checkpoint oracle. + CheckpointOracle *params.CheckpointOracleConfig `toml:",omitempty"` +} diff --git a/eth/enr_entry.go b/eth/enr_entry.go new file mode 100644 index 0000000..d9e7b95 --- /dev/null +++ b/eth/enr_entry.go @@ -0,0 +1,61 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package eth + +import ( + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/forkid" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/rlp" +) + +// ethEntry is the "eth" ENR entry which advertises eth protocol +// on the discovery network. +type ethEntry struct { + ForkID forkid.ID // Fork identifier per EIP-2124 + + // Ignore additional fields (for forward compatibility). + Rest []rlp.RawValue `rlp:"tail"` +} + +// ENRKey implements enr.Entry. +func (e ethEntry) ENRKey() string { + return "eth" +} + +func (eth *Ethereum) startEthEntryUpdate(ln *enode.LocalNode) { + var newHead = make(chan core.ChainHeadEvent, 10) + sub := eth.blockchain.SubscribeChainHeadEvent(newHead) + + go func() { + defer sub.Unsubscribe() + for { + select { + case <-newHead: + ln.Set(eth.currentEthEntry()) + case <-sub.Err(): + // Would be nice to sync with eth.Stop, but there is no + // good way to do that. + return + } + } + }() +} + +func (eth *Ethereum) currentEthEntry() *ethEntry { + return ðEntry{ForkID: forkid.NewID(eth.blockchain)} +} diff --git a/eth/gen_config.go b/eth/gen_config.go new file mode 100644 index 0000000..bc4b55b --- /dev/null +++ b/eth/gen_config.go @@ -0,0 +1,221 @@ +// Code generated by github.com/fjl/gencodec. DO NOT EDIT. + +package eth + +import ( + "math/big" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/eth/gasprice" + "github.com/ethereum/go-ethereum/miner" + "github.com/ethereum/go-ethereum/params" +) + +// MarshalTOML marshals as TOML. +func (c Config) MarshalTOML() (interface{}, error) { + type Config struct { + Genesis *core.Genesis `toml:",omitempty"` + NetworkId uint64 + SyncMode downloader.SyncMode + NoPruning bool + NoPrefetch bool + Whitelist map[uint64]common.Hash `toml:"-"` + LightServ int `toml:",omitempty"` + LightIngress int `toml:",omitempty"` + LightEgress int `toml:",omitempty"` + LightPeers int `toml:",omitempty"` + UltraLightServers []string `toml:",omitempty"` + UltraLightFraction int `toml:",omitempty"` + UltraLightOnlyAnnounce bool `toml:",omitempty"` + SkipBcVersionCheck bool `toml:"-"` + DatabaseHandles int `toml:"-"` + DatabaseCache int + DatabaseFreezer string + TrieCleanCache int + TrieDirtyCache int + TrieTimeout time.Duration + Miner miner.Config + Ethash ethash.Config + TxPool core.TxPoolConfig + GPO gasprice.Config + EnablePreimageRecording bool + DocRoot string `toml:"-"` + EWASMInterpreter string + EVMInterpreter string + RPCGasCap *big.Int `toml:",omitempty"` + Checkpoint *params.TrustedCheckpoint `toml:",omitempty"` + CheckpointOracle *params.CheckpointOracleConfig `toml:",omitempty"` + } + var enc Config + enc.Genesis = c.Genesis + enc.NetworkId = c.NetworkId + enc.SyncMode = c.SyncMode + enc.NoPruning = c.NoPruning + enc.NoPrefetch = c.NoPrefetch + enc.Whitelist = c.Whitelist + enc.LightServ = c.LightServ + enc.LightIngress = c.LightIngress + enc.LightEgress = c.LightEgress + enc.LightPeers = c.LightPeers + enc.UltraLightServers = c.UltraLightServers + enc.UltraLightFraction = c.UltraLightFraction + enc.UltraLightOnlyAnnounce = c.UltraLightOnlyAnnounce + enc.SkipBcVersionCheck = c.SkipBcVersionCheck + enc.DatabaseHandles = c.DatabaseHandles + enc.DatabaseCache = c.DatabaseCache + enc.DatabaseFreezer = c.DatabaseFreezer + enc.TrieCleanCache = c.TrieCleanCache + enc.TrieDirtyCache = c.TrieDirtyCache + enc.TrieTimeout = c.TrieTimeout + enc.Miner = c.Miner + enc.Ethash = c.Ethash + enc.TxPool = c.TxPool + enc.GPO = c.GPO + enc.EnablePreimageRecording = c.EnablePreimageRecording + enc.DocRoot = c.DocRoot + enc.EWASMInterpreter = c.EWASMInterpreter + enc.EVMInterpreter = c.EVMInterpreter + enc.RPCGasCap = c.RPCGasCap + enc.Checkpoint = c.Checkpoint + enc.CheckpointOracle = c.CheckpointOracle + return &enc, nil +} + +// UnmarshalTOML unmarshals from TOML. +func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { + type Config struct { + Genesis *core.Genesis `toml:",omitempty"` + NetworkId *uint64 + SyncMode *downloader.SyncMode + NoPruning *bool + NoPrefetch *bool + Whitelist map[uint64]common.Hash `toml:"-"` + LightServ *int `toml:",omitempty"` + LightIngress *int `toml:",omitempty"` + LightEgress *int `toml:",omitempty"` + LightPeers *int `toml:",omitempty"` + UltraLightServers []string `toml:",omitempty"` + UltraLightFraction *int `toml:",omitempty"` + UltraLightOnlyAnnounce *bool `toml:",omitempty"` + SkipBcVersionCheck *bool `toml:"-"` + DatabaseHandles *int `toml:"-"` + DatabaseCache *int + DatabaseFreezer *string + TrieCleanCache *int + TrieDirtyCache *int + TrieTimeout *time.Duration + Miner *miner.Config + Ethash *ethash.Config + TxPool *core.TxPoolConfig + GPO *gasprice.Config + EnablePreimageRecording *bool + DocRoot *string `toml:"-"` + EWASMInterpreter *string + EVMInterpreter *string + RPCGasCap *big.Int `toml:",omitempty"` + Checkpoint *params.TrustedCheckpoint `toml:",omitempty"` + CheckpointOracle *params.CheckpointOracleConfig `toml:",omitempty"` + } + var dec Config + if err := unmarshal(&dec); err != nil { + return err + } + if dec.Genesis != nil { + c.Genesis = dec.Genesis + } + if dec.NetworkId != nil { + c.NetworkId = *dec.NetworkId + } + if dec.SyncMode != nil { + c.SyncMode = *dec.SyncMode + } + if dec.NoPruning != nil { + c.NoPruning = *dec.NoPruning + } + if dec.NoPrefetch != nil { + c.NoPrefetch = *dec.NoPrefetch + } + if dec.Whitelist != nil { + c.Whitelist = dec.Whitelist + } + if dec.LightServ != nil { + c.LightServ = *dec.LightServ + } + if dec.LightIngress != nil { + c.LightIngress = *dec.LightIngress + } + if dec.LightEgress != nil { + c.LightEgress = *dec.LightEgress + } + if dec.LightPeers != nil { + c.LightPeers = *dec.LightPeers + } + if dec.UltraLightServers != nil { + c.UltraLightServers = dec.UltraLightServers + } + if dec.UltraLightFraction != nil { + c.UltraLightFraction = *dec.UltraLightFraction + } + if dec.UltraLightOnlyAnnounce != nil { + c.UltraLightOnlyAnnounce = *dec.UltraLightOnlyAnnounce + } + if dec.SkipBcVersionCheck != nil { + c.SkipBcVersionCheck = *dec.SkipBcVersionCheck + } + if dec.DatabaseHandles != nil { + c.DatabaseHandles = *dec.DatabaseHandles + } + if dec.DatabaseCache != nil { + c.DatabaseCache = *dec.DatabaseCache + } + if dec.DatabaseFreezer != nil { + c.DatabaseFreezer = *dec.DatabaseFreezer + } + if dec.TrieCleanCache != nil { + c.TrieCleanCache = *dec.TrieCleanCache + } + if dec.TrieDirtyCache != nil { + c.TrieDirtyCache = *dec.TrieDirtyCache + } + if dec.TrieTimeout != nil { + c.TrieTimeout = *dec.TrieTimeout + } + if dec.Miner != nil { + c.Miner = *dec.Miner + } + if dec.Ethash != nil { + c.Ethash = *dec.Ethash + } + if dec.TxPool != nil { + c.TxPool = *dec.TxPool + } + if dec.GPO != nil { + c.GPO = *dec.GPO + } + if dec.EnablePreimageRecording != nil { + c.EnablePreimageRecording = *dec.EnablePreimageRecording + } + if dec.DocRoot != nil { + c.DocRoot = *dec.DocRoot + } + if dec.EWASMInterpreter != nil { + c.EWASMInterpreter = *dec.EWASMInterpreter + } + if dec.EVMInterpreter != nil { + c.EVMInterpreter = *dec.EVMInterpreter + } + if dec.RPCGasCap != nil { + c.RPCGasCap = dec.RPCGasCap + } + if dec.Checkpoint != nil { + c.Checkpoint = dec.Checkpoint + } + if dec.CheckpointOracle != nil { + c.CheckpointOracle = dec.CheckpointOracle + } + return nil +} diff --git a/eth/handler.go b/eth/handler.go new file mode 100644 index 0000000..4ce2d1c --- /dev/null +++ b/eth/handler.go @@ -0,0 +1,844 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package eth + +import ( + "encoding/json" + "errors" + "fmt" + "math" + "math/big" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/eth/fetcher" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" +) + +const ( + softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data. + estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header + + // txChanSize is the size of channel listening to NewTxsEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 + + // minimim number of peers to broadcast new blocks to + minBroadcastPeers = 4 +) + +var ( + syncChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the sync progress challenge +) + +func errResp(code errCode, format string, v ...interface{}) error { + return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) +} + +type ProtocolManager struct { + networkID uint64 + + fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks) + acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing) + + checkpointNumber uint64 // Block number for the sync progress validator to cross reference + checkpointHash common.Hash // Block hash for the sync progress validator to cross reference + + txpool txPool + blockchain *core.BlockChain + maxPeers int + + downloader *downloader.Downloader + fetcher *fetcher.Fetcher + peers *peerSet + + eventMux *event.TypeMux + txsCh chan core.NewTxsEvent + txsSub event.Subscription + minedBlockSub *event.TypeMuxSubscription + + whitelist map[uint64]common.Hash + + // channels for fetcher, syncer, txsyncLoop + newPeerCh chan *peer + txsyncCh chan *txsync + quitSync chan struct{} + noMorePeers chan struct{} + + // wait group is used for graceful shutdowns during downloading + // and processing + wg sync.WaitGroup +} + +// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable +// with the Ethereum network. +func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) { + // Create the protocol manager with the base fields + manager := &ProtocolManager{ + networkID: networkID, + eventMux: mux, + txpool: txpool, + blockchain: blockchain, + peers: newPeerSet(), + whitelist: whitelist, + newPeerCh: make(chan *peer), + noMorePeers: make(chan struct{}), + txsyncCh: make(chan *txsync), + quitSync: make(chan struct{}), + } + if mode == downloader.FullSync { + // The database seems empty as the current block is the genesis. Yet the fast + // block is ahead, so fast sync was enabled for this node at a certain point. + // The scenarios where this can happen is + // * if the user manually (or via a bad block) rolled back a fast sync node + // below the sync point. + // * the last fast sync is not finished while user specifies a full sync this + // time. But we don't have any recent state for full sync. + // In these cases however it's safe to reenable fast sync. + fullBlock, fastBlock := blockchain.CurrentBlock(), blockchain.CurrentFastBlock() + if fullBlock.NumberU64() == 0 && fastBlock.NumberU64() > 0 { + manager.fastSync = uint32(1) + log.Warn("Switch sync mode from full sync to fast sync") + } + } else { + if blockchain.CurrentBlock().NumberU64() > 0 { + // Print warning log if database is not empty to run fast sync. + log.Warn("Switch sync mode from fast sync to full sync") + } else { + // If fast sync was requested and our database is empty, grant it + manager.fastSync = uint32(1) + } + } + // If we have trusted checkpoints, enforce them on the chain + if checkpoint != nil { + manager.checkpointNumber = (checkpoint.SectionIndex+1)*params.CHTFrequency - 1 + manager.checkpointHash = checkpoint.SectionHead + } + + // Construct the downloader (long sync) and its backing state bloom if fast + // sync is requested. The downloader is responsible for deallocating the state + // bloom when it's done. + var stateBloom *trie.SyncBloom + if atomic.LoadUint32(&manager.fastSync) == 1 { + stateBloom = trie.NewSyncBloom(uint64(cacheLimit), chaindb) + } + manager.downloader = downloader.New(manager.checkpointNumber, chaindb, stateBloom, manager.eventMux, blockchain, nil, manager.removePeer) + + // Construct the fetcher (short sync) + validator := func(header *types.Header) error { + return engine.VerifyHeader(blockchain, header, true) + } + heighter := func() uint64 { + return blockchain.CurrentBlock().NumberU64() + } + inserter := func(blocks types.Blocks) (int, error) { + // If sync hasn't reached the checkpoint yet, deny importing weird blocks. + // + // Ideally we would also compare the head block's timestamp and similarly reject + // the propagated block if the head is too old. Unfortunately there is a corner + // case when starting new networks, where the genesis might be ancient (0 unix) + // which would prevent full nodes from accepting it. + if manager.blockchain.CurrentBlock().NumberU64() < manager.checkpointNumber { + log.Warn("Unsynced yet, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash()) + return 0, nil + } + // If fast sync is running, deny importing weird blocks. This is a problematic + // clause when starting up a new network, because fast-syncing miners might not + // accept each others' blocks until a restart. Unfortunately we haven't figured + // out a way yet where nodes can decide unilaterally whether the network is new + // or not. This should be fixed if we figure out a solution. + if atomic.LoadUint32(&manager.fastSync) == 1 { + log.Warn("Fast syncing, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash()) + return 0, nil + } + n, err := manager.blockchain.InsertChain(blocks) + if err == nil { + atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import + } + return n, err + } + manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer) + + return manager, nil +} + +func (pm *ProtocolManager) makeProtocol(version uint) p2p.Protocol { + length, ok := protocolLengths[version] + if !ok { + panic("makeProtocol for unknown version") + } + + return p2p.Protocol{ + Name: protocolName, + Version: version, + Length: length, + Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { + peer := pm.newPeer(int(version), p, rw) + select { + case pm.newPeerCh <- peer: + pm.wg.Add(1) + defer pm.wg.Done() + return pm.handle(peer) + case <-pm.quitSync: + return p2p.DiscQuitting + } + }, + NodeInfo: func() interface{} { + return pm.NodeInfo() + }, + PeerInfo: func(id enode.ID) interface{} { + if p := pm.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil { + return p.Info() + } + return nil + }, + } +} + +func (pm *ProtocolManager) removePeer(id string) { + // Short circuit if the peer was already removed + peer := pm.peers.Peer(id) + if peer == nil { + return + } + log.Debug("Removing Ethereum peer", "peer", id) + + // Unregister the peer from the downloader and Ethereum peer set + pm.downloader.UnregisterPeer(id) + if err := pm.peers.Unregister(id); err != nil { + log.Error("Peer removal failed", "peer", id, "err", err) + } + // Hard disconnect at the networking layer + if peer != nil { + peer.Peer.Disconnect(p2p.DiscUselessPeer) + } +} + +func (pm *ProtocolManager) Start(maxPeers int) { + pm.maxPeers = maxPeers + + // broadcast transactions + pm.txsCh = make(chan core.NewTxsEvent, txChanSize) + pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh) + go pm.txBroadcastLoop() + + // broadcast mined blocks + pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) + go pm.minedBroadcastLoop() + + // start sync handlers + go pm.syncer() + go pm.txsyncLoop() +} + +func (pm *ProtocolManager) Stop() { + log.Info("Stopping Ethereum protocol") + + pm.txsSub.Unsubscribe() // quits txBroadcastLoop + pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop + + // Quit the sync loop. + // After this send has completed, no new peers will be accepted. + pm.noMorePeers <- struct{}{} + + // Quit fetcher, txsyncLoop. + close(pm.quitSync) + + // Disconnect existing sessions. + // This also closes the gate for any new registrations on the peer set. + // sessions which are already established but not added to pm.peers yet + // will exit when they try to register. + pm.peers.Close() + + // Wait for all peer handler goroutines and the loops to come down. + pm.wg.Wait() + + log.Info("Ethereum protocol stopped") +} + +func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { + return newPeer(pv, p, newMeteredMsgWriter(rw)) +} + +// handle is the callback invoked to manage the life cycle of an eth peer. When +// this function terminates, the peer is disconnected. +func (pm *ProtocolManager) handle(p *peer) error { + // Ignore maxPeers if this is a trusted peer + if pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted { + return p2p.DiscTooManyPeers + } + p.Log().Debug("Ethereum peer connected", "name", p.Name()) + + // Execute the Ethereum handshake + var ( + genesis = pm.blockchain.Genesis() + head = pm.blockchain.CurrentHeader() + hash = head.Hash() + number = head.Number.Uint64() + td = pm.blockchain.GetTd(hash, number) + ) + if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil { + p.Log().Debug("Ethereum handshake failed", "err", err) + return err + } + if rw, ok := p.rw.(*meteredMsgReadWriter); ok { + rw.Init(p.version) + } + // Register the peer locally + if err := pm.peers.Register(p); err != nil { + p.Log().Error("Ethereum peer registration failed", "err", err) + return err + } + defer pm.removePeer(p.id) + + // Register the peer in the downloader. If the downloader considers it banned, we disconnect + if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil { + return err + } + // Propagate existing transactions. new transactions appearing + // after this will be sent via broadcasts. + pm.syncTransactions(p) + + // If we have a trusted CHT, reject all peers below that (avoid fast sync eclipse) + if pm.checkpointHash != (common.Hash{}) { + // Request the peer's checkpoint header for chain height/weight validation + if err := p.RequestHeadersByNumber(pm.checkpointNumber, 1, 0, false); err != nil { + return err + } + // Start a timer to disconnect if the peer doesn't reply in time + p.syncDrop = time.AfterFunc(syncChallengeTimeout, func() { + p.Log().Warn("Checkpoint challenge timed out, dropping", "addr", p.RemoteAddr(), "type", p.Name()) + pm.removePeer(p.id) + }) + // Make sure it's cleaned up if the peer dies off + defer func() { + if p.syncDrop != nil { + p.syncDrop.Stop() + p.syncDrop = nil + } + }() + } + // If we have any explicit whitelist block hashes, request them + for number := range pm.whitelist { + if err := p.RequestHeadersByNumber(number, 1, 0, false); err != nil { + return err + } + } + // Handle incoming messages until the connection is torn down + for { + if err := pm.handleMsg(p); err != nil { + p.Log().Debug("Ethereum message handling failed", "err", err) + return err + } + } +} + +// handleMsg is invoked whenever an inbound message is received from a remote +// peer. The remote connection is torn down upon returning any error. +func (pm *ProtocolManager) handleMsg(p *peer) error { + // Read the next message from the remote peer, and ensure it's fully consumed + msg, err := p.rw.ReadMsg() + if err != nil { + return err + } + if msg.Size > protocolMaxMsgSize { + return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, protocolMaxMsgSize) + } + defer msg.Discard() + + // Handle the message depending on its contents + switch { + case msg.Code == StatusMsg: + // Status messages should never arrive after the handshake + return errResp(ErrExtraStatusMsg, "uncontrolled status message") + + // Block header query, collect the requested headers and reply + case msg.Code == GetBlockHeadersMsg: + // Decode the complex header query + var query getBlockHeadersData + if err := msg.Decode(&query); err != nil { + return errResp(ErrDecode, "%v: %v", msg, err) + } + hashMode := query.Origin.Hash != (common.Hash{}) + first := true + maxNonCanonical := uint64(100) + + // Gather headers until the fetch or network limits is reached + var ( + bytes common.StorageSize + headers []*types.Header + unknown bool + ) + for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch { + // Retrieve the next header satisfying the query + var origin *types.Header + if hashMode { + if first { + first = false + origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash) + if origin != nil { + query.Origin.Number = origin.Number.Uint64() + } + } else { + origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number) + } + } else { + origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number) + } + if origin == nil { + break + } + headers = append(headers, origin) + bytes += estHeaderRlpSize + + // Advance to the next header of the query + switch { + case hashMode && query.Reverse: + // Hash based traversal towards the genesis block + ancestor := query.Skip + 1 + if ancestor == 0 { + unknown = true + } else { + query.Origin.Hash, query.Origin.Number = pm.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical) + unknown = (query.Origin.Hash == common.Hash{}) + } + case hashMode && !query.Reverse: + // Hash based traversal towards the leaf block + var ( + current = origin.Number.Uint64() + next = current + query.Skip + 1 + ) + if next <= current { + infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ") + p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos) + unknown = true + } else { + if header := pm.blockchain.GetHeaderByNumber(next); header != nil { + nextHash := header.Hash() + expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical) + if expOldHash == query.Origin.Hash { + query.Origin.Hash, query.Origin.Number = nextHash, next + } else { + unknown = true + } + } else { + unknown = true + } + } + case query.Reverse: + // Number based traversal towards the genesis block + if query.Origin.Number >= query.Skip+1 { + query.Origin.Number -= query.Skip + 1 + } else { + unknown = true + } + + case !query.Reverse: + // Number based traversal towards the leaf block + query.Origin.Number += query.Skip + 1 + } + } + return p.SendBlockHeaders(headers) + + case msg.Code == BlockHeadersMsg: + // A batch of headers arrived to one of our previous requests + var headers []*types.Header + if err := msg.Decode(&headers); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // If no headers were received, but we're expencting a checkpoint header, consider it that + if len(headers) == 0 && p.syncDrop != nil { + // Stop the timer either way, decide later to drop or not + p.syncDrop.Stop() + p.syncDrop = nil + + // If we're doing a fast sync, we must enforce the checkpoint block to avoid + // eclipse attacks. Unsynced nodes are welcome to connect after we're done + // joining the network + if atomic.LoadUint32(&pm.fastSync) == 1 { + p.Log().Warn("Dropping unsynced node during fast sync", "addr", p.RemoteAddr(), "type", p.Name()) + return errors.New("unsynced node cannot serve fast sync") + } + } + // Filter out any explicitly requested headers, deliver the rest to the downloader + filter := len(headers) == 1 + if filter { + // If it's a potential sync progress check, validate the content and advertised chain weight + if p.syncDrop != nil && headers[0].Number.Uint64() == pm.checkpointNumber { + // Disable the sync drop timer + p.syncDrop.Stop() + p.syncDrop = nil + + // Validate the header and either drop the peer or continue + if headers[0].Hash() != pm.checkpointHash { + return errors.New("checkpoint hash mismatch") + } + return nil + } + // Otherwise if it's a whitelisted block, validate against the set + if want, ok := pm.whitelist[headers[0].Number.Uint64()]; ok { + if hash := headers[0].Hash(); want != hash { + p.Log().Info("Whitelist mismatch, dropping peer", "number", headers[0].Number.Uint64(), "hash", hash, "want", want) + return errors.New("whitelist block mismatch") + } + p.Log().Debug("Whitelist block verified", "number", headers[0].Number.Uint64(), "hash", want) + } + // Irrelevant of the fork checks, send the header to the fetcher just in case + headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now()) + } + if len(headers) > 0 || !filter { + err := pm.downloader.DeliverHeaders(p.id, headers) + if err != nil { + log.Debug("Failed to deliver headers", "err", err) + } + } + + case msg.Code == GetBlockBodiesMsg: + // Decode the retrieval message + msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + if _, err := msgStream.List(); err != nil { + return err + } + // Gather blocks until the fetch or network limits is reached + var ( + hash common.Hash + bytes int + bodies []rlp.RawValue + ) + for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch { + // Retrieve the hash of the next block + if err := msgStream.Decode(&hash); err == rlp.EOL { + break + } else if err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Retrieve the requested block body, stopping if enough was found + if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 { + bodies = append(bodies, data) + bytes += len(data) + } + } + return p.SendBlockBodiesRLP(bodies) + + case msg.Code == BlockBodiesMsg: + // A batch of block bodies arrived to one of our previous requests + var request blockBodiesData + if err := msg.Decode(&request); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Deliver them all to the downloader for queuing + transactions := make([][]*types.Transaction, len(request)) + uncles := make([][]*types.Header, len(request)) + + for i, body := range request { + transactions[i] = body.Transactions + uncles[i] = body.Uncles + } + // Filter out any explicitly requested bodies, deliver the rest to the downloader + filter := len(transactions) > 0 || len(uncles) > 0 + if filter { + transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now()) + } + if len(transactions) > 0 || len(uncles) > 0 || !filter { + err := pm.downloader.DeliverBodies(p.id, transactions, uncles) + if err != nil { + log.Debug("Failed to deliver bodies", "err", err) + } + } + + case p.version >= eth63 && msg.Code == GetNodeDataMsg: + // Decode the retrieval message + msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + if _, err := msgStream.List(); err != nil { + return err + } + // Gather state data until the fetch or network limits is reached + var ( + hash common.Hash + bytes int + data [][]byte + ) + for bytes < softResponseLimit && len(data) < downloader.MaxStateFetch { + // Retrieve the hash of the next state entry + if err := msgStream.Decode(&hash); err == rlp.EOL { + break + } else if err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Retrieve the requested state entry, stopping if enough was found + if entry, err := pm.blockchain.TrieNode(hash); err == nil { + data = append(data, entry) + bytes += len(entry) + } + } + return p.SendNodeData(data) + + case p.version >= eth63 && msg.Code == NodeDataMsg: + // A batch of node state data arrived to one of our previous requests + var data [][]byte + if err := msg.Decode(&data); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Deliver all to the downloader + if err := pm.downloader.DeliverNodeData(p.id, data); err != nil { + log.Debug("Failed to deliver node state data", "err", err) + } + + case p.version >= eth63 && msg.Code == GetReceiptsMsg: + // Decode the retrieval message + msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + if _, err := msgStream.List(); err != nil { + return err + } + // Gather state data until the fetch or network limits is reached + var ( + hash common.Hash + bytes int + receipts []rlp.RawValue + ) + for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptFetch { + // Retrieve the hash of the next block + if err := msgStream.Decode(&hash); err == rlp.EOL { + break + } else if err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Retrieve the requested block's receipts, skipping if unknown to us + results := pm.blockchain.GetReceiptsByHash(hash) + if results == nil { + if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash { + continue + } + } + // If known, encode and queue for response packet + if encoded, err := rlp.EncodeToBytes(results); err != nil { + log.Error("Failed to encode receipt", "err", err) + } else { + receipts = append(receipts, encoded) + bytes += len(encoded) + } + } + return p.SendReceiptsRLP(receipts) + + case p.version >= eth63 && msg.Code == ReceiptsMsg: + // A batch of receipts arrived to one of our previous requests + var receipts [][]*types.Receipt + if err := msg.Decode(&receipts); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Deliver all to the downloader + if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil { + log.Debug("Failed to deliver receipts", "err", err) + } + + case msg.Code == NewBlockHashesMsg: + var announces newBlockHashesData + if err := msg.Decode(&announces); err != nil { + return errResp(ErrDecode, "%v: %v", msg, err) + } + // Mark the hashes as present at the remote node + for _, block := range announces { + p.MarkBlock(block.Hash) + } + // Schedule all the unknown hashes for retrieval + unknown := make(newBlockHashesData, 0, len(announces)) + for _, block := range announces { + if !pm.blockchain.HasBlock(block.Hash, block.Number) { + unknown = append(unknown, block) + } + } + for _, block := range unknown { + pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies) + } + + case msg.Code == NewBlockMsg: + // Retrieve and decode the propagated block + var request newBlockData + if err := msg.Decode(&request); err != nil { + return errResp(ErrDecode, "%v: %v", msg, err) + } + if err := request.sanityCheck(); err != nil { + return err + } + request.Block.ReceivedAt = msg.ReceivedAt + request.Block.ReceivedFrom = p + + // Mark the peer as owning the block and schedule it for import + p.MarkBlock(request.Block.Hash()) + pm.fetcher.Enqueue(p.id, request.Block) + + // Assuming the block is importable by the peer, but possibly not yet done so, + // calculate the head hash and TD that the peer truly must have. + var ( + trueHead = request.Block.ParentHash() + trueTD = new(big.Int).Sub(request.TD, request.Block.Difficulty()) + ) + // Update the peer's total difficulty if better than the previous + if _, td := p.Head(); trueTD.Cmp(td) > 0 { + p.SetHead(trueHead, trueTD) + + // Schedule a sync if above ours. Note, this will not fire a sync for a gap of + // a single block (as the true TD is below the propagated block), however this + // scenario should easily be covered by the fetcher. + currentBlock := pm.blockchain.CurrentBlock() + if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 { + go pm.synchronise(p) + } + } + + case msg.Code == TxMsg: + // Transactions arrived, make sure we have a valid and fresh chain to handle them + if atomic.LoadUint32(&pm.acceptTxs) == 0 { + break + } + // Transactions can be processed, parse all of them and deliver to the pool + var txs []*types.Transaction + if err := msg.Decode(&txs); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + for i, tx := range txs { + // Validate and mark the remote transaction + if tx == nil { + return errResp(ErrDecode, "transaction %d is nil", i) + } + p.MarkTransaction(tx.Hash()) + } + pm.txpool.AddRemotes(txs) + + default: + return errResp(ErrInvalidMsgCode, "%v", msg.Code) + } + return nil +} + +// BroadcastBlock will either propagate a block to a subset of it's peers, or +// will only announce it's availability (depending what's requested). +func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { + hash := block.Hash() + peers := pm.peers.PeersWithoutBlock(hash) + + // If propagation is requested, send to a subset of the peer + if propagate { + // Calculate the TD of the block (it's not imported yet, so block.Td is not valid) + var td *big.Int + if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil { + td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1)) + } else { + log.Error("Propagating dangling block", "number", block.Number(), "hash", hash) + return + } + // Send the block to a subset of our peers + transferLen := int(math.Sqrt(float64(len(peers)))) + if transferLen < minBroadcastPeers { + transferLen = minBroadcastPeers + } + if transferLen > len(peers) { + transferLen = len(peers) + } + transfer := peers[:transferLen] + for _, peer := range transfer { + peer.AsyncSendNewBlock(block, td) + } + log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) + return + } + // Otherwise if the block is indeed in out own chain, announce it + if pm.blockchain.HasBlock(hash, block.NumberU64()) { + for _, peer := range peers { + peer.AsyncSendNewBlockHash(block) + } + log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) + } +} + +// BroadcastTxs will propagate a batch of transactions to all peers which are not known to +// already have the given transaction. +func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { + var txset = make(map[*peer]types.Transactions) + + // Broadcast transactions to a batch of peers not knowing about it + for _, tx := range txs { + peers := pm.peers.PeersWithoutTx(tx.Hash()) + for _, peer := range peers { + txset[peer] = append(txset[peer], tx) + } + log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers)) + } + // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] + for peer, txs := range txset { + peer.AsyncSendTransactions(txs) + } +} + +// Mined broadcast loop +func (pm *ProtocolManager) minedBroadcastLoop() { + // automatically stops if unsubscribe + for obj := range pm.minedBlockSub.Chan() { + if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { + pm.BroadcastBlock(ev.Block, true) // First propagate block to peers + pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest + } + } +} + +func (pm *ProtocolManager) txBroadcastLoop() { + for { + select { + case event := <-pm.txsCh: + pm.BroadcastTxs(event.Txs) + + // Err() channel will be closed when unsubscribing. + case <-pm.txsSub.Err(): + return + } + } +} + +// NodeInfo represents a short summary of the Ethereum sub-protocol metadata +// known about the host peer. +type NodeInfo struct { + Network uint64 `json:"network"` // Ethereum network ID (1=Frontier, 2=Morden, Ropsten=3, Rinkeby=4) + Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain + Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block + Config *params.ChainConfig `json:"config"` // Chain configuration for the fork rules + Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block +} + +// NodeInfo retrieves some protocol metadata about the running host node. +func (pm *ProtocolManager) NodeInfo() *NodeInfo { + currentBlock := pm.blockchain.CurrentBlock() + return &NodeInfo{ + Network: pm.networkID, + Difficulty: pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()), + Genesis: pm.blockchain.Genesis().Hash(), + Config: pm.blockchain.Config(), + Head: currentBlock.Hash(), + } +} diff --git a/eth/metrics.go b/eth/metrics.go new file mode 100644 index 0000000..0533a2a --- /dev/null +++ b/eth/metrics.go @@ -0,0 +1,139 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package eth + +import ( + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/p2p" +) + +var ( + propTxnInPacketsMeter = metrics.NewRegisteredMeter("eth/prop/txns/in/packets", nil) + propTxnInTrafficMeter = metrics.NewRegisteredMeter("eth/prop/txns/in/traffic", nil) + propTxnOutPacketsMeter = metrics.NewRegisteredMeter("eth/prop/txns/out/packets", nil) + propTxnOutTrafficMeter = metrics.NewRegisteredMeter("eth/prop/txns/out/traffic", nil) + propHashInPacketsMeter = metrics.NewRegisteredMeter("eth/prop/hashes/in/packets", nil) + propHashInTrafficMeter = metrics.NewRegisteredMeter("eth/prop/hashes/in/traffic", nil) + propHashOutPacketsMeter = metrics.NewRegisteredMeter("eth/prop/hashes/out/packets", nil) + propHashOutTrafficMeter = metrics.NewRegisteredMeter("eth/prop/hashes/out/traffic", nil) + propBlockInPacketsMeter = metrics.NewRegisteredMeter("eth/prop/blocks/in/packets", nil) + propBlockInTrafficMeter = metrics.NewRegisteredMeter("eth/prop/blocks/in/traffic", nil) + propBlockOutPacketsMeter = metrics.NewRegisteredMeter("eth/prop/blocks/out/packets", nil) + propBlockOutTrafficMeter = metrics.NewRegisteredMeter("eth/prop/blocks/out/traffic", nil) + reqHeaderInPacketsMeter = metrics.NewRegisteredMeter("eth/req/headers/in/packets", nil) + reqHeaderInTrafficMeter = metrics.NewRegisteredMeter("eth/req/headers/in/traffic", nil) + reqHeaderOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/headers/out/packets", nil) + reqHeaderOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/headers/out/traffic", nil) + reqBodyInPacketsMeter = metrics.NewRegisteredMeter("eth/req/bodies/in/packets", nil) + reqBodyInTrafficMeter = metrics.NewRegisteredMeter("eth/req/bodies/in/traffic", nil) + reqBodyOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/bodies/out/packets", nil) + reqBodyOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/bodies/out/traffic", nil) + reqStateInPacketsMeter = metrics.NewRegisteredMeter("eth/req/states/in/packets", nil) + reqStateInTrafficMeter = metrics.NewRegisteredMeter("eth/req/states/in/traffic", nil) + reqStateOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/states/out/packets", nil) + reqStateOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/states/out/traffic", nil) + reqReceiptInPacketsMeter = metrics.NewRegisteredMeter("eth/req/receipts/in/packets", nil) + reqReceiptInTrafficMeter = metrics.NewRegisteredMeter("eth/req/receipts/in/traffic", nil) + reqReceiptOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/receipts/out/packets", nil) + reqReceiptOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/receipts/out/traffic", nil) + miscInPacketsMeter = metrics.NewRegisteredMeter("eth/misc/in/packets", nil) + miscInTrafficMeter = metrics.NewRegisteredMeter("eth/misc/in/traffic", nil) + miscOutPacketsMeter = metrics.NewRegisteredMeter("eth/misc/out/packets", nil) + miscOutTrafficMeter = metrics.NewRegisteredMeter("eth/misc/out/traffic", nil) +) + +// meteredMsgReadWriter is a wrapper around a p2p.MsgReadWriter, capable of +// accumulating the above defined metrics based on the data stream contents. +type meteredMsgReadWriter struct { + p2p.MsgReadWriter // Wrapped message stream to meter + version int // Protocol version to select correct meters +} + +// newMeteredMsgWriter wraps a p2p MsgReadWriter with metering support. If the +// metrics system is disabled, this function returns the original object. +func newMeteredMsgWriter(rw p2p.MsgReadWriter) p2p.MsgReadWriter { + if !metrics.Enabled { + return rw + } + return &meteredMsgReadWriter{MsgReadWriter: rw} +} + +// Init sets the protocol version used by the stream to know which meters to +// increment in case of overlapping message ids between protocol versions. +func (rw *meteredMsgReadWriter) Init(version int) { + rw.version = version +} + +func (rw *meteredMsgReadWriter) ReadMsg() (p2p.Msg, error) { + // Read the message and short circuit in case of an error + msg, err := rw.MsgReadWriter.ReadMsg() + if err != nil { + return msg, err + } + // Account for the data traffic + packets, traffic := miscInPacketsMeter, miscInTrafficMeter + switch { + case msg.Code == BlockHeadersMsg: + packets, traffic = reqHeaderInPacketsMeter, reqHeaderInTrafficMeter + case msg.Code == BlockBodiesMsg: + packets, traffic = reqBodyInPacketsMeter, reqBodyInTrafficMeter + + case rw.version >= eth63 && msg.Code == NodeDataMsg: + packets, traffic = reqStateInPacketsMeter, reqStateInTrafficMeter + case rw.version >= eth63 && msg.Code == ReceiptsMsg: + packets, traffic = reqReceiptInPacketsMeter, reqReceiptInTrafficMeter + + case msg.Code == NewBlockHashesMsg: + packets, traffic = propHashInPacketsMeter, propHashInTrafficMeter + case msg.Code == NewBlockMsg: + packets, traffic = propBlockInPacketsMeter, propBlockInTrafficMeter + case msg.Code == TxMsg: + packets, traffic = propTxnInPacketsMeter, propTxnInTrafficMeter + } + packets.Mark(1) + traffic.Mark(int64(msg.Size)) + + return msg, err +} + +func (rw *meteredMsgReadWriter) WriteMsg(msg p2p.Msg) error { + // Account for the data traffic + packets, traffic := miscOutPacketsMeter, miscOutTrafficMeter + switch { + case msg.Code == BlockHeadersMsg: + packets, traffic = reqHeaderOutPacketsMeter, reqHeaderOutTrafficMeter + case msg.Code == BlockBodiesMsg: + packets, traffic = reqBodyOutPacketsMeter, reqBodyOutTrafficMeter + + case rw.version >= eth63 && msg.Code == NodeDataMsg: + packets, traffic = reqStateOutPacketsMeter, reqStateOutTrafficMeter + case rw.version >= eth63 && msg.Code == ReceiptsMsg: + packets, traffic = reqReceiptOutPacketsMeter, reqReceiptOutTrafficMeter + + case msg.Code == NewBlockHashesMsg: + packets, traffic = propHashOutPacketsMeter, propHashOutTrafficMeter + case msg.Code == NewBlockMsg: + packets, traffic = propBlockOutPacketsMeter, propBlockOutTrafficMeter + case msg.Code == TxMsg: + packets, traffic = propTxnOutPacketsMeter, propTxnOutTrafficMeter + } + packets.Mark(1) + traffic.Mark(int64(msg.Size)) + + // Send the packet to the p2p layer + return rw.MsgReadWriter.WriteMsg(msg) +} diff --git a/eth/peer.go b/eth/peer.go new file mode 100644 index 0000000..814c787 --- /dev/null +++ b/eth/peer.go @@ -0,0 +1,546 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package eth + +import ( + "errors" + "fmt" + "math/big" + "sync" + "time" + + mapset "github.com/deckarep/golang-set" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rlp" +) + +var ( + errClosed = errors.New("peer set is closed") + errAlreadyRegistered = errors.New("peer is already registered") + errNotRegistered = errors.New("peer is not registered") +) + +const ( + maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) + maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) + + // maxQueuedTxs is the maximum number of transaction lists to queue up before + // dropping broadcasts. This is a sensitive number as a transaction list might + // contain a single transaction, or thousands. + maxQueuedTxs = 128 + + // maxQueuedProps is the maximum number of block propagations to queue up before + // dropping broadcasts. There's not much point in queueing stale blocks, so a few + // that might cover uncles should be enough. + maxQueuedProps = 4 + + // maxQueuedAnns is the maximum number of block announcements to queue up before + // dropping broadcasts. Similarly to block propagations, there's no point to queue + // above some healthy uncle limit, so use that. + maxQueuedAnns = 4 + + handshakeTimeout = 5 * time.Second +) + +// PeerInfo represents a short summary of the Ethereum sub-protocol metadata known +// about a connected peer. +type PeerInfo struct { + Version int `json:"version"` // Ethereum protocol version negotiated + Difficulty *big.Int `json:"difficulty"` // Total difficulty of the peer's blockchain + Head string `json:"head"` // SHA3 hash of the peer's best owned block +} + +// propEvent is a block propagation, waiting for its turn in the broadcast queue. +type propEvent struct { + block *types.Block + td *big.Int +} + +type peer struct { + id string + + *p2p.Peer + rw p2p.MsgReadWriter + + version int // Protocol version negotiated + syncDrop *time.Timer // Timed connection dropper if sync progress isn't validated in time + + head common.Hash + td *big.Int + lock sync.RWMutex + + knownTxs mapset.Set // Set of transaction hashes known to be known by this peer + knownBlocks mapset.Set // Set of block hashes known to be known by this peer + queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer + queuedProps chan *propEvent // Queue of blocks to broadcast to the peer + queuedAnns chan *types.Block // Queue of blocks to announce to the peer + term chan struct{} // Termination channel to stop the broadcaster +} + +func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { + return &peer{ + Peer: p, + rw: rw, + version: version, + id: fmt.Sprintf("%x", p.ID().Bytes()[:8]), + knownTxs: mapset.NewSet(), + knownBlocks: mapset.NewSet(), + queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), + queuedProps: make(chan *propEvent, maxQueuedProps), + queuedAnns: make(chan *types.Block, maxQueuedAnns), + term: make(chan struct{}), + } +} + +// broadcast is a write loop that multiplexes block propagations, announcements +// and transaction broadcasts into the remote peer. The goal is to have an async +// writer that does not lock up node internals. +func (p *peer) broadcast() { + for { + select { + case txs := <-p.queuedTxs: + if err := p.SendTransactions(txs); err != nil { + return + } + p.Log().Trace("Broadcast transactions", "count", len(txs)) + + case prop := <-p.queuedProps: + if err := p.SendNewBlock(prop.block, prop.td); err != nil { + return + } + p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td) + + case block := <-p.queuedAnns: + if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil { + return + } + p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash()) + + case <-p.term: + return + } + } +} + +// close signals the broadcast goroutine to terminate. +func (p *peer) close() { + close(p.term) +} + +// Info gathers and returns a collection of metadata known about a peer. +func (p *peer) Info() *PeerInfo { + hash, td := p.Head() + + return &PeerInfo{ + Version: p.version, + Difficulty: td, + Head: hash.Hex(), + } +} + +// Head retrieves a copy of the current head hash and total difficulty of the +// peer. +func (p *peer) Head() (hash common.Hash, td *big.Int) { + p.lock.RLock() + defer p.lock.RUnlock() + + copy(hash[:], p.head[:]) + return hash, new(big.Int).Set(p.td) +} + +// SetHead updates the head hash and total difficulty of the peer. +func (p *peer) SetHead(hash common.Hash, td *big.Int) { + p.lock.Lock() + defer p.lock.Unlock() + + copy(p.head[:], hash[:]) + p.td.Set(td) +} + +// MarkBlock marks a block as known for the peer, ensuring that the block will +// never be propagated to this particular peer. +func (p *peer) MarkBlock(hash common.Hash) { + // If we reached the memory allowance, drop a previously known block hash + for p.knownBlocks.Cardinality() >= maxKnownBlocks { + p.knownBlocks.Pop() + } + p.knownBlocks.Add(hash) +} + +// MarkTransaction marks a transaction as known for the peer, ensuring that it +// will never be propagated to this particular peer. +func (p *peer) MarkTransaction(hash common.Hash) { + // If we reached the memory allowance, drop a previously known transaction hash + for p.knownTxs.Cardinality() >= maxKnownTxs { + p.knownTxs.Pop() + } + p.knownTxs.Add(hash) +} + +// SendTransactions sends transactions to the peer and includes the hashes +// in its transaction hash set for future reference. +func (p *peer) SendTransactions(txs types.Transactions) error { + // Mark all the transactions as known, but ensure we don't overflow our limits + for _, tx := range txs { + p.knownTxs.Add(tx.Hash()) + } + for p.knownTxs.Cardinality() >= maxKnownTxs { + p.knownTxs.Pop() + } + return p2p.Send(p.rw, TxMsg, txs) +} + +// AsyncSendTransactions queues list of transactions propagation to a remote +// peer. If the peer's broadcast queue is full, the event is silently dropped. +func (p *peer) AsyncSendTransactions(txs []*types.Transaction) { + select { + case p.queuedTxs <- txs: + // Mark all the transactions as known, but ensure we don't overflow our limits + for _, tx := range txs { + p.knownTxs.Add(tx.Hash()) + } + for p.knownTxs.Cardinality() >= maxKnownTxs { + p.knownTxs.Pop() + } + default: + p.Log().Debug("Dropping transaction propagation", "count", len(txs)) + } +} + +// SendNewBlockHashes announces the availability of a number of blocks through +// a hash notification. +func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error { + // Mark all the block hashes as known, but ensure we don't overflow our limits + for _, hash := range hashes { + p.knownBlocks.Add(hash) + } + for p.knownBlocks.Cardinality() >= maxKnownBlocks { + p.knownBlocks.Pop() + } + request := make(newBlockHashesData, len(hashes)) + for i := 0; i < len(hashes); i++ { + request[i].Hash = hashes[i] + request[i].Number = numbers[i] + } + return p2p.Send(p.rw, NewBlockHashesMsg, request) +} + +// AsyncSendNewBlockHash queues the availability of a block for propagation to a +// remote peer. If the peer's broadcast queue is full, the event is silently +// dropped. +func (p *peer) AsyncSendNewBlockHash(block *types.Block) { + select { + case p.queuedAnns <- block: + // Mark all the block hash as known, but ensure we don't overflow our limits + p.knownBlocks.Add(block.Hash()) + for p.knownBlocks.Cardinality() >= maxKnownBlocks { + p.knownBlocks.Pop() + } + default: + p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash()) + } +} + +// SendNewBlock propagates an entire block to a remote peer. +func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error { + // Mark all the block hash as known, but ensure we don't overflow our limits + p.knownBlocks.Add(block.Hash()) + for p.knownBlocks.Cardinality() >= maxKnownBlocks { + p.knownBlocks.Pop() + } + return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td}) +} + +// AsyncSendNewBlock queues an entire block for propagation to a remote peer. If +// the peer's broadcast queue is full, the event is silently dropped. +func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { + select { + case p.queuedProps <- &propEvent{block: block, td: td}: + // Mark all the block hash as known, but ensure we don't overflow our limits + p.knownBlocks.Add(block.Hash()) + for p.knownBlocks.Cardinality() >= maxKnownBlocks { + p.knownBlocks.Pop() + } + default: + p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash()) + } +} + +// SendBlockHeaders sends a batch of block headers to the remote peer. +func (p *peer) SendBlockHeaders(headers []*types.Header) error { + return p2p.Send(p.rw, BlockHeadersMsg, headers) +} + +// SendBlockBodies sends a batch of block contents to the remote peer. +func (p *peer) SendBlockBodies(bodies []*blockBody) error { + return p2p.Send(p.rw, BlockBodiesMsg, blockBodiesData(bodies)) +} + +// SendBlockBodiesRLP sends a batch of block contents to the remote peer from +// an already RLP encoded format. +func (p *peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error { + return p2p.Send(p.rw, BlockBodiesMsg, bodies) +} + +// SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the +// hashes requested. +func (p *peer) SendNodeData(data [][]byte) error { + return p2p.Send(p.rw, NodeDataMsg, data) +} + +// SendReceiptsRLP sends a batch of transaction receipts, corresponding to the +// ones requested from an already RLP encoded format. +func (p *peer) SendReceiptsRLP(receipts []rlp.RawValue) error { + return p2p.Send(p.rw, ReceiptsMsg, receipts) +} + +// RequestOneHeader is a wrapper around the header query functions to fetch a +// single header. It is used solely by the fetcher. +func (p *peer) RequestOneHeader(hash common.Hash) error { + p.Log().Debug("Fetching single header", "hash", hash) + return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false}) +} + +// RequestHeadersByHash fetches a batch of blocks' headers corresponding to the +// specified header query, based on the hash of an origin block. +func (p *peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error { + p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse) + return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) +} + +// RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the +// specified header query, based on the number of an origin block. +func (p *peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error { + p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse) + return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) +} + +// RequestBodies fetches a batch of blocks' bodies corresponding to the hashes +// specified. +func (p *peer) RequestBodies(hashes []common.Hash) error { + p.Log().Debug("Fetching batch of block bodies", "count", len(hashes)) + return p2p.Send(p.rw, GetBlockBodiesMsg, hashes) +} + +// RequestNodeData fetches a batch of arbitrary data from a node's known state +// data, corresponding to the specified hashes. +func (p *peer) RequestNodeData(hashes []common.Hash) error { + p.Log().Debug("Fetching batch of state data", "count", len(hashes)) + return p2p.Send(p.rw, GetNodeDataMsg, hashes) +} + +// RequestReceipts fetches a batch of transaction receipts from a remote node. +func (p *peer) RequestReceipts(hashes []common.Hash) error { + p.Log().Debug("Fetching batch of receipts", "count", len(hashes)) + return p2p.Send(p.rw, GetReceiptsMsg, hashes) +} + +// Handshake executes the eth protocol handshake, negotiating version number, +// network IDs, difficulties, head and genesis blocks. +func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash) error { + // Send out own handshake in a new thread + errc := make(chan error, 2) + var status statusData // safe to read after two values have been received from errc + + go func() { + errc <- p2p.Send(p.rw, StatusMsg, &statusData{ + ProtocolVersion: uint32(p.version), + NetworkId: network, + TD: td, + CurrentBlock: head, + GenesisBlock: genesis, + }) + }() + go func() { + errc <- p.readStatus(network, &status, genesis) + }() + timeout := time.NewTimer(handshakeTimeout) + defer timeout.Stop() + for i := 0; i < 2; i++ { + select { + case err := <-errc: + if err != nil { + return err + } + case <-timeout.C: + return p2p.DiscReadTimeout + } + } + p.td, p.head = status.TD, status.CurrentBlock + return nil +} + +func (p *peer) readStatus(network uint64, status *statusData, genesis common.Hash) (err error) { + msg, err := p.rw.ReadMsg() + if err != nil { + return err + } + if msg.Code != StatusMsg { + return errResp(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg) + } + if msg.Size > protocolMaxMsgSize { + return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, protocolMaxMsgSize) + } + // Decode the handshake and make sure everything matches + if err := msg.Decode(&status); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + if status.GenesisBlock != genesis { + return errResp(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock[:8], genesis[:8]) + } + if status.NetworkId != network { + return errResp(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, network) + } + if int(status.ProtocolVersion) != p.version { + return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, p.version) + } + return nil +} + +// String implements fmt.Stringer. +func (p *peer) String() string { + return fmt.Sprintf("Peer %s [%s]", p.id, + fmt.Sprintf("eth/%2d", p.version), + ) +} + +// peerSet represents the collection of active peers currently participating in +// the Ethereum sub-protocol. +type peerSet struct { + peers map[string]*peer + lock sync.RWMutex + closed bool +} + +// newPeerSet creates a new peer set to track the active participants. +func newPeerSet() *peerSet { + return &peerSet{ + peers: make(map[string]*peer), + } +} + +// Register injects a new peer into the working set, or returns an error if the +// peer is already known. If a new peer it registered, its broadcast loop is also +// started. +func (ps *peerSet) Register(p *peer) error { + ps.lock.Lock() + defer ps.lock.Unlock() + + if ps.closed { + return errClosed + } + if _, ok := ps.peers[p.id]; ok { + return errAlreadyRegistered + } + ps.peers[p.id] = p + go p.broadcast() + + return nil +} + +// Unregister removes a remote peer from the active set, disabling any further +// actions to/from that particular entity. +func (ps *peerSet) Unregister(id string) error { + ps.lock.Lock() + defer ps.lock.Unlock() + + p, ok := ps.peers[id] + if !ok { + return errNotRegistered + } + delete(ps.peers, id) + p.close() + + return nil +} + +// Peer retrieves the registered peer with the given id. +func (ps *peerSet) Peer(id string) *peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + return ps.peers[id] +} + +// Len returns if the current number of peers in the set. +func (ps *peerSet) Len() int { + ps.lock.RLock() + defer ps.lock.RUnlock() + + return len(ps.peers) +} + +// PeersWithoutBlock retrieves a list of peers that do not have a given block in +// their set of known hashes. +func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.knownBlocks.Contains(hash) { + list = append(list, p) + } + } + return list +} + +// PeersWithoutTx retrieves a list of peers that do not have a given transaction +// in their set of known hashes. +func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.knownTxs.Contains(hash) { + list = append(list, p) + } + } + return list +} + +// BestPeer retrieves the known peer with the currently highest total difficulty. +func (ps *peerSet) BestPeer() *peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + var ( + bestPeer *peer + bestTd *big.Int + ) + for _, p := range ps.peers { + if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 { + bestPeer, bestTd = p, td + } + } + return bestPeer +} + +// Close disconnects all peers. +// No new peers can be registered after Close has returned. +func (ps *peerSet) Close() { + ps.lock.Lock() + defer ps.lock.Unlock() + + for _, p := range ps.peers { + p.Disconnect(p2p.DiscQuitting) + } + ps.closed = true +} diff --git a/eth/protocol.go b/eth/protocol.go new file mode 100644 index 0000000..de0c979 --- /dev/null +++ b/eth/protocol.go @@ -0,0 +1,196 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package eth + +import ( + "fmt" + "io" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/rlp" +) + +// Constants to match up protocol versions and messages +const ( + eth62 = 62 + eth63 = 63 +) + +// protocolName is the official short name of the protocol used during capability negotiation. +const protocolName = "eth" + +// ProtocolVersions are the supported versions of the eth protocol (first is primary). +var ProtocolVersions = []uint{eth63} + +// protocolLengths are the number of implemented message corresponding to different protocol versions. +var protocolLengths = map[uint]uint64{eth63: 17, eth62: 8} + +const protocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message + +// eth protocol message codes +const ( + // Protocol messages belonging to eth/62 + StatusMsg = 0x00 + NewBlockHashesMsg = 0x01 + TxMsg = 0x02 + GetBlockHeadersMsg = 0x03 + BlockHeadersMsg = 0x04 + GetBlockBodiesMsg = 0x05 + BlockBodiesMsg = 0x06 + NewBlockMsg = 0x07 + + // Protocol messages belonging to eth/63 + GetNodeDataMsg = 0x0d + NodeDataMsg = 0x0e + GetReceiptsMsg = 0x0f + ReceiptsMsg = 0x10 +) + +type errCode int + +const ( + ErrMsgTooLarge = iota + ErrDecode + ErrInvalidMsgCode + ErrProtocolVersionMismatch + ErrNetworkIdMismatch + ErrGenesisBlockMismatch + ErrNoStatusMsg + ErrExtraStatusMsg + ErrSuspendedPeer +) + +func (e errCode) String() string { + return errorToString[int(e)] +} + +// XXX change once legacy code is out +var errorToString = map[int]string{ + ErrMsgTooLarge: "Message too long", + ErrDecode: "Invalid message", + ErrInvalidMsgCode: "Invalid message code", + ErrProtocolVersionMismatch: "Protocol version mismatch", + ErrNetworkIdMismatch: "NetworkId mismatch", + ErrGenesisBlockMismatch: "Genesis block mismatch", + ErrNoStatusMsg: "No status message", + ErrExtraStatusMsg: "Extra status message", + ErrSuspendedPeer: "Suspended peer", +} + +type txPool interface { + // AddRemotes should add the given transactions to the pool. + AddRemotes([]*types.Transaction) []error + + // Pending should return pending transactions. + // The slice should be modifiable by the caller. + Pending() (map[common.Address]types.Transactions, error) + + // SubscribeNewTxsEvent should return an event subscription of + // NewTxsEvent and send events to the given channel. + SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription +} + +// statusData is the network packet for the status message. +type statusData struct { + ProtocolVersion uint32 + NetworkId uint64 + TD *big.Int + CurrentBlock common.Hash + GenesisBlock common.Hash +} + +// newBlockHashesData is the network packet for the block announcements. +type newBlockHashesData []struct { + Hash common.Hash // Hash of one particular block being announced + Number uint64 // Number of one particular block being announced +} + +// getBlockHeadersData represents a block header query. +type getBlockHeadersData struct { + Origin hashOrNumber // Block from which to retrieve headers + Amount uint64 // Maximum number of headers to retrieve + Skip uint64 // Blocks to skip between consecutive headers + Reverse bool // Query direction (false = rising towards latest, true = falling towards genesis) +} + +// hashOrNumber is a combined field for specifying an origin block. +type hashOrNumber struct { + Hash common.Hash // Block hash from which to retrieve headers (excludes Number) + Number uint64 // Block hash from which to retrieve headers (excludes Hash) +} + +// EncodeRLP is a specialized encoder for hashOrNumber to encode only one of the +// two contained union fields. +func (hn *hashOrNumber) EncodeRLP(w io.Writer) error { + if hn.Hash == (common.Hash{}) { + return rlp.Encode(w, hn.Number) + } + if hn.Number != 0 { + return fmt.Errorf("both origin hash (%x) and number (%d) provided", hn.Hash, hn.Number) + } + return rlp.Encode(w, hn.Hash) +} + +// DecodeRLP is a specialized decoder for hashOrNumber to decode the contents +// into either a block hash or a block number. +func (hn *hashOrNumber) DecodeRLP(s *rlp.Stream) error { + _, size, _ := s.Kind() + origin, err := s.Raw() + if err == nil { + switch { + case size == 32: + err = rlp.DecodeBytes(origin, &hn.Hash) + case size <= 8: + err = rlp.DecodeBytes(origin, &hn.Number) + default: + err = fmt.Errorf("invalid input size %d for origin", size) + } + } + return err +} + +// newBlockData is the network packet for the block propagation message. +type newBlockData struct { + Block *types.Block + TD *big.Int +} + +// sanityCheck verifies that the values are reasonable, as a DoS protection +func (request *newBlockData) sanityCheck() error { + if err := request.Block.SanityCheck(); err != nil { + return err + } + //TD at mainnet block #7753254 is 76 bits. If it becomes 100 million times + // larger, it will still fit within 100 bits + if tdlen := request.TD.BitLen(); tdlen > 100 { + return fmt.Errorf("too large block TD: bitlen %d", tdlen) + } + return nil +} + +// blockBody represents the data content of a single block. +type blockBody struct { + Transactions []*types.Transaction // Transactions contained within a block + Uncles []*types.Header // Uncles contained within a block +} + +// blockBodiesData is the network packet for block content distribution. +type blockBodiesData []*blockBody diff --git a/eth/sync.go b/eth/sync.go new file mode 100644 index 0000000..9e180ee --- /dev/null +++ b/eth/sync.go @@ -0,0 +1,216 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package eth + +import ( + "math/rand" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" +) + +const ( + forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available + minDesiredPeerCount = 5 // Amount of peers desired to start syncing + + // This is the target size for the packs of transactions sent by txsyncLoop. + // A pack can get larger than this if a single transactions exceeds this size. + txsyncPackSize = 100 * 1024 +) + +type txsync struct { + p *peer + txs []*types.Transaction +} + +// syncTransactions starts sending all currently pending transactions to the given peer. +func (pm *ProtocolManager) syncTransactions(p *peer) { + var txs types.Transactions + pending, _ := pm.txpool.Pending() + for _, batch := range pending { + txs = append(txs, batch...) + } + if len(txs) == 0 { + return + } + select { + case pm.txsyncCh <- &txsync{p, txs}: + case <-pm.quitSync: + } +} + +// txsyncLoop takes care of the initial transaction sync for each new +// connection. When a new peer appears, we relay all currently pending +// transactions. In order to minimise egress bandwidth usage, we send +// the transactions in small packs to one peer at a time. +func (pm *ProtocolManager) txsyncLoop() { + var ( + pending = make(map[enode.ID]*txsync) + sending = false // whether a send is active + pack = new(txsync) // the pack that is being sent + done = make(chan error, 1) // result of the send + ) + + // send starts a sending a pack of transactions from the sync. + send := func(s *txsync) { + // Fill pack with transactions up to the target size. + size := common.StorageSize(0) + pack.p = s.p + pack.txs = pack.txs[:0] + for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ { + pack.txs = append(pack.txs, s.txs[i]) + size += s.txs[i].Size() + } + // Remove the transactions that will be sent. + s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])] + if len(s.txs) == 0 { + delete(pending, s.p.ID()) + } + // Send the pack in the background. + s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size) + sending = true + go func() { done <- pack.p.SendTransactions(pack.txs) }() + } + + // pick chooses the next pending sync. + pick := func() *txsync { + if len(pending) == 0 { + return nil + } + n := rand.Intn(len(pending)) + 1 + for _, s := range pending { + if n--; n == 0 { + return s + } + } + return nil + } + + for { + select { + case s := <-pm.txsyncCh: + pending[s.p.ID()] = s + if !sending { + send(s) + } + case err := <-done: + sending = false + // Stop tracking peers that cause send failures. + if err != nil { + pack.p.Log().Debug("Transaction send failed", "err", err) + delete(pending, pack.p.ID()) + } + // Schedule the next send. + if s := pick(); s != nil { + send(s) + } + case <-pm.quitSync: + return + } + } +} + +// syncer is responsible for periodically synchronising with the network, both +// downloading hashes and blocks as well as handling the announcement handler. +func (pm *ProtocolManager) syncer() { + // Start and ensure cleanup of sync mechanisms + pm.fetcher.Start() + defer pm.fetcher.Stop() + defer pm.downloader.Terminate() + + // Wait for different events to fire synchronisation operations + forceSync := time.NewTicker(forceSyncCycle) + defer forceSync.Stop() + + for { + select { + case <-pm.newPeerCh: + // Make sure we have peers to select from, then sync + if pm.peers.Len() < minDesiredPeerCount { + break + } + go pm.synchronise(pm.peers.BestPeer()) + + case <-forceSync.C: + // Force a sync even if not enough peers are present + go pm.synchronise(pm.peers.BestPeer()) + + case <-pm.noMorePeers: + return + } + } +} + +// synchronise tries to sync up our local block chain with a remote peer. +func (pm *ProtocolManager) synchronise(peer *peer) { + // Short circuit if no peers are available + if peer == nil { + return + } + // Make sure the peer's TD is higher than our own + currentBlock := pm.blockchain.CurrentBlock() + td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) + + pHead, pTd := peer.Head() + if pTd.Cmp(td) <= 0 { + return + } + // Otherwise try to sync with the downloader + mode := downloader.FullSync + if atomic.LoadUint32(&pm.fastSync) == 1 { + // Fast sync was explicitly requested, and explicitly granted + mode = downloader.FastSync + } + if mode == downloader.FastSync { + // Make sure the peer's total difficulty we are synchronizing is higher. + if pm.blockchain.GetTdByHash(pm.blockchain.CurrentFastBlock().Hash()).Cmp(pTd) >= 0 { + return + } + } + // Run the sync cycle, and disable fast sync if we've went past the pivot block + if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil { + return + } + if atomic.LoadUint32(&pm.fastSync) == 1 { + log.Info("Fast sync complete, auto disabling") + atomic.StoreUint32(&pm.fastSync, 0) + } + // If we've successfully finished a sync cycle and passed any required checkpoint, + // enable accepting transactions from the network. + head := pm.blockchain.CurrentBlock() + if head.NumberU64() >= pm.checkpointNumber { + // Checkpoint passed, sanity check the timestamp to have a fallback mechanism + // for non-checkpointed (number = 0) private networks. + if head.Time() >= uint64(time.Now().AddDate(0, -1, 0).Unix()) { + atomic.StoreUint32(&pm.acceptTxs, 1) + } + } + if head.NumberU64() > 0 { + // We've completed a sync cycle, notify all peers of new state. This path is + // essential in star-topology networks where a gateway node needs to notify + // all its out-of-date peers of the availability of a new block. This failure + // scenario will most often crop up in private and hackathon networks with + // degenerate connectivity, but it should be healthy for the mainnet too to + // more reliably update peers or the local TD state. + go pm.BroadcastBlock(head, false) + } +} |