changefeedccl: add full support for the parquet format #104528

Merged (7 commits) on Jun 15, 2023
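
This change removes diff, end_time, and key_in_value from the set of options
rejected with format=parquet (topic_in_value remains unsupported), emits the
previous row as a JSON column when diff is set, and points several end-to-end
tests at the cloudstorage sink, the only sink that writes parquet files. As a
sketch of what becomes valid, written against the test harness used below
(the statement is illustrative, not taken from this diff):

    // Hypothetical usage: parquet combined with the previously rejected
    // diff and end_time options.
    endTime := s.Server.Clock().Now().Add(int64(time.Hour), 0).AsOfSystemTime()
    foo := feed(t, f,
        `CREATE CHANGEFEED FOR foo WITH format=parquet, diff, end_time=$1`,
        endTime)
    defer closeFeed(t, foo)
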
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -303,6 +303,7 @@ go_test(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/timeutil/pgdate",
"//pkg/util/uuid",
"//pkg/workload/bank",
"//pkg/workload/ledger",
"//pkg/workload/workloadsql",
6 changes: 5 additions & 1 deletion pkg/ccl/changefeedccl/cdctest/nemeses.go
@@ -154,7 +154,11 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er
}
}

foo, err := f.Feed(`CREATE CHANGEFEED FOR foo WITH updated, resolved, diff`)
withFormatParquet := ""
if rand.Intn(2) < 2 {
withFormatParquet = ", format=parquet"
}
foo, err := f.Feed(fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH updated, resolved, diff %s`, withFormatParquet))
if err != nil {
return nil, err
}
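
Note that rand.Intn(2) < 2 always holds, so as written the nemesis test
unconditionally appends format=parquet rather than choosing at random. If a
50/50 split between JSON and parquet were the intent, a minimal variant would
be:

    // Hypothetical variant: exercise parquet on roughly half of the runs.
    withFormatParquet := ""
    if rand.Intn(2) == 0 {
        withFormatParquet = ", format=parquet"
    }
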
18 changes: 7 additions & 11 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -269,11 +269,7 @@ func TestChangefeedBasicQuery(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`)
// Currently, parquet format (which may be injected by feed() call, doesn't
// know how to handle tuple types (cdc_prev); so, force JSON format.
foo := feed(t, f, `
CREATE CHANGEFEED WITH format='json'
AS SELECT *, event_op() AS op, cdc_prev FROM foo`)
foo := feed(t, f, `CREATE CHANGEFEED AS SELECT *, event_op() AS op, cdc_prev FROM foo`)
defer closeFeed(t, foo)

// 'initial' is skipped because only the latest value ('updated') is
@@ -318,7 +314,7 @@ func TestChangefeedBasicQueryWrapped(t *testing.T) {
// Force parquet format to exercise its newly added support for
// CREATE CHANGEFEED ... AS SELECT queries with the diff option.
foo := feed(t, f, `
CREATE CHANGEFEED WITH envelope='wrapped', format='json', diff
CREATE CHANGEFEED WITH envelope='wrapped', format='parquet', diff
AS SELECT b||a AS ba, event_op() AS op FROM foo`)
defer closeFeed(t, foo)

@@ -347,7 +343,7 @@ AS SELECT b||a AS ba, event_op() AS op FROM foo`)
})
}

cdcTest(t, testFn, feedTestForceSink("webhook"))
cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
}

// Same test as TestChangefeedBasicQueryWrapped, but this time using AVRO.
@@ -581,7 +577,7 @@ func TestChangefeedDiff(t *testing.T) {
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`)

foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH diff`)
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH diff, format=parquet`)
defer closeFeed(t, foo)

// 'initial' is skipped because only the latest value ('updated') is
@@ -613,7 +609,7 @@ func TestChangefeedDiff(t *testing.T) {
})
}

cdcTest(t, testFn)
cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
}

func TestChangefeedTenants(t *testing.T) {
@@ -7212,7 +7208,7 @@ func TestChangefeedEndTime(t *testing.T) {
sqlDB.Exec(t, "INSERT INTO foo VALUES (1), (2), (3)")

fakeEndTime := s.Server.Clock().Now().Add(int64(time.Hour), 0).AsOfSystemTime()
feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH end_time = $1", fakeEndTime)
feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH end_time = $1, format=parquet", fakeEndTime)
defer closeFeed(t, feed)

assertPayloads(t, feed, []string{
@@ -7229,7 +7225,7 @@ func TestChangefeedEndTime(t *testing.T) {
}))
}

cdcTest(t, testFn, feedTestEnterpriseSinks)
cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
}

func TestChangefeedEndTimeWithCursor(t *testing.T) {
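
The pattern across these test changes is consistent: options parquet
previously rejected (diff, end_time, and queries projecting cdc_prev) are now
exercised directly, and the sink is pinned to cloudstorage so the feeds
actually produce parquet files. Condensed, the wiring looks like this (a
sketch; the test body is elided and the harness names come from
helpers_test.go):

    // Force the cloudstorage sink so the changefeed emits parquet.
    testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
        foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH diff, format=parquet`)
        defer closeFeed(t, foo)
        // assertPayloads(t, foo, ...) against the expected rows.
    }
    cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
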
7 changes: 1 addition & 6 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -438,12 +438,7 @@ var InitialScanOnlyUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, Opt

// ParquetFormatUnsupportedOptions is options that are not supported with the
// parquet format.
//
// OptKeyInValue is disallowed because parquet files have no concept of key
// columns, so there is no reason to emit duplicate key datums.
//
// TODO(#103129): add support for some of these
var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff, OptKeyInValue, OptTopicInValue)
var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptTopicInValue)

// AlterChangefeedUnsupportedOptions are changefeed options that we do not allow
// users to alter.
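
After this change, topic_in_value is the only option that cannot be combined
with format=parquet; the options_test.go swap below mirrors that. A rough
sketch of how such a set is applied, assuming OptionsSet behaves as a string
set (the loop and error message here are hypothetical):

    // Hypothetical validation sketch over the unsupported-options set.
    for opt := range userOptions {
        if _, unsupported := ParquetFormatUnsupportedOptions[opt]; unsupported {
            return errors.Newf("cannot specify both format=parquet and %s", opt)
        }
    }
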
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options_test.go
@@ -33,9 +33,9 @@ func TestOptionsValidations(t *testing.T) {
}{
{map[string]string{"format": "txt"}, false, "unknown format"},
{map[string]string{"initial_scan": "", "no_initial_scan": ""}, false, "cannot specify both"},
{map[string]string{"diff": "", "format": "parquet"}, false, "cannot specify both"},
{map[string]string{"format": "txt"}, true, "unknown format"},
{map[string]string{"initial_scan": "", "no_initial_scan": ""}, true, "cannot specify both"},
{map[string]string{"format": "parquet", "topic_in_value": ""}, false, "cannot specify both"},
// Verify that the returned error uses the syntax initial_scan='yes' instead of initial_scan_only. See #97008.
{map[string]string{"initial_scan_only": "", "resolved": ""}, true, "cannot specify both initial_scan='only'"},
{map[string]string{"initial_scan_only": "", "resolved": ""}, true, "cannot specify both initial_scan='only'"},
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/helpers_test.go
@@ -858,6 +858,9 @@ func randomSinkTypeWithOptions(options feedTestOptions) string {
sinkWeights[sinkType] = 0
}
}
if weight, ok := sinkWeights["cloudstorage"]; ok && weight != 0 {
sinkWeights = map[string]int{"cloudstorage": 1}
}
weightTotal := 0
for _, weight := range sinkWeights {
weightTotal += weight
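
The override above replaces the weight map with a single entry whenever the
cloudstorage sink is permitted, so the weighted pick that follows always
selects it; this biases the randomized suite toward the one sink that can
produce parquet output. For reference, a weighted choice over such a map can
be written as (helper name is hypothetical):

    // pickSink makes a weighted random choice over sinkWeights.
    func pickSink(sinkWeights map[string]int, rng *rand.Rand) string {
        total := 0
        for _, w := range sinkWeights {
            total += w
        }
        n := rng.Intn(total)
        for sink, w := range sinkWeights {
            if n < w {
                return sink
            }
            n -= w
        }
        return "" // unreachable while total > 0
    }
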
67 changes: 57 additions & 10 deletions pkg/ccl/changefeedccl/parquet.go
@@ -13,13 +13,16 @@ import (
"io"
"strconv"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/json"
"github.com/cockroachdb/cockroach/pkg/util/parquet"
"github.com/cockroachdb/errors"
)
@@ -33,6 +36,9 @@ type parquetWriter struct {
encodingOpts changefeedbase.EncodingOptions
schemaDef *parquet.SchemaDefinition
datumAlloc []tree.Datum

// Cached object builder for when using the `diff` option.
vb *json.FixedKeysObjectBuilder
}

// newParquetSchemaDefintion returns a parquet schema definition based on the
@@ -68,6 +74,7 @@ func newParquetSchemaDefintion(

const parquetOptUpdatedTimestampColName = metaSentinel + changefeedbase.OptUpdatedTimestamps
const parquetOptMVCCTimestampColName = metaSentinel + changefeedbase.OptMVCCTimestamps
const parquetOptDiffColName = metaSentinel + "before"

func appendMetadataColsToSchema(
columnNames []string, columnTypes []*types.T, encodingOpts changefeedbase.EncodingOptions,
@@ -80,6 +87,10 @@ func appendMetadataColsToSchema(
columnNames = append(columnNames, parquetOptMVCCTimestampColName)
columnTypes = append(columnTypes, types.String)
}
if encodingOpts.Diff {
columnNames = append(columnNames, parquetOptDiffColName)
columnTypes = append(columnTypes, types.Json)
}
return columnNames, columnTypes
}

@@ -118,7 +129,7 @@ func newParquetWriterFromRow(
func (w *parquetWriter) addData(
updatedRow cdcevent.Row, prevRow cdcevent.Row, updated, mvcc hlc.Timestamp,
) error {
if err := populateDatums(updatedRow, prevRow, w.encodingOpts, updated, mvcc, w.datumAlloc); err != nil {
if err := w.populateDatums(updatedRow, prevRow, updated, mvcc); err != nil {
return err
}

@@ -131,14 +142,10 @@ func (w *parquetWriter) close() error {
}

// populateDatums writes the appropriate datums into the datumAlloc slice.
func populateDatums(
updatedRow cdcevent.Row,
prevRow cdcevent.Row,
encodingOpts changefeedbase.EncodingOptions,
updated, mvcc hlc.Timestamp,
datumAlloc []tree.Datum,
func (w *parquetWriter) populateDatums(
updatedRow cdcevent.Row, prevRow cdcevent.Row, updated, mvcc hlc.Timestamp,
) error {
datums := datumAlloc[:0]
datums := w.datumAlloc[:0]

if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error {
datums = append(datums, d)
@@ -148,12 +155,47 @@ func populateDatums(
}
datums = append(datums, getEventTypeDatum(updatedRow, prevRow).DString())

if encodingOpts.UpdatedTimestamps {
if w.encodingOpts.UpdatedTimestamps {
datums = append(datums, tree.NewDString(timestampToString(updated)))
}
if encodingOpts.MVCCTimestamps {
if w.encodingOpts.MVCCTimestamps {
datums = append(datums, tree.NewDString(timestampToString(mvcc)))
}
if w.encodingOpts.Diff {
if prevRow.IsDeleted() {
datums = append(datums, tree.DNull)
} else {
if w.vb == nil {
keys := make([]string, 0, len(prevRow.ResultColumns()))
_ = prevRow.ForEachColumn().Col(func(col cdcevent.ResultColumn) error {
keys = append(keys, col.Name)
return nil
})
valueBuilder, err := json.NewFixedKeysObjectBuilder(keys)
if err != nil {
return err
}
w.vb = valueBuilder
}

if err := prevRow.ForEachColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
j, err := tree.AsJSON(d, sessiondatapb.DataConversionConfig{}, time.UTC)
if err != nil {
return err
}
return w.vb.Set(col.Name, j)
}); err != nil {
return err
}

j, err := w.vb.Build()
if err != nil {
return err
}
datums = append(datums, tree.NewDJSON(j))
}
}

return nil
}

@@ -228,6 +270,11 @@ func addParquetTestMetadata(
valueCols[parquetOptMVCCTimestampColName] = idx
idx += 1
}
if encodingOpts.Diff {
valuesInOrder = append(valuesInOrder, parquetOptDiffColName)
valueCols[parquetOptDiffColName] = idx
idx += 1
}

parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"keyCols": serializeMap(keysInOrder, keyCols)}))
parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"allCols": serializeMap(valuesInOrder, valueCols)}))
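
The notable piece in parquet.go is the cached json.FixedKeysObjectBuilder:
with the diff option, the previous row is re-encoded as a single JSON datum
under the parquetOptDiffColName metadata column, and since every event in a
feed carries the same set of previous-row columns, the key set is computed
once and the builder is reused for every row. A standalone sketch of that
pattern with pkg/util/json (beforeRow and buildBefore are hypothetical):

    type beforeRow struct {
        A int
        B string
    }

    // buildBefore shows the fixed-keys pattern: keys declared once,
    // Set and Build called per row, builder reused across rows.
    func buildBefore(rows []beforeRow) ([]json.JSON, error) {
        vb, err := json.NewFixedKeysObjectBuilder([]string{"a", "b"})
        if err != nil {
            return nil, err
        }
        out := make([]json.JSON, 0, len(rows))
        for _, r := range rows {
            if err := vb.Set("a", json.FromInt(r.A)); err != nil {
                return nil, err
            }
            if err := vb.Set("b", json.FromString(r.B)); err != nil {
                return nil, err
            }
            j, err := vb.Build()
            if err != nil {
                return nil, err
            }
            out = append(out, j)
        }
        return out, nil
    }
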
51 changes: 29 additions & 22 deletions pkg/ccl/changefeedccl/parquet_test.go
@@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/parquet"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
@@ -54,29 +55,39 @@
sqlDB := sqlutils.MakeSQLRunner(db)

for _, tc := range []struct {
testName string
createTable string
inserts []string
testName string
createTable string
stmts []string
expectedDatumRows [][]tree.Datum
}{
{
testName: "mixed",
createTable: `CREATE TABLE foo (
int32Col INT4 PRIMARY KEY,
varCharCol VARCHAR(16) ,
charCol CHAR(2),
tsCol TIMESTAMP ,
stringCol STRING ,
decimalCOl DECIMAL(12,2),
uuidCol UUID
)`,
inserts: []string{
`INSERT INTO foo values (0, 'zero', 'CA', now(), 'oiwjfoijsdjif', 'inf', gen_random_uuid())`,
`INSERT INTO foo values (1, 'one', 'NY', now(), 'sdi9fu90d', '-1.90', gen_random_uuid())`,
`INSERT INTO foo values (2, 'two', 'WA', now(), 'sd9fid9fuj', '0.01', gen_random_uuid())`,
`INSERT INTO foo values (3, 'three', 'ON', now(), 'sadklfhkdlsjf', '1.2', gen_random_uuid())`,
`INSERT INTO foo values (4, 'four', 'NS', now(), '123123', '-11222221.2', gen_random_uuid())`,
`INSERT INTO foo values (5, 'five', 'BC', now(), 'sadklfhkdlsjf', '1.2', gen_random_uuid())`,
`INSERT INTO foo values (6, 'siz', 'AB', now(), '123123', '-11222221.2', gen_random_uuid())`,
stmts: []string{
`INSERT INTO foo VALUES (0, 'a1', '2fec7a4b-0a78-40ce-92e0-d1c0fac70436')`,
`INSERT INTO foo VALUES (1, 'b1', '0ce43188-e4a9-4b73-803b-a253abc57e6b')`,
`INSERT INTO foo VALUES (2, 'c1', '5a02bd48-ba64-4134-9199-844c1517f722')`,
`UPDATE foo SET stringCol = 'changed' WHERE int32Col = 1`,
`DELETE FROM foo WHERE int32Col = 0`,
},
expectedDatumRows: [][]tree.Datum{
{tree.NewDInt(0), tree.NewDString("a1"),
&tree.DUuid{uuid.FromStringOrNil("2fec7a4b-0a78-40ce-92e0-d1c0fac70436")},
parquetEventTypeDatumStringMap[parquetEventInsert]},
{tree.NewDInt(1), tree.NewDString("b1"),
&tree.DUuid{uuid.FromStringOrNil("0ce43188-e4a9-4b73-803b-a253abc57e6b")},
parquetEventTypeDatumStringMap[parquetEventInsert]},
{tree.NewDInt(2), tree.NewDString("c1"),
&tree.DUuid{uuid.FromStringOrNil("5a02bd48-ba64-4134-9199-844c1517f722")},
parquetEventTypeDatumStringMap[parquetEventInsert]},
{tree.NewDInt(1), tree.NewDString("changed"),
&tree.DUuid{uuid.FromStringOrNil("0ce43188-e4a9-4b73-803b-a253abc57e6b")},
parquetEventTypeDatumStringMap[parquetEventUpdate]},
{tree.NewDInt(0), tree.DNull, tree.DNull, parquetEventTypeDatumStringMap[parquetEventDelete]},
},
},
} {
@@ -104,8 +115,8 @@
}
}()

numRows := len(tc.inserts)
for _, insertStmt := range tc.inserts {
numRows := len(tc.stmts)
for _, insertStmt := range tc.stmts {
sqlDB.Exec(t, insertStmt)
}

@@ -135,11 +146,7 @@
err = writer.addData(updatedRow, prevRow, hlc.Timestamp{}, hlc.Timestamp{})
require.NoError(t, err)

// Save a copy of the datums we wrote.
datumRow := make([]tree.Datum, writer.schemaDef.NumColumns())
err = populateDatums(updatedRow, prevRow, encodingOpts, hlc.Timestamp{}, hlc.Timestamp{}, datumRow)
require.NoError(t, err)
datums[i] = datumRow
datums[i] = tc.expectedDatumRows[i]
}

err = writer.close()
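
The test rework above drops the populateDatums round trip (populateDatums is
now a method on parquetWriter and no longer takes an external datum slice) in
favor of hand-written expectations, which also cover updates and deletes.
Note the shape of the delete expectation: the key column keeps its value,
every other column is NULL, and the event-type column marks the delete:

    // Expected datums for `DELETE FROM foo WHERE int32Col = 0`.
    expected := []tree.Datum{
        tree.NewDInt(0),
        tree.DNull,
        tree.DNull,
        parquetEventTypeDatumStringMap[parquetEventDelete],
    }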