From 198a5ad35c9ada0247cd33c3718fc1065b0e370d Mon Sep 17 00:00:00 2001
From: Jayant Shrivastava
Date: Thu, 18 May 2023 10:00:53 -0400
Subject: [PATCH] util/parquet: support tuples

This change adds support for writing tuples. Implementation details below.

The standard way to write a tuple in parquet is to use a group:

```
message schema {                 -- toplevel schema
  optional group a (LIST) {
    optional T1 element;         -- physical column for the first field
    ...
    optional Tn element;         -- physical column for the nth field
  }
}
```

Because parquet has a very strict format, it does not write such groups as one
column with all the fields adjacent to each other. Instead, it writes each field
in the tuple as its own column. This 1:N mapping from CRDB datum to physical
column in parquet violates the assumption used in this library that the mapping
is 1:1. This change updates the library to remove that assumption.

First, there is now a clear distinction between a "datum column" and a
"physical column". Also, the `Writer` is updated to be able to write to
multiple physical columns for a given datum, and the reader is updated to
"squash" physical columns into single tuple datums if needed. Finally,
randomized testing and benchmarking are extended to cover tuples.

Informs: #99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Release note: None
---
 pkg/ccl/changefeedccl/testfeed_test.go |  20 +-
 pkg/util/parquet/BUILD.bazel           |   1 +
 pkg/util/parquet/schema.go             | 188 ++++++++---
 pkg/util/parquet/testutils.go          | 432 +++++++++++++++++--------
 pkg/util/parquet/write_functions.go    |  91 +++++-
 pkg/util/parquet/writer.go             |  69 ++--
 pkg/util/parquet/writer_bench_test.go  |   8 +-
 pkg/util/parquet/writer_test.go        |  77 ++++-
 8 files changed, 662 insertions(+), 224 deletions(-)

diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go
index 6685dab5ccf6..e22164d1e5a9 100644
--- a/pkg/ccl/changefeedccl/testfeed_test.go
+++ b/pkg/ccl/changefeedccl/testfeed_test.go
@@ -1232,29 +1232,23 @@ func extractKeyFromJSONValue(isBare bool, wrapped []byte) (key []byte, value []b
 func (c *cloudFeed) appendParquetTestFeedMessages(
 	path string, topic string, envelopeType changefeedbase.EnvelopeType,
 ) (err error) {
-	meta, datums, closeReader, err := parquet.ReadFile(path)
+	meta, datums, err := parquet.ReadFile(path)
 	if err != nil {
 		return err
 	}
-	defer func() {
-		closeErr := closeReader()
-		if closeErr != nil {
-			err = errors.CombineErrors(err, closeErr)
-		}
-	}()
-	primaryKeyColumnsString := meta.KeyValueMetadata().FindValue("keyCols")
-	if primaryKeyColumnsString == nil {
+	primaryKeyColumnsString, ok := meta.MetaFields["keyCols"]
+	if !ok {
 		return errors.Errorf("could not find primary key column names in parquet metadata")
 	}
-	columnsNamesString := meta.KeyValueMetadata().FindValue("allCols")
-	if columnsNamesString == nil {
+	columnsNamesString, ok := meta.MetaFields["allCols"]
+	if !ok {
 		return errors.Errorf("could not find column names in parquet metadata")
 	}
-	primaryKeys := strings.Split(*primaryKeyColumnsString, ",")
-	columns := strings.Split(*columnsNamesString, ",")
+	primaryKeys := strings.Split(primaryKeyColumnsString, ",")
+	columns := strings.Split(columnsNamesString, ",")
 
 	columnNameSet := make(map[string]struct{})
 	primaryKeyColumnSet := make(map[string]struct{})
diff --git a/pkg/util/parquet/BUILD.bazel b/pkg/util/parquet/BUILD.bazel
index ced63863101f..60fddac8ef24 100644
--- a/pkg/util/parquet/BUILD.bazel
+++ b/pkg/util/parquet/BUILD.bazel
@@ -53,6 +53,7 @@ go_test(
"//pkg/util/bitarray", "//pkg/util/duration", "//pkg/util/ipaddr", + "//pkg/util/json", "//pkg/util/timeutil", "//pkg/util/timeutil/pgdate", "//pkg/util/uuid", diff --git a/pkg/util/parquet/schema.go b/pkg/util/parquet/schema.go index 76721fe6d8cd..5ffb4e650b5a 100644 --- a/pkg/util/parquet/schema.go +++ b/pkg/util/parquet/schema.go @@ -11,6 +11,7 @@ package parquet import ( + "fmt" "math" "github.com/apache/arrow/go/v11/parquet" @@ -23,63 +24,81 @@ import ( "github.com/lib/pq/oid" ) -// Setting parquet.Repetitions.Optional makes parquet a column nullable. When -// writing a datum, we will always specify a definition level to indicate if the -// datum is null or not. See comments on nonNilDefLevel or nilDefLevel for more info. +// Setting parquet.Repetitions.Optional makes a column nullable. When writing a +// datum, we will always specify a definition level to indicate if the datum is +// null or not. See comments on nonNilDefLevel or nilDefLevel for more info. var defaultRepetitions = parquet.Repetitions.Optional -// A schema field is an internal identifier for schema nodes used by the parquet library. -// A value of -1 will let the library auto-assign values. This does not affect reading -// or writing parquet files. +// A schema field is an internal identifier for schema nodes used by the parquet +// library. A value of -1 will let the library auto-assign values. This does not +// affect reading or writing parquet files. const defaultSchemaFieldID = int32(-1) // The parquet library utilizes a type length of -1 for all types // except for the type parquet.FixedLenByteArray, in which case the type -// length is the length of the array. See comment on (*schema.PrimitiveNode).TypeLength() +// length is the length of the array. See comment on +// (*schema.PrimitiveNode).TypeLength() const defaultTypeLength = -1 -// A column stores column metadata. -type column struct { +// A datumColumn stores metadata for a CRDB datum column. +type datumColumn struct { node schema.Node colWriter colWriter typ *types.T + + // numPhysicalCols is the number of columns which will be created in a + // parquet file for this datumColumn. physicalColsStartIdx is the index of + // the first of these columns. + // + // Typically, there is a 1:1 mapping between columns and physical columns in + // a file. However, if there is at least one tuple in the schema, then the + // mapping is not 1:1 since each field in a tuple gets its own physical + // column. + // + // Ex. For the datumColumns [i int, j (int, int), k (int, int)] where j and + // k are tuples, the physical columns generated will be [i int, j.0 int, + // j.1, int, k.0 int, k.1 int]. The start index of datumColumn j is 1. The + // start index of datumColumn k is 3. + numPhysicalCols int + physicalColsStartIdx int } // A SchemaDefinition stores a parquet schema. type SchemaDefinition struct { - // The index of a column when reading or writing parquet files - // will correspond to the column's index in this array. - cols []column + cols []datumColumn // The schema is a root node with terminal children nodes which represent - // primitive types such as int or bool. The individual columns can be - // traversed using schema.Column(i). The children are indexed from [0, - // len(cols)). + // primitive types such as int or bool. schema *schema.Schema } // NewSchema generates a SchemaDefinition. // -// Columns in the returned SchemaDefinition will match -// the order they appear in the supplied parameters. 
+// Columns in the returned SchemaDefinition will match the order they appear in +// the supplied parameters. func NewSchema(columnNames []string, columnTypes []*types.T) (*SchemaDefinition, error) { if len(columnTypes) != len(columnNames) { return nil, errors.AssertionFailedf("the number of column names must match the number of column types") } - cols := make([]column, 0) + cols := make([]datumColumn, 0) fields := make([]schema.Node, 0) + physicalColStartIdx := 0 for i := 0; i < len(columnNames); i++ { if columnTypes[i] == nil { return nil, errors.AssertionFailedf("column %s missing type information", columnNames[i]) } - parquetCol, err := makeColumn(columnNames[i], columnTypes[i], defaultRepetitions) + column, err := makeColumn(columnNames[i], columnTypes[i], defaultRepetitions) if err != nil { return nil, err } - cols = append(cols, parquetCol) - fields = append(fields, parquetCol.node) + + column.physicalColsStartIdx = physicalColStartIdx + physicalColStartIdx += column.numPhysicalCols + + cols = append(cols, column) + fields = append(fields, column.node) } groupNode, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, @@ -93,23 +112,22 @@ func NewSchema(columnNames []string, columnTypes []*types.T) (*SchemaDefinition, }, nil } -// makeColumn constructs a column. -func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (column, error) { - result := column{typ: typ} +// makeColumn constructs a datumColumn. It does not populate +// datumColumn.physicalColsStartIdx. +func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (datumColumn, error) { + result := datumColumn{typ: typ, numPhysicalCols: 1} var err error switch typ.Family() { case types.BoolFamily: result.node = schema.NewBooleanNode(colName, repetitions, defaultSchemaFieldID) result.colWriter = scalarWriter(writeBool) - result.typ = types.Bool return result, nil case types.StringFamily: result.node, err = schema.NewPrimitiveNodeLogical(colName, repetitions, schema.StringLogicalType{}, parquet.Types.ByteArray, defaultTypeLength, defaultSchemaFieldID) - if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = scalarWriter(writeString) return result, nil @@ -121,7 +139,7 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c parquet.Types.Int64, defaultTypeLength, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = scalarWriter(writeInt64) return result, nil @@ -150,7 +168,7 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c scale), parquet.Types.ByteArray, defaultTypeLength, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = scalarWriter(writeDecimal) return result, nil @@ -159,7 +177,7 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c repetitions, schema.UUIDLogicalType{}, parquet.Types.FixedLenByteArray, uuid.Size, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = scalarWriter(writeUUID) return result, nil @@ -170,7 +188,7 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c repetitions, schema.StringLogicalType{}, parquet.Types.ByteArray, defaultTypeLength, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = scalarWriter(writeTimestamp) return result, nil @@ -181,7 +199,7 @@ func 
makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c repetitions, schema.StringLogicalType{}, parquet.Types.ByteArray, defaultTypeLength, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = scalarWriter(writeTimestampTZ) return result, nil @@ -190,7 +208,7 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c repetitions, schema.StringLogicalType{}, parquet.Types.ByteArray, defaultTypeLength, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = scalarWriter(writeINet) return result, nil @@ -199,7 +217,7 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c repetitions, schema.JSONLogicalType{}, parquet.Types.ByteArray, defaultTypeLength, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = scalarWriter(writeJSON) return result, nil @@ -208,7 +226,7 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c repetitions, parquet.Types.ByteArray, defaultTypeLength, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = scalarWriter(writeBit) return result, nil @@ -217,7 +235,7 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c repetitions, parquet.Types.ByteArray, defaultTypeLength, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = scalarWriter(writeBytes) return result, nil @@ -226,7 +244,7 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c repetitions, schema.EnumLogicalType{}, parquet.Types.ByteArray, defaultTypeLength, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = scalarWriter(writeEnum) return result, nil @@ -237,7 +255,7 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c repetitions, schema.StringLogicalType{}, parquet.Types.ByteArray, defaultTypeLength, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = scalarWriter(writeDate) return result, nil @@ -246,7 +264,7 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c repetitions, schema.StringLogicalType{}, parquet.Types.ByteArray, defaultTypeLength, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = scalarWriter(writeBox2D) return result, nil @@ -255,7 +273,7 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c repetitions, parquet.Types.ByteArray, defaultTypeLength, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = scalarWriter(writeGeography) return result, nil @@ -264,7 +282,7 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c repetitions, parquet.Types.ByteArray, defaultTypeLength, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = scalarWriter(writeGeometry) return result, nil @@ -273,7 +291,7 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c repetitions, schema.StringLogicalType{}, parquet.Types.ByteArray, defaultTypeLength, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = 
scalarWriter(writeInterval) return result, nil @@ -284,7 +302,7 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c repetitions, schema.NewTimeLogicalType(true, schema.TimeUnitMicros), parquet.Types.Int64, defaultTypeLength, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = scalarWriter(writeTime) return result, nil @@ -295,7 +313,7 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c repetitions, schema.StringLogicalType{}, parquet.Types.ByteArray, defaultTypeLength, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = scalarWriter(writeTimeTZ) return result, nil @@ -305,7 +323,7 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c repetitions, parquet.Types.Float, defaultTypeLength, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = scalarWriter(writeFloat32) return result, nil @@ -314,7 +332,7 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c repetitions, parquet.Types.Double, defaultTypeLength, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = scalarWriter(writeFloat64) return result, nil @@ -327,14 +345,14 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c repetitions, schema.StringLogicalType{}, parquet.Types.ByteArray, defaultTypeLength, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } result.colWriter = scalarWriter(writeCollatedString) return result, nil case types.ArrayFamily: // Arrays for type T are represented by the following: // message schema { -- toplevel schema - // optional group a (LIST) { -- list column + // optional group a (LIST) { -- column for this array // repeated group list { // optional T element; // } @@ -348,30 +366,96 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c return result, pgerror.Newf(pgcode.FeatureNotSupported, "parquet writer does not support nested arrays") } + if typ.ArrayContents().Family() == types.TupleFamily { + return result, pgerror.Newf(pgcode.FeatureNotSupported, + "parquet writer does not support tuples in arrays") + } + elementCol, err := makeColumn("element", typ.ArrayContents(), parquet.Repetitions.Optional) if err != nil { - return result, err + return datumColumn{}, err } + innerListFields := []schema.Node{elementCol.node} innerListNode, err := schema.NewGroupNode("list", parquet.Repetitions.Repeated, innerListFields, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } + outerListFields := []schema.Node{innerListNode} + result.node, err = schema.NewGroupNodeLogical(colName, parquet.Repetitions.Optional, outerListFields, schema.ListLogicalType{}, defaultSchemaFieldID) if err != nil { - return result, err + return datumColumn{}, err } + // NB: Because we assert that the inner elements are not arrays or tuples, + // it is guaranteed that they are scalar. 
scalarColWriter, ok := elementCol.colWriter.(scalarWriter) if !ok { - return result, errors.AssertionFailedf("expected scalar column writer") + return datumColumn{}, errors.AssertionFailedf("expected scalar column writer") } result.colWriter = arrayWriter(scalarColWriter) result.typ = elementCol.typ return result, nil + case types.TupleFamily: + // Tuples for types T1...Tn are represented by the following: + // message schema { -- toplevel schema + // optional group a (LIST) { + // optional T1 element; -- physical column for the first field + // ... + // optional Tn element; -- physical column for the nth field + // } + // } + // There is more info about encoding groups here: + // https://arrow.apache.org/blog/2022/10/08/arrow-parquet-encoding-part-2/ + contents := typ.TupleContents() + if len(contents) == 0 { + return result, pgerror.Newf(pgcode.FeatureNotSupported, + "parquet writer does not support empty tuples") + } + labels := typ.TupleLabels() // may be nil. + nodes := make([]schema.Node, 0, len(contents)) + colWriters := make([]writeFn, 0, len(contents)) + for i, innerTyp := range contents { + if innerTyp.Family() == types.ArrayFamily { + return result, pgerror.Newf(pgcode.FeatureNotSupported, + "parquet writer does not support arrays in tuples") + } + if innerTyp.Family() == types.TupleFamily { + return result, pgerror.Newf(pgcode.FeatureNotSupported, + "parquet writer does not support nested tuples") + } + var label string + if labels == nil { + label = fmt.Sprintf("%s_col%d", colName, i) + } else { + label = labels[i] + } + elementCol, err := makeColumn(label, innerTyp, defaultRepetitions) + if err != nil { + return datumColumn{}, err + } + nodes = append(nodes, elementCol.node) + // NB: Because we assert that the inner elements are not arrays or tuples, + // it is guaranteed that they are scalar. 
+ wFn, ok := elementCol.colWriter.(scalarWriter) + if !ok { + return datumColumn{}, errors.AssertionFailedf("expected scalar column writer") + } + colWriters = append(colWriters, writeFn(wFn)) + } + + result.colWriter = tupleWriter(colWriters) + result.node, err = schema.NewGroupNode(colName, parquet.Repetitions.Optional, + nodes, defaultSchemaFieldID) + result.numPhysicalCols = len(colWriters) + if err != nil { + return datumColumn{}, err + } + return result, nil default: return result, pgerror.Newf(pgcode.FeatureNotSupported, "parquet writer does not support the type family %v", typ.Family()) diff --git a/pkg/util/parquet/testutils.go b/pkg/util/parquet/testutils.go index d6877f3784ac..bbc7c38d4489 100644 --- a/pkg/util/parquet/testutils.go +++ b/pkg/util/parquet/testutils.go @@ -11,6 +11,7 @@ package parquet import ( + "bytes" "fmt" "io" "math" @@ -21,7 +22,6 @@ import ( "github.com/apache/arrow/go/v11/parquet" "github.com/apache/arrow/go/v11/parquet/file" - "github.com/apache/arrow/go/v11/parquet/metadata" "github.com/cockroachdb/cockroach/pkg/sql/sem/catid" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -45,25 +45,22 @@ func NewWriterWithReaderMeta( func ReadFileAndVerifyDatums( t *testing.T, parquetFile string, - numRows int, - numCols int, + expectedNumRows int, + expectedNumCols int, writer *Writer, writtenDatums [][]tree.Datum, ) { - meta, readDatums, closeReader, err := ReadFile(parquetFile) - defer func() { - require.NoError(t, closeReader()) - }() + meta, readDatums, err := ReadFile(parquetFile) require.NoError(t, err) - require.Equal(t, int64(numRows), meta.GetNumRows()) - require.Equal(t, numCols, meta.Schema.NumColumns()) + require.Equal(t, expectedNumRows, meta.NumRows) + require.Equal(t, expectedNumCols, meta.NumCols) - numRowGroups := int(math.Ceil(float64(numRows) / float64(writer.cfg.maxRowGroupLength))) - require.EqualValues(t, numRowGroups, len(meta.GetRowGroups())) + expectedNumRowGroups := int(math.Ceil(float64(expectedNumRows) / float64(writer.cfg.maxRowGroupLength))) + require.EqualValues(t, expectedNumRowGroups, meta.NumRowGroups) - for i := 0; i < numRows; i++ { - for j := 0; j < numCols; j++ { + for i := 0; i < expectedNumRows; i++ { + for j := 0; j < expectedNumCols; j++ { ValidateDatum(t, writtenDatums[i][j], readDatums[i][j]) } } @@ -74,47 +71,52 @@ func ReadFileAndVerifyDatums( // To use this function, the Writer must be configured to write CRDB-specific // metadata for the reader. See NewWriterWithReaderMeta. // -// close() should be called to release the underlying reader once the returned -// metadata is not required anymore. The returned metadata should not be used -// after close() is called. -// // NB: The returned datums may not be hydrated or identical to the ones // which were written. See comment on ValidateDatum for more info. 
-func ReadFile( - parquetFile string, -) (meta *metadata.FileMetaData, datums [][]tree.Datum, close func() error, err error) { +func ReadFile(parquetFile string) (meta ReadDatumsMetadata, datums [][]tree.Datum, err error) { f, err := os.Open(parquetFile) if err != nil { - return nil, nil, nil, err + return ReadDatumsMetadata{}, nil, err } reader, err := file.NewParquetReader(f) if err != nil { - return nil, nil, nil, err + return ReadDatumsMetadata{}, nil, err } + defer func() { + if closeErr := reader.Close(); closeErr != nil { + err = errors.CombineErrors(err, closeErr) + } + }() var readDatums [][]tree.Datum typFamiliesMeta := reader.MetaData().KeyValueMetadata().FindValue(typeFamilyMetaKey) if typFamiliesMeta == nil { - return nil, nil, nil, + return ReadDatumsMetadata{}, nil, errors.AssertionFailedf("missing type family metadata. ensure the writer is configured" + " to write reader metadata. see NewWriterWithReaderMeta()") } typFamilies, err := deserializeIntArray(*typFamiliesMeta) if err != nil { - return nil, nil, nil, err + return ReadDatumsMetadata{}, nil, err + } + + tupleColumnsMeta := reader.MetaData().KeyValueMetadata().FindValue(tupleIndexesMetaKey) + tupleColumns, err := deserialize2DIntArray(*tupleColumnsMeta) + if err != nil { + return ReadDatumsMetadata{}, nil, err } typOidsMeta := reader.MetaData().KeyValueMetadata().FindValue(typeOidMetaKey) if typOidsMeta == nil { - return nil, nil, nil, + return ReadDatumsMetadata{}, nil, errors.AssertionFailedf("missing type oid metadata. ensure the writer is configured" + " to write reader metadata. see NewWriterWithReaderMeta()") } typOids, err := deserializeIntArray(*typOidsMeta) if err != nil { - return nil, nil, nil, err + return ReadDatumsMetadata{}, nil, err } startingRowIdx := 0 @@ -128,113 +130,150 @@ func ReadFile( for colIdx := 0; colIdx < rgr.NumColumns(); colIdx++ { col, err := rgr.Column(colIdx) if err != nil { - return nil, nil, nil, err + return ReadDatumsMetadata{}, nil, err } dec, err := decoderFromFamilyAndType(oid.Oid(typOids[colIdx]), types.Family(typFamilies[colIdx])) if err != nil { - return nil, nil, nil, err + return ReadDatumsMetadata{}, nil, err } - // Based on how we define schemas, we can detect an array by seeing if the - // primitive col reader has a max repetition level of 1. See comments above - // arrayEntryRepLevel for more info. - isArray := col.Descriptor().MaxRepetitionLevel() == 1 + // Based on how we define the schemas for these columns, we can determine if they are arrays or + // part of tuples. See comments above arrayEntryNonNilDefLevel and tupleFieldNonNilDefLevel for + // more info. 
+ isArray := col.Descriptor().MaxDefinitionLevel() == 3 + isTuple := col.Descriptor().MaxDefinitionLevel() == 2 - switch col.Type() { - case parquet.Types.Boolean: - colDatums, read, err := readBatch(col, make([]bool, 1), dec, isArray) - if err != nil { - return nil, nil, nil, err - } - if read != rowsInRowGroup { - return nil, nil, nil, - errors.AssertionFailedf("expected to read %d values but found %d", rowsInRowGroup, read) - } - decodeValuesIntoDatumsHelper(colDatums, readDatums, colIdx, startingRowIdx) - case parquet.Types.Int32: - colDatums, read, err := readBatch(col, make([]int32, 1), dec, isArray) - if err != nil { - return nil, nil, nil, err - } - if read != rowsInRowGroup { - return nil, nil, nil, - errors.AssertionFailedf("expected to read %d values but found %d", rowsInRowGroup, read) - } - decodeValuesIntoDatumsHelper(colDatums, readDatums, colIdx, startingRowIdx) - case parquet.Types.Int64: - colDatums, read, err := readBatch(col, make([]int64, 1), dec, isArray) - if err != nil { - return nil, nil, nil, err - } - if read != rowsInRowGroup { - return nil, nil, nil, - errors.AssertionFailedf("expected to read %d values but found %d", rowsInRowGroup, read) - } - decodeValuesIntoDatumsHelper(colDatums, readDatums, colIdx, startingRowIdx) - case parquet.Types.Int96: - panic("unimplemented") - case parquet.Types.Float: - arrs, read, err := readBatch(col, make([]float32, 1), dec, isArray) - if err != nil { - return nil, nil, nil, err - } - if read != rowsInRowGroup { - return nil, nil, nil, - errors.AssertionFailedf("expected to read %d values but found %d", rowsInRowGroup, read) - } - decodeValuesIntoDatumsHelper(arrs, readDatums, colIdx, startingRowIdx) - case parquet.Types.Double: - arrs, read, err := readBatch(col, make([]float64, 1), dec, isArray) - if err != nil { - return nil, nil, nil, err - } - if read != rowsInRowGroup { - return nil, nil, nil, - errors.AssertionFailedf("expected to read %d values but found %d", rowsInRowGroup, read) - } - decodeValuesIntoDatumsHelper(arrs, readDatums, colIdx, startingRowIdx) - case parquet.Types.ByteArray: - colDatums, read, err := readBatch(col, make([]parquet.ByteArray, 1), dec, isArray) - if err != nil { - return nil, nil, nil, err - } - if read != rowsInRowGroup { - return nil, nil, nil, - errors.AssertionFailedf("expected to read %d values but found %d", rowsInRowGroup, read) - } - decodeValuesIntoDatumsHelper(colDatums, readDatums, colIdx, startingRowIdx) - case parquet.Types.FixedLenByteArray: - colDatums, read, err := readBatch(col, make([]parquet.FixedLenByteArray, 1), dec, isArray) - if err != nil { - return nil, nil, nil, err - } - if read != rowsInRowGroup { - return nil, nil, nil, - errors.AssertionFailedf("expected to read %d values but found %d", rowsInRowGroup, read) - } - decodeValuesIntoDatumsHelper(colDatums, readDatums, colIdx, startingRowIdx) + datumsForColInRowGroup, err := readColInRowGroup(col, dec, rowsInRowGroup, isArray, isTuple) + if err != nil { + return ReadDatumsMetadata{}, nil, err } + decodeValuesIntoDatumsHelper(datumsForColInRowGroup, readDatums, colIdx, startingRowIdx) } startingRowIdx += int(rowsInRowGroup) } - // Since reader.MetaData() is being returned, we do not close the reader. - // This is defensive - we should not assume any method or data on the reader - // is safe to read once it is closed. 
-	return reader.MetaData(), readDatums, reader.Close, nil
+
+	for i := 0; i < len(readDatums); i++ {
+		readDatums[i] = squashTuples(readDatums[i], tupleColumns)
+	}
+
+	return makeDatumMeta(reader, readDatums), readDatums, nil
+}
+
+// ReadDatumsMetadata contains metadata from the parquet file which was read.
+type ReadDatumsMetadata struct {
+	// MetaFields is the arbitrary metadata read from the file.
+	MetaFields map[string]string
+	// NumRows is the number of rows read.
+	NumRows int
+	// NumCols is the number of datum cols read. Will be 0 if NumRows
+	// is 0.
+	NumCols int
+	// NumRowGroups is the number of row groups in the file.
+	NumRowGroups int
+}
+
+func makeDatumMeta(reader *file.Reader, readDatums [][]tree.Datum) ReadDatumsMetadata {
+	// Copy the reader metadata into a new map since this metadata can be used
+	// after the reader is closed. This is defensive - we should not assume
+	// any method or data on the reader is safe to use once it is closed.
+	readerMeta := reader.MetaData().KeyValueMetadata()
+	kvMeta := make(map[string]string)
+	for _, key := range reader.MetaData().KeyValueMetadata().Keys() {
+		val := readerMeta.FindValue(key)
+		if val != nil {
+			kvMeta[key] = *val
+		}
+	}
+
+	// NB: The number of physical columns in the file may be more than
+	// the number of datums in each row because tuple datums get a physical
+	// column for each field.
+	// We do not return reader.MetaData().Schema.NumColumns() because that
+	// returns the number of physical columns.
+	numCols := 0
+	if len(readDatums) > 0 {
+		numCols = len(readDatums[0])
+	}
+	return ReadDatumsMetadata{
+		MetaFields:   kvMeta,
+		NumRows:      len(readDatums),
+		NumCols:      numCols,
+		NumRowGroups: reader.NumRowGroups(),
+	}
+}
+
+func readColInRowGroup(
+	col file.ColumnChunkReader, dec decoder, rowsInRowGroup int64, isArray bool, isTuple bool,
+) ([]tree.Datum, error) {
+	switch col.Type() {
+	case parquet.Types.Boolean:
+		colDatums, err := readRowGroup(col, make([]bool, 1), dec, rowsInRowGroup, isArray, isTuple)
+		if err != nil {
+			return nil, err
+		}
+		return colDatums, nil
+	case parquet.Types.Int32:
+		colDatums, err := readRowGroup(col, make([]int32, 1), dec, rowsInRowGroup, isArray, isTuple)
+		if err != nil {
+			return nil, err
+		}
+		return colDatums, nil
+	case parquet.Types.Int64:
+		colDatums, err := readRowGroup(col, make([]int64, 1), dec, rowsInRowGroup, isArray, isTuple)
+		if err != nil {
+			return nil, err
+		}
+		return colDatums, nil
+	case parquet.Types.Int96:
+		panic("unimplemented")
+	case parquet.Types.Float:
+		colDatums, err := readRowGroup(col, make([]float32, 1), dec, rowsInRowGroup, isArray, isTuple)
+		if err != nil {
+			return nil, err
+		}
+		return colDatums, nil
+	case parquet.Types.Double:
+		colDatums, err := readRowGroup(col, make([]float64, 1), dec, rowsInRowGroup, isArray, isTuple)
+		if err != nil {
+			return nil, err
+		}
+		return colDatums, nil
+	case parquet.Types.ByteArray:
+		colDatums, err := readRowGroup(col, make([]parquet.ByteArray, 1), dec, rowsInRowGroup, isArray, isTuple)
+		if err != nil {
+			return nil, err
+		}
+		return colDatums, nil
+	case parquet.Types.FixedLenByteArray:
+		colDatums, err := readRowGroup(col, make([]parquet.FixedLenByteArray, 1), dec, rowsInRowGroup, isArray, isTuple)
+		if err != nil {
+			return nil, err
+		}
+		return colDatums, nil
+	default:
+		return nil, errors.AssertionFailedf("unexpected type: %s", col.Type())
+	}
+}
 
 type batchReader[T parquetDatatypes] interface {
 	ReadBatch(batchSize int64, values []T, defLvls
[]int16, repLvls []int16) (total int64, valuesRead int, err error) } -// readBatch reads all the datums in a row group for a column. -func readBatch[T parquetDatatypes]( - r file.ColumnChunkReader, valueAlloc []T, dec decoder, isArray bool, -) (tree.Datums, int64, error) { +// readRowGroup reads all the datums in a row group for a physical column. +func readRowGroup[T parquetDatatypes]( + r file.ColumnChunkReader, + valueAlloc []T, + dec decoder, + expectedRowCount int64, + isArray bool, + isTuple bool, +) (tree.Datums, error) { br, ok := r.(batchReader[T]) if !ok { - return nil, 0, errors.AssertionFailedf("expected batchReader for type %T, but found %T instead", valueAlloc, r) + return nil, errors.AssertionFailedf("expected batchReader for type %T, but found %T instead", valueAlloc, r) } result := make([]tree.Datum, 0) @@ -244,7 +283,7 @@ func readBatch[T parquetDatatypes]( for { numRowsRead, _, err := br.ReadBatch(1, valueAlloc, defLevels[:], repLevels[:]) if err != nil { - return nil, 0, err + return nil, err } if numRowsRead == 0 { break @@ -275,9 +314,25 @@ func readBatch[T parquetDatatypes]( // Deflevel 3 represents a non-null datum in an array. d, err := decode(dec, valueAlloc[0]) if err != nil { - return nil, 0, err + return nil, err } currentArrayDatum.Array = append(currentArrayDatum.Array, d) + } else if isTuple { + // Deflevel 0 represents a null tuple. + // Deflevel 1 represents a null value in a non null tuple. + // Deflevel 2 represents a non-null value in a non-null tuple. + switch defLevels[0] { + case 0: + result = append(result, dNullTuple) + case 1: + result = append(result, tree.DNull) + case 2: + d, err := decode(dec, valueAlloc[0]) + if err != nil { + return nil, err + } + result = append(result, d) + } } else { // Deflevel 0 represents a null value // Deflevel 1 represents a non-null value @@ -285,14 +340,17 @@ func readBatch[T parquetDatatypes]( if defLevels[0] != 0 { d, err = decode(dec, valueAlloc[0]) if err != nil { - return nil, 0, err + return nil, err } } result = append(result, d) } } - - return result, int64(len(result)), nil + if int64(len(result)) != expectedRowCount { + return nil, errors.AssertionFailedf( + "expected to read %d rows in row group, found %d", expectedRowCount, int64(len(result))) + } + return result, nil } func decodeValuesIntoDatumsHelper( @@ -352,6 +410,13 @@ func ValidateDatum(t *testing.T, expected tree.Datum, actual tree.Datum) { for i := 0; i < len(arr1); i++ { ValidateDatum(t, arr1[i], arr2[i]) } + case types.TupleFamily: + arr1 := expected.(*tree.DTuple).D + arr2 := actual.(*tree.DTuple).D + require.Equal(t, len(arr1), len(arr2)) + for i := 0; i < len(arr1); i++ { + ValidateDatum(t, arr1[i], arr2[i]) + } case types.EnumFamily: require.Equal(t, expected.(*tree.DEnum).LogicalRep, actual.(*tree.DEnum).LogicalRep) case types.CollatedStringFamily: @@ -383,6 +448,7 @@ func makeTestingEnumType() *types.T { const typeOidMetaKey = `crdbTypeOIDs` const typeFamilyMetaKey = `crdbTypeFamilies` +const tupleIndexesMetaKey = `crdbTupleIndexes` // MakeReaderMetadata returns column type metadata that will be written to all // parquet files. 
This metadata is useful for roundtrip tests where we construct
@@ -391,31 +457,137 @@ func MakeReaderMetadata(sch *SchemaDefinition) map[string]string {
 	meta := map[string]string{}
 	typOids := make([]uint32, 0, len(sch.cols))
 	typFamilies := make([]int32, 0, len(sch.cols))
+	var tupleIntervals [][]int
 	for _, col := range sch.cols {
-		typOids = append(typOids, uint32(col.typ.Oid()))
-		typFamilies = append(typFamilies, int32(col.typ.Family()))
+		// Tuples get flattened out such that each field in the tuple is
+		// its own physical column in the parquet file.
+		if col.typ.Family() == types.TupleFamily {
+			for _, typ := range col.typ.TupleContents() {
+				typOids = append(typOids, uint32(typ.Oid()))
+				typFamilies = append(typFamilies, int32(typ.Family()))
+			}
+			tupleInterval := []int{col.physicalColsStartIdx, col.physicalColsStartIdx + col.numPhysicalCols - 1}
+			tupleIntervals = append(tupleIntervals, tupleInterval)
+		} else {
+			typOids = append(typOids, uint32(col.typ.Oid()))
+			typFamilies = append(typFamilies, int32(col.typ.Family()))
+		}
 	}
 	meta[typeOidMetaKey] = serializeIntArray(typOids)
 	meta[typeFamilyMetaKey] = serializeIntArray(typFamilies)
+	meta[tupleIndexesMetaKey] = serialize2DIntArray(tupleIntervals)
 	return meta
 }
 
-// serializeIntArray serializes an int array to a string "23 2 32 43 32".
-func serializeIntArray[I int32 | uint32](ints []I) string {
-	return strings.Trim(fmt.Sprint(ints), "[]")
+// serializeIntArray serializes an int array to a string "[23 2 32 43 32]".
+func serializeIntArray[I int | int32 | uint32](ints []I) string {
+	return fmt.Sprint(ints)
 }
 
-// deserializeIntArray deserializes an integer sting in the format "23 2 32 43
-// 32" to an array of ints.
-func deserializeIntArray(s string) ([]uint32, error) {
-	vals := strings.Split(s, " ")
-	result := make([]uint32, 0, len(vals))
+// deserializeIntArray deserializes an integer string in the format "[23 2 32 43
+// 32]" to an array of ints.
+func deserializeIntArray(s string) ([]int, error) {
+	vals := strings.Split(strings.Trim(s, "[]"), " ")
+	result := make([]int, 0, len(vals))
 	for _, val := range vals {
 		intVal, err := strconv.Atoi(val)
 		if err != nil {
 			return nil, err
 		}
-		result = append(result, uint32(intVal))
+		result = append(result, intVal)
+	}
+	return result, nil
+}
+
+// serialize2DIntArray serializes a 2D int array in the format
+// "[1 2 3][4 5 6][7 8 9]".
+func serialize2DIntArray(ints [][]int) string {
+	var buf bytes.Buffer
+	for _, colIdxs := range ints {
+		buf.WriteString(serializeIntArray(colIdxs))
+	}
+	return buf.String()
+}
+
+// deserialize2DIntArray deserializes an integer string in the format
+// "[1 2 3][4 5 6][7 8 9]".
+func deserialize2DIntArray(s string) ([][]int, error) {
+	var result [][]int
+	found := true
+	var currentArray string
+	var remaining string
+	for found {
+		currentArray, remaining, found = strings.Cut(s, "]")
+		if !found {
+			break
+		}
+		// currentArray always has the form "[x y" where x, y are integers.
+		// Drop the leading "[".
+		currentArray = currentArray[1:]
+		tupleInts, err := deserializeIntArray(currentArray)
+		if err != nil {
+			return nil, err
+		}
+		result = append(result, tupleInts)
+		s = remaining
 	}
 	return result, nil
 }
+
+// dNullTuple is a special null which indicates that a tuple field belongs
+// to a tuple which is entirely null. This is used to differentiate
+// a tuple which is NULL from a tuple containing all NULL fields.
+var dNullTuple = dNullTupleType{tree.DNull}
+
+type dNullTupleType struct {
+	tree.Datum
+}
+
+// squashTuples takes an array of datums and merges groups of adjacent datums
+// into tuples using the passed intervals. Example:
+//
+// Input: ["0", "1", "2", "3", "4", "5", "6"] [[0, 1], [3, 3], [4, 6]]
+// Output: [("0", "1"), "2", ("3"), ("4", "5", "6")]
+//
+// Behavior is undefined if the intervals are not sorted in ascending order,
+// not disjoint, or out of bounds.
+func squashTuples(datumRow []tree.Datum, tupleColIndexes [][]int) []tree.Datum {
+	if len(tupleColIndexes) == 0 {
+		return datumRow
+	}
+	tupleIdx := 0
+	var updatedDatums []tree.Datum
+	var currentTupleDatums []tree.Datum
+	var currentTupleTypes []*types.T
+	for i, d := range datumRow {
+		if tupleIdx < len(tupleColIndexes) {
+			tupleUpperIdx := tupleColIndexes[tupleIdx][1]
+			tupleLowerIdx := tupleColIndexes[tupleIdx][0]
+
+			if i >= tupleLowerIdx && i <= tupleUpperIdx {
+				currentTupleDatums = append(currentTupleDatums, d)
+				currentTupleTypes = append(currentTupleTypes, d.ResolvedType())
+
+				if i == tupleUpperIdx {
+					// Check for marker that indicates the entire tuple is NULL.
+					if currentTupleDatums[0] == dNullTuple {
+						updatedDatums = append(updatedDatums, tree.DNull)
+					} else {
+						tupleDatum := tree.MakeDTuple(types.MakeTuple(currentTupleTypes), currentTupleDatums...)
+						updatedDatums = append(updatedDatums, &tupleDatum)
+					}
+
+					currentTupleTypes = []*types.T{}
+					currentTupleDatums = []tree.Datum{}
+
+					tupleIdx += 1
+				}
+			} else {
+				updatedDatums = append(updatedDatums, d)
+			}
+		} else {
+			updatedDatums = append(updatedDatums, d)
+		}
+	}
+	return updatedDatums
+}
diff --git a/pkg/util/parquet/write_functions.go b/pkg/util/parquet/write_functions.go
index 7801597199f4..1c0d8c6bbcba 100644
--- a/pkg/util/parquet/write_functions.go
+++ b/pkg/util/parquet/write_functions.go
@@ -109,17 +109,52 @@ var zeroLengthArrayDefLevel = []int16{1}
 var arrayEntryNilDefLevel = []int16{2}
 var arrayEntryNonNilDefLevel = []int16{3}
 
-// A colWriter is responsible for writing a datum to a file.ColumnChunkWriter.
+// The following definition levels are used when writing datums which are in
+// tuples. This explanation is valid for the tuple schema constructed in
+// makeColumn. Any corresponding repetition level should be 0 as nonzero
+// repetition levels are only valid for arrays in this library.
+//
+// In summary:
+// - def level 0 means the tuple is null
+// - def level 1 means the tuple is not null, and contains a null datum
+// - def level 2 means the tuple is not null, and contains a non-null datum
+//
+// Examples:
+//
+// # Null Tuple
+//
+// d := tree.DNull
+//
+// for _, writeFn := range tupleFields {
+//   writeFn(tree.DNull, ..., defLevels = [0], ...)
+// }
+//
+// # Typical Tuple
+//
+// d := tree.MakeDTuple(1, NULL, 2)
+// writeFnForField1(datum, ..., defLevels = [2], ...)
+// writeFnForField2(datum, ..., defLevels = [1], ...)
+// writeFnForField3(datum, ..., defLevels = [2], ...)
+var nilTupleDefLevel = []int16{0}
+var tupleFieldNilDefLevel = []int16{1}
+var tupleFieldNonNilDefLevel = []int16{2}
+
+// For arrays and scalar types, a colWriter is responsible for encoding a datum
+// and writing it to a file.ColumnChunkWriter. For tuples, there is a
+// file.ColumnChunkWriter per tuple field.
type colWriter interface { - Write(d tree.Datum, w file.ColumnChunkWriter, a *batchAlloc, fmtCtx *tree.FmtCtx) error + Write(d tree.Datum, w []file.ColumnChunkWriter, a *batchAlloc, fmtCtx *tree.FmtCtx) error } type scalarWriter writeFn func (w scalarWriter) Write( - d tree.Datum, cw file.ColumnChunkWriter, a *batchAlloc, fmtCtx *tree.FmtCtx, + d tree.Datum, cw []file.ColumnChunkWriter, a *batchAlloc, fmtCtx *tree.FmtCtx, ) error { - return writeScalar(d, cw, a, fmtCtx, writeFn(w)) + if len(cw) != 1 { + return errors.AssertionFailedf("invalid number of column chunk writers in scalar writer: %d", len(cw)) + } + return writeScalar(d, cw[0], a, fmtCtx, writeFn(w)) } func writeScalar( @@ -141,9 +176,12 @@ func writeScalar( type arrayWriter writeFn func (w arrayWriter) Write( - d tree.Datum, cw file.ColumnChunkWriter, a *batchAlloc, fmtCtx *tree.FmtCtx, + d tree.Datum, cw []file.ColumnChunkWriter, a *batchAlloc, fmtCtx *tree.FmtCtx, ) error { - return writeArray(d, cw, a, fmtCtx, writeFn(w)) + if len(cw) != 1 { + return errors.AssertionFailedf("invalid number of column chunk writers in array writer: %d", len(cw)) + } + return writeArray(d, cw[0], a, fmtCtx, writeFn(w)) } func writeArray( @@ -178,6 +216,47 @@ func writeArray( return nil } +type tupleWriter []writeFn + +func (tw tupleWriter) Write( + d tree.Datum, cw []file.ColumnChunkWriter, a *batchAlloc, fmtCtx *tree.FmtCtx, +) error { + if len(cw) != len(tw) { + return errors.AssertionFailedf( + "invalid number of column chunk writers (%d) for tuple writer (%d)", + len(cw), len(tw)) + } + return writeTuple(d, cw, a, fmtCtx, tw) +} + +func writeTuple( + d tree.Datum, w []file.ColumnChunkWriter, a *batchAlloc, fmtCtx *tree.FmtCtx, wFns []writeFn, +) error { + if d == tree.DNull { + for i, wFn := range wFns { + if err := wFn(tree.DNull, w[i], a, fmtCtx, nilTupleDefLevel, newEntryRepLevel); err != nil { + return err + } + } + return nil + } + dt, ok := tree.AsDTuple(d) + if !ok { + return pgerror.Newf(pgcode.DatatypeMismatch, "expected DTuple, found %T", d) + } + for i, wFn := range wFns { + defLevel := tupleFieldNonNilDefLevel + if dt.D[i] == tree.DNull { + defLevel = tupleFieldNilDefLevel + } + if err := wFn(dt.D[i], w[i], a, fmtCtx, defLevel, newEntryRepLevel); err != nil { + return err + } + } + + return nil +} + // A writeFn encodes a datum and writes it using the provided column chunk // writer. The caller is responsible for ensuring that the def levels and rep // levels are correct. diff --git a/pkg/util/parquet/writer.go b/pkg/util/parquet/writer.go index 3fef33e212b2..3baafadb799d 100644 --- a/pkg/util/parquet/writer.go +++ b/pkg/util/parquet/writer.go @@ -137,6 +137,14 @@ type Writer struct { // The current number of rows written to the row group writer. currentRowGroupSize int64 currentRowGroupWriter file.BufferedRowGroupWriter + // Caches the file.ColumnChunkWriters for each datumColumn in the schema + // definition. The array at columnChunkWriterCache[i] has has + // sch.cols[i].numPhysicalCols writers. The writers should be valid for the + // lifespan of the currentRowGroupWriter. + // + // The purpose of this cache is to avoid allocating an array of + // file.ColumnChunkWriter every time we call a colWriter. + columnChunkWriterCache [][]file.ColumnChunkWriter } // NewWriter constructs a new Writer which outputs to @@ -164,27 +172,46 @@ func NewWriter(sch *SchemaDefinition, sink io.Writer, opts ...Option) (*Writer, parquet.WithCompression(cfg.compression), } props := parquet.NewWriterProperties(parquetOpts...) 
- writer := file.NewParquetWriter(sink, sch.schema.Root(), file.WithWriterProps(props), file.WithWriteMetadata(cfg.metadata)) + writer := file.NewParquetWriter(sink, sch.schema.Root(), file.WithWriterProps(props), + file.WithWriteMetadata(cfg.metadata)) return &Writer{ - sch: sch, - writer: writer, - cfg: cfg, - ba: &batchAlloc{}, + sch: sch, + writer: writer, + cfg: cfg, + ba: &batchAlloc{}, + columnChunkWriterCache: make([][]file.ColumnChunkWriter, len(sch.cols)), }, nil } -func (w *Writer) writeDatumToColChunk(d tree.Datum, colIdx int) error { - cw, err := w.currentRowGroupWriter.Column(colIdx) - if err != nil { - return err +// setNewRowGroupWriter appends a new row group to the Writer and +// refreshes the entries in the column chunk writer cache. +func (w *Writer) setNewRowGroupWriter() error { + w.currentRowGroupWriter = w.writer.AppendBufferedRowGroup() + + for colIdx := 0; colIdx < len(w.sch.cols); colIdx++ { + w.columnChunkWriterCache[colIdx] = w.columnChunkWriterCache[colIdx][:0] + physicalStartIdx := w.sch.cols[colIdx].physicalColsStartIdx + physicalEndIdx := physicalStartIdx + w.sch.cols[colIdx].numPhysicalCols + for i := physicalStartIdx; i < physicalEndIdx; i += 1 { + cw, err := w.currentRowGroupWriter.Column(i) + if err != nil { + return err + } + w.columnChunkWriterCache[colIdx] = append(w.columnChunkWriterCache[colIdx], cw) + } } + w.currentRowGroupSize = 0 + return nil +} + +func (w *Writer) writeDatumToColChunk(d tree.Datum, datumColIdx int) (err error) { // tree.NewFmtCtx uses an underlying pool, so we can assume there is no // allocation here. fmtCtx := tree.NewFmtCtx(tree.FmtExport) defer fmtCtx.Close() - if err = w.sch.cols[colIdx].colWriter.Write(d, cw, w.ba, fmtCtx); err != nil { + if err = w.sch.cols[datumColIdx].colWriter.Write(d, w.columnChunkWriterCache[datumColIdx], w.ba, fmtCtx); err != nil { return err } @@ -194,22 +221,26 @@ func (w *Writer) writeDatumToColChunk(d tree.Datum, colIdx int) error { // AddRow writes the supplied datums. There is no guarantee // that they will be flushed to the sink after AddRow returns. 
func (w *Writer) AddRow(datums []tree.Datum) error { + if len(datums) != len(w.sch.cols) { + return errors.AssertionFailedf("expected %d datums in row, got %d datums", + len(w.sch.cols), len(datums)) + } + if w.currentRowGroupWriter == nil { - w.currentRowGroupWriter = w.writer.AppendBufferedRowGroup() + if err := w.setNewRowGroupWriter(); err != nil { + return err + } } else if w.currentRowGroupSize == w.cfg.maxRowGroupLength { if err := w.currentRowGroupWriter.Close(); err != nil { return err } - w.currentRowGroupWriter = w.writer.AppendBufferedRowGroup() - w.currentRowGroupSize = 0 - } - - if len(datums) != len(w.sch.cols) { - return errors.AssertionFailedf("expected %d datums in row, got %d datums", len(w.sch.cols), len(datums)) + if err := w.setNewRowGroupWriter(); err != nil { + return err + } } - for idx, d := range datums { - if err := w.writeDatumToColChunk(d, idx); err != nil { + for datumColIdx, d := range datums { + if err := w.writeDatumToColChunk(d, datumColIdx); err != nil { return err } } diff --git a/pkg/util/parquet/writer_bench_test.go b/pkg/util/parquet/writer_bench_test.go index 0b46aec8cc58..2f965d71f0f1 100644 --- a/pkg/util/parquet/writer_bench_test.go +++ b/pkg/util/parquet/writer_bench_test.go @@ -69,8 +69,12 @@ func getBenchmarkTypes() []*types.T { for _, typ := range randgen.SeedTypes { switch typ.Family() { case types.AnyFamily, types.TSQueryFamily, types.TSVectorFamily, - types.VoidFamily, types.TupleFamily: - // Remove Any Tuple. + types.VoidFamily: + case types.TupleFamily: + // Replace Any Tuple with Tuple of Ints with size 5. + typs = append(typs, types.MakeTuple([]*types.T{ + types.Int, types.Int, types.Int, types.Int, types.Int, + })) case types.ArrayFamily: // Replace Any Array with Int Array. typs = append(typs, types.IntArray) diff --git a/pkg/util/parquet/writer_test.go b/pkg/util/parquet/writer_test.go index 2def9b844b77..754bbe1398e4 100644 --- a/pkg/util/parquet/writer_test.go +++ b/pkg/util/parquet/writer_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/bitarray" "github.com/cockroachdb/cockroach/pkg/util/duration" "github.com/cockroachdb/cockroach/pkg/util/ipaddr" + "github.com/cockroachdb/cockroach/pkg/util/json" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil/pgdate" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -49,14 +50,25 @@ func newColSchema(numCols int) *colSchema { // that are not supported by the writer. func typSupported(typ *types.T) bool { switch typ.Family() { - case types.AnyFamily, types.TSQueryFamily, types.TSVectorFamily, - types.TupleFamily, types.VoidFamily: + case types.AnyFamily, types.TSQueryFamily, types.TSVectorFamily, types.VoidFamily: return false case types.ArrayFamily: if typ.ArrayContents().Family() == types.ArrayFamily || typ.ArrayContents().Family() == types.TupleFamily { return false } return typSupported(typ.ArrayContents()) + case types.TupleFamily: + supported := true + if len(typ.TupleContents()) == 0 { + return false + } + for _, typleFieldTyp := range typ.TupleContents() { + if typleFieldTyp.Family() == types.ArrayFamily || typleFieldTyp.Family() == types.TupleFamily { + return false + } + supported = supported && typSupported(typleFieldTyp) + } + return supported default: // It is better to let an unexpected type pass the filter and fail the test // because we can observe and document such failures. 
@@ -450,6 +462,30 @@ func TestBasicDatums(t *testing.T) { }, nil }, }, + { + name: "tuple", + sch: &colSchema{ + columnTypes: []*types.T{ + types.Int, + types.MakeTuple([]*types.T{types.Int, types.Int, types.Int, types.Int}), + types.Int, + types.MakeTuple([]*types.T{types.Int}), + types.Int, + types.MakeTuple([]*types.T{types.Int, types.Int}), + types.Int, + types.MakeLabeledTuple([]*types.T{types.Int, types.Int}, []string{"a", "b"}), + }, + columnNames: []string{"a", "b", "c", "d", "e", "f", "g", "h"}, + }, + datums: func() ([][]tree.Datum, error) { + dt1 := tree.MakeDTuple(types.MakeTuple([]*types.T{types.Int, types.Int, types.Int, types.Int}), tree.NewDInt(2), tree.NewDInt(3), tree.NewDInt(4), tree.NewDInt(5)) + dt2 := tree.MakeDTuple(types.MakeTuple([]*types.T{types.Int}), tree.NewDInt(7)) + dt3 := tree.MakeDTuple(types.MakeLabeledTuple([]*types.T{types.Int, types.Int}, []string{"a", "b"}), tree.DNull, tree.DNull) + return [][]tree.Datum{ + {tree.NewDInt(1), &dt1, tree.NewDInt(6), &dt2, tree.NewDInt(8), tree.DNull, tree.NewDInt(9), &dt3}, + }, nil + }, + }, } { t.Run(tc.name, func(t *testing.T) { datums, err := tc.datums() @@ -587,3 +623,40 @@ func optionsTest(t *testing.T, opt Option, testFn func(t *testing.T, reader *fil err = reader.Close() require.NoError(t, err) } +func TestSquashTuples(t *testing.T) { + datums := []tree.Datum{ + tree.NewDInt(1), + tree.NewDString("string"), + tree.NewDBytes("bytes"), + tree.NewDUuid(tree.DUuid{UUID: uuid.FromStringOrNil("52fdfc07-2182-454f-963f-5f0f9a621d72")}), + tree.NewDJSON(json.FromInt(1)), + tree.NewDFloat(0.1), + tree.NewDJSON(json.FromBool(false)), + tree.NewDInt(0), + } + + for _, tc := range []struct { + tupleIntervals [][]int + tupleOutput string + }{ + { + tupleIntervals: [][]int{}, + tupleOutput: "[1 'string' '\\x6279746573' '52fdfc07-2182-454f-963f-5f0f9a621d72' '1' 0.1 'false' 0]", + }, + { + tupleIntervals: [][]int{{0, 1}, {2, 4}}, + tupleOutput: "[(1, 'string') ('\\x6279746573', '52fdfc07-2182-454f-963f-5f0f9a621d72', '1') 0.1 'false' 0]", + }, + { + tupleIntervals: [][]int{{0, 2}, {3, 3}}, + tupleOutput: "[(1, 'string', '\\x6279746573') ('52fdfc07-2182-454f-963f-5f0f9a621d72',) '1' 0.1 'false' 0]", + }, + { + tupleIntervals: [][]int{{0, 7}}, + tupleOutput: "[(1, 'string', '\\x6279746573', '52fdfc07-2182-454f-963f-5f0f9a621d72', '1', 0.1, 'false', 0)]", + }, + } { + squashedDatums := squashTuples(datums, tc.tupleIntervals) + require.Equal(t, tc.tupleOutput, fmt.Sprint(squashedDatums)) + } +}
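
For context, here is a minimal end-to-end sketch of how a caller might exercise the new tuple support. It assumes the exported API shown in this patch (`NewSchema`, `AddRow`, `ReadFile`, `NewWriterWithReaderMeta`); the `Writer.Close` method and the output file path are assumptions, since they are not shown in full in this diff.

```go
// Sketch only: writes one row with a tuple column, then reads it back.
// Writer.Close and the exact NewWriterWithReaderMeta signature are assumed.
package main

import (
	"fmt"
	"os"

	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/sql/types"
	"github.com/cockroachdb/cockroach/pkg/util/parquet"
)

func main() {
	// One int column and one two-field tuple column. The tuple is flattened
	// into two physical columns in the parquet file.
	tupleTyp := types.MakeTuple([]*types.T{types.Int, types.String})
	sch, err := parquet.NewSchema(
		[]string{"a", "b"}, []*types.T{types.Int, tupleTyp})
	if err != nil {
		panic(err)
	}

	f, err := os.Create("example.parquet") // hypothetical output path
	if err != nil {
		panic(err)
	}

	// NewWriterWithReaderMeta attaches the CRDB type metadata (including the
	// tuple column intervals) that ReadFile uses to squash physical columns
	// back into tuple datums.
	w, err := parquet.NewWriterWithReaderMeta(sch, f)
	if err != nil {
		panic(err)
	}

	tup := tree.MakeDTuple(tupleTyp, tree.NewDInt(1), tree.NewDString("x"))
	if err := w.AddRow([]tree.Datum{tree.NewDInt(42), &tup}); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil { // assumed to flush row groups and finalize the file
		panic(err)
	}

	meta, rows, err := parquet.ReadFile("example.parquet")
	if err != nil {
		panic(err)
	}
	// Expect 1 row and 2 datum columns; rows[0][1] is a DTuple again.
	fmt.Println(meta.NumRows, meta.NumCols, rows[0][1])
}
```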