diff options
author | Ted Yin <tederminant@gmail.com> | 2020-09-18 13:14:29 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-18 13:14:29 -0400 |
commit | d048937c48753d9eaef771bf71820cf95d79df26 (patch) | |
tree | 1a7f65fcd72e77092525ab01625b8b9d365e3e40 /core/blockchain.go | |
parent | 7d1388c743b4ec8f4a86bea95bfada785dee83f7 (diff) | |
parent | 7d8c85cf8895b0f998d8eafb02f99d5b689fcd59 (diff) |
Merge pull request #34 from ava-labs/devv0.3.0-rc.5
Dev
Diffstat (limited to 'core/blockchain.go')
-rw-r--r-- | core/blockchain.go | 911 |
1 files changed, 589 insertions, 322 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index 6e316c7..208a026 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -23,6 +23,7 @@ import ( "io" "math/big" mrand "math/rand" + "sort" "sync" "sync/atomic" "time" @@ -30,19 +31,20 @@ import ( "github.com/ava-labs/coreth/consensus" "github.com/ava-labs/coreth/core/rawdb" "github.com/ava-labs/coreth/core/state" + "github.com/ava-labs/coreth/core/state/snapshot" "github.com/ava-labs/coreth/core/types" "github.com/ava-labs/coreth/core/vm" "github.com/ava-labs/coreth/params" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/common/mclock" - "github.com/ava-labs/go-ethereum/common/prque" - "github.com/ava-labs/go-ethereum/ethdb" - "github.com/ava-labs/go-ethereum/event" - "github.com/ava-labs/go-ethereum/log" - "github.com/ava-labs/go-ethereum/metrics" - "github.com/ava-labs/go-ethereum/rlp" - "github.com/ava-labs/go-ethereum/trie" - "github.com/hashicorp/golang-lru" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/common/prque" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" + lru "github.com/hashicorp/golang-lru" ) var ( @@ -60,12 +62,19 @@ var ( storageUpdateTimer = metrics.NewRegisteredTimer("chain/storage/updates", nil) storageCommitTimer = metrics.NewRegisteredTimer("chain/storage/commits", nil) + snapshotAccountReadTimer = metrics.NewRegisteredTimer("chain/snapshot/account/reads", nil) + snapshotStorageReadTimer = metrics.NewRegisteredTimer("chain/snapshot/storage/reads", nil) + snapshotCommitTimer = metrics.NewRegisteredTimer("chain/snapshot/commits", nil) + blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil) blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil) blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil) blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil) - blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil) - blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil) + + blockReorgMeter = metrics.NewRegisteredMeter("chain/reorg/executes", nil) + blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil) + blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil) + blockReorgInvalidatedTx = metrics.NewRegisteredMeter("chain/reorg/invalidTx", nil) blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil) blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil) @@ -103,17 +112,35 @@ const ( // - Version 7 // The following incompatible database changes were added: // * Use freezer as the ancient database to maintain all ancient data - BlockChainVersion uint64 = 7 + // - Version 8 + // The following incompatible database changes were added: + // * New scheme for contract code in order to separate the codes and trie nodes + BlockChainVersion uint64 = 8 ) // CacheConfig contains the configuration values for the trie caching/pruning // that's resident in a blockchain. type CacheConfig struct { TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory + TrieCleanJournal string // Disk journal for saving clean cache entries. + TrieCleanRejournal time.Duration // Time interval to dump clean cache to disk periodically TrieCleanNoPrefetch bool // Whether to disable heuristic state prefetching for followup blocks TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node) TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk + SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory + + SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it +} + +// defaultCacheConfig are the default caching values if none are specified by the +// user (also used during testing). +var defaultCacheConfig = &CacheConfig{ + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TrieTimeLimit: 5 * time.Minute, + SnapshotLimit: 256, + SnapshotWait: true, } // BlockChain represents the canonical chain given a database with a genesis @@ -135,9 +162,17 @@ type BlockChain struct { cacheConfig *CacheConfig // Cache configuration for pruning db ethdb.Database // Low level persistent database to store final content in + snaps *snapshot.Tree // Snapshot tree for fast trie leaf access triegc *prque.Prque // Priority queue mapping block numbers to tries to gc gcproc time.Duration // Accumulates canonical block processing for trie dumping + // txLookupLimit is the maximum number of blocks from head whose tx indices + // are reserved: + // * 0: means no limit and regenerate any missing indexes + // * N: means N block limit [HEAD-N+1, HEAD] and delete extra indexes + // * nil: disable tx reindexer/deleter, but still index new blocks + txLookupLimit uint64 + hc *HeaderChain rmLogsFeed event.Feed chainFeed event.Feed @@ -161,11 +196,10 @@ type BlockChain struct { txLookupCache *lru.Cache // Cache for the most recent transaction lookup data. futureBlocks *lru.Cache // future blocks are blocks added for later processing - quit chan struct{} // blockchain quit channel - running int32 // running must be called atomically - // procInterrupt must be atomically called - procInterrupt int32 // interrupt signaler for block processing + quit chan struct{} // blockchain quit channel wg sync.WaitGroup // chain processing wait group for shutting down + running int32 // 0 if chain is running, 1 when stopped + procInterrupt int32 // interrupt signaler for block processing engine consensus.Engine validator Validator // Block and state validator interface @@ -182,13 +216,9 @@ type BlockChain struct { // NewBlockChain returns a fully initialised block chain using information // available in the database. It initialises the default Ethereum Validator and // Processor. -func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool, manualCanonical bool) (*BlockChain, error) { +func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool, txLookupLimit *uint64, manualCanonical bool) (*BlockChain, error) { if cacheConfig == nil { - cacheConfig = &CacheConfig{ - TrieCleanLimit: 256, - TrieDirtyLimit: 256, - TrieTimeLimit: 5 * time.Minute, - } + cacheConfig = defaultCacheConfig } bodyCache, _ := lru.New(bodyCacheLimit) bodyRLPCache, _ := lru.New(bodyCacheLimit) @@ -203,7 +233,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par cacheConfig: cacheConfig, db: db, triegc: prque.New(nil), - stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit), + stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit, cacheConfig.TrieCleanJournal), quit: make(chan struct{}), shouldPreserve: shouldPreserve, bodyCache: bodyCache, @@ -222,7 +252,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par bc.processor = NewStateProcessor(chainConfig, bc, engine) var err error - bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt) + bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped) if err != nil { return nil, err } @@ -230,18 +260,35 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par if bc.genesisBlock == nil { return nil, ErrNoGenesis } + + var nilBlock *types.Block + bc.currentBlock.Store(nilBlock) + bc.currentFastBlock.Store(nilBlock) + // Initialize the chain with ancient data if it isn't empty. + var txIndexBlock uint64 + if bc.empty() { rawdb.InitDatabaseFromFreezer(bc.db) + // If ancient database is not empty, reconstruct all missing + // indices in the background. + frozen, _ := bc.db.Ancients() + if frozen > 0 { + txIndexBlock = frozen + } } if err := bc.loadLastState(); err != nil { return nil, err } - // The first thing the node will do is reconstruct the verification data for - // the head block (ethash cache or clique voting snapshot). Might as well do - // it in advance. - bc.engine.VerifyHeader(bc, bc.CurrentHeader(), true) - + // Make sure the state associated with the block is available + head := bc.CurrentBlock() + if _, err := state.New(head.Root(), bc.stateCache, bc.snaps); err != nil { + log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash()) + if err := bc.SetHead(head.NumberU64()); err != nil { + return nil, err + } + } + // Ensure that a previous crash in SetHead doesn't leave extra ancients if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 { var ( needRewind bool @@ -251,7 +298,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par // blockchain repair. If the head full block is even lower than the ancient // chain, truncate the ancient store. fullBlock := bc.CurrentBlock() - if fullBlock != nil && fullBlock != bc.genesisBlock && fullBlock.NumberU64() < frozen-1 { + if fullBlock != nil && fullBlock.Hash() != bc.genesisBlock.Hash() && fullBlock.NumberU64() < frozen-1 { needRewind = true low = fullBlock.NumberU64() } @@ -266,15 +313,17 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } } if needRewind { - var hashes []common.Hash - previous := bc.CurrentHeader().Number.Uint64() - for i := low + 1; i <= bc.CurrentHeader().Number.Uint64(); i++ { - hashes = append(hashes, rawdb.ReadCanonicalHash(bc.db, i)) + log.Error("Truncating ancient chain", "from", bc.CurrentHeader().Number.Uint64(), "to", low) + if err := bc.SetHead(low); err != nil { + return nil, err } - bc.Rollback(hashes) - log.Warn("Truncate ancient chain", "from", previous, "to", low) } } + // The first thing the node will do is reconstruct the verification data for + // the head block (ethash cache or clique voting snapshot). Might as well do + // it in advance. + bc.engine.VerifyHeader(bc, bc.CurrentHeader(), true) + // Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain for hash := range BadHashes { if header := bc.GetHeaderByHash(hash); header != nil { @@ -283,20 +332,39 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par // make sure the headerByNumber (if present) is in our current canonical chain if headerByNumber != nil && headerByNumber.Hash() == header.Hash() { log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash) - bc.SetHead(header.Number.Uint64() - 1) + if err := bc.SetHead(header.Number.Uint64() - 1); err != nil { + return nil, err + } log.Error("Chain rewind was successful, resuming normal operation") } } } + // Load any existing snapshot, regenerating it if loading failed + if bc.cacheConfig.SnapshotLimit > 0 { + bc.snaps = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, bc.CurrentBlock().Root(), !bc.cacheConfig.SnapshotWait) + } // Take ownership of this particular state go bc.update() + if txLookupLimit != nil { + bc.txLookupLimit = *txLookupLimit + go bc.maintainTxIndex(txIndexBlock) + } + // If periodic cache journal is required, spin it up. + if bc.cacheConfig.TrieCleanRejournal > 0 { + if bc.cacheConfig.TrieCleanRejournal < time.Minute { + log.Warn("Sanitizing invalid trie cache journal time", "provided", bc.cacheConfig.TrieCleanRejournal, "updated", time.Minute) + bc.cacheConfig.TrieCleanRejournal = time.Minute + } + triedb := bc.stateCache.TrieDB() + bc.wg.Add(1) + go func() { + defer bc.wg.Done() + triedb.SaveCachePeriodically(bc.cacheConfig.TrieCleanJournal, bc.cacheConfig.TrieCleanRejournal, bc.quit) + }() + } return bc, nil } -func (bc *BlockChain) getProcInterrupt() bool { - return atomic.LoadInt32(&bc.procInterrupt) == 1 -} - // GetVMConfig returns the block chain VM config. func (bc *BlockChain) GetVMConfig() *vm.Config { return &bc.vmConfig @@ -333,15 +401,6 @@ func (bc *BlockChain) loadLastState() error { log.Warn("Head block missing, resetting chain", "hash", head) return bc.Reset() } - // Make sure the state associated with the block is available - if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil { - // Dangling block without a state associated, init from scratch - log.Warn("Head state missing, repairing chain", "number", currentBlock.Number(), "hash", currentBlock.Hash()) - if err := bc.repair(¤tBlock); err != nil { - return err - } - rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash()) - } // Everything seems to be fine, set as the head block bc.currentBlock.Store(currentBlock) headBlockGauge.Update(int64(currentBlock.NumberU64())) @@ -375,37 +434,59 @@ func (bc *BlockChain) loadLastState() error { log.Info("Loaded most recent local header", "number", currentHeader.Number, "hash", currentHeader.Hash(), "td", headerTd, "age", common.PrettyAge(time.Unix(int64(currentHeader.Time), 0))) log.Info("Loaded most recent local full block", "number", currentBlock.Number(), "hash", currentBlock.Hash(), "td", blockTd, "age", common.PrettyAge(time.Unix(int64(currentBlock.Time()), 0))) log.Info("Loaded most recent local fast block", "number", currentFastBlock.Number(), "hash", currentFastBlock.Hash(), "td", fastTd, "age", common.PrettyAge(time.Unix(int64(currentFastBlock.Time()), 0))) - + if pivot := rawdb.ReadLastPivotNumber(bc.db); pivot != nil { + log.Info("Loaded last fast-sync pivot marker", "number", *pivot) + } return nil } -// SetHead rewinds the local chain to a new head. In the case of headers, everything -// above the new head will be deleted and the new one set. In the case of blocks -// though, the head may be further rewound if block bodies are missing (non-archive -// nodes after a fast sync). +// SetHead rewinds the local chain to a new head. Depending on whether the node +// was fast synced or full synced and in which state, the method will try to +// delete minimal data from disk whilst retaining chain consistency. func (bc *BlockChain) SetHead(head uint64) error { - log.Warn("Rewinding blockchain", "target", head) - bc.chainmu.Lock() defer bc.chainmu.Unlock() - updateFn := func(db ethdb.KeyValueWriter, header *types.Header) { - // Rewind the block chain, ensuring we don't end up with a stateless head block - if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() < currentBlock.NumberU64() { + // Retrieve the last pivot block to short circuit rollbacks beyond it and the + // current freezer limit to start nuking id underflown + pivot := rawdb.ReadLastPivotNumber(bc.db) + frozen, _ := bc.db.Ancients() + + updateFn := func(db ethdb.KeyValueWriter, header *types.Header) (uint64, bool) { + // Rewind the block chain, ensuring we don't end up with a stateless head + // block. Note, depth equality is permitted to allow using SetHead as a + // chain reparation mechanism without deleting any data! + if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() <= currentBlock.NumberU64() { newHeadBlock := bc.GetBlock(header.Hash(), header.Number.Uint64()) if newHeadBlock == nil { + log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash()) newHeadBlock = bc.genesisBlock } else { - if _, err := state.New(newHeadBlock.Root(), bc.stateCache); err != nil { - // Rewound state missing, rolled back to before pivot, reset to genesis - newHeadBlock = bc.genesisBlock + // Block exists, keep rewinding until we find one with state + for { + if _, err := state.New(newHeadBlock.Root(), bc.stateCache, bc.snaps); err != nil { + log.Trace("Block state missing, rewinding further", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash()) + if pivot == nil || newHeadBlock.NumberU64() > *pivot { + newHeadBlock = bc.GetBlock(newHeadBlock.ParentHash(), newHeadBlock.NumberU64()-1) + continue + } else { + log.Trace("Rewind passed pivot, aiming genesis", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "pivot", *pivot) + newHeadBlock = bc.genesisBlock + } + } + log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash()) + break } } rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash()) + + // Degrade the chain markers if they are explicitly reverted. + // In theory we should update all in-memory markers in the + // last step, however the direction of SetHead is from high + // to low, so it's safe the update in-memory markers directly. bc.currentBlock.Store(newHeadBlock) headBlockGauge.Update(int64(newHeadBlock.NumberU64())) } - // Rewind the fast block in a simpleton way to the target head if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && header.Number.Uint64() < currentFastBlock.NumberU64() { newHeadFastBlock := bc.GetBlock(header.Hash(), header.Number.Uint64()) @@ -414,11 +495,25 @@ func (bc *BlockChain) SetHead(head uint64) error { newHeadFastBlock = bc.genesisBlock } rawdb.WriteHeadFastBlockHash(db, newHeadFastBlock.Hash()) + + // Degrade the chain markers if they are explicitly reverted. + // In theory we should update all in-memory markers in the + // last step, however the direction of SetHead is from high + // to low, so it's safe the update in-memory markers directly. bc.currentFastBlock.Store(newHeadFastBlock) headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64())) } - } + head := bc.CurrentBlock().NumberU64() + // If setHead underflown the freezer threshold and the block processing + // intent afterwards is full block importing, delete the chain segment + // between the stateful-block and the sethead target. + var wipe bool + if head+1 < frozen { + wipe = pivot == nil || head >= *pivot + } + return head, wipe // Only force wipe if full synced + } // Rewind the header chain, deleting all block bodies until then delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) { // Ignore the error here since light client won't hit this path @@ -426,10 +521,9 @@ func (bc *BlockChain) SetHead(head uint64) error { if num+1 <= frozen { // Truncate all relative data(header, total difficulty, body, receipt // and canonical hash) from ancient store. - if err := bc.db.TruncateAncients(num + 1); err != nil { + if err := bc.db.TruncateAncients(num); err != nil { log.Crit("Failed to truncate ancient data", "number", num, "err", err) } - // Remove the hash <-> number mapping from the active store. rawdb.DeleteHeaderNumber(db, hash) } else { @@ -441,8 +535,18 @@ func (bc *BlockChain) SetHead(head uint64) error { } // Todo(rjl493456442) txlookup, bloombits, etc } - bc.hc.SetHead(head, updateFn, delFn) - + // If SetHead was only called as a chain reparation method, try to skip + // touching the header chain altogether, unless the freezer is broken + if block := bc.CurrentBlock(); block.NumberU64() == head { + if target, force := updateFn(bc.db, block.Header()); force { + bc.hc.SetHead(target, updateFn, delFn) + } + } else { + // Rewind the chain to the requested head and keep going backwards until a + // block with a state is found or fast sync pivot is passed + log.Warn("Rewinding blockchain", "target", head) + bc.hc.SetHead(head, updateFn, delFn) + } // Clear out any stale content from the caches bc.bodyCache.Purge() bc.bodyRLPCache.Purge() @@ -471,6 +575,10 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error { headBlockGauge.Update(int64(block.NumberU64())) bc.chainmu.Unlock() + // Destroy any existing state snapshot and regenerate it in the background + if bc.snaps != nil { + bc.snaps.Rebuild(block.Root()) + } log.Info("Committed new head block", "number", block.Number(), "hash", hash) return nil } @@ -486,6 +594,15 @@ func (bc *BlockChain) CurrentBlock() *types.Block { return bc.currentBlock.Load().(*types.Block) } +// Snapshot returns the blockchain snapshot tree. This method is mainly used for +// testing, to make it possible to verify the snapshot after execution. +// +// Warning: There are no guarantees about the safety of using the returned 'snap' if the +// blockchain is simultaneously importing blocks, so take care. +func (bc *BlockChain) Snapshot() *snapshot.Tree { + return bc.snaps +} + // CurrentFastBlock retrieves the current fast-sync head block of the canonical // chain. The block is retrieved from the blockchain's internal cache. func (bc *BlockChain) CurrentFastBlock() *types.Block { @@ -509,7 +626,7 @@ func (bc *BlockChain) State() (*state.StateDB, error) { // StateAt returns a new mutable state based on a particular point in time. func (bc *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) { - return state.New(root, bc.stateCache) + return state.New(root, bc.stateCache, bc.snaps) } // StateCache returns the caching database underpinning the blockchain instance. @@ -533,46 +650,25 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error { defer bc.chainmu.Unlock() // Prepare the genesis block and reinitialise the chain - if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil { - log.Crit("Failed to write genesis block TD", "err", err) + batch := bc.db.NewBatch() + rawdb.WriteTd(batch, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()) + rawdb.WriteBlock(batch, genesis) + if err := batch.Write(); err != nil { + log.Crit("Failed to write genesis block", "err", err) } - rawdb.WriteBlock(bc.db, genesis) + bc.writeHeadBlock(genesis) + // Last update all in-memory chain markers bc.genesisBlock = genesis - bc.insert(bc.genesisBlock) bc.currentBlock.Store(bc.genesisBlock) headBlockGauge.Update(int64(bc.genesisBlock.NumberU64())) - bc.hc.SetGenesis(bc.genesisBlock.Header()) bc.hc.SetCurrentHeader(bc.genesisBlock.Header()) bc.currentFastBlock.Store(bc.genesisBlock) headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64())) - return nil } -// repair tries to repair the current blockchain by rolling back the current block -// until one with associated state is found. This is needed to fix incomplete db -// writes caused either by crashes/power outages, or simply non-committed tries. -// -// This method only rolls back the current block. The current header and current -// fast block are left intact. -func (bc *BlockChain) repair(head **types.Block) error { - for { - // Abort if we've rewound to a head block that does have associated state - if _, err := state.New((*head).Root(), bc.stateCache); err == nil { - log.Info("Rewound blockchain to past state", "number", (*head).Number(), "hash", (*head).Hash()) - return nil - } - // Otherwise rewind one block and recheck state availability there - block := bc.GetBlock((*head).ParentHash(), (*head).NumberU64()-1) - if block == nil { - return fmt.Errorf("missing block %d [%x]", (*head).NumberU64()-1, (*head).ParentHash()) - } - *head = block - } -} - // Export writes the active chain to the given writer. func (bc *BlockChain) Export(w io.Writer) error { return bc.ExportN(w, uint64(0), bc.CurrentBlock().NumberU64()) @@ -605,31 +701,39 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { return nil } -// insert injects a new head block into the current block chain. This method +// writeHeadBlock injects a new head block into the current block chain. This method // assumes that the block is indeed a true head. It will also reset the head // header and the head fast sync block to this very same block if they are older // or if they are on a different side chain. // // Note, this function assumes that the `mu` mutex is held! -func (bc *BlockChain) insert(block *types.Block) { +func (bc *BlockChain) writeHeadBlock(block *types.Block) { // If the block is on a side chain or an unknown one, force other heads onto it too updateHeads := rawdb.ReadCanonicalHash(bc.db, block.NumberU64()) != block.Hash() // Add the block to the canonical chain number scheme and mark as the head - rawdb.WriteCanonicalHash(bc.db, block.Hash(), block.NumberU64()) - rawdb.WriteHeadBlockHash(bc.db, block.Hash()) - - bc.currentBlock.Store(block) - headBlockGauge.Update(int64(block.NumberU64())) + batch := bc.db.NewBatch() + rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64()) + rawdb.WriteTxLookupEntriesByBlock(batch, block) + rawdb.WriteHeadBlockHash(batch, block.Hash()) // If the block is better than our head or is on a different chain, force update heads if updateHeads { + rawdb.WriteHeadHeaderHash(batch, block.Hash()) + rawdb.WriteHeadFastBlockHash(batch, block.Hash()) + } + // Flush the whole batch into the disk, exit the node if failed + if err := batch.Write(); err != nil { + log.Crit("Failed to update chain indexes and markers", "err", err) + } + // Update all in-memory chain markers in the last step + if updateHeads { bc.hc.SetCurrentHeader(block.Header()) - rawdb.WriteHeadFastBlockHash(bc.db, block.Hash()) - bc.currentFastBlock.Store(block) headFastBlockGauge.Update(int64(block.NumberU64())) } + bc.currentBlock.Store(block) + headBlockGauge.Update(int64(block.NumberU64())) } // Genesis retrieves the chain's genesis block. @@ -800,12 +904,30 @@ func (bc *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types. return uncles } -// TrieNode retrieves a blob of data associated with a trie node (or code hash) +// TrieNode retrieves a blob of data associated with a trie node // either from ephemeral in-memory cache, or from persistent storage. func (bc *BlockChain) TrieNode(hash common.Hash) ([]byte, error) { return bc.stateCache.TrieDB().Node(hash) } +// ContractCode retrieves a blob of data associated with a contract hash +// either from ephemeral in-memory cache, or from persistent storage. +func (bc *BlockChain) ContractCode(hash common.Hash) ([]byte, error) { + return bc.stateCache.ContractCode(common.Hash{}, hash) +} + +// ContractCodeWithPrefix retrieves a blob of data associated with a contract +// hash either from ephemeral in-memory cache, or from persistent storage. +// +// If the code doesn't exist in the in-memory cache, check the storage with +// new code scheme. +func (bc *BlockChain) ContractCodeWithPrefix(hash common.Hash) ([]byte, error) { + type codeReader interface { + ContractCodeWithPrefix(addrHash, codeHash common.Hash) ([]byte, error) + } + return bc.stateCache.(codeReader).ContractCodeWithPrefix(common.Hash{}, hash) +} + // Stop stops the blockchain service. If any imports are currently in progress // it will abort them using the procInterrupt. func (bc *BlockChain) Stop() { @@ -815,10 +937,17 @@ func (bc *BlockChain) Stop() { // Unsubscribe all subscriptions registered from blockchain bc.scope.Close() close(bc.quit) - atomic.StoreInt32(&bc.procInterrupt, 1) - + bc.StopInsert() bc.wg.Wait() + // Ensure that the entirety of the state snapshot is journalled to disk. + var snapBase common.Hash + if bc.snaps != nil { + var err error + if snapBase, err = bc.snaps.Journal(bc.CurrentBlock().Root()); err != nil { + log.Error("Failed to journal state snapshot", "err", err) + } + } // Ensure the state of a recent block is also stored to disk before exiting. // We're writing three different states to catch different restart scenarios: // - HEAD: So we don't need to reprocess any blocks in the general case @@ -832,11 +961,17 @@ func (bc *BlockChain) Stop() { recent := bc.GetBlockByNumber(number - offset) log.Info("Writing cached state to disk", "block", recent.Number(), "hash", recent.Hash(), "root", recent.Root()) - if err := triedb.Commit(recent.Root(), true); err != nil { + if err := triedb.Commit(recent.Root(), true, nil); err != nil { log.Error("Failed to commit recent state trie", "err", err) } } } + if snapBase != (common.Hash{}) { + log.Info("Writing snapshot state to disk", "root", snapBase) + if err := triedb.Commit(snapBase, true, nil); err != nil { + log.Error("Failed to commit recent state trie", "err", err) + } + } for !bc.triegc.Empty() { triedb.Dereference(bc.triegc.PopItem().(common.Hash)) } @@ -844,7 +979,25 @@ func (bc *BlockChain) Stop() { log.Error("Dangling trie nodes after full cleanup") } } - log.Info("Blockchain manager stopped") + // Ensure all live cached entries be saved into disk, so that we can skip + // cache warmup when node restarts. + if bc.cacheConfig.TrieCleanJournal != "" { + triedb := bc.stateCache.TrieDB() + triedb.SaveCache(bc.cacheConfig.TrieCleanJournal) + } + log.Info("Blockchain stopped") +} + +// StopInsert interrupts all insertion methods, causing them to return +// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after +// calling this method. +func (bc *BlockChain) StopInsert() { + atomic.StoreInt32(&bc.procInterrupt, 1) +} + +// insertStopped returns true after StopInsert has been called. +func (bc *BlockChain) insertStopped() bool { + return atomic.LoadInt32(&bc.procInterrupt) == 1 } func (bc *BlockChain) procFutureBlocks() { @@ -855,8 +1008,9 @@ func (bc *BlockChain) procFutureBlocks() { } } if len(blocks) > 0 { - types.BlockBy(types.Number).Sort(blocks) - + sort.Slice(blocks, func(i, j int) bool { + return blocks[i].NumberU64() < blocks[j].NumberU64() + }) // Insert one by one as chain insertion needs contiguous ancestry between blocks for i := range blocks { bc.InsertChain(blocks[i : i+1]) @@ -873,42 +1027,6 @@ const ( SideStatTy ) -// Rollback is designed to remove a chain of links from the database that aren't -// certain enough to be valid. -func (bc *BlockChain) Rollback(chain []common.Hash) { - bc.chainmu.Lock() - defer bc.chainmu.Unlock() - - for i := len(chain) - 1; i >= 0; i-- { - hash := chain[i] - - currentHeader := bc.hc.CurrentHeader() - if currentHeader.Hash() == hash { - bc.hc.SetCurrentHeader(bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1)) - } - if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock.Hash() == hash { - newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1) - rawdb.WriteHeadFastBlockHash(bc.db, newFastBlock.Hash()) - bc.currentFastBlock.Store(newFastBlock) - headFastBlockGauge.Update(int64(newFastBlock.NumberU64())) - } - if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash { - newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1) - rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash()) - bc.currentBlock.Store(newBlock) - headBlockGauge.Update(int64(newBlock.NumberU64())) - } - } - // Truncate ancient data which exceeds the current header. - // - // Notably, it can happen that system crashes without truncating the ancient data - // but the head indicator has been updated in the active store. Regarding this issue, - // system will self recovery by truncating the extra data during the setup phase. - if err := bc.truncateAncient(bc.hc.CurrentHeader().Number.Uint64()); err != nil { - log.Crit("Truncate ancient store failed", "err", err) - } -} - // truncateAncient rewinds the blockchain to the specified header and deletes all // data in the ancient store that exceeds the specified header. func (bc *BlockChain) truncateAncient(head uint64) error { @@ -982,7 +1100,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ size = 0 ) // updateHead updates the head fast sync block if the inserted blocks are better - // and returns a indicator whether the inserted blocks are canonical. + // and returns an indicator whether the inserted blocks are canonical. updateHead := func(head *types.Block) bool { if bc.manualCanonical { return false @@ -1024,7 +1142,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ var deleted []*numberHash for i, block := range blockChain { // Short circuit insertion if shutting down or processing failed - if atomic.LoadInt32(&bc.procInterrupt) == 1 { + if bc.insertStopped() { return 0, errInsertionInterrupted } // Short circuit insertion if it is required(used in testing only) @@ -1064,7 +1182,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } // Don't collect too much in-memory, write it out every 100K blocks if len(deleted) > 100000 { - // Sync the ancient store explicitly to ensure all data has been flushed to disk. if err := bc.db.Sync(); err != nil { return 0, err @@ -1096,8 +1213,23 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } // Flush data into ancient database. size += rawdb.WriteAncientBlock(bc.db, block, receiptChain[i], bc.GetTd(block.Hash(), block.NumberU64())) - rawdb.WriteTxLookupEntries(batch, block) + // Write tx indices if any condition is satisfied: + // * If user requires to reserve all tx indices(txlookuplimit=0) + // * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit) + // * If block number is large enough to be regarded as a recent block + // It means blocks below the ancientLimit-txlookupLimit won't be indexed. + // + // But if the `TxIndexTail` is not nil, e.g. Geth is initialized with + // an external ancient database, during the setup, blockchain will start + // a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients) + // range. In this case, all tx indices of newly imported blocks should be + // generated. + if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit { + rawdb.WriteTxLookupEntriesByBlock(batch, block) + } else if rawdb.ReadTxIndexTail(bc.db) != nil { + rawdb.WriteTxLookupEntriesByBlock(batch, block) + } stats.processed++ } // Flush all tx-lookup index data. @@ -1154,26 +1286,37 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } // writeLive writes blockchain and corresponding receipt chain into active store. writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { + skipPresenceCheck := false batch := bc.db.NewBatch() for i, block := range blockChain { // Short circuit insertion if shutting down or processing failed - if atomic.LoadInt32(&bc.procInterrupt) == 1 { + if bc.insertStopped() { return 0, errInsertionInterrupted } // Short circuit if the owner header is unknown if !bc.HasHeader(block.Hash(), block.NumberU64()) { return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4]) } - if bc.HasBlock(block.Hash(), block.NumberU64()) { - stats.ignored++ - continue + if !skipPresenceCheck { + // Ignore if the entire data is already known + if bc.HasBlock(block.Hash(), block.NumberU64()) { + stats.ignored++ + continue + } else { + // If block N is not present, neither are the later blocks. + // This should be true, but if we are mistaken, the shortcut + // here will only cause overwriting of some existing data + skipPresenceCheck = true + } } // Write all the data out into the database rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()) rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i]) - rawdb.WriteTxLookupEntries(batch, block) + rawdb.WriteTxLookupEntriesByBlock(batch, block) // Always write tx indices for live blocks, we assume they are needed - stats.processed++ + // Write everything belongs to the blocks into the database. So that + // we can ensure all components of body is completed(body, receipts, + // tx indexes) if batch.ValueSize() >= ethdb.IdealBatchSize { if err := batch.Write(); err != nil { return 0, err @@ -1181,7 +1324,11 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ size += batch.ValueSize() batch.Reset() } + stats.processed++ } + // Write everything belongs to the blocks into the database. So that |