Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#8268
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
overvenus authored and ti-chi-bot committed Feb 19, 2023
1 parent 719e8e3 commit b38189d
Show file tree
Hide file tree
Showing 23 changed files with 8,728 additions and 1 deletion.
465 changes: 465 additions & 0 deletions cdc/api/v2/api_helpers.go

Large diffs are not rendered by default.

147 changes: 147 additions & 0 deletions cdc/api/v2/api_helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package v2

import (
"context"
"testing"
"time"

"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/stretchr/testify/require"
)

// TestVerifyCreateChangefeedConfig drives verifyCreateChangefeedConfig
// through a sequence of progressively mutated ChangefeedConfig values,
// covering: missing sink URI, nil replica config (panics), invalid option
// combinations, malformed ID/namespace, an already-existing changefeed,
// startTs/targetTs ordering, and bad sink URIs.
func TestVerifyCreateChangefeedConfig(t *testing.T) {
	ctx := context.Background()
	pdClient := &mockPDClient{}
	helper := entry.NewSchemaTestHelper(t)
	helper.Tk().MustExec("use test;")
	storage := helper.Storage()
	provider := &mockStatusProvider{}
	cfg := &ChangefeedConfig{}
	h := &APIV2HelpersImpl{}

	// Empty config: no sink URI, must fail.
	cfInfo, err := h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	require.Nil(t, cfInfo)
	require.NotNil(t, err)

	// A nil ReplicaConfig currently panics rather than returning an error.
	cfg.SinkURI = "blackhole://"
	require.Panics(t, func() {
		_, _ = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	})

	// Disabling old value while forcing replication is rejected.
	cfg.ReplicaConfig = GetDefaultReplicaConfig()
	cfg.ReplicaConfig.ForceReplicate = true
	cfg.ReplicaConfig.EnableOldValue = false
	cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	require.NotNil(t, err)
	require.Nil(t, cfInfo)

	// Valid config: a changefeed info with generated ID and epoch is returned.
	cfg.ReplicaConfig.ForceReplicate = false
	cfg.ReplicaConfig.IgnoreIneligibleTable = true
	cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	require.Nil(t, err)
	require.NotNil(t, cfInfo)
	require.NotEqual(t, "", cfInfo.ID)
	require.Equal(t, model.DefaultNamespace, cfInfo.Namespace)
	require.NotEqual(t, 0, cfInfo.Epoch)

	// A malformed changefeed ID is rejected.
	cfg.ID = "abdc/sss"
	cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	require.NotNil(t, err)
	cfg.ID = ""

	// A malformed namespace is rejected.
	cfg.Namespace = "abdc/sss"
	cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	require.NotNil(t, err)
	cfg.Namespace = ""

	// An already existing changefeed (status found by the provider) is rejected.
	provider.changefeedStatus = &model.ChangeFeedStatus{}
	cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	require.NotNil(t, err)

	// ErrChangeFeedNotExists from the provider means the ID is free to use.
	provider.changefeedStatus = nil
	provider.err = cerror.ErrChangeFeedNotExists.GenWithStackByArgs("aaa")
	cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	require.Nil(t, err)
	require.Equal(t, uint64(123), cfInfo.UpstreamID)

	// startTs greater than targetTs is rejected.
	cfg.TargetTs = 3
	cfg.StartTs = 4
	cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	require.NotNil(t, err)

	// An unsupported sink scheme is rejected.
	cfg.TargetTs = 6
	cfg.ReplicaConfig.EnableOldValue = false
	cfg.SinkURI = "aaab://"
	cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	require.NotNil(t, err)

	// An unparsable sink URI (control byte + space) is rejected.
	cfg.SinkURI = string([]byte{0x7f, ' '})
	cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	require.NotNil(t, err)
}

