config(ticdc): enable-old-value always false if using avro or csv as the encoding protocol (pingcap#9079) (pingcap#9127)

close pingcap#9086
ti-chi-bot authored Jun 5, 2023
1 parent 00886f7 commit 1e2f277
Showing 48 changed files with 622 additions and 378 deletions.
25 changes: 0 additions & 25 deletions cdc/api/v2/api_helpers.go
@@ -189,32 +189,7 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig(
if err != nil {
return nil, err
}
if !replicaCfg.EnableOldValue {
sinkURIParsed, err := url.Parse(cfg.SinkURI)
if err != nil {
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}

protocol := sinkURIParsed.Query().Get(config.ProtocolKey)
if protocol != "" {
replicaCfg.Sink.Protocol = protocol
}
for _, fp := range config.ForceEnableOldValueProtocols {
if replicaCfg.Sink.Protocol == fp {
log.Warn(
"Attempting to replicate without old value enabled. "+
"CDC will enable old value and continue.",
zap.String("protocol", replicaCfg.Sink.Protocol))
replicaCfg.EnableOldValue = true
break
}
}

if replicaCfg.ForceReplicate {
return nil, cerror.ErrOldValueNotEnabled.GenWithStackByArgs(
"if use force replicate, old value feature must be enabled")
}
}
f, err := filter.NewFilter(replicaCfg, "")
if err != nil {
return nil, errors.Cause(err)
29 changes: 28 additions & 1 deletion cdc/api/v2/api_helpers_test.go
@@ -45,12 +45,14 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) {
cfg.ReplicaConfig = GetDefaultReplicaConfig()
cfg.ReplicaConfig.ForceReplicate = true
cfg.ReplicaConfig.EnableOldValue = false
// disable old value but force replicate
cfg.SinkURI = "mysql://"
// disable old value but force replicate, and using mysql sink.
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.NotNil(t, err)
require.Nil(t, cfInfo)
cfg.ReplicaConfig.ForceReplicate = false
cfg.ReplicaConfig.IgnoreIneligibleTable = true
cfg.SinkURI = "blackhole://"
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.Nil(t, err)
require.NotNil(t, cfInfo)
@@ -88,6 +90,19 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) {
cfg.SinkURI = string([]byte{0x7f, ' '})
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.NotNil(t, err)

cfg.StartTs = 0
// use blackhole to workaround
cfg.SinkURI = "blackhole://127.0.0.1:9092/test?protocol=avro"
cfg.ReplicaConfig.EnableOldValue = true
cfg.ReplicaConfig.ForceReplicate = false
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.NoError(t, err)
require.False(t, cfInfo.Config.EnableOldValue)

cfg.ReplicaConfig.ForceReplicate = true
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.Error(t, cerror.ErrOldValueNotEnabled, err)
}

func TestVerifyUpdateChangefeedConfig(t *testing.T) {
@@ -140,4 +155,16 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
cfg.TargetTs = 9
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.NotNil(t, err)

cfg.StartTs = 0
cfg.TargetTs = 0
cfg.ReplicaConfig.EnableOldValue = true
cfg.SinkURI = "blackhole://127.0.0.1:9092/test?protocol=avro"
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.NoError(t, err)
require.False(t, newCfInfo.Config.EnableOldValue)

cfg.ReplicaConfig.ForceReplicate = true
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.Error(t, cerror.ErrOldValueNotEnabled, err)
}
47 changes: 16 additions & 31 deletions cdc/entry/mounter.go
@@ -71,7 +71,6 @@ type Mounter interface {
type mounter struct {
schemaStorage SchemaStorage
tz *time.Location
enableOldValue bool
changefeedID model.ChangeFeedID
filter pfilter.Filter
metricTotalRows prometheus.Gauge
@@ -83,13 +82,11 @@ func NewMounter(schemaStorage SchemaStorage,
changefeedID model.ChangeFeedID,
tz *time.Location,
filter pfilter.Filter,
enableOldValue bool,
) Mounter {
return &mounter{
schemaStorage: schemaStorage,
changefeedID: changefeedID,
enableOldValue: enableOldValue,
filter: filter,
schemaStorage: schemaStorage,
changefeedID: changefeedID,
filter: filter,
metricTotalRows: totalRowsCountGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricIgnoredDMLEventCounter: ignoredDMLEventCounter.
@@ -271,7 +268,7 @@ func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) {
}

func datum2Column(
tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool,
tableInfo *model.TableInfo, datums map[int64]types.Datum,
) ([]*model.Column, []types.Datum, []rowcodec.ColInfo, error) {
cols := make([]*model.Column, len(tableInfo.RowColumnsOffset))
rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset))
@@ -288,19 +285,18 @@ func datum2Column(
continue
}
colName := colInfo.Name.O
colDatums, exist := datums[colInfo.ID]
var colValue interface{}
if !exist && !fillWithDefaultValue {
log.Debug("column value is not found",
zap.String("table", tableInfo.Name.O), zap.String("column", colName))
continue
}
var err error
var warn string
var size int
colID := colInfo.ID
colDatums, exist := datums[colID]

var (
colValue interface{}
size int
warn string
err error
)
if exist {
colValue, size, warn, err = formatColVal(colDatums, colInfo)
} else if fillWithDefaultValue {
} else {
colDatums, colValue, size, warn, err = getDefaultOrZeroValue(colInfo)
}
if err != nil {
@@ -342,27 +338,16 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
if row.PreRowExist {
// FIXME(leoppro): using pre table info to mounter pre column datum
// the pre column and current column in one event may using different table info
preCols, preRawCols, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue)
preCols, preRawCols, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow)
if err != nil {
return nil, rawRow, errors.Trace(err)
}

// NOTICE: When the old Value feature is off,
// the Delete event only needs to keep the handle key column.
if row.Delete && !m.enableOldValue {
for i := range preCols {
col := preCols[i]
if col != nil && !col.Flag.IsHandleKey() {
preCols[i] = nil
}
}
}
}

var cols []*model.Column
var rawCols []types.Datum
if row.RowExist {
cols, rawCols, extendColumnInfos, err = datum2Column(tableInfo, row.Row, true)
cols, rawCols, extendColumnInfos, err = datum2Column(tableInfo, row.Row)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
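With the fillWithDefaultValue flag gone, datum2Column always falls back to the column's default (or zero) value when a datum is missing from the row, and delete events keep their full pre-image instead of only the handle-key columns. Below is a minimal, self-contained sketch of that fallback rule; the types and helper names are illustrative, not the repo's.

package main

import "fmt"

// colSpec stands in for the column info consulted by datum2Column; only the
// pieces needed to illustrate the fallback are modeled here.
type colSpec struct {
	id         int64
	name       string
	defaultVal *string // nil means "no default": fall back to the zero value
}

// resolveValue mirrors the simplified branch in datum2Column: use the stored
// datum when present, otherwise the column default, otherwise the zero value.
func resolveValue(datums map[int64]string, col colSpec) string {
	if v, ok := datums[col.id]; ok {
		return v
	}
	if col.defaultVal != nil {
		return *col.defaultVal
	}
	return "" // zero value for this illustrative string column
}

func main() {
	def := "n/a"
	cols := []colSpec{
		{id: 1, name: "id"},
		{id: 2, name: "nickname", defaultVal: &def},
	}
	row := map[int64]string{1: "42"} // nickname is missing from the raw row
	for _, c := range cols {
		fmt.Printf("%s = %q\n", c.name, resolveValue(row, c)) // id = "42", nickname = "n/a"
	}
}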
24 changes: 10 additions & 14 deletions cdc/entry/mounter_group.go
@@ -31,13 +31,11 @@ type MounterGroup interface {
}

type mounterGroup struct {
schemaStorage SchemaStorage
inputCh chan *model.PolymorphicEvent
tz *time.Location
filter filter.Filter
enableOldValue bool

workerNum int
schemaStorage SchemaStorage
inputCh chan *model.PolymorphicEvent
tz *time.Location
filter filter.Filter
workerNum int

changefeedID model.ChangeFeedID
}
@@ -52,7 +50,6 @@ const (
func NewMounterGroup(
schemaStorage SchemaStorage,
workerNum int,
enableOldValue bool,
filter filter.Filter,
tz *time.Location,
changefeedID model.ChangeFeedID,
@@ -61,11 +58,10 @@
workerNum = defaultMounterWorkerNum
}
return &mounterGroup{
schemaStorage: schemaStorage,
inputCh: make(chan *model.PolymorphicEvent, defaultInputChanSize),
enableOldValue: enableOldValue,
filter: filter,
tz: tz,
schemaStorage: schemaStorage,
inputCh: make(chan *model.PolymorphicEvent, defaultInputChanSize),
filter: filter,
tz: tz,

workerNum: workerNum,

@@ -100,7 +96,7 @@ func (m *mounterGroup) Run(ctx context.Context) error {
}

func (m *mounterGroup) runWorker(ctx context.Context) error {
mounter := NewMounter(m.schemaStorage, m.changefeedID, m.tz, m.filter, m.enableOldValue)
mounter := NewMounter(m.schemaStorage, m.changefeedID, m.tz, m.filter)
for {
select {
case <-ctx.Done():
8 changes: 3 additions & 5 deletions cdc/entry/mounter_test.go
@@ -308,9 +308,7 @@ func testMounterDisableOldValue(t *testing.T, tc struct {
config := config.GetDefaultReplicaConfig()
filter, err := filter.NewFilter(config, "")
require.Nil(t, err)
mounter := NewMounter(scheamStorage,
model.DefaultChangeFeedID("c1"),
time.UTC, filter, false).(*mounter)
mounter := NewMounter(scheamStorage, model.DefaultChangeFeedID("c1"), time.UTC, filter).(*mounter)
mounter.tz = time.Local
ctx := context.Background()

@@ -1019,7 +1017,7 @@ func TestDecodeEventIgnoreRow(t *testing.T) {

ts := schemaStorage.GetLastSnapshot().CurrentTs()
schemaStorage.AdvanceResolvedTs(ver.Ver)
mounter := NewMounter(schemaStorage, cfID, time.Local, f, true).(*mounter)
mounter := NewMounter(schemaStorage, cfID, time.Local, f).(*mounter)

type testCase struct {
schema string
@@ -1196,7 +1194,7 @@ func TestBuildTableInfo(t *testing.T) {
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
require.NoError(t, err)
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
cols, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, true)
cols, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{})
require.NoError(t, err)
recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset)
handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI)
25 changes: 22 additions & 3 deletions cdc/model/changefeed.go
@@ -18,6 +18,7 @@ import (
"math"
"net/url"
"regexp"
"strings"
"time"

"github.com/pingcap/errors"
@@ -261,7 +262,7 @@ func (info *ChangeFeedInfo) Clone() (*ChangeFeedInfo, error) {
// VerifyAndComplete verifies changefeed info and may fill in some fields.
// If a required field is not provided, return an error.
// If some necessary filed is missing but can use a default value, fill in it.
func (info *ChangeFeedInfo) VerifyAndComplete() error {
func (info *ChangeFeedInfo) VerifyAndComplete() {
defaultConfig := config.GetDefaultReplicaConfig()
if info.Engine == "" {
info.Engine = SortUnified
@@ -278,8 +279,6 @@ func (info *ChangeFeedInfo) VerifyAndComplete() error {
if info.Config.Consistent == nil {
info.Config.Consistent = defaultConfig.Consistent
}

return nil
}

// FixIncompatible fixes incompatible changefeed meta info.
@@ -308,6 +307,14 @@ func (info *ChangeFeedInfo) FixIncompatible() {
info.fixMemoryQuota()
log.Info("Fix incompatible memory quota completed", zap.String("changefeed", info.String()))
}

if creatorVersionGate.ChangefeedAdjustEnableOldValueByProtocol() {
log.Info("Start fixing incompatible enable old value", zap.String("changefeed", info.String()),
zap.Bool("enableOldValue", info.Config.EnableOldValue))
info.fixEnableOldValue()
log.Info("Fix incompatible enable old value completed", zap.String("changefeed", info.String()),
zap.Bool("enableOldValue", info.Config.EnableOldValue))
}
}

// fixState attempts to fix state loss from upgrading the old owner to the new owner.
@@ -378,6 +385,18 @@ func (info *ChangeFeedInfo) fixMySQLSinkProtocol() {
}
}

func (info *ChangeFeedInfo) fixEnableOldValue() {
uri, err := url.Parse(info.SinkURI)
if err != nil {
// this is impossible to happen, since the changefeed registered successfully.
log.Warn("parse sink URI failed", zap.Error(err))
return
}
scheme := strings.ToLower(uri.Scheme)
protocol := uri.Query().Get(config.ProtocolKey)
info.Config.AdjustEnableOldValue(scheme, protocol)
}

func (info *ChangeFeedInfo) fixMQSinkProtocol() {
uri, err := url.Parse(info.SinkURI)
if err != nil {
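AdjustEnableOldValue itself is not shown in this diff, so the following is only a sketch of the behavior implied by the commit message, the block removed from api_helpers.go, and the new tests: avro and csv cannot carry the old row image, so the flag is forced to false; protocols that require the old image keep being forced to true; MySQL-compatible sinks are assumed to be left untouched since they use no encoding protocol. All names except the scheme and protocol inputs are illustrative.

package main

import (
	"fmt"
	"log"
	"net/url"
	"strings"
)

// Illustrative protocol sets; the repo keeps its own lists in the config package.
var forceDisableOldValue = map[string]bool{"avro": true, "csv": true}
var forceEnableOldValue = map[string]bool{"canal": true, "canal-json": true, "maxwell": true}

// adjustEnableOldValue approximates what ReplicaConfig.AdjustEnableOldValue(scheme, protocol)
// is expected to do after this commit; it is NOT the repo's implementation.
func adjustEnableOldValue(enableOldValue bool, sinkURI string) bool {
	u, err := url.Parse(sinkURI)
	if err != nil {
		return enableOldValue // fixEnableOldValue only logs a parse failure
	}
	scheme := strings.ToLower(u.Scheme)
	if scheme == "mysql" || scheme == "tidb" {
		return enableOldValue // assumption: nothing to adjust for MySQL-compatible sinks
	}
	protocol := strings.ToLower(u.Query().Get("protocol"))
	switch {
	case enableOldValue && forceDisableOldValue[protocol]:
		log.Printf("protocol %q cannot carry old values; disabling enable-old-value", protocol)
		return false
	case !enableOldValue && forceEnableOldValue[protocol]:
		log.Printf("protocol %q requires old values; enabling enable-old-value", protocol)
		return true
	}
	return enableOldValue
}

func main() {
	fmt.Println(adjustEnableOldValue(true, "kafka://127.0.0.1:9092/test?protocol=avro"))        // false
	fmt.Println(adjustEnableOldValue(false, "kafka://127.0.0.1:9092/test?protocol=canal-json")) // true
}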
3 changes: 1 addition & 2 deletions cdc/model/changefeed_test.go
@@ -154,8 +154,7 @@ func TestVerifyAndComplete(t *testing.T) {
},
}

err := info.VerifyAndComplete()
require.Nil(t, err)
info.VerifyAndComplete()
require.Equal(t, SortUnified, info.Engine)

marshalConfig1, err := info.Config.Marshal()
8 changes: 0 additions & 8 deletions cdc/processor/pipeline/sink.go
@@ -302,15 +302,7 @@ func SplitUpdateEvent(updateEvent *model.PolymorphicEvent) (*model.PolymorphicEv
deleteEventRowKV := *updateEvent.RawKV
deleteEvent.Row = &deleteEventRow
deleteEvent.RawKV = &deleteEventRowKV

deleteEvent.Row.Columns = nil
for i := range deleteEvent.Row.PreColumns {
// NOTICE: Only the handle key pre column is retained in the delete event.
if deleteEvent.Row.PreColumns[i] != nil &&
!deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() {
deleteEvent.Row.PreColumns[i] = nil
}
}

insertEvent := *updateEvent
insertEventRow := *updateEvent.Row
8 changes: 0 additions & 8 deletions cdc/processor/pipeline/sink_test.go
@@ -584,14 +584,6 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) {
require.Len(t, sink.Received[deleteEventIndex].Row.PreColumns, 3)
nilColIndex := 0
require.Nil(t, sink.Received[deleteEventIndex].Row.PreColumns[nilColIndex])
nonHandleKeyColIndex := 1
handleKeyColIndex := 2
// NOTICE: When old value disabled, we only keep the handle key pre cols.
require.Nil(t, sink.Received[deleteEventIndex].Row.PreColumns[nonHandleKeyColIndex])
require.Equal(t, "col2", sink.Received[deleteEventIndex].Row.PreColumns[handleKeyColIndex].Name)
require.True(t,
sink.Received[deleteEventIndex].Row.PreColumns[handleKeyColIndex].Flag.IsHandleKey(),
)

insertEventIndex := 1
require.Len(t, sink.Received[insertEventIndex].Row.Columns, 3)
1 change: 0 additions & 1 deletion cdc/processor/processor.go
@@ -789,7 +789,6 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {

p.mg = entry.NewMounterGroup(p.schemaStorage,
p.changefeed.Info.Config.Mounter.WorkerNum,
p.changefeed.Info.Config.EnableOldValue,
p.filter, tz, p.changefeedID)

p.wg.Add(1)
9 changes: 1 addition & 8 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -443,7 +443,7 @@ func convertRowChangedEvents(
// This indicates that it is an update event,
// and after enable old value internally by default(but disable in the configuration).
// We need to handle the update event to be compatible with the old format.
if !enableOldValue && colLen != 0 && preColLen != 0 && colLen == preColLen {
if e.Row.IsUpdate() && !enableOldValue {
if shouldSplitUpdateEvent(e) {
deleteEvent, insertEvent, err := splitUpdateEvent(e)
if err != nil {
Expand Down Expand Up @@ -508,13 +508,6 @@ func splitUpdateEvent(
deleteEvent.RawKV = &deleteEventRowKV

deleteEvent.Row.Columns = nil
for i := range deleteEvent.Row.PreColumns {
// NOTICE: Only the handle key pre column is retained in the delete event.
if deleteEvent.Row.PreColumns[i] != nil &&
!deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() {
deleteEvent.Row.PreColumns[i] = nil
}
}

insertEvent := *updateEvent
insertEventRow := *updateEvent.Row
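The synthesized delete event no longer strips non-handle-key pre-columns, since the old row image is now always carried internally; the split itself is gated by shouldSplitUpdateEvent, whose implementation is not part of this hunk. The sketch below only illustrates the assumed rule that a split is needed when a handle-key (primary/unique key) column changed, making the update effectively a delete of the old key plus an insert of the new one; the types here are illustrative, not the repo's.

package main

import "fmt"

// column is a stand-in for the event column type; only what the rule needs.
type column struct {
	name        string
	value       string
	isHandleKey bool
}

// shouldSplit approximates the intent assumed for shouldSplitUpdateEvent:
// split an update only when a handle-key value differs between the pre image
// and the post image.
func shouldSplit(preCols, cols []column) bool {
	for i := range preCols {
		if preCols[i].isHandleKey && preCols[i].value != cols[i].value {
			return true
		}
	}
	return false
}

func main() {
	pre := []column{{"id", "1", true}, {"val", "a", false}}
	cur := []column{{"id", "2", true}, {"val", "a", false}}
	fmt.Println(shouldSplit(pre, cur)) // true: the primary key changed, so emit delete + insert

	cur = []column{{"id", "1", true}, {"val", "b", false}}
	fmt.Println(shouldSplit(pre, cur)) // false: a plain update can be sent as-is
}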