Skip to content

Commit

Permalink
Merge pull request #38 from ipfs/bugs/fix-messagequeue-shutdown
Browse files Browse the repository at this point in the history
fix(messagequeue): no retry after queue shutdown
  • Loading branch information
hannahhoward authored Sep 25, 2019
2 parents 90ce01c + 9c5652a commit d92aaa8
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 1 deletion.
4 changes: 3 additions & 1 deletion messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"
"time"

"github.com/ipfs/go-block-format"
blocks "github.com/ipfs/go-block-format"

gsmsg "github.com/ipfs/go-graphsync/message"
gsnet "github.com/ipfs/go-graphsync/network"
Expand Down Expand Up @@ -189,6 +189,8 @@ func (mq *MessageQueue) attemptSendAndRecovery(message gsmsg.GraphSyncMessage) b
mq.sender = nil

select {
case <-mq.done:
return true
case <-mq.ctx.Done():
return true
case <-time.After(time.Millisecond * 100):
Expand Down
67 changes: 67 additions & 0 deletions messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package messagequeue

import (
"context"
"fmt"
"math/rand"
"reflect"
"sync"
Expand Down Expand Up @@ -89,6 +90,72 @@ func TestStartupAndShutdown(t *testing.T) {
}
}

func TestShutdownDuringMessageSend(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

peer := testutil.GeneratePeers(1)[0]
messagesSent := make(chan gsmsg.GraphSyncMessage)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
messageSender := &fakeMessageSender{
fmt.Errorf("Something went wrong"),
fullClosedChan,
resetChan,
messagesSent}
var waitGroup sync.WaitGroup
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}

messageQueue := New(ctx, peer, messageNetwork)
messageQueue.Startup()
id := gsmsg.GraphSyncRequestID(rand.Int31())
priority := gsmsg.GraphSyncPriority(rand.Int31())
selector := testutil.RandomBytes(100)
root := testutil.GenerateCids(1)[0]

// setup a message and advance as far as beginning to send it
waitGroup.Add(1)
messageQueue.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
waitGroup.Wait()

// now shut down
messageQueue.Shutdown()

// let the message send attempt complete and fail (as it would if
// the connection were closed)
select {
case <-ctx.Done():
t.Fatal("message send not attempted")
case <-messagesSent:
}

// verify the connection is reset after a failed send attempt
select {
case <-resetChan:
case <-fullClosedChan:
t.Fatal("message sender should have been reset but was closed")
case <-ctx.Done():
t.Fatal("message sender should have been closed but wasn't")
}

// now verify after it's reset, no further retries, connection
// resets, or attempts to close the connection, cause the queue
// should realize it's shut down and stop processing
// FIXME: this relies on time passing -- 100 ms to be exact
// and we should instead mock out time as a dependency
waitGroup.Add(1)
select {
case <-messagesSent:
t.Fatal("should not have attempted to send second message")
case <-resetChan:
t.Fatal("message sender should not have been reset again")
case <-fullClosedChan:
t.Fatal("message sender should not have been closed closed")
case <-ctx.Done():
}
}

func TestProcessingNotification(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
Expand Down

0 comments on commit d92aaa8

Please sign in to comment.