aboutsummaryrefslogtreecommitdiff
path: root/core/rawdb/freezer.go
diff options
context:
space:
mode:
authorTed Yin <[email protected]>2020-09-18 13:14:29 -0400
committerGitHub <[email protected]>2020-09-18 13:14:29 -0400
commitd048937c48753d9eaef771bf71820cf95d79df26 (patch)
tree1a7f65fcd72e77092525ab01625b8b9d365e3e40 /core/rawdb/freezer.go
parent7d1388c743b4ec8f4a86bea95bfada785dee83f7 (diff)
parent7d8c85cf8895b0f998d8eafb02f99d5b689fcd59 (diff)
Merge pull request #34 from ava-labs/devv0.3.0-rc.5
Dev
Diffstat (limited to 'core/rawdb/freezer.go')
-rw-r--r--core/rawdb/freezer.go141
1 files changed, 110 insertions, 31 deletions
diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go
index ce2e879..433290b 100644
--- a/core/rawdb/freezer.go
+++ b/core/rawdb/freezer.go
@@ -22,14 +22,15 @@ import (
"math"
"os"
"path/filepath"
+ "sync"
"sync/atomic"
"time"
"github.com/ava-labs/coreth/params"
- "github.com/ava-labs/go-ethereum/common"
- "github.com/ava-labs/go-ethereum/ethdb"
- "github.com/ava-labs/go-ethereum/log"
- "github.com/ava-labs/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/prometheus/tsdb/fileutil"
)
@@ -69,10 +70,16 @@ type freezer struct {
// WARNING: The `frozen` field is accessed atomically. On 32 bit platforms, only
// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
// so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
- frozen uint64 // Number of blocks already frozen
+ frozen uint64 // Number of blocks already frozen
+ threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)
tables map[string]*freezerTable // Data tables for storing everything
instanceLock fileutil.Releaser // File-system lock to prevent double opens
+
+ trigger chan chan struct{} // Manual blocking freeze trigger, test determinism
+
+ quit chan struct{}
+ closeOnce sync.Once
}
// newFreezer creates a chain freezer that moves ancient chain data into
@@ -80,9 +87,9 @@ type freezer struct {
func newFreezer(datadir string, namespace string) (*freezer, error) {
// Create the initial freezer object
var (
- readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil)
- writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil)
- sizeCounter = metrics.NewRegisteredCounter(namespace+"ancient/size", nil)
+ readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil)
+ writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil)
+ sizeGauge = metrics.NewRegisteredGauge(namespace+"ancient/size", nil)
)
// Ensure the datadir is not a symbolic link if it exists.
if info, err := os.Lstat(datadir); !os.IsNotExist(err) {
@@ -99,11 +106,14 @@ func newFreezer(datadir string, namespace string) (*freezer, error) {
}
// Open all the supported data tables
freezer := &freezer{
+ threshold: params.FullImmutabilityThreshold,
tables: make(map[string]*freezerTable),
instanceLock: lock,
+ trigger: make(chan chan struct{}),
+ quit: make(chan struct{}),
}
for name, disableSnappy := range freezerNoSnappy {
- table, err := newTable(datadir, name, readMeter, writeMeter, sizeCounter, disableSnappy)
+ table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, disableSnappy)
if err != nil {
for _, table := range freezer.tables {
table.Close()
@@ -127,14 +137,17 @@ func newFreezer(datadir string, namespace string) (*freezer, error) {
// Close terminates the chain freezer, unmapping all the data files.
func (f *freezer) Close() error {
var errs []error
- for _, table := range f.tables {
- if err := table.Close(); err != nil {
+ f.closeOnce.Do(func() {
+ f.quit <- struct{}{}
+ for _, table := range f.tables {
+ if err := table.Close(); err != nil {
+ errs = append(errs, err)
+ }
+ }
+ if err := f.instanceLock.Release(); err != nil {
errs = append(errs, err)
}
- }
- if err := f.instanceLock.Release(); err != nil {
- errs = append(errs, err)
- }
+ })
if errs != nil {
return fmt.Errorf("%v", errs)
}
@@ -218,7 +231,7 @@ func (f *freezer) AppendAncient(number uint64, hash, header, body, receipts, td
return nil
}
-// Truncate discards any recent data above the provided threshold number.
+// TruncateAncients discards any recent data above the provided threshold number.
func (f *freezer) TruncateAncients(items uint64) error {
if atomic.LoadUint64(&f.frozen) <= items {
return nil
@@ -232,7 +245,7 @@ func (f *freezer) TruncateAncients(items uint64) error {
return nil
}
-// sync flushes all data tables to disk.
+// Sync flushes all data tables to disk.
func (f *freezer) Sync() error {
var errs []error
for _, table := range f.tables {
@@ -254,48 +267,75 @@ func (f *freezer) Sync() error {
func (f *freezer) freeze(db ethdb.KeyValueStore) {
nfdb := &nofreezedb{KeyValueStore: db}
+ var (
+ backoff bool
+ triggered chan struct{} // Used in tests
+ )
for {
+ select {
+ case <-f.quit:
+ log.Info("Freezer shutting down")
+ return
+ default:
+ }
+ if backoff {
+ // If we were doing a manual trigger, notify it
+ if triggered != nil {
+ triggered <- struct{}{}
+ triggered = nil
+ }
+ select {
+ case <-time.NewTimer(freezerRecheckInterval).C:
+ backoff = false
+ case triggered = <-f.trigger:
+ backoff = false
+ case <-f.quit:
+ return
+ }
+ }
// Retrieve the freezing threshold.
hash := ReadHeadBlockHash(nfdb)
if hash == (common.Hash{}) {
log.Debug("Current full block hash unavailable") // new chain, empty database
- time.Sleep(freezerRecheckInterval)
+ backoff = true
continue
}
number := ReadHeaderNumber(nfdb, hash)
+ threshold := atomic.LoadUint64(&f.threshold)
+
switch {
case number == nil:
log.Error("Current full block number unavailable", "hash", hash)
- time.Sleep(freezerRecheckInterval)
+ backoff = true
continue
- case *number < params.ImmutabilityThreshold:
- log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", params.ImmutabilityThreshold)
- time.Sleep(freezerRecheckInterval)
+ case *number < threshold:
+ log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", threshold)
+ backoff = true
continue
- case *number-params.ImmutabilityThreshold <= f.frozen:
+ case *number-threshold <= f.frozen:
log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", f.frozen)
- time.Sleep(freezerRecheckInterval)
+ backoff = true
continue
}
head := ReadHeader(nfdb, hash, *number)
if head == nil {
log.Error("Current full block unavailable", "number", *number, "hash", hash)
- time.Sleep(freezerRecheckInterval)
+ backoff = true
continue
}
// Seems we have data ready to be frozen, process in usable batches
- limit := *number - params.ImmutabilityThreshold
+ limit := *number - threshold
if limit-f.frozen > freezerBatchLimit {
limit = f.frozen + freezerBatchLimit
}
var (
start = time.Now()
first = f.frozen
- ancients = make([]common.Hash, 0, limit)
+ ancients = make([]common.Hash, 0, limit-f.frozen)
)
- for f.frozen < limit {
+ for f.frozen <= limit {
// Retrieves all the components of the canonical block
hash := ReadCanonicalHash(nfdb, f.frozen)
if hash == (common.Hash{}) {
@@ -346,11 +386,15 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
log.Crit("Failed to delete frozen canonical blocks", "err", err)
}
batch.Reset()
- // Wipe out side chain also.
+
+ // Wipe out side chains also and track dangling side chians
+ var dangling []common.Hash
for number := first; number < f.frozen; number++ {
// Always keep the genesis block in active database
if number != 0 {
- for _, hash := range ReadAllHashes(db, number) {
+ dangling = ReadAllHashes(db, number)
+ for _, hash := range dangling {
+ log.Trace("Deleting side chain", "number", number, "hash", hash)
DeleteBlock(batch, hash, number)
}
}
@@ -358,6 +402,41 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
if err := batch.Write(); err != nil {
log.Crit("Failed to delete frozen side blocks", "err", err)
}
+ batch.Reset()
+
+ // Step into the future and delete and dangling side chains
+ if f.frozen > 0 {
+ tip := f.frozen
+ for len(dangling) > 0 {
+ drop := make(map[common.Hash]struct{})
+ for _, hash := range dangling {
+ log.Debug("Dangling parent from freezer", "number", tip-1, "hash", hash)
+ drop[hash] = struct{}{}
+ }
+ children := ReadAllHashes(db, tip)
+ for i := 0; i < len(children); i++ {
+ // Dig up the child and ensure it's dangling
+ child := ReadHeader(nfdb, children[i], tip)
+ if child == nil {
+ log.Error("Missing dangling header", "number", tip, "hash", children[i])
+ continue
+ }
+ if _, ok := drop[child.ParentHash]; !ok {
+ children = append(children[:i], children[i+1:]...)
+ i--
+ continue
+ }
+ // Delete all block data associated with the child
+ log.Debug("Deleting dangling block", "number", tip, "hash", children[i], "parent", child.ParentHash)
+ DeleteBlock(batch, children[i], tip)
+ }
+ dangling = children
+ tip++
+ }
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to delete dangling side blocks", "err", err)
+ }
+ }
// Log something friendly for the user
context := []interface{}{
"blocks", f.frozen - first, "elapsed", common.PrettyDuration(time.Since(start)), "number", f.frozen - 1,
@@ -369,7 +448,7 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
// Avoid database thrashing with tiny writes
if f.frozen-first < freezerBatchLimit {
- time.Sleep(freezerRecheckInterval)
+ backoff = true
}
}
}