Skip to content

Commit

Permalink
check checkpoint schema (pingcap#354)
Browse files Browse the repository at this point in the history
Co-authored-by: kennytm <[email protected]>
  • Loading branch information
glorv and kennytm authored Jul 28, 2020
1 parent 94f7216 commit 1dc2d6a
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 31 deletions.
66 changes: 35 additions & 31 deletions lightning/checkpoints/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,15 @@ const WholeTableEngineID = math.MaxInt32
const (
// the table names to store each kind of checkpoint in the checkpoint database
// remember to increase the version number in case of incompatible change.
checkpointTableNameTable = "table_v6"
checkpointTableNameEngine = "engine_v5"
checkpointTableNameChunk = "chunk_v4"
CheckpointTableNameTable = "table_v6"
CheckpointTableNameEngine = "engine_v5"
CheckpointTableNameChunk = "chunk_v4"
)

func IsCheckpointTable(name string) bool {
return name == CheckpointTableNameTable || name == CheckpointTableNameEngine || name == CheckpointTableNameChunk
}

func (status CheckpointStatus) MetricName() string {
switch status {
case CheckpointStatusLoaded:
Expand Down Expand Up @@ -403,7 +407,7 @@ func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string, t
update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX(task_id)
);
`, schema, checkpointTableNameTable))
`, schema, CheckpointTableNameTable))
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -417,7 +421,7 @@ func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string, t
update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY(table_name, engine_id DESC)
);
`, schema, checkpointTableNameEngine))
`, schema, CheckpointTableNameEngine))
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -441,7 +445,7 @@ func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string, t
update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY(table_name, engine_id, path(500), offset)
);
`, schema, checkpointTableNameChunk))
`, schema, CheckpointTableNameChunk))
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -471,7 +475,7 @@ func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, dbInfo map[strin
WHEN hash = VALUES(hash)
THEN VALUES(task_id)
END;
`, cpdb.schema, checkpointTableNameTable))
`, cpdb.schema, CheckpointTableNameTable))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -514,7 +518,7 @@ func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*Tab

engineQuery := fmt.Sprintf(`
SELECT engine_id, status FROM %s.%s WHERE table_name = ? ORDER BY engine_id DESC;
`, cpdb.schema, checkpointTableNameEngine)
`, cpdb.schema, CheckpointTableNameEngine)
engineRows, err := tx.QueryContext(c, engineQuery, tableName)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -545,7 +549,7 @@ func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*Tab
kvc_bytes, kvc_kvs, kvc_checksum, unix_timestamp(create_time)
FROM %s.%s WHERE table_name = ?
ORDER BY engine_id, path, offset;
`, cpdb.schema, checkpointTableNameChunk)
`, cpdb.schema, CheckpointTableNameChunk)
chunkRows, err := tx.QueryContext(c, chunkQuery, tableName)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -581,7 +585,7 @@ func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*Tab

tableQuery := fmt.Sprintf(`
SELECT status, alloc_base, table_id FROM %s.%s WHERE table_name = ?
`, cpdb.schema, checkpointTableNameTable)
`, cpdb.schema, CheckpointTableNameTable)
tableRow := tx.QueryRowContext(c, tableQuery, tableName)

