aboutsummaryrefslogtreecommitdiff
path: root/core/blockchain.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/blockchain.go')
-rw-r--r--core/blockchain.go911
1 files changed, 589 insertions, 322 deletions
diff --git a/core/blockchain.go b/core/blockchain.go
index 6e316c7..208a026 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -23,6 +23,7 @@ import (
"io"
"math/big"
mrand "math/rand"
+ "sort"
"sync"
"sync/atomic"
"time"
@@ -30,19 +31,20 @@ import (
"github.com/ava-labs/coreth/consensus"
"github.com/ava-labs/coreth/core/rawdb"
"github.com/ava-labs/coreth/core/state"
+ "github.com/ava-labs/coreth/core/state/snapshot"
"github.com/ava-labs/coreth/core/types"
"github.com/ava-labs/coreth/core/vm"
"github.com/ava-labs/coreth/params"
- "github.com/ava-labs/go-ethereum/common"
- "github.com/ava-labs/go-ethereum/common/mclock"
- "github.com/ava-labs/go-ethereum/common/prque"
- "github.com/ava-labs/go-ethereum/ethdb"
- "github.com/ava-labs/go-ethereum/event"
- "github.com/ava-labs/go-ethereum/log"
- "github.com/ava-labs/go-ethereum/metrics"
- "github.com/ava-labs/go-ethereum/rlp"
- "github.com/ava-labs/go-ethereum/trie"
- "github.com/hashicorp/golang-lru"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/mclock"
+ "github.com/ethereum/go-ethereum/common/prque"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/trie"
+ lru "github.com/hashicorp/golang-lru"
)
var (
@@ -60,12 +62,19 @@ var (
storageUpdateTimer = metrics.NewRegisteredTimer("chain/storage/updates", nil)
storageCommitTimer = metrics.NewRegisteredTimer("chain/storage/commits", nil)
+ snapshotAccountReadTimer = metrics.NewRegisteredTimer("chain/snapshot/account/reads", nil)
+ snapshotStorageReadTimer = metrics.NewRegisteredTimer("chain/snapshot/storage/reads", nil)
+ snapshotCommitTimer = metrics.NewRegisteredTimer("chain/snapshot/commits", nil)
+
blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)
- blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil)
- blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
+
+ blockReorgMeter = metrics.NewRegisteredMeter("chain/reorg/executes", nil)
+ blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
+ blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil)
+ blockReorgInvalidatedTx = metrics.NewRegisteredMeter("chain/reorg/invalidTx", nil)
blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil)
blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)
@@ -103,17 +112,35 @@ const (
// - Version 7
// The following incompatible database changes were added:
// * Use freezer as the ancient database to maintain all ancient data
- BlockChainVersion uint64 = 7
+ // - Version 8
+ // The following incompatible database changes were added:
+ // * New scheme for contract code in order to separate the codes and trie nodes
+ BlockChainVersion uint64 = 8
)
// CacheConfig contains the configuration values for the trie caching/pruning
// that's resident in a blockchain.
type CacheConfig struct {
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
+ TrieCleanJournal string // Disk journal for saving clean cache entries.
+ TrieCleanRejournal time.Duration // Time interval to dump clean cache to disk periodically
TrieCleanNoPrefetch bool // Whether to disable heuristic state prefetching for followup blocks
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node)
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
+ SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
+
+ SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
+}
+
+// defaultCacheConfig are the default caching values if none are specified by the
+// user (also used during testing).
+var defaultCacheConfig = &CacheConfig{
+ TrieCleanLimit: 256,
+ TrieDirtyLimit: 256,
+ TrieTimeLimit: 5 * time.Minute,
+ SnapshotLimit: 256,
+ SnapshotWait: true,
}
// BlockChain represents the canonical chain given a database with a genesis
@@ -135,9 +162,17 @@ type BlockChain struct {
cacheConfig *CacheConfig // Cache configuration for pruning
db ethdb.Database // Low level persistent database to store final content in
+ snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
+ // txLookupLimit is the maximum number of blocks from head whose tx indices
+ // are reserved:
+ // * 0: means no limit and regenerate any missing indexes
+ // * N: means N block limit [HEAD-N+1, HEAD] and delete extra indexes
+ // * nil: disable tx reindexer/deleter, but still index new blocks
+ txLookupLimit uint64
+
hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
@@ -161,11 +196,10 @@ type BlockChain struct {
txLookupCache *lru.Cache // Cache for the most recent transaction lookup data.
futureBlocks *lru.Cache // future blocks are blocks added for later processing
- quit chan struct{} // blockchain quit channel
- running int32 // running must be called atomically
- // procInterrupt must be atomically called
- procInterrupt int32 // interrupt signaler for block processing
+ quit chan struct{} // blockchain quit channel
wg sync.WaitGroup // chain processing wait group for shutting down
+ running int32 // 0 if chain is running, 1 when stopped
+ procInterrupt int32 // interrupt signaler for block processing
engine consensus.Engine
validator Validator // Block and state validator interface
@@ -182,13 +216,9 @@ type BlockChain struct {
// NewBlockChain returns a fully initialised block chain using information
// available in the database. It initialises the default Ethereum Validator and
// Processor.
-func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool, manualCanonical bool) (*BlockChain, error) {
+func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool, txLookupLimit *uint64, manualCanonical bool) (*BlockChain, error) {
if cacheConfig == nil {
- cacheConfig = &CacheConfig{
- TrieCleanLimit: 256,
- TrieDirtyLimit: 256,
- TrieTimeLimit: 5 * time.Minute,
- }
+ cacheConfig = defaultCacheConfig
}
bodyCache, _ := lru.New(bodyCacheLimit)
bodyRLPCache, _ := lru.New(bodyCacheLimit)
@@ -203,7 +233,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(nil),
- stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit),
+ stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit, cacheConfig.TrieCleanJournal),
quit: make(chan struct{}),
shouldPreserve: shouldPreserve,
bodyCache: bodyCache,
@@ -222,7 +252,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
bc.processor = NewStateProcessor(chainConfig, bc, engine)
var err error
- bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt)
+ bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped)
if err != nil {
return nil, err
}
@@ -230,18 +260,35 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
if bc.genesisBlock == nil {
return nil, ErrNoGenesis
}
+
+ var nilBlock *types.Block
+ bc.currentBlock.Store(nilBlock)
+ bc.currentFastBlock.Store(nilBlock)
+
// Initialize the chain with ancient data if it isn't empty.
+ var txIndexBlock uint64
+
if bc.empty() {
rawdb.InitDatabaseFromFreezer(bc.db)
+ // If ancient database is not empty, reconstruct all missing
+ // indices in the background.
+ frozen, _ := bc.db.Ancients()
+ if frozen > 0 {
+ txIndexBlock = frozen
+ }
}
if err := bc.loadLastState(); err != nil {
return nil, err
}
- // The first thing the node will do is reconstruct the verification data for
- // the head block (ethash cache or clique voting snapshot). Might as well do
- // it in advance.
- bc.engine.VerifyHeader(bc, bc.CurrentHeader(), true)
-
+ // Make sure the state associated with the block is available
+ head := bc.CurrentBlock()
+ if _, err := state.New(head.Root(), bc.stateCache, bc.snaps); err != nil {
+ log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash())
+ if err := bc.SetHead(head.NumberU64()); err != nil {
+ return nil, err
+ }
+ }
+ // Ensure that a previous crash in SetHead doesn't leave extra ancients
if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 {
var (
needRewind bool
@@ -251,7 +298,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
// blockchain repair. If the head full block is even lower than the ancient
// chain, truncate the ancient store.
fullBlock := bc.CurrentBlock()
- if fullBlock != nil && fullBlock != bc.genesisBlock && fullBlock.NumberU64() < frozen-1 {
+ if fullBlock != nil && fullBlock.Hash() != bc.genesisBlock.Hash() && fullBlock.NumberU64() < frozen-1 {
needRewind = true
low = fullBlock.NumberU64()
}
@@ -266,15 +313,17 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
}
}
if needRewind {
- var hashes []common.Hash
- previous := bc.CurrentHeader().Number.Uint64()
- for i := low + 1; i <= bc.CurrentHeader().Number.Uint64(); i++ {
- hashes = append(hashes, rawdb.ReadCanonicalHash(bc.db, i))
+ log.Error("Truncating ancient chain", "from", bc.CurrentHeader().Number.Uint64(), "to", low)
+ if err := bc.SetHead(low); err != nil {
+ return nil, err
}
- bc.Rollback(hashes)
- log.Warn("Truncate ancient chain", "from", previous, "to", low)
}
}
+ // The first thing the node will do is reconstruct the verification data for
+ // the head block (ethash cache or clique voting snapshot). Might as well do
+ // it in advance.
+ bc.engine.VerifyHeader(bc, bc.CurrentHeader(), true)
+
// Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain
for hash := range BadHashes {
if header := bc.GetHeaderByHash(hash); header != nil {
@@ -283,20 +332,39 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
// make sure the headerByNumber (if present) is in our current canonical chain
if headerByNumber != nil && headerByNumber.Hash() == header.Hash() {
log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash)
- bc.SetHead(header.Number.Uint64() - 1)
+ if err := bc.SetHead(header.Number.Uint64() - 1); err != nil {
+ return nil, err
+ }
log.Error("Chain rewind was successful, resuming normal operation")
}
}
}
+ // Load any existing snapshot, regenerating it if loading failed
+ if bc.cacheConfig.SnapshotLimit > 0 {
+ bc.snaps = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, bc.CurrentBlock().Root(), !bc.cacheConfig.SnapshotWait)
+ }
// Take ownership of this particular state
go bc.update()
+ if txLookupLimit != nil {
+ bc.txLookupLimit = *txLookupLimit
+ go bc.maintainTxIndex(txIndexBlock)
+ }
+ // If periodic cache journal is required, spin it up.
+ if bc.cacheConfig.TrieCleanRejournal > 0 {
+ if bc.cacheConfig.TrieCleanRejournal < time.Minute {
+ log.Warn("Sanitizing invalid trie cache journal time", "provided", bc.cacheConfig.TrieCleanRejournal, "updated", time.Minute)
+ bc.cacheConfig.TrieCleanRejournal = time.Minute
+ }
+ triedb := bc.stateCache.TrieDB()
+ bc.wg.Add(1)
+ go func() {
+ defer bc.wg.Done()
+ triedb.SaveCachePeriodically(bc.cacheConfig.TrieCleanJournal, bc.cacheConfig.TrieCleanRejournal, bc.quit)
+ }()
+ }
return bc, nil
}
-func (bc *BlockChain) getProcInterrupt() bool {
- return atomic.LoadInt32(&bc.procInterrupt) == 1
-}
-
// GetVMConfig returns the block chain VM config.
func (bc *BlockChain) GetVMConfig() *vm.Config {
return &bc.vmConfig
@@ -333,15 +401,6 @@ func (bc *BlockChain) loadLastState() error {
log.Warn("Head block missing, resetting chain", "hash", head)
return bc.Reset()
}
- // Make sure the state associated with the block is available
- if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil {
- // Dangling block without a state associated, init from scratch
- log.Warn("Head state missing, repairing chain", "number", currentBlock.Number(), "hash", currentBlock.Hash())
- if err := bc.repair(&currentBlock); err != nil {
- return err
- }
- rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash())
- }
// Everything seems to be fine, set as the head block
bc.currentBlock.Store(currentBlock)
headBlockGauge.Update(int64(currentBlock.NumberU64()))
@@ -375,37 +434,59 @@ func (bc *BlockChain) loadLastState() error {
log.Info("Loaded most recent local header", "number", currentHeader.Number, "hash", currentHeader.Hash(), "td", headerTd, "age", common.PrettyAge(time.Unix(int64(currentHeader.Time), 0)))
log.Info("Loaded most recent local full block", "number", currentBlock.Number(), "hash", currentBlock.Hash(), "td", blockTd, "age", common.PrettyAge(time.Unix(int64(currentBlock.Time()), 0)))
log.Info("Loaded most recent local fast block", "number", currentFastBlock.Number(), "hash", currentFastBlock.Hash(), "td", fastTd, "age", common.PrettyAge(time.Unix(int64(currentFastBlock.Time()), 0)))
-
+ if pivot := rawdb.ReadLastPivotNumber(bc.db); pivot != nil {
+ log.Info("Loaded last fast-sync pivot marker", "number", *pivot)
+ }
return nil
}
-// SetHead rewinds the local chain to a new head. In the case of headers, everything
-// above the new head will be deleted and the new one set. In the case of blocks
-// though, the head may be further rewound if block bodies are missing (non-archive
-// nodes after a fast sync).
+// SetHead rewinds the local chain to a new head. Depending on whether the node
+// was fast synced or full synced and in which state, the method will try to
+// delete minimal data from disk whilst retaining chain consistency.
func (bc *BlockChain) SetHead(head uint64) error {
- log.Warn("Rewinding blockchain", "target", head)
-
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
- updateFn := func(db ethdb.KeyValueWriter, header *types.Header) {
- // Rewind the block chain, ensuring we don't end up with a stateless head block
- if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() < currentBlock.NumberU64() {
+ // Retrieve the last pivot block to short circuit rollbacks beyond it and the
+ // current freezer limit to start nuking id underflown
+ pivot := rawdb.ReadLastPivotNumber(bc.db)
+ frozen, _ := bc.db.Ancients()
+
+ updateFn := func(db ethdb.KeyValueWriter, header *types.Header) (uint64, bool) {
+ // Rewind the block chain, ensuring we don't end up with a stateless head
+ // block. Note, depth equality is permitted to allow using SetHead as a
+ // chain reparation mechanism without deleting any data!
+ if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() <= currentBlock.NumberU64() {
newHeadBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
if newHeadBlock == nil {
+ log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash())
newHeadBlock = bc.genesisBlock
} else {
- if _, err := state.New(newHeadBlock.Root(), bc.stateCache); err != nil {
- // Rewound state missing, rolled back to before pivot, reset to genesis
- newHeadBlock = bc.genesisBlock
+ // Block exists, keep rewinding until we find one with state
+ for {
+ if _, err := state.New(newHeadBlock.Root(), bc.stateCache, bc.snaps); err != nil {
+ log.Trace("Block state missing, rewinding further", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
+ if pivot == nil || newHeadBlock.NumberU64() > *pivot {
+ newHeadBlock = bc.GetBlock(newHeadBlock.ParentHash(), newHeadBlock.NumberU64()-1)
+ continue
+ } else {
+ log.Trace("Rewind passed pivot, aiming genesis", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "pivot", *pivot)
+ newHeadBlock = bc.genesisBlock
+ }
+ }
+ log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
+ break
}
}
rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())
+
+ // Degrade the chain markers if they are explicitly reverted.
+ // In theory we should update all in-memory markers in the
+ // last step, however the direction of SetHead is from high
+ // to low, so it's safe the update in-memory markers directly.
bc.currentBlock.Store(newHeadBlock)
headBlockGauge.Update(int64(newHeadBlock.NumberU64()))
}
-
// Rewind the fast block in a simpleton way to the target head
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && header.Number.Uint64() < currentFastBlock.NumberU64() {
newHeadFastBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
@@ -414,11 +495,25 @@ func (bc *BlockChain) SetHead(head uint64) error {
newHeadFastBlock = bc.genesisBlock
}
rawdb.WriteHeadFastBlockHash(db, newHeadFastBlock.Hash())
+
+ // Degrade the chain markers if they are explicitly reverted.
+ // In theory we should update all in-memory markers in the
+ // last step, however the direction of SetHead is from high
+ // to low, so it's safe the update in-memory markers directly.
bc.currentFastBlock.Store(newHeadFastBlock)
headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64()))
}
- }
+ head := bc.CurrentBlock().NumberU64()
+ // If setHead underflown the freezer threshold and the block processing
+ // intent afterwards is full block importing, delete the chain segment
+ // between the stateful-block and the sethead target.
+ var wipe bool
+ if head+1 < frozen {
+ wipe = pivot == nil || head >= *pivot
+ }
+ return head, wipe // Only force wipe if full synced
+ }
// Rewind the header chain, deleting all block bodies until then
delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) {
// Ignore the error here since light client won't hit this path
@@ -426,10 +521,9 @@ func (bc *BlockChain) SetHead(head uint64) error {
if num+1 <= frozen {
// Truncate all relative data(header, total difficulty, body, receipt
// and canonical hash) from ancient store.
- if err := bc.db.TruncateAncients(num + 1); err != nil {
+ if err := bc.db.TruncateAncients(num); err != nil {
log.Crit("Failed to truncate ancient data", "number", num, "err", err)
}
-
// Remove the hash <-> number mapping from the active store.
rawdb.DeleteHeaderNumber(db, hash)
} else {
@@ -441,8 +535,18 @@ func (bc *BlockChain) SetHead(head uint64) error {
}
// Todo(rjl493456442) txlookup, bloombits, etc
}
- bc.hc.SetHead(head, updateFn, delFn)
-
+ // If SetHead was only called as a chain reparation method, try to skip
+ // touching the header chain altogether, unless the freezer is broken
+ if block := bc.CurrentBlock(); block.NumberU64() == head {
+ if target, force := updateFn(bc.db, block.Header()); force {
+ bc.hc.SetHead(target, updateFn, delFn)
+ }
+ } else {
+ // Rewind the chain to the requested head and keep going backwards until a
+ // block with a state is found or fast sync pivot is passed
+ log.Warn("Rewinding blockchain", "target", head)
+ bc.hc.SetHead(head, updateFn, delFn)
+ }
// Clear out any stale content from the caches
bc.bodyCache.Purge()
bc.bodyRLPCache.Purge()
@@ -471,6 +575,10 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
headBlockGauge.Update(int64(block.NumberU64()))
bc.chainmu.Unlock()
+ // Destroy any existing state snapshot and regenerate it in the background
+ if bc.snaps != nil {
+ bc.snaps.Rebuild(block.Root())
+ }
log.Info("Committed new head block", "number", block.Number(), "hash", hash)
return nil
}
@@ -486,6 +594,15 @@ func (bc *BlockChain) CurrentBlock() *types.Block {
return bc.currentBlock.Load().(*types.Block)
}
+// Snapshot returns the blockchain snapshot tree. This method is mainly used for
+// testing, to make it possible to verify the snapshot after execution.
+//
+// Warning: There are no guarantees about the safety of using the returned 'snap' if the
+// blockchain is simultaneously importing blocks, so take care.
+func (bc *BlockChain) Snapshot() *snapshot.Tree {
+ return bc.snaps
+}
+
// CurrentFastBlock retrieves the current fast-sync head block of the canonical
// chain. The block is retrieved from the blockchain's internal cache.
func (bc *BlockChain) CurrentFastBlock() *types.Block {
@@ -509,7 +626,7 @@ func (bc *BlockChain) State() (*state.StateDB, error) {
// StateAt returns a new mutable state based on a particular point in time.
func (bc *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) {
- return state.New(root, bc.stateCache)
+ return state.New(root, bc.stateCache, bc.snaps)
}
// StateCache returns the caching database underpinning the blockchain instance.
@@ -533,46 +650,25 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
defer bc.chainmu.Unlock()
// Prepare the genesis block and reinitialise the chain
- if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil {
- log.Crit("Failed to write genesis block TD", "err", err)
+ batch := bc.db.NewBatch()
+ rawdb.WriteTd(batch, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty())
+ rawdb.WriteBlock(batch, genesis)
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to write genesis block", "err", err)
}
- rawdb.WriteBlock(bc.db, genesis)
+ bc.writeHeadBlock(genesis)
+ // Last update all in-memory chain markers
bc.genesisBlock = genesis
- bc.insert(bc.genesisBlock)
bc.currentBlock.Store(bc.genesisBlock)
headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
-
bc.hc.SetGenesis(bc.genesisBlock.Header())
bc.hc.SetCurrentHeader(bc.genesisBlock.Header())
bc.currentFastBlock.Store(bc.genesisBlock)
headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
-
return nil
}
-// repair tries to repair the current blockchain by rolling back the current block
-// until one with associated state is found. This is needed to fix incomplete db
-// writes caused either by crashes/power outages, or simply non-committed tries.
-//
-// This method only rolls back the current block. The current header and current
-// fast block are left intact.
-func (bc *BlockChain) repair(head **types.Block) error {
- for {
- // Abort if we've rewound to a head block that does have associated state
- if _, err := state.New((*head).Root(), bc.stateCache); err == nil {
- log.Info("Rewound blockchain to past state", "number", (*head).Number(), "hash", (*head).Hash())
- return nil
- }
- // Otherwise rewind one block and recheck state availability there
- block := bc.GetBlock((*head).ParentHash(), (*head).NumberU64()-1)
- if block == nil {
- return fmt.Errorf("missing block %d [%x]", (*head).NumberU64()-1, (*head).ParentHash())
- }
- *head = block
- }
-}
-
// Export writes the active chain to the given writer.
func (bc *BlockChain) Export(w io.Writer) error {
return bc.ExportN(w, uint64(0), bc.CurrentBlock().NumberU64())
@@ -605,31 +701,39 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
return nil
}
-// insert injects a new head block into the current block chain. This method
+// writeHeadBlock injects a new head block into the current block chain. This method
// assumes that the block is indeed a true head. It will also reset the head
// header and the head fast sync block to this very same block if they are older
// or if they are on a different side chain.
//
// Note, this function assumes that the `mu` mutex is held!
-func (bc *BlockChain) insert(block *types.Block) {
+func (bc *BlockChain) writeHeadBlock(block *types.Block) {
// If the block is on a side chain or an unknown one, force other heads onto it too
updateHeads := rawdb.ReadCanonicalHash(bc.db, block.NumberU64()) != block.Hash()
// Add the block to the canonical chain number scheme and mark as the head
- rawdb.WriteCanonicalHash(bc.db, block.Hash(), block.NumberU64())
- rawdb.WriteHeadBlockHash(bc.db, block.Hash())
-
- bc.currentBlock.Store(block)
- headBlockGauge.Update(int64(block.NumberU64()))
+ batch := bc.db.NewBatch()
+ rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64())
+ rawdb.WriteTxLookupEntriesByBlock(batch, block)
+ rawdb.WriteHeadBlockHash(batch, block.Hash())
// If the block is better than our head or is on a different chain, force update heads
if updateHeads {
+ rawdb.WriteHeadHeaderHash(batch, block.Hash())
+ rawdb.WriteHeadFastBlockHash(batch, block.Hash())
+ }
+ // Flush the whole batch into the disk, exit the node if failed
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to update chain indexes and markers", "err", err)
+ }
+ // Update all in-memory chain markers in the last step
+ if updateHeads {
bc.hc.SetCurrentHeader(block.Header())
- rawdb.WriteHeadFastBlockHash(bc.db, block.Hash())
-
bc.currentFastBlock.Store(block)
headFastBlockGauge.Update(int64(block.NumberU64()))
}
+ bc.currentBlock.Store(block)
+ headBlockGauge.Update(int64(block.NumberU64()))
}
// Genesis retrieves the chain's genesis block.
@@ -800,12 +904,30 @@ func (bc *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.
return uncles
}
-// TrieNode retrieves a blob of data associated with a trie node (or code hash)
+// TrieNode retrieves a blob of data associated with a trie node
// either from ephemeral in-memory cache, or from persistent storage.
func (bc *BlockChain) TrieNode(hash common.Hash) ([]byte, error) {
return bc.stateCache.TrieDB().Node(hash)
}
+// ContractCode retrieves a blob of data associated with a contract hash
+// either from ephemeral in-memory cache, or from persistent storage.
+func (bc *BlockChain) ContractCode(hash common.Hash) ([]byte, error) {
+ return bc.stateCache.ContractCode(common.Hash{}, hash)
+}
+
+// ContractCodeWithPrefix retrieves a blob of data associated with a contract
+// hash either from ephemeral in-memory cache, or from persistent storage.
+//
+// If the code doesn't exist in the in-memory cache, check the storage with
+// new code scheme.
+func (bc *BlockChain) ContractCodeWithPrefix(hash common.Hash) ([]byte, error) {
+ type codeReader interface {
+ ContractCodeWithPrefix(addrHash, codeHash common.Hash) ([]byte, error)
+ }
+ return bc.stateCache.(codeReader).ContractCodeWithPrefix(common.Hash{}, hash)
+}
+
// Stop stops the blockchain service. If any imports are currently in progress
// it will abort them using the procInterrupt.
func (bc *BlockChain) Stop() {
@@ -815,10 +937,17 @@ func (bc *BlockChain) Stop() {
// Unsubscribe all subscriptions registered from blockchain
bc.scope.Close()
close(bc.quit)
- atomic.StoreInt32(&bc.procInterrupt, 1)
-
+ bc.StopInsert()
bc.wg.Wait()
+ // Ensure that the entirety of the state snapshot is journalled to disk.
+ var snapBase common.Hash
+ if bc.snaps != nil {
+ var err error
+ if snapBase, err = bc.snaps.Journal(bc.CurrentBlock().Root()); err != nil {
+ log.Error("Failed to journal state snapshot", "err", err)
+ }
+ }
// Ensure the state of a recent block is also stored to disk before exiting.
// We're writing three different states to catch different restart scenarios:
// - HEAD: So we don't need to reprocess any blocks in the general case
@@ -832,11 +961,17 @@ func (bc *BlockChain) Stop() {
recent := bc.GetBlockByNumber(number - offset)
log.Info("Writing cached state to disk", "block", recent.Number(), "hash", recent.Hash(), "root", recent.Root())
- if err := triedb.Commit(recent.Root(), true); err != nil {
+ if err := triedb.Commit(recent.Root(), true, nil); err != nil {
log.Error("Failed to commit recent state trie", "err", err)
}
}
}
+ if snapBase != (common.Hash{}) {
+ log.Info("Writing snapshot state to disk", "root", snapBase)
+ if err := triedb.Commit(snapBase, true, nil); err != nil {
+ log.Error("Failed to commit recent state trie", "err", err)
+ }
+ }
for !bc.triegc.Empty() {
triedb.Dereference(bc.triegc.PopItem().(common.Hash))
}
@@ -844,7 +979,25 @@ func (bc *BlockChain) Stop() {
log.Error("Dangling trie nodes after full cleanup")
}
}
- log.Info("Blockchain manager stopped")
+ // Ensure all live cached entries be saved into disk, so that we can skip
+ // cache warmup when node restarts.
+ if bc.cacheConfig.TrieCleanJournal != "" {
+ triedb := bc.stateCache.TrieDB()
+ triedb.SaveCache(bc.cacheConfig.TrieCleanJournal)
+ }
+ log.Info("Blockchain stopped")
+}
+
+// StopInsert interrupts all insertion methods, causing them to return
+// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after
+// calling this method.
+func (bc *BlockChain) StopInsert() {
+ atomic.StoreInt32(&bc.procInterrupt, 1)
+}
+
+// insertStopped returns true after StopInsert has been called.
+func (bc *BlockChain) insertStopped() bool {
+ return atomic.LoadInt32(&bc.procInterrupt) == 1
}
func (bc *BlockChain) procFutureBlocks() {
@@ -855,8 +1008,9 @@ func (bc *BlockChain) procFutureBlocks() {
}
}
if len(blocks) > 0 {
- types.BlockBy(types.Number).Sort(blocks)
-
+ sort.Slice(blocks, func(i, j int) bool {
+ return blocks[i].NumberU64() < blocks[j].NumberU64()
+ })
// Insert one by one as chain insertion needs contiguous ancestry between blocks
for i := range blocks {
bc.InsertChain(blocks[i : i+1])
@@ -873,42 +1027,6 @@ const (
SideStatTy
)
-// Rollback is designed to remove a chain of links from the database that aren't
-// certain enough to be valid.
-func (bc *BlockChain) Rollback(chain []common.Hash) {
- bc.chainmu.Lock()
- defer bc.chainmu.Unlock()
-
- for i := len(chain) - 1; i >= 0; i-- {
- hash := chain[i]
-
- currentHeader := bc.hc.CurrentHeader()
- if currentHeader.Hash() == hash {
- bc.hc.SetCurrentHeader(bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1))
- }
- if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock.Hash() == hash {
- newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1)
- rawdb.WriteHeadFastBlockHash(bc.db, newFastBlock.Hash())
- bc.currentFastBlock.Store(newFastBlock)
- headFastBlockGauge.Update(int64(newFastBlock.NumberU64()))
- }
- if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash {
- newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
- rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash())
- bc.currentBlock.Store(newBlock)
- headBlockGauge.Update(int64(newBlock.NumberU64()))
- }
- }
- // Truncate ancient data which exceeds the current header.
- //
- // Notably, it can happen that system crashes without truncating the ancient data
- // but the head indicator has been updated in the active store. Regarding this issue,
- // system will self recovery by truncating the extra data during the setup phase.
- if err := bc.truncateAncient(bc.hc.CurrentHeader().Number.Uint64()); err != nil {
- log.Crit("Truncate ancient store failed", "err", err)
- }
-}
-
// truncateAncient rewinds the blockchain to the specified header and deletes all
// data in the ancient store that exceeds the specified header.
func (bc *BlockChain) truncateAncient(head uint64) error {
@@ -982,7 +1100,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
size = 0
)
// updateHead updates the head fast sync block if the inserted blocks are better
- // and returns a indicator whether the inserted blocks are canonical.
+ // and returns an indicator whether the inserted blocks are canonical.
updateHead := func(head *types.Block) bool {
if bc.manualCanonical {
return false
@@ -1024,7 +1142,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
var deleted []*numberHash
for i, block := range blockChain {
// Short circuit insertion if shutting down or processing failed
- if atomic.LoadInt32(&bc.procInterrupt) == 1 {
+ if bc.insertStopped() {
return 0, errInsertionInterrupted
}
// Short circuit insertion if it is required(used in testing only)
@@ -1064,7 +1182,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
// Don't collect too much in-memory, write it out every 100K blocks
if len(deleted) > 100000 {
-
// Sync the ancient store explicitly to ensure all data has been flushed to disk.
if err := bc.db.Sync(); err != nil {
return 0, err
@@ -1096,8 +1213,23 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
// Flush data into ancient database.
size += rawdb.WriteAncientBlock(bc.db, block, receiptChain[i], bc.GetTd(block.Hash(), block.NumberU64()))
- rawdb.WriteTxLookupEntries(batch, block)
+ // Write tx indices if any condition is satisfied:
+ // * If user requires to reserve all tx indices(txlookuplimit=0)
+ // * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit)
+ // * If block number is large enough to be regarded as a recent block
+ // It means blocks below the ancientLimit-txlookupLimit won't be indexed.
+ //
+ // But if the `TxIndexTail` is not nil, e.g. Geth is initialized with
+ // an external ancient database, during the setup, blockchain will start
+ // a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients)
+ // range. In this case, all tx indices of newly imported blocks should be
+ // generated.
+ if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit {
+ rawdb.WriteTxLookupEntriesByBlock(batch, block)
+ } else if rawdb.ReadTxIndexTail(bc.db) != nil {
+ rawdb.WriteTxLookupEntriesByBlock(batch, block)
+ }
stats.processed++
}
// Flush all tx-lookup index data.
@@ -1154,26 +1286,37 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
// writeLive writes blockchain and corresponding receipt chain into active store.
writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
+ skipPresenceCheck := false
batch := bc.db.NewBatch()
for i, block := range blockChain {
// Short circuit insertion if shutting down or processing failed
- if atomic.LoadInt32(&bc.procInterrupt) == 1 {
+ if bc.insertStopped() {
return 0, errInsertionInterrupted
}
// Short circuit if the owner header is unknown
if !bc.HasHeader(block.Hash(), block.NumberU64()) {
return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
}
- if bc.HasBlock(block.Hash(), block.NumberU64()) {
- stats.ignored++
- continue
+ if !skipPresenceCheck {
+ // Ignore if the entire data is already known
+ if bc.HasBlock(block.Hash(), block.NumberU64()) {
+ stats.ignored++
+ continue
+ } else {
+ // If block N is not present, neither are the later blocks.
+ // This should be true, but if we are mistaken, the shortcut
+ // here will only cause overwriting of some existing data
+ skipPresenceCheck = true
+ }
}
// Write all the data out into the database
rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body())
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i])
- rawdb.WriteTxLookupEntries(batch, block)
+ rawdb.WriteTxLookupEntriesByBlock(batch, block) // Always write tx indices for live blocks, we assume they are needed
- stats.processed++
+ // Write everything belongs to the blocks into the database. So that
+ // we can ensure all components of body is completed(body, receipts,
+ // tx indexes)
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
return 0, err
@@ -1181,7 +1324,11 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
size += batch.ValueSize()
batch.Reset()
}
+ stats.processed++
}
+ // Write everything belongs to the blocks into the database. So that
+ // we can ensure all components of body is completed(body, receipts,
+ // tx indexes)
if batch.ValueSize() > 0 {
size += batch.ValueSize()
if err := batch.Write(); err != nil {
@@ -1191,7 +1338,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
updateHead(blockChain[len(blockChain)-1])
return 0, nil
}
- // Write downloaded chain data and corresponding receipt chain data.
+ // Write downloaded chain data and corresponding receipt chain data
if len(ancientBlocks) > 0 {
if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil {
if err == errInsertionInterrupted {
@@ -1200,6 +1347,19 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
return n, err
}
}
+ // Write the tx index tail (block number from where we index) before write any live blocks
+ if len(liveBlocks) > 0 && liveBlocks[0].NumberU64() == ancientLimit+1 {
+ // The tx index tail can only be one of the following two options:
+ // * 0: all ancient blocks have been indexed
+ // * ancient-limit: the indices of blocks before ancient-limit are ignored
+ if tail := rawdb.ReadTxIndexTail(bc.db); tail == nil {
+ if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit {</