Skip to content

Commit

Permalink
vitess Online DDL atomic cut-over (vitessio#11460) (#106)
Browse files Browse the repository at this point in the history
* cut-over with sentry table



* wait for rename via channel; write extra transaction post LOCK



* add stage info



* reduced wait-for-pos timeout. Improved stage logic



* cleanup



* more cleanup



* even more cleanup



* context timeout



* preserve stage by disabling deferred stage changes



* killing rename query upon error



* increase test timeout



* fix defer ordering



* log.info



* add and populate cutover_attempts column



* search PROCESSLIST with LIKE



* code comment



* making a variable more local



* literal string

Signed-off-by: Shlomi Noach <[email protected]>
Co-authored-by: Shlomi Noach <[email protected]>
  • Loading branch information
austenLacy and shlomi-noach authored Jul 5, 2023
1 parent 508442d commit 67543ee
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ const (
maxConcurrency = 20
singleConnectionSleepInterval = 2 * time.Millisecond
countIterations = 5
migrationWaitTimeout = 60 * time.Second
)

func resetOpOrder() {
Expand Down Expand Up @@ -344,7 +345,7 @@ func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy str
assert.NoError(t, err)

if !strategySetting.Strategy.IsDirect() {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 30*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, migrationWaitTimeout, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
}

Expand Down
245 changes: 194 additions & 51 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,22 @@ func (e *Executor) executeDirectly(ctx context.Context, onlineDDL *schema.Online
return acceptableErrorCodeFound, nil
}

// doesConnectionInfoMatch reports whether there is a MySQL connection in PROCESSLIST whose Info matches the given text.
// It binds sqlFindProcess with the connection ID and the pattern "%<submatch>%", runs it, and returns true
// only when exactly one row comes back. Any bind or execution error is returned with a false result.
// NOTE(review): sqlFindProcess is defined elsewhere; presumably it filters PROCESSLIST by id and by Info LIKE the pattern — confirm.
func (e *Executor) doesConnectionInfoMatch(ctx context.Context, connID int64, submatch string) (bool, error) {
	// Wrap the text in wildcards so that it is matched anywhere within the searched value.
	likePattern := "%" + submatch + "%"
	boundQuery, err := sqlparser.ParseAndBind(sqlFindProcess,
		sqltypes.Int64BindVariable(connID),
		sqltypes.StringBindVariable(likePattern),
	)
	if err != nil {
		return false, err
	}
	result, err := e.execQuery(ctx, boundQuery)
	if err != nil {
		return false, err
	}
	// Exactly one matching row means the connection was found with the expected text.
	found := len(result.Rows) == 1
	return found, nil
}

// validateTableForAlterAction checks whether a table is good to undergo an ALTER operation. It returns detailed error if not.
func (e *Executor) validateTableForAlterAction(ctx context.Context, onlineDDL *schema.OnlineDDL) (err error) {
// Validate table does not participate in foreign key relationship:
Expand Down Expand Up @@ -718,6 +734,10 @@ func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string) err

// cutOverVReplMigration stops vreplication, then removes the _vt.vreplication entry for the given migration
func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) error {
if err := e.incrementCutoverAttempts(ctx, s.workflow); err != nil {
return err
}

tmClient := e.tabletManagerClient()
defer tmClient.Close()

Expand All @@ -739,31 +759,101 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
return err
}
isVreplicationTestSuite := onlineDDL.StrategySetting().IsVreplicationTestSuite()
e.updateMigrationStage(ctx, onlineDDL.UUID, "starting cut-over")

var sentryTableName string

waitForPos := func(s *VReplStream, pos mysql.Position) error {
ctx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold)
defer cancel()
// Wait for target to reach the up-to-date pos
if err := tmClient.VReplicationWaitForPos(ctx, tablet.Tablet, int(s.id), mysql.EncodePosition(pos)); err != nil {
return err
}
// Target is now in sync with source!
return nil
}

