Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd: check changefeed start-ts when creating or resuming #1497

Merged
merged 2 commits into from
Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions cmd/client_changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,19 @@ func newChangefeedCommand() *cobra.Command {
return command
}

func resumeChangefeedCheck(ctx context.Context, cmd *cobra.Command) error {
resp, err := applyOwnerChangefeedQuery(ctx, changefeedID, getCredential())
if err != nil {
return err
}
info := &cdc.ChangefeedResp{}
err = json.Unmarshal([]byte(resp), info)
if err != nil {
return err
}
return confirmLargeDataGap(ctx, cmd, info.TSO)
}

func newAdminChangefeedCommand() []*cobra.Command {
cmds := []*cobra.Command{
{
Expand All @@ -92,6 +105,9 @@ func newAdminChangefeedCommand() []*cobra.Command {
CfID: changefeedID,
Type: model.AdminResume,
}
if err := resumeChangefeedCheck(ctx, cmd); err != nil {
return err
}
return applyAdminChangefeed(ctx, job, getCredential())
},
},
Expand All @@ -118,6 +134,9 @@ func newAdminChangefeedCommand() []*cobra.Command {
if cmd.Use == "remove" {
cmd.PersistentFlags().BoolVarP(&optForceRemove, "force", "f", false, "remove all information of the changefeed")
}
if cmd.Use == "resume" {
cmd.PersistentFlags().BoolVar(&noConfirm, "no-confirm", false, "Don't ask user whether to ignore ineligible table")
}
}
return cmds
}
Expand Down Expand Up @@ -237,6 +256,9 @@ func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate
if err := verifyStartTs(ctx, startTs); err != nil {
return nil, err
}
if err := confirmLargeDataGap(ctx, cmd, startTs); err != nil {
return nil, err
}
if err := verifyTargetTs(ctx, startTs, targetTs); err != nil {
return nil, err
}
Expand Down
69 changes: 67 additions & 2 deletions cmd/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@
package cmd

import (
"context"
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/pingcap/check"
"github.com/pingcap/parser/model"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/util/testleak"

"github.com/pingcap/check"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/spf13/cobra"
pd "github.com/tikv/pd/client"
)

func TestSuite(t *testing.T) { check.TestingT(t) }
Expand Down Expand Up @@ -198,3 +202,64 @@ func (s *decodeFileSuite) TestShouldReturnErrForUnknownCfgs(c *check.C) {
c.Assert(err, check.NotNil)
c.Assert(err, check.ErrorMatches, ".*unknown config.*")
}

type mockPDClient struct {
pd.Client
ts uint64
}

func (m *mockPDClient) GetTS(ctx context.Context) (int64, int64, error) {
return oracle.ExtractPhysical(m.ts), 0, nil
}

type commonUtilSuite struct{}

var _ = check.Suite(&commonUtilSuite{})

func (s *commonUtilSuite) TestConfirmLargeDataGap(c *check.C) {
defer testleak.AfterTest(c)()
ctx := context.Background()
currentTs := uint64(423482306736160769) // 2021-03-11 17:59:57.547
startTs := uint64(423450030227042420) // 2021-03-10 07:47:52.435
pdCli = &mockPDClient{ts: currentTs}
cmd := &cobra.Command{}

// check start ts more than 1 day before current ts, and type N when confirming
dir := c.MkDir()
path := filepath.Join(dir, "confirm.txt")
err := ioutil.WriteFile(path, []byte("n"), 0o644)
c.Assert(err, check.IsNil)
f, err := os.Open(path)
c.Assert(err, check.IsNil)
stdin := os.Stdin
os.Stdin = f
defer func() {
os.Stdin = stdin
}()
err = confirmLargeDataGap(ctx, cmd, startTs)
c.Assert(err, check.ErrorMatches, "abort changefeed create or resume")

// check no confirm works
originNoConfirm := noConfirm
noConfirm = true
defer func() {
noConfirm = originNoConfirm
}()
err = confirmLargeDataGap(ctx, cmd, startTs)
c.Assert(err, check.IsNil)
noConfirm = false

// check start ts more than 1 day before current ts, and type Y when confirming
err = ioutil.WriteFile(path, []byte("Y"), 0o644)
c.Assert(err, check.IsNil)
f, err = os.Open(path)
c.Assert(err, check.IsNil)
os.Stdin = f
err = confirmLargeDataGap(ctx, cmd, startTs)
c.Assert(err, check.IsNil)

// check start ts does not exceed threshold
pdCli = &mockPDClient{ts: startTs}
err = confirmLargeDataGap(ctx, cmd, startTs)
c.Assert(err, check.IsNil)
}
30 changes: 30 additions & 0 deletions cmd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os/signal"
"strings"
"syscall"
"time"

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
Expand All @@ -39,6 +40,7 @@ import (
"github.com/pingcap/ticdc/pkg/logutil"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"go.etcd.io/etcd/clientv3/concurrency"
Expand All @@ -54,6 +56,8 @@ var (

var errOwnerNotFound = liberrors.New("owner not found")

var tsGapWarnning int64 = 86400 * 1000 // 1 day in milliseconds

func addSecurityFlags(flags *pflag.FlagSet, isServer bool) {
flags.StringVar(&caPath, "ca", "", "CA certificate path for TLS connection")
flags.StringVar(&certPath, "cert", "", "Certificate path for TLS connection")
Expand Down Expand Up @@ -311,3 +315,29 @@ func strictDecodeFile(path, component string, cfg interface{}) error {

return errors.Trace(err)
}

func confirmLargeDataGap(ctx context.Context, cmd *cobra.Command, startTs uint64) error {
if noConfirm {
return nil
}
currentPhysical, _, err := pdCli.GetTS(ctx)
if err != nil {
return err
}
tsGap := currentPhysical - oracle.ExtractPhysical(startTs)
if tsGap > tsGapWarnning {
cmd.Printf("Replicate lag (%s) is larger than 1 days, "+
"large data may cause OOM, confirm to continue at your own risk [Y/N]\n",
time.Duration(tsGap)*time.Millisecond,
)
var yOrN string
_, err := fmt.Scan(&yOrN)
if err != nil {
return err
}
if strings.ToLower(strings.TrimSpace(yOrN)) != "y" {
return errors.NewNoStackError("abort changefeed create or resume")
}
}
return nil
}