diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 00000000..ba18d3a4 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,3 @@ +run: + skip-files: + - testutil/chaintypes/testchain_gen.go \ No newline at end of file diff --git a/graphsync.go b/graphsync.go index b8cfbfee..cf4efbd3 100644 --- a/graphsync.go +++ b/graphsync.go @@ -135,27 +135,37 @@ type ResponseData interface { Extension(name ExtensionName) ([]byte, bool) } -// RequestReceivedHookActions are actions that a request hook can take to change +// IncomingRequestHookActions are actions that a request hook can take to change // behavior for the response -type RequestReceivedHookActions interface { +type IncomingRequestHookActions interface { SendExtensionData(ExtensionData) - UseLoader(ipld.Loader) + UsePersistenceOption(name string) UseNodeBuilderChooser(traversal.NodeBuilderChooser) TerminateWithError(error) ValidateRequest() } -// OnRequestReceivedHook is a hook that runs each time a request is received. +// OutgoingRequestHookActions are actions that an outgoing request hook can take +// to change the execution of this request +type OutgoingRequestHookActions interface { + UsePersistenceOption(name string) + UseNodeBuilderChooser(traversal.NodeBuilderChooser) +} + +// OnIncomingRequestHook is a hook that runs each time a new request is received. // It receives the peer that sent the request and all data about the request. -// It should return: -// extensionData - any extension data to add to the outgoing response -// err - error - if not nil, halt request and return RequestRejected with the responseData -type OnRequestReceivedHook func(p peer.ID, request RequestData, hookActions RequestReceivedHookActions) +// It receives an interface for customizing the response to this request +type OnIncomingRequestHook func(p peer.ID, request RequestData, hookActions IncomingRequestHookActions) -// OnResponseReceivedHook is a hook that runs each time a response is received. +// OnIncomingResponseHook is a hook that runs each time a new response is received. // It receives the peer that sent the response and all data about the response. // If it returns an error processing is halted and the original request is cancelled. -type OnResponseReceivedHook func(p peer.ID, responseData ResponseData) error +type OnIncomingResponseHook func(p peer.ID, responseData ResponseData) error + +// OnOutgoingRequestHook is a hook that runs immediately prior to sending a request +// It receives the peer we're sending a request to and all the data aobut the request +// It receives an interface for customizing how we handle executing this request +type OnOutgoingRequestHook func(p peer.ID, request RequestData, hookActions OutgoingRequestHookActions) // UnregisterHookFunc is a function call to unregister a hook that was previously registered type UnregisterHookFunc func() @@ -165,12 +175,15 @@ type GraphExchange interface { // Request initiates a new GraphSync request to the given peer using the given selector spec. Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...ExtensionData) (<-chan ResponseProgress, <-chan error) - // RegisterRequestReceivedHook adds a hook that runs when a request is received - // If overrideDefaultValidation is set to true, then if the hook does not error, - // it is considered to have "validated" the request -- and that validation supersedes - // the normal validation of requests Graphsync does (i.e. all selectors can be accepted) - RegisterRequestReceivedHook(hook OnRequestReceivedHook) UnregisterHookFunc + // RegisterPersistenceOption registers an alternate loader/storer combo that can be substituted for the default + RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error + + // RegisterIncomingRequestHook adds a hook that runs when a request is received + RegisterIncomingRequestHook(hook OnIncomingRequestHook) UnregisterHookFunc + + // RegisterIncomingResponseHook adds a hook that runs when a response is received + RegisterIncomingResponseHook(OnIncomingResponseHook) UnregisterHookFunc - // RegisterResponseReceivedHook adds a hook that runs when a response is received - RegisterResponseReceivedHook(OnResponseReceivedHook) UnregisterHookFunc + // RegisterOutgoingRequestHook adds a hook that runs immediately prior to sending a new request + RegisterOutgoingRequestHook(hook OnOutgoingRequestHook) UnregisterHookFunc } diff --git a/impl/graphsync.go b/impl/graphsync.go index 833f8002..2a531e38 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -70,7 +70,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, } peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue) responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue) - unregisterDefaultValidator := responseManager.RegisterHook(selectorvalidator.SelectorValidator(maxRecursionDepth)) + unregisterDefaultValidator := responseManager.RegisterRequestHook(selectorvalidator.SelectorValidator(maxRecursionDepth)) graphSync := &GraphSync{ network: network, loader: loader, @@ -103,17 +103,31 @@ func (gs *GraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, sel return gs.requestManager.SendRequest(ctx, p, root, selector, extensions...) } -// RegisterRequestReceivedHook adds a hook that runs when a request is received +// RegisterIncomingRequestHook adds a hook that runs when a request is received // If overrideDefaultValidation is set to true, then if the hook does not error, // it is considered to have "validated" the request -- and that validation supersedes // the normal validation of requests Graphsync does (i.e. all selectors can be accepted) -func (gs *GraphSync) RegisterRequestReceivedHook(hook graphsync.OnRequestReceivedHook) graphsync.UnregisterHookFunc { - return gs.responseManager.RegisterHook(hook) +func (gs *GraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingRequestHook) graphsync.UnregisterHookFunc { + return gs.responseManager.RegisterRequestHook(hook) } -// RegisterResponseReceivedHook adds a hook that runs when a response is received -func (gs *GraphSync) RegisterResponseReceivedHook(hook graphsync.OnResponseReceivedHook) graphsync.UnregisterHookFunc { - return gs.requestManager.RegisterHook(hook) +// RegisterIncomingResponseHook adds a hook that runs when a response is received +func (gs *GraphSync) RegisterIncomingResponseHook(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc { + return gs.requestManager.RegisterResponseHook(hook) +} + +// RegisterOutgoingRequestHook adds a hook that runs immediately prior to sending a new request +func (gs *GraphSync) RegisterOutgoingRequestHook(hook graphsync.OnOutgoingRequestHook) graphsync.UnregisterHookFunc { + return gs.requestManager.RegisterRequestHook(hook) +} + +// RegisterPersistenceOption registers an alternate loader/storer combo that can be substituted for the default +func (gs *GraphSync) RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error { + err := gs.asyncLoader.RegisterPersistenceOption(name, loader, storer) + if err != nil { + return err + } + return gs.responseManager.RegisterPersistenceOption(name, loader) } type graphSyncReceiver GraphSync diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index ec84d698..52c7878a 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -99,8 +99,8 @@ func TestSendResponseToIncomingRequest(t *testing.T) { var receivedRequestData []byte // initialize graphsync on second node to response to requests gsnet := td.GraphSyncHost2() - gsnet.RegisterRequestReceivedHook( - func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.RequestReceivedHookActions) { + gsnet.RegisterIncomingRequestHook( + func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { var has bool receivedRequestData, has = requestData.Extension(td.extensionName) require.True(t, has, "did not have expected extension") @@ -190,7 +190,7 @@ func TestGraphsyncRoundTrip(t *testing.T) { var receivedResponseData []byte var receivedRequestData []byte - requestor.RegisterResponseReceivedHook( + requestor.RegisterIncomingResponseHook( func(p peer.ID, responseData graphsync.ResponseData) error { data, has := responseData.Extension(td.extensionName) if has { @@ -199,7 +199,7 @@ func TestGraphsyncRoundTrip(t *testing.T) { return nil }) - responder.RegisterRequestReceivedHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.RequestReceivedHookActions) { + responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { var has bool receivedRequestData, has = requestData.Extension(td.extensionName) if !has { @@ -220,6 +220,69 @@ func TestGraphsyncRoundTrip(t *testing.T) { require.Equal(t, td.extensionResponseData, receivedResponseData, "did not receive correct extension response data") } +func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) { + // create network + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + td := newGsTestData(ctx, t) + + // initialize graphsync on first node to make requests + requestor := td.GraphSyncHost1() + + // initialize graphsync on second node to response to requests + responder := td.GraphSyncHost2() + + // alternate storing location for responder + altStore1 := make(map[ipld.Link][]byte) + altLoader1, altStorer1 := testutil.NewTestStore(altStore1) + + // alternate storing location for requestor + altStore2 := make(map[ipld.Link][]byte) + altLoader2, altStorer2 := testutil.NewTestStore(altStore2) + + err := requestor.RegisterPersistenceOption("chainstore", altLoader1, altStorer1) + require.NoError(t, err) + + err = responder.RegisterPersistenceOption("chainstore", altLoader2, altStorer2) + require.NoError(t, err) + + blockChainLength := 100 + blockChain := testutil.SetupBlockChain(ctx, t, altLoader1, altStorer2, 100, blockChainLength) + + extensionName := graphsync.ExtensionName("blockchain") + extension := graphsync.ExtensionData{ + Name: extensionName, + Data: nil, + } + + requestor.RegisterOutgoingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) { + _, has := requestData.Extension(extensionName) + if has { + hookActions.UseNodeBuilderChooser(blockChain.Chooser) + hookActions.UsePersistenceOption("chainstore") + } + }) + responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { + _, has := requestData.Extension(extensionName) + if has { + hookActions.UseNodeBuilderChooser(blockChain.Chooser) + hookActions.UsePersistenceOption("chainstore") + } + }) + + progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector()) + testutil.VerifyEmptyResponse(ctx, t, progressChan) + testutil.VerifySingleTerminalError(ctx, t, errChan) + + progressChan, errChan = requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), extension) + + blockChain.VerifyWholeChainWithTypes(ctx, progressChan) + testutil.VerifyEmptyErrors(ctx, t, errChan) + require.Len(t, td.blockStore1, 0, "should store no blocks in normal store") + require.Len(t, altStore1, blockChainLength, "did not store all blocks in alternate store") +} + // TestRoundTripLargeBlocksSlowNetwork test verifies graphsync continues to work // under a specific of adverse conditions: // -- large blocks being returned by a query @@ -360,7 +423,7 @@ func TestUnixFSFetch(t *testing.T) { requestor := New(ctx, td.gsnet1, loader1, storer1) responder := New(ctx, td.gsnet2, loader2, storer2) extensionName := graphsync.ExtensionName("Free for all") - responder.RegisterRequestReceivedHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.RequestReceivedHookActions) { + responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() hookActions.SendExtensionData(graphsync.ExtensionData{ Name: extensionName, diff --git a/requestmanager/asyncloader/asyncloader.go b/requestmanager/asyncloader/asyncloader.go index d04faf56..480e62cd 100644 --- a/requestmanager/asyncloader/asyncloader.go +++ b/requestmanager/asyncloader/asyncloader.go @@ -20,6 +20,11 @@ type loaderMessage interface { handle(al *AsyncLoader) } +type alternateQueue struct { + responseCache *responsecache.ResponseCache + loadAttemptQueue *loadattemptqueue.LoadAttemptQueue +} + // AsyncLoader manages loading links asynchronously in as new responses // come in from the network type AsyncLoader struct { @@ -28,38 +33,30 @@ type AsyncLoader struct { incomingMessages chan loaderMessage outgoingMessages chan loaderMessage - activeRequests map[graphsync.RequestID]bool - loadAttemptQueue *loadattemptqueue.LoadAttemptQueue + defaultLoader ipld.Loader + defaultStorer ipld.Storer + activeRequests map[graphsync.RequestID]struct{} + requestQueues map[graphsync.RequestID]string + alternateQueues map[string]alternateQueue responseCache *responsecache.ResponseCache + loadAttemptQueue *loadattemptqueue.LoadAttemptQueue } // New initializes a new link loading manager for asynchronous loads from the given context // and local store loading and storing function func New(ctx context.Context, loader ipld.Loader, storer ipld.Storer) *AsyncLoader { - unverifiedBlockStore := unverifiedblockstore.New(storer) - responseCache := responsecache.New(unverifiedBlockStore) - loadAttemptQueue := loadattemptqueue.New(func(requestID graphsync.RequestID, link ipld.Link) ([]byte, error) { - // load from response cache - data, err := responseCache.AttemptLoad(requestID, link) - if data == nil && err == nil { - // fall back to local store - stream, loadErr := loader(link, ipld.LinkContext{}) - if stream != nil && loadErr == nil { - localData, loadErr := ioutil.ReadAll(stream) - if loadErr == nil && localData != nil { - return localData, nil - } - } - } - return data, err - }) + responseCache, loadAttemptQueue := setupAttemptQueue(loader, storer) ctx, cancel := context.WithCancel(ctx) return &AsyncLoader{ ctx: ctx, cancel: cancel, incomingMessages: make(chan loaderMessage), outgoingMessages: make(chan loaderMessage), - activeRequests: make(map[graphsync.RequestID]bool), + defaultLoader: loader, + defaultStorer: storer, + activeRequests: make(map[graphsync.RequestID]struct{}), + requestQueues: make(map[graphsync.RequestID]string), + alternateQueues: make(map[string]alternateQueue), responseCache: responseCache, loadAttemptQueue: loadAttemptQueue, } @@ -76,12 +73,39 @@ func (al *AsyncLoader) Shutdown() { al.cancel() } +// RegisterPersistenceOption registers a new loader/storer option for processing requests +func (al *AsyncLoader) RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error { + if name == "" { + return errors.New("Persistence option must have a name") + } + response := make(chan error, 1) + select { + case <-al.ctx.Done(): + return errors.New("context closed") + case al.incomingMessages <- ®isterPersistenceOptionMessage{name, loader, storer, response}: + } + select { + case <-al.ctx.Done(): + return errors.New("context closed") + case err := <-response: + return err + } +} + // StartRequest indicates the given request has started and the manager should // continually attempt to load links for this request as new responses come in -func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID) { +func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID, persistenceOption string) error { + response := make(chan error, 1) + select { + case <-al.ctx.Done(): + return errors.New("context closed") + case al.incomingMessages <- &startRequestMessage{requestID, persistenceOption, response}: + } select { case <-al.ctx.Done(): - case al.incomingMessages <- &startRequestMessage{requestID}: + return errors.New("context closed") + case err := <-response: + return err } } @@ -89,10 +113,9 @@ func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID) { // neccesary func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata, blks []blocks.Block) { - al.responseCache.ProcessResponse(responses, blks) select { case <-al.ctx.Done(): - case al.incomingMessages <- &newResponsesAvailableMessage{}: + case al.incomingMessages <- &newResponsesAvailableMessage{responses, blks}: } } @@ -124,7 +147,10 @@ func (al *AsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) { // and no further attempts will be made to load links for this request, // so any cached response data is invalid can be cleaned func (al *AsyncLoader) CleanupRequest(requestID graphsync.RequestID) { - al.responseCache.FinishRequest(requestID) + select { + case <-al.ctx.Done(): + case al.incomingMessages <- &cleanupRequestMessage{requestID}: + } } type loadRequestMessage struct { @@ -133,16 +159,31 @@ type loadRequestMessage struct { } type newResponsesAvailableMessage struct { + responses map[graphsync.RequestID]metadata.Metadata + blks []blocks.Block +} + +type registerPersistenceOptionMessage struct { + name string + loader ipld.Loader + storer ipld.Storer + response chan error } type startRequestMessage struct { - requestID graphsync.RequestID + requestID graphsync.RequestID + persistenceOption string + response chan error } type finishRequestMessage struct { requestID graphsync.RequestID } +type cleanupRequestMessage struct { + requestID graphsync.RequestID +} + func (al *AsyncLoader) run() { for { select { @@ -180,20 +221,117 @@ func (al *AsyncLoader) messageQueueWorker() { } } +func (al *AsyncLoader) getLoadAttemptQueue(queue string) *loadattemptqueue.LoadAttemptQueue { + if queue == "" { + return al.loadAttemptQueue + } + return al.alternateQueues[queue].loadAttemptQueue +} + +func (al *AsyncLoader) getResponseCache(queue string) *responsecache.ResponseCache { + if queue == "" { + return al.responseCache + } + return al.alternateQueues[queue].responseCache +} + func (lrm *loadRequestMessage) handle(al *AsyncLoader) { - retry := al.activeRequests[lrm.requestID] - al.loadAttemptQueue.AttemptLoad(lrm.loadRequest, retry) + _, retry := al.activeRequests[lrm.requestID] + loadAttemptQueue := al.getLoadAttemptQueue(al.requestQueues[lrm.requestID]) + loadAttemptQueue.AttemptLoad(lrm.loadRequest, retry) +} + +func (rpom *registerPersistenceOptionMessage) register(al *AsyncLoader) error { + _, existing := al.alternateQueues[rpom.name] + if existing { + return errors.New("already registerd a persistence option with this name") + } + responseCache, loadAttemptQueue := setupAttemptQueue(rpom.loader, rpom.storer) + al.alternateQueues[rpom.name] = alternateQueue{responseCache, loadAttemptQueue} + return nil +} + +func (rpom *registerPersistenceOptionMessage) handle(al *AsyncLoader) { + err := rpom.register(al) + select { + case <-al.ctx.Done(): + case rpom.response <- err: + } +} + +func (srm *startRequestMessage) startRequest(al *AsyncLoader) error { + if srm.persistenceOption != "" { + _, ok := al.alternateQueues[srm.persistenceOption] + if !ok { + return errors.New("Unknown persistence option") + } + al.requestQueues[srm.requestID] = srm.persistenceOption + } + al.activeRequests[srm.requestID] = struct{}{} + return nil } func (srm *startRequestMessage) handle(al *AsyncLoader) { - al.activeRequests[srm.requestID] = true + err := srm.startRequest(al) + select { + case <-al.ctx.Done(): + case srm.response <- err: + } } func (frm *finishRequestMessage) handle(al *AsyncLoader) { delete(al.activeRequests, frm.requestID) - al.loadAttemptQueue.ClearRequest(frm.requestID) + loadAttemptQueue := al.getLoadAttemptQueue(al.requestQueues[frm.requestID]) + loadAttemptQueue.ClearRequest(frm.requestID) } func (nram *newResponsesAvailableMessage) handle(al *AsyncLoader) { - al.loadAttemptQueue.RetryLoads() + byQueue := make(map[string][]graphsync.RequestID) + for requestID := range nram.responses { + queue := al.requestQueues[requestID] + byQueue[queue] = append(byQueue[queue], requestID) + } + for queue, requestIDs := range byQueue { + loadAttemptQueue := al.getLoadAttemptQueue(queue) + responseCache := al.getResponseCache(queue) + responses := make(map[graphsync.RequestID]metadata.Metadata, len(requestIDs)) + for _, requestID := range requestIDs { + responses[requestID] = nram.responses[requestID] + } + responseCache.ProcessResponse(responses, nram.blks) + loadAttemptQueue.RetryLoads() + } +} + +func (crm *cleanupRequestMessage) handle(al *AsyncLoader) { + aq, ok := al.requestQueues[crm.requestID] + if ok { + al.alternateQueues[aq].responseCache.FinishRequest(crm.requestID) + delete(al.requestQueues, crm.requestID) + return + } + al.responseCache.FinishRequest(crm.requestID) +} + +func setupAttemptQueue(loader ipld.Loader, storer ipld.Storer) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) { + + unverifiedBlockStore := unverifiedblockstore.New(storer) + responseCache := responsecache.New(unverifiedBlockStore) + loadAttemptQueue := loadattemptqueue.New(func(requestID graphsync.RequestID, link ipld.Link) ([]byte, error) { + // load from response cache + data, err := responseCache.AttemptLoad(requestID, link) + if data == nil && err == nil { + // fall back to local store + stream, loadErr := loader(link, ipld.LinkContext{}) + if stream != nil && loadErr == nil { + localData, loadErr := ioutil.ReadAll(stream) + if loadErr == nil && localData != nil { + return localData, nil + } + } + } + return data, err + }) + + return responseCache, loadAttemptQueue } diff --git a/requestmanager/asyncloader/asyncloader_test.go b/requestmanager/asyncloader/asyncloader_test.go index 4482c1ff..debc54f6 100644 --- a/requestmanager/asyncloader/asyncloader_test.go +++ b/requestmanager/asyncloader/asyncloader_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/metadata" "github.com/ipfs/go-graphsync/requestmanager/types" @@ -18,312 +19,342 @@ import ( ) func TestAsyncLoadInitialLoadSucceedsLocallyPresent(t *testing.T) { - ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - callCount := 0 - blockStore := make(map[ipld.Link][]byte) - loader, storer := testutil.NewTestStore(blockStore) block := testutil.GenerateBlocksOfSize(1, 100)[0] - writer, commit, err := storer(ipld.LinkContext{}) - require.NoError(t, err) - _, err = writer.Write(block.RawData()) - require.NoError(t, err, "seeds block store") - link := cidlink.Link{Cid: block.Cid()} - err = commit(link) - require.NoError(t, err, "seeds block store") - - wrappedLoader := func(link ipld.Link, linkContext ipld.LinkContext) (io.Reader, error) { - callCount++ - return loader(link, linkContext) - } - - asyncLoader := New(ctx, wrappedLoader, storer) - asyncLoader.Startup() - - requestID := graphsync.RequestID(rand.Int31()) - resultChan := asyncLoader.AsyncLoad(requestID, link) - - var result types.AsyncLoadResult - testutil.AssertReceive(ctx, t, resultChan, &result, "should close response channel with response") - require.NotNil(t, result.Data, "should send response") - require.Nil(t, result.Err, "should not send error") - - require.NotZero(t, callCount, "should attempt to load link from local store") + st := newStore() + link := st.Store(t, block) + withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { + requestID := graphsync.RequestID(rand.Int31()) + resultChan := asyncLoader.AsyncLoad(requestID, link) + assertSuccessResponse(ctx, t, resultChan) + st.AssertLocalLoads(t, 1) + }) } func TestAsyncLoadInitialLoadSucceedsResponsePresent(t *testing.T) { - ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - callCount := 0 - blockStore := make(map[ipld.Link][]byte) - loader, storer := testutil.NewTestStore(blockStore) blocks := testutil.GenerateBlocksOfSize(1, 100) block := blocks[0] - link := cidlink.Link{Cid: block.Cid()} - wrappedLoader := func(link ipld.Link, linkContext ipld.LinkContext) (io.Reader, error) { - callCount++ - return loader(link, linkContext) - } - - asyncLoader := New(ctx, wrappedLoader, storer) - asyncLoader.Startup() + st := newStore() + withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { + requestID := graphsync.RequestID(rand.Int31()) + responses := map[graphsync.RequestID]metadata.Metadata{ + requestID: metadata.Metadata{ + metadata.Item{ + Link: link, + BlockPresent: true, + }, + }, + } + asyncLoader.ProcessResponse(responses, blocks) + resultChan := asyncLoader.AsyncLoad(requestID, link) + + assertSuccessResponse(ctx, t, resultChan) + st.AssertLocalLoads(t, 0) + st.AssertBlockStored(t, block) + }) +} - requestID := graphsync.RequestID(rand.Int31()) - responses := map[graphsync.RequestID]metadata.Metadata{ - requestID: metadata.Metadata{ - metadata.Item{ - Link: link, - BlockPresent: true, +func TestAsyncLoadInitialLoadFails(t *testing.T) { + st := newStore() + withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { + link := testutil.NewTestLink() + requestID := graphsync.RequestID(rand.Int31()) + + responses := map[graphsync.RequestID]metadata.Metadata{ + requestID: metadata.Metadata{ + metadata.Item{ + Link: link, + BlockPresent: false, + }, }, - }, - } - asyncLoader.ProcessResponse(responses, blocks) - resultChan := asyncLoader.AsyncLoad(requestID, link) + } + asyncLoader.ProcessResponse(responses, nil) - var result types.AsyncLoadResult - testutil.AssertReceive(ctx, t, resultChan, &result, "should close response channel with response") - require.NotNil(t, result.Data, "should send response") - require.Nil(t, result.Err, "should not send error") + resultChan := asyncLoader.AsyncLoad(requestID, link) + assertFailResponse(ctx, t, resultChan) + st.AssertLocalLoads(t, 0) + }) +} - require.Zero(t, callCount, "should not attempt to load link from local store") - require.Equal(t, block.RawData(), blockStore[link], "should store block") +func TestAsyncLoadInitialLoadIndeterminateWhenRequestNotInProgress(t *testing.T) { + st := newStore() + withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { + link := testutil.NewTestLink() + requestID := graphsync.RequestID(rand.Int31()) + resultChan := asyncLoader.AsyncLoad(requestID, link) + assertFailResponse(ctx, t, resultChan) + st.AssertLocalLoads(t, 1) + }) } -func TestAsyncLoadInitialLoadFails(t *testing.T) { - ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - callCount := 0 - blockStore := make(map[ipld.Link][]byte) - loader, storer := testutil.NewTestStore(blockStore) +func TestAsyncLoadInitialLoadIndeterminateThenSucceeds(t *testing.T) { + blocks := testutil.GenerateBlocksOfSize(1, 100) + block := blocks[0] + link := cidlink.Link{Cid: block.Cid()} - wrappedLoader := func(link ipld.Link, linkContext ipld.LinkContext) (io.Reader, error) { - callCount++ - return loader(link, linkContext) - } + st := newStore() - asyncLoader := New(ctx, wrappedLoader, storer) - asyncLoader.Startup() + withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { + requestID := graphsync.RequestID(rand.Int31()) + err := asyncLoader.StartRequest(requestID, "") + require.NoError(t, err) + resultChan := asyncLoader.AsyncLoad(requestID, link) - link := testutil.NewTestLink() - requestID := graphsync.RequestID(rand.Int31()) + st.AssertAttemptLoadWithoutResult(ctx, t, resultChan) - responses := map[graphsync.RequestID]metadata.Metadata{ - requestID: metadata.Metadata{ - metadata.Item{ - Link: link, - BlockPresent: false, + responses := map[graphsync.RequestID]metadata.Metadata{ + requestID: metadata.Metadata{ + metadata.Item{ + Link: link, + BlockPresent: true, + }, }, - }, - } - asyncLoader.ProcessResponse(responses, nil) + } + asyncLoader.ProcessResponse(responses, blocks) + assertSuccessResponse(ctx, t, resultChan) + st.AssertLocalLoads(t, 1) + st.AssertBlockStored(t, block) + }) +} - resultChan := asyncLoader.AsyncLoad(requestID, link) +func TestAsyncLoadInitialLoadIndeterminateThenFails(t *testing.T) { + st := newStore() + + withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { + link := testutil.NewTestLink() + requestID := graphsync.RequestID(rand.Int31()) + err := asyncLoader.StartRequest(requestID, "") + require.NoError(t, err) + resultChan := asyncLoader.AsyncLoad(requestID, link) + + st.AssertAttemptLoadWithoutResult(ctx, t, resultChan) + + responses := map[graphsync.RequestID]metadata.Metadata{ + requestID: metadata.Metadata{ + metadata.Item{ + Link: link, + BlockPresent: false, + }, + }, + } + asyncLoader.ProcessResponse(responses, nil) + assertFailResponse(ctx, t, resultChan) + st.AssertLocalLoads(t, 1) + }) +} - var result types.AsyncLoadResult - testutil.AssertReceive(ctx, t, resultChan, &result, "should close response channel with response") - require.Nil(t, result.Data, "should not send responses") - require.NotNil(t, result.Err, "should send an error") - require.Zero(t, callCount, "should not attempt to load link from local store") +func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) { + st := newStore() + withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { + link := testutil.NewTestLink() + requestID := graphsync.RequestID(rand.Int31()) + err := asyncLoader.StartRequest(requestID, "") + require.NoError(t, err) + resultChan := asyncLoader.AsyncLoad(requestID, link) + st.AssertAttemptLoadWithoutResult(ctx, t, resultChan) + asyncLoader.CompleteResponsesFor(requestID) + assertFailResponse(ctx, t, resultChan) + st.AssertLocalLoads(t, 1) + }) } -func TestAsyncLoadInitialLoadIndeterminateWhenRequestNotInProgress(t *testing.T) { - ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - callCount := 0 - blockStore := make(map[ipld.Link][]byte) - loader, storer := testutil.NewTestStore(blockStore) +func TestAsyncLoadTwiceLoadsLocallySecondTime(t *testing.T) { + blocks := testutil.GenerateBlocksOfSize(1, 100) + block := blocks[0] + link := cidlink.Link{Cid: block.Cid()} + st := newStore() + withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { + requestID := graphsync.RequestID(rand.Int31()) + responses := map[graphsync.RequestID]metadata.Metadata{ + requestID: metadata.Metadata{ + metadata.Item{ + Link: link, + BlockPresent: true, + }, + }, + } + asyncLoader.ProcessResponse(responses, blocks) + resultChan := asyncLoader.AsyncLoad(requestID, link) - wrappedLoader := func(link ipld.Link, linkContext ipld.LinkContext) (io.Reader, error) { - callCount++ - return loader(link, linkContext) - } + assertSuccessResponse(ctx, t, resultChan) + st.AssertLocalLoads(t, 0) - asyncLoader := New(ctx, wrappedLoader, storer) - asyncLoader.Startup() + resultChan = asyncLoader.AsyncLoad(requestID, link) + assertSuccessResponse(ctx, t, resultChan) + st.AssertLocalLoads(t, 1) - link := testutil.NewTestLink() - requestID := graphsync.RequestID(rand.Int31()) - resultChan := asyncLoader.AsyncLoad(requestID, link) + st.AssertBlockStored(t, block) + }) +} - var result types.AsyncLoadResult - testutil.AssertReceive(ctx, t, resultChan, &result, "should close response channel with response") - require.Nil(t, result.Data, "should not send responses") - require.NotNil(t, result.Err, "should send an error") - require.NotZero(t, callCount, "should attempt to load link from local store") +func TestRequestSplittingLoadLocallyFromBlockstore(t *testing.T) { + st := newStore() + otherSt := newStore() + block := testutil.GenerateBlocksOfSize(1, 100)[0] + link := otherSt.Store(t, block) + withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { + err := asyncLoader.RegisterPersistenceOption("other", otherSt.loader, otherSt.storer) + require.NoError(t, err) + requestID1 := graphsync.RequestID(rand.Int31()) + resultChan1 := asyncLoader.AsyncLoad(requestID1, link) + requestID2 := graphsync.RequestID(rand.Int31()) + err = asyncLoader.StartRequest(requestID2, "other") + require.NoError(t, err) + resultChan2 := asyncLoader.AsyncLoad(requestID2, link) + + assertFailResponse(ctx, t, resultChan1) + assertSuccessResponse(ctx, t, resultChan2) + st.AssertLocalLoads(t, 1) + }) } -func TestAsyncLoadInitialLoadIndeterminateThenSucceeds(t *testing.T) { - ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - callCount := 0 - blockStore := make(map[ipld.Link][]byte) - loader, storer := testutil.NewTestStore(blockStore) +func TestRequestSplittingSameBlockTwoStores(t *testing.T) { + st := newStore() + otherSt := newStore() blocks := testutil.GenerateBlocksOfSize(1, 100) block := blocks[0] - link := cidlink.Link{Cid: block.Cid()} - called := make(chan struct{}, 2) - wrappedLoader := func(link ipld.Link, linkContext ipld.LinkContext) (io.Reader, error) { - called <- struct{}{} - callCount++ - return loader(link, linkContext) - } - - asyncLoader := New(ctx, wrappedLoader, storer) - asyncLoader.Startup() - - requestID := graphsync.RequestID(rand.Int31()) - asyncLoader.StartRequest(requestID) - resultChan := asyncLoader.AsyncLoad(requestID, link) - - testutil.AssertDoesReceiveFirst(t, called, "should attempt load with no result", resultChan, ctx.Done()) - - responses := map[graphsync.RequestID]metadata.Metadata{ - requestID: metadata.Metadata{ - metadata.Item{ - Link: link, - BlockPresent: true, + withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { + err := asyncLoader.RegisterPersistenceOption("other", otherSt.loader, otherSt.storer) + require.NoError(t, err) + requestID1 := graphsync.RequestID(rand.Int31()) + requestID2 := graphsync.RequestID(rand.Int31()) + err = asyncLoader.StartRequest(requestID1, "") + require.NoError(t, err) + err = asyncLoader.StartRequest(requestID2, "other") + require.NoError(t, err) + resultChan1 := asyncLoader.AsyncLoad(requestID1, link) + resultChan2 := asyncLoader.AsyncLoad(requestID2, link) + responses := map[graphsync.RequestID]metadata.Metadata{ + requestID1: metadata.Metadata{ + metadata.Item{ + Link: link, + BlockPresent: true, + }, }, - }, - } - asyncLoader.ProcessResponse(responses, blocks) - - var result types.AsyncLoadResult - testutil.AssertReceive(ctx, t, resultChan, &result, "should close response channel with response") - require.NotNil(t, result.Data, "should send response") - require.Nil(t, result.Err, "should not send error") - - require.Equal(t, 1, callCount, "should attempt to load from local store exactly once") - - require.Equal(t, block.RawData(), blockStore[link], "should store block") + requestID2: metadata.Metadata{ + metadata.Item{ + Link: link, + BlockPresent: true, + }, + }, + } + asyncLoader.ProcessResponse(responses, blocks) + + assertSuccessResponse(ctx, t, resultChan1) + assertSuccessResponse(ctx, t, resultChan2) + st.AssertBlockStored(t, block) + otherSt.AssertBlockStored(t, block) + }) } -func TestAsyncLoadInitialLoadIndeterminateThenFails(t *testing.T) { - ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - callCount := 0 - blockStore := make(map[ipld.Link][]byte) - loader, storer := testutil.NewTestStore(blockStore) - - link := testutil.NewTestLink() - called := make(chan struct{}, 2) - wrappedLoader := func(link ipld.Link, linkContext ipld.LinkContext) (io.Reader, error) { - called <- struct{}{} - callCount++ - return loader(link, linkContext) - } - - asyncLoader := New(ctx, wrappedLoader, storer) - asyncLoader.Startup() +func TestRequestSplittingSameBlockOnlyOneResponse(t *testing.T) { + st := newStore() + otherSt := newStore() + blocks := testutil.GenerateBlocksOfSize(1, 100) + block := blocks[0] + link := cidlink.Link{Cid: block.Cid()} + withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { + err := asyncLoader.RegisterPersistenceOption("other", otherSt.loader, otherSt.storer) + require.NoError(t, err) + requestID1 := graphsync.RequestID(rand.Int31()) + requestID2 := graphsync.RequestID(rand.Int31()) + err = asyncLoader.StartRequest(requestID1, "") + require.NoError(t, err) + err = asyncLoader.StartRequest(requestID2, "other") + require.NoError(t, err) + resultChan1 := asyncLoader.AsyncLoad(requestID1, link) + resultChan2 := asyncLoader.AsyncLoad(requestID2, link) + responses := map[graphsync.RequestID]metadata.Metadata{ + requestID2: metadata.Metadata{ + metadata.Item{ + Link: link, + BlockPresent: true, + }, + }, + } + asyncLoader.ProcessResponse(responses, blocks) + asyncLoader.CompleteResponsesFor(requestID1) + + assertFailResponse(ctx, t, resultChan1) + assertSuccessResponse(ctx, t, resultChan2) + otherSt.AssertBlockStored(t, block) + }) +} - requestID := graphsync.RequestID(rand.Int31()) - asyncLoader.StartRequest(requestID) - resultChan := asyncLoader.AsyncLoad(requestID, link) +type store struct { + internalLoader ipld.Loader + storer ipld.Storer + blockstore map[ipld.Link][]byte + localLoads int + called chan struct{} +} - testutil.AssertDoesReceiveFirst(t, called, "should attempt load with no result", resultChan, ctx.Done()) - responses := map[graphsync.RequestID]metadata.Metadata{ - requestID: metadata.Metadata{ - metadata.Item{ - Link: link, - BlockPresent: false, - }, - }, +func newStore() *store { + blockstore := make(map[ipld.Link][]byte) + loader, storer := testutil.NewTestStore(blockstore) + return &store{ + internalLoader: loader, + storer: storer, + blockstore: blockstore, + localLoads: 0, + called: make(chan struct{}), } - asyncLoader.ProcessResponse(responses, nil) - - var result types.AsyncLoadResult - testutil.AssertReceive(ctx, t, resultChan, &result, "should close response channel with response") - require.Nil(t, result.Data, "should not send responses") - require.NotNil(t, result.Err, "should send an error") - require.Equal(t, 1, callCount, "should attempt to load from local store exactly once") } -func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) { - ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - callCount := 0 - blockStore := make(map[ipld.Link][]byte) - loader, storer := testutil.NewTestStore(blockStore) - - link := testutil.NewTestLink() - called := make(chan struct{}, 2) - wrappedLoader := func(link ipld.Link, linkContext ipld.LinkContext) (io.Reader, error) { - called <- struct{}{} - callCount++ - return loader(link, linkContext) +func (st *store) loader(lnk ipld.Link, lnkCtx ipld.LinkContext) (io.Reader, error) { + select { + case <-st.called: + default: + close(st.called) } + st.localLoads++ + return st.internalLoader(lnk, lnkCtx) +} - asyncLoader := New(ctx, wrappedLoader, storer) - asyncLoader.Startup() +func (st *store) AssertLocalLoads(t *testing.T, localLoads int) { + require.Equalf(t, localLoads, st.localLoads, "should have loaded locally %d times", localLoads) +} - requestID := graphsync.RequestID(rand.Int31()) - asyncLoader.StartRequest(requestID) - resultChan := asyncLoader.AsyncLoad(requestID, link) +func (st *store) AssertBlockStored(t *testing.T, blk blocks.Block) { + require.Equal(t, blk.RawData(), st.blockstore[cidlink.Link{Cid: blk.Cid()}], "should store block") +} - testutil.AssertDoesReceiveFirst(t, called, "should attempt load with no result", resultChan, ctx.Done()) - asyncLoader.CompleteResponsesFor(requestID) +func (st *store) AssertAttemptLoadWithoutResult(ctx context.Context, t *testing.T, resultChan <-chan types.AsyncLoadResult) { + testutil.AssertDoesReceiveFirst(t, st.called, "should attempt load with no result", resultChan, ctx.Done()) +} - var result types.AsyncLoadResult - testutil.AssertReceive(ctx, t, resultChan, &result, "should close response channel with response") - require.Nil(t, result.Data, "should not send responses") - require.NotNil(t, result.Err, "should send an error") - require.Equal(t, 1, callCount, "should attempt to load from local store exactly once") +func (st *store) Store(t *testing.T, blk blocks.Block) ipld.Link { + writer, commit, err := st.storer(ipld.LinkContext{}) + require.NoError(t, err) + _, err = writer.Write(blk.RawData()) + require.NoError(t, err, "seeds block store") + link := cidlink.Link{Cid: blk.Cid()} + err = commit(link) + require.NoError(t, err, "seeds block store") + return link } -func TestAsyncLoadTwiceLoadsLocallySecondTime(t *testing.T) { +func withLoader(st *store, exec func(ctx context.Context, asyncLoader *AsyncLoader)) { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - callCount := 0 - blockStore := make(map[ipld.Link][]byte) - loader, storer := testutil.NewTestStore(blockStore) - blocks := testutil.GenerateBlocksOfSize(1, 100) - block := blocks[0] - - link := cidlink.Link{Cid: block.Cid()} - - wrappedLoader := func(link ipld.Link, linkContext ipld.LinkContext) (io.Reader, error) { - callCount++ - return loader(link, linkContext) - } - - asyncLoader := New(ctx, wrappedLoader, storer) + asyncLoader := New(ctx, st.loader, st.storer) asyncLoader.Startup() + exec(ctx, asyncLoader) +} - requestID := graphsync.RequestID(rand.Int31()) - responses := map[graphsync.RequestID]metadata.Metadata{ - requestID: metadata.Metadata{ - metadata.Item{ - Link: link, - BlockPresent: true, - }, - }, - } - asyncLoader.ProcessResponse(responses, blocks) - resultChan := asyncLoader.AsyncLoad(requestID, link) - +func assertSuccessResponse(ctx context.Context, t *testing.T, resultChan <-chan types.AsyncLoadResult) { var result types.AsyncLoadResult testutil.AssertReceive(ctx, t, resultChan, &result, "should close response channel with response") require.NotNil(t, result.Data, "should send response") require.Nil(t, result.Err, "should not send error") +} - require.Zero(t, callCount, "should not attempt to load link from local store") - require.Equal(t, block.RawData(), blockStore[link], "should store block") - - resultChan = asyncLoader.AsyncLoad(requestID, link) - +func assertFailResponse(ctx context.Context, t *testing.T, resultChan <-chan types.AsyncLoadResult) { + var result types.AsyncLoadResult testutil.AssertReceive(ctx, t, resultChan, &result, "should close response channel with response") - require.NotNil(t, result.Data, "should send response") - require.Nil(t, result.Err, "should not send error") - require.NotZero(t, callCount, "should attempt to load link from local store") - require.Equal(t, block.RawData(), blockStore[link], "should store block") + require.Nil(t, result.Data, "should not send responses") + require.NotNil(t, result.Err, "should send an error") } diff --git a/requestmanager/requestmanager.go b/requestmanager/requestmanager.go index 459f95fb..4ee7745d 100644 --- a/requestmanager/requestmanager.go +++ b/requestmanager/requestmanager.go @@ -15,6 +15,7 @@ import ( logging "github.com/ipfs/go-log" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/traversal" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/libp2p/go-libp2p-core/peer" ) @@ -35,7 +36,12 @@ type inProgressRequestStatus struct { type responseHook struct { key uint64 - hook graphsync.OnResponseReceivedHook + hook graphsync.OnIncomingResponseHook +} + +type requestHook struct { + key uint64 + hook graphsync.OnOutgoingRequestHook } // PeerHandler is an interface that can send requests to peers @@ -46,7 +52,7 @@ type PeerHandler interface { // AsyncLoader is an interface for loading links asynchronously, returning // results as new responses are processed type AsyncLoader interface { - StartRequest(requestID graphsync.RequestID) + StartRequest(graphsync.RequestID, string) error ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata, blks []blocks.Block) AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult @@ -66,8 +72,9 @@ type RequestManager struct { // dont touch out side of run loop nextRequestID graphsync.RequestID inProgressRequestStatuses map[graphsync.RequestID]*inProgressRequestStatus - responseHookNextKey uint64 + hooksNextKey uint64 responseHooks []responseHook + requestHooks []requestHook } type requestManagerMessage interface { @@ -203,17 +210,38 @@ func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyn } } -type registerHookMessage struct { - hook graphsync.OnResponseReceivedHook +type registerRequestHookMessage struct { + hook graphsync.OnOutgoingRequestHook + unregisterHookChan chan graphsync.UnregisterHookFunc +} + +type registerResponseHookMessage struct { + hook graphsync.OnIncomingResponseHook unregisterHookChan chan graphsync.UnregisterHookFunc } -// RegisterHook registers an extension to processincoming responses -func (rm *RequestManager) RegisterHook( - hook graphsync.OnResponseReceivedHook) graphsync.UnregisterHookFunc { +// RegisterRequestHook registers an extension to process outgoing requests +func (rm *RequestManager) RegisterRequestHook(hook graphsync.OnOutgoingRequestHook) graphsync.UnregisterHookFunc { + response := make(chan graphsync.UnregisterHookFunc) + select { + case rm.messages <- ®isterRequestHookMessage{hook, response}: + case <-rm.ctx.Done(): + return nil + } + select { + case unregister := <-response: + return unregister + case <-rm.ctx.Done(): + return nil + } +} + +// RegisterResponseHook registers an extension to process incoming responses +func (rm *RequestManager) RegisterResponseHook( + hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc { response := make(chan graphsync.UnregisterHookFunc) select { - case rm.messages <- ®isterHookMessage{hook, response}: + case rm.messages <- ®isterResponseHookMessage{hook, response}: case <-rm.ctx.Done(): return nil } @@ -300,9 +328,26 @@ func (prm *processResponseMessage) handle(rm *RequestManager) { rm.processTerminations(filteredResponses) } -func (rhm *registerHookMessage) handle(rm *RequestManager) { - rh := responseHook{rm.responseHookNextKey, rhm.hook} - rm.responseHookNextKey++ +func (rhm *registerRequestHookMessage) handle(rm *RequestManager) { + rh := requestHook{rm.hooksNextKey, rhm.hook} + rm.hooksNextKey++ + rm.requestHooks = append(rm.requestHooks, rh) + select { + case rhm.unregisterHookChan <- func() { + for i, matchHook := range rm.requestHooks { + if rh.key == matchHook.key { + rm.requestHooks = append(rm.requestHooks[:i], rm.requestHooks[i+1:]...) + return + } + } + }: + case <-rm.ctx.Done(): + } +} + +func (rhm *registerResponseHookMessage) handle(rm *RequestManager) { + rh := responseHook{rm.hooksNextKey, rhm.hook} + rm.hooksNextKey++ rm.responseHooks = append(rm.responseHooks, rh) select { case rhm.unregisterHookChan <- func() { @@ -390,6 +435,19 @@ func (rm *RequestManager) generateResponseErrorFromStatus(status graphsync.Respo } } +type hookActions struct { + persistenceOption string + nodeBuilderChooser traversal.NodeBuilderChooser +} + +func (ha *hookActions) UsePersistenceOption(name string) { + ha.persistenceOption = name +} + +func (ha *hookActions) UseNodeBuilderChooser(nodeBuilderChooser traversal.NodeBuilderChooser) { + ha.nodeBuilderChooser = nodeBuilderChooser +} + func (rm *RequestManager) setupRequest(requestID graphsync.RequestID, p peer.ID, root ipld.Link, selectorSpec ipld.Node, extensions []graphsync.ExtensionData) (chan graphsync.ResponseProgress, chan error) { _, err := ipldutil.EncodeNode(selectorSpec) if err != nil { @@ -408,9 +466,17 @@ func (rm *RequestManager) setupRequest(requestID graphsync.RequestID, p peer.ID, rm.inProgressRequestStatuses[requestID] = &inProgressRequestStatus{ ctx, cancel, p, networkErrorChan, } - rm.asyncLoader.StartRequest(requestID) - rm.peerHandler.SendRequest(p, gsmsg.NewRequest(requestID, asCidLink.Cid, selectorSpec, maxPriority, extensions...)) - return rm.executeTraversal(ctx, requestID, root, selector, networkErrorChan) + request := gsmsg.NewRequest(requestID, asCidLink.Cid, selectorSpec, maxPriority, extensions...) + ha := &hookActions{} + for _, hook := range rm.requestHooks { + hook.hook(p, request, ha) + } + err = rm.asyncLoader.StartRequest(requestID, ha.persistenceOption) + if err != nil { + return rm.singleErrorResponse(err) + } + rm.peerHandler.SendRequest(p, request) + return rm.executeTraversal(ctx, requestID, root, selector, ha.nodeBuilderChooser, networkErrorChan) } func (rm *RequestManager) executeTraversal( @@ -418,6 +484,7 @@ func (rm *RequestManager) executeTraversal( requestID graphsync.RequestID, root ipld.Link, selector selector.Selector, + nodeBuilderChooser traversal.NodeBuilderChooser, networkErrorChan chan error, ) (chan graphsync.ResponseProgress, chan error) { inProgressChan := make(chan graphsync.ResponseProgress) @@ -425,7 +492,7 @@ func (rm *RequestManager) executeTraversal( loaderFn := loader.WrapAsyncLoader(ctx, rm.asyncLoader.AsyncLoad, requestID, inProgressErr) visitor := visitToChannel(ctx, inProgressChan) go func() { - _ = ipldutil.Traverse(ctx, loaderFn, nil, root, selector, visitor) + _ = ipldutil.Traverse(ctx, loaderFn, nodeBuilderChooser, root, selector, visitor) select { case networkError := <-networkErrorChan: select { diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index 4a73f9a6..5a0137e1 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -46,11 +46,18 @@ type requestKey struct { link ipld.Link } +type storeKey struct { + requestID graphsync.RequestID + storeName string +} + type fakeAsyncLoader struct { responseChannelsLk sync.RWMutex responseChannels map[requestKey]chan types.AsyncLoadResult responses chan map[graphsync.RequestID]metadata.Metadata blks chan []blocks.Block + storesRequestedLk sync.RWMutex + storesRequested map[storeKey]struct{} } func newFakeAsyncLoader() *fakeAsyncLoader { @@ -58,10 +65,17 @@ func newFakeAsyncLoader() *fakeAsyncLoader { responseChannels: make(map[requestKey]chan types.AsyncLoadResult), responses: make(chan map[graphsync.RequestID]metadata.Metadata, 1), blks: make(chan []blocks.Block, 1), + storesRequested: make(map[storeKey]struct{}), } } -func (fal *fakeAsyncLoader) StartRequest(requestID graphsync.RequestID) { + +func (fal *fakeAsyncLoader) StartRequest(requestID graphsync.RequestID, name string) error { + fal.storesRequestedLk.Lock() + fal.storesRequested[storeKey{requestID, name}] = struct{}{} + fal.storesRequestedLk.Unlock() + return nil } + func (fal *fakeAsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata, blks []blocks.Block) { fal.responses <- responses @@ -88,6 +102,13 @@ func (fal *fakeAsyncLoader) verifyNoRemainingData(t *testing.T, requestID graphs fal.responseChannelsLk.Unlock() } +func (fal *fakeAsyncLoader) verifyStoreUsed(t *testing.T, requestID graphsync.RequestID, storeName string) { + fal.storesRequestedLk.RLock() + _, ok := fal.storesRequested[storeKey{requestID, storeName}] + require.True(t, ok, "request should load from correct store") + fal.storesRequestedLk.RUnlock() +} + func (fal *fakeAsyncLoader) asyncLoad(requestID graphsync.RequestID, link ipld.Link) chan types.AsyncLoadResult { fal.responseChannelsLk.Lock() responseChannel, ok := fal.responseChannels[requestKey{requestID, link}] @@ -538,7 +559,7 @@ func TestEncodingExtensions(t *testing.T) { receivedExtensionData <- data return <-expectedError } - requestManager.RegisterHook(hook) + requestManager.RegisterResponseHook(hook) returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector(), extension1, extension2) rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0] @@ -593,3 +614,68 @@ func TestEncodingExtensions(t *testing.T) { testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan) }) } + +func TestOutgoingRequestHooks(t *testing.T) { + requestRecordChan := make(chan requestRecord, 2) + fph := &fakePeerHandler{requestRecordChan} + ctx := context.Background() + fal := newFakeAsyncLoader() + requestManager := New(ctx, fal) + requestManager.SetDelegate(fph) + requestManager.Startup() + + requestCtx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + peers := testutil.GeneratePeers(1) + + blockStore := make(map[ipld.Link][]byte) + loader, storer := testutil.NewTestStore(blockStore) + blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5) + + extensionName1 := graphsync.ExtensionName("blockchain") + extension1 := graphsync.ExtensionData{ + Name: extensionName1, + Data: nil, + } + + hook := func(p peer.ID, r graphsync.RequestData, ha graphsync.OutgoingRequestHookActions) { + _, has := r.Extension(extensionName1) + if has { + ha.UseNodeBuilderChooser(blockChain.Chooser) + ha.UsePersistenceOption("chainstore") + } + } + requestManager.RegisterRequestHook(hook) + + returnedResponseChan1, returnedErrorChan1 := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector(), extension1) + returnedResponseChan2, returnedErrorChan2 := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector()) + + requestRecords := readNNetworkRequests(requestCtx, t, requestRecordChan, 2) + + md := metadataForBlocks(blockChain.AllBlocks(), true) + mdEncoded, err := metadata.EncodeMetadata(md) + require.NoError(t, err) + mdExt := graphsync.ExtensionData{ + Name: graphsync.ExtensionMetadata, + Data: mdEncoded, + } + responses := []gsmsg.GraphSyncResponse{ + gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, mdExt), + gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, mdExt), + } + requestManager.ProcessResponses(peers[0], responses, blockChain.AllBlocks()) + fal.verifyLastProcessedBlocks(ctx, t, blockChain.AllBlocks()) + fal.verifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{ + requestRecords[0].gsr.ID(): md, + requestRecords[1].gsr.ID(): md, + }) + fal.successResponseOn(requestRecords[0].gsr.ID(), blockChain.AllBlocks()) + fal.successResponseOn(requestRecords[1].gsr.ID(), blockChain.AllBlocks()) + + blockChain.VerifyWholeChainWithTypes(requestCtx, returnedResponseChan1) + blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2) + testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan1) + testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan2) + fal.verifyStoreUsed(t, requestRecords[0].gsr.ID(), "chainstore") + fal.verifyStoreUsed(t, requestRecords[1].gsr.ID(), "") +} diff --git a/responsemanager/responsemanager.go b/responsemanager/responsemanager.go index 3eaf41f0..5e69f9c1 100644 --- a/responsemanager/responsemanager.go +++ b/responsemanager/responsemanager.go @@ -2,6 +2,7 @@ package responsemanager import ( "context" + "errors" "sync" "time" @@ -40,7 +41,7 @@ type responseTaskData struct { type requestHook struct { key uint64 - hook graphsync.OnRequestReceivedHook + hook graphsync.OnIncomingRequestHook } // QueryQueue is an interface that can receive new selector query tasks @@ -71,13 +72,15 @@ type ResponseManager struct { peerManager PeerManager queryQueue QueryQueue - messages chan responseManagerMessage - workSignal chan struct{} - ticker *time.Ticker - inProgressResponses map[responseKey]inProgressResponseStatus - requestHooksLk sync.RWMutex - requestHookNextKey uint64 - requestHooks []requestHook + messages chan responseManagerMessage + workSignal chan struct{} + ticker *time.Ticker + inProgressResponses map[responseKey]inProgressResponseStatus + requestHooksLk sync.RWMutex + requestHookNextKey uint64 + requestHooks []requestHook + persistenceOptionsLk sync.RWMutex + persistenceOptions map[string]ipld.Loader } // New creates a new response manager from the given context, loader, @@ -97,6 +100,7 @@ func New(ctx context.Context, workSignal: make(chan struct{}, 1), ticker: time.NewTicker(thawSpeed), inProgressResponses: make(map[responseKey]inProgressResponseStatus), + persistenceOptions: make(map[string]ipld.Loader), } } @@ -114,8 +118,20 @@ func (rm *ResponseManager) ProcessRequests(ctx context.Context, p peer.ID, reque } } -// RegisterHook registers an extension to process new incoming requests -func (rm *ResponseManager) RegisterHook(hook graphsync.OnRequestReceivedHook) graphsync.UnregisterHookFunc { +// RegisterPersistenceOption registers a new loader for the response manager +func (rm *ResponseManager) RegisterPersistenceOption(name string, loader ipld.Loader) error { + rm.persistenceOptionsLk.Lock() + defer rm.persistenceOptionsLk.Unlock() + _, ok := rm.persistenceOptions[name] + if ok { + return errors.New("persistence option alreayd registered") + } + rm.persistenceOptions[name] = loader + return nil +} + +// RegisterRequestHook registers an extension to process new incoming requests +func (rm *ResponseManager) RegisterRequestHook(hook graphsync.OnIncomingRequestHook) graphsync.UnregisterHookFunc { rm.requestHooksLk.Lock() rh := requestHook{rm.requestHookNextKey, hook} rm.requestHookNextKey++ @@ -205,6 +221,7 @@ func noopVisitor(tp traversal.Progress, n ipld.Node, tr traversal.VisitReason) e } type hookActions struct { + persistenceOptions map[string]ipld.Loader isValidated bool requestID graphsync.RequestID peerResponseSender peerresponsemanager.PeerResponseSender @@ -226,7 +243,12 @@ func (ha *hookActions) ValidateRequest() { ha.isValidated = true } -func (ha *hookActions) UseLoader(loader ipld.Loader) { +func (ha *hookActions) UsePersistenceOption(name string) { + loader, ok := ha.persistenceOptions[name] + if !ok { + ha.TerminateWithError(errors.New("unknown loader option")) + return + } ha.loader = loader } @@ -239,15 +261,18 @@ func (rm *ResponseManager) executeQuery(ctx context.Context, request gsmsg.GraphSyncRequest) { peerResponseSender := rm.peerManager.SenderForPeer(p) selectorSpec := request.Selector() - ha := &hookActions{false, request.ID(), peerResponseSender, nil, rm.loader, nil} rm.requestHooksLk.RLock() + rm.persistenceOptionsLk.RLock() + ha := &hookActions{rm.persistenceOptions, false, request.ID(), peerResponseSender, nil, rm.loader, nil} for _, requestHook := range rm.requestHooks { requestHook.hook(p, request, ha) if ha.err != nil { rm.requestHooksLk.RUnlock() + rm.persistenceOptionsLk.RUnlock() return } } + rm.persistenceOptionsLk.RUnlock() rm.requestHooksLk.RUnlock() if !ha.isValidated { peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown) diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index 34cc6e46..2e64a21d 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -144,7 +144,7 @@ func TestIncomingQuery(t *testing.T) { peerManager := &fakePeerManager{peerResponseSender: fprs} queryQueue := &fakeQueryQueue{} responseManager := New(ctx, loader, peerManager, queryQueue) - responseManager.RegisterHook(selectorvalidator.SelectorValidator(100)) + responseManager.RegisterRequestHook(selectorvalidator.SelectorValidator(100)) responseManager.Startup() requestID := graphsync.RequestID(rand.Int31()) @@ -182,7 +182,7 @@ func TestCancellationQueryInProgress(t *testing.T) { peerManager := &fakePeerManager{peerResponseSender: fprs} queryQueue := &fakeQueryQueue{} responseManager := New(ctx, loader, peerManager, queryQueue) - responseManager.RegisterHook(selectorvalidator.SelectorValidator(100)) + responseManager.RegisterRequestHook(selectorvalidator.SelectorValidator(100)) responseManager.Startup() requestID := graphsync.RequestID(rand.Int31()) @@ -309,7 +309,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("if non validating hook succeeds, does not pass validation", func(t *testing.T) { responseManager := New(ctx, loader, peerManager, queryQueue) responseManager.Startup() - responseManager.RegisterHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.RequestReceivedHookActions) { + responseManager.RegisterRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.SendExtensionData(extensionResponse) }) responseManager.ProcessRequests(ctx, p, requests) @@ -324,7 +324,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("if validating hook succeeds, should pass validation", func(t *testing.T) { responseManager := New(ctx, loader, peerManager, queryQueue) responseManager.Startup() - responseManager.RegisterHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.RequestReceivedHookActions) { + responseManager.RegisterRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() hookActions.SendExtensionData(extensionResponse) }) @@ -340,10 +340,10 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("if any hook fails, should fail", func(t *testing.T) { responseManager := New(ctx, loader, peerManager, queryQueue) responseManager.Startup() - responseManager.RegisterHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.RequestReceivedHookActions) { + responseManager.RegisterRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() }) - responseManager.RegisterHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.RequestReceivedHookActions) { + responseManager.RegisterRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.SendExtensionData(extensionResponse) hookActions.TerminateWithError(errors.New("everything went to crap")) }) @@ -359,7 +359,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("hooks can be unregistered", func(t *testing.T) { responseManager := New(ctx, loader, peerManager, queryQueue) responseManager.Startup() - unregister := responseManager.RegisterHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.RequestReceivedHookActions) { + unregister := responseManager.RegisterRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() hookActions.SendExtensionData(extensionResponse) }) @@ -388,7 +388,7 @@ func TestValidationAndExtensions(t *testing.T) { responseManager := New(ctx, oloader, peerManager, queryQueue) responseManager.Startup() // add validating hook -- so the request SHOULD succeed - responseManager.RegisterHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.RequestReceivedHookActions) { + responseManager.RegisterRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() }) @@ -398,10 +398,12 @@ func TestValidationAndExtensions(t *testing.T) { testutil.AssertReceive(ctx, t, completedRequestChan, &lastRequest, "should complete request") require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure") + err := responseManager.RegisterPersistenceOption("chainstore", loader) + require.NoError(t, err) // register hook to use different loader - _ = responseManager.RegisterHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.RequestReceivedHookActions) { + _ = responseManager.RegisterRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { if _, found := requestData.Extension(extensionName); found { - hookActions.UseLoader(loader) + hookActions.UsePersistenceOption("chainstore") hookActions.SendExtensionData(extensionResponse) } }) @@ -426,7 +428,7 @@ func TestValidationAndExtensions(t *testing.T) { } // add validating hook -- so the request SHOULD succeed - responseManager.RegisterHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.RequestReceivedHookActions) { + responseManager.RegisterRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() }) @@ -438,7 +440,7 @@ func TestValidationAndExtensions(t *testing.T) { require.Equal(t, 0, customChooserCallCount) // register hook to use custom chooser - _ = responseManager.RegisterHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.RequestReceivedHookActions) { + _ = responseManager.RegisterRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { if _, found := requestData.Extension(extensionName); found { hookActions.UseNodeBuilderChooser(customChooser) hookActions.SendExtensionData(extensionResponse) diff --git a/selectorvalidator/selectorvalidator.go b/selectorvalidator/selectorvalidator.go index effe0e97..cd176096 100644 --- a/selectorvalidator/selectorvalidator.go +++ b/selectorvalidator/selectorvalidator.go @@ -21,8 +21,8 @@ var ( // SelectorValidator returns an OnRequestReceivedHook that only validates // requests if their selector only has no recursions that are greater than // maxAcceptedDepth -func SelectorValidator(maxAcceptedDepth int) graphsync.OnRequestReceivedHook { - return func(p peer.ID, request graphsync.RequestData, hookActions graphsync.RequestReceivedHookActions) { +func SelectorValidator(maxAcceptedDepth int) graphsync.OnIncomingRequestHook { + return func(p peer.ID, request graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { err := ValidateMaxRecursionDepth(request.Selector(), maxAcceptedDepth) if err == nil { hookActions.ValidateRequest() diff --git a/testutil/chaintypes/gen/main.go b/testutil/chaintypes/gen/main.go new file mode 100644 index 00000000..d8b3b8be --- /dev/null +++ b/testutil/chaintypes/gen/main.go @@ -0,0 +1,44 @@ +package main + +import ( + "os" + + "github.com/ipld/go-ipld-prime/schema" + gengo "github.com/ipld/go-ipld-prime/schema/gen/go" +) + +func main() { + openOrPanic := func(filename string) *os.File { + y, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) + if err != nil { + panic(err) + } + return y + } + + tLink := schema.SpawnLink("Link") + tBytes := schema.SpawnBytes("Bytes") + tString := schema.SpawnString("String") + tParents := schema.SpawnList("Parents", tLink, false) + tMessages := schema.SpawnList("Messages", tBytes, false) + tBlock := schema.SpawnStruct("Block", + []schema.StructField{ + schema.SpawnStructField("Parents", tParents, false, false), + schema.SpawnStructField("Messages", tMessages, false, false), + }, + schema.StructRepresentation_Map{}, + ) + + f := openOrPanic("testchain_minima.go") + gengo.EmitMinima("chaintypes", f) + + f = openOrPanic("testchain_gen.go") + gengo.EmitFileHeader("chaintypes", f) + gengo.EmitEntireType(gengo.NewGeneratorForKindBytes(tBytes), f) + gengo.EmitEntireType(gengo.NewGeneratorForKindLink(tLink), f) + gengo.EmitEntireType(gengo.NewGeneratorForKindString(tString), f) + gengo.EmitEntireType(gengo.NewGeneratorForKindList(tParents), f) + gengo.EmitEntireType(gengo.NewGeneratorForKindList(tMessages), f) + gengo.EmitEntireType(gengo.NewGeneratorForKindStruct(tBlock), f) + +} diff --git a/testutil/chaintypes/testchain_gen.go b/testutil/chaintypes/testchain_gen.go new file mode 100644 index 00000000..c0f08d79 --- /dev/null +++ b/testutil/chaintypes/testchain_gen.go @@ -0,0 +1,1366 @@ +package chaintypes + +import ( + ipld "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/impl/typed" + "github.com/ipld/go-ipld-prime/schema" +) + +// Code generated go-ipld-prime DO NOT EDIT. + +type Bytes struct{ x []byte } + +// TODO generateKindBytes.EmitNativeAccessors +// TODO generateKindBytes.EmitNativeBuilder +type MaybeBytes struct { + Maybe typed.Maybe + Value Bytes +} + +func (m MaybeBytes) Must() Bytes { + if m.Maybe != typed.Maybe_Value { + panic("unbox of a maybe rejected") + } + return m.Value +} + +var _ ipld.Node = Bytes{} +var _ typed.Node = Bytes{} + +func (Bytes) Type() schema.Type { + return nil /*TODO:typelit*/ +} +func (Bytes) ReprKind() ipld.ReprKind { + return ipld.ReprKind_Bytes +} +func (Bytes) LookupString(string) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Bytes", MethodName: "LookupString", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_Bytes} +} +func (Bytes) Lookup(ipld.Node) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Bytes", MethodName: "Lookup", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_Bytes} +} +func (Bytes) LookupIndex(idx int) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Bytes", MethodName: "LookupIndex", AppropriateKind: ipld.ReprKindSet_JustList, ActualKind: ipld.ReprKind_Bytes} +} +func (Bytes) LookupSegment(seg ipld.PathSegment) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Bytes", MethodName: "LookupSegment", AppropriateKind: ipld.ReprKindSet_Recursive, ActualKind: ipld.ReprKind_Bytes} +} +func (Bytes) MapIterator() ipld.MapIterator { + return mapIteratorReject{ipld.ErrWrongKind{TypeName: "Bytes", MethodName: "MapIterator", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_Bytes}} +} +func (Bytes) ListIterator() ipld.ListIterator { + return listIteratorReject{ipld.ErrWrongKind{TypeName: "Bytes", MethodName: "ListIterator", AppropriateKind: ipld.ReprKindSet_JustList, ActualKind: ipld.ReprKind_Bytes}} +} +func (Bytes) Length() int { + return -1 +} +func (Bytes) IsUndefined() bool { + return false +} +func (Bytes) IsNull() bool { + return false +} +func (Bytes) AsBool() (bool, error) { + return false, ipld.ErrWrongKind{TypeName: "Bytes", MethodName: "AsBool", AppropriateKind: ipld.ReprKindSet_JustBool, ActualKind: ipld.ReprKind_Bytes} +} +func (Bytes) AsInt() (int, error) { + return 0, ipld.ErrWrongKind{TypeName: "Bytes", MethodName: "AsInt", AppropriateKind: ipld.ReprKindSet_JustInt, ActualKind: ipld.ReprKind_Bytes} +} +func (Bytes) AsFloat() (float64, error) { + return 0, ipld.ErrWrongKind{TypeName: "Bytes", MethodName: "AsFloat", AppropriateKind: ipld.ReprKindSet_JustFloat, ActualKind: ipld.ReprKind_Bytes} +} +func (Bytes) AsString() (string, error) { + return "", ipld.ErrWrongKind{TypeName: "Bytes", MethodName: "AsString", AppropriateKind: ipld.ReprKindSet_JustString, ActualKind: ipld.ReprKind_Bytes} +} +func (x Bytes) AsBytes() ([]byte, error) { + return x.x, nil +} +func (Bytes) AsLink() (ipld.Link, error) { + return nil, ipld.ErrWrongKind{TypeName: "Bytes", MethodName: "AsLink", AppropriateKind: ipld.ReprKindSet_JustLink, ActualKind: ipld.ReprKind_Bytes} +} +func (Bytes) NodeBuilder() ipld.NodeBuilder { + return _Bytes__NodeBuilder{} +} + +type _Bytes__NodeBuilder struct{} + +func Bytes__NodeBuilder() ipld.NodeBuilder { + return _Bytes__NodeBuilder{} +} +func (_Bytes__NodeBuilder) CreateMap() (ipld.MapBuilder, error) { + return nil, ipld.ErrWrongKind{TypeName: "Bytes.Builder", MethodName: "CreateMap", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_Bytes} +} +func (_Bytes__NodeBuilder) AmendMap() (ipld.MapBuilder, error) { + return nil, ipld.ErrWrongKind{TypeName: "Bytes.Builder", MethodName: "AmendMap", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_Bytes} +} +func (_Bytes__NodeBuilder) CreateList() (ipld.ListBuilder, error) { + return nil, ipld.ErrWrongKind{TypeName: "Bytes.Builder", MethodName: "CreateList", AppropriateKind: ipld.ReprKindSet_JustList, ActualKind: ipld.ReprKind_Bytes} +} +func (_Bytes__NodeBuilder) AmendList() (ipld.ListBuilder, error) { + return nil, ipld.ErrWrongKind{TypeName: "Bytes.Builder", MethodName: "AmendList", AppropriateKind: ipld.ReprKindSet_JustList, ActualKind: ipld.ReprKind_Bytes} +} +func (_Bytes__NodeBuilder) CreateNull() (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Bytes.Builder", MethodName: "CreateNull", AppropriateKind: ipld.ReprKindSet_JustNull, ActualKind: ipld.ReprKind_Bytes} +} +func (_Bytes__NodeBuilder) CreateBool(bool) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Bytes.Builder", MethodName: "CreateBool", AppropriateKind: ipld.ReprKindSet_JustBool, ActualKind: ipld.ReprKind_Bytes} +} +func (_Bytes__NodeBuilder) CreateInt(int) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Bytes.Builder", MethodName: "CreateInt", AppropriateKind: ipld.ReprKindSet_JustInt, ActualKind: ipld.ReprKind_Bytes} +} +func (_Bytes__NodeBuilder) CreateFloat(float64) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Bytes.Builder", MethodName: "CreateFloat", AppropriateKind: ipld.ReprKindSet_JustFloat, ActualKind: ipld.ReprKind_Bytes} +} +func (_Bytes__NodeBuilder) CreateString(string) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Bytes.Builder", MethodName: "CreateString", AppropriateKind: ipld.ReprKindSet_JustString, ActualKind: ipld.ReprKind_Bytes} +} +func (nb _Bytes__NodeBuilder) CreateBytes(v []byte) (ipld.Node, error) { + return Bytes{v}, nil +} +func (_Bytes__NodeBuilder) CreateLink(ipld.Link) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Bytes.Builder", MethodName: "CreateLink", AppropriateKind: ipld.ReprKindSet_JustLink, ActualKind: ipld.ReprKind_Bytes} +} +func (Bytes) Representation() ipld.Node { + panic("TODO representation") +} + +type Link struct{ x ipld.Link } + +// TODO generateKindLink.EmitNativeAccessors +// TODO generateKindLink.EmitNativeBuilder +type MaybeLink struct { + Maybe typed.Maybe + Value Link +} + +func (m MaybeLink) Must() Link { + if m.Maybe != typed.Maybe_Value { + panic("unbox of a maybe rejected") + } + return m.Value +} + +var _ ipld.Node = Link{} +var _ typed.Node = Link{} + +func (Link) Type() schema.Type { + return nil /*TODO:typelit*/ +} +func (Link) ReprKind() ipld.ReprKind { + return ipld.ReprKind_Link +} +func (Link) LookupString(string) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Link", MethodName: "LookupString", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_Link} +} +func (Link) Lookup(ipld.Node) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Link", MethodName: "Lookup", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_Link} +} +func (Link) LookupIndex(idx int) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Link", MethodName: "LookupIndex", AppropriateKind: ipld.ReprKindSet_JustList, ActualKind: ipld.ReprKind_Link} +} +func (Link) LookupSegment(seg ipld.PathSegment) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Link", MethodName: "LookupSegment", AppropriateKind: ipld.ReprKindSet_Recursive, ActualKind: ipld.ReprKind_Link} +} +func (Link) MapIterator() ipld.MapIterator { + return mapIteratorReject{ipld.ErrWrongKind{TypeName: "Link", MethodName: "MapIterator", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_Link}} +} +func (Link) ListIterator() ipld.ListIterator { + return listIteratorReject{ipld.ErrWrongKind{TypeName: "Link", MethodName: "ListIterator", AppropriateKind: ipld.ReprKindSet_JustList, ActualKind: ipld.ReprKind_Link}} +} +func (Link) Length() int { + return -1 +} +func (Link) IsUndefined() bool { + return false +} +func (Link) IsNull() bool { + return false +} +func (Link) AsBool() (bool, error) { + return false, ipld.ErrWrongKind{TypeName: "Link", MethodName: "AsBool", AppropriateKind: ipld.ReprKindSet_JustBool, ActualKind: ipld.ReprKind_Link} +} +func (Link) AsInt() (int, error) { + return 0, ipld.ErrWrongKind{TypeName: "Link", MethodName: "AsInt", AppropriateKind: ipld.ReprKindSet_JustInt, ActualKind: ipld.ReprKind_Link} +} +func (Link) AsFloat() (float64, error) { + return 0, ipld.ErrWrongKind{TypeName: "Link", MethodName: "AsFloat", AppropriateKind: ipld.ReprKindSet_JustFloat, ActualKind: ipld.ReprKind_Link} +} +func (Link) AsString() (string, error) { + return "", ipld.ErrWrongKind{TypeName: "Link", MethodName: "AsString", AppropriateKind: ipld.ReprKindSet_JustString, ActualKind: ipld.ReprKind_Link} +} +func (Link) AsBytes() ([]byte, error) { + return nil, ipld.ErrWrongKind{TypeName: "Link", MethodName: "AsBytes", AppropriateKind: ipld.ReprKindSet_JustBytes, ActualKind: ipld.ReprKind_Link} +} +func (x Link) AsLink() (ipld.Link, error) { + return x.x, nil +} +func (Link) NodeBuilder() ipld.NodeBuilder { + return _Link__NodeBuilder{} +} + +type _Link__NodeBuilder struct{} + +func Link__NodeBuilder() ipld.NodeBuilder { + return _Link__NodeBuilder{} +} +func (_Link__NodeBuilder) CreateMap() (ipld.MapBuilder, error) { + return nil, ipld.ErrWrongKind{TypeName: "Link.Builder", MethodName: "CreateMap", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_Link} +} +func (_Link__NodeBuilder) AmendMap() (ipld.MapBuilder, error) { + return nil, ipld.ErrWrongKind{TypeName: "Link.Builder", MethodName: "AmendMap", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_Link} +} +func (_Link__NodeBuilder) CreateList() (ipld.ListBuilder, error) { + return nil, ipld.ErrWrongKind{TypeName: "Link.Builder", MethodName: "CreateList", AppropriateKind: ipld.ReprKindSet_JustList, ActualKind: ipld.ReprKind_Link} +} +func (_Link__NodeBuilder) AmendList() (ipld.ListBuilder, error) { + return nil, ipld.ErrWrongKind{TypeName: "Link.Builder", MethodName: "AmendList", AppropriateKind: ipld.ReprKindSet_JustList, ActualKind: ipld.ReprKind_Link} +} +func (_Link__NodeBuilder) CreateNull() (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Link.Builder", MethodName: "CreateNull", AppropriateKind: ipld.ReprKindSet_JustNull, ActualKind: ipld.ReprKind_Link} +} +func (_Link__NodeBuilder) CreateBool(bool) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Link.Builder", MethodName: "CreateBool", AppropriateKind: ipld.ReprKindSet_JustBool, ActualKind: ipld.ReprKind_Link} +} +func (_Link__NodeBuilder) CreateInt(int) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Link.Builder", MethodName: "CreateInt", AppropriateKind: ipld.ReprKindSet_JustInt, ActualKind: ipld.ReprKind_Link} +} +func (_Link__NodeBuilder) CreateFloat(float64) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Link.Builder", MethodName: "CreateFloat", AppropriateKind: ipld.ReprKindSet_JustFloat, ActualKind: ipld.ReprKind_Link} +} +func (_Link__NodeBuilder) CreateString(string) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Link.Builder", MethodName: "CreateString", AppropriateKind: ipld.ReprKindSet_JustString, ActualKind: ipld.ReprKind_Link} +} +func (_Link__NodeBuilder) CreateBytes([]byte) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Link.Builder", MethodName: "CreateBytes", AppropriateKind: ipld.ReprKindSet_JustBytes, ActualKind: ipld.ReprKind_Link} +} +func (nb _Link__NodeBuilder) CreateLink(v ipld.Link) (ipld.Node, error) { + return Link{v}, nil +} +func (Link) Representation() ipld.Node { + panic("TODO representation") +} + +type String struct{ x string } + +func (x String) String() string { + return x.x +} + +type String__Content struct { + Value string +} + +func (b String__Content) Build() (String, error) { + x := String{ + b.Value, + } + // FUTURE : want to support customizable validation. + // but 'if v, ok := x.(schema.Validatable); ok {' doesn't fly: need a way to work on concrete types. + return x, nil +} +func (b String__Content) MustBuild() String { + if x, err := b.Build(); err != nil { + panic(err) + } else { + return x + } +} + +type MaybeString struct { + Maybe typed.Maybe + Value String +} + +func (m MaybeString) Must() String { + if m.Maybe != typed.Maybe_Value { + panic("unbox of a maybe rejected") + } + return m.Value +} + +var _ ipld.Node = String{} +var _ typed.Node = String{} + +func (String) Type() schema.Type { + return nil /*TODO:typelit*/ +} +func (String) ReprKind() ipld.ReprKind { + return ipld.ReprKind_String +} +func (String) LookupString(string) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "String", MethodName: "LookupString", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_String} +} +func (String) Lookup(ipld.Node) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "String", MethodName: "Lookup", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_String} +} +func (String) LookupIndex(idx int) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "String", MethodName: "LookupIndex", AppropriateKind: ipld.ReprKindSet_JustList, ActualKind: ipld.ReprKind_String} +} +func (String) LookupSegment(seg ipld.PathSegment) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "String", MethodName: "LookupSegment", AppropriateKind: ipld.ReprKindSet_Recursive, ActualKind: ipld.ReprKind_String} +} +func (String) MapIterator() ipld.MapIterator { + return mapIteratorReject{ipld.ErrWrongKind{TypeName: "String", MethodName: "MapIterator", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_String}} +} +func (String) ListIterator() ipld.ListIterator { + return listIteratorReject{ipld.ErrWrongKind{TypeName: "String", MethodName: "ListIterator", AppropriateKind: ipld.ReprKindSet_JustList, ActualKind: ipld.ReprKind_String}} +} +func (String) Length() int { + return -1 +} +func (String) IsUndefined() bool { + return false +} +func (String) IsNull() bool { + return false +} +func (String) AsBool() (bool, error) { + return false, ipld.ErrWrongKind{TypeName: "String", MethodName: "AsBool", AppropriateKind: ipld.ReprKindSet_JustBool, ActualKind: ipld.ReprKind_String} +} +func (String) AsInt() (int, error) { + return 0, ipld.ErrWrongKind{TypeName: "String", MethodName: "AsInt", AppropriateKind: ipld.ReprKindSet_JustInt, ActualKind: ipld.ReprKind_String} +} +func (String) AsFloat() (float64, error) { + return 0, ipld.ErrWrongKind{TypeName: "String", MethodName: "AsFloat", AppropriateKind: ipld.ReprKindSet_JustFloat, ActualKind: ipld.ReprKind_String} +} +func (x String) AsString() (string, error) { + return x.x, nil +} +func (String) AsBytes() ([]byte, error) { + return nil, ipld.ErrWrongKind{TypeName: "String", MethodName: "AsBytes", AppropriateKind: ipld.ReprKindSet_JustBytes, ActualKind: ipld.ReprKind_String} +} +func (String) AsLink() (ipld.Link, error) { + return nil, ipld.ErrWrongKind{TypeName: "String", MethodName: "AsLink", AppropriateKind: ipld.ReprKindSet_JustLink, ActualKind: ipld.ReprKind_String} +} +func (String) NodeBuilder() ipld.NodeBuilder { + return _String__NodeBuilder{} +} + +type _String__NodeBuilder struct{} + +func String__NodeBuilder() ipld.NodeBuilder { + return _String__NodeBuilder{} +} +func (_String__NodeBuilder) CreateMap() (ipld.MapBuilder, error) { + return nil, ipld.ErrWrongKind{TypeName: "String.Builder", MethodName: "CreateMap", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_String} +} +func (_String__NodeBuilder) AmendMap() (ipld.MapBuilder, error) { + return nil, ipld.ErrWrongKind{TypeName: "String.Builder", MethodName: "AmendMap", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_String} +} +func (_String__NodeBuilder) CreateList() (ipld.ListBuilder, error) { + return nil, ipld.ErrWrongKind{TypeName: "String.Builder", MethodName: "CreateList", AppropriateKind: ipld.ReprKindSet_JustList, ActualKind: ipld.ReprKind_String} +} +func (_String__NodeBuilder) AmendList() (ipld.ListBuilder, error) { + return nil, ipld.ErrWrongKind{TypeName: "String.Builder", MethodName: "AmendList", AppropriateKind: ipld.ReprKindSet_JustList, ActualKind: ipld.ReprKind_String} +} +func (_String__NodeBuilder) CreateNull() (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "String.Builder", MethodName: "CreateNull", AppropriateKind: ipld.ReprKindSet_JustNull, ActualKind: ipld.ReprKind_String} +} +func (_String__NodeBuilder) CreateBool(bool) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "String.Builder", MethodName: "CreateBool", AppropriateKind: ipld.ReprKindSet_JustBool, ActualKind: ipld.ReprKind_String} +} +func (_String__NodeBuilder) CreateInt(int) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "String.Builder", MethodName: "CreateInt", AppropriateKind: ipld.ReprKindSet_JustInt, ActualKind: ipld.ReprKind_String} +} +func (_String__NodeBuilder) CreateFloat(float64) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "String.Builder", MethodName: "CreateFloat", AppropriateKind: ipld.ReprKindSet_JustFloat, ActualKind: ipld.ReprKind_String} +} +func (nb _String__NodeBuilder) CreateString(v string) (ipld.Node, error) { + return String{v}, nil +} +func (_String__NodeBuilder) CreateBytes([]byte) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "String.Builder", MethodName: "CreateBytes", AppropriateKind: ipld.ReprKindSet_JustBytes, ActualKind: ipld.ReprKind_String} +} +func (_String__NodeBuilder) CreateLink(ipld.Link) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "String.Builder", MethodName: "CreateLink", AppropriateKind: ipld.ReprKindSet_JustLink, ActualKind: ipld.ReprKind_String} +} +func (String) Representation() ipld.Node { + panic("TODO representation") +} + +type Parents struct { + x []Link +} + +// TODO generateKindList.EmitNativeAccessors +// TODO generateKindList.EmitNativeBuilder +type MaybeParents struct { + Maybe typed.Maybe + Value Parents +} + +func (m MaybeParents) Must() Parents { + if m.Maybe != typed.Maybe_Value { + panic("unbox of a maybe rejected") + } + return m.Value +} + +var _ ipld.Node = Parents{} +var _ typed.Node = Parents{} + +func (Parents) Type() schema.Type { + return nil /*TODO:typelit*/ +} +func (Parents) ReprKind() ipld.ReprKind { + return ipld.ReprKind_List +} +func (Parents) LookupString(string) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Parents", MethodName: "LookupString", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_List} +} +func (x Parents) Lookup(key ipld.Node) (ipld.Node, error) { + ki, err := key.AsInt() + if err != nil { + return nil, ipld.ErrInvalidKey{"got " + key.ReprKind().String() + ", need Int"} + } + return x.LookupIndex(ki) +} +func (x Parents) LookupIndex(index int) (ipld.Node, error) { + if index >= len(x.x) { + return nil, ipld.ErrNotExists{ipld.PathSegmentOfInt(index)} + } + return x.x[index], nil +} +func (n Parents) LookupSegment(seg ipld.PathSegment) (ipld.Node, error) { + idx, err := seg.Index() + if err != nil { + return nil, err + } + return n.LookupIndex(idx) +} +func (Parents) MapIterator() ipld.MapIterator { + return mapIteratorReject{ipld.ErrWrongKind{TypeName: "Parents", MethodName: "MapIterator", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_List}} +} +func (x Parents) ListIterator() ipld.ListIterator { + return &_Parents__Itr{&x, 0} +} + +type _Parents__Itr struct { + node *Parents + idx int +} + +func (itr *_Parents__Itr) Next() (idx int, value ipld.Node, _ error) { + if itr.idx >= len(itr.node.x) { + return 0, nil, ipld.ErrIteratorOverread{} + } + idx = itr.idx + value = itr.node.x[idx] + itr.idx++ + return +} + +func (itr *_Parents__Itr) Done() bool { + return itr.idx >= len(itr.node.x) +} + +func (x Parents) Length() int { + return len(x.x) +} +func (Parents) IsUndefined() bool { + return false +} +func (Parents) IsNull() bool { + return false +} +func (Parents) AsBool() (bool, error) { + return false, ipld.ErrWrongKind{TypeName: "Parents", MethodName: "AsBool", AppropriateKind: ipld.ReprKindSet_JustBool, ActualKind: ipld.ReprKind_List} +} +func (Parents) AsInt() (int, error) { + return 0, ipld.ErrWrongKind{TypeName: "Parents", MethodName: "AsInt", AppropriateKind: ipld.ReprKindSet_JustInt, ActualKind: ipld.ReprKind_List} +} +func (Parents) AsFloat() (float64, error) { + return 0, ipld.ErrWrongKind{TypeName: "Parents", MethodName: "AsFloat", AppropriateKind: ipld.ReprKindSet_JustFloat, ActualKind: ipld.ReprKind_List} +} +func (Parents) AsString() (string, error) { + return "", ipld.ErrWrongKind{TypeName: "Parents", MethodName: "AsString", AppropriateKind: ipld.ReprKindSet_JustString, ActualKind: ipld.ReprKind_List} +} +func (Parents) AsBytes() ([]byte, error) { + return nil, ipld.ErrWrongKind{TypeName: "Parents", MethodName: "AsBytes", AppropriateKind: ipld.ReprKindSet_JustBytes, ActualKind: ipld.ReprKind_List} +} +func (Parents) AsLink() (ipld.Link, error) { + return nil, ipld.ErrWrongKind{TypeName: "Parents", MethodName: "AsLink", AppropriateKind: ipld.ReprKindSet_JustLink, ActualKind: ipld.ReprKind_List} +} +func (Parents) NodeBuilder() ipld.NodeBuilder { + return _Parents__NodeBuilder{} +} + +type _Parents__NodeBuilder struct{} + +func Parents__NodeBuilder() ipld.NodeBuilder { + return _Parents__NodeBuilder{} +} +func (_Parents__NodeBuilder) CreateMap() (ipld.MapBuilder, error) { + return nil, ipld.ErrWrongKind{TypeName: "Parents.Builder", MethodName: "CreateMap", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_List} +} +func (_Parents__NodeBuilder) AmendMap() (ipld.MapBuilder, error) { + return nil, ipld.ErrWrongKind{TypeName: "Parents.Builder", MethodName: "AmendMap", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_List} +} +func (nb _Parents__NodeBuilder) CreateList() (ipld.ListBuilder, error) { + return &_Parents__ListBuilder{v: &Parents{}}, nil +} + +type _Parents__ListBuilder struct { + v *Parents +} + +func (lb *_Parents__ListBuilder) growList(k int) { + oldLen := len(lb.v.x) + minLen := k + 1 + if minLen > oldLen { + // Grow. + oldCap := cap(lb.v.x) + if minLen > oldCap { + // Out of cap; do whole new backing array allocation. + // Growth maths are per stdlib's reflect.grow. + // First figure out how much growth to do. + newCap := oldCap + if newCap == 0 { + newCap = minLen + } else { + for minLen > newCap { + if minLen < 1024 { + newCap += newCap + } else { + newCap += newCap / 4 + } + } + } + // Now alloc and copy over old. + newArr := make([]Link, minLen, newCap) + copy(newArr, lb.v.x) + lb.v.x = newArr + } else { + // Still have cap, just extend the slice. + lb.v.x = lb.v.x[0:minLen] + } + } +} + +func (lb *_Parents__ListBuilder) validate(v ipld.Node) error { + if v.IsNull() { + panic("type mismatch on struct field assignment: cannot assign null to non-nullable field") // FIXME need an error type for this + } + tv, ok := v.(typed.Node) + if !ok { + panic("need typed.Node for insertion into struct") // FIXME need an error type for this + } + _, ok = v.(Link) + if !ok { + panic("value for type Parents is type Link; cannot assign " + tv.Type().Name()) // FIXME need an error type for this + } + return nil +} + +func (lb *_Parents__ListBuilder) unsafeSet(idx int, v ipld.Node) { + x := v.(Link) + lb.v.x[idx] = x +} + +func (lb *_Parents__ListBuilder) AppendAll(vs []ipld.Node) error { + for _, v := range vs { + err := lb.validate(v) + if err != nil { + return err + } + } + off := len(lb.v.x) + new := off + len(vs) + lb.growList(new - 1) + for _, v := range vs { + lb.unsafeSet(off, v) + off++ + } + return nil +} + +func (lb *_Parents__ListBuilder) Append(v ipld.Node) error { + err := lb.validate(v) + if err != nil { + return err + } + off := len(lb.v.x) + lb.growList(off) + lb.unsafeSet(off, v) + return nil +} +func (lb *_Parents__ListBuilder) Set(idx int, v ipld.Node) error { + err := lb.validate(v) + if err != nil { + return err + } + lb.growList(idx) + lb.unsafeSet(idx, v) + return nil +} + +func (lb *_Parents__ListBuilder) Build() (ipld.Node, error) { + v := *lb.v + lb = nil + return v, nil +} + +func (lb *_Parents__ListBuilder) BuilderForValue(_ int) ipld.NodeBuilder { + return Link__NodeBuilder() +} + +func (nb _Parents__NodeBuilder) AmendList() (ipld.ListBuilder, error) { + panic("TODO later") +} +func (_Parents__NodeBuilder) CreateNull() (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Parents.Builder", MethodName: "CreateNull", AppropriateKind: ipld.ReprKindSet_JustNull, ActualKind: ipld.ReprKind_List} +} +func (_Parents__NodeBuilder) CreateBool(bool) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Parents.Builder", MethodName: "CreateBool", AppropriateKind: ipld.ReprKindSet_JustBool, ActualKind: ipld.ReprKind_List} +} +func (_Parents__NodeBuilder) CreateInt(int) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Parents.Builder", MethodName: "CreateInt", AppropriateKind: ipld.ReprKindSet_JustInt, ActualKind: ipld.ReprKind_List} +} +func (_Parents__NodeBuilder) CreateFloat(float64) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Parents.Builder", MethodName: "CreateFloat", AppropriateKind: ipld.ReprKindSet_JustFloat, ActualKind: ipld.ReprKind_List} +} +func (_Parents__NodeBuilder) CreateString(string) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Parents.Builder", MethodName: "CreateString", AppropriateKind: ipld.ReprKindSet_JustString, ActualKind: ipld.ReprKind_List} +} +func (_Parents__NodeBuilder) CreateBytes([]byte) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Parents.Builder", MethodName: "CreateBytes", AppropriateKind: ipld.ReprKindSet_JustBytes, ActualKind: ipld.ReprKind_List} +} +func (_Parents__NodeBuilder) CreateLink(ipld.Link) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Parents.Builder", MethodName: "CreateLink", AppropriateKind: ipld.ReprKindSet_JustLink, ActualKind: ipld.ReprKind_List} +} +func (n Parents) Representation() ipld.Node { + panic("TODO representation") +} + +type Messages struct { + x []Bytes +} + +// TODO generateKindList.EmitNativeAccessors +// TODO generateKindList.EmitNativeBuilder +type MaybeMessages struct { + Maybe typed.Maybe + Value Messages +} + +func (m MaybeMessages) Must() Messages { + if m.Maybe != typed.Maybe_Value { + panic("unbox of a maybe rejected") + } + return m.Value +} + +var _ ipld.Node = Messages{} +var _ typed.Node = Messages{} + +func (Messages) Type() schema.Type { + return nil /*TODO:typelit*/ +} +func (Messages) ReprKind() ipld.ReprKind { + return ipld.ReprKind_List +} +func (Messages) LookupString(string) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Messages", MethodName: "LookupString", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_List} +} +func (x Messages) Lookup(key ipld.Node) (ipld.Node, error) { + ki, err := key.AsInt() + if err != nil { + return nil, ipld.ErrInvalidKey{"got " + key.ReprKind().String() + ", need Int"} + } + return x.LookupIndex(ki) +} +func (x Messages) LookupIndex(index int) (ipld.Node, error) { + if index >= len(x.x) { + return nil, ipld.ErrNotExists{ipld.PathSegmentOfInt(index)} + } + return x.x[index], nil +} +func (n Messages) LookupSegment(seg ipld.PathSegment) (ipld.Node, error) { + idx, err := seg.Index() + if err != nil { + return nil, err + } + return n.LookupIndex(idx) +} +func (Messages) MapIterator() ipld.MapIterator { + return mapIteratorReject{ipld.ErrWrongKind{TypeName: "Messages", MethodName: "MapIterator", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_List}} +} +func (x Messages) ListIterator() ipld.ListIterator { + return &_Messages__Itr{&x, 0} +} + +type _Messages__Itr struct { + node *Messages + idx int +} + +func (itr *_Messages__Itr) Next() (idx int, value ipld.Node, _ error) { + if itr.idx >= len(itr.node.x) { + return 0, nil, ipld.ErrIteratorOverread{} + } + idx = itr.idx + value = itr.node.x[idx] + itr.idx++ + return +} + +func (itr *_Messages__Itr) Done() bool { + return itr.idx >= len(itr.node.x) +} + +func (x Messages) Length() int { + return len(x.x) +} +func (Messages) IsUndefined() bool { + return false +} +func (Messages) IsNull() bool { + return false +} +func (Messages) AsBool() (bool, error) { + return false, ipld.ErrWrongKind{TypeName: "Messages", MethodName: "AsBool", AppropriateKind: ipld.ReprKindSet_JustBool, ActualKind: ipld.ReprKind_List} +} +func (Messages) AsInt() (int, error) { + return 0, ipld.ErrWrongKind{TypeName: "Messages", MethodName: "AsInt", AppropriateKind: ipld.ReprKindSet_JustInt, ActualKind: ipld.ReprKind_List} +} +func (Messages) AsFloat() (float64, error) { + return 0, ipld.ErrWrongKind{TypeName: "Messages", MethodName: "AsFloat", AppropriateKind: ipld.ReprKindSet_JustFloat, ActualKind: ipld.ReprKind_List} +} +func (Messages) AsString() (string, error) { + return "", ipld.ErrWrongKind{TypeName: "Messages", MethodName: "AsString", AppropriateKind: ipld.ReprKindSet_JustString, ActualKind: ipld.ReprKind_List} +} +func (Messages) AsBytes() ([]byte, error) { + return nil, ipld.ErrWrongKind{TypeName: "Messages", MethodName: "AsBytes", AppropriateKind: ipld.ReprKindSet_JustBytes, ActualKind: ipld.ReprKind_List} +} +func (Messages) AsLink() (ipld.Link, error) { + return nil, ipld.ErrWrongKind{TypeName: "Messages", MethodName: "AsLink", AppropriateKind: ipld.ReprKindSet_JustLink, ActualKind: ipld.ReprKind_List} +} +func (Messages) NodeBuilder() ipld.NodeBuilder { + return _Messages__NodeBuilder{} +} + +type _Messages__NodeBuilder struct{} + +func Messages__NodeBuilder() ipld.NodeBuilder { + return _Messages__NodeBuilder{} +} +func (_Messages__NodeBuilder) CreateMap() (ipld.MapBuilder, error) { + return nil, ipld.ErrWrongKind{TypeName: "Messages.Builder", MethodName: "CreateMap", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_List} +} +func (_Messages__NodeBuilder) AmendMap() (ipld.MapBuilder, error) { + return nil, ipld.ErrWrongKind{TypeName: "Messages.Builder", MethodName: "AmendMap", AppropriateKind: ipld.ReprKindSet_JustMap, ActualKind: ipld.ReprKind_List} +} +func (nb _Messages__NodeBuilder) CreateList() (ipld.ListBuilder, error) { + return &_Messages__ListBuilder{v: &Messages{}}, nil +} + +type _Messages__ListBuilder struct { + v *Messages +} + +func (lb *_Messages__ListBuilder) growList(k int) { + oldLen := len(lb.v.x) + minLen := k + 1 + if minLen > oldLen { + // Grow. + oldCap := cap(lb.v.x) + if minLen > oldCap { + // Out of cap; do whole new backing array allocation. + // Growth maths are per stdlib's reflect.grow. + // First figure out how much growth to do. + newCap := oldCap + if newCap == 0 { + newCap = minLen + } else { + for minLen > newCap { + if minLen < 1024 { + newCap += newCap + } else { + newCap += newCap / 4 + } + } + } + // Now alloc and copy over old. + newArr := make([]Bytes, minLen, newCap) + copy(newArr, lb.v.x) + lb.v.x = newArr + } else { + // Still have cap, just extend the slice. + lb.v.x = lb.v.x[0:minLen] + } + } +} + +func (lb *_Messages__ListBuilder) validate(v ipld.Node) error { + if v.IsNull() { + panic("type mismatch on struct field assignment: cannot assign null to non-nullable field") // FIXME need an error type for this + } + tv, ok := v.(typed.Node) + if !ok { + panic("need typed.Node for insertion into struct") // FIXME need an error type for this + } + _, ok = v.(Bytes) + if !ok { + panic("value for type Messages is type Bytes; cannot assign " + tv.Type().Name()) // FIXME need an error type for this + } + return nil +} + +func (lb *_Messages__ListBuilder) unsafeSet(idx int, v ipld.Node) { + x := v.(Bytes) + lb.v.x[idx] = x +} + +func (lb *_Messages__ListBuilder) AppendAll(vs []ipld.Node) error { + for _, v := range vs { + err := lb.validate(v) + if err != nil { + return err + } + } + off := len(lb.v.x) + new := off + len(vs) + lb.growList(new - 1) + for _, v := range vs { + lb.unsafeSet(off, v) + off++ + } + return nil +} + +func (lb *_Messages__ListBuilder) Append(v ipld.Node) error { + err := lb.validate(v) + if err != nil { + return err + } + off := len(lb.v.x) + lb.growList(off) + lb.unsafeSet(off, v) + return nil +} +func (lb *_Messages__ListBuilder) Set(idx int, v ipld.Node) error { + err := lb.validate(v) + if err != nil { + return err + } + lb.growList(idx) + lb.unsafeSet(idx, v) + return nil +} + +func (lb *_Messages__ListBuilder) Build() (ipld.Node, error) { + v := *lb.v + lb = nil + return v, nil +} + +func (lb *_Messages__ListBuilder) BuilderForValue(_ int) ipld.NodeBuilder { + return Bytes__NodeBuilder() +} + +func (nb _Messages__NodeBuilder) AmendList() (ipld.ListBuilder, error) { + panic("TODO later") +} +func (_Messages__NodeBuilder) CreateNull() (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Messages.Builder", MethodName: "CreateNull", AppropriateKind: ipld.ReprKindSet_JustNull, ActualKind: ipld.ReprKind_List} +} +func (_Messages__NodeBuilder) CreateBool(bool) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Messages.Builder", MethodName: "CreateBool", AppropriateKind: ipld.ReprKindSet_JustBool, ActualKind: ipld.ReprKind_List} +} +func (_Messages__NodeBuilder) CreateInt(int) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Messages.Builder", MethodName: "CreateInt", AppropriateKind: ipld.ReprKindSet_JustInt, ActualKind: ipld.ReprKind_List} +} +func (_Messages__NodeBuilder) CreateFloat(float64) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Messages.Builder", MethodName: "CreateFloat", AppropriateKind: ipld.ReprKindSet_JustFloat, ActualKind: ipld.ReprKind_List} +} +func (_Messages__NodeBuilder) CreateString(string) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Messages.Builder", MethodName: "CreateString", AppropriateKind: ipld.ReprKindSet_JustString, ActualKind: ipld.ReprKind_List} +} +func (_Messages__NodeBuilder) CreateBytes([]byte) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Messages.Builder", MethodName: "CreateBytes", AppropriateKind: ipld.ReprKindSet_JustBytes, ActualKind: ipld.ReprKind_List} +} +func (_Messages__NodeBuilder) CreateLink(ipld.Link) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Messages.Builder", MethodName: "CreateLink", AppropriateKind: ipld.ReprKindSet_JustLink, ActualKind: ipld.ReprKind_List} +} +func (n Messages) Representation() ipld.Node { + panic("TODO representation") +} + +type Block struct { + Parents Parents + Messages Messages +} + +func (x Parents) FieldParents() Parents { + // TODO going to tear through here with changes to Maybe system in a moment anyway + return Parents{} +} +func (x Messages) FieldMessages() Messages { + // TODO going to tear through here with changes to Maybe system in a moment anyway + return Messages{} +} + +type Block__Content struct { + // TODO + // TODO +} + +func (b Block__Content) Build() (Block, error) { + x := Block{ + // TODO + } + // FUTURE : want to support customizable validation. + // but 'if v, ok := x.(schema.Validatable); ok {' doesn't fly: need a way to work on concrete types. + return x, nil +} +func (b Block__Content) MustBuild() Block { + if x, err := b.Build(); err != nil { + panic(err) + } else { + return x + } +} + +type MaybeBlock struct { + Maybe typed.Maybe + Value Block +} + +func (m MaybeBlock) Must() Block { + if m.Maybe != typed.Maybe_Value { + panic("unbox of a maybe rejected") + } + return m.Value +} + +var _ ipld.Node = Block{} +var _ typed.Node = Block{} + +func (Block) Type() schema.Type { + return nil /*TODO:typelit*/ +} +func (Block) ReprKind() ipld.ReprKind { + return ipld.ReprKind_Map +} +func (x Block) LookupString(key string) (ipld.Node, error) { + switch key { + case "Parents": + return x.Parents, nil + case "Messages": + return x.Messages, nil + default: + return nil, typed.ErrNoSuchField{Type: nil /*TODO*/, FieldName: key} + } +} +func (x Block) Lookup(key ipld.Node) (ipld.Node, error) { + ks, err := key.AsString() + if err != nil { + return nil, ipld.ErrInvalidKey{"got " + key.ReprKind().String() + ", need string"} + } + return x.LookupString(ks) +} +func (Block) LookupIndex(idx int) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block", MethodName: "LookupIndex", AppropriateKind: ipld.ReprKindSet_JustList, ActualKind: ipld.ReprKind_Map} +} +func (n Block) LookupSegment(seg ipld.PathSegment) (ipld.Node, error) { + return n.LookupString(seg.String()) +} +func (x Block) MapIterator() ipld.MapIterator { + return &_Block__Itr{&x, 0} +} + +type _Block__Itr struct { + node *Block + idx int +} + +func (itr *_Block__Itr) Next() (k ipld.Node, v ipld.Node, _ error) { + if itr.idx >= 2 { + return nil, nil, ipld.ErrIteratorOverread{} + } + switch itr.idx { + case 0: + k = String{"Parents"} + v = itr.node.Parents + case 1: + k = String{"Messages"} + v = itr.node.Messages + default: + panic("unreachable") + } + itr.idx++ + return +} +func (itr *_Block__Itr) Done() bool { + return itr.idx >= 2 +} + +func (Block) ListIterator() ipld.ListIterator { + return listIteratorReject{ipld.ErrWrongKind{TypeName: "Block", MethodName: "ListIterator", AppropriateKind: ipld.ReprKindSet_JustList, ActualKind: ipld.ReprKind_Map}} +} +func (Block) Length() int { + return 2 +} +func (Block) IsUndefined() bool { + return false +} +func (Block) IsNull() bool { + return false +} +func (Block) AsBool() (bool, error) { + return false, ipld.ErrWrongKind{TypeName: "Block", MethodName: "AsBool", AppropriateKind: ipld.ReprKindSet_JustBool, ActualKind: ipld.ReprKind_Map} +} +func (Block) AsInt() (int, error) { + return 0, ipld.ErrWrongKind{TypeName: "Block", MethodName: "AsInt", AppropriateKind: ipld.ReprKindSet_JustInt, ActualKind: ipld.ReprKind_Map} +} +func (Block) AsFloat() (float64, error) { + return 0, ipld.ErrWrongKind{TypeName: "Block", MethodName: "AsFloat", AppropriateKind: ipld.ReprKindSet_JustFloat, ActualKind: ipld.ReprKind_Map} +} +func (Block) AsString() (string, error) { + return "", ipld.ErrWrongKind{TypeName: "Block", MethodName: "AsString", AppropriateKind: ipld.ReprKindSet_JustString, ActualKind: ipld.ReprKind_Map} +} +func (Block) AsBytes() ([]byte, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block", MethodName: "AsBytes", AppropriateKind: ipld.ReprKindSet_JustBytes, ActualKind: ipld.ReprKind_Map} +} +func (Block) AsLink() (ipld.Link, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block", MethodName: "AsLink", AppropriateKind: ipld.ReprKindSet_JustLink, ActualKind: ipld.ReprKind_Map} +} +func (Block) NodeBuilder() ipld.NodeBuilder { + return _Block__NodeBuilder{} +} + +type _Block__NodeBuilder struct{} + +func Block__NodeBuilder() ipld.NodeBuilder { + return _Block__NodeBuilder{} +} +func (nb _Block__NodeBuilder) CreateMap() (ipld.MapBuilder, error) { + return &_Block__MapBuilder{v: &Block{}}, nil +} + +type _Block__MapBuilder struct { + v *Block + Parents__isset bool + Messages__isset bool +} + +func (mb *_Block__MapBuilder) Insert(k, v ipld.Node) error { + ks, err := k.AsString() + if err != nil { + return ipld.ErrInvalidKey{"not a string: " + err.Error()} + } + switch ks { + case "Parents": + if v.IsNull() { + panic("type mismatch on struct field assignment: cannot assign null to non-nullable field") // FIXME need an error type for this + } + tv, ok := v.(typed.Node) + if !ok { + panic("need typed.Node for insertion into struct") // FIXME need an error type for this + } + x, ok := v.(Parents) + if !ok { + panic("field 'Parents' in type Block is type Parents; cannot assign " + tv.Type().Name()) // FIXME need an error type for this + } + mb.v.Parents = x + mb.Parents__isset = true + case "Messages": + if v.IsNull() { + panic("type mismatch on struct field assignment: cannot assign null to non-nullable field") // FIXME need an error type for this + } + tv, ok := v.(typed.Node) + if !ok { + panic("need typed.Node for insertion into struct") // FIXME need an error type for this + } + x, ok := v.(Messages) + if !ok { + panic("field 'Messages' in type Block is type Messages; cannot assign " + tv.Type().Name()) // FIXME need an error type for this + } + mb.v.Messages = x + mb.Messages__isset = true + default: + return typed.ErrNoSuchField{Type: nil /*TODO:typelit*/, FieldName: ks} + } + return nil +} +func (mb *_Block__MapBuilder) Delete(k ipld.Node) error { + panic("TODO later") +} +func (mb *_Block__MapBuilder) Build() (ipld.Node, error) { + if !mb.Parents__isset { + panic("missing required field 'Parents' in building struct Block") // FIXME need an error type for this + } + if !mb.Messages__isset { + panic("missing required field 'Messages' in building struct Block") // FIXME need an error type for this + } + v := *mb.v + mb = nil + return v, nil +} +func (mb *_Block__MapBuilder) BuilderForKeys() ipld.NodeBuilder { + return _String__NodeBuilder{} +} +func (mb *_Block__MapBuilder) BuilderForValue(ks string) ipld.NodeBuilder { + switch ks { + case "Parents": + return Parents__NodeBuilder() + case "Messages": + return Messages__NodeBuilder() + default: + panic(typed.ErrNoSuchField{Type: nil /*TODO:typelit*/, FieldName: ks}) + } + return nil +} + +func (nb _Block__NodeBuilder) AmendMap() (ipld.MapBuilder, error) { + panic("TODO later") +} +func (_Block__NodeBuilder) CreateList() (ipld.ListBuilder, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Builder", MethodName: "CreateList", AppropriateKind: ipld.ReprKindSet_JustList, ActualKind: ipld.ReprKind_Map} +} +func (_Block__NodeBuilder) AmendList() (ipld.ListBuilder, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Builder", MethodName: "AmendList", AppropriateKind: ipld.ReprKindSet_JustList, ActualKind: ipld.ReprKind_Map} +} +func (_Block__NodeBuilder) CreateNull() (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Builder", MethodName: "CreateNull", AppropriateKind: ipld.ReprKindSet_JustNull, ActualKind: ipld.ReprKind_Map} +} +func (_Block__NodeBuilder) CreateBool(bool) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Builder", MethodName: "CreateBool", AppropriateKind: ipld.ReprKindSet_JustBool, ActualKind: ipld.ReprKind_Map} +} +func (_Block__NodeBuilder) CreateInt(int) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Builder", MethodName: "CreateInt", AppropriateKind: ipld.ReprKindSet_JustInt, ActualKind: ipld.ReprKind_Map} +} +func (_Block__NodeBuilder) CreateFloat(float64) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Builder", MethodName: "CreateFloat", AppropriateKind: ipld.ReprKindSet_JustFloat, ActualKind: ipld.ReprKind_Map} +} +func (_Block__NodeBuilder) CreateString(string) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Builder", MethodName: "CreateString", AppropriateKind: ipld.ReprKindSet_JustString, ActualKind: ipld.ReprKind_Map} +} +func (_Block__NodeBuilder) CreateBytes([]byte) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Builder", MethodName: "CreateBytes", AppropriateKind: ipld.ReprKindSet_JustBytes, ActualKind: ipld.ReprKind_Map} +} +func (_Block__NodeBuilder) CreateLink(ipld.Link) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Builder", MethodName: "CreateLink", AppropriateKind: ipld.ReprKindSet_JustLink, ActualKind: ipld.ReprKind_Map} +} +func (n Block) Representation() ipld.Node { + return _Block__Repr{&n} +} + +var _ ipld.Node = _Block__Repr{} + +type _Block__Repr struct { + n *Block +} + +func (_Block__Repr) ReprKind() ipld.ReprKind { + return ipld.ReprKind_Map +} +func (rn _Block__Repr) LookupString(key string) (ipld.Node, error) { + switch key { + case "Parents": + return rn.n.Parents, nil + case "Messages": + return rn.n.Messages, nil + default: + return nil, typed.ErrNoSuchField{Type: nil /*TODO*/, FieldName: key} + } +} +func (rn _Block__Repr) Lookup(key ipld.Node) (ipld.Node, error) { + ks, err := key.AsString() + if err != nil { + return nil, ipld.ErrInvalidKey{"got " + key.ReprKind().String() + ", need string"} + } + return rn.LookupString(ks) +} +func (_Block__Repr) LookupIndex(idx int) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Representation", MethodName: "LookupIndex", AppropriateKind: ipld.ReprKindSet_JustList, ActualKind: ipld.ReprKind_Map} +} +func (n _Block__Repr) LookupSegment(seg ipld.PathSegment) (ipld.Node, error) { + return n.LookupString(seg.String()) +} +func (rn _Block__Repr) MapIterator() ipld.MapIterator { + return &_Block__ReprItr{rn.n, 0} +} + +type _Block__ReprItr struct { + node *Block + idx int +} + +func (itr *_Block__ReprItr) Next() (k ipld.Node, v ipld.Node, _ error) { + if itr.idx >= 2 { + return nil, nil, ipld.ErrIteratorOverread{} + } + for { + switch itr.idx { + case 0: + k = String{"Parents"} + v = itr.node.Parents + case 1: + k = String{"Messages"} + v = itr.node.Messages + default: + panic("unreachable") + } + } + itr.idx++ + return +} +func (itr *_Block__ReprItr) Done() bool { + return itr.idx >= 2 +} + +func (_Block__Repr) ListIterator() ipld.ListIterator { + return listIteratorReject{ipld.ErrWrongKind{TypeName: "Block.Representation", MethodName: "ListIterator", AppropriateKind: ipld.ReprKindSet_JustList, ActualKind: ipld.ReprKind_Map}} +} +func (rn _Block__Repr) Length() int { + l := 2 + return l +} +func (_Block__Repr) IsUndefined() bool { + return false +} +func (_Block__Repr) IsNull() bool { + return false +} +func (_Block__Repr) AsBool() (bool, error) { + return false, ipld.ErrWrongKind{TypeName: "Block.Representation", MethodName: "AsBool", AppropriateKind: ipld.ReprKindSet_JustBool, ActualKind: ipld.ReprKind_Map} +} +func (_Block__Repr) AsInt() (int, error) { + return 0, ipld.ErrWrongKind{TypeName: "Block.Representation", MethodName: "AsInt", AppropriateKind: ipld.ReprKindSet_JustInt, ActualKind: ipld.ReprKind_Map} +} +func (_Block__Repr) AsFloat() (float64, error) { + return 0, ipld.ErrWrongKind{TypeName: "Block.Representation", MethodName: "AsFloat", AppropriateKind: ipld.ReprKindSet_JustFloat, ActualKind: ipld.ReprKind_Map} +} +func (_Block__Repr) AsString() (string, error) { + return "", ipld.ErrWrongKind{TypeName: "Block.Representation", MethodName: "AsString", AppropriateKind: ipld.ReprKindSet_JustString, ActualKind: ipld.ReprKind_Map} +} +func (_Block__Repr) AsBytes() ([]byte, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Representation", MethodName: "AsBytes", AppropriateKind: ipld.ReprKindSet_JustBytes, ActualKind: ipld.ReprKind_Map} +} +func (_Block__Repr) AsLink() (ipld.Link, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Representation", MethodName: "AsLink", AppropriateKind: ipld.ReprKindSet_JustLink, ActualKind: ipld.ReprKind_Map} +} +func (_Block__Repr) NodeBuilder() ipld.NodeBuilder { + return _Block__ReprBuilder{} +} + +type _Block__ReprBuilder struct{} + +func Block__ReprBuilder() ipld.NodeBuilder { + return _Block__ReprBuilder{} +} +func (nb _Block__ReprBuilder) CreateMap() (ipld.MapBuilder, error) { + return &_Block__ReprMapBuilder{v: &Block{}}, nil +} + +type _Block__ReprMapBuilder struct { + v *Block + Parents__isset bool + Messages__isset bool +} + +func (mb *_Block__ReprMapBuilder) Insert(k, v ipld.Node) error { + ks, err := k.AsString() + if err != nil { + return ipld.ErrInvalidKey{"not a string: " + err.Error()} + } + switch ks { + case "Parents": + if mb.Parents__isset { + panic("repeated assignment to field") // FIXME need an error type for this + } + if v.IsNull() { + panic("type mismatch on struct field assignment: cannot assign null to non-nullable field") // FIXME need an error type for this + } + tv, ok := v.(typed.Node) + if !ok { + panic("need typed.Node for insertion into struct") // FIXME need an error type for this + } + x, ok := v.(Parents) + if !ok { + panic("field 'Parents' (key: 'Parents') in type Block is type Parents; cannot assign " + tv.Type().Name()) // FIXME need an error type for this + } + mb.v.Parents = x + mb.Parents__isset = true + case "Messages": + if mb.Messages__isset { + panic("repeated assignment to field") // FIXME need an error type for this + } + if v.IsNull() { + panic("type mismatch on struct field assignment: cannot assign null to non-nullable field") // FIXME need an error type for this + } + tv, ok := v.(typed.Node) + if !ok { + panic("need typed.Node for insertion into struct") // FIXME need an error type for this + } + x, ok := v.(Messages) + if !ok { + panic("field 'Messages' (key: 'Messages') in type Block is type Messages; cannot assign " + tv.Type().Name()) // FIXME need an error type for this + } + mb.v.Messages = x + mb.Messages__isset = true + default: + return typed.ErrNoSuchField{Type: nil /*TODO:typelit*/, FieldName: ks} + } + return nil +} +func (mb *_Block__ReprMapBuilder) Delete(k ipld.Node) error { + panic("TODO later") +} +func (mb *_Block__ReprMapBuilder) Build() (ipld.Node, error) { + if !mb.Parents__isset { + panic("missing required field 'Parents' (key: 'Parents') in building struct Block") // FIXME need an error type for this + } + if !mb.Messages__isset { + panic("missing required field 'Messages' (key: 'Messages') in building struct Block") // FIXME need an error type for this + } + v := mb.v + mb = nil + return v, nil +} +func (mb *_Block__ReprMapBuilder) BuilderForKeys() ipld.NodeBuilder { + return _String__NodeBuilder{} +} +func (mb *_Block__ReprMapBuilder) BuilderForValue(ks string) ipld.NodeBuilder { + switch ks { + case "Parents": + return Parents__NodeBuilder() + case "Messages": + return Messages__NodeBuilder() + default: + panic(typed.ErrNoSuchField{Type: nil /*TODO:typelit*/, FieldName: ks}) + } + return nil +} + +func (nb _Block__ReprBuilder) AmendMap() (ipld.MapBuilder, error) { + panic("TODO later") +} +func (_Block__ReprBuilder) CreateList() (ipld.ListBuilder, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Representation.Builder", MethodName: "CreateList", AppropriateKind: ipld.ReprKindSet_JustList, ActualKind: ipld.ReprKind_Map} +} +func (_Block__ReprBuilder) AmendList() (ipld.ListBuilder, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Representation.Builder", MethodName: "AmendList", AppropriateKind: ipld.ReprKindSet_JustList, ActualKind: ipld.ReprKind_Map} +} +func (_Block__ReprBuilder) CreateNull() (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Representation.Builder", MethodName: "CreateNull", AppropriateKind: ipld.ReprKindSet_JustNull, ActualKind: ipld.ReprKind_Map} +} +func (_Block__ReprBuilder) CreateBool(bool) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Representation.Builder", MethodName: "CreateBool", AppropriateKind: ipld.ReprKindSet_JustBool, ActualKind: ipld.ReprKind_Map} +} +func (_Block__ReprBuilder) CreateInt(int) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Representation.Builder", MethodName: "CreateInt", AppropriateKind: ipld.ReprKindSet_JustInt, ActualKind: ipld.ReprKind_Map} +} +func (_Block__ReprBuilder) CreateFloat(float64) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Representation.Builder", MethodName: "CreateFloat", AppropriateKind: ipld.ReprKindSet_JustFloat, ActualKind: ipld.ReprKind_Map} +} +func (_Block__ReprBuilder) CreateString(string) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Representation.Builder", MethodName: "CreateString", AppropriateKind: ipld.ReprKindSet_JustString, ActualKind: ipld.ReprKind_Map} +} +func (_Block__ReprBuilder) CreateBytes([]byte) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Representation.Builder", MethodName: "CreateBytes", AppropriateKind: ipld.ReprKindSet_JustBytes, ActualKind: ipld.ReprKind_Map} +} +func (_Block__ReprBuilder) CreateLink(ipld.Link) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{TypeName: "Block.Representation.Builder", MethodName: "CreateLink", AppropriateKind: ipld.ReprKindSet_JustLink, ActualKind: ipld.ReprKind_Map} +} diff --git a/testutil/chaintypes/testchain_minima.go b/testutil/chaintypes/testchain_minima.go new file mode 100644 index 00000000..57cbfcd5 --- /dev/null +++ b/testutil/chaintypes/testchain_minima.go @@ -0,0 +1,14 @@ +package chaintypes + +import ( + ipld "github.com/ipld/go-ipld-prime" +) + +type mapIteratorReject struct{ err error } +type listIteratorReject struct{ err error } + +func (itr mapIteratorReject) Next() (ipld.Node, ipld.Node, error) { return nil, nil, itr.err } +func (itr mapIteratorReject) Done() bool { return false } + +func (itr listIteratorReject) Next() (int, ipld.Node, error) { return -1, nil, itr.err } +func (itr listIteratorReject) Done() bool { return false } diff --git a/testutil/testchain.go b/testutil/testchain.go index daff1ee2..d73ccb31 100644 --- a/testutil/testchain.go +++ b/testutil/testchain.go @@ -8,8 +8,8 @@ import ( blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" "github.com/ipfs/go-graphsync" + "github.com/ipfs/go-graphsync/testutil/chaintypes" "github.com/ipld/go-ipld-prime" - "github.com/ipld/go-ipld-prime/fluent" ipldfree "github.com/ipld/go-ipld-prime/impl/free" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/traversal/selector" @@ -34,17 +34,53 @@ type TestBlockChain struct { TipLink ipld.Link } -func createBlock(nb fluent.NodeBuilder, parents []ipld.Link, size uint64) ipld.Node { - return nb.CreateMap(func(mb fluent.MapBuilder, knb fluent.NodeBuilder, vnb fluent.NodeBuilder) { - mb.Insert(knb.CreateString("Parents"), vnb.CreateList(func(lb fluent.ListBuilder, vnb fluent.NodeBuilder) { - for _, parent := range parents { - lb.Append(vnb.CreateLink(parent)) - } - })) - mb.Insert(knb.CreateString("Messages"), vnb.CreateList(func(lb fluent.ListBuilder, vnb fluent.NodeBuilder) { - lb.Append(vnb.CreateBytes(RandomBytes(int64(size)))) - })) - }) +func createBlock(parents []ipld.Link, size uint64) (ipld.Node, error) { + links := make([]ipld.Node, 0, len(parents)) + for _, parent := range parents { + lnb := chaintypes.Link__NodeBuilder() + link, err := lnb.CreateLink(parent) + if err != nil { + return nil, err + } + links = append(links, link) + } + pnb := chaintypes.Parents__NodeBuilder() + pnblnb, err := pnb.CreateList() + if err != nil { + return nil, err + } + err = pnblnb.AppendAll(links) + if err != nil { + return nil, err + } + parentsNd, err := pnblnb.Build() + if err != nil { + return nil, err + } + parentsNdTyped := parentsNd.(chaintypes.Parents) + mnb := chaintypes.Messages__NodeBuilder() + mnblnb, err := mnb.CreateList() + if err != nil { + return nil, err + } + bnb := chaintypes.Bytes__NodeBuilder() + bytes, err := bnb.CreateBytes(RandomBytes(int64(size))) + if err != nil { + return nil, err + } + err = mnblnb.Append(bytes) + if err != nil { + return nil, err + } + mesagesNd, err := mnblnb.Build() + if err != nil { + return nil, err + } + messagesNdTyped := mesagesNd.(chaintypes.Messages) + return chaintypes.Block{ + Parents: parentsNdTyped, + Messages: messagesNdTyped, + }, nil } // SetupBlockChain creates a new test block chain with the given height @@ -56,11 +92,7 @@ func SetupBlockChain( size uint64, blockChainLength int) *TestBlockChain { linkBuilder := cidlink.LinkBuilder{Prefix: cid.NewPrefixV1(cid.DagCBOR, mh.SHA2_256)} - var genisisNode ipld.Node - err := fluent.Recover(func() { - nb := fluent.WrapNodeBuilder(ipldfree.NodeBuilder()) - genisisNode = createBlock(nb, []ipld.Link{}, size) - }) + genisisNode, err := createBlock([]ipld.Link{}, size) require.NoError(t, err, "Error creating genesis block") genesisLink, err := linkBuilder.Build(ctx, ipld.LinkContext{}, genisisNode, storer) require.NoError(t, err, "Error creating link to genesis block") @@ -68,11 +100,7 @@ func SetupBlockChain( middleNodes := make([]ipld.Node, 0, blockChainLength-2) middleLinks := make([]ipld.Link, 0, blockChainLength-2) for i := 0; i < blockChainLength-2; i++ { - var node ipld.Node - err := fluent.Recover(func() { - nb := fluent.WrapNodeBuilder(ipldfree.NodeBuilder()) - node = createBlock(nb, []ipld.Link{parent}, size) - }) + node, err := createBlock([]ipld.Link{parent}, size) require.NoError(t, err, "Error creating middle block") middleNodes = append(middleNodes, node) link, err := linkBuilder.Build(ctx, ipld.LinkContext{}, node, storer) @@ -80,11 +108,7 @@ func SetupBlockChain( middleLinks = append(middleLinks, link) parent = link } - var tipNode ipld.Node - err = fluent.Recover(func() { - nb := fluent.WrapNodeBuilder(ipldfree.NodeBuilder()) - tipNode = createBlock(nb, []ipld.Link{parent}, size) - }) + tipNode, err := createBlock([]ipld.Link{parent}, size) require.NoError(t, err, "Error creating tip block") tipLink, err := linkBuilder.Build(ctx, ipld.LinkContext{}, tipNode, storer) require.NoError(t, err, "Error creating link to tip block") @@ -124,7 +148,7 @@ func (tbc *TestBlockChain) NodeTipIndex(fromTip int) ipld.Node { return tbc.MiddleNodes[height-1] } } -func (tbc *TestBlockChain) checkResponses(responses []graphsync.ResponseProgress, start int, end int) { +func (tbc *TestBlockChain) checkResponses(responses []graphsync.ResponseProgress, start int, end int, verifyTypes bool) { require.Len(tbc.t, responses, (end-start)*blockChainTraversedNodesPerBlock, "traverses all nodes") expectedPath := "" for i := 0; i < start; i++ { @@ -137,12 +161,20 @@ func (tbc *TestBlockChain) checkResponses(responses []graphsync.ResponseProgress for i, response := range responses { require.Equal(tbc.t, expectedPath, response.Path.String(), "response has correct path") if i%2 == 0 { + if verifyTypes { + _, ok := response.Node.(chaintypes.Block) + require.True(tbc.t, ok, "nodes in response should have correct type") + } if expectedPath == "" { expectedPath = "Parents" } else { expectedPath = expectedPath + "/Parents" } } else { + if verifyTypes { + _, ok := response.Node.(chaintypes.Parents) + require.True(tbc.t, ok, "nodes in response should have correct type") + } expectedPath = expectedPath + "/0" } if response.LastBlock.Path.String() != response.Path.String() { @@ -164,14 +196,27 @@ func (tbc *TestBlockChain) VerifyWholeChain(ctx context.Context, responseChan <- // VerifyRemainder verifies the given response channel returns the remainder of the chain starting at the nth block from the tip func (tbc *TestBlockChain) VerifyRemainder(ctx context.Context, responseChan <-chan graphsync.ResponseProgress, from int) { responses := CollectResponses(ctx, tbc.t, responseChan) - tbc.checkResponses(responses, from, tbc.blockChainLength) + tbc.checkResponses(responses, from, tbc.blockChainLength, false) } // VerifyResponseRange verifies the given response channel returns the given range of respnses, indexed from the tip // (with possibly more data left in the channel) func (tbc *TestBlockChain) VerifyResponseRange(ctx context.Context, responseChan <-chan graphsync.ResponseProgress, from int, to int) { responses := ReadNResponses(ctx, tbc.t, responseChan, (to-from)*blockChainTraversedNodesPerBlock) - tbc.checkResponses(responses, from, to) + tbc.checkResponses(responses, from, to, false) +} + +// VerifyWholeChainWithTypes verifies the given response channel returns the expected responses for the whole chain +// and that the types in the response are the expected types for a block chain +func (tbc *TestBlockChain) VerifyWholeChainWithTypes(ctx context.Context, responseChan <-chan graphsync.ResponseProgress) { + tbc.VerifyRemainderWithTypes(ctx, responseChan, 0) +} + +// VerifyRemainderWithTypes verifies the given response channel returns the remainder of the chain starting at the nth block from the tip +// and that the types in the response are the expected types for a block chain +func (tbc *TestBlockChain) VerifyRemainderWithTypes(ctx context.Context, responseChan <-chan graphsync.ResponseProgress, from int) { + responses := CollectResponses(ctx, tbc.t, responseChan) + tbc.checkResponses(responses, from, tbc.blockChainLength, true) } // Blocks Returns the given raw blocks for the block chain for the given range, indexed from the tip @@ -199,3 +244,8 @@ func (tbc *TestBlockChain) AllBlocks() []blocks.Block { func (tbc *TestBlockChain) RemainderBlocks(from int) []blocks.Block { return tbc.Blocks(from, tbc.blockChainLength) } + +// Chooser is a NodeBuilderChooser function that always returns the block chain +func (tbc *TestBlockChain) Chooser(ipld.Link, ipld.LinkContext) ipld.NodeBuilder { + return chaintypes.Block__NodeBuilder() +}