localstore, shed, etc: postage localstore related changes
zelig committed Dec 8, 2020
1 parent 97b2a15 commit 29c4a73
Showing 21 changed files with 569 additions and 104 deletions.
22 changes: 19 additions & 3 deletions pkg/localstore/export.go
@@ -25,6 +25,7 @@ import (
"io/ioutil"
"sync"

"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
@@ -61,12 +62,18 @@ func (db *DB) Export(w io.Writer) (count int64, err error) {
hdr := &tar.Header{
Name: hex.EncodeToString(item.Address),
Mode: 0644,
Size: int64(len(item.Data)),
Size: int64(postage.StampSize + len(item.Data)),
}

if err := tw.WriteHeader(hdr); err != nil {
return false, err
}
if _, err := tw.Write(item.BatchID); err != nil {
return false, err
}
if _, err := tw.Write(item.Sig); err != nil {
return false, err
}
if _, err := tw.Write(item.Data); err != nil {
return false, err
}
@@ -135,19 +142,28 @@ func (db *DB) Import(r io.Reader, legacy bool) (count int64, err error) {
continue
}

data, err := ioutil.ReadAll(tr)
rawdata, err := ioutil.ReadAll(tr)
if err != nil {
select {
case errC <- err:
case <-ctx.Done():
}
}
stamp := postage.NewStamp(nil, nil)
err = stamp.UnmarshalBinary(rawdata[:postage.StampSize])
if err != nil {
select {
case errC <- err:
case <-ctx.Done():
}
}
data := rawdata[postage.StampSize:]
key := swarm.NewAddress(keybytes)

var ch swarm.Chunk
switch version {
case currentExportVersion:
ch = swarm.NewChunk(key, data)
ch = swarm.NewChunk(key, data).WithStamp(stamp)
default:
select {
case errC <- fmt.Errorf("unsupported export data version %q", version):
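In the new export format, each tar entry's payload is the marshalled postage stamp (batch ID and signature) followed by the chunk data; Import splits the two apart as shown above. A minimal self-contained sketch of that split, with readExportedEntry as a hypothetical helper name and an added length guard that the commit itself does not perform:

    package sketch

    import (
        "fmt"

        "github.com/ethersphere/bee/pkg/postage"
    )

    // readExportedEntry splits a raw tar payload into its postage stamp and
    // chunk data, mirroring the Import logic in this commit (hypothetical helper).
    func readExportedEntry(raw []byte) (*postage.Stamp, []byte, error) {
        if len(raw) < postage.StampSize {
            return nil, nil, fmt.Errorf("export entry too short: %d bytes", len(raw))
        }
        stamp := postage.NewStamp(nil, nil)
        if err := stamp.UnmarshalBinary(raw[:postage.StampSize]); err != nil {
            return nil, nil, err
        }
        return stamp, raw[postage.StampSize:], nil
    }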
14 changes: 11 additions & 3 deletions pkg/localstore/export_test.go
@@ -41,7 +41,11 @@ func TestExportImport(t *testing.T) {
if err != nil {
t.Fatal(err)
}
chunks[ch.Address().String()] = ch.Data()
stamp, err := ch.Stamp().MarshalBinary()
if err != nil {
t.Fatal(err)
}
chunks[ch.Address().String()] = append(stamp, ch.Data()...)
}

var buf bytes.Buffer
@@ -71,9 +75,13 @@ func TestExportImport(t *testing.T) {
if err != nil {
t.Fatal(err)
}
got := ch.Data()
stamp, err := ch.Stamp().MarshalBinary()
if err != nil {
t.Fatal(err)
}
got := append(stamp, ch.Data()...)
if !bytes.Equal(got, want) {
t.Fatalf("chunk %s: got data %x, want %x", addr, got, want)
t.Fatalf("chunk %s: got stamp+data %x, want %x", addr, got, want)
}
}
}
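The test above round-trips stamp-prefixed entries through Export and Import. At the call site the flow is roughly the following sketch, where db1 and db2 are hypothetical *DB handles (not names from the commit):

    var buf bytes.Buffer
    exported, err := db1.Export(&buf)
    if err != nil {
        t.Fatal(err)
    }
    imported, err := db2.Import(&buf, false) // false: current (stamped) format
    if err != nil {
        t.Fatal(err)
    }
    if imported != exported {
        t.Fatalf("imported %d chunks, exported %d", imported, exported)
    }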
11 changes: 10 additions & 1 deletion pkg/localstore/gc.go
@@ -107,11 +107,15 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
db.metrics.GCSize.Inc()

done = true
err = db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
err = db.gcIndex.Iterate(func(e shed.Item) (stop bool, err error) {
if gcSize-collectedCount <= target {
return true, nil
}

item, err := db.retrievalDataIndex.Get(e)
if err != nil {
return true, nil
}
db.metrics.GCStoreTimeStamps.Set(float64(item.StoreTimestamp))
db.metrics.GCStoreAccessTimeStamps.Set(float64(item.AccessTimestamp))

@@ -132,6 +136,11 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
if err != nil {
return true, nil
}
err = db.postage.deleteInBatch(batch, item)
if err != nil {
fmt.Printf("postage-delete: %v, %#v\n", err, item)
return true, nil
}
collectedCount++
if collectedCount >= gcBatchSize {
// batch size limit reached,
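Two behavioural changes land in collectGarbage here: a gcIndex entry carries only the address and bin ID, so it is first resolved through retrievalDataIndex to obtain its postage BatchID, and db.postage.deleteInBatch then decrements the per-batch chunk counter inside the same leveldb write batch. A condensed, non-verbatim sketch of the resulting iteration body (error paths and the other index deletions trimmed):

    err = db.gcIndex.Iterate(func(e shed.Item) (stop bool, err error) {
        if gcSize-collectedCount <= target {
            return true, nil // collected enough for this run
        }
        // resolve the full item so it carries BatchID, Sig and timestamps
        item, err := db.retrievalDataIndex.Get(e)
        if err != nil {
            return true, nil // skip dangling gc entries
        }
        // ... delete the item from the retrieval, pull and gc indexes in `batch` ...
        // decrement the per-postage-batch counter in the same write batch
        if err := db.postage.deleteInBatch(batch, item); err != nil {
            return true, nil
        }
        collectedCount++
        return collectedCount >= gcBatchSize, nil
    }, nil)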
59 changes: 24 additions & 35 deletions pkg/localstore/gc_test.go
@@ -29,6 +29,7 @@ import (
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
mockbatchstore "github.com/ethersphere/bee/pkg/storage/mock/batchstore"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)
@@ -70,60 +71,47 @@ func testDBCollectGarbageWorker(t *testing.T) {
})
closed = db.close

addrs := make([]swarm.Address, 0)

// upload random chunks
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunk()

_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
chunks := generateTestRandomChunks(chunkCount)
_, err := db.Put(context.Background(), storage.ModePutSync, chunks...)
if err != nil {
t.Fatal(err)
}

err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
target := db.gcTarget()
sizef := func() uint64 {
size, err := db.gcSize.Get()
if err != nil {
t.Fatal(err)
}

addrs = append(addrs, ch.Address())

return size
}

gcTarget := db.gcTarget()

for {
size := sizef()
for target < size {
t.Logf("GC index: size=%d, target=%d", size, target)
select {
case <-testHookCollectGarbageChan:
case <-time.After(10 * time.Second):
t.Error("collect garbage timeout")
}
gcSize, err := db.gcSize.Get()
if err != nil {
t.Fatal(err)
}
if gcSize == gcTarget {
break
t.Fatalf("collect garbage timeout. ")
}
size = sizef()
}
t.Run("pull index count", newItemsCountTest(db.pullIndex, int(target)))

t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)))

t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget)))
t.Run("gc index count", newItemsCountTest(db.gcIndex, int(target)))

t.Run("gc size", newIndexGCSizeTest(db))

// the first synced chunk should be removed
t.Run("get the first synced chunk", func(t *testing.T) {
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[0])
_, err := db.Get(context.Background(), storage.ModeGetRequest, chunks[0].Address())
if !errors.Is(err, storage.ErrNotFound) {
t.Errorf("got error %v, want %v", err, storage.ErrNotFound)
}
})

t.Run("only first inserted chunks should be removed", func(t *testing.T) {
for i := 0; i < (chunkCount - int(gcTarget)); i++ {
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[i])
for i := 0; i < (chunkCount - int(target)); i++ {
_, err := db.Get(context.Background(), storage.ModeGetRequest, chunks[i].Address())
if !errors.Is(err, storage.ErrNotFound) {
t.Errorf("got error %v, want %v", err, storage.ErrNotFound)
}
@@ -132,7 +120,7 @@ func testDBCollectGarbageWorker(t *testing.T) {

// last synced chunk should not be removed
t.Run("get most recent synced chunk", func(t *testing.T) {
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[len(addrs)-1])
_, err := db.Get(context.Background(), storage.ModeGetRequest, chunks[len(chunks)-1].Address())
if err != nil {
t.Fatal(err)
}
@@ -438,8 +426,9 @@ func TestDB_gcSize(t *testing.T) {
if _, err := rand.Read(baseKey); err != nil {
t.Fatal(err)
}
batchStore := mockbatchstore.New()
logger := logging.New(ioutil.Discard, 0)
db, err := New(dir, baseKey, nil, logger)
db, err := New(dir, baseKey, nil, batchStore, logger)
if err != nil {
t.Fatal(err)
}
@@ -463,7 +452,7 @@
if err := db.Close(); err != nil {
t.Fatal(err)
}
db, err = New(dir, baseKey, nil, logger)
db, err = New(dir, baseKey, nil, batchStore, logger)
if err != nil {
t.Fatal(err)
}
58 changes: 43 additions & 15 deletions pkg/localstore/localstore.go
@@ -25,6 +25,8 @@ import (
"time"

"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/postage"
mockstamp "github.com/ethersphere/bee/pkg/postage/testing"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
@@ -104,6 +106,8 @@ type DB struct {
// are done before closing the database
updateGCWG sync.WaitGroup

// postage batch
postage *postageBatches
// baseKey is the overlay address
baseKey []byte

@@ -141,7 +145,7 @@ type Options struct {
// New returns a new DB. All fields and indexes are initialized
// and possible conflicts with schema from existing database is checked.
// One goroutine for writing batches is created.
func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB, err error) {
func New(path string, baseKey []byte, o *Options, batchStore BatchStore, logger logging.Logger) (db *DB, err error) {
if o == nil {
// default options
o = &Options{
@@ -158,10 +162,10 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
// to signal another event if it
// is triggered during already running function
collectGarbageTrigger: make(chan struct{}, 1),
close: make(chan struct{}),
collectGarbageWorkerDone: make(chan struct{}),
metrics: newMetrics(),
logger: logger,
close: make(chan struct{}),
metrics: newMetrics(),
logger: logger,
}
if db.capacity == 0 {
db.capacity = defaultCapacity
@@ -214,7 +218,8 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
}

// Index storing actual chunk address, data and bin id.
db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|BinID|Data", shed.IndexFuncs{
headerSize := 16 + postage.StampSize
db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|BinID|BatchID|Sig|Data", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
return fields.Address, nil
},
@@ -223,16 +228,27 @@
return e, nil
},
EncodeValue: func(fields shed.Item) (value []byte, err error) {
b := make([]byte, 16)
b := make([]byte, headerSize)
binary.BigEndian.PutUint64(b[:8], fields.BinID)
binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
stamp, err := postage.NewStamp(fields.BatchID, fields.Sig).MarshalBinary()
if err != nil {
return nil, err
}
copy(b[16:], stamp)
value = append(b, fields.Data...)
return value, nil
},
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
e.BinID = binary.BigEndian.Uint64(value[:8])
e.Data = value[16:]
stamp := postage.NewStamp(nil, nil)
if err = stamp.UnmarshalBinary(value[16:headerSize]); err != nil {
return e, err
}
e.BatchID = stamp.BatchID()
e.Sig = stamp.Sig()
e.Data = value[headerSize:]
return e, nil
},
})
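The retrieval data index value is therefore a fixed header of 8-byte bin ID and 8-byte store timestamp, then the marshalled stamp, then the chunk payload. A standalone sketch of the encoder, assuming a 97-byte stamp (32-byte batch ID plus 65-byte signature) where the real code uses postage.StampSize:

    package sketch

    import "encoding/binary"

    // stampSize stands in for postage.StampSize; 32 bytes of batch ID plus a
    // 65-byte signature is an assumption made for illustration.
    const stampSize = 32 + 65

    // encodeValue mirrors the EncodeValue layout above:
    // BinID | StoreTimestamp | Stamp | Data.
    func encodeValue(binID uint64, storeTS int64, stamp, data []byte) []byte {
        b := make([]byte, 16+stampSize)
        binary.BigEndian.PutUint64(b[:8], binID)
        binary.BigEndian.PutUint64(b[8:16], uint64(storeTS))
        copy(b[16:], stamp)
        return append(b, data...)
    }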
@@ -265,7 +281,7 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
// pull index allows history and live syncing per po bin
db.pullIndex, err = db.shed.NewIndex("PO|BinID->Hash|Tag", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
key = make([]byte, 41)
key = make([]byte, 9)
key[0] = db.po(swarm.NewAddress(fields.Address))
binary.BigEndian.PutUint64(key[1:9], fields.BinID)
return key, nil
@@ -401,6 +417,10 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
return nil, err
}

db.postage, err = db.newPostageBatches(batchStore)
if err != nil {
return nil, err
}
// start garbage collection worker
go db.collectGarbageWorker()
return db, nil
@@ -446,13 +466,15 @@ func (db *DB) po(addr swarm.Address) (bin uint8) {
func (db *DB) DebugIndices() (indexInfo map[string]int, err error) {
indexInfo = make(map[string]int)
for k, v := range map[string]shed.Index{
"retrievalDataIndex": db.retrievalDataIndex,
"retrievalAccessIndex": db.retrievalAccessIndex,
"pushIndex": db.pushIndex,
"pullIndex": db.pullIndex,
"gcIndex": db.gcIndex,
"gcExcludeIndex": db.gcExcludeIndex,
"pinIndex": db.pinIndex,
"retrievalDataIndex": db.retrievalDataIndex,
"retrievalAccessIndex": db.retrievalAccessIndex,
"pushIndex": db.pushIndex,
"pullIndex": db.pullIndex,
"gcIndex": db.gcIndex,
"gcExcludeIndex": db.gcExcludeIndex,
"pinIndex": db.pinIndex,
"postageBatchChunksIndex": db.postage.chunks,
"postageBatchCountsIndex": db.postage.counts,
} {
indexSize, err := v.Count()
if err != nil {
@@ -471,10 +493,16 @@ func (db *DB) DebugIndices() (indexInfo map[string]int, err error) {

// chunkToItem creates new Item with data provided by the Chunk.
func chunkToItem(ch swarm.Chunk) shed.Item {
if ch.Stamp() == nil {
ch = ch.WithStamp(mockstamp.NewStamp())
}
return shed.Item{
Address: ch.Address().Bytes(),
Data: ch.Data(),
Tag: ch.TagID(),
// PinCounter: ch.PinCounter(),
BatchID: ch.Stamp().BatchID(),
Sig: ch.Stamp().Sig(),
}
}

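chunkToItem now records a batch ID and signature for every chunk, substituting a generated mock stamp when none is attached (a transitional fallback). Callers are therefore expected to stamp chunks before storing them; a hypothetical call-site sketch, where batchID, sig, addr, data and ctx are assumed inputs:

    // stamp a chunk before handing it to the localstore (hypothetical call site)
    stamp := postage.NewStamp(batchID, sig)
    ch := swarm.NewChunk(addr, data).WithStamp(stamp)
    if _, err := db.Put(ctx, storage.ModePutUpload, ch); err != nil {
        return err
    }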
(Diff for the remaining 16 changed files not shown.)
