diff options
author | Ted Yin <[email protected]> | 2020-09-18 13:14:29 -0400 |
---|---|---|
committer | GitHub <[email protected]> | 2020-09-18 13:14:29 -0400 |
commit | d048937c48753d9eaef771bf71820cf95d79df26 (patch) | |
tree | 1a7f65fcd72e77092525ab01625b8b9d365e3e40 /core/rawdb/freezer.go | |
parent | 7d1388c743b4ec8f4a86bea95bfada785dee83f7 (diff) | |
parent | 7d8c85cf8895b0f998d8eafb02f99d5b689fcd59 (diff) |
Merge pull request #34 from ava-labs/devv0.3.0-rc.5
Dev
Diffstat (limited to 'core/rawdb/freezer.go')
-rw-r--r-- | core/rawdb/freezer.go | 141 |
1 files changed, 110 insertions, 31 deletions
diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index ce2e879..433290b 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -22,14 +22,15 @@ import ( "math" "os" "path/filepath" + "sync" "sync/atomic" "time" "github.com/ava-labs/coreth/params" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/ethdb" - "github.com/ava-labs/go-ethereum/log" - "github.com/ava-labs/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/prometheus/tsdb/fileutil" ) @@ -69,10 +70,16 @@ type freezer struct { // WARNING: The `frozen` field is accessed atomically. On 32 bit platforms, only // 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned, // so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG). - frozen uint64 // Number of blocks already frozen + frozen uint64 // Number of blocks already frozen + threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests) tables map[string]*freezerTable // Data tables for storing everything instanceLock fileutil.Releaser // File-system lock to prevent double opens + + trigger chan chan struct{} // Manual blocking freeze trigger, test determinism + + quit chan struct{} + closeOnce sync.Once } // newFreezer creates a chain freezer that moves ancient chain data into @@ -80,9 +87,9 @@ type freezer struct { func newFreezer(datadir string, namespace string) (*freezer, error) { // Create the initial freezer object var ( - readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil) - writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil) - sizeCounter = metrics.NewRegisteredCounter(namespace+"ancient/size", nil) + readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil) + writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil) + sizeGauge = metrics.NewRegisteredGauge(namespace+"ancient/size", nil) ) // Ensure the datadir is not a symbolic link if it exists. if info, err := os.Lstat(datadir); !os.IsNotExist(err) { @@ -99,11 +106,14 @@ func newFreezer(datadir string, namespace string) (*freezer, error) { } // Open all the supported data tables freezer := &freezer{ + threshold: params.FullImmutabilityThreshold, tables: make(map[string]*freezerTable), instanceLock: lock, + trigger: make(chan chan struct{}), + quit: make(chan struct{}), } for name, disableSnappy := range freezerNoSnappy { - table, err := newTable(datadir, name, readMeter, writeMeter, sizeCounter, disableSnappy) + table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, disableSnappy) if err != nil { for _, table := range freezer.tables { table.Close() @@ -127,14 +137,17 @@ func newFreezer(datadir string, namespace string) (*freezer, error) { // Close terminates the chain freezer, unmapping all the data files. func (f *freezer) Close() error { var errs []error - for _, table := range f.tables { - if err := table.Close(); err != nil { + f.closeOnce.Do(func() { + f.quit <- struct{}{} + for _, table := range f.tables { + if err := table.Close(); err != nil { + errs = append(errs, err) + } + } + if err := f.instanceLock.Release(); err != nil { errs = append(errs, err) } - } - if err := f.instanceLock.Release(); err != nil { - errs = append(errs, err) - } + }) if errs != nil { return fmt.Errorf("%v", errs) } @@ -218,7 +231,7 @@ func (f *freezer) AppendAncient(number uint64, hash, header, body, receipts, td return nil } -// Truncate discards any recent data above the provided threshold number. +// TruncateAncients discards any recent data above the provided threshold number. func (f *freezer) TruncateAncients(items uint64) error { if atomic.LoadUint64(&f.frozen) <= items { return nil @@ -232,7 +245,7 @@ func (f *freezer) TruncateAncients(items uint64) error { return nil } -// sync flushes all data tables to disk. +// Sync flushes all data tables to disk. func (f *freezer) Sync() error { var errs []error for _, table := range f.tables { @@ -254,48 +267,75 @@ func (f *freezer) Sync() error { func (f *freezer) freeze(db ethdb.KeyValueStore) { nfdb := &nofreezedb{KeyValueStore: db} + var ( + backoff bool + triggered chan struct{} // Used in tests + ) for { + select { + case <-f.quit: + log.Info("Freezer shutting down") + return + default: + } + if backoff { + // If we were doing a manual trigger, notify it + if triggered != nil { + triggered <- struct{}{} + triggered = nil + } + select { + case <-time.NewTimer(freezerRecheckInterval).C: + backoff = false + case triggered = <-f.trigger: + backoff = false + case <-f.quit: + return + } + } // Retrieve the freezing threshold. hash := ReadHeadBlockHash(nfdb) if hash == (common.Hash{}) { log.Debug("Current full block hash unavailable") // new chain, empty database - time.Sleep(freezerRecheckInterval) + backoff = true continue } number := ReadHeaderNumber(nfdb, hash) + threshold := atomic.LoadUint64(&f.threshold) + switch { case number == nil: log.Error("Current full block number unavailable", "hash", hash) - time.Sleep(freezerRecheckInterval) + backoff = true continue - case *number < params.ImmutabilityThreshold: - log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", params.ImmutabilityThreshold) - time.Sleep(freezerRecheckInterval) + case *number < threshold: + log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", threshold) + backoff = true continue - case *number-params.ImmutabilityThreshold <= f.frozen: + case *number-threshold <= f.frozen: log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", f.frozen) - time.Sleep(freezerRecheckInterval) + backoff = true continue } head := ReadHeader(nfdb, hash, *number) if head == nil { log.Error("Current full block unavailable", "number", *number, "hash", hash) - time.Sleep(freezerRecheckInterval) + backoff = true continue } // Seems we have data ready to be frozen, process in usable batches - limit := *number - params.ImmutabilityThreshold + limit := *number - threshold if limit-f.frozen > freezerBatchLimit { limit = f.frozen + freezerBatchLimit } var ( start = time.Now() first = f.frozen - ancients = make([]common.Hash, 0, limit) + ancients = make([]common.Hash, 0, limit-f.frozen) ) - for f.frozen < limit { + for f.frozen <= limit { // Retrieves all the components of the canonical block hash := ReadCanonicalHash(nfdb, f.frozen) if hash == (common.Hash{}) { @@ -346,11 +386,15 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { log.Crit("Failed to delete frozen canonical blocks", "err", err) } batch.Reset() - // Wipe out side chain also. + + // Wipe out side chains also and track dangling side chians + var dangling []common.Hash for number := first; number < f.frozen; number++ { // Always keep the genesis block in active database if number != 0 { - for _, hash := range ReadAllHashes(db, number) { + dangling = ReadAllHashes(db, number) + for _, hash := range dangling { + log.Trace("Deleting side chain", "number", number, "hash", hash) DeleteBlock(batch, hash, number) } } @@ -358,6 +402,41 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { if err := batch.Write(); err != nil { log.Crit("Failed to delete frozen side blocks", "err", err) } + batch.Reset() + + // Step into the future and delete and dangling side chains + if f.frozen > 0 { + tip := f.frozen + for len(dangling) > 0 { + drop := make(map[common.Hash]struct{}) + for _, hash := range dangling { + log.Debug("Dangling parent from freezer", "number", tip-1, "hash", hash) + drop[hash] = struct{}{} + } + children := ReadAllHashes(db, tip) + for i := 0; i < len(children); i++ { + // Dig up the child and ensure it's dangling + child := ReadHeader(nfdb, children[i], tip) + if child == nil { + log.Error("Missing dangling header", "number", tip, "hash", children[i]) + continue + } + if _, ok := drop[child.ParentHash]; !ok { + children = append(children[:i], children[i+1:]...) + i-- + continue + } + // Delete all block data associated with the child + log.Debug("Deleting dangling block", "number", tip, "hash", children[i], "parent", child.ParentHash) + DeleteBlock(batch, children[i], tip) + } + dangling = children + tip++ + } + if err := batch.Write(); err != nil { + log.Crit("Failed to delete dangling side blocks", "err", err) + } + } // Log something friendly for the user context := []interface{}{ "blocks", f.frozen - first, "elapsed", common.PrettyDuration(time.Since(start)), "number", f.frozen - 1, @@ -369,7 +448,7 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { // Avoid database thrashing with tiny writes if f.frozen-first < freezerBatchLimit { - time.Sleep(freezerRecheckInterval) + backoff = true } } } |