diff --git a/dm/master/server_test.go b/dm/master/server_test.go index b7b86f0fd3..072d67ea4e 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -152,7 +152,7 @@ var ( errCheckSyncConfig = "(?m).*check sync config with error.*" errCheckSyncConfigReg = fmt.Sprintf("(?m).*%s.*", errCheckSyncConfig) testEtcdCluster *integration.ClusterV3 - keepAliveTTL = int64(1) + keepAliveTTL = int64(10) etcdTestCli *clientv3.Client ) @@ -258,7 +258,7 @@ func testMockScheduler(ctx context.Context, wg *sync.WaitGroup, c *check.C, sour cfg := config.NewSourceConfig() cfg.SourceID = sources[i] cfg.From.Password = password - c.Assert(scheduler2.AddSourceCfg(*cfg), check.IsNil) + c.Assert(scheduler2.AddSourceCfg(*cfg), check.IsNil, check.Commentf("all sources: %v", sources)) wg.Add(1) ctx1, cancel1 := context.WithCancel(ctx) cancels = append(cancels, cancel1) diff --git a/dm/master/shardddl/optimist_test.go b/dm/master/shardddl/optimist_test.go index 63ecffccdd..4817b5a273 100644 --- a/dm/master/shardddl/optimist_test.go +++ b/dm/master/shardddl/optimist_test.go @@ -156,7 +156,7 @@ func (t *testOptimist) testOptimist(c *C, restart int) { var ( backOff = 30 waitTime = 100 * time.Millisecond - watchTimeout = 2 * time.Second + watchTimeout = 500 * time.Millisecond logger = log.L() o = NewOptimist(&logger) @@ -254,8 +254,11 @@ func (t *testOptimist) testOptimist(c *C, restart int) { // wait operation for i11 become available. opCh := make(chan optimism.Operation, 10) errCh := make(chan error, 10) - ctx2, cancel2 := context.WithTimeout(ctx, watchTimeout) - optimism.WatchOperationPut(ctx2, etcdTestCli, i11.Task, i11.Source, i11.UpSchema, i11.UpTable, rev1, opCh, errCh) + ctx2, cancel2 := context.WithCancel(ctx) + go optimism.WatchOperationPut(ctx2, etcdTestCli, i11.Task, i11.Source, i11.UpSchema, i11.UpTable, rev1, opCh, errCh) + utils.WaitSomething(10, watchTimeout, func() bool { + return len(opCh) != 0 + }) cancel2() close(opCh) close(errCh) @@ -309,8 +312,11 @@ func (t *testOptimist) testOptimist(c *C, restart int) { // wait operation for i12 become available. opCh = make(chan optimism.Operation, 10) errCh = make(chan error, 10) - ctx2, cancel2 = context.WithTimeout(ctx, watchTimeout) - optimism.WatchOperationPut(ctx2, etcdTestCli, i12.Task, i12.Source, i12.UpSchema, i12.UpTable, rev2, opCh, errCh) + ctx2, cancel2 = context.WithCancel(ctx) + go optimism.WatchOperationPut(ctx2, etcdTestCli, i12.Task, i12.Source, i12.UpSchema, i12.UpTable, rev2, opCh, errCh) + utils.WaitSomething(10, watchTimeout, func() bool { + return len(opCh) != 0 + }) cancel2() close(opCh) close(errCh) @@ -356,8 +362,11 @@ func (t *testOptimist) testOptimist(c *C, restart int) { // wait operation for i21 become available. opCh = make(chan optimism.Operation, 10) errCh = make(chan error, 10) - ctx2, cancel2 = context.WithTimeout(ctx, watchTimeout) - optimism.WatchOperationPut(ctx2, etcdTestCli, i21.Task, i21.Source, i21.UpSchema, i21.UpTable, rev1, opCh, errCh) + ctx2, cancel2 = context.WithCancel(ctx) + go optimism.WatchOperationPut(ctx2, etcdTestCli, i21.Task, i21.Source, i21.UpSchema, i21.UpTable, rev1, opCh, errCh) + utils.WaitSomething(10, watchTimeout, func() bool { + return len(opCh) != 0 + }) cancel2() close(opCh) close(errCh) @@ -420,8 +429,11 @@ func (t *testOptimist) testOptimist(c *C, restart int) { // wait operation for i23 become available. opCh = make(chan optimism.Operation, 10) errCh = make(chan error, 10) - ctx2, cancel2 = context.WithTimeout(ctx, watchTimeout) - optimism.WatchOperationPut(ctx2, etcdTestCli, i23.Task, i23.Source, i23.UpSchema, i23.UpTable, rev3, opCh, errCh) + ctx2, cancel2 = context.WithCancel(ctx) + go optimism.WatchOperationPut(ctx2, etcdTestCli, i23.Task, i23.Source, i23.UpSchema, i23.UpTable, rev3, opCh, errCh) + utils.WaitSomething(10, watchTimeout, func() bool { + return len(opCh) != 0 + }) cancel2() close(opCh) close(errCh) @@ -437,8 +449,11 @@ func (t *testOptimist) testOptimist(c *C, restart int) { // wait until operation for i12 ready. opCh = make(chan optimism.Operation, 10) errCh = make(chan error, 10) - ctx2, cancel2 = context.WithTimeout(ctx, watchTimeout) - optimism.WatchOperationPut(ctx2, etcdTestCli, i12.Task, i12.Source, i12.UpSchema, i12.UpTable, rev2, opCh, errCh) + ctx2, cancel2 = context.WithCancel(ctx) + go optimism.WatchOperationPut(ctx2, etcdTestCli, i12.Task, i12.Source, i12.UpSchema, i12.UpTable, rev2, opCh, errCh) + utils.WaitSomething(10, watchTimeout, func() bool { + return len(opCh) != 0 + }) cancel2() close(opCh) close(errCh) @@ -537,8 +552,11 @@ func (t *testOptimist) testOptimist(c *C, restart int) { // wait operation for i31 become available. opCh = make(chan optimism.Operation, 10) errCh = make(chan error, 10) - ctx2, cancel2 = context.WithTimeout(ctx, watchTimeout) - optimism.WatchOperationPut(ctx2, etcdTestCli, i31.Task, i31.Source, i31.UpSchema, i31.UpTable, rev1, opCh, errCh) + ctx2, cancel2 = context.WithCancel(ctx) + go optimism.WatchOperationPut(ctx2, etcdTestCli, i31.Task, i31.Source, i31.UpSchema, i31.UpTable, rev1, opCh, errCh) + utils.WaitSomething(10, watchTimeout, func() bool { + return len(opCh) != 0 + }) cancel2() close(opCh) close(errCh) @@ -587,8 +605,11 @@ func (t *testOptimist) testOptimist(c *C, restart int) { // wait operation for i33 become available. opCh = make(chan optimism.Operation, 10) errCh = make(chan error, 10) - ctx2, cancel2 = context.WithTimeout(ctx, watchTimeout) - optimism.WatchOperationPut(ctx2, etcdTestCli, i33.Task, i33.Source, i33.UpSchema, i33.UpTable, rev3, opCh, errCh) + ctx2, cancel2 = context.WithCancel(ctx) + go optimism.WatchOperationPut(ctx2, etcdTestCli, i33.Task, i33.Source, i33.UpSchema, i33.UpTable, rev3, opCh, errCh) + utils.WaitSomething(10, watchTimeout, func() bool { + return len(opCh) != 0 + }) cancel2() close(opCh) close(errCh) diff --git a/pkg/ha/keepalive_test.go b/pkg/ha/keepalive_test.go index 390d32b9b4..9452c73345 100644 --- a/pkg/ha/keepalive_test.go +++ b/pkg/ha/keepalive_test.go @@ -57,7 +57,7 @@ func (t *testForEtcd) TestWorkerKeepAlive(c *C) { cancels = append(cancels, cancel1) go func(ctx context.Context) { err1 := KeepAlive(ctx, etcdTestCli, worker, keepAliveTTL) - c.Assert(err1, IsNil) + c.Assert(err1, IsNil, Commentf("if \"context canceled\", retry later\ncause: context used in `KeepAlive` exceed timeout of 10s (`etcdutil.DefaultRequestTimeout`)")) atomic.AddInt32(&finished, 1) }(ctx1) diff --git a/pkg/shardddl/pessimism/info_test.go b/pkg/shardddl/pessimism/info_test.go index 2895e4caf4..e034a4acc4 100644 --- a/pkg/shardddl/pessimism/info_test.go +++ b/pkg/shardddl/pessimism/info_test.go @@ -26,6 +26,7 @@ import ( "go.etcd.io/etcd/integration" "github.com/pingcap/dm/dm/common" + "github.com/pingcap/dm/pkg/utils" ) var ( @@ -118,12 +119,11 @@ func (t *testForEtcd) TestInfoEtcd(c *C) { // start the watcher. wch := make(chan Info, 10) ech := make(chan error, 10) + ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) - defer cancel() WatchInfoPut(ctx, etcdTestCli, rev4+1, wch, ech) // revision+1 close(wch) // close the chan close(ech) @@ -132,6 +132,11 @@ func (t *testForEtcd) TestInfoEtcd(c *C) { // put another key for a different task. _, err = PutInfo(etcdTestCli, i21) c.Assert(err, IsNil) + // wait response of WatchInfoPut, increase waiting time when resource shortage + utils.WaitSomething(10, 500*time.Millisecond, func() bool { + return len(wch) != 0 + }) + cancel() wg.Wait() // watch should only get i21. diff --git a/pkg/shardddl/pessimism/operation_test.go b/pkg/shardddl/pessimism/operation_test.go index 929b308756..1f22497a31 100644 --- a/pkg/shardddl/pessimism/operation_test.go +++ b/pkg/shardddl/pessimism/operation_test.go @@ -18,6 +18,7 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/dm/pkg/utils" ) func (t *testForEtcd) TestOperationJSON(c *C) { @@ -65,8 +66,12 @@ func (t *testForEtcd) TestOperationEtcd(c *C) { // start the watcher with the same revision as the last PUT for the specified task and source. wch := make(chan Operation, 10) ech := make(chan error, 10) - ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) - WatchOperationPut(ctx, etcdTestCli, task1, source1, rev2, wch, ech) + ctx, cancel := context.WithCancel(context.Background()) + go WatchOperationPut(ctx, etcdTestCli, task1, source1, rev2, wch, ech) + // wait response of WatchOperationPut, increase waiting time when resource shortage + utils.WaitSomething(10, 500*time.Millisecond, func() bool { + return len(wch) != 0 + }) cancel() close(wch) close(ech) @@ -84,8 +89,11 @@ func (t *testForEtcd) TestOperationEtcd(c *C) { // start the watch with an older revision for all tasks and sources. wch = make(chan Operation, 10) ech = make(chan error, 10) - ctx, cancel = context.WithTimeout(context.Background(), 500*time.Millisecond) - WatchOperationPut(ctx, etcdTestCli, "", "", rev2, wch, ech) + ctx, cancel = context.WithCancel(context.Background()) + go WatchOperationPut(ctx, etcdTestCli, "", "", rev2, wch, ech) + utils.WaitSomething(10, 500*time.Millisecond, func() bool { + return len(wch) != 0 + }) cancel() close(wch) close(ech) @@ -125,8 +133,11 @@ func (t *testForEtcd) TestOperationEtcd(c *C) { // start watch with an older revision for the deleted op11. wch = make(chan Operation, 10) ech = make(chan error, 10) - ctx, cancel = context.WithTimeout(context.Background(), 500*time.Millisecond) - WatchOperationDelete(ctx, etcdTestCli, op11.Task, op11.Source, rev5, wch, ech) + ctx, cancel = context.WithCancel(context.Background()) + go WatchOperationDelete(ctx, etcdTestCli, op11.Task, op11.Source, rev5, wch, ech) + utils.WaitSomething(10, 500*time.Millisecond, func() bool { + return len(wch) != 0 + }) cancel() close(wch) close(ech) diff --git a/pkg/streamer/reader_test.go b/pkg/streamer/reader_test.go index 631abe0c10..3bd1152aef 100644 --- a/pkg/streamer/reader_test.go +++ b/pkg/streamer/reader_test.go @@ -36,7 +36,7 @@ import ( "github.com/pingcap/dm/pkg/terror" ) -var parseFileTimeout = 3 * time.Second +var parseFileTimeout = 10 * time.Second var _ = Suite(&testReaderSuite{}) diff --git a/tests/ha_cases/run.sh b/tests/ha_cases/run.sh index 52ba1739d0..ace0487ce2 100755 --- a/tests/ha_cases/run.sh +++ b/tests/ha_cases/run.sh @@ -72,11 +72,20 @@ function test_multi_task_running() { sleep 3 # wait for flush checkpoint echo "use sync_diff_inspector to check increment data" - check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - check_sync_diff $WORK_DIR $cur/conf/diff_config_multi_task.toml + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 10 || print_debug_status + check_sync_diff $WORK_DIR $cur/conf/diff_config_multi_task.toml 10 || print_debug_status echo "[$(date)] <<<<<< finish test_multi_task_running >>>>>>" } +function print_debug_status() { + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT3" \ + "query-status test" \ + "fail me!" 1 && \ + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT3" \ + "query-status test2" \ + "fail me!" 1 && exit 1 +} + function test_join_masters_and_worker { echo "[$(date)] <<<<<< start test_join_masters_and_worker >>>>>>" diff --git a/tests/sharding2/run.sh b/tests/sharding2/run.sh index 20032b52d8..dfeec56805 100755 --- a/tests/sharding2/run.sh +++ b/tests/sharding2/run.sh @@ -79,7 +79,7 @@ function run() { echo "check sync diff for the increment replication" check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ "\"stage\": \"Running\"" 2 \ "\"unit\": \"Sync\"" 2