util/parquet: remove dependency on writer to read parquet files
Previously, the test utils used to read parquet files
required the writer as an argument. The main reason the
writer was required is that it contained crdb-specific
type information, which was needed to decode raw data
into crdb datums.

With this change, the writer is updated to write this
crdb-specific type information to the parquet file in
its metadata, and the reader is updated to read the type
information back from the file metadata. There is a new
test utility function `ReadFile(parquetFile string)`
which can be used to read all datums from a parquet
file without providing any additional type information.
The function also returns the file metadata, since users
of the `Writer` can attach arbitrary metadata and may
need to inspect it in testing.
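As a quick illustration, here is a minimal sketch of the intended
test-side usage. The argument and result order of `ReadFile` are
assumptions drawn from this message; the concrete return types live
in `pkg/util/parquet`.

```go
package parquet_test

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/util/parquet"
	"github.com/stretchr/testify/require"
)

// Sketch: read every datum back from a parquet file without supplying
// a writer or any crdb type information. The datum types are recovered
// from the type metadata the writer stamped into the file.
func TestReadParquetFile(t *testing.T) {
	meta, datums, err := parquet.ReadFile("/path/to/file.parquet")
	require.NoError(t, err)
	require.NotEmpty(t, datums)
	// Writer-supplied metadata is returned as well, so tests that
	// attach arbitrary metadata via the Writer can assert on it here.
	_ = meta
}
```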

Informs: cockroachdb#99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Release note: None
jayshrivastava committed Jun 21, 2023
1 parent 7d6fd4c commit 30a0e9f
Showing 8 changed files with 279 additions and 86 deletions.
3 changes: 1 addition & 2 deletions pkg/ccl/changefeedccl/helpers_test.go

```diff
@@ -1178,8 +1178,7 @@ func waitForJobStatus(
 }
 
 // TestingSetIncludeParquetMetadata adds the option to turn on adding metadata
-// (primary key column names) to the parquet file which is used to convert parquet
-// data to JSON format
+// to the parquet file which is used in testing.
 func TestingSetIncludeParquetMetadata() func() {
 	includeParquetTestMetadata = true
 	return func() {
```
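The helper changed above follows a common testing-knob idiom: flip a
package-level flag and hand back a closure that undoes it, so a test can
write `defer TestingSetIncludeParquetMetadata()()`. A self-contained
sketch of the idiom (names simplified; this variant restores the previous
value rather than hard-coding `false`):

```go
package main

import "fmt"

var includeParquetTestMetadata bool

// setIncludeParquetMetadata flips the knob and returns a closure that
// restores the prior value when called.
func setIncludeParquetMetadata() func() {
	prev := includeParquetTestMetadata
	includeParquetTestMetadata = true
	return func() { includeParquetTestMetadata = prev }
}

func main() {
	restore := setIncludeParquetMetadata()
	fmt.Println(includeParquetTestMetadata) // true
	restore()
	fmt.Println(includeParquetTestMetadata) // false

	// In a test, the two calls are usually chained:
	//   defer setIncludeParquetMetadata()()
	// The first call runs now (enabling the knob); the returned closure
	// runs at test teardown via defer.
}
```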
9 changes: 7 additions & 2 deletions pkg/ccl/changefeedccl/parquet.go

```diff
@@ -25,7 +25,7 @@ type parquetWriter struct {
 // newParquetWriterFromRow constructs a new parquet writer which outputs to
 // the given sink. This function interprets the schema from the supplied row.
 func newParquetWriterFromRow(
-	row cdcevent.Row, sink io.Writer, maxRowGroupSize int64,
+	row cdcevent.Row, sink io.Writer, opts ...parquet.Option,
 ) (*parquetWriter, error) {
 	columnNames := make([]string, len(row.ResultColumns())+1)
 	columnTypes := make([]*types.T, len(row.ResultColumns())+1)
@@ -48,7 +48,12 @@ func newParquetWriterFromRow(
 		return nil, err
 	}
 
-	writer, err := parquet.NewWriter(schemaDef, sink, parquet.WithMaxRowGroupLength(maxRowGroupSize))
+	writerConstructor := parquet.NewWriter
+	if includeParquetTestMetadata {
+		writerConstructor = parquet.NewWriterWithReaderMeta
+	}
+
+	writer, err := writerConstructor(schemaDef, sink, opts...)
 	if err != nil {
 		return nil, err
 	}
```
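Two things happen in the hunk above: the fixed `maxRowGroupSize`
parameter becomes variadic `parquet.Option` values that are forwarded
untouched, and the writer constructor is chosen once based on the
testing knob. A distilled sketch of that pattern; every name here is
illustrative, not the `pkg/util/parquet` API:

```go
package main

import "fmt"

type config struct{ maxRowGroupLength int64 }

type option func(*config)

func withMaxRowGroupLength(n int64) option {
	return func(c *config) { c.maxRowGroupLength = n }
}

type writer struct{ cfg config }

func newWriter(opts ...option) (*writer, error) {
	w := &writer{}
	for _, o := range opts {
		o(&w.cfg)
	}
	return w, nil
}

func newWriterWithReaderMeta(opts ...option) (*writer, error) {
	// Same construction, but a real implementation would also stamp
	// type metadata into the file so a reader can decode datums.
	return newWriter(opts...)
}

var includeTestMetadata bool

func main() {
	// Because both constructors share a signature, the choice is made
	// once and the caller's options are forwarded unchanged.
	ctor := newWriter
	if includeTestMetadata {
		ctor = newWriterWithReaderMeta
	}
	w, err := ctor(withMaxRowGroupLength(8))
	fmt.Println(w.cfg.maxRowGroupLength, err) // 8 <nil>
}
```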
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/parquet_test.go

```diff
@@ -38,6 +38,8 @@ func TestParquetRows(t *testing.T) {
 	// Rangefeed reader can time out under stress.
 	skip.UnderStress(t)
 
+	defer TestingSetIncludeParquetMetadata()()
+
 	ctx := context.Background()
 	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
 	defer s.Stopper().Stop(ctx)
@@ -106,7 +108,7 @@ require.NoError(t, err)
 	require.NoError(t, err)
 
 	if writer == nil {
-		writer, err = newParquetWriterFromRow(updatedRow, f, maxRowGroupSize)
+		writer, err = newParquetWriterFromRow(updatedRow, f, parquet.WithMaxRowGroupLength(maxRowGroupSize))
 		if err != nil {
 			t.Fatalf(err.Error())
 		}
```
1 change: 0 additions & 1 deletion pkg/util/parquet/BUILD.bazel

```diff
@@ -33,7 +33,6 @@ go_library(
         "@com_github_apache_arrow_go_v11//parquet/schema",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_lib_pq//oid",
-        "@com_github_stretchr_testify//assert",
         "@com_github_stretchr_testify//require",
     ],
 )
```
72 changes: 71 additions & 1 deletion pkg/util/parquet/decoders.go

```diff
@@ -17,6 +17,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/geo"
 	"github.com/cockroachdb/cockroach/pkg/geo/geopb"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/util/bitarray"
 	"github.com/cockroachdb/cockroach/pkg/util/duration"
 	"github.com/cockroachdb/cockroach/pkg/util/timeofday"
@@ -236,6 +237,76 @@ func (collatedStringDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
 	return &tree.DCollatedString{Contents: string(v)}, nil
 }
 
+// decoderFromFamilyAndType returns the decoder to use based on the type oid and
+// family. Note the logical similarity to makeColumn in schema.go. This is
+// intentional as each decoder returned by this function corresponds to a
+// particular colWriter determined by makeColumn.
+// TODO: refactor to remove the code duplication with makeColumn
+func decoderFromFamilyAndType(typOid oid.Oid, family types.Family) (decoder, error) {
+	switch family {
+	case types.BoolFamily:
+		return boolDecoder{}, nil
+	case types.StringFamily:
+		return stringDecoder{}, nil
+	case types.IntFamily:
+		typ, ok := types.OidToType[typOid]
+		if !ok {
+			return nil, errors.AssertionFailedf("could not determine type from oid %d", typOid)
+		}
+		if typ.Oid() == oid.T_int8 {
+			return int64Decoder{}, nil
+		}
+		return int32Decoder{}, nil
+	case types.DecimalFamily:
+		return decimalDecoder{}, nil
+	case types.TimestampFamily:
+		return timestampDecoder{}, nil
+	case types.TimestampTZFamily:
+		return timestampTZDecoder{}, nil
+	case types.UuidFamily:
+		return uUIDDecoder{}, nil
+	case types.INetFamily:
+		return iNetDecoder{}, nil
+	case types.JsonFamily:
+		return jsonDecoder{}, nil
+	case types.BitFamily:
+		return bitDecoder{}, nil
+	case types.BytesFamily:
+		return bytesDecoder{}, nil
+	case types.EnumFamily:
+		return enumDecoder{}, nil
+	case types.DateFamily:
+		return dateDecoder{}, nil
+	case types.Box2DFamily:
+		return box2DDecoder{}, nil
+	case types.GeographyFamily:
+		return geographyDecoder{}, nil
+	case types.GeometryFamily:
+		return geometryDecoder{}, nil
+	case types.IntervalFamily:
+		return intervalDecoder{}, nil
+	case types.TimeFamily:
+		return timeDecoder{}, nil
+	case types.TimeTZFamily:
+		return timeTZDecoder{}, nil
+	case types.FloatFamily:
+		typ, ok := types.OidToType[typOid]
+		if !ok {
+			return nil, errors.AssertionFailedf("could not determine type from oid %d", typOid)
+		}
+		if typ.Oid() == oid.T_float4 {
+			return float32Decoder{}, nil
+		}
+		return float64Decoder{}, nil
+	case types.OidFamily:
+		return oidDecoder{}, nil
+	case types.CollatedStringFamily:
+		return collatedStringDecoder{}, nil
+	default:
+		return nil, errors.AssertionFailedf("could not find decoder for type oid %d and family %d", typOid, family)
+	}
+}
+
 // Defeat the linter's unused lint errors.
 func init() {
 	var _, _ = boolDecoder{}.decode(false)
@@ -253,7 +324,6 @@ func init() {
 	var _, _ = enumDecoder{}.decode(parquet.ByteArray{})
 	var _, _ = dateDecoder{}.decode(parquet.ByteArray{})
 	var _, _ = box2DDecoder{}.decode(parquet.ByteArray{})
-	var _, _ = box2DDecoder{}.decode(parquet.ByteArray{})
 	var _, _ = geographyDecoder{}.decode(parquet.ByteArray{})
 	var _, _ = geometryDecoder{}.decode(parquet.ByteArray{})
 	var _, _ = intervalDecoder{}.decode(parquet.ByteArray{})
```
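Once a reader has recovered a column's oid and family from the file
metadata, it can look up the matching decoder and turn raw parquet
values back into datums. A hedged sketch from inside the package; the
real reader's dispatch may differ, and the type assertion below simply
reflects that each concrete decoder exposes a typed `decode` method:

```go
// Inside pkg/util/parquet, within some reader helper; error handling
// abbreviated.
dec, err := decoderFromFamilyAndType(oid.T_int8, types.IntFamily)
if err != nil {
	return err
}
// int64Decoder.decode takes the raw physical value read from the file
// and yields a tree.Datum (here, a tree.DInt).
if d, ok := dec.(int64Decoder); ok {
	datum, err := d.decode(int64(42))
	_, _ = datum, err
}
```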
26 changes: 0 additions & 26 deletions pkg/util/parquet/schema.go

```diff
@@ -42,7 +42,6 @@ const defaultTypeLength = -1
 type column struct {
 	node      schema.Node
 	colWriter colWriter
-	decoder   decoder
 	typ       *types.T
 }
 
@@ -99,7 +98,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 	case types.BoolFamily:
 		result.node = schema.NewBooleanNode(colName, repetitions, defaultSchemaFieldID)
 		result.colWriter = scalarWriter(writeBool)
-		result.decoder = boolDecoder{}
 		result.typ = types.Bool
 		return result, nil
 	case types.StringFamily:
@@ -111,7 +109,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 			return result, err
 		}
 		result.colWriter = scalarWriter(writeString)
-		result.decoder = stringDecoder{}
 		return result, nil
 	case types.IntFamily:
 		// Note: integer datums are always signed: https://www.cockroachlabs.com/docs/stable/int.html
@@ -124,13 +121,11 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 				return result, err
 			}
 			result.colWriter = scalarWriter(writeInt64)
-			result.decoder = int64Decoder{}
 			return result, nil
 		}
 
 		result.node = schema.NewInt32Node(colName, repetitions, defaultSchemaFieldID)
 		result.colWriter = scalarWriter(writeInt32)
-		result.decoder = int32Decoder{}
 		return result, nil
 	case types.DecimalFamily:
 		// According to PostgresSQL docs, scale or precision of 0 implies max
@@ -155,7 +150,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 			return result, err
 		}
 		result.colWriter = scalarWriter(writeDecimal)
-		result.decoder = decimalDecoder{}
 		return result, nil
 	case types.UuidFamily:
 		result.node, err = schema.NewPrimitiveNodeLogical(colName,
@@ -165,7 +159,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 			return result, err
 		}
 		result.colWriter = scalarWriter(writeUUID)
-		result.decoder = uUIDDecoder{}
 		return result, nil
 	case types.TimestampFamily:
 		// We do not use schema.TimestampLogicalType because the library will enforce
@@ -177,7 +170,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 			return result, err
 		}
 		result.colWriter = scalarWriter(writeTimestamp)
-		result.decoder = timestampDecoder{}
 		return result, nil
 	case types.TimestampTZFamily:
 		// We do not use schema.TimestampLogicalType because the library will enforce
@@ -189,7 +181,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 			return result, err
 		}
 		result.colWriter = scalarWriter(writeTimestampTZ)
-		result.decoder = timestampTZDecoder{}
 		return result, nil
 	case types.INetFamily:
 		result.node, err = schema.NewPrimitiveNodeLogical(colName,
@@ -199,7 +190,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 			return result, err
 		}
 		result.colWriter = scalarWriter(writeINet)
-		result.decoder = iNetDecoder{}
 		return result, nil
 	case types.JsonFamily:
 		result.node, err = schema.NewPrimitiveNodeLogical(colName,
@@ -209,7 +199,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 			return result, err
 		}
 		result.colWriter = scalarWriter(writeJSON)
-		result.decoder = jsonDecoder{}
 		return result, nil
 	case types.BitFamily:
 		result.node, err = schema.NewPrimitiveNode(colName,
@@ -219,7 +208,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 			return result, err
 		}
 		result.colWriter = scalarWriter(writeBit)
-		result.decoder = bitDecoder{}
 		return result, nil
 	case types.BytesFamily:
 		result.node, err = schema.NewPrimitiveNode(colName,
@@ -229,7 +217,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 			return result, err
 		}
 		result.colWriter = scalarWriter(writeBytes)
-		result.decoder = bytesDecoder{}
 		return result, nil
 	case types.EnumFamily:
 		result.node, err = schema.NewPrimitiveNodeLogical(colName,
@@ -239,7 +226,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 			return result, err
 		}
 		result.colWriter = scalarWriter(writeEnum)
-		result.decoder = enumDecoder{}
 		return result, nil
 	case types.DateFamily:
 		// We do not use schema.DateLogicalType because the library will enforce
@@ -251,7 +237,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 			return result, err
 		}
 		result.colWriter = scalarWriter(writeDate)
-		result.decoder = dateDecoder{}
 		return result, nil
 	case types.Box2DFamily:
 		result.node, err = schema.NewPrimitiveNodeLogical(colName,
@@ -261,7 +246,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 			return result, err
 		}
 		result.colWriter = scalarWriter(writeBox2D)
-		result.decoder = box2DDecoder{}
 		return result, nil
 	case types.GeographyFamily:
 		result.node, err = schema.NewPrimitiveNode(colName,
@@ -271,7 +255,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 			return result, err
 		}
 		result.colWriter = scalarWriter(writeGeography)
-		result.decoder = geographyDecoder{}
 		return result, nil
 	case types.GeometryFamily:
 		result.node, err = schema.NewPrimitiveNode(colName,
@@ -281,7 +264,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 			return result, err
 		}
 		result.colWriter = scalarWriter(writeGeometry)
-		result.decoder = geometryDecoder{}
 		return result, nil
 	case types.IntervalFamily:
 		result.node, err = schema.NewPrimitiveNodeLogical(colName,
@@ -291,7 +273,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 			return result, err
 		}
 		result.colWriter = scalarWriter(writeInterval)
-		result.decoder = intervalDecoder{}
 		return result, nil
 	case types.TimeFamily:
 		// CRDB stores time datums in microseconds, adjusted to UTC.
@@ -303,7 +284,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 			return result, err
 		}
 		result.colWriter = scalarWriter(writeTime)
-		result.decoder = timeDecoder{}
 		return result, nil
 	case types.TimeTZFamily:
 		// We cannot use the schema.NewTimeLogicalType because it does not support
@@ -315,7 +295,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 			return result, err
 		}
 		result.colWriter = scalarWriter(writeTimeTZ)
-		result.decoder = timeTZDecoder{}
 		return result, nil
 	case types.FloatFamily:
 		if typ.Oid() == oid.T_float4 {
@@ -326,7 +305,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 				return result, err
 			}
 			result.colWriter = scalarWriter(writeFloat32)
-			result.decoder = float32Decoder{}
 			return result, nil
 		}
 		result.node, err = schema.NewPrimitiveNode(colName,
@@ -336,12 +314,10 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 			return result, err
 		}
 		result.colWriter = scalarWriter(writeFloat64)
-		result.decoder = float64Decoder{}
 		return result, nil
 	case types.OidFamily:
 		result.node = schema.NewInt32Node(colName, repetitions, defaultSchemaFieldID)
 		result.colWriter = scalarWriter(writeOid)
-		result.decoder = oidDecoder{}
 		return result, nil
 	case types.CollatedStringFamily:
 		result.node, err = schema.NewPrimitiveNodeLogical(colName,
@@ -351,7 +327,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 			return result, err
 		}
 		result.colWriter = scalarWriter(writeCollatedString)
-		result.decoder = collatedStringDecoder{}
 		return result, nil
 	case types.ArrayFamily:
 		// Arrays for type T are represented by the following:
@@ -383,7 +358,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
 		if err != nil {
 			return result, err
 		}
-		result.decoder = elementCol.decoder
 		scalarColWriter, ok := elementCol.colWriter.(scalarWriter)
 		if !ok {
 			return result, errors.AssertionFailedf("expected scalar column writer")
```
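The remaining files in the change (not rendered above) add the metadata
plumbing itself. The core idea can be sketched independently of the real
implementation: at write time, serialize each column's (family, oid)
pair into a parquet key-value metadata string; at read time, parse it
back to drive `decoderFromFamilyAndType`. The encoding below is an
assumption for illustration, not the actual format:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// encodeTypeMeta packs per-column (family, oid) pairs into one string,
// e.g. "0:16,1:20", suitable for a parquet key-value metadata entry.
// Hypothetical encoding; pkg/util/parquet's real format may differ.
func encodeTypeMeta(families []int32, oids []uint32) string {
	parts := make([]string, len(families))
	for i := range families {
		parts[i] = fmt.Sprintf("%d:%d", families[i], oids[i])
	}
	return strings.Join(parts, ",")
}

// decodeTypeMeta parses the string back into (family, oid) pairs, which
// a reader could feed to decoderFromFamilyAndType column by column.
func decodeTypeMeta(meta string) (families []int32, oids []uint32, err error) {
	for _, part := range strings.Split(meta, ",") {
		fo := strings.SplitN(part, ":", 2)
		if len(fo) != 2 {
			return nil, nil, fmt.Errorf("malformed type metadata %q", part)
		}
		f, err := strconv.ParseInt(fo[0], 10, 32)
		if err != nil {
			return nil, nil, err
		}
		o, err := strconv.ParseUint(fo[1], 10, 32)
		if err != nil {
			return nil, nil, err
		}
		families = append(families, int32(f))
		oids = append(oids, uint32(o))
	}
	return families, oids, nil
}

func main() {
	meta := encodeTypeMeta([]int32{0, 1}, []uint32{16, 20})
	fmt.Println(meta) // "0:16,1:20"
	fams, oids, _ := decodeTypeMeta(meta)
	fmt.Println(fams, oids) // [0 1] [16 20]
}
```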