From 9b978c0e0cd10eb23598e6fc938072dae5dc678a Mon Sep 17 00:00:00 2001 From: zelig Date: Mon, 15 Mar 2021 22:07:52 +0100 Subject: [PATCH] localstore reserve logic --- pkg/localstore/gc.go | 154 ++----------- pkg/localstore/gc_test.go | 353 +++++++++++++----------------- pkg/localstore/localstore.go | 84 +++---- pkg/localstore/localstore_test.go | 7 +- pkg/localstore/mode_get_test.go | 2 + pkg/localstore/mode_put.go | 56 ++--- pkg/localstore/mode_put_test.go | 30 +-- pkg/localstore/mode_set.go | 218 +++++++++--------- pkg/localstore/mode_set_test.go | 66 ------ pkg/localstore/pin_test.go | 183 +++++++++++++--- pkg/localstore/postage.go | 202 +++++++++++++++++ pkg/localstore/postage_test.go | 127 +++++++++++ pkg/node/node.go | 4 +- pkg/storage/testing/chunk.go | 10 + 14 files changed, 850 insertions(+), 646 deletions(-) create mode 100644 pkg/localstore/postage.go create mode 100644 pkg/localstore/postage_test.go diff --git a/pkg/localstore/gc.go b/pkg/localstore/gc.go index 336a21e233b..8f8813f3525 100644 --- a/pkg/localstore/gc.go +++ b/pkg/localstore/gc.go @@ -18,7 +18,6 @@ package localstore import ( "errors" - "fmt" "time" "github.com/ethersphere/bee/pkg/shed" @@ -40,20 +39,20 @@ var ( gcBatchSize uint64 = 200 ) -// collectGarbageWorker is a long running function that waits for -// collectGarbageTrigger channel to signal a garbage collection +// gcWorker is a long running function that waits for +// gcTrigger channel to signal a garbage collection // run. GC run iterates on gcIndex and removes older items // form retrieval and other indexes. -func (db *DB) collectGarbageWorker() { - defer close(db.collectGarbageWorkerDone) +func (db *DB) gcWorker() { + defer close(db.gcWorkerDone) for { select { - case <-db.collectGarbageTrigger: + case <-db.gcTrigger: // run a single collect garbage run and // if done is false, gcBatchSize is reached and // another collect garbage run is needed - collectedCount, done, err := db.collectGarbage() + collectedCount, done, err := db.gc() if err != nil { db.logger.Errorf("localstore: collect garbage: %v", err) } @@ -62,8 +61,8 @@ func (db *DB) collectGarbageWorker() { db.triggerGarbageCollection() } - if testHookCollectGarbage != nil { - testHookCollectGarbage(collectedCount) + if testGCHook != nil { + testGCHook(collectedCount) } case <-db.close: return @@ -71,13 +70,13 @@ func (db *DB) collectGarbageWorker() { } } -// collectGarbage removes chunks from retrieval and other +// gc removes chunks from retrieval and other // indexes if maximal number of chunks in database is reached. // This function returns the number of removed chunks. If done // is false, another call to this function is needed to collect // the rest of the garbage as the batch size limit is reached. -// This function is called in collectGarbageWorker. -func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { +// This function is called in gcWorker. 
+func (db *DB) gc() (collectedCount uint64, done bool, err error) { db.metrics.GCCounter.Inc() defer totalTimeMetric(db.metrics.TotalTimeCollectGarbage, time.Now()) defer func() { @@ -93,13 +92,6 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { db.batchMu.Lock() defer db.batchMu.Unlock() - // run through the recently pinned chunks and - // remove them from the gcIndex before iterating through gcIndex - err = db.removeChunksInExcludeIndexFromGC() - if err != nil { - return 0, true, fmt.Errorf("remove chunks in exclude index: %v", err) - } - gcSize, err := db.gcSize.Get() if err != nil { return 0, true, err @@ -111,31 +103,13 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { if gcSize-collectedCount <= target { return true, nil } - - db.metrics.GCStoreTimeStamps.Set(float64(item.StoreTimestamp)) - db.metrics.GCStoreAccessTimeStamps.Set(float64(item.AccessTimestamp)) - - // delete from retrieve, pull, gc - err = db.retrievalDataIndex.DeleteInBatch(batch, item) - if err != nil { - return true, nil - } - err = db.retrievalAccessIndex.DeleteInBatch(batch, item) - if err != nil { - return true, nil - } - err = db.pullIndex.DeleteInBatch(batch, item) - if err != nil { - return true, nil - } - err = db.gcIndex.DeleteInBatch(batch, item) + gcSizeChange, err := db.setRemove(batch, item, false) if err != nil { return true, nil } - collectedCount++ + collectedCount += uint64(-gcSizeChange) if collectedCount >= gcBatchSize { - // bach size limit reached, - // another gc run is needed + // batch size limit reached, another gc run is needed done = false return true, nil } @@ -149,93 +123,23 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { db.gcSize.PutInBatch(batch, gcSize-collectedCount) err = db.shed.WriteBatch(batch) if err != nil { - db.metrics.GCExcludeWriteBatchError.Inc() + db.metrics.GCWriteBatchError.Inc() return 0, false, err } return collectedCount, done, nil } -// removeChunksInExcludeIndexFromGC removed any recently chunks in the exclude Index, from the gcIndex. 
-func (db *DB) removeChunksInExcludeIndexFromGC() (err error) { - db.metrics.GCExcludeCounter.Inc() - defer totalTimeMetric(db.metrics.TotalTimeGCExclude, time.Now()) - defer func() { - if err != nil { - db.metrics.GCExcludeError.Inc() - } - }() - - batch := new(leveldb.Batch) - excludedCount := 0 - var gcSizeChange int64 - err = db.gcExcludeIndex.Iterate(func(item shed.Item) (stop bool, err error) { - // Get access timestamp - retrievalAccessIndexItem, err := db.retrievalAccessIndex.Get(item) - if err != nil { - return false, err - } - item.AccessTimestamp = retrievalAccessIndexItem.AccessTimestamp - - // Get the binId - retrievalDataIndexItem, err := db.retrievalDataIndex.Get(item) - if err != nil { - return false, err - } - item.BinID = retrievalDataIndexItem.BinID - - // Check if this item is in gcIndex and remove it - ok, err := db.gcIndex.Has(item) - if err != nil { - return false, nil - } - if ok { - err = db.gcIndex.DeleteInBatch(batch, item) - if err != nil { - return false, nil - } - if _, err := db.gcIndex.Get(item); err == nil { - gcSizeChange-- - } - excludedCount++ - err = db.gcExcludeIndex.DeleteInBatch(batch, item) - if err != nil { - return false, nil - } - } - - return false, nil - }, nil) - if err != nil { - return err - } - - // update the gc size based on the no of entries deleted in gcIndex - err = db.incGCSizeInBatch(batch, gcSizeChange) - if err != nil { - return err - } - - db.metrics.GCExcludeCounter.Inc() - err = db.shed.WriteBatch(batch) - if err != nil { - db.metrics.GCExcludeWriteBatchError.Inc() - return err - } - - return nil -} - // gcTrigger retruns the absolute value for garbage collection // target value, calculated from db.capacity and gcTargetRatio. func (db *DB) gcTarget() (target uint64) { return uint64(float64(db.capacity) * gcTargetRatio) } -// triggerGarbageCollection signals collectGarbageWorker -// to call collectGarbage. +// triggerGarbageCollection signals gcWorker +// to call gc. func (db *DB) triggerGarbageCollection() { select { - case db.collectGarbageTrigger <- struct{}{}: + case db.gcTrigger <- struct{}{}: case <-db.close: default: } @@ -253,29 +157,17 @@ func (db *DB) incGCSizeInBatch(batch *leveldb.Batch, change int64) (err error) { return err } - var newSize uint64 - if change > 0 { - newSize = gcSize + uint64(change) - } else { - // 'change' is an int64 and is negative - // a conversion is needed with correct sign - c := uint64(-change) - if c > gcSize { - // protect uint64 undeflow - return nil - } - newSize = gcSize - c - } - db.gcSize.PutInBatch(batch, newSize) + gcSize = uint64(int64(gcSize) + change) + db.gcSize.PutInBatch(batch, gcSize) // trigger garbage collection if we reached the capacity - if newSize >= db.capacity { + if gcSize >= db.capacity { db.triggerGarbageCollection() } return nil } -// testHookCollectGarbage is a hook that can provide +// testGCHook is a hook that can provide // information when a garbage collection run is done // and how many items it removed. -var testHookCollectGarbage func(collectedCount uint64) +var testGCHook func(collectedCount uint64) diff --git a/pkg/localstore/gc_test.go b/pkg/localstore/gc_test.go index 44d5988ed1a..2a10f230b35 100644 --- a/pkg/localstore/gc_test.go +++ b/pkg/localstore/gc_test.go @@ -33,35 +33,26 @@ import ( "github.com/syndtr/goleveldb/leveldb" ) -// TestDB_collectGarbageWorker tests garbage collection runs -// by uploading and syncing a number of chunks. 
-func TestDB_collectGarbageWorker(t *testing.T) { - testDBCollectGarbageWorker(t) -} - -// TestDB_collectGarbageWorker_multipleBatches tests garbage -// collection runs by uploading and syncing a number of -// chunks by having multiple smaller batches. -func TestDB_collectGarbageWorker_multipleBatches(t *testing.T) { - // lower the maximal number of chunks in a single - // gc batch to ensure multiple batches. +// TestGC tests garbage collection runs +// after uploading and syncing a number of chunks. +func TestGC(t *testing.T) { + t.Run("single round", testGC) defer func(s uint64) { gcBatchSize = s }(gcBatchSize) gcBatchSize = 2 - - testDBCollectGarbageWorker(t) + t.Run("multiple rounds", testGC) } -// testDBCollectGarbageWorker is a helper test function to test -// garbage collection runs by uploading and syncing a number of chunks. -func testDBCollectGarbageWorker(t *testing.T) { - +// testGC is a helper test function to test +// garbage collection runs after uploading and syncing a number of chunks. +func testGC(t *testing.T) { + ctx := context.Background() chunkCount := 150 var closed chan struct{} - testHookCollectGarbageChan := make(chan uint64) - t.Cleanup(setTestHookCollectGarbage(func(collectedCount uint64) { + testGCHookChan := make(chan uint64) + t.Cleanup(setTestGCHook(func(collectedCount uint64) { select { - case testHookCollectGarbageChan <- collectedCount: + case testGCHookChan <- collectedCount: case <-closed: } })) @@ -70,60 +61,47 @@ func testDBCollectGarbageWorker(t *testing.T) { }) closed = db.close - addrs := make([]swarm.Address, 0) - - // upload random chunks - for i := 0; i < chunkCount; i++ { - ch := generateTestRandomChunk() - - _, err := db.Put(context.Background(), storage.ModePutUpload, ch) - if err != nil { - t.Fatal(err) - } + chunks := generateTestRandomChunks(chunkCount) + _, err := db.Put(ctx, storage.ModePutSync, chunks...) 
+ if err != nil { + t.Fatal(err) + } - err = db.Set(context.Background(), storage.ModeSetSync, ch.Address()) + target := db.gcTarget() + sizef := func() uint64 { + size, err := db.gcSize.Get() if err != nil { t.Fatal(err) } - - addrs = append(addrs, ch.Address()) - + return size } - - gcTarget := db.gcTarget() - - for { + size := sizef() + for target < size { + t.Logf("GC index: size=%d, target=%d", size, target) select { - case <-testHookCollectGarbageChan: + case <-testGCHookChan: case <-time.After(10 * time.Second): - t.Error("collect garbage timeout") - } - gcSize, err := db.gcSize.Get() - if err != nil { - t.Fatal(err) - } - if gcSize == gcTarget { - break + t.Fatalf("collect garbage timeout") } + size = sizef() } + t.Run("pull index count", newItemsCountTest(db.pullIndex, int(target))) - t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget))) - - t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget))) + t.Run("gc index count", newItemsCountTest(db.gcIndex, int(target))) t.Run("gc size", newIndexGCSizeTest(db)) // the first synced chunk should be removed t.Run("get the first synced chunk", func(t *testing.T) { - _, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[0]) + _, err := db.Get(ctx, storage.ModeGetRequest, chunks[0].Address()) if !errors.Is(err, storage.ErrNotFound) { t.Errorf("got error %v, want %v", err, storage.ErrNotFound) } }) t.Run("only first inserted chunks should be removed", func(t *testing.T) { - for i := 0; i < (chunkCount - int(gcTarget)); i++ { - _, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[i]) + for i := 0; i < (chunkCount - int(target)); i++ { + _, err := db.Get(ctx, storage.ModeGetRequest, chunks[i].Address()) if !errors.Is(err, storage.ErrNotFound) { t.Errorf("got error %v, want %v", err, storage.ErrNotFound) } @@ -132,7 +110,7 @@ func testDBCollectGarbageWorker(t *testing.T) { // last synced chunk should not be removed t.Run("get most recent synced chunk", func(t *testing.T) { - _, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[len(addrs)-1]) + _, err := db.Get(ctx, storage.ModeGetRequest, chunks[len(chunks)-1].Address()) if err != nil { t.Fatal(err) } @@ -142,16 +120,16 @@ func testDBCollectGarbageWorker(t *testing.T) { // Pin a file, upload chunks to go past the gc limit to trigger GC, // check if the pinned files are still around and removed from gcIndex func TestPinGC(t *testing.T) { - + ctx := context.Background() chunkCount := 150 pinChunksCount := 50 dbCapacity := uint64(100) var closed chan struct{} - testHookCollectGarbageChan := make(chan uint64) - t.Cleanup(setTestHookCollectGarbage(func(collectedCount uint64) { + testGCHookChan := make(chan uint64) + t.Cleanup(setTestGCHook(func(collectedCount uint64) { select { - case testHookCollectGarbageChan <- collectedCount: + case testGCHookChan <- collectedCount: case <-closed: } })) @@ -165,35 +143,47 @@ func TestPinGC(t *testing.T) { pinAddrs := make([]swarm.Address, 0) // upload random chunks - for i := 0; i < chunkCount; i++ { + for cnt := 0; cnt < chunkCount; cnt++ { ch := generateTestRandomChunk() - - _, err := db.Put(context.Background(), storage.ModePutUpload, ch) + addr := ch.Address() + _, err := db.Put(ctx, storage.ModePutUpload, ch) if err != nil { t.Fatal(err) } - err = db.Set(context.Background(), storage.ModeSetSync, ch.Address()) + err = db.Set(ctx, storage.ModeSetSync, addr) if err != nil { t.Fatal(err) } - addrs = append(addrs, ch.Address()) + addrs = append(addrs, addr) // Pin the chunks at the 
beginning to make sure they are not removed by GC - if i < pinChunksCount { - err = db.Set(context.Background(), storage.ModeSetPin, ch.Address()) + if cnt < pinChunksCount { + err = db.Set(ctx, storage.ModeSetPin, addr) if err != nil { t.Fatal(err) } - pinAddrs = append(pinAddrs, ch.Address()) + item := addressToItem(addr) + i, err := db.retrievalAccessIndex.Get(item) + if err != nil { + t.Fatal(err) + } + item.AccessTimestamp = i.AccessTimestamp + if _, err := db.gcIndex.Get(item); !errors.Is(err, leveldb.ErrNotFound) { + t.Fatal("pinned chunk present in gcIndex") + } + if _, err := db.Get(ctx, storage.ModeGetSync, addr); err != nil { + t.Fatal(err) + } + pinAddrs = append(pinAddrs, addr) } } gcTarget := db.gcTarget() for { select { - case <-testHookCollectGarbageChan: + case <-testGCHookChan: case <-time.After(10 * time.Second): t.Error("collect garbage timeout") } @@ -205,46 +195,34 @@ func TestPinGC(t *testing.T) { break } } - - t.Run("pin Index count", newItemsCountTest(db.pinIndex, pinChunksCount)) - - t.Run("gc exclude index count", newItemsCountTest(db.gcExcludeIndex, 0)) - - t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)+pinChunksCount)) - - t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget))) - - t.Run("gc size", newIndexGCSizeTest(db)) + afterCnt := int(gcTarget) + pinChunksCount + runCountsTest(t, "after GC", db, afterCnt, afterCnt, 0, afterCnt, pinChunksCount, int(gcTarget)) t.Run("pinned chunk not in gc Index", func(t *testing.T) { - err := db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) { - for _, pinHash := range pinAddrs { - if bytes.Equal(pinHash.Bytes(), item.Address) { - t.Fatal("pin chunk present in gcIndex") - } + for _, addr := range pinAddrs { + item := addressToItem(addr) + i, err := db.retrievalAccessIndex.Get(item) + if err != nil { + t.Fatal(err) + } + item.AccessTimestamp = i.AccessTimestamp + if _, err := db.gcIndex.Get(item); !errors.Is(err, leveldb.ErrNotFound) { + t.Fatal("pinned chunk present in gcIndex") } - return false, nil - }, nil) - if err != nil { - t.Fatal("could not iterate gcIndex") } }) - t.Run("pinned chunks exists", func(t *testing.T) { - for _, hash := range pinAddrs { - _, err := db.Get(context.Background(), storage.ModeGetRequest, hash) - if err != nil { - t.Fatal(err) - } + t.Run("pinned chunks exist", func(t *testing.T) { + if _, err := db.GetMulti(ctx, storage.ModeGetRequest, pinAddrs...); err != nil { + t.Fatal(err) } }) - t.Run("first chunks after pinned chunks should be removed", func(t *testing.T) { - for i := pinChunksCount; i < (int(dbCapacity) - int(gcTarget)); i++ { - _, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[i]) - if !errors.Is(err, leveldb.ErrNotFound) { - t.Fatal(err) - } + t.Run("first chunks not pinned removed", func(t *testing.T) { + recentIdx := chunkCount - int(dbCapacity) + int(gcTarget) + _, err := db.GetMulti(ctx, storage.ModeGetRequest, addrs[recentIdx:]...) 
+ if err != nil { + t.Fatal(err) } }) } @@ -252,6 +230,7 @@ func TestPinGC(t *testing.T) { // Upload chunks, pin those chunks, add to GC after it is pinned // check if the pinned files are still around func TestGCAfterPin(t *testing.T) { + ctx := context.Background() chunkCount := 50 @@ -265,19 +244,19 @@ func TestGCAfterPin(t *testing.T) { for i := 0; i < chunkCount; i++ { ch := generateTestRandomChunk() - _, err := db.Put(context.Background(), storage.ModePutUpload, ch) + _, err := db.Put(ctx, storage.ModePutUpload, ch) if err != nil { t.Fatal(err) } // Pin before adding to GC in ModeSetSync - err = db.Set(context.Background(), storage.ModeSetPin, ch.Address()) + err = db.Set(ctx, storage.ModeSetPin, ch.Address()) if err != nil { t.Fatal(err) } pinAddrs = append(pinAddrs, ch.Address()) - err = db.Set(context.Background(), storage.ModeSetSync, ch.Address()) + err = db.Set(ctx, storage.ModeSetSync, ch.Address()) if err != nil { t.Fatal(err) } @@ -285,48 +264,38 @@ func TestGCAfterPin(t *testing.T) { t.Run("pin Index count", newItemsCountTest(db.pinIndex, chunkCount)) - t.Run("gc exclude index count", newItemsCountTest(db.gcExcludeIndex, chunkCount)) - - t.Run("gc index count", newItemsCountTest(db.gcIndex, int(0))) + t.Run("gc index count", newItemsCountTest(db.gcIndex, 0)) - for _, hash := range pinAddrs { - _, err := db.Get(context.Background(), storage.ModeGetRequest, hash) + for _, addr := range pinAddrs { + _, err := db.Get(ctx, storage.ModeGetRequest, addr) if err != nil { t.Fatal(err) } } } -// TestDB_collectGarbageWorker_withRequests is a helper test function -// to test garbage collection runs by uploading, syncing and -// requesting a number of chunks. -func TestDB_collectGarbageWorker_withRequests(t *testing.T) { +// TestGCRequests +func TestGCRequests(t *testing.T) { + ctx := context.Background() + capacity := 100 db := newTestDB(t, &Options{ - Capacity: 100, + Capacity: uint64(capacity), }) - testHookCollectGarbageChan := make(chan uint64) - defer setTestHookCollectGarbage(func(collectedCount uint64) { - testHookCollectGarbageChan <- collectedCount + testGCHookChan := make(chan uint64) + defer setTestGCHook(func(collectedCount uint64) { + testGCHookChan <- collectedCount })() - addrs := make([]swarm.Address, 0) - // upload random chunks just up to the capacity - for i := 0; i < int(db.capacity)-1; i++ { - ch := generateTestRandomChunk() - - _, err := db.Put(context.Background(), storage.ModePutUpload, ch) - if err != nil { - t.Fatal(err) - } - - err = db.Set(context.Background(), storage.ModeSetSync, ch.Address()) - if err != nil { - t.Fatal(err) - } - - addrs = append(addrs, ch.Address()) + chunks := generateTestRandomChunks(capacity) + _, err := db.Put(ctx, storage.ModePutSync, chunks[1:]...) + if err != nil { + t.Fatal(err) + } + addrs := make([]swarm.Address, capacity) + for i, ch := range chunks { + addrs[i] = ch.Address() } // set update gc test hook to signal when @@ -340,7 +309,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { // request the latest synced chunk // to prioritize it in the gc index // not to be collected - _, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[0]) + _, err = db.GetMulti(ctx, storage.ModeGetRequest, addrs[1:10]...) 
if err != nil { t.Fatal(err) } @@ -358,25 +327,17 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { // upload and sync another chunk to trigger // garbage collection - ch := generateTestRandomChunk() - _, err = db.Put(context.Background(), storage.ModePutUpload, ch) + _, err = db.Put(ctx, storage.ModePutSync, chunks[0]) if err != nil { t.Fatal(err) } - err = db.Set(context.Background(), storage.ModeSetSync, ch.Address()) - if err != nil { - t.Fatal(err) - } - addrs = append(addrs, ch.Address()) // wait for garbage collection - gcTarget := db.gcTarget() - var totalCollectedCount uint64 for { select { - case c := <-testHookCollectGarbageChan: + case c := <-testGCHookChan: totalCollectedCount += c case <-time.After(10 * time.Second): t.Error("collect garbage timeout") @@ -394,32 +355,25 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { if totalCollectedCount != wantTotalCollectedCount { t.Errorf("total collected chunks %v, want %v", totalCollectedCount, wantTotalCollectedCount) } - - t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget))) - - t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget))) - - t.Run("gc size", newIndexGCSizeTest(db)) - - // requested chunk should not be removed - t.Run("get requested chunk", func(t *testing.T) { - _, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[0]) + cnt := int(gcTarget) + runCountsTest(t, "after GC", db, cnt, cnt, 0, cnt, 0, cnt) + t.Run("requested chunks not removed", func(t *testing.T) { + _, err := db.GetMulti(ctx, storage.ModeGetRequest, addrs[0:10]...) if err != nil { t.Fatal(err) } }) - // the second synced chunk should be removed - t.Run("get gc-ed chunk", func(t *testing.T) { - _, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[1]) + t.Run("longest requested chunks get gc-ed", func(t *testing.T) { + _, err := db.GetMulti(ctx, storage.ModeGetRequest, addrs[10:20]...) if !errors.Is(err, storage.ErrNotFound) { t.Errorf("got error %v, want %v", err, storage.ErrNotFound) } }) // last synced chunk should not be removed - t.Run("get most recent synced chunk", func(t *testing.T) { - _, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[len(addrs)-1]) + t.Run("recent chunks not removed", func(t *testing.T) { + _, err := db.GetMulti(ctx, storage.ModeGetRequest, addrs[20:]...) if err != nil { t.Fatal(err) } @@ -429,6 +383,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { // TestDB_gcSize checks if gcSize has a correct value after // database is initialized with existing data. func TestDB_gcSize(t *testing.T) { + ctx := context.Background() dir, err := ioutil.TempDir("", "localstore-stored-gc-size") if err != nil { t.Fatal(err) @@ -449,12 +404,12 @@ func TestDB_gcSize(t *testing.T) { for i := 0; i < count; i++ { ch := generateTestRandomChunk() - _, err := db.Put(context.Background(), storage.ModePutUpload, ch) + _, err := db.Put(ctx, storage.ModePutUpload, ch) if err != nil { t.Fatal(err) } - err = db.Set(context.Background(), storage.ModeSetSync, ch.Address()) + err = db.Set(ctx, storage.ModeSetSync, ch.Address()) if err != nil { t.Fatal(err) } @@ -472,22 +427,22 @@ func TestDB_gcSize(t *testing.T) { t.Run("gc index size", newIndexGCSizeTest(db)) } -// setTestHookCollectGarbage sets testHookCollectGarbage and +// setTestGCHook sets testGCHook and // returns a function that will reset it to the // value before the change. 
-func setTestHookCollectGarbage(h func(collectedCount uint64)) (reset func()) { - current := testHookCollectGarbage - reset = func() { testHookCollectGarbage = current } - testHookCollectGarbage = h +func setTestGCHook(h func(collectedCount uint64)) (reset func()) { + current := testGCHook + reset = func() { testGCHook = current } + testGCHook = h return reset } -// TestSetTestHookCollectGarbage tests if setTestHookCollectGarbage changes -// testHookCollectGarbage function correctly and if its reset function +// TestSetTestGCHook tests if setTestGCHook changes +// testGCHook function correctly and if its reset function // resets the original function. -func TestSetTestHookCollectGarbage(t *testing.T) { +func TestSetTestGCHook(t *testing.T) { // Set the current function after the test finishes. - defer func(h func(collectedCount uint64)) { testHookCollectGarbage = h }(testHookCollectGarbage) + defer func(h func(collectedCount uint64)) { testGCHook = h }(testGCHook) // expected value for the unchanged function original := 1 @@ -498,12 +453,12 @@ func TestSetTestHookCollectGarbage(t *testing.T) { var got int // define the original (unchanged) functions - testHookCollectGarbage = func(_ uint64) { + testGCHook = func(_ uint64) { got = original } // set got variable - testHookCollectGarbage(0) + testGCHook(0) // test if got variable is set correctly if got != original { @@ -511,12 +466,12 @@ func TestSetTestHookCollectGarbage(t *testing.T) { } // set the new function - reset := setTestHookCollectGarbage(func(_ uint64) { + reset := setTestGCHook(func(_ uint64) { got = changed }) // set got variable - testHookCollectGarbage(0) + testGCHook(0) // test if got variable is set correctly to changed value if got != changed { @@ -527,7 +482,7 @@ func TestSetTestHookCollectGarbage(t *testing.T) { reset() // set got variable - testHookCollectGarbage(0) + testGCHook(0) // test if got variable is set correctly to original value if got != original { @@ -536,6 +491,7 @@ func TestSetTestHookCollectGarbage(t *testing.T) { } func TestPinAfterMultiGC(t *testing.T) { + ctx := context.Background() db := newTestDB(t, &Options{ Capacity: 10, }) @@ -545,11 +501,11 @@ func TestPinAfterMultiGC(t *testing.T) { // upload random chunks above db capacity to see if chunks are still pinned for i := 0; i < 20; i++ { ch := generateTestRandomChunk() - _, err := db.Put(context.Background(), storage.ModePutUpload, ch) + _, err := db.Put(ctx, storage.ModePutUpload, ch) if err != nil { t.Fatal(err) } - err = db.Set(context.Background(), storage.ModeSetSync, ch.Address()) + err = db.Set(ctx, storage.ModeSetSync, ch.Address()) if err != nil { t.Fatal(err) } @@ -561,22 +517,22 @@ func TestPinAfterMultiGC(t *testing.T) { } for i := 0; i < 20; i++ { ch := generateTestRandomChunk() - _, err := db.Put(context.Background(), storage.ModePutUpload, ch) + _, err := db.Put(ctx, storage.ModePutUpload, ch) if err != nil { t.Fatal(err) } - err = db.Set(context.Background(), storage.ModeSetSync, ch.Address()) + err = db.Set(ctx, storage.ModeSetSync, ch.Address()) if err != nil { t.Fatal(err) } } for i := 0; i < 20; i++ { ch := generateTestRandomChunk() - _, err := db.Put(context.Background(), storage.ModePutUpload, ch) + _, err := db.Put(ctx, storage.ModePutUpload, ch) if err != nil { t.Fatal(err) } - err = db.Set(context.Background(), storage.ModeSetSync, ch.Address()) + err = db.Set(ctx, storage.ModeSetSync, ch.Address()) if err != nil { t.Fatal(err) } @@ -589,7 +545,7 @@ func TestPinAfterMultiGC(t *testing.T) { outItem := shed.Item{ Address: 
addr.Bytes(), } - gotChunk, err := db.Get(context.Background(), storage.ModeGetRequest, swarm.NewAddress(outItem.Address)) + gotChunk, err := db.Get(ctx, storage.ModeGetRequest, swarm.NewAddress(outItem.Address)) if err != nil { t.Fatal(err) } @@ -601,18 +557,19 @@ func TestPinAfterMultiGC(t *testing.T) { } func generateAndPinAChunk(t *testing.T, db *DB) swarm.Chunk { + ctx := context.Background() // Create a chunk and pin it pinnedChunk := generateTestRandomChunk() - _, err := db.Put(context.Background(), storage.ModePutUpload, pinnedChunk) + _, err := db.Put(ctx, storage.ModePutUpload, pinnedChunk) if err != nil { t.Fatal(err) } - err = db.Set(context.Background(), storage.ModeSetPin, pinnedChunk.Address()) + err = db.Set(ctx, storage.ModeSetPin, pinnedChunk.Address()) if err != nil { t.Fatal(err) } - err = db.Set(context.Background(), storage.ModeSetSync, pinnedChunk.Address()) + err = db.Set(ctx, storage.ModeSetSync, pinnedChunk.Address()) if err != nil { t.Fatal(err) } @@ -620,11 +577,12 @@ func generateAndPinAChunk(t *testing.T, db *DB) swarm.Chunk { } func TestPinSyncAndAccessPutSetChunkMultipleTimes(t *testing.T) { + ctx := context.Background() var closed chan struct{} - testHookCollectGarbageChan := make(chan uint64) - t.Cleanup(setTestHookCollectGarbage(func(collectedCount uint64) { + testGCHookChan := make(chan uint64) + t.Cleanup(setTestGCHook(func(collectedCount uint64) { select { - case testHookCollectGarbageChan <- collectedCount: + case testGCHookChan <- collectedCount: case <-closed: } })) @@ -636,13 +594,13 @@ func TestPinSyncAndAccessPutSetChunkMultipleTimes(t *testing.T) { pinnedChunks := addRandomChunks(t, 5, db, true) rand1Chunks := addRandomChunks(t, 15, db, false) for _, ch := range pinnedChunks { - _, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address()) + _, err := db.Get(ctx, storage.ModeGetRequest, ch.Address()) if err != nil { t.Fatal(err) } } for _, ch := range rand1Chunks { - _, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address()) + _, err := db.Get(ctx, storage.ModeGetRequest, ch.Address()) if err != nil { // ignore the chunks that are GCd continue @@ -651,7 +609,7 @@ func TestPinSyncAndAccessPutSetChunkMultipleTimes(t *testing.T) { rand2Chunks := addRandomChunks(t, 20, db, false) for _, ch := range rand2Chunks { - _, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address()) + _, err := db.Get(ctx, storage.ModeGetRequest, ch.Address()) if err != nil { // ignore the chunks that are GCd continue @@ -661,7 +619,7 @@ func TestPinSyncAndAccessPutSetChunkMultipleTimes(t *testing.T) { rand3Chunks := addRandomChunks(t, 20, db, false) for _, ch := range rand3Chunks { - _, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address()) + _, err := db.Get(ctx, storage.ModeGetRequest, ch.Address()) if err != nil { // ignore the chunks that are GCd continue @@ -670,7 +628,7 @@ func TestPinSyncAndAccessPutSetChunkMultipleTimes(t *testing.T) { // check if the pinned chunk is present after GC for _, ch := range pinnedChunks { - gotChunk, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address()) + gotChunk, err := db.Get(ctx, storage.ModeGetRequest, ch.Address()) if err != nil { t.Fatal("Pinned chunk missing ", err) } @@ -686,31 +644,32 @@ func TestPinSyncAndAccessPutSetChunkMultipleTimes(t *testing.T) { } func addRandomChunks(t *testing.T, count int, db *DB, pin bool) []swarm.Chunk { + ctx := context.Background() var chunks []swarm.Chunk for i := 0; i < count; i++ { ch := 
generateTestRandomChunk() - _, err := db.Put(context.Background(), storage.ModePutUpload, ch) + _, err := db.Put(ctx, storage.ModePutUpload, ch) if err != nil { t.Fatal(err) } if pin { - err = db.Set(context.Background(), storage.ModeSetPin, ch.Address()) + err = db.Set(ctx, storage.ModeSetPin, ch.Address()) if err != nil { t.Fatal(err) } - err = db.Set(context.Background(), storage.ModeSetSync, ch.Address()) + err = db.Set(ctx, storage.ModeSetSync, ch.Address()) if err != nil { t.Fatal(err) } - _, err = db.Get(context.Background(), storage.ModeGetRequest, ch.Address()) + _, err = db.Get(ctx, storage.ModeGetRequest, ch.Address()) if err != nil { t.Fatal(err) } } else { // Non pinned chunks could be GC'd by the time they reach here. // so it is okay to ignore the error - _ = db.Set(context.Background(), storage.ModeSetSync, ch.Address()) - _, _ = db.Get(context.Background(), storage.ModeGetRequest, ch.Address()) + _ = db.Set(ctx, storage.ModeSetSync, ch.Address()) + _, _ = db.Get(ctx, storage.ModeGetRequest, ch.Address()) } chunks = append(chunks, ch) } diff --git a/pkg/localstore/localstore.go b/pkg/localstore/localstore.go index 095d81321f6..d2909da3d06 100644 --- a/pkg/localstore/localstore.go +++ b/pkg/localstore/localstore.go @@ -26,6 +26,7 @@ import ( "github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/postage" + mockstamp "github.com/ethersphere/bee/pkg/postage/testing" "github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/swarm" @@ -81,9 +82,6 @@ type DB struct { // garbage collection index gcIndex shed.Index - // garbage collection exclude index for pinned contents - gcExcludeIndex shed.Index - // pin files Index pinIndex shed.Index @@ -95,7 +93,7 @@ type DB struct { capacity uint64 // triggers garbage collection event loop - collectGarbageTrigger chan struct{} + gcTrigger chan struct{} // a buffered channel acting as a semaphore // to limit the maximal number of goroutines @@ -105,6 +103,9 @@ type DB struct { // are done before closing the database updateGCWG sync.WaitGroup + // postage batch + postage *postageBatches + // baseKey is the overlay address baseKey []byte @@ -117,7 +118,7 @@ type DB struct { // protect Close method from exiting before // garbage collection and gc size write workers // are done - collectGarbageWorkerDone chan struct{} + gcWorkerDone chan struct{} // wait for all subscriptions to finish before closing // underlaying BadgerDB to prevent possible panics from @@ -154,15 +155,15 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB capacity: o.Capacity, baseKey: baseKey, tags: o.Tags, - // channel collectGarbageTrigger + // channel gcTrigger // needs to be buffered with the size of 1 // to signal another event if it // is triggered during already running function - collectGarbageTrigger: make(chan struct{}, 1), - close: make(chan struct{}), - collectGarbageWorkerDone: make(chan struct{}), - metrics: newMetrics(), - logger: logger, + gcTrigger: make(chan struct{}, 1), + gcWorkerDone: make(chan struct{}), + close: make(chan struct{}), + metrics: newMetrics(), + logger: logger, } if db.capacity == 0 { db.capacity = defaultCapacity @@ -216,7 +217,7 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB // Index storing actual chunk address, data and bin id. 
headerSize := 16 + postage.StampSize - db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|BinID|Stamp|Data", shed.IndexFuncs{ + db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|BinID|BatchID|Sig|Data", shed.IndexFuncs{ EncodeKey: func(fields shed.Item) (key []byte, err error) { return fields.Address, nil }, @@ -276,9 +277,9 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB return nil, err } // pull index allows history and live syncing per po bin - db.pullIndex, err = db.shed.NewIndex("PO|BinID->Hash|Tag", shed.IndexFuncs{ + db.pullIndex, err = db.shed.NewIndex("PO|BinID->Hash|BatchID", shed.IndexFuncs{ EncodeKey: func(fields shed.Item) (key []byte, err error) { - key = make([]byte, 41) + key = make([]byte, 9) key[0] = db.po(swarm.NewAddress(fields.Address)) binary.BigEndian.PutUint64(key[1:9], fields.BinID) return key, nil @@ -288,20 +289,14 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB return e, nil }, EncodeValue: func(fields shed.Item) (value []byte, err error) { - value = make([]byte, 36) // 32 bytes address, 4 bytes tag + value = make([]byte, 64) // 32 bytes address, 32 bytes batch id copy(value, fields.Address) - - if fields.Tag != 0 { - binary.BigEndian.PutUint32(value[32:], fields.Tag) - } - + copy(value[32:], fields.BatchID) return value, nil }, DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { e.Address = value[:32] - if len(value) > 32 { - e.Tag = binary.BigEndian.Uint32(value[32:]) - } + e.BatchID = value[32:64] return e, nil }, }) @@ -394,28 +389,13 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB return nil, err } - // Create a index structure for excluding pinned chunks from gcIndex - db.gcExcludeIndex, err = db.shed.NewIndex("Hash->nil", shed.IndexFuncs{ - EncodeKey: func(fields shed.Item) (key []byte, err error) { - return fields.Address, nil - }, - DecodeKey: func(key []byte) (e shed.Item, err error) { - e.Address = key - return e, nil - }, - EncodeValue: func(fields shed.Item) (value []byte, err error) { - return nil, nil - }, - DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { - return e, nil - }, - }) + db.postage, err = newPostageBatches(db) if err != nil { return nil, err } // start garbage collection worker - go db.collectGarbageWorker() + go db.gcWorker() return db, nil } @@ -430,7 +410,7 @@ func (db *DB) Close() (err error) { db.subscritionsWG.Wait() // wait for gc worker to // return before closing the shed - <-db.collectGarbageWorkerDone + <-db.gcWorkerDone close(done) }() select { @@ -459,13 +439,14 @@ func (db *DB) po(addr swarm.Address) (bin uint8) { func (db *DB) DebugIndices() (indexInfo map[string]int, err error) { indexInfo = make(map[string]int) for k, v := range map[string]shed.Index{ - "retrievalDataIndex": db.retrievalDataIndex, - "retrievalAccessIndex": db.retrievalAccessIndex, - "pushIndex": db.pushIndex, - "pullIndex": db.pullIndex, - "gcIndex": db.gcIndex, - "gcExcludeIndex": db.gcExcludeIndex, - "pinIndex": db.pinIndex, + "retrievalDataIndex": db.retrievalDataIndex, + "retrievalAccessIndex": db.retrievalAccessIndex, + "pushIndex": db.pushIndex, + "pullIndex": db.pullIndex, + "gcIndex": db.gcIndex, + "pinIndex": db.pinIndex, + "postageBatchChunksIndex": db.postage.chunks, + "postageBatchCountsIndex": db.postage.counts, } { indexSize, err := v.Count() if err != nil { @@ -484,12 +465,19 @@ func (db *DB) DebugIndices() (indexInfo map[string]int, 
err error) { // chunkToItem creates new Item with data provided by the Chunk. func chunkToItem(ch swarm.Chunk) shed.Item { + // FIXME + if ch.Stamp() == nil { + ch = ch.WithStamp(mockstamp.MustNewStamp()) + } return shed.Item{ Address: ch.Address().Bytes(), Data: ch.Data(), Tag: ch.TagID(), + // PinCounter: ch.PinCounter(), BatchID: ch.Stamp().BatchID(), Sig: ch.Stamp().Sig(), + Depth: ch.Depth(), + Radius: ch.Radius(), } } diff --git a/pkg/localstore/localstore_test.go b/pkg/localstore/localstore_test.go index 3d5eaee5381..51b6eece3ef 100644 --- a/pkg/localstore/localstore_test.go +++ b/pkg/localstore/localstore_test.go @@ -172,8 +172,9 @@ func newTestDB(t testing.TB, o *Options) *DB { } var ( - generateTestRandomChunk = chunktesting.GenerateTestRandomChunk - generateTestRandomChunks = chunktesting.GenerateTestRandomChunks + generateTestRandomChunk = chunktesting.GenerateTestRandomChunk + generateTestRandomChunkAt = chunktesting.GenerateTestRandomChunkAt + generateTestRandomChunks = chunktesting.GenerateTestRandomChunks ) // chunkAddresses return chunk addresses of provided chunks. @@ -298,7 +299,7 @@ func newPullIndexTest(db *DB, ch swarm.Chunk, binID uint64, wantError error) fun t.Errorf("got error %v, want %v", err, wantError) } if err == nil { - validateItem(t, item, ch.Address().Bytes(), nil, 0, 0, postage.NewStamp(nil, nil)) + validateItem(t, item, ch.Address().Bytes(), nil, 0, 0, postage.NewStamp(ch.Stamp().BatchID(), nil)) } } } diff --git a/pkg/localstore/mode_get_test.go b/pkg/localstore/mode_get_test.go index f356ad52a9c..79c89b25c0d 100644 --- a/pkg/localstore/mode_get_test.go +++ b/pkg/localstore/mode_get_test.go @@ -100,6 +100,7 @@ func TestModeGetRequest(t *testing.T) { t.Run("gc index", newGCIndexTest(db, ch, uploadTimestamp, uploadTimestamp, 1, nil)) + t.Run("access count", newItemsCountTest(db.retrievalAccessIndex, 1)) t.Run("gc index count", newItemsCountTest(db.gcIndex, 1)) t.Run("gc size", newIndexGCSizeTest(db)) @@ -130,6 +131,7 @@ func TestModeGetRequest(t *testing.T) { t.Run("gc index", newGCIndexTest(db, ch, uploadTimestamp, accessTimestamp, 1, nil)) + t.Run("access count", newItemsCountTest(db.retrievalAccessIndex, 1)) t.Run("gc index count", newItemsCountTest(db.gcIndex, 1)) t.Run("gc size", newIndexGCSizeTest(db)) diff --git a/pkg/localstore/mode_put.go b/pkg/localstore/mode_put.go index 5587e36fb59..b3510206d99 100644 --- a/pkg/localstore/mode_put.go +++ b/pkg/localstore/mode_put.go @@ -73,7 +73,7 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e binIDs := make(map[uint8]uint64) switch mode { - case storage.ModePutRequest, storage.ModePutRequestPin: + case storage.ModePutRequest: for i, ch := range chs { if containsChunk(ch.Address(), chs[:i]...) 
{ exist[i] = true @@ -85,13 +85,6 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e } exist[i] = exists gcSizeChange += c - - if mode == storage.ModePutRequestPin { - err = db.setPin(batch, ch.Address()) - if err != nil { - return nil, err - } - } } case storage.ModePutUpload, storage.ModePutUploadPin: @@ -100,7 +93,8 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e exist[i] = true continue } - exists, c, err := db.putUpload(batch, binIDs, chunkToItem(ch)) + item := chunkToItem(ch) + exists, c, err := db.putUpload(batch, binIDs, item) if err != nil { return nil, err } @@ -113,11 +107,12 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e } gcSizeChange += c if mode == storage.ModePutUploadPin { - err = db.setPin(batch, ch.Address()) + c, err = db.setPin(batch, item) if err != nil { return nil, err } } + gcSizeChange += c } case storage.ModePutSync: @@ -172,11 +167,11 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e // The batch can be written to the database. // Provided batch and binID map are updated. func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) { - has, err := db.retrievalDataIndex.Has(item) + exists, err = db.retrievalDataIndex.Has(item) if err != nil { return false, 0, err } - if has { + if exists { return true, 0, nil } @@ -185,16 +180,18 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she if err != nil { return false, 0, err } - - gcSizeChange, err = db.setGC(batch, item) + err = db.retrievalDataIndex.PutInBatch(batch, item) if err != nil { return false, 0, err } - err = db.retrievalDataIndex.PutInBatch(batch, item) + err = db.postage.putInBatch(batch, item) + if err != nil { + return false, 0, err + } + gcSizeChange, err = db.preserveOrCache(batch, item) if err != nil { return false, 0, err } - return false, gcSizeChange, nil } @@ -210,7 +207,6 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed if exists { return true, 0, nil } - item.StoreTimestamp = now() item.BinID, err = db.incBinID(binIDs, db.po(swarm.NewAddress(item.Address))) if err != nil { @@ -228,7 +224,10 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed if err != nil { return false, 0, err } - + err = db.postage.putInBatch(batch, item) + if err != nil { + return false, 0, err + } return false, 0, nil } @@ -244,7 +243,6 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I if exists { return true, 0, nil } - item.StoreTimestamp = now() item.BinID, err = db.incBinID(binIDs, db.po(swarm.NewAddress(item.Address))) if err != nil { @@ -258,21 +256,23 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I if err != nil { return false, 0, err } - gcSizeChange, err = db.setGC(batch, item) + err = db.postage.putInBatch(batch, item) + if err != nil { + return false, 0, err + } + gcSizeChange, err = db.preserveOrCache(batch, item) if err != nil { return false, 0, err } - return false, gcSizeChange, nil } -// setGC is a helper function used to add chunks to the retrieval access -// index and the gc index in the cases that the putToGCCheck condition -// warrants a gc set. 
this is to mitigate index leakage in edge cases where -// a chunk is added to a node's localstore and given that the chunk is -// already within that node's NN (thus, it can be added to the gc index -// safely) -func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, err error) { +// preserveOrCache is a helper function used to add chunks to either a pinned reserve or gc cache +// (the retrieval access index and the gc index) +func (db *DB) preserveOrCache(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, err error) { + if db.postage.withinRadius(item) { + return db.setPin(batch, item) + } if item.BinID == 0 { i, err := db.retrievalDataIndex.Get(item) if err != nil { diff --git a/pkg/localstore/mode_put_test.go b/pkg/localstore/mode_put_test.go index 1c76f31e939..0939be8e1bf 100644 --- a/pkg/localstore/mode_put_test.go +++ b/pkg/localstore/mode_put_test.go @@ -83,34 +83,6 @@ func TestModePutRequest(t *testing.T) { } } -// TestModePutRequestPin validates ModePutRequestPin index values on the provided DB. -func TestModePutRequestPin(t *testing.T) { - for _, tc := range multiChunkTestCases { - t.Run(tc.name, func(t *testing.T) { - db := newTestDB(t, nil) - - chunks := generateTestRandomChunks(tc.count) - - wantTimestamp := time.Now().UTC().UnixNano() - defer setNow(func() (t int64) { - return wantTimestamp - })() - - _, err := db.Put(context.Background(), storage.ModePutRequestPin, chunks...) - if err != nil { - t.Fatal(err) - } - - for _, ch := range chunks { - newRetrieveIndexesTestWithAccess(db, ch, wantTimestamp, wantTimestamp)(t) - newPinIndexTest(db, ch, nil)(t) - } - - newItemsCountTest(db.gcIndex, tc.count)(t) - }) - } -} - // TestModePutSync validates ModePutSync index values on the provided DB. func TestModePutSync(t *testing.T) { for _, tc := range multiChunkTestCases { @@ -141,6 +113,8 @@ func TestModePutSync(t *testing.T) { newItemsCountTest(db.gcIndex, tc.count)(t) newIndexGCSizeTest(db)(t) } + newItemsCountTest(db.gcIndex, tc.count)(t) + newIndexGCSizeTest(db)(t) }) } } diff --git a/pkg/localstore/mode_set.go b/pkg/localstore/mode_set.go index c424b0f495f..f5d83989df7 100644 --- a/pkg/localstore/mode_set.go +++ b/pkg/localstore/mode_set.go @@ -21,11 +21,11 @@ import ( "errors" "time" - "github.com/syndtr/goleveldb/leveldb" - + "github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/tags" + "github.com/syndtr/goleveldb/leveldb" ) // Set updates database indexes for @@ -44,8 +44,6 @@ func (db *DB) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr // set updates database indexes for // chunks represented by provided addresses. -// It acquires lockAddr to protect two calls -// of this function for the same address in parallel. 
func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) { // protect parallel updates db.batchMu.Lock() @@ -59,9 +57,10 @@ func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) { triggerPullFeed := make(map[uint8]struct{}) // signal pull feed subscriptions to iterate switch mode { + case storage.ModeSetSync: for _, addr := range addrs { - c, err := db.setSync(batch, addr, mode) + c, err := db.setSync(batch, addr) if err != nil { return err } @@ -70,7 +69,8 @@ func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) { case storage.ModeSetRemove: for _, addr := range addrs { - c, err := db.setRemove(batch, addr) + item := addressToItem(addr) + c, err := db.setRemove(batch, item, true) if err != nil { return err } @@ -79,26 +79,20 @@ func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) { case storage.ModeSetPin: for _, addr := range addrs { - has, err := db.retrievalDataIndex.Has(addressToItem(addr)) - if err != nil { - return err - } - - if !has { - return storage.ErrNotFound - } - - err = db.setPin(batch, addr) + item := addressToItem(addr) + c, err := db.setPin(batch, item) if err != nil { return err } + gcSizeChange += c } case storage.ModeSetUnpin: for _, addr := range addrs { - err := db.setUnpin(batch, addr) + c, err := db.setUnpin(batch, addr) if err != nil { return err } + gcSizeChange += c } default: return ErrInvalidMode @@ -124,7 +118,7 @@ func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) { // from push sync index // - update to gc index happens given item does not exist in pin index // Provided batch is updated. -func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address, mode storage.ModeSet) (gcSizeChange int64, err error) { +func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address) (gcSizeChange int64, err error) { item := addressToItem(addr) // need to get access timestamp here as it is not @@ -179,65 +173,32 @@ func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address, mode storage.Mod return 0, err } - i, err = db.retrievalAccessIndex.Get(item) - switch { - case err == nil: - item.AccessTimestamp = i.AccessTimestamp - err = db.gcIndex.DeleteInBatch(batch, item) - if err != nil { + return db.preserveOrCache(batch, item) +} + +// setRemove removes the chunk by updating indexes: +// - delete from retrieve, pull, gc +// Provided batch is updated. +func (db *DB) setRemove(batch *leveldb.Batch, item shed.Item, check bool) (gcSizeChange int64, err error) { + if item.AccessTimestamp == 0 { + i, err := db.retrievalAccessIndex.Get(item) + switch { + case err == nil: + item.AccessTimestamp = i.AccessTimestamp + case errors.Is(err, leveldb.ErrNotFound): + default: return 0, err } - gcSizeChange-- - case errors.Is(err, leveldb.ErrNotFound): - // the chunk is not accessed before - default: - return 0, err - } - item.AccessTimestamp = now() - err = db.retrievalAccessIndex.PutInBatch(batch, item) - if err != nil { - return 0, err - } - - // Add in gcIndex only if this chunk is not pinned - ok, err := db.pinIndex.Has(item) - if err != nil { - return 0, err } - if !ok { - err = db.gcIndex.PutInBatch(batch, item) + if item.StoreTimestamp == 0 { + item, err = db.retrievalDataIndex.Get(item) if err != nil { return 0, err } - gcSizeChange++ } - return gcSizeChange, nil -} - -// setRemove removes the chunk by updating indexes: -// - delete from retrieve, pull, gc -// Provided batch is updated. 
-func (db *DB) setRemove(batch *leveldb.Batch, addr swarm.Address) (gcSizeChange int64, err error) { - item := addressToItem(addr) - - // need to get access timestamp here as it is not - // provided by the access function, and it is not - // a property of a chunk provided to Accessor.Put. - i, err := db.retrievalAccessIndex.Get(item) - switch { - case err == nil: - item.AccessTimestamp = i.AccessTimestamp - case errors.Is(err, leveldb.ErrNotFound): - default: - return 0, err - } - i, err = db.retrievalDataIndex.Get(item) - if err != nil { - return 0, err - } - item.StoreTimestamp = i.StoreTimestamp - item.BinID = i.BinID + db.metrics.GCStoreTimeStamps.Set(float64(item.StoreTimestamp)) + db.metrics.GCStoreAccessTimeStamps.Set(float64(item.AccessTimestamp)) err = db.retrievalDataIndex.DeleteInBatch(batch, item) if err != nil { @@ -251,81 +212,114 @@ func (db *DB) setRemove(batch *leveldb.Batch, addr swarm.Address) (gcSizeChange if err != nil { return 0, err } - err = db.gcIndex.DeleteInBatch(batch, item) + err = db.postage.deleteInBatch(batch, item) if err != nil { return 0, err } + // unless called by GC which iterates through the gcIndex // a check is needed for decrementing gcSize - // as delete is not reporting if the key/value pair - // is deleted or not - if _, err := db.gcIndex.Get(item); err == nil { - gcSizeChange = -1 + // as delete is not reporting if the key/value pair is deleted or not + if check { + _, err := db.gcIndex.Get(item) + if err != nil { + if !errors.Is(err, leveldb.ErrNotFound) { + return 0, err + } + return 0, db.pinIndex.DeleteInBatch(batch, item) + } } - - return gcSizeChange, nil + err = db.gcIndex.DeleteInBatch(batch, item) + if err != nil { + return 0, err + } + return -1, nil } // setPin increments pin counter for the chunk by updating // pin index and sets the chunk to be excluded from garbage collection. // Provided batch is updated. -func (db *DB) setPin(batch *leveldb.Batch, addr swarm.Address) (err error) { - item := addressToItem(addr) - +func (db *DB) setPin(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, err error) { // Get the existing pin counter of the chunk - existingPinCounter := uint64(0) - pinnedChunk, err := db.pinIndex.Get(item) + i, err := db.pinIndex.Get(item) + item.PinCounter = i.PinCounter if err != nil { - if errors.Is(err, leveldb.ErrNotFound) { - // If this Address is not present in DB, then its a new entry - existingPinCounter = 0 + if !errors.Is(err, leveldb.ErrNotFound) { + return 0, err + } + // if this Address is not pinned yet, then + i, err := db.retrievalAccessIndex.Get(item) + if err != nil { + if !errors.Is(err, leveldb.ErrNotFound) { + return 0, err + } + // not synced yet + } else { + item.AccessTimestamp = i.AccessTimestamp + i, err = db.retrievalDataIndex.Get(item) + if err != nil { + return 0, err + } + item.StoreTimestamp = i.StoreTimestamp + item.BinID = i.BinID - // Add in gcExcludeIndex of the chunk is not pinned already - err = db.gcExcludeIndex.PutInBatch(batch, item) + err = db.gcIndex.DeleteInBatch(batch, item) if err != nil { - return err + return 0, err } - } else { - return err + gcSizeChange = -1 } - } else { - existingPinCounter = pinnedChunk.PinCounter } // Otherwise increase the existing counter by 1 - item.PinCounter = existingPinCounter + 1 + item.PinCounter++ err = db.pinIndex.PutInBatch(batch, item) if err != nil { - return err + return 0, err } - - return nil + return gcSizeChange, nil } // setUnpin decrements pin counter for the chunk by updating pin index. // Provided batch is updated. 
-func (db *DB) setUnpin(batch *leveldb.Batch, addr swarm.Address) (err error) { +func (db *DB) setUnpin(batch *leveldb.Batch, addr swarm.Address) (gcSizeChange int64, err error) { item := addressToItem(addr) // Get the existing pin counter of the chunk - pinnedChunk, err := db.pinIndex.Get(item) + i, err := db.pinIndex.Get(item) if err != nil { - return err + return 0, err } - + item.PinCounter = i.PinCounter // Decrement the pin counter or // delete it from pin index if the pin counter has reached 0 - if pinnedChunk.PinCounter > 1 { - item.PinCounter = pinnedChunk.PinCounter - 1 - err = db.pinIndex.PutInBatch(batch, item) - if err != nil { - return err - } - } else { - err = db.pinIndex.DeleteInBatch(batch, item) - if err != nil { - return err - } + if item.PinCounter > 1 { + item.PinCounter-- + return 0, db.pinIndex.PutInBatch(batch, item) } - - return nil + err = db.pinIndex.DeleteInBatch(batch, item) + if err != nil { + return 0, err + } + i, err = db.retrievalDataIndex.Get(item) + if err != nil { + return 0, err + } + item.StoreTimestamp = i.StoreTimestamp + item.BinID = i.BinID + i, err = db.pushIndex.Get(item) + // if in pushindex, then not synced yet, dont put in gcIndex + if !errors.Is(err, leveldb.ErrNotFound) { + return 0, err + } + i, err = db.retrievalAccessIndex.Get(item) + if err != nil { + return 0, err + } + item.AccessTimestamp = i.AccessTimestamp + err = db.gcIndex.PutInBatch(batch, item) + if err != nil { + return 0, err + } + gcSizeChange++ + return gcSizeChange, nil } diff --git a/pkg/localstore/mode_set_test.go b/pkg/localstore/mode_set_test.go index 3f3aad25457..48ad2fc58f3 100644 --- a/pkg/localstore/mode_set_test.go +++ b/pkg/localstore/mode_set_test.go @@ -19,78 +19,12 @@ package localstore import ( "context" "errors" - "io/ioutil" "testing" - "github.com/ethersphere/bee/pkg/logging" - statestore "github.com/ethersphere/bee/pkg/statestore/mock" - - "github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/storage" - "github.com/ethersphere/bee/pkg/tags" - tagtesting "github.com/ethersphere/bee/pkg/tags/testing" "github.com/syndtr/goleveldb/leveldb" ) -// here we try to set a normal tag (that should be handled by pushsync) -// as a result we should expect the tag value to remain in the pull index -// and we expect that the tag should not be incremented by pull sync set -func TestModeSetSyncNormalTag(t *testing.T) { - mockStatestore := statestore.NewStateStore() - logger := logging.New(ioutil.Discard, 0) - db := newTestDB(t, &Options{Tags: tags.NewTags(mockStatestore, logger)}) - - tag, err := db.tags.Create(1) - if err != nil { - t.Fatal(err) - } - - ch := generateTestRandomChunk().WithTagID(tag.Uid) - _, err = db.Put(context.Background(), storage.ModePutUpload, ch) - if err != nil { - t.Fatal(err) - } - - err = tag.Inc(tags.StateStored) // so we don't get an error on tag.Status later on - if err != nil { - t.Fatal(err) - } - - item, err := db.pullIndex.Get(shed.Item{ - Address: ch.Address().Bytes(), - BinID: 1, - }) - if err != nil { - t.Fatal(err) - } - - if item.Tag != tag.Uid { - t.Fatalf("unexpected tag id value got %d want %d", item.Tag, tag.Uid) - } - - err = db.Set(context.Background(), storage.ModeSetSync, ch.Address()) - if err != nil { - t.Fatal(err) - } - - item, err = db.pullIndex.Get(shed.Item{ - Address: ch.Address().Bytes(), - BinID: 1, - }) - if err != nil { - t.Fatal(err) - } - - // expect the same tag Uid because when we set pull sync on a normal tag - // the tag Uid should remain untouched in pull index - if item.Tag != tag.Uid { - 
t.Fatalf("unexpected tag id value got %d want %d", item.Tag, tag.Uid) - } - - // 1 stored (because incremented manually in test), 1 sent, 1 synced, 1 total - tagtesting.CheckTag(t, tag, 0, 1, 0, 1, 1, 1) -} - // TestModeSetRemove validates ModeSetRemove index values on the provided DB. func TestModeSetRemove(t *testing.T) { for _, tc := range multiChunkTestCases { diff --git a/pkg/localstore/pin_test.go b/pkg/localstore/pin_test.go index 09c723ee7e9..d7722ad7628 100644 --- a/pkg/localstore/pin_test.go +++ b/pkg/localstore/pin_test.go @@ -26,12 +26,6 @@ func TestPinning(t *testing.T) { t.Fatal(err) } - // chunk must be present - _, err = db.Put(context.Background(), storage.ModePutUpload, chunks...) - if err != nil { - t.Fatal(err) - } - err = db.Set(context.Background(), storage.ModeSetPin, chunkAddresses(chunks)...) if err != nil { t.Fatal(err) @@ -57,49 +51,184 @@ func TestPinning(t *testing.T) { func TestPinCounter(t *testing.T) { chunk := generateTestRandomChunk() db := newTestDB(t, nil) + addr := chunk.Address() + ctx := context.Background() + _, err := db.Put(ctx, storage.ModePutUpload, chunk) + if err != nil { + t.Fatal(err) + } + var pinCounter uint64 + t.Run("+1 after first pin", func(t *testing.T) { + err := db.Set(ctx, storage.ModeSetPin, addr) + if err != nil { + t.Fatal(err) + } + pinCounter, err = db.PinCounter(addr) + if err != nil { + t.Fatal(err) + } + if pinCounter != 1 { + t.Fatalf("want pin counter %d but got %d", 1, pinCounter) + } + }) + t.Run("2 after second pin", func(t *testing.T) { + err = db.Set(ctx, storage.ModeSetPin, addr) + if err != nil { + t.Fatal(err) + } + pinCounter, err = db.PinCounter(addr) + if err != nil { + t.Fatal(err) + } + if pinCounter != 2 { + t.Fatalf("want pin counter %d but got %d", 2, pinCounter) + } + }) + t.Run("1 after first unpin", func(t *testing.T) { + err = db.Set(ctx, storage.ModeSetUnpin, addr) + if err != nil { + t.Fatal(err) + } + pinCounter, err = db.PinCounter(addr) + if err != nil { + t.Fatal(err) + } + if pinCounter != 1 { + t.Fatalf("want pin counter %d but got %d", 1, pinCounter) + } + }) + t.Run("not found after second unpin", func(t *testing.T) { + err = db.Set(ctx, storage.ModeSetUnpin, addr) + if err != nil { + t.Fatal(err) + } + _, err = db.PinCounter(addr) + if !errors.Is(err, storage.ErrNotFound) { + t.Fatal(err) + } + }) +} + +// Pin a file, upload chunks to go past the gc limit to trigger GC, +// check if the pinned files are still around and removed from gcIndex +func TestPinIndexes(t *testing.T) { + ctx := context.Background() - // chunk must be present - _, err := db.Put(context.Background(), storage.ModePutUpload, chunk) + db := newTestDB(t, &Options{ + Capacity: 150, + }) + + ch := generateTestRandomChunk() + addr := ch.Address() + _, err := db.Put(ctx, storage.ModePutUpload, ch) if err != nil { t.Fatal(err) } + runCountsTest(t, "putUpload", db, 1, 0, 1, 1, 0, 0) - // pin once - err = db.Set(context.Background(), storage.ModeSetPin, swarm.NewAddress(chunk.Address().Bytes())) + err = db.Set(ctx, storage.ModeSetSync, addr) + if err != nil { + t.Fatal(err) + } + runCountsTest(t, "setSync", db, 1, 1, 0, 1, 0, 1) + + err = db.Set(ctx, storage.ModeSetPin, addr) + if err != nil { + t.Fatal(err) + } + runCountsTest(t, "setPin", db, 1, 1, 0, 1, 1, 0) + + err = db.Set(ctx, storage.ModeSetPin, addr) if err != nil { t.Fatal(err) } - pinCounter, err := db.PinCounter(swarm.NewAddress(chunk.Address().Bytes())) + runCountsTest(t, "setPin 2", db, 1, 1, 0, 1, 1, 0) + + err = db.Set(ctx, storage.ModeSetUnpin, addr) if err != nil 
{ t.Fatal(err) } - if pinCounter != 1 { - t.Fatalf("want pin counter %d but got %d", 1, pinCounter) + runCountsTest(t, "setUnPin", db, 1, 1, 0, 1, 1, 0) + + err = db.Set(ctx, storage.ModeSetUnpin, addr) + if err != nil { + t.Fatal(err) } + runCountsTest(t, "setUnPin 2", db, 1, 1, 0, 1, 0, 1) - // pin twice - err = db.Set(context.Background(), storage.ModeSetPin, swarm.NewAddress(chunk.Address().Bytes())) +} + +func TestPinIndexesSync(t *testing.T) { + ctx := context.Background() + + db := newTestDB(t, &Options{ + Capacity: 150, + }) + + ch := generateTestRandomChunk() + addr := ch.Address() + _, err := db.Put(ctx, storage.ModePutUpload, ch) if err != nil { t.Fatal(err) } - pinCounter, err = db.PinCounter(swarm.NewAddress(chunk.Address().Bytes())) + runCountsTest(t, "putUpload", db, 1, 0, 1, 1, 0, 0) + + err = db.Set(ctx, storage.ModeSetPin, addr) if err != nil { t.Fatal(err) } - if pinCounter != 2 { - t.Fatalf("want pin counter %d but got %d", 2, pinCounter) + runCountsTest(t, "setPin", db, 1, 0, 1, 1, 1, 0) + + err = db.Set(ctx, storage.ModeSetPin, addr) + if err != nil { + t.Fatal(err) } + runCountsTest(t, "setPin 2", db, 1, 0, 1, 1, 1, 0) - err = db.Set(context.Background(), storage.ModeSetUnpin, swarm.NewAddress(chunk.Address().Bytes())) + err = db.Set(ctx, storage.ModeSetUnpin, addr) if err != nil { t.Fatal(err) } - _, err = db.PinCounter(swarm.NewAddress(chunk.Address().Bytes())) + runCountsTest(t, "setUnPin", db, 1, 0, 1, 1, 1, 0) + + err = db.Set(ctx, storage.ModeSetUnpin, addr) if err != nil { - if !errors.Is(err, storage.ErrNotFound) { - t.Fatal(err) - } + t.Fatal(err) + } + runCountsTest(t, "setUnPin 2", db, 1, 0, 1, 1, 0, 0) + + err = db.Set(ctx, storage.ModeSetPin, addr) + if err != nil { + t.Fatal(err) + } + runCountsTest(t, "setPin 3", db, 1, 0, 1, 1, 1, 0) + + err = db.Set(ctx, storage.ModeSetSync, addr) + if err != nil { + t.Fatal(err) } + runCountsTest(t, "setSync", db, 1, 1, 0, 1, 1, 0) + + err = db.Set(ctx, storage.ModeSetUnpin, addr) + if err != nil { + t.Fatal(err) + } + runCountsTest(t, "setUnPin", db, 1, 1, 0, 1, 0, 1) + +} + +func runCountsTest(t *testing.T, name string, db *DB, r, a, push, pull, pin, gc int) { + t.Helper() + t.Run(name, func(t *testing.T) { + t.Helper() + t.Run("retrieval data Index count", newItemsCountTest(db.retrievalDataIndex, r)) + t.Run("retrieval access Index count", newItemsCountTest(db.retrievalAccessIndex, a)) + t.Run("push Index count", newItemsCountTest(db.pushIndex, push)) + t.Run("pull Index count", newItemsCountTest(db.pullIndex, pull)) + t.Run("pin Index count", newItemsCountTest(db.pinIndex, pin)) + t.Run("gc index count", newItemsCountTest(db.gcIndex, gc)) + t.Run("gc size", newIndexGCSizeTest(db)) + }) } func TestPaging(t *testing.T) { @@ -107,14 +236,8 @@ func TestPaging(t *testing.T) { addresses := chunksToSortedStrings(chunks) db := newTestDB(t, nil) - // chunk must be present - _, err := db.Put(context.Background(), storage.ModePutUpload, chunks...) - if err != nil { - t.Fatal(err) - } - // pin once - err = db.Set(context.Background(), storage.ModeSetPin, chunkAddresses(chunks)...) + err := db.Set(context.Background(), storage.ModeSetPin, chunkAddresses(chunks)...) if err != nil { t.Fatal(err) } diff --git a/pkg/localstore/postage.go b/pkg/localstore/postage.go new file mode 100644 index 00000000000..0692452469c --- /dev/null +++ b/pkg/localstore/postage.go @@ -0,0 +1,202 @@ +// Copyright 2020 The Swarm Authors. All rights reserved. 
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package localstore
+
+import (
+	"errors"
+
+	"github.com/ethersphere/bee/pkg/shed"
+	"github.com/ethersphere/bee/pkg/swarm"
+	"github.com/syndtr/goleveldb/leveldb"
+)
+
+var (
+	// ErrBatchOverissued is returned if the number of chunks found in a neighbourhood
+	// extrapolates to an overissued stamp, i.e. count(batch, po) > 1<<(depth(batch)-po)
+	ErrBatchOverissued = errors.New("postage batch overissued")
+)
+
+type postageBatches struct {
+	// chunks is the postage batch to chunks index
+	chunks shed.Index
+	// counts stores the reserve radius and per-po stamp counts of each batch
+	counts shed.Index
+	po     func(itemAddr []byte) (bin int)
+	db     *DB
+}
+
+func newPostageBatches(db *DB) (*postageBatches, error) {
+	// po applied to the item address returns the proximity order (as int)
+	// of the chunk relative to the node base address
+	// the return value is capped at swarm.MaxPO
+	pof := func(addr []byte) int {
+		po := db.po(swarm.NewAddress(addr))
+		if po > swarm.MaxPO {
+			po = swarm.MaxPO
+		}
+		return int(po)
+	}
+
+	chunksIndex, err := db.shed.NewIndex("BatchID|PO|Hash->nil", shed.IndexFuncs{
+		EncodeKey: func(fields shed.Item) (key []byte, err error) {
+			key = make([]byte, 65)
+			copy(key[:32], fields.BatchID)
+			key[32] = uint8(pof(fields.Address))
+			copy(key[33:], fields.Address)
+			return key, nil
+		},
+		DecodeKey: func(key []byte) (e shed.Item, err error) {
+			e.BatchID = key[:32]
+			e.Address = key[33:65]
+			return e, nil
+		},
+		EncodeValue: func(fields shed.Item) (value []byte, err error) {
+			return nil, nil
+		},
+		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
+			return e, nil
+		},
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	countsIndex, err := db.shed.NewIndex("BatchID->reserveRadius|counts", shed.IndexFuncs{
+		EncodeKey: func(fields shed.Item) (key []byte, err error) {
+			return fields.BatchID, nil
+		},
+		DecodeKey: func(key []byte) (e shed.Item, err error) {
+			e.BatchID = key[:32]
+			return e, nil
+		},
+		EncodeValue: func(fields shed.Item) (value []byte, err error) {
+			return append([]byte{fields.Radius}, fields.Counts.Counts...), nil
+		},
+		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
+			e.Radius = value[0]
+			e.Counts = &shed.Counts{Counts: value[1:]}
+			return e, nil
+		},
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return &postageBatches{
+		chunks: chunksIndex,
+		counts: countsIndex,
+		po:     pof,
+		db:     db,
+	}, nil
+}
+
+func (p *postageBatches) decInBatch(batch *leveldb.Batch, e shed.Item) (bool, error) {
+	item, err := p.counts.Get(e)
+	if err != nil {
+		return false, err
+	}
+	for i := 0; i < p.po(item.Address); i++ {
+		count := item.Counts.Dec(i)
+		if count == 0 { // if 0 then all subsequent counts are 0 too
+			if i == 0 { // if all counts are 0 the entire batch entry can be deleted
+				return true, nil
+			}
+			break
+		}
+	}
+	return false, p.counts.PutInBatch(batch, item)
+}
+
+func (p *postageBatches) incInBatch(batch *leveldb.Batch, e shed.Item) error {
+	item, err := p.counts.Get(e)
+	if err != nil {
+		// initialise counts
+		if !errors.Is(err, leveldb.ErrNotFound) {
+			return err
+		}
+		e.Counts = &shed.Counts{Counts: make([]byte, swarm.MaxPO*4+4)}
+		item = e
+	}
+
+	depth := int(e.Depth)
+	po := p.po(item.Address)
+	// increment counts
+	for i := 0; i <= po; i++ {
+		count := item.Counts.Inc(i)
+		// counts track the number of stamps in the batch for neighbourhoods of all depths
+		// if neighbourhood_depth > batch_depth then the batch itself is invalid
+		if order := depth - i; order >= 0 {
+			if count > 1<<order {
+				return ErrBatchOverissued
+			}
+		}
+	}
+	return p.counts.PutInBatch(batch, item)
+}
+
+// UnreserveBatch atomically unpins the chunks of a postage batch in every bin whose
+// proximity order is below the given radius; if the batch is already unreserved up to
+// that po, then do nothing.
+// unpinning will result in all chunks with pincounter 0 to be put in the gc index
+// so if a chunk was only pinned by the reserve, unreserving it will make it gc-able
+func (db *DB) UnreserveBatch(id []byte, radius uint8) error {
+	db.batchMu.Lock()
+	defer db.batchMu.Unlock()
+
+	batch := new(leveldb.Batch)
+	var gcSizeChange int64 // number to add or subtract from gcSize
+	unpin := func(item shed.Item) (stop bool, err error) {
+		c, err := db.setUnpin(batch, swarm.NewAddress(item.Address))
+		gcSizeChange += c
+		return false, err
+	}
+	bi := shed.Item{BatchID: id}
+	item, err := db.postage.counts.Get(bi)
+	if err != nil {
+		return err
+	}
+	// iterate over the batch's chunks bin by bin, from the stored radius up to the new one
+	for bin := item.Radius; bin < radius; bin++ {
+		err := db.postage.chunks.Iterate(unpin, &shed.IterateOptions{Prefix: append(id, bin)})
+		if err != nil {
+			return err
+		}
+	}
+	// adjust gcSize
+	if err := db.incGCSizeInBatch(batch, gcSizeChange); err != nil {
+		return err
+	}
+	item.Radius = radius
+	if err = db.postage.counts.PutInBatch(batch, item); err != nil {
+		return err
+	}
+	return db.shed.WriteBatch(batch)
+}
+
+func (p *postageBatches) withinRadius(item shed.Item) bool {
+	return false
+}
diff --git a/pkg/localstore/postage_test.go b/pkg/localstore/postage_test.go
new file mode 100644
index 00000000000..87272897695
--- /dev/null
+++ b/pkg/localstore/postage_test.go
@@ -0,0 +1,127 @@
+package localstore
+
+import (
+	"context"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"testing"
+
+	"github.com/ethersphere/bee/pkg/storage"
+	"github.com/ethersphere/bee/pkg/swarm"
+)
+
+var postageTestCases = []struct {
+	mode    storage.ModePut
+	count   int
+	batches int
+	depth   uint8
+}{
+	{storage.ModePutRequest, 1, 1, 8},
+	{storage.ModePutRequest, 16, 2, 8},
+	{storage.ModePutSync, 1, 1, 8},
+	{storage.ModePutSync, 16, 2, 8},
+	{storage.ModePutUpload, 1, 1, 8},
+	{storage.ModePutUpload, 16, 2, 8},
+	{storage.ModePutUpload, 16, 1, 8},
+	{storage.ModePutUpload, 16, 1, 8},
+}
+
+func TestPutPostage(t *testing.T) {
+	for _, tc := range postageTestCases {
+		t.Run(fmt.Sprintf("mode:%v,count=%v,batches=%v,depth:%v", tc.mode, tc.count, tc.batches, tc.depth), func(t *testing.T) {
+			db := newTestDB(t, nil)
+			chunks := generateTestRandomChunks(tc.count)
+			for i := tc.batches; i < tc.count; i++ {
+				chunks[i].WithStamp(chunks[i-1].Stamp()).WithBatch(0, tc.depth)
+			}
+			t.Run("first put", func(t *testing.T) {
+				for _, ch := range chunks {
+					_, err := db.Put(context.Background(), tc.mode, ch)
+					if err != nil {
+						t.Fatal(err)
+					}
+				}
+				newItemsCountTest(db.retrievalDataIndex, tc.count)(t)
+				newItemsCountTest(db.postage.chunks, tc.count)(t)
+				newItemsCountTest(db.postage.counts, tc.batches)(t)
+			})
+
+			t.Run("second put", func(t *testing.T) {
+				for _, ch := range chunks {
+					exists, err := db.Put(context.Background(), tc.mode, ch)
+					if err != nil {
+						t.Fatal(err)
+					}
+					if !exists[0] {
+						t.Fatalf("expected chunk to exist in db")
+					}
+				}
+				newItemsCountTest(db.retrievalDataIndex, tc.count)(t)
+				newItemsCountTest(db.postage.chunks, tc.count)(t)
+				newItemsCountTest(db.postage.counts, tc.batches)(t)
+			})
+		})
+	}
+}
+
+func TestPutPostageOverissue(t *testing.T) {
+	for _, mode := range []storage.ModePut{storage.ModePutRequest, storage.ModePutSync, storage.ModePutUpload} {
+		for depth := 0; depth < int(swarm.MaxPO); depth++ {
+			for oi := 0; oi <= depth; oi++ {
+				t.Run(fmt.Sprintf("mode: %v,overissued depth: %v,batch depth:%v", mode, oi, depth), func(t *testing.T) {
+					db := newTestDB(t, nil)
+					// generate chunk with address matching oi bits of 
the overlay + chunk := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), oi).WithBatch(0, uint8(depth)) + // extract postage stamp which will be reused + stamp := chunk.Stamp() + // + head := binary.BigEndian.Uint32(chunk.Address().Bytes()[:4]) + order := depth - oi + max := 1 << order + seed := (head << oi) >> (32 - order) + rem := (head << oi) >> oi + prefix := head - rem + chunks := make([]swarm.Chunk, max) + for i := 0; i < max; i++ { + _, err := db.Put(context.Background(), mode, chunk) + if err != nil { + t.Fatal(err) + } + chunks[i] = chunk + chunk = generateTestRandomChunk() + addr := chunk.Address().Bytes() + balanced := ((seed + uint32(i+1)) % uint32(max)) << (32 - depth) + random := (binary.BigEndian.Uint32(addr[:4]) << depth) >> depth + head = prefix + balanced + random + binary.BigEndian.PutUint32(addr[:4], head) + chunk = swarm.NewChunk(swarm.NewAddress(addr), chunk.Data()).WithStamp(stamp).WithBatch(0, uint8(depth)) + } + + _, err := db.Put(context.Background(), mode, chunk) + if !errors.Is(err, ErrBatchOverissued) { + t.Fatalf("expected error %v, got %v", ErrBatchOverissued, err) + } + newItemsCountTest(db.retrievalDataIndex, max)(t) + newItemsCountTest(db.postage.chunks, max)(t) + newItemsCountTest(db.postage.counts, 1)(t) + + t.Run("second put", func(t *testing.T) { + for _, ch := range chunks { + exists, err := db.Put(context.Background(), mode, ch) + if err != nil { + t.Fatal(err) + } + if !exists[0] { + t.Fatalf("expected chunk to exists in db") + } + } + newItemsCountTest(db.retrievalDataIndex, max)(t) + newItemsCountTest(db.postage.chunks, max)(t) + newItemsCountTest(db.postage.counts, 1)(t) + }) + }) + } + } + } +} diff --git a/pkg/node/node.go b/pkg/node/node.go index cbf989324b4..30fd066e143 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -258,10 +258,8 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, return nil, fmt.Errorf("localstore: %w", err) } b.localstoreCloser = storer - // register localstore unreserve function on the batchstore before batch service starts listening to blockchain events - batchStore, err := batchstore.New(stateStore, nil) - // batchStore, err := batchstore.New(stateStore, storer.UnreserveBatch) + batchStore, err := batchstore.New(stateStore, storer.UnreserveBatch) if err != nil { return nil, fmt.Errorf("batchstore: %w", err) } diff --git a/pkg/storage/testing/chunk.go b/pkg/storage/testing/chunk.go index a9db1a8def5..11a97a5edbc 100644 --- a/pkg/storage/testing/chunk.go +++ b/pkg/storage/testing/chunk.go @@ -24,6 +24,7 @@ import ( "github.com/ethersphere/bee/pkg/bmtpool" postagetesting "github.com/ethersphere/bee/pkg/postage/testing" "github.com/ethersphere/bee/pkg/swarm" + swarmtesting "github.com/ethersphere/bee/pkg/swarm/test" ) var mockStamp swarm.Stamp @@ -103,6 +104,15 @@ func GenerateTestRandomChunks(count int) []swarm.Chunk { return chunks } +// GenerateTestRandomChunkAt generates an invalid (!) chunk with address of proximity order po wrt target. +func GenerateTestRandomChunkAt(target swarm.Address, po int) swarm.Chunk { + data := make([]byte, swarm.ChunkSize) + _, _ = rand.Read(data) + addr := swarmtesting.RandomAddressAt(target, po) + stamp := postagetesting.MustNewStamp() + return swarm.NewChunk(addr, data).WithStamp(stamp) +} + // FixtureChunk gets a pregenerated content-addressed chunk and // panics if one is not found. func FixtureChunk(prefix string) swarm.Chunk {
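
Reviewer note, not part of the patch: below is a minimal, self-contained Go sketch of the behavioural contract that UnreserveBatch exposes and that pkg/node now hands to batchstore.New as the unreserve callback. fakeStore and everything inside it are hypothetical stand-ins used only for illustration; they mirror just the unpin-everything-below-the-radius loop of the real method above and are not bee APIs.

package main

import "fmt"

// fakeStore mimics the observable effect of UnreserveBatch: raising a batch's
// stored reserve radius unpins its chunks in every bin below the new radius,
// and bins below the already stored radius are never touched again.
type fakeStore struct {
	radius map[string]uint8         // stored reserve radius per batch
	chunks map[string]map[uint8]int // still-reserved chunk counts per batch and bin
}

func (s *fakeStore) UnreserveBatch(id []byte, radius uint8) error {
	key := fmt.Sprintf("%x", id)
	// unpin the batch's chunks bin by bin, from the stored radius up to the new one
	for bin := s.radius[key]; bin < radius; bin++ {
		delete(s.chunks[key], bin) // unpinned chunks become eligible for gc
	}
	s.radius[key] = radius // record the new radius, as the patch does
	return nil
}

func main() {
	id := []byte{0x01, 0x02}
	s := &fakeStore{
		radius: map[string]uint8{},
		chunks: map[string]map[uint8]int{"0102": {0: 3, 1: 2, 2: 1}},
	}
	_ = s.UnreserveBatch(id, 2) // unpins bins 0 and 1
	_ = s.UnreserveBatch(id, 2) // repeat is a no-op: those bins are already unpinned
	fmt.Println(s.chunks["0102"]) // map[2:1] - only bin 2 remains reserved
}

The real method additionally folds the per-chunk gcSize deltas returned by setUnpin into gcSize via incGCSizeInBatch and persists the radius update together with all unpins in a single leveldb write batch.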