Skip to content

Commit

Permalink
Made the gRPC daemon actually wait for port close completion
Browse files Browse the repository at this point in the history
  • Loading branch information
cmaglie committed Oct 26, 2023
1 parent 7dc60f3 commit 2d33dec
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 4 deletions.
17 changes: 15 additions & 2 deletions commands/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"sync/atomic"

"github.com/arduino/arduino-cli/arduino"
"github.com/arduino/arduino-cli/commands"
Expand Down Expand Up @@ -451,6 +452,10 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
_ = syncSend.Send(&rpc.MonitorResponse{Success: true})

cancelCtx, cancel := context.WithCancel(stream.Context())
gracefulCloseInitiated := &atomic.Bool{}
gracefuleCloseCtx, gracefulCloseCancel := context.WithCancel(context.Background())

// gRPC stream receiver (gRPC data -> monitor, config, close)
go func() {
defer cancel()
for {
Expand All @@ -470,9 +475,11 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
}
}
if closeMsg := msg.GetClose(); closeMsg {
gracefulCloseInitiated.Store(true)
if err := portProxy.Close(); err != nil {
logrus.WithError(err).Debug("Error closing monitor port")
}
gracefulCloseCancel()
}
tx := msg.GetTxData()
for len(tx) > 0 {
Expand All @@ -489,8 +496,9 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
}
}()

// gRPC stream sender (monitor -> gRPC)
go func() {
defer cancel()
defer cancel() // unlock the receiver
buff := make([]byte, 4096)
for {
n, err := portProxy.Read(buff)
Expand All @@ -508,6 +516,11 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
}()

<-cancelCtx.Done()
portProxy.Close()
if gracefulCloseInitiated.Load() {
// Port closing has been initiated in the receiver
<-gracefuleCloseCtx.Done()
} else {
portProxy.Close()
}
return nil
}
2 changes: 1 addition & 1 deletion internal/integrationtest/monitor/monitor_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestMonitorGRPCClose(t *testing.T) {
}

// Now close the monitor using MonitorRequest_Close
for tries := 0; tries < 5; tries++ { // Try the test 5 times to avoid flukes
{
// Keep a timeout to allow the test to exit in any case
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
mon, err := grpcInst.Monitor(ctx, ports[0].Port)
Expand Down
2 changes: 1 addition & 1 deletion internal/mock_serial_monitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (d *SerialMonitor) Close() error {
d.mockedSerialPort.Close()
d.openedPort = false
if d.muxFile != nil {
time.Sleep(500 * time.Millisecond) // Emulate a small delay closing the port to check gRPC synchronization
time.Sleep(2000 * time.Millisecond) // Emulate a small delay closing the port to check gRPC synchronization
d.muxFile.Remove()
d.muxFile = nil
}
Expand Down

0 comments on commit 2d33dec

Please sign in to comment.