diff --git a/go/parquet/internal/encoding/levels.go b/go/parquet/internal/encoding/levels.go index caf832059334b..2a6dc24933714 100644 --- a/go/parquet/internal/encoding/levels.go +++ b/go/parquet/internal/encoding/levels.go @@ -19,6 +19,7 @@ package encoding import ( "bytes" "encoding/binary" + "errors" "fmt" "math/bits" @@ -28,7 +29,6 @@ import ( "github.com/apache/arrow/go/v15/parquet" format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet" "github.com/apache/arrow/go/v15/parquet/internal/utils" - "golang.org/x/xerrors" ) // LevelEncoder is for handling the encoding of Definition and Repetition levels @@ -194,12 +194,12 @@ func (l *LevelDecoder) SetData(encoding parquet.Encoding, maxLvl int16, nbuffere switch encoding { case parquet.Encodings.RLE: if len(data) < 4 { - return 0, xerrors.New("parquet: received invalid levels (corrupt data page?)") + return 0, errors.New("parquet: received invalid levels (corrupt data page?)") } nbytes := int32(binary.LittleEndian.Uint32(data[:4])) if nbytes < 0 || nbytes > int32(len(data)-4) { - return 0, xerrors.New("parquet: received invalid number of bytes (corrupt data page?)") + return 0, errors.New("parquet: received invalid number of bytes (corrupt data page?)") } buf := data[4:] @@ -212,12 +212,12 @@ func (l *LevelDecoder) SetData(encoding parquet.Encoding, maxLvl int16, nbuffere case parquet.Encodings.BitPacked: nbits, ok := overflow.Mul(nbuffered, l.bitWidth) if !ok { - return 0, xerrors.New("parquet: number of buffered values too large (corrupt data page?)") + return 0, errors.New("parquet: number of buffered values too large (corrupt data page?)") } nbytes := bitutil.BytesForBits(int64(nbits)) if nbytes < 0 || nbytes > int64(len(data)) { - return 0, xerrors.New("parquet: recieved invalid number of bytes (corrupt data page?)") + return 0, errors.New("parquet: received invalid number of bytes (corrupt data page?)") } if l.bit == nil { l.bit = utils.NewBitReader(bytes.NewReader(data)) @@ -234,7 +234,7 @@ func (l *LevelDecoder) SetData(encoding parquet.Encoding, maxLvl int16, nbuffere // run length encoding. func (l *LevelDecoder) SetDataV2(nbytes int32, maxLvl int16, nbuffered int, data []byte) error { if nbytes < 0 { - return xerrors.New("parquet: invalid page header (corrupt data page?)") + return errors.New("parquet: invalid page header (corrupt data page?)") } l.maxLvl = maxLvl diff --git a/go/parquet/pqarrow/encode_arrow.go b/go/parquet/pqarrow/encode_arrow.go index 1855d3625adb7..4989837cd03bc 100644 --- a/go/parquet/pqarrow/encode_arrow.go +++ b/go/parquet/pqarrow/encode_arrow.go @@ -81,7 +81,7 @@ type ArrowColumnWriter struct { // // Using an arrow column writer is a convenience to avoid having to process the arrow array yourself // and determine the correct definition and repetition levels manually. -func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, col int) (ArrowColumnWriter, error) { +func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, leafColIdx int) (ArrowColumnWriter, error) { if data.Len() == 0 { return ArrowColumnWriter{leafCount: calcLeafCount(data.DataType()), rgw: rgw}, nil } @@ -118,7 +118,7 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch // which is the one this instance will start writing for // colIdx := rgw.CurrentColumn() + 1 - schemaField, err := manifest.GetColumnField(col) + schemaField, err := manifest.GetColumnField(leafColIdx) if err != nil { return ArrowColumnWriter{}, err } @@ -153,7 +153,11 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch values += chunkWriteSize } - return ArrowColumnWriter{builders: builders, leafCount: leafCount, rgw: rgw, colIdx: col}, nil + return ArrowColumnWriter{builders: builders, leafCount: leafCount, rgw: rgw, colIdx: leafColIdx}, nil +} + +func (acw *ArrowColumnWriter) LeafCount() int { + return acw.leafCount } func (acw *ArrowColumnWriter) Write(ctx context.Context) error { diff --git a/go/parquet/pqarrow/encode_arrow_test.go b/go/parquet/pqarrow/encode_arrow_test.go index d588aff701f3d..712a003c63ad6 100644 --- a/go/parquet/pqarrow/encode_arrow_test.go +++ b/go/parquet/pqarrow/encode_arrow_test.go @@ -145,10 +145,12 @@ func TestWriteArrowCols(t *testing.T) { srgw := writer.AppendRowGroup() ctx := pqarrow.NewArrowWriteContext(context.TODO(), nil) + colIdx := 0 for i := int64(0); i < tbl.NumCols(); i++ { - acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, int(i)) + acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, colIdx) require.NoError(t, err) require.NoError(t, acw.Write(ctx)) + colIdx = colIdx + acw.LeafCount() } require.NoError(t, srgw.Close()) require.NoError(t, writer.Close()) @@ -249,10 +251,12 @@ func TestWriteArrowInt96(t *testing.T) { srgw := writer.AppendRowGroup() ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props) + colIdx := 0 for i := int64(0); i < tbl.NumCols(); i++ { - acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, int(i)) + acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, colIdx) require.NoError(t, err) require.NoError(t, acw.Write(ctx)) + colIdx += acw.LeafCount() } require.NoError(t, srgw.Close()) require.NoError(t, writer.Close()) @@ -306,11 +310,13 @@ func writeTableToBuffer(t *testing.T, mem memory.Allocator, tbl arrow.Table, row for offset < tbl.NumRows() { sz := utils.Min(rowGroupSize, tbl.NumRows()-offset) srgw := writer.AppendRowGroup() + colIdx := 0 for i := 0; i < int(tbl.NumCols()); i++ { col := tbl.Column(i) - acw, err := pqarrow.NewArrowColumnWriter(col.Data(), offset, sz, manifest, srgw, i) + acw, err := pqarrow.NewArrowColumnWriter(col.Data(), offset, sz, manifest, srgw, colIdx) require.NoError(t, err) require.NoError(t, acw.Write(ctx)) + colIdx = colIdx + acw.LeafCount() } srgw.Close() offset += sz diff --git a/go/parquet/pqarrow/path_builder.go b/go/parquet/pqarrow/path_builder.go index 0f1158bd1e9f0..57a077956edea 100644 --- a/go/parquet/pqarrow/path_builder.go +++ b/go/parquet/pqarrow/path_builder.go @@ -206,7 +206,7 @@ func (n *listNode) fillForLast(rng, childRng *elemRange, ctx *pathWriteCtx) iter fillRepLevels(int(childRng.size()), n.repLevel, ctx) // once we've reached this point the following preconditions should hold: // 1. there are no more repeated path nodes to deal with - // 2. all elements in |range| reperesent contiguous elements in the child + // 2. all elements in |range| represent contiguous elements in the child // array (null values would have shortened the range to ensure all // remaining list elements are present, though they may be empty) // 3. no element of range spans a parent list (intermediate list nodes @@ -225,7 +225,7 @@ func (n *listNode) fillForLast(rng, childRng *elemRange, ctx *pathWriteCtx) iter // this is the start of a new list. we can be sure that it only applies to the // previous list (and doesn't jump to the start of any list further up in nesting - // due to the contraints mentioned earlier) + // due to the constraints mentioned earlier) ctx.AppendRepLevel(n.prevRepLevel) ctx.AppendRepLevels(int(sizeCheck.size())-1, n.repLevel) childRng.end = sizeCheck.end