Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(zip): fully implement async deflate #813

Merged
merged 6 commits into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/ICSharpCode.SharpZipLib/GZip/GzipInputStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ private void ReadFooter()
int crcval = (footer[0] & 0xff) | ((footer[1] & 0xff) << 8) | ((footer[2] & 0xff) << 16) | (footer[3] << 24);
if (crcval != (int)crc.Value)
{
throw new GZipException("GZIP crc sum mismatch, theirs \"" + crcval + "\" and ours \"" + (int)crc.Value);
throw new GZipException($"GZIP crc sum mismatch, theirs \"{crcval:x8}\" and ours \"{(int)crc.Value:x8}\"");
}

// NOTE The total here is the original total modulo 2 ^ 32.
Expand Down
70 changes: 52 additions & 18 deletions src/ICSharpCode.SharpZipLib/GZip/GzipOutputStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ public string FileName
}
}

/// <summary>
/// Optional modification time to record in the GZip output header.
/// When set, the value is converted to universal time and used instead of the current time;
/// when <c>null</c>, the current UTC time is used.
/// </summary>
public DateTime? ModifiedTime { get; set; }

#endregion Public API

#region Stream overrides
Expand All @@ -149,21 +154,47 @@ public string FileName
/// <param name="offset">Offset of first byte in buf to write</param>
/// <param name="count">Number of bytes to write</param>
public override void Write(byte[] buffer, int offset, int count)
{
    // Synchronous entry point: a null token makes the shared routine take its
    // blocking branches (WriteHeader / base.Write) instead of the awaited ones.
    // GetAwaiter().GetResult() rethrows the original exception rather than
    // wrapping it in an AggregateException.
    var pendingWrite = WriteSyncOrAsync(buffer, offset, count, null);
    pendingWrite.GetAwaiter().GetResult();
}

private async Task WriteSyncOrAsync(byte[] buffer, int offset, int count, CancellationToken? ct)
{
if (state_ == OutputState.Header)
{
WriteHeader();
if (ct.HasValue)
{
await WriteHeaderAsync(ct.Value).ConfigureAwait(false);
}
else
{
WriteHeader();
}
}

if (state_ != OutputState.Footer)
{
throw new InvalidOperationException("Write not permitted in current state");
}


crc.Update(new ArraySegment<byte>(buffer, offset, count));
base.Write(buffer, offset, count);

if (ct.HasValue)
{
await base.WriteAsync(buffer, offset, count, ct.Value).ConfigureAwait(false);
}
else
{
base.Write(buffer, offset, count);
}
}

/// <summary>
/// Asynchronously writes the given buffer to the output, updating the crc.
/// If the stream is still in the header state, the header is written first.
/// </summary>
/// <param name="buffer">Buffer to write</param>
/// <param name="offset">Offset of first byte in buf to write</param>
/// <param name="count">Number of bytes to write</param>
/// <param name="ct">The token to monitor for cancellation requests</param>
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken ct)
{
    // Supplying a token selects the awaited branches of the shared sync/async routine.
    await WriteSyncOrAsync(buffer, offset, count, ct).ConfigureAwait(false);
}

/// <summary>
/// Writes remaining compressed output data to the output stream
/// and closes it.
Expand All @@ -187,7 +218,7 @@ protected override void Dispose(bool disposing)
}
}

#if NETSTANDARD2_1_OR_GREATER
#if NETSTANDARD2_1_OR_GREATER || NETCOREAPP3_1_OR_GREATER
/// <inheritdoc cref="DeflaterOutputStream.Dispose"/>
public override async ValueTask DisposeAsync()
{
Expand Down Expand Up @@ -225,6 +256,16 @@ public override void Flush()
base.Flush();
}

/// <summary>
/// Asynchronously flushes the stream, first writing the GZip header
/// when the stream is still in the header state.
/// </summary>
/// <param name="ct">The token to monitor for cancellation requests</param>
public override async Task FlushAsync(CancellationToken ct)
{
    var headerPending = state_ == OutputState.Header;
    if (headerPending)
    {
        await WriteHeaderAsync(ct).ConfigureAwait(false);
    }

    // Delegate the actual flushing to the base stream.
    await base.FlushAsync(ct).ConfigureAwait(false);
}

#endregion Stream overrides

#region DeflaterOutputStream overrides
Expand All @@ -249,21 +290,13 @@ public override void Finish()
}
}

