Skip to content

Commit

Permalink
Merge pull request #306 from ethstorage/save_cache
Browse files Browse the repository at this point in the history
Disk-based cache for the downloader
  • Loading branch information
syntrust committed Aug 1, 2024
2 parents 8362514 + 808d9af commit 264bf03
Show file tree
Hide file tree
Showing 8 changed files with 433 additions and 41 deletions.
12 changes: 4 additions & 8 deletions ethstorage/blobs/blob_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

type BlobCacheReader interface {
GetKeyValueByIndex(index uint64, hash common.Hash) []byte
GetKeyValueByIndexUnchecked(index uint64) []byte
GetSampleData(kvIndex, sampleIndexInKv uint64) []byte
}

// BlobReader provides unified interface for the miner to read blobs and samples
Expand Down Expand Up @@ -53,13 +53,9 @@ func (n *BlobReader) GetBlob(kvIdx uint64, kvHash common.Hash) ([]byte, error) {
func (n *BlobReader) ReadSample(shardIdx, sampleIdx uint64) (common.Hash, error) {
sampleLenBits := n.sm.MaxKvSizeBits() - es.SampleSizeBits
kvIdx := sampleIdx >> sampleLenBits
// get blob without checking commit since kvHash is not available
if blob := n.cr.GetKeyValueByIndexUnchecked(kvIdx); blob != nil {
n.lg.Debug("Loaded blob from downloader cache", "kvIdx", kvIdx)
sampleIdxInKv := sampleIdx % (1 << sampleLenBits)
sampleSize := uint64(1 << es.SampleSizeBits)
sampleIdxByte := sampleIdxInKv << es.SampleSizeBits
sample := blob[sampleIdxByte : sampleIdxByte+sampleSize]
sampleIdxInKv := sampleIdx % (1 << sampleLenBits)

if sample := n.cr.GetSampleData(kvIdx, sampleIdxInKv); sample != nil {
return common.BytesToHash(sample), nil
}

Expand Down
176 changes: 168 additions & 8 deletions ethstorage/downloader/blob_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,97 @@ import (
"bytes"
"fmt"
"math/big"
"math/rand"
"os"
"path/filepath"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethstorage/go-ethstorage/ethstorage"
"github.com/ethstorage/go-ethstorage/ethstorage/log"
"github.com/protolambda/go-kzg/eth"
)

var (
bc BlobCache
cache BlobCache
kvHashes []common.Hash
datadir string
fileName = "test_shard_0.dat"
blobData = "blob data of kvIndex %d"
sampleLen = blobSize / sampleSize
minerAddr = common.BigToAddress(common.Big1)
kvSize uint64 = 1 << 17
kvEntries uint64 = 16
shardID = uint64(0)
)

func init() {
bc = NewBlobMemCache()
// TestDiskBlobCache exercises the disk-backed blob cache end to end:
// it stores a block's blobs, reads them back by index and hash, and
// verifies that Cleanup with a finalized number below the stored block
// numbers leaves those blobs intact.
func TestDiskBlobCache(t *testing.T) {
	setup(t)
	t.Cleanup(func() { teardown(t) })

	// mustStore creates a block with the given number of blobs and
	// writes it to the cache, failing the test on any error.
	mustStore := func(blockNumber, blobCount uint64) *blockBlobs {
		bb, err := newBlockBlobs(blockNumber, blobCount)
		if err != nil {
			t.Fatalf("Failed to create new block blobs: %v", err)
		}
		if err := cache.SetBlockBlobs(bb); err != nil {
			t.Fatalf("Failed to set block blobs: %v", err)
		}
		return bb
	}

	first := mustStore(10, 4)

	got := cache.Blobs(first.number)
	if len(got) != len(first.blobs) {
		t.Fatalf("Unexpected number of blobs: got %d, want %d", len(got), len(first.blobs))
	}

	// Every stored blob must round-trip through GetKeyValueByIndex.
	for i, b := range first.blobs {
		data := cache.GetKeyValueByIndex(uint64(i), b.hash)
		if !bytes.Equal(data, b.data) {
			t.Fatalf("Unexpected blob data at index %d: got %x, want %x", i, data, b.data)
		}
	}

	// Cleanup below the block number must not evict block 10.
	cache.Cleanup(5)
	remaining := cache.Blobs(first.number)
	if len(remaining) != len(first.blobs) {
		t.Fatalf("Unexpected number of blobs after cleanup: got %d, want %d", len(remaining), len(first.blobs))
	}

	second := mustStore(20, 6)

	// Cleanup below block 20 must keep block 20's blobs available.
	cache.Cleanup(15)
	remaining = cache.Blobs(second.number)
	if len(remaining) != len(second.blobs) {
		t.Fatalf("Unexpected number of blobs after cleanup: got %d, want %d", len(remaining), len(second.blobs))
	}
}

func TestBlobCache_Encoding(t *testing.T) {
func TestEncoding(t *testing.T) {
setup(t)
t.Cleanup(func() {
teardown(t)
})

blockBlobsParams := []struct {
blockNum uint64
blobLen uint64
}{
{0, 1},
{1000, 4},
{1, 5},
{222, 6},
{1000, 4},
{12345, 2},
{2000000, 3},
}
Expand All @@ -66,14 +124,16 @@ func TestBlobCache_Encoding(t *testing.T) {
for i, b := range bb.blobs {
bb.blobs[i].data = sm.EncodeBlob(b.data, b.hash, b.kvIndex.Uint64(), kvSize)
}
bc.SetBlockBlobs(bb)
if err := cache.SetBlockBlobs(bb); err != nil {
t.Fatalf("failed to set block blobs: %v", err)
}
}

// load from cache and verify
for i, kvHash := range kvHashes {
kvIndex := uint64(i)
t.Run(fmt.Sprintf("test kv: %d", i), func(t *testing.T) {
blobEncoded := bc.GetKeyValueByIndexUnchecked(kvIndex)
blobEncoded := cache.GetKeyValueByIndex(kvIndex, kvHash)
blobDecoded := sm.DecodeBlob(blobEncoded, kvHash, kvIndex, kvSize)
bytesWant := []byte(fmt.Sprintf(blobData, kvIndex))
if !bytes.Equal(blobDecoded[:len(bytesWant)], bytesWant) {
Expand All @@ -83,10 +143,83 @@ func TestBlobCache_Encoding(t *testing.T) {
}
}

// TestBlobDiskCache_GetSampleData stores a number of blocks whose blob
// contents encode their own (blockNumber, blobIndex, sampleIndex)
// coordinates (see fill), then reads random samples back through
// GetSampleData and checks each sample matches the expected content.
func TestBlobDiskCache_GetSampleData(t *testing.T) {
	setup(t)
	t.Cleanup(func() {
		teardown(t)
	})

	const blockStart = 10000000
	// Bug fix: the *rand.Rand returned by rand.New was previously
	// discarded, so the time-based seed had no effect and the
	// package-level generator was used instead. Keep and use it.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	// Track where each kvIndex landed so the expected sample bytes can
	// be reconstructed when reading back.
	kvIndex2BlockNumber := map[uint64]uint64{}
	kvIndex2BlobIndex := map[uint64]uint64{}

	// newBlockBlobsFilled builds a block whose blob data is produced by
	// fill, with a real KZG commitment/versioned hash per blob.
	newBlockBlobsFilled := func(blockNumber, blobLen uint64) (*blockBlobs, error) {
		block := &blockBlobs{
			number: blockNumber,
			blobs:  make([]*blob, blobLen),
		}
		for i := uint64(0); i < blobLen; i++ {
			kvIndex := uint64(len(kvHashes))
			blob := &blob{
				kvIndex: new(big.Int).SetUint64(kvIndex),
				data:    fill(blockNumber, i),
			}
			kzgBlob := kzg4844.Blob{}
			copy(kzgBlob[:], blob.data)
			commitment, err := kzg4844.BlobToCommitment(kzgBlob)
			if err != nil {
				return nil, fmt.Errorf(
					"failed to create commitment for blob %d: %w", kvIndex, err)
			}
			blob.hash = common.Hash(eth.KZGToVersionedHash(eth.KZGCommitment(commitment)))
			block.blobs[i] = blob
			kvHashes = append(kvHashes, blob.hash)
			kvIndex2BlockNumber[kvIndex] = blockNumber
			kvIndex2BlobIndex[kvIndex] = i
		}
		t.Log("Block created", "number", block.number, "blobs", blobLen)
		return block, nil
	}

	for i := 0; i < 10; i++ {
		blockn, blobn := blockStart+i, rng.Intn(6)+1
		block, err := newBlockBlobsFilled(uint64(blockn), uint64(blobn))
		if err != nil {
			t.Fatalf("Failed to create new block blobs: %v", err)
		}
		if err := cache.SetBlockBlobs(block); err != nil {
			t.Fatalf("Failed to set block blobs: %v", err)
		}
	}

	// Read one random sample per stored kv and compare against the
	// bytes fill wrote at that position.
	for kvi := range kvHashes {
		kvIndex := uint64(kvi)
		sampleIndex := rng.Intn(int(sampleLen))
		sample := cache.GetSampleData(kvIndex, uint64(sampleIndex))
		sampleWant := make([]byte, sampleSize)
		copy(sampleWant, fmt.Sprintf("%d_%d_%d", kvIndex2BlockNumber[kvIndex], kvIndex2BlobIndex[kvIndex], sampleIndex))
		t.Run(fmt.Sprintf("test sample: kvIndex=%d, sampleIndex=%d", kvIndex, sampleIndex), func(t *testing.T) {
			if !bytes.Equal(sample, sampleWant) {
				t.Errorf("GetSampleData got %x, want %x", sample, sampleWant)
			}
		})
	}
}

// fill builds the full content of one test blob: sampleLen samples of
// sampleSize bytes each, where every sample begins with the ASCII string
// "<blockNumber>_<blobIndex>_<sampleIndex>" and is zero-padded to
// sampleSize. This makes the exact bytes of any sample predictable.
func fill(blockNumber, blobIndex uint64) []byte {
	// Pre-size the buffer: the final length is known up front, which
	// avoids repeated growth copies from append.
	content := make([]byte, 0, int(sampleLen*sampleSize))
	for i := uint64(0); i < sampleLen; i++ {
		sample := make([]byte, sampleSize)
		copy(sample, fmt.Sprintf("%d_%d_%d", blockNumber, blobIndex, i))
		content = append(content, sample...)
	}
	return content
}

func newBlockBlobs(blockNumber, blobLen uint64) (*blockBlobs, error) {
block := &blockBlobs{
number: blockNumber,
hash: common.BigToHash(new(big.Int).SetUint64(blockNumber)),
blobs: make([]*blob, blobLen),
}
for i := uint64(0); i < blobLen; i++ {
Expand All @@ -109,3 +242,30 @@ func newBlockBlobs(blockNumber, blobLen uint64) (*blockBlobs, error) {
}
return block, nil
}

// setup creates a fresh disk-backed blob cache rooted in a per-test
// temporary directory and assigns it to the package-level cache used by
// the tests.
func setup(t *testing.T) {
	datadir = filepath.Join(t.TempDir(), "datadir")
	if err := os.MkdirAll(datadir, 0700); err != nil {
		t.Fatalf("Failed to create datadir: %v", err)
	}
	t.Logf("datadir %s", datadir)
	lg := log.NewLogger(log.CLIConfig{
		Level:  "warn",
		Format: "text",
	})
	cache = NewBlobDiskCache(datadir, lg)
}

// teardown closes the cache and removes its on-disk data, then resets
// the recorded kv hashes so the next test starts from a clean slate.
func teardown(t *testing.T) {
	if err := cache.Close(); err != nil {
		t.Errorf("Failed to close BlobCache: %v", err)
	}
	if err := os.RemoveAll(datadir); err != nil {
		t.Errorf("Failed to remove datadir: %v", err)
	}
	kvHashes = nil
}
Loading

0 comments on commit 264bf03

Please sign in to comment.