diff --git a/doc/design-docs/VTGateBuffering.md b/doc/design-docs/VTGateBuffering.md
new file mode 100644
index 00000000000..9155929ea49
--- /dev/null
+++ b/doc/design-docs/VTGateBuffering.md
@@ -0,0 +1,63 @@
+# Adding buffering to VTGate while switching traffic during a MoveTables operation
+
+## Current buffering support in VTGate
+
+VTGate currently supports buffering of queries during reparenting and resharding operations. This is done by buffering
+the failing queries in the tablet gateway layer in vtgate. When a query fails, the reason for the failure is checked to
+see if it is due to one of these operations.
+
+To assist in diagnosing the root cause, a _KeyspaceEventWatcher_ (aka *KEW*) was introduced. This watches the
+SrvKeyspace (in a goroutine): if there is a change to the keyspace partitions in the topo, it is assumed that a
+resharding operation is in progress. The buffering logic subscribes to the keyspace event watcher.
+
+Otherwise, if there are no tablets to serve from, based on the health check results, it is assumed that there is a
+cluster event where either the primary is being reparented or the cluster is being restarted and all tablets are in
+the process of starting up.
+
+If either of these occurs, the _consistent_ flag is set to false for that keyspace. When that happens, the keyspace
+event watcher checks, on every SrvKeyspace update, whether the event has been resolved. This can happen when tablets
+become available again (in the case of a cluster event) or when the partition information indicates that resharding is
+complete.
+
+When that happens, the keyspace event watcher publishes an event that the keyspace is now consistent. The buffers are
+then drained and the queries retried by the tablet gateway.
+
+## Adding buffering support for MoveTables
+
+### Background
+
+MoveTables does not affect the entire keyspace, just the tables being moved. Even if all tables are being moved, there
+is no change to the existing keyspace or shard configurations. So the KEW doesn't detect a cluster event, since the
+tablets are still available and the shard partitions are unchanged.
+
+MoveTables moves tables from one keyspace to another. There are two flavors of MoveTables: a regular one, where the
+tables are moved into all shards in the target keyspace, and a shard-by-shard migration, where the user can specify a
+subset of target shards to move the tables into.
+
+These are the topo attributes that are affected during a MoveTables (regular or shard-by-shard):
+
+* *DeniedTables* in a shard's TabletControls. These are used to stop writes to the source keyspace for these tables.
+  While switching writes we first create these entries, wait for the target to catch up to the source (using GTID
+  positions), and then update the routing rules to point these tables to the target. When a primary sees a
+  DeniedTables entry during a DML it fails the query with an "enforce denied tables" error.
+* *RoutingRules* (for a regular MoveTables) and *ShardRoutingRules* (for a shard-by-shard migration). Routing rules
+  map each table being moved to a keyspace. When a MoveTables is initiated, they point to the source keyspace. After
+  traffic is switched, they are changed to point to the target keyspace. If routing rules are specified, VTGate uses
+  them to decide which keyspace to route each table to.
+
+### Changes
+
+There are three main changes (simplified sketches of the new logic follow at the end of this section):
+
+* The keyspace event watcher is enhanced to look at the topo attributes mentioned above. An SrvVSchema watcher looks
+  for changes in the Routing Rules. DeniedTables live only in the Shard records in the topo, so changes to them would
+  not, by themselves, result in a notification. To get around that, the traffic switcher is changed to also rebuild
+  the SrvVSchema when DeniedTables are modified.
+* The logic to start buffering needs to look for the "enforce denied tables" error that is thrown by the vttablets
+  when they try to execute a query on a table whose writes are being switched.
+* We cannot use the current query retry logic, which sits at the tablet gateway level: by that point the keyspace has
+  already been fixed by the planner and cannot be changed in that layer. We need to add new retry logic at a higher
+  level (the _newExecute_ method) and always replan before retrying a query. This also means that we need to bypass
+  the plan cache while retrying.
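+The following is a simplified sketch of how the keyspace event watcher can derive the MoveTables state from the topo
+attributes described above. It is illustrative only: the real logic lives in `getMoveTablesStatus` in
+`go/vt/discovery/keyspace_events.go` and operates on the actual `SrvVSchema` and `ShardInfo` records; the local types
+and helper below are simplified stand-ins for those.
+
+```go
+// Simplified stand-ins for the relevant topo data.
+type shardInfo struct {
+	Name         string
+	DeniedTables []string // from the shard's TabletControls
+}
+
+type routingState struct {
+	tableRules map[string][]string // table -> keyspaces it routes to (regular MoveTables)
+	shardRules map[string]string   // "keyspace.shard" -> target keyspace (shard-by-shard)
+}
+
+type moveTablesState int
+
+const (
+	moveTablesNone moveTablesState = iota
+	moveTablesSwitching
+	moveTablesSwitched
+)
+
+// classifyMoveTables: DeniedTables tell us a MoveTables is in flight; the (shard)
+// routing rules tell us whether writes have already been switched to the target.
+func classifyMoveTables(keyspace string, shards []shardInfo, rs routingState) moveTablesState {
+	var denied []shardInfo
+	for _, s := range shards {
+		if len(s.DeniedTables) > 0 {
+			denied = append(denied, s)
+		}
+	}
+	if len(denied) == 0 {
+		return moveTablesNone // no denied tables: no switch is in progress
+	}
+	if len(rs.shardRules) > 0 { // shard-by-shard migration
+		for _, s := range denied {
+			if _, ok := rs.shardRules[keyspace+"."+s.Name]; ok {
+				return moveTablesSwitching // a rule still points at the source shard
+			}
+		}
+		return moveTablesSwitched
+	}
+	// Regular MoveTables: if the routing rule for a denied table no longer points
+	// back at this (source) keyspace, writes have been switched.
+	table := denied[0].DeniedTables[0]
+	if targets := rs.tableRules[table]; len(targets) > 0 && targets[0] != keyspace+"."+table {
+		return moveTablesSwitched
+	}
+	return moveTablesSwitching
+}
+```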
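+The retry change in the last bullet has roughly the following shape. Again this is only a sketch, not the actual
+_newExecute_ code: plan construction, waiting for a newer VSchema, and transaction handling are elided, and
+`planAndExecute` and `maxRetries` are placeholders for the plan-and-execute step and the retry budget.
+
+```go
+import (
+	"context"
+	"fmt"
+	"strings"
+)
+
+// retryOnDeniedTables replans and re-executes a query while the MoveTables
+// switch-writes error is being returned by the source primaries.
+func retryOnDeniedTables(ctx context.Context, maxRetries int, planAndExecute func(context.Context) error) error {
+	var err error
+	for try := 0; try < maxRetries; try++ {
+		// Replan on every attempt: after SwitchTraffic the routing rules change, so a
+		// plan built (or cached) before the switch may still target the source keyspace.
+		if err = planAndExecute(ctx); err == nil {
+			return nil
+		}
+		if !strings.Contains(err.Error(), "enforce denied tables") {
+			// Not the MoveTables switch-writes error; fail as before.
+			return err
+		}
+		// Writes are being (or have just been) switched; loop, replan and try again.
+	}
+	return fmt.Errorf("query failed after %d retries: %w", maxRetries, err)
+}
+```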
diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go index 12539b778de..54ba2d3d0b0 100644 --- a/go/test/endtoend/vreplication/cluster_test.go +++ b/go/test/endtoend/vreplication/cluster_test.go @@ -30,6 +30,8 @@ import ( "testing" "time" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" @@ -52,8 +54,9 @@ var ( sidecarDBIdentifier = sqlparser.String(sqlparser.NewIdentifierCS(sidecarDBName)) mainClusterConfig *ClusterConfig externalClusterConfig *ClusterConfig - extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms"} - extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"} + extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDurationStr, + "--buffer_size", "100000", "--buffer_min_time_between_failovers", "0s", "--buffer_max_failover_duration", loadTestBufferingWindowDurationStr} + extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"} // This variable can be used within specific tests to alter vttablet behavior extraVTTabletArgs = []string{} @@ -730,6 +733,10 @@ func (vc *VitessCluster) getPrimaryTablet(t *testing.T, ksName, shardName string return nil } +func (vc *VitessCluster) GetVTGateConn(t *testing.T) *mysql.Conn { + return getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) +} + func (vc *VitessCluster) startQuery(t *testing.T, query string) (func(t *testing.T), func(t *testing.T)) { conn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) _, err := conn.ExecuteFetch("begin", 1000, false) diff --git a/go/test/endtoend/vreplication/config_test.go b/go/test/endtoend/vreplication/config_test.go index 213ad0bcc75..dcae0f6a5bf 100644 --- a/go/test/endtoend/vreplication/config_test.go +++ b/go/test/endtoend/vreplication/config_test.go @@ -59,6 +59,7 @@ create table json_tbl (id int, j1 json, j2 json, primary key(id)); create table geom_tbl (id int, g geometry, p point, ls linestring, pg polygon, mp multipoint, mls multilinestring, mpg multipolygon, gc geometrycollection, primary key(id)); create table ` + "`blüb_tbl`" + ` (id int, val1 varchar(20), ` + "`blöb1`" + ` blob, val2 varbinary(20), ` + "`bl@b2`" + ` longblob, txt1 text, blb3 tinyblob, txt2 longtext, blb4 mediumblob, primary key(id)); create table reftable (id int, val1 varchar(20), primary key(id), key(val1)); +create table loadtest (id int, name varchar(256), primary key(id), key(name)); ` // These should always be ignored in vreplication internalSchema = ` @@ -76,6 +77,7 @@ create table reftable (id int, val1 varchar(20),
primary key(id), key(val1)); "product": {}, "merchant": {}, "orders": {}, + "loadtest": {}, "customer": {}, "customer_seq": { "type": "sequence" @@ -117,6 +119,14 @@ create table reftable (id int, val1 varchar(20), primary key(id), key(val1)); } }, "tables": { + "loadtest": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] + }, "customer": { "column_vindexes": [ { @@ -283,7 +293,15 @@ create table reftable (id int, val1 varchar(20), primary key(id), key(val1)); } }, "tables": { - "customer": { + "loadtest": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] + }, + "customer": { "column_vindexes": [ { "column": "cid", diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 79052307c7a..20d31948e26 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -29,6 +29,7 @@ import ( "sort" "strconv" "strings" + "sync/atomic" "testing" "time" @@ -696,3 +697,121 @@ func isBinlogRowImageNoBlob(t *testing.T, tablet *cluster.VttabletProcess) bool mode := strings.ToLower(rs.Rows[0][0].ToString()) return mode == "noblob" } + +const ( + loadTestBufferingWindowDurationStr = "30s" + loadTestPostBufferingInsertWindow = 60 * time.Second // should be greater than loadTestBufferingWindowDurationStr + loadTestWaitForCancel = 30 * time.Second + loadTestWaitBetweenQueries = 2 * time.Millisecond +) + +type loadGenerator struct { + t *testing.T + vc *VitessCluster + ctx context.Context + cancel context.CancelFunc +} + +func newLoadGenerator(t *testing.T, vc *VitessCluster) *loadGenerator { + return &loadGenerator{ + t: t, + vc: vc, + } +} + +func (lg *loadGenerator) stop() { + time.Sleep(loadTestPostBufferingInsertWindow) // wait for buffering to stop and additional records to be inserted by startLoad after traffic is switched + log.Infof("Canceling load") + lg.cancel() + time.Sleep(loadTestWaitForCancel) // wait for cancel to take effect + log.Flush() + +} + +func (lg *loadGenerator) start() { + t := lg.t + lg.ctx, lg.cancel = context.WithCancel(context.Background()) + + var id int64 + log.Infof("startLoad: starting") + queryTemplate := "insert into loadtest(id, name) values (%d, 'name-%d')" + var totalQueries, successfulQueries int64 + var deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors int64 + defer func() { + + log.Infof("startLoad: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d", + totalQueries, successfulQueries, deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors) + }() + logOnce := true + for { + select { + case <-lg.ctx.Done(): + log.Infof("startLoad: context cancelled") + log.Infof("startLoad: deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d", + deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors) + require.Equal(t, int64(0), deniedErrors) + require.Equal(t, int64(0), otherErrors) + require.Equal(t, totalQueries, successfulQueries) + return + default: + go func() { + conn := vc.GetVTGateConn(t) + defer conn.Close() + atomic.AddInt64(&id, 1) + query := fmt.Sprintf(queryTemplate, id, id) + _, err := conn.ExecuteFetch(query, 1, false) + atomic.AddInt64(&totalQueries, 1) + if err != nil { + sqlErr := err.(*mysql.SQLError) + if strings.Contains(strings.ToLower(err.Error()), "denied tables") { + 
log.Infof("startLoad: denied tables error executing query: %d:%v", sqlErr.Number(), err) + atomic.AddInt64(&deniedErrors, 1) + } else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") { + // this can happen when a second keyspace is setup with the same tables, but there are no routing rules + // set yet by MoveTables. So we ignore these errors. + atomic.AddInt64(&ambiguousErrors, 1) + } else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") { + atomic.AddInt64(&reshardedErrors, 1) + } else if strings.Contains(strings.ToLower(err.Error()), "not found") { + atomic.AddInt64(&tableNotFoundErrors, 1) + } else { + if logOnce { + log.Infof("startLoad: error executing query: %d:%v", sqlErr.Number(), err) + logOnce = false + } + atomic.AddInt64(&otherErrors, 1) + } + time.Sleep(loadTestWaitBetweenQueries) + } else { + atomic.AddInt64(&successfulQueries, 1) + } + }() + time.Sleep(loadTestWaitBetweenQueries) + } + } +} + +func (lg *loadGenerator) waitForCount(want int64) { + t := lg.t + conn := vc.GetVTGateConn(t) + defer conn.Close() + timer := time.NewTimer(defaultTimeout) + defer timer.Stop() + for { + qr, err := conn.ExecuteFetch("select count(*) from loadtest", 1, false) + require.NoError(t, err) + require.NotNil(t, qr) + got, _ := qr.Rows[0][0].ToInt64() + + if int64(got) >= want { + return + } + select { + case <-timer.C: + require.FailNow(t, fmt.Sprintf("table %q did not reach the expected number of rows (%d) before the timeout of %s; last seen count: %v", + "loadtest", want, defaultTimeout, got)) + default: + time.Sleep(defaultTick) + } + } +} diff --git a/go/test/endtoend/vreplication/movetables_buffering_test.go b/go/test/endtoend/vreplication/movetables_buffering_test.go new file mode 100644 index 00000000000..2dc2db7b909 --- /dev/null +++ b/go/test/endtoend/vreplication/movetables_buffering_test.go @@ -0,0 +1,45 @@ +package vreplication + +import ( + "testing" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/wrangler" +) + +func TestMoveTablesBuffering(t *testing.T) { + defaultRdonly = 1 + vc = setupMinimalCluster(t) + defer vtgateConn.Close() + defer vc.TearDown(t) + + currentWorkflowType = wrangler.MoveTablesWorkflow + setupMinimalCustomerKeyspace(t) + tables := "loadtest" + err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs, + tables, workflowActionCreate, "", "", "") + require.NoError(t, err) + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) + + lg := newLoadGenerator(t, vc) + go func() { + lg.start() + }() + lg.waitForCount(1000) + + catchup(t, targetTab1, workflowName, "MoveTables") + catchup(t, targetTab2, workflowName, "MoveTables") + vdiff1(t, ksWorkflow, "") + waitForLowLag(t, "customer", workflowName) + tstWorkflowSwitchReads(t, "", "") + tstWorkflowSwitchWrites(t) + log.Infof("SwitchWrites done") + lg.stop() + + log.Infof("TestMoveTablesBuffering: done") + log.Flush() +} diff --git a/go/test/endtoend/vreplication/partial_movetables_test.go b/go/test/endtoend/vreplication/partial_movetables_test.go index 1cba9a6b4f1..321d6afc6c1 100644 --- a/go/test/endtoend/vreplication/partial_movetables_test.go +++ b/go/test/endtoend/vreplication/partial_movetables_test.go @@ -51,14 +51,14 @@ func TestPartialMoveTablesBasic(t *testing.T) { defer func() { extraVTGateArgs = origExtraVTGateArgs }() - vc = setupCluster(t) + vc = setupMinimalCluster(t) defer 
vtgateConn.Close() defer vc.TearDown(t) - setupCustomerKeyspace(t) + setupMinimalCustomerKeyspace(t) // Move customer table from unsharded product keyspace to // sharded customer keyspace. - createMoveTablesWorkflow(t, "customer") + createMoveTablesWorkflow(t, "customer,loadtest") tstWorkflowSwitchReadsAndWrites(t) tstWorkflowComplete(t) @@ -75,6 +75,8 @@ func TestPartialMoveTablesBasic(t *testing.T) { applyShardRoutingRules(t, emptyShardRoutingRules) require.Equal(t, emptyShardRoutingRules, getShardRoutingRules(t)) + runWithLoad := true + // Now setup the customer2 keyspace so we can do a partial // move tables for one of the two shards: 80-. defaultRdonly = 0 @@ -88,8 +90,17 @@ func TestPartialMoveTablesBasic(t *testing.T) { // start the partial movetables for 80- err := tstWorkflowExec(t, defaultCellName, wfName, sourceKs, targetKs, - "customer", workflowActionCreate, "", shard, "") + "customer,loadtest", workflowActionCreate, "", shard, "") require.NoError(t, err) + var lg *loadGenerator + if runWithLoad { // start load after routing rules are set, otherwise we end up with ambiguous tables + lg = newLoadGenerator(t, vc) + go func() { + lg.start() + }() + lg.waitForCount(1000) + } + targetTab1 = vc.getPrimaryTablet(t, targetKs, shard) catchup(t, targetTab1, wfName, "Partial MoveTables Customer to Customer2") vdiff1(t, ksWf, "") @@ -221,14 +232,14 @@ func TestPartialMoveTablesBasic(t *testing.T) { wfName = "partialDash80" shard = "-80" ksWf = fmt.Sprintf("%s.%s", targetKs, wfName) - // Start the partial movetables for -80, 80- has already been switched err = tstWorkflowExec(t, defaultCellName, wfName, sourceKs, targetKs, - "customer", workflowActionCreate, "", shard, "") + "customer,loadtest", workflowActionCreate, "", shard, "") require.NoError(t, err) targetTab2 := vc.getPrimaryTablet(t, targetKs, shard) catchup(t, targetTab2, wfName, "Partial MoveTables Customer to Customer2: -80") vdiff1(t, ksWf, "") + // Switch all traffic for the shard require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "")) expectedSwitchOutput = fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n", @@ -243,6 +254,8 @@ func TestPartialMoveTablesBasic(t *testing.T) { // target side (customer2). require.Equal(t, postCutoverShardRoutingRules, getShardRoutingRules(t)) + lg.stop() + // Cancel both reverse workflows (as we've done the cutover), which should // clean up both the global routing rules and the shard routing rules. for _, wf := range []string{"partialDash80", "partial80Dash"} { @@ -272,4 +285,5 @@ func TestPartialMoveTablesBasic(t *testing.T) { // Confirm that the shard routing rules are now gone. 
require.Equal(t, emptyShardRoutingRules, getShardRoutingRules(t)) + } diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index a55ad3047e1..b5a8b0a980f 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -429,7 +429,7 @@ func testMoveTablesV2Workflow(t *testing.T) { setupCustomerKeyspace(t) // The purge table should get skipped/ignored // If it's not then we'll get an error as the table doesn't exist in the vschema - createMoveTablesWorkflow(t, "customer,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431") + createMoveTablesWorkflow(t, "customer,loadtest,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431") if !strings.Contains(lastOutput, "Workflow started successfully") { t.Fail() } @@ -639,21 +639,58 @@ func setupCustomer2Keyspace(t *testing.T) { c2shards := []string{"-80", "80-"} c2keyspace := "customer2" if _, err := vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, c2keyspace, strings.Join(c2shards, ","), - customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 1200, nil); err != nil { + customerVSchema, customerSchema, 0, 0, 1200, nil); err != nil { t.Fatal(err) } for _, c2shard := range c2shards { err := cluster.WaitForHealthyShard(vc.VtctldClient, c2keyspace, c2shard) require.NoError(t, err) - if defaultReplicas > 0 { - require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", c2keyspace, c2shard), defaultReplicas, 30*time.Second)) - } - if defaultRdonly > 0 { - require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", c2keyspace, c2shard), defaultRdonly, 30*time.Second)) - } + require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", c2keyspace, c2shard), 1, 30*time.Second)) } } +func setupMinimalCluster(t *testing.T) *VitessCluster { + cells := []string{"zone1"} + + vc = NewVitessCluster(t, "TestBasicVreplicationWorkflow", cells, mainClusterConfig) + require.NotNil(t, vc) + defaultCellName := "zone1" + allCellNames = defaultCellName + defaultCell = vc.Cells[defaultCellName] + + zone1 := vc.Cells["zone1"] + + vc.AddKeyspace(t, []*Cell{zone1}, "product", "0", initialProductVSchema, initialProductSchema, 0, 0, 100, nil) + + vtgate = zone1.Vtgates[0] + require.NotNil(t, vtgate) + err := cluster.WaitForHealthyShard(vc.VtctldClient, "product", "0") + require.NoError(t, err) + require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1, 30*time.Second)) + + vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + verifyClusterHealth(t, vc) + insertInitialData(t) + + sourceTab = vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet + + return vc +} + +func setupMinimalCustomerKeyspace(t *testing.T) { + if _, err := vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, "customer", "-80,80-", + customerVSchema, customerSchema, 0, 0, 200, nil); err != nil { + t.Fatal(err) + } + require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, "customer", "-80")) + require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, "customer", "80-")) + require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "customer", "-80"), 1, 30*time.Second)) + require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "customer", 
"80-"), 1, 30*time.Second)) + custKs := vc.Cells[defaultCell.Name].Keyspaces["customer"] + targetTab1 = custKs.Shards["-80"].Tablets["zone1-200"].Vttablet + targetTab2 = custKs.Shards["80-"].Tablets["zone1-300"].Vttablet +} + func TestSwitchReadsWritesInAnyOrder(t *testing.T) { vc = setupCluster(t) defer vc.TearDown(t) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 35f7062d27d..f908d5c5150 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -328,6 +328,7 @@ func testVreplicationWorkflows(t *testing.T, limited bool, binlogRowImage string verifyClusterHealth(t, vc) insertInitialData(t) materializeRollup(t) + shardCustomer(t, true, []*Cell{defaultCell}, defaultCellName, false) // the Lead and Lead-1 tables tested a specific case with binary sharding keys. Drop it now so that we don't @@ -623,6 +624,8 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) { shardCustomer(t, true, []*Cell{cell1, cell2}, "alias", false) } +var queryErrorCount int64 + // testVStreamFrom confirms that the "vstream * from" endpoint is serving data func testVStreamFrom(t *testing.T, table string, expectedRowCount int) { ctx := context.Background() @@ -705,7 +708,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl defaultCell := cells[0] custKs := vc.Cells[defaultCell.Name].Keyspaces["customer"] - tables := "customer,Lead,Lead-1,db_order_test,geom_tbl,json_tbl,blüb_tbl,vdiff_order,reftable" + tables := "customer,loadtest,Lead,Lead-1,db_order_test,geom_tbl,json_tbl,blüb_tbl,vdiff_order,reftable" moveTablesAction(t, "Create", sourceCellOrAlias, workflow, sourceKs, targetKs, tables) customerTab1 := custKs.Shards["-80"].Tablets["zone1-200"].Vttablet diff --git a/go/test/endtoend/vreplication/vreplication_test_env.go b/go/test/endtoend/vreplication/vreplication_test_env.go index f3109b2123b..4500a98868c 100644 --- a/go/test/endtoend/vreplication/vreplication_test_env.go +++ b/go/test/endtoend/vreplication/vreplication_test_env.go @@ -19,14 +19,14 @@ package vreplication var dryRunResultsSwitchWritesCustomerShard = []string{ "Lock keyspace product", "Lock keyspace customer", - "Stop writes on keyspace product, tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,reftable,vdiff_order]:", + "Stop writes on keyspace product, tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]:", "/ Keyspace product, Shard 0 at Position", "Wait for VReplication on stopped streams to catchup for up to 30s", "Create reverse replication workflow p2c_reverse", "Create journal entries on source databases", - "Enable writes on keyspace customer tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,reftable,vdiff_order]", + "Enable writes on keyspace customer tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]", "Switch routing from keyspace product to keyspace customer", - "Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,reftable,vdiff_order] will be updated", + "Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated", "Switch writes completed, freeze and delete vreplication streams on:", " tablet 200 ", " tablet 300 ", @@ -41,8 +41,8 @@ var dryRunResultsSwitchWritesCustomerShard = []string{ var 
dryRunResultsReadCustomerShard = []string{ "Lock keyspace product", - "Switch reads for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,reftable,vdiff_order] to keyspace customer for tablet types [RDONLY,REPLICA]", - "Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,reftable,vdiff_order] will be updated", + "Switch reads for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] to keyspace customer for tablet types [RDONLY,REPLICA]", + "Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated", "Unlock keyspace product", } diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go index 89a0fb23eac..bf5cfcdf1df 100644 --- a/go/vt/discovery/keyspace_events.go +++ b/go/vt/discovery/keyspace_events.go @@ -26,10 +26,12 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" "vitess.io/vitess/go/vt/sidecardb" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/topotools" ) // KeyspaceEventWatcher is an auxiliary watcher that watches all availability incidents @@ -62,6 +64,9 @@ type KeyspaceEvent struct { // Shards is a list of all the shards in the keyspace, including their state after the event is resolved Shards []ShardEvent + + // MoveTablesState records the current state of an ongoing MoveTables workflow + MoveTablesState MoveTablesState } type ShardEvent struct { @@ -86,6 +91,16 @@ func NewKeyspaceEventWatcher(ctx context.Context, topoServer srvtopo.Server, hc return kew } +type MoveTablesStatus int + +const ( + MoveTablesUnknown MoveTablesStatus = iota + // MoveTablesSwitching is set when the write traffic is the middle of being switched from the source to the target + MoveTablesSwitching + // MoveTablesSwitched is set when write traffic has been completely switched to the target + MoveTablesSwitched +) + // keyspaceState is the internal state for all the keyspaces that the KEW is // currently watching type keyspaceState struct { @@ -99,6 +114,8 @@ type keyspaceState struct { lastError error lastKeyspace *topodatapb.SrvKeyspace shards map[string]*shardState + + moveTablesState *MoveTablesState } // Format prints the internal state for this keyspace for debug purposes @@ -134,6 +151,7 @@ func (kss *keyspaceState) beingResharded(currentShard string) bool { // for all the known shards, try to find a primary shard besides the one we're trying to access // and which is currently healthy. 
if there are other healthy primaries in the keyspace, it means // we're in the middle of a resharding operation + // FIXME: probably doesn't work for anything other than 1->2 resharding for shard, sstate := range kss.shards { if shard != currentShard && sstate.serving { return true @@ -218,6 +236,10 @@ func (kss *keyspaceState) ensureConsistentLocked() { return } + if kss.moveTablesState != nil && kss.moveTablesState.Typ != MoveTablesNone && kss.moveTablesState.State != MoveTablesSwitched { + return + } + // get the topology metadata for our primary from `lastKeyspace`; this value is refreshed // from our topology watcher whenever a change is detected, so it should always be up to date primary := topoproto.SrvKeyspaceGetPartition(kss.lastKeyspace, topodatapb.TabletType_PRIMARY) @@ -252,16 +274,25 @@ func (kss *keyspaceState) ensureConsistentLocked() { } } + // clone the current moveTablesState, if any, to handle race conditions where it can get updated while we're broadcasting + var moveTablesState MoveTablesState + if kss.moveTablesState != nil { + moveTablesState = *kss.moveTablesState + } + + ksevent := &KeyspaceEvent{ + Cell: kss.kew.localCell, + Keyspace: kss.keyspace, + Shards: make([]ShardEvent, 0, len(kss.shards)), + MoveTablesState: moveTablesState, + } + // we haven't found any inconsistencies between the HealthCheck stream and the topology // watcher. this means the ongoing availability event has been resolved, so we can broadcast // a resolution event to all listeners kss.consistent = true - ksevent := &KeyspaceEvent{ - Cell: kss.kew.localCell, - Keyspace: kss.keyspace, - Shards: make([]ShardEvent, 0, len(kss.shards)), - } + kss.moveTablesState = nil for shard, sstate := range kss.shards { ksevent.Shards = append(ksevent.Shards, ShardEvent{ @@ -329,6 +360,97 @@ func (kss *keyspaceState) onHealthCheck(th *TabletHealth) { kss.ensureConsistentLocked() } +type MoveTablesType int + +const ( + MoveTablesNone MoveTablesType = iota + MoveTablesRegular + MoveTablesShardByShard +) + +type MoveTablesState struct { + Typ MoveTablesType + State MoveTablesStatus +} + +func (kss *keyspaceState) getMoveTablesStatus(vs *vschemapb.SrvVSchema) (*MoveTablesState, error) { + mtState := &MoveTablesState{ + Typ: MoveTablesNone, + State: MoveTablesUnknown, + } + + // if there are no routing rules defined, then movetables is not in progress, exit early + if (vs.RoutingRules != nil && len(vs.RoutingRules.Rules) == 0) && + (vs.ShardRoutingRules != nil && len(vs.ShardRoutingRules.Rules) == 0) { + return mtState, nil + } + + shortCtx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + defer cancel() + ts, _ := kss.kew.ts.GetTopoServer() + + // collect all current shard information from the topo + var shardInfos []*topo.ShardInfo + for _, sstate := range kss.shards { + si, err := ts.GetShard(shortCtx, kss.keyspace, sstate.target.Shard) + if err != nil { + return nil, err + } + shardInfos = append(shardInfos, si) + } + + // check if any shard has denied tables and if so, record one of these to check where it currently points to + // using the (shard) routing rules + var shardsWithDeniedTables []string + var oneDeniedTable string + for _, si := range shardInfos { + for _, tc := range si.TabletControls { + if len(tc.DeniedTables) > 0 { + oneDeniedTable = tc.DeniedTables[0] + shardsWithDeniedTables = append(shardsWithDeniedTables, si.ShardName()) + } + } + } + if len(shardsWithDeniedTables) == 0 { + return mtState, nil + } + + // check if a shard by shard migration is in progress 
and if so detect if it has been switched + isPartialTables := vs.ShardRoutingRules != nil && len(vs.ShardRoutingRules.Rules) > 0 + + if isPartialTables { + srr := topotools.GetShardRoutingRulesMap(vs.ShardRoutingRules) + mtState.Typ = MoveTablesShardByShard + mtState.State = MoveTablesSwitched + for _, shard := range shardsWithDeniedTables { + ruleKey := topotools.GetShardRoutingRuleKey(kss.keyspace, shard) + if _, ok := srr[ruleKey]; ok { + // still pointing to the source shard + mtState.State = MoveTablesSwitching + break + } + } + log.Infof("getMoveTablesStatus: keyspace %s declaring partial move tables %v", kss.keyspace, mtState) + return mtState, nil + } + + // it wasn't a shard by shard migration, but since we have denied tables it must be a regular MoveTables + mtState.Typ = MoveTablesRegular + mtState.State = MoveTablesSwitching + rr := topotools.GetRoutingRulesMap(vs.RoutingRules) + if rr != nil { + r, ok := rr[oneDeniedTable] + // if a rule exists for the table and points to the target keyspace, writes have been switched + if ok && len(r) > 0 && r[0] != fmt.Sprintf("%s.%s", kss.keyspace, oneDeniedTable) { + mtState.State = MoveTablesSwitched + log.Infof("onSrvKeyspace:: keyspace %s writes have been switched for table %s, rule %v", kss.keyspace, oneDeniedTable, r[0]) + } + } + log.Infof("getMoveTablesStatus: keyspace %s declaring regular move tables %v", kss.keyspace, mtState) + + return mtState, nil +} + // onSrvKeyspace is the callback that updates this keyspace with fresh topology data from our topology server. // this callback is called from a Watcher in the topo server whenever a change to the topology for this keyspace // occurs. this watcher is dedicated to this keyspace, and will only yield topology metadata changes for as @@ -391,6 +513,23 @@ func (kss *keyspaceState) isServing() bool { return false } +// onSrvVSchema is called from a Watcher in the topo server whenever the SrvVSchema is updated by Vitess. +// For the purposes here, we are interested in updates to the RoutingRules or ShardRoutingRules. +// In addition, the traffic switcher updates SrvVSchema when the DeniedTables attributes in a Shard record is +// modified. +func (kss *keyspaceState) onSrvVSchema(vs *vschemapb.SrvVSchema, err error) bool { + kss.mu.Lock() + defer kss.mu.Unlock() + kss.moveTablesState, _ = kss.getMoveTablesStatus(vs) + if kss.moveTablesState != nil && kss.moveTablesState.Typ != MoveTablesNone { + // mark the keyspace as inconsistent. ensureConsistentLocked() checks if the workflow is switched, + // and if so, it will send an event to the buffering subscribers to indicate that buffering can be stopped. + kss.consistent = false + kss.ensureConsistentLocked() + } + return true +} + // newKeyspaceState allocates the internal state required to keep track of availability incidents // in this keyspace, and starts up a SrvKeyspace watcher on our topology server which will update // our keyspaceState with any topology changes in real time. 
@@ -402,6 +541,7 @@ func newKeyspaceState(kew *KeyspaceEventWatcher, cell, keyspace string) *keyspac shards: make(map[string]*shardState), } kew.ts.WatchSrvKeyspace(context.Background(), cell, keyspace, kss.onSrvKeyspace) + kew.ts.WatchSrvVSchema(context.Background(), cell, kss.onSrvVSchema) return kss } @@ -421,7 +561,6 @@ func (kew *KeyspaceEventWatcher) processHealthCheck(th *TabletHealth) { func (kew *KeyspaceEventWatcher) getKeyspaceStatus(keyspace string) *keyspaceState { kew.mu.Lock() defer kew.mu.Unlock() - kss := kew.keyspaces[keyspace] if kss == nil { kss = newKeyspaceState(kew, kew.localCell, keyspace) diff --git a/go/vt/topotools/routing_rules.go b/go/vt/topotools/routing_rules.go index 6dfa8b655ca..9eb64c936d7 100644 --- a/go/vt/topotools/routing_rules.go +++ b/go/vt/topotools/routing_rules.go @@ -27,6 +27,19 @@ import ( vschemapb "vitess.io/vitess/go/vt/proto/vschema" ) +//region routing rules + +func GetRoutingRulesMap(rules *vschemapb.RoutingRules) map[string][]string { + if rules == nil { + return nil + } + rulesMap := make(map[string][]string, len(rules.Rules)) + for _, rr := range rules.Rules { + rulesMap[rr.FromTable] = rr.ToTables + } + return rulesMap +} + // GetRoutingRules fetches routing rules from the topology server and returns a // mapping of fromTable=>[]toTables. func GetRoutingRules(ctx context.Context, ts *topo.Server) (map[string][]string, error) { @@ -35,10 +48,7 @@ func GetRoutingRules(ctx context.Context, ts *topo.Server) (map[string][]string, return nil, err } - rules := make(map[string][]string, len(rrs.Rules)) - for _, rr := range rrs.Rules { - rules[rr.FromTable] = rr.ToTables - } + rules := GetRoutingRulesMap(rrs) return rules, nil } @@ -59,6 +69,29 @@ func SaveRoutingRules(ctx context.Context, ts *topo.Server, rules map[string][]s return ts.SaveRoutingRules(ctx, rrs) } +//endregion + +//region shard routing rules + +func GetShardRoutingRuleKey(fromKeyspace, shard string) string { + return fmt.Sprintf("%s.%s", fromKeyspace, shard) +} +func ParseShardRoutingRuleKey(key string) (string, string) { + arr := strings.Split(key, ".") + return arr[0], arr[1] +} + +func GetShardRoutingRulesMap(rules *vschemapb.ShardRoutingRules) map[string]string { + if rules == nil { + return nil + } + rulesMap := make(map[string]string, len(rules.Rules)) + for _, rr := range rules.Rules { + rulesMap[GetShardRoutingRuleKey(rr.FromKeyspace, rr.Shard)] = rr.ToKeyspace + } + return rulesMap +} + // GetShardRoutingRules fetches shard routing rules from the topology server and returns a // mapping of fromKeyspace.Shard=>toKeyspace. 
func GetShardRoutingRules(ctx context.Context, ts *topo.Server) (map[string]string, error) { @@ -67,10 +100,7 @@ func GetShardRoutingRules(ctx context.Context, ts *topo.Server) (map[string]stri return nil, err } - rules := make(map[string]string, len(rrs.Rules)) - for _, rr := range rrs.Rules { - rules[fmt.Sprintf("%s.%s", rr.FromKeyspace, rr.Shard)] = rr.ToKeyspace - } + rules := GetShardRoutingRulesMap(rrs) return rules, nil } @@ -82,9 +112,7 @@ func SaveShardRoutingRules(ctx context.Context, ts *topo.Server, srr map[string] srs := &vschemapb.ShardRoutingRules{Rules: make([]*vschemapb.ShardRoutingRule, 0, len(srr))} for from, to := range srr { - arr := strings.Split(from, ".") - fromKeyspace := arr[0] - shard := arr[1] + fromKeyspace, shard := ParseShardRoutingRuleKey(from) srs.Rules = append(srs.Rules, &vschemapb.ShardRoutingRule{ FromKeyspace: fromKeyspace, ToKeyspace: to, diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index e0482dedeb0..c73f0a31d6c 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -931,7 +931,7 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error { } func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access accessType) error { - return ts.ForAllSources(func(source *MigrationSource) error { + err := ts.ForAllSources(func(source *MigrationSource) error { if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error { return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, access == allowWrites /* remove */, ts.Tables()) }); err != nil { @@ -946,6 +946,14 @@ func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access a } return err }) + if err != nil { + log.Warningf("Error in changeTableSourceWrites: %s", err) + return err + } + // Note that the denied tables, which are being updated in this method, are not part of the SrvVSchema in the topo. + // However, we are using the notification of a SrvVSchema change in VTGate to recompute the state of a + // MoveTables workflow (which also looks up denied tables from the topo). So we need to trigger a SrvVSchema change here. + return ts.TopoServer().RebuildSrvVSchema(ctx, nil) } func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) { diff --git a/go/vt/vtgate/buffer/buffer.go b/go/vt/vtgate/buffer/buffer.go index 3db19f68b98..622bb03b082 100644 --- a/go/vt/vtgate/buffer/buffer.go +++ b/go/vt/vtgate/buffer/buffer.go @@ -28,6 +28,7 @@ package buffer import ( "context" + "strings" "sync" "golang.org/x/sync/semaphore" @@ -65,12 +66,61 @@ const ( // currently retried. type RetryDoneFunc context.CancelFunc +const ( + ClusterEventReshardingInProgress = "current keyspace is being resharded" + ClusterEventReparentInProgress = "primary is not serving, there may be a reparent operation in progress" + ClusterEventMoveTables = "disallowed due to rule" +) + +var ClusterEvents []string + +func init() { + ClusterEvents = []string{ + ClusterEventReshardingInProgress, + ClusterEventReparentInProgress, + ClusterEventMoveTables, + } +} + // CausedByFailover returns true if "err" was supposedly caused by a failover. // To simplify things, we've merged the detection for different MySQL flavors // in one function. Supported flavors: MariaDB, MySQL func CausedByFailover(err error) bool { log.V(2).Infof("Checking error (type: %T) if it is caused by a failover. 
err: %v", err, err) - return vterrors.Code(err) == vtrpcpb.Code_CLUSTER_EVENT + reason, isFailover := isFailoverError(err) + if isFailover { + log.Infof("CausedByFailover signalling failover for reason: %s", reason) + } + return isFailover +} + +// for debugging purposes +func getReason(err error) string { + for _, ce := range ClusterEvents { + if strings.Contains(err.Error(), ce) { + return ce + } + } + return "" +} + +// isFailoverError looks at the error returned by the sql query execution to check if there is a cluster event +// (caused by resharding or reparenting) or a denied tables error seen during switch writes in MoveTables +func isFailoverError(err error) (string, bool) { + var reason string + var isFailover bool + switch vterrors.Code(err) { + case vtrpcpb.Code_CLUSTER_EVENT: + isFailover = true + case vtrpcpb.Code_FAILED_PRECONDITION: + if strings.Contains(err.Error(), ClusterEventMoveTables) { + isFailover = true + } + } + if isFailover { + reason = getReason(err) + } + return reason, isFailover } // Buffer is used to track ongoing PRIMARY tablet failovers and buffer @@ -138,7 +188,6 @@ func (b *Buffer) WaitForFailoverEnd(ctx context.Context, keyspace, shard string, requestsSkipped.Add([]string{keyspace, shard, skippedDisabled}, 1) return nil, nil } - return sb.waitForFailoverEnd(ctx, keyspace, shard, err) } @@ -146,7 +195,7 @@ func (b *Buffer) HandleKeyspaceEvent(ksevent *discovery.KeyspaceEvent) { for _, shard := range ksevent.Shards { sb := b.getOrCreateBuffer(shard.Target.Keyspace, shard.Target.Shard) if sb != nil { - sb.recordKeyspaceEvent(shard.Tablet, shard.Serving) + sb.recordKeyspaceEvent(shard.Tablet, shard.Serving, ksevent) } } } diff --git a/go/vt/vtgate/buffer/flags.go b/go/vt/vtgate/buffer/flags.go index 742a5d5d412..a17cc09ccc3 100644 --- a/go/vt/vtgate/buffer/flags.go +++ b/go/vt/vtgate/buffer/flags.go @@ -70,9 +70,6 @@ func verifyFlags() error { if bufferSize < 1 { return fmt.Errorf("--buffer_size must be >= 1 (specified value: %d)", bufferSize) } - if bufferMinTimeBetweenFailovers < bufferMaxFailoverDuration*time.Duration(2) { - return fmt.Errorf("--buffer_min_time_between_failovers should be at least twice the length of --buffer_max_failover_duration: %v vs. %v", bufferMinTimeBetweenFailovers, bufferMaxFailoverDuration) - } if bufferDrainConcurrency < 1 { return fmt.Errorf("--buffer_drain_concurrency must be >= 1 (specified value: %d)", bufferDrainConcurrency) @@ -165,6 +162,16 @@ func NewDefaultConfig() *Config { } } +// EnableBuffering is used in tests where we require the keyspace event watcher to be created +func EnableBuffering() { + bufferEnabled = true +} + +// DisableBuffering is the counterpart of EnableBuffering +func DisableBuffering() { + bufferEnabled = false +} + func NewConfigFromFlags() *Config { if err := verifyFlags(); err != nil { log.Fatalf("Invalid buffer configuration: %v", err) diff --git a/go/vt/vtgate/buffer/shard_buffer.go b/go/vt/vtgate/buffer/shard_buffer.go index 1b829cb3ddd..321e9b90f73 100644 --- a/go/vt/vtgate/buffer/shard_buffer.go +++ b/go/vt/vtgate/buffer/shard_buffer.go @@ -23,6 +23,8 @@ import ( "sync" "time" + "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/vtgate/errorsanitizer" "vitess.io/vitess/go/vt/log" @@ -476,11 +478,12 @@ func (sb *shardBuffer) remove(toRemove *entry) { // Entry was already removed. Keep the queue as it is. 
} -func (sb *shardBuffer) recordKeyspaceEvent(alias *topodatapb.TabletAlias, stillServing bool) { +func (sb *shardBuffer) recordKeyspaceEvent(alias *topodatapb.TabletAlias, stillServing bool, keyspaceEvent *discovery.KeyspaceEvent) { sb.mu.Lock() defer sb.mu.Unlock() - log.Infof("disruption in shard %s/%s resolved (serving: %v)", sb.keyspace, sb.shard, stillServing) + log.Infof("disruption in shard %s/%s resolved (serving: %v), movetable state %#v", + sb.keyspace, sb.shard, stillServing, keyspaceEvent.MoveTablesState) if !topoproto.TabletAliasEqual(alias, sb.currentPrimary) { if sb.currentPrimary != nil { @@ -488,11 +491,26 @@ func (sb *shardBuffer) recordKeyspaceEvent(alias *topodatapb.TabletAlias, stillS } sb.currentPrimary = alias } - if stillServing { - sb.stopBufferingLocked(stopFailoverEndDetected, "a primary promotion has been detected") - } else { - sb.stopBufferingLocked(stopShardMissing, "the keyspace has been resharded") + var reason stopReason + var msg string + + // heuristically determine the reason why vtgate is currently buffering + moveTablesSwitched := false + if keyspaceEvent.MoveTablesState.State == discovery.MoveTablesSwitched { + moveTablesSwitched = true + } + switch { + case moveTablesSwitched: + reason = stopMoveTablesSwitchingTraffic + msg = stopMoveTablesSwitchingTrafficMessage + case stillServing: + reason = stopFailoverEndDetected + msg = stopFailoverEndDetectedMessage + default: + reason = stopShardMissing + msg = stopShardMissingMessage } + sb.stopBufferingLocked(reason, msg) } func (sb *shardBuffer) recordExternallyReparentedTimestamp(timestamp int64, alias *topodatapb.TabletAlias) { @@ -569,7 +587,8 @@ func (sb *shardBuffer) stopBufferingLocked(reason stopReason, details string) { if sb.mode == bufferModeDryRun { msg = "Dry-run: Would have stopped buffering" } - log.Infof("%v for shard: %s after: %.1f seconds due to: %v. Draining %d buffered requests now.", msg, topoproto.KeyspaceShardString(sb.keyspace, sb.shard), d.Seconds(), details, len(q)) + log.Infof("%v for shard: %s after: %.1f seconds due to: %v. Draining %d buffered requests now.", + msg, topoproto.KeyspaceShardString(sb.keyspace, sb.shard), d.Seconds(), details, len(q)) var clientEntryError error if reason == stopShardMissing { diff --git a/go/vt/vtgate/buffer/variables.go b/go/vt/vtgate/buffer/variables.go index b4b036b0775..af99cb52220 100644 --- a/go/vt/vtgate/buffer/variables.go +++ b/go/vt/vtgate/buffer/variables.go @@ -112,6 +112,11 @@ const ( stopFailoverEndDetected stopReason = "NewPrimarySeen" stopMaxFailoverDurationExceeded stopReason = "MaxDurationExceeded" stopShutdown stopReason = "Shutdown" + stopMoveTablesSwitchingTraffic stopReason = "MoveTablesSwitchedTraffic" + + stopMoveTablesSwitchingTrafficMessage = "MoveTables has switched writes" + stopFailoverEndDetectedMessage = "a primary promotion has been detected" + stopShardMissingMessage = "the keyspace has been resharded" ) // evictedReason is used in "requestsEvicted" as "Reason" label. 
diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 91e102b6a49..39dc1d5105c 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -3011,6 +3011,8 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) { } executor := NewExecutor(context.Background(), serv, cell, resolver, true, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_Gen4) + // some sleep for all goroutines to start + time.Sleep(100 * time.Millisecond) before := runtime.NumGoroutine() query := "select id, col from user order by id limit 2" @@ -3867,7 +3869,7 @@ func TestSelectCFC(t *testing.T) { _, err := executor.Execute(context.Background(), nil, "TestSelectCFC", session, "select /*vt+ PLANNER=gen4 */ c2 from tbl_cfc where c1 like 'A%'", nil) require.NoError(t, err) - timeout := time.After(10 * time.Second) + timeout := time.After(30 * time.Second) for { select { case <-timeout: diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index bf7f1eac211..4710cb8f53c 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -35,6 +35,8 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/vt/vtgate/buffer" + "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/cache" @@ -2074,6 +2076,9 @@ func TestExecutorClearsWarnings(t *testing.T) { // TestServingKeyspaces tests that the dual queries are routed to the correct keyspaces from the list of serving keyspaces. func TestServingKeyspaces(t *testing.T) { + buffer.EnableBuffering() + defer buffer.DisableBuffering() + executor, sbc1, _, sbclookup := createExecutorEnv() executor.pv = querypb.ExecuteOptions_Gen4 gw, ok := executor.resolver.resolver.GetGateway().(*TabletGateway) diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index 6ec72d919cb..5d2414ac275 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -18,10 +18,12 @@ package vtgate import ( "context" + "fmt" "strings" "time" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" @@ -34,6 +36,25 @@ import ( type planExec func(ctx context.Context, plan *engine.Plan, vc *vcursorImpl, bindVars map[string]*querypb.BindVariable, startTime time.Time) error type txResult func(sqlparser.StatementType, *sqltypes.Result) error +func waitForNewerVSchema(ctx context.Context, e *Executor, lastVSchemaCreated time.Time) bool { + timeout := 30 * time.Second + pollingInterval := 10 * time.Millisecond + waitCtx, cancel := context.WithTimeout(ctx, timeout) + ticker := time.NewTicker(pollingInterval) + defer ticker.Stop() + defer cancel() + for { + select { + case <-waitCtx.Done(): + return false + case <-ticker.C: + if e.VSchema().GetCreated().After(lastVSchemaCreated) { + return true + } + } + } +} + func (e *Executor) newExecute( ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, @@ -57,10 +78,6 @@ func (e *Executor) newExecute( } query, comments := sqlparser.SplitMarginComments(sql) - vcursor, err := newVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv) - if err != nil { - return err - } // 2: Parse and Validate query stmt, reservedVars, err := parseAndValidateQuery(query) @@ -68,47 +85,89 @@ func (e *Executor) newExecute( return err } - // 3: Create a plan for the 
query - plan, err := e.getPlan(ctx, vcursor, query, stmt, comments, bindVars, reservedVars, e.normalize, logStats) - execStart := e.logPlanningFinished(logStats, plan) + var lastVSchemaCreated time.Time + vs := e.VSchema() + lastVSchemaCreated = vs.GetCreated() + for try := 0; try < MaxBufferingRetries; try++ { + if try > 0 && !vs.GetCreated().After(lastVSchemaCreated) { + // There is a race due to which the executor's vschema may not have been updated yet. + // Without a wait we fail non-deterministically since the previous vschema will not have the updated routing rules + if waitForNewerVSchema(ctx, e, lastVSchemaCreated) { + vs = e.VSchema() + } + } - if err != nil { - safeSession.ClearWarnings() - return err - } + vcursor, err := newVCursorImpl(safeSession, comments, e, logStats, e.vm, vs, e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv) + if err != nil { + return err + } - if plan.Type != sqlparser.StmtShow { - safeSession.ClearWarnings() - } + // 3: Create a plan for the query + // If we are retrying, it is likely that the routing rules have changed and hence we need to + // replan the query since the target keyspace of the resolved shards may have changed as a + // result of MoveTables. So we cannot reuse the plan from the first try. + // When buffering ends, many queries might be getting planned at the same time. Ideally we + // should be able to reuse plans once the first drained query has been planned. For now, we + // punt on this and choose not to prematurely optimize since it is not clear how much caching + // will help and if it will result in hard-to-track edge cases. + + var plan *engine.Plan + plan, err = e.getPlan(ctx, vcursor, query, stmt, comments, bindVars, reservedVars, e.normalize, logStats) + execStart := e.logPlanningFinished(logStats, plan) + + if err != nil { + safeSession.ClearWarnings() + return err + } - // add any warnings that the planner wants to add - for _, warning := range plan.Warnings { - safeSession.RecordWarning(warning) - } + if plan.Type != sqlparser.StmtShow { + safeSession.ClearWarnings() + } - result, err := e.handleTransactions(ctx, mysqlCtx, safeSession, plan, logStats, vcursor, stmt) - if err != nil { - return err - } - if result != nil { - return recResult(plan.Type, result) - } + // add any warnings that the planner wants to add + for _, warning := range plan.Warnings { + safeSession.RecordWarning(warning) + } - // 3: Prepare for execution - err = e.addNeededBindVars(vcursor, plan.BindVarNeeds, bindVars, safeSession) - if err != nil { - logStats.Error = err - return err - } + result, err := e.handleTransactions(ctx, mysqlCtx, safeSession, plan, logStats, vcursor, stmt) + if err != nil { + return err + } + if result != nil { + return recResult(plan.Type, result) + } - if plan.Instructions.NeedsTransaction() { - return e.insideTransaction(ctx, safeSession, logStats, - func() error { - return execPlan(ctx, plan, vcursor, bindVars, execStart) - }) - } + // 4: Prepare for execution + err = e.addNeededBindVars(vcursor, plan.BindVarNeeds, bindVars, safeSession) + if err != nil { + logStats.Error = err + return err + } + + // 5: Execute the plan and retry if needed + if plan.Instructions.NeedsTransaction() { + err = e.insideTransaction(ctx, safeSession, logStats, + func() error { + return execPlan(ctx, plan, vcursor, bindVars, execStart) + }) + } else { + err = execPlan(ctx, plan, vcursor, bindVars, execStart) + } + + if err == nil || safeSession.InTransaction() { + return err + } + + rootCause := vterrors.RootCause(err) + if rootCause != nil 
&& strings.Contains(rootCause.Error(), "enforce denied tables") { + log.V(2).Infof("Retry: %d, will retry query %s due to %v", try, query, err) + lastVSchemaCreated = vs.GetCreated() + continue + } - return execPlan(ctx, plan, vcursor, bindVars, execStart) + return err + } + return vterrors.New(vtrpcpb.Code_INTERNAL, fmt.Sprintf("query %s failed after retries: %v ", query, err)) } // handleTransactions deals with transactional queries: begin, commit, rollback and savepoint management diff --git a/go/vt/vtgate/planbuilder/bypass.go b/go/vt/vtgate/planbuilder/bypass.go index a5490e2231e..52286816a11 100644 --- a/go/vt/vtgate/planbuilder/bypass.go +++ b/go/vt/vtgate/planbuilder/bypass.go @@ -30,7 +30,6 @@ func buildPlanForBypass(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, vsc if err != nil { return nil, err } - switch dest := vschema.Destination().(type) { case key.DestinationExactKeyRange: if _, ok := stmt.(*sqlparser.Insert); ok { diff --git a/go/vt/vtgate/sandbox_test.go b/go/vt/vtgate/sandbox_test.go index 4197e6ef231..91ae9589591 100644 --- a/go/vt/vtgate/sandbox_test.go +++ b/go/vt/vtgate/sandbox_test.go @@ -19,6 +19,8 @@ package vtgate import ( "context" "fmt" + "hash/fnv" + "strconv" "sync" "vitess.io/vitess/go/json2" @@ -284,6 +286,16 @@ func (sct *sandboxTopo) WatchSrvKeyspace(ctx context.Context, cell, keyspace str // panic("not supported: WatchSrvKeyspace") } +func hash(s string) uint32 { + h := fnv.New32a() + h.Write([]byte(s)) + return h.Sum32() +} + +func GetSrvVSchemaHash(vs *vschemapb.SrvVSchema) string { + return strconv.Itoa(int(hash(vs.String()))) +} + // WatchSrvVSchema is part of the srvtopo.Server interface. // // If the sandbox was created with a backing topo service, piggy back on it @@ -302,11 +314,24 @@ func (sct *sandboxTopo) WatchSrvVSchema(ctx context.Context, cell string, callba if !callback(current.Value, nil) { panic("sandboxTopo callback returned false") } + currentHash := GetSrvVSchemaHash(current.Value) go func() { for { - update := <-updateChan - if !callback(update.Value, update.Err) { - panic("sandboxTopo callback returned false") + select { + case <-ctx.Done(): + return + case update := <-updateChan: + newHash := GetSrvVSchemaHash(update.Value) + if newHash == currentHash { + // sometimes we get the same update multiple times. This results in the plan cache to be cleared + // causing tests to fail. So we just ignore the duplicate updates. + continue + } + currentHash = newHash + if !callback(update.Value, update.Err) { + panic("sandboxTopo callback returned false") + } + } } }() diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index 1bba8a6a2f1..b468543e24d 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "math/rand" + "runtime/debug" "sort" "sync" "sync/atomic" @@ -117,6 +118,10 @@ func NewTabletGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtop func (gw *TabletGateway) setupBuffering(ctx context.Context) { cfg := buffer.NewConfigFromFlags() + if !cfg.Enabled { + log.Info("Query buffering is disabled") + return + } gw.buffer = buffer.New(cfg) gw.kev = discovery.NewKeyspaceEventWatcher(ctx, gw.srvTopoServer, gw.hc, gw.localCell) @@ -223,6 +228,7 @@ func (gw *TabletGateway) CacheStatus() TabletCacheStatusList { // withShardError should not be combined with withRetry. 
func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, _ queryservice.QueryService, _ string, inTransaction bool, inner func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService) (bool, error)) error { + // for transactions, we connect to a specific tablet instead of letting gateway choose one if inTransaction && target.TabletType != topodatapb.TabletType_PRIMARY { return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "tabletGateway's query service can only be used for non-transactional queries on replicas") @@ -246,12 +252,11 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, bufferedOnce := false for i := 0; i < gw.retryCount+1; i++ { - // Check if we should buffer PRIMARY queries which failed due to an ongoing - // failover. + // Check if we should buffer PRIMARY queries which failed due to an ongoing failover. // Note: We only buffer once and only "!inTransaction" queries i.e. // a) no transaction is necessary (e.g. critical reads) or // b) no transaction was created yet. - if !bufferedOnce && !inTransaction && target.TabletType == topodatapb.TabletType_PRIMARY { + if gw.buffer != nil && !bufferedOnce && !inTransaction && target.TabletType == topodatapb.TabletType_PRIMARY { // The next call blocks if we should buffer during a failover. retryDone, bufferErr := gw.buffer.WaitForFailoverEnd(ctx, target.Keyspace, target.Shard, err) @@ -277,12 +282,13 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, // or if a reparent operation is in progress. if kev := gw.kev; kev != nil { if kev.TargetIsBeingResharded(target) { - err = vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, "current keyspace is being resharded") + log.V(2).Infof("current keyspace is being resharded, retrying: %s: %s", target.Keyspace, debug.Stack()) + err = vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, buffer.ClusterEventReshardingInProgress) continue } primary, notServing := kev.PrimaryIsNotServing(target) if notServing { - err = vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, "primary is not serving, there may be a reparent operation in progress") + err = vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, buffer.ClusterEventReparentInProgress) continue } // if primary is serving, but we initially found no tablet, we're in an inconsistent state @@ -297,6 +303,7 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, err = vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "no healthy tablet available for '%s'", target.String()) break } + gw.shuffleTablets(gw.localCell, tablets) var th *discovery.TabletHealth diff --git a/go/vt/vtgate/vindexes/vschema.go b/go/vt/vtgate/vindexes/vschema.go index 90becdf275f..c044c6a3151 100644 --- a/go/vt/vtgate/vindexes/vschema.go +++ b/go/vt/vtgate/vindexes/vschema.go @@ -23,6 +23,7 @@ import ( "os" "sort" "strings" + "time" "vitess.io/vitess/go/sqlescape" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" @@ -69,6 +70,9 @@ type VSchema struct { uniqueVindexes map[string]Vindex Keyspaces map[string]*KeyspaceSchema `json:"keyspaces"` ShardRoutingRules map[string]string `json:"shard_routing_rules"` + // created is the time when the VSchema object was created. Used to detect if a cached + // copy of the vschema is stale. + created time.Time } // RoutingRule represents one routing rule. 
@@ -259,6 +263,7 @@ func BuildVSchema(source *vschemapb.SrvVSchema) (vschema *VSchema) {
 		globalTables:   make(map[string]*Table),
 		uniqueVindexes: make(map[string]Vindex),
 		Keyspaces:      make(map[string]*KeyspaceSchema),
+		created:        time.Now(),
 	}
 	buildKeyspaces(source, vschema)
 	// buildGlobalTables before buildReferences so that buildReferences can
@@ -1147,6 +1152,17 @@ func (vschema *VSchema) FindRoutedShard(keyspace, shard string) (string, error)
 	return keyspace, nil
 }
 
+// GetCreated returns the time when the VSchema was created.
+func (vschema *VSchema) GetCreated() time.Time {
+	return vschema.created
+}
+
+// ResetCreated resets the created time to zero value.
+// Used only in tests where vschema protos are compared.
+func (vschema *VSchema) ResetCreated() {
+	vschema.created = time.Time{}
+}
+
 // ByCost provides the interface needed for ColumnVindexes to
 // be sorted by cost order.
 type ByCost []*ColumnVindex
diff --git a/go/vt/vtgate/vindexes/vschema_test.go b/go/vt/vtgate/vindexes/vschema_test.go
index c7257e0a633..6a58810b064 100644
--- a/go/vt/vtgate/vindexes/vschema_test.go
+++ b/go/vt/vtgate/vindexes/vschema_test.go
@@ -234,6 +234,14 @@ func init() {
 	Register("mcfu", newMCFU)
 }
 
+func buildVSchema(source *vschemapb.SrvVSchema) (vschema *VSchema) {
+	vs := BuildVSchema(source)
+	if vs != nil {
+		vs.ResetCreated()
+	}
+	return vs
+}
+
 func TestUnshardedVSchemaValid(t *testing.T) {
 	_, err := BuildKeyspace(&vschemapb.Keyspace{
 		Sharded: false,
@@ -1037,7 +1045,7 @@ func TestShardedVSchemaMultiColumnVindex(t *testing.T) {
 						Columns: []string{"c1", "c2"},
 						Name:    "stfu1"}}}}}}}
 
-	got := BuildVSchema(&good)
+	got := buildVSchema(&good)
 	err := got.Keyspaces["sharded"].Error
 	require.NoError(t, err)
 	ks := &Keyspace{
@@ -1106,7 +1114,7 @@ func TestShardedVSchemaNotOwned(t *testing.T) {
 					Name:   "stlu1"}, {
 					Column: "c2",
 					Name:   "stfu1"}}}}}}}
-	got := BuildVSchema(&good)
+	got := buildVSchema(&good)
 	err := got.Keyspaces["sharded"].Error
 	require.NoError(t, err)
 	ks := &Keyspace{
@@ -1236,7 +1244,7 @@ func TestBuildVSchemaDupSeq(t *testing.T) {
 		Name: "ksa"}
 	ksb := &Keyspace{
 		Name: "ksb"}
-	got := BuildVSchema(&good)
+	got := buildVSchema(&good)
 	t1a := &Table{
 		Name:     sqlparser.NewIdentifierCS("t1"),
 		Keyspace: ksa,
@@ -1291,7 +1299,7 @@ func TestBuildVSchemaDupTable(t *testing.T) {
 			},
 		},
 	}
-	got := BuildVSchema(&good)
+	got := buildVSchema(&good)
 	ksa := &Keyspace{
 		Name: "ksa",
 	}
@@ -1383,7 +1391,7 @@ func TestBuildVSchemaDupVindex(t *testing.T) {
 			},
 		},
 	}
-	got := BuildVSchema(&good)
+	got := buildVSchema(&good)
 	err := got.Keyspaces["ksa"].Error
 	err1 := got.Keyspaces["ksb"].Error
 	require.NoError(t, err)
@@ -1959,7 +1967,7 @@ func TestSequence(t *testing.T) {
 			},
 		},
 	}
-	got := BuildVSchema(&good)
+	got := buildVSchema(&good)
 	err := got.Keyspaces["sharded"].Error
 	require.NoError(t, err)
 	err1 := got.Keyspaces["unsharded"].Error
diff --git a/go/vt/vtgate/vschema_manager_test.go b/go/vt/vtgate/vschema_manager_test.go
index 4aa81442f38..a52338b0fff 100644
--- a/go/vt/vtgate/vschema_manager_test.go
+++ b/go/vt/vtgate/vschema_manager_test.go
@@ -100,6 +100,7 @@ func TestVSchemaUpdate(t *testing.T) {
 	var vs *vindexes.VSchema
 	vm.subscriber = func(vschema *vindexes.VSchema, _ *VSchemaStats) {
 		vs = vschema
+		vs.ResetCreated()
 	}
 	for _, tcase := range tcases {
 		t.Run(tcase.name, func(t *testing.T) {
@@ -199,6 +200,7 @@ func TestRebuildVSchema(t *testing.T) {
 	var vs *vindexes.VSchema
 	vm.subscriber = func(vschema *vindexes.VSchema, _ *VSchemaStats) {
 		vs = vschema
+		vs.ResetCreated()
 	}
 	for _, tcase := range tcases {
 		t.Run(tcase.name, func(t *testing.T) {
@@ -229,6 +231,7 @@ func makeTestVSchema(ks string, sharded bool, tbls map[string]*vindexes.Table) *
 	}
 	vs := makeTestEmptyVSchema()
 	vs.Keyspaces[ks] = keyspaceSchema
+	vs.ResetCreated()
 	return vs
 }
 
diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go
index e3f45283ee2..6be857ae620 100644
--- a/go/vt/wrangler/traffic_switcher.go
+++ b/go/vt/wrangler/traffic_switcher.go
@@ -1159,7 +1159,7 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error {
 }
 
 func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access accessType) error {
-	return ts.ForAllSources(func(source *workflow.MigrationSource) error {
+	err := ts.ForAllSources(func(source *workflow.MigrationSource) error {
 		if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error {
 			return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, access == allowWrites /* remove */, ts.Tables())
 		}); err != nil {
@@ -1174,6 +1174,14 @@ func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access a
 		}
 		return err
 	})
+	if err != nil {
+		log.Warningf("Error in changeTableSourceWrites: %s", err)
+		return err
+	}
+	// Note that the denied tables, which are being updated in this method, are not part of the SrvVSchema in the topo.
+	// However, we are using the notification of a SrvVSchema change in VTGate to recompute the state of a
+	// MoveTables workflow (which also looks up denied tables from the topo). So we need to trigger a SrvVSchema change here.
+	return ts.TopoServer().RebuildSrvVSchema(ctx, nil)
 }
 
 // executeLockTablesOnSource executes a LOCK TABLES tb1 READ, tbl2 READ,... statement on each
@@ -1509,7 +1517,6 @@ func (ts *trafficSwitcher) changeWriteRoute(ctx context.Context) error {
 			return err
 		}
 	}
-
 	return ts.TopoServer().RebuildSrvVSchema(ctx, nil)
 }
 
diff --git a/test/config.json b/test/config.json
index fe18772714f..c0f0b0825f8 100644
--- a/test/config.json
+++ b/test/config.json
@@ -1004,6 +1004,15 @@
         "RetryMax": 0,
         "Tags": []
     },
+    "vreplication_movetables_buffering": {
+        "File": "unused.go",
+        "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMoveTablesBuffering"],
+        "Command": [],
+        "Manual": false,
+        "Shard": "vreplication_cellalias",
+        "RetryMax": 0,
+        "Tags": []
+    },
     "vreplication_vschema_load": {
         "File": "unused.go",
         "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVSchemaChangesUnderLoad"],