Skip to content

Commit

Permalink
changefeedccl: use new parquet library
Browse files Browse the repository at this point in the history
This change updates changefeeds to use the new parquet library
added in `pkg/util/parquet` when using `format=parquet`.

Informs: cockroachdb#99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Release note: None
  • Loading branch information
jayshrivastava committed Jun 21, 2023
1 parent 30a0e9f commit 2ce1d54
Show file tree
Hide file tree
Showing 16 changed files with 260 additions and 398 deletions.
5 changes: 0 additions & 5 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ go_library(
"//pkg/sql/execinfrapb",
"//pkg/sql/exprutil",
"//pkg/sql/flowinfra",
"//pkg/sql/importer",
"//pkg/sql/isql",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
Expand Down Expand Up @@ -154,9 +153,6 @@ go_library(
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@com_github_fraugster_parquet_go//:parquet-go",
"@com_github_fraugster_parquet_go//parquet",
"@com_github_fraugster_parquet_go//parquetschema",
"@com_github_gogo_protobuf//jsonpb",
"@com_github_gogo_protobuf//types",
"@com_github_google_btree//:btree",
Expand Down Expand Up @@ -313,7 +309,6 @@ go_test(
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_fraugster_parquet_go//:parquet-go",
"@com_github_gogo_protobuf//types",
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_lib_pq//:pq",
Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7485,7 +7485,6 @@ func TestChangefeedPredicateWithSchemaChange(t *testing.T) {
defer log.Scope(t).Close(t)

skip.UnderRace(t, "takes too long under race")
defer TestingSetIncludeParquetMetadata()()

setupSQL := []string{
`CREATE TYPE status AS ENUM ('open', 'closed', 'inactive')`,
Expand Down
43 changes: 26 additions & 17 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,18 +423,31 @@ var NoLongerExperimental = map[string]string{
DeprecatedSinkSchemeCloudStorageS3: SinkSchemeCloudStorageS3,
}

// OptionsSet is a test of changefeed option strings.
type OptionsSet map[string]struct{}

// InitialScanOnlyUnsupportedOptions is options that are not supported with the
// initial scan only option
var InitialScanOnlyUnsupportedOptions = makeStringSet(OptEndTime, OptResolvedTimestamps, OptDiff,
var InitialScanOnlyUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptResolvedTimestamps, OptDiff,
OptMVCCTimestamps, OptUpdatedTimestamps)

// ParquetFormatUnsupportedOptions is options that are not supported with the
// parquet format.
//
// OptKeyInValue is disallowed because parquet files have no concept of key
// columns, so there is no reason to emit duplicate key datums.
//
// TODO(#103129): add support for some of these
var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptResolvedTimestamps, OptDiff,
OptMVCCTimestamps, OptUpdatedTimestamps, OptKeyInValue)

// AlterChangefeedUnsupportedOptions are changefeed options that we do not allow
// users to alter.
// TODO(sherman): At the moment we disallow altering both the initial_scan_only
// and the end_time option. However, there are instances in which it should be
// allowed to alter either of these options. We need to support the alteration
// of these fields.
var AlterChangefeedUnsupportedOptions = makeStringSet(OptCursor, OptInitialScan,
var AlterChangefeedUnsupportedOptions OptionsSet = makeStringSet(OptCursor, OptInitialScan,
OptNoInitialScan, OptInitialScanOnly, OptEndTime)

// AlterChangefeedOptionExpectValues is used to parse alter changefeed options
Expand Down Expand Up @@ -1039,16 +1052,21 @@ func (s StatementOptions) ValidateForCreateChangefeed(isPredicateChangefeed bool
if err != nil {
return err
}
validateInitialScanUnsupportedOptions := func(errMsg string) error {
for o := range InitialScanOnlyUnsupportedOptions {

// validateUnsupportedOptions returns an error if any of the supplied are
// in the statement options. The error string should be the string
// representation of the option (ex. "key_in_value", or "initial_scan='only'").
validateUnsupportedOptions := func(unsupportedOptions OptionsSet, errorStr string) error {
for o := range unsupportedOptions {
if _, ok := s.m[o]; ok {
return errors.Newf(`cannot specify both %s='only' and %s`, OptInitialScan, o)
return errors.Newf(`cannot specify both %s and %s`, errorStr, o)
}
}
return nil
}
if scanType == OnlyInitialScan {
if err := validateInitialScanUnsupportedOptions(fmt.Sprintf("%s='only'", OptInitialScan)); err != nil {
if err := validateUnsupportedOptions(InitialScanOnlyUnsupportedOptions,
fmt.Sprintf("%s='only'", OptInitialScan)); err != nil {
return err
}
} else {
Expand All @@ -1058,17 +1076,8 @@ func (s StatementOptions) ValidateForCreateChangefeed(isPredicateChangefeed bool
}
// Right now parquet does not support any of these options
if s.m[OptFormat] == string(OptFormatParquet) {
if isPredicateChangefeed {
// Diff option is allowed when using predicate changefeeds with parquet format.
for o := range InitialScanOnlyUnsupportedOptions {
if _, ok := s.m[o]; ok && o != OptDiff {
return errors.Newf(`cannot specify both format='%s' and %s`, OptFormatParquet, o)
}
}
} else {
if err := validateInitialScanUnsupportedOptions(string(OptFormatParquet)); err != nil {
return err
}
if err := validateUnsupportedOptions(ParquetFormatUnsupportedOptions, fmt.Sprintf("format=%s", OptFormatParquet)); err != nil {
return err
}
}
for o := range s.m {
Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func TestOptionsValidations(t *testing.T) {
{map[string]string{"initial_scan_only": "", "resolved": ""}, true, "cannot specify both initial_scan='only'"},
{map[string]string{"initial_scan_only": "", "resolved": ""}, true, "cannot specify both initial_scan='only'"},
{map[string]string{"key_column": "b"}, false, "requires the unordered option"},
{map[string]string{"diff": "", "format": "parquet"}, true, ""},
}

for _, test := range tests {
Expand Down
16 changes: 7 additions & 9 deletions pkg/ccl/changefeedccl/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,8 +1064,6 @@ func TestParquetEncoder(t *testing.T) {
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
defer TestingSetIncludeParquetMetadata()()

tests := []struct {
name string
changefeedStmt string
Expand All @@ -1085,22 +1083,22 @@ func TestParquetEncoder(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY, x STRING, y INT, z FLOAT NOT NULL, a BOOL)`)
sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY, x STRING, y INT, z FLOAT NOT NULL, a BOOL, c INT[])`)
defer sqlDB.Exec(t, `DROP TABLE FOO`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'Alice', 3, 0.5032135844230652, true), (2, 'Bob',
2, CAST('nan' AS FLOAT),false),(3, NULL, NULL, 4.5, NULL)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'Alice', 3, 0.5032135844230652, true, ARRAY[]), (2, 'Bob',
2, CAST('nan' AS FLOAT),false, NULL),(3, NULL, NULL, 4.5, NULL, ARRAY[1,NULL,3])`)
foo := feed(t, f, test.changefeedStmt)
defer closeFeed(t, foo)

assertPayloads(t, foo, []string{
`foo: [1]->{"after": {"a": true, "i": 1, "x": "Alice", "y": 3, "z": 0.5032135844230652}}`,
`foo: [2]->{"after": {"a": false, "i": 2, "x": "Bob", "y": 2, "z": "NaN"}}`,
`foo: [3]->{"after": {"a": null, "i": 3, "x": null, "y": null, "z": 4.5}}`,
`foo: [1]->{"after": {"a": true, "c": [], "i": 1, "x": "Alice", "y": 3, "z": 0.5032135844230652}}`,
`foo: [2]->{"after": {"a": false, "c": null, "i": 2, "x": "Bob", "y": 2, "z": "NaN"}}`,
`foo: [3]->{"after": {"a": null, "c": [1, null, 3], "i": 3, "x": null, "y": null, "z": 4.5}}`,
})

sqlDB.Exec(t, `UPDATE foo SET x='wonderland' where i=1`)
assertPayloads(t, foo, []string{
`foo: [1]->{"after": {"a": true, "i": 1, "x": "wonderland", "y": 3, "z": 0.5032135844230652}}`,
`foo: [1]->{"after": {"a": true, "c": [], "i": 1, "x": "wonderland", "y": 3, "z": 0.5032135844230652}}`,
})

sqlDB.Exec(t, `DELETE from foo where i=1`)
Expand Down
21 changes: 9 additions & 12 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,9 +924,7 @@ func makeFeedFactoryWithOptions(
t.Fatalf("expected externalIODir option to be set")
}
f := makeCloudFeedFactory(srvOrCluster, db, options.externalIODir)
return f, func() {
TestingSetIncludeParquetMetadata()()
}
return f, func() {}
case "enterprise":
sink, cleanup := pgURLForUser(username.RootUser)
f := makeTableFeedFactory(srvOrCluster, db, sink)
Expand Down Expand Up @@ -981,12 +979,20 @@ func cdcTestNamedWithSystem(
testLabel = fmt.Sprintf("%s/%s", sinkType, name)
}
t.Run(testLabel, func(t *testing.T) {
// Even if the parquet format is not being used, enable metadata
// in all tests for simplicity.
testServer, cleanupServer := makeServerWithOptions(t, options)
knobs := testServer.TestingKnobs.
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)
knobs.EnableParquetMetadata = true

feedFactory, cleanupSink := makeFeedFactoryWithOptions(t, sinkType, testServer.Server, testServer.DB, options)
feedFactory = maybeUseExternalConnection(feedFactory, testServer.DB, sinkType, options, t)
defer cleanupServer()
defer cleanupSink()
defer cleanupCloudStorage()

testFn(t, testServer, feedFactory)
})
}
Expand Down Expand Up @@ -1177,15 +1183,6 @@ func waitForJobStatus(
})
}

// TestingSetIncludeParquetMetadata adds the option to turn on adding metadata
// to the parquet file which is used in testing.
func TestingSetIncludeParquetMetadata() func() {
includeParquetTestMetadata = true
return func() {
includeParquetTestMetadata = false
}
}

// ChangefeedJobPermissionsTestSetup creates entities and users with various permissions
// for tests which test access control for changefeed jobs.
//
Expand Down
69 changes: 58 additions & 11 deletions pkg/ccl/changefeedccl/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package changefeedccl

import (
"io"
"strings"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand All @@ -22,13 +23,13 @@ type parquetWriter struct {
datumAlloc []tree.Datum
}

// newParquetWriterFromRow constructs a new parquet writer which outputs to
// the given sink. This function interprets the schema from the supplied row.
func newParquetWriterFromRow(
row cdcevent.Row, sink io.Writer, opts ...parquet.Option,
) (*parquetWriter, error) {
columnNames := make([]string, len(row.ResultColumns())+1)
columnTypes := make([]*types.T, len(row.ResultColumns())+1)
// newParquetSchemaDefintion returns a parquet schema definition based on the
// cdcevent.Row and the number of cols in the schema.
func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int, error) {
numCols := len(row.ResultColumns()) + 1

columnNames := make([]string, numCols)
columnTypes := make([]*types.T, numCols)

idx := 0
if err := row.ForEachColumn().Col(func(col cdcevent.ResultColumn) error {
Expand All @@ -37,27 +38,49 @@ func newParquetWriterFromRow(
idx += 1
return nil
}); err != nil {
return nil, err
return nil, 0, err
}

columnNames[idx] = parquetCrdbEventTypeColName
columnTypes[idx] = types.String

schemaDef, err := parquet.NewSchema(columnNames, columnTypes)
if err != nil {
return nil, 0, err
}
return schemaDef, numCols, nil
}

// newParquetWriterFromRow constructs a new parquet writer which outputs to
// the given sink. This function interprets the schema from the supplied row.
func newParquetWriterFromRow(
row cdcevent.Row,
sink io.Writer,
knobs *TestingKnobs, /* may be nil */
opts ...parquet.Option,
) (*parquetWriter, error) {
schemaDef, numCols, err := newParquetSchemaDefintion(row)
if err != nil {
return nil, err
}

writerConstructor := parquet.NewWriter
if includeParquetTestMetadata {

if knobs.EnableParquetMetadata {
if opts, err = addParquetTestMetadata(row, opts); err != nil {
return nil, err
}

// To use parquet test utils for reading datums, the writer needs to be
// configured with additional metadata.
writerConstructor = parquet.NewWriterWithReaderMeta
}

writer, err := writerConstructor(schemaDef, sink, opts...)
if err != nil {
return nil, err
}
return &parquetWriter{inner: writer, datumAlloc: make([]tree.Datum, len(columnNames))}, nil
return &parquetWriter{inner: writer, datumAlloc: make([]tree.Datum, numCols)}, nil
}

// addData writes the updatedRow, adding the row's event type. There is no guarantee
Expand All @@ -70,7 +93,7 @@ func (w *parquetWriter) addData(updatedRow cdcevent.Row, prevRow cdcevent.Row) e
return w.inner.AddRow(w.datumAlloc)
}

// Close closes the writer and flushes any buffered data to the sink.
// close closes the writer and flushes any buffered data to the sink.
func (w *parquetWriter) close() error {
return w.inner.Close()
}
Expand All @@ -88,3 +111,27 @@ func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []
datums = append(datums, getEventTypeDatum(updatedRow, prevRow).DString())
return nil
}

// addParquetTestMetadata appends options to the provided options to configure the
// parquet writer to write metadata required by cdc test feed factories.
func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.Option, error) {
keyCols := make([]string, 0)
if err := row.ForEachKeyColumn().Col(func(col cdcevent.ResultColumn) error {
keyCols = append(keyCols, col.Name)
return nil
}); err != nil {
return opts, err
}
opts = append(opts, parquet.WithMetadata(map[string]string{"keyCols": strings.Join(keyCols, ",")}))

allCols := make([]string, 0)
if err := row.ForEachColumn().Col(func(col cdcevent.ResultColumn) error {
allCols = append(allCols, col.Name)
return nil
}); err != nil {
return opts, err
}
allCols = append(allCols, parquetCrdbEventTypeColName)
opts = append(opts, parquet.WithMetadata(map[string]string{"allCols": strings.Join(allCols, ",")}))
return opts, nil
}
Loading

0 comments on commit 2ce1d54

Please sign in to comment.