Skip to content

Commit

Permalink
Test cleanup, accessors for optional cancelCtx
Browse files Browse the repository at this point in the history
  • Loading branch information
adamcfraser committed Oct 13, 2024
1 parent 1d5bd9b commit a3e9229
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 46 deletions.
8 changes: 6 additions & 2 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,18 @@ func (blipCtx *Context) GetBytesReceived() uint64 {
return blipCtx.bytesReceived.Load()
}

// Opens a BLIP connection to a host.
func (blipCtx *Context) CancelCtx() context.Context {
// GetCancelCtx returns cancelc=Ctx if it has been set. Otherwise returns non-cancellable context.
func (blipCtx *Context) GetCancelCtx() context.Context {
if blipCtx.cancelCtx != nil {
return blipCtx.cancelCtx
}
return context.TODO()
}

func (blipCtx *Context) SetCancelCtx(cancelCtx context.Context) {
blipCtx.cancelCtx = cancelCtx
}

// DialOptions is used by DialConfig to oepn a BLIP connection.
type DialOptions struct {
URL string
Expand Down
94 changes: 51 additions & 43 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,12 +581,10 @@ func TestOrigin(t *testing.T) {
// TestServerContextClose tests closing server using cancellable context, ensure that clients are disconnected
//
// Test:
//
// - Start two blip contexts: an echo server and an echo client
// - The echo server is configured to respond to incoming echo requests and return responses
// - The echo client tries to read the response after sending the request
// - Expected: the echo client should receive some sort of error when trying to read the response, since the server abruptly terminated the connection
// - Actual: the echo client blocks indefinitely trying to read the response
// - The echo client sends echo requests on a loop
// - Expected: the echo client should receive some sort of error when the server closes the connection, and should not block
func TestServerContextClose(t *testing.T) {

blipContextEchoServer, err := NewContext(defaultContextOptions)
Expand All @@ -596,9 +594,9 @@ func TestServerContextClose(t *testing.T) {

receivedRequests := sync.WaitGroup{}

// ----------------- Setup Echo Server that abruptly terminates socket -------------------------
// ----------------- Setup Echo Server that will be closed via cancellation context -------------------------

// Create a blip profile handler to respond to echo requests and then abruptly close the socket
// Create a blip profile handler to respond to echo requests
dispatchEcho := func(request *Message) {
defer receivedRequests.Done()
body, err := request.Body()
Expand All @@ -621,7 +619,7 @@ func TestServerContextClose(t *testing.T) {
blipContextEchoServer.LogFrames = true

serverCancelCtx, cancelFunc := context.WithCancel(context.Background())
blipContextEchoServer.cancelCtx = serverCancelCtx
blipContextEchoServer.SetCancelCtx(serverCancelCtx)

// Websocket Server
server := blipContextEchoServer.WebSocketServer()
Expand All @@ -632,12 +630,13 @@ func TestServerContextClose(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer listener.Close()
go func() {
t.Error(http.Serve(listener, nil))
err := http.Serve(listener, nil)
log.Printf("server goroutine closed with error: %v", err)
}()

// ----------------- Setup Echo Client ----------------------------------------

blipContextEchoClient, err := NewContext(defaultContextOptions)
if err != nil {
t.Fatal(err)
Expand All @@ -651,52 +650,61 @@ func TestServerContextClose(t *testing.T) {

var closeWg, delayWg sync.WaitGroup

// Start a goroutine to send echo request every 100 ms
delayWg.Add(1)
closeWg.Add(1)
// Start a goroutine to send echo request every 100 ms, time out after 30s (if test fails)
delayWg.Add(1) // wait for connection and messages to be sent before cancelling server context
closeWg.Add(1) // wait for client to disconnect before exiting test
go func() {
defer closeWg.Done()
for i := 0; i < 100; i++ {
if i == 10 {
delayWg.Done()
}
// Create echo request
echoResponseBody := []byte("hello")
echoRequest := NewRequest()
echoRequest.SetProfile("BLIPTest/EchoData")
echoRequest.Properties["Content-Type"] = "application/octet-stream"
echoRequest.SetBody(echoResponseBody)
receivedRequests.Add(1)
sent := sender.Send(echoRequest)
assert.True(t, sent)

// Read the echo response
response := echoRequest.Response() // <--- SG #3268 was causing this to block indefinitely
responseBody, err := response.Body()
assert.True(t, err == nil)
if len(responseBody) == 0 {
log.Printf("empty response, connection closed")
timeout := time.After(time.Second * 30)
ticker := time.NewTicker(time.Millisecond * 50)
echoCount := 0
for {
select {
case <-timeout:
t.Fatalf("Echo client connection wasn't closed before timeout expired")
return
case <-ticker.C:
{
echoCount++
// After sending 10 echoes, close delayWg to trigger server-side cancellation
log.Printf("Sending echo %v", echoCount)
if echoCount == 10 {
delayWg.Done()
}
// Create echo request
echoResponseBody := []byte("hello")
echoRequest := NewRequest()
echoRequest.SetProfile("BLIPTest/EchoData")
echoRequest.Properties["Content-Type"] = "application/octet-stream"
echoRequest.SetBody(echoResponseBody)
receivedRequests.Add(1)
sent := sender.Send(echoRequest)
assert.True(t, sent)

// Read the echo response. Closed connection will result in empty response, as EOF message
// isn't currently returned by blip client
response := echoRequest.Response()
responseBody, err := response.Body()
assert.True(t, err == nil)
if len(responseBody) == 0 {
log.Printf("empty response, connection closed")
return
}

assert.Equal(t, echoResponseBody, responseBody)
}
}
assert.Equal(t, echoResponseBody, responseBody)
time.Sleep(time.Millisecond * 100)
}
}()

// Wait for client to start sending echo messages before stopping server
delayWg.Wait()

// Cancel context on server
cancelFunc()

// Wait for client echo loop to exit due to closed connection before exiting test
closeWg.Wait()
//request.Sender.conn.Close(websocket.StatusNoStatusRcvd, "")
/*
// Wait until the echo server profile handler was invoked and completely finished (and thus abruptly closed socket)
err = WaitWithTimeout(&receivedRequests, time.Second*60)
if err != nil {
t.Fatal(err)
}
*/

}

Expand Down
2 changes: 1 addition & 1 deletion receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (r *receiver) receiveLoop() error {

for {
// Receive the next raw WebSocket frame:
_, frame, err := r.conn.Read(r.context.CancelCtx())
_, frame, err := r.conn.Read(r.context.GetCancelCtx())
if err != nil {
if isCloseError(err) {
// lower log level for close
Expand Down

0 comments on commit a3e9229

Please sign in to comment.