Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(messagequeue): no retry after queue shutdown #38

Merged
merged 1 commit into from
Sep 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"
"time"

"github.com/ipfs/go-block-format"
blocks "github.com/ipfs/go-block-format"

gsmsg "github.com/ipfs/go-graphsync/message"
gsnet "github.com/ipfs/go-graphsync/network"
Expand Down Expand Up @@ -189,6 +189,8 @@ func (mq *MessageQueue) attemptSendAndRecovery(message gsmsg.GraphSyncMessage) b
mq.sender = nil

select {
case <-mq.done:
return true
case <-mq.ctx.Done():
return true
case <-time.After(time.Millisecond * 100):
Expand Down
67 changes: 67 additions & 0 deletions messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package messagequeue

import (
"context"
"fmt"
"math/rand"
"reflect"
"sync"
Expand Down Expand Up @@ -89,6 +90,72 @@ func TestStartupAndShutdown(t *testing.T) {
}
}

func TestShutdownDuringMessageSend(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

peer := testutil.GeneratePeers(1)[0]
messagesSent := make(chan gsmsg.GraphSyncMessage)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
messageSender := &fakeMessageSender{
fmt.Errorf("Something went wrong"),
fullClosedChan,
resetChan,
messagesSent}
var waitGroup sync.WaitGroup
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}

messageQueue := New(ctx, peer, messageNetwork)
messageQueue.Startup()
id := gsmsg.GraphSyncRequestID(rand.Int31())
priority := gsmsg.GraphSyncPriority(rand.Int31())
selector := testutil.RandomBytes(100)
root := testutil.GenerateCids(1)[0]

// setup a message and advance as far as beginning to send it
waitGroup.Add(1)
messageQueue.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
waitGroup.Wait()

// now shut down
messageQueue.Shutdown()

// let the message send attempt complete and fail (as it would if
// the connection were closed)
select {
case <-ctx.Done():
t.Fatal("message send not attempted")
case <-messagesSent:
}

// verify the connection is reset after a failed send attempt
select {
case <-resetChan:
case <-fullClosedChan:
t.Fatal("message sender should have been reset but was closed")
case <-ctx.Done():
t.Fatal("message sender should have been closed but wasn't")
}

// now verify after it's reset, no further retries, connection
// resets, or attempts to close the connection, cause the queue
// should realize it's shut down and stop processing
// FIXME: this relies on time passing -- 100 ms to be exact
// and we should instead mock out time as a dependency
waitGroup.Add(1)
select {
case <-messagesSent:
t.Fatal("should not have attempted to send second message")
case <-resetChan:
t.Fatal("message sender should not have been reset again")
case <-fullClosedChan:
t.Fatal("message sender should not have been closed closed")
case <-ctx.Done():
}
}

func TestProcessingNotification(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
Expand Down