// TestVerifyUpdateChangefeedConfig checks that verifyUpdateChangefeedConfig
// rejects invalid updates (empty update config, changed namespace/ID,
// targetTs below the existing startTs) and merges the allowed field changes
// into the returned changefeed and upstream infos.
func TestVerifyUpdateChangefeedConfig(t *testing.T) {
ctx := context.Background()
cfg := &ChangefeedConfig{}
oldInfo := &model.ChangeFeedInfo{
Config: config.GetDefaultReplicaConfig(),
}
oldUpInfo := &model.UpstreamInfo{}
helper := entry.NewSchemaTestHelper(t)
helper.Tk().MustExec("use test;")
storage := helper.Storage()
h := &APIV2HelpersImpl{}
// An empty update config is rejected.
newCfInfo, newUpInfo, err := h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.NotNil(t, err)
require.Nil(t, newCfInfo)
require.Nil(t, newUpInfo)
// namespace and id can not be updated
cfg.Namespace = "abc"
cfg.ID = "1234"
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.NotNil(t, err)
require.Nil(t, newCfInfo)
require.Nil(t, newUpInfo)
// A full set of updatable fields is accepted.
cfg.StartTs = 2
cfg.TargetTs = 10
cfg.Engine = model.SortInMemory
cfg.ReplicaConfig = ToAPIReplicaConfig(config.GetDefaultReplicaConfig())
cfg.ReplicaConfig.EnableSyncPoint = true
cfg.ReplicaConfig.SyncPointInterval = 30 * time.Second
cfg.PDAddrs = []string{"a", "b"}
cfg.CertPath = "p1"
cfg.CAPath = "p2"
cfg.KeyPath = "p3"
cfg.SinkURI = "blackhole://"
cfg.CertAllowedCN = []string{"c", "d"}
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.Nil(t, err)
// Clear TxnAtomicity so the deep-equality check below only compares the
// fields this test sets. NOTE(review): presumably the helper fills in a
// default TxnAtomicity that ToInternalReplicaConfig does not — confirm.
newCfInfo.Config.Sink.TxnAtomicity = ""
// startTs can not be updated: it stays at the old info's value (0 here),
// not the requested cfg.StartTs of 2.
require.Equal(t, uint64(0), newCfInfo.StartTs)
require.Equal(t, uint64(10), newCfInfo.TargetTs)
require.Equal(t, model.SortInMemory, newCfInfo.Engine)
require.Equal(t, true, newCfInfo.Config.EnableSyncPoint)
require.Equal(t, 30*time.Second, newCfInfo.Config.SyncPointInterval)
require.Equal(t, cfg.ReplicaConfig.ToInternalReplicaConfig(), newCfInfo.Config)
// Upstream connection settings are propagated into the new upstream info.
require.Equal(t, "a,b", newUpInfo.PDEndpoints)
require.Equal(t, "p1", newUpInfo.CertPath)
require.Equal(t, "p2", newUpInfo.CAPath)
require.Equal(t, "p3", newUpInfo.KeyPath)
require.Equal(t, []string{"c", "d"}, newUpInfo.CertAllowedCN)
require.Equal(t, "blackhole://", newCfInfo.SinkURI)
// targetTs below the existing startTs is rejected.
oldInfo.StartTs = 10
cfg.TargetTs = 9
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.NotNil(t, err)
}
33 changes: 33 additions & 0 deletions cdc/capture/http_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ import (
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
<<<<<<< HEAD:cdc/capture/http_validator.go
"github.com/pingcap/tiflow/cdc/sink"
=======
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/cdc/sinkv2/validator"
>>>>>>> 0867f80e5f (cdc: add changefeed epoch to prevent unexpected state (#8268)):cdc/api/v1/validator.go
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
Expand Down Expand Up @@ -114,6 +119,7 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch

// init ChangefeedInfo
info := &model.ChangeFeedInfo{
<<<<<<< HEAD:cdc/capture/http_validator.go
SinkURI: changefeedConfig.SinkURI,
Opts: make(map[string]string),
CreateTime: time.Now(),
Expand All @@ -125,6 +131,33 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
SyncPointEnabled: false,
SyncPointInterval: 10 * time.Minute,
CreatorVersion: version.ReleaseVersion,
=======
Namespace: model.DefaultNamespace,
ID: changefeedConfig.ID,
UpstreamID: up.ID,
SinkURI: changefeedConfig.SinkURI,
CreateTime: time.Now(),
StartTs: changefeedConfig.StartTS,
TargetTs: changefeedConfig.TargetTS,
Config: replicaConfig,
Engine: sortEngine,
State: model.StateNormal,
CreatorVersion: version.ReleaseVersion,
Epoch: owner.GenerateChangefeedEpoch(ctx, up.PDClient),
}
f, err := filter.NewFilter(replicaConfig, "")
if err != nil {
return nil, err
}
tableInfos, ineligibleTables, _, err := entry.VerifyTables(f,
up.KVStorage, changefeedConfig.StartTS)
if err != nil {
return nil, err
}
err = f.Verify(tableInfos)
if err != nil {
return nil, err
>>>>>>> 0867f80e5f (cdc: add changefeed epoch to prevent unexpected state (#8268)):cdc/api/v1/validator.go
}

if !replicaConfig.ForceReplicate && !changefeedConfig.IgnoreIneligibleTable {
Expand Down
1 change: 1 addition & 0 deletions cdc/capture/http_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,5 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
newInfo, err = verifyUpdateChangefeedConfig(ctx, changefeedConfig, oldInfo)
require.Nil(t, err)
require.NotNil(t, newInfo)
require.NotEqual(t, 0, newInfo.Epoch)
}
6 changes: 6 additions & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,15 @@ type ChangeFeedInfo struct {
State FeedState `json:"state"`
Error *RunningError `json:"error"`

<<<<<<< HEAD
SyncPointEnabled bool `json:"sync-point-enabled"`
SyncPointInterval time.Duration `json:"sync-point-interval"`
CreatorVersion string `json:"creator-version"`
=======
CreatorVersion string `json:"creator-version"`
// Epoch is the epoch of a changefeed, changes on every restart.
Epoch uint64 `json:"epoch"`
>>>>>>> 0867f80e5f (cdc: add changefeed epoch to prevent unexpected state (#8268))
}

const changeFeedIDMaxLen = 128
Expand Down
124 changes: 124 additions & 0 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,31 @@ import (
"go.uber.org/zap"
)

<<<<<<< HEAD
=======
// newSchedulerFromCtx creates a new scheduler from context.
// This function is factored out to facilitate unit testing.
func newSchedulerFromCtx(
ctx cdcContext.Context, up *upstream.Upstream, epoch uint64, cfg *config.SchedulerConfig,
) (ret scheduler.Scheduler, err error) {
changeFeedID := ctx.ChangefeedVars().ID
messageServer := ctx.GlobalVars().MessageServer
messageRouter := ctx.GlobalVars().MessageRouter
ownerRev := ctx.GlobalVars().OwnerRevision
captureID := ctx.GlobalVars().CaptureInfo.ID
ret, err = scheduler.NewScheduler(
ctx, captureID, changeFeedID,
messageServer, messageRouter, ownerRev, epoch, up.RegionCache, up.PDClock, cfg)
return ret, errors.Trace(err)
}

func newScheduler(
ctx cdcContext.Context, up *upstream.Upstream, epoch uint64, cfg *config.SchedulerConfig,
) (scheduler.Scheduler, error) {
return newSchedulerFromCtx(ctx, up, epoch, cfg)
}

>>>>>>> 0867f80e5f (cdc: add changefeed epoch to prevent unexpected state (#8268))
type changefeed struct {
id model.ChangeFeedID
state *model.ChangefeedReactorState
Expand Down Expand Up @@ -67,17 +92,54 @@ type changefeed struct {
metricsChangefeedResolvedTsGauge prometheus.Gauge
metricsChangefeedResolvedTsLagGauge prometheus.Gauge

<<<<<<< HEAD
newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error)
newSink func() DDLSink
=======
metricsChangefeedResolvedTsGauge prometheus.Gauge
metricsChangefeedResolvedTsLagGauge prometheus.Gauge
metricsChangefeedResolvedTsLagDuration prometheus.Observer
metricsCurrentPDTsGauge prometheus.Gauge

metricsChangefeedBarrierTsGauge prometheus.Gauge
metricsChangefeedTickDuration prometheus.Observer

downstreamObserver observer.Observer
observerLastTick *atomic.Time

newDDLPuller func(ctx context.Context,
replicaConfig *config.ReplicaConfig,
up *upstream.Upstream,
startTs uint64,
changefeed model.ChangeFeedID,
) (puller.DDLPuller, error)

newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(error)) DDLSink
newScheduler func(
ctx cdcContext.Context, up *upstream.Upstream, epoch uint64, cfg *config.SchedulerConfig,
) (scheduler.Scheduler, error)

newDownstreamObserver func(
ctx context.Context, sinkURIStr string, replCfg *config.ReplicaConfig,
opts ...observer.NewObserverOption,
) (observer.Observer, error)

lastDDLTs uint64 // Timestamp of the last executed DDL. Only used for tests.
>>>>>>> 0867f80e5f (cdc: add changefeed epoch to prevent unexpected state (#8268))
}

func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
c := &changefeed{
id: id,
scheduler: newScheduler(),
barriers: newBarriers(),
<<<<<<< HEAD
feedStateManager: newFeedStateManager(),
gcManager: gcManager,
=======
feedStateManager: newFeedStateManager(up),
upstream: up,
>>>>>>> 0867f80e5f (cdc: add changefeed epoch to prevent unexpected state (#8268))

errCh: make(chan error, defaultErrChSize),
cancel: func() {},
Expand All @@ -89,9 +151,27 @@ func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
}

func newChangefeed4Test(
<<<<<<< HEAD
id model.ChangeFeedID, gcManager gc.Manager,
newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error),
newSink func() DDLSink,
=======
id model.ChangeFeedID, state *orchestrator.ChangefeedReactorState, up *upstream.Upstream,
newDDLPuller func(ctx context.Context,
replicaConfig *config.ReplicaConfig,
up *upstream.Upstream,
startTs uint64,
changefeed model.ChangeFeedID,
) (puller.DDLPuller, error),
newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(err error)) DDLSink,
newScheduler func(
ctx cdcContext.Context, up *upstream.Upstream, epoch uint64, cfg *config.SchedulerConfig,
) (scheduler.Scheduler, error),
newDownstreamObserver func(
ctx context.Context, sinkURIStr string, replCfg *config.ReplicaConfig,
opts ...observer.NewObserverOption,
) (observer.Observer, error),
>>>>>>> 0867f80e5f (cdc: add changefeed epoch to prevent unexpected state (#8268))
) *changefeed {
c := newChangefeed(id, gcManager)
c.newDDLPuller = newDDLPuller
Expand Down Expand Up @@ -267,13 +347,57 @@ LOOP:
ctx.Throw(c.ddlPuller.Run(cancelCtx))
}()

<<<<<<< HEAD
// init metrics
c.metricsChangefeedCheckpointTsGauge = changefeedCheckpointTsGauge.WithLabelValues(c.id)
c.metricsChangefeedCheckpointTsLagGauge = changefeedCheckpointTsLagGauge.WithLabelValues(c.id)
c.metricsChangefeedResolvedTsGauge = changefeedResolvedTsGauge.WithLabelValues(c.id)
c.metricsChangefeedResolvedTsLagGauge = changefeedResolvedTsLagGauge.WithLabelValues(c.id)

c.initialized = true
=======
c.downstreamObserver, err = c.newDownstreamObserver(
ctx, c.state.Info.SinkURI, c.state.Info.Config)
if err != nil {
return err
}
c.observerLastTick = atomic.NewTime(time.Time{})

stdCtx := contextutil.PutChangefeedIDInCtx(cancelCtx, c.id)
redoManagerOpts := redo.NewOwnerManagerOptions(c.errCh)
mgr, err := redo.NewManager(stdCtx, c.state.Info.Config.Consistent, redoManagerOpts)
c.redoManager = mgr
failpoint.Inject("ChangefeedNewRedoManagerError", func() {
err = errors.New("changefeed new redo manager injected error")
})
if err != nil {
return err
}
log.Info("owner creates redo manager",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID))

// create scheduler
cfg := *c.cfg
cfg.ChangefeedSettings = c.state.Info.Config.Scheduler
epoch := c.state.Info.Epoch
c.scheduler, err = c.newScheduler(ctx, c.upstream, epoch, &cfg)
if err != nil {
return errors.Trace(err)
}

c.initMetrics()

c.initialized = true
log.Info("changefeed initialized",
zap.String("namespace", c.state.ID.Namespace),
zap.String("changefeed", c.state.ID.ID),
zap.Uint64("changefeedEpoch", epoch),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", resolvedTs),
zap.Stringer("info", c.state.Info))

>>>>>>> 0867f80e5f (cdc: add changefeed epoch to prevent unexpected state (#8268))
return nil
}

Expand Down
Loading

0 comments on commit b38189d

Please sign in to comment.