Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Free memory on request finish #240

Merged
merged 5 commits into from
Oct 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions requestmanager/asyncloader/asyncloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,22 @@ func (al *AsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) {
// CleanupRequest indicates the given request is complete on the client side,
// and no further attempts will be made to load links for this request,
// so any cached response data is invalid can be cleaned
func (al *AsyncLoader) CleanupRequest(requestID graphsync.RequestID) {
func (al *AsyncLoader) CleanupRequest(p peer.ID, requestID graphsync.RequestID) {
al.stateLk.Lock()
defer al.stateLk.Unlock()
responseCache := al.responseCache
aq, ok := al.requestQueues[requestID]
if ok {
al.alternateQueues[aq].responseCache.FinishRequest(requestID)
responseCache = al.alternateQueues[aq].responseCache
delete(al.requestQueues, requestID)
return
}
al.responseCache.FinishRequest(requestID)
toFree := responseCache.FinishRequest(requestID)
if toFree > 0 {
err := al.allocator.ReleaseBlockMemory(p, toFree)
if err != nil {
log.Infow("Error deallocating requestor memory", "p", p, "toFree", toFree, "err", err)
}
}
}

func (al *AsyncLoader) getLoadAttemptQueue(queue string) *loadattemptqueue.LoadAttemptQueue {
Expand Down
2 changes: 1 addition & 1 deletion requestmanager/asyncloader/asyncloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func TestRegisterUnregister(t *testing.T) {
err = asyncLoader.UnregisterPersistenceOption("other")
require.EqualError(t, err, "cannot unregister while requests are in progress")
asyncLoader.CompleteResponsesFor(requestID2)
asyncLoader.CleanupRequest(requestID2)
asyncLoader.CleanupRequest(p, requestID2)
err = asyncLoader.UnregisterPersistenceOption("other")
require.NoError(t, err)

Expand Down
17 changes: 12 additions & 5 deletions requestmanager/asyncloader/responsecache/responsecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var log = logging.Logger("graphsync")
// UnverifiedBlockStore is an interface for storing blocks
// as they come in and removing them as they are verified
type UnverifiedBlockStore interface {
PruneBlocks(func(ipld.Link) bool)
PruneBlocks(func(ipld.Link, uint64) bool)
PruneBlock(ipld.Link)
VerifyBlock(ipld.Link, ipld.LinkContext) ([]byte, error)
AddUnverifiedBlock(ipld.Link, []byte)
Expand All @@ -42,15 +42,22 @@ func New(unverifiedBlockStore UnverifiedBlockStore) *ResponseCache {
}

// FinishRequest indicate there is no more need to track blocks tied to this
// response
func (rc *ResponseCache) FinishRequest(requestID graphsync.RequestID) {
// response. It returns the total number of bytes in blocks that were being
// tracked but are no longer in memory
func (rc *ResponseCache) FinishRequest(requestID graphsync.RequestID) uint64 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comment indicating this return value is the total length of data over the blocks that have been pruned? (if i'm reading that right)

rc.responseCacheLk.Lock()
rc.linkTracker.FinishRequest(requestID)

rc.unverifiedBlockStore.PruneBlocks(func(link ipld.Link) bool {
return rc.linkTracker.BlockRefCount(link) == 0
toFree := uint64(0)
rc.unverifiedBlockStore.PruneBlocks(func(link ipld.Link, amt uint64) bool {
shouldPrune := rc.linkTracker.BlockRefCount(link) == 0
if shouldPrune {
toFree += amt
}
return shouldPrune
})
rc.responseCacheLk.Unlock()
return toFree
}

// AttemptLoad attempts to laod the given block from the cache
Expand Down
10 changes: 6 additions & 4 deletions requestmanager/asyncloader/responsecache/responsecache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ func (ubs *fakeUnverifiedBlockStore) AddUnverifiedBlock(lnk ipld.Link, data []by
ubs.inMemoryBlocks[lnk] = data
}

func (ubs *fakeUnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link) bool) {
for link := range ubs.inMemoryBlocks {
if shouldPrune(link) {
func (ubs *fakeUnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link, uint64) bool) {
for link, data := range ubs.inMemoryBlocks {
if shouldPrune(link, uint64(len(data))) {
delete(ubs.inMemoryBlocks, link)
}
}
Expand Down Expand Up @@ -134,14 +134,16 @@ func TestResponseCacheManagingLinks(t *testing.T) {
require.NoError(t, err)
require.Nil(t, data, "no data should be returned for unknown block")

responseCache.FinishRequest(requestID1)
toFree := responseCache.FinishRequest(requestID1)
// should remove only block 0, since it now has no refering outstanding requests
require.Len(t, fubs.blocks(), len(blks)-4, "should prune block when it is orphaned")
testutil.RefuteContainsBlock(t, fubs.blocks(), blks[0])
require.Equal(t, toFree, uint64(len(blks[0].RawData())))

responseCache.FinishRequest(requestID2)
// should remove last block since are no remaining references
require.Len(t, fubs.blocks(), 0, "should prune block when it is orphaned")
testutil.RefuteContainsBlock(t, fubs.blocks(), blks[3])
require.Equal(t, toFree, uint64(len(blks[3].RawData())))

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ func (ubs *UnverifiedBlockStore) AddUnverifiedBlock(lnk ipld.Link, data []byte)

// PruneBlocks removes blocks from the unverified store without committing them,
// if the passed in function returns true for the given link
func (ubs *UnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link) bool) {
func (ubs *UnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link, uint64) bool) {
for link, data := range ubs.inMemoryBlocks {
if shouldPrune(link) {
if shouldPrune(link, uint64(len(data))) {
delete(ubs.inMemoryBlocks, link)
ubs.dataSize = ubs.dataSize - uint64(len(data))
}
Expand Down
2 changes: 1 addition & 1 deletion requestmanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type AsyncLoader interface {
blks []blocks.Block)
AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult
CompleteResponsesFor(requestID graphsync.RequestID)
CleanupRequest(requestID graphsync.RequestID)
CleanupRequest(p peer.ID, requestID graphsync.RequestID)
}

// RequestManager tracks outgoing requests and processes incoming reponses
Expand Down
4 changes: 2 additions & 2 deletions requestmanager/requestmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ func TestPauseResume(t *testing.T) {
// verify no further responses come through
time.Sleep(100 * time.Millisecond)
testutil.AssertChannelEmpty(t, returnedResponseChan, "no response should be sent request is paused")
td.fal.CleanupRequest(rr.gsr.ID())
td.fal.CleanupRequest(peers[0], rr.gsr.ID())

// unpause
err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
Expand Down Expand Up @@ -893,7 +893,7 @@ func TestPauseResumeExternal(t *testing.T) {
// verify no further responses come through
time.Sleep(100 * time.Millisecond)
testutil.AssertChannelEmpty(t, returnedResponseChan, "no response should be sent request is paused")
td.fal.CleanupRequest(rr.gsr.ID())
td.fal.CleanupRequest(peers[0], rr.gsr.ID())

// unpause
err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
Expand Down
2 changes: 1 addition & 1 deletion requestmanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (rm *RequestManager) terminateRequest(requestID graphsync.RequestID, ipr *i
rm.connManager.Unprotect(ipr.p, requestID.Tag())
delete(rm.inProgressRequestStatuses, requestID)
ipr.cancelFn()
rm.asyncLoader.CleanupRequest(requestID)
rm.asyncLoader.CleanupRequest(ipr.p, requestID)
if ipr.traverser != nil {
ipr.traverserCancel()
ipr.traverser.Shutdown(rm.ctx)
Expand Down
2 changes: 1 addition & 1 deletion requestmanager/testloader/asyncloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (fal *FakeAsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID)

// CleanupRequest simulates the effect of cleaning up the request by removing any response channels
// for the request
func (fal *FakeAsyncLoader) CleanupRequest(requestID graphsync.RequestID) {
func (fal *FakeAsyncLoader) CleanupRequest(p peer.ID, requestID graphsync.RequestID) {
fal.responseChannelsLk.Lock()
for key := range fal.responseChannels {
if key.requestID == requestID {
Expand Down