// A bit early on, we generate names for stowaway and temporary tables
// We do this here because right now we're in a safe place where nothing happened yet. If there's an error now, bail out
// and no harm done.
// Later on, when traffic is blocked and tables renamed, that's a more dangerous place to be in; we want as little logic
// in that place as possible.
var stowawayTableName string
if !isVreplicationTestSuite {
stowawayTableName, err = schema.GenerateGCTableName(schema.HoldTableGCState, newGCTableRetainTime())
// A bit early on, we generate names for stowaway and temporary tables
// We do this here because right now we're in a safe place where nothing happened yet. If there's an error now, bail out
// and no harm done.
// Later on, when traffic is blocked and tables renamed, that's a more dangerous place to be in; we want as little logic
// in that place as possible.
sentryTableName, err = schema.GenerateGCTableName(schema.HoldTableGCState, newGCTableRetainTime())
if err != nil {
return nil
}

// We create the sentry table before toggling writes, because this involves a WaitForPos, which takes some time. We
// don't want to overload the buffering time with this excessive wait.

if err := e.updateArtifacts(ctx, onlineDDL.UUID, sentryTableName); err != nil {
return err
}
// Audit stowawayTableName. If operation is complete, we remove the audit. But if this tablet fails while
// the original table is renamed (into stowaway table), then this will be both the evidence and the information we need
// to restore the table back into existence. This can (and will) be done by a different vttablet process
if err := e.updateMigrationStowawayTable(ctx, onlineDDL.UUID, stowawayTableName); err != nil {
parsed := sqlparser.BuildParsedQuery(sqlCreateSentryTable, sentryTableName)
if _, err := e.execQuery(ctx, parsed.Query); err != nil {
return err
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "sentry table created: %s", sentryTableName)

postSentryPos, err := e.primaryPosition(ctx)
if err != nil {
return err
}
defer e.updateMigrationStowawayTable(ctx, onlineDDL.UUID, "")
e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for post-sentry pos: %v", mysql.EncodePosition(postSentryPos))
if err := waitForPos(s, postSentryPos); err != nil {
return err
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "post-sentry pos reached")
}

lockConn, err := e.pool.Get(ctx, nil)
if err != nil {
return err
}
defer lockConn.Recycle()
defer lockConn.Exec(ctx, sqlUnlockTables, 1, false)

renameConn, err := e.pool.Get(ctx, nil)
if err != nil {
return err
}
defer renameConn.Recycle()
defer renameConn.Kill("premature exit while renaming tables", 0)
renameQuery := sqlparser.BuildParsedQuery(sqlSwapTables, onlineDDL.Table, sentryTableName, vreplTable, onlineDDL.Table, sentryTableName, vreplTable)

waitForRenameProcess := func() error {
// This function waits until it finds the RENAME TABLE... query running in MySQL's PROCESSLIST, or until timeout
// The function assumes that one of the renamed tables is locked, thus causing the RENAME to block. If nothing
// is locked, then the RENAME will be near-instantaneous and it's unlikely that the function will find it.
renameWaitCtx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold)
defer cancel()

for {
renameProcessFound, err := e.doesConnectionInfoMatch(renameWaitCtx, renameConn.ID(), "rename")
if err != nil {
return err
}
if renameProcessFound {
return nil
}
select {
case <-renameWaitCtx.Done():
return vterrors.Errorf(vtrpcpb.Code_ABORTED, "timeout for rename query: %s", renameQuery.Query)
case <-time.After(time.Second):
// sleep
}
}
}

renameCompleteChan := make(chan error)

bufferingCtx, bufferingContextCancel := context.WithCancel(ctx)
defer bufferingContextCancel()
// Preparation is complete. We proceed to cut-over.
toggleBuffering := func(bufferQueries bool) error {
log.Infof("toggling buffering: %t in migration %v", bufferQueries, onlineDDL.UUID)
e.toggleBufferTableFunc(bufferingCtx, onlineDDL.Table, bufferQueries)
if !bufferQueries {
// called after new table is in place.
Expand All @@ -774,27 +864,31 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
return err
}
}
log.Infof("toggled buffering: %t in migration %v", bufferQueries, onlineDDL.UUID)
return nil
}

var reenableOnce sync.Once
reenableWritesOnce := func() {
reenableOnce.Do(func() {
log.Infof("re-enabling writes in migration %v", onlineDDL.UUID)
toggleBuffering(false)
})
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "buffering queries")
// stop writes on source:
err = toggleBuffering(true)
defer reenableWritesOnce()
if err != nil {
return err
}

// swap out the table
// Give a fraction of a second for a scenario where a query is in
// query executor, it passed the ACLs and is _about to_ execute. This will be nicer to those queries:
// they will be able to complete before the rename, rather than block briefly on the rename only to find
// the table no longer exists.
e.updateMigrationStage(ctx, onlineDDL.UUID, "graceful wait for buffering")
time.Sleep(100 * time.Millisecond)

