Skip to content

Commit

Permalink
changefeedccl: support diff option with parquet format
Browse files Browse the repository at this point in the history
This change adds support for the `diff` changefeed
option when using `format=parquet`. Enabling `diff` also adds
support for CDC Transformations with parquet.

Informs: cockroachdb#103129
Informs: cockroachdb#99028
Epic: CRDB-27372
Release note: None
  • Loading branch information
jayshrivastava committed Jun 21, 2023
1 parent cf44c1d commit 60f169f
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 19 deletions.
6 changes: 1 addition & 5 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,7 @@ func TestChangefeedBasicQuery(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`)
// Currently, parquet format (which may be injected by the feed() call) doesn't
// know how to handle tuple types (cdc_prev); so, force JSON format.
foo := feed(t, f, `
CREATE CHANGEFEED WITH format='json'
AS SELECT *, event_op() AS op, cdc_prev FROM foo`)
foo := feed(t, f, `CREATE CHANGEFEED AS SELECT *, event_op() AS op, cdc_prev FROM foo`)
defer closeFeed(t, foo)

// 'initial' is skipped because only the latest value ('updated') is
Expand Down
4 changes: 1 addition & 3 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,9 +433,7 @@ var InitialScanOnlyUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, Opt

// ParquetFormatUnsupportedOptions is the set of options that are not supported
// with the parquet format.
//
// TODO(#103129): add support for some of these
var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff, OptTopicInValue)
var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptTopicInValue)

// AlterChangefeedUnsupportedOptions are changefeed options that we do not allow
// users to alter.
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ func TestOptionsValidations(t *testing.T) {
}{
{map[string]string{"format": "txt"}, false, "unknown format"},
{map[string]string{"initial_scan": "", "no_initial_scan": ""}, false, "cannot specify both"},
{map[string]string{"diff": "", "format": "parquet"}, false, "cannot specify both"},
{map[string]string{"format": "txt"}, true, "unknown format"},
{map[string]string{"initial_scan": "", "no_initial_scan": ""}, true, "cannot specify both"},
{map[string]string{"format": "parquet", "topic_in_value": ""}, false, "cannot specify both"},
// Verify that the returned error uses the syntax initial_scan='only' instead of initial_scan_only. See #97008.
{map[string]string{"initial_scan_only": "", "resolved": ""}, true, "cannot specify both initial_scan='only'"},
{map[string]string{"initial_scan_only": "", "resolved": ""}, true, "cannot specify both initial_scan='only'"},
Expand Down
67 changes: 57 additions & 10 deletions pkg/ccl/changefeedccl/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ import (
"io"
"strconv"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/json"
"github.com/cockroachdb/cockroach/pkg/util/parquet"
"github.com/cockroachdb/errors"
)
Expand All @@ -33,6 +36,9 @@ type parquetWriter struct {
encodingOpts changefeedbase.EncodingOptions
schemaDef *parquet.SchemaDefinition
datumAlloc []tree.Datum

// Cached object builder for when using the `diff` option.
vb *json.FixedKeysObjectBuilder
}

// newParquetSchemaDefintion returns a parquet schema definition based on the
Expand Down Expand Up @@ -68,6 +74,7 @@ func newParquetSchemaDefintion(

const parquetOptUpdatedTimestampColName = metaSentinel + changefeedbase.OptUpdatedTimestamps
const parquetOptMVCCTimestampColName = metaSentinel + changefeedbase.OptMVCCTimestamps
const parquetOptDiffColName = metaSentinel + "before"

func appendMetadataColsToSchema(
columnNames []string, columnTypes []*types.T, encodingOpts changefeedbase.EncodingOptions,
Expand All @@ -80,6 +87,10 @@ func appendMetadataColsToSchema(
columnNames = append(columnNames, parquetOptMVCCTimestampColName)
columnTypes = append(columnTypes, types.String)
}
if encodingOpts.Diff {
columnNames = append(columnNames, parquetOptDiffColName)
columnTypes = append(columnTypes, types.Json)
}
return columnNames, columnTypes
}

Expand Down Expand Up @@ -118,7 +129,7 @@ func newParquetWriterFromRow(
func (w *parquetWriter) addData(
updatedRow cdcevent.Row, prevRow cdcevent.Row, updated, mvcc hlc.Timestamp,
) error {
if err := populateDatums(updatedRow, prevRow, w.encodingOpts, updated, mvcc, w.datumAlloc); err != nil {
if err := w.populateDatums(updatedRow, prevRow, updated, mvcc); err != nil {
return err
}

Expand All @@ -131,14 +142,10 @@ func (w *parquetWriter) close() error {
}

// populateDatums writes the appropriate datums into the datumAlloc slice.
func populateDatums(
updatedRow cdcevent.Row,
prevRow cdcevent.Row,
encodingOpts changefeedbase.EncodingOptions,
updated, mvcc hlc.Timestamp,
datumAlloc []tree.Datum,
func (w *parquetWriter) populateDatums(
updatedRow cdcevent.Row, prevRow cdcevent.Row, updated, mvcc hlc.Timestamp,
) error {
datums := datumAlloc[:0]
datums := w.datumAlloc[:0]

if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error {
datums = append(datums, d)
Expand All @@ -148,12 +155,47 @@ func populateDatums(
}
datums = append(datums, getEventTypeDatum(updatedRow, prevRow).DString())

if encodingOpts.UpdatedTimestamps {
if w.encodingOpts.UpdatedTimestamps {
datums = append(datums, tree.NewDString(timestampToString(updated)))
}
if encodingOpts.MVCCTimestamps {
if w.encodingOpts.MVCCTimestamps {
datums = append(datums, tree.NewDString(timestampToString(mvcc)))
}
if w.encodingOpts.Diff {
if prevRow.IsDeleted() {
datums = append(datums, tree.DNull)
} else {
if w.vb == nil {
keys := make([]string, 0, len(prevRow.ResultColumns()))
_ = prevRow.ForEachColumn().Col(func(col cdcevent.ResultColumn) error {
keys = append(keys, col.Name)
return nil
})
valueBuilder, err := json.NewFixedKeysObjectBuilder(keys)
if err != nil {
return err
}
w.vb = valueBuilder
}

if err := prevRow.ForEachColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
j, err := tree.AsJSON(d, sessiondatapb.DataConversionConfig{}, time.UTC)
if err != nil {
return err
}
return w.vb.Set(col.Name, j)
}); err != nil {
return err
}

j, err := w.vb.Build()
if err != nil {
return err
}
datums = append(datums, tree.NewDJSON(j))
}
}

return nil
}

Expand Down Expand Up @@ -228,6 +270,11 @@ func addParquetTestMetadata(
valueCols[parquetOptMVCCTimestampColName] = idx
idx += 1
}
if encodingOpts.Diff {
valuesInOrder = append(valuesInOrder, parquetOptDiffColName)
valueCols[parquetOptDiffColName] = idx
idx += 1
}

parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"keyCols": serializeMap(keysInOrder, keyCols)}))
parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"allCols": serializeMap(valuesInOrder, valueCols)}))
Expand Down
9 changes: 9 additions & 0 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,8 @@ func (c *cloudFeed) appendParquetTestFeedMessages(
metaColumnNameSet[colName] = colIdx
case parquetOptMVCCTimestampColName:
metaColumnNameSet[colName] = colIdx
case parquetOptDiffColName:
metaColumnNameSet[colName] = colIdx
default:
}
}
Expand Down Expand Up @@ -1339,6 +1341,13 @@ func (c *cloudFeed) appendParquetTestFeedMessages(
}
valueWithAfter.Add(changefeedbase.OptMVCCTimestamps, j)
}
if mvccColIdx, mvcc := metaColumnNameSet[parquetOptDiffColName]; mvcc {
j, err := tree.AsJSON(row[mvccColIdx], sessiondatapb.DataConversionConfig{}, time.UTC)
if err != nil {
return err
}
valueWithAfter.Add("before", j)
}
}

keyJSON := keyJSONBuilder.Build()
Expand Down

0 comments on commit 60f169f

Please sign in to comment.