diff options
author | Determinant <[email protected]> | 2020-09-15 23:55:34 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2020-09-15 23:55:34 -0400 |
commit | 78745551c077bf54151202138c2629f288769561 (patch) | |
tree | 2b628e99fd110617089778fa91235ecd2888f4ef /core/blockchain.go | |
parent | 7d1388c743b4ec8f4a86bea95bfada785dee83f7 (diff) |
WIP: geth-tavum
Diffstat (limited to 'core/blockchain.go')
-rw-r--r-- | core/blockchain.go | 906 |
1 files changed, 586 insertions, 320 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index 6e316c7..342790e 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -23,6 +23,7 @@ import ( "io" "math/big" mrand "math/rand" + "sort" "sync" "sync/atomic" "time" @@ -33,16 +34,17 @@ import ( "github.com/ava-labs/coreth/core/types" "github.com/ava-labs/coreth/core/vm" "github.com/ava-labs/coreth/params" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/common/mclock" - "github.com/ava-labs/go-ethereum/common/prque" - "github.com/ava-labs/go-ethereum/ethdb" - "github.com/ava-labs/go-ethereum/event" - "github.com/ava-labs/go-ethereum/log" - "github.com/ava-labs/go-ethereum/metrics" - "github.com/ava-labs/go-ethereum/rlp" - "github.com/ava-labs/go-ethereum/trie" - "github.com/hashicorp/golang-lru" + "github.com/ava-labs/coreth/state/snapshot" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/common/prque" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" + lru "github.com/hashicorp/golang-lru" ) var ( @@ -60,12 +62,19 @@ var ( storageUpdateTimer = metrics.NewRegisteredTimer("chain/storage/updates", nil) storageCommitTimer = metrics.NewRegisteredTimer("chain/storage/commits", nil) + snapshotAccountReadTimer = metrics.NewRegisteredTimer("chain/snapshot/account/reads", nil) + snapshotStorageReadTimer = metrics.NewRegisteredTimer("chain/snapshot/storage/reads", nil) + snapshotCommitTimer = metrics.NewRegisteredTimer("chain/snapshot/commits", nil) + blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil) blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil) blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil) blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil) - blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil) - blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil) + + blockReorgMeter = metrics.NewRegisteredMeter("chain/reorg/executes", nil) + blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil) + blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil) + blockReorgInvalidatedTx = metrics.NewRegisteredMeter("chain/reorg/invalidTx", nil) blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil) blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil) @@ -103,17 +112,35 @@ const ( // - Version 7 // The following incompatible database changes were added: // * Use freezer as the ancient database to maintain all ancient data - BlockChainVersion uint64 = 7 + // - Version 8 + // The following incompatible database changes were added: + // * New scheme for contract code in order to separate the codes and trie nodes + BlockChainVersion uint64 = 8 ) // CacheConfig contains the configuration values for the trie caching/pruning // that's resident in a blockchain. type CacheConfig struct { TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory + TrieCleanJournal string // Disk journal for saving clean cache entries. + TrieCleanRejournal time.Duration // Time interval to dump clean cache to disk periodically TrieCleanNoPrefetch bool // Whether to disable heuristic state prefetching for followup blocks TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node) TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk + SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory + + SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it +} + +// defaultCacheConfig are the default caching values if none are specified by the +// user (also used during testing). +var defaultCacheConfig = &CacheConfig{ + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TrieTimeLimit: 5 * time.Minute, + SnapshotLimit: 256, + SnapshotWait: true, } // BlockChain represents the canonical chain given a database with a genesis @@ -135,9 +162,17 @@ type BlockChain struct { cacheConfig *CacheConfig // Cache configuration for pruning db ethdb.Database // Low level persistent database to store final content in + snaps *snapshot.Tree // Snapshot tree for fast trie leaf access triegc *prque.Prque // Priority queue mapping block numbers to tries to gc gcproc time.Duration // Accumulates canonical block processing for trie dumping + // txLookupLimit is the maximum number of blocks from head whose tx indices + // are reserved: + // * 0: means no limit and regenerate any missing indexes + // * N: means N block limit [HEAD-N+1, HEAD] and delete extra indexes + // * nil: disable tx reindexer/deleter, but still index new blocks + txLookupLimit uint64 + hc *HeaderChain rmLogsFeed event.Feed chainFeed event.Feed @@ -161,11 +196,10 @@ type BlockChain struct { txLookupCache *lru.Cache // Cache for the most recent transaction lookup data. futureBlocks *lru.Cache // future blocks are blocks added for later processing - quit chan struct{} // blockchain quit channel - running int32 // running must be called atomically - // procInterrupt must be atomically called - procInterrupt int32 // interrupt signaler for block processing + quit chan struct{} // blockchain quit channel wg sync.WaitGroup // chain processing wait group for shutting down + running int32 // 0 if chain is running, 1 when stopped + procInterrupt int32 // interrupt signaler for block processing engine consensus.Engine validator Validator // Block and state validator interface @@ -182,13 +216,9 @@ type BlockChain struct { // NewBlockChain returns a fully initialised block chain using information // available in the database. It initialises the default Ethereum Validator and // Processor. -func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool, manualCanonical bool) (*BlockChain, error) { +func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool, txLookupLimit *uint64, manualCanonical bool) (*BlockChain, error) { if cacheConfig == nil { - cacheConfig = &CacheConfig{ - TrieCleanLimit: 256, - TrieDirtyLimit: 256, - TrieTimeLimit: 5 * time.Minute, - } + cacheConfig = defaultCacheConfig } bodyCache, _ := lru.New(bodyCacheLimit) bodyRLPCache, _ := lru.New(bodyCacheLimit) @@ -203,7 +233,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par cacheConfig: cacheConfig, db: db, triegc: prque.New(nil), - stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit), + stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit, cacheConfig.TrieCleanJournal), quit: make(chan struct{}), shouldPreserve: shouldPreserve, bodyCache: bodyCache, @@ -222,7 +252,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par bc.processor = NewStateProcessor(chainConfig, bc, engine) var err error - bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt) + bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped) if err != nil { return nil, err } @@ -230,18 +260,35 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par if bc.genesisBlock == nil { return nil, ErrNoGenesis } + + var nilBlock *types.Block + bc.currentBlock.Store(nilBlock) + bc.currentFastBlock.Store(nilBlock) + // Initialize the chain with ancient data if it isn't empty. + var txIndexBlock uint64 + if bc.empty() { rawdb.InitDatabaseFromFreezer(bc.db) + // If ancient database is not empty, reconstruct all missing + // indices in the background. + frozen, _ := bc.db.Ancients() + if frozen > 0 { + txIndexBlock = frozen + } } if err := bc.loadLastState(); err != nil { return nil, err } - // The first thing the node will do is reconstruct the verification data for - // the head block (ethash cache or clique voting snapshot). Might as well do - // it in advance. - bc.engine.VerifyHeader(bc, bc.CurrentHeader(), true) - + // Make sure the state associated with the block is available + head := bc.CurrentBlock() + if _, err := state.New(head.Root(), bc.stateCache, bc.snaps); err != nil { + log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash()) + if err := bc.SetHead(head.NumberU64()); err != nil { + return nil, err + } + } + // Ensure that a previous crash in SetHead doesn't leave extra ancients if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 { var ( needRewind bool @@ -251,7 +298,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par // blockchain repair. If the head full block is even lower than the ancient // chain, truncate the ancient store. fullBlock := bc.CurrentBlock() - if fullBlock != nil && fullBlock != bc.genesisBlock && fullBlock.NumberU64() < frozen-1 { + if fullBlock != nil && fullBlock.Hash() != bc.genesisBlock.Hash() && fullBlock.NumberU64() < frozen-1 { needRewind = true low = fullBlock.NumberU64() } @@ -266,15 +313,17 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } } if needRewind { - var hashes []common.Hash - previous := bc.CurrentHeader().Number.Uint64() - for i := low + 1; i <= bc.CurrentHeader().Number.Uint64(); i++ { - hashes = append(hashes, rawdb.ReadCanonicalHash(bc.db, i)) + log.Error("Truncating ancient chain", "from", bc.CurrentHeader().Number.Uint64(), "to", low) + if err := bc.SetHead(low); err != nil { + return nil, err } - bc.Rollback(hashes) - log.Warn("Truncate ancient chain", "from", previous, "to", low) } } + // The first thing the node will do is reconstruct the verification data for + // the head block (ethash cache or clique voting snapshot). Might as well do + // it in advance. + bc.engine.VerifyHeader(bc, bc.CurrentHeader(), true) + // Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain for hash := range BadHashes { if header := bc.GetHeaderByHash(hash); header != nil { @@ -283,20 +332,39 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par // make sure the headerByNumber (if present) is in our current canonical chain if headerByNumber != nil && headerByNumber.Hash() == header.Hash() { log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash) - bc.SetHead(header.Number.Uint64() - 1) + if err := bc.SetHead(header.Number.Uint64() - 1); err != nil { + return nil, err + } log.Error("Chain rewind was successful, resuming normal operation") } } } + // Load any existing snapshot, regenerating it if loading failed + if bc.cacheConfig.SnapshotLimit > 0 { + bc.snaps = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, bc.CurrentBlock().Root(), !bc.cacheConfig.SnapshotWait) + } // Take ownership of this particular state go bc.update() + if txLookupLimit != nil { + bc.txLookupLimit = *txLookupLimit + go bc.maintainTxIndex(txIndexBlock) + } + // If periodic cache journal is required, spin it up. + if bc.cacheConfig.TrieCleanRejournal > 0 { + if bc.cacheConfig.TrieCleanRejournal < time.Minute { + log.Warn("Sanitizing invalid trie cache journal time", "provided", bc.cacheConfig.TrieCleanRejournal, "updated", time.Minute) + bc.cacheConfig.TrieCleanRejournal = time.Minute + } + triedb := bc.stateCache.TrieDB() + bc.wg.Add(1) + go func() { + defer bc.wg.Done() + triedb.SaveCachePeriodically(bc.cacheConfig.TrieCleanJournal, bc.cacheConfig.TrieCleanRejournal, bc.quit) + }() + } return bc, nil } -func (bc *BlockChain) getProcInterrupt() bool { - return atomic.LoadInt32(&bc.procInterrupt) == 1 -} - // GetVMConfig returns the block chain VM config. func (bc *BlockChain) GetVMConfig() *vm.Config { return &bc.vmConfig @@ -333,15 +401,6 @@ func (bc *BlockChain) loadLastState() error { log.Warn("Head block missing, resetting chain", "hash", head) return bc.Reset() } - // Make sure the state associated with the block is available - if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil { - // Dangling block without a state associated, init from scratch - log.Warn("Head state missing, repairing chain", "number", currentBlock.Number(), "hash", currentBlock.Hash()) - if err := bc.repair(¤tBlock); err != nil { - return err - } - rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash()) - } // Everything seems to be fine, set as the head block bc.currentBlock.Store(currentBlock) headBlockGauge.Update(int64(currentBlock.NumberU64())) @@ -375,37 +434,59 @@ func (bc *BlockChain) loadLastState() error { log.Info("Loaded most recent local header", "number", currentHeader.Number, "hash", currentHeader.Hash(), "td", headerTd, "age", common.PrettyAge(time.Unix(int64(currentHeader.Time), 0))) log.Info("Loaded most recent local full block", "number", currentBlock.Number(), "hash", currentBlock.Hash(), "td", blockTd, "age", common.PrettyAge(time.Unix(int64(currentBlock.Time()), 0))) log.Info("Loaded most recent local fast block", "number", currentFastBlock.Number(), "hash", currentFastBlock.Hash(), "td", fastTd, "age", common.PrettyAge(time.Unix(int64(currentFastBlock.Time()), 0))) - + if pivot := rawdb.ReadLastPivotNumber(bc.db); pivot != nil { + log.Info("Loaded last fast-sync pivot marker", "number", *pivot) + } return nil } -// SetHead rewinds the local chain to a new head. In the case of headers, everything -// above the new head will be deleted and the new one set. In the case of blocks -// though, the head may be further rewound if block bodies are missing (non-archive -// nodes after a fast sync). +// SetHead rewinds the local chain to a new head. Depending on whether the node +// was fast synced or full synced and in which state, the method will try to +// delete minimal data from disk whilst retaining chain consistency. func (bc *BlockChain) SetHead(head uint64) error { - log.Warn("Rewinding blockchain", "target", head) - bc.chainmu.Lock() defer bc.chainmu.Unlock() - updateFn := func(db ethdb.KeyValueWriter, header *types.Header) { - // Rewind the block chain, ensuring we don't end up with a stateless head block - if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() < currentBlock.NumberU64() { + // Retrieve the last pivot block to short circuit rollbacks beyond it and the + // current freezer limit to start nuking id underflown + pivot := rawdb.ReadLastPivotNumber(bc.db) + frozen, _ := bc.db.Ancients() + + updateFn := func(db ethdb.KeyValueWriter, header *types.Header) (uint64, bool) { + // Rewind the block chain, ensuring we don't end up with a stateless head + // block. Note, depth equality is permitted to allow using SetHead as a + // chain reparation mechanism without deleting any data! + if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() <= currentBlock.NumberU64() { newHeadBlock := bc.GetBlock(header.Hash(), header.Number.Uint64()) if newHeadBlock == nil { + log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash()) newHeadBlock = bc.genesisBlock } else { - if _, err := state.New(newHeadBlock.Root(), bc.stateCache); err != nil { - // Rewound state missing, rolled back to before pivot, reset to genesis - newHeadBlock = bc.genesisBlock + // Block exists, keep rewinding until we find one with state + for { + if _, err := state.New(newHeadBlock.Root(), bc.stateCache, bc.snaps); err != nil { + log.Trace("Block state missing, rewinding further", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash()) + if pivot == nil || newHeadBlock.NumberU64() > *pivot { + newHeadBlock = bc.GetBlock(newHeadBlock.ParentHash(), newHeadBlock.NumberU64()-1) + continue + } else { + log.Trace("Rewind passed pivot, aiming genesis", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "pivot", *pivot) + newHeadBlock = bc.genesisBlock + } + } + log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash()) + break } } rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash()) + + // Degrade the chain markers if they are explicitly reverted. + // In theory we should update all in-memory markers in the + // last step, however the direction of SetHead is from high + // to low, so it's safe the update in-memory markers directly. bc.currentBlock.Store(newHeadBlock) headBlockGauge.Update(int64(newHeadBlock.NumberU64())) } - // Rewind the fast block in a simpleton way to the target head if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && header.Number.Uint64() < currentFastBlock.NumberU64() { newHeadFastBlock := bc.GetBlock(header.Hash(), header.Number.Uint64()) @@ -414,11 +495,25 @@ func (bc *BlockChain) SetHead(head uint64) error { newHeadFastBlock = bc.genesisBlock } rawdb.WriteHeadFastBlockHash(db, newHeadFastBlock.Hash()) + + // Degrade the chain markers if they are explicitly reverted. + // In theory we should update all in-memory markers in the + // last step, however the direction of SetHead is from high + // to low, so it's safe the update in-memory markers directly. bc.currentFastBlock.Store(newHeadFastBlock) headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64())) } - } + head := bc.CurrentBlock().NumberU64() + // If setHead underflown the freezer threshold and the block processing + // intent afterwards is full block importing, delete the chain segment + // between the stateful-block and the sethead target. + var wipe bool + if head+1 < frozen { + wipe = pivot == nil || head >= *pivot + } + return head, wipe // Only force wipe if full synced + } // Rewind the header chain, deleting all block bodies until then delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) { // Ignore the error here since light client won't hit this path @@ -426,10 +521,9 @@ func (bc *BlockChain) SetHead(head uint64) error { if num+1 <= frozen { // Truncate all relative data(header, total difficulty, body, receipt // and canonical hash) from ancient store. - if err := bc.db.TruncateAncients(num + 1); err != nil { + if err := bc.db.TruncateAncients(num); err != nil { log.Crit("Failed to truncate ancient data", "number", num, "err", err) } - // Remove the hash <-> number mapping from the active store. rawdb.DeleteHeaderNumber(db, hash) } else { @@ -441,8 +535,18 @@ func (bc *BlockChain) SetHead(head uint64) error { } // Todo(rjl493456442) txlookup, bloombits, etc } - bc.hc.SetHead(head, updateFn, delFn) - + // If SetHead was only called as a chain reparation method, try to skip + // touching the header chain altogether, unless the freezer is broken + if block := bc.CurrentBlock(); block.NumberU64() == head { + if target, force := updateFn(bc.db, block.Header()); force { + bc.hc.SetHead(target, updateFn, delFn) + } + } else { + // Rewind the chain to the requested head and keep going backwards until a + // block with a state is found or fast sync pivot is passed + log.Warn("Rewinding blockchain", "target", head) + bc.hc.SetHead(head, updateFn, delFn) + } // Clear out any stale content from the caches bc.bodyCache.Purge() bc.bodyRLPCache.Purge() @@ -471,6 +575,10 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error { headBlockGauge.Update(int64(block.NumberU64())) bc.chainmu.Unlock() + // Destroy any existing state snapshot and regenerate it in the background + if bc.snaps != nil { + bc.snaps.Rebuild(block.Root()) + } log.Info("Committed new head block", "number", block.Number(), "hash", hash) return nil } @@ -486,6 +594,15 @@ func (bc *BlockChain) CurrentBlock() *types.Block { return bc.currentBlock.Load().(*types.Block) } +// Snapshot returns the blockchain snapshot tree. This method is mainly used for +// testing, to make it possible to verify the snapshot after execution. +// +// Warning: There are no guarantees about the safety of using the returned 'snap' if the +// blockchain is simultaneously importing blocks, so take care. +func (bc *BlockChain) Snapshot() *snapshot.Tree { + return bc.snaps +} + // CurrentFastBlock retrieves the current fast-sync head block of the canonical // chain. The block is retrieved from the blockchain's internal cache. func (bc *BlockChain) CurrentFastBlock() *types.Block { @@ -509,7 +626,7 @@ func (bc *BlockChain) State() (*state.StateDB, error) { // StateAt returns a new mutable state based on a particular point in time. func (bc *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) { - return state.New(root, bc.stateCache) + return state.New(root, bc.stateCache, bc.snaps) } // StateCache returns the caching database underpinning the blockchain instance. @@ -533,46 +650,25 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error { defer bc.chainmu.Unlock() // Prepare the genesis block and reinitialise the chain - if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil { - log.Crit("Failed to write genesis block TD", "err", err) + batch := bc.db.NewBatch() + rawdb.WriteTd(batch, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()) + rawdb.WriteBlock(batch, genesis) + if err := batch.Write(); err != nil { + log.Crit("Failed to write genesis block", "err", err) } - rawdb.WriteBlock(bc.db, genesis) + bc.writeHeadBlock(genesis) + // Last update all in-memory chain markers bc.genesisBlock = genesis - bc.insert(bc.genesisBlock) bc.currentBlock.Store(bc.genesisBlock) headBlockGauge.Update(int64(bc.genesisBlock.NumberU64())) - bc.hc.SetGenesis(bc.genesisBlock.Header()) bc.hc.SetCurrentHeader(bc.genesisBlock.Header()) bc.currentFastBlock.Store(bc.genesisBlock) headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64())) - return nil } -// repair tries to repair the current blockchain by rolling back the current block -// until one with associated state is found. This is needed to fix incomplete db -// writes caused either by crashes/power outages, or simply non-committed tries. -// -// This method only rolls back the current block. The current header and current -// fast block are left intact. -func (bc *BlockChain) repair(head **types.Block) error { - for { - // Abort if we've rewound to a head block that does have associated state - if _, err := state.New((*head).Root(), bc.stateCache); err == nil { - log.Info("Rewound blockchain to past state", "number", (*head).Number(), "hash", (*head).Hash()) - return nil - } - // Otherwise rewind one block and recheck state availability there - block := bc.GetBlock((*head).ParentHash(), (*head).NumberU64()-1) - if block == nil { - return fmt.Errorf("missing block %d [%x]", (*head).NumberU64()-1, (*head).ParentHash()) - } - *head = block - } -} - // Export writes the active chain to the given writer. func (bc *BlockChain) Export(w io.Writer) error { return bc.ExportN(w, uint64(0), bc.CurrentBlock().NumberU64()) @@ -605,31 +701,39 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { return nil } -// insert injects a new head block into the current block chain. This method +// writeHeadBlock injects a new head block into the current block chain. This method // assumes that the block is indeed a true head. It will also reset the head // header and the head fast sync block to this very same block if they are older // or if they are on a different side chain. // // Note, this function assumes that the `mu` mutex is held! -func (bc *BlockChain) insert(block *types.Block) { +func (bc *BlockChain) writeHeadBlock(block *types.Block) { // If the block is on a side chain or an unknown one, force other heads onto it too updateHeads := rawdb.ReadCanonicalHash(bc.db, block.NumberU64()) != block.Hash() // Add the block to the canonical chain number scheme and mark as the head - rawdb.WriteCanonicalHash(bc.db, block.Hash(), block.NumberU64()) - rawdb.WriteHeadBlockHash(bc.db, block.Hash()) - - bc.currentBlock.Store(block) - headBlockGauge.Update(int64(block.NumberU64())) + batch := bc.db.NewBatch() + rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64()) + rawdb.WriteTxLookupEntriesByBlock(batch, block) + rawdb.WriteHeadBlockHash(batch, block.Hash()) // If the block is better than our head or is on a different chain, force update heads if updateHeads { + rawdb.WriteHeadHeaderHash(batch, block.Hash()) + rawdb.WriteHeadFastBlockHash(batch, block.Hash()) + } + // Flush the whole batch into the disk, exit the node if failed + if err := batch.Write(); err != nil { + log.Crit("Failed to update chain indexes and markers", "err", err) + } + // Update all in-memory chain markers in the last step + if updateHeads { bc.hc.SetCurrentHeader(block.Header()) - rawdb.WriteHeadFastBlockHash(bc.db, block.Hash()) - bc.currentFastBlock.Store(block) headFastBlockGauge.Update(int64(block.NumberU64())) } + bc.currentBlock.Store(block) + headBlockGauge.Update(int64(block.NumberU64())) } // Genesis retrieves the chain's genesis block. @@ -800,12 +904,30 @@ func (bc *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types. return uncles } -// TrieNode retrieves a blob of data associated with a trie node (or code hash) +// TrieNode retrieves a blob of data associated with a trie node // either from ephemeral in-memory cache, or from persistent storage. func (bc *BlockChain) TrieNode(hash common.Hash) ([]byte, error) { return bc.stateCache.TrieDB().Node(hash) } +// ContractCode retrieves a blob of data associated with a contract hash +// either from ephemeral in-memory cache, or from persistent storage. +func (bc *BlockChain) ContractCode(hash common.Hash) ([]byte, error) { + return bc.stateCache.ContractCode(common.Hash{}, hash) +} + +// ContractCodeWithPrefix retrieves a blob of data associated with a contract +// hash either from ephemeral in-memory cache, or from persistent storage. +// +// If the code doesn't exist in the in-memory cache, check the storage with +// new code scheme. +func (bc *BlockChain) ContractCodeWithPrefix(hash common.Hash) ([]byte, error) { + type codeReader interface { + ContractCodeWithPrefix(addrHash, codeHash common.Hash) ([]byte, error) + } + return bc.stateCache.(codeReader).ContractCodeWithPrefix(common.Hash{}, hash) +} + // Stop stops the blockchain service. If any imports are currently in progress // it will abort them using the procInterrupt. func (bc *BlockChain) Stop() { @@ -815,10 +937,17 @@ func (bc *BlockChain) Stop() { // Unsubscribe all subscriptions registered from blockchain bc.scope.Close() close(bc.quit) - atomic.StoreInt32(&bc.procInterrupt, 1) - + bc.StopInsert() bc.wg.Wait() + // Ensure that the entirety of the state snapshot is journalled to disk. + var snapBase common.Hash + if bc.snaps != nil { + var err error + if snapBase, err = bc.snaps.Journal(bc.CurrentBlock().Root()); err != nil { + log.Error("Failed to journal state snapshot", "err", err) + } + } // Ensure the state of a recent block is also stored to disk before exiting. // We're writing three different states to catch different restart scenarios: // - HEAD: So we don't need to reprocess any blocks in the general case @@ -832,11 +961,17 @@ func (bc *BlockChain) Stop() { recent := bc.GetBlockByNumber(number - offset) log.Info("Writing cached state to disk", "block", recent.Number(), "hash", recent.Hash(), "root", recent.Root()) - if err := triedb.Commit(recent.Root(), true); err != nil { + if err := triedb.Commit(recent.Root(), true, nil); err != nil { log.Error("Failed to commit recent state trie", "err", err) } } } + if snapBase != (common.Hash{}) { + log.Info("Writing snapshot state to disk", "root", snapBase) + if err := triedb.Commit(snapBase, true, nil); err != nil { + log.Error("Failed to commit recent state trie", "err", err) + } + } for !bc.triegc.Empty() { triedb.Dereference(bc.triegc.PopItem().(common.Hash)) } @@ -844,7 +979,25 @@ func (bc *BlockChain) Stop() { log.Error("Dangling trie nodes after full cleanup") } } - log.Info("Blockchain manager stopped") + // Ensure all live cached entries be saved into disk, so that we can skip + // cache warmup when node restarts. + if bc.cacheConfig.TrieCleanJournal != "" { + triedb := bc.stateCache.TrieDB() + triedb.SaveCache(bc.cacheConfig.TrieCleanJournal) + } + log.Info("Blockchain stopped") +} + +// StopInsert interrupts all insertion methods, causing them to return +// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after +// calling this method. +func (bc *BlockChain) StopInsert() { + atomic.StoreInt32(&bc.procInterrupt, 1) +} + +// insertStopped returns true after StopInsert has been called. +func (bc *BlockChain) insertStopped() bool { + return atomic.LoadInt32(&bc.procInterrupt) == 1 } func (bc *BlockChain) procFutureBlocks() { @@ -855,8 +1008,9 @@ func (bc *BlockChain) procFutureBlocks() { } } if len(blocks) > 0 { - types.BlockBy(types.Number).Sort(blocks) - + sort.Slice(blocks, func(i, j int) bool { + return blocks[i].NumberU64() < blocks[j].NumberU64() + }) // Insert one by one as chain insertion needs contiguous ancestry between blocks for i := range blocks { bc.InsertChain(blocks[i : i+1]) @@ -873,42 +1027,6 @@ const ( SideStatTy ) -// Rollback is designed to remove a chain of links from the database that aren't -// certain enough to be valid. -func (bc *BlockChain) Rollback(chain []common.Hash) { - bc.chainmu.Lock() - defer bc.chainmu.Unlock() - - for i := len(chain) - 1; i >= 0; i-- { - hash := chain[i] - - currentHeader := bc.hc.CurrentHeader() - if currentHeader.Hash() == hash { - bc.hc.SetCurrentHeader(bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1)) - } - if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock.Hash() == hash { - newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1) - rawdb.WriteHeadFastBlockHash(bc.db, newFastBlock.Hash()) - bc.currentFastBlock.Store(newFastBlock) - headFastBlockGauge.Update(int64(newFastBlock.NumberU64())) - } - if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash { - newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1) - rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash()) - bc.currentBlock.Store(newBlock) - headBlockGauge.Update(int64(newBlock.NumberU64())) - } - } - // Truncate ancient data which exceeds the current header. - // - // Notably, it can happen that system crashes without truncating the ancient data - // but the head indicator has been updated in the active store. Regarding this issue, - // system will self recovery by truncating the extra data during the setup phase. - if err := bc.truncateAncient(bc.hc.CurrentHeader().Number.Uint64()); err != nil { - log.Crit("Truncate ancient store failed", "err", err) - } -} - // truncateAncient rewinds the blockchain to the specified header and deletes all // data in the ancient store that exceeds the specified header. func (bc *BlockChain) truncateAncient(head uint64) error { @@ -982,7 +1100,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ size = 0 ) // updateHead updates the head fast sync block if the inserted blocks are better - // and returns a indicator whether the inserted blocks are canonical. + // and returns an indicator whether the inserted blocks are canonical. updateHead := func(head *types.Block) bool { if bc.manualCanonical { return false @@ -1024,7 +1142,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ var deleted []*numberHash for i, block := range blockChain { // Short circuit insertion if shutting down or processing failed - if atomic.LoadInt32(&bc.procInterrupt) == 1 { + if bc.insertStopped() { return 0, errInsertionInterrupted } // Short circuit insertion if it is required(used in testing only) @@ -1064,7 +1182,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } // Don't collect too much in-memory, write it out every 100K blocks if len(deleted) > 100000 { - // Sync the ancient store explicitly to ensure all data has been flushed to disk. if err := bc.db.Sync(); err != nil { return 0, err @@ -1096,8 +1213,23 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } // Flush data into ancient database. size += rawdb.WriteAncientBlock(bc.db, block, receiptChain[i], bc.GetTd(block.Hash(), block.NumberU64())) - rawdb.WriteTxLookupEntries(batch, block) + // Write tx indices if any condition is satisfied: + // * If user requires to reserve all tx indices(txlookuplimit=0) + // * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit) + // * If block number is large enough to be regarded as a recent block + // It means blocks below the ancientLimit-txlookupLimit won't be indexed. + // + // But if the `TxIndexTail` is not nil, e.g. Geth is initialized with + // an external ancient database, during the setup, blockchain will start + // a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients) + // range. In this case, all tx indices of newly imported blocks should be + // generated. + if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit { + rawdb.WriteTxLookupEntriesByBlock(batch, block) + } else if rawdb.ReadTxIndexTail(bc.db) != nil { + rawdb.WriteTxLookupEntriesByBlock(batch, block) + } stats.processed++ } // Flush all tx-lookup index data. @@ -1154,26 +1286,37 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } // writeLive writes blockchain and corresponding receipt chain into active store. writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { + skipPresenceCheck := false batch := bc.db.NewBatch() for i, block := range blockChain { // Short circuit insertion if shutting down or processing failed - if atomic.LoadInt32(&bc.procInterrupt) == 1 { + if bc.insertStopped() { return 0, errInsertionInterrupted } // Short circuit if the owner header is unknown if !bc.HasHeader(block.Hash(), block.NumberU64()) { return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4]) } - if bc.HasBlock(block.Hash(), block.NumberU64()) { - stats.ignored++ - continue + if !skipPresenceCheck { + // Ignore if the entire data is already known + if bc.HasBlock(block.Hash(), block.NumberU64()) { + stats.ignored++ + continue + } else { + // If block N is not present, neither are the later blocks. + // This should be true, but if we are mistaken, the shortcut + // here will only cause overwriting of some existing data + skipPresenceCheck = true + } } // Write all the data out into the database rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()) rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i]) - rawdb.WriteTxLookupEntries(batch, block) + rawdb.WriteTxLookupEntriesByBlock(batch, block) // Always write tx indices for live blocks, we assume they are needed - stats.processed++ + // Write everything belongs to the blocks into the database. So that + // we can ensure all components of body is completed(body, receipts, + // tx indexes) if batch.ValueSize() >= ethdb.IdealBatchSize { if err := batch.Write(); err != nil { return 0, err @@ -1181,7 +1324,11 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ size += batch.ValueSize() batch.Reset() } + stats.processed++ } + // Write everything belongs to the blocks into the database. So that + // we can ensure all components of body is completed(body, receipts, + // tx indexes) if batch.ValueSize() > 0 { size += batch.ValueSize() if err := batch.Write(); err != nil { @@ -1191,7 +1338,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ updateHead(blockChain[len(blockChain)-1]) return 0, nil } - // Write downloaded chain data and corresponding receipt chain data. + // Write downloaded chain data and corresponding receipt chain data if len(ancientBlocks) > 0 { if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil { if err == errInsertionInterrupted { @@ -1200,6 +1347,19 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ return n, err } } + // Write the tx index tail (block number from where we index) before write any live blocks + if len(liveBlocks) > 0 && liveBlocks[0].NumberU64() == ancientLimit+1 { + // The tx index tail can only be one of the following two options: + // * 0: all ancient blocks have been indexed + // * ancient-limit: the indices of blocks before ancient-limit are ignored + if tail := rawdb.ReadTxIndexTail(bc.db); tail == nil { + if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit { + rawdb.WriteTxIndexTail(bc.db, 0) + } else { + rawdb.WriteTxIndexTail(bc.db, ancientLimit-bc.txLookupLimit) + } + } + } if len(liveBlocks) > 0 { if n, err := writeLive(liveBlocks, liveReceipts); err != nil { if err == errInsertionInterrupted { @@ -1223,6 +1383,18 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ return 0, nil } +// SetTxLookupLimit is responsible for updating the txlookup limit to the +// original one stored in db if the new mismatches with the old one. +func (bc *BlockChain) SetTxLookupLimit(limit uint64) { + bc.txLookupLimit = limit +} + +// TxLookupLimit retrieves the txlookup limit used by blockchain to prune +// stale transaction indices. +func (bc *BlockChain) TxLookupLimit() uint64 { + return bc.txLookupLimit +} + var lastWrite uint64 // writeBlockWithoutState writes only the block and its metadata to the database, @@ -1232,11 +1404,12 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e bc.wg.Add(1) defer bc.wg.Done() - if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), td); err != nil { - return err + batch := bc.db.NewBatch() + rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td) + rawdb.WriteBlock(batch, block) + if err := batch.Write(); err != nil { + log.Crit("Failed to write block into disk", "err", err) } - rawdb.WriteBlock(bc.db, block) - return nil } @@ -1252,25 +1425,21 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error { return err } } - // Write the positional metadata for transaction/receipt lookups. - // Preimages here is empty, ignore it. - rawdb.WriteTxLookupEntries(bc.db, block) - - bc.insert(block) + bc.writeHeadBlock(block) return nil } // WriteBlockWithState writes the block and all associated state to the database. -func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) { +func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { bc.chainmu.Lock() defer bc.chainmu.Unlock() - return bc.writeBlockWithState(block, receipts, state) + return bc.writeBlockWithState(block, receipts, logs, state, emitHeadEvent) } // writeBlockWithState writes the block and all associated state to the database, // but is expects the chain mutex to be held. -func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) { +func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { bc.wg.Add(1) defer bc.wg.Done() @@ -1284,12 +1453,19 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) externTd := new(big.Int).Add(block.Difficulty(), ptd) - // Irrelevant of the canonical status, write the block itself to the database - if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil { - return NonStatTy, err - } - rawdb.WriteBlock(bc.db, block) - + // Irrelevant of the canonical status, write the block itself to the database. + // + // Note all the components of block(td, hash->number map, header, body, receipts) + // should be written atomically. BlockBatch is used for containing all components. + blockBatch := bc.db.NewBatch() + rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd) + rawdb.WriteBlock(blockBatch, block) + rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts) + rawdb.WritePreimages(blockBatch, state.Preimages()) + if err := blockBatch.Write(); err != nil { + log.Crit("Failed to write block into disk", "err", err) + } + // Commit all cached state changes into underlying memory database. root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number())) if err != nil { return NonStatTy, err @@ -1298,7 +1474,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // If we're running an archive node, always flush if bc.cacheConfig.TrieDirtyDisabled { - if err := triedb.Commit(root, false); err != nil { + if err := triedb.Commit(root, false, nil); err != nil { return NonStatTy, err } } else { @@ -1332,7 +1508,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/TriesInMemory) } // Flush an entire trie and restart the counters - triedb.Commit(header.Root, true) + triedb.Commit(header.Root, true, nil) lastWrite = chosen bc.gcproc = 0 } @@ -1348,11 +1524,6 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. } } } - - // Write other block data using a batch. - batch := bc.db.NewBatch() - rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts) - // If the total difficulty is higher than our known, add it to the canonical chain // Second clause in the if statement reduces the vulnerability to selfish mining. // Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf @@ -1378,23 +1549,32 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. return NonStatTy, err } } - // Write the positional metadata for transaction/receipt lookups and preimages - rawdb.WriteTxLookupEntries(batch, block) - rawdb.WritePreimages(batch, state.Preimages()) - status = CanonStatTy } else { status = SideStatTy } - if err := batch.Write(); err != nil { - return NonStatTy, err - } - // Set new head. if status == CanonStatTy { - bc.insert(block) + bc.writeHeadBlock(block) } bc.futureBlocks.Remove(block.Hash()) + + if status == CanonStatTy { + bc.chainFeed.Send(ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) + if len(logs) > 0 { + bc.logsFeed.Send(logs) + } + // In theory we should fire a ChainHeadEvent when we inject + // a canonical block, but sometimes we can insert a batch of + // canonicial blocks. Avoid firing too much ChainHeadEvents, + // we will fire an accumulated ChainHeadEvent and disable fire + // event here. + if emitHeadEvent { + bc.chainHeadFeed.Send(ChainHeadEvent{Block: block}) + } + } else { + bc.chainSideFeed.Send(ChainSideEvent{Block: block}) + } return status, nil } @@ -1445,11 +1625,10 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { // Pre-checks passed, start the full block imports bc.wg.Add(1) bc.chainmu.Lock() - n, events, logs, err := bc.insertChain(chain, true) + n, err := bc.insertChain(chain, true) bc.chainmu.Unlock() bc.wg.Done() - bc.PostChainEvents(events, logs) return n, err } @@ -1461,23 +1640,24 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { // racey behaviour. If a sidechain import is in progress, and the historic state // is imported, but then new canon-head is added before the actual sidechain // completes, then the historic state could be pruned again -func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) { +func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, error) { // If the chain is terminating, don't even bother starting up if atomic.LoadInt32(&bc.procInterrupt) == 1 { - return 0, nil, nil, nil + return 0, nil } // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) - // A queued approach to delivering events. This is generally - // faster than direct delivery and requires much less mutex - // acquiring. var ( - stats = insertStats{startTime: mclock.Now()} - events = make([]interface{}, 0, len(chain)) - lastCanon *types.Block - coalescedLogs []*types.Log + stats = insertStats{startTime: mclock.Now()} + lastCanon *types.Block ) + // Fire a single chain head event if we've progressed the chain + defer func() { + if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { + bc.chainHeadFeed.Send(ChainHeadEvent{lastCanon}) + } + }() // Start the parallel header verifier headers := make([]*types.Header, len(chain)) seals := make([]bool, len(chain)) @@ -1527,7 +1707,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] for block != nil && err == ErrKnownBlock { log.Debug("Writing previously known block", "number", block.Number(), "hash", block.Hash()) if err := bc.writeKnownBlock(block); err != nil { - return it.index, nil, nil, err + return it.index, err } lastCanon = block @@ -1537,16 +1717,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] } switch { // First block is pruned, insert as sidechain and reorg only if TD grows enough - case err == consensus.ErrPrunedAncestor: + case errors.Is(err, consensus.ErrPrunedAncestor): log.Debug("Pruned ancestor, inserting as sidechain", "number", block.Number(), "hash", block.Hash()) return bc.insertSideChain(block, it) // First block is future, shove it (and all children) to the future queue (unknown ancestor) - case err == consensus.ErrFutureBlock || (err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(it.first().ParentHash())): - for block != nil && (it.index == 0 || err == consensus.ErrUnknownAncestor) { + case errors.Is(err, consensus.ErrFutureBlock) || (errors.Is(err, consensus.ErrUnknownAncestor) && bc.futureBlocks.Contains(it.first().ParentHash())): + for block != nil && (it.index == 0 || errors.Is(err, consensus.ErrUnknownAncestor)) { log.Debug("Future block, postponing import", "number", block.Number(), "hash", block.Hash()) if err := bc.addFutureBlock(block); err != nil { - return it.index, events, coalescedLogs, err + return it.index, err } block, err = it.next() } @@ -1554,25 +1734,26 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] stats.ignored += it.remaining() // If there are any still remaining, mark as ignored - return it.index, events, coalescedLogs, err + return it.index, err // Some other error occurred, abort case err != nil: + bc.futureBlocks.Remove(block.Hash()) stats.ignored += len(it.chain) bc.reportBlock(block, nil, err) - return it.index, events, coalescedLogs, err + return it.index, err } // No validation errors for the first block (or chain prefix skipped) for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() { // If the chain is terminating, stop processing blocks - if atomic.LoadInt32(&bc.procInterrupt) == 1 { - log.Debug("Premature abort during blocks processing") + if bc.insertStopped() { + log.Debug("Abort during block processing") break } // If the header is a banned one, straight out abort if BadHashes[block.Hash()] { bc.reportBlock(block, nil, ErrBlacklistedHash) - return it.index, events, coalescedLogs, ErrBlacklistedHash + return it.index, ErrBlacklistedHash } // If the block is known (in the middle of the chain), it's a special case for // Clique blocks where they can share state among each other, so importing an @@ -1588,16 +1769,28 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] "uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(), "root", block.Root()) + // Special case. Commit the empty receipt slice if we meet the known + // block in the middle. It can only happen in the clique chain. Whenever + // we insert blocks via `insertSideChain`, we only commit `td`, `header` + // and `body` if it's non-existent. Since we don't have receipts without + // reexecution, so nothing to commit. But if the sidechain will be adpoted + // as the canonical chain eventually, it needs to be reexecuted for missing + // state, but if it's this special case here(skip reexecution) we will lose + // the empty receipt entry. + if len(block.Transactions()) == 0 { + rawdb.WriteReceipts(bc.db, block.Hash(), block.NumberU64(), nil) + } else { + log.Error("Please file an issue, skip known block execution without receipt", + "hash", block.Hash(), "number", block.NumberU64()) + } if err := bc.writeKnownBlock(block); err != nil { - return it.index, nil, nil, err + return it.index, err } stats.processed++ // We can assume that logs are empty here, since the only way for consecutive // Clique blocks to have the same state is if there are no transactions. - events = append(events, ChainEvent{block, block.Hash(), nil}) lastCanon = block - continue } // Retrieve the parent block and it's state to execute on top @@ -1607,25 +1800,24 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] if parent == nil { parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1) } - statedb, err := state.New(parent.Root, bc.stateCache) + statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps) if err != nil { - return it.index, events, coalescedLogs, err + return it.index, err } // If we have a followup block, run that against the current state to pre-cache // transactions and probabilistically some of the account/storage trie nodes. var followupInterrupt uint32 - if !bc.cacheConfig.TrieCleanNoPrefetch { if followup, err := it.peek(); followup != nil && err == nil { - go func(start time.Time) { - throwaway, _ := state.New(parent.Root, bc.stateCache) + throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps) + go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) { bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt) blockPrefetchExecuteTimer.Update(time.Since(start)) - if atomic.LoadUint32(&followupInterrupt) == 1 { + if atomic.LoadUint32(interrupt) == 1 { blockPrefetchInterruptMeter.Mark(1) } - }(time.Now()) + }(time.Now(), followup, throwaway, &followupInterrupt) } } // Process block using the parent state as reference point @@ -1634,17 +1826,19 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] if err != nil { bc.reportBlock(block, receipts, err) atomic.StoreUint32(&followupInterrupt, 1) - return it.index, events, coalescedLogs, err + return it.index, err } // Update the metrics touched during block processing - accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them - storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them - accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them - storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them + accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them + storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them + accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them + storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them + snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete, we can mark them + snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete, we can mark them triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation - trieproc := statedb.AccountReads + statedb.AccountUpdates - trieproc += statedb.StorageReads + statedb.StorageUpdates + trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates + trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates blockExecutionTimer.Update(time.Since(substart) - trieproc - triehash) @@ -1653,7 +1847,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { bc.reportBlock(block, receipts, err) atomic.StoreUint32(&followupInterrupt, 1) - return it.index, events, coalescedLogs, err + return it.index, err } proctime := time.Since(start) @@ -1665,18 +1859,18 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] // Write the block to the chain and get the status. substart = time.Now() - status, err := bc.writeBlockWithState(block, receipts, statedb) + status, err := bc.writeBlockWithState(block, receipts, logs, statedb, false) + atomic.StoreUint32(&followupInterrupt, 1) if err != nil { - atomic.StoreUint32(&followupInterrupt, 1) - return it.index, events, coalescedLogs, err + return it.index, err } - atomic.StoreUint32(&followupInterrupt, 1) // Update the metrics touched during block commit - accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them - storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them + accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them + storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them + snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them - blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits) + blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits) blockInsertTimer.UpdateSince(start) switch status { @@ -1686,8 +1880,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] "elapsed", common.PrettyDuration(time.Since(start)), "root", block.Root()) - coalescedLogs = append(coalescedLogs, logs...) - events = append(events, ChainEvent{block, block.Hash(), logs}) lastCanon = block // Only count canonical blocks for GC processing time @@ -1698,7 +1890,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()), "root", block.Root()) - events = append(events, ChainSideEvent{block}) default: // This in theory is impossible, but lets be nice to our future selves and leave @@ -1715,26 +1906,22 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] stats.report(chain, it.index, dirty) } // Any blocks remaining here? The only ones we care about are the future ones - if block != nil && err == consensus.ErrFutureBlock { + if block != nil && errors.Is(err, consensus.ErrFutureBlock) { if err := bc.addFutureBlock(block); err != nil { - return it.index, events, coalescedLogs, err + return it.index, err } block, err = it.next() - for ; block != nil && err == consensus.ErrUnknownAncestor; block, err = it.next() { + for ; block != nil && errors.Is(err, consensus.ErrUnknownAncestor); block, err = it.next() { if err := bc.addFutureBlock(block); err != nil { - return it.index, events, coalescedLogs, err + return it.index, err } stats.queued++ } } stats.ignored += it.remaining() - // Append a single chain head event if we've progressed the chain - if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { - events = append(events, ChainHeadEvent{lastCanon}) - } - return it.index, events, coalescedLogs, err + return it.index, err } // insertSideChain is called when an import batch hits upon a pruned ancestor @@ -1743,7 +1930,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] // // The method writes all (header-and-body-valid) blocks to disk, then tries to // switch over to the new chain if the TD exceeded the current chain. -func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, []interface{}, []*types.Log, error) { +func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, error) { var ( externTd *big.Int current = bc.CurrentBlock() @@ -1753,7 +1940,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i // ones. Any other errors means that the block is invalid, and should not be written // to disk. err := consensus.ErrPrunedAncestor - for ; block != nil && (err == consensus.ErrPrunedAncestor); block, err = it.next() { + for ; block != nil && errors.Is(err, consensus.ErrPrunedAncestor); block, err = it.next() { // Check the canonical state root for that number if number := block.NumberU64(); current.NumberU64() >= number { canonical := bc.GetBlockByNumber(number) @@ -1779,7 +1966,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i // If someone legitimately side-mines blocks, they would still be imported as usual. However, // we cannot risk writing unverified blocks to disk when they obviously target the pruning // mechanism. - return it.index, nil, nil, errors.New("sidechain ghost-state attack") + return it.index, errors.New("sidechain ghost-state attack") } } if externTd == nil { @@ -1790,7 +1977,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i if !bc.HasBlock(block.Hash(), block.NumberU64()) { start := time.Now() if err := bc.writeBlockWithoutState(block, externTd); err != nil { - return it.index, nil, nil, err + return it.index, err } log.Debug("Injected sidechain block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)), @@ -1807,7 +1994,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i localTd := bc.GetTd(current.Hash(), current.NumberU64()) if bc.manualCanonical || localTd.Cmp(externTd) > 0 { log.Info("Sidechain written to disk", "start", it.first().NumberU64(), "end", it.previous().Number, "sidetd", externTd, "localtd", localTd) - return it.index, nil, nil, err + return it.index, err } // Gather all the sidechain hashes (full blocks may be memory heavy) var ( @@ -1822,7 +2009,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i parent = bc.GetHeader(parent.ParentHash, parent.Number.Uint64()-1) } if parent == nil { - return it.index, nil, nil, errors.New("missing parent") + return it.index, errors.New("missing parent") } // Import all the pruned blocks to make the state available var ( @@ -1841,15 +2028,15 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i // memory here. if len(blocks) >= 2048 || memory > 64*1024*1024 { log.Info("Importing heavy sidechain segment", "blocks", len(blocks), "start", blocks[0].NumberU64(), "end", block.NumberU64()) - if _, _, _, err := bc.insertChain(blocks, false); err != nil { - return 0, nil, nil, err + if _, err := bc.insertChain(blocks, false); err != nil { + return 0, err } blocks, memory = blocks[:0], 0 // If the chain is terminating, stop processing blocks - if atomic.LoadInt32(&bc.procInterrupt) == 1 { - log.Debug("Premature abort during blocks processing") - return 0, nil, nil, nil + if bc.insertStopped() { + log.Debug("Abort during blocks processing") + return 0, nil } } } @@ -1857,7 +2044,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i log.Info("Importing sidechain segment", "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64()) return bc.insertChain(blocks, false) } - return 0, nil, nil, nil + return 0, nil } // reorg takes two blocks, an old chain and a new chain and will reconstruct the @@ -1872,11 +2059,11 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { deletedTxs types.Transactions addedTxs types.Transactions - deletedLogs []*types.Log - rebirthLogs []*types.Log + deletedLogs [][]*types.Log + rebirthLogs [][]*types.Log - // collectLogs collects the logs that were generated during the - // processing of the block that corresponds with the given hash. + // collectLogs collects the logs that were generated or removed during + // the processing of the block that corresponds with the given hash. // These logs are later announced as deleted or reborn collectLogs = func(hash common.Hash, removed bool) { number := bc.hc.GetBlockNumber(hash) @@ -1884,18 +2071,40 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { return } receipts := rawdb.ReadReceipts(bc.db, hash, *number, bc.chainConfig) + + var logs []*types.Log for _, receipt := range receipts { for _, log := range receipt.Logs { l := *log if removed { l.Removed = true - deletedLogs = append(deletedLogs, &l) } else { - rebirthLogs = append(rebirthLogs, &l) } + logs = append(logs, &l) + } + } + if len(logs) > 0 { + if removed { + deletedLogs = append(deletedLogs, logs) + } else { + rebirthLogs = append(rebirthLogs, logs) } } } + // mergeLogs returns a merged log slice with specified sort order. + mergeLogs = func(logs [][]*types.Log, reverse bool) []*types.Log { + var ret []*types.Log + if reverse { + for i := len(logs) - 1; i >= 0; i-- { + ret = append(ret, logs[i]...) + } + } else { + for i := 0; i < len(logs); i++ { + ret = append(ret, logs[i]...) + } + } + return ret + } ) // Reduce the longer chain to the same number as the shorter one if oldBlock.NumberU64() > newBlock.NumberU64() { @@ -1954,6 +2163,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { "drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash()) blockReorgAddMeter.Mark(int64(len(newChain))) blockReorgDropMeter.Mark(int64(len(oldChain))) + blockReorgMeter.Mark(1) } else { log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "newnum", newBlock.Number(), "newhash", newBlock.Hash()) } @@ -1961,20 +2171,19 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // taking care of the proper incremental order. for i := len(newChain) - 1; i >= 1; i-- { // Insert the block in the canonical way, re-writing history - bc.insert(newChain[i]) + bc.writeHeadBlock(newChain[i]) // Collect reborn logs due to chain reorg collectLogs(newChain[i].Hash(), false) - // Write lookup entries for hash based transaction/receipt searches - rawdb.WriteTxLookupEntries(bc.db, newChain[i]) + // Collect the new added transactions. addedTxs = append(addedTxs, newChain[i].Transactions()...) } - // When transactions get deleted from the database, the receipts that were - // created in the fork must also be deleted - batch := bc.db.NewBatch() + // Delete useless indexes right now which includes the non-canonical + // transaction indexes, canonical chain indexes which above the head. + indexesBatch := bc.db.NewBatch() for _, tx := range types.TxDifference(deletedTxs, addedTxs) { - rawdb.DeleteTxLookupEntry(batch, tx.Hash()) + rawdb.DeleteTxLookupEntry(indexesBatch, tx.Hash()) } // Delete any canonical number assignments above the new head number := bc.CurrentBlock().NumberU64() @@ -1983,52 +2192,27 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { if hash == (common.Hash{}) { break } - rawdb.DeleteCanonicalHash(batch, i) + rawdb.DeleteCanonicalHash(indexesBatch, i) + } + if err := indexesBatch.Write(); err != nil { + log.Crit("Failed to delete useless indexes", "err", err) } - batch.Write() // If any logs need to be fired, do it now. In theory we could avoid creating // this goroutine if there are no events to fire, but realistcally that only // ever happens if we're reorging empty blocks, which will only happen on idle // networks where performance is not an issue either way. - // - // TODO(karalabe): Can we get rid of the goroutine somehow to guarantee correct - // event ordering? - go func() { - if len(deletedLogs) > 0 { - bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) - } - if len(rebirthLogs) > 0 { - bc.logsFeed.Send(rebirthLogs) - } - if len(oldChain) > 0 { - for _, block := range oldChain { - bc.chainSideFeed.Send(ChainSideEvent{Block: block}) - } - } - }() - return nil -} - -// PostChainEvents iterates over the events generated by a chain insertion and -// posts them into the event feed. -// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock. -func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) { - // post event logs for further processing - if logs != nil { - bc.logsFeed.Send(logs) + if len(deletedLogs) > 0 { + bc.rmLogsFeed.Send(RemovedLogsEvent{mergeLogs(deletedLogs, true)}) } - for _, event := range events { - switch ev := event.(type) { - case ChainEvent: - bc.chainFeed.Send(ev) - - case ChainHeadEvent: - bc.chainHeadFeed.Send(ev) - - case ChainSideEvent: - bc.chainSideFeed.Send(ev) + if len(rebirthLogs) > 0 { + bc.logsFeed.Send(mergeLogs(rebirthLogs, false)) + } + if len(oldChain) > 0 { + for i := len(oldChain) - 1; i >= 0; i-- { + bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]}) } } + return nil } func (bc *BlockChain) update() { @@ -2044,6 +2228,86 @@ func (bc *BlockChain) update() { } } +// maintainTxIndex is responsible for the construction and deletion of the +// transaction index. +// +// User can use flag `txlookuplimit` to specify a "recentness" block, below +// which ancient tx indices get deleted. If `txlookuplimit` is 0, it means +// all tx indices will be reserved. +// +// The user can adjust the txlookuplimit value for each launch after fast +// sync, Geth will automatically construct the missing indices and delete +// the extra indices. +func (bc *BlockChain) maintainTxIndex(ancients uint64) { + // Before starting the actual maintenance, we need to handle a special case, + // where user might init Geth with an external ancient database. If so, we + // need to reindex all necessary transactions before starting to process any + // pruning requests. + if ancients > 0 { + var from = uint64(0) + if bc.txLookupLimit != 0 && ancients > bc.txLookupLimit { + from = ancients - bc.txLookupLimit + } + rawdb.IndexTransactions(bc.db, from, ancients) + } + // indexBlocks reindexes or unindexes transactions depending on user configuration + indexBlocks := func(tail *uint64, head uint64, done chan struct{}) { + defer func() { done <- struct{}{} }() + + // If the user just upgraded Geth to a new version which supports transaction + // index pruning, write the new tail and remove anything older. + if tail == nil { + if bc.txLookupLimit == 0 || head < bc.txLookupLimit { + // Nothing to delete, write the tail and return + rawdb.WriteTxIndexTail(bc.db, 0) + } else { + // Prune all stale tx indices and record the tx index tail + rawdb.UnindexTransactions(bc.db, 0, head-bc.txLookupLimit+1) + } + return + } + // If a previous indexing existed, make sure that we fill in any missing entries + if bc.txLookupLimit == 0 || head < bc.txLookupLimit { + if *tail > 0 { + rawdb.IndexTransactions(bc.db, 0, *tail) + } + return + } + // Update the transaction index to the new chain state + if head-bc.txLookupLimit+1 < *tail { + // Reindex a part of missing indices and rewind index tail to HEAD-limit + rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail) + } else { + // Unindex a part of stale indices and forward index tail to HEAD-limit + rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1) + } + } + // Any reindexing done, start listening to chain events and moving the index window + var ( + done chan struct{} // Non-nil if background unindexing or reindexing routine is active. + headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed + ) + sub := bc.SubscribeChainHeadEvent(headCh) + if sub == nil { + return + } + defer sub.Unsubscribe() + + for { + select { + case head := <-headCh: + if done == nil { + done = make(chan struct{}) + go indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done) + } + case <-done: + done = nil + case <-bc.quit: + return + } + } +} + // BadBlocks returns a list of the last 'bad blocks' that the client has seen on the network func (bc *BlockChain) BadBlocks() []*types.Block { blocks := make([]*types.Block, 0, bc.badBlocks.Len()) @@ -2148,6 +2412,11 @@ func (bc *BlockChain) HasHeader(hash common.Hash, number uint64) bool { return bc.hc.HasHeader(hash, number) } +// GetCanonicalHash returns the canonical hash for a given block number +func (bc *BlockChain) GetCanonicalHash(number uint64) common.Hash { + return bc.hc.GetCanonicalHash(number) +} + // GetBlockHashesFromHash retrieves a number of block hashes starting at a given // hash, fetching towards the genesis block. func (bc *BlockChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash { @@ -2160,9 +2429,6 @@ func (bc *BlockChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []com // // Note: ancestor == 0 returns the same block, 1 returns its parent and so on. func (bc *BlockChain) GetAncestor(hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64) { - bc.chainmu.RLock() - defer bc.chainmu.RUnlock() - return bc.hc.GetAncestor(hash, number, ancestor, maxNonCanonical) } |