Skip to content

Commit

Permalink
Merge #74916 #75061
Browse files Browse the repository at this point in the history
74916: changefeedccl: filter out virtual computed columns from events r=sherman-grewal a=sherman-grewal

changefeedccl: filter out virtual computed columns from events

Resolves #74688

Release note (backward-incompatible change): Changefeeds will
now filter out virtual computed columns from events by default.

75061: dev,bazel: miscellaneous improvements r=irfansharif a=irfansharif

**build,bazel: avoid stamping builds with commit sha**

When building development binaries, we already skip tagging each build
with the UTC time to avoid busting the build cache. We could go further
and not tag crdb binaries with the current SHA/last git tag with the
knowledge that we’re not shipping binaries from local machines. This
shaves off around ~30s of go link time on my machine when switching
branches, regardless of cache hit rate. It also shaves a 30s delay
whenever a commit is amended/rebased but the source is unaffected (when
rewording commits for e.g., or during interactive rebase).

**dev: improve help message**

CRDB-specific configs should probably be placed in the gitignored
.bazelrc.user. Using the top-level rc file mucks with bazel builds for
other repositories.

**bazel: avoid writing to bazel-out**

In #74771 we put a regular file in bazel-out to prevent bazel from doing
it automatically (this was to prevent tools from tripping over when
finding extra go files). Bazel exposes a top-level flag we can use
instead.


Co-authored-by: Sherman Grewal <[email protected]>
Co-authored-by: irfan sharif <[email protected]>
  • Loading branch information
3 people committed Jan 19, 2022
3 parents eb26eb3 + 9e3d60c + 69e542d commit d990758
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 63 deletions.
14 changes: 9 additions & 5 deletions .bazelrc
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
try-import %workspace%/.bazelrc.user

build --symlink_prefix=_bazel/ --ui_event_filters=-DEBUG --define gotags=bazel,gss --experimental_proto_descriptor_sets_include_source_info --incompatible_strict_action_env --incompatible_enable_cc_toolchain_resolution
build --define gotags=bazel,gss
build --experimental_proto_descriptor_sets_include_source_info
build --incompatible_strict_action_env --incompatible_enable_cc_toolchain_resolution
build --symlink_prefix=_bazel/ --experimental_no_product_name_out_symlink
test --config=test --experimental_ui_max_stdouterr_bytes=10485760
build:with_ui --define cockroach_with_ui=y
build:test --define crdb_test=y
build:race --@io_bazel_rules_go//go/config:race --test_env=GORACE=halt_on_error=1 --test_sharding_strategy=disabled
test:test --test_env=TZ=
test:race --test_timeout=1200,6000,18000,72000

build --ui_event_filters=-DEBUG
query --ui_event_filters=-DEBUG

# CI should always run with `--config=ci`.
Expand All @@ -22,7 +27,7 @@ test:ci --test_tmpdir=/artifacts/tmp
build:cross --stamp
build:cross --define cockroach_cross=y

# cross-compilation configurations. Add e.g. --config=crosslinux to turn these on.
# Cross-compilation configurations. Add e.g. --config=crosslinux to turn these on.
build:crosslinux --platforms=//build/toolchains:cross_linux
build:crosslinux '--workspace_status_command=./build/bazelutil/stamp.sh x86_64-pc-linux-gnu'
build:crosslinux --config=cross
Expand All @@ -36,10 +41,9 @@ build:crosslinuxarm --platforms=//build/toolchains:cross_linux_arm
build:crosslinuxarm '--workspace_status_command=./build/bazelutil/stamp.sh aarch64-unknown-linux-gnu'
build:crosslinuxarm --config=cross

# developer configurations. Add e.g. --config=devdarwinx86_64 to turn these on.
# Developer configurations. Add e.g. --config=devdarwinx86_64 to turn these on.
# NB: This is consumed in `BUILD` files (see build/toolchains/BUILD.bazel).
build:devdarwinx86_64 --platforms=//build/toolchains:darwin_x86_64
# NOTE(ricky): This is consumed in `BUILD` files (see
# `build/toolchains/BUILD.bazel`).
build:devdarwinx86_64 --config=dev
build:dev --define cockroach_bazel_dev=y
build:dev --stamp --workspace_status_command=./build/bazelutil/stamp.sh
Expand Down
3 changes: 0 additions & 3 deletions bazel-out

