Skip to content

Commit

Permalink
GH-38503: [Go][Parquet] Style improvement for using ArrowColumnWriter (
Browse files Browse the repository at this point in the history
…#38581)

### Rationale for this change

Currently, `ArrowColumnWriter` does not seem to have a bug, but its usage is confusing. For nested types, `ArrowColumnWriter` should follow the logic below:

```
  /// 0 foo.bar
  ///       foo.bar.baz           0
  ///       foo.bar.baz2          1
  ///   foo.qux                   2
  /// 1 foo2                      3
  /// 2 foo3                      4
```

The left column shows the columns at the root of the `arrow::Schema`; Parquet itself only stores leaf nodes,
so the column indices for Parquet are listed on the right.

In `ArrowColumnWriter`, the final argument is the leaf index in Parquet, so the writer should
use `leafIdx`. It also needs a `LeafCount` API for retrieving the leaf count.

### What changes are included in this PR?

Style enhancements for `LeafCount` and `leafIdx`, and improved usage of `ArrowColumnWriter`.

### Are these changes tested?

no

### Are there any user-facing changes?

no

* Closes: #38503

Authored-by: mwish <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
mapleFU authored Nov 15, 2023
1 parent d076c69 commit dfdebdd
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 14 deletions.
12 changes: 6 additions & 6 deletions go/parquet/internal/encoding/levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package encoding
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"math/bits"

Expand All @@ -28,7 +29,6 @@ import (
"github.com/apache/arrow/go/v15/parquet"
format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet"
"github.com/apache/arrow/go/v15/parquet/internal/utils"
"golang.org/x/xerrors"
)

// LevelEncoder is for handling the encoding of Definition and Repetition levels
Expand Down Expand Up @@ -194,12 +194,12 @@ func (l *LevelDecoder) SetData(encoding parquet.Encoding, maxLvl int16, nbuffere
switch encoding {
case parquet.Encodings.RLE:
if len(data) < 4 {
return 0, xerrors.New("parquet: received invalid levels (corrupt data page?)")
return 0, errors.New("parquet: received invalid levels (corrupt data page?)")
}

nbytes := int32(binary.LittleEndian.Uint32(data[:4]))
if nbytes < 0 || nbytes > int32(len(data)-4) {
return 0, xerrors.New("parquet: received invalid number of bytes (corrupt data page?)")
return 0, errors.New("parquet: received invalid number of bytes (corrupt data page?)")
}

buf := data[4:]
Expand All @@ -212,12 +212,12 @@ func (l *LevelDecoder) SetData(encoding parquet.Encoding, maxLvl int16, nbuffere
case parquet.Encodings.BitPacked:
nbits, ok := overflow.Mul(nbuffered, l.bitWidth)
if !ok {
return 0, xerrors.New("parquet: number of buffered values too large (corrupt data page?)")
return 0, errors.New("parquet: number of buffered values too large (corrupt data page?)")
}

nbytes := bitutil.BytesForBits(int64(nbits))
if nbytes < 0 || nbytes > int64(len(data)) {
return 0, xerrors.New("parquet: recieved invalid number of bytes (corrupt data page?)")
return 0, errors.New("parquet: received invalid number of bytes (corrupt data page?)")
}
if l.bit == nil {
l.bit = utils.NewBitReader(bytes.NewReader(data))
Expand All @@ -234,7 +234,7 @@ func (l *LevelDecoder) SetData(encoding parquet.Encoding, maxLvl int16, nbuffere
// run length encoding.
func (l *LevelDecoder) SetDataV2(nbytes int32, maxLvl int16, nbuffered int, data []byte) error {
if nbytes < 0 {
return xerrors.New("parquet: invalid page header (corrupt data page?)")
return errors.New("parquet: invalid page header (corrupt data page?)")
}

l.maxLvl = maxLvl
Expand Down
10 changes: 7 additions & 3 deletions go/parquet/pqarrow/encode_arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type ArrowColumnWriter struct {
//
// Using an arrow column writer is a convenience to avoid having to process the arrow array yourself
// and determine the correct definition and repetition levels manually.
func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, col int) (ArrowColumnWriter, error) {
func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, leafColIdx int) (ArrowColumnWriter, error) {
if data.Len() == 0 {
return ArrowColumnWriter{leafCount: calcLeafCount(data.DataType()), rgw: rgw}, nil
}
Expand Down Expand Up @@ -118,7 +118,7 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch
// which is the one this instance will start writing for
// colIdx := rgw.CurrentColumn() + 1

schemaField, err := manifest.GetColumnField(col)
schemaField, err := manifest.GetColumnField(leafColIdx)
if err != nil {
return ArrowColumnWriter{}, err
}
Expand Down Expand Up @@ -153,7 +153,11 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch
values += chunkWriteSize
}

return ArrowColumnWriter{builders: builders, leafCount: leafCount, rgw: rgw, colIdx: col}, nil
return ArrowColumnWriter{builders: builders, leafCount: leafCount, rgw: rgw, colIdx: leafColIdx}, nil
}

func (acw *ArrowColumnWriter) LeafCount() int {
return acw.leafCount
}

func (acw *ArrowColumnWriter) Write(ctx context.Context) error {
Expand Down
12 changes: 9 additions & 3 deletions go/parquet/pqarrow/encode_arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,12 @@ func TestWriteArrowCols(t *testing.T) {
srgw := writer.AppendRowGroup()
ctx := pqarrow.NewArrowWriteContext(context.TODO(), nil)

colIdx := 0
for i := int64(0); i < tbl.NumCols(); i++ {
acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, int(i))
acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, colIdx)
require.NoError(t, err)
require.NoError(t, acw.Write(ctx))
colIdx = colIdx + acw.LeafCount()
}
require.NoError(t, srgw.Close())
require.NoError(t, writer.Close())
Expand Down Expand Up @@ -249,10 +251,12 @@ func TestWriteArrowInt96(t *testing.T) {
srgw := writer.AppendRowGroup()
ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props)

colIdx := 0
for i := int64(0); i < tbl.NumCols(); i++ {
acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, int(i))
acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, colIdx)
require.NoError(t, err)
require.NoError(t, acw.Write(ctx))
colIdx += acw.LeafCount()
}
require.NoError(t, srgw.Close())
require.NoError(t, writer.Close())
Expand Down Expand Up @@ -306,11 +310,13 @@ func writeTableToBuffer(t *testing.T, mem memory.Allocator, tbl arrow.Table, row
for offset < tbl.NumRows() {
sz := utils.Min(rowGroupSize, tbl.NumRows()-offset)
srgw := writer.AppendRowGroup()
colIdx := 0
for i := 0; i < int(tbl.NumCols()); i++ {
col := tbl.Column(i)
acw, err := pqarrow.NewArrowColumnWriter(col.Data(), offset, sz, manifest, srgw, i)
acw, err := pqarrow.NewArrowColumnWriter(col.Data(), offset, sz, manifest, srgw, colIdx)
require.NoError(t, err)
require.NoError(t, acw.Write(ctx))
colIdx = colIdx + acw.LeafCount()
}
srgw.Close()
offset += sz
Expand Down
4 changes: 2 additions & 2 deletions go/parquet/pqarrow/path_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (n *listNode) fillForLast(rng, childRng *elemRange, ctx *pathWriteCtx) iter
fillRepLevels(int(childRng.size()), n.repLevel, ctx)
// once we've reached this point the following preconditions should hold:
// 1. there are no more repeated path nodes to deal with
// 2. all elements in |range| reperesent contiguous elements in the child
// 2. all elements in |range| represent contiguous elements in the child
// array (null values would have shortened the range to ensure all
// remaining list elements are present, though they may be empty)
// 3. no element of range spans a parent list (intermediate list nodes
Expand All @@ -225,7 +225,7 @@ func (n *listNode) fillForLast(rng, childRng *elemRange, ctx *pathWriteCtx) iter

// this is the start of a new list. we can be sure that it only applies to the
// previous list (and doesn't jump to the start of any list further up in nesting
// due to the contraints mentioned earlier)
// due to the constraints mentioned earlier)
ctx.AppendRepLevel(n.prevRepLevel)
ctx.AppendRepLevels(int(sizeCheck.size())-1, n.repLevel)
childRng.end = sizeCheck.end
Expand Down

0 comments on commit dfdebdd

Please sign in to comment.