diff --git a/cmd/client_changefeed.go b/cmd/client_changefeed.go index 875c1dabb24..51a8afb4d43 100644 --- a/cmd/client_changefeed.go +++ b/cmd/client_changefeed.go @@ -69,6 +69,19 @@ func newChangefeedCommand() *cobra.Command { return command } +func resumeChangefeedCheck(ctx context.Context, cmd *cobra.Command) error { + resp, err := applyOwnerChangefeedQuery(ctx, changefeedID, getCredential()) + if err != nil { + return err + } + info := &cdc.ChangefeedResp{} + err = json.Unmarshal([]byte(resp), info) + if err != nil { + return err + } + return confirmLargeDataGap(ctx, cmd, info.TSO) +} + func newAdminChangefeedCommand() []*cobra.Command { cmds := []*cobra.Command{ { @@ -92,6 +105,9 @@ func newAdminChangefeedCommand() []*cobra.Command { CfID: changefeedID, Type: model.AdminResume, } + if err := resumeChangefeedCheck(ctx, cmd); err != nil { + return err + } return applyAdminChangefeed(ctx, job, getCredential()) }, }, @@ -118,6 +134,9 @@ func newAdminChangefeedCommand() []*cobra.Command { if cmd.Use == "remove" { cmd.PersistentFlags().BoolVarP(&optForceRemove, "force", "f", false, "remove all information of the changefeed") } + if cmd.Use == "resume" { + cmd.PersistentFlags().BoolVar(&noConfirm, "no-confirm", false, "Don't ask user whether to ignore ineligible table") + } } return cmds } @@ -237,6 +256,9 @@ func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate if err := verifyStartTs(ctx, startTs); err != nil { return nil, err } + if err := confirmLargeDataGap(ctx, cmd, startTs); err != nil { + return nil, err + } if err := verifyTargetTs(ctx, startTs, targetTs); err != nil { return nil, err } diff --git a/cmd/cmd_test.go b/cmd/cmd_test.go index ad3b3a36bf1..da3002e67aa 100644 --- a/cmd/cmd_test.go +++ b/cmd/cmd_test.go @@ -14,15 +14,19 @@ package cmd import ( + "context" "io/ioutil" + "os" "path/filepath" "testing" + "github.com/pingcap/check" "github.com/pingcap/parser/model" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/util/testleak" - - "github.com/pingcap/check" + "github.com/pingcap/tidb/store/tikv/oracle" + "github.com/spf13/cobra" + pd "github.com/tikv/pd/client" ) func TestSuite(t *testing.T) { check.TestingT(t) } @@ -198,3 +202,64 @@ func (s *decodeFileSuite) TestShouldReturnErrForUnknownCfgs(c *check.C) { c.Assert(err, check.NotNil) c.Assert(err, check.ErrorMatches, ".*unknown config.*") } + +type mockPDClient struct { + pd.Client + ts uint64 +} + +func (m *mockPDClient) GetTS(ctx context.Context) (int64, int64, error) { + return oracle.ExtractPhysical(m.ts), 0, nil +} + +type commonUtilSuite struct{} + +var _ = check.Suite(&commonUtilSuite{}) + +func (s *commonUtilSuite) TestConfirmLargeDataGap(c *check.C) { + defer testleak.AfterTest(c)() + ctx := context.Background() + currentTs := uint64(423482306736160769) // 2021-03-11 17:59:57.547 + startTs := uint64(423450030227042420) // 2021-03-10 07:47:52.435 + pdCli = &mockPDClient{ts: currentTs} + cmd := &cobra.Command{} + + // check start ts more than 1 day before current ts, and type N when confirming + dir := c.MkDir() + path := filepath.Join(dir, "confirm.txt") + err := ioutil.WriteFile(path, []byte("n"), 0o644) + c.Assert(err, check.IsNil) + f, err := os.Open(path) + c.Assert(err, check.IsNil) + stdin := os.Stdin + os.Stdin = f + defer func() { + os.Stdin = stdin + }() + err = confirmLargeDataGap(ctx, cmd, startTs) + c.Assert(err, check.ErrorMatches, "abort changefeed create or resume") + + // check no confirm works + originNoConfirm := noConfirm + noConfirm = true + defer func() { + noConfirm = originNoConfirm + }() + err = confirmLargeDataGap(ctx, cmd, startTs) + c.Assert(err, check.IsNil) + noConfirm = false + + // check start ts more than 1 day before current ts, and type Y when confirming + err = ioutil.WriteFile(path, []byte("Y"), 0o644) + c.Assert(err, check.IsNil) + f, err = os.Open(path) + c.Assert(err, check.IsNil) + os.Stdin = f + err = confirmLargeDataGap(ctx, cmd, startTs) + c.Assert(err, check.IsNil) + + // check start ts does not exceed threshold + pdCli = &mockPDClient{ts: startTs} + err = confirmLargeDataGap(ctx, cmd, startTs) + c.Assert(err, check.IsNil) +} diff --git a/cmd/util.go b/cmd/util.go index 49b72c2dc75..998727bfe85 100644 --- a/cmd/util.go +++ b/cmd/util.go @@ -24,6 +24,7 @@ import ( "os/signal" "strings" "syscall" + "time" "github.com/BurntSushi/toml" "github.com/pingcap/errors" @@ -39,6 +40,7 @@ import ( "github.com/pingcap/ticdc/pkg/logutil" "github.com/pingcap/ticdc/pkg/security" "github.com/pingcap/ticdc/pkg/util" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/spf13/cobra" "github.com/spf13/pflag" "go.etcd.io/etcd/clientv3/concurrency" @@ -54,6 +56,8 @@ var ( var errOwnerNotFound = liberrors.New("owner not found") +var tsGapWarnning int64 = 86400 * 1000 // 1 day in milliseconds + func addSecurityFlags(flags *pflag.FlagSet, isServer bool) { flags.StringVar(&caPath, "ca", "", "CA certificate path for TLS connection") flags.StringVar(&certPath, "cert", "", "Certificate path for TLS connection") @@ -311,3 +315,29 @@ func strictDecodeFile(path, component string, cfg interface{}) error { return errors.Trace(err) } + +func confirmLargeDataGap(ctx context.Context, cmd *cobra.Command, startTs uint64) error { + if noConfirm { + return nil + } + currentPhysical, _, err := pdCli.GetTS(ctx) + if err != nil { + return err + } + tsGap := currentPhysical - oracle.ExtractPhysical(startTs) + if tsGap > tsGapWarnning { + cmd.Printf("Replicate lag (%s) is larger than 1 days, "+ + "large data may cause OOM, confirm to continue at your own risk [Y/N]\n", + time.Duration(tsGap)*time.Millisecond, + ) + var yOrN string + _, err := fmt.Scan(&yOrN) + if err != nil { + return err + } + if strings.ToLower(strings.TrimSpace(yOrN)) != "y" { + return errors.NewNoStackError("abort changefeed create or resume") + } + } + return nil +}