Skip to content

Commit

Permalink
cdc: add delay for recreating changefeed (#7730) (#7783)
Browse files Browse the repository at this point in the history
ref #7657
  • Loading branch information
ti-chi-bot authored Dec 22, 2022
1 parent 861a092 commit 60de4f5
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 6 deletions.
13 changes: 13 additions & 0 deletions cdc/api/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,19 @@ func (h *openAPI) CreateChangefeed(c *gin.Context) {
return
}

o, err := h.capture.GetOwner()
if err != nil {
needRemoveGCSafePoint = true
_ = c.Error(err)
return
}
err = o.ValidateChangefeed(&changefeedConfig)
if err != nil {
needRemoveGCSafePoint = true
_ = c.Error(err)
return
}

err = h.capture.EtcdClient.CreateChangefeedInfo(ctx, info,
model.DefaultChangeFeedID(changefeedConfig.ID))
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions cdc/owner/mock/owner_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

97 changes: 91 additions & 6 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ package owner

import (
"context"
"fmt"
"io"
"net/url"
"os"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -49,6 +52,16 @@ const (
// captures with versions different from that of the owner
const versionInconsistentLogRate = 1

// TODO: drop these variables once
// https://github.com/pingcap/tiflow/issues/7657 is fixed.
var (
	// recreateChangefeedDelayLimit is the window that elapsed-since-removal
	// times are compared against.
	recreateChangefeedDelayLimit = 30 * time.Second

	// hasCIEnv is evaluated once, at package initialization, and is true when
	// the "CI" environment variable is present. Most CI platforms define it.
	hasCIEnv = func() (inCI bool) {
		_, inCI = os.LookupEnv("CI")
		return inCI
	}()
)

// Export field names for pretty printing.
type ownerJob struct {
Tp ownerJobType
Expand Down Expand Up @@ -84,6 +97,7 @@ type Owner interface {
)
WriteDebugInfo(w io.Writer, done chan<- error)
Query(query *Query, done chan<- error)
ValidateChangefeed(info *model.ChangefeedConfig) error
AsyncStop()
}

Expand All @@ -105,17 +119,28 @@ type ownerImpl struct {
// as it is not a thread-safe value.
bootstrapped bool

newChangefeed func(id model.ChangeFeedID, upStream *upstream.Upstream) *changefeed
newChangefeed func(
id model.ChangeFeedID,
up *upstream.Upstream,
) *changefeed

// removedChangefeed and removedSinkURI are a workaround for
// https://github.com/pingcap/tiflow/issues/7657: they are used to delay
// recreating a changefeed with the same ID or the same sink URI.
// TODO: remove these fields after the issue is resolved.
removedChangefeed map[model.ChangeFeedID]time.Time
removedSinkURI map[url.URL]time.Time
}

// NewOwner creates a new Owner.
//
// The returned value is an *ownerImpl that keeps the given upstreamManager,
// starts with empty changefeeds, removedChangefeed and removedSinkURI maps,
// sets lastTickTime to the current time, uses the package-level newChangefeed
// constructor, and builds its logLimiter from versionInconsistentLogRate.
//
// NOTE(review): this listing is a rendered diff, so the field list below
// appears twice (the pre-change lines followed by the post-change lines).
func NewOwner(upstreamManager *upstream.Manager) Owner {
return &ownerImpl{
upstreamManager: upstreamManager,
changefeeds: make(map[model.ChangeFeedID]*changefeed),
lastTickTime: time.Now(),
newChangefeed: newChangefeed,
logLimiter: rate.NewLimiter(versionInconsistentLogRate, versionInconsistentLogRate),
upstreamManager: upstreamManager,
changefeeds: make(map[model.ChangeFeedID]*changefeed),
lastTickTime: time.Now(),
newChangefeed: newChangefeed,
logLimiter: rate.NewLimiter(versionInconsistentLogRate, versionInconsistentLogRate),
removedChangefeed: make(map[model.ChangeFeedID]time.Time),
removedSinkURI: make(map[url.URL]time.Time),
}
}

Expand Down Expand Up @@ -280,6 +305,43 @@ func (o *ownerImpl) Query(query *Query, done chan<- error) {
})
}

// ValidateChangefeed reports whether a changefeed described by info may be
// created right now.
//
// It returns an error when a changefeed with the same ID (namespace + ID), or
// with the same sink URI scheme and host, has a removal time recorded less
// than recreateChangefeedDelayLimit ago, and when info.SinkURI cannot be
// parsed. It returns nil otherwise. The check is skipped entirely when
// hasCIEnv is set.
func (o *ownerImpl) ValidateChangefeed(info *model.ChangefeedConfig) error {
	o.ownerJobQueue.Lock()
	defer o.ownerJobQueue.Unlock()

	// The check is disabled on CI platforms, because many tests repeatedly
	// create changefeeds with the same name and the same sink URI.
	if hasCIEnv {
		return nil
	}

	// The ID is checked first, so a recently removed ID is rejected before
	// the sink URI is even parsed.
	cfID := model.ChangeFeedID{ID: info.ID, Namespace: info.Namespace}
	if removedAt, found := o.removedChangefeed[cfID]; found {
		if wait := recreateChangefeedDelayLimit - time.Since(removedAt); wait >= 0 {
			msg := fmt.Sprintf(
				"changefeed with same ID was just removed, please wait %s", wait)
			return cerror.ErrInternalServerError.GenWithStackByArgs(msg)
		}
	}

	parsed, err := url.Parse(info.SinkURI)
	if err != nil {
		msg := fmt.Sprintf("invalid sink URI %s", err)
		return cerror.ErrInternalServerError.GenWithStackByArgs(msg)
	}

	// Only the scheme and host identify a sink here; path, query and
	// credentials are ignored.
	sinkKey := url.URL{Scheme: parsed.Scheme, Host: parsed.Host}
	if removedAt, found := o.removedSinkURI[sinkKey]; found {
		if wait := recreateChangefeedDelayLimit - time.Since(removedAt); wait >= 0 {
			msg := fmt.Sprintf(
				"changefeed with same sink URI was just removed, please wait %s", wait)
			return cerror.ErrInternalServerError.GenWithStackByArgs(msg)
		}
	}
	return nil
}

// AsyncStop stops the owner asynchronously
func (o *ownerImpl) AsyncStop() {
atomic.StoreInt32(&o.closed, 1)
Expand Down Expand Up @@ -399,6 +461,17 @@ func (o *ownerImpl) handleJobs() {
}
switch job.Tp {
case ownerJobTypeAdminJob:
if job.AdminJob.Type == model.AdminRemove {
now := time.Now()
o.removedChangefeed[changefeedID] = now
uri, err := url.Parse(cfReactor.state.Info.SinkURI)
if err == nil {
o.removedSinkURI[url.URL{
Scheme: uri.Scheme,
Host: uri.Host,
}] = now
}
}
cfReactor.feedStateManager.PushAdminJob(job.AdminJob)
case ownerJobTypeScheduleTable:
cfReactor.scheduler.MoveTable(job.TableID, job.TargetCaptureID)
Expand All @@ -411,6 +484,18 @@ func (o *ownerImpl) handleJobs() {
}
close(job.done)
}

// Garbage-collect removed changefeed IDs and sink URIs once the delay limit has passed.
for id, t := range o.removedChangefeed {
if time.Since(t) >= recreateChangefeedDelayLimit {
delete(o.removedChangefeed, id)
}
}
for s, t := range o.removedSinkURI {
if time.Since(t) >= recreateChangefeedDelayLimit {
delete(o.removedSinkURI, s)
}
}
}

func (o *ownerImpl) handleQueries(query *Query) error {
Expand Down
75 changes: 75 additions & 0 deletions cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math"
"net/url"
"testing"
"time"

Expand Down Expand Up @@ -592,3 +593,77 @@ func TestCalculateGCSafepointTs(t *testing.T) {
require.Equal(t, expectMinTsMap, minCheckpoinTsMap)
require.Equal(t, expectForceUpdateMap, forceUpdateMap)
}

// TestValidateChangefeed checks that ValidateChangefeed rejects a changefeed
// whose ID or sink URI was removed within recreateChangefeedDelayLimit,
// rejects an unparsable sink URI, accepts both again once the limit has
// passed, and that handleJobs garbage-collects the expired records.
func TestValidateChangefeed(t *testing.T) {
	// Deliberately NOT parallel: the test overrides the package-level hasCIEnv
	// flag that ValidateChangefeed reads. Running in the sequential phase and
	// restoring the flag afterwards keeps the override from racing with, or
	// leaking into, other tests of this package.
	//
	// FIXME: inject the CI check instead of mutating a global variable.
	prevHasCIEnv := hasCIEnv
	hasCIEnv = false
	defer func() { hasCIEnv = prevHasCIEnv }()

	o := &ownerImpl{
		changefeeds:       make(map[model.ChangeFeedID]*changefeed),
		removedChangefeed: make(map[model.ChangeFeedID]time.Time),
		removedSinkURI:    make(map[url.URL]time.Time),
	}

	id := model.ChangeFeedID{Namespace: "a", ID: "b"}
	sinkURI := "mysql://host:1234/"
	o.changefeeds[id] = &changefeed{
		state: &orchestrator.ChangefeedReactorState{
			Info: &model.ChangeFeedInfo{SinkURI: sinkURI},
		},
		feedStateManager: &feedStateManager{},
	}

	// Remove the changefeed through the regular owner job path so that
	// handleJobs records both its ID and its sink URI.
	o.pushOwnerJob(&ownerJob{
		Tp:           ownerJobTypeAdminJob,
		ChangefeedID: id,
		AdminJob: &model.AdminJob{
			CfID: id,
			Type: model.AdminRemove,
		},
		done: make(chan<- error, 1),
	})
	o.handleJobs()

	// Same ID right after removal is rejected.
	require.Error(t, o.ValidateChangefeed(&model.ChangefeedConfig{
		ID:        id.ID,
		Namespace: id.Namespace,
	}))
	// A different ID with the same sink URI is rejected as well.
	require.Error(t, o.ValidateChangefeed(&model.ChangefeedConfig{
		ID:        "unknown",
		Namespace: "unknown",
		SinkURI:   sinkURI,
	}))

	// Test invalid sink URI.
	require.Error(t, o.ValidateChangefeed(&model.ChangefeedConfig{
		SinkURI: "wrong uri\n\t",
	}))

	// Test limit passed: backdate both records beyond the delay limit.
	expired := time.Now().Add(-2 * recreateChangefeedDelayLimit)
	o.removedChangefeed[id] = expired
	o.removedSinkURI[url.URL{
		Scheme: "mysql",
		Host:   "host:1234",
	}] = expired

	require.NoError(t, o.ValidateChangefeed(&model.ChangefeedConfig{
		ID:        id.ID,
		Namespace: id.Namespace,
	}))
	require.NoError(t, o.ValidateChangefeed(&model.ChangefeedConfig{
		ID:        "unknown",
		Namespace: "unknown",
		SinkURI:   sinkURI,
	}))

	// Test GC: with no pending jobs, handleJobs only purges expired records.
	o.handleJobs()
	require.Empty(t, o.removedChangefeed)
	require.Empty(t, o.removedSinkURI)
}
2 changes: 2 additions & 0 deletions tests/integration_tests/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ else
test_case="*"
fi

# Print environment variables.
env
set -eu

if [ "$test_case" == "*" ]; then
Expand Down

0 comments on commit 60de4f5

Please sign in to comment.