From 327142de4a37c0ea201ef5c3e8e5d980d198f7c4 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 21 Oct 2024 09:57:34 +0300 Subject: [PATCH] Online DDL: detect `vreplication` errors via `vreplication_log` history (#16925) Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../onlineddl/vrepl/onlineddl_vrepl_test.go | 16 +++++++++++++ go/vt/vttablet/onlineddl/executor.go | 23 +++++++++++++++++++ go/vt/vttablet/onlineddl/schema.go | 14 +++++++++++ .../tabletmanager/vreplication/controller.go | 5 +++- .../vreplication/vplayer_flaky_test.go | 2 +- 5 files changed, 58 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go index f7bab109c05..161b1566680 100644 --- a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go +++ b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go @@ -80,6 +80,9 @@ var ( ALTER TABLE %s DROP PRIMARY KEY, DROP COLUMN vrepl_col` + alterTableFailedVreplicationStatement = ` + ALTER TABLE %s + ADD UNIQUE KEY test_val_uidx (test_val)` // We will run this query while throttling vreplication alterTableThrottlingStatement = ` ALTER TABLE %s @@ -456,6 +459,19 @@ func TestVreplSchemaChanges(t *testing.T) { onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, true) // migration will fail again }) + t.Run("failed migration due to vreplication", func(t *testing.T) { + insertRows(t, 2) + uuid := testOnlineDDLStatement(t, alterTableFailedVreplicationStatement, "online", providedUUID, providedMigrationContext, "vtgate", "vrepl_col", "", false) + onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed) + + rs := onlineddl.ReadMigrations(t, &vtParams, uuid) + require.NotNil(t, rs) + for _, 
row := range rs.Named().Rows { + message := row["message"].ToString() + assert.Contains(t, message, "vreplication: terminal error:", "migration row: %v", row) + } + }) t.Run("cancel all migrations: nothing to cancel", func(t *testing.T) { // no migrations pending at this time time.Sleep(10 * time.Second) diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index aad8417237e..22dd9447bb9 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -60,6 +60,7 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" @@ -3485,6 +3486,28 @@ func (e *Executor) readVReplStream(ctx context.Context, uuid string, okIfMissing if err := prototext.Unmarshal([]byte(s.source), s.bls); err != nil { return nil, err } + { + // It's possible that an earlier error was overshadowed by newer, non-error `message` values. + // Let's read _vt.vreplication_log to see whether there are any terminal errors in vreplication's history. 
+ query, err := sqlparser.ParseAndBind(sqlReadVReplLogErrors, + sqltypes.Int32BindVariable(s.id), + sqltypes.StringBindVariable(vreplication.TerminalErrorIndicator), + ) + if err != nil { + return nil, err + } + r, err := e.execQuery(ctx, query) + if err != nil { + return nil, err + } + // The query has LIMIT 1, i.e. it returns at most one row. + if row := r.Named().Row(); row != nil { + s.state = binlogdatapb.VReplicationWorkflowState_Error + if message := row.AsString("message", ""); message != "" { + s.message = "vreplication: " + message + } + } + } return s, nil } diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index 9023639fd00..a30ab6b3ed9 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -515,6 +515,20 @@ const ( WHERE workflow=%a ` + sqlReadVReplLogErrors = `SELECT + state, + message + FROM _vt.vreplication_log + WHERE + vrepl_id=%a + AND ( + state='Error' + OR locate (concat(%a, ':'), message) = 1 + ) + ORDER BY + id DESC + LIMIT 1 + ` sqlReadCountCopyState = `SELECT count(*) as cnt FROM diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 575300a685c..7067211ff10 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -47,6 +47,9 @@ const ( // give up and return an error message that the user // can see and act upon if needed. tabletPickerRetries = 5 + + // TerminalErrorIndicator is prepended to the message to indicate that it is a terminal error. + TerminalErrorIndicator = "terminal error" ) // controller is created by Engine. Members are initialized upfront. 
@@ -305,7 +308,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { if (err != nil && vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy)) || isUnrecoverableError(err) || !ct.lastWorkflowError.ShouldRetry() { - + err = vterrors.Wrapf(err, TerminalErrorIndicator) if errSetState := vr.setState(binlogdatapb.VReplicationWorkflowState_Error, err.Error()); errSetState != nil { log.Errorf("INTERNAL: unable to setState() in controller: %v. Could not set error text to: %v.", errSetState, err) return err // yes, err and not errSetState. diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index 420134ab7e3..ccf1ce9119a 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -1782,7 +1782,7 @@ func TestPlayerDDL(t *testing.T) { expectDBClientQueries(t, qh.Expect( "alter table t1 add column val2 varchar(128)", "/update _vt.vreplication set message='error applying event: Duplicate", - "/update _vt.vreplication set state='Error', message='error applying event: Duplicate", + "/update _vt.vreplication set state='Error', message='terminal error: error applying event: Duplicate", )) cancel()