/// <inheritdoc cref="Flush"/>
public override async Task FlushAsync(CancellationToken ct)
{
await WriteHeaderAsync().ConfigureAwait(false);
await base.FlushAsync(ct).ConfigureAwait(false);
}


/// <inheritdoc cref="Finish"/>
public override async Task FinishAsync(CancellationToken ct)
{
// If no data has been written a header should be added.
if (state_ == OutputState.Header)
{
await WriteHeaderAsync().ConfigureAwait(false);
await WriteHeaderAsync(ct).ConfigureAwait(false);
}

if (state_ == OutputState.Footer)
Expand Down Expand Up @@ -305,7 +338,8 @@ private byte[] GetFooter()

private byte[] GetHeader()
{
var modTime = (int)((DateTime.Now.Ticks - new DateTime(1970, 1, 1).Ticks) / 10000000L); // Ticks give back 100ns intervals
var modifiedUtc = ModifiedTime?.ToUniversalTime() ?? DateTime.UtcNow;
var modTime = (int)((modifiedUtc - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).Ticks / 10000000L); // Ticks give back 100ns intervals
byte[] gzipHeader = {
// The two magic bytes
GZipConstants.ID1,
Expand Down Expand Up @@ -351,12 +385,12 @@ private void WriteHeader()
baseOutputStream_.Write(gzipHeader, 0, gzipHeader.Length);
}

private async Task WriteHeaderAsync()
private async Task WriteHeaderAsync(CancellationToken ct)
{
if (state_ != OutputState.Header) return;
state_ = OutputState.Footer;
var gzipHeader = GetHeader();
await baseOutputStream_.WriteAsync(gzipHeader, 0, gzipHeader.Length).ConfigureAwait(false);
await baseOutputStream_.WriteAsync(gzipHeader, 0, gzipHeader.Length, ct).ConfigureAwait(false);
}

#endregion Support Routines
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,9 @@ protected void EncryptBlock(byte[] buffer, int offset, int length)
/// are processed.
/// </summary>
protected void Deflate()
{
Deflate(false);
}
=> DeflateSyncOrAsync(false, null).GetAwaiter().GetResult();

private void Deflate(bool flushing)
private async Task DeflateSyncOrAsync(bool flushing, CancellationToken? ct)
{
while (flushing || !deflater_.IsNeedingInput)
{
Expand All @@ -257,7 +255,14 @@ private void Deflate(bool flushing)

EncryptBlock(buffer_, 0, deflateCount);

baseOutputStream_.Write(buffer_, 0, deflateCount);
if (ct.HasValue)
{
await baseOutputStream_.WriteAsync(buffer_, 0, deflateCount, ct.Value).ConfigureAwait(false);
}
else
{
baseOutputStream_.Write(buffer_, 0, deflateCount);
}
}

if (!deflater_.IsNeedingInput)
Expand Down Expand Up @@ -383,10 +388,18 @@ public override int Read(byte[] buffer, int offset, int count)
public override void Flush()
{
deflater_.Flush();
Deflate(true);
DeflateSyncOrAsync(true, null).GetAwaiter().GetResult();
baseOutputStream_.Flush();
}

/// <summary>
/// Asynchronously flushes the stream: flushes the deflater, runs the deflate
/// loop in flushing mode so its output is written to the underlying stream,
/// and finally flushes the underlying stream.
/// </summary>
/// <param name="cancellationToken">The token to monitor for cancellation requests</param>
public override async Task FlushAsync(CancellationToken cancellationToken)
{
    deflater_.Flush();

    // The deflate pass must finish before the underlying stream is flushed.
    Task deflatePass = DeflateSyncOrAsync(true, cancellationToken);
    await deflatePass.ConfigureAwait(false);

    Task underlyingFlush = baseOutputStream_.FlushAsync(cancellationToken);
    await underlyingFlush.ConfigureAwait(false);
}

/// <summary>
/// Calls <see cref="Finish"/> and closes the underlying
/// stream when <see cref="IsStreamOwner"></see> is true.
Expand Down Expand Up @@ -491,6 +504,13 @@ public override void Write(byte[] buffer, int offset, int count)
Deflate();
}

/// <summary>
/// Asynchronously writes the given bytes: they are handed to the deflater as input
/// and the deflate loop is then run (non-flushing) using asynchronous writes.
/// </summary>
/// <param name="buffer">Buffer containing the data to write</param>
/// <param name="offset">Offset of the first byte in <paramref name="buffer"/> to write</param>
/// <param name="count">Number of bytes to write</param>
/// <param name="ct">The token to monitor for cancellation requests</param>
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken ct)
{
    deflater_.SetInput(buffer, offset, count);

    var deflatePass = DeflateSyncOrAsync(flushing: false, ct: ct);
    await deflatePass.ConfigureAwait(false);
}

#endregion Stream Overrides

#region Instance Fields
Expand Down
35 changes: 27 additions & 8 deletions src/ICSharpCode.SharpZipLib/Zip/ZipOutputStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ await baseOutputStream_.WriteProcToStreamAsync(s =>
public void CloseEntry()
{
// Note: This method will run synchronously
FinishCompression(null).Wait();
FinishCompressionSyncOrAsync(null).GetAwaiter().GetResult();
WriteEntryFooter(baseOutputStream_);

// Patch the header if possible
Expand All @@ -566,7 +566,7 @@ public void CloseEntry()
curEntry = null;
}

private async Task FinishCompression(CancellationToken? ct)
private async Task FinishCompressionSyncOrAsync(CancellationToken? ct)
{
// Compression handled externally
if (entryIsPassthrough) return;
Expand Down Expand Up @@ -600,7 +600,7 @@ private async Task FinishCompression(CancellationToken? ct)
/// <inheritdoc cref="CloseEntry"/>
public async Task CloseEntryAsync(CancellationToken ct)
{
await FinishCompression(ct).ConfigureAwait(false);
await FinishCompressionSyncOrAsync(ct).ConfigureAwait(false);
await baseOutputStream_.WriteProcToStreamAsync(WriteEntryFooter, ct).ConfigureAwait(false);

// Patch the header if possible
Expand Down Expand Up @@ -767,9 +767,7 @@ private byte[] CreateZipCryptoHeader(long crcValue)
private void InitializeZipCryptoPassword(string password)
{
    // Purpose: derive the ZipCrypto key from the password (encoded with
    // ZipCryptoEncoding) and install the resulting encryptor as cryptoTransform_.
    //
    // The derived key is secret material. It must never be written to the
    // console or any log, and a library should not write to stdout at all,
    // so no diagnostic output is emitted here.
    var pkManaged = new PkzipClassicManaged();
    byte[] key = PkzipClassic.GenerateKeys(ZipCryptoEncoding.GetBytes(password));
    cryptoTransform_ = pkManaged.CreateEncryptor(key, null);
}

Expand All @@ -782,6 +780,13 @@ private void InitializeZipCryptoPassword(string password)
/// <exception cref="ZipException">Archive size is invalid</exception>
/// <exception cref="System.InvalidOperationException">No entry is active.</exception>
public override void Write(byte[] buffer, int offset, int count)
{
    // Synchronous entry point: passing no cancellation token asks the shared
    // routine to use its blocking writes. GetAwaiter().GetResult() rethrows the
    // original exception rather than wrapping it in an AggregateException.
    var pendingWrite = WriteSyncOrAsync(buffer, offset, count, null);
    pendingWrite.GetAwaiter().GetResult();
}

/// <summary>
/// Asynchronously writes the given buffer to the current entry.
/// Shares its implementation with the synchronous Write; supplying a
/// cancellation token selects the awaited write calls.
/// </summary>
/// <param name="buffer">Buffer containing the data to write</param>
/// <param name="offset">Offset of the first byte in <paramref name="buffer"/> to write</param>
/// <param name="count">Number of bytes to write</param>
/// <param name="ct">The token to monitor for cancellation requests</param>
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken ct)
{
    await WriteSyncOrAsync(buffer, offset, count, ct).ConfigureAwait(false);
}

private async Task WriteSyncOrAsync(byte[] buffer, int offset, int count, CancellationToken? ct)
{
if (curEntry == null)
{
Expand Down Expand Up @@ -816,20 +821,34 @@ public override void Write(byte[] buffer, int offset, int count)

size += count;

if(curMethod == CompressionMethod.Stored || entryIsPassthrough)
if (curMethod == CompressionMethod.Stored || entryIsPassthrough)
{
if (Password != null)
{
CopyAndEncrypt(buffer, offset, count);
}
else
{
baseOutputStream_.Write(buffer, offset, count);
if (ct.HasValue)
{
await baseOutputStream_.WriteAsync(buffer, offset, count, ct.Value).ConfigureAwait(false);
}
else
{
baseOutputStream_.Write(buffer, offset, count);
}
}
}
else
{
base.Write(buffer, offset, count);
if (ct.HasValue)
{
await base.WriteAsync(buffer, offset, count, ct.Value).ConfigureAwait(false);
}
else
{
base.Write(buffer, offset, count);
}
}
}

Expand Down
48 changes: 45 additions & 3 deletions test/ICSharpCode.SharpZipLib.Tests/GZip/GZipAsyncTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.IO;
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using ICSharpCode.SharpZipLib.GZip;
Expand All @@ -7,8 +8,6 @@

namespace ICSharpCode.SharpZipLib.Tests.GZip
{


[TestFixture]
public class GZipAsyncTests
{
Expand Down Expand Up @@ -140,5 +139,48 @@ public async Task EmptyGZipStreamAsync()
Assert.IsEmpty(content);
}
}

/// <summary>
/// Compresses the same payload once through the async API (into a
/// MemoryStreamWithoutSync) and once through the sync API, checks that both
/// outputs are byte-for-byte equal, then decompresses the async output.
/// </summary>
[Test]
[Category("GZip")]
[Category("Async")]
public async Task WriteGZipStreamToAsyncOnlyStream()
{
#if NETSTANDARD2_1 || NETCOREAPP3_0_OR_GREATER
    var content = Encoding.ASCII.GetBytes("a");
    // Use one fixed timestamp for both streams so the headers cannot differ by time.
    var headerTime = DateTime.UtcNow;

    await using (var asyncTarget = new MemoryStreamWithoutSync())
    {
        // Async path: write and dispose asynchronously.
        await using (var asyncGzip = new GZipOutputStream(asyncTarget) { IsStreamOwner = false })
        {
            asyncGzip.ModifiedTime = headerTime;
            await asyncGzip.WriteAsync(content);
        }

        // Sync path: same payload, same timestamp, regular MemoryStream.
        using (var syncTarget = new MemoryStream())
        {
            using (var syncGzip = new GZipOutputStream(syncTarget) { IsStreamOwner = false })
            {
                syncGzip.ModifiedTime = headerTime;
                syncGzip.Write(content);
            }

            var syncBytes = string.Join(' ', syncTarget.ToArray());
            var asyncBytes = string.Join(' ', asyncTarget.ToArray());

            Assert.AreEqual(syncBytes, asyncBytes, "Sync and Async compressed streams are not equal");

            // GZipInputStream has no async support yet, so read the result back through a regular MemoryStream
            using (var readStream = new MemoryStream(asyncTarget.ToArray()))
            using (var inStream = new GZipInputStream(readStream))
            using (var reader = new StreamReader(inStream))
            {
                var roundTripped = await reader.ReadToEndAsync();
                Assert.AreEqual(content, roundTripped);
            }
        }
    }
#else
    await Task.CompletedTask;
    Assert.Ignore("AsyncDispose is not supported");
#endif
}
}
}
8 changes: 5 additions & 3 deletions test/ICSharpCode.SharpZipLib.Tests/Zip/ZipStreamAsyncTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,19 @@ public async Task WriteReadOnlyZipStreamAsync ()
[Test]
[Category("Zip")]
[Category("Async")]
public async Task WriteZipStreamToAsyncOnlyStream ()
[TestCase(12, Description = "Small files")]
[TestCase(12000, Description = "Large files")]
public async Task WriteZipStreamToAsyncOnlyStream (int fileSize)
{
#if NETSTANDARD2_1 || NETCOREAPP3_0_OR_GREATER
await using(var ms = new MemoryStreamWithoutSync()){
await using(var outStream = new ZipOutputStream(ms) { IsStreamOwner = false })
{
await outStream.PutNextEntryAsync(new ZipEntry("FirstFile"));
await Utils.WriteDummyDataAsync(outStream, 12);
await Utils.WriteDummyDataAsync(outStream, fileSize);

await outStream.PutNextEntryAsync(new ZipEntry("SecondFile"));
await Utils.WriteDummyDataAsync(outStream, 12);
await Utils.WriteDummyDataAsync(outStream, fileSize);

await outStream.FinishAsync(CancellationToken.None);
await outStream.DisposeAsync();
Expand Down