aboutsummaryrefslogtreecommitdiff
path: root/core/blockchain.go
diff options
context:
space:
mode:
authorTed Yin <[email protected]>2020-09-18 13:14:29 -0400
committerGitHub <[email protected]>2020-09-18 13:14:29 -0400
commitd048937c48753d9eaef771bf71820cf95d79df26 (patch)
tree1a7f65fcd72e77092525ab01625b8b9d365e3e40 /core/blockchain.go
parent7d1388c743b4ec8f4a86bea95bfada785dee83f7 (diff)
parent7d8c85cf8895b0f998d8eafb02f99d5b689fcd59 (diff)
Merge pull request #34 from ava-labs/devv0.3.0-rc.5
Dev
Diffstat (limited to 'core/blockchain.go')
-rw-r--r--core/blockchain.go911
1 files changed, 589 insertions, 322 deletions
diff --git a/core/blockchain.go b/core/blockchain.go
index 6e316c7..208a026 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -23,6 +23,7 @@ import (
"io"
"math/big"
mrand "math/rand"
+ "sort"
"sync"
"sync/atomic"
"time"
@@ -30,19 +31,20 @@ import (
"github.com/ava-labs/coreth/consensus"
"github.com/ava-labs/coreth/core/rawdb"
"github.com/ava-labs/coreth/core/state"
+ "github.com/ava-labs/coreth/core/state/snapshot"
"github.com/ava-labs/coreth/core/types"
"github.com/ava-labs/coreth/core/vm"
"github.com/ava-labs/coreth/params"
- "github.com/ava-labs/go-ethereum/common"
- "github.com/ava-labs/go-ethereum/common/mclock"
- "github.com/ava-labs/go-ethereum/common/prque"
- "github.com/ava-labs/go-ethereum/ethdb"
- "github.com/ava-labs/go-ethereum/event"
- "github.com/ava-labs/go-ethereum/log"
- "github.com/ava-labs/go-ethereum/metrics"
- "github.com/ava-labs/go-ethereum/rlp"
- "github.com/ava-labs/go-ethereum/trie"
- "github.com/hashicorp/golang-lru"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/mclock"
+ "github.com/ethereum/go-ethereum/common/prque"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/trie"
+ lru "github.com/hashicorp/golang-lru"
)
var (
@@ -60,12 +62,19 @@ var (
storageUpdateTimer = metrics.NewRegisteredTimer("chain/storage/updates", nil)
storageCommitTimer = metrics.NewRegisteredTimer("chain/storage/commits", nil)
+ snapshotAccountReadTimer = metrics.NewRegisteredTimer("chain/snapshot/account/reads", nil)
+ snapshotStorageReadTimer = metrics.NewRegisteredTimer("chain/snapshot/storage/reads", nil)
+ snapshotCommitTimer = metrics.NewRegisteredTimer("chain/snapshot/commits", nil)
+
blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)
- blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil)
- blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
+
+ blockReorgMeter = metrics.NewRegisteredMeter("chain/reorg/executes", nil)
+ blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
+ blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil)
+ blockReorgInvalidatedTx = metrics.NewRegisteredMeter("chain/reorg/invalidTx", nil)
blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil)
blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)
@@ -103,17 +112,35 @@ const (
// - Version 7
// The following incompatible database changes were added:
// * Use freezer as the ancient database to maintain all ancient data
- BlockChainVersion uint64 = 7
+ // - Version 8
+ // The following incompatible database changes were added:
+ // * New scheme for contract code in order to separate the codes and trie nodes
+ BlockChainVersion uint64 = 8
)
// CacheConfig contains the configuration values for the trie caching/pruning
// that's resident in a blockchain.
type CacheConfig struct {
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
+ TrieCleanJournal string // Disk journal for saving clean cache entries.
+ TrieCleanRejournal time.Duration // Time interval to dump clean cache to disk periodically
TrieCleanNoPrefetch bool // Whether to disable heuristic state prefetching for followup blocks
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node)
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
+ SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
+
+ SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
+}
+
+// defaultCacheConfig are the default caching values if none are specified by the
+// user (also used during testing).
+var defaultCacheConfig = &CacheConfig{
+ TrieCleanLimit: 256,
+ TrieDirtyLimit: 256,
+ TrieTimeLimit: 5 * time.Minute,
+ SnapshotLimit: 256,
+ SnapshotWait: true,
}
// BlockChain represents the canonical chain given a database with a genesis
@@ -135,9 +162,17 @@ type BlockChain struct {
cacheConfig *CacheConfig // Cache configuration for pruning
db ethdb.Database // Low level persistent database to store final content in
+ snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
+ // txLookupLimit is the maximum number of blocks from head whose tx indices
+ // are reserved:
+ // * 0: means no limit and regenerate any missing indexes
+ // * N: means N block limit [HEAD-N+1, HEAD] and delete extra indexes
+ // * nil: disable tx reindexer/deleter, but still index new blocks
+ txLookupLimit uint64
+
hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
@@ -161,11 +196,10 @@ type BlockChain struct {
txLookupCache *lru.Cache // Cache for the most recent transaction lookup data.
futureBlocks *lru.Cache // future blocks are blocks added for later processing
- quit chan struct{} // blockchain quit channel
- running int32 // running must be called atomically
- // procInterrupt must be atomically called
- procInterrupt int32 // interrupt signaler for block processing
+ quit chan struct{} // blockchain quit channel
wg sync.WaitGroup // chain processing wait group for shutting down
+ running int32 // 0 if chain is running, 1 when stopped
+ procInterrupt int32 // interrupt signaler for block processing
engine consensus.Engine
validator Validator // Block and state validator interface
@@ -182,13 +216,9 @@ type BlockChain struct {
// NewBlockChain returns a fully initialised block chain using information
// available in the database. It initialises the default Ethereum Validator and
// Processor.
-func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool, manualCanonical bool) (*BlockChain, error) {
+func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool, txLookupLimit *uint64, manualCanonical bool) (*BlockChain, error) {
if cacheConfig == nil {
- cacheConfig = &CacheConfig{
- TrieCleanLimit: 256,
- TrieDirtyLimit: 256,
- TrieTimeLimit: 5 * time.Minute,
- }
+ cacheConfig = defaultCacheConfig
}
bodyCache, _ := lru.New(bodyCacheLimit)
bodyRLPCache, _ := lru.New(bodyCacheLimit)
@@ -203,7 +233,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(nil),
- stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit),
+ stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit, cacheConfig.TrieCleanJournal),
quit: make(chan struct{}),
shouldPreserve: shouldPreserve,
bodyCache: bodyCache,
@@ -222,7 +252,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
bc.processor = NewStateProcessor(chainConfig, bc, engine)
var err error
- bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt)
+ bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped)
if err != nil {
return nil, err
}
@@ -230,18 +260,35 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
if bc.genesisBlock == nil {
return nil, ErrNoGenesis
}
+
+ var nilBlock *types.Block
+ bc.currentBlock.Store(nilBlock)
+ bc.currentFastBlock.Store(nilBlock)
+
// Initialize the chain with ancient data if it isn't empty.
+ var txIndexBlock uint64
+
if bc.empty() {
rawdb.InitDatabaseFromFreezer(bc.db)
+ // If ancient database is not empty, reconstruct all missing
+ // indices in the background.
+ frozen, _ := bc.db.Ancients()
+ if frozen > 0 {
+ txIndexBlock = frozen
+ }
}
if err := bc.loadLastState(); err != nil {
return nil, err
}
- // The first thing the node will do is reconstruct the verification data for
- // the head block (ethash cache or clique voting snapshot). Might as well do
- // it in advance.
- bc.engine.VerifyHeader(bc, bc.CurrentHeader(), true)
-
+ // Make sure the state associated with the block is available
+ head := bc.CurrentBlock()
+ if _, err := state.New(head.Root(), bc.stateCache, bc.snaps); err != nil {
+ log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash())
+ if err := bc.SetHead(head.NumberU64()); err != nil {
+ return nil, err
+ }
+ }
+ // Ensure that a previous crash in SetHead doesn't leave extra ancients
if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 {
var (
needRewind bool
@@ -251,7 +298,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
// blockchain repair. If the head full block is even lower than the ancient
// chain, truncate the ancient store.
fullBlock := bc.CurrentBlock()
- if fullBlock != nil && fullBlock != bc.genesisBlock && fullBlock.NumberU64() < frozen-1 {
+ if fullBlock != nil && fullBlock.Hash() != bc.genesisBlock.Hash() && fullBlock.NumberU64() < frozen-1 {
needRewind = true
low = fullBlock.NumberU64()
}
@@ -266,15 +313,17 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
}
}
if needRewind {
- var hashes []common.Hash
- previous := bc.CurrentHeader().Number.Uint64()
- for i := low + 1; i <= bc.CurrentHeader().Number.Uint64(); i++ {
- hashes = append(hashes, rawdb.ReadCanonicalHash(bc.db, i))
+ log.Error("Truncating ancient chain", "from", bc.CurrentHeader().Number.Uint64(), "to", low)
+ if err := bc.SetHead(low); err != nil {
+ return nil, err
}
- bc.Rollback(hashes)
- log.Warn("Truncate ancient chain", "from", previous, "to", low)
}
}
+ // The first thing the node will do is reconstruct the verification data for
+ // the head block (ethash cache or clique voting snapshot). Might as well do
+ // it in advance.
+ bc.engine.VerifyHeader(bc, bc.CurrentHeader(), true)
+
// Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain
for hash := range BadHashes {
if header := bc.GetHeaderByHash(hash); header != nil {
@@ -283,20 +332,39 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
// make sure the headerByNumber (if present) is in our current canonical chain
if headerByNumber != nil && headerByNumber.Hash() == header.Hash() {
log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash)
- bc.SetHead(header.Number.Uint64() - 1)
+ if err := bc.SetHead(header.Number.Uint64() - 1); err != nil {
+ return nil, err
+ }
log.Error("Chain rewind was successful, resuming normal operation")
}
}
}
+ // Load any existing snapshot, regenerating it if loading failed
+ if bc.cacheConfig.SnapshotLimit > 0 {
+ bc.snaps = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, bc.CurrentBlock().Root(), !bc.cacheConfig.SnapshotWait)
+ }
// Take ownership of this particular state
go bc.update()
+ if txLookupLimit != nil {
+ bc.txLookupLimit = *txLookupLimit
+ go bc.maintainTxIndex(txIndexBlock)
+ }
+ // If periodic cache journal is required, spin it up.
+ if bc.cacheConfig.TrieCleanRejournal > 0 {
+ if bc.cacheConfig.TrieCleanRejournal < time.Minute {
+ log.Warn("Sanitizing invalid trie cache journal time", "provided", bc.cacheConfig.TrieCleanRejournal, "updated", time.Minute)
+ bc.cacheConfig.TrieCleanRejournal = time.Minute
+ }
+ triedb := bc.stateCache.TrieDB()
+ bc.wg.Add(1)
+ go func() {
+ defer bc.wg.Done()
+ triedb.SaveCachePeriodically(bc.cacheConfig.TrieCleanJournal, bc.cacheConfig.TrieCleanRejournal, bc.quit)
+ }()
+ }
return bc, nil
}
-func (bc *BlockChain) getProcInterrupt() bool {
- return atomic.LoadInt32(&bc.procInterrupt) == 1
-}
-
// GetVMConfig returns the block chain VM config.
func (bc *BlockChain) GetVMConfig() *vm.Config {
return &bc.vmConfig
@@ -333,15 +401,6 @@ func (bc *BlockChain) loadLastState() error {
log.Warn("Head block missing, resetting chain", "hash", head)
return bc.Reset()
}
- // Make sure the state associated with the block is available
- if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil {
- // Dangling block without a state associated, init from scratch
- log.Warn("Head state missing, repairing chain", "number", currentBlock.Number(), "hash", currentBlock.Hash())
- if err := bc.repair(&currentBlock); err != nil {
- return err
- }
- rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash())
- }
// Everything seems to be fine, set as the head block
bc.currentBlock.Store(currentBlock)
headBlockGauge.Update(int64(currentBlock.NumberU64()))
@@ -375,37 +434,59 @@ func (bc *BlockChain) loadLastState() error {
log.Info("Loaded most recent local header", "number", currentHeader.Number, "hash", currentHeader.Hash(), "td", headerTd, "age", common.PrettyAge(time.Unix(int64(currentHeader.Time), 0)))
log.Info("Loaded most recent local full block", "number", currentBlock.Number(), "hash", currentBlock.Hash(), "td", blockTd, "age", common.PrettyAge(time.Unix(int64(currentBlock.Time()), 0)))
log.Info("Loaded most recent local fast block", "number", currentFastBlock.Number(), "hash", currentFastBlock.Hash(), "td", fastTd, "age", common.PrettyAge(time.Unix(int64(currentFastBlock.Time()), 0)))
-
+ if pivot := rawdb.ReadLastPivotNumber(bc.db); pivot != nil {
+ log.Info("Loaded last fast-sync pivot marker", "number", *pivot)
+ }
return nil
}
-// SetHead rewinds the local chain to a new head. In the case of headers, everything
-// above the new head will be deleted and the new one set. In the case of blocks
-// though, the head may be further rewound if block bodies are missing (non-archive
-// nodes after a fast sync).
+// SetHead rewinds the local chain to a new head. Depending on whether the node
+// was fast synced or full synced and in which state, the method will try to
+// delete minimal data from disk whilst retaining chain consistency.
func (bc *BlockChain) SetHead(head uint64) error {
- log.Warn("Rewinding blockchain", "target", head)
-
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
- updateFn := func(db ethdb.KeyValueWriter, header *types.Header) {
- // Rewind the block chain, ensuring we don't end up with a stateless head block
- if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() < currentBlock.NumberU64() {
+ // Retrieve the last pivot block to short circuit rollbacks beyond it and the
+ // current freezer limit to start nuking id underflown
+ pivot := rawdb.ReadLastPivotNumber(bc.db)
+ frozen, _ := bc.db.Ancients()
+
+ updateFn := func(db ethdb.KeyValueWriter, header *types.Header) (uint64, bool) {
+ // Rewind the block chain, ensuring we don't end up with a stateless head
+ // block. Note, depth equality is permitted to allow using SetHead as a
+ // chain reparation mechanism without deleting any data!
+ if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() <= currentBlock.NumberU64() {
newHeadBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
if newHeadBlock == nil {
+ log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash())
newHeadBlock = bc.genesisBlock
} else {
- if _, err := state.New(newHeadBlock.Root(), bc.stateCache); err != nil {
- // Rewound state missing, rolled back to before pivot, reset to genesis
- newHeadBlock = bc.genesisBlock
+ // Block exists, keep rewinding until we find one with state
+ for {
+ if _, err := state.New(newHeadBlock.Root(), bc.stateCache, bc.snaps); err != nil {
+ log.Trace("Block state missing, rewinding further", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
+ if pivot == nil || newHeadBlock.NumberU64() > *pivot {
+ newHeadBlock = bc.GetBlock(newHeadBlock.ParentHash(), newHeadBlock.NumberU64()-1)
+ continue
+ } else {
+ log.Trace("Rewind passed pivot, aiming genesis", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "pivot", *pivot)
+ newHeadBlock = bc.genesisBlock
+ }
+ }
+ log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
+ break
}
}
rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())
+
+ // Degrade the chain markers if they are explicitly reverted.
+ // In theory we should update all in-memory markers in the
+ // last step, however the direction of SetHead is from high
+ // to low, so it's safe the update in-memory markers directly.
bc.currentBlock.Store(newHeadBlock)
headBlockGauge.Update(int64(newHeadBlock.NumberU64()))
}
-
// Rewind the fast block in a simpleton way to the target head
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && header.Number.Uint64() < currentFastBlock.NumberU64() {
newHeadFastBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
@@ -414,11 +495,25 @@ func (bc *BlockChain) SetHead(head uint64) error {
newHeadFastBlock = bc.genesisBlock
}
rawdb.WriteHeadFastBlockHash(db, newHeadFastBlock.Hash())
+
+ // Degrade the chain markers if they are explicitly reverted.
+ // In theory we should update all in-memory markers in the
+ // last step, however the direction of SetHead is from high
+ // to low, so it's safe the update in-memory markers directly.
bc.currentFastBlock.Store(newHeadFastBlock)
headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64()))
}
- }
+ head := bc.CurrentBlock().NumberU64()
+ // If setHead underflown the freezer threshold and the block processing
+ // intent afterwards is full block importing, delete the chain segment
+ // between the stateful-block and the sethead target.
+ var wipe bool
+ if head+1 < frozen {
+ wipe = pivot == nil || head >= *pivot
+ }
+ return head, wipe // Only force wipe if full synced
+ }
// Rewind the header chain, deleting all block bodies until then
delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) {
// Ignore the error here since light client won't hit this path
@@ -426,10 +521,9 @@ func (bc *BlockChain) SetHead(head uint64) error {
if num+1 <= frozen {
// Truncate all relative data(header, total difficulty, body, receipt
// and canonical hash) from ancient store.
- if err := bc.db.TruncateAncients(num + 1); err != nil {
+ if err := bc.db.TruncateAncients(num); err != nil {
log.Crit("Failed to truncate ancient data", "number", num, "err", err)
}
-
// Remove the hash <-> number mapping from the active store.
rawdb.DeleteHeaderNumber(db, hash)
} else {
@@ -441,8 +535,18 @@ func (bc *BlockChain) SetHead(head uint64) error {
}
// Todo(rjl493456442) txlookup, bloombits, etc
}
- bc.hc.SetHead(head, updateFn, delFn)
-
+ // If SetHead was only called as a chain reparation method, try to skip
+ // touching the header chain altogether, unless the freezer is broken
+ if block := bc.CurrentBlock(); block.NumberU64() == head {
+ if target, force := updateFn(bc.db, block.Header()); force {
+ bc.hc.SetHead(target, updateFn, delFn)
+ }
+ } else {
+ // Rewind the chain to the requested head and keep going backwards until a
+ // block with a state is found or fast sync pivot is passed
+ log.Warn("Rewinding blockchain", "target", head)
+ bc.hc.SetHead(head, updateFn, delFn)
+ }
// Clear out any stale content from the caches
bc.bodyCache.Purge()
bc.bodyRLPCache.Purge()
@@ -471,6 +575,10 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
headBlockGauge.Update(int64(block.NumberU64()))
bc.chainmu.Unlock()
+ // Destroy any existing state snapshot and regenerate it in the background
+ if bc.snaps != nil {
+ bc.snaps.Rebuild(block.Root())
+ }
log.Info("Committed new head block", "number", block.Number(), "hash", hash)
return nil
}
@@ -486,6 +594,15 @@ func (bc *BlockChain) CurrentBlock() *types.Block {
return bc.currentBlock.Load().(*types.Block)
}
+// Snapshot returns the blockchain snapshot tree. This method is mainly used for
+// testing, to make it possible to verify the snapshot after execution.
+//
+// Warning: There are no guarantees about the safety of using the returned 'snap' if the
+// blockchain is simultaneously importing blocks, so take care.
+func (bc *BlockChain) Snapshot() *snapshot.Tree {
+ return bc.snaps
+}
+
// CurrentFastBlock retrieves the current fast-sync head block of the canonical
// chain. The block is retrieved from the blockchain's internal cache.
func (bc *BlockChain) CurrentFastBlock() *types.Block {
@@ -509,7 +626,7 @@ func (bc *BlockChain) State() (*state.StateDB, error) {
// StateAt returns a new mutable state based on a particular point in time.
func (bc *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) {
- return state.New(root, bc.stateCache)
+ return state.New(root, bc.stateCache, bc.snaps)
}
// StateCache returns the caching database underpinning the blockchain instance.
@@ -533,46 +650,25 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
defer bc.chainmu.Unlock()
// Prepare the genesis block and reinitialise the chain
- if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil {
- log.Crit("Failed to write genesis block TD", "err", err)
+ batch := bc.db.NewBatch()
+ rawdb.WriteTd(batch, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty())
+ rawdb.WriteBlock(batch, genesis)
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to write genesis block", "err", err)
}
- rawdb.WriteBlock(bc.db, genesis)
+ bc.writeHeadBlock(genesis)
+ // Last update all in-memory chain markers
bc.genesisBlock = genesis
- bc.insert(bc.genesisBlock)
bc.currentBlock.Store(bc.genesisBlock)
headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
-
bc.hc.SetGenesis(bc.genesisBlock.Header())
bc.hc.SetCurrentHeader(bc.genesisBlock.Header())
bc.currentFastBlock.Store(bc.genesisBlock)
headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
-
return nil
}
-// repair tries to repair the current blockchain by rolling back the current block
-// until one with associated state is found. This is needed to fix incomplete db
-// writes caused either by crashes/power outages, or simply non-committed tries.
-//
-// This method only rolls back the current block. The current header and current
-// fast block are left intact.
-func (bc *BlockChain) repair(head **types.Block) error {
- for {
- // Abort if we've rewound to a head block that does have associated state
- if _, err := state.New((*head).Root(), bc.stateCache); err == nil {
- log.Info("Rewound blockchain to past state", "number", (*head).Number(), "hash", (*head).Hash())
- return nil
- }
- // Otherwise rewind one block and recheck state availability there
- block := bc.GetBlock((*head).ParentHash(), (*head).NumberU64()-1)
- if block == nil {
- return fmt.Errorf("missing block %d [%x]", (*head).NumberU64()-1, (*head).ParentHash())
- }
- *head = block
- }
-}
-
// Export writes the active chain to the given writer.
func (bc *BlockChain) Export(w io.Writer) error {
return bc.ExportN(w, uint64(0), bc.CurrentBlock().NumberU64())
@@ -605,31 +701,39 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
return nil
}
-// insert injects a new head block into the current block chain. This method
+// writeHeadBlock injects a new head block into the current block chain. This method
// assumes that the block is indeed a true head. It will also reset the head
// header and the head fast sync block to this very same block if they are older
// or if they are on a different side chain.
//
// Note, this function assumes that the `mu` mutex is held!
-func (bc *BlockChain) insert(block *types.Block) {
+func (bc *BlockChain) writeHeadBlock(block *types.Block) {
// If the block is on a side chain or an unknown one, force other heads onto it too
updateHeads := rawdb.ReadCanonicalHash(bc.db, block.NumberU64()) != block.Hash()
// Add the block to the canonical chain number scheme and mark as the head
- rawdb.WriteCanonicalHash(bc.db, block.Hash(), block.NumberU64())
- rawdb.WriteHeadBlockHash(bc.db, block.Hash())
-
- bc.currentBlock.Store(block)
- headBlockGauge.Update(int64(block.NumberU64()))
+ batch := bc.db.NewBatch()
+ rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64())
+ rawdb.WriteTxLookupEntriesByBlock(batch, block)
+ rawdb.WriteHeadBlockHash(batch, block.Hash())
// If the block is better than our head or is on a different chain, force update heads
if updateHeads {
+ rawdb.WriteHeadHeaderHash(batch, block.Hash())
+ rawdb.WriteHeadFastBlockHash(batch, block.Hash())
+ }
+ // Flush the whole batch into the disk, exit the node if failed
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to update chain indexes and markers", "err", err)
+ }
+ // Update all in-memory chain markers in the last step
+ if updateHeads {
bc.hc.SetCurrentHeader(block.Header())
- rawdb.WriteHeadFastBlockHash(bc.db, block.Hash())
-
bc.currentFastBlock.Store(block)
headFastBlockGauge.Update(int64(block.NumberU64()))
}
+ bc.currentBlock.Store(block)
+ headBlockGauge.Update(int64(block.NumberU64()))
}
// Genesis retrieves the chain's genesis block.
@@ -800,12 +904,30 @@ func (bc *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.
return uncles
}
-// TrieNode retrieves a blob of data associated with a trie node (or code hash)
+// TrieNode retrieves a blob of data associated with a trie node
// either from ephemeral in-memory cache, or from persistent storage.
func (bc *BlockChain) TrieNode(hash common.Hash) ([]byte, error) {
return bc.stateCache.TrieDB().Node(hash)
}
+// ContractCode retrieves a blob of data associated with a contract hash
+// either from ephemeral in-memory cache, or from persistent storage.
+func (bc *BlockChain) ContractCode(hash common.Hash) ([]byte, error) {
+ return bc.stateCache.ContractCode(common.Hash{}, hash)
+}
+
+// ContractCodeWithPrefix retrieves a blob of data associated with a contract
+// hash either from ephemeral in-memory cache, or from persistent storage.
+//
+// If the code doesn't exist in the in-memory cache, check the storage with
+// new code scheme.
+func (bc *BlockChain) ContractCodeWithPrefix(hash common.Hash) ([]byte, error) {
+ type codeReader interface {
+ ContractCodeWithPrefix(addrHash, codeHash common.Hash) ([]byte, error)
+ }
+ return bc.stateCache.(codeReader).ContractCodeWithPrefix(common.Hash{}, hash)
+}
+
// Stop stops the blockchain service. If any imports are currently in progress
// it will abort them using the procInterrupt.
func (bc *BlockChain) Stop() {
@@ -815,10 +937,17 @@ func (bc *BlockChain) Stop() {
// Unsubscribe all subscriptions registered from blockchain
bc.scope.Close()
close(bc.quit)
- atomic.StoreInt32(&bc.procInterrupt, 1)
-
+ bc.StopInsert()
bc.wg.Wait()
+ // Ensure that the entirety of the state snapshot is journalled to disk.
+ var snapBase common.Hash
+ if bc.snaps != nil {
+ var err error
+ if snapBase, err = bc.snaps.Journal(bc.CurrentBlock().Root()); err != nil {
+ log.Error("Failed to journal state snapshot", "err", err)
+ }
+ }
// Ensure the state of a recent block is also stored to disk before exiting.
// We're writing three different states to catch different restart scenarios:
// - HEAD: So we don't need to reprocess any blocks in the general case
@@ -832,11 +961,17 @@ func (bc *BlockChain) Stop() {
recent := bc.GetBlockByNumber(number - offset)
log.Info("Writing cached state to disk", "block", recent.Number(), "hash", recent.Hash(), "root", recent.Root())
- if err := triedb.Commit(recent.Root(), true); err != nil {
+ if err := triedb.Commit(recent.Root(), true, nil); err != nil {
log.Error("Failed to commit recent state trie", "err", err)
}
}
}
+ if snapBase != (common.Hash{}) {
+ log.Info("Writing snapshot state to disk", "root", snapBase)
+ if err := triedb.Commit(snapBase, true, nil); err != nil {
+ log.Error("Failed to commit recent state trie", "err", err)
+ }
+ }
for !bc.triegc.Empty() {
triedb.Dereference(bc.triegc.PopItem().(common.Hash))
}
@@ -844,7 +979,25 @@ func (bc *BlockChain) Stop() {
log.Error("Dangling trie nodes after full cleanup")
}
}
- log.Info("Blockchain manager stopped")
+ // Ensure all live cached entries be saved into disk, so that we can skip
+ // cache warmup when node restarts.
+ if bc.cacheConfig.TrieCleanJournal != "" {
+ triedb := bc.stateCache.TrieDB()
+ triedb.SaveCache(bc.cacheConfig.TrieCleanJournal)
+ }
+ log.Info("Blockchain stopped")
+}
+
+// StopInsert interrupts all insertion methods, causing them to return
+// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after
+// calling this method.
+func (bc *BlockChain) StopInsert() {
+ atomic.StoreInt32(&bc.procInterrupt, 1)
+}
+
+// insertStopped returns true after StopInsert has been called.
+func (bc *BlockChain) insertStopped() bool {
+ return atomic.LoadInt32(&bc.procInterrupt) == 1
}
func (bc *BlockChain) procFutureBlocks() {
@@ -855,8 +1008,9 @@ func (bc *BlockChain) procFutureBlocks() {
}
}
if len(blocks) > 0 {
- types.BlockBy(types.Number).Sort(blocks)
-
+ sort.Slice(blocks, func(i, j int) bool {
+ return blocks[i].NumberU64() < blocks[j].NumberU64()
+ })
// Insert one by one as chain insertion needs contiguous ancestry between blocks
for i := range blocks {
bc.InsertChain(blocks[i : i+1])
@@ -873,42 +1027,6 @@ const (
SideStatTy
)
-// Rollback is designed to remove a chain of links from the database that aren't
-// certain enough to be valid.
-func (bc *BlockChain) Rollback(chain []common.Hash) {
- bc.chainmu.Lock()
- defer bc.chainmu.Unlock()
-
- for i := len(chain) - 1; i >= 0; i-- {
- hash := chain[i]
-
- currentHeader := bc.hc.CurrentHeader()
- if currentHeader.Hash() == hash {
- bc.hc.SetCurrentHeader(bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1))
- }
- if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock.Hash() == hash {
- newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1)
- rawdb.WriteHeadFastBlockHash(bc.db, newFastBlock.Hash())
- bc.currentFastBlock.Store(newFastBlock)
- headFastBlockGauge.Update(int64(newFastBlock.NumberU64()))
- }
- if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash {
- newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
- rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash())
- bc.currentBlock.Store(newBlock)
- headBlockGauge.Update(int64(newBlock.NumberU64()))
- }
- }
- // Truncate ancient data which exceeds the current header.
- //
- // Notably, it can happen that system crashes without truncating the ancient data
- // but the head indicator has been updated in the active store. Regarding this issue,
- // system will self recovery by truncating the extra data during the setup phase.
- if err := bc.truncateAncient(bc.hc.CurrentHeader().Number.Uint64()); err != nil {
- log.Crit("Truncate ancient store failed", "err", err)
- }
-}
-
// truncateAncient rewinds the blockchain to the specified header and deletes all
// data in the ancient store that exceeds the specified header.
func (bc *BlockChain) truncateAncient(head uint64) error {
@@ -982,7 +1100,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
size = 0
)
// updateHead updates the head fast sync block if the inserted blocks are better
- // and returns a indicator whether the inserted blocks are canonical.
+ // and returns an indicator whether the inserted blocks are canonical.
updateHead := func(head *types.Block) bool {
if bc.manualCanonical {
return false
@@ -1024,7 +1142,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
var deleted []*numberHash
for i, block := range blockChain {
// Short circuit insertion if shutting down or processing failed
- if atomic.LoadInt32(&bc.procInterrupt) == 1 {
+ if bc.insertStopped() {
return 0, errInsertionInterrupted
}
// Short circuit insertion if it is required(used in testing only)
@@ -1064,7 +1182,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
// Don't collect too much in-memory, write it out every 100K blocks
if len(deleted) > 100000 {
-
// Sync the ancient store explicitly to ensure all data has been flushed to disk.
if err := bc.db.Sync(); err != nil {
return 0, err
@@ -1096,8 +1213,23 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
// Flush data into ancient database.
size += rawdb.WriteAncientBlock(bc.db, block, receiptChain[i], bc.GetTd(block.Hash(), block.NumberU64()))
- rawdb.WriteTxLookupEntries(batch, block)
+ // Write tx indices if any condition is satisfied:
+ // * If user requires to reserve all tx indices(txlookuplimit=0)
+ // * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit)
+ // * If block number is large enough to be regarded as a recent block
+ // It means blocks below the ancientLimit-txlookupLimit won't be indexed.
+ //
+ // But if the `TxIndexTail` is not nil, e.g. Geth is initialized with
+ // an external ancient database, during the setup, blockchain will start
+ // a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients)
+ // range. In this case, all tx indices of newly imported blocks should be
+ // generated.
+ if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit {
+ rawdb.WriteTxLookupEntriesByBlock(batch, block)
+ } else if rawdb.ReadTxIndexTail(bc.db) != nil {
+ rawdb.WriteTxLookupEntriesByBlock(batch, block)
+ }
stats.processed++
}
// Flush all tx-lookup index data.
@@ -1154,26 +1286,37 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
// writeLive writes blockchain and corresponding receipt chain into active store.
writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
+ skipPresenceCheck := false
batch := bc.db.NewBatch()
for i, block := range blockChain {
// Short circuit insertion if shutting down or processing failed
- if atomic.LoadInt32(&bc.procInterrupt) == 1 {
+ if bc.insertStopped() {
return 0, errInsertionInterrupted
}
// Short circuit if the owner header is unknown
if !bc.HasHeader(block.Hash(), block.NumberU64()) {
return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
}
- if bc.HasBlock(block.Hash(), block.NumberU64()) {
- stats.ignored++
- continue
+ if !skipPresenceCheck {
+ // Ignore if the entire data is already known
+ if bc.HasBlock(block.Hash(), block.NumberU64()) {
+ stats.ignored++
+ continue
+ } else {
+ // If block N is not present, neither are the later blocks.
+ // This should be true, but if we are mistaken, the shortcut
+ // here will only cause overwriting of some existing data
+ skipPresenceCheck = true
+ }
}
// Write all the data out into the database
rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body())
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i])
- rawdb.WriteTxLookupEntries(batch, block)
+ rawdb.WriteTxLookupEntriesByBlock(batch, block) // Always write tx indices for live blocks, we assume they are needed
- stats.processed++
+ // Write everything belongs to the blocks into the database. So that
+ // we can ensure all components of body is completed(body, receipts,
+ // tx indexes)
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
return 0, err
@@ -1181,7 +1324,11 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
size += batch.ValueSize()
batch.Reset()
}
+ stats.processed++
}
+ // Write everything belongs to the blocks into the database. So that
+ // we can ensure all components of body is completed(body, receipts,
+ // tx indexes)
if batch.ValueSize() > 0 {
size += batch.ValueSize()
if err := batch.Write(); err != nil {
@@ -1191,7 +1338,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
updateHead(blockChain[len(blockChain)-1])
return 0, nil
}
- // Write downloaded chain data and corresponding receipt chain data.
+ // Write downloaded chain data and corresponding receipt chain data
if len(ancientBlocks) > 0 {
if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil {
if err == errInsertionInterrupted {
@@ -1200,6 +1347,19 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
return n, err
}
}
+ // Write the tx index tail (block number from where we index) before write any live blocks
+ if len(liveBlocks) > 0 && liveBlocks[0].NumberU64() == ancientLimit+1 {
+ // The tx index tail can only be one of the following two options:
+ // * 0: all ancient blocks have been indexed
+ // * ancient-limit: the indices of blocks before ancient-limit are ignored
+ if tail := rawdb.ReadTxIndexTail(bc.db); tail == nil {
+ if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit {
+ rawdb.WriteTxIndexTail(bc.db, 0)
+ } else {
+ rawdb.WriteTxIndexTail(bc.db, ancientLimit-bc.txLookupLimit)
+ }
+ }
+ }
if len(liveBlocks) > 0 {
if n, err := writeLive(liveBlocks, liveReceipts); err != nil {
if err == errInsertionInterrupted {
@@ -1223,6 +1383,18 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
return 0, nil
}
+// SetTxLookupLimit is responsible for updating the txlookup limit to the
+// original one stored in db if the new mismatches with the old one.
+func (bc *BlockChain) SetTxLookupLimit(limit uint64) {
+ bc.txLookupLimit = limit
+}
+
+// TxLookupLimit retrieves the txlookup limit used by blockchain to prune
+// stale transaction indices.
+func (bc *BlockChain) TxLookupLimit() uint64 {
+ return bc.txLookupLimit
+}
+
var lastWrite uint64
// writeBlockWithoutState writes only the block and its metadata to the database,
@@ -1232,11 +1404,12 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e
bc.wg.Add(1)
defer bc.wg.Done()
- if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), td); err != nil {
- return err
+ batch := bc.db.NewBatch()
+ rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td)
+ rawdb.WriteBlock(batch, block)
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to write block into disk", "err", err)
}
- rawdb.WriteBlock(bc.db, block)
-
return nil
}
@@ -1247,30 +1420,26 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
defer bc.wg.Done()
current := bc.CurrentBlock()
- if block.ParentHash() != current.Hash() {
+ if !bc.manualCanonical && block.ParentHash() != current.Hash() {
if err := bc.reorg(current, block); err != nil {
return err
}
}
- // Write the positional metadata for transaction/receipt lookups.
- // Preimages here is empty, ignore it.
- rawdb.WriteTxLookupEntries(bc.db, block)
-
- bc.insert(block)
+ bc.writeHeadBlock(block)
return nil
}
// WriteBlockWithState writes the block and all associated state to the database.
-func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
+func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
- return bc.writeBlockWithState(block, receipts, state)
+ return bc.writeBlockWithState(block, receipts, logs, state, emitHeadEvent)
}
// writeBlockWithState writes the block and all associated state to the database,
// but is expects the chain mutex to be held.
-func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
+func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) {
bc.wg.Add(1)
defer bc.wg.Done()
@@ -1284,12 +1453,19 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
externTd := new(big.Int).Add(block.Difficulty(), ptd)
- // Irrelevant of the canonical status, write the block itself to the database
- if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil {
- return NonStatTy, err
- }
- rawdb.WriteBlock(bc.db, block)
-
+ // Irrelevant of the canonical status, write the block itself to the database.
+ //
+ // Note all the components of block(td, hash->number map, header, body, receipts)
+ // should be written atomically. BlockBatch is used for containing all components.
+ blockBatch := bc.db.NewBatch()
+ rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd)
+ rawdb.WriteBlock(blockBatch, block)
+ rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts)
+ rawdb.WritePreimages(blockBatch, state.Preimages())
+ if err := blockBatch.Write(); err != nil {
+ log.Crit("Failed to write block into disk", "err", err)
+ }
+ // Commit all cached state changes into underlying memory database.
root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
if err != nil {
return NonStatTy, err
@@ -1298,7 +1474,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
// If we're running an archive node, always flush
if bc.cacheConfig.TrieDirtyDisabled {
- if err := triedb.Commit(root, false); err != nil {
+ if err := triedb.Commit(root, false, nil); err != nil {
return NonStatTy, err
}
} else {
@@ -1332,7 +1508,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/TriesInMemory)
}
// Flush an entire trie and restart the counters
- triedb.Commit(header.Root, true)
+ triedb.Commit(header.Root, true, nil)
lastWrite = chosen
bc.gcproc = 0
}
@@ -1348,11 +1524,6 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
}
}
-
- // Write other block data using a batch.
- batch := bc.db.NewBatch()
- rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts)
-
// If the total difficulty is higher than our known, add it to the canonical chain
// Second clause in the if statement reduces the vulnerability to selfish mining.
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
@@ -1378,23 +1549,32 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
return NonStatTy, err
}
}
- // Write the positional metadata for transaction/receipt lookups and preimages
- rawdb.WriteTxLookupEntries(batch, block)
- rawdb.WritePreimages(batch, state.Preimages())
-
status = CanonStatTy
} else {
status = SideStatTy
}
- if err := batch.Write(); err != nil {
- return NonStatTy, err
- }
-
// Set new head.
if status == CanonStatTy {
- bc.insert(block)
+ bc.writeHeadBlock(block)
}
bc.futureBlocks.Remove(block.Hash())
+
+ if status == CanonStatTy {
+ bc.chainFeed.Send(ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
+ if len(logs) > 0 {
+ bc.logsFeed.Send(logs)
+ }
+ // In theory we should fire a ChainHeadEvent when we inject
+ // a canonical block, but sometimes we can insert a batch of
+ // canonicial blocks. Avoid firing too much ChainHeadEvents,
+ // we will fire an accumulated ChainHeadEvent and disable fire
+ // event here.
+ if emitHeadEvent {
+ bc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
+ }
+ } else {
+ bc.chainSideFeed.Send(ChainSideEvent{Block: block})
+ }
return status, nil
}
@@ -1445,11 +1625,10 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// Pre-checks passed, start the full block imports
bc.wg.Add(1)
bc.chainmu.Lock()
- n, events, logs, err := bc.insertChain(chain, true)
+ n, err := bc.insertChain(chain, true)
bc.chainmu.Unlock()
bc.wg.Done()
- bc.PostChainEvents(events, logs)
return n, err
}
@@ -1461,23 +1640,24 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// racey behaviour. If a sidechain import is in progress, and the historic state
// is imported, but then new canon-head is added before the actual sidechain
// completes, then the historic state could be pruned again
-func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
+func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, error) {
// If the chain is terminating, don't even bother starting up
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
- return 0, nil, nil, nil
+ return 0, nil
}
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)
- // A queued approach to delivering events. This is generally
- // faster than direct delivery and requires much less mutex
- // acquiring.
var (
- stats = insertStats{startTime: mclock.Now()}
- events = make([]interface{}, 0, len(chain))
- lastCanon *types.Block
- coalescedLogs []*types.Log
+ stats = insertStats{startTime: mclock.Now()}
+ lastCanon *types.Block
)
+ // Fire a single chain head event if we've progressed the chain
+ defer func() {
+ if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
+ bc.chainHeadFeed.Send(ChainHeadEvent{lastCanon})
+ }
+ }()
// Start the parallel header verifier
headers := make([]*types.Header, len(chain))
seals := make([]bool, len(chain))
@@ -1527,7 +1707,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
for block != nil && err == ErrKnownBlock {
log.Debug("Writing previously known block", "number", block.Number(), "hash", block.Hash())
if err := bc.writeKnownBlock(block); err != nil {
- return it.index, nil, nil, err
+ return it.index, err
}
lastCanon = block
@@ -1537,16 +1717,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
}
switch {
// First block is pruned, insert as sidechain and reorg only if TD grows enough
- case err == consensus.ErrPrunedAncestor:
+ case errors.Is(err, consensus.ErrPrunedAncestor):
log.Debug("Pruned ancestor, inserting as sidechain", "number", block.Number(), "hash", block.Hash())
return bc.insertSideChain(block, it)
// First block is future, shove it (and all children) to the future queue (unknown ancestor)
- case err == consensus.ErrFutureBlock || (err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(it.first().ParentHash())):
- for block != nil && (it.index == 0 || err == consensus.ErrUnknownAncestor) {
+ case errors.Is(err, consensus.ErrFutureBlock) || (errors.Is(err, consensus.ErrUnknownAncestor) && bc.futureBlocks.Contains(it.first().ParentHash())):
+ for block != nil && (it.index == 0 || errors.Is(err, consensus.ErrUnknownAncestor)) {
log.Debug("Future block, postponing import", "number", block.Number(), "hash", block.Hash())
if err := bc.addFutureBlock(block); err != nil {
- return it.index, events, coalescedLogs, err
+ return it.index, err
}
block, err = it.next()
}
@@ -1554,25 +1734,26 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
stats.ignored += it.remaining()
// If there are any still remaining, mark as ignored
- return it.index, events, coalescedLogs, err
+ return it.index, err
// Some other error occurred, abort
case err != nil:
+ bc.futureBlocks.Remove(block.Hash())
stats.ignored += len(it.chain)
bc.reportBlock(block, nil, err)
- return it.index, events, coalescedLogs, err
+ return it.index, err
}
// No validation errors for the first block (or chain prefix skipped)
for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() {
// If the chain is terminating, stop processing blocks
- if atomic.LoadInt32(&bc.procInterrupt) == 1 {
- log.Debug("Premature abort during blocks processing")
+ if bc.insertStopped() {
+ log.Debug("Abort during block processing")
break
}
// If the header is a banned one, straight out abort
if BadHashes[block.Hash()] {
bc.reportBlock(block, nil, ErrBlacklistedHash)
- return it.index, events, coalescedLogs, ErrBlacklistedHash
+ return it.index, ErrBlacklistedHash
}
// If the block is known (in the middle of the chain), it's a special case for
// Clique blocks where they can share state among each other, so importing an
@@ -1588,16 +1769,28 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
"uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(),
"root", block.Root())
+ // Special case. Commit the empty receipt slice if we meet the known
+ // block in the middle. It can only happen in the clique chain. Whenever
+ // we insert blocks via `insertSideChain`, we only commit `td`, `header`
+ // and `body` if it's non-existent. Since we don't have receipts without
+ // reexecution, so nothing to commit. But if the sidechain will be adpoted
+ // as the canonical chain eventually, it needs to be reexecuted for missing
+ // state, but if it's this special case here(skip reexecution) we will lose
+ // the empty receipt entry.
+ if len(block.Transactions()) == 0 {
+ rawdb.WriteReceipts(bc.db, block.Hash(), block.NumberU64(), nil)
+ } else {
+ log.Error("Please file an issue, skip known block execution without receipt",
+ "hash", block.Hash(), "number", block.NumberU64())
+ }
if err := bc.writeKnownBlock(block); err != nil {
- return it.index, nil, nil, err
+ return it.index, err
}
stats.processed++
// We can assume that logs are empty here, since the only way for consecutive
// Clique blocks to have the same state is if there are no transactions.
- events = append(events, ChainEvent{block, block.Hash(), nil})
lastCanon = block
-
continue
}
// Retrieve the parent block and it's state to execute on top
@@ -1607,25 +1800,24 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
if parent == nil {
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
- statedb, err := state.New(parent.Root, bc.stateCache)
+ statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
- return it.index, events, coalescedLogs, err
+ return it.index, err
}
// If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
var followupInterrupt uint32
-
if !bc.cacheConfig.TrieCleanNoPrefetch {
if followup, err := it.peek(); followup != nil && err == nil {
- go func(start time.Time) {
- throwaway, _ := state.New(parent.Root, bc.stateCache)
+ throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)
+ go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
blockPrefetchExecuteTimer.Update(time.Since(start))
- if atomic.LoadUint32(&followupInterrupt) == 1 {
+ if atomic.LoadUint32(interrupt) == 1 {
blockPrefetchInterruptMeter.Mark(1)
}
- }(time.Now())
+ }(time.Now(), followup, throwaway, &followupInterrupt)
}
}
// Process block using the parent state as reference point
@@ -1634,17 +1826,19 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
if err != nil {
bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1)
- return it.index, events, coalescedLogs, err
+ return it.index, err
}
// Update the metrics touched during block processing
- accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them
- storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them
- accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them
- storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them
+ accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them
+ storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them
+ accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them
+ storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them
+ snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete, we can mark them
+ snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete, we can mark them
triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
- trieproc := statedb.AccountReads + statedb.AccountUpdates
- trieproc += statedb.StorageReads + statedb.StorageUpdates
+ trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates
+ trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates
blockExecutionTimer.Update(time.Since(substart) - trieproc - triehash)
@@ -1653,7 +1847,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1)
- return it.index, events, coalescedLogs, err
+ return it.index, err
}
proctime := time.Since(start)
@@ -1665,18 +1859,18 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
// Write the block to the chain and get the status.
substart = time.Now()
- status, err := bc.writeBlockWithState(block, receipts, statedb)
+ status, err := bc.writeBlockWithState(block, receipts, logs, statedb, false)
+ atomic.StoreUint32(&followupInterrupt, 1)
if err != nil {
- atomic.StoreUint32(&followupInterrupt, 1)
- return it.index, events, coalescedLogs, err
+ return it.index, err
}
- atomic.StoreUint32(&followupInterrupt, 1)
// Update the metrics touched during block commit
- accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them
- storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them
+ accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them
+ storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them
+ snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them
- blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits)
+ blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits)
blockInsertTimer.UpdateSince(start)
switch status {
@@ -1686,8 +1880,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
"elapsed", common.PrettyDuration(time.Since(start)),
"root", block.Root())
- coalescedLogs = append(coalescedLogs, logs...)
- events = append(events, ChainEvent{block, block.Hash(), logs})
lastCanon = block
// Only count canonical blocks for GC processing time
@@ -1698,7 +1890,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
"diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
"txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()),
"root", block.Root())
- events = append(events, ChainSideEvent{block})
default:
// This in theory is impossible, but lets be nice to our future selves and leave
@@ -1715,26 +1906,22 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
stats.report(chain, it.index, dirty)
}
// Any blocks remaining here? The only ones we care about are the future ones
- if block != nil && err == consensus.ErrFutureBlock {
+ if block != nil && errors.Is(err, consensus.ErrFutureBlock) {
if err := bc.addFutureBlock(block); err != nil {
- return it.index, events, coalescedLogs, err
+ return it.index, err
}
block, err = it.next()
- for ; block != nil && err == consensus.ErrUnknownAncestor; block, err = it.next() {
+ for ; block != nil && errors.Is(err, consensus.ErrUnknownAncestor); block, err = it.next() {
if err := bc.addFutureBlock(block); err != nil {
- return it.index, events, coalescedLogs, err
+ return it.index, err
}
stats.queued++
}
}
stats.ignored += it.remaining()
- // Append a single chain head event if we've progressed the chain
- if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
- events = append(events, ChainHeadEvent{lastCanon})
- }
- return it.index, events, coalescedLogs, err
+ return it.index, err
}
// insertSideChain is called when an import batch hits upon a pruned ancestor
@@ -1743,7 +1930,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
//
// The method writes all (header-and-body-valid) blocks to disk, then tries to
// switch over to the new chain if the TD exceeded the current chain.
-func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, []interface{}, []*types.Log, error) {
+func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, error) {
var (
externTd *big.Int
current = bc.CurrentBlock()
@@ -1753,7 +1940,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
// ones. Any other errors means that the block is invalid, and should not be written
// to disk.
err := consensus.ErrPrunedAncestor
- for ; block != nil && (err == consensus.ErrPrunedAncestor); block, err = it.next() {
+ for ; block != nil && errors.Is(err, consensus.ErrPrunedAncestor); block, err = it.next() {
// Check the canonical state root for that number
if number := block.NumberU64(); current.NumberU64() >= number {
canonical := bc.GetBlockByNumber(number)
@@ -1779,7 +1966,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
// If someone legitimately side-mines blocks, they would still be imported as usual. However,
// we cannot risk writing unverified blocks to disk when they obviously target the pruning
// mechanism.
- return it.index, nil, nil, errors.New("sidechain ghost-state attack")
+ return it.index, errors.New("sidechain ghost-state attack")
}
}
if externTd == nil {
@@ -1790,7 +1977,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
if !bc.HasBlock(block.Hash(), block.NumberU64()) {
start := time.Now()
if err := bc.writeBlockWithoutState(block, externTd); err != nil {
- return it.index, nil, nil, err
+ return it.index, err
}
log.Debug("Injected sidechain block", "number", block.Number(), "hash", block.Hash(),
"diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
@@ -1807,7 +1994,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
localTd := bc.GetTd(current.Hash(), current.NumberU64())
if bc.manualCanonical || localTd.Cmp(externTd) > 0 {
log.Info("Sidechain written to disk", "start", it.first().NumberU64(), "end", it.previous().Number, "sidetd", externTd, "localtd", localTd)
- return it.index, nil, nil, err
+ return it.index, err
}
// Gather all the sidechain hashes (full blocks may be memory heavy)
var (
@@ -1822,7 +2009,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
parent = bc.GetHeader(parent.ParentHash, parent.Number.Uint64()-1)
}
if parent == nil {
- return it.index, nil, nil, errors.New("missing parent")
+ return it.index, errors.New("missing parent")
}
// Import all the pruned blocks to make the state available
var (
@@ -1841,15 +2028,15 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
// memory here.
if len(blocks) >= 2048 || memory > 64*1024*1024 {
log.Info("Importing heavy sidechain segment", "blocks", len(blocks), "start", blocks[0].NumberU64(), "end", block.NumberU64())
- if _, _, _, err := bc.insertChain(blocks, false); err != nil {
- return 0, nil, nil, err
+ if _, err := bc.insertChain(blocks, false); err != nil {
+ return 0, err
}
blocks, memory = blocks[:0], 0
// If the chain is terminating, stop processing blocks
- if atomic.LoadInt32(&bc.procInterrupt) == 1 {
- log.Debug("Premature abort during blocks processing")
- return 0, nil, nil, nil
+ if bc.insertStopped() {
+ log.Debug("Abort during blocks processing")
+ return 0, nil
}
}
}
@@ -1857,13 +2044,14 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
log.Info("Importing sidechain segment", "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64())
return bc.insertChain(blocks, false)
}
- return 0, nil, nil, nil
+ return 0, nil
}
// reorg takes two blocks, an old chain and a new chain and will reconstruct the
// blocks and inserts them to be part of the new canonical chain and accumulates
// potential missing transactions and post an event about them.
func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
+ log.Error("reorg shouldn't happen!!!")
var (
newChain types.Blocks
oldChain types.Blocks
@@ -1872,11 +2060,11 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
deletedTxs types.Transactions
addedTxs types.Transactions
- deletedLogs []*types.Log
- rebirthLogs []*types.Log
+ deletedLogs [][]*types.Log
+ rebirthLogs [][]*types.Log
- // collectLogs collects the logs that were generated during the
- // processing of the block that corresponds with the given hash.
+ // collectLogs collects the logs that were generated or removed during
+ // the processing of the block that corresponds with the given hash.
// These logs are later announced as deleted or reborn
collectLogs = func(hash common.Hash, removed bool) {
number := bc.hc.GetBlockNumber(hash)
@@ -1884,18 +2072,40 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
return
}
receipts := rawdb.ReadReceipts(bc.db, hash, *number, bc.chainConfig)
+
+ var logs []*types.Log
for _, receipt := range receipts {
for _, log := range receipt.Logs {
l := *log
if removed {
l.Removed = true
- deletedLogs = append(deletedLogs, &l)
} else {
- rebirthLogs = append(rebirthLogs, &l)
}
+ logs = append(logs, &l)
+ }
+ }
+ if len(logs) > 0 {
+ if removed {
+ deletedLogs = append(deletedLogs, logs)
+ } else {
+ rebirthLogs = append(rebirthLogs, logs)
}
}
}
+ // mergeLogs returns a merged log slice with specified sort order.
+ mergeLogs = func(logs [][]*types.Log, reverse bool) []*types.Log {
+ var ret []*types.Log
+ if reverse {
+ for i := len(logs) - 1; i >= 0; i-- {
+ ret = append(ret, logs[i]...)
+ }
+ } else {
+ for i := 0; i < len(logs); i++ {
+ ret = append(ret, logs[i]...)
+ }
+ }
+ return ret
+ }
)
// Reduce the longer chain to the same number as the shorter one
if oldBlock.NumberU64() > newBlock.NumberU64() {
@@ -1954,6 +2164,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
"drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash())
blockReorgAddMeter.Mark(int64(len(newChain)))
blockReorgDropMeter.Mark(int64(len(oldChain)))
+ blockReorgMeter.Mark(1)
} else {
log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "newnum", newBlock.Number(), "newhash", newBlock.Hash())
}
@@ -1961,20 +2172,19 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// taking care of the proper incremental order.
for i := len(newChain) - 1; i >= 1; i-- {
// Insert the block in the canonical way, re-writing history
- bc.insert(newChain[i])
+ bc.writeHeadBlock(newChain[i])
// Collect reborn logs due to chain reorg
collectLogs(newChain[i].Hash(), false)
- // Write lookup entries for hash based transaction/receipt searches
- rawdb.WriteTxLookupEntries(bc.db, newChain[i])
+ // Collect the new added transactions.
addedTxs = append(addedTxs, newChain[i].Transactions()...)
}
- // When transactions get deleted from the database, the receipts that were
- // created in the fork must also be deleted
- batch := bc.db.NewBatch()
+ // Delete useless indexes right now which includes the non-canonical
+ // transaction indexes, canonical chain indexes which above the head.
+ indexesBatch := bc.db.NewBatch()
for _, tx := range types.TxDifference(deletedTxs, addedTxs) {
- rawdb.DeleteTxLookupEntry(batch, tx.Hash())
+ rawdb.DeleteTxLookupEntry(indexesBatch, tx.Hash())
}
// Delete any canonical number assignments above the new head
number := bc.CurrentBlock().NumberU64()
@@ -1983,52 +2193,27 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
if hash == (common.Hash{}) {
break
}
- rawdb.DeleteCanonicalHash(batch, i)
+ rawdb.DeleteCanonicalHash(indexesBatch, i)
+ }
+ if err := indexesBatch.Write(); err != nil {
+ log.Crit("Failed to delete useless indexes", "err", err)
}
- batch.Write()
// If any logs need to be fired, do it now. In theory we could avoid creating
// this goroutine if there are no events to fire, but realistcally that only
// ever happens if we're reorging empty blocks, which will only happen on idle
// networks where performance is not an issue either way.
- //
- // TODO(karalabe): Can we get rid of the goroutine somehow to guarantee correct
- // event ordering?
- go func() {
- if len(deletedLogs) > 0 {
- bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
- }
- if len(rebirthLogs) > 0 {
- bc.logsFeed.Send(rebirthLogs)
- }
- if len(oldChain) > 0 {
- for _, block := range oldChain {
- bc.chainSideFeed.Send(ChainSideEvent{Block: block})
- }
- }
- }()
- return nil
-}
-
-// PostChainEvents iterates over the events generated by a chain insertion and
-// posts them into the event feed.
-// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
-func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
- // post event logs for further processing
- if logs != nil {
- bc.logsFeed.Send(logs)
+ if len(deletedLogs) > 0 {
+ bc.rmLogsFeed.Send(RemovedLogsEvent{mergeLogs(deletedLogs, true)})
}
- for _, event := range events {
- switch ev := event.(type) {
- case ChainEvent:
- bc.chainFeed.Send(ev)
-
- case ChainHeadEvent:
- bc.chainHeadFeed.Send(ev)
-
- case ChainSideEvent:
- bc.chainSideFeed.Send(ev)
+ if len(rebirthLogs) > 0 {
+ bc.logsFeed.Send(mergeLogs(rebirthLogs, false))
+ }
+ if len(oldChain) > 0 {
+ for i := len(oldChain) - 1; i >= 0; i-- {
+ bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]})
}
}
+ return nil
}
func (bc *BlockChain) update() {
@@ -2044,6 +2229,86 @@ func (bc *BlockChain) update() {
}
}
+// maintainTxIndex is responsible for the construction and deletion of the
+// transaction index.
+//
+// User can use flag `txlookuplimit` to specify a "recentness" block, below
+// which ancient tx indices get deleted. If `txlookuplimit` is 0, it means
+// all tx indices will be reserved.
+//
+// The user can adjust the txlookuplimit value for each launch after fast
+// sync, Geth will automatically construct the missing indices and delete
+// the extra indices.
+func (bc *BlockChain) maintainTxIndex(ancients uint64) {
+ // Before starting the actual maintenance, we need to handle a special case,
+ // where user might init Geth with an external ancient database. If so, we
+ // need to reindex all necessary transactions before starting to process any
+ // pruning requests.
+ if ancients > 0 {
+ var from = uint64(0)
+ if bc.txLookupLimit != 0 && ancients > bc.txLookupLimit {
+ from = ancients - bc.txLookupLimit
+ }
+ rawdb.IndexTransactions(bc.db, from, ancients)
+ }
+ // indexBlocks reindexes or unindexes transactions depending on user configuration
+ indexBlocks := func(tail *uint64, head uint64, done chan struct{}) {
+ defer func() { done <- struct{}{} }()
+
+ // If the user just upgraded Geth to a new version which supports transaction
+ // index pruning, write the new tail and remove anything older.
+ if tail == nil {
+ if bc.txLookupLimit == 0 || head < bc.txLookupLimit {
+ // Nothing to delete, write the tail and return
+ rawdb.WriteTxIndexTail(bc.db, 0)
+ } else {
+ // Prune all stale tx indices and record the tx index tail
+ rawdb.UnindexTransactions(bc.db, 0, head-bc.txLookupLimit+1)
+ }
+ return
+ }
+ // If a previous indexing existed, make sure that we fill in any missing entries
+ if bc.txLookupLimit == 0 || head < bc.txLookupLimit {
+ if *tail > 0 {
+ rawdb.IndexTransactions(bc.db, 0, *tail)
+ }
+ return
+ }
+ // Update the transaction index to the new chain state
+ if head-bc.txLookupLimit+1 < *tail {
+ // Reindex a part of missing indices and rewind index tail to HEAD-limit
+ rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail)
+ } else {
+ // Unindex a part of stale indices and forward index tail to HEAD-limit
+ rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1)
+ }
+ }
+ // Any reindexing done, start listening to chain events and moving the index window
+ var (
+ done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
+ headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed
+ )
+ sub := bc.SubscribeChainHeadEvent(headCh)
+ if sub == nil {
+ return
+ }
+ defer sub.Unsubscribe()
+
+ for {
+ select {
+ case head := <-headCh:
+ if done == nil {
+ done = make(chan struct{})
+ go indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done)
+ }
+ case <-done:
+ done = nil
+ case <-bc.quit:
+ return
+ }
+ }
+}
+
// BadBlocks returns a list of the last 'bad blocks' that the client has seen on the network
func (bc *BlockChain) BadBlocks() []*types.Block {
blocks := make([]*types.Block, 0, bc.badBlocks.Len())
@@ -2148,6 +2413,11 @@ func (bc *BlockChain) HasHeader(hash common.Hash, number uint64) bool {
return bc.hc.HasHeader(hash, number)
}
+// GetCanonicalHash returns the canonical hash for a given block number
+func (bc *BlockChain) GetCanonicalHash(number uint64) common.Hash {
+ return bc.hc.GetCanonicalHash(number)
+}
+
// GetBlockHashesFromHash retrieves a number of block hashes starting at a given
// hash, fetching towards the genesis block.
func (bc *BlockChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash {
@@ -2160,9 +2430,6 @@ func (bc *BlockChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []com
//
// Note: ancestor == 0 returns the same block, 1 returns its parent and so on.
func (bc *BlockChain) GetAncestor(hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64) {
- bc.chainmu.RLock()
- defer bc.chainmu.RUnlock()
-
return bc.hc.GetAncestor(hash, number, ancestor, maxNonCanonical)
}
@@ -2232,6 +2499,6 @@ func (bc *BlockChain) ManualHead(hash common.Hash) error {
}
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
- bc.insert(block)
+ bc.writeHeadBlock(block)
return nil
}