Skip to content

Commit

Permalink
changefeedccl: support key_in_value with parquet format
Browse files Browse the repository at this point in the history
Previously, the option `key_in_value` was disallowed with
`format=parquet`. This change allows these settings to be
used together. Note that `key_in_value` is enabled by
default with `cloudstorage` sinks, and `format=parquet` is
only allowed with `cloudstorage` sinks, so `key_in_value` is
enabled for parquet by default.

Informs: cockroachdb#103129
Informs: cockroachdb#99028
Epic: CRDB-27372
Release note: None
  • Loading branch information
jayshrivastava committed Jun 21, 2023
1 parent 9c88db0 commit 0cadd55
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 30 deletions.
5 changes: 1 addition & 4 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,8 @@ var InitialScanOnlyUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, Opt
// ParquetFormatUnsupportedOptions is the set of options that are not
// supported with the parquet format.
//
// OptKeyInValue is disallowed because parquet files have no concept of key
// columns, so there is no reason to emit duplicate key datums.
//
// TODO(#103129): add support for some of these
var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff, OptKeyInValue, OptTopicInValue)
var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff, OptTopicInValue)

// AlterChangefeedUnsupportedOptions are changefeed options that we do not allow
// users to alter.
Expand Down
57 changes: 31 additions & 26 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,41 +1058,46 @@ func (f *cloudFeedFactory) Feed(
tree.KVOption{Key: changefeedbase.OptKeyInValue},
)
}
// Determine if we can enable the parquet format if the changefeed is not
// being created with incompatible options. If it can be enabled, we will use
// parquet format with a probability of 0.4.
parquetPossible := includeParquestTestMetadata

formatSpecified := false
explicitEnvelope := false
for _, opt := range createStmt.Options {
if string(opt.Key) == changefeedbase.OptFormat {
formatSpecified = true
}
if string(opt.Key) == changefeedbase.OptEnvelope {
explicitEnvelope = true
}
if string(opt.Key) == changefeedbase.OptFormat &&
opt.Value.String() != string(changefeedbase.OptFormatParquet) {
}

if !formatSpecified {
// Determine if we can enable the parquet format if the changefeed is not
// being created with incompatible options. If it can be enabled, we will use
// parquet format with a probability of 0.4.
parquetPossible := includeParquestTestMetadata
for _, opt := range createStmt.Options {
for o := range changefeedbase.ParquetFormatUnsupportedOptions {
if o == string(opt.Key) {
parquetPossible = false
break
}
}
}
randNum := rand.Intn(5)
if randNum < 3 {
parquetPossible = false
break
}
for o := range changefeedbase.ParquetFormatUnsupportedOptions {
if o == string(opt.Key) {
parquetPossible = false
break
}
if parquetPossible {
log.Infof(context.Background(), "using parquet format")
createStmt.Options = append(
createStmt.Options,
tree.KVOption{
Key: changefeedbase.OptFormat,
Value: tree.NewStrVal(string(changefeedbase.OptFormatParquet)),
},
)
}
}
randNum := rand.Intn(5)
if randNum < 2 {
parquetPossible = false
}
if parquetPossible {
log.Infof(context.Background(), "using parquet format")
createStmt.Options = append(
createStmt.Options,
tree.KVOption{
Key: changefeedbase.OptFormat,
Value: tree.NewStrVal(string(changefeedbase.OptFormatParquet)),
},
)
}

feedDir := feedSubDir()
sinkURI := `nodelocal://1/` + feedDir
Expand Down

0 comments on commit 0cadd55

Please sign in to comment.