aboutsummaryrefslogtreecommitdiff
path: root/eth
diff options
context:
space:
mode:
Diffstat (limited to 'eth')
-rw-r--r--eth/api.go533
-rw-r--r--eth/api_backend.go244
-rw-r--r--eth/api_tracer.go823
-rw-r--r--eth/backend.go566
-rw-r--r--eth/bloombits.go138
-rw-r--r--eth/config.go157
-rw-r--r--eth/enr_entry.go61
-rw-r--r--eth/gen_config.go221
-rw-r--r--eth/handler.go844
-rw-r--r--eth/metrics.go139
-rw-r--r--eth/peer.go546
-rw-r--r--eth/protocol.go196
-rw-r--r--eth/sync.go216
13 files changed, 4684 insertions, 0 deletions
diff --git a/eth/api.go b/eth/api.go
new file mode 100644
index 0000000..34f41db
--- /dev/null
+++ b/eth/api.go
@@ -0,0 +1,533 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "compress/gzip"
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "math/big"
+ "os"
+ "runtime"
+ "strings"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/rawdb"
+ "github.com/ethereum/go-ethereum/core/state"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/Determinant/coreth/internal/ethapi"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/ethereum/go-ethereum/trie"
+)
+
+// PublicEthereumAPI provides an API to access Ethereum full node-related
+// information.
+type PublicEthereumAPI struct {
+ e *Ethereum
+}
+
+// NewPublicEthereumAPI creates a new Ethereum protocol API for full nodes.
+func NewPublicEthereumAPI(e *Ethereum) *PublicEthereumAPI {
+ return &PublicEthereumAPI{e}
+}
+
+// Etherbase is the address that mining rewards will be send to
+func (api *PublicEthereumAPI) Etherbase() (common.Address, error) {
+ return api.e.Etherbase()
+}
+
+// Coinbase is the address that mining rewards will be send to (alias for Etherbase)
+func (api *PublicEthereumAPI) Coinbase() (common.Address, error) {
+ return api.Etherbase()
+}
+
+// Hashrate returns the POW hashrate
+func (api *PublicEthereumAPI) Hashrate() hexutil.Uint64 {
+ return hexutil.Uint64(api.e.Miner().HashRate())
+}
+
+// ChainId is the EIP-155 replay-protection chain id for the current ethereum chain config.
+func (api *PublicEthereumAPI) ChainId() hexutil.Uint64 {
+ chainID := new(big.Int)
+ if config := api.e.blockchain.Config(); config.IsEIP155(api.e.blockchain.CurrentBlock().Number()) {
+ chainID = config.ChainID
+ }
+ return (hexutil.Uint64)(chainID.Uint64())
+}
+
+// PublicMinerAPI provides an API to control the miner.
+// It offers only methods that operate on data that pose no security risk when it is publicly accessible.
+type PublicMinerAPI struct {
+ e *Ethereum
+}
+
+// NewPublicMinerAPI create a new PublicMinerAPI instance.
+func NewPublicMinerAPI(e *Ethereum) *PublicMinerAPI {
+ return &PublicMinerAPI{e}
+}
+
+// Mining returns an indication if this node is currently mining.
+func (api *PublicMinerAPI) Mining() bool {
+ return api.e.IsMining()
+}
+
+// PrivateMinerAPI provides private RPC methods to control the miner.
+// These methods can be abused by external users and must be considered insecure for use by untrusted users.
+type PrivateMinerAPI struct {
+ e *Ethereum
+}
+
+// NewPrivateMinerAPI create a new RPC service which controls the miner of this node.
+func NewPrivateMinerAPI(e *Ethereum) *PrivateMinerAPI {
+ return &PrivateMinerAPI{e: e}
+}
+
+// Start starts the miner with the given number of threads. If threads is nil,
+// the number of workers started is equal to the number of logical CPUs that are
+// usable by this process. If mining is already running, this method adjust the
+// number of threads allowed to use and updates the minimum price required by the
+// transaction pool.
+func (api *PrivateMinerAPI) Start(threads *int) error {
+ if threads == nil {
+ return api.e.StartMining(runtime.NumCPU())
+ }
+ return api.e.StartMining(*threads)
+}
+
+// Stop terminates the miner, both at the consensus engine level as well as at
+// the block creation level.
+func (api *PrivateMinerAPI) Stop() {
+ api.e.StopMining()
+}
+
+// SetExtra sets the extra data string that is included when this miner mines a block.
+func (api *PrivateMinerAPI) SetExtra(extra string) (bool, error) {
+ if err := api.e.Miner().SetExtra([]byte(extra)); err != nil {
+ return false, err
+ }
+ return true, nil
+}
+
+// SetGasPrice sets the minimum accepted gas price for the miner.
+func (api *PrivateMinerAPI) SetGasPrice(gasPrice hexutil.Big) bool {
+ api.e.lock.Lock()
+ api.e.gasPrice = (*big.Int)(&gasPrice)
+ api.e.lock.Unlock()
+
+ api.e.txPool.SetGasPrice((*big.Int)(&gasPrice))
+ return true
+}
+
+// SetEtherbase sets the etherbase of the miner
+func (api *PrivateMinerAPI) SetEtherbase(etherbase common.Address) bool {
+ api.e.SetEtherbase(etherbase)
+ return true
+}
+
+// SetRecommitInterval updates the interval for miner sealing work recommitting.
+func (api *PrivateMinerAPI) SetRecommitInterval(interval int) {
+ api.e.Miner().SetRecommitInterval(time.Duration(interval) * time.Millisecond)
+}
+
+// GetHashrate returns the current hashrate of the miner.
+func (api *PrivateMinerAPI) GetHashrate() uint64 {
+ return api.e.miner.HashRate()
+}
+
+// PrivateAdminAPI is the collection of Ethereum full node-related APIs
+// exposed over the private admin endpoint.
+type PrivateAdminAPI struct {
+ eth *Ethereum
+}
+
+// NewPrivateAdminAPI creates a new API definition for the full node private
+// admin methods of the Ethereum service.
+func NewPrivateAdminAPI(eth *Ethereum) *PrivateAdminAPI {
+ return &PrivateAdminAPI{eth: eth}
+}
+
+// ExportChain exports the current blockchain into a local file.
+func (api *PrivateAdminAPI) ExportChain(file string) (bool, error) {
+ // Make sure we can create the file to export into
+ out, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.ModePerm)
+ if err != nil {
+ return false, err
+ }
+ defer out.Close()
+
+ var writer io.Writer = out
+ if strings.HasSuffix(file, ".gz") {
+ writer = gzip.NewWriter(writer)
+ defer writer.(*gzip.Writer).Close()
+ }
+
+ // Export the blockchain
+ if err := api.eth.BlockChain().Export(writer); err != nil {
+ return false, err
+ }
+ return true, nil
+}
+
+func hasAllBlocks(chain *core.BlockChain, bs []*types.Block) bool {
+ for _, b := range bs {
+ if !chain.HasBlock(b.Hash(), b.NumberU64()) {
+ return false
+ }
+ }
+
+ return true
+}
+
+// ImportChain imports a blockchain from a local file.
+func (api *PrivateAdminAPI) ImportChain(file string) (bool, error) {
+ // Make sure the can access the file to import
+ in, err := os.Open(file)
+ if err != nil {
+ return false, err
+ }
+ defer in.Close()
+
+ var reader io.Reader = in
+ if strings.HasSuffix(file, ".gz") {
+ if reader, err = gzip.NewReader(reader); err != nil {
+ return false, err
+ }
+ }
+
+ // Run actual the import in pre-configured batches
+ stream := rlp.NewStream(reader, 0)
+
+ blocks, index := make([]*types.Block, 0, 2500), 0
+ for batch := 0; ; batch++ {
+ // Load a batch of blocks from the input file
+ for len(blocks) < cap(blocks) {
+ block := new(types.Block)
+ if err := stream.Decode(block); err == io.EOF {
+ break
+ } else if err != nil {
+ return false, fmt.Errorf("block %d: failed to parse: %v", index, err)
+ }
+ blocks = append(blocks, block)
+ index++
+ }
+ if len(blocks) == 0 {
+ break
+ }
+
+ if hasAllBlocks(api.eth.BlockChain(), blocks) {
+ blocks = blocks[:0]
+ continue
+ }
+ // Import the batch and reset the buffer
+ if _, err := api.eth.BlockChain().InsertChain(blocks); err != nil {
+ return false, fmt.Errorf("batch %d: failed to insert: %v", batch, err)
+ }
+ blocks = blocks[:0]
+ }
+ return true, nil
+}
+
+// PublicDebugAPI is the collection of Ethereum full node APIs exposed
+// over the public debugging endpoint.
+type PublicDebugAPI struct {
+ eth *Ethereum
+}
+
+// NewPublicDebugAPI creates a new API definition for the full node-
+// related public debug methods of the Ethereum service.
+func NewPublicDebugAPI(eth *Ethereum) *PublicDebugAPI {
+ return &PublicDebugAPI{eth: eth}
+}
+
+// DumpBlock retrieves the entire state of the database at a given block.
+func (api *PublicDebugAPI) DumpBlock(blockNr rpc.BlockNumber) (state.Dump, error) {
+ if blockNr == rpc.PendingBlockNumber {
+ // If we're dumping the pending state, we need to request
+ // both the pending block as well as the pending state from
+ // the miner and operate on those
+ _, stateDb := api.eth.miner.Pending()
+ return stateDb.RawDump(false, false, true), nil
+ }
+ var block *types.Block
+ if blockNr == rpc.LatestBlockNumber {
+ block = api.eth.blockchain.CurrentBlock()
+ } else {
+ block = api.eth.blockchain.GetBlockByNumber(uint64(blockNr))
+ }
+ if block == nil {
+ return state.Dump{}, fmt.Errorf("block #%d not found", blockNr)
+ }
+ stateDb, err := api.eth.BlockChain().StateAt(block.Root())
+ if err != nil {
+ return state.Dump{}, err
+ }
+ return stateDb.RawDump(false, false, true), nil
+}
+
+// PrivateDebugAPI is the collection of Ethereum full node APIs exposed over
+// the private debugging endpoint.
+type PrivateDebugAPI struct {
+ eth *Ethereum
+}
+
+// NewPrivateDebugAPI creates a new API definition for the full node-related
+// private debug methods of the Ethereum service.
+func NewPrivateDebugAPI(eth *Ethereum) *PrivateDebugAPI {
+ return &PrivateDebugAPI{eth: eth}
+}
+
+// Preimage is a debug API function that returns the preimage for a sha3 hash, if known.
+func (api *PrivateDebugAPI) Preimage(ctx context.Context, hash common.Hash) (hexutil.Bytes, error) {
+ if preimage := rawdb.ReadPreimage(api.eth.ChainDb(), hash); preimage != nil {
+ return preimage, nil
+ }
+ return nil, errors.New("unknown preimage")
+}
+
+// BadBlockArgs represents the entries in the list returned when bad blocks are queried.
+type BadBlockArgs struct {
+ Hash common.Hash `json:"hash"`
+ Block map[string]interface{} `json:"block"`
+ RLP string `json:"rlp"`
+}
+
+// GetBadBlocks returns a list of the last 'bad blocks' that the client has seen on the network
+// and returns them as a JSON list of block-hashes
+func (api *PrivateDebugAPI) GetBadBlocks(ctx context.Context) ([]*BadBlockArgs, error) {
+ blocks := api.eth.BlockChain().BadBlocks()
+ results := make([]*BadBlockArgs, len(blocks))
+
+ var err error
+ for i, block := range blocks {
+ results[i] = &BadBlockArgs{
+ Hash: block.Hash(),
+ }
+ if rlpBytes, err := rlp.EncodeToBytes(block); err != nil {
+ results[i].RLP = err.Error() // Hacky, but hey, it works
+ } else {
+ results[i].RLP = fmt.Sprintf("0x%x", rlpBytes)
+ }
+ if results[i].Block, err = ethapi.RPCMarshalBlock(block, true, true); err != nil {
+ results[i].Block = map[string]interface{}{"error": err.Error()}
+ }
+ }
+ return results, nil
+}
+
+// AccountRangeResult returns a mapping from the hash of an account addresses
+// to its preimage. It will return the JSON null if no preimage is found.
+// Since a query can return a limited amount of results, a "next" field is
+// also present for paging.
+type AccountRangeResult struct {
+ Accounts map[common.Hash]*common.Address `json:"accounts"`
+ Next common.Hash `json:"next"`
+}
+
+func accountRange(st state.Trie, start *common.Hash, maxResults int) (AccountRangeResult, error) {
+ if start == nil {
+ start = &common.Hash{0}
+ }
+ it := trie.NewIterator(st.NodeIterator(start.Bytes()))
+ result := AccountRangeResult{Accounts: make(map[common.Hash]*common.Address), Next: common.Hash{}}
+
+ if maxResults > AccountRangeMaxResults {
+ maxResults = AccountRangeMaxResults
+ }
+
+ for i := 0; i < maxResults && it.Next(); i++ {
+ if preimage := st.GetKey(it.Key); preimage != nil {
+ addr := &common.Address{}
+ addr.SetBytes(preimage)
+ result.Accounts[common.BytesToHash(it.Key)] = addr
+ } else {
+ result.Accounts[common.BytesToHash(it.Key)] = nil
+ }
+ }
+
+ if it.Next() {
+ result.Next = common.BytesToHash(it.Key)
+ }
+
+ return result, nil
+}
+
+// AccountRangeMaxResults is the maximum number of results to be returned per call
+const AccountRangeMaxResults = 256
+
+// AccountRange enumerates all accounts in the latest state
+func (api *PrivateDebugAPI) AccountRange(ctx context.Context, start *common.Hash, maxResults int) (AccountRangeResult, error) {
+ var statedb *state.StateDB
+ var err error
+ block := api.eth.blockchain.CurrentBlock()
+
+ if len(block.Transactions()) == 0 {
+ statedb, err = api.computeStateDB(block, defaultTraceReexec)
+ if err != nil {
+ return AccountRangeResult{}, err
+ }
+ } else {
+ _, _, statedb, err = api.computeTxEnv(block.Hash(), len(block.Transactions())-1, 0)
+ if err != nil {
+ return AccountRangeResult{}, err
+ }
+ }
+
+ trie, err := statedb.Database().OpenTrie(block.Header().Root)
+ if err != nil {
+ return AccountRangeResult{}, err
+ }
+
+ return accountRange(trie, start, maxResults)
+}
+
+// StorageRangeResult is the result of a debug_storageRangeAt API call.
+type StorageRangeResult struct {
+ Storage storageMap `json:"storage"`
+ NextKey *common.Hash `json:"nextKey"` // nil if Storage includes the last key in the trie.
+}
+
+type storageMap map[common.Hash]storageEntry
+
+type storageEntry struct {
+ Key *common.Hash `json:"key"`
+ Value common.Hash `json:"value"`
+}
+
+// StorageRangeAt returns the storage at the given block height and transaction index.
+func (api *PrivateDebugAPI) StorageRangeAt(ctx context.Context, blockHash common.Hash, txIndex int, contractAddress common.Address, keyStart hexutil.Bytes, maxResult int) (StorageRangeResult, error) {
+ _, _, statedb, err := api.computeTxEnv(blockHash, txIndex, 0)
+ if err != nil {
+ return StorageRangeResult{}, err
+ }
+ st := statedb.StorageTrie(contractAddress)
+ if st == nil {
+ return StorageRangeResult{}, fmt.Errorf("account %x doesn't exist", contractAddress)
+ }
+ return storageRangeAt(st, keyStart, maxResult)
+}
+
+func storageRangeAt(st state.Trie, start []byte, maxResult int) (StorageRangeResult, error) {
+ it := trie.NewIterator(st.NodeIterator(start))
+ result := StorageRangeResult{Storage: storageMap{}}
+ for i := 0; i < maxResult && it.Next(); i++ {
+ _, content, _, err := rlp.Split(it.Value)
+ if err != nil {
+ return StorageRangeResult{}, err
+ }
+ e := storageEntry{Value: common.BytesToHash(content)}
+ if preimage := st.GetKey(it.Key); preimage != nil {
+ preimage := common.BytesToHash(preimage)
+ e.Key = &preimage
+ }
+ result.Storage[common.BytesToHash(it.Key)] = e
+ }
+ // Add the 'next key' so clients can continue downloading.
+ if it.Next() {
+ next := common.BytesToHash(it.Key)
+ result.NextKey = &next
+ }
+ return result, nil
+}
+
+// GetModifiedAccountsByNumber returns all accounts that have changed between the
+// two blocks specified. A change is defined as a difference in nonce, balance,
+// code hash, or storage hash.
+//
+// With one parameter, returns the list of accounts modified in the specified block.
+func (api *PrivateDebugAPI) GetModifiedAccountsByNumber(startNum uint64, endNum *uint64) ([]common.Address, error) {
+ var startBlock, endBlock *types.Block
+
+ startBlock = api.eth.blockchain.GetBlockByNumber(startNum)
+ if startBlock == nil {
+ return nil, fmt.Errorf("start block %x not found", startNum)
+ }
+
+ if endNum == nil {
+ endBlock = startBlock
+ startBlock = api.eth.blockchain.GetBlockByHash(startBlock.ParentHash())
+ if startBlock == nil {
+ return nil, fmt.Errorf("block %x has no parent", endBlock.Number())
+ }
+ } else {
+ endBlock = api.eth.blockchain.GetBlockByNumber(*endNum)
+ if endBlock == nil {
+ return nil, fmt.Errorf("end block %d not found", *endNum)
+ }
+ }
+ return api.getModifiedAccounts(startBlock, endBlock)
+}
+
+// GetModifiedAccountsByHash returns all accounts that have changed between the
+// two blocks specified. A change is defined as a difference in nonce, balance,
+// code hash, or storage hash.
+//
+// With one parameter, returns the list of accounts modified in the specified block.
+func (api *PrivateDebugAPI) GetModifiedAccountsByHash(startHash common.Hash, endHash *common.Hash) ([]common.Address, error) {
+ var startBlock, endBlock *types.Block
+ startBlock = api.eth.blockchain.GetBlockByHash(startHash)
+ if startBlock == nil {
+ return nil, fmt.Errorf("start block %x not found", startHash)
+ }
+
+ if endHash == nil {
+ endBlock = startBlock
+ startBlock = api.eth.blockchain.GetBlockByHash(startBlock.ParentHash())
+ if startBlock == nil {
+ return nil, fmt.Errorf("block %x has no parent", endBlock.Number())
+ }
+ } else {
+ endBlock = api.eth.blockchain.GetBlockByHash(*endHash)
+ if endBlock == nil {
+ return nil, fmt.Errorf("end block %x not found", *endHash)
+ }
+ }
+ return api.getModifiedAccounts(startBlock, endBlock)
+}
+
+func (api *PrivateDebugAPI) getModifiedAccounts(startBlock, endBlock *types.Block) ([]common.Address, error) {
+ if startBlock.Number().Uint64() >= endBlock.Number().Uint64() {
+ return nil, fmt.Errorf("start block height (%d) must be less than end block height (%d)", startBlock.Number().Uint64(), endBlock.Number().Uint64())
+ }
+ triedb := api.eth.BlockChain().StateCache().TrieDB()
+
+ oldTrie, err := trie.NewSecure(startBlock.Root(), triedb)
+ if err != nil {
+ return nil, err
+ }
+ newTrie, err := trie.NewSecure(endBlock.Root(), triedb)
+ if err != nil {
+ return nil, err
+ }
+ diff, _ := trie.NewDifferenceIterator(oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}))
+ iter := trie.NewIterator(diff)
+
+ var dirty []common.Address
+ for iter.Next() {
+ key := newTrie.GetKey(iter.Key)
+ if key == nil {
+ return nil, fmt.Errorf("no preimage found for hash %x", iter.Key)
+ }
+ dirty = append(dirty, common.BytesToAddress(key))
+ }
+ return dirty, nil
+}
diff --git a/eth/api_backend.go b/eth/api_backend.go
new file mode 100644
index 0000000..69904a7
--- /dev/null
+++ b/eth/api_backend.go
@@ -0,0 +1,244 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "context"
+ "errors"
+ "math/big"
+
+ "github.com/ethereum/go-ethereum/accounts"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/math"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
+ "github.com/ethereum/go-ethereum/core/rawdb"
+ "github.com/ethereum/go-ethereum/core/state"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/core/vm"
+ "github.com/ethereum/go-ethereum/eth/downloader"
+ "github.com/ethereum/go-ethereum/eth/gasprice"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/params"
+ "github.com/ethereum/go-ethereum/rpc"
+)
+
+// EthAPIBackend implements ethapi.Backend for full nodes
+type EthAPIBackend struct {
+ extRPCEnabled bool
+ eth *Ethereum
+ gpo *gasprice.Oracle
+}
+
+// ChainConfig returns the active chain configuration.
+func (b *EthAPIBackend) ChainConfig() *params.ChainConfig {
+ return b.eth.blockchain.Config()
+}
+
+func (b *EthAPIBackend) CurrentBlock() *types.Block {
+ return b.eth.blockchain.CurrentBlock()
+}
+
+func (b *EthAPIBackend) SetHead(number uint64) {
+ b.eth.protocolManager.downloader.Cancel()
+ b.eth.blockchain.SetHead(number)
+}
+
+func (b *EthAPIBackend) HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error) {
+ // Pending block is only known by the miner
+ if number == rpc.PendingBlockNumber {
+ block := b.eth.miner.PendingBlock()
+ return block.Header(), nil
+ }
+ // Otherwise resolve and return the block
+ if number == rpc.LatestBlockNumber {
+ return b.eth.blockchain.CurrentBlock().Header(), nil
+ }
+ return b.eth.blockchain.GetHeaderByNumber(uint64(number)), nil
+}
+
+func (b *EthAPIBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) {
+ return b.eth.blockchain.GetHeaderByHash(hash), nil
+}
+
+func (b *EthAPIBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error) {
+ // Pending block is only known by the miner
+ if number == rpc.PendingBlockNumber {
+ block := b.eth.miner.PendingBlock()
+ return block, nil
+ }
+ // Otherwise resolve and return the block
+ if number == rpc.LatestBlockNumber {
+ return b.eth.blockchain.CurrentBlock(), nil
+ }
+ return b.eth.blockchain.GetBlockByNumber(uint64(number)), nil
+}
+
+func (b *EthAPIBackend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) {
+ return b.eth.blockchain.GetBlockByHash(hash), nil
+}
+
+func (b *EthAPIBackend) StateAndHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*state.StateDB, *types.Header, error) {
+ // Pending state is only known by the miner
+ if number == rpc.PendingBlockNumber {
+ block, state := b.eth.miner.Pending()
+ return state, block.Header(), nil
+ }
+ // Otherwise resolve the block number and return its state
+ header, err := b.HeaderByNumber(ctx, number)
+ if err != nil {
+ return nil, nil, err
+ }
+ if header == nil {
+ return nil, nil, errors.New("header not found")
+ }
+ stateDb, err := b.eth.BlockChain().StateAt(header.Root)
+ return stateDb, header, err
+}
+
+func (b *EthAPIBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) {
+ return b.eth.blockchain.GetReceiptsByHash(hash), nil
+}
+
+func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
+ receipts := b.eth.blockchain.GetReceiptsByHash(hash)
+ if receipts == nil {
+ return nil, nil
+ }
+ logs := make([][]*types.Log, len(receipts))
+ for i, receipt := range receipts {
+ logs[i] = receipt.Logs
+ }
+ return logs, nil
+}
+
+func (b *EthAPIBackend) GetTd(blockHash common.Hash) *big.Int {
+ return b.eth.blockchain.GetTdByHash(blockHash)
+}
+
+func (b *EthAPIBackend) GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header) (*vm.EVM, func() error, error) {
+ state.SetBalance(msg.From(), math.MaxBig256)
+ vmError := func() error { return nil }
+
+ context := core.NewEVMContext(msg, header, b.eth.BlockChain(), nil)
+ return vm.NewEVM(context, state, b.eth.blockchain.Config(), *b.eth.blockchain.GetVMConfig()), vmError, nil
+}
+
+func (b *EthAPIBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
+ return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch)
+}
+
+func (b *EthAPIBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
+ return b.eth.BlockChain().SubscribeChainEvent(ch)
+}
+
+func (b *EthAPIBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
+ return b.eth.BlockChain().SubscribeChainHeadEvent(ch)
+}
+
+func (b *EthAPIBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
+ return b.eth.BlockChain().SubscribeChainSideEvent(ch)
+}
+
+func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
+ return b.eth.BlockChain().SubscribeLogsEvent(ch)
+}
+
+func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
+ return b.eth.txPool.AddLocal(signedTx)
+}
+
+func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
+ pending, err := b.eth.txPool.Pending()
+ if err != nil {
+ return nil, err
+ }
+ var txs types.Transactions
+ for _, batch := range pending {
+ txs = append(txs, batch...)
+ }
+ return txs, nil
+}
+
+func (b *EthAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction {
+ return b.eth.txPool.Get(hash)
+}
+
+func (b *EthAPIBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) {
+ tx, blockHash, blockNumber, index := rawdb.ReadTransaction(b.eth.ChainDb(), txHash)
+ return tx, blockHash, blockNumber, index, nil
+}
+
+func (b *EthAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {
+ return b.eth.txPool.Nonce(addr), nil
+}
+
+func (b *EthAPIBackend) Stats() (pending int, queued int) {
+ return b.eth.txPool.Stats()
+}
+
+func (b *EthAPIBackend) TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
+ return b.eth.TxPool().Content()
+}
+
+func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
+ return b.eth.TxPool().SubscribeNewTxsEvent(ch)
+}
+
+func (b *EthAPIBackend) Downloader() *downloader.Downloader {
+ return b.eth.Downloader()
+}
+
+func (b *EthAPIBackend) ProtocolVersion() int {
+ return b.eth.EthVersion()
+}
+
+func (b *EthAPIBackend) SuggestPrice(ctx context.Context) (*big.Int, error) {
+ return b.gpo.SuggestPrice(ctx)
+}
+
+func (b *EthAPIBackend) ChainDb() ethdb.Database {
+ return b.eth.ChainDb()
+}
+
+func (b *EthAPIBackend) EventMux() *event.TypeMux {
+ return b.eth.EventMux()
+}
+
+func (b *EthAPIBackend) AccountManager() *accounts.Manager {
+ return b.eth.AccountManager()
+}
+
+func (b *EthAPIBackend) ExtRPCEnabled() bool {
+ return b.extRPCEnabled
+}
+
+func (b *EthAPIBackend) RPCGasCap() *big.Int {
+ return b.eth.config.RPCGasCap
+}
+
+func (b *EthAPIBackend) BloomStatus() (uint64, uint64) {
+ sections, _, _ := b.eth.bloomIndexer.Sections()
+ return params.BloomBitsBlocks, sections
+}
+
+func (b *EthAPIBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
+ for i := 0; i < bloomFilterThreads; i++ {
+ go session.Multiplex(bloomRetrievalBatch, bloomRetrievalWait, b.eth.bloomRequests)
+ }
+}
diff --git a/eth/api_tracer.go b/eth/api_tracer.go
new file mode 100644
index 0000000..7d983fa
--- /dev/null
+++ b/eth/api_tracer.go
@@ -0,0 +1,823 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "runtime"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/rawdb"
+ "github.com/ethereum/go-ethereum/core/state"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/core/vm"
+ "github.com/ethereum/go-ethereum/eth/tracers"
+ "github.com/Determinant/coreth/internal/ethapi"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/ethereum/go-ethereum/trie"
+)
+
+const (
+ // defaultTraceTimeout is the amount of time a single transaction can execute
+ // by default before being forcefully aborted.
+ defaultTraceTimeout = 5 * time.Second
+
+ // defaultTraceReexec is the number of blocks the tracer is willing to go back
+ // and reexecute to produce missing historical state necessary to run a specific
+ // trace.
+ defaultTraceReexec = uint64(128)
+)
+
+// TraceConfig holds extra parameters to trace functions.
+type TraceConfig struct {
+ *vm.LogConfig
+ Tracer *string
+ Timeout *string
+ Reexec *uint64
+}
+
+// StdTraceConfig holds extra parameters to standard-json trace functions.
+type StdTraceConfig struct {
+ *vm.LogConfig
+ Reexec *uint64
+ TxHash common.Hash
+}
+
+// txTraceResult is the result of a single transaction trace.
+type txTraceResult struct {
+ Result interface{} `json:"result,omitempty"` // Trace results produced by the tracer
+ Error string `json:"error,omitempty"` // Trace failure produced by the tracer
+}
+
+// blockTraceTask represents a single block trace task when an entire chain is
+// being traced.
+type blockTraceTask struct {
+ statedb *state.StateDB // Intermediate state prepped for tracing
+ block *types.Block // Block to trace the transactions from
+ rootref common.Hash // Trie root reference held for this task
+ results []*txTraceResult // Trace results procudes by the task
+}
+
+// blockTraceResult represets the results of tracing a single block when an entire
+// chain is being traced.
+type blockTraceResult struct {
+ Block hexutil.Uint64 `json:"block"` // Block number corresponding to this trace
+ Hash common.Hash `json:"hash"` // Block hash corresponding to this trace
+ Traces []*txTraceResult `json:"traces"` // Trace results produced by the task
+}
+
+// txTraceTask represents a single transaction trace task when an entire block
+// is being traced.
+type txTraceTask struct {
+ statedb *state.StateDB // Intermediate state prepped for tracing
+ index int // Transaction offset in the block
+}
+
+// TraceChain returns the structured logs created during the execution of EVM
+// between two blocks (excluding start) and returns them as a JSON object.
+func (api *PrivateDebugAPI) TraceChain(ctx context.Context, start, end rpc.BlockNumber, config *TraceConfig) (*rpc.Subscription, error) {
+ // Fetch the block interval that we want to trace
+ var from, to *types.Block
+
+ switch start {
+ case rpc.PendingBlockNumber:
+ from = api.eth.miner.PendingBlock()
+ case rpc.LatestBlockNumber:
+ from = api.eth.blockchain.CurrentBlock()
+ default:
+ from = api.eth.blockchain.GetBlockByNumber(uint64(start))
+ }
+ switch end {
+ case rpc.PendingBlockNumber:
+ to = api.eth.miner.PendingBlock()
+ case rpc.LatestBlockNumber:
+ to = api.eth.blockchain.CurrentBlock()
+ default:
+ to = api.eth.blockchain.GetBlockByNumber(uint64(end))
+ }
+ // Trace the chain if we've found all our blocks
+ if from == nil {
+ return nil, fmt.Errorf("starting block #%d not found", start)
+ }
+ if to == nil {
+ return nil, fmt.Errorf("end block #%d not found", end)
+ }
+ if from.Number().Cmp(to.Number()) >= 0 {
+ return nil, fmt.Errorf("end block (#%d) needs to come after start block (#%d)", end, start)
+ }
+ return api.traceChain(ctx, from, to, config)
+}
+
+// traceChain configures a new tracer according to the provided configuration, and
+// executes all the transactions contained within. The return value will be one item
+// per transaction, dependent on the requested tracer.
+func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Block, config *TraceConfig) (*rpc.Subscription, error) {
+ // Tracing a chain is a **long** operation, only do with subscriptions
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
+ }
+ sub := notifier.CreateSubscription()
+
+ // Ensure we have a valid starting state before doing any work
+ origin := start.NumberU64()
+ database := state.NewDatabaseWithCache(api.eth.ChainDb(), 16) // Chain tracing will probably start at genesis
+
+ if number := start.NumberU64(); number > 0 {
+ start = api.eth.blockchain.GetBlock(start.ParentHash(), start.NumberU64()-1)
+ if start == nil {
+ return nil, fmt.Errorf("parent block #%d not found", number-1)
+ }
+ }
+ statedb, err := state.New(start.Root(), database)
+ if err != nil {
+ // If the starting state is missing, allow some number of blocks to be reexecuted
+ reexec := defaultTraceReexec
+ if config != nil && config.Reexec != nil {
+ reexec = *config.Reexec
+ }
+ // Find the most recent block that has the state available
+ for i := uint64(0); i < reexec; i++ {
+ start = api.eth.blockchain.GetBlock(start.ParentHash(), start.NumberU64()-1)
+ if start == nil {
+ break
+ }
+ if statedb, err = state.New(start.Root(), database); err == nil {
+ break
+ }
+ }
+ // If we still don't have the state available, bail out
+ if err != nil {
+ switch err.(type) {
+ case *trie.MissingNodeError:
+ return nil, errors.New("required historical state unavailable")
+ default:
+ return nil, err
+ }
+ }
+ }
+ // Execute all the transaction contained within the chain concurrently for each block
+ blocks := int(end.NumberU64() - origin)
+
+ threads := runtime.NumCPU()
+ if threads > blocks {
+ threads = blocks
+ }
+ var (
+ pend = new(sync.WaitGroup)
+ tasks = make(chan *blockTraceTask, threads)
+ results = make(chan *blockTraceTask, threads)
+ )
+ for th := 0; th < threads; th++ {
+ pend.Add(1)
+ go func() {
+ defer pend.Done()
+
+ // Fetch and execute the next block trace tasks
+ for task := range tasks {
+ signer := types.MakeSigner(api.eth.blockchain.Config(), task.block.Number())
+
+ // Trace all the transactions contained within
+ for i, tx := range task.block.Transactions() {
+ msg, _ := tx.AsMessage(signer)
+ vmctx := core.NewEVMContext(msg, task.block.Header(), api.eth.blockchain, nil)
+
+ res, err := api.traceTx(ctx, msg, vmctx, task.statedb, config)
+ if err != nil {
+ task.results[i] = &txTraceResult{Error: err.Error()}
+ log.Warn("Tracing failed", "hash", tx.Hash(), "block", task.block.NumberU64(), "err", err)
+ break
+ }
+ // Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect
+ task.statedb.Finalise(api.eth.blockchain.Config().IsEIP158(task.block.Number()))
+ task.results[i] = &txTraceResult{Result: res}
+ }
+ // Stream the result back to the user or abort on teardown
+ select {
+ case results <- task:
+ case <-notifier.Closed():
+ return
+ }
+ }
+ }()
+ }
+ // Start a goroutine to feed all the blocks into the tracers
+ begin := time.Now()
+
+ go func() {
+ var (
+ logged time.Time
+ number uint64
+ traced uint64
+ failed error
+ proot common.Hash
+ )
+ // Ensure everything is properly cleaned up on any exit path
+ defer func() {
+ close(tasks)
+ pend.Wait()
+
+ switch {
+ case failed != nil:
+ log.Warn("Chain tracing failed", "start", start.NumberU64(), "end", end.NumberU64(), "transactions", traced, "elapsed", time.Since(begin), "err", failed)
+ case number < end.NumberU64():
+ log.Warn("Chain tracing aborted", "start", start.NumberU64(), "end", end.NumberU64(), "abort", number, "transactions", traced, "elapsed", time.Since(begin))
+ default:
+ log.Info("Chain tracing finished", "start", start.NumberU64(), "end", end.NumberU64(), "transactions", traced, "elapsed", time.Since(begin))
+ }
+ close(results)
+ }()
+ // Feed all the blocks both into the tracer, as well as fast process concurrently
+ for number = start.NumberU64() + 1; number <= end.NumberU64(); number++ {
+ // Stop tracing if interruption was requested
+ select {
+ case <-notifier.Closed():
+ return
+ default:
+ }
+ // Print progress logs if long enough time elapsed
+ if time.Since(logged) > 8*time.Second {
+ if number > origin {
+ nodes, imgs := database.TrieDB().Size()
+ log.Info("Tracing chain segment", "start", origin, "end", end.NumberU64(), "current", number, "transactions", traced, "elapsed", time.Since(begin), "memory", nodes+imgs)
+ } else {
+ log.Info("Preparing state for chain trace", "block", number, "start", origin, "elapsed", time.Since(begin))
+ }
+ logged = time.Now()
+ }
+ // Retrieve the next block to trace
+ block := api.eth.blockchain.GetBlockByNumber(number)
+ if block == nil {
+ failed = fmt.Errorf("block #%d not found", number)
+ break
+ }
+ // Send the block over to the concurrent tracers (if not in the fast-forward phase)
+ if number > origin {
+ txs := block.Transactions()
+
+ select {
+ case tasks <- &blockTraceTask{statedb: statedb.Copy(), block: block, rootref: proot, results: make([]*txTraceResult, len(txs))}:
+ case <-notifier.Closed():
+ return
+ }
+ traced += uint64(len(txs))
+ }
+ // Generate the next state snapshot fast without tracing
+ _, _, _, err := api.eth.blockchain.Processor().Process(block, statedb, vm.Config{})
+ if err != nil {
+ failed = err
+ break
+ }
+ // Finalize the state so any modifications are written to the trie
+ root, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number()))
+ if err != nil {
+ failed = err
+ break
+ }
+ if err := statedb.Reset(root); err != nil {
+ failed = err
+ break
+ }
+ // Reference the trie twice, once for us, once for the tracer
+ database.TrieDB().Reference(root, common.Hash{})
+ if number >= origin {
+ database.TrieDB().Reference(root, common.Hash{})
+ }
+ // Dereference all past tries we ourselves are done working with
+ if proot != (common.Hash{}) {
+ database.TrieDB().Dereference(proot)
+ }
+ proot = root
+
+ // TODO(karalabe): Do we need the preimages? Won't they accumulate too much?
+ }
+ }()
+
+ // Keep reading the trace results and stream the to the user
+ go func() {
+ var (
+ done = make(map[uint64]*blockTraceResult)
+ next = origin + 1
+ )
+ for res := range results {
+ // Queue up next received result
+ result := &blockTraceResult{
+ Block: hexutil.Uint64(res.block.NumberU64()),
+ Hash: res.block.Hash(),
+ Traces: res.results,
+ }
+ done[uint64(result.Block)] = result
+
+ // Dereference any paret tries held in memory by this task
+ database.TrieDB().Dereference(res.rootref)
+
+ // Stream completed traces to the user, aborting on the first error
+ for result, ok := done[next]; ok; result, ok = done[next] {
+ if len(result.Traces) > 0 || next == end.NumberU64() {
+ notifier.Notify(sub.ID, result)
+ }
+ delete(done, next)
+ next++
+ }
+ }
+ }()
+ return sub, nil
+}
+
+// TraceBlockByNumber returns the structured logs created during the execution of
+// EVM and returns them as a JSON object.
+func (api *PrivateDebugAPI) TraceBlockByNumber(ctx context.Context, number rpc.BlockNumber, config *TraceConfig) ([]*txTraceResult, error) {
+ // Fetch the block that we want to trace
+ var block *types.Block
+
+ switch number {
+ case rpc.PendingBlockNumber:
+ block = api.eth.miner.PendingBlock()
+ case rpc.LatestBlockNumber:
+ block = api.eth.blockchain.CurrentBlock()
+ default:
+ block = api.eth.blockchain.GetBlockByNumber(uint64(number))
+ }
+ // Trace the block if it was found
+ if block == nil {
+ return nil, fmt.Errorf("block #%d not found", number)
+ }
+ return api.traceBlock(ctx, block, config)
+}
+
+// TraceBlockByHash returns the structured logs created during the execution of
+// EVM and returns them as a JSON object.
+func (api *PrivateDebugAPI) TraceBlockByHash(ctx context.Context, hash common.Hash, config *TraceConfig) ([]*txTraceResult, error) {
+ block := api.eth.blockchain.GetBlockByHash(hash)
+ if block == nil {
+ return nil, fmt.Errorf("block %#x not found", hash)
+ }
+ return api.traceBlock(ctx, block, config)
+}
+
+// TraceBlock returns the structured logs created during the execution of EVM
+// and returns them as a JSON object.
+func (api *PrivateDebugAPI) TraceBlock(ctx context.Context, blob []byte, config *TraceConfig) ([]*txTraceResult, error) {
+ block := new(types.Block)
+ if err := rlp.Decode(bytes.NewReader(blob), block); err != nil {
+ return nil, fmt.Errorf("could not decode block: %v", err)
+ }
+ return api.traceBlock(ctx, block, config)
+}
+
+// TraceBlockFromFile returns the structured logs created during the execution of
+// EVM and returns them as a JSON object.
+func (api *PrivateDebugAPI) TraceBlockFromFile(ctx context.Context, file string, config *TraceConfig) ([]*txTraceResult, error) {
+ blob, err := ioutil.ReadFile(file)
+ if err != nil {
+ return nil, fmt.Errorf("could not read file: %v", err)
+ }
+ return api.TraceBlock(ctx, blob, config)
+}
+
+// TraceBadBlockByHash returns the structured logs created during the execution of
+// EVM against a block pulled from the pool of bad ones and returns them as a JSON
+// object.
+func (api *PrivateDebugAPI) TraceBadBlock(ctx context.Context, hash common.Hash, config *TraceConfig) ([]*txTraceResult, error) {
+ blocks := api.eth.blockchain.BadBlocks()
+ for _, block := range blocks {
+ if block.Hash() == hash {
+ return api.traceBlock(ctx, block, config)
+ }
+ }
+ return nil, fmt.Errorf("bad block %#x not found", hash)
+}
+
+// StandardTraceBlockToFile dumps the structured logs created during the
+// execution of EVM to the local file system and returns a list of files
+// to the caller.
+func (api *PrivateDebugAPI) StandardTraceBlockToFile(ctx context.Context, hash common.Hash, config *StdTraceConfig) ([]string, error) {
+ block := api.eth.blockchain.GetBlockByHash(hash)
+ if block == nil {
+ return nil, fmt.Errorf("block %#x not found", hash)
+ }
+ return api.standardTraceBlockToFile(ctx, block, config)
+}
+
+// StandardTraceBadBlockToFile dumps the structured logs created during the
+// execution of EVM against a block pulled from the pool of bad ones to the
+// local file system and returns a list of files to the caller.
+func (api *PrivateDebugAPI) StandardTraceBadBlockToFile(ctx context.Context, hash common.Hash, config *StdTraceConfig) ([]string, error) {
+ blocks := api.eth.blockchain.BadBlocks()
+ for _, block := range blocks {
+ if block.Hash() == hash {
+ return api.standardTraceBlockToFile(ctx, block, config)
+ }
+ }
+ return nil, fmt.Errorf("bad block %#x not found", hash)
+}
+
+// traceBlock configures a new tracer according to the provided configuration, and
+// executes all the transactions contained within. The return value will be one item
+// per transaction, dependent on the requestd tracer.
+func (api *PrivateDebugAPI) traceBlock(ctx context.Context, block *types.Block, config *TraceConfig) ([]*txTraceResult, error) {
+ // Create the parent state database
+ if err := api.eth.engine.VerifyHeader(api.eth.blockchain, block.Header(), true); err != nil {
+ return nil, err
+ }
+ parent := api.eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1)
+ if parent == nil {
+ return nil, fmt.Errorf("parent %#x not found", block.ParentHash())
+ }
+ reexec := defaultTraceReexec
+ if config != nil && config.Reexec != nil {
+ reexec = *config.Reexec
+ }
+ statedb, err := api.computeStateDB(parent, reexec)
+ if err != nil {
+ return nil, err
+ }
+ // Execute all the transaction contained within the block concurrently
+ var (
+ signer = types.MakeSigner(api.eth.blockchain.Config(), block.Number())
+
+ txs = block.Transactions()
+ results = make([]*txTraceResult, len(txs))
+
+ pend = new(sync.WaitGroup)
+ jobs = make(chan *txTraceTask, len(txs))
+ )
+ threads := runtime.NumCPU()
+ if threads > len(txs) {
+ threads = len(txs)
+ }
+ for th := 0; th < threads; th++ {
+ pend.Add(1)
+ go func() {
+ defer pend.Done()
+
+ // Fetch and execute the next transaction trace tasks
+ for task := range jobs {
+ msg, _ := txs[task.index].AsMessage(signer)
+ vmctx := core.NewEVMContext(msg, block.Header(), api.eth.blockchain, nil)
+
+ res, err := api.traceTx(ctx, msg, vmctx, task.statedb, config)
+ if err != nil {
+ results[task.index] = &txTraceResult{Error: err.Error()}
+ continue
+ }
+ results[task.index] = &txTraceResult{Result: res}
+ }
+ }()
+ }
+ // Feed the transactions into the tracers and return
+ var failed error
+ for i, tx := range txs {
+ // Send the trace task over for execution
+ jobs <- &txTraceTask{statedb: statedb.Copy(), index: i}
+
+ // Generate the next state snapshot fast without tracing
+ msg, _ := tx.AsMessage(signer)
+ vmctx := core.NewEVMContext(msg, block.Header(), api.eth.blockchain, nil)
+
+ vmenv := vm.NewEVM(vmctx, statedb, api.eth.blockchain.Config(), vm.Config{})
+ if _, _, _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.Gas())); err != nil {
+ failed = err
+ break
+ }
+ // Finalize the state so any modifications are written to the trie
+ // Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect
+ statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number()))
+ }
+ close(jobs)
+ pend.Wait()
+
+ // If execution failed in between, abort
+ if failed != nil {
+ return nil, failed
+ }
+ return results, nil
+}
+
+// standardTraceBlockToFile configures a new tracer which uses standard JSON output,
+// and traces either a full block or an individual transaction. The return value will
+// be one filename per transaction traced.
+func (api *PrivateDebugAPI) standardTraceBlockToFile(ctx context.Context, block *types.Block, config *StdTraceConfig) ([]string, error) {
+ // If we're tracing a single transaction, make sure it's present
+ if config != nil && config.TxHash != (common.Hash{}) {
+ if !containsTx(block, config.TxHash) {
+ return nil, fmt.Errorf("transaction %#x not found in block", config.TxHash)
+ }
+ }
+ // Create the parent state database
+ if err := api.eth.engine.VerifyHeader(api.eth.blockchain, block.Header(), true); err != nil {
+ return nil, err
+ }
+ parent := api.eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1)
+ if parent == nil {
+ return nil, fmt.Errorf("parent %#x not found", block.ParentHash())
+ }
+ reexec := defaultTraceReexec
+ if config != nil && config.Reexec != nil {
+ reexec = *config.Reexec
+ }
+ statedb, err := api.computeStateDB(parent, reexec)
+ if err != nil {
+ return nil, err
+ }
+ // Retrieve the tracing configurations, or use default values
+ var (
+ logConfig vm.LogConfig
+ txHash common.Hash
+ )
+ if config != nil {
+ if config.LogConfig != nil {
+ logConfig = *config.LogConfig
+ }
+ txHash = config.TxHash
+ }
+ logConfig.Debug = true
+
+ // Execute transaction, either tracing all or just the requested one
+ var (
+ signer = types.MakeSigner(api.eth.blockchain.Config(), block.Number())
+ dumps []string
+ )
+ for i, tx := range block.Transactions() {
+ // Prepare the trasaction for un-traced execution
+ var (
+ msg, _ = tx.AsMessage(signer)
+ vmctx = core.NewEVMContext(msg, block.Header(), api.eth.blockchain, nil)
+
+ vmConf vm.Config
+ dump *os.File
+ writer *bufio.Writer
+ err error
+ )
+ // If the transaction needs tracing, swap out the configs
+ if tx.Hash() == txHash || txHash == (common.Hash{}) {
+ // Generate a unique temporary file to dump it into
+ prefix := fmt.Sprintf("block_%#x-%d-%#x-", block.Hash().Bytes()[:4], i, tx.Hash().Bytes()[:4])
+
+ dump, err = ioutil.TempFile(os.TempDir(), prefix)
+ if err != nil {
+ return nil, err
+ }
+ dumps = append(dumps, dump.Name())
+
+ // Swap out the noop logger to the standard tracer
+ writer = bufio.NewWriter(dump)
+ vmConf = vm.Config{
+ Debug: true,
+ Tracer: vm.NewJSONLogger(&logConfig, writer),
+ EnablePreimageRecording: true,
+ }
+ }
+ // Execute the transaction and flush any traces to disk
+ vmenv := vm.NewEVM(vmctx, statedb, api.eth.blockchain.Config(), vmConf)
+ _, _, _, err = core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.Gas()))
+ if writer != nil {
+ writer.Flush()
+ }
+ if dump != nil {
+ dump.Close()
+ log.Info("Wrote standard trace", "file", dump.Name())
+ }
+ if err != nil {
+ return dumps, err
+ }
+ // Finalize the state so any modifications are written to the trie
+ // Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect
+ statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number()))
+
+ // If we've traced the transaction we were looking for, abort
+ if tx.Hash() == txHash {
+ break
+ }
+ }
+ return dumps, nil
+}
+
+// containsTx reports whether the transaction with a certain hash
+// is contained within the specified block.
+func containsTx(block *types.Block, hash common.Hash) bool {
+ for _, tx := range block.Transactions() {
+ if tx.Hash() == hash {
+ return true
+ }
+ }
+ return false
+}
+
+// computeStateDB retrieves the state database associated with a certain block.
+// If no state is locally available for the given block, a number of blocks are
+// attempted to be reexecuted to generate the desired state.
+func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (*state.StateDB, error) {
+ // If we have the state fully available, use that
+ statedb, err := api.eth.blockchain.StateAt(block.Root())
+ if err == nil {
+ return statedb, nil
+ }
+ // Otherwise try to reexec blocks until we find a state or reach our limit
+ origin := block.NumberU64()
+ database := state.NewDatabaseWithCache(api.eth.ChainDb(), 16)
+
+ for i := uint64(0); i < reexec; i++ {
+ block = api.eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1)
+ if block == nil {
+ break
+ }
+ if statedb, err = state.New(block.Root(), database); err == nil {
+ break
+ }
+ }
+ if err != nil {
+ switch err.(type) {
+ case *trie.MissingNodeError:
+ return nil, fmt.Errorf("required historical state unavailable (reexec=%d)", reexec)
+ default:
+ return nil, err
+ }
+ }
+ // State was available at historical point, regenerate
+ var (
+ start = time.Now()
+ logged time.Time
+ proot common.Hash
+ )
+ for block.NumberU64() < origin {
+ // Print progress logs if long enough time elapsed
+ if time.Since(logged) > 8*time.Second {
+ log.Info("Regenerating historical state", "block", block.NumberU64()+1, "target", origin, "remaining", origin-block.NumberU64()-1, "elapsed", time.Since(start))
+ logged = time.Now()
+ }
+ // Retrieve the next block to regenerate and process it
+ if block = api.eth.blockchain.GetBlockByNumber(block.NumberU64() + 1); block == nil {
+ return nil, fmt.Errorf("block #%d not found", block.NumberU64()+1)
+ }
+ _, _, _, err := api.eth.blockchain.Processor().Process(block, statedb, vm.Config{})
+ if err != nil {
+ return nil, fmt.Errorf("processing block %d failed: %v", block.NumberU64(), err)
+ }
+ // Finalize the state so any modifications are written to the trie
+ root, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number()))
+ if err != nil {
+ return nil, err
+ }
+ if err := statedb.Reset(root); err != nil {
+ return nil, fmt.Errorf("state reset after block %d failed: %v", block.NumberU64(), err)
+ }
+ database.TrieDB().Reference(root, common.Hash{})
+ if proot != (common.Hash{}) {
+ database.TrieDB().Dereference(proot)
+ }
+ proot = root
+ }
+ nodes, imgs := database.TrieDB().Size()
+ log.Info("Historical state regenerated", "block", block.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "preimages", imgs)
+ return statedb, nil
+}
+
+// TraceTransaction returns the structured logs created during the execution of EVM
+// and returns them as a JSON object.
+func (api *PrivateDebugAPI) TraceTransaction(ctx context.Context, hash common.Hash, config *TraceConfig) (interface{}, error) {
+ // Retrieve the transaction and assemble its EVM context
+ tx, blockHash, _, index := rawdb.ReadTransaction(api.eth.ChainDb(), hash)
+ if tx == nil {
+ return nil, fmt.Errorf("transaction %#x not found", hash)
+ }
+ reexec := defaultTraceReexec
+ if config != nil && config.Reexec != nil {
+ reexec = *config.Reexec
+ }
+ msg, vmctx, statedb, err := api.computeTxEnv(blockHash, int(index), reexec)
+ if err != nil {
+ return nil, err
+ }
+ // Trace the transaction and return
+ return api.traceTx(ctx, msg, vmctx, statedb, config)
+}
+
+// traceTx configures a new tracer according to the provided configuration, and
+// executes the given message in the provided environment. The return value will
+// be tracer dependent.
+func (api *PrivateDebugAPI) traceTx(ctx context.Context, message core.Message, vmctx vm.Context, statedb *state.StateDB, config *TraceConfig) (interface{}, error) {
+ // Assemble the structured logger or the JavaScript tracer
+ var (
+ tracer vm.Tracer
+ err error
+ )
+ switch {
+ case config != nil && config.Tracer != nil:
+ // Define a meaningful timeout of a single transaction trace
+ timeout := defaultTraceTimeout
+ if config.Timeout != nil {
+ if timeout, err = time.ParseDuration(*config.Timeout); err != nil {
+ return nil, err
+ }
+ }
+ // Constuct the JavaScript tracer to execute with
+ if tracer, err = tracers.New(*config.Tracer); err != nil {
+ return nil, err
+ }
+ // Handle timeouts and RPC cancellations
+ deadlineCtx, cancel := context.WithTimeout(ctx, timeout)
+ go func() {
+ <-deadlineCtx.Done()
+ tracer.(*tracers.Tracer).Stop(errors.New("execution timeout"))
+ }()
+ defer cancel()
+
+ case config == nil:
+ tracer = vm.NewStructLogger(nil)
+
+ default:
+ tracer = vm.NewStructLogger(config.LogConfig)
+ }
+ // Run the transaction with tracing enabled.
+ vmenv := vm.NewEVM(vmctx, statedb, api.eth.blockchain.Config(), vm.Config{Debug: true, Tracer: tracer})
+
+ ret, gas, failed, err := core.ApplyMessage(vmenv, message, new(core.GasPool).AddGas(message.Gas()))
+ if err != nil {
+ return nil, fmt.Errorf("tracing failed: %v", err)
+ }
+ // Depending on the tracer type, format and return the output
+ switch tracer := tracer.(type) {
+ case *vm.StructLogger:
+ return &ethapi.ExecutionResult{
+ Gas: gas,
+ Failed: failed,
+ ReturnValue: fmt.Sprintf("%x", ret),
+ StructLogs: ethapi.FormatLogs(tracer.StructLogs()),
+ }, nil
+
+ case *tracers.Tracer:
+ return tracer.GetResult()
+
+ default:
+ panic(fmt.Sprintf("bad tracer type %T", tracer))
+ }
+}
+
+// computeTxEnv returns the execution environment of a certain transaction.
+func (api *PrivateDebugAPI) computeTxEnv(blockHash common.Hash, txIndex int, reexec uint64) (core.Message, vm.Context, *state.StateDB, error) {
+ // Create the parent state database
+ block := api.eth.blockchain.GetBlockByHash(blockHash)
+ if block == nil {
+ return nil, vm.Context{}, nil, fmt.Errorf("block %#x not found", blockHash)
+ }
+ parent := api.eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1)
+ if parent == nil {
+ return nil, vm.Context{}, nil, fmt.Errorf("parent %#x not found", block.ParentHash())
+ }
+ statedb, err := api.computeStateDB(parent, reexec)
+ if err != nil {
+ return nil, vm.Context{}, nil, err
+ }
+
+ if txIndex == 0 && len(block.Transactions()) == 0 {
+ return nil, vm.Context{}, statedb, nil
+ }
+
+ // Recompute transactions up to the target index.
+ signer := types.MakeSigner(api.eth.blockchain.Config(), block.Number())
+
+ for idx, tx := range block.Transactions() {
+ // Assemble the transaction call message and return if the requested offset
+ msg, _ := tx.AsMessage(signer)
+ context := core.NewEVMContext(msg, block.Header(), api.eth.blockchain, nil)
+ if idx == txIndex {
+ return msg, context, statedb, nil
+ }
+ // Not yet the searched for transaction, execute on top of the current state
+ vmenv := vm.NewEVM(context, statedb, api.eth.blockchain.Config(), vm.Config{})
+ if _, _, _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas())); err != nil {
+ return nil, vm.Context{}, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err)
+ }
+ // Ensure any modifications are committed to the state
+ // Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect
+ statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number()))
+ }
+ return nil, vm.Context{}, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, blockHash)
+}
diff --git a/eth/backend.go b/eth/backend.go
new file mode 100644
index 0000000..92dfa28
--- /dev/null
+++ b/eth/backend.go
@@ -0,0 +1,566 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package eth implements the Ethereum protocol.
+package eth
+
+import (
+ "errors"
+ "fmt"
+ "math/big"
+ "runtime"
+ "sync"
+ "sync/atomic"
+
+ "github.com/ethereum/go-ethereum/accounts"
+ "github.com/ethereum/go-ethereum/accounts/abi/bind"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/consensus"
+ "github.com/ethereum/go-ethereum/consensus/clique"
+ "github.com/ethereum/go-ethereum/consensus/ethash"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
+ "github.com/ethereum/go-ethereum/core/rawdb"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/core/vm"
+ "github.com/ethereum/go-ethereum/eth/downloader"
+ "github.com/ethereum/go-ethereum/eth/filters"
+ "github.com/ethereum/go-ethereum/eth/gasprice"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/Determinant/coreth/internal/ethapi"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/miner"
+ "github.com/Determinant/coreth/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/enr"
+ "github.com/ethereum/go-ethereum/params"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/rpc"
+)
+
+type LesServer interface {
+ Start(srvr *p2p.Server)
+ Stop()
+ APIs() []rpc.API
+ Protocols() []p2p.Protocol
+ SetBloomBitsIndexer(bbIndexer *core.ChainIndexer)
+ SetContractBackend(bind.ContractBackend)
+}
+
+// Ethereum implements the Ethereum full node service.
+type Ethereum struct {
+ config *Config
+
+ // Channel for shutting down the service
+ shutdownChan chan bool
+
+ server *p2p.Server
+
+ // Handlers
+ txPool *core.TxPool
+ blockchain *core.BlockChain
+ protocolManager *ProtocolManager
+ lesServer LesServer
+
+ // DB interfaces
+ chainDb ethdb.Database // Block chain database
+
+ eventMux *event.TypeMux
+ engine consensus.Engine
+ accountManager *accounts.Manager
+
+ bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
+ bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
+
+ APIBackend *EthAPIBackend
+
+ miner *miner.Miner
+ gasPrice *big.Int
+ etherbase common.Address
+
+ networkID uint64
+ netRPCService *ethapi.PublicNetAPI
+
+ lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase)
+}
+
+func (s *Ethereum) AddLesServer(ls LesServer) {
+ s.lesServer = ls
+ ls.SetBloomBitsIndexer(s.bloomIndexer)
+}
+
+// SetClient sets a rpc client which connecting to our local node.
+func (s *Ethereum) SetContractBackend(backend bind.ContractBackend) {
+ // Pass the rpc client to les server if it is enabled.
+ if s.lesServer != nil {
+ s.lesServer.SetContractBackend(backend)
+ }
+}
+
+// New creates a new Ethereum object (including the
+// initialisation of the common Ethereum object)
+func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
+ // Ensure configuration values are compatible and sane
+ if config.SyncMode == downloader.LightSync {
+ return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum")
+ }
+ if !config.SyncMode.IsValid() {
+ return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
+ }
+ if config.Miner.GasPrice == nil || config.Miner.GasPrice.Cmp(common.Big0) <= 0 {
+ log.Warn("Sanitizing invalid miner gas price", "provided", config.Miner.GasPrice, "updated", DefaultConfig.Miner.GasPrice)
+ config.Miner.GasPrice = new(big.Int).Set(DefaultConfig.Miner.GasPrice)
+ }
+ if config.NoPruning && config.TrieDirtyCache > 0 {
+ config.TrieCleanCache += config.TrieDirtyCache
+ config.TrieDirtyCache = 0
+ }
+ log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024)
+
+ // Assemble the Ethereum object
+ chainDb, err := ctx.OpenDatabaseWithFreezer("chaindata", config.DatabaseCache, config.DatabaseHandles, config.DatabaseFreezer, "eth/db/chaindata/")
+ if err != nil {
+ return nil, err
+ }
+ chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
+ if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
+ return nil, genesisErr
+ }
+ log.Info("Initialised chain configuration", "config", chainConfig)
+
+ eth := &Ethereum{
+ config: config,
+ chainDb: chainDb,
+ eventMux: ctx.EventMux,
+ accountManager: ctx.AccountManager,
+ engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.Miner.Notify, config.Miner.Noverify, chainDb),
+ shutdownChan: make(chan bool),
+ networkID: config.NetworkId,
+ gasPrice: config.Miner.GasPrice,
+ etherbase: config.Miner.Etherbase,
+ bloomRequests: make(chan chan *bloombits.Retrieval),
+ bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
+ }
+
+ bcVersion := rawdb.ReadDatabaseVersion(chainDb)
+ var dbVer = "<nil>"
+ if bcVersion != nil {
+ dbVer = fmt.Sprintf("%d", *bcVersion)
+ }
+ log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId, "dbversion", dbVer)
+
+ if !config.SkipBcVersionCheck {
+ if bcVersion != nil && *bcVersion > core.BlockChainVersion {
+ return nil, fmt.Errorf("database version is v%d, Geth %s only supports v%d", *bcVersion, params.VersionWithMeta, core.BlockChainVersion)
+ } else if bcVersion == nil || *bcVersion < core.BlockChainVersion {
+ log.Warn("Upgrade blockchain database version", "from", dbVer, "to", core.BlockChainVersion)
+ rawdb.WriteDatabaseVersion(chainDb, core.BlockChainVersion)
+ }
+ }
+ var (
+ vmConfig = vm.Config{
+ EnablePreimageRecording: config.EnablePreimageRecording,
+ EWASMInterpreter: config.EWASMInterpreter,
+ EVMInterpreter: config.EVMInterpreter,
+ }
+ cacheConfig = &core.CacheConfig{
+ TrieCleanLimit: config.TrieCleanCache,
+ TrieCleanNoPrefetch: config.NoPrefetch,
+ TrieDirtyLimit: config.TrieDirtyCache,
+ TrieDirtyDisabled: config.NoPruning,
+ TrieTimeLimit: config.TrieTimeout,
+ }
+ )
+ eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve)
+ if err != nil {
+ return nil, err
+ }
+ // Rewind the chain in case of an incompatible config upgrade.
+ if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
+ log.Warn("Rewinding chain to upgrade configuration", "err", compat)
+ eth.blockchain.SetHead(compat.RewindTo)
+ rawdb.WriteChainConfig(chainDb, genesisHash, chainConfig)
+ }
+ eth.bloomIndexer.Start(eth.blockchain)
+
+ if config.TxPool.Journal != "" {
+ config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
+ }
+ eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain)
+
+ // Permit the downloader to use the trie cache allowance during fast sync
+ cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit
+ checkpoint := config.Checkpoint
+ if checkpoint == nil {
+ checkpoint = params.TrustedCheckpoints[genesisHash]
+ }
+ if eth.protocolManager, err = NewProtocolManager(chainConfig, checkpoint, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, cacheLimit, config.Whitelist); err != nil {
+ return nil, err
+ }
+ eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock)
+ eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
+
+ eth.APIBackend = &EthAPIBackend{ctx.ExtRPCEnabled(), eth, nil}
+ gpoParams := config.GPO
+ if gpoParams.Default == nil {
+ gpoParams.Default = config.Miner.GasPrice
+ }
+ eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)
+
+ return eth, nil
+}
+
+func makeExtraData(extra []byte) []byte {
+ if len(extra) == 0 {
+ // create default extradata
+ extra, _ = rlp.EncodeToBytes([]interface{}{
+ uint(params.VersionMajor<<16 | params.VersionMinor<<8 | params.VersionPatch),
+ "geth",
+ runtime.Version(),
+ runtime.GOOS,
+ })
+ }
+ if uint64(len(extra)) > params.MaximumExtraDataSize {
+ log.Warn("Miner extra data exceed limit", "extra", hexutil.Bytes(extra), "limit", params.MaximumExtraDataSize)
+ extra = nil
+ }
+ return extra
+}
+
+// CreateConsensusEngine creates the required type of consensus engine instance for an Ethereum service
+func CreateConsensusEngine(ctx *node.ServiceContext, chainConfig *params.ChainConfig, config *ethash.Config, notify []string, noverify bool, db ethdb.Database) consensus.Engine {
+ // If proof-of-authority is requested, set it up
+ if chainConfig.Clique != nil {
+ return clique.New(chainConfig.Clique, db)
+ }
+ // Otherwise assume proof-of-work
+ switch config.PowMode {
+ case ethash.ModeFake:
+ log.Warn("Ethash used in fake mode")
+ return ethash.NewFaker()
+ case ethash.ModeTest:
+ log.Warn("Ethash used in test mode")
+ return ethash.NewTester(nil, noverify)
+ case ethash.ModeShared:
+ log.Warn("Ethash used in shared mode")
+ return ethash.NewShared()
+ default:
+ engine := ethash.New(ethash.Config{
+ CacheDir: ctx.ResolvePath(config.CacheDir),
+ CachesInMem: config.CachesInMem,
+ CachesOnDisk: config.CachesOnDisk,
+ DatasetDir: config.DatasetDir,
+ DatasetsInMem: config.DatasetsInMem,
+ DatasetsOnDisk: config.DatasetsOnDisk,
+ }, notify, noverify)
+ engine.SetThreads(-1) // Disable CPU mining
+ return engine
+ }
+}
+
+// APIs return the collection of RPC services the ethereum package offers.
+// NOTE, some of these services probably need to be moved to somewhere else.
+func (s *Ethereum) APIs() []rpc.API {
+ apis := ethapi.GetAPIs(s.APIBackend)
+
+ // Append any APIs exposed explicitly by the les server
+ if s.lesServer != nil {
+ apis = append(apis, s.lesServer.APIs()...)
+ }
+ // Append any APIs exposed explicitly by the consensus engine
+ apis = append(apis, s.engine.APIs(s.BlockChain())...)
+
+ // Append any APIs exposed explicitly by the les server
+ if s.lesServer != nil {
+ apis = append(apis, s.lesServer.APIs()...)
+ }
+
+ // Append all the local APIs and return
+ return append(apis, []rpc.API{
+ {
+ Namespace: "eth",
+ Version: "1.0",
+ Service: NewPublicEthereumAPI(s),
+ Public: true,
+ }, {
+ Namespace: "eth",
+ Version: "1.0",
+ Service: NewPublicMinerAPI(s),
+ Public: true,
+ }, {
+ Namespace: "eth",
+ Version: "1.0",
+ Service: downloader.NewPublicDownloaderAPI(s.protocolManager.downloader, s.eventMux),
+ Public: true,
+ }, {
+ Namespace: "miner",
+ Version: "1.0",
+ Service: NewPrivateMinerAPI(s),
+ Public: false,
+ }, {
+ Namespace: "eth",
+ Version: "1.0",
+ Service: filters.NewPublicFilterAPI(s.APIBackend, false),
+ Public: true,
+ }, {
+ Namespace: "admin",
+ Version: "1.0",
+ Service: NewPrivateAdminAPI(s),
+ }, {
+ Namespace: "debug",
+ Version: "1.0",
+ Service: NewPublicDebugAPI(s),
+ Public: true,
+ }, {
+ Namespace: "debug",
+ Version: "1.0",
+ Service: NewPrivateDebugAPI(s),
+ }, {
+ Namespace: "net",
+ Version: "1.0",
+ Service: s.netRPCService,
+ Public: true,
+ },
+ }...)
+}
+
+func (s *Ethereum) ResetWithGenesisBlock(gb *types.Block) {
+ s.blockchain.ResetWithGenesisBlock(gb)
+}
+
+func (s *Ethereum) Etherbase() (eb common.Address, err error) {
+ s.lock.RLock()
+ etherbase := s.etherbase
+ s.lock.RUnlock()
+
+ if etherbase != (common.Address{}) {
+ return etherbase, nil
+ }
+ if wallets := s.AccountManager().Wallets(); len(wallets) > 0 {
+ if accounts := wallets[0].Accounts(); len(accounts) > 0 {
+ etherbase := accounts[0].Address
+
+ s.lock.Lock()
+ s.etherbase = etherbase
+ s.lock.Unlock()
+
+ log.Info("Etherbase automatically configured", "address", etherbase)
+ return etherbase, nil
+ }
+ }
+ return common.Address{}, fmt.Errorf("etherbase must be explicitly specified")
+}
+
+// isLocalBlock checks whether the specified block is mined
+// by local miner accounts.
+//
+// We regard two types of accounts as local miner account: etherbase
+// and accounts specified via `txpool.locals` flag.
+func (s *Ethereum) isLocalBlock(block *types.Block) bool {
+ author, err := s.engine.Author(block.Header())
+ if err != nil {
+ log.Warn("Failed to retrieve block author", "number", block.NumberU64(), "hash", block.Hash(), "err", err)
+ return false
+ }
+ // Check whether the given address is etherbase.
+ s.lock.RLock()
+ etherbase := s.etherbase
+ s.lock.RUnlock()
+ if author == etherbase {
+ return true
+ }
+ // Check whether the given address is specified by `txpool.local`
+ // CLI flag.
+ for _, account := range s.config.TxPool.Locals {
+ if account == author {
+ return true
+ }
+ }
+ return false
+}
+
+// shouldPreserve checks whether we should preserve the given block
+// during the chain reorg depending on whether the author of block
+// is a local account.
+func (s *Ethereum) shouldPreserve(block *types.Block) bool {
+ // The reason we need to disable the self-reorg preserving for clique
+ // is it can be probable to introduce a deadlock.
+ //
+ // e.g. If there are 7 available signers
+ //
+ // r1 A
+ // r2 B
+ // r3 C
+ // r4 D
+ // r5 A [X] F G
+ // r6 [X]
+ //
+ // In the round5, the inturn signer E is offline, so the worst case
+ // is A, F and G sign the block of round5 and reject the block of opponents
+ // and in the round6, the last available signer B is offline, the whole
+ // network is stuck.
+ if _, ok := s.engine.(*clique.Clique); ok {
+ return false
+ }
+ return s.isLocalBlock(block)
+}
+
+// SetEtherbase sets the mining reward address.
+func (s *Ethereum) SetEtherbase(etherbase common.Address) {
+ s.lock.Lock()
+ s.etherbase = etherbase
+ s.lock.Unlock()
+
+ s.miner.SetEtherbase(etherbase)
+}
+
+// StartMining starts the miner with the given number of CPU threads. If mining
+// is already running, this method adjust the number of threads allowed to use
+// and updates the minimum price required by the transaction pool.
+func (s *Ethereum) StartMining(threads int) error {
+ // Update the thread count within the consensus engine
+ type threaded interface {
+ SetThreads(threads int)
+ }
+ if th, ok := s.engine.(threaded); ok {
+ log.Info("Updated mining threads", "threads", threads)
+ if threads == 0 {
+ threads = -1 // Disable the miner from within
+ }
+ th.SetThreads(threads)
+ }
+ // If the miner was not running, initialize it
+ if !s.IsMining() {
+ // Propagate the initial price point to the transaction pool
+ s.lock.RLock()
+ price := s.gasPrice
+ s.lock.RUnlock()
+ s.txPool.SetGasPrice(price)
+
+ // Configure the local mining address
+ eb, err := s.Etherbase()
+ if err != nil {
+ log.Error("Cannot start mining without etherbase", "err", err)
+ return fmt.Errorf("etherbase missing: %v", err)
+ }
+ if clique, ok := s.engine.(*clique.Clique); ok {
+ wallet, err := s.accountManager.Find(accounts.Account{Address: eb})
+ if wallet == nil || err != nil {
+ log.Error("Etherbase account unavailable locally", "err", err)
+ return fmt.Errorf("signer missing: %v", err)
+ }
+ clique.Authorize(eb, wallet.SignData)
+ }
+ // If mining is started, we can disable the transaction rejection mechanism
+ // introduced to speed sync times.
+ atomic.StoreUint32(&s.protocolManager.acceptTxs, 1)
+
+ go s.miner.Start(eb)
+ }
+ return nil
+}
+
+// StopMining terminates the miner, both at the consensus engine level as well as
+// at the block creation level.
+func (s *Ethereum) StopMining() {
+ // Update the thread count within the consensus engine
+ type threaded interface {
+ SetThreads(threads int)
+ }
+ if th, ok := s.engine.(threaded); ok {
+ th.SetThreads(-1)
+ }
+ // Stop the block creating itself
+ s.miner.Stop()
+}
+
+func (s *Ethereum) IsMining() bool { return s.miner.Mining() }
+func (s *Ethereum) Miner() *miner.Miner { return s.miner }
+
+func (s *Ethereum) AccountManager() *accounts.Manager { return s.accountManager }
+func (s *Ethereum) BlockChain() *core.BlockChain { return s.blockchain }
+func (s *Ethereum) TxPool() *core.TxPool { return s.txPool }
+func (s *Ethereum) EventMux() *event.TypeMux { return s.eventMux }
+func (s *Ethereum) Engine() consensus.Engine { return s.engine }
+func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb }
+func (s *Ethereum) IsListening() bool { return true } // Always listening
+func (s *Ethereum) EthVersion() int { return int(ProtocolVersions[0]) }
+func (s *Ethereum) NetVersion() uint64 { return s.networkID }
+func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolManager.downloader }
+func (s *Ethereum) Synced() bool { return atomic.LoadUint32(&s.protocolManager.acceptTxs) == 1 }
+func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning }
+
+// Protocols implements node.Service, returning all the currently configured
+// network protocols to start.
+func (s *Ethereum) Protocols() []p2p.Protocol {
+ protos := make([]p2p.Protocol, len(ProtocolVersions))
+ for i, vsn := range ProtocolVersions {
+ protos[i] = s.protocolManager.makeProtocol(vsn)
+ protos[i].Attributes = []enr.Entry{s.currentEthEntry()}
+ }
+ if s.lesServer != nil {
+ protos = append(protos, s.lesServer.Protocols()...)
+ }
+ return protos
+}
+
+// Start implements node.Service, starting all internal goroutines needed by the
+// Ethereum protocol implementation.
+func (s *Ethereum) Start(srvr *p2p.Server) error {
+ s.startEthEntryUpdate(srvr.LocalNode())
+
+ // Start the bloom bits servicing goroutines
+ s.startBloomHandlers(params.BloomBitsBlocks)
+
+ // Start the RPC service
+ s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())
+
+ // Figure out a max peers count based on the server limits
+ maxPeers := srvr.MaxPeers
+ if s.config.LightServ > 0 {
+ if s.config.LightPeers >= srvr.MaxPeers {
+ return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
+ }
+ maxPeers -= s.config.LightPeers
+ }
+ // Start the networking layer and the light server if requested
+ s.protocolManager.Start(maxPeers)
+ if s.lesServer != nil {
+ s.lesServer.Start(srvr)
+ }
+ return nil
+}
+
+// Stop implements node.Service, terminating all internal goroutines used by the
+// Ethereum protocol.
+func (s *Ethereum) Stop() error {
+ s.bloomIndexer.Close()
+ s.blockchain.Stop()
+ s.engine.Close()
+ s.protocolManager.Stop()
+ if s.lesServer != nil {
+ s.lesServer.Stop()
+ }
+ s.txPool.Stop()
+ s.miner.Stop()
+ s.eventMux.Stop()
+
+ s.chainDb.Close()
+ close(s.shutdownChan)
+ return nil
+}
diff --git a/eth/bloombits.go b/eth/bloombits.go
new file mode 100644
index 0000000..9a31997
--- /dev/null
+++ b/eth/bloombits.go
@@ -0,0 +1,138 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "context"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/bitutil"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
+ "github.com/ethereum/go-ethereum/core/rawdb"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
+)
+
+const (
+ // bloomServiceThreads is the number of goroutines used globally by an Ethereum
+ // instance to service bloombits lookups for all running filters.
+ bloomServiceThreads = 16
+
+ // bloomFilterThreads is the number of goroutines used locally per filter to
+ // multiplex requests onto the global servicing goroutines.
+ bloomFilterThreads = 3
+
+ // bloomRetrievalBatch is the maximum number of bloom bit retrievals to service
+ // in a single batch.
+ bloomRetrievalBatch = 16
+
+ // bloomRetrievalWait is the maximum time to wait for enough bloom bit requests
+ // to accumulate request an entire batch (avoiding hysteresis).
+ bloomRetrievalWait = time.Duration(0)
+)
+
+// startBloomHandlers starts a batch of goroutines to accept bloom bit database
+// retrievals from possibly a range of filters and serving the data to satisfy.
+func (eth *Ethereum) startBloomHandlers(sectionSize uint64) {
+ for i := 0; i < bloomServiceThreads; i++ {
+ go func() {
+ for {
+ select {
+ case <-eth.shutdownChan:
+ return
+
+ case request := <-eth.bloomRequests:
+ task := <-request
+ task.Bitsets = make([][]byte, len(task.Sections))
+ for i, section := range task.Sections {
+ head := rawdb.ReadCanonicalHash(eth.chainDb, (section+1)*sectionSize-1)
+ if compVector, err := rawdb.ReadBloomBits(eth.chainDb, task.Bit, section, head); err == nil {
+ if blob, err := bitutil.DecompressBytes(compVector, int(sectionSize/8)); err == nil {
+ task.Bitsets[i] = blob
+ } else {
+ task.Error = err
+ }
+ } else {
+ task.Error = err
+ }
+ }
+ request <- task
+ }
+ }
+ }()
+ }
+}
+
+const (
+ // bloomThrottling is the time to wait between processing two consecutive index
+ // sections. It's useful during chain upgrades to prevent disk overload.
+ bloomThrottling = 100 * time.Millisecond
+)
+
+// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index
+// for the Ethereum header bloom filters, permitting blazing fast filtering.
+type BloomIndexer struct {
+ size uint64 // section size to generate bloombits for
+ db ethdb.Database // database instance to write index data and metadata into
+ gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index
+ section uint64 // Section is the section number being processed currently
+ head common.Hash // Head is the hash of the last header processed
+}
+
+// NewBloomIndexer returns a chain indexer that generates bloom bits data for the
+// canonical chain for fast logs filtering.
+func NewBloomIndexer(db ethdb.Database, size, confirms uint64) *core.ChainIndexer {
+ backend := &BloomIndexer{
+ db: db,
+ size: size,
+ }
+ table := rawdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix))
+
+ return core.NewChainIndexer(db, table, backend, size, confirms, bloomThrottling, "bloombits")
+}
+
+// Reset implements core.ChainIndexerBackend, starting a new bloombits index
+// section.
+func (b *BloomIndexer) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error {
+ gen, err := bloombits.NewGenerator(uint(b.size))
+ b.gen, b.section, b.head = gen, section, common.Hash{}
+ return err
+}
+
+// Process implements core.ChainIndexerBackend, adding a new header's bloom into
+// the index.
+func (b *BloomIndexer) Process(ctx context.Context, header *types.Header) error {
+ b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom)
+ b.head = header.Hash()
+ return nil
+}
+
+// Commit implements core.ChainIndexerBackend, finalizing the bloom section and
+// writing it out into the database.
+func (b *BloomIndexer) Commit() error {
+ batch := b.db.NewBatch()
+ for i := 0; i < types.BloomBitLength; i++ {
+ bits, err := b.gen.Bitset(uint(i))
+ if err != nil {
+ return err
+ }
+ rawdb.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits))
+ }
+ return batch.Write()
+}
diff --git a/eth/config.go b/eth/config.go
new file mode 100644
index 0000000..6887872
--- /dev/null
+++ b/eth/config.go
@@ -0,0 +1,157 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "math/big"
+ "os"
+ "os/user"
+ "path/filepath"
+ "runtime"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/consensus/ethash"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/eth/downloader"
+ "github.com/ethereum/go-ethereum/eth/gasprice"
+ "github.com/ethereum/go-ethereum/miner"
+ "github.com/ethereum/go-ethereum/params"
+)
+
+// DefaultConfig contains default settings for use on the Ethereum main net.
+var DefaultConfig = Config{
+ SyncMode: downloader.FastSync,
+ Ethash: ethash.Config{
+ CacheDir: "ethash",
+ CachesInMem: 2,
+ CachesOnDisk: 3,
+ DatasetsInMem: 1,
+ DatasetsOnDisk: 2,
+ },
+ NetworkId: 1,
+ LightPeers: 100,
+ UltraLightFraction: 75,
+ DatabaseCache: 512,
+ TrieCleanCache: 256,
+ TrieDirtyCache: 256,
+ TrieTimeout: 60 * time.Minute,
+ Miner: miner.Config{
+ GasFloor: 8000000,
+ GasCeil: 8000000,
+ GasPrice: big.NewInt(params.GWei),
+ Recommit: 3 * time.Second,
+ },
+ TxPool: core.DefaultTxPoolConfig,
+ GPO: gasprice.Config{
+ Blocks: 20,
+ Percentile: 60,
+ },
+}
+
+func init() {
+ home := os.Getenv("HOME")
+ if home == "" {
+ if user, err := user.Current(); err == nil {
+ home = user.HomeDir
+ }
+ }
+ if runtime.GOOS == "darwin" {
+ DefaultConfig.Ethash.DatasetDir = filepath.Join(home, "Library", "Ethash")
+ } else if runtime.GOOS == "windows" {
+ localappdata := os.Getenv("LOCALAPPDATA")
+ if localappdata != "" {
+ DefaultConfig.Ethash.DatasetDir = filepath.Join(localappdata, "Ethash")
+ } else {
+ DefaultConfig.Ethash.DatasetDir = filepath.Join(home, "AppData", "Local", "Ethash")
+ }
+ } else {
+ DefaultConfig.Ethash.DatasetDir = filepath.Join(home, ".ethash")
+ }
+}
+
+//go:generate gencodec -type Config -formats toml -out gen_config.go
+
+type Config struct {
+ // The genesis block, which is inserted if the database is empty.
+ // If nil, the Ethereum main net block is used.
+ Genesis *core.Genesis `toml:",omitempty"`
+
+ // Protocol options
+ NetworkId uint64 // Network ID to use for selecting peers to connect to
+ SyncMode downloader.SyncMode
+
+ NoPruning bool // Whether to disable pruning and flush everything to disk
+ NoPrefetch bool // Whether to disable prefetching and only load state on demand
+
+ // Whitelist of required block number -> hash values to accept
+ Whitelist map[uint64]common.Hash `toml:"-"`
+
+ // Light client options
+ LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests
+ LightIngress int `toml:",omitempty"` // Incoming bandwidth limit for light servers
+ LightEgress int `toml:",omitempty"` // Outgoing bandwidth limit for light servers
+ LightPeers int `toml:",omitempty"` // Maximum number of LES client peers
+
+ // Ultra Light client options
+ UltraLightServers []string `toml:",omitempty"` // List of trusted ultra light servers
+ UltraLightFraction int `toml:",omitempty"` // Percentage of trusted servers to accept an announcement
+ UltraLightOnlyAnnounce bool `toml:",omitempty"` // Whether to only announce headers, or also serve them
+
+ // Database options
+ SkipBcVersionCheck bool `toml:"-"`
+ DatabaseHandles int `toml:"-"`
+ DatabaseCache int
+ DatabaseFreezer string
+
+ TrieCleanCache int
+ TrieDirtyCache int
+ TrieTimeout time.Duration
+
+ // Mining options
+ Miner miner.Config
+
+ // Ethash options
+ Ethash ethash.Config
+
+ // Transaction pool options
+ TxPool core.TxPoolConfig
+
+ // Gas Price Oracle options
+ GPO gasprice.Config
+
+ // Enables tracking of SHA3 preimages in the VM
+ EnablePreimageRecording bool
+
+ // Miscellaneous options
+ DocRoot string `toml:"-"`
+
+ // Type of the EWASM interpreter ("" for default)
+ EWASMInterpreter string
+
+ // Type of the EVM interpreter ("" for default)
+ EVMInterpreter string
+
+ // RPCGasCap is the global gas cap for eth-call variants.
+ RPCGasCap *big.Int `toml:",omitempty"`
+
+ // Checkpoint is a hardcoded checkpoint which can be nil.
+ Checkpoint *params.TrustedCheckpoint `toml:",omitempty"`
+
+ // CheckpointOracle is the configuration for checkpoint oracle.
+ CheckpointOracle *params.CheckpointOracleConfig `toml:",omitempty"`
+}
diff --git a/eth/enr_entry.go b/eth/enr_entry.go
new file mode 100644
index 0000000..d9e7b95
--- /dev/null
+++ b/eth/enr_entry.go
@@ -0,0 +1,61 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/forkid"
+ "github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+// ethEntry is the "eth" ENR entry which advertises eth protocol
+// on the discovery network.
+type ethEntry struct {
+ ForkID forkid.ID // Fork identifier per EIP-2124
+
+ // Ignore additional fields (for forward compatibility).
+ Rest []rlp.RawValue `rlp:"tail"`
+}
+
+// ENRKey implements enr.Entry.
+func (e ethEntry) ENRKey() string {
+ return "eth"
+}
+
+func (eth *Ethereum) startEthEntryUpdate(ln *enode.LocalNode) {
+ var newHead = make(chan core.ChainHeadEvent, 10)
+ sub := eth.blockchain.SubscribeChainHeadEvent(newHead)
+
+ go func() {
+ defer sub.Unsubscribe()
+ for {
+ select {
+ case <-newHead:
+ ln.Set(eth.currentEthEntry())
+ case <-sub.Err():
+ // Would be nice to sync with eth.Stop, but there is no
+ // good way to do that.
+ return
+ }
+ }
+ }()
+}
+
+func (eth *Ethereum) currentEthEntry() *ethEntry {
+ return &ethEntry{ForkID: forkid.NewID(eth.blockchain)}
+}
diff --git a/eth/gen_config.go b/eth/gen_config.go
new file mode 100644
index 0000000..bc4b55b
--- /dev/null
+++ b/eth/gen_config.go
@@ -0,0 +1,221 @@
+// Code generated by github.com/fjl/gencodec. DO NOT EDIT.
+
+package eth
+
+import (
+ "math/big"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/consensus/ethash"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/eth/downloader"
+ "github.com/ethereum/go-ethereum/eth/gasprice"
+ "github.com/ethereum/go-ethereum/miner"
+ "github.com/ethereum/go-ethereum/params"
+)
+
+// MarshalTOML marshals as TOML.
+func (c Config) MarshalTOML() (interface{}, error) {
+ type Config struct {
+ Genesis *core.Genesis `toml:",omitempty"`
+ NetworkId uint64
+ SyncMode downloader.SyncMode
+ NoPruning bool
+ NoPrefetch bool
+ Whitelist map[uint64]common.Hash `toml:"-"`
+ LightServ int `toml:",omitempty"`
+ LightIngress int `toml:",omitempty"`
+ LightEgress int `toml:",omitempty"`
+ LightPeers int `toml:",omitempty"`
+ UltraLightServers []string `toml:",omitempty"`
+ UltraLightFraction int `toml:",omitempty"`
+ UltraLightOnlyAnnounce bool `toml:",omitempty"`
+ SkipBcVersionCheck bool `toml:"-"`
+ DatabaseHandles int `toml:"-"`
+ DatabaseCache int
+ DatabaseFreezer string
+ TrieCleanCache int
+ TrieDirtyCache int
+ TrieTimeout time.Duration
+ Miner miner.Config
+ Ethash ethash.Config
+ TxPool core.TxPoolConfig
+ GPO gasprice.Config
+ EnablePreimageRecording bool
+ DocRoot string `toml:"-"`
+ EWASMInterpreter string
+ EVMInterpreter string
+ RPCGasCap *big.Int `toml:",omitempty"`
+ Checkpoint *params.TrustedCheckpoint `toml:",omitempty"`
+ CheckpointOracle *params.CheckpointOracleConfig `toml:",omitempty"`
+ }
+ var enc Config
+ enc.Genesis = c.Genesis
+ enc.NetworkId = c.NetworkId
+ enc.SyncMode = c.SyncMode
+ enc.NoPruning = c.NoPruning
+ enc.NoPrefetch = c.NoPrefetch
+ enc.Whitelist = c.Whitelist
+ enc.LightServ = c.LightServ
+ enc.LightIngress = c.LightIngress
+ enc.LightEgress = c.LightEgress
+ enc.LightPeers = c.LightPeers
+ enc.UltraLightServers = c.UltraLightServers
+ enc.UltraLightFraction = c.UltraLightFraction
+ enc.UltraLightOnlyAnnounce = c.UltraLightOnlyAnnounce
+ enc.SkipBcVersionCheck = c.SkipBcVersionCheck
+ enc.DatabaseHandles = c.DatabaseHandles
+ enc.DatabaseCache = c.DatabaseCache
+ enc.DatabaseFreezer = c.DatabaseFreezer
+ enc.TrieCleanCache = c.TrieCleanCache
+ enc.TrieDirtyCache = c.TrieDirtyCache
+ enc.TrieTimeout = c.TrieTimeout
+ enc.Miner = c.Miner
+ enc.Ethash = c.Ethash
+ enc.TxPool = c.TxPool
+ enc.GPO = c.GPO
+ enc.EnablePreimageRecording = c.EnablePreimageRecording
+ enc.DocRoot = c.DocRoot
+ enc.EWASMInterpreter = c.EWASMInterpreter
+ enc.EVMInterpreter = c.EVMInterpreter
+ enc.RPCGasCap = c.RPCGasCap
+ enc.Checkpoint = c.Checkpoint
+ enc.CheckpointOracle = c.CheckpointOracle
+ return &enc, nil
+}
+
+// UnmarshalTOML unmarshals from TOML.
+func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
+ type Config struct {
+ Genesis *core.Genesis `toml:",omitempty"`
+ NetworkId *uint64
+ SyncMode *downloader.SyncMode
+ NoPruning *bool
+ NoPrefetch *bool
+ Whitelist map[uint64]common.Hash `toml:"-"`
+ LightServ *int `toml:",omitempty"`
+ LightIngress *int `toml:",omitempty"`
+ LightEgress *int `toml:",omitempty"`
+ LightPeers *int `toml:",omitempty"`
+ UltraLightServers []string `toml:",omitempty"`
+ UltraLightFraction *int `toml:",omitempty"`
+ UltraLightOnlyAnnounce *bool `toml:",omitempty"`
+ SkipBcVersionCheck *bool `toml:"-"`
+ DatabaseHandles *int `toml:"-"`
+ DatabaseCache *int
+ DatabaseFreezer *string
+ TrieCleanCache *int
+ TrieDirtyCache *int
+ TrieTimeout *time.Duration
+ Miner *miner.Config
+ Ethash *ethash.Config
+ TxPool *core.TxPoolConfig
+ GPO *gasprice.Config
+ EnablePreimageRecording *bool
+ DocRoot *string `toml:"-"`
+ EWASMInterpreter *string
+ EVMInterpreter *string
+ RPCGasCap *big.Int `toml:",omitempty"`
+ Checkpoint *params.TrustedCheckpoint `toml:",omitempty"`
+ CheckpointOracle *params.CheckpointOracleConfig `toml:",omitempty"`
+ }
+ var dec Config
+ if err := unmarshal(&dec); err != nil {
+ return err
+ }
+ if dec.Genesis != nil {
+ c.Genesis = dec.Genesis
+ }
+ if dec.NetworkId != nil {
+ c.NetworkId = *dec.NetworkId
+ }
+ if dec.SyncMode != nil {
+ c.SyncMode = *dec.SyncMode
+ }
+ if dec.NoPruning != nil {
+ c.NoPruning = *dec.NoPruning
+ }
+ if dec.NoPrefetch != nil {
+ c.NoPrefetch = *dec.NoPrefetch
+ }
+ if dec.Whitelist != nil {
+ c.Whitelist = dec.Whitelist
+ }
+ if dec.LightServ != nil {
+ c.LightServ = *dec.LightServ
+ }
+ if dec.LightIngress != nil {
+ c.LightIngress = *dec.LightIngress
+ }
+ if dec.LightEgress != nil {
+ c.LightEgress = *dec.LightEgress
+ }
+ if dec.LightPeers != nil {
+ c.LightPeers = *dec.LightPeers
+ }
+ if dec.UltraLightServers != nil {
+ c.UltraLightServers = dec.UltraLightServers
+ }
+ if dec.UltraLightFraction != nil {
+ c.UltraLightFraction = *dec.UltraLightFraction
+ }
+ if dec.UltraLightOnlyAnnounce != nil {
+ c.UltraLightOnlyAnnounce = *dec.UltraLightOnlyAnnounce
+ }
+ if dec.SkipBcVersionCheck != nil {
+ c.SkipBcVersionCheck = *dec.SkipBcVersionCheck
+ }
+ if dec.DatabaseHandles != nil {
+ c.DatabaseHandles = *dec.DatabaseHandles
+ }
+ if dec.DatabaseCache != nil {
+ c.DatabaseCache = *dec.DatabaseCache
+ }
+ if dec.DatabaseFreezer != nil {
+ c.DatabaseFreezer = *dec.DatabaseFreezer
+ }
+ if dec.TrieCleanCache != nil {
+ c.TrieCleanCache = *dec.TrieCleanCache
+ }
+ if dec.TrieDirtyCache != nil {
+ c.TrieDirtyCache = *dec.TrieDirtyCache
+ }
+ if dec.TrieTimeout != nil {
+ c.TrieTimeout = *dec.TrieTimeout
+ }
+ if dec.Miner != nil {
+ c.Miner = *dec.Miner
+ }
+ if dec.Ethash != nil {
+ c.Ethash = *dec.Ethash
+ }
+ if dec.TxPool != nil {
+ c.TxPool = *dec.TxPool
+ }
+ if dec.GPO != nil {
+ c.GPO = *dec.GPO
+ }
+ if dec.EnablePreimageRecording != nil {
+ c.EnablePreimageRecording = *dec.EnablePreimageRecording
+ }
+ if dec.DocRoot != nil {
+ c.DocRoot = *dec.DocRoot
+ }
+ if dec.EWASMInterpreter != nil {
+ c.EWASMInterpreter = *dec.EWASMInterpreter
+ }
+ if dec.EVMInterpreter != nil {
+ c.EVMInterpreter = *dec.EVMInterpreter
+ }
+ if dec.RPCGasCap != nil {
+ c.RPCGasCap = dec.RPCGasCap
+ }
+ if dec.Checkpoint != nil {
+ c.Checkpoint = dec.Checkpoint
+ }
+ if dec.CheckpointOracle != nil {
+ c.CheckpointOracle = dec.CheckpointOracle
+ }
+ return nil
+}
diff --git a/eth/handler.go b/eth/handler.go
new file mode 100644
index 0000000..4ce2d1c
--- /dev/null
+++ b/eth/handler.go
@@ -0,0 +1,844 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "math"
+ "math/big"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/consensus"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/eth/downloader"
+ "github.com/ethereum/go-ethereum/eth/fetcher"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/params"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/trie"
+)
+
+const (
+ softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
+ estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
+
+ // txChanSize is the size of channel listening to NewTxsEvent.
+ // The number is referenced from the size of tx pool.
+ txChanSize = 4096
+
+ // minimim number of peers to broadcast new blocks to
+ minBroadcastPeers = 4
+)
+
+var (
+ syncChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the sync progress challenge
+)
+
+func errResp(code errCode, format string, v ...interface{}) error {
+ return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
+}
+
+type ProtocolManager struct {
+ networkID uint64
+
+ fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
+ acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
+
+ checkpointNumber uint64 // Block number for the sync progress validator to cross reference
+ checkpointHash common.Hash // Block hash for the sync progress validator to cross reference
+
+ txpool txPool
+ blockchain *core.BlockChain
+ maxPeers int
+
+ downloader *downloader.Downloader
+ fetcher *fetcher.Fetcher
+ peers *peerSet
+
+ eventMux *event.TypeMux
+ txsCh chan core.NewTxsEvent
+ txsSub event.Subscription
+ minedBlockSub *event.TypeMuxSubscription
+
+ whitelist map[uint64]common.Hash
+
+ // channels for fetcher, syncer, txsyncLoop
+ newPeerCh chan *peer
+ txsyncCh chan *txsync
+ quitSync chan struct{}
+ noMorePeers chan struct{}
+
+ // wait group is used for graceful shutdowns during downloading
+ // and processing
+ wg sync.WaitGroup
+}
+
+// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
+// with the Ethereum network.
+func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) {
+ // Create the protocol manager with the base fields
+ manager := &ProtocolManager{
+ networkID: networkID,
+ eventMux: mux,
+ txpool: txpool,
+ blockchain: blockchain,
+ peers: newPeerSet(),
+ whitelist: whitelist,
+ newPeerCh: make(chan *peer),
+ noMorePeers: make(chan struct{}),
+ txsyncCh: make(chan *txsync),
+ quitSync: make(chan struct{}),
+ }
+ if mode == downloader.FullSync {
+ // The database seems empty as the current block is the genesis. Yet the fast
+ // block is ahead, so fast sync was enabled for this node at a certain point.
+ // The scenarios where this can happen is
+ // * if the user manually (or via a bad block) rolled back a fast sync node
+ // below the sync point.
+ // * the last fast sync is not finished while user specifies a full sync this
+ // time. But we don't have any recent state for full sync.
+ // In these cases however it's safe to reenable fast sync.
+ fullBlock, fastBlock := blockchain.CurrentBlock(), blockchain.CurrentFastBlock()
+ if fullBlock.NumberU64() == 0 && fastBlock.NumberU64() > 0 {
+ manager.fastSync = uint32(1)
+ log.Warn("Switch sync mode from full sync to fast sync")
+ }
+ } else {
+ if blockchain.CurrentBlock().NumberU64() > 0 {
+ // Print warning log if database is not empty to run fast sync.
+ log.Warn("Switch sync mode from fast sync to full sync")
+ } else {
+ // If fast sync was requested and our database is empty, grant it
+ manager.fastSync = uint32(1)
+ }
+ }
+ // If we have trusted checkpoints, enforce them on the chain
+ if checkpoint != nil {
+ manager.checkpointNumber = (checkpoint.SectionIndex+1)*params.CHTFrequency - 1
+ manager.checkpointHash = checkpoint.SectionHead
+ }
+
+ // Construct the downloader (long sync) and its backing state bloom if fast
+ // sync is requested. The downloader is responsible for deallocating the state
+ // bloom when it's done.
+ var stateBloom *trie.SyncBloom
+ if atomic.LoadUint32(&manager.fastSync) == 1 {
+ stateBloom = trie.NewSyncBloom(uint64(cacheLimit), chaindb)
+ }
+ manager.downloader = downloader.New(manager.checkpointNumber, chaindb, stateBloom, manager.eventMux, blockchain, nil, manager.removePeer)
+
+ // Construct the fetcher (short sync)
+ validator := func(header *types.Header) error {
+ return engine.VerifyHeader(blockchain, header, true)
+ }
+ heighter := func() uint64 {
+ return blockchain.CurrentBlock().NumberU64()
+ }
+ inserter := func(blocks types.Blocks) (int, error) {
+ // If sync hasn't reached the checkpoint yet, deny importing weird blocks.
+ //
+ // Ideally we would also compare the head block's timestamp and similarly reject
+ // the propagated block if the head is too old. Unfortunately there is a corner
+ // case when starting new networks, where the genesis might be ancient (0 unix)
+ // which would prevent full nodes from accepting it.
+ if manager.blockchain.CurrentBlock().NumberU64() < manager.checkpointNumber {
+ log.Warn("Unsynced yet, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
+ return 0, nil
+ }
+ // If fast sync is running, deny importing weird blocks. This is a problematic
+ // clause when starting up a new network, because fast-syncing miners might not
+ // accept each others' blocks until a restart. Unfortunately we haven't figured
+ // out a way yet where nodes can decide unilaterally whether the network is new
+ // or not. This should be fixed if we figure out a solution.
+ if atomic.LoadUint32(&manager.fastSync) == 1 {
+ log.Warn("Fast syncing, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
+ return 0, nil
+ }
+ n, err := manager.blockchain.InsertChain(blocks)
+ if err == nil {
+ atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
+ }
+ return n, err
+ }
+ manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
+
+ return manager, nil
+}
+
+func (pm *ProtocolManager) makeProtocol(version uint) p2p.Protocol {
+ length, ok := protocolLengths[version]
+ if !ok {
+ panic("makeProtocol for unknown version")
+ }
+
+ return p2p.Protocol{
+ Name: protocolName,
+ Version: version,
+ Length: length,
+ Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+ peer := pm.newPeer(int(version), p, rw)
+ select {
+ case pm.newPeerCh <- peer:
+ pm.wg.Add(1)
+ defer pm.wg.Done()
+ return pm.handle(peer)
+ case <-pm.quitSync:
+ return p2p.DiscQuitting
+ }
+ },
+ NodeInfo: func() interface{} {
+ return pm.NodeInfo()
+ },
+ PeerInfo: func(id enode.ID) interface{} {
+ if p := pm.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
+ return p.Info()
+ }
+ return nil
+ },
+ }
+}
+
+func (pm *ProtocolManager) removePeer(id string) {
+ // Short circuit if the peer was already removed
+ peer := pm.peers.Peer(id)
+ if peer == nil {
+ return
+ }
+ log.Debug("Removing Ethereum peer", "peer", id)
+
+ // Unregister the peer from the downloader and Ethereum peer set
+ pm.downloader.UnregisterPeer(id)
+ if err := pm.peers.Unregister(id); err != nil {
+ log.Error("Peer removal failed", "peer", id, "err", err)
+ }
+ // Hard disconnect at the networking layer
+ if peer != nil {
+ peer.Peer.Disconnect(p2p.DiscUselessPeer)
+ }
+}
+
+func (pm *ProtocolManager) Start(maxPeers int) {
+ pm.maxPeers = maxPeers
+
+ // broadcast transactions
+ pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
+ pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
+ go pm.txBroadcastLoop()
+
+ // broadcast mined blocks
+ pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
+ go pm.minedBroadcastLoop()
+
+ // start sync handlers
+ go pm.syncer()
+ go pm.txsyncLoop()
+}
+
+func (pm *ProtocolManager) Stop() {
+ log.Info("Stopping Ethereum protocol")
+
+ pm.txsSub.Unsubscribe() // quits txBroadcastLoop
+ pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
+
+ // Quit the sync loop.
+ // After this send has completed, no new peers will be accepted.
+ pm.noMorePeers <- struct{}{}
+
+ // Quit fetcher, txsyncLoop.
+ close(pm.quitSync)
+
+ // Disconnect existing sessions.
+ // This also closes the gate for any new registrations on the peer set.
+ // sessions which are already established but not added to pm.peers yet
+ // will exit when they try to register.
+ pm.peers.Close()
+
+ // Wait for all peer handler goroutines and the loops to come down.
+ pm.wg.Wait()
+
+ log.Info("Ethereum protocol stopped")
+}
+
+func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
+ return newPeer(pv, p, newMeteredMsgWriter(rw))
+}
+
+// handle is the callback invoked to manage the life cycle of an eth peer. When
+// this function terminates, the peer is disconnected.
+func (pm *ProtocolManager) handle(p *peer) error {
+ // Ignore maxPeers if this is a trusted peer
+ if pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
+ return p2p.DiscTooManyPeers
+ }
+ p.Log().Debug("Ethereum peer connected", "name", p.Name())
+
+ // Execute the Ethereum handshake
+ var (
+ genesis = pm.blockchain.Genesis()
+ head = pm.blockchain.CurrentHeader()
+ hash = head.Hash()
+ number = head.Number.Uint64()
+ td = pm.blockchain.GetTd(hash, number)
+ )
+ if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {
+ p.Log().Debug("Ethereum handshake failed", "err", err)
+ return err
+ }
+ if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
+ rw.Init(p.version)
+ }
+ // Register the peer locally
+ if err := pm.peers.Register(p); err != nil {
+ p.Log().Error("Ethereum peer registration failed", "err", err)
+ return err
+ }
+ defer pm.removePeer(p.id)
+
+ // Register the peer in the downloader. If the downloader considers it banned, we disconnect
+ if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
+ return err
+ }
+ // Propagate existing transactions. new transactions appearing
+ // after this will be sent via broadcasts.
+ pm.syncTransactions(p)
+
+ // If we have a trusted CHT, reject all peers below that (avoid fast sync eclipse)
+ if pm.checkpointHash != (common.Hash{}) {
+ // Request the peer's checkpoint header for chain height/weight validation
+ if err := p.RequestHeadersByNumber(pm.checkpointNumber, 1, 0, false); err != nil {
+ return err
+ }
+ // Start a timer to disconnect if the peer doesn't reply in time
+ p.syncDrop = time.AfterFunc(syncChallengeTimeout, func() {
+ p.Log().Warn("Checkpoint challenge timed out, dropping", "addr", p.RemoteAddr(), "type", p.Name())
+ pm.removePeer(p.id)
+ })
+ // Make sure it's cleaned up if the peer dies off
+ defer func() {
+ if p.syncDrop != nil {
+ p.syncDrop.Stop()
+ p.syncDrop = nil
+ }
+ }()
+ }
+ // If we have any explicit whitelist block hashes, request them
+ for number := range pm.whitelist {
+ if err := p.RequestHeadersByNumber(number, 1, 0, false); err != nil {
+ return err
+ }
+ }
+ // Handle incoming messages until the connection is torn down
+ for {
+ if err := pm.handleMsg(p); err != nil {
+ p.Log().Debug("Ethereum message handling failed", "err", err)
+ return err
+ }
+ }
+}
+
+// handleMsg is invoked whenever an inbound message is received from a remote
+// peer. The remote connection is torn down upon returning any error.
+func (pm *ProtocolManager) handleMsg(p *peer) error {
+ // Read the next message from the remote peer, and ensure it's fully consumed
+ msg, err := p.rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ if msg.Size > protocolMaxMsgSize {
+ return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, protocolMaxMsgSize)
+ }
+ defer msg.Discard()
+
+ // Handle the message depending on its contents
+ switch {
+ case msg.Code == StatusMsg:
+ // Status messages should never arrive after the handshake
+ return errResp(ErrExtraStatusMsg, "uncontrolled status message")
+
+ // Block header query, collect the requested headers and reply
+ case msg.Code == GetBlockHeadersMsg:
+ // Decode the complex header query
+ var query getBlockHeadersData
+ if err := msg.Decode(&query); err != nil {
+ return errResp(ErrDecode, "%v: %v", msg, err)
+ }
+ hashMode := query.Origin.Hash != (common.Hash{})
+ first := true
+ maxNonCanonical := uint64(100)
+
+ // Gather headers until the fetch or network limits is reached
+ var (
+ bytes common.StorageSize
+ headers []*types.Header
+ unknown bool
+ )
+ for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
+ // Retrieve the next header satisfying the query
+ var origin *types.Header
+ if hashMode {
+ if first {
+ first = false
+ origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
+ if origin != nil {
+ query.Origin.Number = origin.Number.Uint64()
+ }
+ } else {
+ origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number)
+ }
+ } else {
+ origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
+ }
+ if origin == nil {
+ break
+ }
+ headers = append(headers, origin)
+ bytes += estHeaderRlpSize
+
+ // Advance to the next header of the query
+ switch {
+ case hashMode && query.Reverse:
+ // Hash based traversal towards the genesis block
+ ancestor := query.Skip + 1
+ if ancestor == 0 {
+ unknown = true
+ } else {
+ query.Origin.Hash, query.Origin.Number = pm.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical)
+ unknown = (query.Origin.Hash == common.Hash{})
+ }
+ case hashMode && !query.Reverse:
+ // Hash based traversal towards the leaf block
+ var (
+ current = origin.Number.Uint64()
+ next = current + query.Skip + 1
+ )
+ if next <= current {
+ infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
+ p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
+ unknown = true
+ } else {
+ if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
+ nextHash := header.Hash()
+ expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
+ if expOldHash == query.Origin.Hash {
+ query.Origin.Hash, query.Origin.Number = nextHash, next
+ } else {
+ unknown = true
+ }
+ } else {
+ unknown = true
+ }
+ }
+ case query.Reverse:
+ // Number based traversal towards the genesis block
+ if query.Origin.Number >= query.Skip+1 {
+ query.Origin.Number -= query.Skip + 1
+ } else {
+ unknown = true
+ }
+
+ case !query.Reverse:
+ // Number based traversal towards the leaf block
+ query.Origin.Number += query.Skip + 1
+ }
+ }
+ return p.SendBlockHeaders(headers)
+
+ case msg.Code == BlockHeadersMsg:
+ // A batch of headers arrived to one of our previous requests
+ var headers []*types.Header
+ if err := msg.Decode(&headers); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // If no headers were received, but we're expencting a checkpoint header, consider it that
+ if len(headers) == 0 && p.syncDrop != nil {
+ // Stop the timer either way, decide later to drop or not
+ p.syncDrop.Stop()
+ p.syncDrop = nil
+
+ // If we're doing a fast sync, we must enforce the checkpoint block to avoid
+ // eclipse attacks. Unsynced nodes are welcome to connect after we're done
+ // joining the network
+ if atomic.LoadUint32(&pm.fastSync) == 1 {
+ p.Log().Warn("Dropping unsynced node during fast sync", "addr", p.RemoteAddr(), "type", p.Name())
+ return errors.New("unsynced node cannot serve fast sync")
+ }
+ }
+ // Filter out any explicitly requested headers, deliver the rest to the downloader
+ filter := len(headers) == 1
+ if filter {
+ // If it's a potential sync progress check, validate the content and advertised chain weight
+ if p.syncDrop != nil && headers[0].Number.Uint64() == pm.checkpointNumber {
+ // Disable the sync drop timer
+ p.syncDrop.Stop()
+ p.syncDrop = nil
+
+ // Validate the header and either drop the peer or continue
+ if headers[0].Hash() != pm.checkpointHash {
+ return errors.New("checkpoint hash mismatch")
+ }
+ return nil
+ }
+ // Otherwise if it's a whitelisted block, validate against the set
+ if want, ok := pm.whitelist[headers[0].Number.Uint64()]; ok {
+ if hash := headers[0].Hash(); want != hash {
+ p.Log().Info("Whitelist mismatch, dropping peer", "number", headers[0].Number.Uint64(), "hash", hash, "want", want)
+ return errors.New("whitelist block mismatch")
+ }
+ p.Log().Debug("Whitelist block verified", "number", headers[0].Number.Uint64(), "hash", want)
+ }
+ // Irrelevant of the fork checks, send the header to the fetcher just in case
+ headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
+ }
+ if len(headers) > 0 || !filter {
+ err := pm.downloader.DeliverHeaders(p.id, headers)
+ if err != nil {
+ log.Debug("Failed to deliver headers", "err", err)
+ }
+ }
+
+ case msg.Code == GetBlockBodiesMsg:
+ // Decode the retrieval message
+ msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+ if _, err := msgStream.List(); err != nil {
+ return err
+ }
+ // Gather blocks until the fetch or network limits is reached
+ var (
+ hash common.Hash
+ bytes int
+ bodies []rlp.RawValue
+ )
+ for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
+ // Retrieve the hash of the next block
+ if err := msgStream.Decode(&hash); err == rlp.EOL {
+ break
+ } else if err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Retrieve the requested block body, stopping if enough was found
+ if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 {
+ bodies = append(bodies, data)
+ bytes += len(data)
+ }
+ }
+ return p.SendBlockBodiesRLP(bodies)
+
+ case msg.Code == BlockBodiesMsg:
+ // A batch of block bodies arrived to one of our previous requests
+ var request blockBodiesData
+ if err := msg.Decode(&request); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Deliver them all to the downloader for queuing
+ transactions := make([][]*types.Transaction, len(request))
+ uncles := make([][]*types.Header, len(request))
+
+ for i, body := range request {
+ transactions[i] = body.Transactions
+ uncles[i] = body.Uncles
+ }
+ // Filter out any explicitly requested bodies, deliver the rest to the downloader
+ filter := len(transactions) > 0 || len(uncles) > 0
+ if filter {
+ transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
+ }
+ if len(transactions) > 0 || len(uncles) > 0 || !filter {
+ err := pm.downloader.DeliverBodies(p.id, transactions, uncles)
+ if err != nil {
+ log.Debug("Failed to deliver bodies", "err", err)
+ }
+ }
+
+ case p.version >= eth63 && msg.Code == GetNodeDataMsg:
+ // Decode the retrieval message
+ msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+ if _, err := msgStream.List(); err != nil {
+ return err
+ }
+ // Gather state data until the fetch or network limits is reached
+ var (
+ hash common.Hash
+ bytes int
+ data [][]byte
+ )
+ for bytes < softResponseLimit && len(data) < downloader.MaxStateFetch {
+ // Retrieve the hash of the next state entry
+ if err := msgStream.Decode(&hash); err == rlp.EOL {
+ break
+ } else if err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Retrieve the requested state entry, stopping if enough was found
+ if entry, err := pm.blockchain.TrieNode(hash); err == nil {
+ data = append(data, entry)
+ bytes += len(entry)
+ }
+ }
+ return p.SendNodeData(data)
+
+ case p.version >= eth63 && msg.Code == NodeDataMsg:
+ // A batch of node state data arrived to one of our previous requests
+ var data [][]byte
+ if err := msg.Decode(&data); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Deliver all to the downloader
+ if err := pm.downloader.DeliverNodeData(p.id, data); err != nil {
+ log.Debug("Failed to deliver node state data", "err", err)
+ }
+
+ case p.version >= eth63 && msg.Code == GetReceiptsMsg:
+ // Decode the retrieval message
+ msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+ if _, err := msgStream.List(); err != nil {
+ return err
+ }
+ // Gather state data until the fetch or network limits is reached
+ var (
+ hash common.Hash
+ bytes int
+ receipts []rlp.RawValue
+ )
+ for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptFetch {
+ // Retrieve the hash of the next block
+ if err := msgStream.Decode(&hash); err == rlp.EOL {
+ break
+ } else if err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Retrieve the requested block's receipts, skipping if unknown to us
+ results := pm.blockchain.GetReceiptsByHash(hash)
+ if results == nil {
+ if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
+ continue
+ }
+ }
+ // If known, encode and queue for response packet
+ if encoded, err := rlp.EncodeToBytes(results); err != nil {
+ log.Error("Failed to encode receipt", "err", err)
+ } else {
+ receipts = append(receipts, encoded)
+ bytes += len(encoded)
+ }
+ }
+ return p.SendReceiptsRLP(receipts)
+
+ case p.version >= eth63 && msg.Code == ReceiptsMsg:
+ // A batch of receipts arrived to one of our previous requests
+ var receipts [][]*types.Receipt
+ if err := msg.Decode(&receipts); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Deliver all to the downloader
+ if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil {
+ log.Debug("Failed to deliver receipts", "err", err)
+ }
+
+ case msg.Code == NewBlockHashesMsg:
+ var announces newBlockHashesData
+ if err := msg.Decode(&announces); err != nil {
+ return errResp(ErrDecode, "%v: %v", msg, err)
+ }
+ // Mark the hashes as present at the remote node
+ for _, block := range announces {
+ p.MarkBlock(block.Hash)
+ }
+ // Schedule all the unknown hashes for retrieval
+ unknown := make(newBlockHashesData, 0, len(announces))
+ for _, block := range announces {
+ if !pm.blockchain.HasBlock(block.Hash, block.Number) {
+ unknown = append(unknown, block)
+ }
+ }
+ for _, block := range unknown {
+ pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
+ }
+
+ case msg.Code == NewBlockMsg:
+ // Retrieve and decode the propagated block
+ var request newBlockData
+ if err := msg.Decode(&request); err != nil {
+ return errResp(ErrDecode, "%v: %v", msg, err)
+ }
+ if err := request.sanityCheck(); err != nil {
+ return err
+ }
+ request.Block.ReceivedAt = msg.ReceivedAt
+ request.Block.ReceivedFrom = p
+
+ // Mark the peer as owning the block and schedule it for import
+ p.MarkBlock(request.Block.Hash())
+ pm.fetcher.Enqueue(p.id, request.Block)
+
+ // Assuming the block is importable by the peer, but possibly not yet done so,
+ // calculate the head hash and TD that the peer truly must have.
+ var (
+ trueHead = request.Block.ParentHash()
+ trueTD = new(big.Int).Sub(request.TD, request.Block.Difficulty())
+ )
+ // Update the peer's total difficulty if better than the previous
+ if _, td := p.Head(); trueTD.Cmp(td) > 0 {
+ p.SetHead(trueHead, trueTD)
+
+ // Schedule a sync if above ours. Note, this will not fire a sync for a gap of
+ // a single block (as the true TD is below the propagated block), however this
+ // scenario should easily be covered by the fetcher.
+ currentBlock := pm.blockchain.CurrentBlock()
+ if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
+ go pm.synchronise(p)
+ }
+ }
+
+ case msg.Code == TxMsg:
+ // Transactions arrived, make sure we have a valid and fresh chain to handle them
+ if atomic.LoadUint32(&pm.acceptTxs) == 0 {
+ break
+ }
+ // Transactions can be processed, parse all of them and deliver to the pool
+ var txs []*types.Transaction
+ if err := msg.Decode(&txs); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ for i, tx := range txs {
+ // Validate and mark the remote transaction
+ if tx == nil {
+ return errResp(ErrDecode, "transaction %d is nil", i)
+ }
+ p.MarkTransaction(tx.Hash())
+ }
+ pm.txpool.AddRemotes(txs)
+
+ default:
+ return errResp(ErrInvalidMsgCode, "%v", msg.Code)
+ }
+ return nil
+}
+
+// BroadcastBlock will either propagate a block to a subset of it's peers, or
+// will only announce it's availability (depending what's requested).
+func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
+ hash := block.Hash()
+ peers := pm.peers.PeersWithoutBlock(hash)
+
+ // If propagation is requested, send to a subset of the peer
+ if propagate {
+ // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
+ var td *big.Int
+ if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
+ td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
+ } else {
+ log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
+ return
+ }
+ // Send the block to a subset of our peers
+ transferLen := int(math.Sqrt(float64(len(peers))))
+ if transferLen < minBroadcastPeers {
+ transferLen = minBroadcastPeers
+ }
+ if transferLen > len(peers) {
+ transferLen = len(peers)
+ }
+ transfer := peers[:transferLen]
+ for _, peer := range transfer {
+ peer.AsyncSendNewBlock(block, td)
+ }
+ log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
+ return
+ }
+ // Otherwise if the block is indeed in out own chain, announce it
+ if pm.blockchain.HasBlock(hash, block.NumberU64()) {
+ for _, peer := range peers {
+ peer.AsyncSendNewBlockHash(block)
+ }
+ log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
+ }
+}
+
+// BroadcastTxs will propagate a batch of transactions to all peers which are not known to
+// already have the given transaction.
+func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
+ var txset = make(map[*peer]types.Transactions)
+
+ // Broadcast transactions to a batch of peers not knowing about it
+ for _, tx := range txs {
+ peers := pm.peers.PeersWithoutTx(tx.Hash())
+ for _, peer := range peers {
+ txset[peer] = append(txset[peer], tx)
+ }
+ log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
+ }
+ // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
+ for peer, txs := range txset {
+ peer.AsyncSendTransactions(txs)
+ }
+}
+
+// Mined broadcast loop
+func (pm *ProtocolManager) minedBroadcastLoop() {
+ // automatically stops if unsubscribe
+ for obj := range pm.minedBlockSub.Chan() {
+ if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
+ pm.BroadcastBlock(ev.Block, true) // First propagate block to peers
+ pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
+ }
+ }
+}
+
+func (pm *ProtocolManager) txBroadcastLoop() {
+ for {
+ select {
+ case event := <-pm.txsCh:
+ pm.BroadcastTxs(event.Txs)
+
+ // Err() channel will be closed when unsubscribing.
+ case <-pm.txsSub.Err():
+ return
+ }
+ }
+}
+
+// NodeInfo represents a short summary of the Ethereum sub-protocol metadata
+// known about the host peer.
+type NodeInfo struct {
+ Network uint64 `json:"network"` // Ethereum network ID (1=Frontier, 2=Morden, Ropsten=3, Rinkeby=4)
+ Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain
+ Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block
+ Config *params.ChainConfig `json:"config"` // Chain configuration for the fork rules
+ Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block
+}
+
+// NodeInfo retrieves some protocol metadata about the running host node.
+func (pm *ProtocolManager) NodeInfo() *NodeInfo {
+ currentBlock := pm.blockchain.CurrentBlock()
+ return &NodeInfo{
+ Network: pm.networkID,
+ Difficulty: pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()),
+ Genesis: pm.blockchain.Genesis().Hash(),
+ Config: pm.blockchain.Config(),
+ Head: currentBlock.Hash(),
+ }
+}
diff --git a/eth/metrics.go b/eth/metrics.go
new file mode 100644
index 0000000..0533a2a
--- /dev/null
+++ b/eth/metrics.go
@@ -0,0 +1,139 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/p2p"
+)
+
+var (
+ propTxnInPacketsMeter = metrics.NewRegisteredMeter("eth/prop/txns/in/packets", nil)
+ propTxnInTrafficMeter = metrics.NewRegisteredMeter("eth/prop/txns/in/traffic", nil)
+ propTxnOutPacketsMeter = metrics.NewRegisteredMeter("eth/prop/txns/out/packets", nil)
+ propTxnOutTrafficMeter = metrics.NewRegisteredMeter("eth/prop/txns/out/traffic", nil)
+ propHashInPacketsMeter = metrics.NewRegisteredMeter("eth/prop/hashes/in/packets", nil)
+ propHashInTrafficMeter = metrics.NewRegisteredMeter("eth/prop/hashes/in/traffic", nil)
+ propHashOutPacketsMeter = metrics.NewRegisteredMeter("eth/prop/hashes/out/packets", nil)
+ propHashOutTrafficMeter = metrics.NewRegisteredMeter("eth/prop/hashes/out/traffic", nil)
+ propBlockInPacketsMeter = metrics.NewRegisteredMeter("eth/prop/blocks/in/packets", nil)
+ propBlockInTrafficMeter = metrics.NewRegisteredMeter("eth/prop/blocks/in/traffic", nil)
+ propBlockOutPacketsMeter = metrics.NewRegisteredMeter("eth/prop/blocks/out/packets", nil)
+ propBlockOutTrafficMeter = metrics.NewRegisteredMeter("eth/prop/blocks/out/traffic", nil)
+ reqHeaderInPacketsMeter = metrics.NewRegisteredMeter("eth/req/headers/in/packets", nil)
+ reqHeaderInTrafficMeter = metrics.NewRegisteredMeter("eth/req/headers/in/traffic", nil)
+ reqHeaderOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/headers/out/packets", nil)
+ reqHeaderOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/headers/out/traffic", nil)
+ reqBodyInPacketsMeter = metrics.NewRegisteredMeter("eth/req/bodies/in/packets", nil)
+ reqBodyInTrafficMeter = metrics.NewRegisteredMeter("eth/req/bodies/in/traffic", nil)
+ reqBodyOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/bodies/out/packets", nil)
+ reqBodyOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/bodies/out/traffic", nil)
+ reqStateInPacketsMeter = metrics.NewRegisteredMeter("eth/req/states/in/packets", nil)
+ reqStateInTrafficMeter = metrics.NewRegisteredMeter("eth/req/states/in/traffic", nil)
+ reqStateOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/states/out/packets", nil)
+ reqStateOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/states/out/traffic", nil)
+ reqReceiptInPacketsMeter = metrics.NewRegisteredMeter("eth/req/receipts/in/packets", nil)
+ reqReceiptInTrafficMeter = metrics.NewRegisteredMeter("eth/req/receipts/in/traffic", nil)
+ reqReceiptOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/receipts/out/packets", nil)
+ reqReceiptOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/receipts/out/traffic", nil)
+ miscInPacketsMeter = metrics.NewRegisteredMeter("eth/misc/in/packets", nil)
+ miscInTrafficMeter = metrics.NewRegisteredMeter("eth/misc/in/traffic", nil)
+ miscOutPacketsMeter = metrics.NewRegisteredMeter("eth/misc/out/packets", nil)
+ miscOutTrafficMeter = metrics.NewRegisteredMeter("eth/misc/out/traffic", nil)
+)
+
+// meteredMsgReadWriter is a wrapper around a p2p.MsgReadWriter, capable of
+// accumulating the above defined metrics based on the data stream contents.
+type meteredMsgReadWriter struct {
+ p2p.MsgReadWriter // Wrapped message stream to meter
+ version int // Protocol version to select correct meters
+}
+
+// newMeteredMsgWriter wraps a p2p MsgReadWriter with metering support. If the
+// metrics system is disabled, this function returns the original object.
+func newMeteredMsgWriter(rw p2p.MsgReadWriter) p2p.MsgReadWriter {
+ if !metrics.Enabled {
+ return rw
+ }
+ return &meteredMsgReadWriter{MsgReadWriter: rw}
+}
+
+// Init sets the protocol version used by the stream to know which meters to
+// increment in case of overlapping message ids between protocol versions.
+func (rw *meteredMsgReadWriter) Init(version int) {
+ rw.version = version
+}
+
+func (rw *meteredMsgReadWriter) ReadMsg() (p2p.Msg, error) {
+ // Read the message and short circuit in case of an error
+ msg, err := rw.MsgReadWriter.ReadMsg()
+ if err != nil {
+ return msg, err
+ }
+ // Account for the data traffic
+ packets, traffic := miscInPacketsMeter, miscInTrafficMeter
+ switch {
+ case msg.Code == BlockHeadersMsg:
+ packets, traffic = reqHeaderInPacketsMeter, reqHeaderInTrafficMeter
+ case msg.Code == BlockBodiesMsg:
+ packets, traffic = reqBodyInPacketsMeter, reqBodyInTrafficMeter
+
+ case rw.version >= eth63 && msg.Code == NodeDataMsg:
+ packets, traffic = reqStateInPacketsMeter, reqStateInTrafficMeter
+ case rw.version >= eth63 && msg.Code == ReceiptsMsg:
+ packets, traffic = reqReceiptInPacketsMeter, reqReceiptInTrafficMeter
+
+ case msg.Code == NewBlockHashesMsg:
+ packets, traffic = propHashInPacketsMeter, propHashInTrafficMeter
+ case msg.Code == NewBlockMsg:
+ packets, traffic = propBlockInPacketsMeter, propBlockInTrafficMeter
+ case msg.Code == TxMsg:
+ packets, traffic = propTxnInPacketsMeter, propTxnInTrafficMeter
+ }
+ packets.Mark(1)
+ traffic.Mark(int64(msg.Size))
+
+ return msg, err
+}
+
+func (rw *meteredMsgReadWriter) WriteMsg(msg p2p.Msg) error {
+ // Account for the data traffic
+ packets, traffic := miscOutPacketsMeter, miscOutTrafficMeter
+ switch {
+ case msg.Code == BlockHeadersMsg:
+ packets, traffic = reqHeaderOutPacketsMeter, reqHeaderOutTrafficMeter
+ case msg.Code == BlockBodiesMsg:
+ packets, traffic = reqBodyOutPacketsMeter, reqBodyOutTrafficMeter
+
+ case rw.version >= eth63 && msg.Code == NodeDataMsg:
+ packets, traffic = reqStateOutPacketsMeter, reqStateOutTrafficMeter
+ case rw.version >= eth63 && msg.Code == ReceiptsMsg:
+ packets, traffic = reqReceiptOutPacketsMeter, reqReceiptOutTrafficMeter
+
+ case msg.Code == NewBlockHashesMsg:
+ packets, traffic = propHashOutPacketsMeter, propHashOutTrafficMeter
+ case msg.Code == NewBlockMsg:
+ packets, traffic = propBlockOutPacketsMeter, propBlockOutTrafficMeter
+ case msg.Code == TxMsg:
+ packets, traffic = propTxnOutPacketsMeter, propTxnOutTrafficMeter
+ }
+ packets.Mark(1)
+ traffic.Mark(int64(msg.Size))
+
+ // Send the packet to the p2p layer
+ return rw.MsgReadWriter.WriteMsg(msg)
+}
diff --git a/eth/peer.go b/eth/peer.go
new file mode 100644
index 0000000..814c787
--- /dev/null
+++ b/eth/peer.go
@@ -0,0 +1,546 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "errors"
+ "fmt"
+ "math/big"
+ "sync"
+ "time"
+
+ mapset "github.com/deckarep/golang-set"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+var (
+ errClosed = errors.New("peer set is closed")
+ errAlreadyRegistered = errors.New("peer is already registered")
+ errNotRegistered = errors.New("peer is not registered")
+)
+
+const (
+ maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+ maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
+
+ // maxQueuedTxs is the maximum number of transaction lists to queue up before
+ // dropping broadcasts. This is a sensitive number as a transaction list might
+ // contain a single transaction, or thousands.
+ maxQueuedTxs = 128
+
+ // maxQueuedProps is the maximum number of block propagations to queue up before
+ // dropping broadcasts. There's not much point in queueing stale blocks, so a few
+ // that might cover uncles should be enough.
+ maxQueuedProps = 4
+
+ // maxQueuedAnns is the maximum number of block announcements to queue up before
+ // dropping broadcasts. Similarly to block propagations, there's no point to queue
+ // above some healthy uncle limit, so use that.
+ maxQueuedAnns = 4
+
+ handshakeTimeout = 5 * time.Second
+)
+
+// PeerInfo represents a short summary of the Ethereum sub-protocol metadata known
+// about a connected peer.
+type PeerInfo struct {
+ Version int `json:"version"` // Ethereum protocol version negotiated
+ Difficulty *big.Int `json:"difficulty"` // Total difficulty of the peer's blockchain
+ Head string `json:"head"` // SHA3 hash of the peer's best owned block
+}
+
+// propEvent is a block propagation, waiting for its turn in the broadcast queue.
+type propEvent struct {
+ block *types.Block
+ td *big.Int
+}
+
+type peer struct {
+ id string
+
+ *p2p.Peer
+ rw p2p.MsgReadWriter
+
+ version int // Protocol version negotiated
+ syncDrop *time.Timer // Timed connection dropper if sync progress isn't validated in time
+
+ head common.Hash
+ td *big.Int
+ lock sync.RWMutex
+
+ knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
+ knownBlocks mapset.Set // Set of block hashes known to be known by this peer
+ queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
+ queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
+ queuedAnns chan *types.Block // Queue of blocks to announce to the peer
+ term chan struct{} // Termination channel to stop the broadcaster
+}
+
+func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
+ return &peer{
+ Peer: p,
+ rw: rw,
+ version: version,
+ id: fmt.Sprintf("%x", p.ID().Bytes()[:8]),
+ knownTxs: mapset.NewSet(),
+ knownBlocks: mapset.NewSet(),
+ queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
+ queuedProps: make(chan *propEvent, maxQueuedProps),
+ queuedAnns: make(chan *types.Block, maxQueuedAnns),
+ term: make(chan struct{}),
+ }
+}
+
+// broadcast is a write loop that multiplexes block propagations, announcements
+// and transaction broadcasts into the remote peer. The goal is to have an async
+// writer that does not lock up node internals.
+func (p *peer) broadcast() {
+ for {
+ select {
+ case txs := <-p.queuedTxs:
+ if err := p.SendTransactions(txs); err != nil {
+ return
+ }
+ p.Log().Trace("Broadcast transactions", "count", len(txs))
+
+ case prop := <-p.queuedProps:
+ if err := p.SendNewBlock(prop.block, prop.td); err != nil {
+ return
+ }
+ p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td)
+
+ case block := <-p.queuedAnns:
+ if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
+ return
+ }
+ p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
+
+ case <-p.term:
+ return
+ }
+ }
+}
+
+// close signals the broadcast goroutine to terminate.
+func (p *peer) close() {
+ close(p.term)
+}
+
+// Info gathers and returns a collection of metadata known about a peer.
+func (p *peer) Info() *PeerInfo {
+ hash, td := p.Head()
+
+ return &PeerInfo{
+ Version: p.version,
+ Difficulty: td,
+ Head: hash.Hex(),
+ }
+}
+
+// Head retrieves a copy of the current head hash and total difficulty of the
+// peer.
+func (p *peer) Head() (hash common.Hash, td *big.Int) {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
+ copy(hash[:], p.head[:])
+ return hash, new(big.Int).Set(p.td)
+}
+
+// SetHead updates the head hash and total difficulty of the peer.
+func (p *peer) SetHead(hash common.Hash, td *big.Int) {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ copy(p.head[:], hash[:])
+ p.td.Set(td)
+}
+
+// MarkBlock marks a block as known for the peer, ensuring that the block will
+// never be propagated to this particular peer.
+func (p *peer) MarkBlock(hash common.Hash) {
+ // If we reached the memory allowance, drop a previously known block hash
+ for p.knownBlocks.Cardinality() >= maxKnownBlocks {
+ p.knownBlocks.Pop()
+ }
+ p.knownBlocks.Add(hash)
+}
+
+// MarkTransaction marks a transaction as known for the peer, ensuring that it
+// will never be propagated to this particular peer.
+func (p *peer) MarkTransaction(hash common.Hash) {
+ // If we reached the memory allowance, drop a previously known transaction hash
+ for p.knownTxs.Cardinality() >= maxKnownTxs {
+ p.knownTxs.Pop()
+ }
+ p.knownTxs.Add(hash)
+}
+
+// SendTransactions sends transactions to the peer and includes the hashes
+// in its transaction hash set for future reference.
+func (p *peer) SendTransactions(txs types.Transactions) error {
+ // Mark all the transactions as known, but ensure we don't overflow our limits
+ for _, tx := range txs {
+ p.knownTxs.Add(tx.Hash())
+ }
+ for p.knownTxs.Cardinality() >= maxKnownTxs {
+ p.knownTxs.Pop()
+ }
+ return p2p.Send(p.rw, TxMsg, txs)
+}
+
+// AsyncSendTransactions queues list of transactions propagation to a remote
+// peer. If the peer's broadcast queue is full, the event is silently dropped.
+func (p *peer) AsyncSendTransactions(txs []*types.Transaction) {
+ select {
+ case p.queuedTxs <- txs:
+ // Mark all the transactions as known, but ensure we don't overflow our limits
+ for _, tx := range txs {
+ p.knownTxs.Add(tx.Hash())
+ }
+ for p.knownTxs.Cardinality() >= maxKnownTxs {
+ p.knownTxs.Pop()
+ }
+ default:
+ p.Log().Debug("Dropping transaction propagation", "count", len(txs))
+ }
+}
+
+// SendNewBlockHashes announces the availability of a number of blocks through
+// a hash notification.
+func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
+ // Mark all the block hashes as known, but ensure we don't overflow our limits
+ for _, hash := range hashes {
+ p.knownBlocks.Add(hash)
+ }
+ for p.knownBlocks.Cardinality() >= maxKnownBlocks {
+ p.knownBlocks.Pop()
+ }
+ request := make(newBlockHashesData, len(hashes))
+ for i := 0; i < len(hashes); i++ {
+ request[i].Hash = hashes[i]
+ request[i].Number = numbers[i]
+ }
+ return p2p.Send(p.rw, NewBlockHashesMsg, request)
+}
+
+// AsyncSendNewBlockHash queues the availability of a block for propagation to a
+// remote peer. If the peer's broadcast queue is full, the event is silently
+// dropped.
+func (p *peer) AsyncSendNewBlockHash(block *types.Block) {
+ select {
+ case p.queuedAnns <- block:
+ // Mark all the block hash as known, but ensure we don't overflow our limits
+ p.knownBlocks.Add(block.Hash())
+ for p.knownBlocks.Cardinality() >= maxKnownBlocks {
+ p.knownBlocks.Pop()
+ }
+ default:
+ p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash())
+ }
+}
+
+// SendNewBlock propagates an entire block to a remote peer.
+func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
+ // Mark all the block hash as known, but ensure we don't overflow our limits
+ p.knownBlocks.Add(block.Hash())
+ for p.knownBlocks.Cardinality() >= maxKnownBlocks {
+ p.knownBlocks.Pop()
+ }
+ return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
+}
+
+// AsyncSendNewBlock queues an entire block for propagation to a remote peer. If
+// the peer's broadcast queue is full, the event is silently dropped.
+func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
+ select {
+ case p.queuedProps <- &propEvent{block: block, td: td}:
+ // Mark all the block hash as known, but ensure we don't overflow our limits
+ p.knownBlocks.Add(block.Hash())
+ for p.knownBlocks.Cardinality() >= maxKnownBlocks {
+ p.knownBlocks.Pop()
+ }
+ default:
+ p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash())
+ }
+}
+
+// SendBlockHeaders sends a batch of block headers to the remote peer.
+func (p *peer) SendBlockHeaders(headers []*types.Header) error {
+ return p2p.Send(p.rw, BlockHeadersMsg, headers)
+}
+
+// SendBlockBodies sends a batch of block contents to the remote peer.
+func (p *peer) SendBlockBodies(bodies []*blockBody) error {
+ return p2p.Send(p.rw, BlockBodiesMsg, blockBodiesData(bodies))
+}
+
+// SendBlockBodiesRLP sends a batch of block contents to the remote peer from
+// an already RLP encoded format.
+func (p *peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error {
+ return p2p.Send(p.rw, BlockBodiesMsg, bodies)
+}
+
+// SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the
+// hashes requested.
+func (p *peer) SendNodeData(data [][]byte) error {
+ return p2p.Send(p.rw, NodeDataMsg, data)
+}
+
+// SendReceiptsRLP sends a batch of transaction receipts, corresponding to the
+// ones requested from an already RLP encoded format.
+func (p *peer) SendReceiptsRLP(receipts []rlp.RawValue) error {
+ return p2p.Send(p.rw, ReceiptsMsg, receipts)
+}
+
+// RequestOneHeader is a wrapper around the header query functions to fetch a
+// single header. It is used solely by the fetcher.
+func (p *peer) RequestOneHeader(hash common.Hash) error {
+ p.Log().Debug("Fetching single header", "hash", hash)
+ return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})
+}
+
+// RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
+// specified header query, based on the hash of an origin block.
+func (p *peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
+ p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse)
+ return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
+}
+
+// RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the
+// specified header query, based on the number of an origin block.
+func (p *peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
+ p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse)
+ return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
+}
+
+// RequestBodies fetches a batch of blocks' bodies corresponding to the hashes
+// specified.
+func (p *peer) RequestBodies(hashes []common.Hash) error {
+ p.Log().Debug("Fetching batch of block bodies", "count", len(hashes))
+ return p2p.Send(p.rw, GetBlockBodiesMsg, hashes)
+}
+
+// RequestNodeData fetches a batch of arbitrary data from a node's known state
+// data, corresponding to the specified hashes.
+func (p *peer) RequestNodeData(hashes []common.Hash) error {
+ p.Log().Debug("Fetching batch of state data", "count", len(hashes))
+ return p2p.Send(p.rw, GetNodeDataMsg, hashes)
+}
+
+// RequestReceipts fetches a batch of transaction receipts from a remote node.
+func (p *peer) RequestReceipts(hashes []common.Hash) error {
+ p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
+ return p2p.Send(p.rw, GetReceiptsMsg, hashes)
+}
+
+// Handshake executes the eth protocol handshake, negotiating version number,
+// network IDs, difficulties, head and genesis blocks.
+func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash) error {
+ // Send out own handshake in a new thread
+ errc := make(chan error, 2)
+ var status statusData // safe to read after two values have been received from errc
+
+ go func() {
+ errc <- p2p.Send(p.rw, StatusMsg, &statusData{
+ ProtocolVersion: uint32(p.version),
+ NetworkId: network,
+ TD: td,
+ CurrentBlock: head,
+ GenesisBlock: genesis,
+ })
+ }()
+ go func() {
+ errc <- p.readStatus(network, &status, genesis)
+ }()
+ timeout := time.NewTimer(handshakeTimeout)
+ defer timeout.Stop()
+ for i := 0; i < 2; i++ {
+ select {
+ case err := <-errc:
+ if err != nil {
+ return err
+ }
+ case <-timeout.C:
+ return p2p.DiscReadTimeout
+ }
+ }
+ p.td, p.head = status.TD, status.CurrentBlock
+ return nil
+}
+
+func (p *peer) readStatus(network uint64, status *statusData, genesis common.Hash) (err error) {
+ msg, err := p.rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ if msg.Code != StatusMsg {
+ return errResp(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg)
+ }
+ if msg.Size > protocolMaxMsgSize {
+ return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, protocolMaxMsgSize)
+ }
+ // Decode the handshake and make sure everything matches
+ if err := msg.Decode(&status); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ if status.GenesisBlock != genesis {
+ return errResp(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock[:8], genesis[:8])
+ }
+ if status.NetworkId != network {
+ return errResp(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, network)
+ }
+ if int(status.ProtocolVersion) != p.version {
+ return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, p.version)
+ }
+ return nil
+}
+
+// String implements fmt.Stringer.
+func (p *peer) String() string {
+ return fmt.Sprintf("Peer %s [%s]", p.id,
+ fmt.Sprintf("eth/%2d", p.version),
+ )
+}
+
+// peerSet represents the collection of active peers currently participating in
+// the Ethereum sub-protocol.
+type peerSet struct {
+ peers map[string]*peer
+ lock sync.RWMutex
+ closed bool
+}
+
+// newPeerSet creates a new peer set to track the active participants.
+func newPeerSet() *peerSet {
+ return &peerSet{
+ peers: make(map[string]*peer),
+ }
+}
+
+// Register injects a new peer into the working set, or returns an error if the
+// peer is already known. If a new peer it registered, its broadcast loop is also
+// started.
+func (ps *peerSet) Register(p *peer) error {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ if ps.closed {
+ return errClosed
+ }
+ if _, ok := ps.peers[p.id]; ok {
+ return errAlreadyRegistered
+ }
+ ps.peers[p.id] = p
+ go p.broadcast()
+
+ return nil
+}
+
+// Unregister removes a remote peer from the active set, disabling any further
+// actions to/from that particular entity.
+func (ps *peerSet) Unregister(id string) error {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ p, ok := ps.peers[id]
+ if !ok {
+ return errNotRegistered
+ }
+ delete(ps.peers, id)
+ p.close()
+
+ return nil
+}
+
+// Peer retrieves the registered peer with the given id.
+func (ps *peerSet) Peer(id string) *peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ return ps.peers[id]
+}
+
+// Len returns if the current number of peers in the set.
+func (ps *peerSet) Len() int {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ return len(ps.peers)
+}
+
+// PeersWithoutBlock retrieves a list of peers that do not have a given block in
+// their set of known hashes.
+func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ list := make([]*peer, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ if !p.knownBlocks.Contains(hash) {
+ list = append(list, p)
+ }
+ }
+ return list
+}
+
+// PeersWithoutTx retrieves a list of peers that do not have a given transaction
+// in their set of known hashes.
+func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ list := make([]*peer, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ if !p.knownTxs.Contains(hash) {
+ list = append(list, p)
+ }
+ }
+ return list
+}
+
+// BestPeer retrieves the known peer with the currently highest total difficulty.
+func (ps *peerSet) BestPeer() *peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ var (
+ bestPeer *peer
+ bestTd *big.Int
+ )
+ for _, p := range ps.peers {
+ if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 {
+ bestPeer, bestTd = p, td
+ }
+ }
+ return bestPeer
+}
+
+// Close disconnects all peers.
+// No new peers can be registered after Close has returned.
+func (ps *peerSet) Close() {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ for _, p := range ps.peers {
+ p.Disconnect(p2p.DiscQuitting)
+ }
+ ps.closed = true
+}
diff --git a/eth/protocol.go b/eth/protocol.go
new file mode 100644
index 0000000..de0c979
--- /dev/null
+++ b/eth/protocol.go
@@ -0,0 +1,196 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "fmt"
+ "io"
+ "math/big"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+// Constants to match up protocol versions and messages
+const (
+ eth62 = 62
+ eth63 = 63
+)
+
+// protocolName is the official short name of the protocol used during capability negotiation.
+const protocolName = "eth"
+
+// ProtocolVersions are the supported versions of the eth protocol (first is primary).
+var ProtocolVersions = []uint{eth63}
+
+// protocolLengths are the number of implemented message corresponding to different protocol versions.
+var protocolLengths = map[uint]uint64{eth63: 17, eth62: 8}
+
+const protocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message
+
+// eth protocol message codes
+const (
+ // Protocol messages belonging to eth/62
+ StatusMsg = 0x00
+ NewBlockHashesMsg = 0x01
+ TxMsg = 0x02
+ GetBlockHeadersMsg = 0x03
+ BlockHeadersMsg = 0x04
+ GetBlockBodiesMsg = 0x05
+ BlockBodiesMsg = 0x06
+ NewBlockMsg = 0x07
+
+ // Protocol messages belonging to eth/63
+ GetNodeDataMsg = 0x0d
+ NodeDataMsg = 0x0e
+ GetReceiptsMsg = 0x0f
+ ReceiptsMsg = 0x10
+)
+
+type errCode int
+
+const (
+ ErrMsgTooLarge = iota
+ ErrDecode
+ ErrInvalidMsgCode
+ ErrProtocolVersionMismatch
+ ErrNetworkIdMismatch
+ ErrGenesisBlockMismatch
+ ErrNoStatusMsg
+ ErrExtraStatusMsg
+ ErrSuspendedPeer
+)
+
+func (e errCode) String() string {
+ return errorToString[int(e)]
+}
+
+// XXX change once legacy code is out
+var errorToString = map[int]string{
+ ErrMsgTooLarge: "Message too long",
+ ErrDecode: "Invalid message",
+ ErrInvalidMsgCode: "Invalid message code",
+ ErrProtocolVersionMismatch: "Protocol version mismatch",
+ ErrNetworkIdMismatch: "NetworkId mismatch",
+ ErrGenesisBlockMismatch: "Genesis block mismatch",
+ ErrNoStatusMsg: "No status message",
+ ErrExtraStatusMsg: "Extra status message",
+ ErrSuspendedPeer: "Suspended peer",
+}
+
+type txPool interface {
+ // AddRemotes should add the given transactions to the pool.
+ AddRemotes([]*types.Transaction) []error
+
+ // Pending should return pending transactions.
+ // The slice should be modifiable by the caller.
+ Pending() (map[common.Address]types.Transactions, error)
+
+ // SubscribeNewTxsEvent should return an event subscription of
+ // NewTxsEvent and send events to the given channel.
+ SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
+}
+
+// statusData is the network packet for the status message.
+type statusData struct {
+ ProtocolVersion uint32
+ NetworkId uint64
+ TD *big.Int
+ CurrentBlock common.Hash
+ GenesisBlock common.Hash
+}
+
+// newBlockHashesData is the network packet for the block announcements.
+type newBlockHashesData []struct {
+ Hash common.Hash // Hash of one particular block being announced
+ Number uint64 // Number of one particular block being announced
+}
+
+// getBlockHeadersData represents a block header query.
+type getBlockHeadersData struct {
+ Origin hashOrNumber // Block from which to retrieve headers
+ Amount uint64 // Maximum number of headers to retrieve
+ Skip uint64 // Blocks to skip between consecutive headers
+ Reverse bool // Query direction (false = rising towards latest, true = falling towards genesis)
+}
+
+// hashOrNumber is a combined field for specifying an origin block.
+type hashOrNumber struct {
+ Hash common.Hash // Block hash from which to retrieve headers (excludes Number)
+ Number uint64 // Block hash from which to retrieve headers (excludes Hash)
+}
+
+// EncodeRLP is a specialized encoder for hashOrNumber to encode only one of the
+// two contained union fields.
+func (hn *hashOrNumber) EncodeRLP(w io.Writer) error {
+ if hn.Hash == (common.Hash{}) {
+ return rlp.Encode(w, hn.Number)
+ }
+ if hn.Number != 0 {
+ return fmt.Errorf("both origin hash (%x) and number (%d) provided", hn.Hash, hn.Number)
+ }
+ return rlp.Encode(w, hn.Hash)
+}
+
+// DecodeRLP is a specialized decoder for hashOrNumber to decode the contents
+// into either a block hash or a block number.
+func (hn *hashOrNumber) DecodeRLP(s *rlp.Stream) error {
+ _, size, _ := s.Kind()
+ origin, err := s.Raw()
+ if err == nil {
+ switch {
+ case size == 32:
+ err = rlp.DecodeBytes(origin, &hn.Hash)
+ case size <= 8:
+ err = rlp.DecodeBytes(origin, &hn.Number)
+ default:
+ err = fmt.Errorf("invalid input size %d for origin", size)
+ }
+ }
+ return err
+}
+
+// newBlockData is the network packet for the block propagation message.
+type newBlockData struct {
+ Block *types.Block
+ TD *big.Int
+}
+
+// sanityCheck verifies that the values are reasonable, as a DoS protection
+func (request *newBlockData) sanityCheck() error {
+ if err := request.Block.SanityCheck(); err != nil {
+ return err
+ }
+ //TD at mainnet block #7753254 is 76 bits. If it becomes 100 million times
+ // larger, it will still fit within 100 bits
+ if tdlen := request.TD.BitLen(); tdlen > 100 {
+ return fmt.Errorf("too large block TD: bitlen %d", tdlen)
+ }
+ return nil
+}
+
+// blockBody represents the data content of a single block.
+type blockBody struct {
+ Transactions []*types.Transaction // Transactions contained within a block
+ Uncles []*types.Header // Uncles contained within a block
+}
+
+// blockBodiesData is the network packet for block content distribution.
+type blockBodiesData []*blockBody
diff --git a/eth/sync.go b/eth/sync.go
new file mode 100644
index 0000000..9e180ee
--- /dev/null
+++ b/eth/sync.go
@@ -0,0 +1,216 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "math/rand"
+ "sync/atomic"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/eth/downloader"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p/enode"
+)
+
+const (
+ forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
+ minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+
+ // This is the target size for the packs of transactions sent by txsyncLoop.
+ // A pack can get larger than this if a single transactions exceeds this size.
+ txsyncPackSize = 100 * 1024
+)
+
+type txsync struct {
+ p *peer
+ txs []*types.Transaction
+}
+
+// syncTransactions starts sending all currently pending transactions to the given peer.
+func (pm *ProtocolManager) syncTransactions(p *peer) {
+ var txs types.Transactions
+ pending, _ := pm.txpool.Pending()
+ for _, batch := range pending {
+ txs = append(txs, batch...)
+ }
+ if len(txs) == 0 {
+ return
+ }
+ select {
+ case pm.txsyncCh <- &txsync{p, txs}:
+ case <-pm.quitSync:
+ }
+}
+
+// txsyncLoop takes care of the initial transaction sync for each new
+// connection. When a new peer appears, we relay all currently pending
+// transactions. In order to minimise egress bandwidth usage, we send
+// the transactions in small packs to one peer at a time.
+func (pm *ProtocolManager) txsyncLoop() {
+ var (
+ pending = make(map[enode.ID]*txsync)
+ sending = false // whether a send is active
+ pack = new(txsync) // the pack that is being sent
+ done = make(chan error, 1) // result of the send
+ )
+
+ // send starts a sending a pack of transactions from the sync.
+ send := func(s *txsync) {
+ // Fill pack with transactions up to the target size.
+ size := common.StorageSize(0)
+ pack.p = s.p
+ pack.txs = pack.txs[:0]
+ for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
+ pack.txs = append(pack.txs, s.txs[i])
+ size += s.txs[i].Size()
+ }
+ // Remove the transactions that will be sent.
+ s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
+ if len(s.txs) == 0 {
+ delete(pending, s.p.ID())
+ }
+ // Send the pack in the background.
+ s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
+ sending = true
+ go func() { done <- pack.p.SendTransactions(pack.txs) }()
+ }
+
+ // pick chooses the next pending sync.
+ pick := func() *txsync {
+ if len(pending) == 0 {
+ return nil
+ }
+ n := rand.Intn(len(pending)) + 1
+ for _, s := range pending {
+ if n--; n == 0 {
+ return s
+ }
+ }
+ return nil
+ }
+
+ for {
+ select {
+ case s := <-pm.txsyncCh:
+ pending[s.p.ID()] = s
+ if !sending {
+ send(s)
+ }
+ case err := <-done:
+ sending = false
+ // Stop tracking peers that cause send failures.
+ if err != nil {
+ pack.p.Log().Debug("Transaction send failed", "err", err)
+ delete(pending, pack.p.ID())
+ }
+ // Schedule the next send.
+ if s := pick(); s != nil {
+ send(s)
+ }
+ case <-pm.quitSync:
+ return
+ }
+ }
+}
+
+// syncer is responsible for periodically synchronising with the network, both
+// downloading hashes and blocks as well as handling the announcement handler.
+func (pm *ProtocolManager) syncer() {
+ // Start and ensure cleanup of sync mechanisms
+ pm.fetcher.Start()
+ defer pm.fetcher.Stop()
+ defer pm.downloader.Terminate()
+
+ // Wait for different events to fire synchronisation operations
+ forceSync := time.NewTicker(forceSyncCycle)
+ defer forceSync.Stop()
+
+ for {
+ select {
+ case <-pm.newPeerCh:
+ // Make sure we have peers to select from, then sync
+ if pm.peers.Len() < minDesiredPeerCount {
+ break
+ }
+ go pm.synchronise(pm.peers.BestPeer())
+
+ case <-forceSync.C:
+ // Force a sync even if not enough peers are present
+ go pm.synchronise(pm.peers.BestPeer())
+
+ case <-pm.noMorePeers:
+ return
+ }
+ }
+}
+
+// synchronise tries to sync up our local block chain with a remote peer.
+func (pm *ProtocolManager) synchronise(peer *peer) {
+ // Short circuit if no peers are available
+ if peer == nil {
+ return
+ }
+ // Make sure the peer's TD is higher than our own
+ currentBlock := pm.blockchain.CurrentBlock()
+ td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
+
+ pHead, pTd := peer.Head()
+ if pTd.Cmp(td) <= 0 {
+ return
+ }
+ // Otherwise try to sync with the downloader
+ mode := downloader.FullSync
+ if atomic.LoadUint32(&pm.fastSync) == 1 {
+ // Fast sync was explicitly requested, and explicitly granted
+ mode = downloader.FastSync
+ }
+ if mode == downloader.FastSync {
+ // Make sure the peer's total difficulty we are synchronizing is higher.
+ if pm.blockchain.GetTdByHash(pm.blockchain.CurrentFastBlock().Hash()).Cmp(pTd) >= 0 {
+ return
+ }
+ }
+ // Run the sync cycle, and disable fast sync if we've went past the pivot block
+ if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
+ return
+ }
+ if atomic.LoadUint32(&pm.fastSync) == 1 {
+ log.Info("Fast sync complete, auto disabling")
+ atomic.StoreUint32(&pm.fastSync, 0)
+ }
+ // If we've successfully finished a sync cycle and passed any required checkpoint,
+ // enable accepting transactions from the network.
+ head := pm.blockchain.CurrentBlock()
+ if head.NumberU64() >= pm.checkpointNumber {
+ // Checkpoint passed, sanity check the timestamp to have a fallback mechanism
+ // for non-checkpointed (number = 0) private networks.
+ if head.Time() >= uint64(time.Now().AddDate(0, -1, 0).Unix()) {
+ atomic.StoreUint32(&pm.acceptTxs, 1)
+ }
+ }
+ if head.NumberU64() > 0 {
+ // We've completed a sync cycle, notify all peers of new state. This path is
+ // essential in star-topology networks where a gateway node needs to notify
+ // all its out-of-date peers of the availability of a new block. This failure
+ // scenario will most often crop up in private and hackathon networks with
+ // degenerate connectivity, but it should be healthy for the mainnet too to
+ // more reliably update peers or the local TD state.
+ go pm.BroadcastBlock(head, false)
+ }
+}