From da4c34648b2247a014e55ff7494e8e6c87832ab4 Mon Sep 17 00:00:00 2001 From: glopesdev Date: Fri, 30 Jun 2023 13:43:10 +0100 Subject: [PATCH 1/4] Ensure thread-safe ref count update --- Bonsai.System/IO/Ports/SerialPortDisposable.cs | 5 ++++- Bonsai.System/IO/Ports/SerialPortManager.cs | 10 +++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/Bonsai.System/IO/Ports/SerialPortDisposable.cs b/Bonsai.System/IO/Ports/SerialPortDisposable.cs index b623d9ae6..d5ba79cb9 100644 --- a/Bonsai.System/IO/Ports/SerialPortDisposable.cs +++ b/Bonsai.System/IO/Ports/SerialPortDisposable.cs @@ -27,7 +27,10 @@ public void Dispose() var disposable = Interlocked.Exchange(ref resource, null); if (disposable != null) { - disposable.Dispose(); + lock (SerialPortManager.SyncRoot) + { + disposable.Dispose(); + } } } } diff --git a/Bonsai.System/IO/Ports/SerialPortManager.cs b/Bonsai.System/IO/Ports/SerialPortManager.cs index 3633ae8a8..539b0ebe5 100644 --- a/Bonsai.System/IO/Ports/SerialPortManager.cs +++ b/Bonsai.System/IO/Ports/SerialPortManager.cs @@ -13,8 +13,8 @@ internal static class SerialPortManager { public const string DefaultConfigurationFile = "SerialPort.config"; static readonly bool IsRunningOnMono = Type.GetType("Mono.Runtime") != null; - static readonly Dictionary> openConnections = new Dictionary>(); - static readonly object openConnectionsLock = new object(); + static readonly Dictionary> openConnections = new(); + internal static readonly object SyncRoot = new(); public static SerialPortDisposable ReserveConnection(string portName) { @@ -30,7 +30,7 @@ internal static SerialPortDisposable ReserveConnection(string portName, SerialPo } Tuple connection; - lock (openConnectionsLock) + lock (SyncRoot) { if (!openConnections.TryGetValue(portName, out connection)) { @@ -103,9 +103,9 @@ void ConfigureSerialPort(SerialPort serialPort) openConnections.Add(portName, connection); return new SerialPortDisposable(serialPort, refCount); } - } - return new SerialPortDisposable(connection.Item1, connection.Item2.GetDisposable()); + return new SerialPortDisposable(connection.Item1, connection.Item2.GetDisposable()); + } } [Obsolete] From 5626d067f36b0e6366e4c981a8bac2485fdb1a5b Mon Sep 17 00:00:00 2001 From: glopesdev Date: Fri, 30 Jun 2023 14:02:40 +0100 Subject: [PATCH 2/4] Avoid deadlock when disposing serial connection --- .../IO/Ports/ObservableSerialPort.cs | 82 +++++++++++++++---- 1 file changed, 67 insertions(+), 15 deletions(-) diff --git a/Bonsai.System/IO/Ports/ObservableSerialPort.cs b/Bonsai.System/IO/Ports/ObservableSerialPort.cs index 804f152b5..1e8f1a052 100644 --- a/Bonsai.System/IO/Ports/ObservableSerialPort.cs +++ b/Bonsai.System/IO/Ports/ObservableSerialPort.cs @@ -3,6 +3,8 @@ using System.Reactive.Disposables; using System.IO.Ports; using System.Text.RegularExpressions; +using System.Threading; +using System.Reactive.Concurrency; namespace Bonsai.IO { @@ -48,35 +50,85 @@ public static IObservable ReadLine(string portName, string newLine) return Observable.Create(observer => { var data = string.Empty; + Action dispose = default; var connection = SerialPortManager.ReserveConnection(portName); SerialDataReceivedEventHandler dataReceivedHandler; var serialPort = connection.SerialPort; + var baseStream = connection.SerialPort.BaseStream; dataReceivedHandler = (sender, e) => { - switch (e.EventType) + try { - case SerialData.Eof: observer.OnCompleted(); break; - case SerialData.Chars: - default: - if (serialPort.IsOpen && serialPort.BytesToRead > 0) - { - data += serialPort.ReadExisting(); - var lines = data.Split(new[] { newLine }, StringSplitOptions.None); - for (int i = 0; i < lines.Length; i++) + switch (e.EventType) + { + case SerialData.Eof: observer.OnCompleted(); break; + case SerialData.Chars: + default: + if (serialPort.IsOpen && serialPort.BytesToRead > 0) { - if (i == lines.Length - 1) data = lines[i]; - else observer.OnNext(lines[i]); + data += serialPort.ReadExisting(); + var lines = data.Split(new[] { newLine }, StringSplitOptions.None); + for (int i = 0; i < lines.Length; i++) + { + if (i == lines.Length - 1) data = lines[i]; + else observer.OnNext(lines[i]); + } } - } - break; + break; + } + } + finally + { + if (dispose != null) + { + // If we reach this branch, we might be in deadlock + // so we share the responsibility of disposing the + // serial port. + dispose?.Invoke(); + dispose = null; + } } }; - connection.SerialPort.DataReceived += dataReceivedHandler; return Disposable.Create(() => { connection.SerialPort.DataReceived -= dataReceivedHandler; - connection.Dispose(); + + // Arm the dispose call. We should not need a memory barrier here + // since both threads are already sharing a lock. + dispose = connection.Dispose; + + void TryDispose() + { + // We do an async spin lock until someone can dispose the serial port. + // Since the dispose call is idempotent it is enough to guarantee + // at-least-once semantics + if (dispose == null) return; + + // The SerialPort class holds a lock on base stream to + // ensure synchronization between calls to Dispose and + // calls to DataReceived handler + if (Monitor.TryEnter(baseStream)) + { + // If we enter the critical section we can go ahead and + // dispose the serial port + try + { + dispose?.Invoke(); + dispose = null; + } + finally { Monitor.Exit(baseStream); } + } + else + { + // If we reach this branch we may be in deadlock so we + // need to release this thread + DefaultScheduler.Instance.Schedule(TryDispose); + } + } + + // Run the spin lock + TryDispose(); }); }); } From dd4d9f0114d495e1a59d5e3aadc2e7370c5aa67b Mon Sep 17 00:00:00 2001 From: glopesdev Date: Fri, 30 Jun 2023 17:01:51 +0100 Subject: [PATCH 3/4] Avoid reordering of read access to shared variable --- .../IO/Ports/ObservableSerialPort.cs | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/Bonsai.System/IO/Ports/ObservableSerialPort.cs b/Bonsai.System/IO/Ports/ObservableSerialPort.cs index 1e8f1a052..39aebde87 100644 --- a/Bonsai.System/IO/Ports/ObservableSerialPort.cs +++ b/Bonsai.System/IO/Ports/ObservableSerialPort.cs @@ -50,7 +50,7 @@ public static IObservable ReadLine(string portName, string newLine) return Observable.Create(observer => { var data = string.Empty; - Action dispose = default; + Action disposeAction = default; var connection = SerialPortManager.ReserveConnection(portName); SerialDataReceivedEventHandler dataReceivedHandler; var serialPort = connection.SerialPort; @@ -79,13 +79,16 @@ public static IObservable ReadLine(string portName, string newLine) } finally { + // We need a volatile read here to prevent reordering of + // instructions on access to the shared dispose delegate + var dispose = Volatile.Read(ref disposeAction); if (dispose != null) { // If we reach this branch, we might be in deadlock // so we share the responsibility of disposing the // serial port. - dispose?.Invoke(); - dispose = null; + dispose(); + disposeAction = null; } } }; @@ -94,15 +97,19 @@ public static IObservable ReadLine(string portName, string newLine) { connection.SerialPort.DataReceived -= dataReceivedHandler; - // Arm the dispose call. We should not need a memory barrier here - // since both threads are already sharing a lock. - dispose = connection.Dispose; + // Arm the dispose call. We do not need a memory barrier here + // since both threads are sharing full mutexes and stores + // will be eventually updated (we don't care exactly when) + disposeAction = connection.Dispose; + // We do an async spin lock until someone can dispose the serial port. + // Since the dispose call is idempotent it is enough to guarantee + // at-least-once semantics void TryDispose() { - // We do an async spin lock until someone can dispose the serial port. - // Since the dispose call is idempotent it is enough to guarantee - // at-least-once semantics + // Same as above we need a volatile read here to prevent + // reordering of instructions + var dispose = Volatile.Read(ref disposeAction); if (dispose == null) return; // The SerialPort class holds a lock on base stream to @@ -114,8 +121,8 @@ void TryDispose() // dispose the serial port try { - dispose?.Invoke(); - dispose = null; + dispose(); + disposeAction = null; } finally { Monitor.Exit(baseStream); } } From 2ffd27160866675ef568d2c74cbdf39aa53da187 Mon Sep 17 00:00:00 2001 From: glopesdev Date: Fri, 30 Jun 2023 17:02:12 +0100 Subject: [PATCH 4/4] Ensure volatile write to shared variable --- Bonsai.System/IO/Ports/ObservableSerialPort.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Bonsai.System/IO/Ports/ObservableSerialPort.cs b/Bonsai.System/IO/Ports/ObservableSerialPort.cs index 39aebde87..fb5ba8bb3 100644 --- a/Bonsai.System/IO/Ports/ObservableSerialPort.cs +++ b/Bonsai.System/IO/Ports/ObservableSerialPort.cs @@ -88,7 +88,7 @@ public static IObservable ReadLine(string portName, string newLine) // so we share the responsibility of disposing the // serial port. dispose(); - disposeAction = null; + Volatile.Write(ref disposeAction, null); } } }; @@ -122,7 +122,7 @@ void TryDispose() try { dispose(); - disposeAction = null; + Volatile.Write(ref disposeAction, null); } finally { Monitor.Exit(baseStream); } }