Skip to content

Commit

Permalink
GH-41640: [Go] Implement BYTE_STREAM_SPLIT Parquet Encoding (#43066)
Browse files Browse the repository at this point in the history
### Rationale for this change

This encoding is defined by the [Parquet spec](https://github.com/apache/parquet-format/blob/master/Encodings.md#byte-stream-split-byte_stream_split--9) but does not currently have a Go implementation.

### What changes are included in this PR?

Implement BYTE_STREAM_SPLIT encoder/decoder for:
- FIXED_LEN_BYTE_ARRAY
- FLOAT
- DOUBLE
- INT32
- INT64

### Are these changes tested?

Yes. See unit tests, file read conformance tests, and benchmarks.

**Benchmark results on my machine**
```
➜  go git:(impl-pq-bytestreamsplit) go test ./parquet/internal/encoding -run=^$ -bench=BenchmarkByteStreamSplit -benchmem 
goos: darwin
goarch: arm64
pkg: github.com/apache/arrow/go/v17/parquet/internal/encoding
BenchmarkByteStreamSplitEncodingInt32/len_1024-14                 502117              2005 ns/op        2043.37 MB/s        5267 B/op          3 allocs/op
BenchmarkByteStreamSplitEncodingInt32/len_2048-14                 328921              3718 ns/op        2203.54 MB/s        9879 B/op          3 allocs/op
BenchmarkByteStreamSplitEncodingInt32/len_4096-14                 169642              7083 ns/op        2313.14 MB/s       18852 B/op          3 allocs/op
BenchmarkByteStreamSplitEncodingInt32/len_8192-14                  82503             14094 ns/op        2324.99 MB/s       41425 B/op          3 allocs/op
BenchmarkByteStreamSplitEncodingInt32/len_16384-14                 45006             26841 ns/op        2441.68 MB/s       74286 B/op          3 allocs/op
BenchmarkByteStreamSplitEncodingInt32/len_32768-14                 23433             51233 ns/op        2558.33 MB/s      140093 B/op          3 allocs/op
BenchmarkByteStreamSplitEncodingInt32/len_65536-14                 12019             99001 ns/op        2647.90 MB/s      271417 B/op          3 allocs/op
BenchmarkByteStreamSplitDecodingInt32/len_1024-14                 996573              1199 ns/op        3417.00 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingInt32/len_2048-14                 503200              2380 ns/op        3442.18 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingInt32/len_4096-14                 252038              4748 ns/op        3450.90 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingInt32/len_8192-14                 122419              9793 ns/op        3346.08 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingInt32/len_16384-14                 63321             19040 ns/op        3442.00 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingInt32/len_32768-14                 31051             38677 ns/op        3388.89 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingInt32/len_65536-14                 15792             77931 ns/op        3363.80 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingInt32Batched/len_1024-14                  981043              1221 ns/op        3354.53 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingInt32Batched/len_2048-14                  492319              2424 ns/op        3379.34 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingInt32Batched/len_4096-14                  248062              4850 ns/op        3378.20 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingInt32Batched/len_8192-14                  123064              9903 ns/op        3308.87 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingInt32Batched/len_16384-14                  61845             19567 ns/op        3349.29 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingInt32Batched/len_32768-14                  30568             39456 ns/op        3321.96 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingInt32Batched/len_65536-14                  15172             78762 ns/op        3328.30 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitEncodingInt64/len_1024-14                         319006              3690 ns/op        2220.13 MB/s        9880 B/op          3 allocs/op
BenchmarkByteStreamSplitEncodingInt64/len_2048-14                         161006              7132 ns/op        2297.30 MB/s       18853 B/op          3 allocs/op
BenchmarkByteStreamSplitEncodingInt64/len_4096-14                          85783             13925 ns/op        2353.12 MB/s       41421 B/op          3 allocs/op
BenchmarkByteStreamSplitEncodingInt64/len_8192-14                          45015             26943 ns/op        2432.43 MB/s       74312 B/op          3 allocs/op
BenchmarkByteStreamSplitEncodingInt64/len_16384-14                         20352             59259 ns/op        2211.84 MB/s      139940 B/op          3 allocs/op
BenchmarkByteStreamSplitEncodingInt64/len_32768-14                         10000            111143 ns/op        2358.61 MB/s      271642 B/op          3 allocs/op
BenchmarkByteStreamSplitEncodingInt64/len_65536-14                          5529            212652 ns/op        2465.47 MB/s      534805 B/op          3 allocs/op
BenchmarkByteStreamSplitDecodingInt64/len_1024-14                         528987              2355 ns/op        3478.32 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingInt64/len_2048-14                         262707              4701 ns/op        3485.08 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingInt64/len_4096-14                         129212              9313 ns/op        3518.63 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingInt64/len_8192-14                          53746             23315 ns/op        2810.90 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingInt64/len_16384-14                         28782             41054 ns/op        3192.65 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingInt64/len_32768-14                         14803             80157 ns/op        3270.39 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingInt64/len_65536-14                          7484            164111 ns/op        3194.72 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitEncodingFixedLenByteArray/len_1024-14             291716              4107 ns/op         997.43 MB/s        5276 B/op          3 allocs/op
BenchmarkByteStreamSplitEncodingFixedLenByteArray/len_2048-14             148888              7975 ns/op        1027.18 MB/s        9914 B/op          3 allocs/op
BenchmarkByteStreamSplitEncodingFixedLenByteArray/len_4096-14              76587             15677 ns/op        1045.11 MB/s       18955 B/op          3 allocs/op
BenchmarkByteStreamSplitEncodingFixedLenByteArray/len_8192-14              39758             30277 ns/op        1082.26 MB/s       41752 B/op          3 allocs/op
BenchmarkByteStreamSplitEncodingFixedLenByteArray/len_16384-14             20306             59506 ns/op        1101.33 MB/s       74937 B/op          3 allocs/op
BenchmarkByteStreamSplitEncodingFixedLenByteArray/len_32768-14             10000            116043 ns/op        1129.52 MB/s      141290 B/op          3 allocs/op
BenchmarkByteStreamSplitEncodingFixedLenByteArray/len_65536-14              4770            236887 ns/op        1106.62 MB/s      277583 B/op          3 allocs/op
BenchmarkByteStreamSplitDecodingFixedLenByteArray/len_1024-14             601875              1723 ns/op        2376.70 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingFixedLenByteArray/len_2048-14             363206              3422 ns/op        2394.18 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingFixedLenByteArray/len_4096-14             173041              6906 ns/op        2372.45 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingFixedLenByteArray/len_8192-14              81810             14307 ns/op        2290.40 MB/s           0 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingFixedLenByteArray/len_16384-14             40518             29101 ns/op        2252.04 MB/s           1 B/op          0 allocs/op
BenchmarkByteStreamSplitDecodingFixedLenByteArray/len_32768-14             21338             56678 ns/op        2312.58 MB/s           6 B/op          1 allocs/op
BenchmarkByteStreamSplitDecodingFixedLenByteArray/len_65536-14             10000            111433 ns/op        2352.49 MB/s          26 B/op          6 allocs/op
PASS
ok      github.com/apache/arrow/go/v17/parquet/internal/encoding        69.109s
```

### Are there any user-facing changes?

New ByteStreamSplit encoding option available. Godoc updated to reflect this.

* GitHub Issue: #41640

Authored-by: Joel Lubinitsky <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
joellubi authored Jul 9, 2024
1 parent 0c4d6c7 commit 89fd566
Show file tree
Hide file tree
Showing 13 changed files with 945 additions and 5 deletions.
14 changes: 12 additions & 2 deletions go/parquet/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,18 @@
// # Encodings
//
// The encoding types supported in this package are:
// Plain, Plain/RLE Dictionary, Delta Binary Packed (only integer types), Delta Byte Array
// (only ByteArray), Delta Length Byte Array (only ByteArray)
//
// - Plain
//
// - Plain/RLE Dictionary
//
// - Delta Binary Packed (only integer types)
//
// - Delta Byte Array (only ByteArray)
//
// - Delta Length Byte Array (only ByteArray)
//
// - Byte Stream Split (Float, Double, Int32, Int64, FixedLenByteArray)
//
// Tip: Some platforms don't necessarily support all kinds of encodings. If you're not
// sure what to use, just use Plain and Dictionary encoding.
Expand Down
5 changes: 2 additions & 3 deletions go/parquet/file/column_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,13 +354,12 @@ func (c *columnChunkReader) initDataDecoder(page Page, lvlByteLen int64) error {
case format.Encoding_PLAIN,
format.Encoding_DELTA_BYTE_ARRAY,
format.Encoding_DELTA_LENGTH_BYTE_ARRAY,
format.Encoding_DELTA_BINARY_PACKED:
format.Encoding_DELTA_BINARY_PACKED,
format.Encoding_BYTE_STREAM_SPLIT:
c.curDecoder = c.decoderTraits.Decoder(parquet.Encoding(encoding), c.descr, false, c.mem)
c.decoders[encoding] = c.curDecoder
case format.Encoding_RLE_DICTIONARY:
return errors.New("parquet: dictionary page must be before data page")
case format.Encoding_BYTE_STREAM_SPLIT:
return fmt.Errorf("parquet: unsupported data encoding %s", encoding)
default:
return fmt.Errorf("parquet: unknown encoding type %s", encoding)
}
Expand Down
9 changes: 9 additions & 0 deletions go/parquet/file/column_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,15 @@ func (p *PrimitiveWriterTestSuite) TestRequiredPlain() {
p.testRequiredWithEncoding(parquet.Encodings.Plain)
}

func (p *PrimitiveWriterTestSuite) TestRequiredByteStreamSplit() {
switch p.Typ {
case reflect.TypeOf(float32(0)), reflect.TypeOf(float64(0)), reflect.TypeOf(int32(0)), reflect.TypeOf(int64(0)), reflect.TypeOf(parquet.FixedLenByteArray{}):
p.testRequiredWithEncoding(parquet.Encodings.ByteStreamSplit)
default:
p.Panics(func() { p.testRequiredWithEncoding(parquet.Encodings.ByteStreamSplit) })
}
}

func (p *PrimitiveWriterTestSuite) TestRequiredDictionary() {
p.testRequiredWithEncoding(parquet.Encodings.PlainDict)
}
Expand Down
136 changes: 136 additions & 0 deletions go/parquet/file/file_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"crypto/rand"
"encoding/binary"
"fmt"
"io"
"os"
"path"
Expand Down Expand Up @@ -446,3 +447,138 @@ func TestRleBooleanEncodingFileRead(t *testing.T) {

assert.Equal(t, expected, values[:len(expected)])
}

func TestByteStreamSplitEncodingFileRead(t *testing.T) {
dir := os.Getenv("PARQUET_TEST_DATA")
if dir == "" {
t.Skip("no path supplied with PARQUET_TEST_DATA")
}
require.DirExists(t, dir)

props := parquet.NewReaderProperties(memory.DefaultAllocator)
fileReader, err := file.OpenParquetFile(path.Join(dir, "byte_stream_split_extended.gzip.parquet"),
false, file.WithReadProps(props))
require.NoError(t, err)
defer fileReader.Close()

nRows := 200
nCols := 14
require.Equal(t, 1, fileReader.NumRowGroups())
rgr := fileReader.RowGroup(0)
require.EqualValues(t, nRows, rgr.NumRows())
require.EqualValues(t, nCols, rgr.NumColumns())

// Helper to unpack values from column of a specific type
getValues := func(rdr file.ColumnChunkReader, typ parquet.Type) any {
var (
vals any
total int64
read int
err error
)

switch typ {
case parquet.Types.FixedLenByteArray:
r, ok := rdr.(*file.FixedLenByteArrayColumnChunkReader)
require.True(t, ok)

values := make([]parquet.FixedLenByteArray, nRows)
total, read, err = r.ReadBatch(int64(nRows), values, nil, nil)
vals = values
case parquet.Types.Float:
r, ok := rdr.(*file.Float32ColumnChunkReader)
require.True(t, ok)

values := make([]float32, nRows)
total, read, err = r.ReadBatch(int64(nRows), values, nil, nil)
vals = values
case parquet.Types.Double:
r, ok := rdr.(*file.Float64ColumnChunkReader)
require.True(t, ok)

values := make([]float64, nRows)
total, read, err = r.ReadBatch(int64(nRows), values, nil, nil)
vals = values
case parquet.Types.Int32:
r, ok := rdr.(*file.Int32ColumnChunkReader)
require.True(t, ok)

values := make([]int32, nRows)
total, read, err = r.ReadBatch(int64(nRows), values, nil, nil)
vals = values
case parquet.Types.Int64:
r, ok := rdr.(*file.Int64ColumnChunkReader)
require.True(t, ok)

values := make([]int64, nRows)
total, read, err = r.ReadBatch(int64(nRows), values, nil, nil)
vals = values
default:
t.Fatalf("unrecognized parquet type: %s", typ)
}

require.NoError(t, err)
require.EqualValues(t, nRows, total)
require.EqualValues(t, nRows, read)

return vals
}

// Test conformance against Parquet reference
// Expected structure: https://github.com/apache/parquet-testing/blob/1bf4bd39df2135d132451c281754268f03dc1c0e/data/README.md?plain=1#L358
for i, tc := range []struct {
PhysicalType parquet.Type
LogicalType schema.LogicalType
}{
{
PhysicalType: parquet.Types.FixedLenByteArray,
LogicalType: schema.Float16LogicalType{},
},
{
PhysicalType: parquet.Types.Float,
LogicalType: schema.NoLogicalType{},
},
{
PhysicalType: parquet.Types.Double,
LogicalType: schema.NoLogicalType{},
},
{
PhysicalType: parquet.Types.Int32,
LogicalType: schema.NoLogicalType{},
},
{
PhysicalType: parquet.Types.Int64,
LogicalType: schema.NoLogicalType{},
},
{
PhysicalType: parquet.Types.FixedLenByteArray,
LogicalType: schema.NoLogicalType{},
},
{
PhysicalType: parquet.Types.FixedLenByteArray,
LogicalType: schema.NewDecimalLogicalType(7, 3),
},
} {
t.Run(fmt.Sprintf("(Physical:%s/Logical:%s)", tc.PhysicalType, tc.LogicalType), func(t *testing.T) {
// Iterate through pairs of adjacent columns
colIdx := 2 * i

// Read Plain-encoded column
rdrPlain, err := rgr.Column(colIdx)
require.NoError(t, err)

// Read ByteStreamSplit-encoded column
rdrByteStreamSplit, err := rgr.Column(colIdx + 1)
require.NoError(t, err)

// Logical types match
require.True(t, rdrPlain.Descriptor().LogicalType().Equals(tc.LogicalType))
require.True(t, rdrByteStreamSplit.Descriptor().LogicalType().Equals(tc.LogicalType))

// Decoded values match
valuesPlain := getValues(rdrPlain, tc.PhysicalType)
valuesByteStreamSplit := getValues(rdrByteStreamSplit, tc.PhysicalType)
require.Equal(t, valuesPlain, valuesByteStreamSplit)
})
}
}
76 changes: 76 additions & 0 deletions go/parquet/file/file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,3 +464,79 @@ func TestCloseError(t *testing.T) {
writer := file.NewParquetWriter(sink, sc)
assert.Error(t, writer.Close())
}

func TestBatchedByteStreamSplitFileRoundtrip(t *testing.T) {
input := []parquet.FixedLenByteArray{
{1, 2},
{3, 4},
{5, 6},
{7, 8},
}

size := len(input)
chunk := size / 2

props := parquet.NewWriterProperties(
parquet.WithEncoding(parquet.Encodings.ByteStreamSplit),
parquet.WithDictionaryDefault(false),
parquet.WithBatchSize(int64(chunk)),
parquet.WithDataPageSize(int64(size)*2),
)

field, err := schema.NewPrimitiveNodeLogical("f16", parquet.Repetitions.Required, schema.Float16LogicalType{}, parquet.Types.FixedLenByteArray, 2, 1)
require.NoError(t, err)

schema, err := schema.NewGroupNode("test", parquet.Repetitions.Required, schema.FieldList{field}, 0)
require.NoError(t, err)

sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
writer := file.NewParquetWriter(sink, schema, file.WithWriterProps(props))

rgw := writer.AppendRowGroup()
cw, err := rgw.NextColumn()
require.NoError(t, err)

f16ColumnWriter, ok := cw.(*file.FixedLenByteArrayColumnChunkWriter)
require.True(t, ok)

nVals, err := f16ColumnWriter.WriteBatch(input[:chunk], nil, nil)
require.NoError(t, err)
require.EqualValues(t, chunk, nVals)

nVals, err = f16ColumnWriter.WriteBatch(input[chunk:], nil, nil)
require.NoError(t, err)
require.EqualValues(t, chunk, nVals)

require.NoError(t, cw.Close())
require.NoError(t, rgw.Close())
require.NoError(t, writer.Close())

rdr, err := file.NewParquetReader(bytes.NewReader(sink.Bytes()))
require.NoError(t, err)

require.Equal(t, 1, rdr.NumRowGroups())
require.EqualValues(t, size, rdr.NumRows())

rgr := rdr.RowGroup(0)
cr, err := rgr.Column(0)
require.NoError(t, err)

f16ColumnReader, ok := cr.(*file.FixedLenByteArrayColumnChunkReader)
require.True(t, ok)

output := make([]parquet.FixedLenByteArray, size)

total, valuesRead, err := f16ColumnReader.ReadBatch(int64(chunk), output[:chunk], nil, nil)
require.NoError(t, err)
require.EqualValues(t, chunk, total)
require.EqualValues(t, chunk, valuesRead)

total, valuesRead, err = f16ColumnReader.ReadBatch(int64(chunk), output[chunk:], nil, nil)
require.NoError(t, err)
require.EqualValues(t, chunk, total)
require.EqualValues(t, chunk, valuesRead)

require.Equal(t, input, output)

require.NoError(t, rdr.Close())
}
Loading

0 comments on commit 89fd566

Please sign in to comment.