Skip to content

Commit

Permalink
Support throttling vstreamer copy table work on source tablets (#9923)
Browse files Browse the repository at this point in the history
* Support throttling vstream work on source tablets

This provides mechanisms to limit the impact of large vreplication
workflows on production source tablets.

It supports throttling based on the InnoDB history list length and
the replication lag seen.

Signed-off-by: Matt Lord <[email protected]>

* Add unit and e2e test for copy table throttling

Signed-off-by: Matt Lord <[email protected]>

* Use shorter internal name for max replica lag seconds

Signed-off-by: Matt Lord <[email protected]>

* Use consistent case for json vars in config test

Signed-off-by: Matt Lord <[email protected]>

* Add MySQL specific context to flags

Signed-off-by: Matt Lord <[email protected]>

* Add copy table row streamer stats

Signed-off-by: Matt Lord <[email protected]>

* Rearrange things to wait for MySQL in initial row stream and copy phase cycles

Signed-off-by: Matt Lord <[email protected]>

* Correct & improve on e2e and unit tests

Signed-off-by: Matt Lord <[email protected]>

* Update test config

I spent way too much time trying to get both tests to work within
the same go test run... giving up for now.

Signed-off-by: Matt Lord <[email protected]>

* Adding additional stats work

Signed-off-by: Matt Lord <[email protected]>

* Add ability to modify the RowStreamerConfig at runtime via /debug/env

Signed-off-by: Matt Lord <[email protected]>

* Add current RowStreamerConfig values to /debug/vars

Signed-off-by: Matt Lord <[email protected]>

* Add waitingForMySQL stats to vreplication phase timings

Signed-off-by: Matt Lord <[email protected]>

* Track waitForMySQL by table on vstreamer side

Signed-off-by: Matt Lord <[email protected]>

* Move row streamer config export to vstreamer engine

And make it a gauge so that it's always showing the current value
as we allow this to be changed in the running process via /debug/env.

Signed-off-by: Matt Lord <[email protected]>

* Minor changes after self review

Signed-off-by: Matt Lord <[email protected]>

* Shorten/generalize InnoDB row history test func names

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Apr 4, 2022
1 parent 4632263 commit 3d4f400
Show file tree
Hide file tree
Showing 16 changed files with 441 additions and 15 deletions.
8 changes: 6 additions & 2 deletions go/test/endtoend/cluster/topo_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,12 @@ func (topo *TopoProcess) IsHealthy() bool {
}

func (topo *TopoProcess) removeTopoDirectories(Cell string) {
_ = topo.ManageTopoDir("rmdir", "/vitess/global")
_ = topo.ManageTopoDir("rmdir", "/vitess/"+Cell)
if err := topo.ManageTopoDir("rmdir", "/vitess/global"); err != nil {
log.Errorf("Failed to remove global topo directory: %v", err)
}
if err := topo.ManageTopoDir("rmdir", "/vitess/"+Cell); err != nil {
log.Errorf("Failed to remove local topo directory: %v", err)
}
}

// ManageTopoDir creates global and zone in etcd2
Expand Down
6 changes: 3 additions & 3 deletions go/test/endtoend/cluster/vtctlclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type VtctlClientParams struct {
MigrationContext string
SkipPreflight bool
UUIDList string
CallerId string
CallerID string
}

// InitShardPrimary executes vtctlclient command to make specified tablet the primary for the shard.
Expand Down Expand Up @@ -91,8 +91,8 @@ func (vtctlclient *VtctlClientProcess) ApplySchemaWithOutput(Keyspace string, SQ
args = append(args, "--skip_preflight")
}

if params.CallerId != "" {
args = append(args, "--caller_id", params.CallerId)
if params.CallerID != "" {
args = append(args, "--caller_id", params.CallerID)
}
args = append(args, Keyspace)
return vtctlclient.ExecuteCommandWithOutput(args...)
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/onlineddl/ghost/onlineddl_ghost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy str
}
} else {
var err error
uuid, err = clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, sqlQuery, cluster.VtctlClientParams{DDLStrategy: ddlStrategy, CallerId: callerID})
uuid, err = clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, sqlQuery, cluster.VtctlClientParams{DDLStrategy: ddlStrategy, CallerID: callerID})
assert.NoError(t, err)
}
uuid = strings.TrimSpace(uuid)
Expand Down
3 changes: 3 additions & 0 deletions go/test/endtoend/vreplication/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ var (
externalClusterConfig *ClusterConfig
extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms"}
extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"}
// This variable can be used within specific tests to alter vttablet behavior
extraVTTabletArgs = []string{}
)

