Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not send first blocks extension #230

Merged
merged 5 commits into from
Sep 29, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions donotsendfirstblocks/donotsendfirstblocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package donotsendfirstblocks

import (
basicnode "github.com/ipld/go-ipld-prime/node/basic"

"github.com/ipfs/go-graphsync/ipldutil"
)

// EncodeDoNotSendFirstBlocks returns encoded cbor data for the given number
// of blocks to skip
func EncodeDoNotSendFirstBlocks(skipBlockCount int64) ([]byte, error) {
nb := basicnode.Prototype.Int.NewBuilder()
err := nb.AssignInt(skipBlockCount)
if err != nil {
return nil, err
}
nd := nb.Build()
hannahhoward marked this conversation as resolved.
Show resolved Hide resolved
return ipldutil.EncodeNode(nd)
}

// DecodeDoNotSendFirstBlocks returns the number of blocks to skip
func DecodeDoNotSendFirstBlocks(data []byte) (int64, error) {
nd, err := ipldutil.DecodeNode(data)
if err != nil {
return 0, err
}
return nd.AsInt()
}
4 changes: 4 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ const (
// https://github.com/ipld/specs/blob/master/block-layer/graphsync/known_extensions.md
ExtensionDoNotSendCIDs = ExtensionName("graphsync/do-not-send-cids")

// ExtensionsDoNotSendFirstBlocks tells the responding peer not to wait till the given
// number of blocks have been traversed before it begins to send blocks over the wire
ExtensionsDoNotSendFirstBlocks = ExtensionName("graphsync/do-not-send-first-blocks")

// ExtensionDeDupByKey tells the responding peer to only deduplicate block sending
// for requests that have the same key. The data for the extension is a string key
ExtensionDeDupByKey = ExtensionName("graphsync/dedup-by-key")
Expand Down
50 changes: 50 additions & 0 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/donotsendfirstblocks"
gsmsg "github.com/ipfs/go-graphsync/message"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/storeutil"
Expand Down Expand Up @@ -331,6 +332,55 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
require.Equal(t, blockChainLength-set.Len(), totalSentOnWire)
}

func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
hannahhoward marked this conversation as resolved.
Show resolved Hide resolved
defer cancel()
td := newGsTestData(ctx, t)

// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength)

// store blocks locally
firstHalf := blockChain.Blocks(0, 50)
for _, blk := range firstHalf {
td.blockStore1[cidlink.Link{Cid: blk.Cid()}] = blk.RawData()
}

doNotSendFirstBlocksData, err := donotsendfirstblocks.EncodeDoNotSendFirstBlocks(50)
require.NoError(t, err)
extension := graphsync.ExtensionData{
Name: graphsync.ExtensionsDoNotSendFirstBlocks,
Data: doNotSendFirstBlocksData,
}

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()

totalSent := 0
totalSentOnWire := 0
responder.RegisterOutgoingBlockHook(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
totalSent++
if blockData.BlockSizeOnWire() > 0 {
totalSentOnWire++
}
})

progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), extension)

blockChain.VerifyWholeChain(ctx, progressChan)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")

require.Equal(t, blockChainLength, totalSent)
require.Equal(t, blockChainLength-50, totalSentOnWire)
}