var status uint8
Expand All @@ -606,7 +610,7 @@ func (cpdb *MySQLCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tab
err := s.Transact(ctx, "update engine checkpoints", func(c context.Context, tx *sql.Tx) error {
engineStmt, err := tx.PrepareContext(c, fmt.Sprintf(`
REPLACE INTO %s.%s (table_name, engine_id, status) VALUES (?, ?, ?);
`, cpdb.schema, checkpointTableNameEngine))
`, cpdb.schema, CheckpointTableNameEngine))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -624,7 +628,7 @@ func (cpdb *MySQLCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tab
?, ?, ?, ?,
0, 0, 0, from_unixtime(?)
);
`, cpdb.schema, checkpointTableNameChunk))
`, cpdb.schema, CheckpointTableNameChunk))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -661,16 +665,16 @@ func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoi
chunkQuery := fmt.Sprintf(`
UPDATE %s.%s SET pos = ?, prev_rowid_max = ?, kvc_bytes = ?, kvc_kvs = ?, kvc_checksum = ?
WHERE (table_name, engine_id, path, offset) = (?, ?, ?, ?);
`, cpdb.schema, checkpointTableNameChunk)
`, cpdb.schema, CheckpointTableNameChunk)
rebaseQuery := fmt.Sprintf(`
UPDATE %s.%s SET alloc_base = GREATEST(?, alloc_base) WHERE table_name = ?;
`, cpdb.schema, checkpointTableNameTable)
`, cpdb.schema, CheckpointTableNameTable)
tableStatusQuery := fmt.Sprintf(`
UPDATE %s.%s SET status = ? WHERE table_name = ?;
`, cpdb.schema, checkpointTableNameTable)
`, cpdb.schema, CheckpointTableNameTable)
engineStatusQuery := fmt.Sprintf(`
UPDATE %s.%s SET status = ? WHERE (table_name, engine_id) = (?, ?);
`, cpdb.schema, checkpointTableNameEngine)
`, cpdb.schema, CheckpointTableNameEngine)

s := common.SQLWithRetry{DB: cpdb.db, Logger: log.L()}
err := s.Transact(context.Background(), "update checkpoints", func(c context.Context, tx *sql.Tx) error {
Expand Down Expand Up @@ -974,9 +978,9 @@ func (cpdb *MySQLCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName
return s.Exec(ctx, "remove all checkpoints", "DROP SCHEMA "+cpdb.schema)
}

deleteChunkQuery := fmt.Sprintf("DELETE FROM %s.%s WHERE table_name = ?", cpdb.schema, checkpointTableNameChunk)
deleteEngineQuery := fmt.Sprintf("DELETE FROM %s.%s WHERE table_name = ?", cpdb.schema, checkpointTableNameEngine)
deleteTableQuery := fmt.Sprintf("DELETE FROM %s.%s WHERE table_name = ?", cpdb.schema, checkpointTableNameTable)
deleteChunkQuery := fmt.Sprintf("DELETE FROM %s.%s WHERE table_name = ?", cpdb.schema, CheckpointTableNameChunk)
deleteEngineQuery := fmt.Sprintf("DELETE FROM %s.%s WHERE table_name = ?", cpdb.schema, CheckpointTableNameEngine)
deleteTableQuery := fmt.Sprintf("DELETE FROM %s.%s WHERE table_name = ?", cpdb.schema, CheckpointTableNameTable)

return s.Transact(ctx, "remove checkpoints", func(c context.Context, tx *sql.Tx) error {
if _, e := tx.ExecContext(c, deleteChunkQuery, tableName); e != nil {
Expand All @@ -1003,9 +1007,9 @@ func (cpdb *MySQLCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int6
}

createSchemaQuery := "CREATE SCHEMA IF NOT EXISTS " + newSchema
moveChunkQuery := fmt.Sprintf("RENAME TABLE %[1]s.%[3]s TO %[2]s.%[3]s", cpdb.schema, newSchema, checkpointTableNameChunk)
moveEngineQuery := fmt.Sprintf("RENAME TABLE %[1]s.%[3]s TO %[2]s.%[3]s", cpdb.schema, newSchema, checkpointTableNameEngine)
moveTableQuery := fmt.Sprintf("RENAME TABLE %[1]s.%[3]s TO %[2]s.%[3]s", cpdb.schema, newSchema, checkpointTableNameTable)
moveChunkQuery := fmt.Sprintf("RENAME TABLE %[1]s.%[3]s TO %[2]s.%[3]s", cpdb.schema, newSchema, CheckpointTableNameChunk)
moveEngineQuery := fmt.Sprintf("RENAME TABLE %[1]s.%[3]s TO %[2]s.%[3]s", cpdb.schema, newSchema, CheckpointTableNameEngine)
moveTableQuery := fmt.Sprintf("RENAME TABLE %[1]s.%[3]s TO %[2]s.%[3]s", cpdb.schema, newSchema, CheckpointTableNameTable)

if e := s.Exec(ctx, "create backup checkpoints schema", createSchemaQuery); e != nil {
return e
Expand Down Expand Up @@ -1034,10 +1038,10 @@ func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, table

engineQuery := fmt.Sprintf(`
UPDATE %s.%s SET status = %d WHERE %s = ? AND status <= %d;
`, cpdb.schema, checkpointTableNameEngine, CheckpointStatusLoaded, colName, CheckpointStatusMaxInvalid)
`, cpdb.schema, CheckpointTableNameEngine, CheckpointStatusLoaded, colName, CheckpointStatusMaxInvalid)
tableQuery := fmt.Sprintf(`
UPDATE %s.%s SET status = %d WHERE %s = ? AND status <= %d;
`, cpdb.schema, checkpointTableNameTable, CheckpointStatusLoaded, colName, CheckpointStatusMaxInvalid)
`, cpdb.schema, CheckpointTableNameTable, CheckpointStatusLoaded, colName, CheckpointStatusMaxInvalid)

s := common.SQLWithRetry{
DB: cpdb.db,
Expand Down Expand Up @@ -1077,16 +1081,16 @@ func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tabl
LEFT JOIN %[1]s.%[5]s e ON t.table_name = e.table_name
WHERE %[2]s = ? AND t.status <= %[3]d
GROUP BY t.table_name;
`, cpdb.schema, aliasedColName, CheckpointStatusMaxInvalid, checkpointTableNameTable, checkpointTableNameEngine)
`, cpdb.schema, aliasedColName, CheckpointStatusMaxInvalid, CheckpointTableNameTable, CheckpointTableNameEngine)
deleteChunkQuery := fmt.Sprintf(`
DELETE FROM %[1]s.%[4]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[5]s WHERE %[2]s = ? AND status <= %[3]d)
`, cpdb.schema, colName, CheckpointStatusMaxInvalid, checkpointTableNameChunk, checkpointTableNameTable)
`, cpdb.schema, colName, CheckpointStatusMaxInvalid, CheckpointTableNameChunk, CheckpointTableNameTable)
deleteEngineQuery := fmt.Sprintf(`
DELETE FROM %[1]s.%[4]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[5]s WHERE %[2]s = ? AND status <= %[3]d)
`, cpdb.schema, colName, CheckpointStatusMaxInvalid, checkpointTableNameEngine, checkpointTableNameTable)
`, cpdb.schema, colName, CheckpointStatusMaxInvalid, CheckpointTableNameEngine, CheckpointTableNameTable)
deleteTableQuery := fmt.Sprintf(`
DELETE FROM %s.%s WHERE %s = ? AND status <= %d
`, cpdb.schema, checkpointTableNameTable, colName, CheckpointStatusMaxInvalid)
`, cpdb.schema, CheckpointTableNameTable, colName, CheckpointStatusMaxInvalid)

