-
Notifications
You must be signed in to change notification settings - Fork 287
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
loader(dm): simplify lightning checkpoint and add clean meta #3813
Changes from 9 commits
86a31e7
f849a07
8201a52
902dca6
ea02fbc
62a73ca
7ee0078
adf8a7b
44f90ee
174efba
ec90513
f2ea0cd
83156f2
09df71e
5c489fd
cbf0641
020d020
8224103
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,10 +32,6 @@ import ( | |
"go.uber.org/zap" | ||
) | ||
|
||
const ( | ||
LightningCheckpointListName = "lightning_checkpoint_list" | ||
) | ||
|
||
// CheckPoint represents checkpoint status. | ||
type CheckPoint interface { | ||
// Load loads all checkpoints recorded before. | ||
|
@@ -470,19 +466,44 @@ func (cp *RemoteCheckPoint) String() string { | |
return string(bytes) | ||
} | ||
|
||
type lightingLoadStatus int | ||
|
||
const ( | ||
lightningStatusInit lightingLoadStatus = iota | ||
lightningStatusRunning | ||
lightningStatusFinished | ||
) | ||
|
||
func (s lightingLoadStatus) String() string { | ||
switch s { | ||
case lightningStatusInit: | ||
return "init" | ||
case lightningStatusRunning: | ||
Ehco1996 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return "running" | ||
case lightningStatusFinished: | ||
return "finished" | ||
default: | ||
panic(fmt.Sprintf("unknown lightning load stauts '%d'", s)) | ||
} | ||
} | ||
|
||
type LightningCheckpointList struct { | ||
db *conn.BaseDB | ||
schema string | ||
tableName string | ||
logger log.Logger | ||
db *conn.BaseDB | ||
schema string | ||
tableName string | ||
taskName string | ||
sourceName string | ||
logger log.Logger | ||
} | ||
|
||
func NewLightningCheckpointList(db *conn.BaseDB, metaSchema string) *LightningCheckpointList { | ||
func NewLightningCheckpointList(db *conn.BaseDB, taskName, sourceName, metaSchema string) *LightningCheckpointList { | ||
return &LightningCheckpointList{ | ||
db: db, | ||
schema: dbutil.ColumnName(metaSchema), | ||
tableName: dbutil.TableName(metaSchema, LightningCheckpointListName), | ||
logger: log.L().WithFields(zap.String("component", "lightning checkpoint database list")), | ||
db: db, | ||
schema: dbutil.ColumnName(metaSchema), | ||
tableName: dbutil.TableName(metaSchema, cputil.LightningCheckpoint(taskName)), | ||
taskName: taskName, | ||
sourceName: sourceName, | ||
logger: log.L().WithFields(zap.String("component", "lightning checkpoint database list")), | ||
} | ||
} | ||
|
||
|
@@ -491,35 +512,38 @@ func (cp *LightningCheckpointList) Prepare(ctx context.Context) error { | |
if err != nil { | ||
return terror.WithScope(terror.Annotate(err, "initialize connection when prepare"), terror.ScopeDownstream) | ||
} | ||
defer conn.CloseBaseConnWithoutErr(cp.db, connection) | ||
|
||
createSchema := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", cp.schema) | ||
tctx := tcontext.NewContext(ctx, log.With(zap.String("job", "lightning-checkpoint"))) | ||
_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{createSchema}) | ||
if err != nil { | ||
return err | ||
} | ||
createTable := `CREATE TABLE IF NOT EXISTS %s ( | ||
worker_name varchar(255) NOT NULL, | ||
task_name varchar(255) NOT NULL, | ||
PRIMARY KEY (task_name, worker_name) | ||
source_name varchar(255) NOT NULL, | ||
status tinyint NOT NULL DEFAULT 0, | ||
PRIMARY KEY (task_name, source_name) | ||
); | ||
` | ||
sql2 := fmt.Sprintf(createTable, cp.tableName) | ||
_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{sql2}) | ||
return terror.WithScope(err, terror.ScopeDownstream) | ||
} | ||
|
||
func (cp *LightningCheckpointList) RegisterCheckPoint(ctx context.Context, workerName, taskName string) error { | ||
func (cp *LightningCheckpointList) RegisterCheckPoint(ctx context.Context) error { | ||
connection, err := cp.db.GetBaseConn(ctx) | ||
if err != nil { | ||
return terror.WithScope(terror.Annotate(err, "initialize connection"), terror.ScopeDownstream) | ||
} | ||
defer conn.CloseBaseConnWithoutErr(cp.db, connection) | ||
|
||
sql := fmt.Sprintf("INSERT IGNORE INTO %s (`worker_name`, `task_name`) VALUES(?,?)", cp.tableName) | ||
sql := fmt.Sprintf("INSERT IGNORE INTO %s (`task_name`, `source_name`) VALUES (?, ?)", cp.tableName) | ||
cp.logger.Info("initial checkpoint record", | ||
zap.String("sql", sql), | ||
zap.String("worker-name", workerName), | ||
zap.String("task-name", taskName)) | ||
args := []interface{}{workerName, taskName} | ||
zap.String("task", cp.taskName), | ||
zap.String("source", cp.sourceName)) | ||
args := []interface{}{cp.taskName, cp.sourceName} | ||
tctx := tcontext.NewContext(ctx, log.With(zap.String("job", "lightning-checkpoint"))) | ||
_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{sql}, args) | ||
if err != nil { | ||
|
@@ -528,34 +552,61 @@ func (cp *LightningCheckpointList) RegisterCheckPoint(ctx context.Context, worke | |
return nil | ||
} | ||
|
||
func (cp *LightningCheckpointList) RemoveTaskCheckPoint(ctx context.Context, taskName string) error { | ||
func (cp *LightningCheckpointList) UpdateStatus(ctx context.Context, status lightingLoadStatus) error { | ||
connection, err := cp.db.GetBaseConn(ctx) | ||
if err != nil { | ||
return terror.WithScope(terror.Annotate(err, "initialize connection"), terror.ScopeDownstream) | ||
} | ||
defer conn.CloseBaseConnWithoutErr(cp.db, connection) | ||
|
||
sql := fmt.Sprintf("UPDATE %s set status = ? WHERE `task_name` = ? AND `source_name` = ?", cp.tableName) | ||
cp.logger.Info("update lightning loader status", | ||
zap.String("task", cp.taskName), zap.String("source", cp.sourceName), | ||
zap.Stringer("status", status)) | ||
tctx := tcontext.NewContext(ctx, log.With(zap.String("job", "lightning-checkpoint"))) | ||
query := fmt.Sprintf("SELECT `worker_name` from %s where `task_name`=?", cp.tableName) | ||
rows, err := connection.QuerySQL(tctx, query, taskName) | ||
_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{sql}, | ||
[]interface{}{status, cp.taskName, cp.sourceName}) | ||
if err != nil { | ||
return terror.WithScope(err, terror.ScopeDownstream) | ||
return terror.WithScope(terror.Annotate(err, "update lightning status"), terror.ScopeDownstream) | ||
} | ||
return nil | ||
} | ||
|
||
func (cp *LightningCheckpointList) taskStatus(ctx context.Context) (lightingLoadStatus, error) { | ||
connection, err := cp.db.GetBaseConn(ctx) | ||
if err != nil { | ||
return lightningStatusInit, terror.WithScope(terror.Annotate(err, "initialize connection"), terror.ScopeDownstream) | ||
} | ||
defer conn.CloseBaseConnWithoutErr(cp.db, connection) | ||
|
||
query := fmt.Sprintf("SELECT status FROM %s WHERE `task_name` = ? AND `source_name` = ?", cp.tableName) | ||
tctx := tcontext.NewContext(ctx, log.With(zap.String("job", "lightning-checkpoint"))) | ||
rows, err := connection.QuerySQL(tctx, query, cp.taskName, cp.sourceName) | ||
if err != nil { | ||
return lightningStatusInit, err | ||
} | ||
defer rows.Close() | ||
var workerName string | ||
for rows.Next() { | ||
err = rows.Scan(&workerName) | ||
if err != nil { | ||
return terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream) | ||
} | ||
cpdb := config.TiDBLightningCheckpointPrefix + dbutil.TableName(workerName, taskName) | ||
sql := fmt.Sprintf("DROP DATABASE IF NOT EXISTS %s", cpdb) | ||
_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{sql}) | ||
if err != nil { | ||
return terror.WithScope(err, terror.ScopeDownstream) | ||
if rows.Next() { | ||
var status lightingLoadStatus | ||
if err = rows.Scan(&status); err != nil { | ||
return lightningStatusInit, terror.WithScope(err, terror.ScopeDownstream) | ||
} | ||
return status, nil | ||
} | ||
query = fmt.Sprintf("DELETE from %s where `task_name`=?", cp.tableName) | ||
_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{query}, []interface{}{taskName}) | ||
// status row doesn't exist, return default value | ||
return lightningStatusInit, nil | ||
} | ||
|
||
func (cp *LightningCheckpointList) RemoveTaskCheckPoint(ctx context.Context) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When is this function supposed to be called? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not used now. It was originally designed to be called at cleanup meta, but the current design just calls raw There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So can we delete this func? |
||
connection, err := cp.db.GetBaseConn(ctx) | ||
if err != nil { | ||
return terror.WithScope(terror.Annotate(err, "initialize connection"), terror.ScopeDownstream) | ||
} | ||
defer conn.CloseBaseConnWithoutErr(cp.db, connection) | ||
|
||
tctx := tcontext.NewContext(ctx, log.With(zap.String("job", "lightning-checkpoint"))) | ||
query := fmt.Sprintf("DELETE from %s where `task_name`=?", cp.tableName) | ||
_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{query}, []interface{}{cp.taskName}) | ||
return terror.WithScope(err, terror.ScopeDownstream) | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe reuse `CheckpointStatus` in `tidb/pkg/checkpoints`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean the real tidb-lightning's `CheckpointStatus`? I think that because there are so many states in it, it may be hard to understand for someone not familiar with tidb-lightning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, my fault, I got the two statuses mixed up; now I think the current implementation makes more sense.
BTW, how about directly using chars like `init`, `running`, `finished` in the DB? Maybe this can increase readability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done