Skip to content

Commit

Permalink
cdc: add changefeed epoch to prevent unexpected state (pingcap#8268)
Browse files Browse the repository at this point in the history
close pingcap#7657

Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
overvenus committed Apr 6, 2023
1 parent 7a7ca72 commit db668f6
Show file tree
Hide file tree
Showing 27 changed files with 812 additions and 179 deletions.
2 changes: 2 additions & 0 deletions cdc/api/v1/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/cdc/sink"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -143,6 +144,7 @@ func verifyCreateChangefeedConfig(
Engine: sortEngine,
State: model.StateNormal,
CreatorVersion: version.ReleaseVersion,
Epoch: owner.GenerateChangefeedEpoch(ctx, up.PDClient),
}
f, err := filter.NewFilter(replicaConfig, "")
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions cdc/api/v1/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,5 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
newInfo, err = VerifyUpdateChangefeedConfig(ctx, changefeedConfig, oldInfo)
require.Nil(t, err)
require.NotNil(t, newInfo)
require.NotEqual(t, 0, newInfo.Epoch)
}
1 change: 1 addition & 0 deletions cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig(
Config: replicaCfg,
State: model.StateNormal,
CreatorVersion: version.ReleaseVersion,
Epoch: owner.GenerateChangefeedEpoch(ctx, pdClient),
}, nil
}

Expand Down
1 change: 1 addition & 0 deletions cdc/api/v2/api_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) {
require.NotNil(t, cfInfo)
require.NotEqual(t, "", cfInfo.ID)
require.Equal(t, model.DefaultNamespace, cfInfo.Namespace)
require.NotEqual(t, 0, cfInfo.Epoch)

cfg.ID = "abdc/sss"
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
Expand Down
2 changes: 2 additions & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ type ChangeFeedInfo struct {
Error *RunningError `json:"error"`

CreatorVersion string `json:"creator-version"`
// Epoch is the epoch of a changefeed, changes on every restart.
Epoch uint64 `json:"epoch"`
}

const changeFeedIDMaxLen = 128
Expand Down
21 changes: 14 additions & 7 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
// newSchedulerFromCtx creates a new scheduler from context.
// This function is factored out to facilitate unit testing.
func newSchedulerFromCtx(
ctx cdcContext.Context, pdClock pdutil.Clock,
ctx cdcContext.Context, pdClock pdutil.Clock, epoch uint64,
) (ret scheduler.Scheduler, err error) {
changeFeedID := ctx.ChangefeedVars().ID
messageServer := ctx.GlobalVars().MessageServer
Expand All @@ -52,15 +52,16 @@ func newSchedulerFromCtx(
cfg := config.GetGlobalServerConfig().Debug
ret, err = scheduler.NewScheduler(
ctx, captureID, changeFeedID,
messageServer, messageRouter, ownerRev, cfg.Scheduler, pdClock)
messageServer, messageRouter, ownerRev, epoch, cfg.Scheduler, pdClock)
return ret, errors.Trace(err)
}

// newScheduler builds a scheduler.Scheduler by delegating to
// newSchedulerFromCtx. ctx, pdClock and the changefeed epoch are forwarded
// unchanged, and the helper's result is returned as-is.
func newScheduler(
	ctx cdcContext.Context,
	pdClock pdutil.Clock,
	epoch uint64,
) (scheduler.Scheduler, error) {
	// newSchedulerFromCtx takes the epoch as its third argument; the older
	// two-argument call no longer matches its signature.
	return newSchedulerFromCtx(ctx, pdClock, epoch)
}

type changefeed struct {
Expand Down Expand Up @@ -120,7 +121,9 @@ type changefeed struct {
) (puller.DDLPuller, error)

newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(error)) DDLSink
newScheduler func(ctx cdcContext.Context, pdClock pdutil.Clock) (scheduler.Scheduler, error)
newScheduler func(
ctx cdcContext.Context, pdClock pdutil.Clock, epoch uint64,
) (scheduler.Scheduler, error)

lastDDLTs uint64 // Timestamp of the last executed DDL. Only used for tests.
}
Expand All @@ -136,7 +139,7 @@ func newChangefeed(
// The scheduler will be created lazily.
scheduler: nil,
barriers: newBarriers(),
feedStateManager: newFeedStateManager(),
feedStateManager: newFeedStateManager(up),
upstream: up,

errCh: make(chan error, defaultErrChSize),
Expand All @@ -158,7 +161,9 @@ func newChangefeed4Test(
changefeed model.ChangeFeedID,
) (puller.DDLPuller, error),
newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(err error)) DDLSink,
newScheduler func(ctx cdcContext.Context, pdClock pdutil.Clock) (scheduler.Scheduler, error),
newScheduler func(
ctx cdcContext.Context, pdClock pdutil.Clock, epoch uint64,
) (scheduler.Scheduler, error),
) *changefeed {
c := newChangefeed(id, state, up)
c.newDDLPuller = newDDLPuller
Expand Down Expand Up @@ -580,7 +585,8 @@ LOOP:
c.state.Info.Config.BDRMode)