var targetTables []DestroyedTableCheckpoint

Expand Down Expand Up @@ -1143,7 +1147,7 @@ func (cpdb *MySQLCheckpointsDB) DumpTables(ctx context.Context, writer io.Writer
create_time,
update_time
FROM %s.%s;
`, cpdb.schema, checkpointTableNameTable))
`, cpdb.schema, CheckpointTableNameTable))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1161,7 +1165,7 @@ func (cpdb *MySQLCheckpointsDB) DumpEngines(ctx context.Context, writer io.Write
create_time,
update_time
FROM %s.%s;
`, cpdb.schema, checkpointTableNameEngine))
`, cpdb.schema, CheckpointTableNameEngine))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1187,7 +1191,7 @@ func (cpdb *MySQLCheckpointsDB) DumpChunks(ctx context.Context, writer io.Writer
create_time,
update_time
FROM %s.%s;
`, cpdb.schema, checkpointTableNameChunk))
`, cpdb.schema, CheckpointTableNameChunk))
if err != nil {
return errors.Trace(err)
}
Expand Down
25 changes: 25 additions & 0 deletions lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"syscall"
"time"

"github.com/pingcap/tidb-lightning/lightning/checkpoints"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -217,6 +219,13 @@ func (l *Lightning) run(taskCfg *config.Config) (err error) {
log.L().Error("check system requirements failed", zap.Error(err))
return errors.Trace(err)
}
// check table schema conflicts
err = checkSchemaConflict(taskCfg, mdl.GetDatabases())
if err != nil {
log.L().Error("checkpoint schema conflicts with data files", zap.Error(err))
return errors.Trace(err)
}

