Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce code size impact of ArrayPool<T> #97058

Merged
merged 2 commits into from
Jan 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,24 @@ internal sealed partial class SharedArrayPool<T> : ArrayPool<T>

/// <summary>A per-thread array of arrays, to cache one array per array size per thread.</summary>
[ThreadStatic]
private static ThreadLocalArray[]? t_tlsBuckets;
private static SharedArrayPoolThreadLocalArray[]? t_tlsBuckets;
/// <summary>Used to keep track of all thread local buckets for trimming if needed.</summary>
private readonly ConditionalWeakTable<ThreadLocalArray[], object?> _allTlsBuckets = new ConditionalWeakTable<ThreadLocalArray[], object?>();
private readonly ConditionalWeakTable<SharedArrayPoolThreadLocalArray[], object?> _allTlsBuckets = new ConditionalWeakTable<SharedArrayPoolThreadLocalArray[], object?>();
/// <summary>
/// An array of per-core partitions. The slots are lazily initialized to avoid creating
/// lots of overhead for unused array sizes.
/// </summary>
private readonly Partitions?[] _buckets = new Partitions[NumBuckets];
private readonly SharedArrayPoolPartitions?[] _buckets = new SharedArrayPoolPartitions[NumBuckets];
/// <summary>Whether the callback to trim arrays in response to memory pressure has been created.</summary>
private int _trimCallbackCreated;

/// <summary>Allocate a new <see cref="Partitions"/> and try to store it into the <see cref="_buckets"/> array.</summary>
private Partitions CreatePerCorePartitions(int bucketIndex)
/// <summary>Allocate a new <see cref="SharedArrayPoolPartitions"/> and try to store it into the <see cref="_buckets"/> array.</summary>
private unsafe SharedArrayPoolPartitions CreatePerCorePartitions(int bucketIndex)
{
var inst = new Partitions();
#pragma warning disable 8500 // sizeof of managed types
int elementSize = sizeof(T);
#pragma warning restore 8500
var inst = new SharedArrayPoolPartitions(elementSize);
return Interlocked.CompareExchange(ref _buckets[bucketIndex], inst, null) ?? inst;
}

Expand All @@ -57,10 +60,10 @@ public override T[] Rent(int minimumLength)
int bucketIndex = Utilities.SelectBucketIndex(minimumLength);

