Skip to content

Commit

Permalink
localstore reserve logic
Browse files Browse the repository at this point in the history
  • Loading branch information
zelig committed Mar 15, 2021
1 parent 435d47b commit 9b978c0
Show file tree
Hide file tree
Showing 14 changed files with 850 additions and 646 deletions.
154 changes: 23 additions & 131 deletions pkg/localstore/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package localstore

import (
"errors"
"fmt"
"time"

"github.com/ethersphere/bee/pkg/shed"
Expand All @@ -40,20 +39,20 @@ var (
gcBatchSize uint64 = 200
)

// collectGarbageWorker is a long running function that waits for
// collectGarbageTrigger channel to signal a garbage collection
// gcWorker is a long running function that waits for
// gcTrigger channel to signal a garbage collection
// run. GC run iterates on gcIndex and removes older items
// from retrieval and other indexes.
func (db *DB) collectGarbageWorker() {
defer close(db.collectGarbageWorkerDone)
func (db *DB) gcWorker() {
defer close(db.gcWorkerDone)

for {
select {
case <-db.collectGarbageTrigger:
case <-db.gcTrigger:
// run a single collect garbage run and
// if done is false, gcBatchSize is reached and
// another collect garbage run is needed
collectedCount, done, err := db.collectGarbage()
collectedCount, done, err := db.gc()
if err != nil {
db.logger.Errorf("localstore: collect garbage: %v", err)
}
Expand All @@ -62,22 +61,22 @@ func (db *DB) collectGarbageWorker() {
db.triggerGarbageCollection()
}

if testHookCollectGarbage != nil {
testHookCollectGarbage(collectedCount)
if testGCHook != nil {
testGCHook(collectedCount)
}
case <-db.close:
return
}
}
}

// collectGarbage removes chunks from retrieval and other
// gc removes chunks from retrieval and other
// indexes if maximal number of chunks in database is reached.
// This function returns the number of removed chunks. If done
// is false, another call to this function is needed to collect
// the rest of the garbage as the batch size limit is reached.
// This function is called in collectGarbageWorker.
func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
// This function is called in gcWorker.
func (db *DB) gc() (collectedCount uint64, done bool, err error) {
db.metrics.GCCounter.Inc()
defer totalTimeMetric(db.metrics.TotalTimeCollectGarbage, time.Now())
defer func() {
Expand All @@ -93,13 +92,6 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
db.batchMu.Lock()
defer db.batchMu.Unlock()

// run through the recently pinned chunks and
// remove them from the gcIndex before iterating through gcIndex
err = db.removeChunksInExcludeIndexFromGC()
if err != nil {
return 0, true, fmt.Errorf("remove chunks in exclude index: %v", err)
}

gcSize, err := db.gcSize.Get()
if err != nil {
return 0, true, err
Expand All @@ -111,31 +103,13 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
if gcSize-collectedCount <= target {
return true, nil
}

db.metrics.GCStoreTimeStamps.Set(float64(item.StoreTimestamp))
db.metrics.GCStoreAccessTimeStamps.Set(float64(item.AccessTimestamp))

// delete from retrieve, pull, gc
err = db.retrievalDataIndex.DeleteInBatch(batch, item)
if err != nil {
return true, nil
}
err = db.retrievalAccessIndex.DeleteInBatch(batch, item)
if err != nil {
return true, nil
}
err = db.pullIndex.DeleteInBatch(batch, item)
if err != nil {
return true, nil
}
err = db.gcIndex.DeleteInBatch(batch, item)
gcSizeChange, err := db.setRemove(batch, item, false)
if err != nil {
return true, nil
}
collectedCount++
collectedCount += uint64(-gcSizeChange)
if collectedCount >= gcBatchSize {
// bach size limit reached,
// another gc run is needed
// batch size limit reached, another gc run is needed
done = false
return true, nil
}
Expand All @@ -149,93 +123,23 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
db.gcSize.PutInBatch(batch, gcSize-collectedCount)
err = db.shed.WriteBatch(batch)
if err != nil {
db.metrics.GCExcludeWriteBatchError.Inc()
db.metrics.GCWriteBatchError.Inc()
return 0, false, err
}
return collectedCount, done, nil
}

// removeChunksInExcludeIndexFromGC removes from gcIndex every chunk that is
// listed in gcExcludeIndex and, for each chunk removed that way, also drops
// its gcExcludeIndex entry.
//
// All deletions and the resulting gc size adjustment are staged in a single
// leveldb batch that is written with db.shed.WriteBatch at the end, so nothing
// is persisted if the function returns early with an error.
//
// The first error encountered is returned. Any non-nil error also increments
// the GCExcludeError metric through the deferred check below.
func (db *DB) removeChunksInExcludeIndexFromGC() (err error) {
	db.metrics.GCExcludeCounter.Inc()
	defer totalTimeMetric(db.metrics.TotalTimeGCExclude, time.Now())
	defer func() {
		if err != nil {
			db.metrics.GCExcludeError.Inc()
		}
	}()

	batch := new(leveldb.Batch)
	// gcSizeChange accumulates -1 for every entry scheduled for removal
	// from gcIndex; it is applied to the stored gc size after iteration.
	var gcSizeChange int64
	err = db.gcExcludeIndex.Iterate(func(item shed.Item) (stop bool, err error) {
		// Load the access timestamp and the bin ID from the retrieval
		// indexes before looking the item up in gcIndex.
		// NOTE(review): presumably both fields are part of the gcIndex
		// key encoding - confirm against the index definition.
		accessItem, err := db.retrievalAccessIndex.Get(item)
		if err != nil {
			return false, err
		}
		item.AccessTimestamp = accessItem.AccessTimestamp

		dataItem, err := db.retrievalDataIndex.Get(item)
		if err != nil {
			return false, err
		}
		item.BinID = dataItem.BinID

		// Only items that are currently present in gcIndex need work.
		ok, err := db.gcIndex.Has(item)
		if err != nil {
			// Propagate instead of silently dropping the error, in
			// line with the Get calls above.
			return false, err
		}
		if !ok {
			return false, nil
		}

		if err = db.gcIndex.DeleteInBatch(batch, item); err != nil {
			return false, err
		}
		// Has has just confirmed the entry exists, so the removal always
		// shrinks the gc size by one; no second lookup is needed.
		gcSizeChange--

		if err = db.gcExcludeIndex.DeleteInBatch(batch, item); err != nil {
			return false, err
		}
		return false, nil
	}, nil)
	if err != nil {
		return err
	}

	// update the gc size based on the number of entries deleted in gcIndex
	err = db.incGCSizeInBatch(batch, gcSizeChange)
	if err != nil {
		return err
	}

	// NOTE(review): GCExcludeCounter is also incremented at the top of this
	// function, so a run that reaches this point is counted twice. Kept
	// as-is to avoid changing reported metrics - confirm whether intended.
	db.metrics.GCExcludeCounter.Inc()
	err = db.shed.WriteBatch(batch)
	if err != nil {
		db.metrics.GCExcludeWriteBatchError.Inc()
		return err
	}

	return nil
}

// gcTarget returns the absolute garbage collection target value,
// computed by scaling db.capacity with gcTargetRatio.
func (db *DB) gcTarget() (target uint64) {
	scaled := float64(db.capacity) * gcTargetRatio
	return uint64(scaled)
}

// triggerGarbageCollection signals collectGarbageWorker
// to call collectGarbage.
// triggerGarbageCollection signals gcWorker
// to call gc.
func (db *DB) triggerGarbageCollection() {
select {
case db.collectGarbageTrigger <- struct{}{}:
case db.gcTrigger <- struct{}{}:
case <-db.close:
default:
}
Expand All @@ -253,29 +157,17 @@ func (db *DB) incGCSizeInBatch(batch *leveldb.Batch, change int64) (err error) {
return err
}

var newSize uint64
if change > 0 {
newSize = gcSize + uint64(change)
} else {
// 'change' is an int64 and is negative
// a conversion is needed with correct sign
c := uint64(-change)
if c > gcSize {
// protect uint64 underflow
return nil
}
newSize = gcSize - c
}
db.gcSize.PutInBatch(batch, newSize)
gcSize = uint64(int64(gcSize) + change)
db.gcSize.PutInBatch(batch, gcSize)

// trigger garbage collection if we reached the capacity
if newSize >= db.capacity {
if gcSize >= db.capacity {
db.triggerGarbageCollection()
}
return nil
}

// testHookCollectGarbage is a hook that can provide
// testGCHook is a hook that can provide
// information when a garbage collection run is done
// and how many items it removed.
var testHookCollectGarbage func(collectedCount uint64)
var testGCHook func(collectedCount uint64)
Loading

0 comments on commit 9b978c0

Please sign in to comment.