diff --git a/messagequeue/messagequeue.go b/messagequeue/messagequeue.go index ff19722a..541170ce 100644 --- a/messagequeue/messagequeue.go +++ b/messagequeue/messagequeue.go @@ -5,7 +5,7 @@ import ( "sync" "time" - "github.com/ipfs/go-block-format" + blocks "github.com/ipfs/go-block-format" gsmsg "github.com/ipfs/go-graphsync/message" gsnet "github.com/ipfs/go-graphsync/network" @@ -189,6 +189,8 @@ func (mq *MessageQueue) attemptSendAndRecovery(message gsmsg.GraphSyncMessage) b mq.sender = nil select { + case <-mq.done: + return true case <-mq.ctx.Done(): return true case <-time.After(time.Millisecond * 100): diff --git a/messagequeue/messagequeue_test.go b/messagequeue/messagequeue_test.go index f4553868..e512e280 100644 --- a/messagequeue/messagequeue_test.go +++ b/messagequeue/messagequeue_test.go @@ -2,6 +2,7 @@ package messagequeue import ( "context" + "fmt" "math/rand" "reflect" "sync" @@ -89,6 +90,72 @@ func TestStartupAndShutdown(t *testing.T) { } } +func TestShutdownDuringMessageSend(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + peer := testutil.GeneratePeers(1)[0] + messagesSent := make(chan gsmsg.GraphSyncMessage) + resetChan := make(chan struct{}, 1) + fullClosedChan := make(chan struct{}, 1) + messageSender := &fakeMessageSender{ + fmt.Errorf("Something went wrong"), + fullClosedChan, + resetChan, + messagesSent} + var waitGroup sync.WaitGroup + messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup} + + messageQueue := New(ctx, peer, messageNetwork) + messageQueue.Startup() + id := gsmsg.GraphSyncRequestID(rand.Int31()) + priority := gsmsg.GraphSyncPriority(rand.Int31()) + selector := testutil.RandomBytes(100) + root := testutil.GenerateCids(1)[0] + + // setup a message and advance as far as beginning to send it + waitGroup.Add(1) + messageQueue.AddRequest(gsmsg.NewRequest(id, root, selector, priority)) + waitGroup.Wait() + + // now shut down + messageQueue.Shutdown() + + // let the message send attempt complete and fail (as it would if + // the connection were closed) + select { + case <-ctx.Done(): + t.Fatal("message send not attempted") + case <-messagesSent: + } + + // verify the connection is reset after a failed send attempt + select { + case <-resetChan: + case <-fullClosedChan: + t.Fatal("message sender should have been reset but was closed") + case <-ctx.Done(): + t.Fatal("message sender should have been closed but wasn't") + } + + // now verify after it's reset, no further retries, connection + // resets, or attempts to close the connection, cause the queue + // should realize it's shut down and stop processing + // FIXME: this relies on time passing -- 100 ms to be exact + // and we should instead mock out time as a dependency + waitGroup.Add(1) + select { + case <-messagesSent: + t.Fatal("should not have attempted to send second message") + case <-resetChan: + t.Fatal("message sender should not have been reset again") + case <-fullClosedChan: + t.Fatal("message sender should not have been closed closed") + case <-ctx.Done(): + } +} + func TestProcessingNotification(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 1*time.Second)