func TestPauseResume(t *testing.T) {
// create network
ctx := context.Background()
Expand Down
1 change: 1 addition & 0 deletions responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type NetworkErrorListeners interface {
type ResponseAssembler interface {
DedupKey(p peer.ID, requestID graphsync.RequestID, key string)
IgnoreBlocks(p peer.ID, requestID graphsync.RequestID, links []ipld.Link)
SkipFirstBlocks(p peer.ID, requestID graphsync.RequestID, skipCount int64)
Transaction(p peer.ID, requestID graphsync.RequestID, transaction responseassembler.Transaction) error
}

Expand Down
22 changes: 22 additions & 0 deletions responsemanager/querypreparer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/dedupkey"
"github.com/ipfs/go-graphsync/donotsendfirstblocks"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/notifications"
Expand Down Expand Up @@ -62,6 +63,9 @@ func (qe *queryPreparer) prepareQuery(ctx context.Context,
if err := qe.processDoNoSendCids(request, p, failNotifee); err != nil {
return nil, nil, false, err
}
if err := qe.processDoNotSendFirstBlocks(request, p, failNotifee); err != nil {
return nil, nil, false, err
}
rootLink := cidlink.Link{Cid: request.Root()}
linkSystem := result.CustomLinkSystem
if linkSystem.StorageReadOpener == nil {
Expand Down Expand Up @@ -120,3 +124,21 @@ func (qe *queryPreparer) processDoNoSendCids(request gsmsg.GraphSyncRequest, p p
qe.responseAssembler.IgnoreBlocks(p, request.ID(), links)
return nil
}

func (qe *queryPreparer) processDoNotSendFirstBlocks(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error {
doNotSendFirstBlocksData, has := request.Extension(graphsync.ExtensionsDoNotSendFirstBlocks)
if !has {
return nil
}
skipCount, err := donotsendfirstblocks.DecodeDoNotSendFirstBlocks(doNotSendFirstBlocksData)
if err != nil {
_ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error {
rb.FinishWithError(graphsync.RequestFailedUnknown)
rb.AddNotifee(failNotifee)
return nil
})
return err
}
qe.responseAssembler.SkipFirstBlocks(p, request.ID(), skipCount)
return nil
}
36 changes: 25 additions & 11 deletions responsemanager/responseassembler/peerlinktracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,21 @@ import (
)

type peerLinkTracker struct {
linkTrackerLk sync.RWMutex
linkTracker *linktracker.LinkTracker
altTrackers map[string]*linktracker.LinkTracker
dedupKeys map[graphsync.RequestID]string
linkTrackerLk sync.RWMutex
linkTracker *linktracker.LinkTracker
altTrackers map[string]*linktracker.LinkTracker
dedupKeys map[graphsync.RequestID]string
blockSentCount map[graphsync.RequestID]int64
skipFirstBlocks map[graphsync.RequestID]int64
}

func newTracker() *peerLinkTracker {
return &peerLinkTracker{
linkTracker: linktracker.New(),
dedupKeys: make(map[graphsync.RequestID]string),
altTrackers: make(map[string]*linktracker.LinkTracker),
linkTracker: linktracker.New(),
dedupKeys: make(map[graphsync.RequestID]string),
altTrackers: make(map[string]*linktracker.LinkTracker),
blockSentCount: make(map[graphsync.RequestID]int64),
skipFirstBlocks: make(map[graphsync.RequestID]int64),
}
}

Expand Down Expand Up @@ -54,6 +58,12 @@ func (prs *peerLinkTracker) IgnoreBlocks(requestID graphsync.RequestID, links []
prs.linkTrackerLk.Unlock()
}

func (prs *peerLinkTracker) SkipFirstBlocks(requestID graphsync.RequestID, blocksToSkip int64) {
prs.linkTrackerLk.Lock()
prs.skipFirstBlocks[requestID] = blocksToSkip
prs.linkTrackerLk.Unlock()
}

// FinishTracking clears link tracking data for the request.
func (prs *peerLinkTracker) FinishTracking(requestID graphsync.RequestID) bool {
prs.linkTrackerLk.Lock()
Expand All @@ -74,16 +84,20 @@ func (prs *peerLinkTracker) FinishTracking(requestID graphsync.RequestID) bool {
delete(prs.altTrackers, key)
}
}
delete(prs.blockSentCount, requestID)
delete(prs.skipFirstBlocks, requestID)
return allBlocks
}

// RecordLinkTraversal records whether a link is found for a request.
func (prs *peerLinkTracker) RecordLinkTraversal(requestID graphsync.RequestID,
link ipld.Link, hasBlock bool) (isUnique bool) {
link ipld.Link, hasBlock bool) bool {
prs.linkTrackerLk.Lock()
defer prs.linkTrackerLk.Unlock()
prs.blockSentCount[requestID]++
notSkipped := prs.skipFirstBlocks[requestID] < prs.blockSentCount[requestID]
linkTracker := prs.getLinkTracker(requestID)
isUnique = linkTracker.BlockRefCount(link) == 0
isUnique := linkTracker.BlockRefCount(link) == 0
linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
prs.linkTrackerLk.Unlock()
return
return hasBlock && notSkipped && isUnique
}
4 changes: 2 additions & 2 deletions responsemanager/responseassembler/responseBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func (rb *responseBuilder) AddNotifee(notifee notifications.Notifee) {
func (rb *responseBuilder) setupBlockOperation(
link ipld.Link, data []byte) blockOperation {
hasBlock := data != nil
isUnique := rb.linkTracker.RecordLinkTraversal(rb.requestID, link, hasBlock)
send := rb.linkTracker.RecordLinkTraversal(rb.requestID, link, hasBlock)
return blockOperation{
data, hasBlock && isUnique, link, rb.requestID,
data, send, link, rb.requestID,
}
}

Expand Down
5 changes: 5 additions & 0 deletions responsemanager/responseassembler/responseassembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func (ra *ResponseAssembler) IgnoreBlocks(p peer.ID, requestID graphsync.Request
ra.GetProcess(p).(*peerLinkTracker).IgnoreBlocks(requestID, links)
}

// SkipFirstBlocks tells the assembler for the given request to not send the first N blocks
func (ra *ResponseAssembler) SkipFirstBlocks(p peer.ID, requestID graphsync.RequestID, skipFirstBlocks int64) {
ra.GetProcess(p).(*peerLinkTracker).SkipFirstBlocks(requestID, skipFirstBlocks)
}

// Transaction builds a response, and queues it for sending in the next outgoing message
func (ra *ResponseAssembler) Transaction(p peer.ID, requestID graphsync.RequestID, transaction Transaction) error {
rb := &responseBuilder{
Expand Down
78 changes: 78 additions & 0 deletions responsemanager/responseassembler/responseassembler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,84 @@ func TestResponseAssemblerIgnoreBlocks(t *testing.T) {

}

func TestResponseAssemblerSkipFirstBlocks(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
p := testutil.GeneratePeers(1)[0]
requestID1 := graphsync.RequestID(rand.Int31())
requestID2 := graphsync.RequestID(rand.Int31())
blks := testutil.GenerateBlocksOfSize(5, 100)
links := make([]ipld.Link, 0, len(blks))
for _, block := range blks {
links = append(links, cidlink.Link{Cid: block.Cid()})
}
fph := newFakePeerHandler(ctx, t)
responseAssembler := New(ctx, fph)

responseAssembler.SkipFirstBlocks(p, requestID1, 3)

var bd1, bd2, bd3, bd4, bd5 graphsync.BlockData
err := responseAssembler.Transaction(p, requestID1, func(b ResponseBuilder) error {
bd1 = b.SendResponse(links[0], blks[0].RawData())
return nil
})
require.NoError(t, err)

assertSentNotOnWire(t, bd1, blks[0])
fph.RefuteBlocks()
fph.AssertResponses(expectedResponses{requestID1: graphsync.PartialResponse})

err = responseAssembler.Transaction(p, requestID2, func(b ResponseBuilder) error {
bd1 = b.SendResponse(links[0], blks[0].RawData())
return nil
})
require.NoError(t, err)
fph.AssertResponses(expectedResponses{
requestID2: graphsync.PartialResponse,
})

err = responseAssembler.Transaction(p, requestID1, func(b ResponseBuilder) error {
bd2 = b.SendResponse(links[1], blks[1].RawData())
bd3 = b.SendResponse(links[2], blks[2].RawData())
return nil
})
require.NoError(t, err)

assertSentNotOnWire(t, bd1, blks[0])
assertSentNotOnWire(t, bd2, blks[1])
assertSentNotOnWire(t, bd3, blks[2])

fph.RefuteBlocks()
fph.AssertResponses(expectedResponses{
requestID1: graphsync.PartialResponse,
})
err = responseAssembler.Transaction(p, requestID1, func(b ResponseBuilder) error {
bd4 = b.SendResponse(links[3], blks[3].RawData())
bd5 = b.SendResponse(links[4], blks[4].RawData())
b.FinishRequest()
return nil
})
require.NoError(t, err)

assertSentOnWire(t, bd4, blks[3])
assertSentOnWire(t, bd5, blks[4])

fph.AssertBlocks(blks[3], blks[4])
fph.AssertResponses(expectedResponses{requestID1: graphsync.RequestCompletedFull})

err = responseAssembler.Transaction(p, requestID2, func(b ResponseBuilder) error {
b.SendResponse(links[3], blks[3].RawData())
b.FinishRequest()
return nil
})
require.NoError(t, err)

fph.AssertBlocks(blks[3])
fph.AssertResponses(expectedResponses{requestID2: graphsync.RequestCompletedFull})

}

func TestResponseAssemblerDupKeys(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
Expand Down
Loading