Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disk-based cache for the downloader #306

Merged
merged 34 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
784bfea
use billy
syntrust Jul 5, 2024
dcedfa7
tests
syntrust Jul 8, 2024
9db7d29
merge
syntrust Jul 11, 2024
b9ebdfd
fix test
syntrust Jul 12, 2024
04c03c1
debug
syntrust Jul 12, 2024
8014651
fix test
syntrust Jul 12, 2024
e4602ef
refactor
syntrust Jul 12, 2024
fd7dcd3
fix test
syntrust Jul 12, 2024
9e5c751
test perf
syntrust Jul 12, 2024
a1370f7
minor
syntrust Jul 15, 2024
180b70e
add lock to store
syntrust Jul 16, 2024
094d3a6
minor
syntrust Jul 16, 2024
7321930
minor
syntrust Jul 16, 2024
e26e889
fix slotter
syntrust Jul 17, 2024
f1bdc81
Merge branch 'miner_fix_l2' of https://github.com/ethstorage/es-node …
syntrust Jul 17, 2024
6131bd1
Merge branch 'main' of https://github.com/ethstorage/es-node into sav…
syntrust Jul 17, 2024
c3207d2
minor
syntrust Jul 17, 2024
3428777
fix lock
syntrust Jul 18, 2024
a37a073
clean cache when close
syntrust Jul 23, 2024
ac80f2e
use block number instead of hash to prevent re-org
syntrust Jul 23, 2024
489eb95
remove hash
syntrust Jul 23, 2024
0f67978
no lock for store
syntrust Jul 24, 2024
ce0367c
get sample by billy
syntrust Jul 26, 2024
b05b882
fix test
syntrust Jul 27, 2024
2de8114
fix test
syntrust Jul 27, 2024
60d3c22
fix test
syntrust Jul 27, 2024
58bcd20
test sample
syntrust Jul 30, 2024
772c646
Merge branch 'main' of https://github.com/ethstorage/es-node into sav…
syntrust Jul 30, 2024
7ed64e0
tidy
syntrust Jul 30, 2024
cb1db88
tidy
syntrust Jul 30, 2024
b9ebd75
refactor
syntrust Jul 30, 2024
e5d22c8
fix comments
syntrust Jul 31, 2024
8ba6628
stats
syntrust Jul 31, 2024
808d9af
fix comments
syntrust Jul 31, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions ethstorage/blobs/blob_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

type BlobCacheReader interface {
GetKeyValueByIndex(index uint64, hash common.Hash) []byte
GetKeyValueByIndexUnchecked(index uint64) []byte
GetSampleData(kvIndex, sampleIndexInKv uint64) []byte
}

