Skip to content

Commit

Permalink
Add additional test column to admins for debugging if required. Misc …
Browse files Browse the repository at this point in the history
…testing changes/comments. ExecuteWithRetry enhanced with a timeout instead of the backoff function for now. Need to see if this can stay or needs to be changed.

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Oct 25, 2024
1 parent 6049a37 commit d076111
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 11 deletions.
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ create table `+"`blüb_tbl`"+` (id int, val1 varchar(20), `+"`blöb1`"+` blob,
create table reftable (id int, val1 varchar(20), primary key(id), key(val1));
create table loadtest (id int, name varchar(256), primary key(id), key(name));
create table nopk (name varchar(128), age int unsigned);
create table admins(team_id int, email varchar(128), primary key(team_id), unique key(email));
create table admins(team_id int, email varchar(128), val varchar(256), primary key(team_id), unique key(email));
`, strings.Join(customerTypes, ","))
// These should always be ignored in vreplication
internalSchema = `
Expand Down
20 changes: 20 additions & 0 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,26 @@ func waitForRowCountInTablet(t *testing.T, vttablet *cluster.VttabletProcess, da
}
}

// waitForResult repeatedly runs query against database on the given tablet
// until the rows, rendered with fmt's %v verb, equal want. The test is failed
// immediately if a query returns an error or a nil result, or if the timeout
// elapses before a matching result is seen. Between attempts it sleeps for
// defaultTick.
func waitForResult(t *testing.T, vttablet *cluster.VttabletProcess, database string, query string, want string, timeout time.Duration) {
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()

	for {
		result, queryErr := vttablet.QueryTablet(query, database, true)
		require.NoError(t, queryErr)
		require.NotNil(t, result)

		got := fmt.Sprintf("%v", result.Rows)
		if got == want {
			return
		}

		// Non-blocking check of the deadline: give up if it has already
		// fired, otherwise pause briefly and poll again.
		select {
		case <-deadline.C:
			failure := fmt.Sprintf("query %q did not reach the expected result (%s) on tablet %q before the timeout of %s; last seen result: %s",
				query, want, vttablet.Name, timeout, result.Rows)
			require.FailNow(t, failure)
		default:
			time.Sleep(defaultTick)
		}
	}
}

// waitForSequenceValue queries the provided sequence name in the
// provided database using the provided vtgate connection until
// we get a next value from it. This allows us to move forward
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/unsharded_init_data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,5 @@ insert into reftable (id, val1) values (3, 'c')
insert into reftable (id, val1) values (4, 'd')
insert into reftable (id, val1) values (5, 'e')

insert into admins(team_id, email) values(1, '[email protected]')
insert into admins(team_id, email) values(2, '[email protected]')
insert into admins(team_id, email, val) values(1, '[email protected]', 'ibis-1')
insert into admins(team_id, email, val) values(2, '[email protected]', 'ibis-2')
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@ package vreplication

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
)

Expand All @@ -29,20 +35,38 @@ func TestWorkflowDuplicateKeyBackoff(t *testing.T) {
_ = targetTabs
tables := "customer,admins"

req := &vtctldatapb.UpdateThrottlerConfigRequest{
Enable: false,
}
res, err := throttler.UpdateThrottlerTopoConfigRaw(vc.VtctldClient, "customer", req, nil, nil)
require.NoError(t, err, res)
res, err = throttler.UpdateThrottlerTopoConfigRaw(vc.VtctldClient, "product", req, nil, nil)
require.NoError(t, err, res)

mt := createMoveTables(t, sourceKeyspaceName, targetKeyspaceName, workflowName, tables, nil, nil, nil)
waitForWorkflowState(t, vc, "customer.wf1", binlogdatapb.VReplicationWorkflowState_Running.String())
mt.SwitchReadsAndWrites()
vtgateConn, cancel := getVTGateConn()
defer cancel()

// team_id 1 => 80-, team_id 2 => -80
queries := []string{
"update admins set email = null where team_id = 2",
"update admins set email = '[email protected]' where team_id = 1",
"update admins set email = '[email protected]' where team_id = 2",
"update admins set email = null, val = 'ibis-3' where team_id = 2", // -80
"update admins set email = '[email protected]', val = 'ibis-4' where team_id = 1", // 80-
"update admins set email = '[email protected]', val = 'ibis-5' where team_id = 2", // -80
}

vc.VtctlClient.ExecuteCommandWithOutput("VReplicationExec", "zone1-100", "update _vt.vreplication set state = 'Stopped' where id = 2")
vc.VtctlClient.ExecuteCommandWithOutput("VReplicationExec", "zone1-100", "update _vt.vreplication set state = 'Stopped' where id = 1") //-80
for _, query := range queries {
execVtgateQuery(t, vtgateConn, targetKeyspaceName, query)
}

// Since -80 is stopped the "update admins set email = '[email protected]' where team_id = 1" will fail with duplicate key
// since it is already set for team_id = 2
// The vplayer stream for -80 should backoff with the new logic and retry should be successful once the -80 stream is restarted
time.Sleep(5 * time.Second)
vc.VtctlClient.ExecuteCommandWithOutput("VReplicationExec", "zone1-100", "update _vt.vreplication set state = 'Running' where id = 1")
productTab := vc.Cells["zone1"].Keyspaces[sourceKeyspaceName].Shards["0"].Tablets["zone1-100"].Vttablet
waitForResult(t, productTab, "product", "select * from admins order by team_id",
"[[INT32(1) VARCHAR(\"[email protected]\") VARCHAR(\"ibis-4\")] [INT32(2) VARCHAR(\"[email protected]\") VARCHAR(\"ibis-5\")]]", 20*time.Second)
log.Infof("TestWorkflowDuplicateKeyBackoff passed")
}
39 changes: 37 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vdbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,18 +168,53 @@ func (vc *vdbClient) Execute(query string) (*sqltypes.Result, error) {
return vc.ExecuteFetch(query, vc.relayLogMaxItems)
}

