diff options
author | Ted Yin <[email protected]> | 2020-09-18 13:14:29 -0400 |
---|---|---|
committer | GitHub <[email protected]> | 2020-09-18 13:14:29 -0400 |
commit | d048937c48753d9eaef771bf71820cf95d79df26 (patch) | |
tree | 1a7f65fcd72e77092525ab01625b8b9d365e3e40 /core/rawdb | |
parent | 7d1388c743b4ec8f4a86bea95bfada785dee83f7 (diff) | |
parent | 7d8c85cf8895b0f998d8eafb02f99d5b689fcd59 (diff) |
Merge pull request #34 from ava-labs/devv0.3.0-rc.5
Dev
Diffstat (limited to 'core/rawdb')
-rw-r--r-- | core/rawdb/accessors_chain.go | 238 | ||||
-rw-r--r-- | core/rawdb/accessors_indexes.go | 68 | ||||
-rw-r--r-- | core/rawdb/accessors_metadata.go | 25 | ||||
-rw-r--r-- | core/rawdb/accessors_snapshot.go | 120 | ||||
-rw-r--r-- | core/rawdb/accessors_state.go | 96 | ||||
-rw-r--r-- | core/rawdb/chain_iterator.go | 304 | ||||
-rw-r--r-- | core/rawdb/database.go | 61 | ||||
-rw-r--r-- | core/rawdb/freezer.go | 141 | ||||
-rw-r--r-- | core/rawdb/freezer_reinit.go | 127 | ||||
-rw-r--r-- | core/rawdb/freezer_table.go | 72 | ||||
-rw-r--r-- | core/rawdb/schema.go | 56 | ||||
-rw-r--r-- | core/rawdb/table.go | 93 |
12 files changed, 1112 insertions, 289 deletions
diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index fdfd6ec..cd48885 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -23,10 +23,11 @@ import ( "github.com/ava-labs/coreth/core/types" "github.com/ava-labs/coreth/params" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/ethdb" - "github.com/ava-labs/go-ethereum/log" - "github.com/ava-labs/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" ) // ReadCanonicalHash retrieves the hash assigned to a canonical block number. @@ -68,7 +69,7 @@ func ReadAllHashes(db ethdb.Iteratee, number uint64) []common.Hash { prefix := headerKeyPrefix(number) hashes := make([]common.Hash, 0, 1) - it := db.NewIteratorWithPrefix(prefix) + it := db.NewIterator(prefix, nil) defer it.Release() for it.Next() { @@ -79,6 +80,39 @@ func ReadAllHashes(db ethdb.Iteratee, number uint64) []common.Hash { return hashes } +// ReadAllCanonicalHashes retrieves all canonical number and hash mappings at the +// certain chain range. If the accumulated entries reaches the given threshold, +// abort the iteration and return the semi-finish result. +func ReadAllCanonicalHashes(db ethdb.Iteratee, from uint64, to uint64, limit int) ([]uint64, []common.Hash) { + // Short circuit if the limit is 0. + if limit == 0 { + return nil, nil + } + var ( + numbers []uint64 + hashes []common.Hash + ) + // Construct the key prefix of start point. + start, end := headerHashKey(from), headerHashKey(to) + it := db.NewIterator(nil, start) + defer it.Release() + + for it.Next() { + if bytes.Compare(it.Key(), end) >= 0 { + break + } + if key := it.Key(); len(key) == len(headerPrefix)+8+1 && bytes.Equal(key[len(key)-1:], headerHashSuffix) { + numbers = append(numbers, binary.BigEndian.Uint64(key[len(headerPrefix):len(headerPrefix)+8])) + hashes = append(hashes, common.BytesToHash(it.Value())) + // If the accumulated entries reaches the limit threshold, return. + if len(numbers) >= limit { + break + } + } + } + return numbers, hashes +} + // ReadHeaderNumber returns the header number assigned to a hash. func ReadHeaderNumber(db ethdb.KeyValueReader, hash common.Hash) *uint64 { data, _ := db.Get(headerNumberKey(hash)) @@ -153,6 +187,32 @@ func WriteHeadFastBlockHash(db ethdb.KeyValueWriter, hash common.Hash) { } } +// ReadLastPivotNumber retrieves the number of the last pivot block. If the node +// full synced, the last pivot will always be nil. +func ReadLastPivotNumber(db ethdb.KeyValueReader) *uint64 { + data, _ := db.Get(lastPivotKey) + if len(data) == 0 { + return nil + } + var pivot uint64 + if err := rlp.DecodeBytes(data, &pivot); err != nil { + log.Error("Invalid pivot block number in database", "err", err) + return nil + } + return &pivot +} + +// WriteLastPivotNumber stores the number of the last pivot block. +func WriteLastPivotNumber(db ethdb.KeyValueWriter, pivot uint64) { + enc, err := rlp.EncodeToBytes(pivot) + if err != nil { + log.Crit("Failed to encode pivot block number", "err", err) + } + if err := db.Put(lastPivotKey, enc); err != nil { + log.Crit("Failed to store pivot block number", "err", err) + } +} + // ReadFastTrieProgress retrieves the number of tries nodes fast synced to allow // reporting correct numbers across restarts. func ReadFastTrieProgress(db ethdb.KeyValueReader) uint64 { @@ -171,20 +231,66 @@ func WriteFastTrieProgress(db ethdb.KeyValueWriter, count uint64) { } } +// ReadTxIndexTail retrieves the number of oldest indexed block +// whose transaction indices has been indexed. If the corresponding entry +// is non-existent in database it means the indexing has been finished. +func ReadTxIndexTail(db ethdb.KeyValueReader) *uint64 { + data, _ := db.Get(txIndexTailKey) + if len(data) != 8 { + return nil + } + number := binary.BigEndian.Uint64(data) + return &number +} + +// WriteTxIndexTail stores the number of oldest indexed block +// into database. +func WriteTxIndexTail(db ethdb.KeyValueWriter, number uint64) { + if err := db.Put(txIndexTailKey, encodeBlockNumber(number)); err != nil { + log.Crit("Failed to store the transaction index tail", "err", err) + } +} + +// ReadFastTxLookupLimit retrieves the tx lookup limit used in fast sync. +func ReadFastTxLookupLimit(db ethdb.KeyValueReader) *uint64 { + data, _ := db.Get(fastTxLookupLimitKey) + if len(data) != 8 { + return nil + } + number := binary.BigEndian.Uint64(data) + return &number +} + +// WriteFastTxLookupLimit stores the txlookup limit used in fast sync into database. +func WriteFastTxLookupLimit(db ethdb.KeyValueWriter, number uint64) { + if err := db.Put(fastTxLookupLimitKey, encodeBlockNumber(number)); err != nil { + log.Crit("Failed to store transaction lookup limit for fast sync", "err", err) + } +} + // ReadHeaderRLP retrieves a block header in its raw RLP database encoding. func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { + // First try to look up the data in ancient database. Extra hash + // comparison is necessary since ancient database only maintains + // the canonical data. data, _ := db.Ancient(freezerHeaderTable, number) - if len(data) == 0 { - data, _ = db.Get(headerKey(number, hash)) - // In the background freezer is moving data from leveldb to flatten files. - // So during the first check for ancient db, the data is not yet in there, - // but when we reach into leveldb, the data was already moved. That would - // result in a not found error. - if len(data) == 0 { - data, _ = db.Ancient(freezerHeaderTable, number) - } + if len(data) > 0 && crypto.Keccak256Hash(data) == hash { + return data } - return data + // Then try to look up the data in leveldb. + data, _ = db.Get(headerKey(number, hash)) + if len(data) > 0 { + return data + } + // In the background freezer is moving data from leveldb to flatten files. + // So during the first check for ancient db, the data is not yet in there, + // but when we reach into leveldb, the data was already moved. That would + // result in a not found error. + data, _ = db.Ancient(freezerHeaderTable, number) + if len(data) > 0 && crypto.Keccak256Hash(data) == hash { + return data + } + return nil // Can't find the data anywhere. } // HasHeader verifies the existence of a block header corresponding to the hash. @@ -251,9 +357,43 @@ func deleteHeaderWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number // ReadBodyRLP retrieves the block body (transactions and uncles) in RLP encoding. func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { + // First try to look up the data in ancient database. Extra hash + // comparison is necessary since ancient database only maintains + // the canonical data. + data, _ := db.Ancient(freezerBodiesTable, number) + if len(data) > 0 { + h, _ := db.Ancient(freezerHashTable, number) + if common.BytesToHash(h) == hash { + return data + } + } + // Then try to look up the data in leveldb. + data, _ = db.Get(blockBodyKey(number, hash)) + if len(data) > 0 { + return data + } + // In the background freezer is moving data from leveldb to flatten files. + // So during the first check for ancient db, the data is not yet in there, + // but when we reach into leveldb, the data was already moved. That would + // result in a not found error. + data, _ = db.Ancient(freezerBodiesTable, number) + if len(data) > 0 { + h, _ := db.Ancient(freezerHashTable, number) + if common.BytesToHash(h) == hash { + return data + } + } + return nil // Can't find the data anywhere. +} + +// ReadCanonicalBodyRLP retrieves the block body (transactions and uncles) for the canonical +// block at number, in RLP encoding. +func ReadCanonicalBodyRLP(db ethdb.Reader, number uint64) rlp.RawValue { + // If it's an ancient one, we don't need the canonical hash data, _ := db.Ancient(freezerBodiesTable, number) if len(data) == 0 { - data, _ = db.Get(blockBodyKey(number, hash)) + // Need to get the hash + data, _ = db.Get(blockBodyKey(number, ReadCanonicalHash(db, number))) // In the background freezer is moving data from leveldb to flatten files. // So during the first check for ancient db, the data is not yet in there, // but when we reach into leveldb, the data was already moved. That would @@ -315,18 +455,33 @@ func DeleteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { // ReadTdRLP retrieves a block's total difficulty corresponding to the hash in RLP encoding. func ReadTdRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { + // First try to look up the data in ancient database. Extra hash + // comparison is necessary since ancient database only maintains + // the canonical data. data, _ := db.Ancient(freezerDifficultyTable, number) - if len(data) == 0 { - data, _ = db.Get(headerTDKey(number, hash)) - // In the background freezer is moving data from leveldb to flatten files. - // So during the first check for ancient db, the data is not yet in there, - // but when we reach into leveldb, the data was already moved. That would - // result in a not found error. - if len(data) == 0 { - data, _ = db.Ancient(freezerDifficultyTable, number) + if len(data) > 0 { + h, _ := db.Ancient(freezerHashTable, number) + if common.BytesToHash(h) == hash { + return data } } - return data + // Then try to look up the data in leveldb. + data, _ = db.Get(headerTDKey(number, hash)) + if len(data) > 0 { + return data + } + // In the background freezer is moving data from leveldb to flatten files. + // So during the first check for ancient db, the data is not yet in there, + // but when we reach into leveldb, the data was already moved. That would + // result in a not found error. + data, _ = db.Ancient(freezerDifficultyTable, number) + if len(data) > 0 { + h, _ := db.Ancient(freezerHashTable, number) + if common.BytesToHash(h) == hash { + return data + } + } + return nil // Can't find the data anywhere. } // ReadTd retrieves a block's total difficulty corresponding to the hash. @@ -375,18 +530,33 @@ func HasReceipts(db ethdb.Reader, hash common.Hash, number uint64) bool { // ReadReceiptsRLP retrieves all the transaction receipts belonging to a block in RLP encoding. func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { + // First try to look up the data in ancient database. Extra hash + // comparison is necessary since ancient database only maintains + // the canonical data. data, _ := db.Ancient(freezerReceiptTable, number) - if len(data) == 0 { - data, _ = db.Get(blockReceiptsKey(number, hash)) - // In the background freezer is moving data from leveldb to flatten files. - // So during the first check for ancient db, the data is not yet in there, - // but when we reach into leveldb, the data was already moved. That would - // result in a not found error. - if len(data) == 0 { - data, _ = db.Ancient(freezerReceiptTable, number) + if len(data) > 0 { + h, _ := db.Ancient(freezerHashTable, number) + if common.BytesToHash(h) == hash { + return data } } - return data + // Then try to look up the data in leveldb. + data, _ = db.Get(blockReceiptsKey(number, hash)) + if len(data) > 0 { + return data + } + // In the background freezer is moving data from leveldb to flatten files. + // So during the first check for ancient db, the data is not yet in there, + // but when we reach into leveldb, the data was already moved. That would + // result in a not found error. + data, _ = db.Ancient(freezerReceiptTable, number) + if len(data) > 0 { + h, _ := db.Ancient(freezerHashTable, number) + if common.BytesToHash(h) == hash { + return data + } + } + return nil // Can't find the data anywhere. } // ReadRawReceipts retrieves all the transaction receipts belonging to a block. diff --git a/core/rawdb/accessors_indexes.go b/core/rawdb/accessors_indexes.go index 1dd478a..bf3ba07 100644 --- a/core/rawdb/accessors_indexes.go +++ b/core/rawdb/accessors_indexes.go @@ -17,14 +17,15 @@ package rawdb import ( + "bytes" "math/big" "github.com/ava-labs/coreth/core/types" "github.com/ava-labs/coreth/params" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/ethdb" - "github.com/ava-labs/go-ethereum/log" - "github.com/ava-labs/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" ) // ReadTxLookupEntry retrieves the positional metadata associated with a transaction @@ -52,20 +53,44 @@ func ReadTxLookupEntry(db ethdb.Reader, hash common.Hash) *uint64 { return &entry.BlockIndex } -// WriteTxLookupEntries stores a positional metadata for every transaction from +// writeTxLookupEntry stores a positional metadata for a transaction, +// enabling hash based transaction and receipt lookups. +func writeTxLookupEntry(db ethdb.KeyValueWriter, hash common.Hash, numberBytes []byte) { + if err := db.Put(txLookupKey(hash), numberBytes); err != nil { + log.Crit("Failed to store transaction lookup entry", "err", err) + } +} + +// WriteTxLookupEntries is identical to WriteTxLookupEntry, but it works on +// a list of hashes +func WriteTxLookupEntries(db ethdb.KeyValueWriter, number uint64, hashes []common.Hash) { + numberBytes := new(big.Int).SetUint64(number).Bytes() + for _, hash := range hashes { + writeTxLookupEntry(db, hash, numberBytes) + } +} + +// WriteTxLookupEntriesByBlock stores a positional metadata for every transaction from // a block, enabling hash based transaction and receipt lookups. -func WriteTxLookupEntries(db ethdb.KeyValueWriter, block *types.Block) { - number := block.Number().Bytes() +func WriteTxLookupEntriesByBlock(db ethdb.KeyValueWriter, block *types.Block) { + numberBytes := block.Number().Bytes() for _, tx := range block.Transactions() { - if err := db.Put(txLookupKey(tx.Hash()), number); err != nil { - log.Crit("Failed to store transaction lookup entry", "err", err) - } + writeTxLookupEntry(db, tx.Hash(), numberBytes) } } // DeleteTxLookupEntry removes all transaction data associated with a hash. func DeleteTxLookupEntry(db ethdb.KeyValueWriter, hash common.Hash) { - db.Delete(txLookupKey(hash)) + if err := db.Delete(txLookupKey(hash)); err != nil { + log.Crit("Failed to delete transaction lookup entry", "err", err) + } +} + +// DeleteTxLookupEntries removes all transaction lookups for a given block. +func DeleteTxLookupEntries(db ethdb.KeyValueWriter, hashes []common.Hash) { + for _, hash := range hashes { + DeleteTxLookupEntry(db, hash) + } } // ReadTransaction retrieves a specific transaction from the database, along with @@ -129,3 +154,24 @@ func WriteBloomBits(db ethdb.KeyValueWriter, bit uint, section uint64, head comm log.Crit("Failed to store bloom bits", "err", err) } } + +// DeleteBloombits removes all compressed bloom bits vector belonging to the +// given section range and bit index. +func DeleteBloombits(db ethdb.Database, bit uint, from uint64, to uint64) { + start, end := bloomBitsKey(bit, from, common.Hash{}), bloomBitsKey(bit, to, common.Hash{}) + it := db.NewIterator(nil, start) + defer it.Release() + + for it.Next() { + if bytes.Compare(it.Key(), end) >= 0 { + break + } + if len(it.Key()) != len(bloomBitsPrefix)+2+8+32 { + continue + } + db.Delete(it.Key()) + } + if it.Error() != nil { + log.Crit("Failed to delete bloom bits", "err", it.Error()) + } +} diff --git a/core/rawdb/accessors_metadata.go b/core/rawdb/accessors_metadata.go index 7a17123..c5e5655 100644 --- a/core/rawdb/accessors_metadata.go +++ b/core/rawdb/accessors_metadata.go @@ -20,10 +20,10 @@ import ( "encoding/json" "github.com/ava-labs/coreth/params" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/ethdb" - "github.com/ava-labs/go-ethereum/log" - "github.com/ava-labs/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" ) // ReadDatabaseVersion retrieves the version number of the database. @@ -79,20 +79,3 @@ func WriteChainConfig(db ethdb.KeyValueWriter, hash common.Hash, cfg *params.Cha log.Crit("Failed to store chain config", "err", err) } } - -// ReadPreimage retrieves a single preimage of the provided hash. -func ReadPreimage(db ethdb.KeyValueReader, hash common.Hash) []byte { - data, _ := db.Get(preimageKey(hash)) - return data -} - -// WritePreimages writes the provided set of preimages to the database. -func WritePreimages(db ethdb.KeyValueWriter, preimages map[common.Hash][]byte) { - for hash, preimage := range preimages { - if err := db.Put(preimageKey(hash), preimage); err != nil { - log.Crit("Failed to store trie preimage", "err", err) - } - } - preimageCounter.Inc(int64(len(preimages))) - preimageHitCounter.Inc(int64(len(preimages))) -} diff --git a/core/rawdb/accessors_snapshot.go b/core/rawdb/accessors_snapshot.go new file mode 100644 index 0000000..ecd4e65 --- /dev/null +++ b/core/rawdb/accessors_snapshot.go @@ -0,0 +1,120 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package rawdb + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" +) + +// ReadSnapshotRoot retrieves the root of the block whose state is contained in +// the persisted snapshot. +func ReadSnapshotRoot(db ethdb.KeyValueReader) common.Hash { + data, _ := db.Get(snapshotRootKey) + if len(data) != common.HashLength { + return common.Hash{} + } + return common.BytesToHash(data) +} + +// WriteSnapshotRoot stores the root of the block whose state is contained in +// the persisted snapshot. +func WriteSnapshotRoot(db ethdb.KeyValueWriter, root common.Hash) { + if err := db.Put(snapshotRootKey, root[:]); err != nil { + log.Crit("Failed to store snapshot root", "err", err) + } +} + +// DeleteSnapshotRoot deletes the hash of the block whose state is contained in +// the persisted snapshot. Since snapshots are not immutable, this method can +// be used during updates, so a crash or failure will mark the entire snapshot +// invalid. +func DeleteSnapshotRoot(db ethdb.KeyValueWriter) { + if err := db.Delete(snapshotRootKey); err != nil { + log.Crit("Failed to remove snapshot root", "err", err) + } +} + +// ReadAccountSnapshot retrieves the snapshot entry of an account trie leaf. +func ReadAccountSnapshot(db ethdb.KeyValueReader, hash common.Hash) []byte { + data, _ := db.Get(accountSnapshotKey(hash)) + return data +} + +// WriteAccountSnapshot stores the snapshot entry of an account trie leaf. +func WriteAccountSnapshot(db ethdb.KeyValueWriter, hash common.Hash, entry []byte) { + if err := db.Put(accountSnapshotKey(hash), entry); err != nil { + log.Crit("Failed to store account snapshot", "err", err) + } +} + +// DeleteAccountSnapshot removes the snapshot entry of an account trie leaf. +func DeleteAccountSnapshot(db ethdb.KeyValueWriter, hash common.Hash) { + if err := db.Delete(accountSnapshotKey(hash)); err != nil { + log.Crit("Failed to delete account snapshot", "err", err) + } +} + +// ReadStorageSnapshot retrieves the snapshot entry of an storage trie leaf. +func ReadStorageSnapshot(db ethdb.KeyValueReader, accountHash, storageHash common.Hash) []byte { + data, _ := db.Get(storageSnapshotKey(accountHash, storageHash)) + return data +} + +// WriteStorageSnapshot stores the snapshot entry of an storage trie leaf. +func WriteStorageSnapshot(db ethdb.KeyValueWriter, accountHash, storageHash common.Hash, entry []byte) { + if err := db.Put(storageSnapshotKey(accountHash, storageHash), entry); err != nil { + log.Crit("Failed to store storage snapshot", "err", err) + } +} + +// DeleteStorageSnapshot removes the snapshot entry of an storage trie leaf. +func DeleteStorageSnapshot(db ethdb.KeyValueWriter, accountHash, storageHash common.Hash) { + if err := db.Delete(storageSnapshotKey(accountHash, storageHash)); err != nil { + log.Crit("Failed to delete storage snapshot", "err", err) + } +} + +// IterateStorageSnapshots returns an iterator for walking the entire storage +// space of a specific account. +func IterateStorageSnapshots(db ethdb.Iteratee, accountHash common.Hash) ethdb.Iterator { + return db.NewIterator(storageSnapshotsKey(accountHash), nil) +} + +// ReadSnapshotJournal retrieves the serialized in-memory diff layers saved at +// the last shutdown. The blob is expected to be max a few 10s of megabytes. +func ReadSnapshotJournal(db ethdb.KeyValueReader) []byte { + data, _ := db.Get(snapshotJournalKey) + return data +} + +// WriteSnapshotJournal stores the serialized in-memory diff layers to save at +// shutdown. The blob is expected to be max a few 10s of megabytes. +func WriteSnapshotJournal(db ethdb.KeyValueWriter, journal []byte) { + if err := db.Put(snapshotJournalKey, journal); err != nil { + log.Crit("Failed to store snapshot journal", "err", err) + } +} + +// DeleteSnapshotJournal deletes the serialized in-memory diff layers saved at +// the last shutdown +func DeleteSnapshotJournal(db ethdb.KeyValueWriter) { + if err := db.Delete(snapshotJournalKey); err != nil { + log.Crit("Failed to remove snapshot journal", "err", err) + } +} diff --git a/core/rawdb/accessors_state.go b/core/rawdb/accessors_state.go new file mode 100644 index 0000000..6112de0 --- /dev/null +++ b/core/rawdb/accessors_state.go @@ -0,0 +1,96 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package rawdb + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" +) + +// ReadPreimage retrieves a single preimage of the provided hash. +func ReadPreimage(db ethdb.KeyValueReader, hash common.Hash) []byte { + data, _ := db.Get(preimageKey(hash)) + return data +} + +// WritePreimages writes the provided set of preimages to the database. +func WritePreimages(db ethdb.KeyValueWriter, preimages map[common.Hash][]byte) { + for hash, preimage := range preimages { + if err := db.Put(preimageKey(hash), preimage); err != nil { + log.Crit("Failed to store trie preimage", "err", err) + } + } + preimageCounter.Inc(int64(len(preimages))) + preimageHitCounter.Inc(int64(len(preimages))) +} + +// ReadCode retrieves the contract code of the provided code hash. +func ReadCode(db ethdb.KeyValueReader, hash common.Hash) []byte { + // Try with the legacy code scheme first, if not then try with current + // scheme. Since most of the code will be found with legacy scheme. + // + // todo(rjl493456442) change the order when we forcibly upgrade the code + // scheme with snapshot. + data, _ := db.Get(hash[:]) + if len(data) != 0 { + return data + } + return ReadCodeWithPrefix(db, hash) +} + +// ReadCodeWithPrefix retrieves the contract code of the provided code hash. +// The main difference between this function and ReadCode is this function +// will only check the existence with latest scheme(with prefix). +func ReadCodeWithPrefix(db ethdb.KeyValueReader, hash common.Hash) []byte { + data, _ := db.Get(codeKey(hash)) + return data +} + +// WriteCode writes the provided contract code database. +func WriteCode(db ethdb.KeyValueWriter, hash common.Hash, code []byte) { + if err := db.Put(codeKey(hash), code); err != nil { + log.Crit("Failed to store contract code", "err", err) + } +} + +// DeleteCode deletes the specified contract code from the database. +func DeleteCode(db ethdb.KeyValueWriter, hash common.Hash) { + if err := db.Delete(codeKey(hash)); err != nil { + log.Crit("Failed to delete contract code", "err", err) + } +} + +// ReadTrieNode retrieves the trie node of the provided hash. +func ReadTrieNode(db ethdb.KeyValueReader, hash common.Hash) []byte { + data, _ := db.Get(hash.Bytes()) + return data +} + +// WriteTrieNode writes the provided trie node database. +func WriteTrieNode(db ethdb.KeyValueWriter, hash common.Hash, node []byte) { + if err := db.Put(hash.Bytes(), node); err != nil { + log.Crit("Failed to store trie node", "err", err) + } +} + +// DeleteTrieNode deletes the specified trie node from the database. +func DeleteTrieNode(db ethdb.KeyValueWriter, hash common.Hash) { + if err := db.Delete(hash.Bytes()); err != nil { + log.Crit("Failed to delete trie node", "err", err) + } +} diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go new file mode 100644 index 0000000..3130e92 --- /dev/null +++ b/core/rawdb/chain_iterator.go @@ -0,0 +1,304 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package rawdb + +import ( + "runtime" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/prque" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "golang.org/x/crypto/sha3" +) + +// InitDatabaseFromFreezer reinitializes an empty database from a previous batch +// of frozen ancient blocks. The method iterates over all the frozen blocks and +// injects into the database the block hash->number mappings. +func InitDatabaseFromFreezer(db ethdb.Database) { + // If we can't access the freezer or it's empty, abort + frozen, err := db.Ancients() + if err != nil || frozen == 0 { + return + } + var ( + batch = db.NewBatch() + start = time.Now() + logged = start.Add(-7 * time.Second) // Unindex during import is fast, don't double log + hash common.Hash + ) + for i := uint64(0); i < frozen; i++ { + // Since the freezer has all data in sequential order on a file, + // it would be 'neat' to read more data in one go, and let the + // freezerdb return N items (e.g up to 1000 items per go) + // That would require an API change in Ancients though + if h, err := db.Ancient(freezerHashTable, i); err != nil { + log.Crit("Failed to init database from freezer", "err", err) + } else { + hash = common.BytesToHash(h) + } + WriteHeaderNumber(batch, hash, i) + // If enough data was accumulated in memory or we're at the last block, dump to disk + if batch.ValueSize() > ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { + log.Crit("Failed to write data to db", "err", err) + } + batch.Reset() + } + // If we've spent too much time already, notify the user of what we're doing + if time.Since(logged) > 8*time.Second { + log.Info("Initializing database from freezer", "total", frozen, "number", i, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start))) + logged = time.Now() + } + } + if err := batch.Write(); err != nil { + log.Crit("Failed to write data to db", "err", err) + } + batch.Reset() + + WriteHeadHeaderHash(db, hash) + WriteHeadFastBlockHash(db, hash) + log.Info("Initialized database from freezer", "blocks", frozen, "elapsed", common.PrettyDuration(time.Since(start))) +} + +type blockTxHashes struct { + number uint64 + hashes []common.Hash +} + +// iterateTransactions iterates over all transactions in the (canon) block +// number(s) given, and yields the hashes on a channel +func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool) (chan *blockTxHashes, chan struct{}) { + // One thread sequentially reads data from db + type numberRlp struct { + number uint64 + rlp rlp.RawValue + } + if to == from { + return nil, nil + } + threads := to - from + if cpus := runtime.NumCPU(); threads > uint64(cpus) { + threads = uint64(cpus) + } + var ( + rlpCh = make(chan *numberRlp, threads*2) // we send raw rlp over this channel + hashesCh = make(chan *blockTxHashes, threads*2) // send hashes over hashesCh + abortCh = make(chan struct{}) + ) + // lookup runs in one instance + lookup := func() { + n, end := from, to + if reverse { + n, end = to-1, from-1 + } + defer close(rlpCh) + for n != end { + data := ReadCanonicalBodyRLP(db, n) + // Feed the block to the aggregator, or abort on interrupt + select { + case rlpCh <- &numberRlp{n, data}: + case <-abortCh: + return + } + if reverse { + n-- + } else { + n++ + } + } + } + // process runs in parallel + nThreadsAlive := int32(threads) + process := func() { + defer func() { + // Last processor closes the result channel + if atomic.AddInt32(&nThreadsAlive, -1) == 0 { + close(hashesCh) + } + }() + + var hasher = sha3.NewLegacyKeccak256() + for data := range rlpCh { + it, err := rlp.NewListIterator(data.rlp) + if err != nil { + log.Warn("tx iteration error", "error", err) + return + } + it.Next() + txs := it.Value() + txIt, err := rlp.NewListIterator(txs) + if err != nil { + log.Warn("tx iteration error", "error", err) + return + } + var hashes []common.Hash + for txIt.Next() { + if err := txIt.Err(); err != nil { + log.Warn("tx iteration error", "error", err) + return + } + var txHash common.Hash + hasher.Reset() + hasher.Write(txIt.Value()) + hasher.Sum(txHash[:0]) + hashes = append(hashes, txHash) + } + result := &blockTxHashes{ + hashes: hashes, + number: data.number, + } + // Feed the block to the aggregator, or abort on interrupt + select { + case hashesCh <- result: + case <-abortCh: + return + } + } + } + go lookup() // start the sequential db accessor + for i := 0; i < int(threads); i++ { + go process() + } + return hashesCh, abortCh +} + +// IndexTransactions creates txlookup indices of the specified block range. +// +// This function iterates canonical chain in reverse order, it has one main advantage: +// We can write tx index tail flag periodically even without the whole indexing +// procedure is finished. So that we can resume indexing procedure next time quickly. +func IndexTransactions(db ethdb.Database, from uint64, to uint64) { + // short circuit for invalid range + if from >= to { + return + } + var ( + hashesCh, abortCh = iterateTransactions(db, from, to, true) + batch = db.NewBatch() + start = time.Now() + logged = start.Add(-7 * time.Second) + // Since we iterate in reverse, we expect the first number to come + // in to be [to-1]. Therefore, setting lastNum to means that the + // prqueue gap-evaluation will work correctly + lastNum = to + queue = prque.New(nil) + // for stats reporting + blocks, txs = 0, 0 + ) + defer close(abortCh) + + for chanDelivery := range hashesCh { + // Push the delivery into the queue and process contiguous ranges. + // Since we iterate in reverse, so lower numbers have lower prio, and + // we can use the number directly as prio marker + queue.Push(chanDelivery, int64(chanDelivery.number)) + for !queue.Empty() { + // If the next available item is gapped, return + if _, priority := queue.Peek(); priority != int64(lastNum-1) { + break + } + // Next block available, pop it off and index it + delivery := queue.PopItem().(*blockTxHashes) + lastNum = delivery.number + WriteTxLookupEntries(batch, delivery.number, delivery.hashes) + blocks++ + txs += len(delivery.hashes) + // If enough data was accumulated in memory or we're at the last block, dump to disk + if batch.ValueSize() > ethdb.IdealBatchSize { + // Also write the tail there + WriteTxIndexTail(batch, lastNum) + if err := batch.Write(); err != nil { + log.Crit("Failed writing batch to db", "error", err) + return + } + batch.Reset() + } + // If we've spent too much time already, notify the user of what we're doing + if time.Since(logged) > 8*time.Second { + log.Info("Indexing transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start))) + logged = time.Now() + } + } + } + if lastNum < to { + WriteTxIndexTail(batch, lastNum) + // No need to write the batch if we never entered the loop above... + if err := batch.Write(); err != nil { + log.Crit("Failed writing batch to db", "error", err) + return + } + } + log.Info("Indexed transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start))) +} + +// UnindexTransactions removes txlookup indices of the specified block range. +func UnindexTransactions(db ethdb.Database, from uint64, to uint64) { + // short circuit for invalid range + if from >= to { + return + } + // Write flag first and then unindex the transaction indices. Some indices + // will be left in the database if crash happens but it's fine. + WriteTxIndexTail(db, to) + // If only one block is unindexed, do it directly + //if from+1 == to { + // data := ReadCanonicalBodyRLP(db, uint64(from)) + // DeleteTxLookupEntries(db, ReadBlock(db, ReadCanonicalHash(db, from), from)) + // log.Info("Unindexed transactions", "blocks", 1, "tail", to) + // return + //} + // TODO @holiman, add this back (if we want it) + var ( + hashesCh, abortCh = iterateTransactions(db, from, to, false) + batch = db.NewBatch() + start = time.Now() + logged = start.Add(-7 * time.Second) + ) + defer close(abortCh) + // Otherwise spin up the concurrent iterator and unindexer + blocks, txs := 0, 0 + for delivery := range hashesCh { + DeleteTxLookupEntries(batch, delivery.hashes) + txs += len(delivery.hashes) + blocks++ + + // If enough data was accumulated in memory or we're at the last block, dump to disk + // A batch counts the size of deletion as '1', so we need to flush more + // often than that. + if blocks%1000 == 0 { + if err := batch.Write(); err != nil { + log.Crit("Failed writing batch to db", "error", err) + return + } + batch.Reset() + } + // If we've spent too much time already, notify the user of what we're doing + if time.Since(logged) > 8*time.Second { + log.Info("Unindexing transactions", "blocks", blocks, "txs", txs, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start))) + logged = time.Now() + } + } + if err := batch.Write(); err != nil { + log.Crit("Failed writing batch to db", "error", err) + return + } + log.Info("Unindexed transactions", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start))) +} diff --git a/core/rawdb/database.go b/core/rawdb/database.go index f04c34f..316b5ad 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -21,13 +21,14 @@ import ( "errors" "fmt" "os" + "sync/atomic" "time" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/ethdb" - "github.com/ava-labs/go-ethereum/ethdb/leveldb" - "github.com/ava-labs/go-ethereum/ethdb/memorydb" - "github.com/ava-labs/go-ethereum/log" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/ethdb/leveldb" + "github.com/ethereum/go-ethereum/ethdb/memorydb" + "github.com/ethereum/go-ethereum/log" "github.com/olekukonko/tablewriter" ) @@ -41,10 +42,10 @@ type freezerdb struct { // the slow ancient tables. func (frdb *freezerdb) Close() error { var errs []error - if err := frdb.KeyValueStore.Close(); err != nil { + if err := frdb.AncientStore.Close(); err != nil { errs = append(errs, err) } - if err := frdb.AncientStore.Close(); err != nil { + if err := frdb.KeyValueStore.Close(); err != nil { errs = append(errs, err) } if len(errs) != 0 { @@ -53,6 +54,22 @@ func (frdb *freezerdb) Close() error { return nil } +// Freeze is a helper method used for external testing to trigger and block until +// a freeze cycle completes, without having to sleep for a minute to trigger the +// automatic background run. +func (frdb *freezerdb) Freeze(threshold uint64) { + // Set the freezer threshold to a temporary value + defer func(old uint64) { + atomic.StoreUint64(&frdb.AncientStore.(*freezer).threshold, old) + }(atomic.LoadUint64(&frdb.AncientStore.(*freezer).threshold)) + atomic.StoreUint64(&frdb.AncientStore.(*freezer).threshold, threshold) + + // Trigger a freeze cycle and block until it's done + trigger := make(chan struct{}, 1) + frdb.AncientStore.(*freezer).trigger <- trigger + <-trigger +} + // nofreezedb is a database wrapper that disables freezer data retrievals. type nofreezedb struct { ethdb.KeyValueStore @@ -137,7 +154,10 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st // If the freezer already contains something, ensure that the genesis blocks // match, otherwise we might mix up freezers across chains and destroy both // the freezer and the key-value store. - if frgenesis, _ := frdb.Ancient(freezerHashTable, 0); !bytes.Equal(kvgenesis, frgenesis) { + frgenesis, err := frdb.Ancient(freezerHashTable, 0) + if err != nil { + return nil, fmt.Errorf("failed to retrieve genesis from ancient %v", err) + } else if !bytes.Equal(kvgenesis, frgenesis) { return nil, fmt.Errorf("genesis mismatch: %#x (leveldb) != %#x (ancients)", kvgenesis, frgenesis) } // Key-value store and freezer belong to the same network. Ensure that they @@ -150,11 +170,10 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st } // Database contains only older data than the freezer, this happens if the // state was wiped and reinited from an existing freezer. - } else { - // Key-value store continues where the freezer left off, all is fine. We might - // have duplicate blocks (crash after freezer write but before kay-value store - // deletion, but that's fine). } + // Otherwise, key-value store continues where the freezer left off, all is fine. + // We might have duplicate blocks (crash after freezer write but before key-value + // store deletion, but that's fine). } else { // If the freezer is empty, ensure nothing was moved yet from the key-value // store, otherwise we'll end up missing data. We check block #1 to decide @@ -167,9 +186,9 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st return nil, errors.New("ancient chain segments already extracted, please set --datadir.ancient to the correct path") } // Block #1 is still in the database, we're allowed to init a new feezer - } else { - // The head header is still the genesis, we're allowed to init a new feezer } + // Otherwise, the head header is still the genesis, we're allowed to init a new + // feezer. } } // Freezer is consistent with the key-value database, permit combining the two @@ -222,7 +241,7 @@ func NewLevelDBDatabaseWithFreezer(file string, cache int, handles int, freezer // InspectDatabase traverses the entire database and checks the size // of all different categories of data. func InspectDatabase(db ethdb.Database) error { - it := db.NewIterator() + it := db.NewIterator(nil, nil) defer it.Release() var ( @@ -239,7 +258,10 @@ func InspectDatabase(db ethdb.Database) error { numHashPairing common.StorageSize hashNumPairing common.StorageSize trieSize common.StorageSize + codeSize common.StorageSize txlookupSize common.StorageSize + accountSnapSize common.StorageSize + storageSnapSize common.StorageSize preimageSize common.StorageSize bloomBitsSize common.StorageSize cliqueSnapsSize common.StorageSize @@ -281,6 +303,10 @@ func InspectDatabase(db ethdb.Database) error { receiptSize += size case bytes.HasPrefix(key, txLookupPrefix) && len(key) == (len(txLookupPrefix)+common.HashLength): txlookupSize += size + case bytes.HasPrefix(key, SnapshotAccountPrefix) && len(key) == (len(SnapshotAccountPrefix)+common.HashLength): + accountSnapSize += size + case bytes.HasPrefix(key, SnapshotStoragePrefix) && len(key) == (len(SnapshotStoragePrefix)+2*common.HashLength): + storageSnapSize += size case bytes.HasPrefix(key, preimagePrefix) && len(key) == (len(preimagePrefix)+common.HashLength): preimageSize += size case bytes.HasPrefix(key, bloomBitsPrefix) && len(key) == (len(bloomBitsPrefix)+10+common.HashLength): @@ -291,6 +317,8 @@ func InspectDatabase(db ethdb.Database) error { chtTrieNodes += size case bytes.HasPrefix(key, []byte("blt-")) && len(key) == 4+common.HashLength: bloomTrieNodes += size + case bytes.HasPrefix(key, codePrefix) && len(key) == len(codePrefix)+common.HashLength: + codeSize += size case len(key) == common.HashLength: trieSize += size default: @@ -330,8 +358,11 @@ func InspectDatabase(db ethdb.Database) error { {"Key-Value store", "Block hash->number", hashNumPairing.String()}, {"Key-Value store", "Transaction index", txlookupSize.String()}, {"Key-Value store", "Bloombit index", bloomBitsSize.String()}, + {"Key-Value store", "Contract codes", codeSize.String()}, {"Key-Value store", "Trie nodes", trieSize.String()}, {"Key-Value store", "Trie preimages", preimageSize.String()}, + {"Key-Value store", "Account snapshot", accountSnapSize.String()}, + {"Key-Value store", "Storage snapshot", storageSnapSize.String()}, {"Key-Value store", "Clique snapshots", cliqueSnapsSize.String()}, {"Key-Value store", "Singleton metadata", metadata.String()}, {"Ancient store", "Headers", ancientHeaders.String()}, diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index ce2e879..433290b 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -22,14 +22,15 @@ import ( "math" "os" "path/filepath" + "sync" "sync/atomic" "time" "github.com/ava-labs/coreth/params" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/ethdb" - "github.com/ava-labs/go-ethereum/log" - "github.com/ava-labs/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/prometheus/tsdb/fileutil" ) @@ -69,10 +70,16 @@ type freezer struct { // WARNING: The `frozen` field is accessed atomically. On 32 bit platforms, only // 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned, // so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG). - frozen uint64 // Number of blocks already frozen + frozen uint64 // Number of blocks already frozen + threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests) tables map[string]*freezerTable // Data tables for storing everything instanceLock fileutil.Releaser // File-system lock to prevent double opens + + trigger chan chan struct{} // Manual blocking freeze trigger, test determinism + + quit chan struct{} + closeOnce sync.Once } // newFreezer creates a chain freezer that moves ancient chain data into @@ -80,9 +87,9 @@ type freezer struct { func newFreezer(datadir string, namespace string) (*freezer, error) { // Create the initial freezer object var ( - readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil) - writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil) - sizeCounter = metrics.NewRegisteredCounter(namespace+"ancient/size", nil) + readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil) + writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil) + sizeGauge = metrics.NewRegisteredGauge(namespace+"ancient/size", nil) ) // Ensure the datadir is not a symbolic link if it exists. if info, err := os.Lstat(datadir); !os.IsNotExist(err) { @@ -99,11 +106,14 @@ func newFreezer(datadir string, namespace string) (*freezer, error) { } // Open all the supported data tables freezer := &freezer{ + threshold: params.FullImmutabilityThreshold, tables: make(map[string]*freezerTable), instanceLock: lock, + trigger: make(chan chan struct{}), + quit: make(chan struct{}), } for name, disableSnappy := range freezerNoSnappy { - table, err := newTable(datadir, name, readMeter, writeMeter, sizeCounter, disableSnappy) + table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, disableSnappy) if err != nil { for _, table := range freezer.tables { table.Close() @@ -127,14 +137,17 @@ func newFreezer(datadir string, namespace string) (*freezer, error) { // Close terminates the chain freezer, unmapping all the data files. func (f *freezer) Close() error { var errs []error - for _, table := range f.tables { - if err := table.Close(); err != nil { + f.closeOnce.Do(func() { + f.quit <- struct{}{} + for _, table := range f.tables { + if err := table.Close(); err != nil { + errs = append(errs, err) + } + } + if err := f.instanceLock.Release(); err != nil { errs = append(errs, err) } - } - if err := f.instanceLock.Release(); err != nil { - errs = append(errs, err) - } + }) if errs != nil { return fmt.Errorf("%v", errs) } @@ -218,7 +231,7 @@ func (f *freezer) AppendAncient(number uint64, hash, header, body, receipts, td return nil } -// Truncate discards any recent data above the provided threshold number. +// TruncateAncients discards any recent data above the provided threshold number. func (f *freezer) TruncateAncients(items uint64) error { if atomic.LoadUint64(&f.frozen) <= items { return nil @@ -232,7 +245,7 @@ func (f *freezer) TruncateAncients(items uint64) error { return nil } -// sync flushes all data tables to disk. +// Sync flushes all data tables to disk. func (f *freezer) Sync() error { var errs []error for _, table := range f.tables { @@ -254,48 +267,75 @@ func (f *freezer) Sync() error { func (f *freezer) freeze(db ethdb.KeyValueStore) { nfdb := &nofreezedb{KeyValueStore: db} + var ( + backoff bool + triggered chan struct{} // Used in tests + ) for { + select { + case <-f.quit: + log.Info("Freezer shutting down") + return + default: + } + if backoff { + // If we were doing a manual trigger, notify it + if triggered != nil { + triggered <- struct{}{} + triggered = nil + } + select { + case <-time.NewTimer(freezerRecheckInterval).C: + backoff = false + case triggered = <-f.trigger: + backoff = false + case <-f.quit: + return + } + } // Retrieve the freezing threshold. hash := ReadHeadBlockHash(nfdb) if hash == (common.Hash{}) { log.Debug("Current full block hash unavailable") // new chain, empty database - time.Sleep(freezerRecheckInterval) + backoff = true continue } number := ReadHeaderNumber(nfdb, hash) + threshold := atomic.LoadUint64(&f.threshold) + switch { case number == nil: log.Error("Current full block number unavailable", "hash", hash) - time.Sleep(freezerRecheckInterval) + backoff = true continue - case *number < params.ImmutabilityThreshold: - log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", params.ImmutabilityThreshold) - time.Sleep(freezerRecheckInterval) + case *number < threshold: + log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", threshold) + backoff = true continue - case *number-params.ImmutabilityThreshold <= f.frozen: + case *number-threshold <= f.frozen: log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", f.frozen) - time.Sleep(freezerRecheckInterval) + backoff = true continue } head := ReadHeader(nfdb, hash, *number) if head == nil { log.Error("Current full block unavailable", "number", *number, "hash", hash) - time.Sleep(freezerRecheckInterval) + backoff = true continue } // Seems we have data ready to be frozen, process in usable batches - limit := *number - params.ImmutabilityThreshold + limit := *number - threshold if limit-f.frozen > freezerBatchLimit { limit = f.frozen + freezerBatchLimit } var ( start = time.Now() first = f.frozen - ancients = make([]common.Hash, 0, limit) + ancients = make([]common.Hash, 0, limit-f.frozen) ) - for f.frozen < limit { + for f.frozen <= limit { // Retrieves all the components of the canonical block hash := ReadCanonicalHash(nfdb, f.frozen) if hash == (common.Hash{}) { @@ -346,11 +386,15 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { log.Crit("Failed to delete frozen canonical blocks", "err", err) } batch.Reset() - // Wipe out side chain also. + + // Wipe out side chains also and track dangling side chians + var dangling []common.Hash for number := first; number < f.frozen; number++ { // Always keep the genesis block in active database if number != 0 { - for _, hash := range ReadAllHashes(db, number) { + dangling = ReadAllHashes(db, number) + for _, hash := range dangling { + log.Trace("Deleting side chain", "number", number, "hash", hash) DeleteBlock(batch, hash, number) } } @@ -358,6 +402,41 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { if err := batch.Write(); err != nil { log.Crit("Failed to delete frozen side blocks", "err", err) } + batch.Reset() + + // Step into the future and delete and dangling side chains + if f.frozen > 0 { + tip := f.frozen + for len(dangling) > 0 { + drop := make(map[common.Hash]struct{}) + for _, hash := range dangling { + log.Debug("Dangling parent from freezer", "number", tip-1, "hash", hash) + drop[hash] = struct{}{} + } + children := ReadAllHashes(db, tip) + for i := 0; i < len(children); i++ { + // Dig up the child and ensure it's dangling + child := ReadHeader(nfdb, children[i], tip) + if child == nil { + log.Error("Missing dangling header", "number", tip, "hash", children[i]) + continue + } + if _, ok := drop[child.ParentHash]; !ok { + children = append(children[:i], children[i+1:]...) + i-- + continue + } + // Delete all block data associated with the child + log.Debug("Deleting dangling block", "number", tip, "hash", children[i], "parent", child.ParentHash) + DeleteBlock(batch, children[i], tip) + } + dangling = children + tip++ + } + if err := batch.Write(); err != nil { + log.Crit("Failed to delete dangling side blocks", "err", err) + } + } // Log something friendly for the user context := []interface{}{ "blocks", f.frozen - first, "elapsed", common.PrettyDuration(time.Since(start)), "number", f.frozen - 1, @@ -369,7 +448,7 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { // Avoid database thrashing with tiny writes if f.frozen-first < freezerBatchLimit { - time.Sleep(freezerRecheckInterval) + backoff = true } } } diff --git a/core/rawdb/freezer_reinit.go b/core/rawdb/freezer_reinit.go deleted file mode 100644 index 6b9fb79..0000000 --- a/core/rawdb/freezer_reinit.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright 2019 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package rawdb - -import ( - "errors" - "runtime" - "sync/atomic" - "time" - - "github.com/ava-labs/coreth/core/types" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/common/prque" - "github.com/ava-labs/go-ethereum/ethdb" - "github.com/ava-labs/go-ethereum/log" -) - -// InitDatabaseFromFreezer reinitializes an empty database from a previous batch -// of frozen ancient blocks. The method iterates over all the frozen blocks and -// injects into the database the block hash->number mappings and the transaction -// lookup entries. -func InitDatabaseFromFreezer(db ethdb.Database) error { - // If we can't access the freezer or it's empty, abort - frozen, err := db.Ancients() - if err != nil || frozen == 0 { - return err - } - // Blocks previously frozen, iterate over- and hash them concurrently - var ( - number = ^uint64(0) // -1 - results = make(chan *types.Block, 4*runtime.NumCPU()) - ) - abort := make(chan struct{}) - defer close(abort) - - for i := 0; i < runtime.NumCPU(); i++ { - go func() { - for { - // Fetch the next task number, terminating if everything's done - n := atomic.AddUint64(&number, 1) - if n >= frozen { - return - } - // Retrieve the block from the freezer (no need for the hash, we pull by - // number from the freezer). If successful, pre-cache the block hash and - // the individual transaction hashes for storing into the database. - block := ReadBlock(db, common.Hash{}, n) - if block != nil { - block.Hash() - for _, tx := range block.Transactions() { - tx.Hash() - } - } - // Feed the block to the aggregator, or abort on interrupt - select { - case results <- block: - case <-abort: - return - } - } - }() - } - // Reassemble the blocks into a contiguous stream and push them out to disk - var ( - queue = prque.New(nil) - next = int64(0) - - batch = db.NewBatch() - start = time.Now() - logged time.Time - ) - for i := uint64(0); i < frozen; i++ { - // Retrieve the next result and bail if it's nil - block := <-results - if block == nil { - return errors.New("broken ancient database") - } - // Push the block into the import queue and process contiguous ranges - queue.Push(block, -int64(block.NumberU64())) - for !queue.Empty() { - // If the next available item is gapped, return - if _, priority := queue.Peek(); -priority != next { - break - } - // Next block available, pop it off and index it - block = queue.PopItem().(*types.Block) - next++ - - // Inject hash<->number mapping and txlookup indexes - WriteHeaderNumber(batch, block.Hash(), block.NumberU64()) - WriteTxLookupEntries(batch, block) - - // If enough data was accumulated in memory or we're at the last block, dump to disk - if batch.ValueSize() > ethdb.IdealBatchSize || uint64(next) == frozen { - if err := batch.Write(); err != nil { - return err - } - batch.Reset() - } - // If we've spent too much time already, notify the user of what we're doing - if time.Since(logged) > 8*time.Second { - log.Info("Initializing chain from ancient data", "number", block.Number(), "hash", block.Hash(), "total", frozen-1, "elapsed", common.PrettyDuration(time.Since(start))) - logged = time.Now() - } - } - } - hash := ReadCanonicalHash(db, frozen-1) - WriteHeadHeaderHash(db, hash) - WriteHeadFastBlockHash(db, hash) - - log.Info("Initialized chain from ancient data", "number", frozen-1, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start))) - return nil -} diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index fc72669..b9d8a27 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -26,9 +26,9 @@ import ( "sync" "sync/atomic" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/log" - "github.com/ava-labs/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/golang/snappy" ) @@ -94,18 +94,18 @@ type freezerTable struct { // to count how many historic items have gone missing. itemOffset uint32 // Offset (number of discarded items) - headBytes uint32 // Number of bytes written to the head file - readMeter metrics.Meter // Meter for measuring the effective amount of data read - writeMeter metrics.Meter // Meter for measuring the effective amount of data written - sizeCounter metrics.Counter // Counter for tracking the combined size of all freezer tables + headBytes uint32 // Number of bytes written to the head file + readMeter metrics.Meter // Meter for measuring the effective amount of data read + writeMeter metrics.Meter // Meter for measuring the effective amount of data written + sizeGauge metrics.Gauge // Gauge for tracking the combined size of all freezer tables logger log.Logger // Logger with database path and table name ambedded lock sync.RWMutex // Mutex protecting the data file descriptors } // newTable opens a freezer table with default settings - 2G files -func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeCounter metrics.Counter, disableSnappy bool) (*freezerTable, error) { - return newCustomTable(path, name, readMeter, writeMeter, sizeCounter, 2*1000*1000*1000, disableSnappy) +func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, disableSnappy bool) (*freezerTable, error) { + return newCustomTable(path, name, readMeter, writeMeter, sizeGauge, 2*1000*1000*1000, disableSnappy) } // openFreezerFileForAppend opens a freezer table file and seeks to the end @@ -149,7 +149,7 @@ func truncateFreezerFile(file *os.File, size int64) error { // newCustomTable opens a freezer table, creating the data and index files if they are // non existent. Both files are truncated to the shortest common length to ensure // they don't go out of sync. -func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeCounter metrics.Counter, maxFilesize uint32, noCompression bool) (*freezerTable, error) { +func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression bool) (*freezerTable, error) { // Ensure the containing directory exists and open the indexEntry file if err := os.MkdirAll(path, 0755); err != nil { return nil, err @@ -172,7 +172,7 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete files: make(map[uint32]*os.File), readMeter: readMeter, writeMeter: writeMeter, - sizeCounter: sizeCounter, + sizeGauge: sizeGauge, name: name, path: path, logger: log.New("database", path, "table", name), @@ -189,7 +189,7 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete tab.Close() return nil, err } - tab.sizeCounter.Inc(int64(size)) + tab.sizeGauge.Inc(int64(size)) return tab, nil } @@ -232,8 +232,8 @@ func (t *freezerTable) repair() error { t.index.ReadAt(buffer, 0) firstIndex.unmarshalBinary(buffer) - t.tailId = firstIndex.offset - t.itemOffset = firstIndex.filenum + t.tailId = firstIndex.filenum + t.itemOffset = firstIndex.offset t.index.ReadAt(buffer, offsetsSize-indexEntrySize) lastIndex.unmarshalBinary(buffer) @@ -330,7 +330,8 @@ func (t *freezerTable) truncate(items uint64) error { defer t.lock.Unlock() // If our item count is correct, don't do anything - if atomic.LoadUint64(&t.items) <= items { + existing := atomic.LoadUint64(&t.items) + if existing <= items { return nil } // We need to truncate, save the old size for metrics tracking @@ -339,7 +340,11 @@ func (t *freezerTable) truncate(items uint64) error { return err } // Something's out of sync, truncate the table's offset index - t.logger.Warn("Truncating freezer table", "items", t.items, "limit", items) + log := t.logger.Debug + if existing > items+1 { + log = t.logger.Warn // Only loud warn if we delete multiple items + } + log("Truncating freezer table", "items", existing, "limit", items) if err := truncateFreezerFile(t.index, int64(items+1)*indexEntrySize); err != nil { return err } @@ -378,7 +383,7 @@ func (t *freezerTable) truncate(items uint64) error { if err != nil { return err } - t.sizeCounter.Dec(int64(oldSize - newSize)) + t.sizeGauge.Dec(int64(oldSize - newSize)) return nil } @@ -510,7 +515,7 @@ func (t *freezerTable) Append(item uint64, blob []byte) error { t.index.Write(idx.marshallBinary()) t.writeMeter.Mark(int64(bLen + indexEntrySize)) - t.sizeCounter.Inc(int64(bLen + indexEntrySize)) + t.sizeGauge.Inc(int64(bLen + indexEntrySize)) atomic.AddUint64(&t.items, 1) return nil @@ -519,16 +524,27 @@ func (t *freezerTable) Append(item uint64, blob []byte) error { // getBounds returns the indexes for the item // returns start, end, filenumber and error func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) { - var startIdx, endIdx indexEntry buffer := make([]byte, indexEntrySize) - if _, err := t.index.ReadAt(buffer, int64(item*indexEntrySize)); err != nil { - return 0, 0, 0, err - } - startIdx.unmarshalBinary(buffer) + var startIdx, endIdx indexEntry + // Read second index if _, err := t.index.ReadAt(buffer, int64((item+1)*indexEntrySize)); err != nil { return 0, 0, 0, err } endIdx.unmarshalBinary(buffer) + // Read first index (unless it's the very first item) + if item != 0 { + if _, err := t.index.ReadAt(buffer, int64(item*indexEntrySize)); err != nil { + return 0, 0, 0, err + } + startIdx.unmarshalBinary(buffer) + } else { + // Special case if we're reading the first item in the freezer. We assume that + // the first item always start from zero(regarding the deletion, we + // only support deletion by files, so that the assumption is held). + // This means we can use the first item metadata to carry information about + // the 'global' offset, for the deletion-case + return 0, endIdx.offset, endIdx.filenum, nil + } if startIdx.filenum != endIdx.filenum { // If a piece of data 'crosses' a data-file, // it's actually in one piece on the second data-file. @@ -541,20 +557,22 @@ func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) { // Retrieve looks up the data offset of an item with the given number and retrieves // the raw binary blob from the data file. func (t *freezerTable) Retrieve(item uint64) ([]byte, error) { + t.lock.RLock() // Ensure the table and the item is accessible if t.index == nil || t.head == nil { + t.lock.RUnlock() return nil, errClosed } if atomic.LoadUint64(&t.items) <= item { + t.lock.RUnlock() return nil, errOutOfBounds } // Ensure the item was not deleted from the tail either - offset := atomic.LoadUint32(&t.itemOffset) - if uint64(offset) > item { + if uint64(t.itemOffset) > item { + t.lock.RUnlock() return nil, errOutOfBounds } - t.lock.RLock() - startOffset, endOffset, filenum, err := t.getBounds(item - uint64(offset)) + startOffset, endOffset, filenum, err := t.getBounds(item - uint64(t.itemOffset)) if err != nil { t.lock.RUnlock() return nil, err diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 51d1f66..e2b093a 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -18,10 +18,11 @@ package rawdb import ( + "bytes" "encoding/binary" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/metrics" ) // The fields below define the low level database schema prefixing. @@ -38,9 +39,24 @@ var ( // headFastBlockKey tracks the latest known incomplete block's hash during fast sync. headFastBlockKey = []byte("LastFast") + // lastPivotKey tracks the last pivot block used by fast sync (to reenable on sethead). + lastPivotKey = []byte("LastPivot") + // fastTrieProgressKey tracks the number of trie entries imported during fast sync. fastTrieProgressKey = []byte("TrieSync") + // snapshotRootKey tracks the hash of the last snapshot. + snapshotRootKey = []byte("SnapshotRoot") + + // snapshotJournalKey tracks the in-memory diff layers across restarts. + snapshotJournalKey = []byte("SnapshotJournal") + + // txIndexTailKey tracks the oldest block whose transactions have been indexed. + txIndexTailKey = []byte("TransactionIndexTail") + + // fastTxLookupLimitKey tracks the transaction lookup limit during fast sync. + fastTxLookupLimitKey = []byte("FastTransactionLookupLimit") + // Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes). headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td @@ -50,8 +66,11 @@ var ( blockBodyPrefix = []byte("b") // blockBodyPrefix + num (uint64 big endian) + hash -> block body blockReceiptsPrefix = []byte("r") // blockReceiptsPrefix + num (uint64 big endian) + hash -> block receipts - txLookupPrefix = []byte("l") // txLookupPrefix + hash -> transaction/receipt lookup metadata - bloomBitsPrefix = []byte("B") // bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash -> bloom bits + txLookupPrefix = []byte("l") // txLookupPrefix + hash -> transaction/receipt lookup metadata + bloomBitsPrefix = []byte("B") // bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash -> bloom bits + SnapshotAccountPrefix = []byte("a") // SnapshotAccountPrefix + account hash -> account trie value + SnapshotStoragePrefix = []byte("o") // SnapshotStoragePrefix + account hash + storage hash -> storage trie value + codePrefix = []byte("c") // codePrefix + code hash -> account code preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage configPrefix = []byte("ethereum-config-") // config prefix for the db @@ -145,6 +164,21 @@ func txLookupKey(hash common.Hash) []byte { return append(txLookupPrefix, hash.Bytes()...) } +// accountSnapshotKey = SnapshotAccountPrefix + hash +func accountSnapshotKey(hash common.Hash) []byte { + return append(SnapshotAccountPrefix, hash.Bytes()...) +} + +// storageSnapshotKey = SnapshotStoragePrefix + account hash + storage hash +func storageSnapshotKey(accountHash, storageHash common.Hash) []byte { + return append(append(SnapshotStoragePrefix, accountHash.Bytes()...), storageHash.Bytes()...) +} + +// storageSnapshotsKey = SnapshotStoragePrefix + account hash + storage hash +func storageSnapshotsKey(accountHash common.Hash) []byte { + return append(SnapshotStoragePrefix, accountHash.Bytes()...) +} + // bloomBitsKey = bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash func bloomBitsKey(bit uint, section uint64, hash common.Hash) []byte { key := append(append(bloomBitsPrefix, make([]byte, 10)...), hash.Bytes()...) @@ -160,6 +194,20 @@ func preimageKey(hash common.Hash) []byte { return append(preimagePrefix, hash.Bytes()...) } +// codeKey = codePrefix + hash +func codeKey(hash common.Hash) []byte { + return append(codePrefix, hash.Bytes()...) +} + +// IsCodeKey reports whether the given byte slice is the key of contract code, +// if so return the raw code hash as well. +func IsCodeKey(key []byte) (bool, []byte) { + if bytes.HasPrefix(key, codePrefix) && len(key) == common.HashLength+len(codePrefix) { + return true, key[len(codePrefix):] + } + return false, nil +} + // configKey = configPrefix + hash func configKey(hash common.Hash) []byte { return append(configPrefix, hash.Bytes()...) diff --git a/core/rawdb/table.go b/core/rawdb/table.go index f9078e8..323ef62 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -17,7 +17,7 @@ package rawdb import ( - "github.com/ava-labs/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/ethdb" ) // table is a wrapper around a database that prefixes each key access with a pre- @@ -103,23 +103,16 @@ func (t *table) Delete(key []byte) error { return t.db.Delete(append([]byte(t.prefix), key...)) } -// NewIterator creates a binary-alphabetical iterator over the entire keyspace -// contained within the database. -func (t *table) NewIterator() ethdb.Iterator { - return t.NewIteratorWithPrefix(nil) -} - -// NewIteratorWithStart creates a binary-alphabetical iterator over a subset of -// database content starting at a particular initial key (or after, if it does -// not exist). -func (t *table) NewIteratorWithStart(start []byte) ethdb.Iterator { - return t.db.NewIteratorWithStart(start) -} - -// NewIteratorWithPrefix creates a binary-alphabetical iterator over a subset -// of database content with a particular key prefix. -func (t *table) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator { - return t.db.NewIteratorWithPrefix(append([]byte(t.prefix), prefix...)) +// NewIterator creates a binary-alphabetical iterator over a subset +// of database content with a particular key prefix, starting at a particular +// initial key (or after, if it does not exist). +func (t *table) NewIterator(prefix []byte, start []byte) ethdb.Iterator { + innerPrefix := append([]byte(t.prefix), prefix...) + iter := t.db.NewIterator(innerPrefix, start) + return &tableIterator{ + iter: iter, + prefix: t.prefix, + } } // Stat returns a particular internal stat of the database. @@ -198,7 +191,69 @@ func (b *tableBatch) Reset() { b.batch.Reset() } +// tableReplayer is a wrapper around a batch replayer which truncates +// the added prefix. +type tableReplayer struct { + w ethdb.KeyValueWriter + prefix string +} + +// Put implements the interface KeyValueWriter. +func (r *tableReplayer) Put(key []byte, value []byte) error { + trimmed := key[len(r.prefix):] + return r.w.Put(trimmed, value) +} + +// Delete implements the interface KeyValueWriter. +func (r *tableReplayer) Delete(key []byte) error { + trimmed := key[len(r.prefix):] + return r.w.Delete(trimmed) +} + // Replay replays the batch contents. func (b *tableBatch) Replay(w ethdb.KeyValueWriter) error { - return b.batch.Replay(w) + return b.batch.Replay(&tableReplayer{w: w, prefix: b.prefix}) +} + +// tableIterator is a wrapper around a database iterator that prefixes each key access +// with a pre-configured string. +type tableIterator struct { + iter ethdb.Iterator + prefix string +} + +// Next moves the iterator to the next key/value pair. It returns whether the +// iterator is exhausted. +func (iter *tableIterator) Next() bool { + return iter.iter.Next() +} + +// Error returns any accumulated error. Exhausting all the key/value pairs +// is not considered to be an error. +func (iter *tableIterator) Error() error { + return iter.iter.Error() +} + +// Key returns the key of the current key/value pair, or nil if done. The caller +// should not modify the contents of the returned slice, and its contents may +// change on the next call to Next. +func (iter *tableIterator) Key() []byte { + key := iter.iter.Key() + if key == nil { + return nil + } + return key[len(iter.prefix):] +} + +// Value returns the value of the current key/value pair, or nil if done. The +// caller should not modify the contents of the returned slice, and its contents +// may change on the next call to Next. +func (iter *tableIterator) Value() []byte { + return iter.iter.Value() +} + +// Release releases associated resources. Release should always succeed and can +// be called multiple times without causing error. +func (iter *tableIterator) Release() { + iter.iter.Release() } |