From 2d33dec0daad828fc00b4e55a53ef9c761353907 Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Thu, 26 Oct 2023 17:05:43 +0200 Subject: [PATCH] Made the gRPC daemon actually wait for port close completion --- commands/daemon/daemon.go | 17 +++++++++++++++-- .../monitor/monitor_grpc_test.go | 2 +- internal/mock_serial_monitor/main.go | 2 +- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/commands/daemon/daemon.go b/commands/daemon/daemon.go index 90304f1d480..5d4018aae2d 100644 --- a/commands/daemon/daemon.go +++ b/commands/daemon/daemon.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "io" + "sync/atomic" "github.com/arduino/arduino-cli/arduino" "github.com/arduino/arduino-cli/commands" @@ -451,6 +452,10 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer _ = syncSend.Send(&rpc.MonitorResponse{Success: true}) cancelCtx, cancel := context.WithCancel(stream.Context()) + gracefulCloseInitiated := &atomic.Bool{} + gracefuleCloseCtx, gracefulCloseCancel := context.WithCancel(context.Background()) + + // gRPC stream receiver (gRPC data -> monitor, config, close) go func() { defer cancel() for { @@ -470,9 +475,11 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer } } if closeMsg := msg.GetClose(); closeMsg { + gracefulCloseInitiated.Store(true) if err := portProxy.Close(); err != nil { logrus.WithError(err).Debug("Error closing monitor port") } + gracefulCloseCancel() } tx := msg.GetTxData() for len(tx) > 0 { @@ -489,8 +496,9 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer } }() + // gRPC stream sender (monitor -> gRPC) go func() { - defer cancel() + defer cancel() // unlock the receiver buff := make([]byte, 4096) for { n, err := portProxy.Read(buff) @@ -508,6 +516,11 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer }() <-cancelCtx.Done() - portProxy.Close() + if gracefulCloseInitiated.Load() { + // Port closing has been initiated in the receiver + <-gracefuleCloseCtx.Done() + } else { + portProxy.Close() + } return nil } diff --git a/internal/integrationtest/monitor/monitor_grpc_test.go b/internal/integrationtest/monitor/monitor_grpc_test.go index a7a681ce28a..c96dc36d304 100644 --- a/internal/integrationtest/monitor/monitor_grpc_test.go +++ b/internal/integrationtest/monitor/monitor_grpc_test.go @@ -82,7 +82,7 @@ func TestMonitorGRPCClose(t *testing.T) { } // Now close the monitor using MonitorRequest_Close - for tries := 0; tries < 5; tries++ { // Try the test 5 times to avoid flukes + { // Keep a timeout to allow the test to exit in any case ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) mon, err := grpcInst.Monitor(ctx, ports[0].Port) diff --git a/internal/mock_serial_monitor/main.go b/internal/mock_serial_monitor/main.go index c7779ca0e00..f13745e1458 100644 --- a/internal/mock_serial_monitor/main.go +++ b/internal/mock_serial_monitor/main.go @@ -197,7 +197,7 @@ func (d *SerialMonitor) Close() error { d.mockedSerialPort.Close() d.openedPort = false if d.muxFile != nil { - time.Sleep(500 * time.Millisecond) // Emulate a small delay closing the port to check gRPC synchronization + time.Sleep(2000 * time.Millisecond) // Emulate a small delay closing the port to check gRPC synchronization d.muxFile.Remove() d.muxFile = nil }