diff --git a/src/libraries/System.Linq.Parallel/Directory.Build.props b/src/libraries/System.Linq.Parallel/Directory.Build.props index 7c0e0c24870de..e8d65546d0c80 100644 --- a/src/libraries/System.Linq.Parallel/Directory.Build.props +++ b/src/libraries/System.Linq.Parallel/Directory.Build.props @@ -2,6 +2,5 @@ Microsoft - browser diff --git a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Channels/AsynchronousChannel.cs b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Channels/AsynchronousChannel.cs index 105bbea7e4549..d0e60aff91e6d 100644 --- a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Channels/AsynchronousChannel.cs +++ b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Channels/AsynchronousChannel.cs @@ -17,6 +17,7 @@ namespace System.Linq.Parallel /// This is a bounded channel meant for single-producer/single-consumer scenarios. /// /// Specifies the type of data in the channel. + [System.Runtime.Versioning.UnsupportedOSPlatform("browser")] internal sealed class AsynchronousChannel : IDisposable { // The producer will be blocked once the channel reaches a capacity, and unblocked diff --git a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Merging/AsynchronousChannelMergeEnumerator.cs b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Merging/AsynchronousChannelMergeEnumerator.cs index 3432a14aef671..161fb73749130 100644 --- a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Merging/AsynchronousChannelMergeEnumerator.cs +++ b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Merging/AsynchronousChannelMergeEnumerator.cs @@ -26,6 +26,7 @@ namespace System.Linq.Parallel /// /// /// + [System.Runtime.Versioning.UnsupportedOSPlatform("browser")] internal sealed class AsynchronousChannelMergeEnumerator : MergeEnumerator { private readonly AsynchronousChannel[] _channels; // The channels being enumerated. @@ -220,6 +221,7 @@ private bool MoveNextSlowPath() break; } + Debug.Assert(!ParallelEnumerable.SinglePartitionMode); Debug.Assert(_consumerEvent != null); //This Wait() does not require cancellation support as it will wake up when all the producers into the //channel have finished. Hence, if all the producers wake up on cancellation, so will this. diff --git a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Merging/DefaultMergeHelper.cs b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Merging/DefaultMergeHelper.cs index c318b3b5954e0..26058969e63c5 100644 --- a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Merging/DefaultMergeHelper.cs +++ b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Merging/DefaultMergeHelper.cs @@ -64,6 +64,7 @@ internal DefaultMergeHelper(PartitionedStream partitio { if (partitions.PartitionCount > 1) { + Debug.Assert(!ParallelEnumerable.SinglePartitionMode); _asyncChannels = MergeExecutor.MakeAsynchronousChannels(partitions.PartitionCount, options, consumerEvent, cancellationState.MergedCancellationToken); _channelEnumerator = new AsynchronousChannelMergeEnumerator(_taskGroupState, _asyncChannels, consumerEvent); @@ -99,6 +100,7 @@ void IMergeHelper.Execute() { if (_asyncChannels != null) { + Debug.Assert(!ParallelEnumerable.SinglePartitionMode); SpoolingTask.SpoolPipeline(_taskGroupState, _partitions, _asyncChannels, _taskScheduler); } else if (_syncChannels != null) diff --git a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Merging/MergeExecutor.cs b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Merging/MergeExecutor.cs index 984ae2f014a28..5a18d6fd93ca6 100644 --- a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Merging/MergeExecutor.cs +++ b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Merging/MergeExecutor.cs @@ -64,6 +64,7 @@ internal static MergeExecutor Execute( if (partitions.PartitionCount > 1) { + Debug.Assert(!ParallelEnumerable.SinglePartitionMode); // We use a pipelining ordered merge mergeExecutor._mergeHelper = new OrderPreservingPipeliningMergeHelper( partitions, taskScheduler, cancellationState, autoBuffered, queryId, partitions.KeyComparer); @@ -140,6 +141,7 @@ public IEnumerator GetEnumerator() // An array of asynchronous channels, one for each partition. // + [System.Runtime.Versioning.UnsupportedOSPlatform("browser")] internal static AsynchronousChannel[] MakeAsynchronousChannels(int partitionCount, ParallelMergeOptions options, IntValueEvent? consumerEvent, CancellationToken cancellationToken) { AsynchronousChannel[] channels = new AsynchronousChannel[partitionCount]; diff --git a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Merging/OrderPreservingPipeliningMergeHelper.cs b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Merging/OrderPreservingPipeliningMergeHelper.cs index 4153cf39a875c..ddf0377cc9397 100644 --- a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Merging/OrderPreservingPipeliningMergeHelper.cs +++ b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Merging/OrderPreservingPipeliningMergeHelper.cs @@ -36,6 +36,7 @@ namespace System.Linq.Parallel /// Finally, if the producer notices that its buffer has exceeded an even greater threshold, it will /// go to sleep and wait until the consumer takes the entire buffer. /// + [System.Runtime.Versioning.UnsupportedOSPlatform("browser")] internal sealed class OrderPreservingPipeliningMergeHelper : IMergeHelper { private readonly QueryTaskGroupState _taskGroupState; // State shared among tasks. @@ -354,12 +355,14 @@ private void ThrowIfInTearDown() { // Wake up all producers. Since the cancellation token has already been // set, the producers will eventually stop after waking up. - object[] locks = _mergeHelper._bufferLocks; - for (int i = 0; i < locks.Length; i++) - { - lock (locks[i]) + if (!ParallelEnumerable.SinglePartitionMode) { + object[] locks = _mergeHelper._bufferLocks; + for (int i = 0; i < locks.Length; i++) { - Monitor.Pulse(locks[i]); + lock (locks[i]) + { + Monitor.Pulse(locks[i]); + } } } @@ -398,6 +401,9 @@ private bool TryWaitForElement(int producer, ref Pair element) return false; } + if (ParallelEnumerable.SinglePartitionMode) + return false; + _mergeHelper._consumerWaiting[producer] = true; Monitor.Wait(bufferLock); @@ -416,6 +422,7 @@ private bool TryWaitForElement(int producer, ref Pair element) // If the producer is waiting, wake it up if (_mergeHelper._producerWaiting[producer]) { + Debug.Assert(!ParallelEnumerable.SinglePartitionMode); Monitor.Pulse(bufferLock); _mergeHelper._producerWaiting[producer] = false; } @@ -469,15 +476,17 @@ private bool TryGetPrivateElement(int producer, ref Pair element) public override void Dispose() { // Wake up any waiting producers - int partitionCount = _mergeHelper._buffers.Length; - for (int producer = 0; producer < partitionCount; producer++) - { - object bufferLock = _mergeHelper._bufferLocks[producer]; - lock (bufferLock) + if (!ParallelEnumerable.SinglePartitionMode) { + int partitionCount = _mergeHelper._buffers.Length; + for (int producer = 0; producer < partitionCount; producer++) { - if (_mergeHelper._producerWaiting[producer]) + object bufferLock = _mergeHelper._bufferLocks[producer]; + lock (bufferLock) { - Monitor.Pulse(bufferLock); + if (_mergeHelper._producerWaiting[producer]) + { + Monitor.Pulse(bufferLock); + } } } } diff --git a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Partitioning/HashRepartitionEnumerator.cs b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Partitioning/HashRepartitionEnumerator.cs index 466102746d15a..5a9ecf5eda2a2 100644 --- a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Partitioning/HashRepartitionEnumerator.cs +++ b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Partitioning/HashRepartitionEnumerator.cs @@ -83,6 +83,9 @@ internal HashRepartitionEnumerator( _barrier = barrier; _valueExchangeMatrix = valueExchangeMatrix; _cancellationToken = cancellationToken; + + if (ParallelEnumerable.SinglePartitionMode) + Debug.Assert(partitionCount == 1); } //--------------------------------------------------------------------------------------- @@ -120,6 +123,8 @@ internal override bool MoveNext(ref Pair currentElement, return false; } + Debug.Assert(!ParallelEnumerable.SinglePartitionMode); + Mutables? mutables = _mutables; if (mutables == null) mutables = _mutables = new Mutables(); diff --git a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Partitioning/OrderedHashRepartitionEnumerator.cs b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Partitioning/OrderedHashRepartitionEnumerator.cs index b82e73c276f23..477f0f1838942 100644 --- a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Partitioning/OrderedHashRepartitionEnumerator.cs +++ b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Partitioning/OrderedHashRepartitionEnumerator.cs @@ -121,6 +121,8 @@ internal override bool MoveNext(ref Pair currentElement, return false; } + Debug.Assert(!ParallelEnumerable.SinglePartitionMode); + Mutables? mutables = _mutables; if (mutables == null) mutables = _mutables = new Mutables(); diff --git a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Unary/DefaultIfEmptyQueryOperator.cs b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Unary/DefaultIfEmptyQueryOperator.cs index 31f1a24f95192..78d145631e939 100644 --- a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Unary/DefaultIfEmptyQueryOperator.cs +++ b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Unary/DefaultIfEmptyQueryOperator.cs @@ -58,6 +58,8 @@ internal override void WrapPartitionedStream( PartitionedStream inputStream, IPartitionedStreamRecipient recipient, bool preferStriping, QuerySettings settings) { int partitionCount = inputStream.PartitionCount; + if (ParallelEnumerable.SinglePartitionMode) + Debug.Assert(partitionCount == 1); // Generate the shared data. Shared sharedEmptyCount = new Shared(0); @@ -153,7 +155,13 @@ internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TSource cu if (!moveNextResult) { - if (_partitionIndex == 0) + if (ParallelEnumerable.SinglePartitionMode) + { + currentElement = _defaultValue; + currentKey = default(TKey)!; + return true; + } + else if (_partitionIndex == 0) { // If this is the 0th partition, we must wait for all others. Note: we could // actually do a wait-any here: if at least one other partition finds an element, diff --git a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Unary/FirstQueryOperator.cs b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Unary/FirstQueryOperator.cs index e7b1e1c94a0c7..da5f61f35206e 100644 --- a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Unary/FirstQueryOperator.cs +++ b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Unary/FirstQueryOperator.cs @@ -72,6 +72,8 @@ private void WrapHelper( PartitionedStream inputStream, IPartitionedStreamRecipient recipient, QuerySettings settings) { int partitionCount = inputStream.PartitionCount; + if (ParallelEnumerable.SinglePartitionMode) + Debug.Assert(partitionCount == 1); // Generate the shared data. FirstQueryOperatorState operatorState = new FirstQueryOperatorState(); @@ -200,9 +202,11 @@ internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TSource cu } finally { - // No matter whether we exit due to an exception or normal completion, we must ensure - // that we signal other partitions that we have completed. Otherwise, we can cause deadlocks. - _sharedBarrier.Signal(); + if (!ParallelEnumerable.SinglePartitionMode) { + // No matter whether we exit due to an exception or normal completion, we must ensure + // that we signal other partitions that we have completed. Otherwise, we can cause deadlocks. + _sharedBarrier.Signal(); + } } _alreadySearched = true; @@ -210,7 +214,8 @@ internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TSource cu // Wait only if we may have the result if (_partitionId == _operatorState._partitionId) { - _sharedBarrier.Wait(_cancellationToken); + if (!ParallelEnumerable.SinglePartitionMode) + _sharedBarrier.Wait(_cancellationToken); // Now re-read the shared index. If it's the same as ours, we won and return true. if (_partitionId == _operatorState._partitionId) diff --git a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Unary/LastQueryOperator.cs b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Unary/LastQueryOperator.cs index e8153f3240e38..6d3de6b44b96c 100644 --- a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Unary/LastQueryOperator.cs +++ b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Unary/LastQueryOperator.cs @@ -72,6 +72,8 @@ internal override void WrapPartitionedStream( private void WrapHelper(PartitionedStream inputStream, IPartitionedStreamRecipient recipient, QuerySettings settings) { int partitionCount = inputStream.PartitionCount; + if (ParallelEnumerable.SinglePartitionMode) + Debug.Assert(partitionCount == 1); // Generate the shared data. LastQueryOperatorState operatorState = new LastQueryOperatorState(); @@ -212,7 +214,8 @@ internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TSource cu // Only if we have a candidate do we wait. if (_partitionId == _operatorState._partitionId) { - _sharedBarrier.Wait(_cancellationToken); + if (!ParallelEnumerable.SinglePartitionMode) + _sharedBarrier.Wait(_cancellationToken); // Now re-read the shared index. If it's the same as ours, we won and return true. if (_operatorState._partitionId == _partitionId) diff --git a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Unary/TakeOrSkipQueryOperator.cs b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Unary/TakeOrSkipQueryOperator.cs index b8c1188ada9f3..586a301cf1059 100644 --- a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Unary/TakeOrSkipQueryOperator.cs +++ b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Unary/TakeOrSkipQueryOperator.cs @@ -108,6 +108,9 @@ private void WrapHelper(PartitionedStream inputStream, IPar FixedMaxHeap sharedIndices = new FixedMaxHeap(_count, inputStream.KeyComparer); // an array used to track the sequence of indices leading up to the Nth index CountdownEvent sharedBarrier = new CountdownEvent(partitionCount); // a barrier to synchronize before yielding + if (ParallelEnumerable.SinglePartitionMode) + Debug.Assert(partitionCount == 1); + PartitionedStream outputStream = new PartitionedStream(partitionCount, inputStream.KeyComparer, OrdinalIndexState); for (int i = 0; i < partitionCount; i++) @@ -222,9 +225,11 @@ internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TResult cu } } - // Before exiting the search phase, we will synchronize with others. This is a barrier. - _sharedBarrier.Signal(); - _sharedBarrier.Wait(_cancellationToken); + if (!ParallelEnumerable.SinglePartitionMode) { + // Before exiting the search phase, we will synchronize with others. This is a barrier. + _sharedBarrier.Signal(); + _sharedBarrier.Wait(_cancellationToken); + } // Publish the buffer and set the index to just before the 1st element. _buffer = buffer; diff --git a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Unary/TakeOrSkipWhileQueryOperator.cs b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Unary/TakeOrSkipWhileQueryOperator.cs index 1e5f51591dcfe..3624dfb29fc56 100644 --- a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Unary/TakeOrSkipWhileQueryOperator.cs +++ b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Unary/TakeOrSkipWhileQueryOperator.cs @@ -127,6 +127,8 @@ internal override void WrapPartitionedStream( private void WrapHelper(PartitionedStream inputStream, IPartitionedStreamRecipient recipient, QuerySettings settings) { int partitionCount = inputStream.PartitionCount; + if (ParallelEnumerable.SinglePartitionMode) + Debug.Assert(partitionCount == 1); // Create shared data. OperatorState operatorState = new OperatorState(); @@ -321,13 +323,16 @@ internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TResult cu } finally { - // No matter whether we exit due to an exception or normal completion, we must ensure - // that we signal other partitions that we have completed. Otherwise, we can cause deadlocks. - _sharedBarrier.Signal(); + if (!ParallelEnumerable.SinglePartitionMode) { + // No matter whether we exit due to an exception or normal completion, we must ensure + // that we signal other partitions that we have completed. Otherwise, we can cause deadlocks. + _sharedBarrier.Signal(); + } } // Before exiting the search phase, we will synchronize with others. This is a barrier. - _sharedBarrier.Wait(_cancellationToken); + if (!ParallelEnumerable.SinglePartitionMode) + _sharedBarrier.Wait(_cancellationToken); // Publish the buffer and set the index to just before the 1st element. _buffer = buffer; diff --git a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Scheduling/OrderPreservingPipeliningSpoolingTask.cs b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Scheduling/OrderPreservingPipeliningSpoolingTask.cs index ccac54907c319..b419c38ce7839 100644 --- a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Scheduling/OrderPreservingPipeliningSpoolingTask.cs +++ b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Scheduling/OrderPreservingPipeliningSpoolingTask.cs @@ -18,6 +18,7 @@ namespace System.Linq.Parallel { + [System.Runtime.Versioning.UnsupportedOSPlatform("browser")] internal sealed class OrderPreservingPipeliningSpoolingTask : SpoolingTaskBase { private readonly QueryTaskGroupState _taskGroupState; // State shared among tasks. diff --git a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Scheduling/SpoolingTask.cs b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Scheduling/SpoolingTask.cs index 440be3d9f9ce0..55cfdc28c6682 100644 --- a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Scheduling/SpoolingTask.cs +++ b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Scheduling/SpoolingTask.cs @@ -82,6 +82,7 @@ internal static void SpoolStopAndGo( // taskScheduler - the task manager on which to execute // + [System.Runtime.Versioning.UnsupportedOSPlatform("browser")] internal static void SpoolPipeline( QueryTaskGroupState groupState, PartitionedStream partitions, AsynchronousChannel[] channels, TaskScheduler taskScheduler) @@ -264,6 +265,7 @@ protected override void SpoolingFinally() /// /// /// + [System.Runtime.Versioning.UnsupportedOSPlatform("browser")] internal sealed class PipelineSpoolingTask : SpoolingTaskBase { // The data source from which to pull data. diff --git a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Utils/Sorting.cs b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Utils/Sorting.cs index 6461b11a1bb26..f2f8f5135a7e4 100644 --- a/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Utils/Sorting.cs +++ b/src/libraries/System.Linq.Parallel/src/System/Linq/Parallel/Utils/Sorting.cs @@ -204,6 +204,7 @@ internal override TInputOutput[] Sort() // Step 3. Enter into the merging phases, each separated by several barriers. if (_partitionCount > 1) { + Debug.Assert(!ParallelEnumerable.SinglePartitionMode); // We only need to merge if there is more than 1 partition. MergeSortCooperatively(); } @@ -357,6 +358,7 @@ private void QuickSortIndicesInPlace(GrowingArray keys, List // negatively impact speedups. // + [System.Runtime.Versioning.UnsupportedOSPlatform("browser")] private void MergeSortCooperatively() { CancellationToken cancelToken = _groupState.CancellationState.MergedCancellationToken; diff --git a/src/libraries/System.Linq.Parallel/src/System/Linq/ParallelEnumerable.cs b/src/libraries/System.Linq.Parallel/src/System/Linq/ParallelEnumerable.cs index f4f870c316423..0bb4d6990a9f5 100644 --- a/src/libraries/System.Linq.Parallel/src/System/Linq/ParallelEnumerable.cs +++ b/src/libraries/System.Linq.Parallel/src/System/Linq/ParallelEnumerable.cs @@ -62,6 +62,11 @@ public static class ParallelEnumerable + "System.Collections.Generic.IEnumerable. To fix this problem, use the AsParallel() extension method " + "to convert the right data source to System.Linq.ParallelQuery."; + // When running in single partition mode, PLINQ operations will occur on a single partition and will not + // be executed in parallel, but will retain PLINQ semantics (exceptions wrapped as aggregates, etc). + [System.Runtime.Versioning.SupportedOSPlatformGuard("browser")] + internal static bool SinglePartitionMode => OperatingSystem.IsBrowser(); + //----------------------------------------------------------------------------------- // Converts any IEnumerable into something that can be the target of parallel // query execution. diff --git a/src/libraries/tests.proj b/src/libraries/tests.proj index 2788d0e5e5bfe..60b966eaa76ce 100644 --- a/src/libraries/tests.proj +++ b/src/libraries/tests.proj @@ -261,8 +261,6 @@ - -