Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

test: improve stability for tests #785

Merged
merged 11 commits into from
Jul 8, 2020
Merged
4 changes: 2 additions & 2 deletions dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ var (
errCheckSyncConfig = "(?m).*check sync config with error.*"
errCheckSyncConfigReg = fmt.Sprintf("(?m).*%s.*", errCheckSyncConfig)
testEtcdCluster *integration.ClusterV3
keepAliveTTL = int64(1)
keepAliveTTL = int64(10)
etcdTestCli *clientv3.Client
)

Expand Down Expand Up @@ -258,7 +258,7 @@ func testMockScheduler(ctx context.Context, wg *sync.WaitGroup, c *check.C, sour
cfg := config.NewSourceConfig()
cfg.SourceID = sources[i]
cfg.From.Password = password
c.Assert(scheduler2.AddSourceCfg(*cfg), check.IsNil)
c.Assert(scheduler2.AddSourceCfg(*cfg), check.IsNil, check.Commentf("all sources: %v", sources))
wg.Add(1)
ctx1, cancel1 := context.WithCancel(ctx)
cancels = append(cancels, cancel1)
Expand Down
51 changes: 36 additions & 15 deletions dm/master/shardddl/optimist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
var (
backOff = 30
waitTime = 100 * time.Millisecond
watchTimeout = 2 * time.Second
watchTimeout = 500 * time.Millisecond
logger = log.L()
o = NewOptimist(&logger)

Expand Down Expand Up @@ -254,8 +254,11 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
// wait operation for i11 become available.
opCh := make(chan optimism.Operation, 10)
errCh := make(chan error, 10)
ctx2, cancel2 := context.WithTimeout(ctx, watchTimeout)
optimism.WatchOperationPut(ctx2, etcdTestCli, i11.Task, i11.Source, i11.UpSchema, i11.UpTable, rev1, opCh, errCh)
ctx2, cancel2 := context.WithCancel(ctx)
go optimism.WatchOperationPut(ctx2, etcdTestCli, i11.Task, i11.Source, i11.UpSchema, i11.UpTable, rev1, opCh, errCh)
utils.WaitSomething(10, watchTimeout, func() bool {
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
return len(opCh) != 0
})
cancel2()
close(opCh)
close(errCh)
Expand Down Expand Up @@ -309,8 +312,11 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
// wait operation for i12 become available.
opCh = make(chan optimism.Operation, 10)
errCh = make(chan error, 10)
ctx2, cancel2 = context.WithTimeout(ctx, watchTimeout)
optimism.WatchOperationPut(ctx2, etcdTestCli, i12.Task, i12.Source, i12.UpSchema, i12.UpTable, rev2, opCh, errCh)
ctx2, cancel2 = context.WithCancel(ctx)
go optimism.WatchOperationPut(ctx2, etcdTestCli, i12.Task, i12.Source, i12.UpSchema, i12.UpTable, rev2, opCh, errCh)
utils.WaitSomething(10, watchTimeout, func() bool {
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
return len(opCh) != 0
})
cancel2()
close(opCh)
close(errCh)
Expand Down Expand Up @@ -356,8 +362,11 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
// wait operation for i21 become available.
opCh = make(chan optimism.Operation, 10)
errCh = make(chan error, 10)
ctx2, cancel2 = context.WithTimeout(ctx, watchTimeout)
optimism.WatchOperationPut(ctx2, etcdTestCli, i21.Task, i21.Source, i21.UpSchema, i21.UpTable, rev1, opCh, errCh)
ctx2, cancel2 = context.WithCancel(ctx)
go optimism.WatchOperationPut(ctx2, etcdTestCli, i21.Task, i21.Source, i21.UpSchema, i21.UpTable, rev1, opCh, errCh)
utils.WaitSomething(10, watchTimeout, func() bool {
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
return len(opCh) != 0
})
cancel2()
close(opCh)
close(errCh)
Expand Down Expand Up @@ -420,8 +429,11 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
// wait operation for i23 become available.
opCh = make(chan optimism.Operation, 10)
errCh = make(chan error, 10)
ctx2, cancel2 = context.WithTimeout(ctx, watchTimeout)
optimism.WatchOperationPut(ctx2, etcdTestCli, i23.Task, i23.Source, i23.UpSchema, i23.UpTable, rev3, opCh, errCh)
ctx2, cancel2 = context.WithCancel(ctx)
go optimism.WatchOperationPut(ctx2, etcdTestCli, i23.Task, i23.Source, i23.UpSchema, i23.UpTable, rev3, opCh, errCh)
utils.WaitSomething(10, watchTimeout, func() bool {
return len(opCh) != 0
})
cancel2()
close(opCh)
close(errCh)
Expand All @@ -437,8 +449,11 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
// wait until operation for i12 ready.
opCh = make(chan optimism.Operation, 10)
errCh = make(chan error, 10)
ctx2, cancel2 = context.WithTimeout(ctx, watchTimeout)
optimism.WatchOperationPut(ctx2, etcdTestCli, i12.Task, i12.Source, i12.UpSchema, i12.UpTable, rev2, opCh, errCh)
ctx2, cancel2 = context.WithCancel(ctx)
go optimism.WatchOperationPut(ctx2, etcdTestCli, i12.Task, i12.Source, i12.UpSchema, i12.UpTable, rev2, opCh, errCh)
utils.WaitSomething(10, watchTimeout, func() bool {
return len(opCh) != 0
})
cancel2()
close(opCh)
close(errCh)
Expand Down Expand Up @@ -537,8 +552,11 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
// wait operation for i31 become available.
opCh = make(chan optimism.Operation, 10)
errCh = make(chan error, 10)
ctx2, cancel2 = context.WithTimeout(ctx, watchTimeout)
optimism.WatchOperationPut(ctx2, etcdTestCli, i31.Task, i31.Source, i31.UpSchema, i31.UpTable, rev1, opCh, errCh)
ctx2, cancel2 = context.WithCancel(ctx)
go optimism.WatchOperationPut(ctx2, etcdTestCli, i31.Task, i31.Source, i31.UpSchema, i31.UpTable, rev1, opCh, errCh)
utils.WaitSomething(10, watchTimeout, func() bool {
return len(opCh) != 0
})
cancel2()
close(opCh)
close(errCh)
Expand Down Expand Up @@ -587,8 +605,11 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
// wait operation for i33 become available.
opCh = make(chan optimism.Operation, 10)
errCh = make(chan error, 10)
ctx2, cancel2 = context.WithTimeout(ctx, watchTimeout)
optimism.WatchOperationPut(ctx2, etcdTestCli, i33.Task, i33.Source, i33.UpSchema, i33.UpTable, rev3, opCh, errCh)
ctx2, cancel2 = context.WithCancel(ctx)
go optimism.WatchOperationPut(ctx2, etcdTestCli, i33.Task, i33.Source, i33.UpSchema, i33.UpTable, rev3, opCh, errCh)
utils.WaitSomething(10, watchTimeout, func() bool {
return len(opCh) != 0
})
cancel2()
close(opCh)
close(errCh)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ha/keepalive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (t *testForEtcd) TestWorkerKeepAlive(c *C) {
cancels = append(cancels, cancel1)
go func(ctx context.Context) {
err1 := KeepAlive(ctx, etcdTestCli, worker, keepAliveTTL)
c.Assert(err1, IsNil)
c.Assert(err1, IsNil, Commentf("if \"context canceled\", retry later\ncause: context used in `KeepAlive` exceed timeout of 10s (`etcdutil.DefaultRequestTimeout`)"))
atomic.AddInt32(&finished, 1)
}(ctx1)

Expand Down
9 changes: 7 additions & 2 deletions pkg/shardddl/pessimism/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.etcd.io/etcd/integration"

"github.com/pingcap/dm/dm/common"
"github.com/pingcap/dm/pkg/utils"
)

var (
Expand Down Expand Up @@ -118,12 +119,11 @@ func (t *testForEtcd) TestInfoEtcd(c *C) {
// start the watcher.
wch := make(chan Info, 10)
ech := make(chan error, 10)
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
WatchInfoPut(ctx, etcdTestCli, rev4+1, wch, ech) // revision+1
close(wch) // close the chan
close(ech)
Expand All @@ -132,6 +132,11 @@ func (t *testForEtcd) TestInfoEtcd(c *C) {
// put another key for a different task.
_, err = PutInfo(etcdTestCli, i21)
c.Assert(err, IsNil)
// wait response of WatchInfoPut, increase waiting time when resource shortage
utils.WaitSomething(10, 500*time.Millisecond, func() bool {
return len(wch) != 0
})
cancel()
wg.Wait()

// watch should only get i21.
Expand Down
23 changes: 17 additions & 6 deletions pkg/shardddl/pessimism/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/dm/pkg/utils"
)

func (t *testForEtcd) TestOperationJSON(c *C) {
Expand Down Expand Up @@ -65,8 +66,12 @@ func (t *testForEtcd) TestOperationEtcd(c *C) {
// start the watcher with the same revision as the last PUT for the specified task and source.
wch := make(chan Operation, 10)
ech := make(chan error, 10)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
WatchOperationPut(ctx, etcdTestCli, task1, source1, rev2, wch, ech)
ctx, cancel := context.WithCancel(context.Background())
go WatchOperationPut(ctx, etcdTestCli, task1, source1, rev2, wch, ech)
// wait response of WatchOperationPut, increase waiting time when resource shortage
utils.WaitSomething(10, 500*time.Millisecond, func() bool {
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
return len(wch) != 0
})
cancel()
close(wch)
close(ech)
Expand All @@ -84,8 +89,11 @@ func (t *testForEtcd) TestOperationEtcd(c *C) {
// start the watch with an older revision for all tasks and sources.
wch = make(chan Operation, 10)
ech = make(chan error, 10)
ctx, cancel = context.WithTimeout(context.Background(), 500*time.Millisecond)
WatchOperationPut(ctx, etcdTestCli, "", "", rev2, wch, ech)
ctx, cancel = context.WithCancel(context.Background())
go WatchOperationPut(ctx, etcdTestCli, "", "", rev2, wch, ech)
utils.WaitSomething(10, 500*time.Millisecond, func() bool {
return len(wch) != 0
})
cancel()
close(wch)
close(ech)
Expand Down Expand Up @@ -125,8 +133,11 @@ func (t *testForEtcd) TestOperationEtcd(c *C) {
// start watch with an older revision for the deleted op11.
wch = make(chan Operation, 10)
ech = make(chan error, 10)
ctx, cancel = context.WithTimeout(context.Background(), 500*time.Millisecond)
WatchOperationDelete(ctx, etcdTestCli, op11.Task, op11.Source, rev5, wch, ech)
ctx, cancel = context.WithCancel(context.Background())
go WatchOperationDelete(ctx, etcdTestCli, op11.Task, op11.Source, rev5, wch, ech)
utils.WaitSomething(10, 500*time.Millisecond, func() bool {
return len(wch) != 0
})
cancel()
close(wch)
close(ech)
Expand Down
2 changes: 1 addition & 1 deletion pkg/streamer/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"github.com/pingcap/dm/pkg/terror"
)

var parseFileTimeout = 3 * time.Second
var parseFileTimeout = 10 * time.Second

var _ = Suite(&testReaderSuite{})

Expand Down
13 changes: 11 additions & 2 deletions tests/ha_cases/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,20 @@ function test_multi_task_running() {

sleep 3 # wait for flush checkpoint
echo "use sync_diff_inspector to check increment data"
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
check_sync_diff $WORK_DIR $cur/conf/diff_config_multi_task.toml
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 10 || print_debug_status
check_sync_diff $WORK_DIR $cur/conf/diff_config_multi_task.toml 10 || print_debug_status
echo "[$(date)] <<<<<< finish test_multi_task_running >>>>>>"
}

function print_debug_status() {
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT3" \
"query-status test" \
"fail me!" 1 && \
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT3" \
"query-status test2" \
"fail me!" 1 && exit 1
}


function test_join_masters_and_worker {
echo "[$(date)] <<<<<< start test_join_masters_and_worker >>>>>>"
Expand Down
2 changes: 1 addition & 1 deletion tests/sharding2/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ function run() {
echo "check sync diff for the increment replication"
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"stage\": \"Running\"" 2 \
"\"unit\": \"Sync\"" 2
Expand Down