From 70d3ec7ac7e3e5472b049b968cc7296abf4868d2 Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Sun, 11 Oct 2020 17:57:21 -0700 Subject: [PATCH] feat(responsecache): prune blocks more intelligently (#101) --- benchmarks/benchmark_test.go | 6 ++++++ .../asyncloader/responsecache/responsecache.go | 9 ++++++--- .../asyncloader/responsecache/responsecache_test.go | 4 ++++ .../unverifiedblockstore/unverifiedblockstore.go | 5 +++++ 4 files changed, 21 insertions(+), 3 deletions(-) diff --git a/benchmarks/benchmark_test.go b/benchmarks/benchmark_test.go index cf430274..0b5aec7c 100644 --- a/benchmarks/benchmark_test.go +++ b/benchmarks/benchmark_test.go @@ -52,9 +52,15 @@ func BenchmarkRoundtripSuccess(b *testing.B) { b.Run("test-20-10000", func(b *testing.B) { subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel), tdm) }) + b.Run("test-20-128MB", func(b *testing.B) { + subtestDistributeAndFetch(ctx, b, 10, delay.Fixed(0), time.Duration(0), allFilesUniformSize(128*(1<<20), defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel), tdm) + }) b.Run("test-p2p-stress-10-128MB", func(b *testing.B) { p2pStrestTest(ctx, b, 20, allFilesUniformSize(128*(1<<20), 1<<20, 1024), tdm) }) + b.Run("test-p2p-stress-10-128MB-1KB-chunks", func(b *testing.B) { + p2pStrestTest(ctx, b, 10, allFilesUniformSize(128*(1<<20), 1<<10, 1024), tdm) + }) } func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, tdm *tempDirMaker) { diff --git a/requestmanager/asyncloader/responsecache/responsecache.go b/requestmanager/asyncloader/responsecache/responsecache.go index 5420fa47..a2288267 100644 --- a/requestmanager/asyncloader/responsecache/responsecache.go +++ b/requestmanager/asyncloader/responsecache/responsecache.go @@ -20,6 +20,7 @@ var log = logging.Logger("graphsync") // as they come in and removing them as they are verified type UnverifiedBlockStore interface { PruneBlocks(func(ipld.Link) bool) + PruneBlock(ipld.Link) VerifyBlock(ipld.Link) ([]byte, error) AddUnverifiedBlock(ipld.Link, []byte) } @@ -83,9 +84,11 @@ func (rc *ResponseCache) ProcessResponse(responses map[graphsync.RequestID]metad } // prune unused blocks right away - rc.unverifiedBlockStore.PruneBlocks(func(link ipld.Link) bool { - return rc.linkTracker.BlockRefCount(link) == 0 - }) + for _, block := range blks { + if rc.linkTracker.BlockRefCount(cidlink.Link{Cid: block.Cid()}) == 0 { + rc.unverifiedBlockStore.PruneBlock(cidlink.Link{Cid: block.Cid()}) + } + } rc.responseCacheLk.Unlock() } diff --git a/requestmanager/asyncloader/responsecache/responsecache_test.go b/requestmanager/asyncloader/responsecache/responsecache_test.go index f2a5981b..7702502c 100644 --- a/requestmanager/asyncloader/responsecache/responsecache_test.go +++ b/requestmanager/asyncloader/responsecache/responsecache_test.go @@ -31,6 +31,10 @@ func (ubs *fakeUnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link) boo } } +func (ubs *fakeUnverifiedBlockStore) PruneBlock(link ipld.Link) { + delete(ubs.inMemoryBlocks, link) +} + func (ubs *fakeUnverifiedBlockStore) VerifyBlock(lnk ipld.Link) ([]byte, error) { data, ok := ubs.inMemoryBlocks[lnk] if !ok { diff --git a/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go b/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go index 5250cee9..7a548bce 100644 --- a/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go +++ b/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go @@ -38,6 +38,11 @@ func (ubs *UnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link) bool) { } } +// PruneBlock deletes an individual block from the store +func (ubs *UnverifiedBlockStore) PruneBlock(link ipld.Link) { + delete(ubs.inMemoryBlocks, link) +} + // VerifyBlock verifies the data for the given link as being part of a traversal, // removes it from the unverified store, and writes it to permaneant storage. func (ubs *UnverifiedBlockStore) VerifyBlock(lnk ipld.Link) ([]byte, error) {