Skip to content

Commit

Permalink
Better error message on complete (#145)
Browse files Browse the repository at this point in the history
* fix: better error message on complete

* fix: on complete error message
  • Loading branch information
dirkmc authored Feb 12, 2021
1 parent d2ab033 commit dc9e05a
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 20 deletions.
3 changes: 0 additions & 3 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ const ErrPause = errorType("pause channel")
// use to resume the channel
const ErrResume = errorType("resume channel")

// ErrIncomplete indicates a channel did not finish transferring data successfully
const ErrIncomplete = errorType("incomplete response")

// ErrRejected indicates a request was not accepted
const ErrRejected = errorType("response rejected")

Expand Down
8 changes: 5 additions & 3 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ func (m *manager) OnRequestDisconnected(ctx context.Context, chid datatransfer.C
return nil
}

func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, success bool) error {
if success {
func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, completeErr error) error {
if completeErr == nil {
if chid.Initiator != m.peerID {
msg, err := m.completeMessage(chid)
if err != nil {
Expand Down Expand Up @@ -316,7 +316,9 @@ func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, success bool)
}
// send an error, but only if we haven't already errored for some reason
if chst.Status() != datatransfer.Failing && chst.Status() != datatransfer.Failed {
return m.channels.Error(chid, datatransfer.ErrIncomplete)
err := xerrors.Errorf("data transfer channel %s failed to transfer data: %w", chid, completeErr)
log.Warnf(err.Error())
return m.channels.Error(chid, err)
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions impl/responding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func TestDataTransferResponding(t *testing.T) {
verify: func(t *testing.T, h *receiverHarness) {
_, err := h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.pullRequest)
require.NoError(t, err)
err = h.transport.EventHandler.OnChannelCompleted(channelID(h.id, h.peers), true)
err = h.transport.EventHandler.OnChannelCompleted(channelID(h.id, h.peers), nil)
require.NoError(t, err)
require.Len(t, h.network.SentMessages, 1)
response, ok := h.network.SentMessages[0].Message.(datatransfer.Response)
Expand Down Expand Up @@ -507,7 +507,7 @@ func TestDataTransferResponding(t *testing.T) {
verify: func(t *testing.T, h *receiverHarness) {
_, err := h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.pullRequest)
require.NoError(t, err)
err = h.transport.EventHandler.OnChannelCompleted(channelID(h.id, h.peers), false)
err = h.transport.EventHandler.OnChannelCompleted(channelID(h.id, h.peers), xerrors.Errorf("err"))
require.NoError(t, err)
},
},
Expand Down
6 changes: 3 additions & 3 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ type EventsHandler interface {
// - err == ErrPause - pause this request (only for new requests)
// - err == ErrResume - resume this request (only for update requests)
OnRequestReceived(chid ChannelID, msg Request) (Response, error)
// OnResponseCompleted is called when we finish sending data for the given channel ID
// Error returns are logged but otherwise have not effect
OnChannelCompleted(chid ChannelID, success bool) error
// OnChannelCompleted is called when we finish transferring data for the given channel ID
// Error returns are logged but otherwise have no effect
OnChannelCompleted(chid ChannelID, err error) error

// OnRequestTimedOut is called when a request we opened (with the given channel Id) to receive data times out.
// Error returns are logged but otherwise have no effect
Expand Down
51 changes: 44 additions & 7 deletions transport/graphsync/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,12 @@ func (t *Transport) executeGsRequest(ctx context.Context, internalCtx context.Co
}

log.Debugf("finished executing graphsync request for channel %s", channelID)
err := t.events.OnChannelCompleted(channelID, lastError == nil)

var completeErr error
if lastError != nil {
completeErr = xerrors.Errorf("graphsync request failed to complete: %w", lastError)
}
err := t.events.OnChannelCompleted(channelID, completeErr)
if err != nil {
log.Error(err)
}
Expand Down Expand Up @@ -533,13 +538,45 @@ func (t *Transport) gsCompletedResponseListener(p peer.ID, request graphsync.Req
return
}

if status != graphsync.RequestCancelled {
success := status == graphsync.RequestCompletedFull
err := t.events.OnChannelCompleted(chid, success)
if err != nil {
log.Error(err)
}
if status == graphsync.RequestCancelled {
return
}

var completeErr error
if status != graphsync.RequestCompletedFull {
statusStr := gsResponseStatusCodeString(status)
completeErr = xerrors.Errorf("graphsync response to peer %s did not complete: response status code %s", p, statusStr)
}
err := t.events.OnChannelCompleted(chid, completeErr)
if err != nil {
log.Error(err)
}
}

// Remove this map once this PR lands: https://github.com/ipfs/go-graphsync/pull/148
var gsResponseStatusCodes = map[graphsync.ResponseStatusCode]string{
graphsync.RequestAcknowledged: "RequestAcknowledged",
graphsync.AdditionalPeers: "AdditionalPeers",
graphsync.NotEnoughGas: "NotEnoughGas",
graphsync.OtherProtocol: "OtherProtocol",
graphsync.PartialResponse: "PartialResponse",
graphsync.RequestPaused: "RequestPaused",
graphsync.RequestCompletedFull: "RequestCompletedFull",
graphsync.RequestCompletedPartial: "RequestCompletedPartial",
graphsync.RequestRejected: "RequestRejected",
graphsync.RequestFailedBusy: "RequestFailedBusy",
graphsync.RequestFailedUnknown: "RequestFailedUnknown",
graphsync.RequestFailedLegal: "RequestFailedLegal",
graphsync.RequestFailedContentNotFound: "RequestFailedContentNotFound",
graphsync.RequestCancelled: "RequestCancelled",
}

func gsResponseStatusCodeString(code graphsync.ResponseStatusCode) string {
str, ok := gsResponseStatusCodes[code]
if ok {
return str
}
return gsResponseStatusCodes[graphsync.RequestFailedUnknown]
}

func (t *Transport) cleanupChannel(chid datatransfer.ChannelID, gsKey graphsyncKey) {
Expand Down
4 changes: 2 additions & 2 deletions transport/graphsync/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,9 +1041,9 @@ func (fe *fakeEvents) OnResponseReceived(chid datatransfer.ChannelID, response d
return err
}

func (fe *fakeEvents) OnChannelCompleted(chid datatransfer.ChannelID, success bool) error {
func (fe *fakeEvents) OnChannelCompleted(chid datatransfer.ChannelID, completeErr error) error {
fe.OnChannelCompletedCalled = true
fe.ChannelCompletedSuccess = success
fe.ChannelCompletedSuccess = completeErr == nil
return fe.OnChannelCompletedErr
}

Expand Down

0 comments on commit dc9e05a

Please sign in to comment.