// First, try to get an array from TLS if possible.
ThreadLocalArray[]? tlsBuckets = t_tlsBuckets;
SharedArrayPoolThreadLocalArray[]? tlsBuckets = t_tlsBuckets;
if (tlsBuckets is not null && (uint)bucketIndex < (uint)tlsBuckets.Length)
{
buffer = tlsBuckets[bucketIndex].Array;
buffer = Unsafe.As<T[]?>(tlsBuckets[bucketIndex].Array);
if (buffer is not null)
{
tlsBuckets[bucketIndex].Array = null;
Expand All @@ -73,13 +76,13 @@ public override T[] Rent(int minimumLength)
}

// Next, try to get an array from one of the partitions.
Partitions?[] perCoreBuckets = _buckets;
SharedArrayPoolPartitions?[] perCoreBuckets = _buckets;
if ((uint)bucketIndex < (uint)perCoreBuckets.Length)
{
Partitions? b = perCoreBuckets[bucketIndex];
SharedArrayPoolPartitions? b = perCoreBuckets[bucketIndex];
if (b is not null)
{
buffer = b.TryPop();
buffer = Unsafe.As<T[]?>(b.TryPop());
if (buffer is not null)
{
if (log.IsEnabled())
Expand Down Expand Up @@ -133,7 +136,7 @@ public override void Return(T[] array, bool clearArray = false)
// this if the array being returned is erroneous or too large for the pool, but the
// former condition is an error we don't need to optimize for, and the latter is incredibly
// rare, given a max size of 1B elements.
ThreadLocalArray[] tlsBuckets = t_tlsBuckets ?? InitializeTlsBucketsAndTrimming();
SharedArrayPoolThreadLocalArray[] tlsBuckets = t_tlsBuckets ?? InitializeTlsBucketsAndTrimming();

bool haveBucket = false;
bool returned = true;
Expand All @@ -156,12 +159,12 @@ public override void Return(T[] array, bool clearArray = false)
// Store the array into the TLS bucket. If there's already an array in it,
// push that array down into the partitions, preferring to keep the latest
// one in TLS for better locality.
ref ThreadLocalArray tla = ref tlsBuckets[bucketIndex];
T[]? prev = tla.Array;
tla = new ThreadLocalArray(array);
ref SharedArrayPoolThreadLocalArray tla = ref tlsBuckets[bucketIndex];
Array? prev = tla.Array;
tla = new SharedArrayPoolThreadLocalArray(array);
if (prev is not null)
{
Partitions partitionsForArraySize = _buckets[bucketIndex] ?? CreatePerCorePartitions(bucketIndex);
SharedArrayPoolPartitions partitionsForArraySize = _buckets[bucketIndex] ?? CreatePerCorePartitions(bucketIndex);
returned = partitionsForArraySize.TryPush(prev);
}
}
Expand Down Expand Up @@ -193,7 +196,7 @@ public bool Trim()
}

// Trim each of the per-core buckets.
Partitions?[] perCoreBuckets = _buckets;
SharedArrayPoolPartitions?[] perCoreBuckets = _buckets;
for (int i = 0; i < perCoreBuckets.Length; i++)
{
perCoreBuckets[i]?.Trim(currentMilliseconds, Id, pressure, Utilities.GetMaxSizeForBucket(i));
Expand All @@ -209,16 +212,16 @@ public bool Trim()
{
if (!log.IsEnabled())
{
foreach (KeyValuePair<ThreadLocalArray[], object?> tlsBuckets in _allTlsBuckets)
foreach (KeyValuePair<SharedArrayPoolThreadLocalArray[], object?> tlsBuckets in _allTlsBuckets)
{
Array.Clear(tlsBuckets.Key);
}
}
else
{
foreach (KeyValuePair<ThreadLocalArray[], object?> tlsBuckets in _allTlsBuckets)
foreach (KeyValuePair<SharedArrayPoolThreadLocalArray[], object?> tlsBuckets in _allTlsBuckets)
{
ThreadLocalArray[] buckets = tlsBuckets.Key;
SharedArrayPoolThreadLocalArray[] buckets = tlsBuckets.Key;
for (int i = 0; i < buckets.Length; i++)
{
if (Interlocked.Exchange(ref buckets[i].Array, null) is T[] buffer)
Expand All @@ -241,9 +244,9 @@ public bool Trim()
_ => 30_000,
};

foreach (KeyValuePair<ThreadLocalArray[], object?> tlsBuckets in _allTlsBuckets)
foreach (KeyValuePair<SharedArrayPoolThreadLocalArray[], object?> tlsBuckets in _allTlsBuckets)
{
ThreadLocalArray[] buckets = tlsBuckets.Key;
SharedArrayPoolThreadLocalArray[] buckets = tlsBuckets.Key;
for (int i = 0; i < buckets.Length; i++)
{
if (buckets[i].Array is null)
Expand Down Expand Up @@ -275,11 +278,11 @@ public bool Trim()
return true;
}

private ThreadLocalArray[] InitializeTlsBucketsAndTrimming()
private SharedArrayPoolThreadLocalArray[] InitializeTlsBucketsAndTrimming()
{
Debug.Assert(t_tlsBuckets is null, $"Non-null {nameof(t_tlsBuckets)}");

var tlsBuckets = new ThreadLocalArray[NumBuckets];
var tlsBuckets = new SharedArrayPoolThreadLocalArray[NumBuckets];
t_tlsBuckets = tlsBuckets;

_allTlsBuckets.Add(tlsBuckets, null);
Expand All @@ -290,90 +293,115 @@ private ThreadLocalArray[] InitializeTlsBucketsAndTrimming()

return tlsBuckets;
}
}

// The following types are declared outside of SharedArrayPool<T> so that they are not
// generic, which avoids the generic code size increase that would otherwise result,
// in particular for Native AOT. The only thing that actually needs to be generic is
// the return type of TryPop, and that is handled at the access site with a
// well-placed Unsafe.As.

/// <summary>Provides a collection of partitions, each of which is a pool of arrays.</summary>
private sealed class Partitions
/// <summary>Wrapper for arrays stored in ThreadStatic buckets.</summary>
internal struct SharedArrayPoolThreadLocalArray
{
/// <summary>The stored array.</summary>
public Array? Array;
/// <summary>Environment.TickCount timestamp for when this array was observed by Trim.</summary>
public int MillisecondsTimeStamp;

public SharedArrayPoolThreadLocalArray(Array array)
{
/// <summary>The partitions.</summary>
private readonly Partition[] _partitions;
Array = array;
MillisecondsTimeStamp = 0;
}
}

/// <summary>Provides a collection of partitions, each of which is a pool of arrays.</summary>
internal sealed class SharedArrayPoolPartitions
{
/// <summary>The partitions.</summary>
private readonly Partition[] _partitions;

/// <summary>Initializes the partitions.</summary>
public Partitions()
/// <summary>Initializes the partitions.</summary>
/// <param name="elementSize">The size of the elements stored in arrays.</param>
public SharedArrayPoolPartitions(int elementSize)
{
// Create the partitions. We create as many as there are processors, limited by our max.
var partitions = new Partition[SharedArrayPoolStatics.s_partitionCount];
for (int i = 0; i < partitions.Length; i++)
{
// Create the partitions. We create as many as there are processors, limited by our max.
var partitions = new Partition[SharedArrayPoolStatics.s_partitionCount];
for (int i = 0; i < partitions.Length; i++)
{
partitions[i] = new Partition();
}
_partitions = partitions;
partitions[i] = new Partition(elementSize);
}
_partitions = partitions;
}

/// <summary>
/// Try to push the array into any partition with available space, starting with the partition associated with the current core.
/// If all partitions are full, the array will be dropped.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool TryPush(T[] array)
/// <summary>
/// Try to push the array into any partition with available space, starting with the partition associated with the current core.
/// If all partitions are full, the array will be dropped.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool TryPush(Array array)
{
// Try to push on to the associated partition first. If that fails,
// round-robin through the other partitions.
Partition[] partitions = _partitions;
int index = (int)((uint)Thread.GetCurrentProcessorId() % (uint)SharedArrayPoolStatics.s_partitionCount); // mod by constant in tier 1
for (int i = 0; i < partitions.Length; i++)
{
// Try to push on to the associated partition first. If that fails,
// round-robin through the other partitions.
Partition[] partitions = _partitions;
int index = (int)((uint)Thread.GetCurrentProcessorId() % (uint)SharedArrayPoolStatics.s_partitionCount); // mod by constant in tier 1
for (int i = 0; i < partitions.Length; i++)
{
if (partitions[index].TryPush(array)) return true;
if (++index == partitions.Length) index = 0;
}

return false;
if (partitions[index].TryPush(array)) return true;
if (++index == partitions.Length) index = 0;
}

/// <summary>
/// Try to pop an array from any partition with available arrays, starting with the partition associated with the current core.
/// If all partitions are empty, null is returned.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public T[]? TryPop()
return false;
}

/// <summary>
/// Try to pop an array from any partition with available arrays, starting with the partition associated with the current core.
/// If all partitions are empty, null is returned.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Array? TryPop()
{
// Try to pop from the associated partition first. If that fails, round-robin through the other partitions.
Array? arr;
Partition[] partitions = _partitions;
int index = (int)((uint)Thread.GetCurrentProcessorId() % (uint)SharedArrayPoolStatics.s_partitionCount); // mod by constant in tier 1
for (int i = 0; i < partitions.Length; i++)
{
// Try to pop from the associated partition first. If that fails, round-robin through the other partitions.
T[]? arr;
Partition[] partitions = _partitions;
int index = (int)((uint)Thread.GetCurrentProcessorId() % (uint)SharedArrayPoolStatics.s_partitionCount); // mod by constant in tier 1
for (int i = 0; i < partitions.Length; i++)
{
if ((arr = partitions[index].TryPop()) is not null) return arr;
if (++index == partitions.Length) index = 0;
}
return null;
if ((arr = partitions[index].TryPop()) is not null) return arr;
if (++index == partitions.Length) index = 0;
}
return null;
}

public void Trim(int currentMilliseconds, int id, Utilities.MemoryPressure pressure, int bucketSize)
public void Trim(int currentMilliseconds, int id, Utilities.MemoryPressure pressure, int bucketSize)
{
Partition[] partitions = _partitions;
for (int i = 0; i < partitions.Length; i++)
{
Partition[] partitions = _partitions;
for (int i = 0; i < partitions.Length; i++)
{
partitions[i].Trim(currentMilliseconds, id, pressure, bucketSize);
}
partitions[i].Trim(currentMilliseconds, id, pressure, bucketSize);
}
}

/// <summary>Provides a simple, bounded stack of arrays, protected by a lock.</summary>
private sealed class Partition
/// <param name="elementSize">The size of the elements stored in arrays.</param>
private sealed class Partition(int elementSize)
{
/// <summary>The arrays in the partition.</summary>
private readonly T[]?[] _arrays = new T[SharedArrayPoolStatics.s_maxArraysPerPartition][];
private readonly Array?[] _arrays = new Array[SharedArrayPoolStatics.s_maxArraysPerPartition];
/// <summary>The size of the elements stored in arrays.</summary>
private readonly int _elementSize = elementSize;
/// <summary>Number of arrays stored in <see cref="_arrays"/>.</summary>
private int _count;
/// <summary>Timestamp set by Trim when it sees this as 0.</summary>
private int _millisecondsTimestamp;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool TryPush(T[] array)
public bool TryPush(Array array)
{
bool enqueued = false;
Monitor.Enter(this);
T[]?[] arrays = _arrays;
Array?[] arrays = _arrays;
int count = _count;
if ((uint)count < (uint)arrays.Length)
{
Expand All @@ -384,7 +412,7 @@ public bool TryPush(T[] array)
_millisecondsTimestamp = 0;
}

arrays[count] = array;
Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(arrays), count) = array; // arrays[count] = array, but avoiding stelemref
_count = count + 1;
enqueued = true;
}
Expand All @@ -393,11 +421,11 @@ public bool TryPush(T[] array)
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public T[]? TryPop()
public Array? TryPop()
{
T[]? arr = null;
Array? arr = null;
Monitor.Enter(this);
T[]?[] arrays = _arrays;
Array?[] arrays = _arrays;
int count = _count - 1;
if ((uint)count < (uint)arrays.Length)
{
Expand Down Expand Up @@ -463,19 +491,17 @@ public void Trim(int currentMilliseconds, int id, Utilities.MemoryPressure press
{
trimCount++;
}
unsafe

if (_elementSize > ModerateTypeSize)
{
#pragma warning disable 8500 // sizeof of managed types
if (sizeof(T) > ModerateTypeSize)
{
trimCount++;
}
if (sizeof(T) > LargeTypeSize)
trimCount++;

if (_elementSize > LargeTypeSize)
{
trimCount++;
}
#pragma warning restore 8500
}

break;

case Utilities.MemoryPressure.Medium:
Expand All @@ -485,7 +511,7 @@ public void Trim(int currentMilliseconds, int id, Utilities.MemoryPressure press

while (_count > 0 && trimCount-- > 0)
{
T[]? array = _arrays[--_count];
Array? array = _arrays[--_count];
Debug.Assert(array is not null, "No nulls should have been present in slots < _count.");
_arrays[_count] = null;

Expand All @@ -501,21 +527,6 @@ public void Trim(int currentMilliseconds, int id, Utilities.MemoryPressure press
}
}
}

/// <summary>Wrapper for arrays stored in ThreadStatic buckets.</summary>
private struct ThreadLocalArray
{
/// <summary>The stored array.</summary>
public T[]? Array;
/// <summary>Environment.TickCount timestamp for when this array was observed by Trim.</summary>
public int MillisecondsTimeStamp;

public ThreadLocalArray(T[] array)
{
Array = array;
MillisecondsTimeStamp = 0;
}
}
}

internal static class SharedArrayPoolStatics
Expand Down
Loading