if isVreplicationTestSuite {
// The testing suite may inject queries internally from the server via a recurring EVENT.
// Those queries are unaffected by query rules (ACLs) because they don't go through Vitess.
Expand All @@ -805,30 +899,41 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
if _, err := e.execQuery(ctx, parsed.Query); err != nil {
return err
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "test suite 'before' table renamed")
} else {
// real production
parsed := sqlparser.BuildParsedQuery(sqlRenameTable, onlineDDL.Table, stowawayTableName)
if _, err := e.execQuery(ctx, parsed.Query); err != nil {

e.updateMigrationStage(ctx, onlineDDL.UUID, "locking tables")
lockCtx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold)
defer cancel()
lockTableQuery := sqlparser.BuildParsedQuery(sqlLockTwoTablesWrite, sentryTableName, onlineDDL.Table)
if _, err := lockConn.Exec(lockCtx, lockTableQuery.Query, 1, false); err != nil {
return err
}
}

// We have just created a gaping hole, the original table does not exist.
// we expect to fill that hole by swapping in the vrepl table. But if anything goes wrong we prepare
// to rename the table back:
defer func() {
if _, err := e.renameTableIfApplicable(ctx, stowawayTableName, onlineDDL.Table); err != nil {
vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "cannot rename back swapped table: %v into %v: %v", stowawayTableName, onlineDDL.Table, err)
e.updateMigrationStage(ctx, onlineDDL.UUID, "renaming tables")
go func() {
_, err := renameConn.Exec(ctx, renameQuery.Query, 1, false)
renameCompleteChan <- err
}()
// the rename should block, because of the LOCK. Wait for it to show up.
e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for RENAME to block")
if err := waitForRenameProcess(); err != nil {
return err
}
}()
// Right now: new queries are buffered, any existing query will have executed, and worst case scenario is
// that some leftover query finds the table is not actually there anymore...
// At any case, there's definitely no more writes to the table since it does not exist. We can
// safely take the (GTID) pos now.
e.updateMigrationStage(ctx, onlineDDL.UUID, "RENAME found")
}

e.updateMigrationStage(ctx, onlineDDL.UUID, "reading post-lock pos")
postWritesPos, err := e.primaryPosition(ctx)
if err != nil {
return err
}

// Right now: new queries are buffered, any existing query will have executed, and worst case scenario is
// that some leftover query finds the table is not actually there anymore...
// At any case, there's definitely no more writes to the table since it does not exist. We can
// safely take the (GTID) pos now.
_ = e.updateMigrationTimestamp(ctx, "liveness_timestamp", s.workflow)

// Writes are now disabled on table. Read up-to-date vreplication info, specifically to get latest (and fixed) pos:
Expand All @@ -837,21 +942,13 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
return err
}

waitForPos := func() error {
ctx, cancel := context.WithTimeout(ctx, 2*vreplicationCutOverThreshold)
defer cancel()
// Wait for target to reach the up-to-date pos
if err := tmClient.VReplicationWaitForPos(ctx, tablet.Tablet, int(s.id), mysql.EncodePosition(postWritesPos)); err != nil {
return err
}
// Target is now in sync with source!
return nil
}
log.Infof("VReplication migration %v waiting for position %v", s.workflow, mysql.EncodePosition(postWritesPos))
if err := waitForPos(); err != nil {
e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for post-lock pos: %v", mysql.EncodePosition(postWritesPos))
if err := waitForPos(s, postWritesPos); err != nil {
e.updateMigrationStage(ctx, onlineDDL.UUID, "timeout while waiting for post-lock pos: %v", err)
return err
}
// Stop vreplication
e.updateMigrationStage(ctx, onlineDDL.UUID, "stopping vreplication")
if _, err := e.vreplicationExec(ctx, tablet.Tablet, binlogplayer.StopVReplication(uint32(s.id), "stopped for online DDL cutover")); err != nil {
return err
}
Expand All @@ -865,23 +962,43 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
if _, err := e.execQuery(ctx, parsed.Query); err != nil {
return err
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "test suite 'after' table renamed")
} else {
// Normal (non-testing) alter table
conn, err := dbconnpool.NewDBConnection(ctx, e.env.Config().DB.DbaWithDB())
if err != nil {
e.updateMigrationStage(ctx, onlineDDL.UUID, "validating rename is still in place")
if err := waitForRenameProcess(); err != nil {
return err
}
defer conn.Close()

parsed := sqlparser.BuildParsedQuery(sqlRenameTwoTables,
vreplTable, onlineDDL.Table,
stowawayTableName, vreplTable,
)
if _, err := e.execQuery(ctx, parsed.Query); err != nil {
return err
// Normal (non-testing) alter table
e.updateMigrationStage(ctx, onlineDDL.UUID, "dropping sentry table")

{
dropTableQuery := sqlparser.BuildParsedQuery(sqlDropTable, sentryTableName)
lockCtx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold)
defer cancel()
if _, err := lockConn.Exec(lockCtx, dropTableQuery.Query, 1, false); err != nil {
return err
}
}
{
lockCtx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold)
defer cancel()
e.updateMigrationStage(ctx, onlineDDL.UUID, "unlocking tables")
if _, err := lockConn.Exec(lockCtx, sqlUnlockTables, 1, false); err != nil {
return err
}
}
{
lockCtx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold)
defer cancel()
e.updateMigrationStage(lockCtx, onlineDDL.UUID, "waiting for RENAME to complete")
if err := <-renameCompleteChan; err != nil {
return err
}
}
}
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "cut-over complete")
e.ownedRunningMigrations.Delete(onlineDDL.UUID)

