Optimistic: support start task with inconsistent upstream table schema (#3903)

close #3629
GMHDBJD authored Feb 7, 2022
1 parent f710abb commit 5c7d238
Showing 43 changed files with 1,528 additions and 809 deletions.
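
As the diff suggests, the optimist stops persisting the pre-lock "init schema" of merged tables in etcd (the ShardDDLOptimismInitSchemaKeyAdapter key space is retired); instead, the scheduler exposes each task's downstream DB config and meta schema through a new GetDownstreamMetaByTask, the optimist receives that getter at construction time, and a new error code 38056 covers the case where the downstream meta cannot be found. This is what allows a task to start even when its upstream table schemas are inconsistent.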
13 changes: 11 additions & 2 deletions Makefile
@@ -277,13 +277,22 @@ dm_generate_openapi: tools/bin/oapi-codegen
 	cd dm && ../tools/bin/oapi-codegen --config=openapi/spec/types-gen-cfg.yaml openapi/spec/dm.yaml
 	cd dm && ../tools/bin/oapi-codegen --config=openapi/spec/client-gen-cfg.yaml openapi/spec/dm.yaml
 
-dm_unit_test: check_failpoint_ctl
+define run_dm_unit_test
+	@echo "running unit test for packages:" $(1)
 	mkdir -p $(DM_TEST_DIR)
 	$(FAILPOINT_ENABLE)
 	@export log_level=error; \
-	$(GOTEST) -timeout 5m -covermode=atomic -coverprofile="$(DM_TEST_DIR)/cov.unit_test.out" $(DM_PACKAGES) \
+	$(GOTEST) -timeout 5m -covermode=atomic -coverprofile="$(DM_TEST_DIR)/cov.unit_test.out" $(1) \
 	|| { $(FAILPOINT_DISABLE); exit 1; }
 	$(FAILPOINT_DISABLE)
+endef
+
+dm_unit_test: check_failpoint_ctl
+	$(call run_dm_unit_test,$(DM_PACKAGES))
+
+# run unit test for the specified pkg only, like `make dm_unit_test_pkg PKG=github.com/pingcap/tiflow/dm/dm/master`
+dm_unit_test_pkg: check_failpoint_ctl
+	$(call run_dm_unit_test,$(PKG))
 
 dm_unit_test_in_verify_ci: check_failpoint_ctl tools/bin/gotestsum tools/bin/gocov tools/bin/gocov-xml
 	mkdir -p $(DM_TEST_DIR)
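
The Makefile change extracts the shared failpoint-enable / go test / failpoint-disable recipe into a run_dm_unit_test template, so the existing dm_unit_test target and the new per-package dm_unit_test_pkg target expand the same code path, e.g.:

	make dm_unit_test
	make dm_unit_test_pkg PKG=github.com/pingcap/tiflow/dm/dm/master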
1 change: 1 addition & 0 deletions dm/_utils/terror_gen/errors_release.txt
@@ -395,6 +395,7 @@ ErrMasterBoundChanging,[code=38052:class=dm-master:scope=internal:level=low], "M
 ErrMasterFailToImportFromV10x,[code=38053:class=dm-master:scope=internal:level=high], "Message: fail to import DM cluster from v1.0.x, Workaround: Please confirm that you have not violated any restrictions in the upgrade documentation."
 ErrMasterInconsistentOptimisticDDLsAndInfo,[code=38054:class=dm-master:scope=internal:level=high], "Message: inconsistent count of optimistic ddls and table infos, ddls: %d, table info: %d"
 ErrMasterOptimisticTableInfoBeforeNotExist,[code=38055:class=dm-master:scope=internal:level=high], "Message: table-info-before not exist in optimistic ddls: %v"
+ErrMasterOptimisticDownstreamMetaNotFound,[code=38056:class=dm-master:scope=internal:level=high], "Message: downstream database config and meta for task %s not found"
 ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium], "Message: parse dm-worker config flag set"
 ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium], "Message: '%s' is an invalid flag"
 ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium], "Message: toml decode file, Workaround: Please check the configuration file has correct TOML format."
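
The new 38056 error carries a %s placeholder for the task name; with DM's terror package it would presumably be raised along these lines (a hypothetical call site, not part of this diff):

	// hypothetical: in the optimist, when the scheduler returns no
	// downstream config for the task
	if dbCfg == nil {
		return terror.ErrMasterOptimisticDownstreamMetaNotFound.Generate(task)
	}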
5 changes: 2 additions & 3 deletions dm/dm/common/common.go
@@ -78,7 +78,8 @@ var (
 	ShardDDLOptimismOperationKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-optimism/operation/")
 	// ShardDDLOptimismInitSchemaKeyAdapter is used to store the initial schema (before constructed the lock) of merged tables.
 	// k/v: Encode(task-name, downstream-schema-name, downstream-table-name) -> table schema.
-	ShardDDLOptimismInitSchemaKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-optimism/init-schema/")
+	// TODO: prune in etcd when upgrade
+	// ShardDDLOptimismInitSchemaKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-optimism/init-schema/")
 	// ShardDDLOptimismDroppedColumnsKeyAdapter is used to store the columns that are not fully dropped
 	// k/v: Encode(lock-id, column-name, source-id, upstream-schema-name, upstream-table-name) -> int
 	// If we don't identify different upstream tables, we may report an error for tb2 in the following case.
@@ -112,8 +113,6 @@ func keyAdapterKeysLen(s KeyAdapter) int {
 		ShardDDLPessimismInfoKeyAdapter, ShardDDLPessimismOperationKeyAdapter,
 		ShardDDLOptimismSourceTablesKeyAdapter, LoadTaskKeyAdapter, TaskCliArgsKeyAdapter:
 		return 2
-	case ShardDDLOptimismInitSchemaKeyAdapter:
-		return 3
 	case ShardDDLOptimismInfoKeyAdapter, ShardDDLOptimismOperationKeyAdapter:
 		return 4
 	case ShardDDLOptimismDroppedColumnsKeyAdapter:
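
For context on the key adapters touched here: keyHexEncoderDecoder hex-encodes each key component and joins the components under the adapter's prefix, which is why the tests in the next file expect values like /dm-master/upstream/subtask/6d7973716c31/74657374 for ("mysql1", "test"). A minimal sketch of that scheme (an illustration, not the actual DM implementation):

	package main

	import (
		"encoding/hex"
		"fmt"
	)

	// encode hex-encodes each component and appends it under the prefix,
	// mirroring what keyHexEncoderDecoder appears to do.
	func encode(prefix string, keys ...string) string {
		k := prefix // e.g. "/dm-master/upstream/subtask/"
		for i, key := range keys {
			if i > 0 {
				k += "/"
			}
			k += hex.EncodeToString([]byte(key))
		}
		return k
	}

	func main() {
		fmt.Println(encode("/dm-master/upstream/subtask/", "mysql1", "test"))
		// Output: /dm-master/upstream/subtask/6d7973716c31/74657374
	}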
23 changes: 0 additions & 23 deletions dm/dm/common/common_test.go
@@ -16,7 +16,6 @@ package common
 import (
 	"net"
 	"path"
-	"strings"
 	"testing"
 
 	. "github.com/pingcap/check"
@@ -61,11 +60,6 @@ func (t *testCommon) TestKeyAdapter(c *C) {
 			adapter: UpstreamSubTaskKeyAdapter,
 			want:    "/dm-master/upstream/subtask/6d7973716c31/74657374",
 		},
-		{
-			keys:    []string{"test", "target_db", "target_table"},
-			adapter: ShardDDLOptimismInitSchemaKeyAdapter,
-			want:    "/dm-master/shardddl-optimism/init-schema/74657374/7461726765745f6462/7461726765745f7461626c65",
-		},
 		{
 			keys:    []string{"test", "mysql_replica_01", "target_db", "target_table"},
 			adapter: ShardDDLOptimismInfoKeyAdapter,
@@ -108,11 +102,6 @@ func (t *testCommon) TestEncodeAsPrefix(c *C) {
 			adapter: UpstreamSubTaskKeyAdapter,
 			want:    "/dm-master/upstream/subtask/6d7973716c31/",
 		},
-		{
-			keys:    []string{"test", "target_db"},
-			adapter: ShardDDLOptimismInitSchemaKeyAdapter,
-			want:    "/dm-master/shardddl-optimism/init-schema/74657374/7461726765745f6462/",
-		},
 	}
 
 	for _, ca := range testCases {
@@ -121,18 +110,6 @@
 		_, err := ca.adapter.Decode(encKey)
 		c.Assert(err, NotNil)
 	}
-
-	keys := []string{"test", "target_db", "target_table"}
-	fullEncodedKey := ShardDDLOptimismInitSchemaKeyAdapter.Encode(keys...)
-	prefixEncodedKey := ShardDDLOptimismInitSchemaKeyAdapter.Encode(keys[:len(keys)-1]...)
-	c.Assert(strings.HasPrefix(fullEncodedKey, prefixEncodedKey), IsTrue)
-
-	keys2 := []string{"test", "target_db_2", "target_table_2"}
-	fullEncodedKey2 := ShardDDLOptimismInitSchemaKeyAdapter.Encode(keys2...)
-	prefixEncodedKey2 := ShardDDLOptimismInitSchemaKeyAdapter.Encode(keys2[:len(keys2)-1]...)
-
-	c.Assert(strings.HasPrefix(fullEncodedKey, prefixEncodedKey2), IsFalse)
-	c.Assert(strings.HasPrefix(fullEncodedKey2, prefixEncodedKey), IsFalse)
 }
 
 func (t *testCommon) TestIsErrNetClosing(c *C) {
13 changes: 13 additions & 0 deletions dm/dm/master/scheduler/scheduler.go
@@ -1020,6 +1020,19 @@ func (s *Scheduler) getSubTaskCfgByTaskSource(task, source string) *config.SubTa
 	return &clone
 }
 
+// GetDownstreamMetaByTask gets downstream db config and meta config by task name.
+func (s *Scheduler) GetDownstreamMetaByTask(task string) (*config.DBConfig, string) {
+	v, ok := s.subTaskCfgs.Load(task)
+	if !ok {
+		return nil, ""
+	}
+	cfgM := v.(map[string]config.SubTaskConfig)
+	for _, cfg := range cfgM {
+		return cfg.To.Clone(), cfg.MetaSchema
+	}
+	return nil, ""
+}
+
 // GetSubTaskCfgsByTask gets subtask configs' map by task name.
 func (s *Scheduler) GetSubTaskCfgsByTask(task string) map[string]*config.SubTaskConfig {
 	v, ok := s.subTaskCfgs.Load(task)
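
GetDownstreamMetaByTask iterates cfgM only to pull out an arbitrary entry: all subtasks of a task share the same downstream (To) config and meta schema, so the first value found is representative. A minimal usage sketch (the caller and task name are hypothetical):

	// hypothetical caller
	dbCfg, metaSchema := s.GetDownstreamMetaByTask("my-task")
	if dbCfg == nil {
		// no subtask configs registered under this task name
	}
	_ = metaSchema // the schema holding DM's meta tables, e.g. "dm_meta"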
16 changes: 16 additions & 0 deletions dm/dm/master/scheduler/scheduler_test.go
@@ -254,12 +254,15 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) {
 	c.Assert(s.AddSubTasks(false), IsNil) // can call without configs, return without error, but take no effect.
 	t.subTaskCfgNotExist(c, s, taskName1, sourceID1)
 	t.subTaskStageMatch(c, s, taskName1, sourceID1, pb.Stage_InvalidStage)
+	t.downstreamMetaNotExist(c, s, taskName1)
 	// start the task.
 	c.Assert(s.AddSubTasks(false, subtaskCfg1), IsNil)
 	c.Assert(terror.ErrSchedulerSubTaskExist.Equal(s.AddSubTasks(false, subtaskCfg1)), IsTrue) // add again.
 	// subtask config and stage exist.
 	t.subTaskCfgExist(c, s, subtaskCfg1)
 	t.subTaskStageMatch(c, s, taskName1, sourceID1, pb.Stage_Running)
+	t.downstreamMetaExist(c, s, taskName1, subtaskCfg1.To, subtaskCfg1.MetaSchema)
+	t.downstreamMetaNotExist(c, s, taskName2)
 
 	// update source config when task already started will failed
 	c.Assert(terror.ErrSchedulerSourceOpTaskExist.Equal(s.UpdateSourceCfg(sourceCfg1)), IsTrue)
@@ -629,6 +632,19 @@ func (t *testScheduler) subTaskCfgExist(c *C, s *Scheduler, expectCfg config.Sub
 	c.Assert(cfgM[expectCfg.Name], DeepEquals, expectCfg)
 }
 
+func (t *testScheduler) downstreamMetaNotExist(c *C, s *Scheduler, task string) {
+	dbConfig, metaConfig := s.GetDownstreamMetaByTask(task)
+	c.Assert(dbConfig, IsNil)
+	c.Assert(metaConfig, Equals, "")
+}
+
+func (t *testScheduler) downstreamMetaExist(c *C, s *Scheduler, task string, expectDBCfg config.DBConfig, expectMetaConfig string) {
+	dbConfig, metaConfig := s.GetDownstreamMetaByTask(task)
+	c.Assert(dbConfig, NotNil)
+	c.Assert(dbConfig, DeepEquals, &expectDBCfg)
+	c.Assert(metaConfig, Equals, expectMetaConfig)
+}
+
 func (t *testScheduler) workerNotExist(c *C, s *Scheduler, worker string) {
 	c.Assert(s.GetWorkerByName(worker), IsNil)
 	wm, _, err := ha.GetAllWorkerInfo(etcdTestCli)
16 changes: 14 additions & 2 deletions dm/dm/master/server.go
@@ -132,7 +132,7 @@ func NewServer(cfg *Config) *Server {
 		ap: NewAgentPool(&RateLimitConfig{rate: cfg.RPCRateLimit, burst: cfg.RPCRateBurst}),
 	}
 	server.pessimist = shardddl.NewPessimist(&logger, server.getTaskResources)
-	server.optimist = shardddl.NewOptimist(&logger)
+	server.optimist = shardddl.NewOptimist(&logger, server.scheduler.GetDownstreamMetaByTask)
 	server.closed.Store(true)
 	setUseTLS(&cfg.Security)
 
@@ -590,6 +590,18 @@ func (s *Server) OperateTask(ctx context.Context, req *pb.OperateTaskRequest) (*
 
 	resp.Result = true
 	resp.Sources = s.getSourceRespsAfterOperation(ctx, req.Name, sources, []string{}, req)
+
+	if expect == pb.Stage_Stopped {
+		// delete meta data for optimist
+		if len(req.Sources) == 0 {
+			err2 = s.optimist.RemoveMetaDataWithTask(req.Name)
+		} else {
+			err2 = s.optimist.RemoveMetaDataWithTaskAndSources(req.Name, sources...)
+		}
+		if err2 != nil {
+			log.L().Error("failed to delete metadata for task", zap.String("task name", req.Name), log.ShortError(err2))
+		}
+	}
 	return resp, nil
 }
 
@@ -1558,7 +1570,7 @@ func (s *Server) removeMetaData(ctx context.Context, taskName, metaSchema string
 	if err != nil {
 		return err
 	}
-	err = s.optimist.RemoveMetaData(taskName)
+	err = s.optimist.RemoveMetaDataWithTask(taskName)
 	if err != nil {
 		return err
 	}
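
Two points in the server changes: NewOptimist now receives scheduler.GetDownstreamMetaByTask so the optimist can reach the downstream for table schemas, and OperateTask prunes the optimist's etcd metadata once a task reaches Stage_Stopped, calling RemoveMetaDataWithTask when the whole task is stopped and RemoveMetaDataWithTaskAndSources when only some sources are. Note that err2 is only logged rather than returned, presumably because the stop operation itself has already succeeded by that point.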
4 changes: 2 additions & 2 deletions dm/dm/master/server_test.go
@@ -960,7 +960,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
 	server.scheduler, _ = t.testMockScheduler(ctx, &wg, c, sources, workers, "",
 		makeWorkerClientsForHandle(ctrl, taskName, sources, workers, req))
 	server.pessimist = shardddl.NewPessimist(&logger, func(task string) []string { return sources })
-	server.optimist = shardddl.NewOptimist(&logger)
+	server.optimist = shardddl.NewOptimist(&logger, server.scheduler.GetDownstreamMetaByTask)
 
 	var (
 		DDLs = []string{"ALTER TABLE bar ADD COLUMN c1 INT"}
@@ -1045,7 +1045,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
 	server.scheduler, _ = t.testMockScheduler(ctx, &wg, c, sources, workers, "",
 		makeWorkerClientsForHandle(ctrl, taskName, sources, workers, req))
 	server.pessimist = shardddl.NewPessimist(&logger, func(task string) []string { return sources })
-	server.optimist = shardddl.NewOptimist(&logger)
+	server.optimist = shardddl.NewOptimist(&logger, server.scheduler.GetDownstreamMetaByTask)
 
 	var (
 		p = parser.New()