This file was deleted.

4 changes: 2 additions & 2 deletions build/bazelutil/stamp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ fi
# * https://docs.bazel.build/versions/main/user-manual.html#workspace_status
# * https://github.com/bazelbuild/rules_go/blob/master/go/core.rst#defines-and-stamping
cat <<EOF
STABLE_BUILD_GIT_COMMIT ${GIT_COMMIT-}
STABLE_BUILD_GIT_TAG ${GIT_TAG-}
STABLE_BUILD_GIT_BUILD_TYPE ${GIT_BUILD_TYPE-}
STABLE_BUILD_TARGET_TRIPLE ${TARGET_TRIPLE-}
BUILD_GIT_COMMIT ${GIT_COMMIT-}
BUILD_GIT_TAG ${GIT_TAG-}
BUILD_GIT_UTCTIME ${GIT_UTCTIME-}
EOF
4 changes: 2 additions & 2 deletions pkg/build/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ go_library(
visibility = ["//visibility:public"],
x_defs = {
"github.com/cockroachdb/cockroach/pkg/build.cgoTargetTriple": "{STABLE_BUILD_TARGET_TRIPLE}",
"github.com/cockroachdb/cockroach/pkg/build.rev": "{STABLE_BUILD_GIT_COMMIT}",
"github.com/cockroachdb/cockroach/pkg/build.tag": "{STABLE_BUILD_GIT_TAG}",
"github.com/cockroachdb/cockroach/pkg/build.typ": "{STABLE_BUILD_GIT_BUILD_TYPE}",
"github.com/cockroachdb/cockroach/pkg/build.rev": "{BUILD_GIT_COMMIT}",
"github.com/cockroachdb/cockroach/pkg/build.tag": "{BUILD_GIT_TAG}",
"github.com/cockroachdb/cockroach/pkg/build.utcTime": "{BUILD_GIT_UTCTIME}",
},
deps = [
Expand Down
9 changes: 8 additions & 1 deletion pkg/ccl/changefeedccl/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/geo"
"github.com/cockroachdb/cockroach/pkg/geo/geopb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
Expand Down Expand Up @@ -754,7 +755,10 @@ const (
// If a name suffix is provided (as opposed to avroSchemaNoSuffix), it will be
// appended to the end of the avro record's name.
func tableToAvroSchema(
tableDesc catalog.TableDescriptor, nameSuffix string, namespace string,
tableDesc catalog.TableDescriptor,
nameSuffix string,
namespace string,
virtualColumnVisibility string,
) (*avroDataRecord, error) {
name := SQLNameToAvroName(tableDesc.GetName())
if nameSuffix != avroSchemaNoSuffix {
Expand All @@ -771,6 +775,9 @@ func tableToAvroSchema(
fieldIdxByColIdx: make(map[int]int),
}
for _, col := range tableDesc.PublicColumns() {
if col.IsVirtual() && virtualColumnVisibility == string(changefeedbase.OptVirtualColumnsOmitted) {
continue
}
field, err := columnToAvroSchema(col)
if err != nil {
return nil, err
Expand Down
17 changes: 9 additions & 8 deletions pkg/ccl/changefeedccl/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/importccl"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -137,7 +138,7 @@ func parseAvroSchema(j string) (*avroDataRecord, error) {
}
tableDesc.Columns = append(tableDesc.Columns, *colDesc)
}
return tableToAvroSchema(tabledesc.NewBuilder(&tableDesc).BuildImmutableTable(), avroSchemaNoSuffix, "")
return tableToAvroSchema(tabledesc.NewBuilder(&tableDesc).BuildImmutableTable(), avroSchemaNoSuffix, "", string(changefeedbase.OptVirtualColumnsOmitted))
}

func avroFieldMetadataToColDesc(metadata string) (*descpb.ColumnDescriptor, error) {
Expand Down Expand Up @@ -360,7 +361,7 @@ func TestAvroSchema(t *testing.T) {
tableDesc, err := parseTableDesc(
fmt.Sprintf(`CREATE TABLE "%s" %s`, test.name, test.schema))
require.NoError(t, err)
origSchema, err := tableToAvroSchema(tableDesc, avroSchemaNoSuffix, "")
origSchema, err := tableToAvroSchema(tableDesc, avroSchemaNoSuffix, "", string(changefeedbase.OptVirtualColumnsOmitted))
require.NoError(t, err)
jsonSchema := origSchema.codec.Schema()
roundtrippedSchema, err := parseAvroSchema(jsonSchema)
Expand Down Expand Up @@ -396,7 +397,7 @@ func TestAvroSchema(t *testing.T) {
t.Run("escaping", func(t *testing.T) {
tableDesc, err := parseTableDesc(`CREATE TABLE "☃" (🍦 INT PRIMARY KEY)`)
require.NoError(t, err)
tableSchema, err := tableToAvroSchema(tableDesc, avroSchemaNoSuffix, "")
tableSchema, err := tableToAvroSchema(tableDesc, avroSchemaNoSuffix, "", string(changefeedbase.OptVirtualColumnsOmitted))
require.NoError(t, err)
require.Equal(t,
`{"type":"record","name":"_u2603_","fields":[`+
Expand Down Expand Up @@ -603,7 +604,7 @@ func TestAvroSchema(t *testing.T) {
rows, err := parseValues(tableDesc, `VALUES (1, `+test.sql+`)`)
require.NoError(t, err)

schema, err := tableToAvroSchema(tableDesc, avroSchemaNoSuffix, "")
schema, err := tableToAvroSchema(tableDesc, avroSchemaNoSuffix, "", string(changefeedbase.OptVirtualColumnsOmitted))
require.NoError(t, err)
textual, err := schema.textualFromRow(rows[0])
require.NoError(t, err)
Expand Down Expand Up @@ -653,7 +654,7 @@ func TestAvroSchema(t *testing.T) {
rows, err := parseValues(tableDesc, `VALUES (1, `+test.sql+`)`)
require.NoError(t, err)

schema, err := tableToAvroSchema(tableDesc, avroSchemaNoSuffix, "")
schema, err := tableToAvroSchema(tableDesc, avroSchemaNoSuffix, "", string(changefeedbase.OptVirtualColumnsOmitted))
require.NoError(t, err)
textual, err := schema.textualFromRow(rows[0])
require.NoError(t, err)
Expand Down Expand Up @@ -756,12 +757,12 @@ func TestAvroMigration(t *testing.T) {
writerDesc, err := parseTableDesc(
fmt.Sprintf(`CREATE TABLE "%s" %s`, test.name, test.writerSchema))
require.NoError(t, err)
writerSchema, err := tableToAvroSchema(writerDesc, avroSchemaNoSuffix, "")
writerSchema, err := tableToAvroSchema(writerDesc, avroSchemaNoSuffix, "", string(changefeedbase.OptVirtualColumnsOmitted))
require.NoError(t, err)
readerDesc, err := parseTableDesc(
fmt.Sprintf(`CREATE TABLE "%s" %s`, test.name, test.readerSchema))
require.NoError(t, err)
readerSchema, err := tableToAvroSchema(readerDesc, avroSchemaNoSuffix, "")
readerSchema, err := tableToAvroSchema(readerDesc, avroSchemaNoSuffix, "", string(changefeedbase.OptVirtualColumnsOmitted))
require.NoError(t, err)

writerRows, err := parseValues(writerDesc, `VALUES `+test.writerValues)
Expand Down Expand Up @@ -838,7 +839,7 @@ func benchmarkEncodeType(b *testing.B, typ *types.T, encRow rowenc.EncDatumRow)
tableDesc, err := parseTableDesc(
fmt.Sprintf(`CREATE TABLE bench_table (bench_field %s)`, typ.SQLString()))
require.NoError(b, err)
schema, err := tableToAvroSchema(tableDesc, "suffix", "namespace")
schema, err := tableToAvroSchema(tableDesc, "suffix", "namespace", string(changefeedbase.OptVirtualColumnsOmitted))
require.NoError(b, err)

b.ReportAllocs()
Expand Down
14 changes: 13 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func changefeedPlanHook(
if err := changefeedbase.ValidateTable(targets, table); err != nil {
return err
}
for _, warning := range changefeedbase.WarningsForTable(targets, table) {
for _, warning := range changefeedbase.WarningsForTable(targets, table, opts) {
p.BufferClientNotice(ctx, pgnotice.Newf("%s", warning))
}
}
Expand Down Expand Up @@ -623,6 +623,18 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails
changefeedbase.OptOnErrorFail)
}
}
{
const opt = changefeedbase.OptVirtualColumns
switch v := changefeedbase.VirtualColumnVisibility(details.Opts[opt]); v {
case ``, changefeedbase.OptVirtualColumnsOmitted:
details.Opts[opt] = string(changefeedbase.OptVirtualColumnsOmitted)
case changefeedbase.OptVirtualColumnsNull:
details.Opts[opt] = string(changefeedbase.OptVirtualColumnsNull)
default:
return jobspb.ChangefeedDetails{}, errors.Errorf(
`unknown %s: %s`, opt, v)
}
}
return details, nil
}

Expand Down
79 changes: 59 additions & 20 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2285,30 +2285,69 @@ func TestChangefeedVirtualComputedColumn(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE cc (
a INT primary key, b INT, c INT AS (b + 1) VIRTUAL NOT NULL
)`)
sqlDB.Exec(t, `INSERT INTO cc VALUES (1, 1)`)
tests := map[string]struct {
formatOpt changefeedbase.FormatType
virtualColumnVisibility changefeedbase.VirtualColumnVisibility
changeFeedStmt string
payloadAfterInsert []string
payloadAfterUpdate []string
}{
`format="json",virtual_columns="omitted"`: {
formatOpt: changefeedbase.OptFormatJSON,
virtualColumnVisibility: changefeedbase.OptVirtualColumnsOmitted,
payloadAfterInsert: []string{`cc: [1]->{"after": {"a": 1, "b": 1}, "before": null}`},
payloadAfterUpdate: []string{`cc: [1]->{"after": {"a": 1, "b": 10}, "before": {"a": 1, "b": 1}}`},
},
`format="json",virtual_columns="null"`: {
formatOpt: changefeedbase.OptFormatJSON,
virtualColumnVisibility: changefeedbase.OptVirtualColumnsNull,
payloadAfterInsert: []string{`cc: [1]->{"after": {"a": 1, "b": 1, "c": null}, "before": null}`},
payloadAfterUpdate: []string{`cc: [1]->{"after": {"a": 1, "b": 10, "c": null}, "before": {"a": 1, "b": 1, "c": null}}`},
},
`format="avro",virtual_columns="omitted"`: {
formatOpt: changefeedbase.OptFormatAvro,
virtualColumnVisibility: changefeedbase.OptVirtualColumnsOmitted,
payloadAfterInsert: []string{`cc: {"a":{"long":1}}->{"after":{"cc":{"a":{"long":1},"b":{"long":1}}},"before":null}`},
payloadAfterUpdate: []string{`cc: {"a":{"long":1}}->{"after":{"cc":{"a":{"long":1},"b":{"long":10}}},"before":{"cc_before":{"a":{"long":1},"b":{"long":1}}}}`},
},
`format="avro",virtual_columns="null"`: {
formatOpt: changefeedbase.OptFormatAvro,
virtualColumnVisibility: changefeedbase.OptVirtualColumnsNull,
payloadAfterInsert: []string{`cc: {"a":{"long":1}}->{"after":{"cc":{"a":{"long":1},"b":{"long":1},"c":null}},"before":null}`},
payloadAfterUpdate: []string{`cc: {"a":{"long":1}}->{"after":{"cc":{"a":{"long":1},"b":{"long":10},"c":null}},"before":{"cc_before":{"a":{"long":1},"b":{"long":1},"c":null}}}`},
},
}

cc := feed(t, f, `CREATE CHANGEFEED FOR cc with diff`)
defer closeFeed(t, cc)
for _, test := range tests {
testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)

assertPayloads(t, cc, []string{
`cc: [1]->{"after": {"a": 1, "b": 1, "c": null}, "before": null}`,
})
sqlDB.Exec(t, `CREATE TABLE cc (
a INT primary key, b INT, c INT AS (b + 1) VIRTUAL NOT NULL
)`)
defer sqlDB.Exec(t, `DROP TABLE cc`)

sqlDB.Exec(t, `UPDATE cc SET b=10 WHERE a=1`)
assertPayloads(t, cc, []string{
`cc: [1]->{"after": {"a": 1, "b": 10, "c": null}, "before": {"a": 1, "b": 1, "c": null}}`,
})
}
sqlDB.Exec(t, `INSERT INTO cc VALUES (1, 1)`)

t.Run(`sinkless`, sinklessTest(testFn))
t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`kafka`, kafkaTest(testFn))
t.Run(`webhook`, webhookTest(testFn))
changeFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR cc WITH diff, format="%s", virtual_columns="%s"`,
test.formatOpt, test.virtualColumnVisibility))
defer closeFeed(t, changeFeed)

assertPayloads(t, changeFeed, test.payloadAfterInsert)

sqlDB.Exec(t, `UPDATE cc SET b=10 WHERE a=1`)

assertPayloads(t, changeFeed, test.payloadAfterUpdate)
}

if test.formatOpt != changefeedbase.OptFormatAvro {
t.Run(`sinkless`, sinklessTest(testFn))
t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`webhook`, webhookTest(testFn))
}

t.Run(`kafka`, kafkaTest(testFn))
}
}

func TestChangefeedUpdatePrimaryKey(t *testing.T) {
Expand Down
11 changes: 10 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ type SchemaChangeEventClass string
// change event which is a member of the changefeed's schema change events.
type SchemaChangePolicy string

// VirtualColumnVisibility defines the behaviour of how the changefeed will
// include virtual columns in an event
type VirtualColumnVisibility string

// Constants for the options.
const (
OptAvroSchemaPrefix = `avro_schema_prefix`
Expand All @@ -50,6 +54,10 @@ const (
OptWebhookClientTimeout = `webhook_client_timeout`
OptOnError = `on_error`
OptMetricsScope = `metrics_label`
OptVirtualColumns = `virtual_columns`

OptVirtualColumnsOmitted VirtualColumnVisibility = `omitted`
OptVirtualColumnsNull VirtualColumnVisibility = `null`

// OptSchemaChangeEventClassColumnChange corresponds to all schema change
// events which add or remove any column.
Expand Down Expand Up @@ -170,6 +178,7 @@ var ChangefeedOptionExpectValues = map[string]sql.KVStringOptValidate{
OptWebhookClientTimeout: sql.KVStringOptRequireValue,
OptOnError: sql.KVStringOptRequireValue,
OptMetricsScope: sql.KVStringOptRequireValue,
OptVirtualColumns: sql.KVStringOptRequireValue,
}

func makeStringSet(opts ...string) map[string]struct{} {
Expand All @@ -189,7 +198,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEnvelope,
OptSchemaChangeEvents, OptSchemaChangePolicy,
OptProtectDataFromGCOnPause, OptOnError,
OptInitialScan, OptNoInitialScan,
OptMinCheckpointFrequency, OptMetricsScope)
OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns)

// SQLValidOptions is options exclusive to SQL sink
var SQLValidOptions map[string]struct{} = nil
Expand Down
16 changes: 10 additions & 6 deletions pkg/ccl/changefeedccl/changefeedbase/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@ func ValidateTable(targets jobspb.ChangefeedTargets, tableDesc catalog.TableDesc
}

// WarningsForTable returns any known nonfatal issues with running a changefeed on this kind of table.
func WarningsForTable(targets jobspb.ChangefeedTargets, tableDesc catalog.TableDescriptor) []error {
func WarningsForTable(
targets jobspb.ChangefeedTargets, tableDesc catalog.TableDescriptor, opts map[string]string,
) []error {
warnings := []error{}
for _, col := range tableDesc.AccessibleColumns() {
if col.IsVirtual() {
warnings = append(warnings,
errors.Errorf("Changefeeds will emit null values for virtual column %s in table %s", col.ColName(), tableDesc.GetName()),
)
if _, ok := opts[OptVirtualColumns]; !ok {
for _, col := range tableDesc.AccessibleColumns() {
if col.IsVirtual() {
warnings = append(warnings,
errors.Errorf("Changefeeds will filter out values for virtual column %s in table %s", col.ColName(), tableDesc.GetName()),
)
}
}
}
return warnings
Expand Down
Loading

0 comments on commit d990758

Please sign in to comment.