// IsRetryable reports whether err is a *sqlerror.SQLError whose number is
// sqlerror.ERDupEntry. Every other error, including nil, is not retryable.
func (vc *vdbClient) IsRetryable(err error) bool {
	sqlErr, isSQLError := err.(*sqlerror.SQLError)
	if !isSQLError {
		return false
	}
	return sqlErr.Number() == sqlerror.ERDupEntry
}

// ExecuteWithRetryAndBackoff runs query through ExecuteWithRetry and, for as
// long as the returned error satisfies IsRetryable, retries it after an
// exponentially growing pause (1s, 1s, 2s, 4s, ...).
//
// It returns the query result on the first success. An error that is not
// retryable is returned unchanged. If ctx is done, or one minute has passed
// since the call started, while waiting between attempts, a DEADLINE_EXCEEDED
// error is returned that includes the last retryable error seen.
func (vc *vdbClient) ExecuteWithRetryAndBackoff(ctx context.Context, query string) (*sqltypes.Result, error) {
	const timeout = 1 * time.Minute
	shortCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	attempts := 0
	backoffSeconds := 1
	for {
		qr, err := vc.ExecuteWithRetry(ctx, query)
		if err == nil {
			return qr, nil
		}
		if !vc.IsRetryable(err) {
			return nil, err
		}
		attempts++
		log.Infof("Backing off for %v seconds before retrying query: %v, got err %v", backoffSeconds, query, err)

		// Use an explicit timer so it can be released right away when the
		// context wins the race, instead of lingering until it fires.
		timer := time.NewTimer(time.Duration(backoffSeconds) * time.Second)
		select {
		case <-shortCtx.Done():
			timer.Stop()
			// Surface the last retryable error so the caller can see why
			// the retries never succeeded.
			return nil, vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED,
				"backoff timeout exceeded while retrying query: %v: last error: %v", query, err)
		case <-timer.C:
		}

		// attempts is at least 1 here, so this yields 1, 2, 4, ... for the
		// waits that follow the initial one-second pause.
		backoffSeconds = 1 << (attempts - 1)
	}
}

func (vc *vdbClient) ExecuteWithRetry(ctx context.Context, query string) (*sqltypes.Result, error) {
ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
qr, err := vc.Execute(query)
for err != nil {
if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Number() == sqlerror.ERLockDeadlock || sqlErr.Number() == sqlerror.ERLockWaitTimeout {
if sqlErr, ok := err.(*sqlerror.SQLError); ok &&
sqlErr.Number() == sqlerror.ERLockDeadlock || sqlErr.Number() == sqlerror.ERLockWaitTimeout || sqlErr.Number() == sqlerror.ERDupEntry {
log.Infof("retryable error: %v, waiting for %v and retrying", sqlErr, dbLockRetryDelay)
if err := vc.Rollback(); err != nil {
return nil, err
}
time.Sleep(dbLockRetryDelay)
// Check context here. Otherwise this can become an infinite loop.
select {
case <-ctx.Done():
case <-ctx2.Done():
return nil, io.EOF
default:
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
}
batchMode := false
if vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 {
batchMode = true
batchMode = false //true // FIXME
}
if batchMode {
// relayLogMaxSize is effectively the limit used when not batching.
Expand Down
9 changes: 9 additions & 0 deletions test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,15 @@
"RetryMax": 0,
"Tags": []
},
"vreplication_workflow_dup": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestWorkflowDuplicateKeyBackoff"],
"Command": [],
"Manual": false,
"Shard": "vreplication_cellalias",
"RetryMax": 0,
"Tags": []
},
"vreplication_multi_tenant": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication","-run", "MultiTenant"],
Expand Down

0 comments on commit d076111

Please sign in to comment.