// ClusterConfig defines the parameters like ports, tmpDir, tablet types which uniquely define a vitess cluster
Expand Down Expand Up @@ -406,6 +408,7 @@ func (vc *VitessCluster) AddTablet(t testing.TB, cell *Cell, keyspace *Keyspace,
"--heartbeat_enable",
"--heartbeat_interval", "250ms",
} //FIXME: for multi-cell initial schema doesn't seem to load without "--queryserver-config-schema-reload-time"
options = append(options, extraVTTabletArgs...)

if mainClusterConfig.vreplicationCompressGTID {
options = append(options, "--vreplication_store_compressed_gtid=true")
Expand Down
115 changes: 114 additions & 1 deletion go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ import (
"testing"
"time"

"github.com/buger/jsonparser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/buger/jsonparser"
"github.com/tidwall/gjson"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
Expand Down Expand Up @@ -103,6 +105,43 @@ func throttlerCheckSelf(tablet *cluster.VttabletProcess, app string) (resp *http
return resp, respBody, err
}

func TestVreplicationCopyThrottling(t *testing.T) {
workflow := "copy-throttling"
cell := "zone1"
table := "customer"
shard := "0"
vc = NewVitessCluster(t, "TestVreplicationCopyThrottling", []string{cell}, mainClusterConfig)
defer vc.TearDown(t)
defaultCell = vc.Cells[cell]
// To test vstreamer source throttling for the MoveTables operation
maxSourceTrxHistory := 5
extraVTTabletArgs = []string{
fmt.Sprintf("--vreplication_copy_phase_max_innodb_history_list_length=%d", maxSourceTrxHistory),
}

if _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, sourceKs, shard, initialProductVSchema, initialProductSchema, 0, 0, 100, nil); err != nil {
t.Fatal(err)
}
if _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, targetKs, shard, "", "", 0, 0, 200, nil); err != nil {
t.Fatal(err)
}
vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", sourceKs, shard), 1)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", targetKs, shard), 1)

// Confirm that the initial copy table phase does not proceed until the source tablet(s)
// have an InnoDB History List length that is less than specified in the tablet's config.
// We update rows in a table not part of the MoveTables operation so that we're not blocking
// on the LOCK TABLE call but rather the InnoDB History List length.
trxConn := generateInnoDBRowHistory(t, sourceKs, maxSourceTrxHistory)
// We need to force primary tablet types as the history list has been increased on the source primary
moveTablesWithTabletTypes(t, defaultCell.Name, workflow, sourceKs, targetKs, table, "primary")
verifySourceTabletThrottling(t, targetKs, workflow)
releaseInnoDBRowHistory(t, trxConn)
trxConn.Close()
}

func TestBasicVreplicationWorkflow(t *testing.T) {
sourceKsOpts["DBTypeVersion"] = "mysql-5.7"
targetKsOpts["DBTypeVersion"] = "mysql-5.7"
Expand Down Expand Up @@ -503,6 +542,40 @@ func validateRollupReplicates(t *testing.T) {
})
}

