GH-32240: [C#] Add new Apache.Arrow.Compression package to implement IPC decompression #33893

Merged
merged 11 commits into from
Feb 28, 2023
Changes from 4 commits
12 changes: 12 additions & 0 deletions csharp/Apache.Arrow.sln
@@ -19,6 +19,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Flight.AspNetC
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.IntegrationTest", "test\Apache.Arrow.IntegrationTest\Apache.Arrow.IntegrationTest.csproj", "{E8264B7F-B680-4A55-939B-85DB628164BB}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Compression", "src\Apache.Arrow.Compression\Apache.Arrow.Compression.csproj", "{B62E77D2-D0B0-4C0C-BA78-1C117DE4C299}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Compression.Tests", "test\Apache.Arrow.Compression.Tests\Apache.Arrow.Compression.Tests.csproj", "{5D7FF380-B7DF-4752-B415-7C08C70C9F06}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -57,6 +61,14 @@ Global
{E8264B7F-B680-4A55-939B-85DB628164BB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E8264B7F-B680-4A55-939B-85DB628164BB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E8264B7F-B680-4A55-939B-85DB628164BB}.Release|Any CPU.Build.0 = Release|Any CPU
{B62E77D2-D0B0-4C0C-BA78-1C117DE4C299}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B62E77D2-D0B0-4C0C-BA78-1C117DE4C299}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B62E77D2-D0B0-4C0C-BA78-1C117DE4C299}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B62E77D2-D0B0-4C0C-BA78-1C117DE4C299}.Release|Any CPU.Build.0 = Release|Any CPU
{5D7FF380-B7DF-4752-B415-7C08C70C9F06}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5D7FF380-B7DF-4752-B415-7C08C70C9F06}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5D7FF380-B7DF-4752-B415-7C08C70C9F06}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5D7FF380-B7DF-4752-B415-7C08C70C9F06}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
10 changes: 10 additions & 0 deletions csharp/README.md
@@ -107,6 +107,16 @@ for currently available features.
- File
- Stream

## IPC Format

### Compression

- Buffer compression is not supported when writing IPC files or streams
- Buffer decompression is supported, but requires installing the `Apache.Arrow.Compression` package,
and passing an `Apache.Arrow.Compression.CompressionCodecFactory` instance to the
`ArrowFileReader` or `ArrowStreamReader` constructor.
Alternatively, a custom implementation of `ICompressionCodecFactory` can be used.
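
A minimal sketch of the reader usage described above, modelled on the tests added in this PR. It assumes the `Apache.Arrow` and `Apache.Arrow.Compression` packages are referenced and that the compiler supports top-level statements (C# 9 or later). The file name `data.arrow` is a hypothetical placeholder for an LZ4- or Zstd-compressed IPC file:

```csharp
using System;
using System.IO;
using Apache.Arrow;
using Apache.Arrow.Compression;
using Apache.Arrow.Ipc;

// Hypothetical path to a compressed Arrow IPC file (an assumption for illustration).
const string path = "data.arrow";

using var stream = File.OpenRead(path);

// The factory from Apache.Arrow.Compression creates the decompression codecs.
var codecFactory = new CompressionCodecFactory();
using var reader = new ArrowFileReader(stream, codecFactory);

// ReadNextRecordBatch returns null once all batches have been read.
RecordBatch batch;
while ((batch = reader.ReadNextRecordBatch()) != null)
{
    using (batch)
    {
        Console.WriteLine($"Read a batch with {batch.Length} rows");
    }
}
```

The same pattern should apply to `ArrowStreamReader`, which accepts the codec factory in the same constructor position in the tests in this PR.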

## Not Implemented

- Serialization
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
<Description>Provides decompression support for the Arrow IPC format</Description>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="CommunityToolkit.HighPerformance" Version="8.0.0" />
Review comment from @adamreeve (Contributor Author), Jan 26, 2023:

The CommunityToolkit.HighPerformance dependency is only needed for the AsStream extension methods on Memory<byte> and ReadOnlyMemory<byte>, which could be re-implemented if we wanted to minimize extra dependencies.

Member:

I think this is fine.

Contributor:

I found that the LZ4 library offers APIs that take Spans. We should just use those APIs instead, and then we can get rid of this dependency.

https://github.com/MiloszKrajewski/K4os.Compression.LZ4/blob/fa8b8e038b500d565efe12769db097852a28ddf7/src/K4os.Compression.LZ4/LZ4Codec.cs#L139-L144

Note that there are two LZ4 libraries: one for streams (which this PR is using now) and one that doesn't use streams - https://www.nuget.org/packages/K4os.Compression.LZ4

Contributor Author:

The two libraries actually implement different formats. The K4os.Compression.LZ4.Streams library implements the LZ4 frame format, and the K4os.Compression.LZ4 library implements the LZ4 block format. Arrow IPC uses the frame format (https://github.com/apache/arrow/blob/apache-arrow-11.0.0/format/Message.fbs#L46-L48), so we need to use the streams library.

Contributor:

I've opened MiloszKrajewski/K4os.Compression.LZ4#79 to request this is added to the K4os.Compression.LZ4 library.

Contributor Author:

Thanks Eric. Your issue prompted me to dig further into the K4os library, and I realised that since I started working on this, a new API has been added that allows using the frame format with more types than just Stream. I've switched to it and removed the CommunityToolkit.HighPerformance dependency.

<PackageReference Include="K4os.Compression.LZ4.Streams" Version="1.3.5" />
<PackageReference Include="ZstdSharp.Port" Version="0.6.7" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Apache.Arrow\Apache.Arrow.csproj" />
</ItemGroup>

</Project>
@@ -16,9 +16,12 @@
using System;
using Apache.Arrow.Ipc;

-namespace Apache.Arrow.Tests.Compression
+namespace Apache.Arrow.Compression
{
-internal sealed class CompressionCodecFactory : ICompressionCodecFactory
+/// <summary>
+/// Creates compression codec implementations for decompressing Arrow IPC data
+/// </summary>
+public sealed class CompressionCodecFactory : ICompressionCodecFactory
{
public ICompressionCodec CreateCodec(CompressionCodecType compressionCodecType)
{
@@ -18,7 +18,7 @@
using CommunityToolkit.HighPerformance;
using K4os.Compression.LZ4.Streams;

-namespace Apache.Arrow.Tests.Compression
+namespace Apache.Arrow.Compression
{
internal sealed class Lz4CompressionCodec : ICompressionCodec
{
@@ -17,7 +17,7 @@
using Apache.Arrow.Ipc;
using ZstdSharp;

-namespace Apache.Arrow.Tests.Compression
+namespace Apache.Arrow.Compression
{
internal sealed class ZstdCompressionCodec : ICompressionCodec
{
@@ -0,0 +1,32 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>

<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
<PackageReference Include="xunit" Version="2.4.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0" />
<PackageReference Include="coverlet.collector" Version="1.2.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Apache.Arrow\Apache.Arrow.csproj" />
<ProjectReference Include="..\..\src\Apache.Arrow.Compression\Apache.Arrow.Compression.csproj" />
</ItemGroup>

<ItemGroup>
<None Remove="Resources\ipc_lz4_compression.arrow" />
<EmbeddedResource Include="Resources\ipc_lz4_compression.arrow" />
<None Remove="Resources\ipc_lz4_compression.arrow_stream" />
<EmbeddedResource Include="Resources\ipc_lz4_compression.arrow_stream" />
<None Remove="Resources\ipc_zstd_compression.arrow" />
<EmbeddedResource Include="Resources\ipc_zstd_compression.arrow" />
<None Remove="Resources\ipc_zstd_compression.arrow_stream" />
<EmbeddedResource Include="Resources\ipc_zstd_compression.arrow_stream" />
</ItemGroup>

</Project>
@@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Apache.Arrow.Ipc;
using System;
using System.Reflection;
using Xunit;

namespace Apache.Arrow.Compression.Tests
{
public class ArrowFileReaderTests
{
[Theory]
[InlineData("ipc_lz4_compression.arrow")]
[InlineData("ipc_zstd_compression.arrow")]
public void CanReadCompressedIpcFile(string fileName)
{
var assembly = Assembly.GetExecutingAssembly();
using var stream = assembly.GetManifestResourceStream($"Apache.Arrow.Compression.Tests.Resources.{fileName}");
Assert.NotNull(stream);
var codecFactory = new CompressionCodecFactory();
using var reader = new ArrowFileReader(stream, codecFactory);

var batch = reader.ReadNextRecordBatch();

var intArray = (Int32Array) batch.Column("integers");
var floatArray = (FloatArray) batch.Column("floats");

const int numRows = 100;
Assert.Equal(numRows, intArray.Length);
Assert.Equal(numRows, floatArray.Length);

for (var i = 0; i < numRows; ++i)
{
Assert.Equal(i, intArray.GetValue(i));
Assert.True(Math.Abs(floatArray.GetValue(i).Value - 0.1f * i) < 1.0e-6);
}
}

[Fact]
public void ErrorReadingCompressedFileWithoutCodecFactory()
{
var assembly = Assembly.GetExecutingAssembly();
using var stream = assembly.GetManifestResourceStream("Apache.Arrow.Compression.Tests.Resources.ipc_lz4_compression.arrow");
Assert.NotNull(stream);
using var reader = new ArrowFileReader(stream);

var exception = Assert.Throws<Exception>(() => reader.ReadNextRecordBatch());
Assert.Contains("no ICompressionCodecFactory has been configured", exception.Message);
}
}
}
@@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Apache.Arrow.Ipc;
using System;
using System.Reflection;
using Xunit;

namespace Apache.Arrow.Compression.Tests
{
public class ArrowStreamReaderTests
{
[Theory]
[InlineData("ipc_lz4_compression.arrow_stream")]
[InlineData("ipc_zstd_compression.arrow_stream")]
public void CanReadCompressedIpcStream(string fileName)
{
var assembly = Assembly.GetExecutingAssembly();
using var stream = assembly.GetManifestResourceStream($"Apache.Arrow.Compression.Tests.Resources.{fileName}");
Assert.NotNull(stream);
var codecFactory = new CompressionCodecFactory();
using var reader = new ArrowStreamReader(stream, codecFactory);

var batch = reader.ReadNextRecordBatch();

var intArray = (Int32Array) batch.Column("integers");
var floatArray = (FloatArray) batch.Column("floats");

const int numRows = 100;
Assert.Equal(numRows, intArray.Length);
Assert.Equal(numRows, floatArray.Length);

for (var i = 0; i < numRows; ++i)
{
Assert.Equal(i, intArray.GetValue(i));
Assert.True(Math.Abs(floatArray.GetValue(i).Value - 0.1f * i) < 1.0e-6);
}
}
}
}

14 changes: 0 additions & 14 deletions csharp/test/Apache.Arrow.Tests/Apache.Arrow.Tests.csproj
@@ -13,24 +13,10 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
<PackageReference Include="CommunityToolkit.HighPerformance" Version="8.0.0" />
<PackageReference Include="K4os.Compression.LZ4.Streams" Version="1.3.5" />
<PackageReference Include="ZstdSharp.Port" Version="0.6.7" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Apache.Arrow\Apache.Arrow.csproj" />
</ItemGroup>

<ItemGroup>
<None Remove="Resources\ipc_lz4_compression.arrow" />
<EmbeddedResource Include="Resources\ipc_lz4_compression.arrow" />
<None Remove="Resources\ipc_zstd_compression.arrow" />
<EmbeddedResource Include="Resources\ipc_zstd_compression.arrow" />
<None Remove="Resources\ipc_lz4_compression.arrow_stream" />
<EmbeddedResource Include="Resources\ipc_lz4_compression.arrow_stream" />
<None Remove="Resources\ipc_zstd_compression.arrow_stream" />
<EmbeddedResource Include="Resources\ipc_zstd_compression.arrow_stream" />
</ItemGroup>

</Project>
37 changes: 0 additions & 37 deletions csharp/test/Apache.Arrow.Tests/ArrowFileReaderTests.cs
@@ -168,42 +168,5 @@ public void TestRecordBatchBasics()

recordBatch.Dispose();
}

[Theory]
[InlineData("ipc_lz4_compression.arrow")]
[InlineData("ipc_zstd_compression.arrow")]
public void CanReadCompressedIpcFile(string fileName)
{
var assembly = Assembly.GetExecutingAssembly();
using var stream = assembly.GetManifestResourceStream($"Apache.Arrow.Tests.Resources.{fileName}");
var codecFactory = new Compression.CompressionCodecFactory();
using var reader = new ArrowFileReader(stream, codecFactory);

var batch = reader.ReadNextRecordBatch();

var intArray = (Int32Array) batch.Column("integers");
var floatArray = (FloatArray) batch.Column("floats");

const int numRows = 100;
Assert.Equal(numRows, intArray.Length);
Assert.Equal(numRows, floatArray.Length);

for (var i = 0; i < numRows; ++i)
{
Assert.Equal(i, intArray.GetValue(i));
Assert.True(Math.Abs(floatArray.GetValue(i).Value - 0.1f * i) < 1.0e-6);
}
}

[Fact]
public void ErrorReadingCompressedFileWithoutCodecFactory()
{
var assembly = Assembly.GetExecutingAssembly();
using var stream = assembly.GetManifestResourceStream("Apache.Arrow.Tests.Resources.ipc_lz4_compression.arrow");
using var reader = new ArrowFileReader(stream);

var exception = Assert.Throws<Exception>(() => reader.ReadNextRecordBatch());
Assert.Contains("no ICompressionCodecFactory has been configured", exception.Message);
}
}
}
26 changes: 0 additions & 26 deletions csharp/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs
@@ -195,32 +195,6 @@ public async Task ReadRecordBatchAsync_PartialReadStream(bool createDictionaryAr
await TestReaderFromPartialReadStream(ArrowReaderVerifier.VerifyReaderAsync, createDictionaryArray);
}

[Theory]
[InlineData("ipc_lz4_compression.arrow_stream")]
[InlineData("ipc_zstd_compression.arrow_stream")]
public void CanReadCompressedIpcStream(string fileName)
{
var assembly = Assembly.GetExecutingAssembly();
using var stream = assembly.GetManifestResourceStream($"Apache.Arrow.Tests.Resources.{fileName}");
var codecFactory = new Compression.CompressionCodecFactory();
using var reader = new ArrowStreamReader(stream, codecFactory);

var batch = reader.ReadNextRecordBatch();

var intArray = (Int32Array) batch.Column("integers");
var floatArray = (FloatArray) batch.Column("floats");

const int numRows = 100;
Assert.Equal(numRows, intArray.Length);
Assert.Equal(numRows, floatArray.Length);

for (var i = 0; i < numRows; ++i)
{
Assert.Equal(i, intArray.GetValue(i));
Assert.True(Math.Abs(floatArray.GetValue(i).Value - 0.1f * i) < 1.0e-6);
}
}

/// <summary>
/// Verifies that the stream reader reads multiple times when a stream
/// only returns a subset of the data from each Read.
1 change: 1 addition & 0 deletions dev/release/post-06-csharp.sh
@@ -39,6 +39,7 @@ base_names=()
base_names+=(Apache.Arrow.${version})
base_names+=(Apache.Arrow.Flight.${version})
base_names+=(Apache.Arrow.Flight.AspNetCore.${version})
base_names+=(Apache.Arrow.Compression.${version})
for base_name in ${base_names[@]}; do
for extension in nupkg snupkg; do
path=${base_name}.${extension}
2 changes: 2 additions & 0 deletions dev/release/rat_exclude_files.txt
@@ -181,11 +181,13 @@ csharp/src/Apache.Arrow/Properties/Resources.Designer.cs
csharp/src/Apache.Arrow/Properties/Resources.resx
csharp/src/Apache.Arrow.Flight/Apache.Arrow.Flight.csproj
csharp/src/Apache.Arrow.Flight.AspNetCore/Apache.Arrow.Flight.AspNetCore.csproj
csharp/src/Apache.Arrow.Compression/Apache.Arrow.Compression.csproj
csharp/test/Apache.Arrow.Benchmarks/Apache.Arrow.Benchmarks.csproj
csharp/test/Apache.Arrow.Flight.Tests/Apache.Arrow.Flight.Tests.csproj
csharp/test/Apache.Arrow.Flight.TestWeb/Apache.Arrow.Flight.TestWeb.csproj
csharp/test/Apache.Arrow.IntegrationTest/Apache.Arrow.IntegrationTest.csproj
csharp/test/Apache.Arrow.Tests/Apache.Arrow.Tests.csproj
csharp/test/Apache.Arrow.Compression.Tests/Apache.Arrow.Compression.Tests.csproj
csharp/test/Apache.Arrow.Tests/app.config
*.html
*.sgml
2 changes: 2 additions & 0 deletions dev/tasks/tasks.yml
@@ -976,6 +976,8 @@ tasks:
- Apache.Arrow.Flight.AspNetCore.{no_rc_version}.snupkg
- Apache.Arrow.Flight.{no_rc_version}.nupkg
- Apache.Arrow.Flight.{no_rc_version}.snupkg
- Apache.Arrow.Compression.{no_rc_version}.nupkg
- Apache.Arrow.Compression.{no_rc_version}.snupkg
- Apache.Arrow.{no_rc_version}.nupkg
- Apache.Arrow.{no_rc_version}.snupkg

4 changes: 3 additions & 1 deletion docs/source/status.rst
@@ -128,7 +128,7 @@ IPC Format
 +-----------------------------+-------+-------+-------+------------+-------+-------+-------+
 | Sparse tensors              | ✓     |       |       |            |       |       |       |
 +-----------------------------+-------+-------+-------+------------+-------+-------+-------+
-| Buffer compression          | ✓     | ✓ (3) | ✓     |            |       | ✓     | ✓     |
+| Buffer compression          | ✓     | ✓ (3) | ✓     |            | ✓ (4) | ✓     | ✓     |
 +-----------------------------+-------+-------+-------+------------+-------+-------+-------+
 | Endianness conversion       | ✓ (2) |       | ✓ (2) |            |       |       |       |
 +-----------------------------+-------+-------+-------+------------+-------+-------+-------+
@@ -143,6 +143,8 @@ Notes:

* \(3) LZ4 Codec currently is quite inefficient. ARROW-11901 tracks improving performance.

* \(4) Compression when writing is not supported, only decompression when reading.

.. seealso::
The :ref:`format-ipc` specification.