dbMetas := mdl.GetDatabases()
web.BroadcastInitProgress(dbMetas)

Expand Down Expand Up @@ -590,3 +599,19 @@ func checkSystemRequirement(cfg *config.Config, dbsMeta []*mydump.MDDatabaseMeta

return nil
}

/// checkSchemaConflict return error if checkpoint table scheme is conflict with data files
func checkSchemaConflict(cfg *config.Config, dbsMeta []*mydump.MDDatabaseMeta) error {
if cfg.Checkpoint.Enable && cfg.Checkpoint.Driver == config.CheckpointDriverMySQL {
for _, db := range dbsMeta {
if db.Name == cfg.Checkpoint.Schema {
for _, tb := range db.Tables {
if checkpoints.IsCheckpointTable(tb.Name) {
return errors.Errorf("checkpoint table `%s`.`%s` conflict with data files. Please change the `checkpoint.schema` config or set `checkpoint.driver` to \"file\" instead", db.Name, tb.Name)
}
}
}
}
}
return nil
}
56 changes: 56 additions & 0 deletions lightning/lightning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

"github.com/pingcap/tidb-lightning/lightning/checkpoints"

"github.com/pingcap/tidb-lightning/lightning/mydump"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -461,3 +463,57 @@ func (s *lightningServerSuite) TestCheckSystemRequirement(c *C) {
err = checkSystemRequirement(cfg, dbMetas)
c.Assert(err, IsNil)
}

func (s *lightningServerSuite) TestCheckSchemaConflict(c *C) {
cfg := config.NewConfig()
cfg.Checkpoint.Schema = "cp"
cfg.Checkpoint.Driver = config.CheckpointDriverMySQL

dbMetas := []*mydump.MDDatabaseMeta{
{
Name: "test",
Tables: []*mydump.MDTableMeta{
{
Name: checkpoints.CheckpointTableNameTable,
},
{
Name: checkpoints.CheckpointTableNameEngine,
},
},
},
{
Name: "cp",
Tables: []*mydump.MDTableMeta{
{
Name: "test",
},
},
},
}
err := checkSchemaConflict(cfg, dbMetas)
c.Assert(err, IsNil)

dbMetas = append(dbMetas, &mydump.MDDatabaseMeta{
Name: "cp",
Tables: []*mydump.MDTableMeta{
{
Name: checkpoints.CheckpointTableNameChunk,
},
{
Name: "test123",
},
},
})
err = checkSchemaConflict(cfg, dbMetas)
c.Assert(err, NotNil)

cfg.Checkpoint.Enable = false
err = checkSchemaConflict(cfg, dbMetas)
c.Assert(err, IsNil)

cfg.Checkpoint.Enable = true
cfg.Checkpoint.Driver = config.CheckpointDriverFile
err = checkSchemaConflict(cfg, dbMetas)
c.Assert(err, IsNil)

}

0 comments on commit 1dc2d6a

Please sign in to comment.