Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancing VTGate buffering for MoveTables and Shard by Shard Migration #13507

Merged
merged 9 commits into from
Aug 2, 2023
63 changes: 63 additions & 0 deletions doc/design-docs/VTGateBuffering.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Adding buffering to VTGate while switching traffic during a movetables operation

## Current buffering support in VTGate

VTGate currently supports buffering of queries during reparenting and resharding operations. This is done by buffering
the failing queries in the tablet gateway layer in vtgate. When a query fails, the reason for the failure is checked, to
see if is due to one of these.

To assist in diagnosing the root cause a _KeyspaceEventWatcher_ (aka *KEW*) was introduced. This watches the
SrvKeyspace (in a goroutine): if there is a change to the keyspace partitions in the topo it is considered that there is
a resharding operation in progress. The buffering logic subscribes to the keyspace event watcher.

Otherwise, if there are no tables to serve from, based on the health check results, it is assumed that there is a
cluster event where either the primary is being reparented or if the cluster is being restarted and all tablets are in
the process of starting up.

If either of these occurs, the _consistent_ flag is set to false for that keyspace. When that happens the keyspace
watcher checks, on every SrvKeyspace update, if the event has got resolved. This can happen when tablets are now
available (in case of a cluster event) or if the partition information indicates that resharding is complete.

When that happens. the keyspace event watcher publishes an event that the keyspace is now consistent. The buffers are
then drained and the queries retried by the tablet gateway.

## Adding buffering support for MoveTables

### Background

MoveTables does not affect the entire keyspace, just the tables being moved. Even if all tables are being moved there is
no change in existing keyspace or shard configurations. So the KEW doesn't detect a cluster event since the tablets are
still available and shard partitions are unchanged.

MoveTables moves tables from one keyspace to another. There are two flavors of MoveTables: one where the tables are
moved into all shards in the target keyspace. In Shard-By-Shard Migration user can specify a subset of shards to move
the tables into.

These are the topo attributes that are affected during a MoveTables (regular or shard-by-shard):

* *DeniedTables* in a shard's TabletControls. These are used to stop writes to the source keyspace for these tables.
While switching writes we first create these entries, wait for the target to catchup to the source (using gtid
positions), and then update the routing rules to point these tables to the target. When a primary sees a DeniedTables
entry during a DML it will error with an "enforce denied tables".
* *RoutingRules* (for regular movetables) and *ShardRoutingRules* (for shard by shard migration). Routing rules are
pointers for each table being moved to a keyspace. When a MoveTables is initiated, that keyspace is the source
keyspace. After traffic is switched the pointer is changed to point to the target keyspace. If routing rules are
specified, VTGate uses them to decide which keyspace to route each table.

### Changes

There are two main changes:

* The keyspace event watcher is enhanced to look at the topo attributes mentioned above. An SrvVSchema watcher looks for
changes in the Routing Rules. DeniedTables are only in the Shard records in the topo. So any changes to the
DeniedTables would not result in a notification. To get around that we change the traffic switcher to also rebuild
SrvVSchema when DeniedTables are modified.
* The logic to start buffering needs to look for the "enforce denied tables" error that is thrown by the vttablets when
it tries to execute a query on a table being switched.
* We cannot use the current query retry logic which is at the tablet gateway level: meaning the keyspace is already
fixed by the planner and cannot be changed in that layer. We need to add a new retry logic at a higher level (the
_newExecute_ method) and always replan before retrying a query. This also means that we need to bypass the plan cache
while retrying.



11 changes: 9 additions & 2 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"testing"
"time"

"vitess.io/vitess/go/mysql"

"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
Expand All @@ -52,8 +54,9 @@ var (
sidecarDBIdentifier = sqlparser.String(sqlparser.NewIdentifierCS(sidecarDBName))
mainClusterConfig *ClusterConfig
externalClusterConfig *ClusterConfig
extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms"}
extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"}
extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDurationStr,
"--buffer_size", "100000", "--buffer_min_time_between_failovers", "0s", "--buffer_max_failover_duration", loadTestBufferingWindowDurationStr}
extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"}
// This variable can be used within specific tests to alter vttablet behavior
extraVTTabletArgs = []string{}

Expand Down Expand Up @@ -730,6 +733,10 @@ func (vc *VitessCluster) getPrimaryTablet(t *testing.T, ksName, shardName string
return nil
}

func (vc *VitessCluster) GetVTGateConn(t *testing.T) *mysql.Conn {
return getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
}

