diff --git a/go/test/endtoend/vreplication/config_test.go b/go/test/endtoend/vreplication/config_test.go index 6b8e9aee16f..41e2211f082 100644 --- a/go/test/endtoend/vreplication/config_test.go +++ b/go/test/endtoend/vreplication/config_test.go @@ -69,7 +69,7 @@ create table `+"`blüb_tbl`"+` (id int, val1 varchar(20), `+"`blöb1`"+` blob, create table reftable (id int, val1 varchar(20), primary key(id), key(val1)); create table loadtest (id int, name varchar(256), primary key(id), key(name)); create table nopk (name varchar(128), age int unsigned); - create table admins(team_id int, email varchar(128), primary key(team_id), unique key(email)); + create table admins(team_id int, email varchar(128), val varchar(256), primary key(team_id), unique key(email)); `, strings.Join(customerTypes, ",")) // These should always be ignored in vreplication internalSchema = ` diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 44c35d0acea..8671442b274 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -288,6 +288,26 @@ func waitForRowCountInTablet(t *testing.T, vttablet *cluster.VttabletProcess, da } } +func waitForResult(t *testing.T, vttablet *cluster.VttabletProcess, database string, query string, want string, timeout time.Duration) { + timer := time.NewTimer(timeout) + defer timer.Stop() + for { + qr, err := vttablet.QueryTablet(query, database, true) + require.NoError(t, err) + require.NotNil(t, qr) + if want == fmt.Sprintf("%v", qr.Rows) { + return + } + select { + case <-timer.C: + require.FailNow(t, fmt.Sprintf("query %q did not reach the expected result (%s) on tablet %q before the timeout of %s; last seen result: %s", + query, want, vttablet.Name, timeout, qr.Rows)) + default: + time.Sleep(defaultTick) + } + } +} + // waitForSequenceValue queries the provided sequence name in the // provided database using the provided vtgate connection until // we get a next value from it. This allows us to move forward diff --git a/go/test/endtoend/vreplication/unsharded_init_data.sql b/go/test/endtoend/vreplication/unsharded_init_data.sql index 9610c174725..e7142b04927 100644 --- a/go/test/endtoend/vreplication/unsharded_init_data.sql +++ b/go/test/endtoend/vreplication/unsharded_init_data.sql @@ -51,5 +51,5 @@ insert into reftable (id, val1) values (3, 'c') insert into reftable (id, val1) values (4, 'd') insert into reftable (id, val1) values (5, 'e') -insert into admins(team_id, email) values(1, 'a@example.com') -insert into admins(team_id, email) values(2, 'b@example.com') +insert into admins(team_id, email, val) values(1, 'a@example.com', 'ibis-1') +insert into admins(team_id, email, val) values(2, 'b@example.com', 'ibis-2') diff --git a/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go b/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go index 6f1c58f9207..89f20e68a74 100644 --- a/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go +++ b/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go @@ -2,8 +2,14 @@ package vreplication import ( "testing" + "time" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/test/endtoend/throttler" + "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" vttablet "vitess.io/vitess/go/vt/vttablet/common" ) @@ -29,20 +35,38 @@ func TestWorkflowDuplicateKeyBackoff(t *testing.T) { _ = targetTabs tables := "customer,admins" + req := &vtctldatapb.UpdateThrottlerConfigRequest{ + Enable: false, + } + res, err := throttler.UpdateThrottlerTopoConfigRaw(vc.VtctldClient, "customer", req, nil, nil) + require.NoError(t, err, res) + res, err = throttler.UpdateThrottlerTopoConfigRaw(vc.VtctldClient, "product", req, nil, nil) + require.NoError(t, err, res) + mt := createMoveTables(t, sourceKeyspaceName, targetKeyspaceName, workflowName, tables, nil, nil, nil) waitForWorkflowState(t, vc, "customer.wf1", binlogdatapb.VReplicationWorkflowState_Running.String()) mt.SwitchReadsAndWrites() vtgateConn, cancel := getVTGateConn() defer cancel() + + // team_id 1 => 80-, team_id 2 => -80 queries := []string{ - "update admins set email = null where team_id = 2", - "update admins set email = 'b@example.com' where team_id = 1", - "update admins set email = 'a@example.com' where team_id = 2", + "update admins set email = null, val = 'ibis-3' where team_id = 2", // -80 + "update admins set email = 'b@example.com', val = 'ibis-4' where team_id = 1", // 80- + "update admins set email = 'a@example.com', val = 'ibis-5' where team_id = 2", // -80 } - vc.VtctlClient.ExecuteCommandWithOutput("VReplicationExec", "zone1-100", "update _vt.vreplication set state = 'Stopped' where id = 2") + vc.VtctlClient.ExecuteCommandWithOutput("VReplicationExec", "zone1-100", "update _vt.vreplication set state = 'Stopped' where id = 1") //-80 for _, query := range queries { execVtgateQuery(t, vtgateConn, targetKeyspaceName, query) } - + // Since -80 is stopped the "update admins set email = 'b@example.com' where team_id = 1" will fail with duplicate key + // since it is already set for team_id = 2 + // The vplayer stream for -80 should backoff with the new logic and retry should be successful once the -80 stream is restarted + time.Sleep(5 * time.Second) + vc.VtctlClient.ExecuteCommandWithOutput("VReplicationExec", "zone1-100", "update _vt.vreplication set state = 'Running' where id = 1") + productTab := vc.Cells["zone1"].Keyspaces[sourceKeyspaceName].Shards["0"].Tablets["zone1-100"].Vttablet + waitForResult(t, productTab, "product", "select * from admins order by team_id", + "[[INT32(1) VARCHAR(\"b@example.com\") VARCHAR(\"ibis-4\")] [INT32(2) VARCHAR(\"a@example.com\") VARCHAR(\"ibis-5\")]]", 20*time.Second) + log.Infof("TestWorkflowDuplicateKeyBackoff passed") } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go index b8339cdf874..842c71a13d5 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go @@ -168,10 +168,45 @@ func (vc *vdbClient) Execute(query string) (*sqltypes.Result, error) { return vc.ExecuteFetch(query, vc.relayLogMaxItems) } +func (vc *vdbClient) IsRetryable(err error) bool { + if sqlErr, ok := err.(*sqlerror.SQLError); ok { + return sqlErr.Number() == sqlerror.ERDupEntry + } + return false +} + +func (vc *vdbClient) ExecuteWithRetryAndBackoff(ctx context.Context, query string) (*sqltypes.Result, error) { + timeout := 1 * time.Minute + shortCtx, cancel := context.WithDeadline(ctx, time.Now().Add(timeout)) + defer cancel() + attempts := 0 + backoffSeconds := 1 + for { + qr, err := vc.ExecuteWithRetry(ctx, query) + if err == nil { + return qr, nil + } + if !vc.IsRetryable(err) { + return nil, err + } + attempts++ + log.Infof("Backing off for %v seconds before retrying query: %v, got err %v", backoffSeconds, query, err) + select { + case <-shortCtx.Done(): + return nil, vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "backoff timeout exceeded while retrying query: %v", query) + case <-time.After(time.Duration(backoffSeconds) * time.Second): + } + backoffSeconds = (1 << attempts) >> 1 + } +} + func (vc *vdbClient) ExecuteWithRetry(ctx context.Context, query string) (*sqltypes.Result, error) { + ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() qr, err := vc.Execute(query) for err != nil { - if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Number() == sqlerror.ERLockDeadlock || sqlErr.Number() == sqlerror.ERLockWaitTimeout { + if sqlErr, ok := err.(*sqlerror.SQLError); ok && + sqlErr.Number() == sqlerror.ERLockDeadlock || sqlErr.Number() == sqlerror.ERLockWaitTimeout || sqlErr.Number() == sqlerror.ERDupEntry { log.Infof("retryable error: %v, waiting for %v and retrying", sqlErr, dbLockRetryDelay) if err := vc.Rollback(); err != nil { return nil, err @@ -179,7 +214,7 @@ func (vc *vdbClient) ExecuteWithRetry(ctx context.Context, query string) (*sqlty time.Sleep(dbLockRetryDelay) // Check context here. Otherwise this can become an infinite loop. select { - case <-ctx.Done(): + case <-ctx2.Done(): return nil, io.EOF default: } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index db2f3f341ac..06991338af9 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -134,7 +134,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map } batchMode := false if vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 { - batchMode = true + batchMode = false //true // FIXME } if batchMode { // relayLogMaxSize is effectively the limit used when not batching. diff --git a/test/config.json b/test/config.json index 185201cf3e0..a96229f0b35 100644 --- a/test/config.json +++ b/test/config.json @@ -1076,6 +1076,15 @@ "RetryMax": 0, "Tags": [] }, + "vreplication_workflow_dup": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestWorkflowDuplicateKeyBackoff"], + "Command": [], + "Manual": false, + "Shard": "vreplication_cellalias", + "RetryMax": 0, + "Tags": [] + }, "vreplication_multi_tenant": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vreplication","-run", "MultiTenant"],