diff options
Diffstat (limited to 'core/rawdb/freezer_table.go')
-rw-r--r-- | core/rawdb/freezer_table.go | 72 |
1 files changed, 45 insertions, 27 deletions
diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index fc72669..b9d8a27 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -26,9 +26,9 @@ import ( "sync" "sync/atomic" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/log" - "github.com/ava-labs/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/golang/snappy" ) @@ -94,18 +94,18 @@ type freezerTable struct { // to count how many historic items have gone missing. itemOffset uint32 // Offset (number of discarded items) - headBytes uint32 // Number of bytes written to the head file - readMeter metrics.Meter // Meter for measuring the effective amount of data read - writeMeter metrics.Meter // Meter for measuring the effective amount of data written - sizeCounter metrics.Counter // Counter for tracking the combined size of all freezer tables + headBytes uint32 // Number of bytes written to the head file + readMeter metrics.Meter // Meter for measuring the effective amount of data read + writeMeter metrics.Meter // Meter for measuring the effective amount of data written + sizeGauge metrics.Gauge // Gauge for tracking the combined size of all freezer tables logger log.Logger // Logger with database path and table name ambedded lock sync.RWMutex // Mutex protecting the data file descriptors } // newTable opens a freezer table with default settings - 2G files -func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeCounter metrics.Counter, disableSnappy bool) (*freezerTable, error) { - return newCustomTable(path, name, readMeter, writeMeter, sizeCounter, 2*1000*1000*1000, disableSnappy) +func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, disableSnappy bool) (*freezerTable, error) { + return newCustomTable(path, name, readMeter, writeMeter, sizeGauge, 2*1000*1000*1000, disableSnappy) } // openFreezerFileForAppend opens a freezer table file and seeks to the end @@ -149,7 +149,7 @@ func truncateFreezerFile(file *os.File, size int64) error { // newCustomTable opens a freezer table, creating the data and index files if they are // non existent. Both files are truncated to the shortest common length to ensure // they don't go out of sync. -func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeCounter metrics.Counter, maxFilesize uint32, noCompression bool) (*freezerTable, error) { +func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression bool) (*freezerTable, error) { // Ensure the containing directory exists and open the indexEntry file if err := os.MkdirAll(path, 0755); err != nil { return nil, err @@ -172,7 +172,7 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete files: make(map[uint32]*os.File), readMeter: readMeter, writeMeter: writeMeter, - sizeCounter: sizeCounter, + sizeGauge: sizeGauge, name: name, path: path, logger: log.New("database", path, "table", name), @@ -189,7 +189,7 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete tab.Close() return nil, err } - tab.sizeCounter.Inc(int64(size)) + tab.sizeGauge.Inc(int64(size)) return tab, nil } @@ -232,8 +232,8 @@ func (t *freezerTable) repair() error { t.index.ReadAt(buffer, 0) firstIndex.unmarshalBinary(buffer) - t.tailId = firstIndex.offset - t.itemOffset = firstIndex.filenum + t.tailId = firstIndex.filenum + t.itemOffset = firstIndex.offset t.index.ReadAt(buffer, offsetsSize-indexEntrySize) lastIndex.unmarshalBinary(buffer) @@ -330,7 +330,8 @@ func (t *freezerTable) truncate(items uint64) error { defer t.lock.Unlock() // If our item count is correct, don't do anything - if atomic.LoadUint64(&t.items) <= items { + existing := atomic.LoadUint64(&t.items) + if existing <= items { return nil } // We need to truncate, save the old size for metrics tracking @@ -339,7 +340,11 @@ func (t *freezerTable) truncate(items uint64) error { return err } // Something's out of sync, truncate the table's offset index - t.logger.Warn("Truncating freezer table", "items", t.items, "limit", items) + log := t.logger.Debug + if existing > items+1 { + log = t.logger.Warn // Only loud warn if we delete multiple items + } + log("Truncating freezer table", "items", existing, "limit", items) if err := truncateFreezerFile(t.index, int64(items+1)*indexEntrySize); err != nil { return err } @@ -378,7 +383,7 @@ func (t *freezerTable) truncate(items uint64) error { if err != nil { return err } - t.sizeCounter.Dec(int64(oldSize - newSize)) + t.sizeGauge.Dec(int64(oldSize - newSize)) return nil } @@ -510,7 +515,7 @@ func (t *freezerTable) Append(item uint64, blob []byte) error { t.index.Write(idx.marshallBinary()) t.writeMeter.Mark(int64(bLen + indexEntrySize)) - t.sizeCounter.Inc(int64(bLen + indexEntrySize)) + t.sizeGauge.Inc(int64(bLen + indexEntrySize)) atomic.AddUint64(&t.items, 1) return nil @@ -519,16 +524,27 @@ func (t *freezerTable) Append(item uint64, blob []byte) error { // getBounds returns the indexes for the item // returns start, end, filenumber and error func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) { - var startIdx, endIdx indexEntry buffer := make([]byte, indexEntrySize) - if _, err := t.index.ReadAt(buffer, int64(item*indexEntrySize)); err != nil { - return 0, 0, 0, err - } - startIdx.unmarshalBinary(buffer) + var startIdx, endIdx indexEntry + // Read second index if _, err := t.index.ReadAt(buffer, int64((item+1)*indexEntrySize)); err != nil { return 0, 0, 0, err } endIdx.unmarshalBinary(buffer) + // Read first index (unless it's the very first item) + if item != 0 { + if _, err := t.index.ReadAt(buffer, int64(item*indexEntrySize)); err != nil { + return 0, 0, 0, err + } + startIdx.unmarshalBinary(buffer) + } else { + // Special case if we're reading the first item in the freezer. We assume that + // the first item always start from zero(regarding the deletion, we + // only support deletion by files, so that the assumption is held). + // This means we can use the first item metadata to carry information about + // the 'global' offset, for the deletion-case + return 0, endIdx.offset, endIdx.filenum, nil + } if startIdx.filenum != endIdx.filenum { // If a piece of data 'crosses' a data-file, // it's actually in one piece on the second data-file. @@ -541,20 +557,22 @@ func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) { // Retrieve looks up the data offset of an item with the given number and retrieves // the raw binary blob from the data file. func (t *freezerTable) Retrieve(item uint64) ([]byte, error) { + t.lock.RLock() // Ensure the table and the item is accessible if t.index == nil || t.head == nil { + t.lock.RUnlock() return nil, errClosed } if atomic.LoadUint64(&t.items) <= item { + t.lock.RUnlock() return nil, errOutOfBounds } // Ensure the item was not deleted from the tail either - offset := atomic.LoadUint32(&t.itemOffset) - if uint64(offset) > item { + if uint64(t.itemOffset) > item { + t.lock.RUnlock() return nil, errOutOfBounds } - t.lock.RLock() - startOffset, endOffset, filenum, err := t.getBounds(item - uint64(offset)) + startOffset, endOffset, filenum, err := t.getBounds(item - uint64(t.itemOffset)) if err != nil { t.lock.RUnlock() return nil, err |