Skip to content

Commit

Permalink
syncer(dm): implement start-task --start-time (pingcap#4485)
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored and zhaoxinyu committed Feb 16, 2022
1 parent 6f07220 commit 4974552
Show file tree
Hide file tree
Showing 13 changed files with 282 additions and 16 deletions.
1 change: 1 addition & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ ErrConfigInvalidLoadMode,[code=20053:class=config:scope=internal:level=medium],
ErrConfigInvalidDuplicateResolution,[code=20054:class=config:scope=internal:level=medium], "Message: invalid load on-duplicate '%s', Workaround: Please choose a valid value in ['replace', 'error', 'ignore']"
ErrConfigValidationMode,[code=20055:class=config:scope=internal:level=high], "Message: invalid validation mode, Workaround: Please check `validation-mode` config in task configuration file."
ErrContinuousValidatorCfgNotFound,[code=20056:class=config:scope=internal:level=medium], "Message: mysql-instance(%d)'s continuous validator config %s not exist, Workaround: Please check the `continuous-validator-config-name` config in task configuration file."
ErrConfigStartTimeTooLate,[code=20057:class=config:scope=internal:level=high], "Message: start-time %s is too late, no binlog location matches it, Workaround: Please check the `--start-time` is expected or try again later."
ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high]
ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename"
ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high]
Expand Down
9 changes: 8 additions & 1 deletion dm/dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,14 @@ const (
validatorIdx
)

// adjust adjusts and verifies config.
// Adjust adjusts and verifies config.
func (c *TaskConfig) Adjust() error {
if c == nil {
return terror.ErrConfigYamlTransform.New("task config is nil")
}
return c.adjust()
}

