From 64ea051285e7df4633babb8e7d25c9662bb3aaf2 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Mon, 13 Dec 2021 22:36:01 +1100 Subject: [PATCH] feat(requestid): use uuids for requestids Ref: https://github.com/ipfs/go-graphsync/issues/278 Closes: https://github.com/ipfs/go-graphsync/issues/279 Closes: https://github.com/ipfs/go-graphsync/issues/281 --- go.mod | 1 + graphsync.go | 15 ++++++-- impl/graphsync_test.go | 3 +- linktracker/linktracker_test.go | 7 ++-- message/builder_test.go | 13 ++++--- message/message.go | 19 +++++++--- message/message_test.go | 16 ++++----- message/pb/message.pb.go | 18 +++++----- message/pb/message.proto | 4 +-- messagequeue/messagequeue_test.go | 12 +++---- network/libp2p_impl_test.go | 2 +- peermanager/peermessagemanager_test.go | 2 +- peerstate/peerstate_test.go | 3 +- .../asyncloader/asyncloader_test.go | 35 +++++++++---------- .../loadattemptqueue/loadattemptqueue_test.go | 11 +++--- .../responsecache/responsecache_test.go | 5 ++- requestmanager/client.go | 1 - requestmanager/executor/executor_test.go | 2 +- requestmanager/hooks/hooks_test.go | 7 ++-- requestmanager/requestmanager_test.go | 10 +++--- requestmanager/server.go | 5 ++- responsemanager/hooks/hooks_test.go | 7 ++-- .../queryexecutor/queryexecutor_test.go | 2 +- .../responseassembler_test.go | 25 +++++++------ responsemanager/responsemanager_test.go | 3 +- responsemanager/server.go | 4 +-- 26 files changed, 122 insertions(+), 110 deletions(-) diff --git a/go.mod b/go.mod index 33447445..8ff4005b 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/ipfs/go-graphsync go 1.16 require ( + github.com/google/uuid v1.3.0 github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1 github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e github.com/ipfs/go-block-format v0.0.3 diff --git a/graphsync.go b/graphsync.go index 75756c88..90d53de7 100644 --- a/graphsync.go +++ b/graphsync.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" + "github.com/google/uuid" "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/traversal" @@ -12,11 +13,21 @@ import ( ) // RequestID is a unique identifier for a GraphSync request. -type RequestID int32 +type RequestID uuid.UUID // Tag returns an easy way to identify this request id as a graphsync request (for libp2p connections) func (r RequestID) Tag() string { - return fmt.Sprintf("graphsync-request-%d", r) + return r.String() +} + +// String form of a RequestID (should be a well-formed UUIDv4 string) +func (r RequestID) String() string { + return uuid.UUID(r).String() +} + +// Create a new, random RequestID (should be a UUIDv4) +func NewRequestID() RequestID { + return RequestID(uuid.New()) } // Priority a priority for a GraphSync request. diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index bb977e26..1e3f18d6 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -8,7 +8,6 @@ import ( "io" "io/ioutil" "math" - "math/rand" "os" "path/filepath" "testing" @@ -133,7 +132,7 @@ func TestSendResponseToIncomingRequest(t *testing.T) { blockChainLength := 100 blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength) - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() builder := gsmsg.NewBuilder(gsmsg.Topic(0)) builder.AddRequest(gsmsg.NewRequest(requestID, blockChain.TipLink.(cidlink.Link).Cid, blockChain.Selector(), graphsync.Priority(math.MaxInt32), td.extension)) diff --git a/linktracker/linktracker_test.go b/linktracker/linktracker_test.go index af9745c9..4ba4c336 100644 --- a/linktracker/linktracker_test.go +++ b/linktracker/linktracker_test.go @@ -1,7 +1,6 @@ package linktracker import ( - "math/rand" "testing" "github.com/ipld/go-ipld-prime" @@ -74,7 +73,7 @@ func TestBlockRefCount(t *testing.T) { linkTracker := New() link := testutil.NewTestLink() for _, rq := range data.requests { - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() for _, present := range rq.traversals { linkTracker.RecordLinkTraversal(requestID, link, present) } @@ -116,7 +115,7 @@ func TestFinishRequest(t *testing.T) { for testCase, data := range testCases { t.Run(testCase, func(t *testing.T) { linkTracker := New() - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() for _, lt := range data.linksTraversed { linkTracker.RecordLinkTraversal(requestID, lt.link, lt.blockPresent) } @@ -151,7 +150,7 @@ func TestIsKnownMissingLink(t *testing.T) { t.Run(testCase, func(t *testing.T) { linkTracker := New() link := testutil.NewTestLink() - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() for _, present := range data.traversals { linkTracker.RecordLinkTraversal(requestID, link, present) } diff --git a/message/builder_test.go b/message/builder_test.go index 0464d7d1..eb95a51f 100644 --- a/message/builder_test.go +++ b/message/builder_test.go @@ -2,7 +2,6 @@ package message import ( "fmt" - "math/rand" "testing" "github.com/ipld/go-ipld-prime" @@ -21,10 +20,10 @@ func TestMessageBuilding(t *testing.T) { for _, block := range blocks { links = append(links, cidlink.Link{Cid: block.Cid()}) } - requestID1 := graphsync.RequestID(rand.Int31()) - requestID2 := graphsync.RequestID(rand.Int31()) - requestID3 := graphsync.RequestID(rand.Int31()) - requestID4 := graphsync.RequestID(rand.Int31()) + requestID1 := graphsync.NewRequestID() + requestID2 := graphsync.NewRequestID() + requestID3 := graphsync.NewRequestID() + requestID4 := graphsync.NewRequestID() rb.AddLink(requestID1, links[0], true) rb.AddLink(requestID1, links[1], false) @@ -133,8 +132,8 @@ func TestMessageBuilding(t *testing.T) { func TestMessageBuildingExtensionOnly(t *testing.T) { rb := NewBuilder(Topic(0)) - requestID1 := graphsync.RequestID(rand.Int31()) - requestID2 := graphsync.RequestID(rand.Int31()) + requestID1 := graphsync.NewRequestID() + requestID2 := graphsync.NewRequestID() extensionData1 := testutil.RandomBytes(100) extensionName1 := graphsync.ExtensionName("AppleSauce/McGee") diff --git a/message/message.go b/message/message.go index adf7d733..c61317c8 100644 --- a/message/message.go +++ b/message/message.go @@ -6,6 +6,7 @@ import ( "fmt" "io" + "github.com/google/uuid" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" @@ -162,7 +163,12 @@ func newMessageFromProto(pbm *pb.Message) (GraphSyncMessage, error) { if exts == nil { exts = make(map[string][]byte) } - requests[graphsync.RequestID(req.Id)] = newRequest(graphsync.RequestID(req.Id), root, selector, graphsync.Priority(req.Priority), req.Cancel, req.Update, exts) + uid, err := uuid.FromBytes(req.Id) + if err != nil { + return GraphSyncMessage{}, err + } + id := graphsync.RequestID(uid) + requests[id] = newRequest(id, root, selector, graphsync.Priority(req.Priority), req.Cancel, req.Update, exts) } responses := make(map[graphsync.RequestID]GraphSyncResponse, len(pbm.GetResponses())) @@ -174,7 +180,12 @@ func newMessageFromProto(pbm *pb.Message) (GraphSyncMessage, error) { if exts == nil { exts = make(map[string][]byte) } - responses[graphsync.RequestID(res.Id)] = newResponse(graphsync.RequestID(res.Id), graphsync.ResponseStatusCode(res.Status), exts) + uid, err := uuid.FromBytes(res.Id) + if err != nil { + return GraphSyncMessage{}, err + } + id := graphsync.RequestID(uid) + responses[id] = newResponse(id, graphsync.ResponseStatusCode(res.Status), exts) } blks := make(map[cid.Cid]blocks.Block, len(pbm.GetData())) @@ -270,7 +281,7 @@ func (gsm GraphSyncMessage) ToProto() (*pb.Message, error) { } } pbm.Requests = append(pbm.Requests, &pb.Message_Request{ - Id: int32(request.id), + Id: request.id[:], Root: request.root.Bytes(), Selector: selector, Priority: int32(request.priority), @@ -283,7 +294,7 @@ func (gsm GraphSyncMessage) ToProto() (*pb.Message, error) { pbm.Responses = make([]*pb.Message_Response, 0, len(gsm.responses)) for _, response := range gsm.responses { pbm.Responses = append(pbm.Responses, &pb.Message_Response{ - Id: int32(response.requestID), + Id: response.requestID[:], Status: int32(response.status), Extensions: response.extensions, }) diff --git a/message/message_test.go b/message/message_test.go index c69a92fd..575c885f 100644 --- a/message/message_test.go +++ b/message/message_test.go @@ -26,7 +26,7 @@ func TestAppendingRequests(t *testing.T) { root := testutil.GenerateCids(1)[0] ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) selector := ssb.Matcher().Node() - id := graphsync.RequestID(rand.Int31()) + id := graphsync.NewRequestID() priority := graphsync.Priority(rand.Int31()) builder := NewBuilder(Topic(0)) @@ -51,7 +51,7 @@ func TestAppendingRequests(t *testing.T) { require.NoError(t, err) pbRequest := pbMessage.Requests[0] - require.Equal(t, int32(id), pbRequest.Id) + require.Equal(t, id[:], pbRequest.Id) require.Equal(t, int32(priority), pbRequest.Priority) require.False(t, pbRequest.Cancel) require.False(t, pbRequest.Update) @@ -82,7 +82,7 @@ func TestAppendingResponses(t *testing.T) { Name: extensionName, Data: testutil.RandomBytes(100), } - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() status := graphsync.RequestAcknowledged builder := NewBuilder(Topic(0)) @@ -102,7 +102,7 @@ func TestAppendingResponses(t *testing.T) { pbMessage, err := gsm.ToProto() require.NoError(t, err, "serialize to protobuf errored") pbResponse := pbMessage.Responses[0] - require.Equal(t, int32(requestID), pbResponse.Id) + require.Equal(t, requestID[:], pbResponse.Id) require.Equal(t, int32(status), pbResponse.Status) require.Equal(t, extension.Data, pbResponse.Extensions["graphsync/awesome"]) @@ -154,7 +154,7 @@ func contains(strs []string, x string) bool { func TestRequestCancel(t *testing.T) { ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) selector := ssb.Matcher().Node() - id := graphsync.RequestID(rand.Int31()) + id := graphsync.NewRequestID() priority := graphsync.Priority(rand.Int31()) root := testutil.GenerateCids(1)[0] @@ -184,7 +184,7 @@ func TestRequestCancel(t *testing.T) { func TestRequestUpdate(t *testing.T) { - id := graphsync.RequestID(rand.Int31()) + id := graphsync.NewRequestID() extensionName := graphsync.ExtensionName("graphsync/awesome") extension := graphsync.ExtensionData{ Name: extensionName, @@ -235,7 +235,7 @@ func TestToNetFromNetEquivalency(t *testing.T) { Name: extensionName, Data: testutil.RandomBytes(100), } - id := graphsync.RequestID(rand.Int31()) + id := graphsync.NewRequestID() priority := graphsync.Priority(rand.Int31()) status := graphsync.RequestAcknowledged @@ -325,7 +325,7 @@ func TestMergeExtensions(t *testing.T) { root := testutil.GenerateCids(1)[0] ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) selector := ssb.Matcher().Node() - id := graphsync.RequestID(rand.Int31()) + id := graphsync.NewRequestID() priority := graphsync.Priority(rand.Int31()) defaultRequest := NewRequest(id, root, selector, priority, initialExtensions...) t.Run("when merging into empty", func(t *testing.T) { diff --git a/message/pb/message.pb.go b/message/pb/message.pb.go index 7110c0b1..897027eb 100644 --- a/message/pb/message.pb.go +++ b/message/pb/message.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v3.17.3 +// protoc v3.19.1 // source: message.proto package graphsync_message_pb @@ -98,7 +98,7 @@ type Message_Request struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` // unique id set on the requester side + Id []byte `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // unique id set on the requester side Root []byte `protobuf:"bytes,2,opt,name=root,proto3" json:"root,omitempty"` // a CID for the root node in the query Selector []byte `protobuf:"bytes,3,opt,name=selector,proto3" json:"selector,omitempty"` // ipld selector to retrieve Extensions map[string][]byte `protobuf:"bytes,4,rep,name=extensions,proto3" json:"extensions,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` // aux information. useful for other protocols @@ -139,11 +139,11 @@ func (*Message_Request) Descriptor() ([]byte, []int) { return file_message_proto_rawDescGZIP(), []int{0, 0} } -func (x *Message_Request) GetId() int32 { +func (x *Message_Request) GetId() []byte { if x != nil { return x.Id } - return 0 + return nil } func (x *Message_Request) GetRoot() []byte { @@ -193,7 +193,7 @@ type Message_Response struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` // the request id + Id []byte `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // the request id Status int32 `protobuf:"varint,2,opt,name=status,proto3" json:"status,omitempty"` // a status code. Extensions map[string][]byte `protobuf:"bytes,3,rep,name=extensions,proto3" json:"extensions,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` // additional data } @@ -230,11 +230,11 @@ func (*Message_Response) Descriptor() ([]byte, []int) { return file_message_proto_rawDescGZIP(), []int{0, 1} } -func (x *Message_Response) GetId() int32 { +func (x *Message_Response) GetId() []byte { if x != nil { return x.Id } - return 0 + return nil } func (x *Message_Response) GetStatus() int32 { @@ -328,7 +328,7 @@ var file_message_proto_rawDesc = []byte{ 0x70, 0x68, 0x73, 0x79, 0x6e, 0x63, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x1a, 0xab, 0x02, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x69, + 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x72, 0x6f, 0x6f, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x72, 0x6f, 0x6f, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x73, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x6f, @@ -347,7 +347,7 @@ var file_message_proto_rawDesc = []byte{ 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x1a, 0xc9, 0x01, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, + 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x02, 0x69, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x56, 0x0a, 0x0a, 0x65, 0x78, 0x74, 0x65, 0x6e, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x36, 0x2e, 0x67, diff --git a/message/pb/message.proto b/message/pb/message.proto index ed6d561d..0703e50a 100644 --- a/message/pb/message.proto +++ b/message/pb/message.proto @@ -7,7 +7,7 @@ option go_package = ".;graphsync_message_pb"; message Message { message Request { - int32 id = 1; // unique id set on the requester side + bytes id = 1; // unique id set on the requester side bytes root = 2; // a CID for the root node in the query bytes selector = 3; // ipld selector to retrieve map extensions = 4; // aux information. useful for other protocols @@ -17,7 +17,7 @@ message Message { } message Response { - int32 id = 1; // the request id + bytes id = 1; // the request id int32 status = 2; // a status code. map extensions = 3; // additional data } diff --git a/messagequeue/messagequeue_test.go b/messagequeue/messagequeue_test.go index 9e8c79a5..c85f1c76 100644 --- a/messagequeue/messagequeue_test.go +++ b/messagequeue/messagequeue_test.go @@ -73,7 +73,7 @@ func TestStartupAndShutdown(t *testing.T) { messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout) messageQueue.Startup() - id := graphsync.RequestID(rand.Int31()) + id := graphsync.NewRequestID() priority := graphsync.Priority(rand.Int31()) ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) selector := ssb.Matcher().Node() @@ -111,7 +111,7 @@ func TestShutdownDuringMessageSend(t *testing.T) { messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout) messageQueue.Startup() - id := graphsync.RequestID(rand.Int31()) + id := graphsync.NewRequestID() priority := graphsync.Priority(rand.Int31()) ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) selector := ssb.Matcher().Node() @@ -162,7 +162,7 @@ func TestProcessingNotification(t *testing.T) { waitGroup.Add(1) blks := testutil.GenerateBlocksOfSize(3, 128) - responseID := graphsync.RequestID(rand.Int31()) + responseID := graphsync.NewRequestID() extensionName := graphsync.ExtensionName("graphsync/awesome") extension := graphsync.ExtensionData{ Name: extensionName, @@ -216,7 +216,7 @@ func TestDedupingMessages(t *testing.T) { messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout) messageQueue.Startup() waitGroup.Add(1) - id := graphsync.RequestID(rand.Int31()) + id := graphsync.NewRequestID() priority := graphsync.Priority(rand.Int31()) ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) selector := ssb.Matcher().Node() @@ -227,11 +227,11 @@ func TestDedupingMessages(t *testing.T) { }, []notifications.Notifee{}) // wait for send attempt waitGroup.Wait() - id2 := graphsync.RequestID(rand.Int31()) + id2 := graphsync.NewRequestID() priority2 := graphsync.Priority(rand.Int31()) selector2 := ssb.ExploreAll(ssb.Matcher()).Node() root2 := testutil.GenerateCids(1)[0] - id3 := graphsync.RequestID(rand.Int31()) + id3 := graphsync.NewRequestID() priority3 := graphsync.Priority(rand.Int31()) selector3 := ssb.ExploreIndex(0, ssb.Matcher()).Node() root3 := testutil.GenerateCids(1)[0] diff --git a/network/libp2p_impl_test.go b/network/libp2p_impl_test.go index 76dc8b56..2348d93e 100644 --- a/network/libp2p_impl_test.go +++ b/network/libp2p_impl_test.go @@ -77,7 +77,7 @@ func TestMessageSendAndReceive(t *testing.T) { Name: extensionName, Data: testutil.RandomBytes(100), } - id := graphsync.RequestID(rand.Int31()) + id := graphsync.NewRequestID() priority := graphsync.Priority(rand.Int31()) status := graphsync.RequestAcknowledged diff --git a/peermanager/peermessagemanager_test.go b/peermanager/peermessagemanager_test.go index 03b84ec1..69417b96 100644 --- a/peermanager/peermessagemanager_test.go +++ b/peermanager/peermessagemanager_test.go @@ -67,7 +67,7 @@ func TestSendingMessagesToPeers(t *testing.T) { tp := testutil.GeneratePeers(5) - id := graphsync.RequestID(rand.Int31()) + id := graphsync.NewRequestID() priority := graphsync.Priority(rand.Int31()) root := testutil.GenerateCids(1)[0] ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) diff --git a/peerstate/peerstate_test.go b/peerstate/peerstate_test.go index 4519555f..aa4e860a 100644 --- a/peerstate/peerstate_test.go +++ b/peerstate/peerstate_test.go @@ -2,7 +2,6 @@ package peerstate_test import ( "fmt" - "math/rand" "testing" "github.com/stretchr/testify/require" @@ -14,7 +13,7 @@ import ( func TestDiagnostics(t *testing.T) { requestIDs := make([]graphsync.RequestID, 0, 5) for i := 0; i < 5; i++ { - requestIDs = append(requestIDs, graphsync.RequestID(rand.Int31())) + requestIDs = append(requestIDs, graphsync.NewRequestID()) } testCases := map[string]struct { requestStates graphsync.RequestStates diff --git a/requestmanager/asyncloader/asyncloader_test.go b/requestmanager/asyncloader/asyncloader_test.go index c1f7b858..3c9daedd 100644 --- a/requestmanager/asyncloader/asyncloader_test.go +++ b/requestmanager/asyncloader/asyncloader_test.go @@ -3,7 +3,6 @@ package asyncloader import ( "context" "io" - "math/rand" "testing" "time" @@ -23,7 +22,7 @@ func TestAsyncLoadInitialLoadSucceedsLocallyPresent(t *testing.T) { st := newStore() link := st.Store(t, block) withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() p := testutil.GeneratePeers(1)[0] resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{}) assertSuccessResponse(ctx, t, resultChan) @@ -38,7 +37,7 @@ func TestAsyncLoadInitialLoadSucceedsResponsePresent(t *testing.T) { st := newStore() withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() responses := map[graphsync.RequestID]metadata.Metadata{ requestID: { metadata.Item{ @@ -61,7 +60,7 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) { st := newStore() withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { link := testutil.NewTestLink() - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() responses := map[graphsync.RequestID]metadata.Metadata{ requestID: { @@ -84,7 +83,7 @@ func TestAsyncLoadInitialLoadIndeterminateWhenRequestNotInProgress(t *testing.T) st := newStore() withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { link := testutil.NewTestLink() - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() p := testutil.GeneratePeers(1)[0] resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{}) assertFailResponse(ctx, t, resultChan) @@ -100,7 +99,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenSucceeds(t *testing.T) { st := newStore() withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() err := asyncLoader.StartRequest(requestID, "") require.NoError(t, err) p := testutil.GeneratePeers(1)[0] @@ -128,7 +127,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenFails(t *testing.T) { withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { link := testutil.NewTestLink() - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() err := asyncLoader.StartRequest(requestID, "") require.NoError(t, err) p := testutil.GeneratePeers(1)[0] @@ -154,7 +153,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) { st := newStore() withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { link := testutil.NewTestLink() - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() err := asyncLoader.StartRequest(requestID, "") require.NoError(t, err) p := testutil.GeneratePeers(1)[0] @@ -172,7 +171,7 @@ func TestAsyncLoadTwiceLoadsLocallySecondTime(t *testing.T) { link := cidlink.Link{Cid: block.Cid()} st := newStore() withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() responses := map[graphsync.RequestID]metadata.Metadata{ requestID: { metadata.Item{ @@ -203,13 +202,13 @@ func TestRegisterUnregister(t *testing.T) { link1 := otherSt.Store(t, blocks[0]) withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { - requestID1 := graphsync.RequestID(rand.Int31()) + requestID1 := graphsync.NewRequestID() err := asyncLoader.StartRequest(requestID1, "other") require.EqualError(t, err, "unknown persistence option") err = asyncLoader.RegisterPersistenceOption("other", otherSt.lsys) require.NoError(t, err) - requestID2 := graphsync.RequestID(rand.Int31()) + requestID2 := graphsync.NewRequestID() err = asyncLoader.StartRequest(requestID2, "other") require.NoError(t, err) p := testutil.GeneratePeers(1)[0] @@ -222,7 +221,7 @@ func TestRegisterUnregister(t *testing.T) { err = asyncLoader.UnregisterPersistenceOption("other") require.NoError(t, err) - requestID3 := graphsync.RequestID(rand.Int31()) + requestID3 := graphsync.NewRequestID() err = asyncLoader.StartRequest(requestID3, "other") require.EqualError(t, err, "unknown persistence option") }) @@ -235,11 +234,11 @@ func TestRequestSplittingLoadLocallyFromBlockstore(t *testing.T) { withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { err := asyncLoader.RegisterPersistenceOption("other", otherSt.lsys) require.NoError(t, err) - requestID1 := graphsync.RequestID(rand.Int31()) + requestID1 := graphsync.NewRequestID() p := testutil.GeneratePeers(1)[0] resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link, ipld.LinkContext{}) - requestID2 := graphsync.RequestID(rand.Int31()) + requestID2 := graphsync.NewRequestID() err = asyncLoader.StartRequest(requestID2, "other") require.NoError(t, err) resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link, ipld.LinkContext{}) @@ -259,8 +258,8 @@ func TestRequestSplittingSameBlockTwoStores(t *testing.T) { withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { err := asyncLoader.RegisterPersistenceOption("other", otherSt.lsys) require.NoError(t, err) - requestID1 := graphsync.RequestID(rand.Int31()) - requestID2 := graphsync.RequestID(rand.Int31()) + requestID1 := graphsync.NewRequestID() + requestID2 := graphsync.NewRequestID() err = asyncLoader.StartRequest(requestID1, "") require.NoError(t, err) err = asyncLoader.StartRequest(requestID2, "other") @@ -300,8 +299,8 @@ func TestRequestSplittingSameBlockOnlyOneResponse(t *testing.T) { withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { err := asyncLoader.RegisterPersistenceOption("other", otherSt.lsys) require.NoError(t, err) - requestID1 := graphsync.RequestID(rand.Int31()) - requestID2 := graphsync.RequestID(rand.Int31()) + requestID1 := graphsync.NewRequestID() + requestID2 := graphsync.NewRequestID() err = asyncLoader.StartRequest(requestID1, "") require.NoError(t, err) err = asyncLoader.StartRequest(requestID2, "other") diff --git a/requestmanager/asyncloader/loadattemptqueue/loadattemptqueue_test.go b/requestmanager/asyncloader/loadattemptqueue/loadattemptqueue_test.go index ae992711..9c83a426 100644 --- a/requestmanager/asyncloader/loadattemptqueue/loadattemptqueue_test.go +++ b/requestmanager/asyncloader/loadattemptqueue/loadattemptqueue_test.go @@ -3,7 +3,6 @@ package loadattemptqueue import ( "context" "fmt" - "math/rand" "testing" "time" @@ -31,7 +30,7 @@ func TestAsyncLoadInitialLoadSucceeds(t *testing.T) { link := testutil.NewTestLink() linkContext := ipld.LinkContext{} - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() p := testutil.GeneratePeers(1)[0] resultChan := make(chan types.AsyncLoadResult, 1) @@ -61,7 +60,7 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) { link := testutil.NewTestLink() linkContext := ipld.LinkContext{} - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() resultChan := make(chan types.AsyncLoadResult, 1) p := testutil.GeneratePeers(1)[0] @@ -95,7 +94,7 @@ func TestAsyncLoadInitialLoadIndeterminateRetryFalse(t *testing.T) { link := testutil.NewTestLink() linkContext := ipld.LinkContext{} - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() p := testutil.GeneratePeers(1)[0] resultChan := make(chan types.AsyncLoadResult, 1) @@ -130,7 +129,7 @@ func TestAsyncLoadInitialLoadIndeterminateRetryTrueThenRetriedSuccess(t *testing link := testutil.NewTestLink() linkContext := ipld.LinkContext{} - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() resultChan := make(chan types.AsyncLoadResult, 1) p := testutil.GeneratePeers(1)[0] lr := NewLoadRequest(p, requestID, link, linkContext, resultChan) @@ -167,7 +166,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) { link := testutil.NewTestLink() linkContext := ipld.LinkContext{} - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() resultChan := make(chan types.AsyncLoadResult, 1) p := testutil.GeneratePeers(1)[0] lr := NewLoadRequest(p, requestID, link, linkContext, resultChan) diff --git a/requestmanager/asyncloader/responsecache/responsecache_test.go b/requestmanager/asyncloader/responsecache/responsecache_test.go index 22de3563..f5b5c991 100644 --- a/requestmanager/asyncloader/responsecache/responsecache_test.go +++ b/requestmanager/asyncloader/responsecache/responsecache_test.go @@ -2,7 +2,6 @@ package responsecache import ( "fmt" - "math/rand" "testing" blocks "github.com/ipfs/go-block-format" @@ -57,8 +56,8 @@ func (ubs *fakeUnverifiedBlockStore) blocks() []blocks.Block { func TestResponseCacheManagingLinks(t *testing.T) { blks := testutil.GenerateBlocksOfSize(5, 100) - requestID1 := graphsync.RequestID(rand.Int31()) - requestID2 := graphsync.RequestID(rand.Int31()) + requestID1 := graphsync.NewRequestID() + requestID2 := graphsync.NewRequestID() request1Metadata := metadata.Metadata{ metadata.Item{ diff --git a/requestmanager/client.go b/requestmanager/client.go index 95081b3d..fd2c959b 100644 --- a/requestmanager/client.go +++ b/requestmanager/client.go @@ -97,7 +97,6 @@ type RequestManager struct { maxLinksPerRequest uint64 // dont touch out side of run loop - nextRequestID graphsync.RequestID inProgressRequestStatuses map[graphsync.RequestID]*inProgressRequestStatus requestHooks RequestHooks responseHooks ResponseHooks diff --git a/requestmanager/executor/executor_test.go b/requestmanager/executor/executor_test.go index 8b737e3e..34321f2e 100644 --- a/requestmanager/executor/executor_test.go +++ b/requestmanager/executor/executor_test.go @@ -179,7 +179,7 @@ func TestRequestExecutionBlockChain(t *testing.T) { persistence := testutil.NewTestStore(make(map[ipld.Link][]byte)) tbc := testutil.SetupBlockChain(ctx, t, persistence, 100, 10) fal := testloader.NewFakeAsyncLoader() - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() p := testutil.GeneratePeers(1)[0] configureLoader := data.configureLoader if configureLoader == nil { diff --git a/requestmanager/hooks/hooks_test.go b/requestmanager/hooks/hooks_test.go index 4f008f09..2df87541 100644 --- a/requestmanager/hooks/hooks_test.go +++ b/requestmanager/hooks/hooks_test.go @@ -2,7 +2,6 @@ package hooks_test import ( "errors" - "math/rand" "testing" "github.com/ipld/go-ipld-prime" @@ -29,7 +28,7 @@ func TestRequestHookProcessing(t *testing.T) { } root := testutil.GenerateCids(1)[0] - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) request := gsmsg.NewRequest(requestID, root, ssb.Matcher().Node(), graphsync.Priority(0), extension) p := testutil.GeneratePeers(1)[0] @@ -111,7 +110,7 @@ func TestBlockHookProcessing(t *testing.T) { Name: extensionName, Data: extensionUpdateData, } - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() response := gsmsg.NewResponse(requestID, graphsync.PartialResponse, extensionResponse) p := testutil.GeneratePeers(1)[0] @@ -208,7 +207,7 @@ func TestResponseHookProcessing(t *testing.T) { Name: extensionName, Data: extensionUpdateData, } - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() response := gsmsg.NewResponse(requestID, graphsync.PartialResponse, extensionResponse) p := testutil.GeneratePeers(1)[0] diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index c62d9702..8af232de 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "sort" "testing" "time" @@ -1039,9 +1038,12 @@ func readNNetworkRequests(ctx context.Context, } // because of the simultaneous request queues it's possible for the requests to go to the network layer out of order // if the requests are queued at a near identical time - sort.Slice(requestRecords, func(i, j int) bool { - return requestRecords[i].gsr.ID() < requestRecords[j].gsr.ID() - }) + // TODO: howdo? + /* + sort.Slice(requestRecords, func(i, j int) bool { + return requestRecords[i].gsr.ID() < requestRecords[j].gsr.ID() + }) + */ return requestRecords } diff --git a/requestmanager/server.go b/requestmanager/server.go index 568a52c8..2c45bdb7 100644 --- a/requestmanager/server.go +++ b/requestmanager/server.go @@ -56,10 +56,9 @@ func (rm *RequestManager) cleanupInProcessRequests() { } func (rm *RequestManager) newRequest(parentSpan trace.Span, p peer.ID, root ipld.Link, selector ipld.Node, extensions []graphsync.ExtensionData) (gsmsg.GraphSyncRequest, chan graphsync.ResponseProgress, chan error) { - requestID := rm.nextRequestID - rm.nextRequestID++ + requestID := graphsync.NewRequestID() - parentSpan.SetAttributes(attribute.Int("requestID", int(requestID))) + parentSpan.SetAttributes(attribute.String("requestID", requestID.String())) ctx, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(rm.ctx, parentSpan), "newRequest") defer span.End() diff --git a/responsemanager/hooks/hooks_test.go b/responsemanager/hooks/hooks_test.go index e8a56903..3d0e7812 100644 --- a/responsemanager/hooks/hooks_test.go +++ b/responsemanager/hooks/hooks_test.go @@ -3,7 +3,6 @@ package hooks_test import ( "errors" "io" - "math/rand" "testing" "github.com/ipld/go-ipld-prime" @@ -54,7 +53,7 @@ func TestRequestHookProcessing(t *testing.T) { } root := testutil.GenerateCids(1)[0] - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) request := gsmsg.NewRequest(requestID, root, ssb.Matcher().Node(), graphsync.Priority(0), extension) p := testutil.GeneratePeers(1)[0] @@ -232,7 +231,7 @@ func TestBlockHookProcessing(t *testing.T) { } root := testutil.GenerateCids(1)[0] - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) request := gsmsg.NewRequest(requestID, root, ssb.Matcher().Node(), graphsync.Priority(0), extension) p := testutil.GeneratePeers(1)[0] @@ -315,7 +314,7 @@ func TestUpdateHookProcessing(t *testing.T) { } root := testutil.GenerateCids(1)[0] - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) request := gsmsg.NewRequest(requestID, root, ssb.Matcher().Node(), graphsync.Priority(0), extension) update := gsmsg.UpdateRequest(requestID, extensionUpdate) diff --git a/responsemanager/queryexecutor/queryexecutor_test.go b/responsemanager/queryexecutor/queryexecutor_test.go index 3e3f2922..199c8191 100644 --- a/responsemanager/queryexecutor/queryexecutor_test.go +++ b/responsemanager/queryexecutor/queryexecutor_test.go @@ -305,7 +305,7 @@ func newTestData(t *testing.T, blockCount int, expectedTraverse int) (*testData, td.responseAssembler = &fauxResponseAssembler{} td.blockHooks = hooks.NewBlockHooks() td.updateHooks = hooks.NewUpdateHooks() - td.requestID = graphsync.RequestID(rand.Int31()) + td.requestID = graphsync.NewRequestID() td.requestCid, _ = cid.Decode("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi") td.requestSelector = basicnode.NewInt(rand.Int63()) td.extensionData = testutil.RandomBytes(100) diff --git a/responsemanager/responseassembler/responseassembler_test.go b/responsemanager/responseassembler/responseassembler_test.go index b13cbd15..2a60f8ad 100644 --- a/responsemanager/responseassembler/responseassembler_test.go +++ b/responsemanager/responseassembler/responseassembler_test.go @@ -3,7 +3,6 @@ package responseassembler import ( "context" "fmt" - "math/rand" "testing" "time" @@ -24,9 +23,9 @@ func TestResponseAssemblerSendsResponses(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() p := testutil.GeneratePeers(1)[0] - requestID1 := graphsync.RequestID(rand.Int31()) - requestID2 := graphsync.RequestID(rand.Int31()) - requestID3 := graphsync.RequestID(rand.Int31()) + requestID1 := graphsync.NewRequestID() + requestID2 := graphsync.NewRequestID() + requestID3 := graphsync.NewRequestID() sendResponseNotifee1, _ := testutil.NewTestNotifee(requestID1, 10) sendResponseNotifee2, _ := testutil.NewTestNotifee(requestID2, 10) sendResponseNotifee3, _ := testutil.NewTestNotifee(requestID3, 10) @@ -116,7 +115,7 @@ func TestResponseAssemblerSendsExtensionData(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() p := testutil.GeneratePeers(1)[0] - requestID1 := graphsync.RequestID(rand.Int31()) + requestID1 := graphsync.NewRequestID() blks := testutil.GenerateBlocksOfSize(5, 100) links := make([]ipld.Link, 0, len(blks)) for _, block := range blks { @@ -161,7 +160,7 @@ func TestResponseAssemblerSendsResponsesInTransaction(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() p := testutil.GeneratePeers(1)[0] - requestID1 := graphsync.RequestID(rand.Int31()) + requestID1 := graphsync.NewRequestID() blks := testutil.GenerateBlocksOfSize(5, 100) links := make([]ipld.Link, 0, len(blks)) for _, block := range blks { @@ -198,8 +197,8 @@ func TestResponseAssemblerIgnoreBlocks(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() p := testutil.GeneratePeers(1)[0] - requestID1 := graphsync.RequestID(rand.Int31()) - requestID2 := graphsync.RequestID(rand.Int31()) + requestID1 := graphsync.NewRequestID() + requestID2 := graphsync.NewRequestID() blks := testutil.GenerateBlocksOfSize(5, 100) links := make([]ipld.Link, 0, len(blks)) for _, block := range blks { @@ -264,8 +263,8 @@ func TestResponseAssemblerSkipFirstBlocks(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() p := testutil.GeneratePeers(1)[0] - requestID1 := graphsync.RequestID(rand.Int31()) - requestID2 := graphsync.RequestID(rand.Int31()) + requestID1 := graphsync.NewRequestID() + requestID2 := graphsync.NewRequestID() blks := testutil.GenerateBlocksOfSize(5, 100) links := make([]ipld.Link, 0, len(blks)) for _, block := range blks { @@ -342,9 +341,9 @@ func TestResponseAssemblerDupKeys(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() p := testutil.GeneratePeers(1)[0] - requestID1 := graphsync.RequestID(rand.Int31()) - requestID2 := graphsync.RequestID(rand.Int31()) - requestID3 := graphsync.RequestID(rand.Int31()) + requestID1 := graphsync.NewRequestID() + requestID2 := graphsync.NewRequestID() + requestID3 := graphsync.NewRequestID() blks := testutil.GenerateBlocksOfSize(5, 100) links := make([]ipld.Link, 0, len(blks)) for _, block := range blks { diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index d8eb3084..93bea9b7 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -3,7 +3,6 @@ package responsemanager import ( "context" "errors" - "math/rand" "sync" "testing" "time" @@ -1096,7 +1095,7 @@ func newTestData(t *testing.T) testData { Name: td.extensionName, Data: td.extensionUpdateData, } - td.requestID = graphsync.RequestID(rand.Int31()) + td.requestID = graphsync.NewRequestID() td.requests = []gsmsg.GraphSyncRequest{ gsmsg.NewRequest(td.requestID, td.blockChain.TipLink.(cidlink.Link).Cid, td.blockChain.Selector(), graphsync.Priority(0), td.extension), } diff --git a/responsemanager/server.go b/responsemanager/server.go index 046dc20f..a9e057bc 100644 --- a/responsemanager/server.go +++ b/responsemanager/server.go @@ -65,7 +65,7 @@ func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSync } _, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(rm.ctx, response.span), "processUpdate", trace.WithAttributes( - attribute.Int("id", int(update.ID())), + attribute.String("id", update.ID().String()), attribute.StringSlice("extensions", update.ExtensionNames()), )) defer span.End() @@ -181,7 +181,7 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync rm.connManager.Protect(p, request.ID().Tag()) ctx := rm.requestQueuedHooks.ProcessRequestQueuedHooks(p, request, rm.ctx) ctx, responseSpan := otel.Tracer("graphsync").Start(ctx, "response", trace.WithAttributes( - attribute.Int("id", int(request.ID())), + attribute.String("id", request.ID().String()), attribute.Int("priority", int(request.Priority())), attribute.String("root", request.Root().String()), attribute.StringSlice("extensions", request.ExtensionNames()),