Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loader(dm): simplify lightning checkpoint and add clean meta #3813

Merged
merged 18 commits into from
Dec 22, 2021
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1534,6 +1534,8 @@ func (s *Server) removeMetaData(ctx context.Context, taskName, metaSchema string
// clear loader and syncer checkpoints
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS %s",
dbutil.TableName(metaSchema, cputil.LoaderCheckpoint(taskName))))
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS %s",
dbutil.TableName(metaSchema, cputil.LightningCheckpoint(taskName))))
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS %s",
dbutil.TableName(metaSchema, cputil.SyncerCheckpoint(taskName))))
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS %s",
Expand Down
2 changes: 2 additions & 0 deletions dm/dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
mock := conn.InitMockDB(c)
mock.ExpectBegin()
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LoaderCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LightningCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerShardMeta(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerOnlineDDL(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
Expand Down Expand Up @@ -1077,6 +1078,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
mock = conn.InitMockDB(c)
mock.ExpectBegin()
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LoaderCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LightningCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerShardMeta(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerOnlineDDL(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
Expand Down
130 changes: 91 additions & 39 deletions dm/loader/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ import (
"go.uber.org/zap"
)

const (
LightningCheckpointListName = "lightning_checkpoint_list"
)

// CheckPoint represents checkpoint status.
type CheckPoint interface {
// Load loads all checkpoints recorded before.
Expand Down Expand Up @@ -470,19 +466,58 @@ func (cp *RemoteCheckPoint) String() string {
return string(bytes)
}

type lightingLoadStatus int

const (
lightningStatusInit lightingLoadStatus = iota
lightningStatusRunning
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe reuse CheckpointStatus in tidb/pkg/checkpoints ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean the real tidb-lightning CheckpointStatus? I think that because it has so many states, it may be hard to understand for anyone who is not familiar with tidb-lightning.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, my fault, I got the two statuses mixed up. Now I think the current implementation makes more sense.

By the way, how about directly using strings like init, running, finished in the DB? Maybe this would improve readability.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

lightningStatusFinished
)

func (s lightingLoadStatus) String() string {
switch s {
case lightningStatusInit:
return "init"
case lightningStatusRunning:
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
return "running"
case lightningStatusFinished:
return "finished"
default:
panic(fmt.Sprintf("unknown lightning load stauts '%d'", s))
}
}

// parseLightningLoadStatus maps the textual status ("init", "running",
// "finished") back to its lightingLoadStatus constant.
//
// An unrecognised string is not an error: a warning carrying the offending
// value is logged and lightningStatusInit is returned.
func parseLightningLoadStatus(s string) lightingLoadStatus {
	switch s {
	case "init":
		return lightningStatusInit
	case "running":
		return lightningStatusRunning
	case "finished":
		return lightningStatusFinished
	}

	// Nothing matched: warn and fall back to the initial state.
	log.L().Warn("unknown lightning load status, will fallback to init", zap.String("status", s))
	return lightningStatusInit
}

type LightningCheckpointList struct {
db *conn.BaseDB
schema string
tableName string
logger log.Logger
db *conn.BaseDB
schema string
tableName string
taskName string
sourceName string
logger log.Logger
}

func NewLightningCheckpointList(db *conn.BaseDB, metaSchema string) *LightningCheckpointList {
func NewLightningCheckpointList(db *conn.BaseDB, taskName, sourceName, metaSchema string) *LightningCheckpointList {
return &LightningCheckpointList{
db: db,
schema: dbutil.ColumnName(metaSchema),
tableName: dbutil.TableName(metaSchema, LightningCheckpointListName),
logger: log.L().WithFields(zap.String("component", "lightning checkpoint database list")),
db: db,
schema: dbutil.ColumnName(metaSchema),
tableName: dbutil.TableName(metaSchema, cputil.LightningCheckpoint(taskName)),
taskName: taskName,
sourceName: sourceName,
logger: log.L().WithFields(zap.String("component", "lightning checkpoint database list")),
}
}

Expand All @@ -491,35 +526,38 @@ func (cp *LightningCheckpointList) Prepare(ctx context.Context) error {
if err != nil {
return terror.WithScope(terror.Annotate(err, "initialize connection when prepare"), terror.ScopeDownstream)
}
defer conn.CloseBaseConnWithoutErr(cp.db, connection)

createSchema := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", cp.schema)
tctx := tcontext.NewContext(ctx, log.With(zap.String("job", "lightning-checkpoint")))
_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{createSchema})
if err != nil {
return err
}
createTable := `CREATE TABLE IF NOT EXISTS %s (
worker_name varchar(255) NOT NULL,
task_name varchar(255) NOT NULL,
PRIMARY KEY (task_name, worker_name)
source_name varchar(255) NOT NULL,
status varchar(10) NOT NULL DEFAULT 'init' COMMENT 'init,running,finished',
PRIMARY KEY (task_name, source_name)
);
`
sql2 := fmt.Sprintf(createTable, cp.tableName)
_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{sql2})
return terror.WithScope(err, terror.ScopeDownstream)
}

func (cp *LightningCheckpointList) RegisterCheckPoint(ctx context.Context, workerName, taskName string) error {
func (cp *LightningCheckpointList) RegisterCheckPoint(ctx context.Context) error {
connection, err := cp.db.GetBaseConn(ctx)
if err != nil {
return terror.WithScope(terror.Annotate(err, "initialize connection"), terror.ScopeDownstream)
}
defer conn.CloseBaseConnWithoutErr(cp.db, connection)

sql := fmt.Sprintf("INSERT IGNORE INTO %s (`worker_name`, `task_name`) VALUES(?,?)", cp.tableName)
sql := fmt.Sprintf("INSERT IGNORE INTO %s (`task_name`, `source_name`) VALUES (?, ?)", cp.tableName)
cp.logger.Info("initial checkpoint record",
zap.String("sql", sql),
zap.String("worker-name", workerName),
zap.String("task-name", taskName))
args := []interface{}{workerName, taskName}
zap.String("task", cp.taskName),
zap.String("source", cp.sourceName))
args := []interface{}{cp.taskName, cp.sourceName}
tctx := tcontext.NewContext(ctx, log.With(zap.String("job", "lightning-checkpoint")))
_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{sql}, args)
if err != nil {
Expand All @@ -528,35 +566,49 @@ func (cp *LightningCheckpointList) RegisterCheckPoint(ctx context.Context, worke
return nil
}

func (cp *LightningCheckpointList) RemoveTaskCheckPoint(ctx context.Context, taskName string) error {
func (cp *LightningCheckpointList) UpdateStatus(ctx context.Context, status lightingLoadStatus) error {
connection, err := cp.db.GetBaseConn(ctx)
if err != nil {
return terror.WithScope(terror.Annotate(err, "initialize connection"), terror.ScopeDownstream)
}
defer conn.CloseBaseConnWithoutErr(cp.db, connection)

sql := fmt.Sprintf("UPDATE %s set status = ? WHERE `task_name` = ? AND `source_name` = ?", cp.tableName)
cp.logger.Info("update lightning loader status",
zap.String("task", cp.taskName), zap.String("source", cp.sourceName),
zap.Stringer("status", status))
tctx := tcontext.NewContext(ctx, log.With(zap.String("job", "lightning-checkpoint")))
query := fmt.Sprintf("SELECT `worker_name` from %s where `task_name`=?", cp.tableName)
rows, err := connection.QuerySQL(tctx, query, taskName)
_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{sql},
[]interface{}{status.String(), cp.taskName, cp.sourceName})
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
return terror.WithScope(terror.Annotate(err, "update lightning status"), terror.ScopeDownstream)
}
return nil
}

func (cp *LightningCheckpointList) taskStatus(ctx context.Context) (lightingLoadStatus, error) {
connection, err := cp.db.GetBaseConn(ctx)
if err != nil {
return lightningStatusInit, terror.WithScope(terror.Annotate(err, "initialize connection"), terror.ScopeDownstream)
}
defer conn.CloseBaseConnWithoutErr(cp.db, connection)

query := fmt.Sprintf("SELECT status FROM %s WHERE `task_name` = ? AND `source_name` = ?", cp.tableName)
tctx := tcontext.NewContext(ctx, log.With(zap.String("job", "lightning-checkpoint")))
rows, err := connection.QuerySQL(tctx, query, cp.taskName, cp.sourceName)
if err != nil {
return lightningStatusInit, err
}
defer rows.Close()
var workerName string
for rows.Next() {
err = rows.Scan(&workerName)
if err != nil {
return terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream)
}
cpdb := config.TiDBLightningCheckpointPrefix + dbutil.TableName(workerName, taskName)
sql := fmt.Sprintf("DROP DATABASE IF NOT EXISTS %s", cpdb)
_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{sql})
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
if rows.Next() {
var status string
if err = rows.Scan(&status); err != nil {
return lightningStatusInit, terror.WithScope(err, terror.ScopeDownstream)
}
return parseLightningLoadStatus(status), nil
}
query = fmt.Sprintf("DELETE from %s where `task_name`=?", cp.tableName)
_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{query}, []interface{}{taskName})
return terror.WithScope(err, terror.ScopeDownstream)
// status row doesn't exist, return default value
return lightningStatusInit, nil
}

// Close implements CheckPoint.Close.
Expand Down
79 changes: 78 additions & 1 deletion dm/loader/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package loader

import (
"context"
"database/sql"
"fmt"
"os"
"strconv"
Expand All @@ -27,7 +29,10 @@ import (
"github.com/pingcap/ticdc/dm/pkg/cputil"
)

var _ = Suite(&testCheckPointSuite{})
var (
_ = Suite(&testCheckPointSuite{})
_ = Suite(&lightningCpListSuite{})
)

var (
schemaCreateSQL = ""
Expand Down Expand Up @@ -259,3 +264,75 @@ func (t *testCheckPointSuite) TestDeepCopy(c *C) {
cp.restoringFiles.pos["db"]["table"]["file3"] = []int64{0, 100}
c.Assert(ret, DeepEquals, map[string][]int64{"file": {10, 100}, "file2": {0, 100}})
}

// lightningCpListSuite is the test suite for LightningCheckpointList.
// Both fields are (re)assigned in SetUpTest.
type lightningCpListSuite struct {
// mock is the sqlmock handle on which each test registers its expected SQL.
mock sqlmock.Sqlmock
// cpList is the checkpoint list under test.
cpList *LightningCheckpointList
}

// SetUpTest obtains a sqlmock handle from conn.InitMockDB and builds the
// LightningCheckpointList under test with fixed task, source and schema names.
func (s *lightningCpListSuite) SetUpTest(c *C) {
	s.mock = conn.InitMockDB(c)

	db, err := conn.DefaultDBProvider.Apply(&config.DBConfig{})
	c.Assert(err, IsNil)

	const (
		taskName   = "test_lightning"
		sourceName = "source1"
		metaSchema = "dm_meta"
	)
	s.cpList = NewLightningCheckpointList(db, taskName, sourceName, metaSchema)
}

// TearDownTest asserts that every expectation registered on the mock during
// the test was met.
func (s *lightningCpListSuite) TearDownTest(c *C) {
	err := s.mock.ExpectationsWereMet()
	c.Assert(err, IsNil)
}

// TestLightningCheckpointListPrepare checks that Prepare issues a
// CREATE SCHEMA followed by a CREATE TABLE, each wrapped in its own
// begin/commit pair, and returns no error.
func (s *lightningCpListSuite) TestLightningCheckpointListPrepare(c *C) {
	// Expected statements, in the order Prepare must execute them.
	statements := []string{
		fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s.*", s.cpList.schema),
		fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.*", s.cpList.tableName),
	}
	for _, stmt := range statements {
		s.mock.ExpectBegin()
		s.mock.ExpectExec(stmt).WillReturnResult(sqlmock.NewResult(1, 1))
		s.mock.ExpectCommit()
	}

	c.Assert(s.cpList.Prepare(context.Background()), IsNil)
}

// TestLightningCheckpointListStatusInit checks that taskStatus returns
// lightningStatusInit without error when the status query yields no row.
func (s *lightningCpListSuite) TestLightningCheckpointListStatusInit(c *C) {
	query := fmt.Sprintf("SELECT status FROM %s WHERE `task_name` = \\? AND `source_name` = \\?", s.cpList.tableName)
	// No row is added to the result set, so the default status is expected.
	noRows := sqlmock.NewRows([]string{"status"}).RowError(0, sql.ErrNoRows)

	s.mock.ExpectQuery(query).
		WithArgs(s.cpList.taskName, s.cpList.sourceName).
		WillReturnRows(noRows)

	got, err := s.cpList.taskStatus(context.Background())
	c.Assert(err, IsNil)
	c.Assert(got, Equals, lightningStatusInit)
}

// TestLightningCheckpointListStatusRunning checks that taskStatus returns
// lightningStatusRunning when the stored status column reads "running".
func (s *lightningCpListSuite) TestLightningCheckpointListStatusRunning(c *C) {
	query := fmt.Sprintf("SELECT status FROM %s WHERE `task_name` = \\? AND `source_name` = \\?", s.cpList.tableName)
	runningRow := sqlmock.NewRows([]string{"status"}).AddRow("running")

	s.mock.ExpectQuery(query).
		WithArgs(s.cpList.taskName, s.cpList.sourceName).
		WillReturnRows(runningRow)

	got, err := s.cpList.taskStatus(context.Background())
	c.Assert(err, IsNil)
	c.Assert(got, Equals, lightningStatusRunning)
}

// TestLightningCheckpointListRegister checks that RegisterCheckPoint runs a
// single INSERT IGNORE with the task and source names inside a begin/commit
// pair and returns no error.
func (s *lightningCpListSuite) TestLightningCheckpointListRegister(c *C) {
	insert := fmt.Sprintf("INSERT IGNORE INTO %s \\(`task_name`, `source_name`\\) VALUES \\(\\?, \\?\\)", s.cpList.tableName)

	s.mock.ExpectBegin()
	s.mock.ExpectExec(insert).
		WithArgs(s.cpList.taskName, s.cpList.sourceName).
		WillReturnResult(sqlmock.NewResult(2, 1))
	s.mock.ExpectCommit()

	c.Assert(s.cpList.RegisterCheckPoint(context.Background()), IsNil)
}

// TestLightningCheckpointListUpdateStatus checks that UpdateStatus runs a
// single UPDATE carrying the textual status plus the task and source names
// inside a begin/commit pair and returns no error.
func (s *lightningCpListSuite) TestLightningCheckpointListUpdateStatus(c *C) {
	update := fmt.Sprintf("UPDATE %s set status = \\? WHERE `task_name` = \\? AND `source_name` = \\?", s.cpList.tableName)

	s.mock.ExpectBegin()
	s.mock.ExpectExec(update).
		WithArgs("running", s.cpList.taskName, s.cpList.sourceName).
		WillReturnResult(sqlmock.NewResult(3, 1))
	s.mock.ExpectCommit()

	c.Assert(s.cpList.UpdateStatus(context.Background(), lightningStatusRunning), IsNil)
}
Loading