go func() {
Expand All @@ -896,12 +1013,12 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
}()

// Tables are now swapped! Migration is successful
e.updateMigrationStage(ctx, onlineDDL.UUID, "re-enabling writes")
reenableWritesOnce() // this function is also deferred, in case of early return; but now would be a good time to resume writes, before we publish the migration as "complete"
_ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull, etaSecondsNow, s.rowsCopied, emptyHint)
return nil

// deferred function will re-enable writes now
// deferred function will unlock keyspace
}

// initMigrationSQLMode sets sql_mode according to DDL strategy, and returns a function that
Expand Down Expand Up @@ -3336,6 +3453,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
if isReady {
if err := e.cutOverVReplMigration(ctx, s); err != nil {
_ = e.updateMigrationMessage(ctx, uuid, err.Error())
log.Errorf("cutOverVReplMigration failed: err=%v", err)
if merr, ok := err.(*mysql.SQLError); ok {
switch merr.Num {
case mysql.ERTooLongIdent:
Expand Down Expand Up @@ -3772,6 +3890,31 @@ func (e *Executor) updateMigrationSpecialPlan(ctx context.Context, uuid string,
return err
}

// updateMigrationStage records a human readable stage message for the given migration.
// The stage argument is a fmt.Sprintf format string and args are its operands. The formatted
// message is logged, then bound together with uuid into sqlUpdateStage, which is executed.
// Returns the bind error or the execution error, if any.
// NOTE(review): sqlUpdateStage is defined elsewhere; presumably it updates the migration's stage column — confirm.
func (e *Executor) updateMigrationStage(ctx context.Context, uuid string, stage string, args ...interface{}) error {
	formatted := fmt.Sprintf(stage, args...)
	log.Infof("updateMigrationStage: uuid=%s, stage=%s", uuid, formatted)

	stageBindVar := sqltypes.StringBindVariable(formatted)
	uuidBindVar := sqltypes.StringBindVariable(uuid)
	boundQuery, err := sqlparser.ParseAndBind(sqlUpdateStage, stageBindVar, uuidBindVar)
	if err != nil {
		return err
	}
	if _, err := e.execQuery(ctx, boundQuery); err != nil {
		return err
	}
	return nil
}

// incrementCutoverAttempts binds the given migration uuid into sqlIncrementCutoverAttempts and executes it.
// Returns the bind error or the execution error, if any.
// NOTE(review): sqlIncrementCutoverAttempts is defined elsewhere; presumably it bumps the migration's
// cutover_attempts counter — confirm.
func (e *Executor) incrementCutoverAttempts(ctx context.Context, uuid string) error {
	uuidBindVar := sqltypes.StringBindVariable(uuid)
	boundQuery, err := sqlparser.ParseAndBind(sqlIncrementCutoverAttempts, uuidBindVar)
	if err != nil {
		return err
	}
	if _, err := e.execQuery(ctx, boundQuery); err != nil {
		return err
	}
	return nil
}

// updateMigrationTablet sets 'tablet' column to be this executor's tablet alias for given migration
func (e *Executor) updateMigrationTablet(ctx context.Context, uuid string) error {
query, err := sqlparser.ParseAndBind(sqlUpdateTablet,
Expand Down
Loading

0 comments on commit 67543ee

Please sign in to comment.