Skip to content

Commit

Permalink
Merge pull request #155 from Shopify/candidate-v15.0.3-shopify-11
Browse files Browse the repository at this point in the history
Backport: set vreplication net read and net write timeout session vars to high values
(cherry picked from commit 84ea974)
(cherry picked from commit 5cc2dfc)
  • Loading branch information
pawandubey authored and shivnagarajan committed May 8, 2024
1 parent fdefd71 commit dde1210
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 0 deletions.
2 changes: 2 additions & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,8 @@ Usage of vttablet:
--vreplication_healthcheck_topology_refresh duration refresh interval for re-reading the topology (default 30s)
--vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1)
--vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence
--vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300)
--vreplication_net_write_timeout int Session value of net_write_timeout for vreplication, in seconds (default 600)
--vreplication_replica_lag_tolerance duration Replica lag threshold duration: once lag is below this we switch from copy phase to the replication (streaming) phase (default 1m0s)
--vreplication_retry_delay duration delay before retrying a failed workflow event in the replication phase (default 5s)
--vreplication_store_compressed_gtid Store compressed gtids in the pos column of the sidecar database's vreplication table
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vttablet/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,18 @@ const (

var VReplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage

var (
VReplicationNetReadTimeout = 300
VReplicationNetWriteTimeout = 600
)

func init() {
servenv.OnParseFor("vttablet", registerFlags)
}

func registerFlags(fs *pflag.FlagSet) {
fs.Int64Var(&VReplicationExperimentalFlags, "vreplication_experimental_flags", VReplicationExperimentalFlags,
"(Bitmask) of experimental features in vreplication to enable")
fs.IntVar(&VReplicationNetReadTimeout, "vreplication_net_read_timeout", VReplicationNetReadTimeout, "Session value of net_read_timeout for vreplication, in seconds")
fs.IntVar(&VReplicationNetWriteTimeout, "vreplication_net_write_timeout", VReplicationNetWriteTimeout, "Session value of net_write_timeout for vreplication, in seconds")
}
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync/atomic"
"time"

"vitess.io/vitess/go/vt/vttablet"

"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/vt/discovery"
Expand Down Expand Up @@ -243,6 +245,12 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
if _, err := dbClient.ExecuteFetch("set names 'binary'", 10000); err != nil {
return err
}
if _, err := dbClient.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 10000); err != nil {
return err
}
if _, err := dbClient.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 10000); err != nil {
return err
}
// We must apply AUTO_INCREMENT values precisely as we got them. This include the 0 value, which is not recommended in AUTO_INCREMENT, and yet is valid.
if _, err := dbClient.ExecuteFetch("set @@session.sql_mode = CONCAT(@@session.sql_mode, ',NO_AUTO_VALUE_ON_ZERO')", 10000); err != nil {
return err
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"time"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/vttablet"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/textutil"
Expand Down Expand Up @@ -119,6 +121,12 @@ func (rs *rowStreamer) Stream() error {
if _, err := conn.ExecuteFetch("set names 'binary'", 1, false); err != nil {
return err
}
if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 1, false); err != nil {
return err
}
if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 1, false); err != nil {
return err
}
return rs.streamQuery(conn, rs.send)
}

Expand Down

0 comments on commit dde1210

Please sign in to comment.