Skip to content

Commit

Permalink
cdc: add delay for recreating changefeed (#7730)
Browse files Browse the repository at this point in the history
ref #7657
  • Loading branch information
overvenus authored Dec 2, 2022
1 parent bc176c3 commit 1f6ae1f
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 5 deletions.
12 changes: 12 additions & 0 deletions cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,18 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
_ = c.Error(err)
return
}

o, err := h.capture.GetOwner()
if err != nil {
_ = c.Error(err)
return
}
err = o.ValidateChangefeed(info)
if err != nil {
_ = c.Error(err)
return
}

upstreamInfo := &model.UpstreamInfo{
ID: up.ID,
PDEndpoints: strings.Join(up.PdEndpoints, ","),
Expand Down
12 changes: 12 additions & 0 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,18 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}
o, err := h.capture.GetOwner()
if err != nil {
needRemoveGCSafePoint = true
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}
err = o.ValidateChangefeed(info)
if err != nil {
needRemoveGCSafePoint = true
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}

err = etcdClient.CreateChangefeedInfo(ctx,
upstreamInfo,
Expand Down
3 changes: 3 additions & 0 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func TestCreateChangefeed(t *testing.T) {
apiV2 := NewOpenAPIV2ForTest(cp, helpers)
router := newRouter(apiV2)

o := mock_owner.NewMockOwner(gomock.NewController(t))
mockUpManager := upstream.NewManager4Test(pdClient)
statusProvider := &mockStatusProvider{}
etcdClient.EXPECT().
Expand All @@ -65,6 +66,8 @@ func TestCreateChangefeed(t *testing.T) {
cp.EXPECT().GetUpstreamManager().Return(mockUpManager, nil).AnyTimes()
cp.EXPECT().IsReady().Return(true).AnyTimes()
cp.EXPECT().IsOwner().Return(true).AnyTimes()
cp.EXPECT().GetOwner().Return(o, nil).AnyTimes()
o.EXPECT().ValidateChangefeed(gomock.Any()).Return(nil).AnyTimes()

// case 1: json format mismatches with the spec.
errConfig := struct {
Expand Down
14 changes: 14 additions & 0 deletions cdc/owner/mock/owner_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

92 changes: 87 additions & 5 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ package owner

import (
"context"
"fmt"
"io"
"net/url"
"os"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -50,6 +53,16 @@ const (
// captures with versions different from that of the owner
const versionInconsistentLogRate = 1

// TODO: drop both variables below once
// https://github.com/pingcap/tiflow/issues/7657 is fixed.

// recreateChangefeedDelayLimit is how long after a removal the owner keeps
// rejecting a new changefeed that reuses the removed ID or sink URI.
var recreateChangefeedDelayLimit = 30 * time.Second

// hasCIEnv records, once at package initialization, whether the "CI"
// environment variable is set (even to an empty value). Most CI platforms
// define it.
var hasCIEnv = func() bool {
	_, present := os.LookupEnv("CI")
	return present
}()

// Export field names for pretty printing.
type ownerJob struct {
Tp ownerJobType
Expand Down Expand Up @@ -89,6 +102,7 @@ type Owner interface {
DrainCapture(query *scheduler.Query, done chan<- error)
WriteDebugInfo(w io.Writer, done chan<- error)
Query(query *Query, done chan<- error)
ValidateChangefeed(info *model.ChangeFeedInfo) error
AsyncStop()
}

Expand Down Expand Up @@ -119,16 +133,24 @@ type ownerImpl struct {
state *orchestrator.ChangefeedReactorState,
up *upstream.Upstream,
) *changefeed

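// removedChangefeed and removedSinkURI are a workaround for
// https://github.com/pingcap/tiflow/issues/7657: they record when a
// changefeed ID or sink URI was removed, so that recreating a changefeed
// with the same ID or sink URI can be delayed.
// TODO: remove these fields after the issue is resolved.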
removedChangefeed map[model.ChangeFeedID]time.Time
removedSinkURI map[url.URL]time.Time
}

// NewOwner creates a new Owner.
//
// It returns an *ownerImpl that uses the given upstreamManager, with an
// empty changefeed map, lastTickTime set to the construction time, the
// default newChangefeed constructor, and a log limiter configured from
// versionInconsistentLogRate.
func NewOwner(upstreamManager *upstream.Manager) Owner {
return &ownerImpl{
upstreamManager: upstreamManager,
changefeeds: make(map[model.ChangeFeedID]*changefeed),
lastTickTime: time.Now(),
newChangefeed: newChangefeed,
logLimiter: rate.NewLimiter(versionInconsistentLogRate, versionInconsistentLogRate),
upstreamManager: upstreamManager,
changefeeds: make(map[model.ChangeFeedID]*changefeed),
lastTickTime: time.Now(),
newChangefeed: newChangefeed,
logLimiter: rate.NewLimiter(versionInconsistentLogRate, versionInconsistentLogRate),
// Both removal-tracking maps start empty and are allocated here so they
// can be written to without a nil-map panic.
removedChangefeed: make(map[model.ChangeFeedID]time.Time),
removedSinkURI: make(map[url.URL]time.Time),
}
}

Expand Down Expand Up @@ -285,6 +307,43 @@ func (o *ownerImpl) Query(query *Query, done chan<- error) {
})
}

// ValidateChangefeed reports whether a changefeed described by info may be
// created right now.
//
// It returns an ErrInternalServerError when:
//   - a changefeed with the same namespace/ID was removed less than
//     recreateChangefeedDelayLimit ago,
//   - info.SinkURI cannot be parsed as a URL, or
//   - a changefeed whose sink URI has the same scheme and host was removed
//     less than recreateChangefeedDelayLimit ago.
//
// Otherwise it returns nil. When hasCIEnv is true every check is skipped and
// nil is returned.
func (o *ownerImpl) ValidateChangefeed(info *model.ChangeFeedInfo) error {
	// NOTE(review): presumably this lock is what guards the removed* maps
	// against concurrent access — confirm against the code that writes them.
	o.ownerJobQueue.Lock()
	defer o.ownerJobQueue.Unlock()

	if hasCIEnv {
		// The check is disabled on CI platforms, because many tests repeatedly
		// create changefeeds with the same name and the same sink URI.
		return nil
	}

	// Reject a changefeed ID that was removed too recently.
	cfID := model.ChangeFeedID{ID: info.ID, Namespace: info.Namespace}
	if removedAt, found := o.removedChangefeed[cfID]; found {
		if wait := recreateChangefeedDelayLimit - time.Since(removedAt); wait >= 0 {
			msg := fmt.Sprintf(
				"changefeed with same ID was just removed, please wait %s", wait)
			return cerror.ErrInternalServerError.GenWithStackByArgs(msg)
		}
	}

	parsed, err := url.Parse(info.SinkURI)
	if err != nil {
		msg := fmt.Sprintf("invalid sink URI %s", err)
		return cerror.ErrInternalServerError.GenWithStackByArgs(msg)
	}

	// Only scheme and host identify a sink here; path, query and credentials
	// are deliberately left out of the lookup key.
	sinkKey := url.URL{Scheme: parsed.Scheme, Host: parsed.Host}
	removedAt, found := o.removedSinkURI[sinkKey]
	if !found {
		return nil
	}
	wait := recreateChangefeedDelayLimit - time.Since(removedAt)
	if wait < 0 {
		return nil
	}
	msg := fmt.Sprintf(
		"changefeed with same sink URI was just removed, please wait %s", wait)
	return cerror.ErrInternalServerError.GenWithStackByArgs(msg)
}

// AsyncStop stops the owner asynchronously
func (o *ownerImpl) AsyncStop() {
atomic.StoreInt32(&o.closed, 1)
Expand Down Expand Up @@ -466,6 +525,17 @@ func (o *ownerImpl) handleJobs(ctx context.Context) {
}
switch job.Tp {
case ownerJobTypeAdminJob:
if job.AdminJob.Type == model.AdminRemove {
now := time.Now()
o.removedChangefeed[changefeedID] = now
uri, err := url.Parse(cfReactor.state.Info.SinkURI)
if err == nil {
o.removedSinkURI[url.URL{
Scheme: uri.Scheme,
Host: uri.Host,
}] = now
}
}
cfReactor.feedStateManager.PushAdminJob(job.AdminJob)
case ownerJobTypeScheduleTable:
// Scheduler is created lazily, it is nil before initialization.
Expand All @@ -487,6 +557,18 @@ func (o *ownerImpl) handleJobs(ctx context.Context) {
}
close(job.done)
}

// Try GC removed changefeed id/sink URI after delay limit passed.
for id, t := range o.removedChangefeed {
if time.Since(t) >= recreateChangefeedDelayLimit {
delete(o.removedChangefeed, id)
}
}
for s, t := range o.removedSinkURI {
if time.Since(t) >= recreateChangefeedDelayLimit {
delete(o.removedSinkURI, s)
}
}
}

func (o *ownerImpl) handleQueries(query *Query) error {
Expand Down
75 changes: 75 additions & 0 deletions cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math"
"net/url"
"testing"
"time"

Expand Down Expand Up @@ -904,3 +905,77 @@ func TestIsHealthy(t *testing.T) {
require.NoError(t, err)
require.False(t, query.Data.(bool))
}

// TestValidateChangefeed checks that ValidateChangefeed rejects recreating a
// changefeed with a recently removed ID or sink URI, rejects an unparsable
// sink URI, accepts both once recreateChangefeedDelayLimit has elapsed, and
// that handleJobs garbage-collects the expired bookkeeping entries.
func TestValidateChangefeed(t *testing.T) {
	// This test is intentionally NOT marked t.Parallel(): it overrides the
	// package-level hasCIEnv flag, and doing so while other parallel tests
	// run would be a data race. Sequential tests finish before parallel
	// tests resume, so the override below is never observed concurrently.
	//
	// FIXME: We need a better way to enable the following checks; changing
	// a global variable in a unit test is bad practice.
	prevHasCIEnv := hasCIEnv
	hasCIEnv = false
	defer func() { hasCIEnv = prevHasCIEnv }()

	o := &ownerImpl{
		changefeeds:       make(map[model.ChangeFeedID]*changefeed),
		removedChangefeed: make(map[model.ChangeFeedID]time.Time),
		removedSinkURI:    make(map[url.URL]time.Time),
	}

	id := model.ChangeFeedID{Namespace: "a", ID: "b"}
	sinkURI := "mysql://host:1234/"
	o.changefeeds[id] = &changefeed{
		state: &orchestrator.ChangefeedReactorState{
			Info: &model.ChangeFeedInfo{SinkURI: sinkURI},
		},
		feedStateManager: &feedStateManager{},
	}

	// Removing the changefeed should record both its ID and its sink URI.
	o.pushOwnerJob(&ownerJob{
		Tp:           ownerJobTypeAdminJob,
		ChangefeedID: id,
		AdminJob: &model.AdminJob{
			CfID: id,
			Type: model.AdminRemove,
		},
		done: make(chan<- error, 1),
	})
	o.handleJobs(context.Background())

	// Same ID, just removed: rejected.
	require.Error(t, o.ValidateChangefeed(&model.ChangeFeedInfo{
		ID:        id.ID,
		Namespace: id.Namespace,
	}))
	// Different ID but same sink scheme/host, just removed: rejected.
	require.Error(t, o.ValidateChangefeed(&model.ChangeFeedInfo{
		ID:        "unknown",
		Namespace: "unknown",
		SinkURI:   sinkURI,
	}))

	// Test invalid sink URI.
	require.Error(t, o.ValidateChangefeed(&model.ChangeFeedInfo{
		SinkURI: "wrong uri\n\t",
	}))

	// Test limit passed: backdate both records beyond the delay limit.
	expired := time.Now().Add(-2 * recreateChangefeedDelayLimit)
	o.removedChangefeed[id] = expired
	o.removedSinkURI[url.URL{
		Scheme: "mysql",
		Host:   "host:1234",
	}] = expired

	require.NoError(t, o.ValidateChangefeed(&model.ChangeFeedInfo{
		ID:        id.ID,
		Namespace: id.Namespace,
	}))
	require.NoError(t, o.ValidateChangefeed(&model.ChangeFeedInfo{
		ID:        "unknown",
		Namespace: "unknown",
		SinkURI:   sinkURI,
	}))

	// Test GC: the next handleJobs pass must drop the expired entries.
	o.handleJobs(context.Background())
	require.Empty(t, o.removedChangefeed)
	require.Empty(t, o.removedSinkURI)
}
2 changes: 2 additions & 0 deletions tests/integration_tests/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ else
test_case="*"
fi

# Print environment variables.
env
set -eu

if [ "$test_case" == "*" ]; then
Expand Down

0 comments on commit 1f6ae1f

Please sign in to comment.