From 634646255b2e2caed17e29fa99a856c5bfa93bf9 Mon Sep 17 00:00:00 2001 From: Daniel Wertheim Date: Thu, 17 Oct 2019 13:51:16 +0200 Subject: [PATCH] Uses lock was taken logic (as lock would do) --- src/NATS.Client/Channel.cs | 60 +++++++++++++++++++++++++++----------- 1 file changed, 43 insertions(+), 17 deletions(-) diff --git a/src/NATS.Client/Channel.cs b/src/NATS.Client/Channel.cs index 0ebf1d45a..9e04c8dd8 100644 --- a/src/NATS.Client/Channel.cs +++ b/src/NATS.Client/Channel.cs @@ -24,7 +24,7 @@ namespace NATS.Client // to be shared by any more than a single producer and single consumer. internal sealed class SingleUseChannel { - static readonly ConcurrentBag> Channels + static readonly ConcurrentBag> Channels = new ConcurrentBag>(); readonly ManualResetEventSlim e = new ManualResetEventSlim(); @@ -93,9 +93,9 @@ internal void reset() internal sealed class Channel { readonly Queue q; - readonly Object qLock = new Object(); + readonly Object qLock = new Object(); - bool finished = false; + bool finished = false; public string Name { get; set; } @@ -112,9 +112,12 @@ internal Channel(int initialCapacity) internal T get(int timeout) { - Monitor.Enter(qLock); + var lockWasTaken = false; + try { + Monitor.Enter(qLock, ref lockWasTaken); + if (finished) { return default(T); @@ -157,7 +160,8 @@ internal T get(int timeout) } finally { - Monitor.Exit(qLock); + if (lockWasTaken) + Monitor.Exit(qLock); } } // get @@ -168,9 +172,12 @@ internal int get(int timeout, T[] buffer) if (buffer.Length < 1) throw new ArgumentException(); int delivered = 0; - Monitor.Enter(qLock); + var lockWasTaken = false; + try { + Monitor.Enter(qLock, ref lockWasTaken); + if (finished) { return 0; @@ -226,15 +233,19 @@ internal int get(int timeout, T[] buffer) } finally { - Monitor.Exit(qLock); + if (lockWasTaken) + Monitor.Exit(qLock); } } // get internal void add(T item) { - Monitor.Enter(qLock); + var lockWasTaken = false; + try { + Monitor.Enter(qLock, ref lockWasTaken); + q.Enqueue(item); // if the queue count was previously zero, we were @@ -246,7 +257,8 @@ internal void add(T item) } finally { - Monitor.Exit(qLock); + if (lockWasTaken) + Monitor.Exit(qLock); } } @@ -254,9 +266,12 @@ internal void add(T item) // exceeded the given upper bound. internal bool tryAdd(T item, int upperBound) { - Monitor.Enter(qLock); + var lockWasTaken = false; + try { + Monitor.Enter(qLock, ref lockWasTaken); + if (q.Count >= upperBound) return false; @@ -273,15 +288,18 @@ internal bool tryAdd(T item, int upperBound) } finally { - Monitor.Exit(qLock); + if (lockWasTaken) + Monitor.Exit(qLock); } } internal void close() { - Monitor.Enter(qLock); + var lockWasTaken = false; + try { + Monitor.Enter(qLock, ref lockWasTaken); finished = true; @@ -291,7 +309,8 @@ internal void close() } finally { - Monitor.Exit(qLock); + if (lockWasTaken) + Monitor.Exit(qLock); } } @@ -300,10 +319,17 @@ internal int Count get { int rv; - - Monitor.Enter(qLock); - rv = q.Count; - Monitor.Exit(qLock); + var lockWasTaken = false; + try + { + Monitor.Enter(qLock, ref lockWasTaken); + rv = q.Count; + } + finally + { + if (lockWasTaken) + Monitor.Exit(qLock); + } return rv; }