tracker(dm): close and recreate tracker when pause and resume #5350

Merged
merged 53 commits into from
May 13, 2022
Changes from 50 commits
Commits
4775a7e
--wip-- [skip ci]
lance6716 May 7, 2022
c2f54f4
change unistore settings
lance6716 May 7, 2022
134bd55
add IT
lance6716 May 7, 2022
4c3ba73
reset schema tracker every time
lance6716 May 7, 2022
9516171
try fix panic
lance6716 May 7, 2022
6f42039
fix IT
lance6716 May 7, 2022
a6999c2
Merge branch 'master' of github.com:pingcap/ticdc into fix-unistore-u…
lance6716 May 7, 2022
d557967
save work
lance6716 May 7, 2022
f0d406b
make fmt
lance6716 May 7, 2022
6493a50
fix CI
lance6716 May 7, 2022
ba10208
try fix UT
lance6716 May 7, 2022
350d7f4
try fix CI
lance6716 May 7, 2022
5e982fb
refine mock order
lance6716 May 8, 2022
021b6f1
fix mock
lance6716 May 8, 2022
91784c8
fix IT
lance6716 May 8, 2022
39a326a
change logic for operate-schema
lance6716 May 9, 2022
9b3ef6f
fix some tests
lance6716 May 9, 2022
083b2d6
make fmt
lance6716 May 9, 2022
b5a52d4
fix test
lance6716 May 9, 2022
8ec1050
fix terror
lance6716 May 9, 2022
5023e04
Merge branch 'fix-unistore-usage' of github.com:lance6716/ticdc into …
lance6716 May 9, 2022
04e1ae9
change syncer prepare logic order
lance6716 May 10, 2022
6f30b8f
Merge branch 'master' of github.com:pingcap/ticdc into fix-unistore-u…
lance6716 May 10, 2022
e5ce635
try fix CI
lance6716 May 10, 2022
83235ed
try fix CI
lance6716 May 10, 2022
0913117
fix again
lance6716 May 10, 2022
e6a383b
Merge branch 'master' into fix-unistore-usage
lance6716 May 10, 2022
0f3f03a
weaken the test checking pattern
lance6716 May 10, 2022
bbed995
fix again
lance6716 May 10, 2022
5170b1c
fix again
lance6716 May 10, 2022
165e93d
fix test in a new day
lance6716 May 11, 2022
7de53ce
fix matching pattern
lance6716 May 11, 2022
b485b00
try another API
lance6716 May 11, 2022
5ae3a50
fix again
lance6716 May 11, 2022
0f3ae52
some fix
lance6716 May 11, 2022
a831a1f
Merge branch 'master' of github.com:pingcap/tiflow into fix-unistore-…
lance6716 May 11, 2022
7c6ff83
increase check_sync_diff retry
lance6716 May 11, 2022
878bae3
Merge branch 'master' into fix-unistore-usage
lance6716 May 11, 2022
6ca219a
fix again...
lance6716 May 11, 2022
4d76b6d
Merge branch 'fix-unistore-usage' of github.com:lance6716/ticdc into …
lance6716 May 11, 2022
6b72a40
add retryable error
lance6716 May 11, 2022
0d0f63a
fix openapi test
lance6716 May 11, 2022
1a7ef75
fix another test
lance6716 May 11, 2022
3e2ee06
fix another unstable test
lance6716 May 12, 2022
9b1c435
fix lint
lance6716 May 12, 2022
884a0a7
Merge branch 'master' into fix-unistore-usage
lance6716 May 12, 2022
5d69b01
Merge branch 'fix-unistore-usage' of github.com:lance6716/ticdc into …
lance6716 May 12, 2022
7b0e7d6
fix comment
lance6716 May 12, 2022
787c11c
fix unstable CI
lance6716 May 12, 2022
6195c35
fix another unstable test
lance6716 May 12, 2022
5be2376
address comment
lance6716 May 12, 2022
983b24c
fix unstable test
lance6716 May 12, 2022
247297f
Merge branch 'master' into fix-unistore-usage
ti-chi-bot May 13, 2022
1 change: 1 addition & 0 deletions dm/_utils/terror_gen/errors_release.txt
Expand Up @@ -347,6 +347,7 @@ ErrSyncerEventNotExist,[code=36066:class=sync-unit:scope=internal:level=high], "
ErrSyncerParseDDL,[code=36067:class=sync-unit:scope=internal:level=high], "Message: parse DDL: %s, Workaround: Please confirm your DDL statement is correct and needed. For TiDB compatible DDL, see https://docs.pingcap.com/tidb/stable/mysql-compatibility#ddl. You can use `handle-error` command to skip or replace the DDL or add a binlog filter rule to ignore it if the DDL is not needed."
ErrSyncerUnsupportedStmt,[code=36068:class=sync-unit:scope=internal:level=high], "Message: `%s` statement not supported in %s mode"
ErrSyncerGetEvent,[code=36069:class=sync-unit:scope=upstream:level=high], "Message: get binlog event error: %v, Workaround: Please check if the binlog file could be parsed by `mysqlbinlog`."
ErrSyncerDownstreamTableNotFound,[code=36070:class=sync-unit:scope=internal:level=high], "Message: downstream table %s not found"
ErrMasterSQLOpNilRequest,[code=38001:class=dm-master:scope=internal:level=medium], "Message: nil request not valid"
ErrMasterSQLOpNotSupport,[code=38002:class=dm-master:scope=internal:level=medium], "Message: op %s not supported"
ErrMasterSQLOpWithoutSharding,[code=38003:class=dm-master:scope=internal:level=medium], "Message: operate request without --sharding specified not valid"
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/worker/source_worker.go
Expand Up @@ -608,7 +608,7 @@ func (w *SourceWorker) UpdateSubTask(ctx context.Context, cfg *config.SubTaskCon
return st.Update(ctx, cfg)
}

// OperateSubTask stop/resume/pause sub task.
// OperateSubTask stop/resume/pause sub task.
func (w *SourceWorker) OperateSubTask(name string, op pb.TaskOp) error {
w.Lock()
defer w.Unlock()
Expand Down
6 changes: 6 additions & 0 deletions dm/errors.toml
Expand Up @@ -2092,6 +2092,12 @@ description = ""
workaround = "Please check if the binlog file could be parsed by `mysqlbinlog`."
tags = ["upstream", "high"]

[error.DM-sync-unit-36070]
message = "downstream table %s not found"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-dm-master-38001]
message = "nil request not valid"
description = ""
Expand Down
48 changes: 23 additions & 25 deletions dm/pkg/schema/tracker.go
Expand Up @@ -22,6 +22,7 @@ import (
"strings"
"sync"

"github.com/docker/go-units"
"github.com/pingcap/errors"
tidbConfig "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
Expand All @@ -37,7 +38,9 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
unistoreConfig "github.com/pingcap/tidb/store/mockstore/unistore/config"
"github.com/pingcap/tidb/util/filter"
"go.uber.org/atomic"
"go.uber.org/zap"

tcontext "github.com/pingcap/tiflow/dm/pkg/context"
Expand All @@ -63,6 +66,11 @@ var (
}
)

func init() {
unistoreConfig.DefaultConf.Engine.VlogFileSize = 4 * units.MiB
unistoreConfig.DefaultConf.Engine.L1Size = 128 * units.MiB
}

// Tracker is used to track schema locally.
type Tracker struct {
// we're using an embedded tidb, there's no need to sync operations on it, but we may recreate(drop and create)
Expand All @@ -74,6 +82,7 @@ type Tracker struct {
dom *domain.Domain
se session.Session
dsTracker *downstreamTracker
closed atomic.Bool
}

// downstreamTracker tracks downstream schema.
Expand Down Expand Up @@ -282,10 +291,7 @@ func (tr *Tracker) GetCreateTable(ctx context.Context, table *filter.Table) (str

row := req.GetRow(0)
str := row.GetString(1) // the first column is the table name.
// returned as single line.
str = strings.ReplaceAll(str, "\n", "")
str = strings.ReplaceAll(str, "  ", " ")
return str, nil
return utils.CreateTableSQLToOneRow(str), nil
}

// AllSchemas returns all schemas visible to the tracker (excluding system tables).
Expand Down Expand Up @@ -366,6 +372,12 @@ func (tr *Tracker) Reset() error {

// Close close a tracker.
func (tr *Tracker) Close() error {
if tr == nil {
return nil
}
if !tr.closed.CAS(false, true) {
return nil
}
tr.se.Close()
tr.dom.Close()
if err := tr.store.Close(); err != nil {
Contributor

Do we need to remove tr.storePath before we start? I'm afraid an ungraceful stop may not remove the storage path.

Contributor Author

TempDir returns a newly created folder, with a suffix appended to its name.
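
For readers following this thread, here is a minimal sketch of the behavior described in the reply. It uses the standard library's `os.MkdirTemp` rather than the PR's own `newTmpFolderForTracker`, whose body is not shown in this diff. The helper name `withScratchDir` and the `tracker-` prefix are assumptions made for illustration only.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// withScratchDir is a hypothetical helper, not code from this PR.
// os.MkdirTemp creates a fresh directory whose name is the pattern plus a
// random suffix, so every call gets its own path.
func withScratchDir(task string, fn func(dir string) error) error {
	// Path separators are not allowed in the pattern, so replace them.
	pattern := "tracker-" + strings.ReplaceAll(task, "/", "-") + "-"
	dir, err := os.MkdirTemp("", pattern)
	if err != nil {
		return err
	}
	// Cleanup runs only on a normal return from this function.
	defer os.RemoveAll(dir)
	return fn(dir)
}

func main() {
	err := withScratchDir("task/db01", func(first string) error {
		return withScratchDir("task/db01", func(second string) error {
			// Both directories exist at this point, so the paths differ.
			fmt.Println(first != second)
			return nil
		})
	})
	if err != nil {
		panic(err)
	}
}
```

The deferred `os.RemoveAll` is skipped when the process is killed, so an ungraceful stop leaves its directory behind. A restart still gets a fresh path because of the suffix.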

Contributor Author

Oh, the user may need to clean it up manually to release disk space. Hopefully it doesn't occupy much space.
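
The `Close` hunk above this thread guards teardown with a nil check and a compare-and-swap on the new `closed` field, so a second call becomes a no-op. A minimal sketch of that pattern follows. It uses the standard library's `sync/atomic` types (Go 1.19 or later) in place of `go.uber.org/atomic`, and the `resource` type and `teardowns` counter are hypothetical names, not part of the PR.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// resource is a hypothetical stand-in for a type that owns things which must
// be released exactly once.
type resource struct {
	closed    atomic.Bool
	teardowns atomic.Int32 // counts how often the real teardown ran
}

// Close is safe on a nil receiver and safe to call repeatedly or concurrently.
func (r *resource) Close() error {
	if r == nil {
		return nil
	}
	// Only the caller that flips closed from false to true proceeds.
	if !r.closed.CompareAndSwap(false, true) {
		return nil
	}
	r.teardowns.Add(1) // the actual release work would go here
	return nil
}

func main() {
	r := &resource{}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = r.Close()
		}()
	}
	wg.Wait()

	var nilRes *resource
	fmt.Println(r.teardowns.Load(), nilRes.Close()) // prints "1 <nil>"
}
```

With this guard, pause and resume paths can both call `Close` without coordinating which one runs first.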

Expand Down Expand Up @@ -427,29 +439,15 @@ func (tr *Tracker) CreateTableIfNotExists(table *filter.Table, ti *model.TableIn
return tr.dom.DDL().CreateTableWithInfo(tr.se, schemaName, ti, ddl.OnExistIgnore)
}

func (tr *Tracker) RecreateTables(logCtx *tcontext.Context, tablesToDrop []*filter.Table, tablesToCreate map[string]map[string]*model.TableInfo) error {
tr.Lock()
defer tr.Unlock()
for _, tbl := range tablesToDrop {
// schema changed
if err := tr.DropTable(tbl); err != nil {
logCtx.L().Warn("failed to drop table from schema tracker",
zap.Stringer("table", tbl), log.ShortError(err))
}
}
for schemaName := range tablesToCreate {
// TODO: Figure out how to recover from errors.
if err := tr.CreateSchemaIfNotExists(schemaName); err != nil {
logCtx.L().Error("failed to rollback schema on schema tracker: cannot create schema",
zap.String("schema", schemaName), log.ShortError(err))
}
}
return tr.batchCreateTableIfNotExist(tablesToCreate)
}

func (tr *Tracker) batchCreateTableIfNotExist(tablesToCreate map[string]map[string]*model.TableInfo) error {
// BatchCreateTableIfNotExist creates tables in batches, one batch per schema. If a schema does not exist, it is created first.
// The argument is { database name -> { table name -> TableInfo } }.
func (tr *Tracker) BatchCreateTableIfNotExist(tablesToCreate map[string]map[string]*model.TableInfo) error {
tr.se.SetValue(sessionctx.QueryString, "skip")
for schema, tableNameInfo := range tablesToCreate {
if err := tr.CreateSchemaIfNotExists(schema); err != nil {
return err
}

var cloneTis []*model.TableInfo
for table, ti := range tableNameInfo {
cloneTi := cloneTableInfo(ti) // clone TableInfo w.r.t the warning of the CreateTable function
Expand Down
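
The behavioral change in `BatchCreateTableIfNotExist` is that a missing schema is now created instead of producing an error. The sketch below models that with a hypothetical in-memory `catalog` type rather than the embedded TiDB the tracker really uses. Leaving existing tables untouched is an assumption, modeled on the `ddl.OnExistIgnore` flag visible in `CreateTableIfNotExists`.

```go
package main

import "fmt"

// catalog is a hypothetical stand-in: schema name -> table name -> definition.
type catalog map[string]map[string]string

func (c catalog) createSchemaIfNotExists(schema string) {
	if _, ok := c[schema]; !ok {
		c[schema] = map[string]string{}
	}
}

// batchCreateIfNotExist takes the same shape of argument as the method in the
// diff: { database name -> { table name -> definition } }.
func (c catalog) batchCreateIfNotExist(toCreate map[string]map[string]string) {
	for schema, tables := range toCreate {
		c.createSchemaIfNotExists(schema) // a missing schema is no longer an error
		for name, def := range tables {
			if _, exists := c[schema][name]; exists {
				continue // assumed: existing tables are kept as they are
			}
			c[schema][name] = def
		}
	}
}

func main() {
	c := catalog{}
	want := map[string]map[string]string{
		"testdb":  {"foo": "a int primary key", "foo1": "a int primary key"},
		"testdb2": {"foo3": "a int primary key"},
	}
	c.batchCreateIfNotExist(want)
	delete(c, "testdb") // simulate `drop database testdb`
	c.batchCreateIfNotExist(want)
	fmt.Println(len(c["testdb"]), len(c["testdb2"])) // prints "2 1"
}
```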
83 changes: 5 additions & 78 deletions dm/pkg/schema/tracker_test.go
Expand Up @@ -561,7 +561,7 @@ func (s *trackerSuite) TestBatchCreateTableIfNotExist(c *C) {
for i := range tables {
tablesToCreate[tables[i].Schema][tables[i].Name] = tiInfos[i]
}
err = tracker.batchCreateTableIfNotExist(tablesToCreate)
err = tracker.BatchCreateTableIfNotExist(tablesToCreate)
c.Assert(err, IsNil)
// 3. check all create success
for i := range tables {
Expand All @@ -581,7 +581,7 @@ func (s *trackerSuite) TestBatchCreateTableIfNotExist(c *C) {
err = tracker.DropTable(tables[0])
c.Assert(err, IsNil)
// 2. batch create
err = tracker.batchCreateTableIfNotExist(tablesToCreate)
err = tracker.BatchCreateTableIfNotExist(tablesToCreate)
c.Assert(err, IsNil)
// 3. check
for i := range tables {
Expand All @@ -592,12 +592,12 @@ func (s *trackerSuite) TestBatchCreateTableIfNotExist(c *C) {
c.Assert(ti, DeepEquals, tiInfos[i])
}

// drop schema and raise error
// BatchCreateTableIfNotExist will also create database
ctx := context.Background()
err = tracker.Exec(ctx, "", `drop database testdb`)
c.Assert(err, IsNil)
err = tracker.batchCreateTableIfNotExist(tablesToCreate)
c.Assert(err, NotNil)
err = tracker.BatchCreateTableIfNotExist(tablesToCreate)
c.Assert(err, IsNil)
}

func (s *trackerSuite) TestAllSchemas(c *C) {
Expand Down Expand Up @@ -956,79 +956,6 @@ func (s *trackerSuite) TestPlacementRule(c *C) {
c.Assert(ok, IsTrue)
}

func TestTrackerRecreateTables(t *testing.T) {
cfg := &config.SubTaskConfig{}
backupKeys := downstreamVars
defer func() {
downstreamVars = backupKeys
}()
downstreamVars = []string{"sql_mode"}
db, _, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
con, err := db.Conn(context.Background())
require.NoError(t, err)
dbConn := dbconn.NewDBConn(cfg, conn.NewBaseConn(con, nil))
log.SetLevel(zapcore.ErrorLevel)

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn)
require.NoError(t, err)
defer func() {
err = tracker.Close()
require.NoError(t, err)
}()
err = tracker.CreateSchemaIfNotExists("testdb")
require.NoError(t, err)
err = tracker.CreateSchemaIfNotExists("testdb2")
require.NoError(t, err)

tables := []*filter.Table{
{Schema: "testdb", Name: "foo"},
{Schema: "testdb", Name: "foo1"},
{Schema: "testdb2", Name: "foo3"},
}
execStmt := []string{
`create table foo(a int primary key);`,
`create table foo1(a int primary key);`,
`create table foo3(a int primary key);`,
}
tiInfos := make([]*model.TableInfo, len(tables))
for i := range tables {
ctx := context.Background()
err = tracker.Exec(ctx, tables[i].Schema, execStmt[i])
require.NoError(t, err)
tiInfos[i], err = tracker.GetTableInfo(tables[i])
require.NoError(t, err)
require.NotNil(t, tiInfos[i])
require.Equal(t, tables[i].Name, tiInfos[i].Name.O)
tiInfos[i] = tiInfos[i].Clone()
clearVolatileInfo(tiInfos[i])
}

// drop one schema
require.NoError(t, tracker.dom.DDL().DropSchema(tracker.se, model.NewCIStr("testdb")))

// recreate tables
tablesToCreate := map[string]map[string]*model.TableInfo{}
tablesToCreate["testdb"] = map[string]*model.TableInfo{}
tablesToCreate["testdb2"] = map[string]*model.TableInfo{}
for i := range tables {
tablesToCreate[tables[i].Schema][tables[i].Name] = tiInfos[i]
}
tctx := tcontext.Background()
err = tracker.RecreateTables(tctx, tables, tablesToCreate)
require.NoError(t, err)
// check all create success
for i := range tables {
var ti *model.TableInfo
ti, err = tracker.GetTableInfo(tables[i])
require.NoError(t, err)
cloneTi := ti.Clone()
clearVolatileInfo(cloneTi)
require.Equal(t, tiInfos[i], cloneTi)
}
}

func TestNewTmpFolderForTracker(t *testing.T) {
got, err := newTmpFolderForTracker("task/db01")
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion dm/pkg/shardddl/optimism/lock.go
Expand Up @@ -614,7 +614,7 @@ func (l *Lock) IsDroppedColumn(source, upSchema, upTable, col string) bool {
return true
}

// AddDroppedColumn adds a dropped column name in both etcd and lock's column map.
// AddDroppedColumns adds a dropped column name in both etcd and lock's column map.
func (l *Lock) AddDroppedColumns(source, schema, table string, cols []string) error {
newCols := make([]string, 0, len(cols))
for _, col := range cols {
Expand Down
2 changes: 2 additions & 0 deletions dm/pkg/terror/error_list.go
Expand Up @@ -446,6 +446,7 @@ const (
codeSyncerParseDDL
codeSyncerUnsupportedStmt
codeSyncerGetEvent
codeSyncerDownstreamTableNotFound
)

// DM-master error code.
Expand Down Expand Up @@ -1127,6 +1128,7 @@ var (
ErrSyncerParseDDL = New(codeSyncerParseDDL, ClassSyncUnit, ScopeInternal, LevelHigh, "parse DDL: %s", "Please confirm your DDL statement is correct and needed. For TiDB compatible DDL, see https://docs.pingcap.com/tidb/stable/mysql-compatibility#ddl. You can use `handle-error` command to skip or replace the DDL or add a binlog filter rule to ignore it if the DDL is not needed.")
ErrSyncerUnsupportedStmt = New(codeSyncerUnsupportedStmt, ClassSyncUnit, ScopeInternal, LevelHigh, "`%s` statement not supported in %s mode", "")
ErrSyncerGetEvent = New(codeSyncerGetEvent, ClassSyncUnit, ScopeUpstream, LevelHigh, "get binlog event error: %v", "Please check if the binlog file could be parsed by `mysqlbinlog`.")
ErrSyncerDownstreamTableNotFound = New(codeSyncerDownstreamTableNotFound, ClassSyncUnit, ScopeInternal, LevelHigh, "downstream table %s not found", "")

// DM-master error.
ErrMasterSQLOpNilRequest = New(codeMasterSQLOpNilRequest, ClassDMMaster, ScopeInternal, LevelMedium, "nil request not valid", "")
Expand Down
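
The new error shows up in three places in this PR: the code constant, the `New(...)` definition, and the generated line in `errors_release.txt`. As a rough illustration of how those pieces relate, the sketch below uses a hypothetical `codedError` struct, not the real `terror` type, whose API is not shown in this diff. `releaseLine` reproduces the line format visible in `errors_release.txt`; the output format of `generate` is an assumption.

```go
package main

import "fmt"

// codedError is a hypothetical miniature of a coded error definition.
type codedError struct {
	name       string
	code       int
	class      string
	scope      string
	level      string
	message    string // printf-style format
	workaround string
}

func (e codedError) header() string {
	return fmt.Sprintf("[code=%d:class=%s:scope=%s:level=%s]", e.code, e.class, e.scope, e.level)
}

// releaseLine renders the definition in the errors_release.txt layout.
func (e codedError) releaseLine() string {
	body := "Message: " + e.message
	if e.workaround != "" {
		body += ", Workaround: " + e.workaround
	}
	return fmt.Sprintf("%s,%s, \"%s\"", e.name, e.header(), body)
}

// generate fills the message format with arguments (assumed output format).
func (e codedError) generate(args ...interface{}) error {
	return fmt.Errorf("%s, Message: %s", e.header(), fmt.Sprintf(e.message, args...))
}

func main() {
	errNotFound := codedError{
		name:    "ErrSyncerDownstreamTableNotFound",
		code:    36070,
		class:   "sync-unit",
		scope:   "internal",
		level:   "high",
		message: "downstream table %s not found",
	}
	fmt.Println(errNotFound.releaseLine())
	fmt.Println(errNotFound.generate("`db`.`tbl`"))
}
```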
7 changes: 7 additions & 0 deletions dm/pkg/utils/db.go
Expand Up @@ -671,3 +671,10 @@ func GetMaxConnectionsForConn(ctx context.Context, conn *sql.Conn) (int, error)
func IsMariaDB(version string) bool {
return strings.Contains(strings.ToUpper(version), "MARIADB")
}

// CreateTableSQLToOneRow formats the result of SHOW CREATE TABLE to one row.
func CreateTableSQLToOneRow(sql string) string {
sql = strings.ReplaceAll(sql, "\n", "")
sql = strings.ReplaceAll(sql, "  ", " ")
Comment on lines +677 to +678
Contributor

What if a column default contains those characters?

It seems to be used only in get schema, so it's OK for now.

return sql
}
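
To make the reviewer's concern concrete, the sketch below copies the helper under a lowercase name and runs it on two inputs. It assumes the second replacement is meant to collapse two consecutive spaces into one; the sample statements are made up for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// createTableSQLToOneRow mirrors the helper from the diff, assuming the second
// call collapses two consecutive spaces into one.
func createTableSQLToOneRow(sql string) string {
	sql = strings.ReplaceAll(sql, "\n", "")
	sql = strings.ReplaceAll(sql, "  ", " ")
	return sql
}

func main() {
	// Typical case: line breaks and two-space indentation are flattened.
	plain := "CREATE TABLE `t1` (\n  `id` bigint(20) NOT NULL\n) ENGINE=InnoDB"
	fmt.Println(createTableSQLToOneRow(plain))

	// Caveat: the replacement is purely textual, so a default value that
	// contains two consecutive spaces is rewritten as well.
	withDefault := "CREATE TABLE `t2` (\n  `c` varchar(20) DEFAULT 'a  b'\n) ENGINE=InnoDB"
	fmt.Println(createTableSQLToOneRow(withDefault))
}
```

The second output contains `DEFAULT 'a b'`, so the result is suitable for display but should not be fed back as DDL when string literals may contain such characters.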
6 changes: 6 additions & 0 deletions dm/pkg/utils/db_test.go
Expand Up @@ -497,3 +497,9 @@ func TestIsMariaDB(t *testing.T) {
require.True(t, IsMariaDB("5.5.50-MariaDB-1~wheezy"))
require.False(t, IsMariaDB("5.7.19-17-log"))
}

func TestCreateTableSQLToOneRow(t *testing.T) {
input := "CREATE TABLE `t1` (\n `id` bigint(20) NOT NULL,\n `c1` varchar(20) DEFAULT NULL,\n `c2` varchar(20) DEFAULT NULL,\n PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */\n) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin"
expected := "CREATE TABLE `t1` ( `id` bigint(20) NOT NULL, `c1` varchar(20) DEFAULT NULL, `c2` varchar(20) DEFAULT NULL, PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin"
require.Equal(t, expected, CreateTableSQLToOneRow(input))
}