Skip to content

Commit

Permalink
Online DDL: detect vreplication errors via vreplication_log histo…
Browse files Browse the repository at this point in the history
…ry (#16925)

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Oct 21, 2024
1 parent c5f0c03 commit 327142d
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 2 deletions.
16 changes: 16 additions & 0 deletions go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ var (
ALTER TABLE %s
DROP PRIMARY KEY,
DROP COLUMN vrepl_col`
alterTableFailedVreplicationStatement = `
ALTER TABLE %s
ADD UNIQUE KEY test_val_uidx (test_val)`
// We will run this query while throttling vreplication
alterTableThrottlingStatement = `
ALTER TABLE %s
Expand Down Expand Up @@ -456,6 +459,19 @@ func TestVreplSchemaChanges(t *testing.T) {
onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, true)
// migration will fail again
})
t.Run("failed migration due to vreplication", func(t *testing.T) {
insertRows(t, 2)
uuid := testOnlineDDLStatement(t, alterTableFailedVreplicationStatement, "online", providedUUID, providedMigrationContext, "vtgate", "vrepl_col", "", false)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed)

rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
message := row["message"].ToString()
assert.Contains(t, message, "vreplication: terminal error:", "migration row: %v", row)
}
})
t.Run("cancel all migrations: nothing to cancel", func(t *testing.T) {
// no migrations pending at this time
time.Sleep(10 * time.Second)
Expand Down
23 changes: 23 additions & 0 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle"
Expand Down Expand Up @@ -3485,6 +3486,28 @@ func (e *Executor) readVReplStream(ctx context.Context, uuid string, okIfMissing
if err := prototext.Unmarshal([]byte(s.source), s.bls); err != nil {
return nil, err
}
{
// It's possible that an earlier error was overshadowed by a new non-error `message` values.
// Let's read _vt.vreplication_log to see whether there's any terminal errors in vreplication's history.
query, err := sqlparser.ParseAndBind(sqlReadVReplLogErrors,
sqltypes.Int32BindVariable(s.id),
sqltypes.StringBindVariable(vreplication.TerminalErrorIndicator),
)
if err != nil {
return nil, err
}
r, err := e.execQuery(ctx, query)
if err != nil {
return nil, err
}
// The query has LIMIT 1, ie returns at most one row
if row := r.Named().Row(); row != nil {
s.state = binlogdatapb.VReplicationWorkflowState_Error
if message := row.AsString("message", ""); message != "" {
s.message = "vreplication: " + message
}
}
}
return s, nil
}

Expand Down
14 changes: 14 additions & 0 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,20 @@ const (
WHERE
workflow=%a
`
sqlReadVReplLogErrors = `SELECT
state,
message
FROM _vt.vreplication_log
WHERE
vrepl_id=%a
AND (
state='Error'
OR locate (concat(%a, ':'), message) = 1
)
ORDER BY
id DESC
LIMIT 1
`
sqlReadCountCopyState = `SELECT
count(*) as cnt
FROM
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ const (
// give up and return an error message that the user
// can see and act upon if needed.
tabletPickerRetries = 5

// Prepended to the message to indicate that it is a terminal error.
TerminalErrorIndicator = "terminal error"
)

// controller is created by Engine. Members are initialized upfront.
Expand Down Expand Up @@ -305,7 +308,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
if (err != nil && vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy)) ||
isUnrecoverableError(err) ||
!ct.lastWorkflowError.ShouldRetry() {

err = vterrors.Wrapf(err, TerminalErrorIndicator)
if errSetState := vr.setState(binlogdatapb.VReplicationWorkflowState_Error, err.Error()); errSetState != nil {
log.Errorf("INTERNAL: unable to setState() in controller: %v. Could not set error text to: %v.", errSetState, err)
return err // yes, err and not errSetState.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1782,7 +1782,7 @@ func TestPlayerDDL(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"alter table t1 add column val2 varchar(128)",
"/update _vt.vreplication set message='error applying event: Duplicate",
"/update _vt.vreplication set state='Error', message='error applying event: Duplicate",
"/update _vt.vreplication set state='Error', message='terminal error: error applying event: Duplicate",
))
cancel()

Expand Down

0 comments on commit 327142d

Please sign in to comment.