Skip to content

Commit

Permalink
Reduce code size impact of ArrayPool<T> (dotnet#97058)
Browse files Browse the repository at this point in the history
* Reduce code size impact of `ArrayPool<T>`

Every `SharedArrayPool<T>` instantiation brings with it several types, including several array types and all of their interface implementations. This change makes the internals of `SharedArrayPool<T>` a bit less generic to try to reduce that impact. The changes are effectively:
- Move the nested Partition, Partitions, and ThreadLocalArray types out from being nested types to being peers
- Rename them, since they're no longer inheriting the parent's name
- Make them non-generic in terms of Array rather than generic in terms of T[]. These types never index into the arrays, so other than accessing Length for logging purposes, they could even have used object.
- Use Unsafe.As in the two places arrays are dequeued and need to be T[] rather than Array.
- Pass sizeof(T) in to the now non-generic types rather than computing it inside their implementation. The size is used only when trimming, to determine how much to trim.

* Fix throughput regression from covariance check
  • Loading branch information
stephentoub authored and tmds committed Jan 23, 2024
1 parent d1af285 commit 0ab6cfc
Showing 1 changed file with 119 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,24 @@ internal sealed partial class SharedArrayPool<T> : ArrayPool<T>

/// <summary>A per-thread array of arrays, to cache one array per array size per thread.</summary>
[ThreadStatic]
private static ThreadLocalArray[]? t_tlsBuckets;
private static SharedArrayPoolThreadLocalArray[]? t_tlsBuckets;
/// <summary>Used to keep track of all thread local buckets for trimming if needed.</summary>
private readonly ConditionalWeakTable<ThreadLocalArray[], object?> _allTlsBuckets = new ConditionalWeakTable<ThreadLocalArray[], object?>();
private readonly ConditionalWeakTable<SharedArrayPoolThreadLocalArray[], object?> _allTlsBuckets = new ConditionalWeakTable<SharedArrayPoolThreadLocalArray[], object?>();
/// <summary>
/// An array of per-core partitions. The slots are lazily initialized to avoid creating
/// lots of overhead for unused array sizes.
/// </summary>
private readonly Partitions?[] _buckets = new Partitions[NumBuckets];
private readonly SharedArrayPoolPartitions?[] _buckets = new SharedArrayPoolPartitions[NumBuckets];
/// <summary>Whether the callback to trim arrays in response to memory pressure has been created.</summary>
private int _trimCallbackCreated;

/// <summary>Allocate a new <see cref="Partitions"/> and try to store it into the <see cref="_buckets"/> array.</summary>
private Partitions CreatePerCorePartitions(int bucketIndex)
/// <summary>Allocate a new <see cref="SharedArrayPoolPartitions"/> and try to store it into the <see cref="_buckets"/> array.</summary>
/// <param name="bucketIndex">Index of the slot in <see cref="_buckets"/> to populate.</param>
/// <returns>
/// The instance occupying the slot after the call: the newly allocated one if the slot was still null,
/// otherwise the instance that was already stored there.
/// </returns>
private unsafe SharedArrayPoolPartitions CreatePerCorePartitions(int bucketIndex)
{
var inst = new Partitions();
// The element size is computed here, where T is known, and handed to the non-generic partitions type.
#pragma warning disable 8500 // sizeof of managed types
int elementSize = sizeof(T);
#pragma warning restore 8500
var inst = new SharedArrayPoolPartitions(elementSize);
// Store the new instance only if the slot is still null; if the slot is already populated,
// CompareExchange returns the existing instance and that one is returned instead.
return Interlocked.CompareExchange(ref _buckets[bucketIndex], inst, null) ?? inst;
}

Expand All @@ -57,10 +60,10 @@ public override T[] Rent(int minimumLength)
int bucketIndex = Utilities.SelectBucketIndex(minimumLength);

// First, try to get an array from TLS if possible.
ThreadLocalArray[]? tlsBuckets = t_tlsBuckets;
SharedArrayPoolThreadLocalArray[]? tlsBuckets = t_tlsBuckets;
if (tlsBuckets is not null && (uint)bucketIndex < (uint)tlsBuckets.Length)
{
buffer = tlsBuckets[bucketIndex].Array;
buffer = Unsafe.As<T[]?>(tlsBuckets[bucketIndex].Array);
if (buffer is not null)
{
tlsBuckets[bucketIndex].Array = null;
Expand All @@ -73,13 +76,13 @@ public override T[] Rent(int minimumLength)
}

// Next, try to get an array from one of the partitions.
Partitions?[] perCoreBuckets = _buckets;
SharedArrayPoolPartitions?[] perCoreBuckets = _buckets;
if ((uint)bucketIndex < (uint)perCoreBuckets.Length)
{
Partitions? b = perCoreBuckets[bucketIndex];
SharedArrayPoolPartitions? b = perCoreBuckets[bucketIndex];
if (b is not null)
{
buffer = b.TryPop();
buffer = Unsafe.As<T[]?>(b.TryPop());
if (buffer is not null)
{
if (log.IsEnabled())
Expand Down Expand Up @@ -133,7 +136,7 @@ public override void Return(T[] array, bool clearArray = false)
// this if the array being returned is erroneous or too large for the pool, but the
// former condition is an error we don't need to optimize for, and the latter is incredibly
// rare, given a max size of 1B elements.
ThreadLocalArray[] tlsBuckets = t_tlsBuckets ?? InitializeTlsBucketsAndTrimming();
SharedArrayPoolThreadLocalArray[] tlsBuckets = t_tlsBuckets ?? InitializeTlsBucketsAndTrimming();

bool haveBucket = false;
bool returned = true;
Expand All @@ -156,12 +159,12 @@ public override void Return(T[] array, bool clearArray = false)
// Store the array into the TLS bucket. If there's already an array in it,
// push that array down into the partitions, preferring to keep the latest
// one in TLS for better locality.
ref ThreadLocalArray tla = ref tlsBuckets[bucketIndex];
T[]? prev = tla.Array;
tla = new ThreadLocalArray(array);
ref SharedArrayPoolThreadLocalArray tla = ref tlsBuckets[bucketIndex];
Array? prev = tla.Array;
tla = new SharedArrayPoolThreadLocalArray(array);
if (prev is not null)
{
Partitions partitionsForArraySize = _buckets[bucketIndex] ?? CreatePerCorePartitions(bucketIndex);
SharedArrayPoolPartitions partitionsForArraySize = _buckets[bucketIndex] ?? CreatePerCorePartitions(bucketIndex);
returned = partitionsForArraySize.TryPush(prev);
}
}
Expand Down Expand Up @@ -193,7 +196,7 @@ public bool Trim()
}

// Trim each of the per-core buckets.
Partitions?[] perCoreBuckets = _buckets;
SharedArrayPoolPartitions?[] perCoreBuckets = _buckets;
for (int i = 0; i < perCoreBuckets.Length; i++)
{
perCoreBuckets[i]?.Trim(currentMilliseconds, Id, pressure, Utilities.GetMaxSizeForBucket(i));
Expand All @@ -209,16 +212,16 @@ public bool Trim()
{
if (!log.IsEnabled())
{
foreach (KeyValuePair<ThreadLocalArray[], object?> tlsBuckets in _allTlsBuckets)
foreach (KeyValuePair<SharedArrayPoolThreadLocalArray[], object?> tlsBuckets in _allTlsBuckets)
{
Array.Clear(tlsBuckets.Key);
}
}
else
{
foreach (KeyValuePair<ThreadLocalArray[], object?> tlsBuckets in _allTlsBuckets)
foreach (KeyValuePair<SharedArrayPoolThreadLocalArray[], object?> tlsBuckets in _allTlsBuckets)
{
ThreadLocalArray[] buckets = tlsBuckets.Key;
SharedArrayPoolThreadLocalArray[] buckets = tlsBuckets.Key;
for (int i = 0; i < buckets.Length; i++)
{
if (Interlocked.Exchange(ref buckets[i].Array, null) is T[] buffer)
Expand All @@ -241,9 +244,9 @@ public bool Trim()
_ => 30_000,
};

foreach (KeyValuePair<ThreadLocalArray[], object?> tlsBuckets in _allTlsBuckets)
foreach (KeyValuePair<SharedArrayPoolThreadLocalArray[], object?> tlsBuckets in _allTlsBuckets)
{
ThreadLocalArray[] buckets = tlsBuckets.Key;
SharedArrayPoolThreadLocalArray[] buckets = tlsBuckets.Key;
for (int i = 0; i < buckets.Length; i++)
{
if (buckets[i].Array is null)
Expand Down Expand Up @@ -275,11 +278,11 @@ public bool Trim()
return true;
}

private ThreadLocalArray[] InitializeTlsBucketsAndTrimming()
private SharedArrayPoolThreadLocalArray[] InitializeTlsBucketsAndTrimming()
{
Debug.Assert(t_tlsBuckets is null, $"Non-null {nameof(t_tlsBuckets)}");

var tlsBuckets = new ThreadLocalArray[NumBuckets];
var tlsBuckets = new SharedArrayPoolThreadLocalArray[NumBuckets];
t_tlsBuckets = tlsBuckets;

_allTlsBuckets.Add(tlsBuckets, null);
Expand All @@ -290,90 +293,115 @@ private ThreadLocalArray[] InitializeTlsBucketsAndTrimming()

return tlsBuckets;
}
}

// The following partition types are separated out of SharedArrayPool<T> to avoid
// them being generic, in order to avoid the generic code size increase that would
// result, in particular for Native AOT. The only thing that's necessary to actually
// be generic is the return type of TryPop, and we can handle that at the access
// site with a well-placed Unsafe.As.

/// <summary>Provides a collection of partitions, each of which is a pool of arrays.</summary>
private sealed class Partitions
/// <summary>Wrapper for arrays stored in ThreadStatic buckets.</summary>
internal struct SharedArrayPoolThreadLocalArray
{
/// <summary>The stored array.</summary>
public Array? Array;
/// <summary>Environment.TickCount timestamp for when this array was observed by Trim.</summary>
public int MillisecondsTimeStamp;

/// <summary>Wraps <paramref name="array"/> and resets the timestamp to 0.</summary>
/// <param name="array">The array to store in <see cref="Array"/>.</param>
public SharedArrayPoolThreadLocalArray(Array array)
{
/// <summary>The partitions.</summary>
private readonly Partition[] _partitions;
Array = array;
MillisecondsTimeStamp = 0;
}
}

/// <summary>Provides a collection of partitions, each of which is a pool of arrays.</summary>
internal sealed class SharedArrayPoolPartitions
{
/// <summary>The partitions.</summary>
private readonly Partition[] _partitions;

/// <summary>Initializes the partitions.</summary>
public Partitions()
/// <summary>Initializes the partitions.</summary>
/// <param name="elementSize">The size of the elements stored in arrays.</param>
public SharedArrayPoolPartitions(int elementSize)
{
// Create the partitions. We create as many as there are processors, limited by our max.
var partitions = new Partition[SharedArrayPoolStatics.s_partitionCount];
for (int i = 0; i < partitions.Length; i++)
{
// Create the partitions. We create as many as there are processors, limited by our max.
var partitions = new Partition[SharedArrayPoolStatics.s_partitionCount];
for (int i = 0; i < partitions.Length; i++)
{
partitions[i] = new Partition();
}
_partitions = partitions;
partitions[i] = new Partition(elementSize);
}
_partitions = partitions;
}

/// <summary>
/// Try to push the array into any partition with available space, starting with partition associated with the current core.
/// If all partitions are full, the array will be dropped.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool TryPush(T[] array)
/// <summary>
/// Try to push the array into any partition with available space, starting with partition associated with the current core.
/// If all partitions are full, the array will be dropped.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool TryPush(Array array)
{
// Try to push on to the associated partition first. If that fails,
// round-robin through the other partitions.
Partition[] partitions = _partitions;
int index = (int)((uint)Thread.GetCurrentProcessorId() % (uint)SharedArrayPoolStatics.s_partitionCount); // mod by constant in tier 1
for (int i = 0; i < partitions.Length; i++)
{
// Try to push on to the associated partition first. If that fails,
// round-robin through the other partitions.
Partition[] partitions = _partitions;
int index = (int)((uint)Thread.GetCurrentProcessorId() % (uint)SharedArrayPoolStatics.s_partitionCount); // mod by constant in tier 1
for (int i = 0; i < partitions.Length; i++)
{
if (partitions[index].TryPush(array)) return true;
if (++index == partitions.Length) index = 0;
}

return false;
if (partitions[index].TryPush(array)) return true;
if (++index == partitions.Length) index = 0;
}

/// <summary>
/// Try to pop an array from any partition with available arrays, starting with partition associated with the current core.
/// If all partitions are empty, null is returned.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public T[]? TryPop()
return false;
}

/// <summary>
/// Try to pop an array from any partition with available arrays, starting with partition associated with the current core.
/// If all partitions are empty, null is returned.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Array? TryPop()
{
// Try to pop from the associated partition first. If that fails, round-robin through the other partitions.
Array? arr;
Partition[] partitions = _partitions;
int index = (int)((uint)Thread.GetCurrentProcessorId() % (uint)SharedArrayPoolStatics.s_partitionCount); // mod by constant in tier 1
for (int i = 0; i < partitions.Length; i++)
{
// Try to pop from the associated partition first. If that fails, round-robin through the other partitions.
T[]? arr;
Partition[] partitions = _partitions;
int index = (int)((uint)Thread.GetCurrentProcessorId() % (uint)SharedArrayPoolStatics.s_partitionCount); // mod by constant in tier 1
for (int i = 0; i < partitions.Length; i++)
{
if ((arr = partitions[index].TryPop()) is not null) return arr;
if (++index == partitions.Length) index = 0;
}
return null;
if ((arr = partitions[index].TryPop()) is not null) return arr;
if (++index == partitions.Length) index = 0;
}
return null;
}

/// <summary>Calls Trim on each partition in turn, passing all arguments through unchanged.</summary>
public void Trim(int currentMilliseconds, int id, Utilities.MemoryPressure pressure, int bucketSize)
public void Trim(int currentMilliseconds, int id, Utilities.MemoryPressure pressure, int bucketSize)
{
Partition[] partitions = _partitions;
for (int i = 0; i < partitions.Length; i++)
{
Partition[] partitions = _partitions;
for (int i = 0; i < partitions.Length; i++)
{
partitions[i].Trim(currentMilliseconds, id, pressure, bucketSize);
}
partitions[i].Trim(currentMilliseconds, id, pressure, bucketSize);
}
}

/// <summary>Provides a simple, bounded stack of arrays, protected by a lock.</summary>
private sealed class Partition
/// <param name="elementSize">The size of the elements stored in arrays.</param>
private sealed class Partition(int elementSize)
{
/// <summary>The arrays in the partition.</summary>
private readonly T[]?[] _arrays = new T[SharedArrayPoolStatics.s_maxArraysPerPartition][];
private readonly Array?[] _arrays = new Array[SharedArrayPoolStatics.s_maxArraysPerPartition];
/// <summary>The size of the elements stored in arrays.</summary>
private readonly int _elementSize = elementSize;
/// <summary>Number of arrays stored in <see cref="_arrays"/>.</summary>
private int _count;
/// <summary>Timestamp set by Trim when it sees this as 0.</summary>
private int _millisecondsTimestamp;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool TryPush(T[] array)
public bool TryPush(Array array)
{
bool enqueued = false;
Monitor.Enter(this);
T[]?[] arrays = _arrays;
Array?[] arrays = _arrays;
int count = _count;
if ((uint)count < (uint)arrays.Length)
{
Expand All @@ -384,7 +412,7 @@ public bool TryPush(T[] array)
_millisecondsTimestamp = 0;
}

arrays[count] = array;
Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(arrays), count) = array; // arrays[count] = array, but avoiding stelemref
_count = count + 1;
enqueued = true;
}
Expand All @@ -393,11 +421,11 @@ public bool TryPush(T[] array)
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public T[]? TryPop()
public Array? TryPop()
{
T[]? arr = null;
Array? arr = null;
Monitor.Enter(this);
T[]?[] arrays = _arrays;
Array?[] arrays = _arrays;
int count = _count - 1;
if ((uint)count < (uint)arrays.Length)
{
Expand Down Expand Up @@ -463,19 +491,17 @@ public void Trim(int currentMilliseconds, int id, Utilities.MemoryPressure press
{
trimCount++;
}
unsafe

if (_elementSize > ModerateTypeSize)
{
#pragma warning disable 8500 // sizeof of managed types
if (sizeof(T) > ModerateTypeSize)
{
trimCount++;
}
if (sizeof(T) > LargeTypeSize)
trimCount++;

if (_elementSize > LargeTypeSize)
{
trimCount++;
}
#pragma warning restore 8500
}

break;

case Utilities.MemoryPressure.Medium:
Expand All @@ -485,7 +511,7 @@ public void Trim(int currentMilliseconds, int id, Utilities.MemoryPressure press

while (_count > 0 && trimCount-- > 0)
{
T[]? array = _arrays[--_count];
Array? array = _arrays[--_count];
Debug.Assert(array is not null, "No nulls should have been present in slots < _count.");
_arrays[_count] = null;

Expand All @@ -501,21 +527,6 @@ public void Trim(int currentMilliseconds, int id, Utilities.MemoryPressure press
}
}
}

/// <summary>Wrapper for arrays stored in ThreadStatic buckets.</summary>
private struct ThreadLocalArray
{
/// <summary>The stored array.</summary>
public T[]? Array;
/// <summary>Environment.TickCount timestamp for when this array was observed by Trim.</summary>
public int MillisecondsTimeStamp;

/// <summary>Wraps <paramref name="array"/> and resets the timestamp to 0.</summary>
/// <param name="array">The array to store in <see cref="Array"/>.</param>
public ThreadLocalArray(T[] array)
{
Array = array;
MillisecondsTimeStamp = 0;
}
}
}

internal static class SharedArrayPoolStatics
Expand Down

0 comments on commit 0ab6cfc

Please sign in to comment.