func (vc *VitessCluster) startQuery(t *testing.T, query string) (func(t *testing.T), func(t *testing.T)) {
conn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
_, err := conn.ExecuteFetch("begin", 1000, false)
Expand Down
20 changes: 19 additions & 1 deletion go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ create table json_tbl (id int, j1 json, j2 json, primary key(id));
create table geom_tbl (id int, g geometry, p point, ls linestring, pg polygon, mp multipoint, mls multilinestring, mpg multipolygon, gc geometrycollection, primary key(id));
create table ` + "`blüb_tbl`" + ` (id int, val1 varchar(20), ` + "`blöb1`" + ` blob, val2 varbinary(20), ` + "`bl@b2`" + ` longblob, txt1 text, blb3 tinyblob, txt2 longtext, blb4 mediumblob, primary key(id));
create table reftable (id int, val1 varchar(20), primary key(id), key(val1));
create table loadtest (id int, name varchar(256), primary key(id), key(name));
`
// These should always be ignored in vreplication
internalSchema = `
Expand All @@ -76,6 +77,7 @@ create table reftable (id int, val1 varchar(20), primary key(id), key(val1));
"product": {},
"merchant": {},
"orders": {},
"loadtest": {},
"customer": {},
"customer_seq": {
"type": "sequence"
Expand Down Expand Up @@ -117,6 +119,14 @@ create table reftable (id int, val1 varchar(20), primary key(id), key(val1));
}
},
"tables": {
"loadtest": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
},
"customer": {
"column_vindexes": [
{
Expand Down Expand Up @@ -283,7 +293,15 @@ create table reftable (id int, val1 varchar(20), primary key(id), key(val1));
}
},
"tables": {
"customer": {
"loadtest": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
},
"customer": {
"column_vindexes": [
{
"column": "cid",
Expand Down
119 changes: 119 additions & 0 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sort"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -696,3 +697,121 @@ func isBinlogRowImageNoBlob(t *testing.T, tablet *cluster.VttabletProcess) bool
mode := strings.ToLower(rs.Rows[0][0].ToString())
return mode == "noblob"
}

const (
loadTestBufferingWindowDurationStr = "30s"
loadTestPostBufferingInsertWindow = 60 * time.Second // should be greater than loadTestBufferingWindowDurationStr
loadTestWaitForCancel = 30 * time.Second
loadTestWaitBetweenQueries = 2 * time.Millisecond
)

type loadGenerator struct {
t *testing.T
vc *VitessCluster
ctx context.Context
cancel context.CancelFunc
}

func newLoadGenerator(t *testing.T, vc *VitessCluster) *loadGenerator {
return &loadGenerator{
t: t,
vc: vc,
}
}

func (lg *loadGenerator) stop() {
time.Sleep(loadTestPostBufferingInsertWindow) // wait for buffering to stop and additional records to be inserted by startLoad after traffic is switched
log.Infof("Canceling load")
lg.cancel()
time.Sleep(loadTestWaitForCancel) // wait for cancel to take effect
log.Flush()

}

func (lg *loadGenerator) start() {
t := lg.t
lg.ctx, lg.cancel = context.WithCancel(context.Background())

var id int64
log.Infof("startLoad: starting")
queryTemplate := "insert into loadtest(id, name) values (%d, 'name-%d')"
var totalQueries, successfulQueries int64
var deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors int64
defer func() {

log.Infof("startLoad: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
totalQueries, successfulQueries, deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors)
}()
logOnce := true
for {
select {
case <-lg.ctx.Done():
log.Infof("startLoad: context cancelled")
log.Infof("startLoad: deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors)
require.Equal(t, int64(0), deniedErrors)
require.Equal(t, int64(0), otherErrors)
require.Equal(t, totalQueries, successfulQueries)
return
default:
go func() {
conn := vc.GetVTGateConn(t)
defer conn.Close()
atomic.AddInt64(&id, 1)
query := fmt.Sprintf(queryTemplate, id, id)
_, err := conn.ExecuteFetch(query, 1, false)
atomic.AddInt64(&totalQueries, 1)
if err != nil {
sqlErr := err.(*mysql.SQLError)
if strings.Contains(strings.ToLower(err.Error()), "denied tables") {
log.Infof("startLoad: denied tables error executing query: %d:%v", sqlErr.Number(), err)
atomic.AddInt64(&deniedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") {
// this can happen when a second keyspace is setup with the same tables, but there are no routing rules
// set yet by MoveTables. So we ignore these errors.
atomic.AddInt64(&ambiguousErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") {
atomic.AddInt64(&reshardedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "not found") {
atomic.AddInt64(&tableNotFoundErrors, 1)
} else {
if logOnce {
log.Infof("startLoad: error executing query: %d:%v", sqlErr.Number(), err)
logOnce = false
}
atomic.AddInt64(&otherErrors, 1)
}
time.Sleep(loadTestWaitBetweenQueries)
} else {
atomic.AddInt64(&successfulQueries, 1)
}
}()
time.Sleep(loadTestWaitBetweenQueries)
}
}
}

func (lg *loadGenerator) waitForCount(want int64) {
t := lg.t
conn := vc.GetVTGateConn(t)
defer conn.Close()
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
for {
qr, err := conn.ExecuteFetch("select count(*) from loadtest", 1, false)
require.NoError(t, err)
require.NotNil(t, qr)
got, _ := qr.Rows[0][0].ToInt64()

if int64(got) >= want {
return
}
select {
case <-timer.C:
require.FailNow(t, fmt.Sprintf("table %q did not reach the expected number of rows (%d) before the timeout of %s; last seen count: %v",
"loadtest", want, defaultTimeout, got))
default:
time.Sleep(defaultTick)
}
}
}
45 changes: 45 additions & 0 deletions go/test/endtoend/vreplication/movetables_buffering_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package vreplication

import (
"testing"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/wrangler"
)

func TestMoveTablesBuffering(t *testing.T) {
defaultRdonly = 1
vc = setupMinimalCluster(t)
defer vtgateConn.Close()
defer vc.TearDown(t)

currentWorkflowType = wrangler.MoveTablesWorkflow
setupMinimalCustomerKeyspace(t)
tables := "loadtest"
err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs,
tables, workflowActionCreate, "", "", "")
require.NoError(t, err)
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())

lg := newLoadGenerator(t, vc)
go func() {
lg.start()
}()
lg.waitForCount(1000)

catchup(t, targetTab1, workflowName, "MoveTables")
catchup(t, targetTab2, workflowName, "MoveTables")
vdiff1(t, ksWorkflow, "")
waitForLowLag(t, "customer", workflowName)
tstWorkflowSwitchReads(t, "", "")
tstWorkflowSwitchWrites(t)
log.Infof("SwitchWrites done")
lg.stop()

log.Infof("TestMoveTablesBuffering: done")
log.Flush()
}
26 changes: 20 additions & 6 deletions go/test/endtoend/vreplication/partial_movetables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ func TestPartialMoveTablesBasic(t *testing.T) {
defer func() {
extraVTGateArgs = origExtraVTGateArgs
}()
vc = setupCluster(t)
vc = setupMinimalCluster(t)
defer vtgateConn.Close()
defer vc.TearDown(t)
setupCustomerKeyspace(t)
setupMinimalCustomerKeyspace(t)

// Move customer table from unsharded product keyspace to
// sharded customer keyspace.
createMoveTablesWorkflow(t, "customer")
createMoveTablesWorkflow(t, "customer,loadtest")
tstWorkflowSwitchReadsAndWrites(t)
tstWorkflowComplete(t)

Expand All @@ -75,6 +75,8 @@ func TestPartialMoveTablesBasic(t *testing.T) {
applyShardRoutingRules(t, emptyShardRoutingRules)
require.Equal(t, emptyShardRoutingRules, getShardRoutingRules(t))

runWithLoad := true

// Now setup the customer2 keyspace so we can do a partial
// move tables for one of the two shards: 80-.
defaultRdonly = 0
Expand All @@ -88,8 +90,17 @@ func TestPartialMoveTablesBasic(t *testing.T) {

// start the partial movetables for 80-
err := tstWorkflowExec(t, defaultCellName, wfName, sourceKs, targetKs,
"customer", workflowActionCreate, "", shard, "")
"customer,loadtest", workflowActionCreate, "", shard, "")
require.NoError(t, err)
var lg *loadGenerator
if runWithLoad { // start load after routing rules are set, otherwise we end up with ambiguous tables
lg = newLoadGenerator(t, vc)
go func() {
lg.start()
}()
lg.waitForCount(1000)
}

targetTab1 = vc.getPrimaryTablet(t, targetKs, shard)
catchup(t, targetTab1, wfName, "Partial MoveTables Customer to Customer2")
vdiff1(t, ksWf, "")
Expand Down Expand Up @@ -221,14 +232,14 @@ func TestPartialMoveTablesBasic(t *testing.T) {
wfName = "partialDash80"
shard = "-80"
ksWf = fmt.Sprintf("%s.%s", targetKs, wfName)

// Start the partial movetables for -80, 80- has already been switched
err = tstWorkflowExec(t, defaultCellName, wfName, sourceKs, targetKs,
"customer", workflowActionCreate, "", shard, "")
"customer,loadtest", workflowActionCreate, "", shard, "")
require.NoError(t, err)
targetTab2 := vc.getPrimaryTablet(t, targetKs, shard)
catchup(t, targetTab2, wfName, "Partial MoveTables Customer to Customer2: -80")
vdiff1(t, ksWf, "")

// Switch all traffic for the shard
require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", ""))
expectedSwitchOutput = fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n",
Expand All @@ -243,6 +254,8 @@ func TestPartialMoveTablesBasic(t *testing.T) {
// target side (customer2).
require.Equal(t, postCutoverShardRoutingRules, getShardRoutingRules(t))

lg.stop()

// Cancel both reverse workflows (as we've done the cutover), which should
// clean up both the global routing rules and the shard routing rules.
for _, wf := range []string{"partialDash80", "partial80Dash"} {
Expand Down Expand Up @@ -272,4 +285,5 @@ func TestPartialMoveTablesBasic(t *testing.T) {

// Confirm that the shard routing rules are now gone.
require.Equal(t, emptyShardRoutingRules, getShardRoutingRules(t))

}
Loading
Loading