Skip to content

Commit

Permalink
Add an option to do zero byte reads on StreamPipeReader (#49117)
Browse files Browse the repository at this point in the history
- Added UseZeroByteReads to StreamPipeReaderOptions that allows not allocating a buffer by doing a zero byte read on the underlying Stream before the internal buffer is allocated.
  • Loading branch information
davidfowl authored Mar 11, 2021
1 parent 09d0d04 commit 82ca681
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 20 deletions.
4 changes: 3 additions & 1 deletion src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,13 @@ public static partial class StreamPipeExtensions
}
public partial class StreamPipeReaderOptions
{
public StreamPipeReaderOptions(System.Buffers.MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false) { }
public StreamPipeReaderOptions(System.Buffers.MemoryPool<byte>? pool, int bufferSize, int minimumReadSize, bool leaveOpen) { }
public StreamPipeReaderOptions(System.Buffers.MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false, bool useZeroByteReads = false) { }
public int BufferSize { get { throw null; } }
public bool LeaveOpen { get { throw null; } }
public int MinimumReadSize { get { throw null; } }
public System.Buffers.MemoryPool<byte> Pool { get { throw null; } }
public bool UseZeroByteReads { get { throw null; } }
}
public partial class StreamPipeWriterOptions
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ internal class StreamPipeReader : PipeReader
internal const int InitialSegmentPoolSize = 4; // 16K
internal const int MaxSegmentPoolSize = 256; // 1MB

private readonly int _bufferSize;
private readonly int _minimumReadThreshold;
private readonly MemoryPool<byte>? _pool;

private CancellationTokenSource? _internalTokenSource;
private bool _isReaderCompleted;
private bool _isStreamCompleted;
Expand All @@ -31,7 +27,8 @@ internal class StreamPipeReader : PipeReader

// Mutable struct! Don't make this readonly
private BufferSegmentStack _bufferSegmentPool;
private readonly bool _leaveOpen;

private StreamPipeReaderOptions _options;

/// <summary>
/// Creates a new StreamPipeReader.
Expand All @@ -47,13 +44,17 @@ public StreamPipeReader(Stream readingStream, StreamPipeReaderOptions options)
throw new ArgumentNullException(nameof(options));
}

_options = options;
_bufferSegmentPool = new BufferSegmentStack(InitialSegmentPoolSize);
_minimumReadThreshold = Math.Min(options.MinimumReadSize, options.BufferSize);
_pool = options.Pool == MemoryPool<byte>.Shared ? null : options.Pool;
_bufferSize = _pool == null ? options.BufferSize : Math.Min(options.BufferSize, _pool.MaxBufferSize);
_leaveOpen = options.LeaveOpen;
}

// All derived from the options
private bool LeaveOpen => _options.LeaveOpen;
private bool UseZeroByteReads => _options.UseZeroByteReads;
private int BufferSize => _options.BufferSize;
private int MinimumReadThreshold => _options.MinimumReadSize;
private MemoryPool<byte> Pool => _options.Pool;

/// <summary>
/// Gets the inner stream that is being read from.
/// </summary>
Expand Down Expand Up @@ -180,7 +181,7 @@ public override void Complete(Exception? exception = null)
returnSegment.ResetMemory();
}

