Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prune blocks more intelligently #101

Merged
merged 1 commit into from
Oct 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions benchmarks/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,15 @@ func BenchmarkRoundtripSuccess(b *testing.B) {
b.Run("test-20-10000", func(b *testing.B) {
subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel), tdm)
})
b.Run("test-20-128MB", func(b *testing.B) {
subtestDistributeAndFetch(ctx, b, 10, delay.Fixed(0), time.Duration(0), allFilesUniformSize(128*(1<<20), defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel), tdm)
})
b.Run("test-p2p-stress-10-128MB", func(b *testing.B) {
p2pStrestTest(ctx, b, 20, allFilesUniformSize(128*(1<<20), 1<<20, 1024), tdm)
})
b.Run("test-p2p-stress-10-128MB-1KB-chunks", func(b *testing.B) {
p2pStrestTest(ctx, b, 10, allFilesUniformSize(128*(1<<20), 1<<10, 1024), tdm)
})
}

func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, tdm *tempDirMaker) {
Expand Down
9 changes: 6 additions & 3 deletions requestmanager/asyncloader/responsecache/responsecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var log = logging.Logger("graphsync")
// as they come in and removing them as they are verified
type UnverifiedBlockStore interface {
PruneBlocks(func(ipld.Link) bool)
PruneBlock(ipld.Link)
VerifyBlock(ipld.Link) ([]byte, error)
AddUnverifiedBlock(ipld.Link, []byte)
}
Expand Down Expand Up @@ -83,9 +84,11 @@ func (rc *ResponseCache) ProcessResponse(responses map[graphsync.RequestID]metad
}

// prune unused blocks right away
rc.unverifiedBlockStore.PruneBlocks(func(link ipld.Link) bool {
return rc.linkTracker.BlockRefCount(link) == 0
})
for _, block := range blks {
if rc.linkTracker.BlockRefCount(cidlink.Link{Cid: block.Cid()}) == 0 {
rc.unverifiedBlockStore.PruneBlock(cidlink.Link{Cid: block.Cid()})
}
}

rc.responseCacheLk.Unlock()
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func (ubs *fakeUnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link) boo
}
}

func (ubs *fakeUnverifiedBlockStore) PruneBlock(link ipld.Link) {
delete(ubs.inMemoryBlocks, link)
}

func (ubs *fakeUnverifiedBlockStore) VerifyBlock(lnk ipld.Link) ([]byte, error) {
data, ok := ubs.inMemoryBlocks[lnk]
if !ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ func (ubs *UnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link) bool) {
}
}

// PruneBlock deletes an individual block from the store
func (ubs *UnverifiedBlockStore) PruneBlock(link ipld.Link) {
delete(ubs.inMemoryBlocks, link)
}

// VerifyBlock verifies the data for the given link as being part of a traversal,
// removes it from the unverified store, and writes it to permaneant storage.
func (ubs *UnverifiedBlockStore) VerifyBlock(lnk ipld.Link) ([]byte, error) {
Expand Down