diff options
Diffstat (limited to 'core/state/snapshot')
-rw-r--r-- | core/state/snapshot/account.go | 88 | ||||
-rw-r--r-- | core/state/snapshot/conversion.go | 275 | ||||
-rw-r--r-- | core/state/snapshot/difflayer.go | 553 | ||||
-rw-r--r-- | core/state/snapshot/disklayer.go | 166 | ||||
-rw-r--r-- | core/state/snapshot/generate.go | 265 | ||||
-rw-r--r-- | core/state/snapshot/iterator.go | 400 | ||||
-rw-r--r-- | core/state/snapshot/iterator_binary.go | 213 | ||||
-rw-r--r-- | core/state/snapshot/iterator_fast.go | 350 | ||||
-rw-r--r-- | core/state/snapshot/journal.go | 270 | ||||
-rw-r--r-- | core/state/snapshot/snapshot.go | 619 | ||||
-rw-r--r-- | core/state/snapshot/sort.go | 36 | ||||
-rw-r--r-- | core/state/snapshot/wipe.go | 131 |
12 files changed, 3366 insertions, 0 deletions
diff --git a/core/state/snapshot/account.go b/core/state/snapshot/account.go new file mode 100644 index 0000000..303c2fc --- /dev/null +++ b/core/state/snapshot/account.go @@ -0,0 +1,88 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package snapshot + +import ( + "bytes" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rlp" +) + +// Account is a modified version of a state.Account, where the root is replaced +// with a byte slice. This format can be used to represent full-consensus format +// or slim-snapshot format which replaces the empty root and code hash as nil +// byte slice. +type Account struct { + Nonce uint64 + Balance *big.Int + Root []byte + CodeHash []byte + IsMultiCoin bool +} + +// SlimAccount converts a state.Account content into a slim snapshot account +func SlimAccount(nonce uint64, balance *big.Int, root common.Hash, codehash []byte, isMultiCoin bool) Account { + slim := Account{ + Nonce: nonce, + Balance: balance, + IsMultiCoin: isMultiCoin, + } + if root != emptyRoot { + slim.Root = root[:] + } + if !bytes.Equal(codehash, emptyCode[:]) { + slim.CodeHash = codehash + } + return slim +} + +// SlimAccountRLP converts a state.Account content into a slim snapshot +// version RLP encoded. +func SlimAccountRLP(nonce uint64, balance *big.Int, root common.Hash, codehash []byte, isMultiCoin bool) []byte { + data, err := rlp.EncodeToBytes(SlimAccount(nonce, balance, root, codehash, isMultiCoin)) + if err != nil { + panic(err) + } + return data +} + +// FullAccount decodes the data on the 'slim RLP' format and return +// the consensus format account. +func FullAccount(data []byte) (Account, error) { + var account Account + if err := rlp.DecodeBytes(data, &account); err != nil { + return Account{}, err + } + if len(account.Root) == 0 { + account.Root = emptyRoot[:] + } + if len(account.CodeHash) == 0 { + account.CodeHash = emptyCode[:] + } + return account, nil +} + +// FullAccountRLP converts data on the 'slim RLP' format into the full RLP-format. +func FullAccountRLP(data []byte) ([]byte, error) { + account, err := FullAccount(data) + if err != nil { + return nil, err + } + return rlp.EncodeToBytes(account) +} diff --git a/core/state/snapshot/conversion.go b/core/state/snapshot/conversion.go new file mode 100644 index 0000000..dee9ff0 --- /dev/null +++ b/core/state/snapshot/conversion.go @@ -0,0 +1,275 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package snapshot + +import ( + "bytes" + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb/memorydb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" +) + +// trieKV represents a trie key-value pair +type trieKV struct { + key common.Hash + value []byte +} + +type ( + // trieGeneratorFn is the interface of trie generation which can + // be implemented by different trie algorithm. + trieGeneratorFn func(in chan (trieKV), out chan (common.Hash)) + + // leafCallbackFn is the callback invoked at the leaves of the trie, + // returns the subtrie root with the specified subtrie identifier. + leafCallbackFn func(hash common.Hash, stat *generateStats) common.Hash +) + +// GenerateAccountTrieRoot takes an account iterator and reproduces the root hash. +func GenerateAccountTrieRoot(it AccountIterator) (common.Hash, error) { + return generateTrieRoot(it, common.Hash{}, stdGenerate, nil, &generateStats{start: time.Now()}, true) +} + +// GenerateStorageTrieRoot takes a storage iterator and reproduces the root hash. +func GenerateStorageTrieRoot(account common.Hash, it StorageIterator) (common.Hash, error) { + return generateTrieRoot(it, account, stdGenerate, nil, &generateStats{start: time.Now()}, true) +} + +// VerifyState takes the whole snapshot tree as the input, traverses all the accounts +// as well as the corresponding storages and compares the re-computed hash with the +// original one(state root and the storage root). +func VerifyState(snaptree *Tree, root common.Hash) error { + acctIt, err := snaptree.AccountIterator(root, common.Hash{}) + if err != nil { + return err + } + defer acctIt.Release() + + got, err := generateTrieRoot(acctIt, common.Hash{}, stdGenerate, func(account common.Hash, stat *generateStats) common.Hash { + storageIt, err := snaptree.StorageIterator(root, account, common.Hash{}) + if err != nil { + return common.Hash{} + } + defer storageIt.Release() + + hash, err := generateTrieRoot(storageIt, account, stdGenerate, nil, stat, false) + if err != nil { + return common.Hash{} + } + return hash + }, &generateStats{start: time.Now()}, true) + + if err != nil { + return err + } + if got != root { + return fmt.Errorf("state root hash mismatch: got %x, want %x", got, root) + } + return nil +} + +// generateStats is a collection of statistics gathered by the trie generator +// for logging purposes. +type generateStats struct { + accounts uint64 + slots uint64 + curAccount common.Hash + curSlot common.Hash + start time.Time + lock sync.RWMutex +} + +// progress records the progress trie generator made recently. +func (stat *generateStats) progress(accounts, slots uint64, curAccount common.Hash, curSlot common.Hash) { + stat.lock.Lock() + defer stat.lock.Unlock() + + stat.accounts += accounts + stat.slots += slots + stat.curAccount = curAccount + stat.curSlot = curSlot +} + +// report prints the cumulative progress statistic smartly. +func (stat *generateStats) report() { + stat.lock.RLock() + defer stat.lock.RUnlock() + + var ctx []interface{} + if stat.curSlot != (common.Hash{}) { + ctx = append(ctx, []interface{}{ + "in", stat.curAccount, + "at", stat.curSlot, + }...) + } else { + ctx = append(ctx, []interface{}{"at", stat.curAccount}...) + } + // Add the usual measurements + ctx = append(ctx, []interface{}{"accounts", stat.accounts}...) + if stat.slots != 0 { + ctx = append(ctx, []interface{}{"slots", stat.slots}...) + } + ctx = append(ctx, []interface{}{"elapsed", common.PrettyDuration(time.Since(stat.start))}...) + log.Info("Generating trie hash from snapshot", ctx...) +} + +// reportDone prints the last log when the whole generation is finished. +func (stat *generateStats) reportDone() { + stat.lock.RLock() + defer stat.lock.RUnlock() + + var ctx []interface{} + ctx = append(ctx, []interface{}{"accounts", stat.accounts}...) + if stat.slots != 0 { + ctx = append(ctx, []interface{}{"slots", stat.slots}...) + } + ctx = append(ctx, []interface{}{"elapsed", common.PrettyDuration(time.Since(stat.start))}...) + log.Info("Generated trie hash from snapshot", ctx...) +} + +// generateTrieRoot generates the trie hash based on the snapshot iterator. +// It can be used for generating account trie, storage trie or even the +// whole state which connects the accounts and the corresponding storages. +func generateTrieRoot(it Iterator, account common.Hash, generatorFn trieGeneratorFn, leafCallback leafCallbackFn, stats *generateStats, report bool) (common.Hash, error) { + var ( + in = make(chan trieKV) // chan to pass leaves + out = make(chan common.Hash, 1) // chan to collect result + stoplog = make(chan bool, 1) // 1-size buffer, works when logging is not enabled + wg sync.WaitGroup + ) + // Spin up a go-routine for trie hash re-generation + wg.Add(1) + go func() { + defer wg.Done() + generatorFn(in, out) + }() + + // Spin up a go-routine for progress logging + if report && stats != nil { + wg.Add(1) + go func() { + defer wg.Done() + + timer := time.NewTimer(0) + defer timer.Stop() + + for { + select { + case <-timer.C: + stats.report() + timer.Reset(time.Second * 8) + case success := <-stoplog: + if success { + stats.reportDone() + } + return + } + } + }() + } + // stop is a helper function to shutdown the background threads + // and return the re-generated trie hash. + stop := func(success bool) common.Hash { + close(in) + result := <-out + stoplog <- success + wg.Wait() + return result + } + var ( + logged = time.Now() + processed = uint64(0) + leaf trieKV + last common.Hash + ) + // Start to feed leaves + for it.Next() { + if account == (common.Hash{}) { + var ( + err error + fullData []byte + ) + if leafCallback == nil { + fullData, err = FullAccountRLP(it.(AccountIterator).Account()) + if err != nil { + stop(false) + return common.Hash{}, err + } + } else { + account, err := FullAccount(it.(AccountIterator).Account()) + if err != nil { + stop(false) + return common.Hash{}, err + } + // Apply the leaf callback. Normally the callback is used to traverse + // the storage trie and re-generate the subtrie root. + subroot := leafCallback(it.Hash(), stats) + if !bytes.Equal(account.Root, subroot.Bytes()) { + stop(false) + return common.Hash{}, fmt.Errorf("invalid subroot(%x), want %x, got %x", it.Hash(), account.Root, subroot) + } + fullData, err = rlp.EncodeToBytes(account) + if err != nil { + stop(false) + return common.Hash{}, err + } + } + leaf = trieKV{it.Hash(), fullData} + } else { + leaf = trieKV{it.Hash(), common.CopyBytes(it.(StorageIterator).Slot())} + } + in <- leaf + + // Accumulate the generaation statistic if it's required. + processed++ + if time.Since(logged) > 3*time.Second && stats != nil { + if account == (common.Hash{}) { + stats.progress(processed, 0, it.Hash(), common.Hash{}) + } else { + stats.progress(0, processed, account, it.Hash()) + } + logged, processed = time.Now(), 0 + } + last = it.Hash() + } + // Commit the last part statistic. + if processed > 0 && stats != nil { + if account == (common.Hash{}) { + stats.progress(processed, 0, last, common.Hash{}) + } else { + stats.progress(0, processed, account, last) + } + } + result := stop(true) + return result, nil +} + +// stdGenerate is a very basic hexary trie builder which uses the same Trie +// as the rest of geth, with no enhancements or optimizations +func stdGenerate(in chan (trieKV), out chan (common.Hash)) { + t, _ := trie.New(common.Hash{}, trie.NewDatabase(memorydb.New())) + for leaf := range in { + t.TryUpdate(leaf.key[:], leaf.value) + } + out <- t.Hash() +} diff --git a/core/state/snapshot/difflayer.go b/core/state/snapshot/difflayer.go new file mode 100644 index 0000000..0aef6cf --- /dev/null +++ b/core/state/snapshot/difflayer.go @@ -0,0 +1,553 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package snapshot + +import ( + "encoding/binary" + "fmt" + "math" + "math/rand" + "sort" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rlp" + "github.com/steakknife/bloomfilter" +) + +var ( + // aggregatorMemoryLimit is the maximum size of the bottom-most diff layer + // that aggregates the writes from above until it's flushed into the disk + // layer. + // + // Note, bumping this up might drastically increase the size of the bloom + // filters that's stored in every diff layer. Don't do that without fully + // understanding all the implications. + aggregatorMemoryLimit = uint64(4 * 1024 * 1024) + + // aggregatorItemLimit is an approximate number of items that will end up + // in the agregator layer before it's flushed out to disk. A plain account + // weighs around 14B (+hash), a storage slot 32B (+hash), a deleted slot + // 0B (+hash). Slots are mostly set/unset in lockstep, so thet average at + // 16B (+hash). All in all, the average entry seems to be 15+32=47B. Use a + // smaller number to be on the safe side. + aggregatorItemLimit = aggregatorMemoryLimit / 42 + + // bloomTargetError is the target false positive rate when the aggregator + // layer is at its fullest. The actual value will probably move around up + // and down from this number, it's mostly a ballpark figure. + // + // Note, dropping this down might drastically increase the size of the bloom + // filters that's stored in every diff layer. Don't do that without fully + // understanding all the implications. + bloomTargetError = 0.02 + + // bloomSize is the ideal bloom filter size given the maximum number of items + // it's expected to hold and the target false positive error rate. + bloomSize = math.Ceil(float64(aggregatorItemLimit) * math.Log(bloomTargetError) / math.Log(1/math.Pow(2, math.Log(2)))) + + // bloomFuncs is the ideal number of bits a single entry should set in the + // bloom filter to keep its size to a minimum (given it's size and maximum + // entry count). + bloomFuncs = math.Round((bloomSize / float64(aggregatorItemLimit)) * math.Log(2)) + + // the bloom offsets are runtime constants which determines which part of the + // the account/storage hash the hasher functions looks at, to determine the + // bloom key for an account/slot. This is randomized at init(), so that the + // global population of nodes do not all display the exact same behaviour with + // regards to bloom content + bloomDestructHasherOffset = 0 + bloomAccountHasherOffset = 0 + bloomStorageHasherOffset = 0 +) + +func init() { + // Init the bloom offsets in the range [0:24] (requires 8 bytes) + bloomDestructHasherOffset = rand.Intn(25) + bloomAccountHasherOffset = rand.Intn(25) + bloomStorageHasherOffset = rand.Intn(25) + + // The destruct and account blooms must be different, as the storage slots + // will check for destruction too for every bloom miss. It should not collide + // with modified accounts. + for bloomAccountHasherOffset == bloomDestructHasherOffset { + bloomAccountHasherOffset = rand.Intn(25) + } +} + +// diffLayer represents a collection of modifications made to a state snapshot +// after running a block on top. It contains one sorted list for the account trie +// and one-one list for each storage tries. +// +// The goal of a diff layer is to act as a journal, tracking recent modifications +// made to the state, that have not yet graduated into a semi-immutable state. +type diffLayer struct { + origin *diskLayer // Base disk layer to directly use on bloom misses + parent snapshot // Parent snapshot modified by this one, never nil + memory uint64 // Approximate guess as to how much memory we use + + root common.Hash // Root hash to which this snapshot diff belongs to + stale uint32 // Signals that the layer became stale (state progressed) + + // destructSet is a very special helper marker. If an account is marked as + // deleted, then it's recorded in this set. However it's allowed that an account + // is included here but still available in other sets(e.g. storageData). The + // reason is the diff layer includes all the changes in a *block*. It can + // happen that in the tx_1, account A is self-destructed while in the tx_2 + // it's recreated. But we still need this marker to indicate the "old" A is + // deleted, all data in other set belongs to the "new" A. + destructSet map[common.Hash]struct{} // Keyed markers for deleted (and potentially) recreated accounts + accountList []common.Hash // List of account for iteration. If it exists, it's sorted, otherwise it's nil + accountData map[common.Hash][]byte // Keyed accounts for direct retrival (nil means deleted) + storageList map[common.Hash][]common.Hash // List of storage slots for iterated retrievals, one per account. Any existing lists are sorted if non-nil + storageData map[common.Hash]map[common.Hash][]byte // Keyed storage slots for direct retrival. one per account (nil means deleted) + + diffed *bloomfilter.Filter // Bloom filter tracking all the diffed items up to the disk layer + + lock sync.RWMutex +} + +// destructBloomHasher is a wrapper around a common.Hash to satisfy the interface +// API requirements of the bloom library used. It's used to convert a destruct +// event into a 64 bit mini hash. +type destructBloomHasher common.Hash + +func (h destructBloomHasher) Write(p []byte) (n int, err error) { panic("not implemented") } +func (h destructBloomHasher) Sum(b []byte) []byte { panic("not implemented") } +func (h destructBloomHasher) Reset() { panic("not implemented") } +func (h destructBloomHasher) BlockSize() int { panic("not implemented") } +func (h destructBloomHasher) Size() int { return 8 } +func (h destructBloomHasher) Sum64() uint64 { + return binary.BigEndian.Uint64(h[bloomDestructHasherOffset : bloomDestructHasherOffset+8]) +} + +// accountBloomHasher is a wrapper around a common.Hash to satisfy the interface +// API requirements of the bloom library used. It's used to convert an account +// hash into a 64 bit mini hash. +type accountBloomHasher common.Hash + +func (h accountBloomHasher) Write(p []byte) (n int, err error) { panic("not implemented") } +func (h accountBloomHasher) Sum(b []byte) []byte { panic("not implemented") } +func (h accountBloomHasher) Reset() { panic("not implemented") } +func (h accountBloomHasher) BlockSize() int { panic("not implemented") } +func (h accountBloomHasher) Size() int { return 8 } +func (h accountBloomHasher) Sum64() uint64 { + return binary.BigEndian.Uint64(h[bloomAccountHasherOffset : bloomAccountHasherOffset+8]) +} + +// storageBloomHasher is a wrapper around a [2]common.Hash to satisfy the interface +// API requirements of the bloom library used. It's used to convert an account +// hash into a 64 bit mini hash. +type storageBloomHasher [2]common.Hash + +func (h storageBloomHasher) Write(p []byte) (n int, err error) { panic("not implemented") } +func (h storageBloomHasher) Sum(b []byte) []byte { panic("not implemented") } +func (h storageBloomHasher) Reset() { panic("not implemented") } +func (h storageBloomHasher) BlockSize() int { panic("not implemented") } +func (h storageBloomHasher) Size() int { return 8 } +func (h storageBloomHasher) Sum64() uint64 { + return binary.BigEndian.Uint64(h[0][bloomStorageHasherOffset:bloomStorageHasherOffset+8]) ^ + binary.BigEndian.Uint64(h[1][bloomStorageHasherOffset:bloomStorageHasherOffset+8]) +} + +// newDiffLayer creates a new diff on top of an existing snapshot, whether that's a low +// level persistent database or a hierarchical diff already. +func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer { + // Create the new layer with some pre-allocated data segments + dl := &diffLayer{ + parent: parent, + root: root, + destructSet: destructs, + accountData: accounts, + storageData: storage, + storageList: make(map[common.Hash][]common.Hash), + } + switch parent := parent.(type) { + case *diskLayer: + dl.rebloom(parent) + case *diffLayer: + dl.rebloom(parent.origin) + default: + panic("unknown parent type") + } + // Sanity check that accounts or storage slots are never nil + for accountHash, blob := range accounts { + if blob == nil { + panic(fmt.Sprintf("account %#x nil", accountHash)) + } + } + for accountHash, slots := range storage { + if slots == nil { + panic(fmt.Sprintf("storage %#x nil", accountHash)) + } + } + // Determine memory size and track the dirty writes + for _, data := range accounts { + dl.memory += uint64(common.HashLength + len(data)) + snapshotDirtyAccountWriteMeter.Mark(int64(len(data))) + } + // Determine memory size and track the dirty writes + for _, slots := range storage { + for _, data := range slots { + dl.memory += uint64(common.HashLength + len(data)) + snapshotDirtyStorageWriteMeter.Mark(int64(len(data))) + } + } + dl.memory += uint64(len(destructs) * common.HashLength) + return dl +} + +// rebloom discards the layer's current bloom and rebuilds it from scratch based +// on the parent's and the local diffs. +func (dl *diffLayer) rebloom(origin *diskLayer) { + dl.lock.Lock() + defer dl.lock.Unlock() + + defer func(start time.Time) { + snapshotBloomIndexTimer.Update(time.Since(start)) + }(time.Now()) + + // Inject the new origin that triggered the rebloom + dl.origin = origin + + // Retrieve the parent bloom or create a fresh empty one + if parent, ok := dl.parent.(*diffLayer); ok { + parent.lock.RLock() + dl.diffed, _ = parent.diffed.Copy() + parent.lock.RUnlock() + } else { + dl.diffed, _ = bloomfilter.New(uint64(bloomSize), uint64(bloomFuncs)) + } + // Iterate over all the accounts and storage slots and index them + for hash := range dl.destructSet { + dl.diffed.Add(destructBloomHasher(hash)) + } + for hash := range dl.accountData { + dl.diffed.Add(accountBloomHasher(hash)) + } + for accountHash, slots := range dl.storageData { + for storageHash := range slots { + dl.diffed.Add(storageBloomHasher{accountHash, storageHash}) + } + } + // Calculate the current false positive rate and update the error rate meter. + // This is a bit cheating because subsequent layers will overwrite it, but it + // should be fine, we're only interested in ballpark figures. + k := float64(dl.diffed.K()) + n := float64(dl.diffed.N()) + m := float64(dl.diffed.M()) + snapshotBloomErrorGauge.Update(math.Pow(1.0-math.Exp((-k)*(n+0.5)/(m-1)), k)) +} + +// Root returns the root hash for which this snapshot was made. +func (dl *diffLayer) Root() common.Hash { + return dl.root +} + +// Parent returns the subsequent layer of a diff layer. +func (dl *diffLayer) Parent() snapshot { + return dl.parent +} + +// Stale return whether this layer has become stale (was flattened across) or if +// it's still live. +func (dl *diffLayer) Stale() bool { + return atomic.LoadUint32(&dl.stale) != 0 +} + +// Account directly retrieves the account associated with a particular hash in +// the snapshot slim data format. +func (dl *diffLayer) Account(hash common.Hash) (*Account, error) { + data, err := dl.AccountRLP(hash) + if err != nil { + return nil, err + } + if len(data) == 0 { // can be both nil and []byte{} + return nil, nil + } + account := new(Account) + if err := rlp.DecodeBytes(data, account); err != nil { + panic(err) + } + return account, nil +} + +// AccountRLP directly retrieves the account RLP associated with a particular +// hash in the snapshot slim data format. +// +// Note the returned account is not a copy, please don't modify it. +func (dl *diffLayer) AccountRLP(hash common.Hash) ([]byte, error) { + // Check the bloom filter first whether there's even a point in reaching into + // all the maps in all the layers below + dl.lock.RLock() + hit := dl.diffed.Contains(accountBloomHasher(hash)) + if !hit { + hit = dl.diffed.Contains(destructBloomHasher(hash)) + } + dl.lock.RUnlock() + + // If the bloom filter misses, don't even bother with traversing the memory + // diff layers, reach straight into the bottom persistent disk layer + if !hit { + snapshotBloomAccountMissMeter.Mark(1) + return dl.origin.AccountRLP(hash) + } + // The bloom filter hit, start poking in the internal maps + return dl.accountRLP(hash, 0) +} + +// accountRLP is an internal version of AccountRLP that skips the bloom filter +// checks and uses the internal maps to try and retrieve the data. It's meant +// to be used if a higher layer's bloom filter hit already. +func (dl *diffLayer) accountRLP(hash common.Hash, depth int) ([]byte, error) { + dl.lock.RLock() + defer dl.lock.RUnlock() + + // If the layer was flattened into, consider it invalid (any live reference to + // the original should be marked as unusable). + if dl.Stale() { + return nil, ErrSnapshotStale + } + // If the account is known locally, return it + if data, ok := dl.accountData[hash]; ok { + snapshotDirtyAccountHitMeter.Mark(1) + snapshotDirtyAccountHitDepthHist.Update(int64(depth)) + snapshotDirtyAccountReadMeter.Mark(int64(len(data))) + snapshotBloomAccountTrueHitMeter.Mark(1) + return data, nil + } + // If the account is known locally, but deleted, return it + if _, ok := dl.destructSet[hash]; ok { + snapshotDirtyAccountHitMeter.Mark(1) + snapshotDirtyAccountHitDepthHist.Update(int64(depth)) + snapshotDirtyAccountInexMeter.Mark(1) + snapshotBloomAccountTrueHitMeter.Mark(1) + return nil, nil + } + // Account unknown to this diff, resolve from parent + if diff, ok := dl.parent.(*diffLayer); ok { + return diff.accountRLP(hash, depth+1) + } + // Failed to resolve through diff layers, mark a bloom error and use the disk + snapshotBloomAccountFalseHitMeter.Mark(1) + return dl.parent.AccountRLP(hash) +} + +// Storage directly retrieves the storage data associated with a particular hash, +// within a particular account. If the slot is unknown to this diff, it's parent +// is consulted. +// +// Note the returned slot is not a copy, please don't modify it. +func (dl *diffLayer) Storage(accountHash, storageHash common.Hash) ([]byte, error) { + // Check the bloom filter first whether there's even a point in reaching into + // all the maps in all the layers below + dl.lock.RLock() + hit := dl.diffed.Contains(storageBloomHasher{accountHash, storageHash}) + if !hit { + hit = dl.diffed.Contains(destructBloomHasher(accountHash)) + } + dl.lock.RUnlock() + + // If the bloom filter misses, don't even bother with traversing the memory + // diff layers, reach straight into the bottom persistent disk layer + if !hit { + snapshotBloomStorageMissMeter.Mark(1) + return dl.origin.Storage(accountHash, storageHash) + } + // The bloom filter hit, start poking in the internal maps + return dl.storage(accountHash, storageHash, 0) +} + +// storage is an internal version of Storage that skips the bloom filter checks +// and uses the internal maps to try and retrieve the data. It's meant to be +// used if a higher layer's bloom filter hit already. +func (dl *diffLayer) storage(accountHash, storageHash common.Hash, depth int) ([]byte, error) { + dl.lock.RLock() + defer dl.lock.RUnlock() + + // If the layer was flattened into, consider it invalid (any live reference to + // the original should be marked as unusable). + if dl.Stale() { + return nil, ErrSnapshotStale + } + // If the account is known locally, try to resolve the slot locally + if storage, ok := dl.storageData[accountHash]; ok { + if data, ok := storage[storageHash]; ok { + snapshotDirtyStorageHitMeter.Mark(1) + snapshotDirtyStorageHitDepthHist.Update(int64(depth)) + if n := len(data); n > 0 { + snapshotDirtyStorageReadMeter.Mark(int64(n)) + } else { + snapshotDirtyStorageInexMeter.Mark(1) + } + snapshotBloomStorageTrueHitMeter.Mark(1) + return data, nil + } + } + // If the account is known locally, but deleted, return an empty slot + if _, ok := dl.destructSet[accountHash]; ok { + snapshotDirtyStorageHitMeter.Mark(1) + snapshotDirtyStorageHitDepthHist.Update(int64(depth)) + snapshotDirtyStorageInexMeter.Mark(1) + snapshotBloomStorageTrueHitMeter.Mark(1) + return nil, nil + } + // Storage slot unknown to this diff, resolve from parent + if diff, ok := dl.parent.(*diffLayer); ok { + return diff.storage(accountHash, storageHash, depth+1) + } + // Failed to resolve through diff layers, mark a bloom error and use the disk + snapshotBloomStorageFalseHitMeter.Mark(1) + return dl.parent.Storage(accountHash, storageHash) +} + +// Update creates a new layer on top of the existing snapshot diff tree with +// the specified data items. +func (dl *diffLayer) Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer { + return newDiffLayer(dl, blockRoot, destructs, accounts, storage) +} + +// flatten pushes all data from this point downwards, flattening everything into +// a single diff at the bottom. Since usually the lowermost diff is the largest, +// the flattening builds up from there in reverse. +func (dl *diffLayer) flatten() snapshot { + // If the parent is not diff, we're the first in line, return unmodified + parent, ok := dl.parent.(*diffLayer) + if !ok { + return dl + } + // Parent is a diff, flatten it first (note, apart from weird corned cases, + // flatten will realistically only ever merge 1 layer, so there's no need to + // be smarter about grouping flattens together). + parent = parent.flatten().(*diffLayer) + + parent.lock.Lock() + defer parent.lock.Unlock() + + // Before actually writing all our data to the parent, first ensure that the + // parent hasn't been 'corrupted' by someone else already flattening into it + if atomic.SwapUint32(&parent.stale, 1) != 0 { + panic("parent diff layer is stale") // we've flattened into the same parent from two children, boo + } + // Overwrite all the updated accounts blindly, merge the sorted list + for hash := range dl.destructSet { + parent.destructSet[hash] = struct{}{} + delete(parent.accountData, hash) + delete(parent.storageData, hash) + } + for hash, data := range dl.accountData { + parent.accountData[hash] = data + } + // Overwrite all the updated storage slots (individually) + for accountHash, storage := range dl.storageData { + // If storage didn't exist (or was deleted) in the parent, overwrite blindly + if _, ok := parent.storageData[accountHash]; !ok { + parent.storageData[accountHash] = storage + continue + } + // Storage exists in both parent and child, merge the slots + comboData := parent.storageData[accountHash] + for storageHash, data := range storage { + comboData[storageHash] = data + } + parent.storageData[accountHash] = comboData + } + // Return the combo parent + return &diffLayer{ + parent: parent.parent, + origin: parent.origin, + root: dl.root, + destructSet: parent.destructSet, + accountData: parent.accountData, + storageData: parent.storageData, + storageList: make(map[common.Hash][]common.Hash), + diffed: dl.diffed, + memory: parent.memory + dl.memory, + } +} + +// AccountList returns a sorted list of all accounts in this difflayer, including +// the deleted ones. +// +// Note, the returned slice is not a copy, so do not modify it. +func (dl *diffLayer) AccountList() []common.Hash { + // If an old list already exists, return it + dl.lock.RLock() + list := dl.accountList + dl.lock.RUnlock() + + if list != nil { + return list + } + // No old sorted account list exists, generate a new one + dl.lock.Lock() + defer dl.lock.Unlock() + + dl.accountList = make([]common.Hash, 0, len(dl.destructSet)+len(dl.accountData)) + for hash := range dl.accountData { + dl.accountList = append(dl.accountList, hash) + } + for hash := range dl.destructSet { + if _, ok := dl.accountData[hash]; !ok { + dl.accountList = append(dl.accountList, hash) + } + } + sort.Sort(hashes(dl.accountList)) + dl.memory += uint64(len(dl.accountList) * common.HashLength) + return dl.accountList +} + +// StorageList returns a sorted list of all storage slot hashes in this difflayer +// for the given account. If the whole storage is destructed in this layer, then +// an additional flag *destructed = true* will be returned, otherwise the flag is +// false. Besides, the returned list will include the hash of deleted storage slot. +// Note a special case is an account is deleted in a prior tx but is recreated in +// the following tx with some storage slots set. In this case the returned list is +// not empty but the flag is true. +// +// Note, the returned slice is not a copy, so do not modify it. +func (dl *diffLayer) StorageList(accountHash common.Hash) ([]common.Hash, bool) { + dl.lock.RLock() + _, destructed := dl.destructSet[accountHash] + if _, ok := dl.storageData[accountHash]; !ok { + // Account not tracked by this layer + dl.lock.RUnlock() + return nil, destructed + } + // If an old list already exists, return it + if list, exist := dl.storageList[accountHash]; exist { + dl.lock.RUnlock() + return list, destructed // the cached list can't be nil + } + dl.lock.RUnlock() + + // No old sorted account list exists, generate a new one + dl.lock.Lock() + defer dl.lock.Unlock() + + storageMap := dl.storageData[accountHash] + storageList := make([]common.Hash, 0, len(storageMap)) + for k := range storageMap { + storageList = append(storageList, k) + } + sort.Sort(hashes(storageList)) + dl.storageList[accountHash] = storageList + dl.memory += uint64(len(dl.storageList)*common.HashLength + common.HashLength) + return storageList, destructed +} diff --git a/core/state/snapshot/disklayer.go b/core/state/snapshot/disklayer.go new file mode 100644 index 0000000..496edaa --- /dev/null +++ b/core/state/snapshot/disklayer.go @@ -0,0 +1,166 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package snapshot + +import ( + "bytes" + "sync" + + "github.com/VictoriaMetrics/fastcache" + "github.com/ava-labs/coreth/core/rawdb" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" +) + +// diskLayer is a low level persistent snapshot built on top of a key-value store. +type diskLayer struct { + diskdb ethdb.KeyValueStore // Key-value store containing the base snapshot + triedb *trie.Database // Trie node cache for reconstuction purposes + cache *fastcache.Cache // Cache to avoid hitting the disk for direct access + + root common.Hash // Root hash of the base snapshot + stale bool // Signals that the layer became stale (state progressed) + + genMarker []byte // Marker for the state that's indexed during initial layer generation + genPending chan struct{} // Notification channel when generation is done (test synchronicity) + genAbort chan chan *generatorStats // Notification channel to abort generating the snapshot in this layer + + lock sync.RWMutex +} + +// Root returns root hash for which this snapshot was made. +func (dl *diskLayer) Root() common.Hash { + return dl.root +} + +// Parent always returns nil as there's no layer below the disk. +func (dl *diskLayer) Parent() snapshot { + return nil +} + +// Stale return whether this layer has become stale (was flattened across) or if +// it's still live. +func (dl *diskLayer) Stale() bool { + dl.lock.RLock() + defer dl.lock.RUnlock() + + return dl.stale +} + +// Account directly retrieves the account associated with a particular hash in +// the snapshot slim data format. +func (dl *diskLayer) Account(hash common.Hash) (*Account, error) { + data, err := dl.AccountRLP(hash) + if err != nil { + return nil, err + } + if len(data) == 0 { // can be both nil and []byte{} + return nil, nil + } + account := new(Account) + if err := rlp.DecodeBytes(data, account); err != nil { + panic(err) + } + return account, nil +} + +// AccountRLP directly retrieves the account RLP associated with a particular +// hash in the snapshot slim data format. +func (dl *diskLayer) AccountRLP(hash common.Hash) ([]byte, error) { + dl.lock.RLock() + defer dl.lock.RUnlock() + + // If the layer was flattened into, consider it invalid (any live reference to + // the original should be marked as unusable). + if dl.stale { + return nil, ErrSnapshotStale + } + // If the layer is being generated, ensure the requested hash has already been + // covered by the generator. + if dl.genMarker != nil && bytes.Compare(hash[:], dl.genMarker) > 0 { + return nil, ErrNotCoveredYet + } + // If we're in the disk layer, all diff layers missed + snapshotDirtyAccountMissMeter.Mark(1) + + // Try to retrieve the account from the memory cache + if blob, found := dl.cache.HasGet(nil, hash[:]); found { + snapshotCleanAccountHitMeter.Mark(1) + snapshotCleanAccountReadMeter.Mark(int64(len(blob))) + return blob, nil + } + // Cache doesn't contain account, pull from disk and cache for later + blob := rawdb.ReadAccountSnapshot(dl.diskdb, hash) + dl.cache.Set(hash[:], blob) + + snapshotCleanAccountMissMeter.Mark(1) + if n := len(blob); n > 0 { + snapshotCleanAccountWriteMeter.Mark(int64(n)) + } else { + snapshotCleanAccountInexMeter.Mark(1) + } + return blob, nil +} + +// Storage directly retrieves the storage data associated with a particular hash, +// within a particular account. +func (dl *diskLayer) Storage(accountHash, storageHash common.Hash) ([]byte, error) { + dl.lock.RLock() + defer dl.lock.RUnlock() + + // If the layer was flattened into, consider it invalid (any live reference to + // the original should be marked as unusable). + if dl.stale { + return nil, ErrSnapshotStale + } + key := append(accountHash[:], storageHash[:]...) + + // If the layer is being generated, ensure the requested hash has already been + // covered by the generator. + if dl.genMarker != nil && bytes.Compare(key, dl.genMarker) > 0 { + return nil, ErrNotCoveredYet + } + // If we're in the disk layer, all diff layers missed + snapshotDirtyStorageMissMeter.Mark(1) + + // Try to retrieve the storage slot from the memory cache + if blob, found := dl.cache.HasGet(nil, key); found { + snapshotCleanStorageHitMeter.Mark(1) + snapshotCleanStorageReadMeter.Mark(int64(len(blob))) + return blob, nil + } + // Cache doesn't contain storage slot, pull from disk and cache for later + blob := rawdb.ReadStorageSnapshot(dl.diskdb, accountHash, storageHash) + dl.cache.Set(key, blob) + + snapshotCleanStorageMissMeter.Mark(1) + if n := len(blob); n > 0 { + snapshotCleanStorageWriteMeter.Mark(int64(n)) + } else { + snapshotCleanStorageInexMeter.Mark(1) + } + return blob, nil +} + +// Update creates a new layer on top of the existing snapshot diff tree with +// the specified data items. Note, the maps are retained by the method to avoid +// copying everything. +func (dl *diskLayer) Update(blockHash common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer { + return newDiffLayer(dl, blockHash, destructs, accounts, storage) +} diff --git a/core/state/snapshot/generate.go b/core/state/snapshot/generate.go new file mode 100644 index 0000000..27f8dfc --- /dev/null +++ b/core/state/snapshot/generate.go @@ -0,0 +1,265 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package snapshot + +import ( + "bytes" + "encoding/binary" + "math/big" + "time" + + "github.com/VictoriaMetrics/fastcache" + "github.com/ava-labs/coreth/core/rawdb" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/math" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" +) + +var ( + // emptyRoot is the known root hash of an empty trie. + emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421") + + // emptyCode is the known hash of the empty EVM bytecode. + emptyCode = crypto.Keccak256Hash(nil) +) + +// generatorStats is a collection of statistics gathered by the snapshot generator +// for logging purposes. +type generatorStats struct { + wiping chan struct{} // Notification channel if wiping is in progress + origin uint64 // Origin prefix where generation started + start time.Time // Timestamp when generation started + accounts uint64 // Number of accounts indexed + slots uint64 // Number of storage slots indexed + storage common.StorageSize // Account and storage slot size +} + +// Log creates an contextual log with the given message and the context pulled +// from the internally maintained statistics. +func (gs *generatorStats) Log(msg string, root common.Hash, marker []byte) { + var ctx []interface{} + if root != (common.Hash{}) { + ctx = append(ctx, []interface{}{"root", root}...) + } + // Figure out whether we're after or within an account + switch len(marker) { + case common.HashLength: + ctx = append(ctx, []interface{}{"at", common.BytesToHash(marker)}...) + case 2 * common.HashLength: + ctx = append(ctx, []interface{}{ + "in", common.BytesToHash(marker[:common.HashLength]), + "at", common.BytesToHash(marker[common.HashLength:]), + }...) + } + // Add the usual measurements + ctx = append(ctx, []interface{}{ + "accounts", gs.accounts, + "slots", gs.slots, + "storage", gs.storage, + "elapsed", common.PrettyDuration(time.Since(gs.start)), + }...) + // Calculate the estimated indexing time based on current stats + if len(marker) > 0 { + if done := binary.BigEndian.Uint64(marker[:8]) - gs.origin; done > 0 { + left := math.MaxUint64 - binary.BigEndian.Uint64(marker[:8]) + + speed := done/uint64(time.Since(gs.start)/time.Millisecond+1) + 1 // +1s to avoid division by zero + ctx = append(ctx, []interface{}{ + "eta", common.PrettyDuration(time.Duration(left/speed) * time.Millisecond), + }...) + } + } + log.Info(msg, ctx...) +} + +// generateSnapshot regenerates a brand new snapshot based on an existing state +// database and head block asynchronously. The snapshot is returned immediately +// and generation is continued in the background until done. +func generateSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, wiper chan struct{}) *diskLayer { + // Wipe any previously existing snapshot from the database if no wiper is + // currently in progress. + if wiper == nil { + wiper = wipeSnapshot(diskdb, true) + } + // Create a new disk layer with an initialized state marker at zero + rawdb.WriteSnapshotRoot(diskdb, root) + + base := &diskLayer{ + diskdb: diskdb, + triedb: triedb, + root: root, + cache: fastcache.New(cache * 1024 * 1024), + genMarker: []byte{}, // Initialized but empty! + genPending: make(chan struct{}), + genAbort: make(chan chan *generatorStats), + } + go base.generate(&generatorStats{wiping: wiper, start: time.Now()}) + return base +} + +// generate is a background thread that iterates over the state and storage tries, +// constructing the state snapshot. All the arguments are purely for statistics +// gethering and logging, since the method surfs the blocks as they arrive, often +// being restarted. +func (dl *diskLayer) generate(stats *generatorStats) { + // If a database wipe is in operation, wait until it's done + if stats.wiping != nil { + stats.Log("Wiper running, state snapshotting paused", common.Hash{}, dl.genMarker) + select { + // If wiper is done, resume normal mode of operation + case <-stats.wiping: + stats.wiping = nil + stats.start = time.Now() + + // If generator was aboted during wipe, return + case abort := <-dl.genAbort: + abort <- stats + return + } + } + // Create an account and state iterator pointing to the current generator marker + accTrie, err := trie.NewSecure(dl.root, dl.triedb) + if err != nil { + // The account trie is missing (GC), surf the chain until one becomes available + stats.Log("Trie missing, state snapshotting paused", dl.root, dl.genMarker) + + abort := <-dl.genAbort + abort <- stats + return + } + stats.Log("Resuming state snapshot generation", dl.root, dl.genMarker) + + var accMarker []byte + if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that + accMarker = dl.genMarker[:common.HashLength] + } + accIt := trie.NewIterator(accTrie.NodeIterator(accMarker)) + batch := dl.diskdb.NewBatch() + + // Iterate from the previous marker and continue generating the state snapshot + logged := time.Now() + for accIt.Next() { + // Retrieve the current account and flatten it into the internal format + accountHash := common.BytesToHash(accIt.Key) + + var acc struct { + Nonce uint64 + Balance *big.Int + Root common.Hash + CodeHash []byte + IsMultiCoin bool + } + if err := rlp.DecodeBytes(accIt.Value, &acc); err != nil { + log.Crit("Invalid account encountered during snapshot creation", "err", err) + } + data := SlimAccountRLP(acc.Nonce, acc.Balance, acc.Root, acc.CodeHash, acc.IsMultiCoin) + + // If the account is not yet in-progress, write it out + if accMarker == nil || !bytes.Equal(accountHash[:], accMarker) { + rawdb.WriteAccountSnapshot(batch, accountHash, data) + stats.storage += common.StorageSize(1 + common.HashLength + len(data)) + stats.accounts++ + } + // If we've exceeded our batch allowance or termination was requested, flush to disk + var abort chan *generatorStats + select { + case abort = <-dl.genAbort: + default: + } + if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil { + // Only write and set the marker if we actually did something useful + if batch.ValueSize() > 0 { + batch.Write() + batch.Reset() + + dl.lock.Lock() + dl.genMarker = accountHash[:] + dl.lock.Unlock() + } + if abort != nil { + stats.Log("Aborting state snapshot generation", dl.root, accountHash[:]) + abort <- stats + return + } + } + // If the account is in-progress, continue where we left off (otherwise iterate all) + if acc.Root != emptyRoot { + storeTrie, err := trie.NewSecure(acc.Root, dl.triedb) + if err != nil { + log.Crit("Storage trie inaccessible for snapshot generation", "err", err) + } + var storeMarker []byte + if accMarker != nil && bytes.Equal(accountHash[:], accMarker) && len(dl.genMarker) > common.HashLength { + storeMarker = dl.genMarker[common.HashLength:] + } + storeIt := trie.NewIterator(storeTrie.NodeIterator(storeMarker)) + for storeIt.Next() { + rawdb.WriteStorageSnapshot(batch, accountHash, common.BytesToHash(storeIt.Key), storeIt.Value) + stats.storage += common.StorageSize(1 + 2*common.HashLength + len(storeIt.Value)) + stats.slots++ + + // If we've exceeded our batch allowance or termination was requested, flush to disk + var abort chan *generatorStats + select { + case abort = <-dl.genAbort: + default: + } + if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil { + // Only write and set the marker if we actually did something useful + if batch.ValueSize() > 0 { + batch.Write() + batch.Reset() + + dl.lock.Lock() + dl.genMarker = append(accountHash[:], storeIt.Key...) + dl.lock.Unlock() + } + if abort != nil { + stats.Log("Aborting state snapshot generation", dl.root, append(accountHash[:], storeIt.Key...)) + abort <- stats + return + } + } + } + } + if time.Since(logged) > 8*time.Second { + stats.Log("Generating state snapshot", dl.root, accIt.Key) + logged = time.Now() + } + // Some account processed, unmark the marker + accMarker = nil + } + // Snapshot fully generated, set the marker to nil + if batch.ValueSize() > 0 { + batch.Write() + } + log.Info("Generated state snapshot", "accounts", stats.accounts, "slots", stats.slots, + "storage", stats.storage, "elapsed", common.PrettyDuration(time.Since(stats.start))) + + dl.lock.Lock() + dl.genMarker = nil + close(dl.genPending) + dl.lock.Unlock() + + // Someone will be looking for us, wait it out + abort := <-dl.genAbort + abort <- nil +} diff --git a/core/state/snapshot/iterator.go b/core/state/snapshot/iterator.go new file mode 100644 index 0000000..ef527ff --- /dev/null +++ b/core/state/snapshot/iterator.go @@ -0,0 +1,400 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package snapshot + +import ( + "bytes" + "fmt" + "sort" + + "github.com/ava-labs/coreth/core/rawdb" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" +) + +// Iterator is an iterator to step over all the accounts or the specific +// storage in a snapshot which may or may not be composed of multiple layers. +type Iterator interface { + // Next steps the iterator forward one element, returning false if exhausted, + // or an error if iteration failed for some reason (e.g. root being iterated + // becomes stale and garbage collected). + Next() bool + + // Error returns any failure that occurred during iteration, which might have + // caused a premature iteration exit (e.g. snapshot stack becoming stale). + Error() error + + // Hash returns the hash of the account or storage slot the iterator is + // currently at. + Hash() common.Hash + + // Release releases associated resources. Release should always succeed and + // can be called multiple times without causing error. + Release() +} + +// AccountIterator is an iterator to step over all the accounts in a snapshot, +// which may or may not be composed of multiple layers. +type AccountIterator interface { + Iterator + + // Account returns the RLP encoded slim account the iterator is currently at. + // An error will be returned if the iterator becomes invalid + Account() []byte +} + +// StorageIterator is an iterator to step over the specific storage in a snapshot, +// which may or may not be composed of multiple layers. +type StorageIterator interface { + Iterator + + // Slot returns the storage slot the iterator is currently at. An error will + // be returned if the iterator becomes invalid + Slot() []byte +} + +// diffAccountIterator is an account iterator that steps over the accounts (both +// live and deleted) contained within a single diff layer. Higher order iterators +// will use the deleted accounts to skip deeper iterators. +type diffAccountIterator struct { + // curHash is the current hash the iterator is positioned on. The field is + // explicitly tracked since the referenced diff layer might go stale after + // the iterator was positioned and we don't want to fail accessing the old + // hash as long as the iterator is not touched any more. + curHash common.Hash + + layer *diffLayer // Live layer to retrieve values from + keys []common.Hash // Keys left in the layer to iterate + fail error // Any failures encountered (stale) +} + +// AccountIterator creates an account iterator over a single diff layer. +func (dl *diffLayer) AccountIterator(seek common.Hash) AccountIterator { + // Seek out the requested starting account + hashes := dl.AccountList() + index := sort.Search(len(hashes), func(i int) bool { + return bytes.Compare(seek[:], hashes[i][:]) <= 0 + }) + // Assemble and returned the already seeked iterator + return &diffAccountIterator{ + layer: dl, + keys: hashes[index:], + } +} + +// Next steps the iterator forward one element, returning false if exhausted. +func (it *diffAccountIterator) Next() bool { + // If the iterator was already stale, consider it a programmer error. Although + // we could just return false here, triggering this path would probably mean + // somebody forgot to check for Error, so lets blow up instead of undefined + // behavior that's hard to debug. + if it.fail != nil { + panic(fmt.Sprintf("called Next of failed iterator: %v", it.fail)) + } + // Stop iterating if all keys were exhausted + if len(it.keys) == 0 { + return false + } + if it.layer.Stale() { + it.fail, it.keys = ErrSnapshotStale, nil + return false + } + // Iterator seems to be still alive, retrieve and cache the live hash + it.curHash = it.keys[0] + // key cached, shift the iterator and notify the user of success + it.keys = it.keys[1:] + return true +} + +// Error returns any failure that occurred during iteration, which might have +// caused a premature iteration exit (e.g. snapshot stack becoming stale). +func (it *diffAccountIterator) Error() error { + return it.fail +} + +// Hash returns the hash of the account the iterator is currently at. +func (it *diffAccountIterator) Hash() common.Hash { + return it.curHash +} + +// Account returns the RLP encoded slim account the iterator is currently at. +// This method may _fail_, if the underlying layer has been flattened between +// the call to Next and Acccount. That type of error will set it.Err. +// This method assumes that flattening does not delete elements from +// the accountdata mapping (writing nil into it is fine though), and will panic +// if elements have been deleted. +// +// Note the returned account is not a copy, please don't modify it. +func (it *diffAccountIterator) Account() []byte { + it.layer.lock.RLock() + blob, ok := it.layer.accountData[it.curHash] + if !ok { + if _, ok := it.layer.destructSet[it.curHash]; ok { + it.layer.lock.RUnlock() + return nil + } + panic(fmt.Sprintf("iterator referenced non-existent account: %x", it.curHash)) + } + it.layer.lock.RUnlock() + if it.layer.Stale() { + it.fail, it.keys = ErrSnapshotStale, nil + } + return blob +} + +// Release is a noop for diff account iterators as there are no held resources. +func (it *diffAccountIterator) Release() {} + +// diskAccountIterator is an account iterator that steps over the live accounts +// contained within a disk layer. +type diskAccountIterator struct { + layer *diskLayer + it ethdb.Iterator +} + +// AccountIterator creates an account iterator over a disk layer. +func (dl *diskLayer) AccountIterator(seek common.Hash) AccountIterator { + pos := common.TrimRightZeroes(seek[:]) + return &diskAccountIterator{ + layer: dl, + it: dl.diskdb.NewIterator(rawdb.SnapshotAccountPrefix, pos), + } +} + +// Next steps the iterator forward one element, returning false if exhausted. +func (it *diskAccountIterator) Next() bool { + // If the iterator was already exhausted, don't bother + if it.it == nil { + return false + } + // Try to advance the iterator and release it if we reached the end + for { + if !it.it.Next() { + it.it.Release() + it.it = nil + return false + } + if len(it.it.Key()) == len(rawdb.SnapshotAccountPrefix)+common.HashLength { + break + } + } + return true +} + +// Error returns any failure that occurred during iteration, which might have +// caused a premature iteration exit (e.g. snapshot stack becoming stale). +// +// A diff layer is immutable after creation content wise and can always be fully +// iterated without error, so this method always returns nil. +func (it *diskAccountIterator) Error() error { + if it.it == nil { + return nil // Iterator is exhausted and released + } + return it.it.Error() +} + +// Hash returns the hash of the account the iterator is currently at. +func (it *diskAccountIterator) Hash() common.Hash { + return common.BytesToHash(it.it.Key()) // The prefix will be truncated +} + +// Account returns the RLP encoded slim account the iterator is currently at. +func (it *diskAccountIterator) Account() []byte { + return it.it.Value() +} + +// Release releases the database snapshot held during iteration. +func (it *diskAccountIterator) Release() { + // The iterator is auto-released on exhaustion, so make sure it's still alive + if it.it != nil { + it.it.Release() + it.it = nil + } +} + +// diffStorageIterator is a storage iterator that steps over the specific storage +// (both live and deleted) contained within a single diff layer. Higher order +// iterators will use the deleted slot to skip deeper iterators. +type diffStorageIterator struct { + // curHash is the current hash the iterator is positioned on. The field is + // explicitly tracked since the referenced diff layer might go stale after + // the iterator was positioned and we don't want to fail accessing the old + // hash as long as the iterator is not touched any more. + curHash common.Hash + account common.Hash + + layer *diffLayer // Live layer to retrieve values from + keys []common.Hash // Keys left in the layer to iterate + fail error // Any failures encountered (stale) +} + +// StorageIterator creates a storage iterator over a single diff layer. +// Execept the storage iterator is returned, there is an additional flag +// "destructed" returned. If it's true then it means the whole storage is +// destructed in this layer(maybe recreated too), don't bother deeper layer +// for storage retrieval. +func (dl *diffLayer) StorageIterator(account common.Hash, seek common.Hash) (StorageIterator, bool) { + // Create the storage for this account even it's marked + // as destructed. The iterator is for the new one which + // just has the same address as the deleted one. + hashes, destructed := dl.StorageList(account) + index := sort.Search(len(hashes), func(i int) bool { + return bytes.Compare(seek[:], hashes[i][:]) <= 0 + }) + // Assemble and returned the already seeked iterator + return &diffStorageIterator{ + layer: dl, + account: account, + keys: hashes[index:], + }, destructed +} + +// Next steps the iterator forward one element, returning false if exhausted. +func (it *diffStorageIterator) Next() bool { + // If the iterator was already stale, consider it a programmer error. Although + // we could just return false here, triggering this path would probably mean + // somebody forgot to check for Error, so lets blow up instead of undefined + // behavior that's hard to debug. + if it.fail != nil { + panic(fmt.Sprintf("called Next of failed iterator: %v", it.fail)) + } + // Stop iterating if all keys were exhausted + if len(it.keys) == 0 { + return false + } + if it.layer.Stale() { + it.fail, it.keys = ErrSnapshotStale, nil + return false + } + // Iterator seems to be still alive, retrieve and cache the live hash + it.curHash = it.keys[0] + // key cached, shift the iterator and notify the user of success + it.keys = it.keys[1:] + return true +} + +// Error returns any failure that occurred during iteration, which might have +// caused a premature iteration exit (e.g. snapshot stack becoming stale). +func (it *diffStorageIterator) Error() error { + return it.fail +} + +// Hash returns the hash of the storage slot the iterator is currently at. +func (it *diffStorageIterator) Hash() common.Hash { + return it.curHash +} + +// Slot returns the raw storage slot value the iterator is currently at. +// This method may _fail_, if the underlying layer has been flattened between +// the call to Next and Value. That type of error will set it.Err. +// This method assumes that flattening does not delete elements from +// the storage mapping (writing nil into it is fine though), and will panic +// if elements have been deleted. +// +// Note the returned slot is not a copy, please don't modify it. +func (it *diffStorageIterator) Slot() []byte { + it.layer.lock.RLock() + storage, ok := it.layer.storageData[it.account] + if !ok { + panic(fmt.Sprintf("iterator referenced non-existent account storage: %x", it.account)) + } + // Storage slot might be nil(deleted), but it must exist + blob, ok := storage[it.curHash] + if !ok { + panic(fmt.Sprintf("iterator referenced non-existent storage slot: %x", it.curHash)) + } + it.layer.lock.RUnlock() + if it.layer.Stale() { + it.fail, it.keys = ErrSnapshotStale, nil + } + return blob +} + +// Release is a noop for diff account iterators as there are no held resources. +func (it *diffStorageIterator) Release() {} + +// diskStorageIterator is a storage iterator that steps over the live storage +// contained within a disk layer. +type diskStorageIterator struct { + layer *diskLayer + account common.Hash + it ethdb.Iterator +} + +// StorageIterator creates a storage iterator over a disk layer. +// If the whole storage is destructed, then all entries in the disk +// layer are deleted already. So the "destructed" flag returned here +// is always false. +func (dl *diskLayer) StorageIterator(account common.Hash, seek common.Hash) (StorageIterator, bool) { + pos := common.TrimRightZeroes(seek[:]) + return &diskStorageIterator{ + layer: dl, + account: account, + it: dl.diskdb.NewIterator(append(rawdb.SnapshotStoragePrefix, account.Bytes()...), pos), + }, false +} + +// Next steps the iterator forward one element, returning false if exhausted. +func (it *diskStorageIterator) Next() bool { + // If the iterator was already exhausted, don't bother + if it.it == nil { + return false + } + // Try to advance the iterator and release it if we reached the end + for { + if !it.it.Next() { + it.it.Release() + it.it = nil + return false + } + if len(it.it.Key()) == len(rawdb.SnapshotStoragePrefix)+common.HashLength+common.HashLength { + break + } + } + return true +} + +// Error returns any failure that occurred during iteration, which might have +// caused a premature iteration exit (e.g. snapshot stack becoming stale). +// +// A diff layer is immutable after creation content wise and can always be fully +// iterated without error, so this method always returns nil. +func (it *diskStorageIterator) Error() error { + if it.it == nil { + return nil // Iterator is exhausted and released + } + return it.it.Error() +} + +// Hash returns the hash of the storage slot the iterator is currently at. +func (it *diskStorageIterator) Hash() common.Hash { + return common.BytesToHash(it.it.Key()) // The prefix will be truncated +} + +// Slot returns the raw strorage slot content the iterator is currently at. +func (it *diskStorageIterator) Slot() []byte { + return it.it.Value() +} + +// Release releases the database snapshot held during iteration. +func (it *diskStorageIterator) Release() { + // The iterator is auto-released on exhaustion, so make sure it's still alive + if it.it != nil { + it.it.Release() + it.it = nil + } +} diff --git a/core/state/snapshot/iterator_binary.go b/core/state/snapshot/iterator_binary.go new file mode 100644 index 0000000..f82f750 --- /dev/null +++ b/core/state/snapshot/iterator_binary.go @@ -0,0 +1,213 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package snapshot + +import ( + "bytes" + + "github.com/ethereum/go-ethereum/common" +) + +// binaryIterator is a simplistic iterator to step over the accounts or storage +// in a snapshot, which may or may not be composed of multiple layers. Performance +// wise this iterator is slow, it's meant for cross validating the fast one, +type binaryIterator struct { + a Iterator + b Iterator + aDone bool + bDone bool + accountIterator bool + k common.Hash + account common.Hash + fail error +} + +// initBinaryAccountIterator creates a simplistic iterator to step over all the +// accounts in a slow, but eaily verifiable way. Note this function is used for +// initialization, use `newBinaryAccountIterator` as the API. +func (dl *diffLayer) initBinaryAccountIterator() Iterator { + parent, ok := dl.parent.(*diffLayer) + if !ok { + l := &binaryIterator{ + a: dl.AccountIterator(common.Hash{}), + b: dl.Parent().AccountIterator(common.Hash{}), + accountIterator: true, + } + l.aDone = !l.a.Next() + l.bDone = !l.b.Next() + return l + } + l := &binaryIterator{ + a: dl.AccountIterator(common.Hash{}), + b: parent.initBinaryAccountIterator(), + accountIterator: true, + } + l.aDone = !l.a.Next() + l.bDone = !l.b.Next() + return l +} + +// initBinaryStorageIterator creates a simplistic iterator to step over all the +// storage slots in a slow, but eaily verifiable way. Note this function is used +// for initialization, use `newBinaryStorageIterator` as the API. +func (dl *diffLayer) initBinaryStorageIterator(account common.Hash) Iterator { + parent, ok := dl.parent.(*diffLayer) + if !ok { + // If the storage in this layer is already destructed, discard all + // deeper layers but still return an valid single-branch iterator. + a, destructed := dl.StorageIterator(account, common.Hash{}) + if destructed { + l := &binaryIterator{ + a: a, + account: account, + } + l.aDone = !l.a.Next() + l.bDone = true + return l + } + // The parent is disk layer, don't need to take care "destructed" + // anymore. + b, _ := dl.Parent().StorageIterator(account, common.Hash{}) + l := &binaryIterator{ + a: a, + b: b, + account: account, + } + l.aDone = !l.a.Next() + l.bDone = !l.b.Next() + return l + } + // If the storage in this layer is already destructed, discard all + // deeper layers but still return an valid single-branch iterator. + a, destructed := dl.StorageIterator(account, common.Hash{}) + if destructed { + l := &binaryIterator{ + a: a, + account: account, + } + l.aDone = !l.a.Next() + l.bDone = true + return l + } + l := &binaryIterator{ + a: a, + b: parent.initBinaryStorageIterator(account), + account: account, + } + l.aDone = !l.a.Next() + l.bDone = !l.b.Next() + return l +} + +// Next steps the iterator forward one element, returning false if exhausted, +// or an error if iteration failed for some reason (e.g. root being iterated +// becomes stale and garbage collected). +func (it *binaryIterator) Next() bool { + if it.aDone && it.bDone { + return false + } +first: + if it.aDone { + it.k = it.b.Hash() + it.bDone = !it.b.Next() + return true + } + if it.bDone { + it.k = it.a.Hash() + it.aDone = !it.a.Next() + return true + } + nextA, nextB := it.a.Hash(), it.b.Hash() + if diff := bytes.Compare(nextA[:], nextB[:]); diff < 0 { + it.aDone = !it.a.Next() + it.k = nextA + return true + } else if diff == 0 { + // Now we need to advance one of them + it.aDone = !it.a.Next() + goto first + } + it.bDone = !it.b.Next() + it.k = nextB + return true +} + +// Error returns any failure that occurred during iteration, which might have +// caused a premature iteration exit (e.g. snapshot stack becoming stale). +func (it *binaryIterator) Error() error { + return it.fail +} + +// Hash returns the hash of the account the iterator is currently at. +func (it *binaryIterator) Hash() common.Hash { + return it.k +} + +// Account returns the RLP encoded slim account the iterator is currently at, or +// nil if the iterated snapshot stack became stale (you can check Error after +// to see if it failed or not). +// +// Note the returned account is not a copy, please don't modify it. +func (it *binaryIterator) Account() []byte { + if !it.accountIterator { + return nil + } + // The topmost iterator must be `diffAccountIterator` + blob, err := it.a.(*diffAccountIterator).layer.AccountRLP(it.k) + if err != nil { + it.fail = err + return nil + } + return blob +} + +// Slot returns the raw storage slot data the iterator is currently at, or +// nil if the iterated snapshot stack became stale (you can check Error after +// to see if it failed or not). +// +// Note the returned slot is not a copy, please don't modify it. +func (it *binaryIterator) Slot() []byte { + if it.accountIterator { + return nil + } + blob, err := it.a.(*diffStorageIterator).layer.Storage(it.account, it.k) + if err != nil { + it.fail = err + return nil + } + return blob +} + +// Release recursively releases all the iterators in the stack. +func (it *binaryIterator) Release() { + it.a.Release() + it.b.Release() +} + +// newBinaryAccountIterator creates a simplistic account iterator to step over +// all the accounts in a slow, but eaily verifiable way. +func (dl *diffLayer) newBinaryAccountIterator() AccountIterator { + iter := dl.initBinaryAccountIterator() + return iter.(AccountIterator) +} + +// newBinaryStorageIterator creates a simplistic account iterator to step over +// all the storage slots in a slow, but eaily verifiable way. +func (dl *diffLayer) newBinaryStorageIterator(account common.Hash) StorageIterator { + iter := dl.initBinaryStorageIterator(account) + return iter.(StorageIterator) +} diff --git a/core/state/snapshot/iterator_fast.go b/core/state/snapshot/iterator_fast.go new file mode 100644 index 0000000..291d529 --- /dev/null +++ b/core/state/snapshot/iterator_fast.go @@ -0,0 +1,350 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package snapshot + +import ( + "bytes" + "fmt" + "sort" + + "github.com/ethereum/go-ethereum/common" +) + +// weightedIterator is a iterator with an assigned weight. It is used to prioritise +// which account or storage slot is the correct one if multiple iterators find the +// same one (modified in multiple consecutive blocks). +type weightedIterator struct { + it Iterator + priority int +} + +// weightedIterators is a set of iterators implementing the sort.Interface. +type weightedIterators []*weightedIterator + +// Len implements sort.Interface, returning the number of active iterators. +func (its weightedIterators) Len() int { return len(its) } + +// Less implements sort.Interface, returning which of two iterators in the stack +// is before the other. +func (its weightedIterators) Less(i, j int) bool { + // Order the iterators primarily by the account hashes + hashI := its[i].it.Hash() + hashJ := its[j].it.Hash() + + switch bytes.Compare(hashI[:], hashJ[:]) { + case -1: + return true + case 1: + return false + } + // Same account/storage-slot in multiple layers, split by priority + return its[i].priority < its[j].priority +} + +// Swap implements sort.Interface, swapping two entries in the iterator stack. +func (its weightedIterators) Swap(i, j int) { + its[i], its[j] = its[j], its[i] +} + +// fastIterator is a more optimized multi-layer iterator which maintains a +// direct mapping of all iterators leading down to the bottom layer. +type fastIterator struct { + tree *Tree // Snapshot tree to reinitialize stale sub-iterators with + root common.Hash // Root hash to reinitialize stale sub-iterators through + + curAccount []byte + curSlot []byte + + iterators weightedIterators + initiated bool + account bool + fail error +} + +// newFastIterator creates a new hierarhical account or storage iterator with one +// element per diff layer. The returned combo iterator can be used to walk over +// the entire snapshot diff stack simultaneously. +func newFastIterator(tree *Tree, root common.Hash, account common.Hash, seek common.Hash, accountIterator bool) (*fastIterator, error) { + snap := tree.Snapshot(root) + if snap == nil { + return nil, fmt.Errorf("unknown snapshot: %x", root) + } + fi := &fastIterator{ + tree: tree, + root: root, + account: accountIterator, + } + current := snap.(snapshot) + for depth := 0; current != nil; depth++ { + if accountIterator { + fi.iterators = append(fi.iterators, &weightedIterator{ + it: current.AccountIterator(seek), + priority: depth, + }) + } else { + // If the whole storage is destructed in this layer, don't + // bother deeper layer anymore. But we should still keep + // the iterator for this layer, since the iterator can contain + // some valid slots which belongs to the re-created account. + it, destructed := current.StorageIterator(account, seek) + fi.iterators = append(fi.iterators, &weightedIterator{ + it: it, + priority: depth, + }) + if destructed { + break + } + } + current = current.Parent() + } + fi.init() + return fi, nil +} + +// init walks over all the iterators and resolves any clashes between them, after +// which it prepares the stack for step-by-step iteration. +func (fi *fastIterator) init() { + // Track which account hashes are iterators positioned on + var positioned = make(map[common.Hash]int) + + // Position all iterators and track how many remain live + for i := 0; i < len(fi.iterators); i++ { + // Retrieve the first element and if it clashes with a previous iterator, + // advance either the current one or the old one. Repeat until nothing is + // clashing any more. + it := fi.iterators[i] + for { + // If the iterator is exhausted, drop it off the end + if !it.it.Next() { + it.it.Release() + last := len(fi.iterators) - 1 + + fi.iterators[i] = fi.iterators[last] + fi.iterators[last] = nil + fi.iterators = fi.iterators[:last] + + i-- + break + } + // The iterator is still alive, check for collisions with previous ones + hash := it.it.Hash() + if other, exist := positioned[hash]; !exist { + positioned[hash] = i + break + } else { + // Iterators collide, one needs to be progressed, use priority to + // determine which. + // + // This whole else-block can be avoided, if we instead + // do an initial priority-sort of the iterators. If we do that, + // then we'll only wind up here if a lower-priority (preferred) iterator + // has the same value, and then we will always just continue. + // However, it costs an extra sort, so it's probably not better + if fi.iterators[other].priority < it.priority { + // The 'it' should be progressed + continue + } else { + // The 'other' should be progressed, swap them + it = fi.iterators[other] + fi.iterators[other], fi.iterators[i] = fi.iterators[i], fi.iterators[other] + continue + } + } + } + } + // Re-sort the entire list + sort.Sort(fi.iterators) + fi.initiated = false +} + +// Next steps the iterator forward one element, returning false if exhausted. +func (fi *fastIterator) Next() bool { + if len(fi.iterators) == 0 { + return false + } + if !fi.initiated { + // Don't forward first time -- we had to 'Next' once in order to + // do the sorting already + fi.initiated = true + if fi.account { + fi.curAccount = fi.iterators[0].it.(AccountIterator).Account() + } else { + fi.curSlot = fi.iterators[0].it.(StorageIterator).Slot() + } + if innerErr := fi.iterators[0].it.Error(); innerErr != nil { + fi.fail = innerErr + return false + } + if fi.curAccount != nil || fi.curSlot != nil { + return true + } + // Implicit else: we've hit a nil-account or nil-slot, and need to + // fall through to the loop below to land on something non-nil + } + // If an account or a slot is deleted in one of the layers, the key will + // still be there, but the actual value will be nil. However, the iterator + // should not export nil-values (but instead simply omit the key), so we + // need to loop here until we either + // - get a non-nil value, + // - hit an error, + // - or exhaust the iterator + for { + if !fi.next(0) { + return false // exhausted + } + if fi.account { + fi.curAccount = fi.iterators[0].it.(AccountIterator).Account() + } else { + fi.curSlot = fi.iterators[0].it.(StorageIterator).Slot() + } + if innerErr := fi.iterators[0].it.Error(); innerErr != nil { + fi.fail = innerErr + return false // error + } + if fi.curAccount != nil || fi.curSlot != nil { + break // non-nil value found + } + } + return true +} + +// next handles the next operation internally and should be invoked when we know +// that two elements in the list may have the same value. +// +// For example, if the iterated hashes become [2,3,5,5,8,9,10], then we should +// invoke next(3), which will call Next on elem 3 (the second '5') and will +// cascade along the list, applying the same operation if needed. +func (fi *fastIterator) next(idx int) bool { + // If this particular iterator got exhausted, remove it and return true (the + // next one is surely not exhausted yet, otherwise it would have been removed + // already). + if it := fi.iterators[idx].it; !it.Next() { + it.Release() + + fi.iterators = append(fi.iterators[:idx], fi.iterators[idx+1:]...) + return len(fi.iterators) > 0 + } + // If there's no one left to cascade into, return + if idx == len(fi.iterators)-1 { + return true + } + // We next-ed the iterator at 'idx', now we may have to re-sort that element + var ( + cur, next = fi.iterators[idx], fi.iterators[idx+1] + curHash, nextHash = cur.it.Hash(), next.it.Hash() + ) + if diff := bytes.Compare(curHash[:], nextHash[:]); diff < 0 { + // It is still in correct place + return true + } else if diff == 0 && cur.priority < next.priority { + // So still in correct place, but we need to iterate on the next + fi.next(idx + 1) + return true + } + // At this point, the iterator is in the wrong location, but the remaining + // list is sorted. Find out where to move the item. + clash := -1 + index := sort.Search(len(fi.iterators), func(n int) bool { + // The iterator always advances forward, so anything before the old slot + // is known to be behind us, so just skip them altogether. This actually + // is an important clause since the sort order got invalidated. + if n < idx { + return false + } + if n == len(fi.iterators)-1 { + // Can always place an elem last + return true + } + nextHash := fi.iterators[n+1].it.Hash() + if diff := bytes.Compare(curHash[:], nextHash[:]); diff < 0 { + return true + } else if diff > 0 { + return false + } + // The elem we're placing it next to has the same value, + // so whichever winds up on n+1 will need further iteraton + clash = n + 1 + + return cur.priority < fi.iterators[n+1].priority + }) + fi.move(idx, index) + if clash != -1 { + fi.next(clash) + } + return true +} + +// move advances an iterator to another position in the list. +func (fi *fastIterator) move(index, newpos int) { + elem := fi.iterators[index] + copy(fi.iterators[index:], fi.iterators[index+1:newpos+1]) + fi.iterators[newpos] = elem +} + +// Error returns any failure that occurred during iteration, which might have +// caused a premature iteration exit (e.g. snapshot stack becoming stale). +func (fi *fastIterator) Error() error { + return fi.fail +} + +// Hash returns the current key +func (fi *fastIterator) Hash() common.Hash { + return fi.iterators[0].it.Hash() +} + +// Account returns the current account blob. +// Note the returned account is not a copy, please don't modify it. +func (fi *fastIterator) Account() []byte { + return fi.curAccount +} + +// Slot returns the current storage slot. +// Note the returned slot is not a copy, please don't modify it. +func (fi *fastIterator) Slot() []byte { + return fi.curSlot +} + +// Release iterates over all the remaining live layer iterators and releases each +// of thme individually. +func (fi *fastIterator) Release() { + for _, it := range fi.iterators { + it.it.Release() + } + fi.iterators = nil +} + +// Debug is a convencience helper during testing +func (fi *fastIterator) Debug() { + for _, it := range fi.iterators { + fmt.Printf("[p=%v v=%v] ", it.priority, it.it.Hash()[0]) + } + fmt.Println() +} + +// newFastAccountIterator creates a new hierarhical account iterator with one +// element per diff layer. The returned combo iterator can be used to walk over +// the entire snapshot diff stack simultaneously. +func newFastAccountIterator(tree *Tree, root common.Hash, seek common.Hash) (AccountIterator, error) { + return newFastIterator(tree, root, common.Hash{}, seek, true) +} + +// newFastStorageIterator creates a new hierarhical storage iterator with one +// element per diff layer. The returned combo iterator can be used to walk over +// the entire snapshot diff stack simultaneously. +func newFastStorageIterator(tree *Tree, root common.Hash, account common.Hash, seek common.Hash) (StorageIterator, error) { + return newFastIterator(tree, root, account, seek, false) +} diff --git a/core/state/snapshot/journal.go b/core/state/snapshot/journal.go new file mode 100644 index 0000000..3d4c9de --- /dev/null +++ b/core/state/snapshot/journal.go @@ -0,0 +1,270 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package snapshot + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "time" + + "github.com/VictoriaMetrics/fastcache" + "github.com/ava-labs/coreth/core/rawdb" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" +) + +// journalGenerator is a disk layer entry containing the generator progress marker. +type journalGenerator struct { + Wiping bool // Whether the database was in progress of being wiped + Done bool // Whether the generator finished creating the snapshot + Marker []byte + Accounts uint64 + Slots uint64 + Storage uint64 +} + +// journalDestruct is an account deletion entry in a diffLayer's disk journal. +type journalDestruct struct { + Hash common.Hash +} + +// journalAccount is an account entry in a diffLayer's disk journal. +type journalAccount struct { + Hash common.Hash + Blob []byte +} + +// journalStorage is an account's storage map in a diffLayer's disk journal. +type journalStorage struct { + Hash common.Hash + Keys []common.Hash + Vals [][]byte +} + +// loadSnapshot loads a pre-existing state snapshot backed by a key-value store. +func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash) (snapshot, error) { + // Retrieve the block number and hash of the snapshot, failing if no snapshot + // is present in the database (or crashed mid-update). + baseRoot := rawdb.ReadSnapshotRoot(diskdb) + if baseRoot == (common.Hash{}) { + return nil, errors.New("missing or corrupted snapshot") + } + base := &diskLayer{ + diskdb: diskdb, + triedb: triedb, + cache: fastcache.New(cache * 1024 * 1024), + root: baseRoot, + } + // Retrieve the journal, it must exist since even for 0 layer it stores whether + // we've already generated the snapshot or are in progress only + journal := rawdb.ReadSnapshotJournal(diskdb) + if len(journal) == 0 { + return nil, errors.New("missing or corrupted snapshot journal") + } + r := rlp.NewStream(bytes.NewReader(journal), 0) + + // Read the snapshot generation progress for the disk layer + var generator journalGenerator + if err := r.Decode(&generator); err != nil { + return nil, fmt.Errorf("failed to load snapshot progress marker: %v", err) + } + // Load all the snapshot diffs from the journal + snapshot, err := loadDiffLayer(base, r) + if err != nil { + return nil, err + } + // Entire snapshot journal loaded, sanity check the head and return + // Journal doesn't exist, don't worry if it's not supposed to + if head := snapshot.Root(); head != root { + return nil, fmt.Errorf("head doesn't match snapshot: have %#x, want %#x", head, root) + } + // Everything loaded correctly, resume any suspended operations + if !generator.Done { + // If the generator was still wiping, restart one from scratch (fine for + // now as it's rare and the wiper deletes the stuff it touches anyway, so + // restarting won't incur a lot of extra database hops. + var wiper chan struct{} + if generator.Wiping { + log.Info("Resuming previous snapshot wipe") + wiper = wipeSnapshot(diskdb, false) + } + // Whether or not wiping was in progress, load any generator progress too + base.genMarker = generator.Marker + if base.genMarker == nil { + base.genMarker = []byte{} + } + base.genPending = make(chan struct{}) + base.genAbort = make(chan chan *generatorStats) + + var origin uint64 + if len(generator.Marker) >= 8 { + origin = binary.BigEndian.Uint64(generator.Marker) + } + go base.generate(&generatorStats{ + wiping: wiper, + origin: origin, + start: time.Now(), + accounts: generator.Accounts, + slots: generator.Slots, + storage: common.StorageSize(generator.Storage), + }) + } + return snapshot, nil +} + +// loadDiffLayer reads the next sections of a snapshot journal, reconstructing a new +// diff and verifying that it can be linked to the requested parent. +func loadDiffLayer(parent snapshot, r *rlp.Stream) (snapshot, error) { + // Read the next diff journal entry + var root common.Hash + if err := r.Decode(&root); err != nil { + // The first read may fail with EOF, marking the end of the journal + if err == io.EOF { + return parent, nil + } + return nil, fmt.Errorf("load diff root: %v", err) + } + var destructs []journalDestruct + if err := r.Decode(&destructs); err != nil { + return nil, fmt.Errorf("load diff destructs: %v", err) + } + destructSet := make(map[common.Hash]struct{}) + for _, entry := range destructs { + destructSet[entry.Hash] = struct{}{} + } + var accounts []journalAccount + if err := r.Decode(&accounts); err != nil { + return nil, fmt.Errorf("load diff accounts: %v", err) + } + accountData := make(map[common.Hash][]byte) + for _, entry := range accounts { + if len(entry.Blob) > 0 { // RLP loses nil-ness, but `[]byte{}` is not a valid item, so reinterpret that + accountData[entry.Hash] = entry.Blob + } else { + accountData[entry.Hash] = nil + } + } + var storage []journalStorage + if err := r.Decode(&storage); err != nil { + return nil, fmt.Errorf("load diff storage: %v", err) + } + storageData := make(map[common.Hash]map[common.Hash][]byte) + for _, entry := range storage { + slots := make(map[common.Hash][]byte) + for i, key := range entry.Keys { + if len(entry.Vals[i]) > 0 { // RLP loses nil-ness, but `[]byte{}` is not a valid item, so reinterpret that + slots[key] = entry.Vals[i] + } else { + slots[key] = nil + } + } + storageData[entry.Hash] = slots + } + return loadDiffLayer(newDiffLayer(parent, root, destructSet, accountData, storageData), r) +} + +// Journal writes the persistent layer generator stats into a buffer to be stored +// in the database as the snapshot journal. +func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) { + // If the snapshot is currently being generated, abort it + var stats *generatorStats + if dl.genAbort != nil { + abort := make(chan *generatorStats) + dl.genAbort <- abort + + if stats = <-abort; stats != nil { + stats.Log("Journalling in-progress snapshot", dl.root, dl.genMarker) + } + } + // Ensure the layer didn't get stale + dl.lock.RLock() + defer dl.lock.RUnlock() + + if dl.stale { + return common.Hash{}, ErrSnapshotStale + } + // Write out the generator marker + entry := journalGenerator{ + Done: dl.genMarker == nil, + Marker: dl.genMarker, + } + if stats != nil { + entry.Wiping = (stats.wiping != nil) + entry.Accounts = stats.accounts + entry.Slots = stats.slots + entry.Storage = uint64(stats.storage) + } + if err := rlp.Encode(buffer, entry); err != nil { + return common.Hash{}, err + } + return dl.root, nil +} + +// Journal writes the memory layer contents into a buffer to be stored in the +// database as the snapshot journal. +func (dl *diffLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) { + // Journal the parent first + base, err := dl.parent.Journal(buffer) + if err != nil { + return common.Hash{}, err + } + // Ensure the layer didn't get stale + dl.lock.RLock() + defer dl.lock.RUnlock() + + if dl.Stale() { + return common.Hash{}, ErrSnapshotStale + } + // Everything below was journalled, persist this layer too + if err := rlp.Encode(buffer, dl.root); err != nil { + return common.Hash{}, err + } + destructs := make([]journalDestruct, 0, len(dl.destructSet)) + for hash := range dl.destructSet { + destructs = append(destructs, journalDestruct{Hash: hash}) + } + if err := rlp.Encode(buffer, destructs); err != nil { + return common.Hash{}, err + } + accounts := make([]journalAccount, 0, len(dl.accountData)) + for hash, blob := range dl.accountData { + accounts = append(accounts, journalAccount{Hash: hash, Blob: blob}) + } + if err := rlp.Encode(buffer, accounts); err != nil { + return common.Hash{}, err + } + storage := make([]journalStorage, 0, len(dl.storageData)) + for hash, slots := range dl.storageData { + keys := make([]common.Hash, 0, len(slots)) + vals := make([][]byte, 0, len(slots)) + for key, val := range slots { + keys = append(keys, key) + vals = append(vals, val) + } + storage = append(storage, journalStorage{Hash: hash, Keys: keys, Vals: vals}) + } + if err := rlp.Encode(buffer, storage); err != nil { + return common.Hash{}, err + } + return base, nil +} diff --git a/core/state/snapshot/snapshot.go b/core/state/snapshot/snapshot.go new file mode 100644 index 0000000..3348685 --- /dev/null +++ b/core/state/snapshot/snapshot.go @@ -0,0 +1,619 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package snapshot implements a journalled, dynamic state dump. +package snapshot + +import ( + "bytes" + "errors" + "fmt" + "sync" + "sync/atomic" + + "github.com/ava-labs/coreth/core/rawdb" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/trie" +) + +var ( + snapshotCleanAccountHitMeter = metrics.NewRegisteredMeter("state/snapshot/clean/account/hit", nil) + snapshotCleanAccountMissMeter = metrics.NewRegisteredMeter("state/snapshot/clean/account/miss", nil) + snapshotCleanAccountInexMeter = metrics.NewRegisteredMeter("state/snapshot/clean/account/inex", nil) + snapshotCleanAccountReadMeter = metrics.NewRegisteredMeter("state/snapshot/clean/account/read", nil) + snapshotCleanAccountWriteMeter = metrics.NewRegisteredMeter("state/snapshot/clean/account/write", nil) + + snapshotCleanStorageHitMeter = metrics.NewRegisteredMeter("state/snapshot/clean/storage/hit", nil) + snapshotCleanStorageMissMeter = metrics.NewRegisteredMeter("state/snapshot/clean/storage/miss", nil) + snapshotCleanStorageInexMeter = metrics.NewRegisteredMeter("state/snapshot/clean/storage/inex", nil) + snapshotCleanStorageReadMeter = metrics.NewRegisteredMeter("state/snapshot/clean/storage/read", nil) + snapshotCleanStorageWriteMeter = metrics.NewRegisteredMeter("state/snapshot/clean/storage/write", nil) + + snapshotDirtyAccountHitMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/account/hit", nil) + snapshotDirtyAccountMissMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/account/miss", nil) + snapshotDirtyAccountInexMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/account/inex", nil) + snapshotDirtyAccountReadMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/account/read", nil) + snapshotDirtyAccountWriteMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/account/write", nil) + + snapshotDirtyStorageHitMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/storage/hit", nil) + snapshotDirtyStorageMissMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/storage/miss", nil) + snapshotDirtyStorageInexMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/storage/inex", nil) + snapshotDirtyStorageReadMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/storage/read", nil) + snapshotDirtyStorageWriteMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/storage/write", nil) + + snapshotDirtyAccountHitDepthHist = metrics.NewRegisteredHistogram("state/snapshot/dirty/account/hit/depth", nil, metrics.NewExpDecaySample(1028, 0.015)) + snapshotDirtyStorageHitDepthHist = metrics.NewRegisteredHistogram("state/snapshot/dirty/storage/hit/depth", nil, metrics.NewExpDecaySample(1028, 0.015)) + + snapshotFlushAccountItemMeter = metrics.NewRegisteredMeter("state/snapshot/flush/account/item", nil) + snapshotFlushAccountSizeMeter = metrics.NewRegisteredMeter("state/snapshot/flush/account/size", nil) + snapshotFlushStorageItemMeter = metrics.NewRegisteredMeter("state/snapshot/flush/storage/item", nil) + snapshotFlushStorageSizeMeter = metrics.NewRegisteredMeter("state/snapshot/flush/storage/size", nil) + + snapshotBloomIndexTimer = metrics.NewRegisteredResettingTimer("state/snapshot/bloom/index", nil) + snapshotBloomErrorGauge = metrics.NewRegisteredGaugeFloat64("state/snapshot/bloom/error", nil) + + snapshotBloomAccountTrueHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/account/truehit", nil) + snapshotBloomAccountFalseHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/account/falsehit", nil) + snapshotBloomAccountMissMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/account/miss", nil) + + snapshotBloomStorageTrueHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/storage/truehit", nil) + snapshotBloomStorageFalseHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/storage/falsehit", nil) + snapshotBloomStorageMissMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/storage/miss", nil) + + // ErrSnapshotStale is returned from data accessors if the underlying snapshot + // layer had been invalidated due to the chain progressing forward far enough + // to not maintain the layer's original state. + ErrSnapshotStale = errors.New("snapshot stale") + + // ErrNotCoveredYet is returned from data accessors if the underlying snapshot + // is being generated currently and the requested data item is not yet in the + // range of accounts covered. + ErrNotCoveredYet = errors.New("not covered yet") + + // errSnapshotCycle is returned if a snapshot is attempted to be inserted + // that forms a cycle in the snapshot tree. + errSnapshotCycle = errors.New("snapshot cycle") +) + +// Snapshot represents the functionality supported by a snapshot storage layer. +type Snapshot interface { + // Root returns the root hash for which this snapshot was made. + Root() common.Hash + + // Account directly retrieves the account associated with a particular hash in + // the snapshot slim data format. + Account(hash common.Hash) (*Account, error) + + // AccountRLP directly retrieves the account RLP associated with a particular + // hash in the snapshot slim data format. + AccountRLP(hash common.Hash) ([]byte, error) + + // Storage directly retrieves the storage data associated with a particular hash, + // within a particular account. + Storage(accountHash, storageHash common.Hash) ([]byte, error) +} + +// snapshot is the internal version of the snapshot data layer that supports some +// additional methods compared to the public API. +type snapshot interface { + Snapshot + + // Parent returns the subsequent layer of a snapshot, or nil if the base was + // reached. + // + // Note, the method is an internal helper to avoid type switching between the + // disk and diff layers. There is no locking involved. + Parent() snapshot + + // Update creates a new layer on top of the existing snapshot diff tree with + // the specified data items. + // + // Note, the maps are retained by the method to avoid copying everything. + Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer + + // Journal commits an entire diff hierarchy to disk into a single journal entry. + // This is meant to be used during shutdown to persist the snapshot without + // flattening everything down (bad for reorgs). + Journal(buffer *bytes.Buffer) (common.Hash, error) + + // Stale return whether this layer has become stale (was flattened across) or + // if it's still live. + Stale() bool + + // AccountIterator creates an account iterator over an arbitrary layer. + AccountIterator(seek common.Hash) AccountIterator + + // StorageIterator creates a storage iterator over an arbitrary layer. + StorageIterator(account common.Hash, seek common.Hash) (StorageIterator, bool) +} + +// SnapshotTree is an Ethereum state snapshot tree. It consists of one persistent +// base layer backed by a key-value store, on top of which arbitrarily many in- +// memory diff layers are topped. The memory diffs can form a tree with branching, +// but the disk layer is singleton and common to all. If a reorg goes deeper than +// the disk layer, everything needs to be deleted. +// +// The goal of a state snapshot is twofold: to allow direct access to account and +// storage data to avoid expensive multi-level trie lookups; and to allow sorted, +// cheap iteration of the account/storage tries for sync aid. +type Tree struct { + diskdb ethdb.KeyValueStore // Persistent database to store the snapshot + triedb *trie.Database // In-memory cache to access the trie through + cache int // Megabytes permitted to use for read caches + layers map[common.Hash]snapshot // Collection of all known layers + lock sync.RWMutex +} + +// New attempts to load an already existing snapshot from a persistent key-value +// store (with a number of memory layers from a journal), ensuring that the head +// of the snapshot matches the expected one. +// +// If the snapshot is missing or inconsistent, the entirety is deleted and will +// be reconstructed from scratch based on the tries in the key-value store, on a +// background thread. +func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, async bool) *Tree { + // Create a new, empty snapshot tree + snap := &Tree{ + diskdb: diskdb, + triedb: triedb, + cache: cache, + layers: make(map[common.Hash]snapshot), + } + if !async { + defer snap.waitBuild() + } + // Attempt to load a previously persisted snapshot and rebuild one if failed + head, err := loadSnapshot(diskdb, triedb, cache, root) + if err != nil { + log.Warn("Failed to load snapshot, regenerating", "err", err) + snap.Rebuild(root) + return snap + } + // Existing snapshot loaded, seed all the layers + for head != nil { + snap.layers[head.Root()] = head + head = head.Parent() + } + return snap +} + +// waitBuild blocks until the snapshot finishes rebuilding. This method is meant +// to be used by tests to ensure we're testing what we believe we are. +func (t *Tree) waitBuild() { + // Find the rebuild termination channel + var done chan struct{} + + t.lock.RLock() + for _, layer := range t.layers { + if layer, ok := layer.(*diskLayer); ok { + done = layer.genPending + break + } + } + t.lock.RUnlock() + + // Wait until the snapshot is generated + if done != nil { + <-done + } +} + +// Snapshot retrieves a snapshot belonging to the given block root, or nil if no +// snapshot is maintained for that block. +func (t *Tree) Snapshot(blockRoot common.Hash) Snapshot { + t.lock.RLock() + defer t.lock.RUnlock() + + return t.layers[blockRoot] +} + +// Update adds a new snapshot into the tree, if that can be linked to an existing +// old parent. It is disallowed to insert a disk layer (the origin of all). +func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) error { + // Reject noop updates to avoid self-loops in the snapshot tree. This is a + // special case that can only happen for Clique networks where empty blocks + // don't modify the state (0 block subsidy). + // + // Although we could silently ignore this internally, it should be the caller's + // responsibility to avoid even attempting to insert such a snapshot. + if blockRoot == parentRoot { + return errSnapshotCycle + } + // Generate a new snapshot on top of the parent + parent := t.Snapshot(parentRoot).(snapshot) + if parent == nil { + return fmt.Errorf("parent [%#x] snapshot missing", parentRoot) + } + snap := parent.Update(blockRoot, destructs, accounts, storage) + + // Save the new snapshot for later + t.lock.Lock() + defer t.lock.Unlock() + + t.layers[snap.root] = snap + return nil +} + +// Cap traverses downwards the snapshot tree from a head block hash until the +// number of allowed layers are crossed. All layers beyond the permitted number +// are flattened downwards. +func (t *Tree) Cap(root common.Hash, layers int) error { + // Retrieve the head snapshot to cap from + snap := t.Snapshot(root) + if snap == nil { + return fmt.Errorf("snapshot [%#x] missing", root) + } + diff, ok := snap.(*diffLayer) + if !ok { + return fmt.Errorf("snapshot [%#x] is disk layer", root) + } + // If the generator is still running, use a more aggressive cap + diff.origin.lock.RLock() + if diff.origin.genMarker != nil && layers > 8 { + layers = 8 + } + diff.origin.lock.RUnlock() + + // Run the internal capping and discard all stale layers + t.lock.Lock() + defer t.lock.Unlock() + + // Flattening the bottom-most diff layer requires special casing since there's + // no child to rewire to the grandparent. In that case we can fake a temporary + // child for the capping and then remove it. + var persisted *diskLayer + + switch layers { + case 0: + // If full commit was requested, flatten the diffs and merge onto disk + diff.lock.RLock() + base := diffToDisk(diff.flatten().(*diffLayer)) + diff.lock.RUnlock() + + // Replace the entire snapshot tree with the flat base + t.layers = map[common.Hash]snapshot{base.root: base} + return nil + + case 1: + // If full flattening was requested, flatten the diffs but only merge if the + // memory limit was reached + var ( + bottom *diffLayer + base *diskLayer + ) + diff.lock.RLock() + bottom = diff.flatten().(*diffLayer) + if bottom.memory >= aggregatorMemoryLimit { + base = diffToDisk(bottom) + } + diff.lock.RUnlock() + + // If all diff layers were removed, replace the entire snapshot tree + if base != nil { + t.layers = map[common.Hash]snapshot{base.root: base} + return nil + } + // Merge the new aggregated layer into the snapshot tree, clean stales below + t.layers[bottom.root] = bottom + + default: + // Many layers requested to be retained, cap normally + persisted = t.cap(diff, layers) + } + // Remove any layer that is stale or links into a stale layer + children := make(map[common.Hash][]common.Hash) + for root, snap := range t.layers { + if diff, ok := snap.(*diffLayer); ok { + parent := diff.parent.Root() + children[parent] = append(children[parent], root) + } + } + var remove func(root common.Hash) + remove = func(root common.Hash) { + delete(t.layers, root) + for _, child := range children[root] { + remove(child) + } + delete(children, root) + } + for root, snap := range t.layers { + if snap.Stale() { + remove(root) + } + } + // If the disk layer was modified, regenerate all the cumulative blooms + if persisted != nil { + var rebloom func(root common.Hash) + rebloom = func(root common.Hash) { + if diff, ok := t.layers[root].(*diffLayer); ok { + diff.rebloom(persisted) + } + for _, child := range children[root] { + rebloom(child) + } + } + rebloom(persisted.root) + } + return nil +} + +// cap traverses downwards the diff tree until the number of allowed layers are +// crossed. All diffs beyond the permitted number are flattened downwards. If the +// layer limit is reached, memory cap is also enforced (but not before). +// +// The method returns the new disk layer if diffs were persistend into it. +func (t *Tree) cap(diff *diffLayer, layers int) *diskLayer { + // Dive until we run out of layers or reach the persistent database + for ; layers > 2; layers-- { + // If we still have diff layers below, continue down + if parent, ok := diff.parent.(*diffLayer); ok { + diff = parent + } else { + // Diff stack too shallow, return without modifications + return nil + } + } + // We're out of layers, flatten anything below, stopping if it's the disk or if + // the memory limit is not yet exceeded. + switch parent := diff.parent.(type) { + case *diskLayer: + return nil + + case *diffLayer: + // Flatten the parent into the grandparent. The flattening internally obtains a + // write lock on grandparent. + flattened := parent.flatten().(*diffLayer) + t.layers[flattened.root] = flattened + + diff.lock.Lock() + defer diff.lock.Unlock() + + diff.parent = flattened + if flattened.memory < aggregatorMemoryLimit { + // Accumulator layer is smaller than the limit, so we can abort, unless + // there's a snapshot being generated currently. In that case, the trie + // will move fron underneath the generator so we **must** merge all the + // partial data down into the snapshot and restart the generation. + if flattened.parent.(*diskLayer).genAbort == nil { + return nil + } + } + default: + panic(fmt.Sprintf("unknown data layer: %T", parent)) + } + // If the bottom-most layer is larger than our memory cap, persist to disk + bottom := diff.parent.(*diffLayer) + + bottom.lock.RLock() + base := diffToDisk(bottom) + bottom.lock.RUnlock() + + t.layers[base.root] = base + diff.parent = base + return base +} + +// diffToDisk merges a bottom-most diff into the persistent disk layer underneath +// it. The method will panic if called onto a non-bottom-most diff layer. +func diffToDisk(bottom *diffLayer) *diskLayer { + var ( + base = bottom.parent.(*diskLayer) + batch = base.diskdb.NewBatch() + stats *generatorStats + ) + // If the disk layer is running a snapshot generator, abort it + if base.genAbort != nil { + abort := make(chan *generatorStats) + base.genAbort <- abort + stats = <-abort + } + // Start by temporarily deleting the current snapshot block marker. This + // ensures that in the case of a crash, the entire snapshot is invalidated. + rawdb.DeleteSnapshotRoot(batch) + + // Mark the original base as stale as we're going to create a new wrapper + base.lock.Lock() + if base.stale { + panic("parent disk layer is stale") // we've committed into the same base from two children, boo + } + base.stale = true + base.lock.Unlock() + + // Destroy all the destructed accounts from the database + for hash := range bottom.destructSet { + // Skip any account not covered yet by the snapshot + if base.genMarker != nil && bytes.Compare(hash[:], base.genMarker) > 0 { + continue + } + // Remove all storage slots + rawdb.DeleteAccountSnapshot(batch, hash) + base.cache.Set(hash[:], nil) + + it := rawdb.IterateStorageSnapshots(base.diskdb, hash) + for it.Next() { + if key := it.Key(); len(key) == 65 { // TODO(karalabe): Yuck, we should move this into the iterator + batch.Delete(key) + base.cache.Del(key[1:]) + + snapshotFlushStorageItemMeter.Mark(1) + } + } + it.Release() + } + // Push all updated accounts into the database + for hash, data := range bottom.accountData { + // Skip any account not covered yet by the snapshot + if base.genMarker != nil && bytes.Compare(hash[:], base.genMarker) > 0 { + continue + } + // Push the account to disk + rawdb.WriteAccountSnapshot(batch, hash, data) + base.cache.Set(hash[:], data) + snapshotCleanAccountWriteMeter.Mark(int64(len(data))) + + if batch.ValueSize() > ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { + log.Crit("Failed to write account snapshot", "err", err) + } + batch.Reset() + } + snapshotFlushAccountItemMeter.Mark(1) + snapshotFlushAccountSizeMeter.Mark(int64(len(data))) + } + // Push all the storage slots into the database + for accountHash, storage := range bottom.storageData { + // Skip any account not covered yet by the snapshot + if base.genMarker != nil && bytes.Compare(accountHash[:], base.genMarker) > 0 { + continue + } + // Generation might be mid-account, track that case too + midAccount := base.genMarker != nil && bytes.Equal(accountHash[:], base.genMarker[:common.HashLength]) + + for storageHash, data := range storage { + // Skip any slot not covered yet by the snapshot + if midAccount && bytes.Compare(storageHash[:], base.genMarker[common.HashLength:]) > 0 { + continue + } + if len(data) > 0 { + rawdb.WriteStorageSnapshot(batch, accountHash, storageHash, data) + base.cache.Set(append(accountHash[:], storageHash[:]...), data) + snapshotCleanStorageWriteMeter.Mark(int64(len(data))) + } else { + rawdb.DeleteStorageSnapshot(batch, accountHash, storageHash) + base.cache.Set(append(accountHash[:], storageHash[:]...), nil) + } + snapshotFlushStorageItemMeter.Mark(1) + snapshotFlushStorageSizeMeter.Mark(int64(len(data))) + } + if batch.ValueSize() > ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { + log.Crit("Failed to write storage snapshot", "err", err) + } + batch.Reset() + } + } + // Update the snapshot block marker and write any remainder data + rawdb.WriteSnapshotRoot(batch, bottom.root) + if err := batch.Write(); err != nil { + log.Crit("Failed to write leftover snapshot", "err", err) + } + res := &diskLayer{ + root: bottom.root, + cache: base.cache, + diskdb: base.diskdb, + triedb: base.triedb, + genMarker: base.genMarker, + genPending: base.genPending, + } + // If snapshot generation hasn't finished yet, port over all the starts and + // continue where the previous round left off. + // + // Note, the `base.genAbort` comparison is not used normally, it's checked + // to allow the tests to play with the marker without triggering this path. + if base.genMarker != nil && base.genAbort != nil { + res.genMarker = base.genMarker + res.genAbort = make(chan chan *generatorStats) + go res.generate(stats) + } + return res +} + +// Journal commits an entire diff hierarchy to disk into a single journal entry. +// This is meant to be used during shutdown to persist the snapshot without +// flattening everything down (bad for reorgs). +// +// The method returns the root hash of the base layer that needs to be persisted +// to disk as a trie too to allow continuing any pending generation op. +func (t *Tree) Journal(root common.Hash) (common.Hash, error) { + // Retrieve the head snapshot to journal from var snap snapshot + snap := t.Snapshot(root) + if snap == nil { + return common.Hash{}, fmt.Errorf("snapshot [%#x] missing", root) + } + // Run the journaling + t.lock.Lock() + defer t.lock.Unlock() + + journal := new(bytes.Buffer) + base, err := snap.(snapshot).Journal(journal) + if err != nil { + return common.Hash{}, err + } + // Store the journal into the database and return + rawdb.WriteSnapshotJournal(t.diskdb, journal.Bytes()) + return base, nil +} + +// Rebuild wipes all available snapshot data from the persistent database and +// discard all caches and diff layers. Afterwards, it starts a new snapshot +// generator with the given root hash. +func (t *Tree) Rebuild(root common.Hash) { + t.lock.Lock() + defer t.lock.Unlock() + + // Track whether there's a wipe currently running and keep it alive if so + var wiper chan struct{} + + // Iterate over and mark all layers stale + for _, layer := range t.layers { + switch layer := layer.(type) { + case *diskLayer: + // If the base layer is generating, abort it and save + if layer.genAbort != nil { + abort := make(chan *generatorStats) + layer.genAbort <- abort + + if stats := <-abort; stats != nil { + wiper = stats.wiping + } + } + // Layer should be inactive now, mark it as stale + layer.lock.Lock() + layer.stale = true + layer.lock.Unlock() + + case *diffLayer: + // If the layer is a simple diff, simply mark as stale + layer.lock.Lock() + atomic.StoreUint32(&layer.stale, 1) + layer.lock.Unlock() + + default: + panic(fmt.Sprintf("unknown layer type: %T", layer)) + } + } + // Start generating a new snapshot from scratch on a backgroung thread. The + // generator will run a wiper first if there's not one running right now. + log.Info("Rebuilding state snapshot") + t.layers = map[common.Hash]snapshot{ + root: generateSnapshot(t.diskdb, t.triedb, t.cache, root, wiper), + } +} + +// AccountIterator creates a new account iterator for the specified root hash and +// seeks to a starting account hash. +func (t *Tree) AccountIterator(root common.Hash, seek common.Hash) (AccountIterator, error) { + return newFastAccountIterator(t, root, seek) +} + +// StorageIterator creates a new storage iterator for the specified root hash and +// account. The iterator will be move to the specific start position. +func (t *Tree) StorageIterator(root common.Hash, account common.Hash, seek common.Hash) (StorageIterator, error) { + return newFastStorageIterator(t, root, account, seek) +} diff --git a/core/state/snapshot/sort.go b/core/state/snapshot/sort.go new file mode 100644 index 0000000..8884123 --- /dev/null +++ b/core/state/snapshot/sort.go @@ -0,0 +1,36 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package snapshot + +import ( + "bytes" + + "github.com/ethereum/go-ethereum/common" +) + +// hashes is a helper to implement sort.Interface. +type hashes []common.Hash + +// Len is the number of elements in the collection. +func (hs hashes) Len() int { return len(hs) } + +// Less reports whether the element with index i should sort before the element +// with index j. +func (hs hashes) Less(i, j int) bool { return bytes.Compare(hs[i][:], hs[j][:]) < 0 } + +// Swap swaps the elements with indexes i and j. +func (hs hashes) Swap(i, j int) { hs[i], hs[j] = hs[j], hs[i] } diff --git a/core/state/snapshot/wipe.go b/core/state/snapshot/wipe.go new file mode 100644 index 0000000..853a1a7 --- /dev/null +++ b/core/state/snapshot/wipe.go @@ -0,0 +1,131 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package snapshot + +import ( + "bytes" + "time" + + "github.com/ava-labs/coreth/core/rawdb" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" +) + +// wipeSnapshot starts a goroutine to iterate over the entire key-value database +// and delete all the data associated with the snapshot (accounts, storage, +// metadata). After all is done, the snapshot range of the database is compacted +// to free up unused data blocks. +func wipeSnapshot(db ethdb.KeyValueStore, full bool) chan struct{} { + // Wipe the snapshot root marker synchronously + if full { + rawdb.DeleteSnapshotRoot(db) + } + // Wipe everything else asynchronously + wiper := make(chan struct{}, 1) + go func() { + if err := wipeContent(db); err != nil { + log.Error("Failed to wipe state snapshot", "err", err) // Database close will trigger this + return + } + close(wiper) + }() + return wiper +} + +// wipeContent iterates over the entire key-value database and deletes all the +// data associated with the snapshot (accounts, storage), but not the root hash +// as the wiper is meant to run on a background thread but the root needs to be +// removed in sync to avoid data races. After all is done, the snapshot range of +// the database is compacted to free up unused data blocks. +func wipeContent(db ethdb.KeyValueStore) error { + if err := wipeKeyRange(db, "accounts", rawdb.SnapshotAccountPrefix, len(rawdb.SnapshotAccountPrefix)+common.HashLength); err != nil { + return err + } + if err := wipeKeyRange(db, "storage", rawdb.SnapshotStoragePrefix, len(rawdb.SnapshotStoragePrefix)+2*common.HashLength); err != nil { + return err + } + // Compact the snapshot section of the database to get rid of unused space + start := time.Now() + + log.Info("Compacting snapshot account area ") + end := common.CopyBytes(rawdb.SnapshotAccountPrefix) + end[len(end)-1]++ + + if err := db.Compact(rawdb.SnapshotAccountPrefix, end); err != nil { + return err + } + log.Info("Compacting snapshot storage area ") + end = common.CopyBytes(rawdb.SnapshotStoragePrefix) + end[len(end)-1]++ + + if err := db.Compact(rawdb.SnapshotStoragePrefix, end); err != nil { + return err + } + log.Info("Compacted snapshot area in database", "elapsed", common.PrettyDuration(time.Since(start))) + + return nil +} + +// wipeKeyRange deletes a range of keys from the database starting with prefix +// and having a specific total key length. +func wipeKeyRange(db ethdb.KeyValueStore, kind string, prefix []byte, keylen int) error { + // Batch deletions together to avoid holding an iterator for too long + var ( + batch = db.NewBatch() + items int + ) + // Iterate over the key-range and delete all of them + start, logged := time.Now(), time.Now() + + it := db.NewIterator(prefix, nil) + for it.Next() { + // Skip any keys with the correct prefix but wrong length (trie nodes) + key := it.Key() + if !bytes.HasPrefix(key, prefix) { + break + } + if len(key) != keylen { + continue + } + // Delete the key and periodically recreate the batch and iterator + batch.Delete(key) + items++ + + if items%10000 == 0 { + // Batch too large (or iterator too long lived, flush and recreate) + it.Release() + if err := batch.Write(); err != nil { + return err + } + batch.Reset() + seekPos := key[len(prefix):] + it = db.NewIterator(prefix, seekPos) + + if time.Since(logged) > 8*time.Second { + log.Info("Deleting state snapshot leftovers", "kind", kind, "wiped", items, "elapsed", common.PrettyDuration(time.Since(start))) + logged = time.Now() + } + } + } + it.Release() + if err := batch.Write(); err != nil { + return err + } + log.Info("Deleted state snapshot leftovers", "kind", kind, "wiped", items, "elapsed", common.PrettyDuration(time.Since(start))) + return nil +} |