Skip to content

Commit

Permalink
fix(messagequeue): no retry after queue shutdown
Browse files Browse the repository at this point in the history
Previously, we allowed the last message send attempt to run to maxRetries if the queue was shutdown,
which also generated warning logs. Now, if a send attempt fails, check that the queue has shutdown
and immediately return if it has
  • Loading branch information
hannahhoward committed Sep 25, 2019
1 parent 90ce01c commit 9c5652a
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 1 deletion.
4 changes: 3 additions & 1 deletion messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"
"time"

"github.com/ipfs/go-block-format"
blocks "github.com/ipfs/go-block-format"

gsmsg "github.com/ipfs/go-graphsync/message"
gsnet "github.com/ipfs/go-graphsync/network"
Expand Down Expand Up @@ -189,6 +189,8 @@ func (mq *MessageQueue) attemptSendAndRecovery(message gsmsg.GraphSyncMessage) b
mq.sender = nil

select {
case <-mq.done:
return true
case <-mq.ctx.Done():
return true
case <-time.After(time.Millisecond * 100):
Expand Down
67 changes: 67 additions & 0 deletions messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package messagequeue

import (
"context"
"fmt"
"math/rand"
"reflect"
"sync"
Expand Down Expand Up @@ -89,6 +90,72 @@ func TestStartupAndShutdown(t *testing.T) {
}
}

func TestShutdownDuringMessageSend(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

peer := testutil.GeneratePeers(1)[0]
messagesSent := make(chan gsmsg.GraphSyncMessage)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
messageSender := &fakeMessageSender{
fmt.Errorf("Something went wrong"),
fullClosedChan,
resetChan,
messagesSent}
var waitGroup sync.WaitGroup
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}

messageQueue := New(ctx, peer, messageNetwork)
messageQueue.Startup()
id := gsmsg.GraphSyncRequestID(rand.Int31())
priority := gsmsg.GraphSyncPriority(rand.Int31())
selector := testutil.RandomBytes(100)
root := testutil.GenerateCids(1)[0]

// setup a message and advance as far as beginning to send it
waitGroup.Add(1)
messageQueue.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
waitGroup.Wait()

// now shut down
messageQueue.Shutdown()

// let the message send attempt complete and fail (as it would if
// the connection were closed)
select {
case <-ctx.Done():
t.Fatal("message send not attempted")
case <-messagesSent:
}

// verify the connection is reset after a failed send attempt
select {
case <-resetChan:
case <-fullClosedChan:
t.Fatal("message sender should have been reset but was closed")
case <-ctx.Done():
t.Fatal("message sender should have been closed but wasn't")
}

// now verify after it's reset, no further retries, connection
// resets, or attempts to close the connection, cause the queue
// should realize it's shut down and stop processing
// FIXME: this relies on time passing -- 100 ms to be exact
// and we should instead mock out time as a dependency
waitGroup.Add(1)
select {
case <-messagesSent:
t.Fatal("should not have attempted to send second message")
case <-resetChan:
t.Fatal("message sender should not have been reset again")
case <-fullClosedChan:
t.Fatal("message sender should not have been closed closed")
case <-ctx.Done():
}
}

func TestProcessingNotification(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
Expand Down

0 comments on commit 9c5652a

Please sign in to comment.