Skip to content

Commit

Permalink
cmd: check changefeed start-ts when creating or resuming
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei committed Mar 11, 2021
1 parent d0ddd5d commit f45e824
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 2 deletions.
22 changes: 22 additions & 0 deletions cmd/client_changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,19 @@ func newChangefeedCommand() *cobra.Command {
return command
}

func resumeChangefeedCheck(ctx context.Context, cmd *cobra.Command) error {
resp, err := applyOwnerChangefeedQuery(ctx, changefeedID, getCredential())
if err != nil {
return err
}
info := &cdc.ChangefeedResp{}
err = json.Unmarshal([]byte(resp), info)
if err != nil {
return err
}
return confirmLargeDataGap(ctx, cmd, info.TSO)
}

func newAdminChangefeedCommand() []*cobra.Command {
cmds := []*cobra.Command{
{
Expand All @@ -90,6 +103,9 @@ func newAdminChangefeedCommand() []*cobra.Command {
CfID: changefeedID,
Type: model.AdminResume,
}
if err := resumeChangefeedCheck(ctx, cmd); err != nil {
return err
}
return applyAdminChangefeed(ctx, job, getCredential())
},
},
Expand All @@ -116,6 +132,9 @@ func newAdminChangefeedCommand() []*cobra.Command {
if cmd.Use == "remove" {
cmd.PersistentFlags().BoolVarP(&optForceRemove, "force", "f", false, "remove all information of the changefeed")
}
if cmd.Use == "resume" {
cmd.PersistentFlags().BoolVar(&noConfirm, "no-confirm", false, "Don't ask user whether to ignore ineligible table")
}
}
return cmds
}
Expand Down Expand Up @@ -235,6 +254,9 @@ func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate
if err := verifyStartTs(ctx, startTs); err != nil {
return nil, err
}
if err := confirmLargeDataGap(ctx, cmd, startTs); err != nil {
return nil, err
}
if err := verifyTargetTs(ctx, startTs, targetTs); err != nil {
return nil, err
}
Expand Down
69 changes: 67 additions & 2 deletions cmd/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@
package cmd

import (
"context"
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/pingcap/check"
"github.com/pingcap/parser/model"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/util/testleak"

"github.com/pingcap/check"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/spf13/cobra"
pd "github.com/tikv/pd/client"
)

func TestSuite(t *testing.T) { check.TestingT(t) }
Expand Down Expand Up @@ -198,3 +202,64 @@ func (s *decodeFileSuite) TestShouldReturnErrForUnknownCfgs(c *check.C) {
c.Assert(err, check.NotNil)
c.Assert(err, check.ErrorMatches, ".*unknown config.*")
}

type mockPDClient struct {
pd.Client
ts uint64
}

func (m *mockPDClient) GetTS(ctx context.Context) (int64, int64, error) {
return oracle.ExtractPhysical(m.ts), 0, nil
}

type commonUtilSuite struct{}

var _ = check.Suite(&commonUtilSuite{})

func (s *commonUtilSuite) TestConfirmLargeDataGap(c *check.C) {
defer testleak.AfterTest(c)()
ctx := context.Background()
currentTs := uint64(423482306736160769) // 2021-03-11 17:59:57.547
startTs := uint64(423450030227042420) // 2021-03-10 07:47:52.435
pdCli = &mockPDClient{ts: currentTs}
cmd := &cobra.Command{}

// check start ts more than 1 day before current ts, and type N when confirming
dir := c.MkDir()
path := filepath.Join(dir, "confirm.txt")
err := ioutil.WriteFile(path, []byte("n"), 0o644)
c.Assert(err, check.IsNil)
f, err := os.Open(path)
c.Assert(err, check.IsNil)
stdin := os.Stdin
os.Stdin = f
defer func() {
os.Stdin = stdin
}()
err = confirmLargeDataGap(ctx, cmd, startTs)
c.Assert(err, check.ErrorMatches, "abort changefeed create or resume")

// check no confirm works
originNoConfirm := noConfirm
noConfirm = true
defer func() {
noConfirm = originNoConfirm
}()
err = confirmLargeDataGap(ctx, cmd, startTs)
c.Assert(err, check.IsNil)
noConfirm = false

// check start ts more than 1 day before current ts, and type Y when confirming
err = ioutil.WriteFile(path, []byte("Y"), 0o644)
c.Assert(err, check.IsNil)
f, err = os.Open(path)
c.Assert(err, check.IsNil)
os.Stdin = f
err = confirmLargeDataGap(ctx, cmd, startTs)
c.Assert(err, check.IsNil)

// check start ts does not exceed threshold
pdCli = &mockPDClient{ts: startTs}
err = confirmLargeDataGap(ctx, cmd, startTs)
c.Assert(err, check.IsNil)
}
30 changes: 30 additions & 0 deletions cmd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os/signal"
"strings"
"syscall"
"time"

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
Expand All @@ -39,6 +40,7 @@ import (
"github.com/pingcap/ticdc/pkg/logutil"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"go.etcd.io/etcd/clientv3/concurrency"
Expand All @@ -54,6 +56,8 @@ var (

var errOwnerNotFound = liberrors.New("owner not found")

var tsGapWarnning int64 = 86400 * 1000 // 1 day in milliseconds

func addSecurityFlags(flags *pflag.FlagSet, isServer bool) {
flags.StringVar(&caPath, "ca", "", "CA certificate path for TLS connection")
flags.StringVar(&certPath, "cert", "", "Certificate path for TLS connection")
Expand Down Expand Up @@ -311,3 +315,29 @@ func strictDecodeFile(path, component string, cfg interface{}) error {

return errors.Trace(err)
}

func confirmLargeDataGap(ctx context.Context, cmd *cobra.Command, startTs uint64) error {
if noConfirm {
return nil
}
currentPhysical, _, err := pdCli.GetTS(ctx)
if err != nil {
return err
}
tsGap := currentPhysical - oracle.ExtractPhysical(startTs)
if tsGap > tsGapWarnning {
cmd.Printf("Replicate lag (%s) is larger than 1 days, "+
"large data may cause OOM, confirm to continue at your own risk [Y/N]\n",
time.Duration(tsGap)*time.Millisecond,
)
var yOrN string
_, err := fmt.Scan(&yOrN)
if err != nil {
return err
}
if strings.ToLower(strings.TrimSpace(yOrN)) != "y" {
return errors.NewNoStackError("abort changefeed create or resume")
}
}
return nil
}

0 comments on commit f45e824

Please sign in to comment.