From dfdebdd9199e51b92ed372220b7f33b1aab2d37b Mon Sep 17 00:00:00 2001
From: mwish
Date: Wed, 15 Nov 2023 23:38:34 +0800
Subject: [PATCH] GH-38503: [Go][Parquet] Style improvement for using ArrowColumnWriter (#38581)

### Rationale for this change

Currently `ArrowColumnWriter` does not appear to have a bug, but its usage is confusing. For nested types, `ArrowColumnWriter` has to account for the mapping below:

```
/// 0 foo.bar
///       foo.bar.baz       0
///       foo.bar.baz2      1
///     foo.qux             2
/// 1 foo2                  3
/// 2 foo3                  4
```

The numbers on the left are the indices of the columns at the root of the `arrow::Schema`; Parquet itself only stores leaf nodes, so the Parquet column ids are the ones listed on the right.

In `ArrowColumnWriter`, the final argument is the Parquet leaf index, so callers should pass the leaf column index (`leafColIdx`) rather than the root column index. A `LeafCount` API is also needed so callers can advance that index by the number of leaves a column occupies (a usage sketch appears after the diff below).

### What changes are included in this PR?

Style enhancements: rename the argument to `leafColIdx`, add a `LeafCount` accessor, and update the usage of `ArrowColumnWriter` accordingly.

### Are these changes tested?

No.

### Are there any user-facing changes?

No.

* Closes: #38503

Authored-by: mwish
Signed-off-by: Matt Topol
---
 go/parquet/internal/encoding/levels.go  | 12 ++++++------
 go/parquet/pqarrow/encode_arrow.go      | 10 +++++++---
 go/parquet/pqarrow/encode_arrow_test.go | 12 +++++++++---
 go/parquet/pqarrow/path_builder.go      |  4 ++--
 4 files changed, 24 insertions(+), 14 deletions(-)

diff --git a/go/parquet/internal/encoding/levels.go b/go/parquet/internal/encoding/levels.go
index caf832059334b..2a6dc24933714 100644
--- a/go/parquet/internal/encoding/levels.go
+++ b/go/parquet/internal/encoding/levels.go
@@ -19,6 +19,7 @@ package encoding
 import (
 	"bytes"
 	"encoding/binary"
+	"errors"
 	"fmt"
 	"math/bits"
 
@@ -28,7 +29,6 @@ import (
 	"github.com/apache/arrow/go/v15/parquet"
 	format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet"
 	"github.com/apache/arrow/go/v15/parquet/internal/utils"
-	"golang.org/x/xerrors"
 )
 
 // LevelEncoder is for handling the encoding of Definition and Repetition levels
@@ -194,12 +194,12 @@ func (l *LevelDecoder) SetData(encoding parquet.Encoding, maxLvl int16, nbuffere
 	switch encoding {
 	case parquet.Encodings.RLE:
 		if len(data) < 4 {
-			return 0, xerrors.New("parquet: received invalid levels (corrupt data page?)")
+			return 0, errors.New("parquet: received invalid levels (corrupt data page?)")
 		}
 
 		nbytes := int32(binary.LittleEndian.Uint32(data[:4]))
 		if nbytes < 0 || nbytes > int32(len(data)-4) {
-			return 0, xerrors.New("parquet: received invalid number of bytes (corrupt data page?)")
+			return 0, errors.New("parquet: received invalid number of bytes (corrupt data page?)")
 		}
 
 		buf := data[4:]
@@ -212,12 +212,12 @@ func (l *LevelDecoder) SetData(encoding parquet.Encoding, maxLvl int16, nbuffere
 	case parquet.Encodings.BitPacked:
 		nbits, ok := overflow.Mul(nbuffered, l.bitWidth)
 		if !ok {
-			return 0, xerrors.New("parquet: number of buffered values too large (corrupt data page?)")
+			return 0, errors.New("parquet: number of buffered values too large (corrupt data page?)")
 		}
 
 		nbytes := bitutil.BytesForBits(int64(nbits))
 		if nbytes < 0 || nbytes > int64(len(data)) {
-			return 0, xerrors.New("parquet: recieved invalid number of bytes (corrupt data page?)")
+			return 0, errors.New("parquet: received invalid number of bytes (corrupt data page?)")
 		}
 		if l.bit == nil {
 			l.bit = utils.NewBitReader(bytes.NewReader(data))
@@ -234,7 +234,7 @@ func (l *LevelDecoder) SetData(encoding parquet.Encoding, maxLvl int16, nbuffere
 // run length encoding.
 func (l *LevelDecoder) SetDataV2(nbytes int32, maxLvl int16, nbuffered int, data []byte) error {
 	if nbytes < 0 {
-		return xerrors.New("parquet: invalid page header (corrupt data page?)")
+		return errors.New("parquet: invalid page header (corrupt data page?)")
 	}
 
 	l.maxLvl = maxLvl
diff --git a/go/parquet/pqarrow/encode_arrow.go b/go/parquet/pqarrow/encode_arrow.go
index 1855d3625adb7..4989837cd03bc 100644
--- a/go/parquet/pqarrow/encode_arrow.go
+++ b/go/parquet/pqarrow/encode_arrow.go
@@ -81,7 +81,7 @@ type ArrowColumnWriter struct {
 //
 // Using an arrow column writer is a convenience to avoid having to process the arrow array yourself
 // and determine the correct definition and repetition levels manually.
-func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, col int) (ArrowColumnWriter, error) {
+func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, leafColIdx int) (ArrowColumnWriter, error) {
 	if data.Len() == 0 {
 		return ArrowColumnWriter{leafCount: calcLeafCount(data.DataType()), rgw: rgw}, nil
 	}
@@ -118,7 +118,7 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch
 	// which is the one this instance will start writing for
 	// colIdx := rgw.CurrentColumn() + 1
 
-	schemaField, err := manifest.GetColumnField(col)
+	schemaField, err := manifest.GetColumnField(leafColIdx)
 	if err != nil {
 		return ArrowColumnWriter{}, err
 	}
@@ -153,7 +153,11 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch
 		values += chunkWriteSize
 	}
 
-	return ArrowColumnWriter{builders: builders, leafCount: leafCount, rgw: rgw, colIdx: col}, nil
+	return ArrowColumnWriter{builders: builders, leafCount: leafCount, rgw: rgw, colIdx: leafColIdx}, nil
+}
+
+func (acw *ArrowColumnWriter) LeafCount() int {
+	return acw.leafCount
 }
 
 func (acw *ArrowColumnWriter) Write(ctx context.Context) error {
diff --git a/go/parquet/pqarrow/encode_arrow_test.go b/go/parquet/pqarrow/encode_arrow_test.go
index d588aff701f3d..712a003c63ad6 100644
--- a/go/parquet/pqarrow/encode_arrow_test.go
+++ b/go/parquet/pqarrow/encode_arrow_test.go
@@ -145,10 +145,12 @@ func TestWriteArrowCols(t *testing.T) {
 	srgw := writer.AppendRowGroup()
 	ctx := pqarrow.NewArrowWriteContext(context.TODO(), nil)
 
+	colIdx := 0
 	for i := int64(0); i < tbl.NumCols(); i++ {
-		acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, int(i))
+		acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, colIdx)
 		require.NoError(t, err)
 		require.NoError(t, acw.Write(ctx))
+		colIdx = colIdx + acw.LeafCount()
 	}
 	require.NoError(t, srgw.Close())
 	require.NoError(t, writer.Close())
@@ -249,10 +251,12 @@ func TestWriteArrowInt96(t *testing.T) {
 	srgw := writer.AppendRowGroup()
 	ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props)
 
+	colIdx := 0
 	for i := int64(0); i < tbl.NumCols(); i++ {
-		acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, int(i))
+		acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, colIdx)
 		require.NoError(t, err)
 		require.NoError(t, acw.Write(ctx))
+		colIdx += acw.LeafCount()
 	}
 	require.NoError(t, srgw.Close())
 	require.NoError(t, writer.Close())
@@ -306,11 +310,13 @@ func writeTableToBuffer(t *testing.T, mem memory.Allocator, tbl arrow.Table, row
 	for offset < tbl.NumRows() {
 		sz := utils.Min(rowGroupSize, tbl.NumRows()-offset)
 		srgw := writer.AppendRowGroup()
+		colIdx := 0
 		for i := 0; i < int(tbl.NumCols()); i++ {
 			col := tbl.Column(i)
-			acw, err := pqarrow.NewArrowColumnWriter(col.Data(), offset, sz, manifest, srgw, i)
+			acw, err := pqarrow.NewArrowColumnWriter(col.Data(), offset, sz, manifest, srgw, colIdx)
 			require.NoError(t, err)
 			require.NoError(t, acw.Write(ctx))
+			colIdx = colIdx + acw.LeafCount()
 		}
 		srgw.Close()
 		offset += sz
diff --git a/go/parquet/pqarrow/path_builder.go b/go/parquet/pqarrow/path_builder.go
index 0f1158bd1e9f0..57a077956edea 100644
--- a/go/parquet/pqarrow/path_builder.go
+++ b/go/parquet/pqarrow/path_builder.go
@@ -206,7 +206,7 @@ func (n *listNode) fillForLast(rng, childRng *elemRange, ctx *pathWriteCtx) iter
 	fillRepLevels(int(childRng.size()), n.repLevel, ctx)
 	// once we've reached this point the following preconditions should hold:
 	// 1. there are no more repeated path nodes to deal with
-	// 2. all elements in |range| reperesent contiguous elements in the child
+	// 2. all elements in |range| represent contiguous elements in the child
 	//    array (null values would have shortened the range to ensure all
 	//    remaining list elements are present, though they may be empty)
 	// 3. no element of range spans a parent list (intermediate list nodes
@@ -225,7 +225,7 @@ func (n *listNode) fillForLast(rng, childRng *elemRange, ctx *pathWriteCtx) iter
 
 	// this is the start of a new list. we can be sure that it only applies to the
 	// previous list (and doesn't jump to the start of any list further up in nesting
-	// due to the contraints mentioned earlier)
+	// due to the constraints mentioned earlier)
 	ctx.AppendRepLevel(n.prevRepLevel)
 	ctx.AppendRepLevels(int(sizeCheck.size())-1, n.repLevel)
 	childRng.end = sizeCheck.end
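
Usage sketch (not part of the patch): a minimal example of the caller pattern the new API is aimed at, mirroring the updated tests. The helper name `writeRowGroup` and its parameters are hypothetical; it assumes the caller has already built a `pqarrow.SchemaManifest` and opened a `file.RowGroupWriter` for the current row group, and that the caller closes the row group and the file writer itself.

```go
package example

import (
	"context"

	"github.com/apache/arrow/go/v15/arrow"
	"github.com/apache/arrow/go/v15/parquet/file"
	"github.com/apache/arrow/go/v15/parquet/pqarrow"
)

// writeRowGroup writes every root column of tbl into an already-open row
// group. colIdx tracks the Parquet leaf column index and is advanced by
// LeafCount() after each column so that nested columns stay aligned with
// their leaves.
func writeRowGroup(ctx context.Context, tbl arrow.Table, manifest *pqarrow.SchemaManifest, srgw file.RowGroupWriter) error {
	// Attach default Arrow write properties to the context, as the tests do.
	wrCtx := pqarrow.NewArrowWriteContext(ctx, nil)

	colIdx := 0 // Parquet leaf index, not the Arrow root-column index
	for i := 0; i < int(tbl.NumCols()); i++ {
		acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(i).Data(), 0, tbl.NumRows(), manifest, srgw, colIdx)
		if err != nil {
			return err
		}
		if err := acw.Write(wrCtx); err != nil {
			return err
		}
		colIdx += acw.LeafCount() // a nested root column may span several leaf columns
	}
	return nil
}
```

For a flat schema, `colIdx` and `i` coincide, which is why passing the root index used to work in the tests; with nested columns the two indices diverge, and `LeafCount()` keeps them from drifting apart.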