Skip to content

Commit

Permalink
Merge pull request #1443 from glopesdev/issue-1437
Browse files Browse the repository at this point in the history
Avoid deadlock when disposing serial connection
  • Loading branch information
glopesdev committed Jun 30, 2023
2 parents 0ecf522 + 2ffd271 commit f100b8c
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 21 deletions.
89 changes: 74 additions & 15 deletions Bonsai.System/IO/Ports/ObservableSerialPort.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
using System.Reactive.Disposables;
using System.IO.Ports;
using System.Text.RegularExpressions;
using System.Threading;
using System.Reactive.Concurrency;

namespace Bonsai.IO
{
Expand Down Expand Up @@ -48,35 +50,92 @@ public static IObservable<string> ReadLine(string portName, string newLine)
return Observable.Create<string>(observer =>
{
var data = string.Empty;
Action disposeAction = default;
var connection = SerialPortManager.ReserveConnection(portName);
SerialDataReceivedEventHandler dataReceivedHandler;
var serialPort = connection.SerialPort;
var baseStream = connection.SerialPort.BaseStream;
dataReceivedHandler = (sender, e) =>
{
switch (e.EventType)
try
{
case SerialData.Eof: observer.OnCompleted(); break;
case SerialData.Chars:
default:
if (serialPort.IsOpen && serialPort.BytesToRead > 0)
{
data += serialPort.ReadExisting();
var lines = data.Split(new[] { newLine }, StringSplitOptions.None);
for (int i = 0; i < lines.Length; i++)
switch (e.EventType)
{
case SerialData.Eof: observer.OnCompleted(); break;
case SerialData.Chars:
default:
if (serialPort.IsOpen && serialPort.BytesToRead > 0)
{
if (i == lines.Length - 1) data = lines[i];
else observer.OnNext(lines[i]);
data += serialPort.ReadExisting();
var lines = data.Split(new[] { newLine }, StringSplitOptions.None);
for (int i = 0; i < lines.Length; i++)
{
if (i == lines.Length - 1) data = lines[i];
else observer.OnNext(lines[i]);
}
}
}
break;
break;
}
}
finally
{
// We need a volatile read here to prevent reordering of
// instructions on access to the shared dispose delegate
var dispose = Volatile.Read(ref disposeAction);
if (dispose != null)
{
// If we reach this branch, we might be in deadlock
// so we share the responsibility of disposing the
// serial port.
dispose();
Volatile.Write(ref disposeAction, null);
}
}
};
connection.SerialPort.DataReceived += dataReceivedHandler;
return Disposable.Create(() =>
{
connection.SerialPort.DataReceived -= dataReceivedHandler;
connection.Dispose();
// Arm the dispose call. We do not need a memory barrier here
// since both threads are sharing full mutexes and stores
// will be eventually updated (we don't care exactly when)
disposeAction = connection.Dispose;
// We do an async spin lock until someone can dispose the serial port.
// Since the dispose call is idempotent it is enough to guarantee
// at-least-once semantics
void TryDispose()
{
// Same as above we need a volatile read here to prevent
// reordering of instructions
var dispose = Volatile.Read(ref disposeAction);
if (dispose == null) return;
// The SerialPort class holds a lock on base stream to
// ensure synchronization between calls to Dispose and
// calls to DataReceived handler
if (Monitor.TryEnter(baseStream))
{
// If we enter the critical section we can go ahead and
// dispose the serial port
try
{
dispose();
Volatile.Write(ref disposeAction, null);
}
finally { Monitor.Exit(baseStream); }
}
else
{
// If we reach this branch we may be in deadlock so we
// need to release this thread
DefaultScheduler.Instance.Schedule(TryDispose);
}
}
// Run the spin lock
TryDispose();
});
});
}
Expand Down
5 changes: 4 additions & 1 deletion Bonsai.System/IO/Ports/SerialPortDisposable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ public void Dispose()
var disposable = Interlocked.Exchange(ref resource, null);
if (disposable != null)
{
disposable.Dispose();
lock (SerialPortManager.SyncRoot)
{
disposable.Dispose();
}
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions Bonsai.System/IO/Ports/SerialPortManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ internal static class SerialPortManager
{
public const string DefaultConfigurationFile = "SerialPort.config";
static readonly bool IsRunningOnMono = Type.GetType("Mono.Runtime") != null;
static readonly Dictionary<string, Tuple<SerialPort, RefCountDisposable>> openConnections = new Dictionary<string, Tuple<SerialPort, RefCountDisposable>>();
static readonly object openConnectionsLock = new object();
static readonly Dictionary<string, Tuple<SerialPort, RefCountDisposable>> openConnections = new();
internal static readonly object SyncRoot = new();

public static SerialPortDisposable ReserveConnection(string portName)
{
Expand All @@ -30,7 +30,7 @@ internal static SerialPortDisposable ReserveConnection(string portName, SerialPo
}

Tuple<SerialPort, RefCountDisposable> connection;
lock (openConnectionsLock)
lock (SyncRoot)
{
if (!openConnections.TryGetValue(portName, out connection))
{
Expand Down Expand Up @@ -103,9 +103,9 @@ void ConfigureSerialPort(SerialPort serialPort)
openConnections.Add(portName, connection);
return new SerialPortDisposable(serialPort, refCount);
}
}

return new SerialPortDisposable(connection.Item1, connection.Item2.GetDisposable());
return new SerialPortDisposable(connection.Item1, connection.Item2.GetDisposable());
}
}

[Obsolete]
Expand Down

0 comments on commit f100b8c

Please sign in to comment.