// create scheduler
c.scheduler, err = c.newScheduler(ctx, c.upstream.PDClock)
epoch := c.state.Info.Epoch
c.scheduler, err = c.newScheduler(ctx, c.upstream.PDClock, epoch)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -591,6 +597,7 @@ LOOP:
log.Info("changefeed initialized",
zap.String("namespace", c.state.ID.Namespace),
zap.String("changefeed", c.state.ID.ID),
zap.Uint64("changefeedEpoch", epoch),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", resolvedTs),
zap.Stringer("info", c.state.Info))
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T,
},
// new scheduler
func(
ctx cdcContext.Context, pdClock pdutil.Clock,
ctx cdcContext.Context, pdClock pdutil.Clock, epoch uint64,
) (scheduler.Scheduler, error) {
return &mockScheduler{}, nil
})
Expand Down
57 changes: 34 additions & 23 deletions cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package owner

import (
"context"
"time"

"github.com/cenkalti/backoff/v4"
Expand All @@ -22,6 +23,9 @@ import (
"github.com/pingcap/tiflow/cdc/model"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

Expand All @@ -44,6 +48,7 @@ const (
// feedStateManager manages the ReactorState of a changefeed
// when an error or an admin job occurs, the feedStateManager is responsible for controlling the ReactorState
type feedStateManager struct {
upstream *upstream.Upstream
state *orchestrator.ChangefeedReactorState
shouldBeRunning bool
// Based on shouldBeRunning = false
Expand All @@ -59,8 +64,9 @@ type feedStateManager struct {
}

// newFeedStateManager creates feedStateManager and initialize the exponential backoff
func newFeedStateManager() *feedStateManager {
func newFeedStateManager(up *upstream.Upstream) *feedStateManager {
f := new(feedStateManager)
f.upstream = up

f.errBackoff = backoff.NewExponentialBackOff()
f.errBackoff.InitialInterval = defaultBackoffInitInterval
Expand All @@ -76,28 +82,6 @@ func newFeedStateManager() *feedStateManager {
return f
}

// newFeedStateManager4Test builds a feedStateManager for tests with a
// caller-controlled exponential backoff.
//
// The three interval arguments are multiplied by time.Millisecond, so callers
// pass plain millisecond counts. multiplier is used as the backoff multiplier
// verbatim. RandomizationFactor is forced to 0, the backoff state is reset via
// resetErrBackoff, and lastErrorTime is initialized to the Unix epoch.
func newFeedStateManager4Test(
	initialIntervalInMs time.Duration,
	maxIntervalInMs time.Duration,
	maxElapsedTimeInMs time.Duration,
	multiplier float64,
) *feedStateManager {
	const ms = time.Millisecond

	eb := backoff.NewExponentialBackOff()
	eb.InitialInterval = initialIntervalInMs * ms
	eb.MaxInterval = maxIntervalInMs * ms
	eb.MaxElapsedTime = maxElapsedTimeInMs * ms
	eb.Multiplier = multiplier
	eb.RandomizationFactor = 0

	manager := &feedStateManager{errBackoff: eb}
	manager.resetErrBackoff()
	manager.lastErrorTime = time.Unix(0, 0)
	return manager
}

// resetErrBackoff reset the backoff-related fields
func (m *feedStateManager) resetErrBackoff() {
m.errBackoff.Reset()
Expand Down Expand Up @@ -340,16 +324,21 @@ func (m *feedStateManager) pushAdminJob(job *model.AdminJob) {
}

func (m *feedStateManager) patchState(feedState model.FeedState) {
var updateEpoch bool
var adminJobType model.AdminJobType
switch feedState {
case model.StateNormal:
adminJobType = model.AdminNone
updateEpoch = false
case model.StateFinished:
adminJobType = model.AdminFinish
updateEpoch = true
case model.StateError, model.StateStopped, model.StateFailed:
adminJobType = model.AdminStop
updateEpoch = true
case model.StateRemoved:
adminJobType = model.AdminRemove
updateEpoch = true
default:
log.Panic("Unreachable")
}
Expand All @@ -376,6 +365,18 @@ func (m *feedStateManager) patchState(feedState model.FeedState) {
info.AdminJobType = adminJobType
changed = true
}
if updateEpoch {
previous := info.Epoch
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
info.Epoch = GenerateChangefeedEpoch(ctx, m.upstream.PDClient)
changed = true
log.Info("update changefeed epoch",
zap.String("namespace", m.state.ID.Namespace),
zap.String("changefeed", m.state.ID.ID),
zap.Uint64("perviousEpoch", previous),
zap.Uint64("currentEpoch", info.Epoch))
}
return info, changed, nil
})
}
Expand Down Expand Up @@ -514,3 +515,13 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
zap.Duration("newInterval", m.backoffInterval))
}
}

// GenerateChangefeedEpoch returns a new epoch value for a changefeed.
//
// The epoch is normally built from the physical and logical parts returned
// by pdClient.GetTS, combined with oracle.ComposeTS. If GetTS fails, a
// warning is logged and the local wall clock (nanoseconds since the Unix
// epoch) is used instead, so the function always returns a value.
func GenerateChangefeedEpoch(ctx context.Context, pdClient pd.Client) uint64 {
	physical, logical, err := pdClient.GetTS(ctx)
	if err == nil {
		return oracle.ComposeTS(physical, logical)
	}

	// PD is unavailable: fall back to a locally generated timestamp.
	log.Warn("generate epoch using local timestamp due to error", zap.Error(err))
	return uint64(time.Now().UnixNano())
}
38 changes: 37 additions & 1 deletion cdc/owner/feed_state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,55 @@
package owner

import (
"context"
"fmt"
"testing"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
)

// mockPD is a test double for pd.Client. It embeds the interface so that only
// the methods a test actually needs have to be overridden.
type mockPD struct {
	pd.Client
}

// GetTS ignores the context and always reports physical time 1 and logical
// time 2 with a nil error.
func (*mockPD) GetTS(context.Context) (int64, int64, error) {
	const (
		physical int64 = 1
		logical  int64 = 2
	)
	return physical, logical, nil
}

// newFeedStateManager4Test builds a feedStateManager for unit tests.
//
// The manager's upstream is a fresh upstream.Upstream whose PDClient is a
// mockPD. The three interval arguments are multiplied by time.Millisecond, so
// callers pass plain millisecond counts; multiplier is used verbatim.
// RandomizationFactor is forced to 0, the backoff state is reset via
// resetErrBackoff, and lastErrorTime is initialized to the Unix epoch.
func newFeedStateManager4Test(
	initialIntervalInMs time.Duration,
	maxIntervalInMs time.Duration,
	maxElapsedTimeInMs time.Duration,
	multiplier float64,
) *feedStateManager {
	// Upstream backed by the mock PD client.
	up := new(upstream.Upstream)
	up.PDClient = &mockPD{}

	// Exponential backoff configured from the arguments.
	errBackoff := backoff.NewExponentialBackOff()
	errBackoff.InitialInterval = time.Millisecond * initialIntervalInMs
	errBackoff.MaxInterval = time.Millisecond * maxIntervalInMs
	errBackoff.MaxElapsedTime = time.Millisecond * maxElapsedTimeInMs
	errBackoff.Multiplier = multiplier
	errBackoff.RandomizationFactor = 0

	manager := &feedStateManager{
		upstream:   up,
		errBackoff: errBackoff,
	}
	manager.resetErrBackoff()
	manager.lastErrorTime = time.Unix(0, 0)

	return manager
}

func TestHandleJob(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test(200, 1600, 0, 2.0)
Expand Down Expand Up @@ -294,7 +330,7 @@ func TestHandleError(t *testing.T) {

func TestHandleFastFailError(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
manager := new(feedStateManager)
manager := newFeedStateManager4Test(0, 0, 0, 0)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
Expand Down
6 changes: 4 additions & 2 deletions cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ func newOwner4Test(
changefeed model.ChangeFeedID,
) (puller.DDLPuller, error),
newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(err error)) DDLSink,
newScheduler func(ctx cdcContext.Context, pdClock pdutil.Clock) (scheduler.Scheduler, error),
newScheduler func(
ctx cdcContext.Context, pdClock pdutil.Clock, epoch uint64,
) (scheduler.Scheduler, error),
pdClient pd.Client,
) Owner {
m := upstream.NewManager4Test(pdClient)
Expand Down Expand Up @@ -103,7 +105,7 @@ func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*ownerImpl, *orches
},
// new scheduler
func(
ctx cdcContext.Context, pdClock pdutil.Clock,
ctx cdcContext.Context, pdClock pdutil.Clock, changefeedEpoch uint64,
) (scheduler.Scheduler, error) {
return &mockScheduler{}, nil
},
Expand Down
11 changes: 10 additions & 1 deletion cdc/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type managerImpl struct {
model.ChangeFeedID,
*upstream.Upstream,
*model.Liveness,
uint64,
) *processor

metricProcessorCloseDuration prometheus.Observer
Expand Down Expand Up @@ -110,6 +111,7 @@ func (m *managerImpl) Tick(stdCtx context.Context, state orchestrator.ReactorSta
m.closeProcessor(changefeedID, ctx)
continue
}
currentChangefeedEpoch := changefeedState.Info.Epoch
p, exist := m.processors[changefeedID]
if !exist {
up, ok := m.upstreamManager.Get(changefeedState.Info.UpstreamID)
Expand All @@ -118,13 +120,20 @@ func (m *managerImpl) Tick(stdCtx context.Context, state orchestrator.ReactorSta
up = m.upstreamManager.AddUpstream(upstreamInfo)
}
failpoint.Inject("processorManagerHandleNewChangefeedDelay", nil)
p = m.newProcessor(changefeedState, m.captureInfo, changefeedID, up, m.liveness)
p = m.newProcessor(
changefeedState, m.captureInfo, changefeedID, up, m.liveness,
currentChangefeedEpoch)
m.processors[changefeedID] = p
}
ctx := cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: changefeedID,
Info: changefeedState.Info,
})
if currentChangefeedEpoch != p.changefeedEpoch {
// Changefeed has restarted due to error, the processor is stale.
m.closeProcessor(changefeedID, ctx)
continue
}
if err := p.Tick(ctx); err != nil {
// The processor has already patched its error to tell the owner, so the
// manager can just close the processor and continue to tick other processors.
Expand Down
1 change: 1 addition & 0 deletions cdc/processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func NewManager4Test(
changefeedID model.ChangeFeedID,
up *upstream.Upstream,
liveness *model.Liveness,
changefeedEpoch uint64,
) *processor {
return newProcessor4Test(t, state, captureInfo, createTablePipeline, m.liveness)
}
Expand Down
Loading

0 comments on commit db668f6

Please sign in to comment.