func verifySourceTabletThrottling(t *testing.T, targetKS, workflow string) {
tDuration := time.Duration(15 * time.Second)
ticker := time.NewTicker(5 * time.Second)
timer := time.NewTimer(tDuration)
ksWorkflow := fmt.Sprintf("%s.%s", targetKS, workflow)
for {
select {
case <-ticker.C:
output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show")
require.NoError(t, err)
result := gjson.Get(output, "ShardStatuses")
result.ForEach(func(tabletId, tabletStreams gjson.Result) bool { // for each source tablet
tabletStreams.ForEach(func(streamId, streamInfos gjson.Result) bool { // for each stream
if streamId.String() == "PrimaryReplicationStatuses" {
streamInfos.ForEach(func(attributeKey, attributeValue gjson.Result) bool { // for each attribute in the stream
state := attributeValue.Get("State").String()
if state != "Copying" {
require.FailNowf(t, "Unexpected running workflow stream",
"Initial copy phase for the MoveTables workflow %s started in less than %d seconds when it should have been waiting. Show output: %s",
ksWorkflow, int(tDuration.Seconds()), output)
}
return true // end attribute loop
})
}
return true // end stream loop
})
return true // end tablet loop
})
case <-timer.C:
return
}
}
}

func reshardCustomer2to4Split(t *testing.T, cells []*Cell, sourceCellOrAlias string) {
t.Run("reshardCustomer2to4Split", func(t *testing.T) {
ksName := "customer"
Expand Down Expand Up @@ -991,6 +1064,12 @@ func moveTables(t *testing.T, cell, workflow, sourceKs, targetKs, tables string)
t.Fatalf("MoveTables command failed with %+v\n", err)
}
}
func moveTablesWithTabletTypes(t *testing.T, cell, workflow, sourceKs, targetKs, tables string, tabletTypes string) {
if err := vc.VtctlClient.ExecuteCommand("MoveTables", "--", "--v1", "--cells="+cell, "--workflow="+workflow,
"--tablet_types="+tabletTypes, sourceKs, targetKs, tables); err != nil {
t.Fatalf("MoveTables command failed with %+v\n", err)
}
}
func applyVSchema(t *testing.T, vschema, keyspace string) {
err := vc.VtctlClient.ExecuteCommand("ApplyVSchema", "--", "--vschema", vschema, keyspace)
require.NoError(t, err)
Expand Down Expand Up @@ -1074,3 +1153,37 @@ func dropSources(t *testing.T, ksWorkflow string) {
output, err := vc.VtctlClient.ExecuteCommandWithOutput("DropSources", ksWorkflow)
require.NoError(t, err, fmt.Sprintf("DropSources Error: %s: %s", err, output))
}

// generateInnoDBRowHistory generates at least maxSourceTrxHistory rollback segment entries.
// This allows us to confirm two behaviors:
// 1. MoveTables blocks on starting its first copy phase until we rollback
// 2. All other workflows continue to work w/o issue with this MVCC history in place (not used yet)
// Returns a db connection used for the transaction which you can use for follow-up
// work, such as rolling it back directly or using the releaseInnoDBRowHistory call.
func generateInnoDBRowHistory(t *testing.T, sourceKS string, neededTrxHistory int) *mysql.Conn {
dbConn1 := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
dbConn2 := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
execQuery(t, dbConn1, "use "+sourceKS)
execQuery(t, dbConn2, "use "+sourceKS)
offset := 1000
insertStmt := strings.Builder{}
for i := offset; i <= (neededTrxHistory*10)+offset; i++ {
if i == offset {
insertStmt.WriteString(fmt.Sprintf("insert into product (pid, description) values (%d, 'test')", i))
} else {
insertStmt.WriteString(fmt.Sprintf(", (%d, 'test')", i))
}
}
execQuery(t, dbConn2, "start transaction")
execQuery(t, dbConn2, "select count(*) from product")
execQuery(t, dbConn1, insertStmt.String())
execQuery(t, dbConn2, "rollback")
execQuery(t, dbConn2, "start transaction")
execQuery(t, dbConn2, "select count(*) from product")
execQuery(t, dbConn1, fmt.Sprintf("delete from product where pid >= %d and pid < %d", offset, offset+10000))
return dbConn2
}

func releaseInnoDBRowHistory(t *testing.T, dbConn *mysql.Conn) {
execQuery(t, dbConn, "rollback")
}
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func TestPlayerCopyBigTable(t *testing.T) {
defer func() { *copyPhaseDuration = savedCopyPhaseDuration }()

savedWaitRetryTime := waitRetryTime
// waitRetry time shoulw be very low to cause the wait loop to execute multipel times.
// waitRetry time should be very low to cause the wait loop to execute multiple times.
waitRetryTime = 10 * time.Millisecond
defer func() { waitRetryTime = savedWaitRetryTime }()

Expand Down Expand Up @@ -676,7 +676,7 @@ func TestPlayerCopyWildcardRule(t *testing.T) {
defer func() { *copyPhaseDuration = savedCopyPhaseDuration }()

savedWaitRetryTime := waitRetryTime
// waitRetry time shoulw be very low to cause the wait loop to execute multipel times.
// waitRetry time should be very low to cause the wait loop to execute multipel times.
waitRetryTime = 10 * time.Millisecond
defer func() { waitRetryTime = savedWaitRetryTime }()

Expand Down
21 changes: 21 additions & 0 deletions go/vt/vttablet/tabletserver/debugenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request)
f(ival)
msg = fmt.Sprintf("Setting %v to: %v", varname, value)
}
setInt64Val := func(f func(int64)) {
ival, err := strconv.ParseInt(value, 10, 64)
if err != nil {
msg = fmt.Sprintf("Failed setting value for %v: %v", varname, err)
return
}
f(ival)
msg = fmt.Sprintf("Setting %v to: %v", varname, value)
}
setDurationVal := func(f func(time.Duration)) {
durationVal, err := time.ParseDuration(value)
if err != nil {
Expand Down Expand Up @@ -104,6 +113,10 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request)
setIntVal(tsv.SetMaxResultSize)
case "WarnResultSize":
setIntVal(tsv.SetWarnResultSize)
case "RowStreamerMaxInnoDBTrxHistLen":
setInt64Val(func(val int64) { tsv.Config().RowStreamer.MaxInnoDBTrxHistLen = val })
case "RowStreamerMaxMySQLReplLagSecs":
setInt64Val(func(val int64) { tsv.Config().RowStreamer.MaxMySQLReplLagSecs = val })
case "UnhealthyThreshold":
setDurationVal(tsv.Config().Healthcheck.UnhealthyThresholdSeconds.Set)
setDurationVal(tsv.hs.SetUnhealthyThreshold)
Expand All @@ -123,6 +136,12 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request)
Value: fmt.Sprintf("%v", f()),
})
}
addInt64Var := func(varname string, f func() int64) {
vars = append(vars, envValue{
VarName: varname,
Value: fmt.Sprintf("%v", f()),
})
}
addDurationVar := func(varname string, f func() time.Duration) {
vars = append(vars, envValue{
VarName: varname,
Expand All @@ -141,6 +160,8 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request)
addIntVar("QueryCacheCapacity", tsv.QueryPlanCacheCap)
addIntVar("MaxResultSize", tsv.MaxResultSize)
addIntVar("WarnResultSize", tsv.WarnResultSize)
addInt64Var("RowStreamerMaxInnoDBTrxHistLen", func() int64 { return tsv.Config().RowStreamer.MaxInnoDBTrxHistLen })
addInt64Var("RowStreamerMaxMySQLReplLagSecs", func() int64 { return tsv.Config().RowStreamer.MaxMySQLReplLagSecs })
addDurationVar("UnhealthyThreshold", tsv.Config().Healthcheck.UnhealthyThresholdSeconds.Get)
addFloat64Var("ThrottleMetricThreshold", tsv.ThrottleMetricThreshold)
vars = append(vars, envValue{
Expand Down
18 changes: 17 additions & 1 deletion go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ func init() {

flag.BoolVar(&enableReplicationReporter, "enable_replication_reporter", false, "Use polling to track replication lag.")
flag.BoolVar(&currentConfig.EnableOnlineDDL, "queryserver_enable_online_ddl", true, "Enable online DDL.")
flag.BoolVar(&currentConfig.SanitizeLogMessages, "sanitize_log_messages", false, "Remove potentially sensitive information in tablet INFO, WARNING, and ERROR log messages such as query parameters.")

flag.Int64Var(&currentConfig.RowStreamer.MaxInnoDBTrxHistLen, "vreplication_copy_phase_max_innodb_history_list_length", 1000000, "The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet.")
flag.Int64Var(&currentConfig.RowStreamer.MaxMySQLReplLagSecs, "vreplication_copy_phase_max_mysql_replication_lag", 43200, "The maximum MySQL replication lag (in seconds) that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet.")
}

// Init must be called after flag.Parse, and before doing any other operations.
Expand Down Expand Up @@ -287,6 +289,8 @@ type TabletConfig struct {

EnforceStrictTransTables bool `json:"-"`
EnableOnlineDDL bool `json:"-"`

RowStreamer RowStreamerConfig `json:"rowStreamer,omitempty"`
}

// ConnPoolConfig contains the config for a conn pool.
Expand Down Expand Up @@ -348,6 +352,13 @@ type TransactionLimitConfig struct {
TransactionLimitBySubcomponent bool
}

// RowStreamerConfig contains configuration parameters for a vstreamer (source) that is
// copying the contents of a table to a target
type RowStreamerConfig struct {
MaxInnoDBTrxHistLen int64 `json:"maxInnoDBTrxHistLen,omitempty"`
MaxMySQLReplLagSecs int64 `json:"maxMySQLReplLagSecs,omitempty"`
}

// NewCurrentConfig returns a copy of the current config.
func NewCurrentConfig() *TabletConfig {
return currentConfig.Clone()
Expand Down Expand Up @@ -487,6 +498,11 @@ var defaultConfig = TabletConfig{

EnforceStrictTransTables: true,
EnableOnlineDDL: true,

RowStreamer: RowStreamerConfig{
MaxInnoDBTrxHistLen: 1000000,
MaxMySQLReplLagSecs: 43200,
},
}

// defaultTxThrottlerConfig formats the default throttlerdata.Configuration
Expand Down
18 changes: 18 additions & 0 deletions go/vt/vttablet/tabletserver/tabletenv/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func TestConfigParse(t *testing.T) {
PrefillParallelism: 30,
MaxWaiters: 40,
},
RowStreamer: RowStreamerConfig{
MaxInnoDBTrxHistLen: 1000,
MaxMySQLReplLagSecs: 400,
},
}
gotBytes, err := yaml2.Marshal(&cfg)
require.NoError(t, err)
Expand Down Expand Up @@ -79,6 +83,9 @@ oltpReadPool:
size: 16
timeoutSeconds: 10
replicationTracker: {}
rowStreamer:
maxInnoDBTrxHistLen: 1000
maxMySQLReplLagSecs: 400
txPool: {}
`
assert.Equal(t, wantBytes, string(gotBytes))
Expand Down Expand Up @@ -141,6 +148,9 @@ queryCacheSize: 5000
replicationTracker:
heartbeatIntervalSeconds: 0.25
mode: disable
rowStreamer:
maxInnoDBTrxHistLen: 1000000
maxMySQLReplLagSecs: 43200
schemaReloadIntervalSeconds: 1800
signalSchemaChangeReloadIntervalSeconds: 5
streamBufferSize: 32768
Expand All @@ -165,6 +175,10 @@ func TestClone(t *testing.T) {
PrefillParallelism: 30,
MaxWaiters: 40,
},
RowStreamer: RowStreamerConfig{
MaxInnoDBTrxHistLen: 1000000,
MaxMySQLReplLagSecs: 43200,
},
}
cfg2 := cfg1.Clone()
assert.Equal(t, cfg1, cfg2)
Expand Down Expand Up @@ -216,6 +230,10 @@ func TestFlags(t *testing.T) {
EnforceStrictTransTables: true,
EnableOnlineDDL: true,
DB: &dbconfigs.DBConfigs{},
RowStreamer: RowStreamerConfig{
MaxInnoDBTrxHistLen: 1000000,
MaxMySQLReplLagSecs: 43200,
},
}
assert.Equal(t, want.DB, currentConfig.DB)
assert.Equal(t, want, currentConfig)
Expand Down
Loading

0 comments on commit 3d4f400

Please sign in to comment.