// BlobReader provides unified interface for the miner to read blobs and samples
Expand Down Expand Up @@ -53,13 +53,9 @@ func (n *BlobReader) GetBlob(kvIdx uint64, kvHash common.Hash) ([]byte, error) {
func (n *BlobReader) ReadSample(shardIdx, sampleIdx uint64) (common.Hash, error) {
sampleLenBits := n.sm.MaxKvSizeBits() - es.SampleSizeBits
kvIdx := sampleIdx >> sampleLenBits
// get blob without checking commit since kvHash is not available
if blob := n.cr.GetKeyValueByIndexUnchecked(kvIdx); blob != nil {
n.lg.Debug("Loaded blob from downloader cache", "kvIdx", kvIdx)
sampleIdxInKv := sampleIdx % (1 << sampleLenBits)
sampleSize := uint64(1 << es.SampleSizeBits)
sampleIdxByte := sampleIdxInKv << es.SampleSizeBits
sample := blob[sampleIdxByte : sampleIdxByte+sampleSize]
sampleIdxInKv := sampleIdx % (1 << sampleLenBits)

if sample := n.cr.GetSampleData(kvIdx, sampleIdxInKv); sample != nil {
return common.BytesToHash(sample), nil
}

Expand Down
176 changes: 168 additions & 8 deletions ethstorage/downloader/blob_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,97 @@ import (
"bytes"
"fmt"
"math/big"
"math/rand"
"os"
"path/filepath"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethstorage/go-ethstorage/ethstorage"
"github.com/ethstorage/go-ethstorage/ethstorage/log"
"github.com/protolambda/go-kzg/eth"
)

var (
bc BlobCache
cache BlobCache
syntrust marked this conversation as resolved.
Show resolved Hide resolved
kvHashes []common.Hash
datadir string
fileName = "test_shard_0.dat"
blobData = "blob data of kvIndex %d"
sampleLen = blobSize / sampleSize
minerAddr = common.BigToAddress(common.Big1)
kvSize uint64 = 1 << 17
kvEntries uint64 = 16
shardID = uint64(0)
)

func init() {
bc = NewBlobMemCache()
func TestDiskBlobCache(t *testing.T) {
setup(t)
t.Cleanup(func() {
teardown(t)
})

block, err := newBlockBlobs(10, 4)
if err != nil {
t.Fatalf("Failed to create new block blobs: %v", err)
}

err = cache.SetBlockBlobs(block)
if err != nil {
t.Fatalf("Failed to set block blobs: %v", err)
}

blobs := cache.Blobs(block.number)
if len(blobs) != len(block.blobs) {
t.Fatalf("Unexpected number of blobs: got %d, want %d", len(blobs), len(block.blobs))
}

for i, blob := range block.blobs {
blobData := cache.GetKeyValueByIndex(uint64(i), blob.hash)
if !bytes.Equal(blobData, blob.data) {
t.Fatalf("Unexpected blob data at index %d: got %x, want %x", i, blobData, blob.data)
}
}

cache.Cleanup(5)
blobsAfterCleanup := cache.Blobs(block.number)
if len(blobsAfterCleanup) != len(block.blobs) {
t.Fatalf("Unexpected number of blobs after cleanup: got %d, want %d", len(blobsAfterCleanup), len(block.blobs))
}

block, err = newBlockBlobs(20, 6)
if err != nil {
t.Fatalf("Failed to create new block blobs: %v", err)
}

err = cache.SetBlockBlobs(block)
if err != nil {
t.Fatalf("Failed to set block blobs: %v", err)
}

cache.Cleanup(15)
blobsAfterCleanup = cache.Blobs(block.number)
if len(blobsAfterCleanup) != len(block.blobs) {
t.Fatalf("Unexpected number of blobs after cleanup: got %d, want %d", len(blobsAfterCleanup), len(block.blobs))
}
}

func TestBlobCache_Encoding(t *testing.T) {
func TestEncoding(t *testing.T) {
setup(t)
t.Cleanup(func() {
teardown(t)
})

blockBlobsParams := []struct {
blockNum uint64
blobLen uint64
}{
{0, 1},
{1000, 4},
{1, 5},
{222, 6},
{1000, 4},
{12345, 2},
{2000000, 3},
}
Expand All @@ -66,14 +124,16 @@ func TestBlobCache_Encoding(t *testing.T) {
for i, b := range bb.blobs {
bb.blobs[i].data = sm.EncodeBlob(b.data, b.hash, b.kvIndex.Uint64(), kvSize)
}
bc.SetBlockBlobs(bb)
if err := cache.SetBlockBlobs(bb); err != nil {
t.Fatalf("failed to set block blobs: %v", err)
}
}

// load from cache and verify
for i, kvHash := range kvHashes {
kvIndex := uint64(i)
t.Run(fmt.Sprintf("test kv: %d", i), func(t *testing.T) {
blobEncoded := bc.GetKeyValueByIndexUnchecked(kvIndex)
blobEncoded := cache.GetKeyValueByIndex(kvIndex, kvHash)
blobDecoded := sm.DecodeBlob(blobEncoded, kvHash, kvIndex, kvSize)
bytesWant := []byte(fmt.Sprintf(blobData, kvIndex))
if !bytes.Equal(blobDecoded[:len(bytesWant)], bytesWant) {
Expand All @@ -83,10 +143,83 @@ func TestBlobCache_Encoding(t *testing.T) {
}
}

func TestBlobDiskCache_GetSampleData(t *testing.T) {
setup(t)
t.Cleanup(func() {
teardown(t)
})

const blockStart = 10000000
rand.New(rand.NewSource(time.Now().UnixNano()))
kvIndex2BlockNumber := map[uint64]uint64{}
kvIndex2BlobIndex := map[uint64]uint64{}

newBlockBlobsFilled := func(blockNumber, blobLen uint64) (*blockBlobs, error) {
block := &blockBlobs{
number: blockNumber,
blobs: make([]*blob, blobLen),
}
for i := uint64(0); i < blobLen; i++ {
kvIndex := uint64(len(kvHashes))
blob := &blob{
kvIndex: new(big.Int).SetUint64(kvIndex),
data: fill(blockNumber, i),
}
kzgBlob := kzg4844.Blob{}
copy(kzgBlob[:], blob.data)
commitment, err := kzg4844.BlobToCommitment(kzgBlob)
if err != nil {
return nil, fmt.Errorf(
"failed to create commitment for blob %d: %w", kvIndex, err)
}
blob.hash = common.Hash(eth.KZGToVersionedHash(eth.KZGCommitment(commitment)))
block.blobs[i] = blob
kvHashes = append(kvHashes, blob.hash)
kvIndex2BlockNumber[kvIndex] = blockNumber
kvIndex2BlobIndex[kvIndex] = i
}
t.Log("Block created", "number", block.number, "blobs", blobLen)
return block, nil
}
for i := 0; i < 10; i++ {
blockn, blobn := blockStart+i, rand.Intn(6)+1
block, err := newBlockBlobsFilled(uint64(blockn), uint64(blobn))
if err != nil {
t.Fatalf("Failed to create new block blobs: %v", err)
}
if err := cache.SetBlockBlobs(block); err != nil {
t.Fatalf("Failed to set block blobs: %v", err)
}
}

for kvi := range kvHashes {
kvIndex := uint64(kvi)
sampleIndex := rand.Intn(int(sampleLen))
sample := cache.GetSampleData(kvIndex, uint64(sampleIndex))
sampleWant := make([]byte, sampleSize)
copy(sampleWant, fmt.Sprintf("%d_%d_%d", kvIndex2BlockNumber[kvIndex], kvIndex2BlobIndex[kvIndex], sampleIndex))
t.Run(fmt.Sprintf("test sample: kvIndex=%d, sampleIndex=%d", kvIndex, sampleIndex), func(t *testing.T) {
if !bytes.Equal(sample, sampleWant) {
t.Errorf("GetSampleData got %x, want %x", sample, sampleWant)
}
})
}

}

func fill(blockNumber, blobIndex uint64) []byte {
var content []byte
for i := uint64(0); i < sampleLen; i++ {
sample := make([]byte, sampleSize)
copy(sample, fmt.Sprintf("%d_%d_%d", blockNumber, blobIndex, i))
content = append(content, sample...)
}
return content
}

func newBlockBlobs(blockNumber, blobLen uint64) (*blockBlobs, error) {
block := &blockBlobs{
number: blockNumber,
hash: common.BigToHash(new(big.Int).SetUint64(blockNumber)),
blobs: make([]*blob, blobLen),
}
for i := uint64(0); i < blobLen; i++ {
Expand All @@ -109,3 +242,30 @@ func newBlockBlobs(blockNumber, blobLen uint64) (*blockBlobs, error) {
}
return block, nil
}

func setup(t *testing.T) {
// cache = NewBlobMemCache()
tmpDir := t.TempDir()
datadir = filepath.Join(tmpDir, "datadir")
err := os.MkdirAll(datadir, 0700)
if err != nil {
t.Fatalf("Failed to create datadir: %v", err)
}
t.Logf("datadir %s", datadir)
cache = NewBlobDiskCache(datadir, log.NewLogger(log.CLIConfig{
Level: "warn",
Format: "text",
}))
}

func teardown(t *testing.T) {
err := cache.Close()
if err != nil {
t.Errorf("Failed to close BlobCache: %v", err)
}
err = os.RemoveAll(datadir)
if err != nil {
t.Errorf("Failed to remove datadir: %v", err)
}
kvHashes = nil
}
Loading
Loading