func (c *TaskConfig) adjust() error {
if len(c.Name) == 0 {
return terror.ErrConfigNeedUniqueTaskName.Generate()
Expand Down
11 changes: 8 additions & 3 deletions dm/dm/config/task_cli_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import (
"github.com/pingcap/tiflow/dm/pkg/terror"
)

const (
StartTimeFormat = "2006-01-02 15:04:05"
StartTimeFormat2 = "2006-01-02T15:04:05"
)

// TaskCliArgs is the task command line arguments, these arguments have higher priority than the config file and
// downstream checkpoint, but may need to be removed after the first time they take effect.
type TaskCliArgs struct {
Expand All @@ -46,10 +51,10 @@ func (t *TaskCliArgs) Verify() error {
if t.StartTime == "" {
return nil
}
_, err := time.Parse("2006-01-02 15:04:05", t.StartTime)
_, err := time.Parse(StartTimeFormat, t.StartTime)
if err == nil {
return nil
}
_, err = time.Parse("2006-01-02T15:04:05", t.StartTime)
return terror.Annotate(err, "error while parse start-time, expected in the format like '2006-01-02 15:04:05'")
_, err = time.Parse(StartTimeFormat2, t.StartTime)
return terror.Annotate(err, "error while parse start-time, expected in the format like '2006-01-02 15:04:05' or '2006-01-02T15:04:05'")
}
12 changes: 10 additions & 2 deletions dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/pingcap/tiflow/dm/dm/master/workerrpc"
"github.com/pingcap/tiflow/dm/dm/pb"
"github.com/pingcap/tiflow/dm/dm/unit"
"github.com/pingcap/tiflow/dm/pkg/binlog"
"github.com/pingcap/tiflow/dm/pkg/conn"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/cputil"
Expand Down Expand Up @@ -1545,14 +1546,21 @@ func (s *Server) generateSubTask(
task string,
cliArgs *config.TaskCliArgs,
) (*config.TaskConfig, []*config.SubTaskConfig, error) {
var err error
cfg := config.NewTaskConfig()
// bypass the meta check by set any value. If start-time is specified, DM-worker will not use meta field.
if cliArgs != nil && cliArgs.StartTime != "" {
err = cfg.RawDecode(task)
if err != nil {
return nil, nil, terror.WithClass(err, terror.ClassDMMaster)
}
for _, inst := range cfg.MySQLInstances {
inst.Meta = &config.Meta{BinLogName: cliArgs.StartTime}
inst.Meta = &config.Meta{BinLogName: binlog.FakeBinlogName}
}
err = cfg.Adjust()
} else {
err = cfg.Decode(task)
}
err := cfg.Decode(task)
if err != nil {
return nil, nil, terror.WithClass(err, terror.ClassDMMaster)
}
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ func (w *SourceWorker) StartSubTask(cfg *config.SubTaskConfig, expectStage, vali
}

// directly put cfg into subTaskHolder
// the unique of subtask should be assured by etcd
// the uniqueness of subtask should be assured by etcd
st := NewSubTask(cfg, w.etcdClient, w.name)
w.subTaskHolder.recordSubTask(st)
if w.closed.Load() {
Expand Down
6 changes: 6 additions & 0 deletions dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,12 @@ description = ""
workaround = "Please check the `continuous-validator-config-name` config in task configuration file."
tags = ["internal", "medium"]

[error.DM-config-20057]
message = "start-time %s is too late, no binlog location matches it"
description = ""
workaround = "Please check the `--start-time` is expected or try again later."
tags = ["internal", "high"]

[error.DM-binlog-op-22001]
message = ""
description = ""
Expand Down
3 changes: 3 additions & 0 deletions dm/pkg/binlog/pos_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
"github.com/pingcap/tiflow/dm/pkg/utils"
)

// FakeBinlogName is used to bypass the checking of meta in task config when start-task with --start-time.
const FakeBinlogName = "start-task with --start-time"

type binlogPosFinder struct {
remote bool
tctx *tcontext.Context
Expand Down
2 changes: 2 additions & 0 deletions dm/pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ const (
codeConfigInvalidLoadDuplicateResolution
codeConfigValidationMode
codeContinuousValidatorCfgNotFound
codeConfigStartTimeTooLate
)

// Binlog operation error code list.
Expand Down Expand Up @@ -914,6 +915,7 @@ var (
ErrConfigInvalidDuplicateResolution = New(codeConfigInvalidLoadDuplicateResolution, ClassConfig, ScopeInternal, LevelMedium, "invalid load on-duplicate '%s'", "Please choose a valid value in ['replace', 'error', 'ignore']")
ErrConfigValidationMode = New(codeConfigValidationMode, ClassConfig, ScopeInternal, LevelHigh, "invalid validation mode", "Please check `validation-mode` config in task configuration file.")
ErrContinuousValidatorCfgNotFound = New(codeContinuousValidatorCfgNotFound, ClassConfig, ScopeInternal, LevelMedium, "mysql-instance(%d)'s continuous validator config %s not exist", "Please check the `continuous-validator-config-name` config in task configuration file.")
ErrConfigStartTimeTooLate = New(codeConfigStartTimeTooLate, ClassConfig, ScopeInternal, LevelHigh, "start-time %s is too late, no binlog location matches it", "Please check the `--start-time` is expected or try again later.")

// Binlog operation error.
ErrBinlogExtractPosition = New(codeBinlogExtractPosition, ClassBinlogOp, ScopeInternal, LevelHigh, "", "")
Expand Down
40 changes: 38 additions & 2 deletions dm/syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/pingcap/errors"

"github.com/pingcap/tiflow/dm/dm/config"
"github.com/pingcap/tiflow/dm/pkg/binlog"
"github.com/pingcap/tiflow/dm/pkg/conn"
Expand Down Expand Up @@ -227,6 +228,9 @@ type CheckPoint interface {
// DeleteTablePoint deletes checkpoint for specified table in memory and storage
DeleteTablePoint(tctx *tcontext.Context, table *filter.Table) error

// DeleteAllTablePoint deletes all checkpoints for table in memory and storage
DeleteAllTablePoint(tctx *tcontext.Context) error

// DeleteSchemaPoint deletes checkpoint for specified schema
DeleteSchemaPoint(tctx *tcontext.Context, sourceSchema string) error

Expand All @@ -237,10 +241,13 @@ type CheckPoint interface {
// corresponding to Meta.Save
SaveGlobalPoint(point binlog.Location)

// SaveGlobalPointForcibly saves the global binlog stream's checkpoint forcibly.
SaveGlobalPointForcibly(location binlog.Location)

// Snapshot make a snapshot of current checkpoint
Snapshot(isSyncFlush bool) *SnapshotInfo

// FlushGlobalPointsExcept flushes the global checkpoint and tables'
// FlushPointsExcept flushes the global checkpoint and tables'
// checkpoints except exceptTables, it also flushes SQLs with Args providing
// by extraSQLs and extraArgs. Currently extraSQLs contain shard meta only.
// @exceptTables: [[schema, table]... ]
Expand Down Expand Up @@ -551,6 +558,26 @@ func (cp *RemoteCheckPoint) DeleteTablePoint(tctx *tcontext.Context, table *filt
return nil
}

// DeleteAllTablePoint implements CheckPoint.DeleteAllTablePoint.
func (cp *RemoteCheckPoint) DeleteAllTablePoint(tctx *tcontext.Context) error {
cp.Lock()
defer cp.Unlock()

tctx2, cancel := tctx.WithContext(context.Background()).WithTimeout(maxDMLConnectionDuration)
defer cancel()
cp.logCtx.L().Info("delete all table checkpoint")
_, err := cp.dbConn.ExecuteSQL(
tctx2,
[]string{`DELETE FROM ` + cp.tableName + ` WHERE id = ? AND is_global = ?`},
[]interface{}{cp.id, false},
)
if err != nil {
return err
}
cp.points = make(map[string]map[string]*binlogPoint)
return nil
}

// DeleteSchemaPoint implements CheckPoint.DeleteSchemaPoint.
func (cp *RemoteCheckPoint) DeleteSchemaPoint(tctx *tcontext.Context, sourceSchema string) error {
cp.Lock()
Expand Down Expand Up @@ -614,7 +641,16 @@ func (cp *RemoteCheckPoint) SaveGlobalPoint(location binlog.Location) {
}
}

// FlushPointsExcept implements CheckPoint.FlushSnapshotPointsExcept.
// SaveGlobalPointForcibly implements CheckPoint.SaveGlobalPointForcibly.
func (cp *RemoteCheckPoint) SaveGlobalPointForcibly(location binlog.Location) {
cp.Lock()
defer cp.Unlock()

cp.logCtx.L().Info("reset global checkpoint", zap.Stringer("location", location))
cp.globalPoint = newBinlogPoint(location, binlog.NewLocation(cp.cfg.Flavor), nil, nil, cp.cfg.EnableGTID)
}

// FlushPointsExcept implements CheckPoint.FlushPointsExcept.
func (cp *RemoteCheckPoint) FlushPointsExcept(
tctx *tcontext.Context,
snapshotID int,
Expand Down
103 changes: 96 additions & 7 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ type Syncer struct {

cfg *config.SubTaskConfig
syncCfg replication.BinlogSyncerConfig
cliArgs *config.TaskCliArgs

sgk *ShardingGroupKeeper // keeper to keep all sharding (sub) group in this syncer
pessimist *shardddl.Pessimist // shard DDL pessimist
Expand Down Expand Up @@ -439,11 +440,13 @@ func (s *Syncer) Init(ctx context.Context) (err error) {
}

// when Init syncer, set active relay log info
err = s.setInitActiveRelayLog(ctx)
if err != nil {
return err
if s.cfg.Meta == nil || s.cfg.Meta.BinLogName != binlog.FakeBinlogName {
err = s.setInitActiveRelayLog(ctx)
if err != nil {
return err
}
rollbackHolder.Add(fr.FuncRollback{Name: "remove-active-realylog", Fn: s.removeActiveRelayLog})
}
rollbackHolder.Add(fr.FuncRollback{Name: "remove-active-realylog", Fn: s.removeActiveRelayLog})

s.reset()
return nil
Expand Down Expand Up @@ -1257,6 +1260,17 @@ func (s *Syncer) afterFlushCheckpoint(task *checkpointFlushTask) error {
s.lastCheckpointFlushedTime = now

s.logAndClearFilteredStatistics()

if s.cliArgs != nil && s.cliArgs.StartTime != "" {
clone := *s.cliArgs
clone.StartTime = ""
err2 := ha.PutTaskCliArgs(s.cli, s.cfg.Name, []string{s.cfg.SourceID}, clone)
if err2 != nil {
s.tctx.L().Error("failed to clean start-time in task cli args", zap.Error(err2))
} else {
s.cliArgs.StartTime = ""
}
}
return nil
}

Expand Down Expand Up @@ -1477,11 +1491,30 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}
}()

// some initialization that can't be put in Syncer.Init
fresh, err := s.IsFreshTask(runCtx)
if err != nil {
return err
} else if fresh {
}

// task command line arguments have the highest priority
// dm-syncer and other usage may not have a etcdCli, so we check it first
skipLoadMeta := false
if s.cli != nil {
s.cliArgs, err = ha.GetTaskCliArgs(s.cli, s.cfg.Name, s.cfg.SourceID)
if err != nil {
s.tctx.L().Error("failed to get task cli args", zap.Error(err))
}
if s.cliArgs != nil && s.cliArgs.StartTime != "" {
err = s.setGlobalPointByTime(tctx, s.cliArgs.StartTime)
if terror.ErrConfigStartTimeTooLate.Equal(err) {
return err
}
skipLoadMeta = err == nil
}
}

// some initialization that can't be put in Syncer.Init
if fresh && !skipLoadMeta {
// for fresh task, we try to load checkpoints from meta (file or config item)
err = s.checkpoint.LoadMeta()
if err != nil {
Expand Down Expand Up @@ -3636,7 +3669,8 @@ func (s *Syncer) adjustGlobalPointGTID(tctx *tcontext.Context) (bool, error) {
// 1. GTID is not enabled
// 2. location already has GTID position
// 3. location is totally new, has no position info
if !s.cfg.EnableGTID || location.GTIDSetStr() != "" || location.Position.Name == "" {
// 4. location is too early thus not a COMMIT location, which happens when it's reset by other logic
if !s.cfg.EnableGTID || location.GTIDSetStr() != "" || location.Position.Name == "" || location.Position.Pos == 4 {
return false, nil
}
// set enableGTID to false for new streamerController
Expand Down Expand Up @@ -3728,3 +3762,58 @@ func (s *Syncer) flushOptimisticTableInfos(tctx *tcontext.Context) {
tctx.L().Error("failed to flush table points with table infos", log.ShortError(err))
}
}

func (s *Syncer) setGlobalPointByTime(tctx *tcontext.Context, timeStr string) error {
// we support two layout
t, err := time.ParseInLocation(config.StartTimeFormat, timeStr, s.timezone)
if err != nil {
t, err = time.ParseInLocation(config.StartTimeFormat2, timeStr, s.timezone)
}
if err != nil {
return err
}

var (
loc *binlog.Location
posTp binlog.PosType
)

if s.relay != nil {
subDir := s.relay.Status(nil).(*pb.RelayStatus).RelaySubDir
relayDir := path.Join(s.cfg.RelayDir, subDir)
finder := binlog.NewLocalBinlogPosFinder(tctx, s.cfg.EnableGTID, s.cfg.Flavor, relayDir)
loc, posTp, err = finder.FindByTimestamp(t.Unix())
} else {
finder := binlog.NewRemoteBinlogPosFinder(tctx, s.fromDB.BaseDB.DB, s.syncCfg, s.cfg.EnableGTID)
loc, posTp, err = finder.FindByTimestamp(t.Unix())
}
if err != nil {
s.tctx.L().Error("fail to find binlog position by timestamp",
zap.Time("time", t),
zap.Error(err))
return err
}

switch posTp {
case binlog.InRangeBinlogPos:
s.tctx.L().Info("find binlog position by timestamp",
zap.String("time", timeStr),
zap.Stringer("pos", loc))
case binlog.BelowLowerBoundBinlogPos:
s.tctx.L().Warn("fail to find binlog location by timestamp because the timestamp is too early, will use the earliest binlog location",
zap.String("time", timeStr),
zap.Any("location", loc))
case binlog.AboveUpperBoundBinlogPos:
return terror.ErrConfigStartTimeTooLate.Generate(timeStr)
}

err = s.checkpoint.DeleteAllTablePoint(tctx)
if err != nil {
return err
}
s.checkpoint.SaveGlobalPointForcibly(*loc)
s.tctx.L().Info("Will replicate from the specified time, the location recorded in checkpoint and config file will be ignored",
zap.String("time", timeStr),
zap.Any("locationOfTheTime", loc))
return nil
}
1 change: 1 addition & 0 deletions dm/tests/duplicate_event/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ function run_with_prepared_source_config() {

server_uuid=$(tail -n 1 $WORK_DIR/worker2/relay-dir/server-uuid.index)
relay_log_size=$(ls -al $WORK_DIR/worker2/relay-dir/$server_uuid/$binlog_file | awk '{print $5}')
echo "binlog_pos: $binlog_pos relay_log_size: $relay_log_size"
[ "$binlog_pos" -eq "$relay_log_size" ]

echo "============== run_with_prepared_source_config success ==================="
Expand Down
2 changes: 2 additions & 0 deletions dm/tests/start_task/conf/dm-worker2.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
name = "worker2"
join = "127.0.0.1:8261"
Loading

0 comments on commit 4974552

Please sign in to comment.