if (!_leaveOpen)
if (!LeaveOpen)
{
InnerStream.Dispose();
}
Expand Down Expand Up @@ -215,6 +216,13 @@ public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancella
var isCanceled = false;
try
{
// This optimization only makes sense if we don't have anything buffered
if (UseZeroByteReads && _bufferedBytes == 0)
{
// Wait for data by doing 0 byte read before
await InnerStream.ReadAsync(Memory<byte>.Empty, cancellationToken).ConfigureAwait(false);
}

AllocateReadTail();

Memory<byte> buffer = _readTail!.AvailableMemory.Slice(_readTail.End);
Expand Down Expand Up @@ -296,7 +304,7 @@ private bool TryReadInternal(CancellationTokenSource source, out ReadResult resu

private ReadOnlySequence<byte> GetCurrentReadOnlySequence()
{
Debug.Assert(_readHead != null &&_readTail != null);
Debug.Assert(_readHead != null && _readTail != null);
return new ReadOnlySequence<byte>(_readHead, _readIndex, _readTail, _readTail.End);
}

Expand All @@ -311,7 +319,7 @@ private void AllocateReadTail()
else
{
Debug.Assert(_readTail != null);
if (_readTail.WritableBytes < _minimumReadThreshold)
if (_readTail.WritableBytes < MinimumReadThreshold)
{
BufferSegment nextSegment = AllocateSegment();
_readTail.SetNext(nextSegment);
Expand All @@ -324,13 +332,13 @@ private BufferSegment AllocateSegment()
{
BufferSegment nextSegment = CreateSegmentUnsynchronized();

if (_pool is null)
if (_options.IsDefaultSharedMemoryPool)
{
nextSegment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(_bufferSize));
nextSegment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(BufferSize));
}
else
{
nextSegment.SetOwnedMemory(_pool.Rent(_bufferSize));
nextSegment.SetOwnedMemory(Pool.Rent(BufferSize));
}

return nextSegment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,24 @@ public class StreamPipeReaderOptions
/// <param name="bufferSize">The minimum buffer size to use when renting memory from the <paramref name="pool" />. The default value is 4096.</param>
/// <param name="minimumReadSize">The threshold of remaining bytes in the buffer before a new buffer is allocated. The default value is 1024.</param>
/// <param name="leaveOpen"><see langword="true" /> to leave the underlying stream open after the <see cref="System.IO.Pipelines.PipeReader" /> completes; <see langword="false" /> to close it. The default is <see langword="false" />.</param>
public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false)
public StreamPipeReaderOptions(MemoryPool<byte>? pool, int bufferSize, int minimumReadSize, bool leaveOpen) :
this(pool, bufferSize, minimumReadSize, leaveOpen, useZeroByteReads: false)
{

}

/// <summary>Initializes a <see cref="System.IO.Pipelines.StreamPipeReaderOptions" /> instance, optionally specifying a memory pool, a minimum buffer size, a minimum read size, and whether the underlying stream should be left open after the <see cref="System.IO.Pipelines.PipeReader" /> completes.</summary>
/// <param name="pool">The memory pool to use when allocating memory. The default value is <see langword="null" />.</param>
/// <param name="bufferSize">The minimum buffer size to use when renting memory from the <paramref name="pool" />. The default value is 4096.</param>
/// <param name="minimumReadSize">The threshold of remaining bytes in the buffer before a new buffer is allocated. The default value is 1024.</param>
/// <param name="leaveOpen"><see langword="true" /> to leave the underlying stream open after the <see cref="System.IO.Pipelines.PipeReader" /> completes; <see langword="false" /> to close it. The default is <see langword="false" />.</param>
/// <param name="useZeroByteReads"><see langword="true" /> if reads with an empty buffer should be issued to the underlying stream before allocating memory; otherwise, <see langword="false" />.</param>
public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false, bool useZeroByteReads = false)
{
Pool = pool ?? MemoryPool<byte>.Shared;

IsDefaultSharedMemoryPool = Pool == MemoryPool<byte>.Shared;

BufferSize =
bufferSize == -1 ? DefaultBufferSize :
bufferSize <= 0 ? throw new ArgumentOutOfRangeException(nameof(bufferSize)) :
Expand All @@ -33,6 +47,8 @@ public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -
minimumReadSize;

LeaveOpen = leaveOpen;

UseZeroByteReads = useZeroByteReads;
}

/// <summary>Gets the minimum buffer size to use when renting memory from the <see cref="System.IO.Pipelines.StreamPipeReaderOptions.Pool" />.</summary>
Expand All @@ -50,5 +66,14 @@ public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -
/// <summary>Gets the value that indicates if the underlying stream should be left open after the <see cref="System.IO.Pipelines.PipeReader" /> completes.</summary>
/// <value><see langword="true" /> if the underlying stream should be left open after the <see cref="System.IO.Pipelines.PipeReader" /> completes; otherwise, <see langword="false" />.</value>
public bool LeaveOpen { get; }

/// <summary>Gets the value that indicates if reads with an empty buffer should be issued to the underlying stream, in order to wait for data to arrive before allocating memory.</summary>
/// <value><see langword="true" /> if reads with an empty buffer should be issued to the underlying stream before allocating memory; otherwise, <see langword="false" />.</value>
public bool UseZeroByteReads { get; }

/// <summary>
/// Returns true if Pool is <see cref="MemoryPool{Byte}"/>.Shared
/// </summary>
internal bool IsDefaultSharedMemoryPool { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ public async Task TryReadReturnsFalseIfBufferedBytesAndEverythingExamined()
reader.Complete();
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task CanReadMultipleTimes()
[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[InlineData(false)]
[InlineData(true)]
public async Task CanReadMultipleTimes(bool useZeroByteReads)
{
// This needs to run inline to synchronize the reader and writer
TaskCompletionSource<object> waitForRead = null;
Expand Down Expand Up @@ -109,7 +111,7 @@ async Task DoAsyncWrites(PipeWriter writer, int[] bufferSizes)

// We're using the pipe here as a way to pump bytes into the reader asynchronously
var pipe = new Pipe();
var options = new StreamPipeReaderOptions(bufferSize: 4096);
var options = new StreamPipeReaderOptions(bufferSize: 4096, useZeroByteReads: useZeroByteReads);
PipeReader reader = PipeReader.Create(pipe.Reader.AsStream(), options);

var writes = new[] { 4096, 1024, 123, 4096, 100 };
Expand Down

0 comments on commit 82ca681

Please sign in to comment.