diff --git a/engine/jobmaster/dm/dm_jobmaster_test.go b/engine/jobmaster/dm/dm_jobmaster_test.go
index 3d02acad3f7..e10d24c01b7 100644
--- a/engine/jobmaster/dm/dm_jobmaster_test.go
+++ b/engine/jobmaster/dm/dm_jobmaster_test.go
@@ -108,6 +108,8 @@ func (t *testDMJobmasterSuite) TestRunDMJobMaster() {
 	require.NoError(t.T(), err)
 	mockServerMasterClient := client.NewMockServerMasterClient(gomock.NewController(t.T()))
 	mockExecutorGroup := client.NewMockExecutorGroup()
+	broker := broker.NewBrokerForTesting("test-executor-id")
+	defer broker.Close()
 	depsForTest := masterParamListForTest{
 		MessageHandlerManager: p2p.NewMockMessageHandlerManager(),
 		MessageSender:         p2p.NewMockMessageSender(),
@@ -115,7 +117,7 @@
 		BusinessClientConn:    kvmock.NewMockClientConn(),
 		ExecutorGroup:         mockExecutorGroup,
 		ServerMasterClient:    mockServerMasterClient,
-		ResourceBroker:        nil,
+		ResourceBroker:        broker,
 	}
 
 	RegisterWorker()
@@ -454,6 +456,10 @@ func (m *MockBaseJobmaster) Exit(ctx context.Context, exitReason framework.ExitR
 	return args.Error(0)
 }
 
+func (m *MockBaseJobmaster) IsS3StorageEnabled() bool {
+	return false
+}
+
 type MockCheckpointAgent struct {
 	mu sync.Mutex
 	mock.Mock
diff --git a/engine/pkg/externalresource/broker/broker.go b/engine/pkg/externalresource/broker/broker.go
index d71a4c54f94..ca750da1e70 100644
--- a/engine/pkg/externalresource/broker/broker.go
+++ b/engine/pkg/externalresource/broker/broker.go
@@ -418,6 +418,7 @@ func (b *DefaultBroker) Close() {
 	}
 }
 
+// IsS3StorageEnabled returns true if s3 storage is enabled.
 func (b *DefaultBroker) IsS3StorageEnabled() bool {
 	_, ok := b.fileManagers[resModel.ResourceTypeS3]
 	return ok
diff --git a/engine/test/integration_tests/dm_many_tables_local/conf/diff_config.toml b/engine/test/integration_tests/dm_many_tables_local/conf/diff_config.toml
new file mode 100644
index 00000000000..bdb2278d9d2
--- /dev/null
+++ b/engine/test/integration_tests/dm_many_tables_local/conf/diff_config.toml
@@ -0,0 +1,29 @@
+# diff Configuration.
+
+check-thread-count = 4
+
+export-fix-sql = true
+
+check-struct-only = false
+
+[task]
+    output-dir = "/tmp/tiflow_engine_test/dm_many_tables/output"
+
+    source-instances = ["mysql1"]
+
+    target-instance = "tidb0"
+
+    target-check-tables = ["dm_many_tables.?*"]
+
+[data-sources]
+[data-sources.mysql1]
+host = "127.0.0.1"
+port = 3306
+user = "root"
+password = ""
+
+[data-sources.tidb0]
+host = "127.0.0.1"
+port = 4000
+user = "root"
+password = ""
diff --git a/engine/test/integration_tests/dm_many_tables_local/conf/job.yaml b/engine/test/integration_tests/dm_many_tables_local/conf/job.yaml
new file mode 100644
index 00000000000..5f9bed104bf
--- /dev/null
+++ b/engine/test/integration_tests/dm_many_tables_local/conf/job.yaml
@@ -0,0 +1,19 @@
+task-mode: full
+target-database:
+  host: host.docker.internal
+  port: 4000
+  user: root
+  password: ''
+upstreams:
+  - db-config:
+      host: host.docker.internal
+      port: 3306
+      user: root
+      password: ''
+    source-id: mysql-01
+    block-allow-list: balist-01
+    loader-thread: 1
+block-allow-list:
+  balist-01:
+    do-dbs:
+      - dm_many_tables
diff --git a/engine/test/integration_tests/dm_many_tables_local/run.sh b/engine/test/integration_tests/dm_many_tables_local/run.sh
new file mode 100644
index 00000000000..369205f5c35
--- /dev/null
+++ b/engine/test/integration_tests/dm_many_tables_local/run.sh
@@ -0,0 +1,42 @@
+#!/bin/bash
+
+set -eu
+
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+
+CONFIG="$DOCKER_COMPOSE_DIR/3m3e.yaml $DOCKER_COMPOSE_DIR/dm_databases.yaml"
+CONFIG=$(adjust_config $OUT_DIR $TEST_NAME $CONFIG)
+echo "using adjusted configs to deploy cluster: $CONFIG"
+TABLE_NUM=500
+
+function run() {
+	start_engine_cluster $CONFIG
+	wait_mysql_online.sh --port 3306
+	wait_mysql_online.sh --port 4000
+
+	# prepare data
+	run_sql 'DROP DATABASE IF EXISTS dm_many_tables'
+	run_sql 'CREATE DATABASE dm_many_tables;'
+	for i in $(seq $TABLE_NUM); do
+		run_sql --quiet "CREATE TABLE dm_many_tables.t$i(i TINYINT, j INT UNIQUE KEY);"
+		for j in $(seq 2); do
+			run_sql --quiet "INSERT INTO dm_many_tables.t$i VALUES ($j,${j}000$j),($j,${j}001$j);"
+		done
+		# to make the tables have odd number of lines before 'ALTER TABLE' command, for check_sync_diff to work correctly
+		run_sql --quiet "INSERT INTO dm_many_tables.t$i VALUES (9, 90009);"
+	done
+
+	# create job & wait for job finished
+	job_id=$(create_job "DM" "$CUR_DIR/conf/job.yaml" "dm_many_tables")
+	# check progress is forwarded gradually, not jump to "finished"
+	exec_with_retry --count 500 "curl \"http://127.0.0.1:10245/api/v1/jobs/$job_id/status\" | tee /dev/stderr | jq -e '.task_status.\"mysql-01\".status.status | .finishedBytes > 0 and .finishedBytes < .totalBytes'"
+	exec_with_retry --count 100 "curl \"http://127.0.0.1:10245/api/v1/jobs/$job_id\" | tee /dev/stderr | jq -e '.state == \"Finished\"'"
+
+	# check data
+	check_sync_diff $WORK_DIR $CUR_DIR/conf/diff_config.toml 1
+}
+
+trap "stop_engine_cluster $WORK_DIR $CONFIG" EXIT
+run $*
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"