Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Jul 26, 2023
1 parent 78ffcf4 commit cf7be29
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 11 deletions.
2 changes: 1 addition & 1 deletion doc/design-docs/VTGateBuffering.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ failing queries in the tablet gateway layer in vtgate. When a query fails, the r

To assist in diagnosing the root cause a _KeyspaceEventWatcher_ (aka KEW) was introduced. This runs in a goroutine and
watches the SrvKeyspace: if there is a change to the keyspace partitions in the topo it is considered that there is a
resharding in progress.
resharding operation in progress.

The buffering logic subscribes to the keyspace event watcher.

Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ var (
sidecarDBIdentifier = sqlparser.String(sqlparser.NewIdentifierCS(sidecarDBName))
mainClusterConfig *ClusterConfig
externalClusterConfig *ClusterConfig
extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", "10s",
"--buffer_size", "100000", "--buffer_min_time_between_failovers", "2m", "--buffer_max_failover_duration", "10s"}
extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", "1m",
"--buffer_size", "100000", "--buffer_min_time_between_failovers", "0s", "--buffer_max_failover_duration", "1m"}
extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"}
// This variable can be used within specific tests to alter vttablet behavior
extraVTTabletArgs = []string{}
Expand Down
4 changes: 3 additions & 1 deletion go/test/endtoend/vreplication/movetables_buffering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"testing"
"time"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
Expand All @@ -27,7 +29,7 @@ func TestMoveTablesBuffering(t *testing.T) {
err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs,
tables, workflowActionCreate, "", "", "")
require.NoError(t, err)
waitForWorkflowState(t, vc, ksWorkflow, workflowStateRunning)
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())

loadCtx, cancelLoad := context.WithCancel(context.Background())
go func() {
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/buffer/shard_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,13 +502,13 @@ func (sb *shardBuffer) recordKeyspaceEvent(alias *topodatapb.TabletAlias, stillS
switch {
case moveTablesSwitched:
reason = stopMoveTablesSwitchingTraffic
msg = "MoveTables has switched writes"
msg = stopMoveTablesSwitchingTrafficMessage
case stillServing:
reason = stopFailoverEndDetected
msg = "a primary promotion has been detected"
msg = stopFailoverEndDetectedMessage
default:
reason = stopShardMissing
msg = "the keyspace has been resharded"
msg = stopShardMissingMessage
}
sb.stopBufferingLocked(reason, msg)
}
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/buffer/variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ const (
stopMaxFailoverDurationExceeded stopReason = "MaxDurationExceeded"
stopShutdown stopReason = "Shutdown"
stopMoveTablesSwitchingTraffic stopReason = "MoveTablesSwitchedTraffic"

stopMoveTablesSwitchingTrafficMessage = "MoveTables has switched writes"
stopFailoverEndDetectedMessage = "a primary promotion has been detected"
stopShardMissingMessage = "the keyspace has been resharded"
)

// evictedReason is used in "requestsEvicted" as "Reason" label.
Expand Down
3 changes: 0 additions & 3 deletions go/vt/vtgate/executor_framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,6 @@ func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn
primarySession = &vtgatepb.Session{
TargetString: "@primary",
}
// FIXME: This sleep seems to fix a lot of tests that are failing due to the change in this PR.
// For now keeping the sleep to confirm in CI. We need to replace this by waiting for whatever race this fixes.
// time.Sleep(1 * time.Second)
return executor, sbc1, sbc2, sbclookup
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/plan_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type planExec func(ctx context.Context, plan *engine.Plan, vc *vcursorImpl, bind
type txResult func(sqlparser.StatementType, *sqltypes.Result) error

func waitForNewerVSchema(ctx context.Context, e *Executor, lastVSchemaCreated time.Time) bool {
timeout := 5 * time.Second
timeout := 30 * time.Second
pollingInterval := 10 * time.Millisecond
waitCtx, cancel := context.WithTimeout(ctx, timeout)
ticker := time.NewTicker(pollingInterval)
Expand Down

0 comments on commit cf7be29

Please sign in to comment.