diff --git a/go/parquet/pqarrow/encode_arrow.go b/go/parquet/pqarrow/encode_arrow.go index 4989837cd03bc..8926d0ba51a07 100644 --- a/go/parquet/pqarrow/encode_arrow.go +++ b/go/parquet/pqarrow/encode_arrow.go @@ -65,25 +65,25 @@ func nullableRoot(manifest *SchemaManifest, field *SchemaField) bool { return nullable } -// ArrowColumnWriter is a convenience object for easily writing arrow data to a specific +// arrowColumnWriter is a convenience object for easily writing arrow data to a specific // set of columns in a parquet file. Since a single arrow array can itself be a nested type // consisting of multiple columns of data, this will write to all of the appropriate leaves in // the parquet file, allowing easy writing of nested columns. -type ArrowColumnWriter struct { +type arrowColumnWriter struct { builders []*multipathLevelBuilder leafCount int colIdx int rgw file.RowGroupWriter } -// NewArrowColumnWriter returns a new writer using the chunked array to determine the number of leaf columns, +// newArrowColumnWriter returns a new writer using the chunked array to determine the number of leaf columns, // and the provided schema manifest to determine the paths for writing the columns. // // Using an arrow column writer is a convenience to avoid having to process the arrow array yourself // and determine the correct definition and repetition levels manually. 
-func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, leafColIdx int) (ArrowColumnWriter, error) { +func newArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, leafColIdx int) (arrowColumnWriter, error) { if data.Len() == 0 { - return ArrowColumnWriter{leafCount: calcLeafCount(data.DataType()), rgw: rgw}, nil + return arrowColumnWriter{leafCount: calcLeafCount(data.DataType()), rgw: rgw}, nil } var ( @@ -109,7 +109,7 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch } if absPos >= int64(data.Len()) { - return ArrowColumnWriter{}, errors.New("cannot write data at offset past end of chunked array") + return arrowColumnWriter{}, errors.New("cannot write data at offset past end of chunked array") } leafCount := calcLeafCount(data.DataType()) @@ -120,7 +120,7 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch schemaField, err := manifest.GetColumnField(leafColIdx) if err != nil { - return ArrowColumnWriter{}, err + return arrowColumnWriter{}, err } isNullable = nullableRoot(manifest, schemaField) @@ -138,10 +138,10 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch if arrToWrite.Len() > 0 { bldr, err := newMultipathLevelBuilder(arrToWrite, isNullable) if err != nil { - return ArrowColumnWriter{}, nil + return arrowColumnWriter{}, err } if leafCount != bldr.leafCount() { - return ArrowColumnWriter{}, fmt.Errorf("data type leaf_count != builder leafcount: %d - %d", leafCount, bldr.leafCount()) + return arrowColumnWriter{}, fmt.Errorf("data type leaf_count != builder leafcount: %d - %d", leafCount, bldr.leafCount()) } builders = append(builders, bldr) } @@ -153,14 +153,10 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch values += chunkWriteSize } - return ArrowColumnWriter{builders: builders, leafCount: leafCount, rgw: 
rgw, colIdx: leafColIdx}, nil + return arrowColumnWriter{builders: builders, leafCount: leafCount, rgw: rgw, colIdx: leafColIdx}, nil } -func (acw *ArrowColumnWriter) LeafCount() int { - return acw.leafCount -} - -func (acw *ArrowColumnWriter) Write(ctx context.Context) error { +func (acw *arrowColumnWriter) Write(ctx context.Context) error { arrCtx := arrowCtxFromContext(ctx) for leafIdx := 0; leafIdx < acw.leafCount; leafIdx++ { var ( diff --git a/go/parquet/pqarrow/encode_arrow_test.go b/go/parquet/pqarrow/encode_arrow_test.go index 712a003c63ad6..95ea644dd8013 100644 --- a/go/parquet/pqarrow/encode_arrow_test.go +++ b/go/parquet/pqarrow/encode_arrow_test.go @@ -132,28 +132,24 @@ func TestWriteArrowCols(t *testing.T) { tbl := makeDateTimeTypesTable(mem, false, false) defer tbl.Release() - psc, err := pqarrow.ToParquet(tbl.Schema(), nil, pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))) - require.NoError(t, err) - - manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil) - require.NoError(t, err) - sink := encoding.NewBufferWriter(0, mem) defer sink.Release() - writer := file.NewParquetWriter(sink, psc.Root(), file.WithWriterProps(parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_4)))) - srgw := writer.AppendRowGroup() - ctx := pqarrow.NewArrowWriteContext(context.TODO(), nil) + fileWriter, err := pqarrow.NewFileWriter( + tbl.Schema(), + sink, + parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_4)), + pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)), + ) + require.NoError(t, err) - colIdx := 0 + fileWriter.NewRowGroup() for i := int64(0); i < tbl.NumCols(); i++ { - acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, colIdx) + colChunk := tbl.Column(int(i)).Data() + err := fileWriter.WriteColumnChunked(colChunk, 0, int64(colChunk.Len())) require.NoError(t, err) - require.NoError(t, acw.Write(ctx)) - colIdx = colIdx + acw.LeafCount() } - require.NoError(t, srgw.Close()) - 
require.NoError(t, writer.Close()) + require.NoError(t, fileWriter.Close()) expected := makeDateTimeTypesTable(mem, true, false) defer expected.Release() @@ -235,31 +231,24 @@ func TestWriteArrowInt96(t *testing.T) { tbl := makeDateTimeTypesTable(mem, false, false) defer tbl.Release() - props := pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true), pqarrow.WithAllocator(mem)) - - psc, err := pqarrow.ToParquet(tbl.Schema(), nil, props) - require.NoError(t, err) - - manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil) - require.NoError(t, err) - sink := encoding.NewBufferWriter(0, mem) defer sink.Release() - writer := file.NewParquetWriter(sink, psc.Root(), file.WithWriterProps(parquet.NewWriterProperties(parquet.WithAllocator(mem)))) - - srgw := writer.AppendRowGroup() - ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props) + fileWriter, err := pqarrow.NewFileWriter( + tbl.Schema(), + sink, + parquet.NewWriterProperties(parquet.WithAllocator(mem)), + pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true), pqarrow.WithAllocator(mem)), + ) + require.NoError(t, err) - colIdx := 0 + fileWriter.NewRowGroup() for i := int64(0); i < tbl.NumCols(); i++ { - acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, colIdx) + colChunk := tbl.Column(int(i)).Data() + err := fileWriter.WriteColumnChunked(colChunk, 0, int64(colChunk.Len())) require.NoError(t, err) - require.NoError(t, acw.Write(ctx)) - colIdx += acw.LeafCount() } - require.NoError(t, srgw.Close()) - require.NoError(t, writer.Close()) + require.NoError(t, fileWriter.Close()) expected := makeDateTimeTypesTable(mem, false, false) defer expected.Release() @@ -296,33 +285,28 @@ func TestWriteArrowInt96(t *testing.T) { func writeTableToBuffer(t *testing.T, mem memory.Allocator, tbl arrow.Table, rowGroupSize int64, props pqarrow.ArrowWriterProperties) *memory.Buffer { sink := encoding.NewBufferWriter(0, mem) defer 
sink.Release() - wrprops := parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0)) - psc, err := pqarrow.ToParquet(tbl.Schema(), wrprops, props) - require.NoError(t, err) - manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil) + fileWriter, err := pqarrow.NewFileWriter( + tbl.Schema(), + sink, + parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0)), + props, + ) require.NoError(t, err) - writer := file.NewParquetWriter(sink, psc.Root(), file.WithWriterProps(wrprops)) - ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props) - offset := int64(0) for offset < tbl.NumRows() { sz := utils.Min(rowGroupSize, tbl.NumRows()-offset) - srgw := writer.AppendRowGroup() - colIdx := 0 + fileWriter.NewRowGroup() for i := 0; i < int(tbl.NumCols()); i++ { - col := tbl.Column(i) - acw, err := pqarrow.NewArrowColumnWriter(col.Data(), offset, sz, manifest, srgw, colIdx) + colChunk := tbl.Column(i).Data() + err := fileWriter.WriteColumnChunked(colChunk, offset, sz) require.NoError(t, err) - require.NoError(t, acw.Write(ctx)) - colIdx = colIdx + acw.LeafCount() } - srgw.Close() offset += sz } - writer.Close() + require.NoError(t, fileWriter.Close()) return sink.Finish() } diff --git a/go/parquet/pqarrow/file_writer.go b/go/parquet/pqarrow/file_writer.go index 21f16c0b67938..bc484ba243f87 100644 --- a/go/parquet/pqarrow/file_writer.go +++ b/go/parquet/pqarrow/file_writer.go @@ -305,7 +305,7 @@ func (fw *FileWriter) Close() error { // building of writing columns to a file via arrow data without needing to already have // a record or table. func (fw *FileWriter) WriteColumnChunked(data *arrow.Chunked, offset, size int64) error { - acw, err := NewArrowColumnWriter(data, offset, size, fw.manifest, fw.rgw, fw.colIdx) + acw, err := newArrowColumnWriter(data, offset, size, fw.manifest, fw.rgw, fw.colIdx) if err != nil { return err }