diff --git a/br/cmd/br/BUILD.bazel b/br/cmd/br/BUILD.bazel index 2958366d93c4f..278044d0e6db5 100644 --- a/br/cmd/br/BUILD.bazel +++ b/br/cmd/br/BUILD.bazel @@ -22,6 +22,7 @@ go_library( "//br/pkg/redact", "//br/pkg/restore", "//br/pkg/rtree", + "//br/pkg/streamhelper/config", "//br/pkg/summary", "//br/pkg/task", "//br/pkg/trace", diff --git a/br/pkg/conn/BUILD.bazel b/br/pkg/conn/BUILD.bazel index b61518eb2f44d..ad1bcacaaac8d 100644 --- a/br/pkg/conn/BUILD.bazel +++ b/br/pkg/conn/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", + "@com_github_pingcap_kvproto//pkg/logbackuppb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_log//:log", "@com_github_tikv_client_go_v2//oracle", @@ -26,9 +27,7 @@ go_library( "@com_github_tikv_client_go_v2//txnkv/txnlock", "@com_github_tikv_pd_client//:client", "@org_golang_google_grpc//:grpc", - "@org_golang_google_grpc//backoff", "@org_golang_google_grpc//codes", - "@org_golang_google_grpc//credentials", "@org_golang_google_grpc//keepalive", "@org_golang_google_grpc//status", "@org_uber_go_zap//:zap", diff --git a/br/pkg/lightning/restore/BUILD.bazel b/br/pkg/lightning/restore/BUILD.bazel index 80befee3774fd..3586907693301 100644 --- a/br/pkg/lightning/restore/BUILD.bazel +++ b/br/pkg/lightning/restore/BUILD.bazel @@ -38,7 +38,6 @@ go_library( "//br/pkg/utils", "//br/pkg/version", "//br/pkg/version/build", - "//config", "//kv", "//meta/autoid", "//parser", @@ -109,10 +108,13 @@ go_test( "//ddl", "//errno", "//kv", + "//meta", + "//meta/autoid", "//parser", "//parser/ast", "//parser/model", "//parser/mysql", + "//store/mockstore", "//store/pdtypes", "//table/tables", "//types", diff --git a/br/pkg/logutil/BUILD.bazel b/br/pkg/logutil/BUILD.bazel index 5a8df97911de6..1443a6bfd4639 100644 --- a/br/pkg/logutil/BUILD.bazel +++ b/br/pkg/logutil/BUILD.bazel @@ -12,6 +12,7 @@ go_library( deps = [ "//br/pkg/lightning/metric", "//br/pkg/redact", + "//kv", "@com_github_google_uuid//:uuid", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/brpb", diff --git a/br/pkg/restore/BUILD.bazel b/br/pkg/restore/BUILD.bazel index e18abc5e82b59..4f242d49de7e5 100644 --- a/br/pkg/restore/BUILD.bazel +++ b/br/pkg/restore/BUILD.bazel @@ -21,6 +21,7 @@ go_library( importpath = "github.com/pingcap/tidb/br/pkg/restore", visibility = ["//visibility:public"], deps = [ + "//br/pkg/backup", "//br/pkg/checksum", "//br/pkg/conn", "//br/pkg/errors", @@ -45,6 +46,7 @@ go_library( "//statistics/handle", "//store/pdtypes", "//tablecodec", + "//util", "//util/codec", "//util/hack", "//util/mathutil", @@ -109,7 +111,9 @@ go_test( "//br/pkg/mock", "//br/pkg/rtree", "//br/pkg/storage", + "//br/pkg/stream", "//br/pkg/utils", + "//infoschema", "//kv", "//meta/autoid", "//parser/model", @@ -122,8 +126,10 @@ go_test( "//testkit/testsetup", "//types", "//util/codec", + "//util/mathutil", "@com_github_golang_protobuf//proto", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/encryptionpb", "@com_github_pingcap_kvproto//pkg/errorpb", diff --git a/br/pkg/storage/BUILD.bazel b/br/pkg/storage/BUILD.bazel index 46150497b872b..762df1ae59957 100644 --- a/br/pkg/storage/BUILD.bazel +++ b/br/pkg/storage/BUILD.bazel @@ -33,6 +33,7 @@ go_library( "@com_github_aws_aws_sdk_go//aws/session", "@com_github_aws_aws_sdk_go//service/s3", "@com_github_aws_aws_sdk_go//service/s3/s3iface", + "@com_github_aws_aws_sdk_go//service/s3/s3manager", "@com_github_azure_azure_sdk_for_go_sdk_azidentity//:azidentity", "@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//:azblob", "@com_github_google_uuid//:uuid", diff --git a/br/pkg/stream/BUILD.bazel b/br/pkg/stream/BUILD.bazel index 7d2ac25a863c9..15ee92d85b2a2 100644 --- a/br/pkg/stream/BUILD.bazel +++ b/br/pkg/stream/BUILD.bazel @@ -3,11 +3,8 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "stream", srcs = [ - "client.go", "decode_kv.go", "meta_kv.go", - "models.go", - "prefix_scanner.go", "rewrite_meta_rawkv.go", "stream_mgr.go", "stream_status.go", @@ -15,14 +12,13 @@ go_library( importpath = "github.com/pingcap/tidb/br/pkg/stream", visibility = ["//visibility:public"], deps = [ - "//br/pkg/backup", - "//br/pkg/conn", "//br/pkg/errors", "//br/pkg/glue", "//br/pkg/httputil", "//br/pkg/logutil", - "//br/pkg/redact", "//br/pkg/storage", + "//br/pkg/streamhelper", + "//br/pkg/utils", "//kv", "//meta", "//parser/model", @@ -31,14 +27,12 @@ go_library( "//util/codec", "//util/table-filter", "@com_github_fatih_color//:color", - "@com_github_gogo_protobuf//proto", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_log//:log", - "@com_github_tikv_client_go_v2//kv", "@com_github_tikv_client_go_v2//oracle", - "@io_etcd_go_etcd_client_v3//:client", + "@com_github_tikv_pd_client//:client", "@org_golang_x_sync//errgroup", "@org_uber_go_zap//:zap", ], @@ -48,28 +42,19 @@ go_test( name = "stream_test", srcs = [ "decode_kv_test.go", - "integration_test.go", "meta_kv_test.go", "rewrite_meta_rawkv_test.go", "stream_misc_test.go", ], embed = [":stream"], deps = [ - "//br/pkg/errors", - "//br/pkg/logutil", - "//br/pkg/storage", + "//br/pkg/streamhelper", "//meta", "//parser/model", "//tablecodec", "//util/codec", - "@com_github_pingcap_errors//:errors", + "//util/table-filter", "@com_github_pingcap_kvproto//pkg/brpb", - "@com_github_pingcap_log//:log", "@com_github_stretchr_testify//require", - "@com_github_tikv_client_go_v2//kv", - "@io_etcd_go_etcd_client_v3//:client", - "@io_etcd_go_etcd_server_v3//embed", - "@io_etcd_go_etcd_server_v3//mvcc", - "@org_uber_go_zap//:zap", ], ) diff --git a/br/pkg/streamhelper/BUILD.bazel b/br/pkg/streamhelper/BUILD.bazel new file mode 100644 index 0000000000000..4af832cbf1c26 --- /dev/null +++ b/br/pkg/streamhelper/BUILD.bazel @@ -0,0 +1,81 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "streamhelper", + srcs = [ + "advancer.go", + "advancer_daemon.go", + "advancer_env.go", + "client.go", + "collector.go", + "models.go", + "prefix_scanner.go", + "regioniter.go", + "stream_listener.go", + "tsheap.go", + ], + importpath = "github.com/pingcap/tidb/br/pkg/streamhelper", + visibility = ["//visibility:public"], + deps = [ + "//br/pkg/errors", + "//br/pkg/logutil", + "//br/pkg/redact", + "//br/pkg/streamhelper/config", + "//br/pkg/utils", + "//config", + "//kv", + "//metrics", + "//owner", + "@com_github_gogo_protobuf//proto", + "@com_github_golang_protobuf//proto", + "@com_github_google_btree//:btree", + "@com_github_google_uuid//:uuid", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_kvproto//pkg/brpb", + "@com_github_pingcap_kvproto//pkg/logbackuppb", + "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_pingcap_log//:log", + "@com_github_tikv_client_go_v2//kv", + "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_pd_client//:client", + "@io_etcd_go_etcd_client_v3//:client", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//keepalive", + "@org_golang_x_sync//errgroup", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "streamhelper_test", + srcs = [ + "advancer_test.go", + "basic_lib_for_test.go", + "integration_test.go", + "tsheap_test.go", + ], + deps = [ + ":streamhelper", + "//br/pkg/errors", + "//br/pkg/logutil", + "//br/pkg/storage", + "//br/pkg/streamhelper/config", + "//kv", + "//tablecodec", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_kvproto//pkg/brpb", + "@com_github_pingcap_kvproto//pkg/errorpb", + "@com_github_pingcap_kvproto//pkg/logbackuppb", + "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_pingcap_log//:log", + "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//kv", + "@io_etcd_go_etcd_client_v3//:client", + "@io_etcd_go_etcd_server_v3//embed", + "@io_etcd_go_etcd_server_v3//mvcc", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//status", + "@org_uber_go_zap//zapcore", + ], +) diff --git a/br/pkg/streamhelper/config/BUILD.bazel b/br/pkg/streamhelper/config/BUILD.bazel new file mode 100644 index 0000000000000..1911b1ac82605 --- /dev/null +++ b/br/pkg/streamhelper/config/BUILD.bazel @@ -0,0 +1,9 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "config", + srcs = ["advancer_conf.go"], + importpath = "github.com/pingcap/tidb/br/pkg/streamhelper/config", + visibility = ["//visibility:public"], + deps = ["@com_github_spf13_pflag//:pflag"], +) diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index 9cdc23114f2be..b01d47de6de22 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -26,6 +26,8 @@ go_library( "//br/pkg/rtree", "//br/pkg/storage", "//br/pkg/stream", + "//br/pkg/streamhelper", + "//br/pkg/streamhelper/config", "//br/pkg/summary", "//br/pkg/utils", "//br/pkg/version", @@ -79,6 +81,7 @@ go_test( "//br/pkg/pdutil", "//br/pkg/restore", "//br/pkg/storage", + "//br/pkg/stream", "//br/pkg/utils", "//config", "//parser/model", diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index a000479c29696..b708ec2fa7979 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -17,12 +17,14 @@ go_library( "retry.go", "safe_point.go", "schema.go", + "store_manager.go", "worker.go", ], importpath = "github.com/pingcap/tidb/br/pkg/utils", visibility = ["//visibility:public"], deps = [ "//br/pkg/errors", + "//br/pkg/logutil", "//br/pkg/metautil", "//errno", "//parser/model", @@ -38,7 +40,11 @@ go_library( "@com_github_pingcap_log//:log", "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_pd_client//:client", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//backoff", "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//credentials", + "@org_golang_google_grpc//keepalive", "@org_golang_google_grpc//status", "@org_golang_x_net//http/httpproxy", "@org_golang_x_sync//errgroup", diff --git a/config/BUILD.bazel b/config/BUILD.bazel index 222adfd336194..fd000f17f4b8b 100644 --- a/config/BUILD.bazel +++ b/config/BUILD.bazel @@ -10,6 +10,7 @@ go_library( importpath = "github.com/pingcap/tidb/config", visibility = ["//visibility:public"], deps = [ + "//br/pkg/streamhelper/config", "//parser/terror", "//types/json", "//util/logutil", diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel index 3327bf550acb9..f02b31b630067 100644 --- a/ddl/BUILD.bazel +++ b/ddl/BUILD.bazel @@ -172,6 +172,7 @@ go_test( deps = [ "//config", "//ddl/placement", + "//ddl/schematracker", "//ddl/testutil", "//ddl/util", "//domain", @@ -223,6 +224,7 @@ go_test( "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_log//:log", + "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//testutils", diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go index 160d238b51eb5..5a086511fc186 100644 --- a/ddl/ddl_test.go +++ b/ddl/ddl_test.go @@ -16,7 +16,6 @@ package ddl import ( "context" - "fmt" "testing" "time" @@ -33,8 +32,6 @@ import ( "github.com/pingcap/tidb/sessiontxn" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/table" - "github.com/pingcap/tidb/table/tables" - "github.com/pingcap/tidb/testkit/testutil" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/mock" @@ -56,11 +53,6 @@ func (d *ddl) SetInterceptor(i Interceptor) { d.mu.interceptor = i } -// generalWorker returns the general worker. -func (d *ddl) generalWorker() *worker { - return d.workers[generalWorker] -} - // JobNeedGCForTest is only used for test. var JobNeedGCForTest = jobNeedGC @@ -509,299 +501,6 @@ func testCheckSchemaState(test *testing.T, d *ddl, dbInfo *model.DBInfo, state m } } -type testCtxKeyType int - -func (k testCtxKeyType) String() string { - return "test_ctx_key" -} - -const testCtxKey testCtxKeyType = 0 - -func TestReorg(t *testing.T) { - tests := []struct { - isCommonHandle bool - handle kv.Handle - startKey kv.Handle - endKey kv.Handle - }{ - { - false, - kv.IntHandle(100), - kv.IntHandle(1), - kv.IntHandle(0), - }, - { - true, - testutil.MustNewCommonHandle(t, "a", 100, "string"), - testutil.MustNewCommonHandle(t, 100, "string"), - testutil.MustNewCommonHandle(t, 101, "string"), - }, - } - - for _, test := range tests { - t.Run(fmt.Sprintf("isCommandHandle(%v)", test.isCommonHandle), func(t *testing.T) { - store := createMockStore(t) - defer func() { - require.NoError(t, store.Close()) - }() - - d, err := testNewDDLAndStart( - context.Background(), - WithStore(store), - WithLease(testLease), - ) - require.NoError(t, err) - defer func() { - err := d.Stop() - require.NoError(t, err) - }() - - time.Sleep(testLease) - - sctx := testNewContext(d) - - sctx.SetValue(testCtxKey, 1) - require.Equal(t, sctx.Value(testCtxKey), 1) - sctx.ClearValue(testCtxKey) - - err = sessiontxn.NewTxn(context.Background(), sctx) - require.NoError(t, err) - txn, err := sctx.Txn(true) - require.NoError(t, err) - err = txn.Set([]byte("a"), []byte("b")) - require.NoError(t, err) - err = txn.Rollback() - require.NoError(t, err) - - err = sessiontxn.NewTxn(context.Background(), sctx) - require.NoError(t, err) - txn, err = sctx.Txn(true) - require.NoError(t, err) - err = txn.Set([]byte("a"), []byte("b")) - require.NoError(t, err) - err = txn.Commit(context.Background()) - require.NoError(t, err) - - rowCount := int64(10) - handle := test.handle - job := &model.Job{ - ID: 1, - SnapshotVer: 1, // Make sure it is not zero. So the reorgInfo's first is false. - } - err = sessiontxn.NewTxn(context.Background(), sctx) - require.NoError(t, err) - txn, err = sctx.Txn(true) - require.NoError(t, err) - m := meta.NewMeta(txn) - e := &meta.Element{ID: 333, TypeKey: meta.IndexElementKey} - rInfo := &reorgInfo{ - Job: job, - currElement: e, - d: d.ddlCtx, - } - f := func() error { - d.getReorgCtx(job).setRowCount(rowCount) - d.getReorgCtx(job).setNextKey(handle.Encoded()) - time.Sleep(1*ReorgWaitTimeout + 100*time.Millisecond) - return nil - } - mockTbl := tables.MockTableFromMeta(&model.TableInfo{IsCommonHandle: test.isCommonHandle, CommonHandleVersion: 1}) - err = d.generalWorker().runReorgJob(newReorgHandler(m), rInfo, mockTbl.Meta(), d.lease, f) - require.Error(t, err) - - // The longest to wait for 5 seconds to make sure the function of f is returned. - for i := 0; i < 1000; i++ { - time.Sleep(5 * time.Millisecond) - err = d.generalWorker().runReorgJob(newReorgHandler(m), rInfo, mockTbl.Meta(), d.lease, f) - if err == nil { - require.Equal(t, job.RowCount, rowCount) - - // Test whether reorgInfo's Handle is update. - err = txn.Commit(context.Background()) - require.NoError(t, err) - err = sessiontxn.NewTxn(context.Background(), sctx) - require.NoError(t, err) - - m = meta.NewMeta(txn) - info, err1 := getReorgInfo(NewJobContext(), d.ddlCtx, newReorgHandler(m), job, mockTbl, nil) - require.NoError(t, err1) - require.Equal(t, info.StartKey, kv.Key(handle.Encoded())) - require.Equal(t, info.currElement, e) - break - } - } - require.NoError(t, err) - - job = &model.Job{ - ID: 2, - SchemaID: 1, - Type: model.ActionCreateSchema, - Args: []interface{}{model.NewCIStr("test")}, - SnapshotVer: 1, // Make sure it is not zero. So the reorgInfo's first is false. - } - - element := &meta.Element{ID: 123, TypeKey: meta.ColumnElementKey} - info := &reorgInfo{ - Job: job, - d: d.ddlCtx, - currElement: element, - StartKey: test.startKey.Encoded(), - EndKey: test.endKey.Encoded(), - PhysicalTableID: 456, - } - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) - err = kv.RunInNewTxn(ctx, d.store, false, func(ctx context.Context, txn kv.Transaction) error { - m := meta.NewMeta(txn) - var err1 error - _, err1 = getReorgInfo(NewJobContext(), d.ddlCtx, newReorgHandler(m), job, mockTbl, []*meta.Element{element}) - require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err1)) - require.Equal(t, job.SnapshotVer, uint64(0)) - return nil - }) - require.NoError(t, err) - job.SnapshotVer = uint64(1) - err = info.UpdateReorgMeta(info.StartKey) - require.NoError(t, err) - err = kv.RunInNewTxn(ctx, d.store, false, func(ctx context.Context, txn kv.Transaction) error { - m := meta.NewMeta(txn) - info1, err1 := getReorgInfo(NewJobContext(), d.ddlCtx, newReorgHandler(m), job, mockTbl, []*meta.Element{element}) - require.NoError(t, err1) - require.Equal(t, info1.currElement, info.currElement) - require.Equal(t, info1.StartKey, info.StartKey) - require.Equal(t, info1.EndKey, info.EndKey) - require.Equal(t, info1.PhysicalTableID, info.PhysicalTableID) - return nil - }) - require.NoError(t, err) - - err = d.Stop() - require.NoError(t, err) - err = d.generalWorker().runReorgJob(newReorgHandler(m), rInfo, mockTbl.Meta(), d.lease, func() error { - time.Sleep(4 * testLease) - return nil - }) - require.Error(t, err) - txn, err = sctx.Txn(true) - require.NoError(t, err) - err = txn.Commit(context.Background()) - require.NoError(t, err) - }) - } -} - -func TestCancelJobs(t *testing.T) { - store, clean := newMockStore(t) - defer clean() - - txn, err := store.Begin() - require.NoError(t, err) - - m := meta.NewMeta(txn) - cnt := 10 - ids := make([]int64, cnt) - for i := 0; i < cnt; i++ { - job := &model.Job{ - ID: int64(i), - SchemaID: 1, - Type: model.ActionCreateTable, - } - if i == 0 { - job.State = model.JobStateDone - } - if i == 1 { - job.State = model.JobStateCancelled - } - ids[i] = int64(i) - err = m.EnQueueDDLJob(job) - require.NoError(t, err) - } - - errs, err := CancelJobs(txn, ids) - require.NoError(t, err) - for i, err := range errs { - if i == 0 { - require.Error(t, err) - continue - } - require.NoError(t, err) - } - - errs, err = CancelJobs(txn, []int64{}) - require.NoError(t, err) - require.Nil(t, errs) - - errs, err = CancelJobs(txn, []int64{-1}) - require.NoError(t, err) - require.Error(t, errs[0]) - require.Regexp(t, "DDL Job:-1 not found$", errs[0].Error()) - - // test cancel finish job. - job := &model.Job{ - ID: 100, - SchemaID: 1, - Type: model.ActionCreateTable, - State: model.JobStateDone, - } - err = m.EnQueueDDLJob(job) - require.NoError(t, err) - errs, err = CancelJobs(txn, []int64{100}) - require.NoError(t, err) - require.Error(t, errs[0]) - require.Regexp(t, "This job:100 is finished, so can't be cancelled$", errs[0].Error()) - - // test can't cancelable job. - job.Type = model.ActionDropIndex - job.SchemaState = model.StateWriteOnly - job.State = model.JobStateRunning - job.ID = 101 - err = m.EnQueueDDLJob(job) - require.NoError(t, err) - errs, err = CancelJobs(txn, []int64{101}) - require.NoError(t, err) - require.Error(t, errs[0]) - require.Regexp(t, "This job:101 is almost finished, can't be cancelled now$", errs[0].Error()) - - // When both types of jobs exist in the DDL queue, - // we first cancel the job with a larger ID. - job = &model.Job{ - ID: 1000, - SchemaID: 1, - TableID: 2, - Type: model.ActionAddIndex, - } - job1 := &model.Job{ - ID: 1001, - SchemaID: 1, - TableID: 2, - Type: model.ActionAddColumn, - } - job2 := &model.Job{ - ID: 1002, - SchemaID: 1, - TableID: 2, - Type: model.ActionAddIndex, - } - job3 := &model.Job{ - ID: 1003, - SchemaID: 1, - TableID: 2, - Type: model.ActionRepairTable, - } - require.NoError(t, m.EnQueueDDLJob(job, meta.AddIndexJobListKey)) - require.NoError(t, m.EnQueueDDLJob(job1)) - require.NoError(t, m.EnQueueDDLJob(job2, meta.AddIndexJobListKey)) - require.NoError(t, m.EnQueueDDLJob(job3)) - - errs, err = CancelJobs(txn, []int64{job1.ID, job.ID, job2.ID, job3.ID}) - require.NoError(t, err) - for _, err := range errs { - require.NoError(t, err) - } - - err = txn.Rollback() - require.NoError(t, err) -} - func TestError(t *testing.T) { kvErrs := []*terror.Error{ dbterror.ErrDDLJobNotFound, @@ -814,16 +513,3 @@ func TestError(t *testing.T) { require.Equal(t, uint16(err.Code()), code) } } - -func newMockStore(t *testing.T) (store kv.Storage, clean func()) { - var err error - store, err = mockstore.NewMockStore() - require.NoError(t, err) - - clean = func() { - err = store.Close() - require.NoError(t, err) - } - - return -} diff --git a/ddl/schematracker/BUILD.bazel b/ddl/schematracker/BUILD.bazel new file mode 100644 index 0000000000000..a61470a7a1192 --- /dev/null +++ b/ddl/schematracker/BUILD.bazel @@ -0,0 +1,51 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "schematracker", + srcs = [ + "checker.go", + "dm_tracker.go", + "info_store.go", + ], + importpath = "github.com/pingcap/tidb/ddl/schematracker", + visibility = ["//visibility:public"], + deps = [ + "//ddl", + "//ddl/util", + "//executor", + "//infoschema", + "//kv", + "//meta/autoid", + "//owner", + "//parser/ast", + "//parser/charset", + "//parser/model", + "//sessionctx", + "//sessionctx/variable", + "//statistics/handle", + "//table", + "//table/tables", + "//tidb-binlog/pump_client", + "//util/collate", + "//util/dbterror", + "@com_github_ngaut_pools//:pools", + "@com_github_pingcap_errors//:errors", + ], +) + +go_test( + name = "schematracker_test", + srcs = [ + "dm_tracker_test.go", + "info_store_test.go", + ], + embed = [":schematracker"], + deps = [ + "//infoschema", + "//parser", + "//parser/ast", + "//parser/model", + "//util/mock", + "@com_github_stretchr_testify//require", + ], +) diff --git a/domain/BUILD.bazel b/domain/BUILD.bazel index b1c867746ac8d..d878a32ded8a6 100644 --- a/domain/BUILD.bazel +++ b/domain/BUILD.bazel @@ -17,6 +17,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//bindinfo", + "//br/pkg/streamhelper", "//config", "//ddl", "//ddl/util", diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index 6b5590f9429a0..46b816e9128cc 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -328,6 +328,7 @@ go_test( "//config", "//ddl", "//ddl/placement", + "//ddl/schematracker", "//ddl/testutil", "//ddl/util", "//distsql", diff --git a/metrics/BUILD.bazel b/metrics/BUILD.bazel index 6ff2486d8c333..ad34ac544fa0e 100644 --- a/metrics/BUILD.bazel +++ b/metrics/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "domain.go", "executor.go", "gc_worker.go", + "log_backup.go", "meta.go", "metrics.go", "owner.go", diff --git a/planner/core/BUILD.bazel b/planner/core/BUILD.bazel index 83342809499ce..e2ebf20dea339 100644 --- a/planner/core/BUILD.bazel +++ b/planner/core/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "core", srcs = [ + "access_object.go", "cache.go", "cacheable_checker.go", "collect_column_stats_usage.go", @@ -13,6 +14,7 @@ go_library( "explain.go", "expression_rewriter.go", "find_best_task.go", + "flat_plan.go", "fragment.go", "handle_cols.go", "hashcode.go", @@ -116,6 +118,7 @@ go_library( "//util/kvcache", "//util/logutil", "//util/mathutil", + "//util/memory", "//util/mock", "//util/paging", "//util/parser", @@ -154,6 +157,7 @@ go_test( "expression_rewriter_test.go", "expression_test.go", "find_best_task_test.go", + "flat_plan_test.go", "fragment_test.go", "indexmerge_test.go", "integration_partition_test.go", diff --git a/sessiontxn/BUILD.bazel b/sessiontxn/BUILD.bazel index 0738636e8299a..c884d7981e19e 100644 --- a/sessiontxn/BUILD.bazel +++ b/sessiontxn/BUILD.bazel @@ -4,8 +4,8 @@ go_library( name = "sessiontxn", srcs = [ "failpoint.go", + "future.go", "interface.go", - "txn.go", ], importpath = "github.com/pingcap/tidb/sessiontxn", visibility = ["//visibility:public"], @@ -14,12 +14,7 @@ go_library( "//kv", "//parser/ast", "//sessionctx", - "//sessionctx/variable", - "//table/temptable", "//util/stringutil", - "@com_github_opentracing_opentracing_go//:opentracing-go", - "@com_github_pingcap_kvproto//pkg/kvrpcpb", - "@com_github_tikv_client_go_v2//oracle", ], ) @@ -35,12 +30,16 @@ go_test( "//infoschema", "//kv", "//parser/ast", + "//parser/model", "//planner/core", "//sessionctx", + "//sessiontxn/internal", "//sessiontxn/staleread", + "//tablecodec", "//testkit", "//testkit/testfork", "//testkit/testsetup", + "//tests/realtikvtest", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//oracle", diff --git a/sessiontxn/internal/BUILD.bazel b/sessiontxn/internal/BUILD.bazel new file mode 100644 index 0000000000000..43e4dc2b8be1c --- /dev/null +++ b/sessiontxn/internal/BUILD.bazel @@ -0,0 +1,17 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "internal", + srcs = ["txn.go"], + importpath = "github.com/pingcap/tidb/sessiontxn/internal", + visibility = ["//sessiontxn:__subpackages__"], + deps = [ + "//kv", + "//sessionctx", + "//sessionctx/variable", + "//table/temptable", + "//util/logutil", + "@com_github_pingcap_kvproto//pkg/kvrpcpb", + "@org_uber_go_zap//:zap", + ], +) diff --git a/sessiontxn/isolation/BUILD.bazel b/sessiontxn/isolation/BUILD.bazel index a05b08583768a..8ef29433ceb7d 100644 --- a/sessiontxn/isolation/BUILD.bazel +++ b/sessiontxn/isolation/BUILD.bazel @@ -22,9 +22,11 @@ go_library( "//sessionctx", "//sessionctx/variable", "//sessiontxn", + "//sessiontxn/internal", "//sessiontxn/staleread", "//table/temptable", "//util/logutil", + "@com_github_opentracing_opentracing_go//:opentracing-go", "@com_github_pingcap_errors//:errors", "@com_github_tikv_client_go_v2//error", "@com_github_tikv_client_go_v2//oracle", diff --git a/sessiontxn/staleread/BUILD.bazel b/sessiontxn/staleread/BUILD.bazel index d6272550153af..117623298d563 100644 --- a/sessiontxn/staleread/BUILD.bazel +++ b/sessiontxn/staleread/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "//sessionctx", "//sessionctx/variable", "//sessiontxn", + "//sessiontxn/internal", "//table/temptable", "//types", "//util/dbterror", diff --git a/tests/realtikvtest/sessiontest/BUILD.bazel b/tests/realtikvtest/sessiontest/BUILD.bazel index 14923fa0fa623..1f7df628a05d9 100644 --- a/tests/realtikvtest/sessiontest/BUILD.bazel +++ b/tests/realtikvtest/sessiontest/BUILD.bazel @@ -17,7 +17,6 @@ go_test( "//domain", "//errno", "//executor", - "//infoschema", "//kv", "//meta/autoid", "//parser", @@ -31,11 +30,9 @@ go_test( "//session", "//sessionctx", "//sessionctx/variable", - "//sessiontxn", "//store/copr", "//store/mockstore", "//table/tables", - "//tablecodec", "//testkit", "//tests/realtikvtest", "//types",