aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2019-10-21 14:47:51 -0400
committerDeterminant <[email protected]>2019-10-21 14:47:51 -0400
commit79b1169a9ff0b54ddf3b520a70a79c78ba5c988d (patch)
tree13fc62be3ebf344544547eeb9979450a3c6ecd40 /core
parent913e9439a7c7883881895ee597a0cc464fb92353 (diff)
customize Blockchain code
Diffstat (limited to 'core')
-rw-r--r--core/block_validator.go139
-rw-r--r--core/blockchain.go2235
-rw-r--r--core/blockchain_insert.go166
-rw-r--r--core/blocks.go25
-rw-r--r--core/chain_indexer.go512
-rw-r--r--core/error.go38
-rw-r--r--core/events.go48
-rw-r--r--core/evm.go97
-rw-r--r--core/gaspool.go54
-rw-r--r--core/headerchain.go538
-rw-r--r--core/state_prefetcher.go85
-rw-r--r--core/state_processor.go129
-rw-r--r--core/state_transition.go255
-rw-r--r--core/tx_cacher.go105
-rw-r--r--core/tx_journal.go180
-rw-r--r--core/tx_list.go520
-rw-r--r--core/tx_noncer.go79
-rw-r--r--core/tx_pool.go1523
-rw-r--r--core/types.go51
19 files changed, 6779 insertions, 0 deletions
diff --git a/core/block_validator.go b/core/block_validator.go
new file mode 100644
index 0000000..ae6cd4d
--- /dev/null
+++ b/core/block_validator.go
@@ -0,0 +1,139 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "fmt"
+
+ "github.com/ava-labs/go-ethereum/consensus"
+ "github.com/ava-labs/go-ethereum/core/state"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/params"
+)
+
+// BlockValidator is responsible for validating block headers, uncles and
+// processed state.
+//
+// BlockValidator implements Validator.
+type BlockValidator struct {
+ config *params.ChainConfig // Chain configuration options
+ bc *BlockChain // Canonical block chain
+ engine consensus.Engine // Consensus engine used for validating
+}
+
+// NewBlockValidator returns a new block validator which is safe for re-use
+func NewBlockValidator(config *params.ChainConfig, blockchain *BlockChain, engine consensus.Engine) *BlockValidator {
+ validator := &BlockValidator{
+ config: config,
+ engine: engine,
+ bc: blockchain,
+ }
+ return validator
+}
+
+// ValidateBody validates the given block's uncles and verifies the block
+// header's transaction and uncle roots. The headers are assumed to be already
+// validated at this point.
+func (v *BlockValidator) ValidateBody(block *types.Block) error {
+ // Check whether the block's known, and if not, that it's linkable
+ if v.bc.HasBlockAndState(block.Hash(), block.NumberU64()) {
+ return ErrKnownBlock
+ }
+ // Header validity is known at this point, check the uncles and transactions
+ header := block.Header()
+ if err := v.engine.VerifyUncles(v.bc, block); err != nil {
+ return err
+ }
+ if hash := types.CalcUncleHash(block.Uncles()); hash != header.UncleHash {
+ return fmt.Errorf("uncle root hash mismatch: have %x, want %x", hash, header.UncleHash)
+ }
+ if hash := types.DeriveSha(block.Transactions()); hash != header.TxHash {
+ return fmt.Errorf("transaction root hash mismatch: have %x, want %x", hash, header.TxHash)
+ }
+ if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
+ if !v.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) {
+ return consensus.ErrUnknownAncestor
+ }
+ return consensus.ErrPrunedAncestor
+ }
+ return nil
+}
+
+// ValidateState validates the various changes that happen after a state
+// transition, such as amount of used gas, the receipt roots and the state root
+// itself. ValidateState returns a database batch if the validation was a success
+// otherwise nil and an error is returned.
+func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64) error {
+ header := block.Header()
+ if block.GasUsed() != usedGas {
+ return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas)
+ }
+ // Validate the received block's bloom with the one derived from the generated receipts.
+ // For valid blocks this should always validate to true.
+ rbloom := types.CreateBloom(receipts)
+ if rbloom != header.Bloom {
+ return fmt.Errorf("invalid bloom (remote: %x local: %x)", header.Bloom, rbloom)
+ }
+ // Tre receipt Trie's root (R = (Tr [[H1, R1], ... [Hn, R1]]))
+ receiptSha := types.DeriveSha(receipts)
+ if receiptSha != header.ReceiptHash {
+ return fmt.Errorf("invalid receipt root hash (remote: %x local: %x)", header.ReceiptHash, receiptSha)
+ }
+ // Validate the state root against the received state root and throw
+ // an error if they don't match.
+ if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
+ return fmt.Errorf("invalid merkle root (remote: %x local: %x)", header.Root, root)
+ }
+ return nil
+}
+
+// CalcGasLimit computes the gas limit of the next block after parent. It aims
+// to keep the baseline gas above the provided floor, and increase it towards the
+// ceil if the blocks are full. If the ceil is exceeded, it will always decrease
+// the gas allowance.
+func CalcGasLimit(parent *types.Block, gasFloor, gasCeil uint64) uint64 {
+ // contrib = (parentGasUsed * 3 / 2) / 1024
+ contrib := (parent.GasUsed() + parent.GasUsed()/2) / params.GasLimitBoundDivisor
+
+ // decay = parentGasLimit / 1024 -1
+ decay := parent.GasLimit()/params.GasLimitBoundDivisor - 1
+
+ /*
+ strategy: gasLimit of block-to-mine is set based on parent's
+ gasUsed value. if parentGasUsed > parentGasLimit * (2/3) then we
+ increase it, otherwise lower it (or leave it unchanged if it's right
+ at that usage) the amount increased/decreased depends on how far away
+ from parentGasLimit * (2/3) parentGasUsed is.
+ */
+ limit := parent.GasLimit() - decay + contrib
+ if limit < params.MinGasLimit {
+ limit = params.MinGasLimit
+ }
+ // If we're outside our allowed gas range, we try to hone towards them
+ if limit < gasFloor {
+ limit = parent.GasLimit() + decay
+ if limit > gasFloor {
+ limit = gasFloor
+ }
+ } else if limit > gasCeil {
+ limit = parent.GasLimit() - decay
+ if limit < gasCeil {
+ limit = gasCeil
+ }
+ }
+ return limit
+}
diff --git a/core/blockchain.go b/core/blockchain.go
new file mode 100644
index 0000000..174d403
--- /dev/null
+++ b/core/blockchain.go
@@ -0,0 +1,2235 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package core implements the Ethereum consensus protocol.
+package core
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "math/big"
+ mrand "math/rand"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/common/mclock"
+ "github.com/ava-labs/go-ethereum/common/prque"
+ "github.com/ava-labs/go-ethereum/consensus"
+ "github.com/ava-labs/go-ethereum/core/rawdb"
+ "github.com/ava-labs/go-ethereum/core/state"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/core/vm"
+ "github.com/ava-labs/go-ethereum/ethdb"
+ "github.com/ava-labs/go-ethereum/event"
+ "github.com/ava-labs/go-ethereum/log"
+ "github.com/ava-labs/go-ethereum/metrics"
+ "github.com/ava-labs/go-ethereum/params"
+ "github.com/ava-labs/go-ethereum/rlp"
+ "github.com/ava-labs/go-ethereum/trie"
+ "github.com/hashicorp/golang-lru"
+)
+
+var (
+ headBlockGauge = metrics.NewRegisteredGauge("chain/head/block", nil)
+ headHeaderGauge = metrics.NewRegisteredGauge("chain/head/header", nil)
+ headFastBlockGauge = metrics.NewRegisteredGauge("chain/head/receipt", nil)
+
+ accountReadTimer = metrics.NewRegisteredTimer("chain/account/reads", nil)
+ accountHashTimer = metrics.NewRegisteredTimer("chain/account/hashes", nil)
+ accountUpdateTimer = metrics.NewRegisteredTimer("chain/account/updates", nil)
+ accountCommitTimer = metrics.NewRegisteredTimer("chain/account/commits", nil)
+
+ storageReadTimer = metrics.NewRegisteredTimer("chain/storage/reads", nil)
+ storageHashTimer = metrics.NewRegisteredTimer("chain/storage/hashes", nil)
+ storageUpdateTimer = metrics.NewRegisteredTimer("chain/storage/updates", nil)
+ storageCommitTimer = metrics.NewRegisteredTimer("chain/storage/commits", nil)
+
+ blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
+ blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
+ blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
+ blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)
+ blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil)
+ blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
+
+ blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil)
+ blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)
+
+ errInsertionInterrupted = errors.New("insertion is interrupted")
+)
+
+const (
+ bodyCacheLimit = 256
+ blockCacheLimit = 256
+ receiptsCacheLimit = 32
+ txLookupCacheLimit = 1024
+ maxFutureBlocks = 256
+ maxTimeFutureBlocks = 30
+ badBlockLimit = 10
+ TriesInMemory = 128
+
+ // BlockChainVersion ensures that an incompatible database forces a resync from scratch.
+ //
+ // Changelog:
+ //
+ // - Version 4
+ // The following incompatible database changes were added:
+ // * the `BlockNumber`, `TxHash`, `TxIndex`, `BlockHash` and `Index` fields of log are deleted
+ // * the `Bloom` field of receipt is deleted
+ // * the `BlockIndex` and `TxIndex` fields of txlookup are deleted
+ // - Version 5
+ // The following incompatible database changes were added:
+ // * the `TxHash`, `GasCost`, and `ContractAddress` fields are no longer stored for a receipt
+ // * the `TxHash`, `GasCost`, and `ContractAddress` fields are computed by looking up the
+ // receipts' corresponding block
+ // - Version 6
+ // The following incompatible database changes were added:
+ // * Transaction lookup information stores the corresponding block number instead of block hash
+ // - Version 7
+ // The following incompatible database changes were added:
+ // * Use freezer as the ancient database to maintain all ancient data
+ BlockChainVersion uint64 = 7
+)
+
+// CacheConfig contains the configuration values for the trie caching/pruning
+// that's resident in a blockchain.
+type CacheConfig struct {
+ TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
+ TrieCleanNoPrefetch bool // Whether to disable heuristic state prefetching for followup blocks
+ TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
+ TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node)
+ TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
+}
+
+// BlockChain represents the canonical chain given a database with a genesis
+// block. The Blockchain manages chain imports, reverts, chain reorganisations.
+//
+// Importing blocks in to the block chain happens according to the set of rules
+// defined by the two stage Validator. Processing of blocks is done using the
+// Processor which processes the included transaction. The validation of the state
+// is done in the second part of the Validator. Failing results in aborting of
+// the import.
+//
+// The BlockChain also helps in returning blocks from **any** chain included
+// in the database as well as blocks that represents the canonical chain. It's
+// important to note that GetBlock can return any block and does not need to be
+// included in the canonical one where as GetBlockByNumber always represents the
+// canonical chain.
+type BlockChain struct {
+ chainConfig *params.ChainConfig // Chain & network configuration
+ cacheConfig *CacheConfig // Cache configuration for pruning
+
+ db ethdb.Database // Low level persistent database to store final content in
+ triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
+ gcproc time.Duration // Accumulates canonical block processing for trie dumping
+
+ hc *HeaderChain
+ rmLogsFeed event.Feed
+ chainFeed event.Feed
+ chainSideFeed event.Feed
+ chainHeadFeed event.Feed
+ logsFeed event.Feed
+ blockProcFeed event.Feed
+ scope event.SubscriptionScope
+ genesisBlock *types.Block
+
+ chainmu sync.RWMutex // blockchain insertion lock
+
+ currentBlock atomic.Value // Current head of the block chain
+ currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)
+
+ stateCache state.Database // State database to reuse between imports (contains state cache)
+ bodyCache *lru.Cache // Cache for the most recent block bodies
+ bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format
+ receiptsCache *lru.Cache // Cache for the most recent receipts per block
+ blockCache *lru.Cache // Cache for the most recent entire blocks
+ txLookupCache *lru.Cache // Cache for the most recent transaction lookup data.
+ futureBlocks *lru.Cache // future blocks are blocks added for later processing
+
+ quit chan struct{} // blockchain quit channel
+ running int32 // running must be called atomically
+ // procInterrupt must be atomically called
+ procInterrupt int32 // interrupt signaler for block processing
+ wg sync.WaitGroup // chain processing wait group for shutting down
+
+ engine consensus.Engine
+ validator Validator // Block and state validator interface
+ prefetcher Prefetcher // Block state prefetcher interface
+ processor Processor // Block transaction processor interface
+ vmConfig vm.Config
+
+ badBlocks *lru.Cache // Bad block cache
+ shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
+ terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
+ manualCanonical bool
+}
+
+// NewBlockChain returns a fully initialised block chain using information
+// available in the database. It initialises the default Ethereum Validator and
+// Processor.
+func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool, manualCanonical bool) (*BlockChain, error) {
+ if cacheConfig == nil {
+ cacheConfig = &CacheConfig{
+ TrieCleanLimit: 256,
+ TrieDirtyLimit: 256,
+ TrieTimeLimit: 5 * time.Minute,
+ }
+ }
+ bodyCache, _ := lru.New(bodyCacheLimit)
+ bodyRLPCache, _ := lru.New(bodyCacheLimit)
+ receiptsCache, _ := lru.New(receiptsCacheLimit)
+ blockCache, _ := lru.New(blockCacheLimit)
+ txLookupCache, _ := lru.New(txLookupCacheLimit)
+ futureBlocks, _ := lru.New(maxFutureBlocks)
+ badBlocks, _ := lru.New(badBlockLimit)
+
+ bc := &BlockChain{
+ chainConfig: chainConfig,
+ cacheConfig: cacheConfig,
+ db: db,
+ triegc: prque.New(nil),
+ stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit),
+ quit: make(chan struct{}),
+ shouldPreserve: shouldPreserve,
+ bodyCache: bodyCache,
+ bodyRLPCache: bodyRLPCache,
+ receiptsCache: receiptsCache,
+ blockCache: blockCache,
+ txLookupCache: txLookupCache,
+ futureBlocks: futureBlocks,
+ engine: engine,
+ vmConfig: vmConfig,
+ badBlocks: badBlocks,
+ manualCanonical: manualCanonical,
+ }
+ bc.validator = NewBlockValidator(chainConfig, bc, engine)
+ bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
+ bc.processor = NewStateProcessor(chainConfig, bc, engine)
+
+ var err error
+ bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt)
+ if err != nil {
+ return nil, err
+ }
+ bc.genesisBlock = bc.GetBlockByNumber(0)
+ if bc.genesisBlock == nil {
+ return nil, ErrNoGenesis
+ }
+ // Initialize the chain with ancient data if it isn't empty.
+ if bc.empty() {
+ rawdb.InitDatabaseFromFreezer(bc.db)
+ }
+ if err := bc.loadLastState(); err != nil {
+ return nil, err
+ }
+ // The first thing the node will do is reconstruct the verification data for
+ // the head block (ethash cache or clique voting snapshot). Might as well do
+ // it in advance.
+ bc.engine.VerifyHeader(bc, bc.CurrentHeader(), true)
+
+ if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 {
+ var (
+ needRewind bool
+ low uint64
+ )
+ // The head full block may be rolled back to a very low height due to
+ // blockchain repair. If the head full block is even lower than the ancient
+ // chain, truncate the ancient store.
+ fullBlock := bc.CurrentBlock()
+ if fullBlock != nil && fullBlock != bc.genesisBlock && fullBlock.NumberU64() < frozen-1 {
+ needRewind = true
+ low = fullBlock.NumberU64()
+ }
+ // In fast sync, it may happen that ancient data has been written to the
+ // ancient store, but the LastFastBlock has not been updated, truncate the
+ // extra data here.
+ fastBlock := bc.CurrentFastBlock()
+ if fastBlock != nil && fastBlock.NumberU64() < frozen-1 {
+ needRewind = true
+ if fastBlock.NumberU64() < low || low == 0 {
+ low = fastBlock.NumberU64()
+ }
+ }
+ if needRewind {
+ var hashes []common.Hash
+ previous := bc.CurrentHeader().Number.Uint64()
+ for i := low + 1; i <= bc.CurrentHeader().Number.Uint64(); i++ {
+ hashes = append(hashes, rawdb.ReadCanonicalHash(bc.db, i))
+ }
+ bc.Rollback(hashes)
+ log.Warn("Truncate ancient chain", "from", previous, "to", low)
+ }
+ }
+ // Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain
+ for hash := range BadHashes {
+ if header := bc.GetHeaderByHash(hash); header != nil {
+ // get the canonical block corresponding to the offending header's number
+ headerByNumber := bc.GetHeaderByNumber(header.Number.Uint64())
+ // make sure the headerByNumber (if present) is in our current canonical chain
+ if headerByNumber != nil && headerByNumber.Hash() == header.Hash() {
+ log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash)
+ bc.SetHead(header.Number.Uint64() - 1)
+ log.Error("Chain rewind was successful, resuming normal operation")
+ }
+ }
+ }
+ // Take ownership of this particular state
+ go bc.update()
+ return bc, nil
+}
+
+func (bc *BlockChain) getProcInterrupt() bool {
+ return atomic.LoadInt32(&bc.procInterrupt) == 1
+}
+
+// GetVMConfig returns the block chain VM config.
+func (bc *BlockChain) GetVMConfig() *vm.Config {
+ return &bc.vmConfig
+}
+
+// empty returns an indicator whether the blockchain is empty.
+// Note, it's a special case that we connect a non-empty ancient
+// database with an empty node, so that we can plugin the ancient
+// into node seamlessly.
+func (bc *BlockChain) empty() bool {
+ genesis := bc.genesisBlock.Hash()
+ for _, hash := range []common.Hash{rawdb.ReadHeadBlockHash(bc.db), rawdb.ReadHeadHeaderHash(bc.db), rawdb.ReadHeadFastBlockHash(bc.db)} {
+ if hash != genesis {
+ return false
+ }
+ }
+ return true
+}
+
+// loadLastState loads the last known chain state from the database. This method
+// assumes that the chain manager mutex is held.
+func (bc *BlockChain) loadLastState() error {
+ // Restore the last known head block
+ head := rawdb.ReadHeadBlockHash(bc.db)
+ if head == (common.Hash{}) {
+ // Corrupt or empty database, init from scratch
+ log.Warn("Empty database, resetting chain")
+ return bc.Reset()
+ }
+ // Make sure the entire head block is available
+ currentBlock := bc.GetBlockByHash(head)
+ if currentBlock == nil {
+ // Corrupt or empty database, init from scratch
+ log.Warn("Head block missing, resetting chain", "hash", head)
+ return bc.Reset()
+ }
+ // Make sure the state associated with the block is available
+ if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil {
+ // Dangling block without a state associated, init from scratch
+ log.Warn("Head state missing, repairing chain", "number", currentBlock.Number(), "hash", currentBlock.Hash())
+ if err := bc.repair(&currentBlock); err != nil {
+ return err
+ }
+ rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash())
+ }
+ // Everything seems to be fine, set as the head block
+ bc.currentBlock.Store(currentBlock)
+ headBlockGauge.Update(int64(currentBlock.NumberU64()))
+
+ // Restore the last known head header
+ currentHeader := currentBlock.Header()
+ if head := rawdb.ReadHeadHeaderHash(bc.db); head != (common.Hash{}) {
+ if header := bc.GetHeaderByHash(head); header != nil {
+ currentHeader = header
+ }
+ }
+ bc.hc.SetCurrentHeader(currentHeader)
+
+ // Restore the last known head fast block
+ bc.currentFastBlock.Store(currentBlock)
+ headFastBlockGauge.Update(int64(currentBlock.NumberU64()))
+
+ if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) {
+ if block := bc.GetBlockByHash(head); block != nil {
+ bc.currentFastBlock.Store(block)
+ headFastBlockGauge.Update(int64(block.NumberU64()))
+ }
+ }
+ // Issue a status log for the user
+ currentFastBlock := bc.CurrentFastBlock()
+
+ headerTd := bc.GetTd(currentHeader.Hash(), currentHeader.Number.Uint64())
+ blockTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
+ fastTd := bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64())
+
+ log.Info("Loaded most recent local header", "number", currentHeader.Number, "hash", currentHeader.Hash(), "td", headerTd, "age", common.PrettyAge(time.Unix(int64(currentHeader.Time), 0)))
+ log.Info("Loaded most recent local full block", "number", currentBlock.Number(), "hash", currentBlock.Hash(), "td", blockTd, "age", common.PrettyAge(time.Unix(int64(currentBlock.Time()), 0)))
+ log.Info("Loaded most recent local fast block", "number", currentFastBlock.Number(), "hash", currentFastBlock.Hash(), "td", fastTd, "age", common.PrettyAge(time.Unix(int64(currentFastBlock.Time()), 0)))
+
+ return nil
+}
+
+// SetHead rewinds the local chain to a new head. In the case of headers, everything
+// above the new head will be deleted and the new one set. In the case of blocks
+// though, the head may be further rewound if block bodies are missing (non-archive
+// nodes after a fast sync).
+func (bc *BlockChain) SetHead(head uint64) error {
+ log.Warn("Rewinding blockchain", "target", head)
+
+ bc.chainmu.Lock()
+ defer bc.chainmu.Unlock()
+
+ updateFn := func(db ethdb.KeyValueWriter, header *types.Header) {
+ // Rewind the block chain, ensuring we don't end up with a stateless head block
+ if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() < currentBlock.NumberU64() {
+ newHeadBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
+ if newHeadBlock == nil {
+ newHeadBlock = bc.genesisBlock
+ } else {
+ if _, err := state.New(newHeadBlock.Root(), bc.stateCache); err != nil {
+ // Rewound state missing, rolled back to before pivot, reset to genesis
+ newHeadBlock = bc.genesisBlock
+ }
+ }
+ rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())
+ bc.currentBlock.Store(newHeadBlock)
+ headBlockGauge.Update(int64(newHeadBlock.NumberU64()))
+ }
+
+ // Rewind the fast block in a simpleton way to the target head
+ if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && header.Number.Uint64() < currentFastBlock.NumberU64() {
+ newHeadFastBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
+ // If either blocks reached nil, reset to the genesis state
+ if newHeadFastBlock == nil {
+ newHeadFastBlock = bc.genesisBlock
+ }
+ rawdb.WriteHeadFastBlockHash(db, newHeadFastBlock.Hash())
+ bc.currentFastBlock.Store(newHeadFastBlock)
+ headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64()))
+ }
+ }
+
+ // Rewind the header chain, deleting all block bodies until then
+ delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) {
+ // Ignore the error here since light client won't hit this path
+ frozen, _ := bc.db.Ancients()
+ if num+1 <= frozen {
+ // Truncate all relative data(header, total difficulty, body, receipt
+ // and canonical hash) from ancient store.
+ if err := bc.db.TruncateAncients(num + 1); err != nil {
+ log.Crit("Failed to truncate ancient data", "number", num, "err", err)
+ }
+
+ // Remove the hash <-> number mapping from the active store.
+ rawdb.DeleteHeaderNumber(db, hash)
+ } else {
+ // Remove relative body and receipts from the active store.
+ // The header, total difficulty and canonical hash will be
+ // removed in the hc.SetHead function.
+ rawdb.DeleteBody(db, hash, num)
+ rawdb.DeleteReceipts(db, hash, num)
+ }
+ // Todo(rjl493456442) txlookup, bloombits, etc
+ }
+ bc.hc.SetHead(head, updateFn, delFn)
+
+ // Clear out any stale content from the caches
+ bc.bodyCache.Purge()
+ bc.bodyRLPCache.Purge()
+ bc.receiptsCache.Purge()
+ bc.blockCache.Purge()
+ bc.txLookupCache.Purge()
+ bc.futureBlocks.Purge()
+
+ return bc.loadLastState()
+}
+
+// FastSyncCommitHead sets the current head block to the one defined by the hash
+// irrelevant what the chain contents were prior.
+func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
+ // Make sure that both the block as well at its state trie exists
+ block := bc.GetBlockByHash(hash)
+ if block == nil {
+ return fmt.Errorf("non existent block [%x…]", hash[:4])
+ }
+ if _, err := trie.NewSecure(block.Root(), bc.stateCache.TrieDB()); err != nil {
+ return err
+ }
+ // If all checks out, manually set the head block
+ bc.chainmu.Lock()
+ bc.currentBlock.Store(block)
+ headBlockGauge.Update(int64(block.NumberU64()))
+ bc.chainmu.Unlock()
+
+ log.Info("Committed new head block", "number", block.Number(), "hash", hash)
+ return nil
+}
+
+// GasLimit returns the gas limit of the current HEAD block.
+func (bc *BlockChain) GasLimit() uint64 {
+ return bc.CurrentBlock().GasLimit()
+}
+
+// CurrentBlock retrieves the current head block of the canonical chain. The
+// block is retrieved from the blockchain's internal cache.
+func (bc *BlockChain) CurrentBlock() *types.Block {
+ return bc.currentBlock.Load().(*types.Block)
+}
+
+// CurrentFastBlock retrieves the current fast-sync head block of the canonical
+// chain. The block is retrieved from the blockchain's internal cache.
+func (bc *BlockChain) CurrentFastBlock() *types.Block {
+ return bc.currentFastBlock.Load().(*types.Block)
+}
+
+// Validator returns the current validator.
+func (bc *BlockChain) Validator() Validator {
+ return bc.validator
+}
+
+// Processor returns the current processor.
+func (bc *BlockChain) Processor() Processor {
+ return bc.processor
+}
+
+// State returns a new mutable state based on the current HEAD block.
+func (bc *BlockChain) State() (*state.StateDB, error) {
+ return bc.StateAt(bc.CurrentBlock().Root())
+}
+
+// StateAt returns a new mutable state based on a particular point in time.
+func (bc *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) {
+ return state.New(root, bc.stateCache)
+}
+
+// StateCache returns the caching database underpinning the blockchain instance.
+func (bc *BlockChain) StateCache() state.Database {
+ return bc.stateCache
+}
+
+// Reset purges the entire blockchain, restoring it to its genesis state.
+func (bc *BlockChain) Reset() error {
+ return bc.ResetWithGenesisBlock(bc.genesisBlock)
+}
+
+// ResetWithGenesisBlock purges the entire blockchain, restoring it to the
+// specified genesis state.
+func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
+ // Dump the entire block chain and purge the caches
+ if err := bc.SetHead(0); err != nil {
+ return err
+ }
+ bc.chainmu.Lock()
+ defer bc.chainmu.Unlock()
+
+ // Prepare the genesis block and reinitialise the chain
+ if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil {
+ log.Crit("Failed to write genesis block TD", "err", err)
+ }
+ rawdb.WriteBlock(bc.db, genesis)
+
+ bc.genesisBlock = genesis
+ bc.insert(bc.genesisBlock)
+ bc.currentBlock.Store(bc.genesisBlock)
+ headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
+
+ bc.hc.SetGenesis(bc.genesisBlock.Header())
+ bc.hc.SetCurrentHeader(bc.genesisBlock.Header())
+ bc.currentFastBlock.Store(bc.genesisBlock)
+ headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
+
+ return nil
+}
+
+// repair tries to repair the current blockchain by rolling back the current block
+// until one with associated state is found. This is needed to fix incomplete db
+// writes caused either by crashes/power outages, or simply non-committed tries.
+//
+// This method only rolls back the current block. The current header and current
+// fast block are left intact.
+func (bc *BlockChain) repair(head **types.Block) error {
+ for {
+ // Abort if we've rewound to a head block that does have associated state
+ if _, err := state.New((*head).Root(), bc.stateCache); err == nil {
+ log.Info("Rewound blockchain to past state", "number", (*head).Number(), "hash", (*head).Hash())
+ return nil
+ }
+ // Otherwise rewind one block and recheck state availability there
+ block := bc.GetBlock((*head).ParentHash(), (*head).NumberU64()-1)
+ if block == nil {
+ return fmt.Errorf("missing block %d [%x]", (*head).NumberU64()-1, (*head).ParentHash())
+ }
+ *head = block
+ }
+}
+
+// Export writes the active chain to the given writer.
+func (bc *BlockChain) Export(w io.Writer) error {
+ return bc.ExportN(w, uint64(0), bc.CurrentBlock().NumberU64())
+}
+
+// ExportN writes a subset of the active chain to the given writer.
+func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
+ bc.chainmu.RLock()
+ defer bc.chainmu.RUnlock()
+
+ if first > last {
+ return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last)
+ }
+ log.Info("Exporting batch of blocks", "count", last-first+1)
+
+ start, reported := time.Now(), time.Now()
+ for nr := first; nr <= last; nr++ {
+ block := bc.GetBlockByNumber(nr)
+ if block == nil {
+ return fmt.Errorf("export failed on #%d: not found", nr)
+ }
+ if err := block.EncodeRLP(w); err != nil {
+ return err
+ }
+ if time.Since(reported) >= statsReportLimit {
+ log.Info("Exporting blocks", "exported", block.NumberU64()-first, "elapsed", common.PrettyDuration(time.Since(start)))
+ reported = time.Now()
+ }
+ }
+ return nil
+}
+
+// insert injects a new head block into the current block chain. This method
+// assumes that the block is indeed a true head. It will also reset the head
+// header and the head fast sync block to this very same block if they are older
+// or if they are on a different side chain.
+//
+// Note, this function assumes that the `mu` mutex is held!
+func (bc *BlockChain) insert(block *types.Block) {
+ // If the block is on a side chain or an unknown one, force other heads onto it too
+ updateHeads := rawdb.ReadCanonicalHash(bc.db, block.NumberU64()) != block.Hash()
+
+ // Add the block to the canonical chain number scheme and mark as the head
+ rawdb.WriteCanonicalHash(bc.db, block.Hash(), block.NumberU64())
+ rawdb.WriteHeadBlockHash(bc.db, block.Hash())
+
+ bc.currentBlock.Store(block)
+ headBlockGauge.Update(int64(block.NumberU64()))
+
+ // If the block is better than our head or is on a different chain, force update heads
+ if updateHeads {
+ bc.hc.SetCurrentHeader(block.Header())
+ rawdb.WriteHeadFastBlockHash(bc.db, block.Hash())
+
+ bc.currentFastBlock.Store(block)
+ headFastBlockGauge.Update(int64(block.NumberU64()))
+ }
+}
+
+// Genesis retrieves the chain's genesis block.
+func (bc *BlockChain) Genesis() *types.Block {
+ return bc.genesisBlock
+}
+
+// GetBody retrieves a block body (transactions and uncles) from the database by
+// hash, caching it if found.
+func (bc *BlockChain) GetBody(hash common.Hash) *types.Body {
+ // Short circuit if the body's already in the cache, retrieve otherwise
+ if cached, ok := bc.bodyCache.Get(hash); ok {
+ body := cached.(*types.Body)
+ return body
+ }
+ number := bc.hc.GetBlockNumber(hash)
+ if number == nil {
+ return nil
+ }
+ body := rawdb.ReadBody(bc.db, hash, *number)
+ if body == nil {
+ return nil
+ }
+ // Cache the found body for next time and return
+ bc.bodyCache.Add(hash, body)
+ return body
+}
+
+// GetBodyRLP retrieves a block body in RLP encoding from the database by hash,
+// caching it if found.
+func (bc *BlockChain) GetBodyRLP(hash common.Hash) rlp.RawValue {
+ // Short circuit if the body's already in the cache, retrieve otherwise
+ if cached, ok := bc.bodyRLPCache.Get(hash); ok {
+ return cached.(rlp.RawValue)
+ }
+ number := bc.hc.GetBlockNumber(hash)
+ if number == nil {
+ return nil
+ }
+ body := rawdb.ReadBodyRLP(bc.db, hash, *number)
+ if len(body) == 0 {
+ return nil
+ }
+ // Cache the found body for next time and return
+ bc.bodyRLPCache.Add(hash, body)
+ return body
+}
+
+// HasBlock checks if a block is fully present in the database or not.
+func (bc *BlockChain) HasBlock(hash common.Hash, number uint64) bool {
+ if bc.blockCache.Contains(hash) {
+ return true
+ }
+ return rawdb.HasBody(bc.db, hash, number)
+}
+
+// HasFastBlock checks if a fast block is fully present in the database or not.
+func (bc *BlockChain) HasFastBlock(hash common.Hash, number uint64) bool {
+ if !bc.HasBlock(hash, number) {
+ return false
+ }
+ if bc.receiptsCache.Contains(hash) {
+ return true
+ }
+ return rawdb.HasReceipts(bc.db, hash, number)
+}
+
+// HasState checks if state trie is fully present in the database or not.
+func (bc *BlockChain) HasState(hash common.Hash) bool {
+ _, err := bc.stateCache.OpenTrie(hash)
+ return err == nil
+}
+
+// HasBlockAndState checks if a block and associated state trie is fully present
+// in the database or not, caching it if present.
+func (bc *BlockChain) HasBlockAndState(hash common.Hash, number uint64) bool {
+ // Check first that the block itself is known
+ block := bc.GetBlock(hash, number)
+ if block == nil {
+ return false
+ }
+ return bc.HasState(block.Root())
+}
+
+// GetBlock retrieves a block from the database by hash and number,
+// caching it if found.
+func (bc *BlockChain) GetBlock(hash common.Hash, number uint64) *types.Block {
+ // Short circuit if the block's already in the cache, retrieve otherwise
+ if block, ok := bc.blockCache.Get(hash); ok {
+ return block.(*types.Block)
+ }
+ block := rawdb.ReadBlock(bc.db, hash, number)
+ if block == nil {
+ return nil
+ }
+ // Cache the found block for next time and return
+ bc.blockCache.Add(block.Hash(), block)
+ return block
+}
+
+// GetBlockByHash retrieves a block from the database by hash, caching it if found.
+func (bc *BlockChain) GetBlockByHash(hash common.Hash) *types.Block {
+ number := bc.hc.GetBlockNumber(hash)
+ if number == nil {
+ return nil
+ }
+ return bc.GetBlock(hash, *number)
+}
+
+// GetBlockByNumber retrieves a block from the database by number, caching it
+// (associated with its hash) if found.
+func (bc *BlockChain) GetBlockByNumber(number uint64) *types.Block {
+ hash := rawdb.ReadCanonicalHash(bc.db, number)
+ if hash == (common.Hash{}) {
+ return nil
+ }
+ return bc.GetBlock(hash, number)
+}
+
+// GetReceiptsByHash retrieves the receipts for all transactions in a given block.
+func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
+ if receipts, ok := bc.receiptsCache.Get(hash); ok {
+ return receipts.(types.Receipts)
+ }
+ number := rawdb.ReadHeaderNumber(bc.db, hash)
+ if number == nil {
+ return nil
+ }
+ receipts := rawdb.ReadReceipts(bc.db, hash, *number, bc.chainConfig)
+ if receipts == nil {
+ return nil
+ }
+ bc.receiptsCache.Add(hash, receipts)
+ return receipts
+}
+
+// GetBlocksFromHash returns the block corresponding to hash and up to n-1 ancestors.
+// [deprecated by eth/62]
+func (bc *BlockChain) GetBlocksFromHash(hash common.Hash, n int) (blocks []*types.Block) {
+ number := bc.hc.GetBlockNumber(hash)
+ if number == nil {
+ return nil
+ }
+ for i := 0; i < n; i++ {
+ block := bc.GetBlock(hash, *number)
+ if block == nil {
+ break
+ }
+ blocks = append(blocks, block)
+ hash = block.ParentHash()
+ *number--
+ }
+ return
+}
+
+// GetUnclesInChain retrieves all the uncles from a given block backwards until
+// a specific distance is reached.
+func (bc *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.Header {
+ uncles := []*types.Header{}
+ for i := 0; block != nil && i < length; i++ {
+ uncles = append(uncles, block.Uncles()...)
+ block = bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
+ }
+ return uncles
+}
+
+// TrieNode retrieves a blob of data associated with a trie node (or code hash)
+// either from ephemeral in-memory cache, or from persistent storage.
+func (bc *BlockChain) TrieNode(hash common.Hash) ([]byte, error) {
+ return bc.stateCache.TrieDB().Node(hash)
+}
+
+// Stop stops the blockchain service. If any imports are currently in progress
+// it will abort them using the procInterrupt.
+func (bc *BlockChain) Stop() {
+ if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
+ return
+ }
+ // Unsubscribe all subscriptions registered from blockchain
+ bc.scope.Close()
+ close(bc.quit)
+ atomic.StoreInt32(&bc.procInterrupt, 1)
+
+ bc.wg.Wait()
+
+ // Ensure the state of a recent block is also stored to disk before exiting.
+ // We're writing three different states to catch different restart scenarios:
+ // - HEAD: So we don't need to reprocess any blocks in the general case
+ // - HEAD-1: So we don't do large reorgs if our HEAD becomes an uncle
+ // - HEAD-127: So we have a hard limit on the number of blocks reexecuted
+ if !bc.cacheConfig.TrieDirtyDisabled {
+ triedb := bc.stateCache.TrieDB()
+
+ for _, offset := range []uint64{0, 1, TriesInMemory - 1} {
+ if number := bc.CurrentBlock().NumberU64(); number > offset {
+ log.Info(".....", "number", number, "offset", offset)
+ recent := bc.GetBlockByNumber(number - offset)
+
+ log.Info("Writing cached state to disk", "block", recent.Number(), "hash", recent.Hash(), "root", recent.Root())
+ if err := triedb.Commit(recent.Root(), true); err != nil {
+ log.Error("Failed to commit recent state trie", "err", err)
+ }
+ }
+ }
+ for !bc.triegc.Empty() {
+ triedb.Dereference(bc.triegc.PopItem().(common.Hash))
+ }
+ if size, _ := triedb.Size(); size != 0 {
+ log.Error("Dangling trie nodes after full cleanup")
+ }
+ }
+ log.Info("Blockchain manager stopped")
+}
+
+func (bc *BlockChain) procFutureBlocks() {
+ blocks := make([]*types.Block, 0, bc.futureBlocks.Len())
+ for _, hash := range bc.futureBlocks.Keys() {
+ if block, exist := bc.futureBlocks.Peek(hash); exist {
+ blocks = append(blocks, block.(*types.Block))
+ }
+ }
+ if len(blocks) > 0 {
+ types.BlockBy(types.Number).Sort(blocks)
+
+ // Insert one by one as chain insertion needs contiguous ancestry between blocks
+ for i := range blocks {
+ bc.InsertChain(blocks[i : i+1])
+ }
+ }
+}
+
+// WriteStatus status of write
+type WriteStatus byte
+
+const (
+ NonStatTy WriteStatus = iota
+ CanonStatTy
+ SideStatTy
+)
+
+// Rollback is designed to remove a chain of links from the database that aren't
+// certain enough to be valid.
+func (bc *BlockChain) Rollback(chain []common.Hash) {
+ bc.chainmu.Lock()
+ defer bc.chainmu.Unlock()
+
+ for i := len(chain) - 1; i >= 0; i-- {
+ hash := chain[i]
+
+ currentHeader := bc.hc.CurrentHeader()
+ if currentHeader.Hash() == hash {
+ bc.hc.SetCurrentHeader(bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1))
+ }
+ if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock.Hash() == hash {
+ newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1)
+ rawdb.WriteHeadFastBlockHash(bc.db, newFastBlock.Hash())
+ bc.currentFastBlock.Store(newFastBlock)
+ headFastBlockGauge.Update(int64(newFastBlock.NumberU64()))
+ }
+ if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash {
+ newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
+ rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash())
+ bc.currentBlock.Store(newBlock)
+ headBlockGauge.Update(int64(newBlock.NumberU64()))
+ }
+ }
+ // Truncate ancient data which exceeds the current header.
+ //
+ // Notably, it can happen that system crashes without truncating the ancient data
+ // but the head indicator has been updated in the active store. Regarding this issue,
+ // system will self recovery by truncating the extra data during the setup phase.
+ if err := bc.truncateAncient(bc.hc.CurrentHeader().Number.Uint64()); err != nil {
+ log.Crit("Truncate ancient store failed", "err", err)
+ }
+}
+
+// truncateAncient rewinds the blockchain to the specified header and deletes all
+// data in the ancient store that exceeds the specified header.
+func (bc *BlockChain) truncateAncient(head uint64) error {
+ frozen, err := bc.db.Ancients()
+ if err != nil {
+ return err
+ }
+ // Short circuit if there is no data to truncate in ancient store.
+ if frozen <= head+1 {
+ return nil
+ }
+ // Truncate all the data in the freezer beyond the specified head
+ if err := bc.db.TruncateAncients(head + 1); err != nil {
+ return err
+ }
+ // Clear out any stale content from the caches
+ bc.hc.headerCache.Purge()
+ bc.hc.tdCache.Purge()
+ bc.hc.numberCache.Purge()
+
+ // Clear out any stale content from the caches
+ bc.bodyCache.Purge()
+ bc.bodyRLPCache.Purge()
+ bc.receiptsCache.Purge()
+ bc.blockCache.Purge()
+ bc.txLookupCache.Purge()
+ bc.futureBlocks.Purge()
+
+ log.Info("Rewind ancient data", "number", head)
+ return nil
+}
+
+// numberHash is just a container for a number and a hash, to represent a block
+type numberHash struct {
+ number uint64
+ hash common.Hash
+}
+
+// InsertReceiptChain attempts to complete an already existing header chain with
+// transaction and receipt data.
+func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts, ancientLimit uint64) (int, error) {
+ // We don't require the chainMu here since we want to maximize the
+ // concurrency of header insertion and receipt insertion.
+ bc.wg.Add(1)
+ defer bc.wg.Done()
+
+ var (
+ ancientBlocks, liveBlocks types.Blocks
+ ancientReceipts, liveReceipts []types.Receipts
+ )
+ // Do a sanity check that the provided chain is actually ordered and linked
+ for i := 0; i < len(blockChain); i++ {
+ if i != 0 {
+ if blockChain[i].NumberU64() != blockChain[i-1].NumberU64()+1 || blockChain[i].ParentHash() != blockChain[i-1].Hash() {
+ log.Error("Non contiguous receipt insert", "number", blockChain[i].Number(), "hash", blockChain[i].Hash(), "parent", blockChain[i].ParentHash(),
+ "prevnumber", blockChain[i-1].Number(), "prevhash", blockChain[i-1].Hash())
+ return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, blockChain[i-1].NumberU64(),
+ blockChain[i-1].Hash().Bytes()[:4], i, blockChain[i].NumberU64(), blockChain[i].Hash().Bytes()[:4], blockChain[i].ParentHash().Bytes()[:4])
+ }
+ }
+ if blockChain[i].NumberU64() <= ancientLimit {
+ ancientBlocks, ancientReceipts = append(ancientBlocks, blockChain[i]), append(ancientReceipts, receiptChain[i])
+ } else {
+ liveBlocks, liveReceipts = append(liveBlocks, blockChain[i]), append(liveReceipts, receiptChain[i])
+ }
+ }
+
+ var (
+ stats = struct{ processed, ignored int32 }{}
+ start = time.Now()
+ size = 0
+ )
+ // updateHead updates the head fast sync block if the inserted blocks are better
+ // and returns a indicator whether the inserted blocks are canonical.
+ updateHead := func(head *types.Block) bool {
+ if bc.manualCanonical {
+ return false
+ }
+ bc.chainmu.Lock()
+
+ // Rewind may have occurred, skip in that case.
+ if bc.CurrentHeader().Number.Cmp(head.Number()) >= 0 {
+ currentFastBlock, td := bc.CurrentFastBlock(), bc.GetTd(head.Hash(), head.NumberU64())
+ if bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64()).Cmp(td) < 0 {
+ rawdb.WriteHeadFastBlockHash(bc.db, head.Hash())
+ bc.currentFastBlock.Store(head)
+ headFastBlockGauge.Update(int64(head.NumberU64()))
+ bc.chainmu.Unlock()
+ return true
+ }
+ }
+ bc.chainmu.Unlock()
+ return false
+ }
+ // writeAncient writes blockchain and corresponding receipt chain into ancient store.
+ //
+ // this function only accepts canonical chain data. All side chain will be reverted
+ // eventually.
+ writeAncient := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
+ var (
+ previous = bc.CurrentFastBlock()
+ batch = bc.db.NewBatch()
+ )
+ // If any error occurs before updating the head or we are inserting a side chain,
+ // all the data written this time wll be rolled back.
+ defer func() {
+ if previous != nil {
+ if err := bc.truncateAncient(previous.NumberU64()); err != nil {
+ log.Crit("Truncate ancient store failed", "err", err)
+ }
+ }
+ }()
+ var deleted []*numberHash
+ for i, block := range blockChain {
+ // Short circuit insertion if shutting down or processing failed
+ if atomic.LoadInt32(&bc.procInterrupt) == 1 {
+ return 0, errInsertionInterrupted
+ }
+ // Short circuit insertion if it is required(used in testing only)
+ if bc.terminateInsert != nil && bc.terminateInsert(block.Hash(), block.NumberU64()) {
+ return i, errors.New("insertion is terminated for testing purpose")
+ }
+ // Short circuit if the owner header is unknown
+ if !bc.HasHeader(block.Hash(), block.NumberU64()) {
+ return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
+ }
+ var (
+ start = time.Now()
+ logged = time.Now()
+ count int
+ )
+ // Migrate all ancient blocks. This can happen if someone upgrades from Geth
+ // 1.8.x to 1.9.x mid-fast-sync. Perhaps we can get rid of this path in the
+ // long term.
+ for {
+ // We can ignore the error here since light client won't hit this code path.
+ frozen, _ := bc.db.Ancients()
+ if frozen >= block.NumberU64() {
+ break
+ }
+ h := rawdb.ReadCanonicalHash(bc.db, frozen)
+ b := rawdb.ReadBlock(bc.db, h, frozen)
+ size += rawdb.WriteAncientBlock(bc.db, b, rawdb.ReadReceipts(bc.db, h, frozen, bc.chainConfig), rawdb.ReadTd(bc.db, h, frozen))
+ count += 1
+
+ // Always keep genesis block in active database.
+ if b.NumberU64() != 0 {
+ deleted = append(deleted, &numberHash{b.NumberU64(), b.Hash()})
+ }
+ if time.Since(logged) > 8*time.Second {
+ log.Info("Migrating ancient blocks", "count", count, "elapsed", common.PrettyDuration(time.Since(start)))
+ logged = time.Now()
+ }
+ // Don't collect too much in-memory, write it out every 100K blocks
+ if len(deleted) > 100000 {
+
+ // Sync the ancient store explicitly to ensure all data has been flushed to disk.
+ if err := bc.db.Sync(); err != nil {
+ return 0, err
+ }
+ // Wipe out canonical block data.
+ for _, nh := range deleted {
+ rawdb.DeleteBlockWithoutNumber(batch, nh.hash, nh.number)
+ rawdb.DeleteCanonicalHash(batch, nh.number)
+ }
+ if err := batch.Write(); err != nil {
+ return 0, err
+ }
+ batch.Reset()
+ // Wipe out side chain too.
+ for _, nh := range deleted {
+ for _, hash := range rawdb.ReadAllHashes(bc.db, nh.number) {
+ rawdb.DeleteBlock(batch, hash, nh.number)
+ }
+ }
+ if err := batch.Write(); err != nil {
+ return 0, err
+ }
+ batch.Reset()
+ deleted = deleted[0:]
+ }
+ }
+ if count > 0 {
+ log.Info("Migrated ancient blocks", "count", count, "elapsed", common.PrettyDuration(time.Since(start)))
+ }
+ // Flush data into ancient database.
+ size += rawdb.WriteAncientBlock(bc.db, block, receiptChain[i], bc.GetTd(block.Hash(), block.NumberU64()))
+ rawdb.WriteTxLookupEntries(batch, block)
+
+ stats.processed++
+ }
+ // Flush all tx-lookup index data.
+ size += batch.ValueSize()
+ if err := batch.Write(); err != nil {
+ return 0, err
+ }
+ batch.Reset()
+
+ // Sync the ancient store explicitly to ensure all data has been flushed to disk.
+ if err := bc.db.Sync(); err != nil {
+ return 0, err
+ }
+ if !updateHead(blockChain[len(blockChain)-1]) {
+ return 0, errors.New("side blocks can't be accepted as the ancient chain data")
+ }
+ previous = nil // disable rollback explicitly
+
+ // Wipe out canonical block data.
+ for _, nh := range deleted {
+ rawdb.DeleteBlockWithoutNumber(batch, nh.hash, nh.number)
+ rawdb.DeleteCanonicalHash(batch, nh.number)
+ }
+ for _, block := range blockChain {
+ // Always keep genesis block in active database.
+ if block.NumberU64() != 0 {
+ rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64())
+ rawdb.DeleteCanonicalHash(batch, block.NumberU64())
+ }
+ }
+ if err := batch.Write(); err != nil {
+ return 0, err
+ }
+ batch.Reset()
+
+ // Wipe out side chain too.
+ for _, nh := range deleted {
+ for _, hash := range rawdb.ReadAllHashes(bc.db, nh.number) {
+ rawdb.DeleteBlock(batch, hash, nh.number)
+ }
+ }
+ for _, block := range blockChain {
+ // Always keep genesis block in active database.
+ if block.NumberU64() != 0 {
+ for _, hash := range rawdb.ReadAllHashes(bc.db, block.NumberU64()) {
+ rawdb.DeleteBlock(batch, hash, block.NumberU64())
+ }
+ }
+ }
+ if err := batch.Write(); err != nil {
+ return 0, err
+ }
+ return 0, nil
+ }
+ // writeLive writes blockchain and corresponding receipt chain into active store.
+ writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
+ batch := bc.db.NewBatch()
+ for i, block := range blockChain {
+ // Short circuit insertion if shutting down or processing failed
+ if atomic.LoadInt32(&bc.procInterrupt) == 1 {
+ return 0, errInsertionInterrupted
+ }
+ // Short circuit if the owner header is unknown
+ if !bc.HasHeader(block.Hash(), block.NumberU64()) {
+ return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
+ }
+ if bc.HasBlock(block.Hash(), block.NumberU64()) {
+ stats.ignored++
+ continue
+ }
+ // Write all the data out into the database
+ rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body())
+ rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i])
+ rawdb.WriteTxLookupEntries(batch, block)
+
+ stats.processed++
+ if batch.ValueSize() >= ethdb.IdealBatchSize {
+ if err := batch.Write(); err != nil {
+ return 0, err
+ }
+ size += batch.ValueSize()
+ batch.Reset()
+ }
+ }
+ if batch.ValueSize() > 0 {
+ size += batch.ValueSize()
+ if err := batch.Write(); err != nil {
+ return 0, err
+ }
+ }
+ updateHead(blockChain[len(blockChain)-1])
+ return 0, nil
+ }
+ // Write downloaded chain data and corresponding receipt chain data.
+ if len(ancientBlocks) > 0 {
+ if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil {
+ if err == errInsertionInterrupted {
+ return 0, nil
+ }
+ return n, err
+ }
+ }
+ if len(liveBlocks) > 0 {
+ if n, err := writeLive(liveBlocks, liveReceipts); err != nil {
+ if err == errInsertionInterrupted {
+ return 0, nil
+ }
+ return n, err
+ }
+ }
+
+ head := blockChain[len(blockChain)-1]
+ context := []interface{}{
+ "count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)),
+ "number", head.Number(), "hash", head.Hash(), "age", common.PrettyAge(time.Unix(int64(head.Time()), 0)),
+ "size", common.StorageSize(size),
+ }
+ if stats.ignored > 0 {
+ context = append(context, []interface{}{"ignored", stats.ignored}...)
+ }
+ log.Info("Imported new block receipts", context...)
+
+ return 0, nil
+}
+
+var lastWrite uint64
+
+// writeBlockWithoutState writes only the block and its metadata to the database,
+// but does not write any state. This is used to construct competing side forks
+// up to the point where they exceed the canonical total difficulty.
+func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (err error) {
+ bc.wg.Add(1)
+ defer bc.wg.Done()
+
+ if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), td); err != nil {
+ return err
+ }
+ rawdb.WriteBlock(bc.db, block)
+
+ return nil
+}
+
+// writeKnownBlock updates the head block flag with a known block
+// and introduces chain reorg if necessary.
+func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
+ bc.wg.Add(1)
+ defer bc.wg.Done()
+
+ current := bc.CurrentBlock()
+ if block.ParentHash() != current.Hash() {
+ if err := bc.reorg(current, block); err != nil {
+ return err
+ }
+ }
+ // Write the positional metadata for transaction/receipt lookups.
+ // Preimages here is empty, ignore it.
+ rawdb.WriteTxLookupEntries(bc.db, block)
+
+ bc.insert(block)
+ return nil
+}
+
+// WriteBlockWithState writes the block and all associated state to the database.
+func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
+ bc.chainmu.Lock()
+ defer bc.chainmu.Unlock()
+
+ return bc.writeBlockWithState(block, receipts, state)
+}
+
+// writeBlockWithState writes the block and all associated state to the database,
+// but is expects the chain mutex to be held.
+func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
+ bc.wg.Add(1)
+ defer bc.wg.Done()
+
+ // Calculate the total difficulty of the block
+ ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
+ if ptd == nil {
+ return NonStatTy, consensus.ErrUnknownAncestor
+ }
+ // Make sure no inconsistent state is leaked during insertion
+ currentBlock := bc.CurrentBlock()
+ localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
+ externTd := new(big.Int).Add(block.Difficulty(), ptd)
+
+ // Irrelevant of the canonical status, write the block itself to the database
+ if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil {
+ return NonStatTy, err
+ }
+ rawdb.WriteBlock(bc.db, block)
+
+ root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
+ if err != nil {
+ return NonStatTy, err
+ }
+ triedb := bc.stateCache.TrieDB()
+
+ // If we're running an archive node, always flush
+ if bc.cacheConfig.TrieDirtyDisabled {
+ if err := triedb.Commit(root, false); err != nil {
+ return NonStatTy, err
+ }
+ } else {
+ // Full but not archive node, do proper garbage collection
+ triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
+ bc.triegc.Push(root, -int64(block.NumberU64()))
+
+ if current := block.NumberU64(); current > TriesInMemory {
+ // If we exceeded our memory allowance, flush matured singleton nodes to disk
+ var (
+ nodes, imgs = triedb.Size()
+ limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
+ )
+ if nodes > limit || imgs > 4*1024*1024 {
+ triedb.Cap(limit - ethdb.IdealBatchSize)
+ }
+ // Find the next state trie we need to commit
+ chosen := current - TriesInMemory
+
+ // If we exceeded out time allowance, flush an entire trie to disk
+ if bc.gcproc > bc.cacheConfig.TrieTimeLimit {
+ // If the header is missing (canonical chain behind), we're reorging a low
+ // diff sidechain. Suspend committing until this operation is completed.
+ header := bc.GetHeaderByNumber(chosen)
+ if header == nil {
+ log.Warn("Reorg in progress, trie commit postponed", "number", chosen)
+ } else {
+ // If we're exceeding limits but haven't reached a large enough memory gap,
+ // warn the user that the system is becoming unstable.
+ if chosen < lastWrite+TriesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit {
+ log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/TriesInMemory)
+ }
+ // Flush an entire trie and restart the counters
+ triedb.Commit(header.Root, true)
+ lastWrite = chosen
+ bc.gcproc = 0
+ }
+ }
+ // Garbage collect anything below our required write retention
+ for !bc.triegc.Empty() {
+ root, number := bc.triegc.Pop()
+ if uint64(-number) > chosen {
+ bc.triegc.Push(root, number)
+ break
+ }
+ triedb.Dereference(root.(common.Hash))
+ }
+ }
+ }
+
+ // Write other block data using a batch.
+ batch := bc.db.NewBatch()
+ rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts)
+
+ // If the total difficulty is higher than our known, add it to the canonical chain
+ // Second clause in the if statement reduces the vulnerability to selfish mining.
+ // Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
+ reorg := externTd.Cmp(localTd) > 0
+ currentBlock = bc.CurrentBlock()
+ if !bc.manualCanonical && (!reorg && externTd.Cmp(localTd) == 0) {
+ //if (!reorg && externTd.Cmp(localTd) == 0) {
+ // Split same-difficulty blocks by number, then preferentially select
+ // the block generated by the local miner as the canonical block.
+ if block.NumberU64() < currentBlock.NumberU64() {
+ reorg = true
+ } else if block.NumberU64() == currentBlock.NumberU64() {
+ var currentPreserve, blockPreserve bool
+ if bc.shouldPreserve != nil {
+ currentPreserve, blockPreserve = bc.shouldPreserve(currentBlock), bc.shouldPreserve(block)
+ }
+ reorg = !currentPreserve && (blockPreserve || mrand.Float64() < 0.5)
+ }
+ }
+ if !bc.manualCanonical && reorg {
+ // Reorganise the chain if the parent is not the head block
+ if block.ParentHash() != currentBlock.Hash() {
+ if err := bc.reorg(currentBlock, block); err != nil {
+ return NonStatTy, err
+ }
+ }
+ // Write the positional metadata for transaction/receipt lookups and preimages
+ rawdb.WriteTxLookupEntries(batch, block)
+ rawdb.WritePreimages(batch, state.Preimages())
+
+ status = CanonStatTy
+ } else {
+ status = SideStatTy
+ }
+ if err := batch.Write(); err != nil {
+ return NonStatTy, err
+ }
+
+ // Set new head.
+ if status == CanonStatTy {
+ bc.insert(block)
+ }
+ bc.futureBlocks.Remove(block.Hash())
+ return status, nil
+}
+
+// addFutureBlock checks if the block is within the max allowed window to get
+// accepted for future processing, and returns an error if the block is too far
+// ahead and was not added.
+func (bc *BlockChain) addFutureBlock(block *types.Block) error {
+ max := uint64(time.Now().Unix() + maxTimeFutureBlocks)
+ if block.Time() > max {
+ return fmt.Errorf("future block timestamp %v > allowed %v", block.Time(), max)
+ }
+ bc.futureBlocks.Add(block.Hash(), block)
+ return nil
+}
+
+// InsertChain attempts to insert the given batch of blocks in to the canonical
+// chain or, otherwise, create a fork. If an error is returned it will return
+// the index number of the failing block as well an error describing what went
+// wrong.
+//
+// After insertion is done, all accumulated events will be fired.
+func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
+ // Sanity check that we have something meaningful to import
+ if len(chain) == 0 {
+ return 0, nil
+ }
+
+ bc.blockProcFeed.Send(true)
+ defer bc.blockProcFeed.Send(false)
+
+ // Remove already known canon-blocks
+ var (
+ block, prev *types.Block
+ )
+ // Do a sanity check that the provided chain is actually ordered and linked
+ for i := 1; i < len(chain); i++ {
+ block = chain[i]
+ prev = chain[i-1]
+ if block.NumberU64() != prev.NumberU64()+1 || block.ParentHash() != prev.Hash() {
+ // Chain broke ancestry, log a message (programming error) and skip insertion
+ log.Error("Non contiguous block insert", "number", block.Number(), "hash", block.Hash(),
+ "parent", block.ParentHash(), "prevnumber", prev.Number(), "prevhash", prev.Hash())
+
+ return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, prev.NumberU64(),
+ prev.Hash().Bytes()[:4], i, block.NumberU64(), block.Hash().Bytes()[:4], block.ParentHash().Bytes()[:4])
+ }
+ }
+ // Pre-checks passed, start the full block imports
+ bc.wg.Add(1)
+ bc.chainmu.Lock()
+ n, events, logs, err := bc.insertChain(chain, true)
+ bc.chainmu.Unlock()
+ bc.wg.Done()
+
+ bc.PostChainEvents(events, logs)
+ return n, err
+}
+
+// insertChain is the internal implementation of InsertChain, which assumes that
+// 1) chains are contiguous, and 2) The chain mutex is held.
+//
+// This method is split out so that import batches that require re-injecting
+// historical blocks can do so without releasing the lock, which could lead to
+// racey behaviour. If a sidechain import is in progress, and the historic state
+// is imported, but then new canon-head is added before the actual sidechain
+// completes, then the historic state could be pruned again
+func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
+ // If the chain is terminating, don't even bother starting up
+ if atomic.LoadInt32(&bc.procInterrupt) == 1 {
+ return 0, nil, nil, nil
+ }
+ // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
+ senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)
+
+ // A queued approach to delivering events. This is generally
+ // faster than direct delivery and requires much less mutex
+ // acquiring.
+ var (
+ stats = insertStats{startTime: mclock.Now()}
+ events = make([]interface{}, 0, len(chain))
+ lastCanon *types.Block
+ coalescedLogs []*types.Log
+ )
+ // Start the parallel header verifier
+ headers := make([]*types.Header, len(chain))
+ seals := make([]bool, len(chain))
+
+ for i, block := range chain {
+ headers[i] = block.Header()
+ seals[i] = verifySeals
+ }
+ abort, results := bc.engine.VerifyHeaders(bc, headers, seals)
+ defer close(abort)
+
+ // Peek the error for the first block to decide the directing import logic
+ it := newInsertIterator(chain, results, bc.validator)
+
+ block, err := it.next()
+
+ // Left-trim all the known blocks
+ if err == ErrKnownBlock {
+ // First block (and state) is known
+ // 1. We did a roll-back, and should now do a re-import
+ // 2. The block is stored as a sidechain, and is lying about it's stateroot, and passes a stateroot
+ // from the canonical chain, which has not been verified.
+ // Skip all known blocks that are behind us
+ var (
+ current = bc.CurrentBlock()
+ localTd = bc.GetTd(current.Hash(), current.NumberU64())
+ externTd = bc.GetTd(block.ParentHash(), block.NumberU64()-1) // The first block can't be nil
+ )
+ for block != nil && err == ErrKnownBlock {
+ externTd = new(big.Int).Add(externTd, block.Difficulty())
+ if !bc.manualCanonical && localTd.Cmp(externTd) < 0 {
+ break
+ }
+ log.Debug("Ignoring already known block", "number", block.Number(), "hash", block.Hash())
+ stats.ignored++
+
+ block, err = it.next()
+ }
+ // The remaining blocks are still known blocks, the only scenario here is:
+ // During the fast sync, the pivot point is already submitted but rollback
+ // happens. Then node resets the head full block to a lower height via `rollback`
+ // and leaves a few known blocks in the database.
+ //
+ // When node runs a fast sync again, it can re-import a batch of known blocks via
+ // `insertChain` while a part of them have higher total difficulty than current
+ // head full block(new pivot point).
+ for block != nil && err == ErrKnownBlock {
+ log.Debug("Writing previously known block", "number", block.Number(), "hash", block.Hash())
+ if err := bc.writeKnownBlock(block); err != nil {
+ return it.index, nil, nil, err
+ }
+ lastCanon = block
+
+ block, err = it.next()
+ }
+ // Falls through to the block import
+ }
+ switch {
+ // First block is pruned, insert as sidechain and reorg only if TD grows enough
+ case err == consensus.ErrPrunedAncestor:
+ log.Debug("Pruned ancestor, inserting as sidechain", "number", block.Number(), "hash", block.Hash())
+ return bc.insertSideChain(block, it)
+
+ // First block is future, shove it (and all children) to the future queue (unknown ancestor)
+ case err == consensus.ErrFutureBlock || (err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(it.first().ParentHash())):
+ for block != nil && (it.index == 0 || err == consensus.ErrUnknownAncestor) {
+ log.Debug("Future block, postponing import", "number", block.Number(), "hash", block.Hash())
+ if err := bc.addFutureBlock(block); err != nil {
+ return it.index, events, coalescedLogs, err
+ }
+ block, err = it.next()
+ }
+ stats.queued += it.processed()
+ stats.ignored += it.remaining()
+
+ // If there are any still remaining, mark as ignored
+ return it.index, events, coalescedLogs, err
+
+ // Some other error occurred, abort
+ case err != nil:
+ stats.ignored += len(it.chain)
+ bc.reportBlock(block, nil, err)
+ return it.index, events, coalescedLogs, err
+ }
+ // No validation errors for the first block (or chain prefix skipped)
+ for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() {
+ // If the chain is terminating, stop processing blocks
+ if atomic.LoadInt32(&bc.procInterrupt) == 1 {
+ log.Debug("Premature abort during blocks processing")
+ break
+ }
+ // If the header is a banned one, straight out abort
+ if BadHashes[block.Hash()] {
+ bc.reportBlock(block, nil, ErrBlacklistedHash)
+ return it.index, events, coalescedLogs, ErrBlacklistedHash
+ }
+ // If the block is known (in the middle of the chain), it's a special case for
+ // Clique blocks where they can share state among each other, so importing an
+ // older block might complete the state of the subsequent one. In this case,
+ // just skip the block (we already validated it once fully (and crashed), since
+ // its header and body was already in the database).
+ if err == ErrKnownBlock {
+ logger := log.Debug
+ if bc.chainConfig.Clique == nil {
+ logger = log.Warn
+ }
+ logger("Inserted known block", "number", block.Number(), "hash", block.Hash(),
+ "uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(),
+ "root", block.Root())
+
+ if err := bc.writeKnownBlock(block); err != nil {
+ return it.index, nil, nil, err
+ }
+ stats.processed++
+
+ // We can assume that logs are empty here, since the only way for consecutive
+ // Clique blocks to have the same state is if there are no transactions.
+ events = append(events, ChainEvent{block, block.Hash(), nil})
+ lastCanon = block
+
+ continue
+ }
+ // Retrieve the parent block and it's state to execute on top
+ start := time.Now()
+
+ parent := it.previous()
+ if parent == nil {
+ parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
+ }
+ statedb, err := state.New(parent.Root, bc.stateCache)
+ if err != nil {
+ return it.index, events, coalescedLogs, err
+ }
+ // If we have a followup block, run that against the current state to pre-cache
+ // transactions and probabilistically some of the account/storage trie nodes.
+ var followupInterrupt uint32
+
+ if !bc.cacheConfig.TrieCleanNoPrefetch {
+ if followup, err := it.peek(); followup != nil && err == nil {
+ go func(start time.Time) {
+ throwaway, _ := state.New(parent.Root, bc.stateCache)
+ bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
+
+ blockPrefetchExecuteTimer.Update(time.Since(start))
+ if atomic.LoadUint32(&followupInterrupt) == 1 {
+ blockPrefetchInterruptMeter.Mark(1)
+ }
+ }(time.Now())
+ }
+ }
+ // Process block using the parent state as reference point
+ substart := time.Now()
+ receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
+ if err != nil {
+ bc.reportBlock(block, receipts, err)
+ atomic.StoreUint32(&followupInterrupt, 1)
+ return it.index, events, coalescedLogs, err
+ }
+ // Update the metrics touched during block processing
+ accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them
+ storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them
+ accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them
+ storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them
+
+ triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
+ trieproc := statedb.AccountReads + statedb.AccountUpdates
+ trieproc += statedb.StorageReads + statedb.StorageUpdates
+
+ blockExecutionTimer.Update(time.Since(substart) - trieproc - triehash)
+
+ // Validate the state using the default validator
+ substart = time.Now()
+ if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
+ bc.reportBlock(block, receipts, err)
+ atomic.StoreUint32(&followupInterrupt, 1)
+ return it.index, events, coalescedLogs, err
+ }
+ proctime := time.Since(start)
+
+ // Update the metrics touched during block validation
+ accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete, we can mark them
+ storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete, we can mark them
+
+ blockValidationTimer.Update(time.Since(substart) - (statedb.AccountHashes + statedb.StorageHashes - triehash))
+
+ // Write the block to the chain and get the status.
+ substart = time.Now()
+ status, err := bc.writeBlockWithState(block, receipts, statedb)
+ if err != nil {
+ atomic.StoreUint32(&followupInterrupt, 1)
+ return it.index, events, coalescedLogs, err
+ }
+ atomic.StoreUint32(&followupInterrupt, 1)
+
+ // Update the metrics touched during block commit
+ accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them
+ storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them
+
+ blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits)
+ blockInsertTimer.UpdateSince(start)
+
+ switch status {
+ case CanonStatTy:
+ log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(),
+ "uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(),
+ "elapsed", common.PrettyDuration(time.Since(start)),
+ "root", block.Root())
+
+ coalescedLogs = append(coalescedLogs, logs...)
+ events = append(events, ChainEvent{block, block.Hash(), logs})
+ lastCanon = block
+
+ // Only count canonical blocks for GC processing time
+ bc.gcproc += proctime
+
+ case SideStatTy:
+ log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(),
+ "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
+ "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()),
+ "root", block.Root())
+ events = append(events, ChainSideEvent{block})
+
+ default:
+ // This in theory is impossible, but lets be nice to our future selves and leave
+ // a log, instead of trying to track down blocks imports that don't emit logs.
+ log.Warn("Inserted block with unknown status", "number", block.Number(), "hash", block.Hash(),
+ "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
+ "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()),
+ "root", block.Root())
+ }
+ stats.processed++
+ stats.usedGas += usedGas
+
+ dirty, _ := bc.stateCache.TrieDB().Size()
+ stats.report(chain, it.index, dirty)
+ }
+ // Any blocks remaining here? The only ones we care about are the future ones
+ if block != nil && err == consensus.ErrFutureBlock {
+ if err := bc.addFutureBlock(block); err != nil {
+ return it.index, events, coalescedLogs, err
+ }
+ block, err = it.next()
+
+ for ; block != nil && err == consensus.ErrUnknownAncestor; block, err = it.next() {
+ if err := bc.addFutureBlock(block); err != nil {
+ return it.index, events, coalescedLogs, err
+ }
+ stats.queued++
+ }
+ }
+ stats.ignored += it.remaining()
+
+ // Append a single chain head event if we've progressed the chain
+ if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
+ events = append(events, ChainHeadEvent{lastCanon})
+ }
+ return it.index, events, coalescedLogs, err
+}
+
+// insertSideChain is called when an import batch hits upon a pruned ancestor
+// error, which happens when a sidechain with a sufficiently old fork-block is
+// found.
+//
+// The method writes all (header-and-body-valid) blocks to disk, then tries to
+// switch over to the new chain if the TD exceeded the current chain.
+func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, []interface{}, []*types.Log, error) {
+ var (
+ externTd *big.Int
+ current = bc.CurrentBlock()
+ )
+ // The first sidechain block error is already verified to be ErrPrunedAncestor.
+ // Since we don't import them here, we expect ErrUnknownAncestor for the remaining
+ // ones. Any other errors means that the block is invalid, and should not be written
+ // to disk.
+ err := consensus.ErrPrunedAncestor
+ for ; block != nil && (err == consensus.ErrPrunedAncestor); block, err = it.next() {
+ // Check the canonical state root for that number
+ if number := block.NumberU64(); current.NumberU64() >= number {
+ canonical := bc.GetBlockByNumber(number)
+ if canonical != nil && canonical.Hash() == block.Hash() {
+ // Not a sidechain block, this is a re-import of a canon block which has it's state pruned
+
+ // Collect the TD of the block. Since we know it's a canon one,
+ // we can get it directly, and not (like further below) use
+ // the parent and then add the block on top
+ externTd = bc.GetTd(block.Hash(), block.NumberU64())
+ continue
+ }
+ if canonical != nil && canonical.Root() == block.Root() {
+ // This is most likely a shadow-state attack. When a fork is imported into the
+ // database, and it eventually reaches a block height which is not pruned, we
+ // just found that the state already exist! This means that the sidechain block
+ // refers to a state which already exists in our canon chain.
+ //
+ // If left unchecked, we would now proceed importing the blocks, without actually
+ // having verified the state of the previous blocks.
+ log.Warn("Sidechain ghost-state attack detected", "number", block.NumberU64(), "sideroot", block.Root(), "canonroot", canonical.Root())
+
+ // If someone legitimately side-mines blocks, they would still be imported as usual. However,
+ // we cannot risk writing unverified blocks to disk when they obviously target the pruning
+ // mechanism.
+ return it.index, nil, nil, errors.New("sidechain ghost-state attack")
+ }
+ }
+ if externTd == nil {
+ externTd = bc.GetTd(block.ParentHash(), block.NumberU64()-1)
+ }
+ externTd = new(big.Int).Add(externTd, block.Difficulty())
+
+ if !bc.HasBlock(block.Hash(), block.NumberU64()) {
+ start := time.Now()
+ if err := bc.writeBlockWithoutState(block, externTd); err != nil {
+ return it.index, nil, nil, err
+ }
+ log.Debug("Injected sidechain block", "number", block.Number(), "hash", block.Hash(),
+ "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
+ "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()),
+ "root", block.Root())
+ }
+ }
+ // At this point, we've written all sidechain blocks to database. Loop ended
+ // either on some other error or all were processed. If there was some other
+ // error, we can ignore the rest of those blocks.
+ //
+ // If the externTd was larger than our local TD, we now need to reimport the previous
+ // blocks to regenerate the required state
+ localTd := bc.GetTd(current.Hash(), current.NumberU64())
+ if bc.manualCanonical || localTd.Cmp(externTd) > 0 {
+ log.Info("Sidechain written to disk", "start", it.first().NumberU64(), "end", it.previous().Number, "sidetd", externTd, "localtd", localTd)
+ return it.index, nil, nil, err
+ }
+ // Gather all the sidechain hashes (full blocks may be memory heavy)
+ var (
+ hashes []common.Hash
+ numbers []uint64
+ )
+ parent := it.previous()
+ for parent != nil && !bc.HasState(parent.Root) {
+ hashes = append(hashes, parent.Hash())
+ numbers = append(numbers, parent.Number.Uint64())
+
+ parent = bc.GetHeader(parent.ParentHash, parent.Number.Uint64()-1)
+ }
+ if parent == nil {
+ return it.index, nil, nil, errors.New("missing parent")
+ }
+ // Import all the pruned blocks to make the state available
+ var (
+ blocks []*types.Block
+ memory common.StorageSize
+ )
+ for i := len(hashes) - 1; i >= 0; i-- {
+ // Append the next block to our batch
+ block := bc.GetBlock(hashes[i], numbers[i])
+
+ blocks = append(blocks, block)
+ memory += block.Size()
+
+ // If memory use grew too large, import and continue. Sadly we need to discard
+ // all raised events and logs from notifications since we're too heavy on the
+ // memory here.
+ if len(blocks) >= 2048 || memory > 64*1024*1024 {
+ log.Info("Importing heavy sidechain segment", "blocks", len(blocks), "start", blocks[0].NumberU64(), "end", block.NumberU64())
+ if _, _, _, err := bc.insertChain(blocks, false); err != nil {
+ return 0, nil, nil, err
+ }
+ blocks, memory = blocks[:0], 0
+
+ // If the chain is terminating, stop processing blocks
+ if atomic.LoadInt32(&bc.procInterrupt) == 1 {
+ log.Debug("Premature abort during blocks processing")
+ return 0, nil, nil, nil
+ }
+ }
+ }
+ if len(blocks) > 0 {
+ log.Info("Importing sidechain segment", "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64())
+ return bc.insertChain(blocks, false)
+ }
+ return 0, nil, nil, nil
+}
+
+// reorg takes two blocks, an old chain and a new chain and will reconstruct the
+// blocks and inserts them to be part of the new canonical chain and accumulates
+// potential missing transactions and post an event about them.
+func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
+ var (
+ newChain types.Blocks
+ oldChain types.Blocks
+ commonBlock *types.Block
+
+ deletedTxs types.Transactions
+ addedTxs types.Transactions
+
+ deletedLogs []*types.Log
+ rebirthLogs []*types.Log
+
+ // collectLogs collects the logs that were generated during the
+ // processing of the block that corresponds with the given hash.
+ // These logs are later announced as deleted or reborn
+ collectLogs = func(hash common.Hash, removed bool) {
+ number := bc.hc.GetBlockNumber(hash)
+ if number == nil {
+ return
+ }
+ receipts := rawdb.ReadReceipts(bc.db, hash, *number, bc.chainConfig)
+ for _, receipt := range receipts {
+ for _, log := range receipt.Logs {
+ l := *log
+ if removed {
+ l.Removed = true
+ deletedLogs = append(deletedLogs, &l)
+ } else {
+ rebirthLogs = append(rebirthLogs, &l)
+ }
+ }
+ }
+ }
+ )
+ // Reduce the longer chain to the same number as the shorter one
+ if oldBlock.NumberU64() > newBlock.NumberU64() {
+ // Old chain is longer, gather all transactions and logs as deleted ones
+ for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
+ oldChain = append(oldChain, oldBlock)
+ deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
+ collectLogs(oldBlock.Hash(), true)
+ }
+ } else {
+ // New chain is longer, stash all blocks away for subsequent insertion
+ for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) {
+ newChain = append(newChain, newBlock)
+ }
+ }
+ if oldBlock == nil {
+ return fmt.Errorf("invalid old chain")
+ }
+ if newBlock == nil {
+ return fmt.Errorf("invalid new chain")
+ }
+ // Both sides of the reorg are at the same number, reduce both until the common
+ // ancestor is found
+ for {
+ // If the common ancestor was found, bail out
+ if oldBlock.Hash() == newBlock.Hash() {
+ commonBlock = oldBlock
+ break
+ }
+ // Remove an old block as well as stash away a new block
+ oldChain = append(oldChain, oldBlock)
+ deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
+ collectLogs(oldBlock.Hash(), true)
+
+ newChain = append(newChain, newBlock)
+
+ // Step back with both chains
+ oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1)
+ if oldBlock == nil {
+ return fmt.Errorf("invalid old chain")
+ }
+ newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
+ if newBlock == nil {
+ return fmt.Errorf("invalid new chain")
+ }
+ }
+ // Ensure the user sees large reorgs
+ if len(oldChain) > 0 && len(newChain) > 0 {
+ logFn := log.Info
+ msg := "Chain reorg detected"
+ if len(oldChain) > 63 {
+ msg = "Large chain reorg detected"
+ logFn = log.Warn
+ }
+ logFn(msg, "number", commonBlock.Number(), "hash", commonBlock.Hash(),
+ "drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash())
+ blockReorgAddMeter.Mark(int64(len(newChain)))
+ blockReorgDropMeter.Mark(int64(len(oldChain)))
+ } else {
+ log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "newnum", newBlock.Number(), "newhash", newBlock.Hash())
+ }
+ // Insert the new chain(except the head block(reverse order)),
+ // taking care of the proper incremental order.
+ for i := len(newChain) - 1; i >= 1; i-- {
+ // Insert the block in the canonical way, re-writing history
+ bc.insert(newChain[i])
+
+ // Collect reborn logs due to chain reorg
+ collectLogs(newChain[i].Hash(), false)
+
+ // Write lookup entries for hash based transaction/receipt searches
+ rawdb.WriteTxLookupEntries(bc.db, newChain[i])
+ addedTxs = append(addedTxs, newChain[i].Transactions()...)
+ }
+ // When transactions get deleted from the database, the receipts that were
+ // created in the fork must also be deleted
+ batch := bc.db.NewBatch()
+ for _, tx := range types.TxDifference(deletedTxs, addedTxs) {
+ rawdb.DeleteTxLookupEntry(batch, tx.Hash())
+ }
+ // Delete any canonical number assignments above the new head
+ number := bc.CurrentBlock().NumberU64()
+ for i := number + 1; ; i++ {
+ hash := rawdb.ReadCanonicalHash(bc.db, i)
+ if hash == (common.Hash{}) {
+ break
+ }
+ rawdb.DeleteCanonicalHash(batch, i)
+ }
+ batch.Write()
+ // If any logs need to be fired, do it now. In theory we could avoid creating
+ // this goroutine if there are no events to fire, but realistcally that only
+ // ever happens if we're reorging empty blocks, which will only happen on idle
+ // networks where performance is not an issue either way.
+ //
+ // TODO(karalabe): Can we get rid of the goroutine somehow to guarantee correct
+ // event ordering?
+ go func() {
+ if len(deletedLogs) > 0 {
+ bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
+ }
+ if len(rebirthLogs) > 0 {
+ bc.logsFeed.Send(rebirthLogs)
+ }
+ if len(oldChain) > 0 {
+ for _, block := range oldChain {
+ bc.chainSideFeed.Send(ChainSideEvent{Block: block})
+ }
+ }
+ }()
+ return nil
+}
+
+// PostChainEvents iterates over the events generated by a chain insertion and
+// posts them into the event feed.
+// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
+func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
+ // post event logs for further processing
+ if logs != nil {
+ bc.logsFeed.Send(logs)
+ }
+ for _, event := range events {
+ switch ev := event.(type) {
+ case ChainEvent:
+ bc.chainFeed.Send(ev)
+
+ case ChainHeadEvent:
+ bc.chainHeadFeed.Send(ev)
+
+ case ChainSideEvent:
+ bc.chainSideFeed.Send(ev)
+ }
+ }
+}
+
+func (bc *BlockChain) update() {
+ futureTimer := time.NewTicker(5 * time.Second)
+ defer futureTimer.Stop()
+ for {
+ select {
+ case <-futureTimer.C:
+ bc.procFutureBlocks()
+ case <-bc.quit:
+ return
+ }
+ }
+}
+
+// BadBlocks returns a list of the last 'bad blocks' that the client has seen on the network
+func (bc *BlockChain) BadBlocks() []*types.Block {
+ blocks := make([]*types.Block, 0, bc.badBlocks.Len())
+ for _, hash := range bc.badBlocks.Keys() {
+ if blk, exist := bc.badBlocks.Peek(hash); exist {
+ block := blk.(*types.Block)
+ blocks = append(blocks, block)
+ }
+ }
+ return blocks
+}
+
+// addBadBlock adds a bad block to the bad-block LRU cache
+func (bc *BlockChain) addBadBlock(block *types.Block) {
+ bc.badBlocks.Add(block.Hash(), block)
+}
+
+// reportBlock logs a bad block error.
+func (bc *BlockChain) reportBlock(block *types.Block, receipts types.Receipts, err error) {
+ bc.addBadBlock(block)
+
+ var receiptString string
+ for i, receipt := range receipts {
+ receiptString += fmt.Sprintf("\t %d: cumulative: %v gas: %v contract: %v status: %v tx: %v logs: %v bloom: %x state: %x\n",
+ i, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.ContractAddress.Hex(),
+ receipt.Status, receipt.TxHash.Hex(), receipt.Logs, receipt.Bloom, receipt.PostState)
+ }
+ log.Error(fmt.Sprintf(`
+########## BAD BLOCK #########
+Chain config: %v
+
+Number: %v
+Hash: 0x%x
+%v
+
+Error: %v
+##############################
+`, bc.chainConfig, block.Number(), block.Hash(), receiptString, err))
+}
+
+// InsertHeaderChain attempts to insert the given header chain in to the local
+// chain, possibly creating a reorg. If an error is returned, it will return the
+// index number of the failing header as well an error describing what went wrong.
+//
+// The verify parameter can be used to fine tune whether nonce verification
+// should be done or not. The reason behind the optional check is because some
+// of the header retrieval mechanisms already need to verify nonces, as well as
+// because nonces can be verified sparsely, not needing to check each.
+func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
+ start := time.Now()
+ if i, err := bc.hc.ValidateHeaderChain(chain, checkFreq); err != nil {
+ return i, err
+ }
+
+ // Make sure only one thread manipulates the chain at once
+ bc.chainmu.Lock()
+ defer bc.chainmu.Unlock()
+
+ bc.wg.Add(1)
+ defer bc.wg.Done()
+
+ whFunc := func(header *types.Header) error {
+ _, err := bc.hc.WriteHeader(header)
+ return err
+ }
+ return bc.hc.InsertHeaderChain(chain, whFunc, start)
+}
+
+// CurrentHeader retrieves the current head header of the canonical chain. The
+// header is retrieved from the HeaderChain's internal cache.
+func (bc *BlockChain) CurrentHeader() *types.Header {
+ return bc.hc.CurrentHeader()
+}
+
+// GetTd retrieves a block's total difficulty in the canonical chain from the
+// database by hash and number, caching it if found.
+func (bc *BlockChain) GetTd(hash common.Hash, number uint64) *big.Int {
+ return bc.hc.GetTd(hash, number)
+}
+
+// GetTdByHash retrieves a block's total difficulty in the canonical chain from the
+// database by hash, caching it if found.
+func (bc *BlockChain) GetTdByHash(hash common.Hash) *big.Int {
+ return bc.hc.GetTdByHash(hash)
+}
+
+// GetHeader retrieves a block header from the database by hash and number,
+// caching it if found.
+func (bc *BlockChain) GetHeader(hash common.Hash, number uint64) *types.Header {
+ return bc.hc.GetHeader(hash, number)
+}
+
+// GetHeaderByHash retrieves a block header from the database by hash, caching it if
+// found.
+func (bc *BlockChain) GetHeaderByHash(hash common.Hash) *types.Header {
+ return bc.hc.GetHeaderByHash(hash)
+}
+
+// HasHeader checks if a block header is present in the database or not, caching
+// it if present.
+func (bc *BlockChain) HasHeader(hash common.Hash, number uint64) bool {
+ return bc.hc.HasHeader(hash, number)
+}
+
+// GetBlockHashesFromHash retrieves a number of block hashes starting at a given
+// hash, fetching towards the genesis block.
+func (bc *BlockChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash {
+ return bc.hc.GetBlockHashesFromHash(hash, max)
+}
+
+// GetAncestor retrieves the Nth ancestor of a given block. It assumes that either the given block or
+// a close ancestor of it is canonical. maxNonCanonical points to a downwards counter limiting the
+// number of blocks to be individually checked before we reach the canonical chain.
+//
+// Note: ancestor == 0 returns the same block, 1 returns its parent and so on.
+func (bc *BlockChain) GetAncestor(hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64) {
+ bc.chainmu.RLock()
+ defer bc.chainmu.RUnlock()
+
+ return bc.hc.GetAncestor(hash, number, ancestor, maxNonCanonical)
+}
+
+// GetHeaderByNumber retrieves a block header from the database by number,
+// caching it (associated with its hash) if found.
+func (bc *BlockChain) GetHeaderByNumber(number uint64) *types.Header {
+ return bc.hc.GetHeaderByNumber(number)
+}
+
+// GetTransactionLookup retrieves the lookup associate with the given transaction
+// hash from the cache or database.
+func (bc *BlockChain) GetTransactionLookup(hash common.Hash) *rawdb.LegacyTxLookupEntry {
+ // Short circuit if the txlookup already in the cache, retrieve otherwise
+ if lookup, exist := bc.txLookupCache.Get(hash); exist {
+ return lookup.(*rawdb.LegacyTxLookupEntry)
+ }
+ tx, blockHash, blockNumber, txIndex := rawdb.ReadTransaction(bc.db, hash)
+ if tx == nil {
+ return nil
+ }
+ lookup := &rawdb.LegacyTxLookupEntry{BlockHash: blockHash, BlockIndex: blockNumber, Index: txIndex}
+ bc.txLookupCache.Add(hash, lookup)
+ return lookup
+}
+
+// Config retrieves the chain's fork configuration.
+func (bc *BlockChain) Config() *params.ChainConfig { return bc.chainConfig }
+
+// Engine retrieves the blockchain's consensus engine.
+func (bc *BlockChain) Engine() consensus.Engine { return bc.engine }
+
+// SubscribeRemovedLogsEvent registers a subscription of RemovedLogsEvent.
+func (bc *BlockChain) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) event.Subscription {
+ return bc.scope.Track(bc.rmLogsFeed.Subscribe(ch))
+}
+
+// SubscribeChainEvent registers a subscription of ChainEvent.
+func (bc *BlockChain) SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription {
+ return bc.scope.Track(bc.chainFeed.Subscribe(ch))
+}
+
+// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent.
+func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription {
+ return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch))
+}
+
+// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
+func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
+ return bc.scope.Track(bc.chainSideFeed.Subscribe(ch))
+}
+
+// SubscribeLogsEvent registers a subscription of []*types.Log.
+func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
+ return bc.scope.Track(bc.logsFeed.Subscribe(ch))
+}
+
+// SubscribeBlockProcessingEvent registers a subscription of bool where true means
+// block processing has started while false means it has stopped.
+func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription {
+ return bc.scope.Track(bc.blockProcFeed.Subscribe(ch))
+}
+
+func (bc *BlockChain) ManualHead(hash common.Hash) error {
+ block := bc.GetBlockByHash(hash)
+ if block == nil {
+ return errors.New("block not found")
+ }
+ bc.chainmu.Lock()
+ defer bc.chainmu.Unlock()
+ bc.insert(block)
+ return nil
+}
diff --git a/core/blockchain_insert.go b/core/blockchain_insert.go
new file mode 100644
index 0000000..afcbb2b
--- /dev/null
+++ b/core/blockchain_insert.go
@@ -0,0 +1,166 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "time"
+
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/common/mclock"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/log"
+)
+
+// insertStats tracks and reports on block insertion.
+type insertStats struct {
+ queued, processed, ignored int
+ usedGas uint64
+ lastIndex int
+ startTime mclock.AbsTime
+}
+
+// statsReportLimit is the time limit during import and export after which we
+// always print out progress. This avoids the user wondering what's going on.
+const statsReportLimit = 8 * time.Second
+
+// report prints statistics if some number of blocks have been processed
+// or more than a few seconds have passed since the last message.
+func (st *insertStats) report(chain []*types.Block, index int, dirty common.StorageSize) {
+ // Fetch the timings for the batch
+ var (
+ now = mclock.Now()
+ elapsed = time.Duration(now) - time.Duration(st.startTime)
+ )
+ // If we're at the last block of the batch or report period reached, log
+ if index == len(chain)-1 || elapsed >= statsReportLimit {
+ // Count the number of transactions in this segment
+ var txs int
+ for _, block := range chain[st.lastIndex : index+1] {
+ txs += len(block.Transactions())
+ }
+ end := chain[index]
+
+ // Assemble the log context and send it to the logger
+ context := []interface{}{
+ "blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000,
+ "elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed),
+ "number", end.Number(), "hash", end.Hash(),
+ }
+ if timestamp := time.Unix(int64(end.Time()), 0); time.Since(timestamp) > time.Minute {
+ context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
+ }
+ context = append(context, []interface{}{"dirty", dirty}...)
+
+ if st.queued > 0 {
+ context = append(context, []interface{}{"queued", st.queued}...)
+ }
+ if st.ignored > 0 {
+ context = append(context, []interface{}{"ignored", st.ignored}...)
+ }
+ log.Info("Imported new chain segment", context...)
+
+ // Bump the stats reported to the next section
+ *st = insertStats{startTime: now, lastIndex: index + 1}
+ }
+}
+
+// insertIterator is a helper to assist during chain import.
+type insertIterator struct {
+ chain types.Blocks // Chain of blocks being iterated over
+
+ results <-chan error // Verification result sink from the consensus engine
+ errors []error // Header verification errors for the blocks
+
+ index int // Current offset of the iterator
+ validator Validator // Validator to run if verification succeeds
+}
+
+// newInsertIterator creates a new iterator based on the given blocks, which are
+// assumed to be a contiguous chain.
+func newInsertIterator(chain types.Blocks, results <-chan error, validator Validator) *insertIterator {
+ return &insertIterator{
+ chain: chain,
+ results: results,
+ errors: make([]error, 0, len(chain)),
+ index: -1,
+ validator: validator,
+ }
+}
+
+// next returns the next block in the iterator, along with any potential validation
+// error for that block. When the end is reached, it will return (nil, nil).
+func (it *insertIterator) next() (*types.Block, error) {
+ // If we reached the end of the chain, abort
+ if it.index+1 >= len(it.chain) {
+ it.index = len(it.chain)
+ return nil, nil
+ }
+ // Advance the iterator and wait for verification result if not yet done
+ it.index++
+ if len(it.errors) <= it.index {
+ it.errors = append(it.errors, <-it.results)
+ }
+ if it.errors[it.index] != nil {
+ return it.chain[it.index], it.errors[it.index]
+ }
+ // Block header valid, run body validation and return
+ return it.chain[it.index], it.validator.ValidateBody(it.chain[it.index])
+}
+
+// peek returns the next block in the iterator, along with any potential validation
+// error for that block, but does **not** advance the iterator.
+//
+// Both header and body validation errors (nil too) is cached into the iterator
+// to avoid duplicating work on the following next() call.
+func (it *insertIterator) peek() (*types.Block, error) {
+ // If we reached the end of the chain, abort
+ if it.index+1 >= len(it.chain) {
+ return nil, nil
+ }
+ // Wait for verification result if not yet done
+ if len(it.errors) <= it.index+1 {
+ it.errors = append(it.errors, <-it.results)
+ }
+ if it.errors[it.index+1] != nil {
+ return it.chain[it.index+1], it.errors[it.index+1]
+ }
+ // Block header valid, ignore body validation since we don't have a parent anyway
+ return it.chain[it.index+1], nil
+}
+
+// previous returns the previous header that was being processed, or nil.
+func (it *insertIterator) previous() *types.Header {
+ if it.index < 1 {
+ return nil
+ }
+ return it.chain[it.index-1].Header()
+}
+
+// first returns the first block in the it.
+func (it *insertIterator) first() *types.Block {
+ return it.chain[0]
+}
+
+// remaining returns the number of remaining blocks.
+func (it *insertIterator) remaining() int {
+ return len(it.chain) - it.index
+}
+
+// processed returns the number of processed blocks.
+func (it *insertIterator) processed() int {
+ return it.index + 1
+}
diff --git a/core/blocks.go b/core/blocks.go
new file mode 100644
index 0000000..7ad99c4
--- /dev/null
+++ b/core/blocks.go
@@ -0,0 +1,25 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import "github.com/ava-labs/go-ethereum/common"
+
+// BadHashes represent a set of manually tracked bad hashes (usually hard forks)
+var BadHashes = map[common.Hash]bool{
+ common.HexToHash("05bef30ef572270f654746da22639a7a0c97dd97a7050b9e252391996aaeb689"): true,
+ common.HexToHash("7d05d08cbc596a2e5e4f13b80a743e53e09221b5323c3a61946b20873e58583f"): true,
+}
diff --git a/core/chain_indexer.go b/core/chain_indexer.go
new file mode 100644
index 0000000..8389e13
--- /dev/null
+++ b/core/chain_indexer.go
@@ -0,0 +1,512 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "context"
+ "encoding/binary"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/core/rawdb"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/ethdb"
+ "github.com/ava-labs/go-ethereum/event"
+ "github.com/ava-labs/go-ethereum/log"
+)
+
+// ChainIndexerBackend defines the methods needed to process chain segments in
+// the background and write the segment results into the database. These can be
+// used to create filter blooms or CHTs.
+type ChainIndexerBackend interface {
+ // Reset initiates the processing of a new chain segment, potentially terminating
+ // any partially completed operations (in case of a reorg).
+ Reset(ctx context.Context, section uint64, prevHead common.Hash) error
+
+ // Process crunches through the next header in the chain segment. The caller
+ // will ensure a sequential order of headers.
+ Process(ctx context.Context, header *types.Header) error
+
+ // Commit finalizes the section metadata and stores it into the database.
+ Commit() error
+}
+
+// ChainIndexerChain interface is used for connecting the indexer to a blockchain
+type ChainIndexerChain interface {
+ // CurrentHeader retrieves the latest locally known header.
+ CurrentHeader() *types.Header
+
+ // SubscribeChainHeadEvent subscribes to new head header notifications.
+ SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
+}
+
+// ChainIndexer does a post-processing job for equally sized sections of the
+// canonical chain (like BlooomBits and CHT structures). A ChainIndexer is
+// connected to the blockchain through the event system by starting a
+// ChainHeadEventLoop in a goroutine.
+//
+// Further child ChainIndexers can be added which use the output of the parent
+// section indexer. These child indexers receive new head notifications only
+// after an entire section has been finished or in case of rollbacks that might
+// affect already finished sections.
+type ChainIndexer struct {
+ chainDb ethdb.Database // Chain database to index the data from
+ indexDb ethdb.Database // Prefixed table-view of the db to write index metadata into
+ backend ChainIndexerBackend // Background processor generating the index data content
+ children []*ChainIndexer // Child indexers to cascade chain updates to
+
+ active uint32 // Flag whether the event loop was started
+ update chan struct{} // Notification channel that headers should be processed
+ quit chan chan error // Quit channel to tear down running goroutines
+ ctx context.Context
+ ctxCancel func()
+
+ sectionSize uint64 // Number of blocks in a single chain segment to process
+ confirmsReq uint64 // Number of confirmations before processing a completed segment
+
+ storedSections uint64 // Number of sections successfully indexed into the database
+ knownSections uint64 // Number of sections known to be complete (block wise)
+ cascadedHead uint64 // Block number of the last completed section cascaded to subindexers
+
+ checkpointSections uint64 // Number of sections covered by the checkpoint
+ checkpointHead common.Hash // Section head belonging to the checkpoint
+
+ throttling time.Duration // Disk throttling to prevent a heavy upgrade from hogging resources
+
+ log log.Logger
+ lock sync.RWMutex
+}
+
+// NewChainIndexer creates a new chain indexer to do background processing on
+// chain segments of a given size after certain number of confirmations passed.
+// The throttling parameter might be used to prevent database thrashing.
+func NewChainIndexer(chainDb ethdb.Database, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string) *ChainIndexer {
+ c := &ChainIndexer{
+ chainDb: chainDb,
+ indexDb: indexDb,
+ backend: backend,
+ update: make(chan struct{}, 1),
+ quit: make(chan chan error),
+ sectionSize: section,
+ confirmsReq: confirm,
+ throttling: throttling,
+ log: log.New("type", kind),
+ }
+ // Initialize database dependent fields and start the updater
+ c.loadValidSections()
+ c.ctx, c.ctxCancel = context.WithCancel(context.Background())
+
+ go c.updateLoop()
+
+ return c
+}
+
+// AddCheckpoint adds a checkpoint. Sections are never processed and the chain
+// is not expected to be available before this point. The indexer assumes that
+// the backend has sufficient information available to process subsequent sections.
+//
+// Note: knownSections == 0 and storedSections == checkpointSections until
+// syncing reaches the checkpoint
+func (c *ChainIndexer) AddCheckpoint(section uint64, shead common.Hash) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ // Short circuit if the given checkpoint is below than local's.
+ if c.checkpointSections >= section+1 || section < c.storedSections {
+ return
+ }
+ c.checkpointSections = section + 1
+ c.checkpointHead = shead
+
+ c.setSectionHead(section, shead)
+ c.setValidSections(section + 1)
+}
+
+// Start creates a goroutine to feed chain head events into the indexer for
+// cascading background processing. Children do not need to be started, they
+// are notified about new events by their parents.
+func (c *ChainIndexer) Start(chain ChainIndexerChain) {
+ events := make(chan ChainHeadEvent, 10)
+ sub := chain.SubscribeChainHeadEvent(events)
+
+ go c.eventLoop(chain.CurrentHeader(), events, sub)
+}
+
+// Close tears down all goroutines belonging to the indexer and returns any error
+// that might have occurred internally.
+func (c *ChainIndexer) Close() error {
+ var errs []error
+
+ c.ctxCancel()
+
+ // Tear down the primary update loop
+ errc := make(chan error)
+ c.quit <- errc
+ if err := <-errc; err != nil {
+ errs = append(errs, err)
+ }
+ // If needed, tear down the secondary event loop
+ if atomic.LoadUint32(&c.active) != 0 {
+ c.quit <- errc
+ if err := <-errc; err != nil {
+ errs = append(errs, err)
+ }
+ }
+ // Close all children
+ for _, child := range c.children {
+ if err := child.Close(); err != nil {
+ errs = append(errs, err)
+ }
+ }
+ // Return any failures
+ switch {
+ case len(errs) == 0:
+ return nil
+
+ case len(errs) == 1:
+ return errs[0]
+
+ default:
+ return fmt.Errorf("%v", errs)
+ }
+}
+
+// eventLoop is a secondary - optional - event loop of the indexer which is only
+// started for the outermost indexer to push chain head events into a processing
+// queue.
+func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainHeadEvent, sub event.Subscription) {
+ // Mark the chain indexer as active, requiring an additional teardown
+ atomic.StoreUint32(&c.active, 1)
+
+ defer sub.Unsubscribe()
+
+ // Fire the initial new head event to start any outstanding processing
+ c.newHead(currentHeader.Number.Uint64(), false)
+
+ var (
+ prevHeader = currentHeader
+ prevHash = currentHeader.Hash()
+ )
+ for {
+ select {
+ case errc := <-c.quit:
+ // Chain indexer terminating, report no failure and abort
+ errc <- nil
+ return
+
+ case ev, ok := <-events:
+ // Received a new event, ensure it's not nil (closing) and update
+ if !ok {
+ errc := <-c.quit
+ errc <- nil
+ return
+ }
+ header := ev.Block.Header()
+ if header.ParentHash != prevHash {
+ // Reorg to the common ancestor if needed (might not exist in light sync mode, skip reorg then)
+ // TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly?
+
+ if rawdb.ReadCanonicalHash(c.chainDb, prevHeader.Number.Uint64()) != prevHash {
+ if h := rawdb.FindCommonAncestor(c.chainDb, prevHeader, header); h != nil {
+ c.newHead(h.Number.Uint64(), true)
+ }
+ }
+ }
+ c.newHead(header.Number.Uint64(), false)
+
+ prevHeader, prevHash = header, header.Hash()
+ }
+ }
+}
+
+// newHead notifies the indexer about new chain heads and/or reorgs.
+func (c *ChainIndexer) newHead(head uint64, reorg bool) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ // If a reorg happened, invalidate all sections until that point
+ if reorg {
+ // Revert the known section number to the reorg point
+ known := (head + 1) / c.sectionSize
+ stored := known
+ if known < c.checkpointSections {
+ known = 0
+ }
+ if stored < c.checkpointSections {
+ stored = c.checkpointSections
+ }
+ if known < c.knownSections {
+ c.knownSections = known
+ }
+ // Revert the stored sections from the database to the reorg point
+ if stored < c.storedSections {
+ c.setValidSections(stored)
+ }
+ // Update the new head number to the finalized section end and notify children
+ head = known * c.sectionSize
+
+ if head < c.cascadedHead {
+ c.cascadedHead = head
+ for _, child := range c.children {
+ child.newHead(c.cascadedHead, true)
+ }
+ }
+ return
+ }
+ // No reorg, calculate the number of newly known sections and update if high enough
+ var sections uint64
+ if head >= c.confirmsReq {
+ sections = (head + 1 - c.confirmsReq) / c.sectionSize
+ if sections < c.checkpointSections {
+ sections = 0
+ }
+ if sections > c.knownSections {
+ if c.knownSections < c.checkpointSections {
+ // syncing reached the checkpoint, verify section head
+ syncedHead := rawdb.ReadCanonicalHash(c.chainDb, c.checkpointSections*c.sectionSize-1)
+ if syncedHead != c.checkpointHead {
+ c.log.Error("Synced chain does not match checkpoint", "number", c.checkpointSections*c.sectionSize-1, "expected", c.checkpointHead, "synced", syncedHead)
+ return
+ }
+ }
+ c.knownSections = sections
+
+ select {
+ case c.update <- struct{}{}:
+ default:
+ }
+ }
+ }
+}
+
+// updateLoop is the main event loop of the indexer which pushes chain segments
+// down into the processing backend.
+func (c *ChainIndexer) updateLoop() {
+ var (
+ updating bool
+ updated time.Time
+ )
+
+ for {
+ select {
+ case errc := <-c.quit:
+ // Chain indexer terminating, report no failure and abort
+ errc <- nil
+ return
+
+ case <-c.update:
+ // Section headers completed (or rolled back), update the index
+ c.lock.Lock()
+ if c.knownSections > c.storedSections {
+ // Periodically print an upgrade log message to the user
+ if time.Since(updated) > 8*time.Second {
+ if c.knownSections > c.storedSections+1 {
+ updating = true
+ c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections)
+ }
+ updated = time.Now()
+ }
+ // Cache the current section count and head to allow unlocking the mutex
+ c.verifyLastHead()
+ section := c.storedSections
+ var oldHead common.Hash
+ if section > 0 {
+ oldHead = c.SectionHead(section - 1)
+ }
+ // Process the newly defined section in the background
+ c.lock.Unlock()
+ newHead, err := c.processSection(section, oldHead)
+ if err != nil {
+ select {
+ case <-c.ctx.Done():
+ <-c.quit <- nil
+ return
+ default:
+ }
+ c.log.Error("Section processing failed", "error", err)
+ }
+ c.lock.Lock()
+
+ // If processing succeeded and no reorgs occurred, mark the section completed
+ if err == nil && (section == 0 || oldHead == c.SectionHead(section-1)) {
+ c.setSectionHead(section, newHead)
+ c.setValidSections(section + 1)
+ if c.storedSections == c.knownSections && updating {
+ updating = false
+ c.log.Info("Finished upgrading chain index")
+ }
+ c.cascadedHead = c.storedSections*c.sectionSize - 1
+ for _, child := range c.children {
+ c.log.Trace("Cascading chain index update", "head", c.cascadedHead)
+ child.newHead(c.cascadedHead, false)
+ }
+ } else {
+ // If processing failed, don't retry until further notification
+ c.log.Debug("Chain index processing failed", "section", section, "err", err)
+ c.verifyLastHead()
+ c.knownSections = c.storedSections
+ }
+ }
+ // If there are still further sections to process, reschedule
+ if c.knownSections > c.storedSections {
+ time.AfterFunc(c.throttling, func() {
+ select {
+ case c.update <- struct{}{}:
+ default:
+ }
+ })
+ }
+ c.lock.Unlock()
+ }
+ }
+}
+
+// processSection processes an entire section by calling backend functions while
+// ensuring the continuity of the passed headers. Since the chain mutex is not
+// held while processing, the continuity can be broken by a long reorg, in which
+// case the function returns with an error.
+func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) {
+ c.log.Trace("Processing new chain section", "section", section)
+
+ // Reset and partial processing
+
+ if err := c.backend.Reset(c.ctx, section, lastHead); err != nil {
+ c.setValidSections(0)
+ return common.Hash{}, err
+ }
+
+ for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {
+ hash := rawdb.ReadCanonicalHash(c.chainDb, number)
+ if hash == (common.Hash{}) {
+ return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number)
+ }
+ header := rawdb.ReadHeader(c.chainDb, hash, number)
+ if header == nil {
+ return common.Hash{}, fmt.Errorf("block #%d [%x…] not found", number, hash[:4])
+ } else if header.ParentHash != lastHead {
+ return common.Hash{}, fmt.Errorf("chain reorged during section processing")
+ }
+ if err := c.backend.Process(c.ctx, header); err != nil {
+ return common.Hash{}, err
+ }
+ lastHead = header.Hash()
+ }
+ if err := c.backend.Commit(); err != nil {
+ return common.Hash{}, err
+ }
+ return lastHead, nil
+}
+
+// verifyLastHead compares last stored section head with the corresponding block hash in the
+// actual canonical chain and rolls back reorged sections if necessary to ensure that stored
+// sections are all valid
+func (c *ChainIndexer) verifyLastHead() {
+ for c.storedSections > 0 && c.storedSections > c.checkpointSections {
+ if c.SectionHead(c.storedSections-1) == rawdb.ReadCanonicalHash(c.chainDb, c.storedSections*c.sectionSize-1) {
+ return
+ }
+ c.setValidSections(c.storedSections - 1)
+ }
+}
+
+// Sections returns the number of processed sections maintained by the indexer
+// and also the information about the last header indexed for potential canonical
+// verifications.
+func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ c.verifyLastHead()
+ return c.storedSections, c.storedSections*c.sectionSize - 1, c.SectionHead(c.storedSections - 1)
+}
+
+// AddChildIndexer adds a child ChainIndexer that can use the output of this one
+func (c *ChainIndexer) AddChildIndexer(indexer *ChainIndexer) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ c.children = append(c.children, indexer)
+
+ // Cascade any pending updates to new children too
+ sections := c.storedSections
+ if c.knownSections < sections {
+ // if a section is "stored" but not "known" then it is a checkpoint without
+ // available chain data so we should not cascade it yet
+ sections = c.knownSections
+ }
+ if sections > 0 {
+ indexer.newHead(sections*c.sectionSize-1, false)
+ }
+}
+
+// loadValidSections reads the number of valid sections from the index database
+// and caches is into the local state.
+func (c *ChainIndexer) loadValidSections() {
+ data, _ := c.indexDb.Get([]byte("count"))
+ if len(data) == 8 {
+ c.storedSections = binary.BigEndian.Uint64(data)
+ }
+}
+
+// setValidSections writes the number of valid sections to the index database
+func (c *ChainIndexer) setValidSections(sections uint64) {
+ // Set the current number of valid sections in the database
+ var data [8]byte
+ binary.BigEndian.PutUint64(data[:], sections)
+ c.indexDb.Put([]byte("count"), data[:])
+
+ // Remove any reorged sections, caching the valids in the mean time
+ for c.storedSections > sections {
+ c.storedSections--
+ c.removeSectionHead(c.storedSections)
+ }
+ c.storedSections = sections // needed if new > old
+}
+
+// SectionHead retrieves the last block hash of a processed section from the
+// index database.
+func (c *ChainIndexer) SectionHead(section uint64) common.Hash {
+ var data [8]byte
+ binary.BigEndian.PutUint64(data[:], section)
+
+ hash, _ := c.indexDb.Get(append([]byte("shead"), data[:]...))
+ if len(hash) == len(common.Hash{}) {
+ return common.BytesToHash(hash)
+ }
+ return common.Hash{}
+}
+
+// setSectionHead writes the last block hash of a processed section to the index
+// database.
+func (c *ChainIndexer) setSectionHead(section uint64, hash common.Hash) {
+ var data [8]byte
+ binary.BigEndian.PutUint64(data[:], section)
+
+ c.indexDb.Put(append([]byte("shead"), data[:]...), hash.Bytes())
+}
+
+// removeSectionHead removes the reference to a processed section from the index
+// database.
+func (c *ChainIndexer) removeSectionHead(section uint64) {
+ var data [8]byte
+ binary.BigEndian.PutUint64(data[:], section)
+
+ c.indexDb.Delete(append([]byte("shead"), data[:]...))
+}
diff --git a/core/error.go b/core/error.go
new file mode 100644
index 0000000..cd4be3d
--- /dev/null
+++ b/core/error.go
@@ -0,0 +1,38 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import "errors"
+
+var (
+ // ErrKnownBlock is returned when a block to import is already known locally.
+ ErrKnownBlock = errors.New("block already known")
+
+ // ErrGasLimitReached is returned by the gas pool if the amount of gas required
+ // by a transaction is higher than what's left in the block.
+ ErrGasLimitReached = errors.New("gas limit reached")
+
+ // ErrBlacklistedHash is returned if a block to import is on the blacklist.
+ ErrBlacklistedHash = errors.New("blacklisted hash")
+
+ // ErrNonceTooHigh is returned if the nonce of a transaction is higher than the
+ // next one expected based on the local chain.
+ ErrNonceTooHigh = errors.New("nonce too high")
+
+ // ErrNoGenesis is returned when there is no Genesis Block.
+ ErrNoGenesis = errors.New("genesis not found in chain")
+)
diff --git a/core/events.go b/core/events.go
new file mode 100644
index 0000000..09e9180
--- /dev/null
+++ b/core/events.go
@@ -0,0 +1,48 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/core/types"
+)
+
+// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
+type NewTxsEvent struct{ Txs []*types.Transaction }
+
+// PendingLogsEvent is posted pre mining and notifies of pending logs.
+type PendingLogsEvent struct {
+ Logs []*types.Log
+}
+
+// NewMinedBlockEvent is posted when a block has been imported.
+type NewMinedBlockEvent struct{ Block *types.Block }
+
+// RemovedLogsEvent is posted when a reorg happens
+type RemovedLogsEvent struct{ Logs []*types.Log }
+
+type ChainEvent struct {
+ Block *types.Block
+ Hash common.Hash
+ Logs []*types.Log
+}
+
+type ChainSideEvent struct {
+ Block *types.Block
+}
+
+type ChainHeadEvent struct{ Block *types.Block }
diff --git a/core/evm.go b/core/evm.go
new file mode 100644
index 0000000..5c9d178
--- /dev/null
+++ b/core/evm.go
@@ -0,0 +1,97 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "math/big"
+
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/consensus"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/core/vm"
+)
+
+// ChainContext supports retrieving headers and consensus parameters from the
+// current blockchain to be used during transaction processing.
+type ChainContext interface {
+ // Engine retrieves the chain's consensus engine.
+ Engine() consensus.Engine
+
+ // GetHeader returns the hash corresponding to their hash.
+ GetHeader(common.Hash, uint64) *types.Header
+}
+
+// NewEVMContext creates a new context for use in the EVM.
+func NewEVMContext(msg Message, header *types.Header, chain ChainContext, author *common.Address) vm.Context {
+ // If we don't have an explicit author (i.e. not mining), extract from the header
+ var beneficiary common.Address
+ if author == nil {
+ beneficiary, _ = chain.Engine().Author(header) // Ignore error, we're past header validation
+ } else {
+ beneficiary = *author
+ }
+ return vm.Context{
+ CanTransfer: CanTransfer,
+ Transfer: Transfer,
+ GetHash: GetHashFn(header, chain),
+ Origin: msg.From(),
+ Coinbase: beneficiary,
+ BlockNumber: new(big.Int).Set(header.Number),
+ Time: new(big.Int).SetUint64(header.Time),
+ Difficulty: new(big.Int).Set(header.Difficulty),
+ GasLimit: header.GasLimit,
+ GasPrice: new(big.Int).Set(msg.GasPrice()),
+ }
+}
+
+// GetHashFn returns a GetHashFunc which retrieves header hashes by number
+func GetHashFn(ref *types.Header, chain ChainContext) func(n uint64) common.Hash {
+ var cache map[uint64]common.Hash
+
+ return func(n uint64) common.Hash {
+ // If there's no hash cache yet, make one
+ if cache == nil {
+ cache = map[uint64]common.Hash{
+ ref.Number.Uint64() - 1: ref.ParentHash,
+ }
+ }
+ // Try to fulfill the request from the cache
+ if hash, ok := cache[n]; ok {
+ return hash
+ }
+ // Not cached, iterate the blocks and cache the hashes
+ for header := chain.GetHeader(ref.ParentHash, ref.Number.Uint64()-1); header != nil; header = chain.GetHeader(header.ParentHash, header.Number.Uint64()-1) {
+ cache[header.Number.Uint64()-1] = header.ParentHash
+ if n == header.Number.Uint64()-1 {
+ return header.ParentHash
+ }
+ }
+ return common.Hash{}
+ }
+}
+
+// CanTransfer checks whether there are enough funds in the address' account to make a transfer.
+// This does not take the necessary gas in to account to make the transfer valid.
+func CanTransfer(db vm.StateDB, addr common.Address, amount *big.Int) bool {
+ return db.GetBalance(addr).Cmp(amount) >= 0
+}
+
+// Transfer subtracts amount from sender and adds amount to recipient using the given Db
+func Transfer(db vm.StateDB, sender, recipient common.Address, amount *big.Int) {
+ db.SubBalance(sender, amount)
+ db.AddBalance(recipient, amount)
+}
diff --git a/core/gaspool.go b/core/gaspool.go
new file mode 100644
index 0000000..e3795c1
--- /dev/null
+++ b/core/gaspool.go
@@ -0,0 +1,54 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "fmt"
+ "math"
+)
+
+// GasPool tracks the amount of gas available during execution of the transactions
+// in a block. The zero value is a pool with zero gas available.
+type GasPool uint64
+
+// AddGas makes gas available for execution.
+func (gp *GasPool) AddGas(amount uint64) *GasPool {
+ if uint64(*gp) > math.MaxUint64-amount {
+ panic("gas pool pushed above uint64")
+ }
+ *(*uint64)(gp) += amount
+ return gp
+}
+
+// SubGas deducts the given amount from the pool if enough gas is
+// available and returns an error otherwise.
+func (gp *GasPool) SubGas(amount uint64) error {
+ if uint64(*gp) < amount {
+ return ErrGasLimitReached
+ }
+ *(*uint64)(gp) -= amount
+ return nil
+}
+
+// Gas returns the amount of gas remaining in the pool.
+func (gp *GasPool) Gas() uint64 {
+ return uint64(*gp)
+}
+
+func (gp *GasPool) String() string {
+ return fmt.Sprintf("%d", *gp)
+}
diff --git a/core/headerchain.go b/core/headerchain.go
new file mode 100644
index 0000000..2a64763
--- /dev/null
+++ b/core/headerchain.go
@@ -0,0 +1,538 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ crand "crypto/rand"
+ "errors"
+ "fmt"
+ "math"
+ "math/big"
+ mrand "math/rand"
+ "sync/atomic"
+ "time"
+
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/consensus"
+ "github.com/ava-labs/go-ethereum/core/rawdb"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/ethdb"
+ "github.com/ava-labs/go-ethereum/log"
+ "github.com/ava-labs/go-ethereum/params"
+ lru "github.com/hashicorp/golang-lru"
+)
+
+const (
+ headerCacheLimit = 512
+ tdCacheLimit = 1024
+ numberCacheLimit = 2048
+)
+
+// HeaderChain implements the basic block header chain logic that is shared by
+// core.BlockChain and light.LightChain. It is not usable in itself, only as
+// a part of either structure.
+// It is not thread safe either, the encapsulating chain structures should do
+// the necessary mutex locking/unlocking.
+type HeaderChain struct {
+ config *params.ChainConfig
+
+ chainDb ethdb.Database
+ genesisHeader *types.Header
+
+ currentHeader atomic.Value // Current head of the header chain (may be above the block chain!)
+ currentHeaderHash common.Hash // Hash of the current head of the header chain (prevent recomputing all the time)
+
+ headerCache *lru.Cache // Cache for the most recent block headers
+ tdCache *lru.Cache // Cache for the most recent block total difficulties
+ numberCache *lru.Cache // Cache for the most recent block numbers
+
+ procInterrupt func() bool
+
+ rand *mrand.Rand
+ engine consensus.Engine
+}
+
+// NewHeaderChain creates a new HeaderChain structure.
+// getValidator should return the parent's validator
+// procInterrupt points to the parent's interrupt semaphore
+// wg points to the parent's shutdown wait group
+func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, procInterrupt func() bool) (*HeaderChain, error) {
+ headerCache, _ := lru.New(headerCacheLimit)
+ tdCache, _ := lru.New(tdCacheLimit)
+ numberCache, _ := lru.New(numberCacheLimit)
+
+ // Seed a fast but crypto originating random generator
+ seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64))
+ if err != nil {
+ return nil, err
+ }
+
+ hc := &HeaderChain{
+ config: config,
+ chainDb: chainDb,
+ headerCache: headerCache,
+ tdCache: tdCache,
+ numberCache: numberCache,
+ procInterrupt: procInterrupt,
+ rand: mrand.New(mrand.NewSource(seed.Int64())),
+ engine: engine,
+ }
+
+ hc.genesisHeader = hc.GetHeaderByNumber(0)
+ if hc.genesisHeader == nil {
+ return nil, ErrNoGenesis
+ }
+
+ hc.currentHeader.Store(hc.genesisHeader)
+ if head := rawdb.ReadHeadBlockHash(chainDb); head != (common.Hash{}) {
+ if chead := hc.GetHeaderByHash(head); chead != nil {
+ hc.currentHeader.Store(chead)
+ }
+ }
+ hc.currentHeaderHash = hc.CurrentHeader().Hash()
+ headHeaderGauge.Update(hc.CurrentHeader().Number.Int64())
+
+ return hc, nil
+}
+
+// GetBlockNumber retrieves the block number belonging to the given hash
+// from the cache or database
+func (hc *HeaderChain) GetBlockNumber(hash common.Hash) *uint64 {
+ if cached, ok := hc.numberCache.Get(hash); ok {
+ number := cached.(uint64)
+ return &number
+ }
+ number := rawdb.ReadHeaderNumber(hc.chainDb, hash)
+ if number != nil {
+ hc.numberCache.Add(hash, *number)
+ }
+ return number
+}
+
+// WriteHeader writes a header into the local chain, given that its parent is
+// already known. If the total difficulty of the newly inserted header becomes
+// greater than the current known TD, the canonical chain is re-routed.
+//
+// Note: This method is not concurrent-safe with inserting blocks simultaneously
+// into the chain, as side effects caused by reorganisations cannot be emulated
+// without the real blocks. Hence, writing headers directly should only be done
+// in two scenarios: pure-header mode of operation (light clients), or properly
+// separated header/block phases (non-archive clients).
+func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, err error) {
+ // Cache some values to prevent constant recalculation
+ var (
+ hash = header.Hash()
+ number = header.Number.Uint64()
+ )
+ // Calculate the total difficulty of the header
+ ptd := hc.GetTd(header.ParentHash, number-1)
+ if ptd == nil {
+ return NonStatTy, consensus.ErrUnknownAncestor
+ }
+ localTd := hc.GetTd(hc.currentHeaderHash, hc.CurrentHeader().Number.Uint64())
+ externTd := new(big.Int).Add(header.Difficulty, ptd)
+
+ // Irrelevant of the canonical status, write the td and header to the database
+ if err := hc.WriteTd(hash, number, externTd); err != nil {
+ log.Crit("Failed to write header total difficulty", "err", err)
+ }
+ rawdb.WriteHeader(hc.chainDb, header)
+
+ // If the total difficulty is higher than our known, add it to the canonical chain
+ // Second clause in the if statement reduces the vulnerability to selfish mining.
+ // Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
+ if externTd.Cmp(localTd) > 0 || (externTd.Cmp(localTd) == 0 && mrand.Float64() < 0.5) {
+ // Delete any canonical number assignments above the new head
+ batch := hc.chainDb.NewBatch()
+ for i := number + 1; ; i++ {
+ hash := rawdb.ReadCanonicalHash(hc.chainDb, i)
+ if hash == (common.Hash{}) {
+ break
+ }
+ rawdb.DeleteCanonicalHash(batch, i)
+ }
+ batch.Write()
+
+ // Overwrite any stale canonical number assignments
+ var (
+ headHash = header.ParentHash
+ headNumber = header.Number.Uint64() - 1
+ headHeader = hc.GetHeader(headHash, headNumber)
+ )
+ for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash {
+ rawdb.WriteCanonicalHash(hc.chainDb, headHash, headNumber)
+
+ headHash = headHeader.ParentHash
+ headNumber = headHeader.Number.Uint64() - 1
+ headHeader = hc.GetHeader(headHash, headNumber)
+ }
+ // Extend the canonical chain with the new header
+ rawdb.WriteCanonicalHash(hc.chainDb, hash, number)
+ rawdb.WriteHeadHeaderHash(hc.chainDb, hash)
+
+ hc.currentHeaderHash = hash
+ hc.currentHeader.Store(types.CopyHeader(header))
+ headHeaderGauge.Update(header.Number.Int64())
+
+ status = CanonStatTy
+ } else {
+ status = SideStatTy
+ }
+ hc.headerCache.Add(hash, header)
+ hc.numberCache.Add(hash, number)
+
+ return
+}
+
+// WhCallback is a callback function for inserting individual headers.
+// A callback is used for two reasons: first, in a LightChain, status should be
+// processed and light chain events sent, while in a BlockChain this is not
+// necessary since chain events are sent after inserting blocks. Second, the
+// header writes should be protected by the parent chain mutex individually.
+type WhCallback func(*types.Header) error
+
+func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
+ // Do a sanity check that the provided chain is actually ordered and linked
+ for i := 1; i < len(chain); i++ {
+ if chain[i].Number.Uint64() != chain[i-1].Number.Uint64()+1 || chain[i].ParentHash != chain[i-1].Hash() {
+ // Chain broke ancestry, log a message (programming error) and skip insertion
+ log.Error("Non contiguous header insert", "number", chain[i].Number, "hash", chain[i].Hash(),
+ "parent", chain[i].ParentHash, "prevnumber", chain[i-1].Number, "prevhash", chain[i-1].Hash())
+
+ return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].Number,
+ chain[i-1].Hash().Bytes()[:4], i, chain[i].Number, chain[i].Hash().Bytes()[:4], chain[i].ParentHash[:4])
+ }
+ }
+
+ // Generate the list of seal verification requests, and start the parallel verifier
+ seals := make([]bool, len(chain))
+ if checkFreq != 0 {
+ // In case of checkFreq == 0 all seals are left false.
+ for i := 0; i < len(seals)/checkFreq; i++ {
+ index := i*checkFreq + hc.rand.Intn(checkFreq)
+ if index >= len(seals) {
+ index = len(seals) - 1
+ }
+ seals[index] = true
+ }
+ // Last should always be verified to avoid junk.
+ seals[len(seals)-1] = true
+ }
+
+ abort, results := hc.engine.VerifyHeaders(hc, chain, seals)
+ defer close(abort)
+
+ // Iterate over the headers and ensure they all check out
+ for i, header := range chain {
+ // If the chain is terminating, stop processing blocks
+ if hc.procInterrupt() {
+ log.Debug("Premature abort during headers verification")
+ return 0, errors.New("aborted")
+ }
+ // If the header is a banned one, straight out abort
+ if BadHashes[header.Hash()] {
+ return i, ErrBlacklistedHash
+ }
+ // Otherwise wait for headers checks and ensure they pass
+ if err := <-results; err != nil {
+ return i, err
+ }
+ }
+
+ return 0, nil
+}
+
+// InsertHeaderChain attempts to insert the given header chain in to the local
+// chain, possibly creating a reorg. If an error is returned, it will return the
+// index number of the failing header as well an error describing what went wrong.
+//
+// The verify parameter can be used to fine tune whether nonce verification
+// should be done or not. The reason behind the optional check is because some
+// of the header retrieval mechanisms already need to verfy nonces, as well as
+// because nonces can be verified sparsely, not needing to check each.
+func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, writeHeader WhCallback, start time.Time) (int, error) {
+ // Collect some import statistics to report on
+ stats := struct{ processed, ignored int }{}
+ // All headers passed verification, import them into the database
+ for i, header := range chain {
+ // Short circuit insertion if shutting down
+ if hc.procInterrupt() {
+ log.Debug("Premature abort during headers import")
+ return i, errors.New("aborted")
+ }
+ // If the header's already known, skip it, otherwise store
+ hash := header.Hash()
+ if hc.HasHeader(hash, header.Number.Uint64()) {
+ externTd := hc.GetTd(hash, header.Number.Uint64())
+ localTd := hc.GetTd(hc.currentHeaderHash, hc.CurrentHeader().Number.Uint64())
+ if externTd == nil || externTd.Cmp(localTd) <= 0 {
+ stats.ignored++
+ continue
+ }
+ }
+ if err := writeHeader(header); err != nil {
+ return i, err
+ }
+ stats.processed++
+ }
+ // Report some public statistics so the user has a clue what's going on
+ last := chain[len(chain)-1]
+
+ context := []interface{}{
+ "count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)),
+ "number", last.Number, "hash", last.Hash(),
+ }
+ if timestamp := time.Unix(int64(last.Time), 0); time.Since(timestamp) > time.Minute {
+ context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
+ }
+ if stats.ignored > 0 {
+ context = append(context, []interface{}{"ignored", stats.ignored}...)
+ }
+ log.Info("Imported new block headers", context...)
+
+ return 0, nil
+}
+
+// GetBlockHashesFromHash retrieves a number of block hashes starting at a given
+// hash, fetching towards the genesis block.
+func (hc *HeaderChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash {
+ // Get the origin header from which to fetch
+ header := hc.GetHeaderByHash(hash)
+ if header == nil {
+ return nil
+ }
+ // Iterate the headers until enough is collected or the genesis reached
+ chain := make([]common.Hash, 0, max)
+ for i := uint64(0); i < max; i++ {
+ next := header.ParentHash
+ if header = hc.GetHeader(next, header.Number.Uint64()-1); header == nil {
+ break
+ }
+ chain = append(chain, next)
+ if header.Number.Sign() == 0 {
+ break
+ }
+ }
+ return chain
+}
+
+// GetAncestor retrieves the Nth ancestor of a given block. It assumes that either the given block or
+// a close ancestor of it is canonical. maxNonCanonical points to a downwards counter limiting the
+// number of blocks to be individually checked before we reach the canonical chain.
+//
+// Note: ancestor == 0 returns the same block, 1 returns its parent and so on.
+func (hc *HeaderChain) GetAncestor(hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64) {
+ if ancestor > number {
+ return common.Hash{}, 0
+ }
+ if ancestor == 1 {
+ // in this case it is cheaper to just read the header
+ if header := hc.GetHeader(hash, number); header != nil {
+ return header.ParentHash, number - 1
+ } else {
+ return common.Hash{}, 0
+ }
+ }
+ for ancestor != 0 {
+ if rawdb.ReadCanonicalHash(hc.chainDb, number) == hash {
+ number -= ancestor
+ return rawdb.ReadCanonicalHash(hc.chainDb, number), number
+ }
+ if *maxNonCanonical == 0 {
+ return common.Hash{}, 0
+ }
+ *maxNonCanonical--
+ ancestor--
+ header := hc.GetHeader(hash, number)
+ if header == nil {
+ return common.Hash{}, 0
+ }
+ hash = header.ParentHash
+ number--
+ }
+ return hash, number
+}
+
+// GetTd retrieves a block's total difficulty in the canonical chain from the
+// database by hash and number, caching it if found.
+func (hc *HeaderChain) GetTd(hash common.Hash, number uint64) *big.Int {
+ // Short circuit if the td's already in the cache, retrieve otherwise
+ if cached, ok := hc.tdCache.Get(hash); ok {
+ return cached.(*big.Int)
+ }
+ td := rawdb.ReadTd(hc.chainDb, hash, number)
+ if td == nil {
+ return nil
+ }
+ // Cache the found body for next time and return
+ hc.tdCache.Add(hash, td)
+ return td
+}
+
+// GetTdByHash retrieves a block's total difficulty in the canonical chain from the
+// database by hash, caching it if found.
+func (hc *HeaderChain) GetTdByHash(hash common.Hash) *big.Int {
+ number := hc.GetBlockNumber(hash)
+ if number == nil {
+ return nil
+ }
+ return hc.GetTd(hash, *number)
+}
+
+// WriteTd stores a block's total difficulty into the database, also caching it
+// along the way.
+func (hc *HeaderChain) WriteTd(hash common.Hash, number uint64, td *big.Int) error {
+ rawdb.WriteTd(hc.chainDb, hash, number, td)
+ hc.tdCache.Add(hash, new(big.Int).Set(td))
+ return nil
+}
+
+// GetHeader retrieves a block header from the database by hash and number,
+// caching it if found.
+func (hc *HeaderChain) GetHeader(hash common.Hash, number uint64) *types.Header {
+ // Short circuit if the header's already in the cache, retrieve otherwise
+ if header, ok := hc.headerCache.Get(hash); ok {
+ return header.(*types.Header)
+ }
+ header := rawdb.ReadHeader(hc.chainDb, hash, number)
+ if header == nil {
+ return nil
+ }
+ // Cache the found header for next time and return
+ hc.headerCache.Add(hash, header)
+ return header
+}
+
+// GetHeaderByHash retrieves a block header from the database by hash, caching it if
+// found.
+func (hc *HeaderChain) GetHeaderByHash(hash common.Hash) *types.Header {
+ number := hc.GetBlockNumber(hash)
+ if number == nil {
+ return nil
+ }
+ return hc.GetHeader(hash, *number)
+}
+
+// HasHeader checks if a block header is present in the database or not.
+func (hc *HeaderChain) HasHeader(hash common.Hash, number uint64) bool {
+ if hc.numberCache.Contains(hash) || hc.headerCache.Contains(hash) {
+ return true
+ }
+ return rawdb.HasHeader(hc.chainDb, hash, number)
+}
+
+// GetHeaderByNumber retrieves a block header from the database by number,
+// caching it (associated with its hash) if found.
+func (hc *HeaderChain) GetHeaderByNumber(number uint64) *types.Header {
+ hash := rawdb.ReadCanonicalHash(hc.chainDb, number)
+ if hash == (common.Hash{}) {
+ return nil
+ }
+ return hc.GetHeader(hash, number)
+}
+
+// CurrentHeader retrieves the current head header of the canonical chain. The
+// header is retrieved from the HeaderChain's internal cache.
+func (hc *HeaderChain) CurrentHeader() *types.Header {
+ return hc.currentHeader.Load().(*types.Header)
+}
+
+// SetCurrentHeader sets the current head header of the canonical chain.
+func (hc *HeaderChain) SetCurrentHeader(head *types.Header) {
+ rawdb.WriteHeadHeaderHash(hc.chainDb, head.Hash())
+
+ hc.currentHeader.Store(head)
+ hc.currentHeaderHash = head.Hash()
+ headHeaderGauge.Update(head.Number.Int64())
+}
+
+type (
+ // UpdateHeadBlocksCallback is a callback function that is called by SetHead
+ // before head header is updated.
+ UpdateHeadBlocksCallback func(ethdb.KeyValueWriter, *types.Header)
+
+ // DeleteBlockContentCallback is a callback function that is called by SetHead
+ // before each header is deleted.
+ DeleteBlockContentCallback func(ethdb.KeyValueWriter, common.Hash, uint64)
+)
+
+// SetHead rewinds the local chain to a new head. Everything above the new head
+// will be deleted and the new one set.
+func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, delFn DeleteBlockContentCallback) {
+ var (
+ parentHash common.Hash
+ batch = hc.chainDb.NewBatch()
+ )
+ for hdr := hc.CurrentHeader(); hdr != nil && hdr.Number.Uint64() > head; hdr = hc.CurrentHeader() {
+ hash, num := hdr.Hash(), hdr.Number.Uint64()
+
+ // Rewind block chain to new head.
+ parent := hc.GetHeader(hdr.ParentHash, num-1)
+ if parent == nil {
+ parent = hc.genesisHeader
+ }
+ parentHash = hdr.ParentHash
+ // Notably, since geth has the possibility for setting the head to a low
+ // height which is even lower than ancient head.
+ // In order to ensure that the head is always no higher than the data in
+ // the database(ancient store or active store), we need to update head
+ // first then remove the relative data from the database.
+ //
+ // Update head first(head fast block, head full block) before deleting the data.
+ if updateFn != nil {
+ updateFn(hc.chainDb, parent)
+ }
+ // Update head header then.
+ rawdb.WriteHeadHeaderHash(hc.chainDb, parentHash)
+
+ // Remove the relative data from the database.
+ if delFn != nil {
+ delFn(batch, hash, num)
+ }
+ // Rewind header chain to new head.
+ rawdb.DeleteHeader(batch, hash, num)
+ rawdb.DeleteTd(batch, hash, num)
+ rawdb.DeleteCanonicalHash(batch, num)
+
+ hc.currentHeader.Store(parent)
+ hc.currentHeaderHash = parentHash
+ headHeaderGauge.Update(parent.Number.Int64())
+ }
+ batch.Write()
+
+ // Clear out any stale content from the caches
+ hc.headerCache.Purge()
+ hc.tdCache.Purge()
+ hc.numberCache.Purge()
+}
+
+// SetGenesis sets a new genesis block header for the chain
+func (hc *HeaderChain) SetGenesis(head *types.Header) {
+ hc.genesisHeader = head
+}
+
+// Config retrieves the header chain's chain configuration.
+func (hc *HeaderChain) Config() *params.ChainConfig { return hc.config }
+
+// Engine retrieves the header chain's consensus engine.
+func (hc *HeaderChain) Engine() consensus.Engine { return hc.engine }
+
+// GetBlock implements consensus.ChainReader, and returns nil for every input as
+// a header chain does not have blocks available for retrieval.
+func (hc *HeaderChain) GetBlock(hash common.Hash, number uint64) *types.Block {
+ return nil
+}
diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go
new file mode 100644
index 0000000..f408098
--- /dev/null
+++ b/core/state_prefetcher.go
@@ -0,0 +1,85 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "sync/atomic"
+
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/consensus"
+ "github.com/ava-labs/go-ethereum/core/state"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/core/vm"
+ "github.com/ava-labs/go-ethereum/params"
+)
+
+// statePrefetcher is a basic Prefetcher, which blindly executes a block on top
+// of an arbitrary state with the goal of prefetching potentially useful state
+// data from disk before the main block processor start executing.
+type statePrefetcher struct {
+ config *params.ChainConfig // Chain configuration options
+ bc *BlockChain // Canonical block chain
+ engine consensus.Engine // Consensus engine used for block rewards
+}
+
+// newStatePrefetcher initialises a new statePrefetcher.
+func newStatePrefetcher(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *statePrefetcher {
+ return &statePrefetcher{
+ config: config,
+ bc: bc,
+ engine: engine,
+ }
+}
+
+// Prefetch processes the state changes according to the Ethereum rules by running
+// the transaction messages using the statedb, but any changes are discarded. The
+// only goal is to pre-cache transaction signatures and state trie nodes.
+func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) {
+ var (
+ header = block.Header()
+ gaspool = new(GasPool).AddGas(block.GasLimit())
+ )
+ // Iterate over and process the individual transactions
+ for i, tx := range block.Transactions() {
+ // If block precaching was interrupted, abort
+ if interrupt != nil && atomic.LoadUint32(interrupt) == 1 {
+ return
+ }
+ // Block precaching permitted to continue, execute the transaction
+ statedb.Prepare(tx.Hash(), block.Hash(), i)
+ if err := precacheTransaction(p.config, p.bc, nil, gaspool, statedb, header, tx, cfg); err != nil {
+ return // Ugh, something went horribly wrong, bail out
+ }
+ }
+}
+
+// precacheTransaction attempts to apply a transaction to the given state database
+// and uses the input parameters for its environment. The goal is not to execute
+// the transaction successfully, rather to warm up touched data slots.
+func precacheTransaction(config *params.ChainConfig, bc ChainContext, author *common.Address, gaspool *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, cfg vm.Config) error {
+ // Convert the transaction into an executable message and pre-cache its sender
+ msg, err := tx.AsMessage(types.MakeSigner(config, header.Number))
+ if err != nil {
+ return err
+ }
+ // Create the EVM and execute the transaction
+ context := NewEVMContext(msg, header, bc, author)
+ vm := vm.NewEVM(context, statedb, config, cfg)
+
+ _, _, _, err = ApplyMessage(vm, msg, gaspool)
+ return err
+}
diff --git a/core/state_processor.go b/core/state_processor.go
new file mode 100644
index 0000000..680d512
--- /dev/null
+++ b/core/state_processor.go
@@ -0,0 +1,129 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/consensus"
+ "github.com/ava-labs/go-ethereum/consensus/misc"
+ "github.com/ava-labs/go-ethereum/core/state"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/core/vm"
+ "github.com/ava-labs/go-ethereum/crypto"
+ "github.com/ava-labs/go-ethereum/params"
+)
+
+// StateProcessor is a basic Processor, which takes care of transitioning
+// state from one point to another.
+//
+// StateProcessor implements Processor.
+type StateProcessor struct {
+ config *params.ChainConfig // Chain configuration options
+ bc *BlockChain // Canonical block chain
+ engine consensus.Engine // Consensus engine used for block rewards
+}
+
+// NewStateProcessor initialises a new StateProcessor.
+func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *StateProcessor {
+ return &StateProcessor{
+ config: config,
+ bc: bc,
+ engine: engine,
+ }
+}
+
+// Process processes the state changes according to the Ethereum rules by running
+// the transaction messages using the statedb and applying any rewards to both
+// the processor (coinbase) and any included uncles.
+//
+// Process returns the receipts and logs accumulated during the process and
+// returns the amount of gas that was used in the process. If any of the
+// transactions failed to execute due to insufficient gas it will return an error.
+func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) {
+ var (
+ receipts types.Receipts
+ usedGas = new(uint64)
+ header = block.Header()
+ allLogs []*types.Log
+ gp = new(GasPool).AddGas(block.GasLimit())
+ )
+ // Mutate the block and state according to any hard-fork specs
+ if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 {
+ misc.ApplyDAOHardFork(statedb)
+ }
+ // Iterate over and process the individual transactions
+ for i, tx := range block.Transactions() {
+ statedb.Prepare(tx.Hash(), block.Hash(), i)
+ receipt, _, err := ApplyTransaction(p.config, p.bc, nil, gp, statedb, header, tx, usedGas, cfg)
+ if err != nil {
+ return nil, nil, 0, err
+ }
+ receipts = append(receipts, receipt)
+ allLogs = append(allLogs, receipt.Logs...)
+ }
+ // Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
+ p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles())
+
+ return receipts, allLogs, *usedGas, nil
+}
+
+// ApplyTransaction attempts to apply a transaction to the given state database
+// and uses the input parameters for its environment. It returns the receipt
+// for the transaction, gas used and an error if the transaction failed,
+// indicating the block was invalid.
+func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, cfg vm.Config) (*types.Receipt, uint64, error) {
+ msg, err := tx.AsMessage(types.MakeSigner(config, header.Number))
+ if err != nil {
+ return nil, 0, err
+ }
+ // Create a new context to be used in the EVM environment
+ context := NewEVMContext(msg, header, bc, author)
+ // Create a new environment which holds all relevant information
+ // about the transaction and calling mechanisms.
+ vmenv := vm.NewEVM(context, statedb, config, cfg)
+ // Apply the transaction to the current state (included in the env)
+ _, gas, failed, err := ApplyMessage(vmenv, msg, gp)
+ if err != nil {
+ return nil, 0, err
+ }
+ // Update the state with pending changes
+ var root []byte
+ if config.IsByzantium(header.Number) {
+ statedb.Finalise(true)
+ } else {
+ root = statedb.IntermediateRoot(config.IsEIP158(header.Number)).Bytes()
+ }
+ *usedGas += gas
+
+ // Create a new receipt for the transaction, storing the intermediate root and gas used by the tx
+ // based on the eip phase, we're passing whether the root touch-delete accounts.
+ receipt := types.NewReceipt(root, failed, *usedGas)
+ receipt.TxHash = tx.Hash()
+ receipt.GasUsed = gas
+ // if the transaction created a contract, store the creation address in the receipt.
+ if msg.To() == nil {
+ receipt.ContractAddress = crypto.CreateAddress(vmenv.Context.Origin, tx.Nonce())
+ }
+ // Set the receipt logs and create a bloom for filtering
+ receipt.Logs = statedb.GetLogs(tx.Hash())
+ receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
+ receipt.BlockHash = statedb.BlockHash()
+ receipt.BlockNumber = header.Number
+ receipt.TransactionIndex = uint(statedb.TxIndex())
+
+ return receipt, gas, err
+}
diff --git a/core/state_transition.go b/core/state_transition.go
new file mode 100644
index 0000000..f648bce
--- /dev/null
+++ b/core/state_transition.go
@@ -0,0 +1,255 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "errors"
+ "math"
+ "math/big"
+
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/core/vm"
+ "github.com/ava-labs/go-ethereum/log"
+ "github.com/ava-labs/go-ethereum/params"
+)
+
+var (
+ errInsufficientBalanceForGas = errors.New("insufficient balance to pay for gas")
+)
+
+/*
+The State Transitioning Model
+
+A state transition is a change made when a transaction is applied to the current world state
+The state transitioning model does all the necessary work to work out a valid new state root.
+
+1) Nonce handling
+2) Pre pay gas
+3) Create a new state object if the recipient is \0*32
+4) Value transfer
+== If contract creation ==
+ 4a) Attempt to run transaction data
+ 4b) If valid, use result as code for the new state object
+== end ==
+5) Run Script section
+6) Derive new state root
+*/
+type StateTransition struct {
+ gp *GasPool
+ msg Message
+ gas uint64
+ gasPrice *big.Int
+ initialGas uint64
+ value *big.Int
+ data []byte
+ state vm.StateDB
+ evm *vm.EVM
+}
+
+// Message represents a message sent to a contract.
+type Message interface {
+ From() common.Address
+ //FromFrontier() (common.Address, error)
+ To() *common.Address
+
+ GasPrice() *big.Int
+ Gas() uint64
+ Value() *big.Int
+
+ Nonce() uint64
+ CheckNonce() bool
+ Data() []byte
+}
+
+// IntrinsicGas computes the 'intrinsic gas' for a message with the given data.
+func IntrinsicGas(data []byte, contractCreation, isEIP155 bool, isEIP2028 bool) (uint64, error) {
+ // Set the starting gas for the raw transaction
+ var gas uint64
+ if contractCreation && isEIP155 {
+ gas = params.TxGasContractCreation
+ } else {
+ gas = params.TxGas
+ }
+ // Bump the required gas by the amount of transactional data
+ if len(data) > 0 {
+ // Zero and non-zero bytes are priced differently
+ var nz uint64
+ for _, byt := range data {
+ if byt != 0 {
+ nz++
+ }
+ }
+ // Make sure we don't exceed uint64 for all data combinations
+ nonZeroGas := params.TxDataNonZeroGasFrontier
+ if isEIP2028 {
+ nonZeroGas = params.TxDataNonZeroGasEIP2028
+ }
+ if (math.MaxUint64-gas)/nonZeroGas < nz {
+ return 0, vm.ErrOutOfGas
+ }
+ gas += nz * nonZeroGas
+
+ z := uint64(len(data)) - nz
+ if (math.MaxUint64-gas)/params.TxDataZeroGas < z {
+ return 0, vm.ErrOutOfGas
+ }
+ gas += z * params.TxDataZeroGas
+ }
+ return gas, nil
+}
+
+// NewStateTransition initialises and returns a new state transition object.
+func NewStateTransition(evm *vm.EVM, msg Message, gp *GasPool) *StateTransition {
+ return &StateTransition{
+ gp: gp,
+ evm: evm,
+ msg: msg,
+ gasPrice: msg.GasPrice(),
+ value: msg.Value(),
+ data: msg.Data(),
+ state: evm.StateDB,
+ }
+}
+
+// ApplyMessage computes the new state by applying the given message
+// against the old state within the environment.
+//
+// ApplyMessage returns the bytes returned by any EVM execution (if it took place),
+// the gas used (which includes gas refunds) and an error if it failed. An error always
+// indicates a core error meaning that the message would always fail for that particular
+// state and would never be accepted within a block.
+func ApplyMessage(evm *vm.EVM, msg Message, gp *GasPool) ([]byte, uint64, bool, error) {
+ return NewStateTransition(evm, msg, gp).TransitionDb()
+}
+
+// to returns the recipient of the message.
+func (st *StateTransition) to() common.Address {
+ if st.msg == nil || st.msg.To() == nil /* contract creation */ {
+ return common.Address{}
+ }
+ return *st.msg.To()
+}
+
+func (st *StateTransition) useGas(amount uint64) error {
+ if st.gas < amount {
+ return vm.ErrOutOfGas
+ }
+ st.gas -= amount
+
+ return nil
+}
+
+func (st *StateTransition) buyGas() error {
+ mgval := new(big.Int).Mul(new(big.Int).SetUint64(st.msg.Gas()), st.gasPrice)
+ if st.state.GetBalance(st.msg.From()).Cmp(mgval) < 0 {
+ return errInsufficientBalanceForGas
+ }
+ if err := st.gp.SubGas(st.msg.Gas()); err != nil {
+ return err
+ }
+ st.gas += st.msg.Gas()
+
+ st.initialGas = st.msg.Gas()
+ st.state.SubBalance(st.msg.From(), mgval)
+ return nil
+}
+
+func (st *StateTransition) preCheck() error {
+ // Make sure this transaction's nonce is correct.
+ if st.msg.CheckNonce() {
+ nonce := st.state.GetNonce(st.msg.From())
+ if nonce < st.msg.Nonce() {
+ return ErrNonceTooHigh
+ } else if nonce > st.msg.Nonce() {
+ return ErrNonceTooLow
+ }
+ }
+ return st.buyGas()
+}
+
+// TransitionDb will transition the state by applying the current message and
+// returning the result including the used gas. It returns an error if failed.
+// An error indicates a consensus issue.
+func (st *StateTransition) TransitionDb() (ret []byte, usedGas uint64, failed bool, err error) {
+ if err = st.preCheck(); err != nil {
+ return
+ }
+ msg := st.msg
+ sender := vm.AccountRef(msg.From())
+ homestead := st.evm.ChainConfig().IsHomestead(st.evm.BlockNumber)
+ istanbul := st.evm.ChainConfig().IsIstanbul(st.evm.BlockNumber)
+ contractCreation := msg.To() == nil
+
+ // Pay intrinsic gas
+ gas, err := IntrinsicGas(st.data, contractCreation, homestead, istanbul)
+ if err != nil {
+ return nil, 0, false, err
+ }
+ if err = st.useGas(gas); err != nil {
+ return nil, 0, false, err
+ }
+
+ var (
+ evm = st.evm
+ // vm errors do not effect consensus and are therefor
+ // not assigned to err, except for insufficient balance
+ // error.
+ vmerr error
+ )
+ if contractCreation {
+ ret, _, st.gas, vmerr = evm.Create(sender, st.data, st.gas, st.value)
+ } else {
+ // Increment the nonce for the next transaction
+ st.state.SetNonce(msg.From(), st.state.GetNonce(sender.Address())+1)
+ ret, st.gas, vmerr = evm.Call(sender, st.to(), st.data, st.gas, st.value)
+ }
+ if vmerr != nil {
+ log.Debug("VM returned with error", "err", vmerr)
+ // The only possible consensus-error would be if there wasn't
+ // sufficient balance to make the transfer happen. The first
+ // balance transfer may never fail.
+ if vmerr == vm.ErrInsufficientBalance {
+ return nil, 0, false, vmerr
+ }
+ }
+ st.refundGas()
+ st.state.AddBalance(st.evm.Coinbase, new(big.Int).Mul(new(big.Int).SetUint64(st.gasUsed()), st.gasPrice))
+
+ return ret, st.gasUsed(), vmerr != nil, err
+}
+
+func (st *StateTransition) refundGas() {
+ // Apply refund counter, capped to half of the used gas.
+ refund := st.gasUsed() / 2
+ if refund > st.state.GetRefund() {
+ refund = st.state.GetRefund()
+ }
+ st.gas += refund
+
+ // Return ETH for remaining gas, exchanged at the original rate.
+ remaining := new(big.Int).Mul(new(big.Int).SetUint64(st.gas), st.gasPrice)
+ st.state.AddBalance(st.msg.From(), remaining)
+
+ // Also return remaining gas to the block gas counter so it is
+ // available for the next transaction.
+ st.gp.AddGas(st.gas)
+}
+
+// gasUsed returns the amount of gas used up by the state transition.
+func (st *StateTransition) gasUsed() uint64 {
+ return st.initialGas - st.gas
+}
diff --git a/core/tx_cacher.go b/core/tx_cacher.go
new file mode 100644
index 0000000..ffac7f1
--- /dev/null
+++ b/core/tx_cacher.go
@@ -0,0 +1,105 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "runtime"
+
+ "github.com/ava-labs/go-ethereum/core/types"
+)
+
+// senderCacher is a concurrent transaction sender recoverer and cacher.
+var senderCacher = newTxSenderCacher(runtime.NumCPU())
+
+// txSenderCacherRequest is a request for recovering transaction senders with a
+// specific signature scheme and caching it into the transactions themselves.
+//
+// The inc field defines the number of transactions to skip after each recovery,
+// which is used to feed the same underlying input array to different threads but
+// ensure they process the early transactions fast.
+type txSenderCacherRequest struct {
+ signer types.Signer
+ txs []*types.Transaction
+ inc int
+}
+
+// txSenderCacher is a helper structure to concurrently ecrecover transaction
+// senders from digital signatures on background threads.
+type txSenderCacher struct {
+ threads int
+ tasks chan *txSenderCacherRequest
+}
+
+// newTxSenderCacher creates a new transaction sender background cacher and starts
+// as many processing goroutines as allowed by the GOMAXPROCS on construction.
+func newTxSenderCacher(threads int) *txSenderCacher {
+ cacher := &txSenderCacher{
+ tasks: make(chan *txSenderCacherRequest, threads),
+ threads: threads,
+ }
+ for i := 0; i < threads; i++ {
+ go cacher.cache()
+ }
+ return cacher
+}
+
+// cache is an infinite loop, caching transaction senders from various forms of
+// data structures.
+func (cacher *txSenderCacher) cache() {
+ for task := range cacher.tasks {
+ for i := 0; i < len(task.txs); i += task.inc {
+ types.Sender(task.signer, task.txs[i])
+ }
+ }
+}
+
+// recover recovers the senders from a batch of transactions and caches them
+// back into the same data structures. There is no validation being done, nor
+// any reaction to invalid signatures. That is up to calling code later.
+func (cacher *txSenderCacher) recover(signer types.Signer, txs []*types.Transaction) {
+ // If there's nothing to recover, abort
+ if len(txs) == 0 {
+ return
+ }
+ // Ensure we have meaningful task sizes and schedule the recoveries
+ tasks := cacher.threads
+ if len(txs) < tasks*4 {
+ tasks = (len(txs) + 3) / 4
+ }
+ for i := 0; i < tasks; i++ {
+ cacher.tasks <- &txSenderCacherRequest{
+ signer: signer,
+ txs: txs[i:],
+ inc: tasks,
+ }
+ }
+}
+
+// recoverFromBlocks recovers the senders from a batch of blocks and caches them
+// back into the same data structures. There is no validation being done, nor
+// any reaction to invalid signatures. That is up to calling code later.
+func (cacher *txSenderCacher) recoverFromBlocks(signer types.Signer, blocks []*types.Block) {
+ count := 0
+ for _, block := range blocks {
+ count += len(block.Transactions())
+ }
+ txs := make([]*types.Transaction, 0, count)
+ for _, block := range blocks {
+ txs = append(txs, block.Transactions()...)
+ }
+ cacher.recover(signer, txs)
+}
diff --git a/core/tx_journal.go b/core/tx_journal.go
new file mode 100644
index 0000000..dfd8dab
--- /dev/null
+++ b/core/tx_journal.go
@@ -0,0 +1,180 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "errors"
+ "io"
+ "os"
+
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/log"
+ "github.com/ava-labs/go-ethereum/rlp"
+)
+
+// errNoActiveJournal is returned if a transaction is attempted to be inserted
+// into the journal, but no such file is currently open.
+var errNoActiveJournal = errors.New("no active journal")
+
+// devNull is a WriteCloser that just discards anything written into it. Its
+// goal is to allow the transaction journal to write into a fake journal when
+// loading transactions on startup without printing warnings due to no file
+// being read for write.
+type devNull struct{}
+
+func (*devNull) Write(p []byte) (n int, err error) { return len(p), nil }
+func (*devNull) Close() error { return nil }
+
+// txJournal is a rotating log of transactions with the aim of storing locally
+// created transactions to allow non-executed ones to survive node restarts.
+type txJournal struct {
+ path string // Filesystem path to store the transactions at
+ writer io.WriteCloser // Output stream to write new transactions into
+}
+
+// newTxJournal creates a new transaction journal to
+func newTxJournal(path string) *txJournal {
+ return &txJournal{
+ path: path,
+ }
+}
+
+// load parses a transaction journal dump from disk, loading its contents into
+// the specified pool.
+func (journal *txJournal) load(add func([]*types.Transaction) []error) error {
+ // Skip the parsing if the journal file doesn't exist at all
+ if _, err := os.Stat(journal.path); os.IsNotExist(err) {
+ return nil
+ }
+ // Open the journal for loading any past transactions
+ input, err := os.Open(journal.path)
+ if err != nil {
+ return err
+ }
+ defer input.Close()
+
+ // Temporarily discard any journal additions (don't double add on load)
+ journal.writer = new(devNull)
+ defer func() { journal.writer = nil }()
+
+ // Inject all transactions from the journal into the pool
+ stream := rlp.NewStream(input, 0)
+ total, dropped := 0, 0
+
+ // Create a method to load a limited batch of transactions and bump the
+ // appropriate progress counters. Then use this method to load all the
+ // journaled transactions in small-ish batches.
+ loadBatch := func(txs types.Transactions) {
+ for _, err := range add(txs) {
+ if err != nil {
+ log.Debug("Failed to add journaled transaction", "err", err)
+ dropped++
+ }
+ }
+ }
+ var (
+ failure error
+ batch types.Transactions
+ )
+ for {
+ // Parse the next transaction and terminate on error
+ tx := new(types.Transaction)
+ if err = stream.Decode(tx); err != nil {
+ if err != io.EOF {
+ failure = err
+ }
+ if batch.Len() > 0 {
+ loadBatch(batch)
+ }
+ break
+ }
+ // New transaction parsed, queue up for later, import if threshold is reached
+ total++
+
+ if batch = append(batch, tx); batch.Len() > 1024 {
+ loadBatch(batch)
+ batch = batch[:0]
+ }
+ }
+ log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped)
+
+ return failure
+}
+
+// insert adds the specified transaction to the local disk journal.
+func (journal *txJournal) insert(tx *types.Transaction) error {
+ if journal.writer == nil {
+ return errNoActiveJournal
+ }
+ if err := rlp.Encode(journal.writer, tx); err != nil {
+ return err
+ }
+ return nil
+}
+
+// rotate regenerates the transaction journal based on the current contents of
+// the transaction pool.
+func (journal *txJournal) rotate(all map[common.Address]types.Transactions) error {
+ // Close the current journal (if any is open)
+ if journal.writer != nil {
+ if err := journal.writer.Close(); err != nil {
+ return err
+ }
+ journal.writer = nil
+ }
+ // Generate a new journal with the contents of the current pool
+ replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
+ if err != nil {
+ return err
+ }
+ journaled := 0
+ for _, txs := range all {
+ for _, tx := range txs {
+ if err = rlp.Encode(replacement, tx); err != nil {
+ replacement.Close()
+ return err
+ }
+ }
+ journaled += len(txs)
+ }
+ replacement.Close()
+
+ // Replace the live journal with the newly generated one
+ if err = os.Rename(journal.path+".new", journal.path); err != nil {
+ return err
+ }
+ sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0755)
+ if err != nil {
+ return err
+ }
+ journal.writer = sink
+ log.Info("Regenerated local transaction journal", "transactions", journaled, "accounts", len(all))
+
+ return nil
+}
+
+// close flushes the transaction journal contents to disk and closes the file.
+func (journal *txJournal) close() error {
+ var err error
+
+ if journal.writer != nil {
+ err = journal.writer.Close()
+ journal.writer = nil
+ }
+ return err
+}
diff --git a/core/tx_list.go b/core/tx_list.go
new file mode 100644
index 0000000..a54ca77
--- /dev/null
+++ b/core/tx_list.go
@@ -0,0 +1,520 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "container/heap"
+ "math"
+ "math/big"
+ "sort"
+
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/log"
+)
+
+// nonceHeap is a heap.Interface implementation over 64bit unsigned integers for
+// retrieving sorted transactions from the possibly gapped future queue.
+type nonceHeap []uint64
+
+func (h nonceHeap) Len() int { return len(h) }
+func (h nonceHeap) Less(i, j int) bool { return h[i] < h[j] }
+func (h nonceHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+
+func (h *nonceHeap) Push(x interface{}) {
+ *h = append(*h, x.(uint64))
+}
+
+func (h *nonceHeap) Pop() interface{} {
+ old := *h
+ n := len(old)
+ x := old[n-1]
+ *h = old[0 : n-1]
+ return x
+}
+
+// txSortedMap is a nonce->transaction hash map with a heap based index to allow
+// iterating over the contents in a nonce-incrementing way.
+type txSortedMap struct {
+ items map[uint64]*types.Transaction // Hash map storing the transaction data
+ index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode)
+ cache types.Transactions // Cache of the transactions already sorted
+}
+
+// newTxSortedMap creates a new nonce-sorted transaction map.
+func newTxSortedMap() *txSortedMap {
+ return &txSortedMap{
+ items: make(map[uint64]*types.Transaction),
+ index: new(nonceHeap),
+ }
+}
+
+// Get retrieves the current transactions associated with the given nonce.
+func (m *txSortedMap) Get(nonce uint64) *types.Transaction {
+ return m.items[nonce]
+}
+
+// Put inserts a new transaction into the map, also updating the map's nonce
+// index. If a transaction already exists with the same nonce, it's overwritten.
+func (m *txSortedMap) Put(tx *types.Transaction) {
+ nonce := tx.Nonce()
+ if m.items[nonce] == nil {
+ heap.Push(m.index, nonce)
+ }
+ m.items[nonce], m.cache = tx, nil
+}
+
+// Forward removes all transactions from the map with a nonce lower than the
+// provided threshold. Every removed transaction is returned for any post-removal
+// maintenance.
+func (m *txSortedMap) Forward(threshold uint64) types.Transactions {
+ var removed types.Transactions
+
+ // Pop off heap items until the threshold is reached
+ for m.index.Len() > 0 && (*m.index)[0] < threshold {
+ nonce := heap.Pop(m.index).(uint64)
+ removed = append(removed, m.items[nonce])
+ delete(m.items, nonce)
+ }
+ // If we had a cached order, shift the front
+ if m.cache != nil {
+ m.cache = m.cache[len(removed):]
+ }
+ return removed
+}
+
+// Filter iterates over the list of transactions and removes all of them for which
+// the specified function evaluates to true.
+func (m *txSortedMap) Filter(filter func(*types.Transaction) bool) types.Transactions {
+ var removed types.Transactions
+
+ // Collect all the transactions to filter out
+ for nonce, tx := range m.items {
+ if filter(tx) {
+ removed = append(removed, tx)
+ delete(m.items, nonce)
+ }
+ }
+ // If transactions were removed, the heap and cache are ruined
+ if len(removed) > 0 {
+ *m.index = make([]uint64, 0, len(m.items))
+ for nonce := range m.items {
+ *m.index = append(*m.index, nonce)
+ }
+ heap.Init(m.index)
+
+ m.cache = nil
+ }
+ return removed
+}
+
+// Cap places a hard limit on the number of items, returning all transactions
+// exceeding that limit.
+func (m *txSortedMap) Cap(threshold int) types.Transactions {
+ // Short circuit if the number of items is under the limit
+ if len(m.items) <= threshold {
+ return nil
+ }
+ // Otherwise gather and drop the highest nonce'd transactions
+ var drops types.Transactions
+
+ sort.Sort(*m.index)
+ for size := len(m.items); size > threshold; size-- {
+ drops = append(drops, m.items[(*m.index)[size-1]])
+ delete(m.items, (*m.index)[size-1])
+ }
+ *m.index = (*m.index)[:threshold]
+ heap.Init(m.index)
+
+ // If we had a cache, shift the back
+ if m.cache != nil {
+ m.cache = m.cache[:len(m.cache)-len(drops)]
+ }
+ return drops
+}
+
+// Remove deletes a transaction from the maintained map, returning whether the
+// transaction was found.
+func (m *txSortedMap) Remove(nonce uint64) bool {
+ // Short circuit if no transaction is present
+ _, ok := m.items[nonce]
+ if !ok {
+ return false
+ }
+ // Otherwise delete the transaction and fix the heap index
+ for i := 0; i < m.index.Len(); i++ {
+ if (*m.index)[i] == nonce {
+ heap.Remove(m.index, i)
+ break
+ }
+ }
+ delete(m.items, nonce)
+ m.cache = nil
+
+ return true
+}
+
+// Ready retrieves a sequentially increasing list of transactions starting at the
+// provided nonce that is ready for processing. The returned transactions will be
+// removed from the list.
+//
+// Note, all transactions with nonces lower than start will also be returned to
+// prevent getting into and invalid state. This is not something that should ever
+// happen but better to be self correcting than failing!
+func (m *txSortedMap) Ready(start uint64) types.Transactions {
+ // Short circuit if no transactions are available
+ if m.index.Len() == 0 || (*m.index)[0] > start {
+ return nil
+ }
+ // Otherwise start accumulating incremental transactions
+ var ready types.Transactions
+ for next := (*m.index)[0]; m.index.Len() > 0 && (*m.index)[0] == next; next++ {
+ ready = append(ready, m.items[next])
+ delete(m.items, next)
+ heap.Pop(m.index)
+ }
+ m.cache = nil
+
+ return ready
+}
+
+// Len returns the length of the transaction map.
+func (m *txSortedMap) Len() int {
+ return len(m.items)
+}
+
+// Flatten creates a nonce-sorted slice of transactions based on the loosely
+// sorted internal representation. The result of the sorting is cached in case
+// it's requested again before any modifications are made to the contents.
+func (m *txSortedMap) Flatten() types.Transactions {
+ // If the sorting was not cached yet, create and cache it
+ if m.cache == nil {
+ m.cache = make(types.Transactions, 0, len(m.items))
+ for _, tx := range m.items {
+ m.cache = append(m.cache, tx)
+ }
+ sort.Sort(types.TxByNonce(m.cache))
+ }
+ // Copy the cache to prevent accidental modifications
+ txs := make(types.Transactions, len(m.cache))
+ copy(txs, m.cache)
+ return txs
+}
+
+// txList is a "list" of transactions belonging to an account, sorted by account
+// nonce. The same type can be used both for storing contiguous transactions for
+// the executable/pending queue; and for storing gapped transactions for the non-
+// executable/future queue, with minor behavioral changes.
+type txList struct {
+ strict bool // Whether nonces are strictly continuous or not
+ txs *txSortedMap // Heap indexed sorted hash map of the transactions
+
+ costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance)
+ gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit)
+}
+
+// newTxList create a new transaction list for maintaining nonce-indexable fast,
+// gapped, sortable transaction lists.
+func newTxList(strict bool) *txList {
+ return &txList{
+ strict: strict,
+ txs: newTxSortedMap(),
+ costcap: new(big.Int),
+ }
+}
+
+// Overlaps returns whether the transaction specified has the same nonce as one
+// already contained within the list.
+func (l *txList) Overlaps(tx *types.Transaction) bool {
+ return l.txs.Get(tx.Nonce()) != nil
+}
+
+// Add tries to insert a new transaction into the list, returning whether the
+// transaction was accepted, and if yes, any previous transaction it replaced.
+//
+// If the new transaction is accepted into the list, the lists' cost and gas
+// thresholds are also potentially updated.
+func (l *txList) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Transaction) {
+ // If there's an older better transaction, abort
+ old := l.txs.Get(tx.Nonce())
+ if old != nil {
+ threshold := new(big.Int).Div(new(big.Int).Mul(old.GasPrice(), big.NewInt(100+int64(priceBump))), big.NewInt(100))
+ // Have to ensure that the new gas price is higher than the old gas
+ // price as well as checking the percentage threshold to ensure that
+ // this is accurate for low (Wei-level) gas price replacements
+ if old.GasPrice().Cmp(tx.GasPrice()) >= 0 || threshold.Cmp(tx.GasPrice()) > 0 {
+ return false, nil
+ }
+ }
+ // Otherwise overwrite the old transaction with the current one
+ l.txs.Put(tx)
+ if cost := tx.Cost(); l.costcap.Cmp(cost) < 0 {
+ l.costcap = cost
+ }
+ if gas := tx.Gas(); l.gascap < gas {
+ l.gascap = gas
+ }
+ return true, old
+}
+
+// Forward removes all transactions from the list with a nonce lower than the
+// provided threshold. Every removed transaction is returned for any post-removal
+// maintenance.
+func (l *txList) Forward(threshold uint64) types.Transactions {
+ return l.txs.Forward(threshold)
+}
+
+// Filter removes all transactions from the list with a cost or gas limit higher
+// than the provided thresholds. Every removed transaction is returned for any
+// post-removal maintenance. Strict-mode invalidated transactions are also
+// returned.
+//
+// This method uses the cached costcap and gascap to quickly decide if there's even
+// a point in calculating all the costs or if the balance covers all. If the threshold
+// is lower than the costgas cap, the caps will be reset to a new high after removing
+// the newly invalidated transactions.
+func (l *txList) Filter(costLimit *big.Int, gasLimit uint64) (types.Transactions, types.Transactions) {
+ // If all transactions are below the threshold, short circuit
+ if l.costcap.Cmp(costLimit) <= 0 && l.gascap <= gasLimit {
+ return nil, nil
+ }
+ l.costcap = new(big.Int).Set(costLimit) // Lower the caps to the thresholds
+ l.gascap = gasLimit
+
+ // Filter out all the transactions above the account's funds
+ removed := l.txs.Filter(func(tx *types.Transaction) bool { return tx.Cost().Cmp(costLimit) > 0 || tx.Gas() > gasLimit })
+
+ // If the list was strict, filter anything above the lowest nonce
+ var invalids types.Transactions
+
+ if l.strict && len(removed) > 0 {
+ lowest := uint64(math.MaxUint64)
+ for _, tx := range removed {
+ if nonce := tx.Nonce(); lowest > nonce {
+ lowest = nonce
+ }
+ }
+ invalids = l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest })
+ }
+ return removed, invalids
+}
+
+// Cap places a hard limit on the number of items, returning all transactions
+// exceeding that limit.
+func (l *txList) Cap(threshold int) types.Transactions {
+ return l.txs.Cap(threshold)
+}
+
+// Remove deletes a transaction from the maintained list, returning whether the
+// transaction was found, and also returning any transaction invalidated due to
+// the deletion (strict mode only).
+func (l *txList) Remove(tx *types.Transaction) (bool, types.Transactions) {
+ // Remove the transaction from the set
+ nonce := tx.Nonce()
+ if removed := l.txs.Remove(nonce); !removed {
+ return false, nil
+ }
+ // In strict mode, filter out non-executable transactions
+ if l.strict {
+ return true, l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > nonce })
+ }
+ return true, nil
+}
+
+// Ready retrieves a sequentially increasing list of transactions starting at the
+// provided nonce that is ready for processing. The returned transactions will be
+// removed from the list.
+//
+// Note, all transactions with nonces lower than start will also be returned to
+// prevent getting into and invalid state. This is not something that should ever
+// happen but better to be self correcting than failing!
+func (l *txList) Ready(start uint64) types.Transactions {
+ return l.txs.Ready(start)
+}
+
+// Len returns the length of the transaction list.
+func (l *txList) Len() int {
+ return l.txs.Len()
+}
+
+// Empty returns whether the list of transactions is empty or not.
+func (l *txList) Empty() bool {
+ return l.Len() == 0
+}
+
+// Flatten creates a nonce-sorted slice of transactions based on the loosely
+// sorted internal representation. The result of the sorting is cached in case
+// it's requested again before any modifications are made to the contents.
+func (l *txList) Flatten() types.Transactions {
+ return l.txs.Flatten()
+}
+
+// priceHeap is a heap.Interface implementation over transactions for retrieving
+// price-sorted transactions to discard when the pool fills up.
+type priceHeap []*types.Transaction
+
+func (h priceHeap) Len() int { return len(h) }
+func (h priceHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+
+func (h priceHeap) Less(i, j int) bool {
+ // Sort primarily by price, returning the cheaper one
+ switch h[i].GasPrice().Cmp(h[j].GasPrice()) {
+ case -1:
+ return true
+ case 1:
+ return false
+ }
+ // If the prices match, stabilize via nonces (high nonce is worse)
+ return h[i].Nonce() > h[j].Nonce()
+}
+
+func (h *priceHeap) Push(x interface{}) {
+ *h = append(*h, x.(*types.Transaction))
+}
+
+func (h *priceHeap) Pop() interface{} {
+ old := *h
+ n := len(old)
+ x := old[n-1]
+ *h = old[0 : n-1]
+ return x
+}
+
+// txPricedList is a price-sorted heap to allow operating on transactions pool
+// contents in a price-incrementing way.
+type txPricedList struct {
+ all *txLookup // Pointer to the map of all transactions
+ items *priceHeap // Heap of prices of all the stored transactions
+ stales int // Number of stale price points to (re-heap trigger)
+}
+
+// newTxPricedList creates a new price-sorted transaction heap.
+func newTxPricedList(all *txLookup) *txPricedList {
+ return &txPricedList{
+ all: all,
+ items: new(priceHeap),
+ }
+}
+
+// Put inserts a new transaction into the heap.
+func (l *txPricedList) Put(tx *types.Transaction) {
+ heap.Push(l.items, tx)
+}
+
+// Removed notifies the prices transaction list that an old transaction dropped
+// from the pool. The list will just keep a counter of stale objects and update
+// the heap if a large enough ratio of transactions go stale.
+func (l *txPricedList) Removed(count int) {
+ // Bump the stale counter, but exit if still too low (< 25%)
+ l.stales += count
+ if l.stales <= len(*l.items)/4 {
+ return
+ }
+ // Seems we've reached a critical number of stale transactions, reheap
+ reheap := make(priceHeap, 0, l.all.Count())
+
+ l.stales, l.items = 0, &reheap
+ l.all.Range(func(hash common.Hash, tx *types.Transaction) bool {
+ *l.items = append(*l.items, tx)
+ return true
+ })
+ heap.Init(l.items)
+}
+
+// Cap finds all the transactions below the given price threshold, drops them
+// from the priced list and returns them for further removal from the entire pool.
+func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transactions {
+ drop := make(types.Transactions, 0, 128) // Remote underpriced transactions to drop
+ save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep
+
+ for len(*l.items) > 0 {
+ // Discard stale transactions if found during cleanup
+ tx := heap.Pop(l.items).(*types.Transaction)
+ if l.all.Get(tx.Hash()) == nil {
+ l.stales--
+ continue
+ }
+ // Stop the discards if we've reached the threshold
+ if tx.GasPrice().Cmp(threshold) >= 0 {
+ save = append(save, tx)
+ break
+ }
+ // Non stale transaction found, discard unless local
+ if local.containsTx(tx) {
+ save = append(save, tx)
+ } else {
+ drop = append(drop, tx)
+ }
+ }
+ for _, tx := range save {
+ heap.Push(l.items, tx)
+ }
+ return drop
+}
+
+// Underpriced checks whether a transaction is cheaper than (or as cheap as) the
+// lowest priced transaction currently being tracked.
+func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) bool {
+ // Local transactions cannot be underpriced
+ if local.containsTx(tx) {
+ return false
+ }
+ // Discard stale price points if found at the heap start
+ for len(*l.items) > 0 {
+ head := []*types.Transaction(*l.items)[0]
+ if l.all.Get(head.Hash()) == nil {
+ l.stales--
+ heap.Pop(l.items)
+ continue
+ }
+ break
+ }
+ // Check if the transaction is underpriced or not
+ if len(*l.items) == 0 {
+ log.Error("Pricing query for empty pool") // This cannot happen, print to catch programming errors
+ return false
+ }
+ cheapest := []*types.Transaction(*l.items)[0]
+ return cheapest.GasPrice().Cmp(tx.GasPrice()) >= 0
+}
+
+// Discard finds a number of most underpriced transactions, removes them from the
+// priced list and returns them for further removal from the entire pool.
+func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions {
+ drop := make(types.Transactions, 0, count) // Remote underpriced transactions to drop
+ save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep
+
+ for len(*l.items) > 0 && count > 0 {
+ // Discard stale transactions if found during cleanup
+ tx := heap.Pop(l.items).(*types.Transaction)
+ if l.all.Get(tx.Hash()) == nil {
+ l.stales--
+ continue
+ }
+ // Non stale transaction found, discard unless local
+ if local.containsTx(tx) {
+ save = append(save, tx)
+ } else {
+ drop = append(drop, tx)
+ count--
+ }
+ }
+ for _, tx := range save {
+ heap.Push(l.items, tx)
+ }
+ return drop
+}
diff --git a/core/tx_noncer.go b/core/tx_noncer.go
new file mode 100644
index 0000000..d52e267
--- /dev/null
+++ b/core/tx_noncer.go
@@ -0,0 +1,79 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "sync"
+
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/core/state"
+)
+
+// txNoncer is a tiny virtual state database to manage the executable nonces of
+// accounts in the pool, falling back to reading from a real state database if
+// an account is unknown.
+type txNoncer struct {
+ fallback *state.StateDB
+ nonces map[common.Address]uint64
+ lock sync.Mutex
+}
+
+// newTxNoncer creates a new virtual state database to track the pool nonces.
+func newTxNoncer(statedb *state.StateDB) *txNoncer {
+ return &txNoncer{
+ fallback: statedb.Copy(),
+ nonces: make(map[common.Address]uint64),
+ }
+}
+
+// get returns the current nonce of an account, falling back to a real state
+// database if the account is unknown.
+func (txn *txNoncer) get(addr common.Address) uint64 {
+ // We use mutex for get operation is the underlying
+ // state will mutate db even for read access.
+ txn.lock.Lock()
+ defer txn.lock.Unlock()
+
+ if _, ok := txn.nonces[addr]; !ok {
+ txn.nonces[addr] = txn.fallback.GetNonce(addr)
+ }
+ return txn.nonces[addr]
+}
+
+// set inserts a new virtual nonce into the virtual state database to be returned
+// whenever the pool requests it instead of reaching into the real state database.
+func (txn *txNoncer) set(addr common.Address, nonce uint64) {
+ txn.lock.Lock()
+ defer txn.lock.Unlock()
+
+ txn.nonces[addr] = nonce
+}
+
+// setIfLower updates a new virtual nonce into the virtual state database if the
+// the new one is lower.
+func (txn *txNoncer) setIfLower(addr common.Address, nonce uint64) {
+ txn.lock.Lock()
+ defer txn.lock.Unlock()
+
+ if _, ok := txn.nonces[addr]; !ok {
+ txn.nonces[addr] = txn.fallback.GetNonce(addr)
+ }
+ if txn.nonces[addr] <= nonce {
+ return
+ }
+ txn.nonces[addr] = nonce
+}
diff --git a/core/tx_pool.go b/core/tx_pool.go
new file mode 100644
index 0000000..caabd5c
--- /dev/null
+++ b/core/tx_pool.go
@@ -0,0 +1,1523 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "errors"
+ "fmt"
+ "math"
+ "math/big"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/common/prque"
+ "github.com/ava-labs/go-ethereum/core/state"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/event"
+ "github.com/ava-labs/go-ethereum/log"
+ "github.com/ava-labs/go-ethereum/metrics"
+ "github.com/ava-labs/go-ethereum/params"
+)
+
+const (
+ // chainHeadChanSize is the size of channel listening to ChainHeadEvent.
+ chainHeadChanSize = 10
+)
+
+var (
+ // ErrInvalidSender is returned if the transaction contains an invalid signature.
+ ErrInvalidSender = errors.New("invalid sender")
+
+ // ErrNonceTooLow is returned if the nonce of a transaction is lower than the
+ // one present in the local chain.
+ ErrNonceTooLow = errors.New("nonce too low")
+
+ // ErrUnderpriced is returned if a transaction's gas price is below the minimum
+ // configured for the transaction pool.
+ ErrUnderpriced = errors.New("transaction underpriced")
+
+ // ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced
+ // with a different one without the required price bump.
+ ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")
+
+ // ErrInsufficientFunds is returned if the total cost of executing a transaction
+ // is higher than the balance of the user's account.
+ ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value")
+
+ // ErrIntrinsicGas is returned if the transaction is specified to use less gas
+ // than required to start the invocation.
+ ErrIntrinsicGas = errors.New("intrinsic gas too low")
+
+ // ErrGasLimit is returned if a transaction's requested gas limit exceeds the
+ // maximum allowance of the current block.
+ ErrGasLimit = errors.New("exceeds block gas limit")
+
+ // ErrNegativeValue is a sanity error to ensure noone is able to specify a
+ // transaction with a negative value.
+ ErrNegativeValue = errors.New("negative value")
+
+ // ErrOversizedData is returned if the input data of a transaction is greater
+ // than some meaningful limit a user might use. This is not a consensus error
+ // making the transaction invalid, rather a DOS protection.
+ ErrOversizedData = errors.New("oversized data")
+)
+
+var (
+ evictionInterval = time.Minute // Time interval to check for evictable transactions
+ statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats
+)
+
+var (
+ // Metrics for the pending pool
+ pendingDiscardMeter = metrics.NewRegisteredMeter("txpool/pending/discard", nil)
+ pendingReplaceMeter = metrics.NewRegisteredMeter("txpool/pending/replace", nil)
+ pendingRateLimitMeter = metrics.NewRegisteredMeter("txpool/pending/ratelimit", nil) // Dropped due to rate limiting
+ pendingNofundsMeter = metrics.NewRegisteredMeter("txpool/pending/nofunds", nil) // Dropped due to out-of-funds
+
+ // Metrics for the queued pool
+ queuedDiscardMeter = metrics.NewRegisteredMeter("txpool/queued/discard", nil)
+ queuedReplaceMeter = metrics.NewRegisteredMeter("txpool/queued/replace", nil)
+ queuedRateLimitMeter = metrics.NewRegisteredMeter("txpool/queued/ratelimit", nil) // Dropped due to rate limiting
+ queuedNofundsMeter = metrics.NewRegisteredMeter("txpool/queued/nofunds", nil) // Dropped due to out-of-funds
+
+ // General tx metrics
+ validMeter = metrics.NewRegisteredMeter("txpool/valid", nil)
+ invalidTxMeter = metrics.NewRegisteredMeter("txpool/invalid", nil)
+ underpricedTxMeter = metrics.NewRegisteredMeter("txpool/underpriced", nil)
+
+ pendingCounter = metrics.NewRegisteredCounter("txpool/pending", nil)
+ queuedCounter = metrics.NewRegisteredCounter("txpool/queued", nil)
+ localCounter = metrics.NewRegisteredCounter("txpool/local", nil)
+)
+
+// TxStatus is the current status of a transaction as seen by the pool.
+type TxStatus uint
+
+const (
+ TxStatusUnknown TxStatus = iota
+ TxStatusQueued
+ TxStatusPending
+ TxStatusIncluded
+)
+
+// blockChain provides the state of blockchain and current gas limit to do
+// some pre checks in tx pool and event subscribers.
+type blockChain interface {
+ CurrentBlock() *types.Block
+ GetBlock(hash common.Hash, number uint64) *types.Block
+ StateAt(root common.Hash) (*state.StateDB, error)
+
+ SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
+}
+
+// TxPoolConfig are the configuration parameters of the transaction pool.
+type TxPoolConfig struct {
+ Locals []common.Address // Addresses that should be treated by default as local
+ NoLocals bool // Whether local transaction handling should be disabled
+ Journal string // Journal of local transactions to survive node restarts
+ Rejournal time.Duration // Time interval to regenerate the local transaction journal
+
+ PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool
+ PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce)
+
+ AccountSlots uint64 // Number of executable transaction slots guaranteed per account
+ GlobalSlots uint64 // Maximum number of executable transaction slots for all accounts
+ AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
+ GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts
+
+ Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
+}
+
+// DefaultTxPoolConfig contains the default configurations for the transaction
+// pool.
+var DefaultTxPoolConfig = TxPoolConfig{
+ Journal: "transactions.rlp",
+ Rejournal: time.Hour,
+
+ PriceLimit: 1,
+ PriceBump: 10,
+
+ AccountSlots: 16,
+ GlobalSlots: 4096,
+ AccountQueue: 64,
+ GlobalQueue: 1024,
+
+ Lifetime: 3 * time.Hour,
+}
+
+// sanitize checks the provided user configurations and changes anything that's
+// unreasonable or unworkable.
+func (config *TxPoolConfig) sanitize() TxPoolConfig {
+ conf := *config
+ if conf.Rejournal < time.Second {
+ log.Warn("Sanitizing invalid txpool journal time", "provided", conf.Rejournal, "updated", time.Second)
+ conf.Rejournal = time.Second
+ }
+ if conf.PriceLimit < 1 {
+ log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultTxPoolConfig.PriceLimit)
+ conf.PriceLimit = DefaultTxPoolConfig.PriceLimit
+ }
+ if conf.PriceBump < 1 {
+ log.Warn("Sanitizing invalid txpool price bump", "provided", conf.PriceBump, "updated", DefaultTxPoolConfig.PriceBump)
+ conf.PriceBump = DefaultTxPoolConfig.PriceBump
+ }
+ if conf.AccountSlots < 1 {
+ log.Warn("Sanitizing invalid txpool account slots", "provided", conf.AccountSlots, "updated", DefaultTxPoolConfig.AccountSlots)
+ conf.AccountSlots = DefaultTxPoolConfig.AccountSlots
+ }
+ if conf.GlobalSlots < 1 {
+ log.Warn("Sanitizing invalid txpool global slots", "provided", conf.GlobalSlots, "updated", DefaultTxPoolConfig.GlobalSlots)
+ conf.GlobalSlots = DefaultTxPoolConfig.GlobalSlots
+ }
+ if conf.AccountQueue < 1 {
+ log.Warn("Sanitizing invalid txpool account queue", "provided", conf.AccountQueue, "updated", DefaultTxPoolConfig.AccountQueue)
+ conf.AccountQueue = DefaultTxPoolConfig.AccountQueue
+ }
+ if conf.GlobalQueue < 1 {
+ log.Warn("Sanitizing invalid txpool global queue", "provided", conf.GlobalQueue, "updated", DefaultTxPoolConfig.GlobalQueue)
+ conf.GlobalQueue = DefaultTxPoolConfig.GlobalQueue
+ }
+ if conf.Lifetime < 1 {
+ log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultTxPoolConfig.Lifetime)
+ conf.Lifetime = DefaultTxPoolConfig.Lifetime
+ }
+ return conf
+}
+
+// TxPool contains all currently known transactions. Transactions
+// enter the pool when they are received from the network or submitted
+// locally. They exit the pool when they are included in the blockchain.
+//
+// The pool separates processable transactions (which can be applied to the
+// current state) and future transactions. Transactions move between those
+// two states over time as they are received and processed.
+type TxPool struct {
+ config TxPoolConfig
+ chainconfig *params.ChainConfig
+ chain blockChain
+ gasPrice *big.Int
+ txFeed event.Feed
+ scope event.SubscriptionScope
+ signer types.Signer
+ mu sync.RWMutex
+
+ istanbul bool // Fork indicator whether we are in the istanbul stage.
+
+ currentState *state.StateDB // Current state in the blockchain head
+ pendingNonces *txNoncer // Pending state tracking virtual nonces
+ currentMaxGas uint64 // Current gas limit for transaction caps
+
+ locals *accountSet // Set of local transaction to exempt from eviction rules
+ journal *txJournal // Journal of local transaction to back up to disk
+
+ pending map[common.Address]*txList // All currently processable transactions
+ queue map[common.Address]*txList // Queued but non-processable transactions
+ beats map[common.Address]time.Time // Last heartbeat from each known account
+ all *txLookup // All transactions to allow lookups
+ priced *txPricedList // All transactions sorted by price
+
+ chainHeadCh chan ChainHeadEvent
+ chainHeadSub event.Subscription
+ reqResetCh chan *txpoolResetRequest
+ reqPromoteCh chan *accountSet
+ queueTxEventCh chan *types.Transaction
+ reorgDoneCh chan chan struct{}
+ reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
+ wg sync.WaitGroup // tracks loop, scheduleReorgLoop
+}
+
+type txpoolResetRequest struct {
+ oldHead, newHead *types.Header
+}
+
+// NewTxPool creates a new transaction pool to gather, sort and filter inbound
+// transactions from the network.
+func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
+ // Sanitize the input to ensure no vulnerable gas prices are set
+ config = (&config).sanitize()
+
+ // Create the transaction pool with its initial settings
+ pool := &TxPool{
+ config: config,
+ chainconfig: chainconfig,
+ chain: chain,
+ signer: types.NewEIP155Signer(chainconfig.ChainID),
+ pending: make(map[common.Address]*txList),
+ queue: make(map[common.Address]*txList),
+ beats: make(map[common.Address]time.Time),
+ all: newTxLookup(),
+ chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
+ reqResetCh: make(chan *txpoolResetRequest),
+ reqPromoteCh: make(chan *accountSet),
+ queueTxEventCh: make(chan *types.Transaction),
+ reorgDoneCh: make(chan chan struct{}),
+ reorgShutdownCh: make(chan struct{}),
+ gasPrice: new(big.Int).SetUint64(config.PriceLimit),
+ }
+ pool.locals = newAccountSet(pool.signer)
+ for _, addr := range config.Locals {
+ log.Info("Setting new local account", "address", addr)
+ pool.locals.add(addr)
+ }
+ pool.priced = newTxPricedList(pool.all)
+ pool.reset(nil, chain.CurrentBlock().Header())
+
+ // Start the reorg loop early so it can handle requests generated during journal loading.
+ pool.wg.Add(1)
+ go pool.scheduleReorgLoop()
+
+ // If local transactions and journaling is enabled, load from disk
+ if !config.NoLocals && config.Journal != "" {
+ pool.journal = newTxJournal(config.Journal)
+
+ if err := pool.journal.load(pool.AddLocals); err != nil {
+ log.Warn("Failed to load transaction journal", "err", err)
+ }
+ if err := pool.journal.rotate(pool.local()); err != nil {
+ log.Warn("Failed to rotate transaction journal", "err", err)
+ }
+ }
+
+ // Subscribe events from blockchain and start the main event loop.
+ pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
+ pool.wg.Add(1)
+ go pool.loop()
+
+ return pool
+}
+
+// loop is the transaction pool's main event loop, waiting for and reacting to
+// outside blockchain events as well as for various reporting and transaction
+// eviction events.
+func (pool *TxPool) loop() {
+ defer pool.wg.Done()
+
+ var (
+ prevPending, prevQueued, prevStales int
+ // Start the stats reporting and transaction eviction tickers
+ report = time.NewTicker(statsReportInterval)
+ evict = time.NewTicker(evictionInterval)
+ journal = time.NewTicker(pool.config.Rejournal)
+ // Track the previous head headers for transaction reorgs
+ head = pool.chain.CurrentBlock()
+ )
+ defer report.Stop()
+ defer evict.Stop()
+ defer journal.Stop()
+
+ for {
+ select {
+ // Handle ChainHeadEvent
+ case ev := <-pool.chainHeadCh:
+ if ev.Block != nil {
+ pool.requestReset(head.Header(), ev.Block.Header())
+ head = ev.Block
+ }
+
+ // System shutdown.
+ case <-pool.chainHeadSub.Err():
+ close(pool.reorgShutdownCh)
+ return
+
+ // Handle stats reporting ticks
+ case <-report.C:
+ pool.mu.RLock()
+ pending, queued := pool.stats()
+ stales := pool.priced.stales
+ pool.mu.RUnlock()
+
+ if pending != prevPending || queued != prevQueued || stales != prevStales {
+ log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
+ prevPending, prevQueued, prevStales = pending, queued, stales
+ }
+
+ // Handle inactive account transaction eviction
+ case <-evict.C:
+ pool.mu.Lock()
+ for addr := range pool.queue {
+ // Skip local transactions from the eviction mechanism
+ if pool.locals.contains(addr) {
+ continue
+ }
+ // Any non-locals old enough should be removed
+ if time.Since(pool.beats[addr]) > pool.config.Lifetime {
+ for _, tx := range pool.queue[addr].Flatten() {
+ pool.removeTx(tx.Hash(), true)
+ }
+ }
+ }
+ pool.mu.Unlock()
+
+ // Handle local transaction journal rotation
+ case <-journal.C:
+ if pool.journal != nil {
+ pool.mu.Lock()
+ if err := pool.journal.rotate(pool.local()); err != nil {
+ log.Warn("Failed to rotate local tx journal", "err", err)
+ }
+ pool.mu.Unlock()
+ }
+ }
+ }
+}
+
+// Stop terminates the transaction pool.
+func (pool *TxPool) Stop() {
+ // Unsubscribe all subscriptions registered from txpool
+ pool.scope.Close()
+
+ // Unsubscribe subscriptions registered from blockchain
+ pool.chainHeadSub.Unsubscribe()
+ pool.wg.Wait()
+
+ if pool.journal != nil {
+ pool.journal.close()
+ }
+ log.Info("Transaction pool stopped")
+}
+
+// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and
+// starts sending event to the given channel.
+func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription {
+ return pool.scope.Track(pool.txFeed.Subscribe(ch))
+}
+
+// GasPrice returns the current gas price enforced by the transaction pool.
+func (pool *TxPool) GasPrice() *big.Int {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+
+ return new(big.Int).Set(pool.gasPrice)
+}
+
+// SetGasPrice updates the minimum price required by the transaction pool for a
+// new transaction, and drops all transactions below this threshold.
+func (pool *TxPool) SetGasPrice(price *big.Int) {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ pool.gasPrice = price
+ for _, tx := range pool.priced.Cap(price, pool.locals) {
+ pool.removeTx(tx.Hash(), false)
+ }
+ log.Info("Transaction pool price threshold updated", "price", price)
+}
+
+// Nonce returns the next nonce of an account, with all transactions executable
+// by the pool already applied on top.
+func (pool *TxPool) Nonce(addr common.Address) uint64 {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+
+ return pool.pendingNonces.get(addr)
+}
+
+// Stats retrieves the current pool stats, namely the number of pending and the
+// number of queued (non-executable) transactions.
+func (pool *TxPool) Stats() (int, int) {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+
+ return pool.stats()
+}
+
+// stats retrieves the current pool stats, namely the number of pending and the
+// number of queued (non-executable) transactions.
+func (pool *TxPool) stats() (int, int) {
+ pending := 0
+ for _, list := range pool.pending {
+ pending += list.Len()
+ }
+ queued := 0
+ for _, list := range pool.queue {
+ queued += list.Len()
+ }
+ return pending, queued
+}
+
+// Content retrieves the data content of the transaction pool, returning all the
+// pending as well as queued transactions, grouped by account and sorted by nonce.
+func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ pending := make(map[common.Address]types.Transactions)
+ for addr, list := range pool.pending {
+ pending[addr] = list.Flatten()
+ }
+ queued := make(map[common.Address]types.Transactions)
+ for addr, list := range pool.queue {
+ queued[addr] = list.Flatten()
+ }
+ return pending, queued
+}
+
+// Pending retrieves all currently processable transactions, grouped by origin
+// account and sorted by nonce. The returned transaction set is a copy and can be
+// freely modified by calling code.
+func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ pending := make(map[common.Address]types.Transactions)
+ for addr, list := range pool.pending {
+ pending[addr] = list.Flatten()
+ }
+ return pending, nil
+}
+
+// Locals retrieves the accounts currently considered local by the pool.
+func (pool *TxPool) Locals() []common.Address {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ return pool.locals.flatten()
+}
+
+// local retrieves all currently known local transactions, grouped by origin
+// account and sorted by nonce. The returned transaction set is a copy and can be
+// freely modified by calling code.
+func (pool *TxPool) local() map[common.Address]types.Transactions {
+ txs := make(map[common.Address]types.Transactions)
+ for addr := range pool.locals.accounts {
+ if pending := pool.pending[addr]; pending != nil {
+ txs[addr] = append(txs[addr], pending.Flatten()...)
+ }
+ if queued := pool.queue[addr]; queued != nil {
+ txs[addr] = append(txs[addr], queued.Flatten()...)
+ }
+ }
+ return txs
+}
+
+// validateTx checks whether a transaction is valid according to the consensus
+// rules and adheres to some heuristic limits of the local node (price and size).
+func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
+ // Heuristic limit, reject transactions over 32KB to prevent DOS attacks
+ if tx.Size() > 32*1024 {
+ return ErrOversizedData
+ }
+ // Transactions can't be negative. This may never happen using RLP decoded
+ // transactions but may occur if you create a transaction using the RPC.
+ if tx.Value().Sign() < 0 {
+ return ErrNegativeValue
+ }
+ // Ensure the transaction doesn't exceed the current block limit gas.
+ if pool.currentMaxGas < tx.Gas() {
+ return ErrGasLimit
+ }
+ // Make sure the transaction is signed properly
+ from, err := types.Sender(pool.signer, tx)
+ if err != nil {
+ return ErrInvalidSender
+ }
+ // Drop non-local transactions under our own minimal accepted gas price
+ local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network
+ if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
+ return ErrUnderpriced
+ }
+ // Ensure the transaction adheres to nonce ordering
+ if pool.currentState.GetNonce(from) > tx.Nonce() {
+ return ErrNonceTooLow
+ }
+ // Transactor should have enough funds to cover the costs
+ // cost == V + GP * GL
+ if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
+ return ErrInsufficientFunds
+ }
+ // Ensure the transaction has more gas than the basic tx fee.
+ intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, true, pool.istanbul)
+ if err != nil {
+ return err
+ }
+ if tx.Gas() < intrGas {
+ return ErrIntrinsicGas
+ }
+ return nil
+}
+
+// add validates a transaction and inserts it into the non-executable queue for later
+// pending promotion and execution. If the transaction is a replacement for an already
+// pending or queued one, it overwrites the previous transaction if its price is higher.
+//
+// If a newly added transaction is marked as local, its sending account will be
+// whitelisted, preventing any associated transaction from being dropped out of the pool
+// due to pricing constraints.
+func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
+ // If the transaction is already known, discard it
+ hash := tx.Hash()
+ if pool.all.Get(hash) != nil {
+ log.Trace("Discarding already known transaction", "hash", hash)
+ return false, fmt.Errorf("known transaction: %x", hash)
+ }
+
+ // If the transaction fails basic validation, discard it
+ if err := pool.validateTx(tx, local); err != nil {
+ log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
+ invalidTxMeter.Mark(1)
+ return false, err
+ }
+
+ // If the transaction pool is full, discard underpriced transactions
+ if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
+ // If the new transaction is underpriced, don't accept it
+ if !local && pool.priced.Underpriced(tx, pool.locals) {
+ log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
+ underpricedTxMeter.Mark(1)
+ return false, ErrUnderpriced
+ }
+ // New transaction is better than our worse ones, make room for it
+ drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
+ for _, tx := range drop {
+ log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
+ underpricedTxMeter.Mark(1)
+ pool.removeTx(tx.Hash(), false)
+ }
+ }
+
+ // Try to replace an existing transaction in the pending pool
+ from, _ := types.Sender(pool.signer, tx) // already validated
+ if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
+ // Nonce already pending, check if required price bump is met
+ inserted, old := list.Add(tx, pool.config.PriceBump)
+ if !inserted {
+ pendingDiscardMeter.Mark(1)
+ return false, ErrReplaceUnderpriced
+ }
+ // New transaction is better, replace old one
+ if old != nil {
+ pool.all.Remove(old.Hash())
+ pool.priced.Removed(1)
+ pendingReplaceMeter.Mark(1)
+ }
+ pool.all.Add(tx)
+ pool.priced.Put(tx)
+ pool.journalTx(from, tx)
+ pool.queueTxEvent(tx)
+ log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
+ return old != nil, nil
+ }
+
+ // New transaction isn't replacing a pending one, push into queue
+ replaced, err = pool.enqueueTx(hash, tx)
+ if err != nil {
+ return false, err
+ }
+
+ // Mark local addresses and journal local transactions
+ if local {
+ if !pool.locals.contains(from) {
+ log.Info("Setting new local account", "address", from)
+ pool.locals.add(from)
+ }
+ }
+ if local || pool.locals.contains(from) {
+ localCounter.Inc(1)
+ }
+ pool.journalTx(from, tx)
+
+ log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
+ return replaced, nil
+}
+
+// enqueueTx inserts a new transaction into the non-executable transaction queue.
+//
+// Note, this method assumes the pool lock is held!
+func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) {
+ // Try to insert the transaction into the future queue
+ from, _ := types.Sender(pool.signer, tx) // already validated
+ if pool.queue[from] == nil {
+ pool.queue[from] = newTxList(false)
+ }
+ inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
+ if !inserted {
+ // An older transaction was better, discard this
+ queuedDiscardMeter.Mark(1)
+ return false, ErrReplaceUnderpriced
+ }
+ // Discard any previous transaction and mark this
+ if old != nil {
+ pool.all.Remove(old.Hash())
+ pool.priced.Removed(1)
+ queuedReplaceMeter.Mark(1)
+ } else {
+ // Nothing was replaced, bump the queued counter
+ queuedCounter.Inc(1)
+ }
+ if pool.all.Get(hash) == nil {
+ pool.all.Add(tx)
+ pool.priced.Put(tx)
+ }
+ return old != nil, nil
+}
+
+// journalTx adds the specified transaction to the local disk journal if it is
+// deemed to have been sent from a local account.
+func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
+ // Only journal if it's enabled and the transaction is local
+ if pool.journal == nil || !pool.locals.contains(from) {
+ return
+ }
+ if err := pool.journal.insert(tx); err != nil {
+ log.Warn("Failed to journal local transaction", "err", err)
+ }
+}
+
+// promoteTx adds a transaction to the pending (processable) list of transactions
+// and returns whether it was inserted or an older was better.
+//
+// Note, this method assumes the pool lock is held!
+func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool {
+ // Try to insert the transaction into the pending queue
+ if pool.pending[addr] == nil {
+ pool.pending[addr] = newTxList(true)
+ }
+ list := pool.pending[addr]
+
+ inserted, old := list.Add(tx, pool.config.PriceBump)
+ if !inserted {
+ // An older transaction was better, discard this
+ pool.all.Remove(hash)
+ pool.priced.Removed(1)
+
+ pendingDiscardMeter.Mark(1)
+ return false
+ }
+ // Otherwise discard any previous transaction and mark this
+ if old != nil {
+ pool.all.Remove(old.Hash())
+ pool.priced.Removed(1)
+
+ pendingReplaceMeter.Mark(1)
+ } else {
+ // Nothing was replaced, bump the pending counter
+ pendingCounter.Inc(1)
+ }
+ // Failsafe to work around direct pending inserts (tests)
+ if pool.all.Get(hash) == nil {
+ pool.all.Add(tx)
+ pool.priced.Put(tx)
+ }
+ // Set the potentially new pending nonce and notify any subsystems of the new tx
+ pool.beats[addr] = time.Now()
+ pool.pendingNonces.set(addr, tx.Nonce()+1)
+
+ return true
+}
+
+// AddLocals enqueues a batch of transactions into the pool if they are valid, marking the
+// senders as a local ones, ensuring they go around the local pricing constraints.
+//
+// This method is used to add transactions from the RPC API and performs synchronous pool
+// reorganization and event propagation.
+func (pool *TxPool) AddLocals(txs []*types.Transaction) []error {
+ return pool.addTxs(txs, !pool.config.NoLocals, true)
+}
+
+// AddLocal enqueues a single local transaction into the pool if it is valid. This is
+// a convenience wrapper aroundd AddLocals.
+func (pool *TxPool) AddLocal(tx *types.Transaction) error {
+ errs := pool.AddLocals([]*types.Transaction{tx})
+ return errs[0]
+}
+
+// AddRemotes enqueues a batch of transactions into the pool if they are valid. If the
+// senders are not among the locally tracked ones, full pricing constraints will apply.
+//
+// This method is used to add transactions from the p2p network and does not wait for pool
+// reorganization and internal event propagation.
+func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
+ return pool.addTxs(txs, false, false)
+}
+
+// This is like AddRemotes, but waits for pool reorganization. Tests use this method.
+func (pool *TxPool) AddRemotesSync(txs []*types.Transaction) []error {
+ return pool.addTxs(txs, false, true)
+}
+
+// This is like AddRemotes with a single transaction, but waits for pool reorganization. Tests use this method.
+func (pool *TxPool) addRemoteSync(tx *types.Transaction) error {
+ errs := pool.AddRemotesSync([]*types.Transaction{tx})
+ return errs[0]
+}
+
+// AddRemote enqueues a single transaction into the pool if it is valid. This is a convenience
+// wrapper around AddRemotes.
+//
+// Deprecated: use AddRemotes
+func (pool *TxPool) AddRemote(tx *types.Transaction) error {
+ errs := pool.AddRemotes([]*types.Transaction{tx})
+ return errs[0]
+}
+
+// addTxs attempts to queue a batch of transactions if they are valid.
+func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
+ // Cache senders in transactions before obtaining lock (pool.signer is immutable)
+ for _, tx := range txs {
+ types.Sender(pool.signer, tx)
+ }
+
+ pool.mu.Lock()
+ errs, dirtyAddrs := pool.addTxsLocked(txs, local)
+ pool.mu.Unlock()
+
+ done := pool.requestPromoteExecutables(dirtyAddrs)
+ if sync {
+ <-done
+ }
+ return errs
+}
+
+// addTxsLocked attempts to queue a batch of transactions if they are valid.
+// The transaction pool lock must be held.
+func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) {
+ dirty := newAccountSet(pool.signer)
+ errs := make([]error, len(txs))
+ for i, tx := range txs {
+ replaced, err := pool.add(tx, local)
+ errs[i] = err
+ if err == nil && !replaced {
+ dirty.addTx(tx)
+ }
+ }
+ validMeter.Mark(int64(len(dirty.accounts)))
+ return errs, dirty
+}
+
+// Status returns the status (unknown/pending/queued) of a batch of transactions
+// identified by their hashes.
+func (pool *TxPool) Status(hashes []common.Hash) []TxStatus {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+
+ status := make([]TxStatus, len(hashes))
+ for i, hash := range hashes {
+ if tx := pool.all.Get(hash); tx != nil {
+ from, _ := types.Sender(pool.signer, tx) // already validated
+ if pool.pending[from] != nil && pool.pending[from].txs.items[tx.Nonce()] != nil {
+ status[i] = TxStatusPending
+ } else {
+ status[i] = TxStatusQueued
+ }
+ }
+ }
+ return status
+}
+
+// Get returns a transaction if it is contained in the pool and nil otherwise.
+func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
+ return pool.all.Get(hash)
+}
+
+// removeTx removes a single transaction from the queue, moving all subsequent
+// transactions back to the future queue.
+func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
+ // Fetch the transaction we wish to delete
+ tx := pool.all.Get(hash)
+ if tx == nil {
+ return
+ }
+ addr, _ := types.Sender(pool.signer, tx) // already validated during insertion
+
+ // Remove it from the list of known transactions
+ pool.all.Remove(hash)
+ if outofbound {
+ pool.priced.Removed(1)
+ }
+ if pool.locals.contains(addr) {
+ localCounter.Dec(1)
+ }
+ // Remove the transaction from the pending lists and reset the account nonce
+ if pending := pool.pending[addr]; pending != nil {
+ if removed, invalids := pending.Remove(tx); removed {
+ // If no more pending transactions are left, remove the list
+ if pending.Empty() {
+ delete(pool.pending, addr)
+ delete(pool.beats, addr)
+ }
+ // Postpone any invalidated transactions
+ for _, tx := range invalids {
+ pool.enqueueTx(tx.Hash(), tx)
+ }
+ // Update the account nonce if needed
+ pool.pendingNonces.setIfLower(addr, tx.Nonce())
+ // Reduce the pending counter
+ pendingCounter.Dec(int64(1 + len(invalids)))
+ return
+ }
+ }
+ // Transaction is in the future queue
+ if future := pool.queue[addr]; future != nil {
+ if removed, _ := future.Remove(tx); removed {
+ // Reduce the queued counter
+ queuedCounter.Dec(1)
+ }
+ if future.Empty() {
+ delete(pool.queue, addr)
+ }
+ }
+}
+
+// requestPromoteExecutables requests a pool reset to the new head block.
+// The returned channel is closed when the reset has occurred.
+func (pool *TxPool) requestReset(oldHead *types.Header, newHead *types.Header) chan struct{} {
+ select {
+ case pool.reqResetCh <- &txpoolResetRequest{oldHead, newHead}:
+ return <-pool.reorgDoneCh
+ case <-pool.reorgShutdownCh:
+ return pool.reorgShutdownCh
+ }
+}
+
+// requestPromoteExecutables requests transaction promotion checks for the given addresses.
+// The returned channel is closed when the promotion checks have occurred.
+func (pool *TxPool) requestPromoteExecutables(set *accountSet) chan struct{} {
+ select {
+ case pool.reqPromoteCh <- set:
+ return <-pool.reorgDoneCh
+ case <-pool.reorgShutdownCh:
+ return pool.reorgShutdownCh
+ }
+}
+
+// queueTxEvent enqueues a transaction event to be sent in the next reorg run.
+func (pool *TxPool) queueTxEvent(tx *types.Transaction) {
+ select {
+ case pool.queueTxEventCh <- tx:
+ case <-pool.reorgShutdownCh:
+ }
+}
+
+// scheduleReorgLoop schedules runs of reset and promoteExecutables. Code above should not
+// call those methods directly, but request them being run using requestReset and
+// requestPromoteExecutables instead.
+func (pool *TxPool) scheduleReorgLoop() {
+ defer pool.wg.Done()
+
+ var (
+ curDone chan struct{} // non-nil while runReorg is active
+ nextDone = make(chan struct{})
+ launchNextRun bool
+ reset *txpoolResetRequest
+ dirtyAccounts *accountSet
+ queuedEvents = make(map[common.Address]*txSortedMap)
+ )
+ for {
+ // Launch next background reorg if needed
+ if curDone == nil && launchNextRun {
+ // Run the background reorg and announcements
+ go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents)
+
+ // Prepare everything for the next round of reorg
+ curDone, nextDone = nextDone, make(chan struct{})
+ launchNextRun = false
+
+ reset, dirtyAccounts = nil, nil
+ queuedEvents = make(map[common.Address]*txSortedMap)
+ }
+
+ select {
+ case req := <-pool.reqResetCh:
+ // Reset request: update head if request is already pending.
+ if reset == nil {
+ reset = req
+ } else {
+ reset.newHead = req.newHead
+ }
+ launchNextRun = true
+ pool.reorgDoneCh <- nextDone
+
+ case req := <-pool.reqPromoteCh:
+ // Promote request: update address set if request is already pending.
+ if dirtyAccounts == nil {
+ dirtyAccounts = req
+ } else {
+ dirtyAccounts.merge(req)
+ }
+ launchNextRun = true
+ pool.reorgDoneCh <- nextDone
+
+ case tx := <-pool.queueTxEventCh:
+ // Queue up the event, but don't schedule a reorg. It's up to the caller to
+ // request one later if they want the events sent.
+ addr, _ := types.Sender(pool.signer, tx)
+ if _, ok := queuedEvents[addr]; !ok {
+ queuedEvents[addr] = newTxSortedMap()
+ }
+ queuedEvents[addr].Put(tx)
+
+ case <-curDone:
+ curDone = nil
+
+ case <-pool.reorgShutdownCh:
+ // Wait for current run to finish.
+ if curDone != nil {
+ <-curDone
+ }
+ close(nextDone)
+ return
+ }
+ }
+}
+
+// runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop.
+func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*txSortedMap) {
+ defer close(done)
+
+ var promoteAddrs []common.Address
+ if dirtyAccounts != nil {
+ promoteAddrs = dirtyAccounts.flatten()
+ }
+ pool.mu.Lock()
+ if reset != nil {
+ // Reset from the old head to the new, rescheduling any reorged transactions
+ pool.reset(reset.oldHead, reset.newHead)
+
+ // Nonces were reset, discard any events that became stale
+ for addr := range events {
+ events[addr].Forward(pool.pendingNonces.get(addr))
+ if events[addr].Len() == 0 {
+ delete(events, addr)
+ }
+ }
+ // Reset needs promote for all addresses
+ promoteAddrs = promoteAddrs[:0]
+ for addr := range pool.queue {
+ promoteAddrs = append(promoteAddrs, addr)
+ }
+ }
+ // Check for pending transactions for every account that sent new ones
+ promoted := pool.promoteExecutables(promoteAddrs)
+ for _, tx := range promoted {
+ addr, _ := types.Sender(pool.signer, tx)
+ if _, ok := events[addr]; !ok {
+ events[addr] = newTxSortedMap()
+ }
+ events[addr].Put(tx)
+ }
+ // If a new block appeared, validate the pool of pending transactions. This will
+ // remove any transaction that has been included in the block or was invalidated
+ // because of another transaction (e.g. higher gas price).
+ if reset != nil {
+ pool.demoteUnexecutables()
+ }
+ // Ensure pool.queue and pool.pending sizes stay within the configured limits.
+ pool.truncatePending()
+ pool.truncateQueue()
+
+ // Update all accounts to the latest known pending nonce
+ for addr, list := range pool.pending {
+ txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway
+ pool.pendingNonces.set(addr, txs[len(txs)-1].Nonce()+1)
+ }
+ pool.mu.Unlock()
+
+ // Notify subsystems for newly added transactions
+ if len(events) > 0 {
+ var txs []*types.Transaction
+ for _, set := range events {
+ txs = append(txs, set.Flatten()...)
+ }
+ pool.txFeed.Send(NewTxsEvent{txs})
+ }
+}
+
+// reset retrieves the current state of the blockchain and ensures the content
+// of the transaction pool is valid with regard to the chain state.
+func (pool *TxPool) reset(oldHead, newHead *types.Header) {
+ // If we're reorging an old state, reinject all dropped transactions
+ var reinject types.Transactions
+
+ if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
+ // If the reorg is too deep, avoid doing it (will happen during fast sync)
+ oldNum := oldHead.Number.Uint64()
+ newNum := newHead.Number.Uint64()
+
+ if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
+ log.Debug("Skipping deep transaction reorg", "depth", depth)
+ } else {
+ // Reorg seems shallow enough to pull in all transactions into memory
+ var discarded, included types.Transactions
+ var (
+ rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
+ add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
+ )
+ if rem == nil {
+ // This can happen if a setHead is performed, where we simply discard the old
+ // head from the chain.
+ // If that is the case, we don't have the lost transactions any more, and
+ // there's nothing to add
+ if newNum < oldNum {
+ // If the reorg ended up on a lower number, it's indicative of setHead being the cause
+ log.Debug("Skipping transaction reset caused by setHead",
+ "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
+ } else {
+ // If we reorged to a same or higher number, then it's not a case of setHead
+ log.Warn("Transaction pool reset with missing oldhead",
+ "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
+ }
+ return
+ }
+ for rem.NumberU64() > add.NumberU64() {
+ discarded = append(discarded, rem.Transactions()...)
+ if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
+ log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
+ return
+ }
+ }
+ for add.NumberU64() > rem.NumberU64() {
+ included = append(included, add.Transactions()...)
+ if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
+ log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
+ return
+ }
+ }
+ for rem.Hash() != add.Hash() {
+ discarded = append(discarded, rem.Transactions()...)
+ if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
+ log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
+ return
+ }
+ included = append(included, add.Transactions()...)
+ if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
+ log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
+ return
+ }
+ }
+ reinject = types.TxDifference(discarded, included)
+ }
+ }
+ // Initialize the internal state to the current head
+ if newHead == nil {
+ newHead = pool.chain.CurrentBlock().Header() // Special case during testing
+ }
+ statedb, err := pool.chain.StateAt(newHead.Root)
+ if err != nil {
+ log.Error("Failed to reset txpool state", "err", err)
+ return
+ }
+ pool.currentState = statedb
+ pool.pendingNonces = newTxNoncer(statedb)
+ pool.currentMaxGas = newHead.GasLimit
+
+ // Inject any transactions discarded due to reorgs
+ log.Debug("Reinjecting stale transactions", "count", len(reinject))
+ senderCacher.recover(pool.signer, reinject)
+ pool.addTxsLocked(reinject, false)
+
+ // Update all fork indicator by next pending block number.
+ next := new(big.Int).Add(newHead.Number, big.NewInt(1))
+ pool.istanbul = pool.chainconfig.IsIstanbul(next)
+}
+
+// promoteExecutables moves transactions that have become processable from the
+// future queue to the set of pending transactions. During this process, all
+// invalidated transactions (low nonce, low balance) are deleted.
+func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Transaction {
+ // Track the promoted transactions to broadcast them at once
+ var promoted []*types.Transaction
+
+ // Iterate over all accounts and promote any executable transactions
+ for _, addr := range accounts {
+ list := pool.queue[addr]
+ if list == nil {
+ continue // Just in case someone calls with a non existing account
+ }
+ // Drop all transactions that are deemed too old (low nonce)
+ forwards := list.Forward(pool.currentState.GetNonce(addr))
+ for _, tx := range forwards {
+ hash := tx.Hash()
+ pool.all.Remove(hash)
+ log.Trace("Removed old queued transaction", "hash", hash)
+ }
+ // Drop all transactions that are too costly (low balance or out of gas)
+ drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
+ for _, tx := range drops {
+ hash := tx.Hash()
+ pool.all.Remove(hash)
+ log.Trace("Removed unpayable queued transaction", "hash", hash)
+ }
+ queuedNofundsMeter.Mark(int64(len(drops)))
+
+ // Gather all executable transactions and promote them
+ readies := list.Ready(pool.pendingNonces.get(addr))
+ for _, tx := range readies {
+ hash := tx.Hash()
+ if pool.promoteTx(addr, hash, tx) {
+ log.Trace("Promoting queued transaction", "hash", hash)
+ promoted = append(promoted, tx)
+ }
+ }
+ queuedCounter.Dec(int64(len(readies)))
+
+ // Drop all transactions over the allowed limit
+ var caps types.Transactions
+ if !pool.locals.contains(addr) {
+ caps = list.Cap(int(pool.config.AccountQueue))
+ for _, tx := range caps {
+ hash := tx.Hash()
+ pool.all.Remove(hash)
+ log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
+ }
+ queuedRateLimitMeter.Mark(int64(len(caps)))
+ }
+ // Mark all the items dropped as removed
+ pool.priced.Removed(len(forwards) + len(drops) + len(caps))
+ queuedCounter.Dec(int64(len(forwards) + len(drops) + len(caps)))
+ if pool.locals.contains(addr) {
+ localCounter.Dec(int64(len(forwards) + len(drops) + len(caps)))
+ }
+ // Delete the entire queue entry if it became empty.
+ if list.Empty() {
+ delete(pool.queue, addr)
+ }
+ }
+ return promoted
+}
+
+// truncatePending removes transactions from the pending queue if the pool is above the
+// pending limit. The algorithm tries to reduce transaction counts by an approximately
+// equal number for all for accounts with many pending transactions.
+func (pool *TxPool) truncatePending() {
+ pending := uint64(0)
+ for _, list := range pool.pending {
+ pending += uint64(list.Len())
+ }
+ if pending <= pool.config.GlobalSlots {
+ return
+ }
+
+ pendingBeforeCap := pending
+ // Assemble a spam order to penalize large transactors first
+ spammers := prque.New(nil)
+ for addr, list := range pool.pending {
+ // Only evict transactions from high rollers
+ if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
+ spammers.Push(addr, int64(list.Len()))
+ }
+ }
+ // Gradually drop transactions from offenders
+ offenders := []common.Address{}
+ for pending > pool.config.GlobalSlots && !spammers.Empty() {
+ // Retrieve the next offender if not local address
+ offender, _ := spammers.Pop()
+ offenders = append(offenders, offender.(common.Address))
+
+ // Equalize balances until all the same or below threshold
+ if len(offenders) > 1 {
+ // Calculate the equalization threshold for all current offenders
+ threshold := pool.pending[offender.(common.Address)].Len()
+
+ // Iteratively reduce all offenders until below limit or threshold reached
+ for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
+ for i := 0; i < len(offenders)-1; i++ {
+ list := pool.pending[offenders[i]]
+
+ caps := list.Cap(list.Len() - 1)
+ for _, tx := range caps {
+ // Drop the transaction from the global pools too
+ hash := tx.Hash()
+ pool.all.Remove(hash)
+
+ // Update the account nonce to the dropped transaction
+ pool.pendingNonces.setIfLower(offenders[i], tx.Nonce())
+ log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
+ }
+ pool.priced.Removed(len(caps))
+ pendingCounter.Dec(int64(len(caps)))
+ if pool.locals.contains(offenders[i]) {
+ localCounter.Dec(int64(len(caps)))
+ }
+ pending--
+ }
+ }
+ }
+ }
+
+ // If still above threshold, reduce to limit or min allowance
+ if pending > pool.config.GlobalSlots && len(offenders) > 0 {
+ for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
+ for _, addr := range offenders {
+ list := pool.pending[addr]
+
+ caps := list.Cap(list.Len() - 1)
+ for _, tx := range caps {
+ // Drop the transaction from the global pools too
+ hash := tx.Hash()
+ pool.all.Remove(hash)
+
+ // Update the account nonce to the dropped transaction
+ pool.pendingNonces.setIfLower(addr, tx.Nonce())
+ log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
+ }
+ pool.priced.Removed(len(caps))
+ pendingCounter.Dec(int64(len(caps)))
+ if pool.locals.contains(addr) {
+ localCounter.Dec(int64(len(caps)))
+ }
+ pending--
+ }
+ }
+ }
+ pendingRateLimitMeter.Mark(int64(pendingBeforeCap - pending))
+}
+
+// truncateQueue drops the oldes transactions in the queue if the pool is above the global queue limit.
+func (pool *TxPool) truncateQueue() {
+ queued := uint64(0)
+ for _, list := range pool.queue {
+ queued += uint64(list.Len())
+ }
+ if queued <= pool.config.GlobalQueue {
+ return
+ }
+
+ // Sort all accounts with queued transactions by heartbeat
+ addresses := make(addressesByHeartbeat, 0, len(pool.queue))
+ for addr := range pool.queue {
+ if !pool.locals.contains(addr) { // don't drop locals
+ addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
+ }
+ }
+ sort.Sort(addresses)
+
+ // Drop transactions until the total is below the limit or only locals remain
+ for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
+ addr := addresses[len(addresses)-1]
+ list := pool.queue[addr.address]
+
+ addresses = addresses[:len(addresses)-1]
+
+ // Drop all transactions if they are less than the overflow
+ if size := uint64(list.Len()); size <= drop {
+ for _, tx := range list.Flatten() {
+ pool.removeTx(tx.Hash(), true)
+ }
+ drop -= size
+ queuedRateLimitMeter.Mark(int64(size))
+ continue
+ }
+ // Otherwise drop only last few transactions
+ txs := list.Flatten()
+ for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
+ pool.removeTx(txs[i].Hash(), true)
+ drop--
+ queuedRateLimitMeter.Mark(1)
+ }
+ }
+}
+
+// demoteUnexecutables removes invalid and processed transactions from the pools
+// executable/pending queue and any subsequent transactions that become unexecutable
+// are moved back into the future queue.
+func (pool *TxPool) demoteUnexecutables() {
+ // Iterate over all accounts and demote any non-executable transactions
+ for addr, list := range pool.pending {
+ nonce := pool.currentState.GetNonce(addr)
+
+ // Drop all transactions that are deemed too old (low nonce)
+ olds := list.Forward(nonce)
+ for _, tx := range olds {
+ hash := tx.Hash()
+ pool.all.Remove(hash)
+ log.Trace("Removed old pending transaction", "hash", hash)
+ }
+ // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
+ drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
+ for _, tx := range drops {
+ hash := tx.Hash()
+ log.Trace("Removed unpayable pending transaction", "hash", hash)
+ pool.all.Remove(hash)
+ }
+ pool.priced.Removed(len(olds) + len(drops))
+ pendingNofundsMeter.Mark(int64(len(drops)))
+
+ for _, tx := range invalids {
+ hash := tx.Hash()
+ log.Trace("Demoting pending transaction", "hash", hash)
+ pool.enqueueTx(hash, tx)
+ }
+ pendingCounter.Dec(int64(len(olds) + len(drops) + len(invalids)))
+ if pool.locals.contains(addr) {
+ localCounter.Dec(int64(len(olds) + len(drops) + len(invalids)))
+ }
+ // If there's a gap in front, alert (should never happen) and postpone all transactions
+ if list.Len() > 0 && list.txs.Get(nonce) == nil {
+ gapped := list.Cap(0)
+ for _, tx := range gapped {
+ hash := tx.Hash()
+ log.Error("Demoting invalidated transaction", "hash", hash)
+ pool.enqueueTx(hash, tx)
+ }
+ pendingCounter.Dec(int64(len(gapped)))
+ }
+ // Delete the entire queue entry if it became empty.
+ if list.Empty() {
+ delete(pool.pending, addr)
+ delete(pool.beats, addr)
+ }
+ }
+}
+
+// addressByHeartbeat is an account address tagged with its last activity timestamp.
+type addressByHeartbeat struct {
+ address common.Address
+ heartbeat time.Time
+}
+
+type addressesByHeartbeat []addressByHeartbeat
+
+func (a addressesByHeartbeat) Len() int { return len(a) }
+func (a addressesByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) }
+func (a addressesByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
+
+// accountSet is simply a set of addresses to check for existence, and a signer
+// capable of deriving addresses from transactions.
+type accountSet struct {
+ accounts map[common.Address]struct{}
+ signer types.Signer
+ cache *[]common.Address
+}
+
+// newAccountSet creates a new address set with an associated signer for sender
+// derivations.
+func newAccountSet(signer types.Signer, addrs ...common.Address) *accountSet {
+ as := &accountSet{
+ accounts: make(map[common.Address]struct{}),
+ signer: signer,
+ }
+ for _, addr := range addrs {
+ as.add(addr)
+ }
+ return as
+}
+
+// contains checks if a given address is contained within the set.
+func (as *accountSet) contains(addr common.Address) bool {
+ _, exist := as.accounts[addr]
+ return exist
+}
+
+// containsTx checks if the sender of a given tx is within the set. If the sender
+// cannot be derived, this method returns false.
+func (as *accountSet) containsTx(tx *types.Transaction) bool {
+ if addr, err := types.Sender(as.signer, tx); err == nil {
+ return as.contains(addr)
+ }
+ return false
+}
+
+// add inserts a new address into the set to track.
+func (as *accountSet) add(addr common.Address) {
+ as.accounts[addr] = struct{}{}
+ as.cache = nil
+}
+
+// addTx adds the sender of tx into the set.
+func (as *accountSet) addTx(tx *types.Transaction) {
+ if addr, err := types.Sender(as.signer, tx); err == nil {
+ as.add(addr)
+ }
+}
+
+// flatten returns the list of addresses within this set, also caching it for later
+// reuse. The returned slice should not be changed!
+func (as *accountSet) flatten() []common.Address {
+ if as.cache == nil {
+ accounts := make([]common.Address, 0, len(as.accounts))
+ for account := range as.accounts {
+ accounts = append(accounts, account)
+ }
+ as.cache = &accounts
+ }
+ return *as.cache
+}
+
+// merge adds all addresses from the 'other' set into 'as'.
+func (as *accountSet) merge(other *accountSet) {
+ for addr := range other.accounts {
+ as.accounts[addr] = struct{}{}
+ }
+ as.cache = nil
+}
+
+// txLookup is used internally by TxPool to track transactions while allowing lookup without
+// mutex contention.
+//
+// Note, although this type is properly protected against concurrent access, it
+// is **not** a type that should ever be mutated or even exposed outside of the
+// transaction pool, since its internal state is tightly coupled with the pools
+// internal mechanisms. The sole purpose of the type is to permit out-of-bound
+// peeking into the pool in TxPool.Get without having to acquire the widely scoped
+// TxPool.mu mutex.
+type txLookup struct {
+ all map[common.Hash]*types.Transaction
+ lock sync.RWMutex
+}
+
+// newTxLookup returns a new txLookup structure.
+func newTxLookup() *txLookup {
+ return &txLookup{
+ all: make(map[common.Hash]*types.Transaction),
+ }
+}
+
+// Range calls f on each key and value present in the map.
+func (t *txLookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) {
+ t.lock.RLock()
+ defer t.lock.RUnlock()
+
+ for key, value := range t.all {
+ if !f(key, value) {
+ break
+ }
+ }
+}
+
+// Get returns a transaction if it exists in the lookup, or nil if not found.
+func (t *txLookup) Get(hash common.Hash) *types.Transaction {
+ t.lock.RLock()
+ defer t.lock.RUnlock()
+
+ return t.all[hash]
+}
+
+// Count returns the current number of items in the lookup.
+func (t *txLookup) Count() int {
+ t.lock.RLock()
+ defer t.lock.RUnlock()
+
+ return len(t.all)
+}
+
+// Add adds a transaction to the lookup.
+func (t *txLookup) Add(tx *types.Transaction) {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ t.all[tx.Hash()] = tx
+}
+
+// Remove removes a transaction from the lookup.
+func (t *txLookup) Remove(hash common.Hash) {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ delete(t.all, hash)
+}
diff --git a/core/types.go b/core/types.go
new file mode 100644
index 0000000..fc67e71
--- /dev/null
+++ b/core/types.go
@@ -0,0 +1,51 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "github.com/ava-labs/go-ethereum/core/state"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/core/vm"
+)
+
+// Validator is an interface which defines the standard for block validation. It
+// is only responsible for validating block contents, as the header validation is
+// done by the specific consensus engines.
+type Validator interface {
+ // ValidateBody validates the given block's content.
+ ValidateBody(block *types.Block) error
+
+ // ValidateState validates the given statedb and optionally the receipts and
+ // gas used.
+ ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error
+}
+
+// Prefetcher is an interface for pre-caching transaction signatures and state.
+type Prefetcher interface {
+ // Prefetch processes the state changes according to the Ethereum rules by running
+ // the transaction messages using the statedb, but any changes are discarded. The
+ // only goal is to pre-cache transaction signatures and state trie nodes.
+ Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32)
+}
+
+// Processor is an interface for processing blocks using a given initial state.
+type Processor interface {
+ // Process processes the state changes according to the Ethereum rules by running
+ // the transaction messages using the statedb and applying any rewards to both
+ // the processor (coinbase) and any included uncles.
+ Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error)
+}