From 68cf2909b016073f50cfe0231c6238603a05de36 Mon Sep 17 00:00:00 2001 From: pbibra Date: Tue, 12 Sep 2023 09:53:01 -0700 Subject: [PATCH 1/2] add vtgate flag that explicitly allows vstream copy (#125) --- go/flags/endtoend/vtgate.txt | 2 + go/vt/vtexplain/vtexplain_vtgate.go | 2 +- go/vt/vtgate/executor.go | 31 ++++--- go/vt/vtgate/executor_framework_test.go | 6 +- go/vt/vtgate/executor_select_test.go | 4 +- go/vt/vtgate/executor_stream_test.go | 2 +- go/vt/vtgate/vstream_manager.go | 19 ++++- go/vt/vtgate/vstream_manager_test.go | 82 +++++++++++++++---- go/vt/vtgate/vtgate.go | 4 +- .../tabletserver/vstreamer/uvstreamer.go | 5 +- 10 files changed, 114 insertions(+), 43 deletions(-) diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 4e66e126e77..95e015c91dd 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -239,6 +239,8 @@ Usage of vtgate: Select tcp, tcp4, or tcp6 to control the socket type. (default tcp) --no_scatter when set to true, the planner will fail instead of producing a plan that includes scatter queries + --no_vstream_copy + when set to true, vstream copy will not be allowed - temporary until we can properly support RDONLY for this --normalize_queries Rewrite queries with bind vars. Turn this off if the app itself sends normalized queries with bind vars. (default true) --onclose_timeout duration diff --git a/go/vt/vtexplain/vtexplain_vtgate.go b/go/vt/vtexplain/vtexplain_vtgate.go index 57cc04ad69d..c065c41e4eb 100644 --- a/go/vt/vtexplain/vtexplain_vtgate.go +++ b/go/vt/vtexplain/vtexplain_vtgate.go @@ -71,7 +71,7 @@ func (vte *VTExplain) initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts streamSize := 10 var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests - vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion) + vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion, false) return nil } diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index a6631966f7b..c1ae7e209bb 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -107,6 +107,9 @@ type Executor struct { // allowScatter will fail planning if set to false and a plan contains any scatter queries allowScatter bool + // allowVstreamCopy will fail on vstream copy if false and no GTID provided for the stream. + // This is temporary until RDONLYs are properly supported for bootstrapping. + allowVstreamCopy bool } var executorOnce sync.Once @@ -127,20 +130,22 @@ func NewExecutor( schemaTracker SchemaInfo, noScatter bool, pv plancontext.PlannerVersion, + noVstreamCopy bool, ) *Executor { e := &Executor{ - serv: serv, - cell: cell, - resolver: resolver, - scatterConn: resolver.scatterConn, - txConn: resolver.scatterConn.txConn, - plans: cache.NewDefaultCacheImpl(cacheCfg), - normalize: normalize, - warnShardedOnly: warnOnShardedOnly, - streamSize: streamSize, - schemaTracker: schemaTracker, - allowScatter: !noScatter, - pv: pv, + serv: serv, + cell: cell, + resolver: resolver, + scatterConn: resolver.scatterConn, + txConn: resolver.scatterConn.txConn, + plans: cache.NewDefaultCacheImpl(cacheCfg), + normalize: normalize, + warnShardedOnly: warnOnShardedOnly, + streamSize: streamSize, + schemaTracker: schemaTracker, + allowScatter: !noScatter, + allowVstreamCopy: !noVstreamCopy, + pv: pv, } vschemaacl.Init() @@ -1318,7 +1323,7 @@ func (e *Executor) startVStream(ctx context.Context, rss []*srvtopo.ResolvedShar return err } - vsm := newVStreamManager(e.resolver.resolver, e.serv, e.cell) + vsm := newVStreamManager(e.resolver.resolver, e.serv, e.cell, e.allowVstreamCopy) vs := &vstream{ vgtid: vgtid, tabletType: topodatapb.TabletType_PRIMARY, diff --git a/go/vt/vtgate/executor_framework_test.go b/go/vt/vtgate/executor_framework_test.go index 04f64cab47c..00b3161cb5c 100644 --- a/go/vt/vtgate/executor_framework_test.go +++ b/go/vt/vtgate/executor_framework_test.go @@ -469,7 +469,7 @@ func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn bad.VSchema = badVSchema getSandbox(KsTestUnsharded).VSchema = unshardedVSchema - executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) + executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false) key.AnyShardPicker = DestinationAnyShardPickerFirstShard{} // create a new session each time so that ShardSessions don't get re-used across tests @@ -493,7 +493,7 @@ func createCustomExecutor(vschema string) (executor *Executor, sbc1, sbc2, sbclo sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil) getSandbox(KsTestUnsharded).VSchema = unshardedVSchema - executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) + executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false) // create a new session each time so that ShardSessions don't get re-used across tests primarySession = &vtgatepb.Session{ TargetString: "@primary", @@ -522,7 +522,7 @@ func createCustomExecutorSetValues(vschema string, values []*sqltypes.Result) (e sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil) getSandbox(KsTestUnsharded).VSchema = unshardedVSchema - executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) + executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false) // create a new session each time so that ShardSessions don't get re-used across tests primarySession = &vtgatepb.Session{ TargetString: "@primary", diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 3f9224fe003..535dc39f8a5 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -1483,7 +1483,7 @@ func TestStreamSelectIN(t *testing.T) { } func createExecutor(serv *sandboxTopo, cell string, resolver *Resolver) *Executor { - return NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) + return NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false) } func TestSelectScatter(t *testing.T) { @@ -2981,7 +2981,7 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) { count++ } - executor := NewExecutor(context.Background(), serv, cell, resolver, true, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) + executor := NewExecutor(context.Background(), serv, cell, resolver, true, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false) before := runtime.NumGoroutine() query := "select id, col from user order by id limit 2" diff --git a/go/vt/vtgate/executor_stream_test.go b/go/vt/vtgate/executor_stream_test.go index 8fea4ed985f..ee3038972c3 100644 --- a/go/vt/vtgate/executor_stream_test.go +++ b/go/vt/vtgate/executor_stream_test.go @@ -61,7 +61,7 @@ func TestStreamSQLSharded(t *testing.T) { for _, shard := range shards { _ = hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_PRIMARY, true, 1, nil) } - executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) + executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false) sql := "stream * from sharded_user_msgs" result, err := executorStreamMessages(executor, sql) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 8563ef0f4f1..8c6dd9f04f4 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -27,6 +27,7 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo" @@ -47,6 +48,9 @@ type vstreamManager struct { resolver *srvtopo.Resolver toposerv srvtopo.Server cell string + // allowVstreamCopy will fail on vstream copy if false and no GTID provided for the stream. + // This is temporary until RDONLYs are properly supported for bootstrapping. + allowVstreamCopy bool vstreamsCreated *stats.CountersWithMultiLabels vstreamsLag *stats.GaugesWithMultiLabels @@ -119,12 +123,13 @@ type journalEvent struct { done chan struct{} } -func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string) *vstreamManager { +func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string, allowVstreamCopy bool) *vstreamManager { exporter := servenv.NewExporter(cell, "VStreamManager") return &vstreamManager{ - resolver: resolver, - toposerv: serv, - cell: cell, + resolver: resolver, + toposerv: serv, + cell: cell, + allowVstreamCopy: allowVstreamCopy, vstreamsCreated: exporter.NewCountersWithMultiLabels( "VStreamsCreated", "Number of vstreams created", @@ -540,6 +545,12 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha log.Infof("Starting to vstream from %s", tablet.Alias.String()) // Safe to access sgtid.Gtid here (because it can't change until streaming begins). var vstreamCreatedOnce sync.Once + + if !vs.vsm.allowVstreamCopy && (sgtid.Gtid == "" || len(sgtid.TablePKs) > 0) { + // We are attempting a vstream copy, but are not allowed (temporary until we can properly support RDONLYs for bootstrapping) + return vterrors.NewErrorf(vtrpc.Code_UNIMPLEMENTED, vterrors.NotSupportedYet, "vstream copy is not currently supported") + } + err = tabletConn.VStream(ctx, target, sgtid.Gtid, sgtid.TablePKs, vs.filter, func(events []*binlogdatapb.VEvent) error { // We received a valid event. Reset error count. errCount = 0 diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 241ed3280d4..56586cf6fef 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -90,7 +90,7 @@ func TestVStreamSkew(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) vgtid := &binlogdatapb.VGtid{ShardGtids: []*binlogdatapb.ShardGtid{}} want := int64(0) var sbc0, sbc1 *sandboxconn.SandboxConn @@ -136,7 +136,7 @@ func TestVStreamEvents(t *testing.T) { hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) @@ -213,7 +213,7 @@ func TestVStreamChunks(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -298,7 +298,7 @@ func TestVStreamManagerGetCells(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(hc, st, "aa", true) ts, _ := st.GetTopoServer() for _, tcase := range tcases { @@ -353,7 +353,7 @@ func TestVStreamMulti(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(hc, st, "aa", true) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -415,7 +415,7 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) vsm.vstreamsCreated.ResetAll() vsm.vstreamsLag.ResetAll() sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -470,7 +470,7 @@ func TestVStreamRetry(t *testing.T) { hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(hc, st, "aa", true) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) commit := []*binlogdatapb.VEvent{ @@ -511,7 +511,7 @@ func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) @@ -561,7 +561,7 @@ func TestVStreamJournalOneToMany(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(hc, st, "aa", true) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -674,7 +674,7 @@ func TestVStreamJournalManyToOne(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -791,7 +791,7 @@ func TestVStreamJournalNoMatch(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(hc, st, "aa", true) sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) @@ -920,7 +920,7 @@ func TestVStreamJournalPartialMatch(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(hc, st, "aa", true) sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-10", sbc1.Tablet()) sbc2 := hc.AddTestTablet("aa", "1.1.1.1", 1003, ks, "10-20", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -1000,7 +1000,7 @@ func TestResolveVStreamParams(t *testing.T) { name := "TestVStream" _ = createSandbox(name) hc := discovery.NewFakeHealthCheck(nil) - vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa") + vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa", true) testcases := []struct { input *binlogdatapb.VGtid output *binlogdatapb.VGtid @@ -1146,7 +1146,7 @@ func TestVStreamIdleHeartbeat(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) vgtid := &binlogdatapb.VGtid{ @@ -1195,10 +1195,60 @@ func TestVStreamIdleHeartbeat(t *testing.T) { } } -func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager { +func TestVstreamCopy(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cell := "aa" + ks := "TestVStreamCopy" + _ = createSandbox(ks) + hc := discovery.NewFakeHealthCheck(nil) + + st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) + sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) + commit := []*binlogdatapb.VEvent{ + {Type: binlogdatapb.VEventType_COMMIT}, + } + sbc0.AddVStreamEvents(commit, nil) + sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "aa")) + sbc0.AddVStreamEvents(commit, nil) + sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "bb")) + sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cc")) + sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "final error")) + var count sync2.AtomicInt32 + count.Set(0) + // empty gtid id means no start position = bootstrapping/vstream copy + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: ks, + Shard: "-20", + Gtid: "", + }}, + } + + // allowVstreamCopy = false + vsm := newTestVStreamManager(hc, st, "aa", false) + err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + count.Add(1) + return nil + }) + require.Error(t, err) + require.Equal(t, err.Error(), "vstream copy is not currently supported") + + // allowVstreamCopy = true + vsm2 := newTestVStreamManager(hc, st, "aa", true) + err = vsm2.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + count.Add(1) + return nil + }) + require.Equal(t, err.Error(), "final error") +} + +func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell string, allowVstreamCopy bool) *vstreamManager { gw := NewTabletGateway(context.Background(), hc, serv, cell) srvResolver := srvtopo.NewResolver(serv, gw, cell) - return newVStreamManager(srvResolver, serv, cell) + return newVStreamManager(srvResolver, serv, cell, allowVstreamCopy) } func startVStream(ctx context.Context, t *testing.T, vsm *vstreamManager, vgtid *binlogdatapb.VGtid, flags *vtgatepb.VStreamFlags) <-chan *binlogdatapb.VStreamResponse { diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 084d3a149c2..23bdf94d90d 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -71,6 +71,7 @@ var ( defaultDDLStrategy = flag.String("ddl_strategy", string(schema.DDLStrategyDirect), "Set default strategy for DDL statements. Override with @@ddl_strategy session variable") dbDDLPlugin = flag.String("dbddl_plugin", "fail", "controls how to handle CREATE/DROP DATABASE. use it if you are using your own database provisioning service") noScatter = flag.Bool("no_scatter", false, "when set to true, the planner will fail instead of producing a plan that includes scatter queries") + noVstreamCopy = flag.Bool("no_vstream_copy", false, "when set to true, vstream copy will not be allowed - temporary until we can properly support RDONLY for this") // TODO(deepthi): change these two vars to unexported and move to healthcheck.go when LegacyHealthcheck is removed @@ -210,7 +211,7 @@ func Init( sc := NewScatterConn("VttabletCall", tc, gw) srvResolver := srvtopo.NewResolver(serv, gw, cell) resolver := NewResolver(srvResolver, serv, cell, sc) - vsm := newVStreamManager(srvResolver, serv, cell) + vsm := newVStreamManager(srvResolver, serv, cell, !*noVstreamCopy) var si SchemaInfo // default nil var st *vtschema.Tracker @@ -238,6 +239,7 @@ func Init( si, *noScatter, pv, + *noVstreamCopy, ) // connect the schema tracker with the vschema manager diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 46af517ec29..2590300f7c2 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -124,8 +124,9 @@ func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se // buildTablePlan identifies the tables for the copy phase and creates the plans which consist of the lastPK seen // for a table and its Rule (for filtering purposes by the vstreamer engine) // it can be called -// the first time, with just the filter and an empty pos -// during a restart, with both the filter and list of TableLastPK from the vgtid +// +// the first time, with just the filter and an empty pos +// during a restart, with both the filter and list of TableLastPK from the vgtid func (uvs *uvstreamer) buildTablePlan() error { uvs.plans = make(map[string]*tablePlan) tableLastPKs := make(map[string]*binlogdatapb.TableLastPK) From 63e69528aff9cc844cb80d222774895977781843 Mon Sep 17 00:00:00 2001 From: pbibra Date: Tue, 12 Sep 2023 12:05:40 -0700 Subject: [PATCH 2/2] apply vcopy patch 11103 (#128) Signed-off-by: Priya Bibra --- go/vt/vtgate/endtoend/main_test.go | 12 ++ go/vt/vtgate/endtoend/vstream_test.go | 142 ++++++++++++++++++ .../tabletserver/vstreamer/uvstreamer.go | 77 +++++++--- .../vstreamer/uvstreamer_flaky_test.go | 9 +- 4 files changed, 220 insertions(+), 20 deletions(-) diff --git a/go/vt/vtgate/endtoend/main_test.go b/go/vt/vtgate/endtoend/main_test.go index df604c003cc..17cf3e6dd01 100644 --- a/go/vt/vtgate/endtoend/main_test.go +++ b/go/vt/vtgate/endtoend/main_test.go @@ -45,6 +45,12 @@ create table t1( primary key(id1) ) Engine=InnoDB; +create table t1_copy_resume( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; + create table t1_id2_idx( id2 bigint, keyspace_id varbinary(10), @@ -133,6 +139,12 @@ create table t1_sharded( Name: "t1_id2_vdx", }}, }, + "t1_copy_resume": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id1", + Name: "hash", + }}, + }, "t1_sharded": { ColumnVindexes: []*vschemapb.ColumnVindex{{ Column: "id1", diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index 477bb2518b5..a13aac8291d 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "sort" "sync" "testing" @@ -232,6 +233,119 @@ func TestVStreamCopyBasic(t *testing.T) { } } +func TestVStreamCopyResume(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + gconn, conn, mconn, closeConnections := initialize(ctx, t) + defer closeConnections() + + _, err := conn.ExecuteFetch("insert into t1_copy_resume(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) + if err != nil { + t.Fatal(err) + } + + // Any subsequent GTIDs will be part of the stream + mpos, err := mconn.PrimaryPosition() + require.NoError(t, err) + + // lastPK is id1=4, meaning we should only copy rows for id1 IN(5,6,7,8,9) + lastPK := sqltypes.Result{ + Fields: []*query.Field{{Name: "id1", Type: query.Type_INT64}}, + Rows: [][]sqltypes.Value{{sqltypes.NewInt64(4)}}, + } + tableLastPK := []*binlogdatapb.TableLastPK{{ + TableName: "t1_copy_resume", + Lastpk: sqltypes.ResultToProto3(&lastPK), + }} + + catchupQueries := []string{ + "insert into t1_copy_resume(id1,id2) values(9,9)", // this row will show up twice: once in catchup and copy + "update t1_copy_resume set id2 = 10 where id1 = 1", + "insert into t1(id1, id2) values(100,100)", + "delete from t1_copy_resume where id1 = 1", + "update t1_copy_resume set id2 = 90 where id1 = 9", + } + for _, query := range catchupQueries { + _, err = conn.ExecuteFetch(query, 1, false) + require.NoError(t, err) + } + + var shardGtids []*binlogdatapb.ShardGtid + var vgtid = &binlogdatapb.VGtid{} + shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ + Keyspace: "ks", + Shard: "-80", + Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos), + TablePKs: tableLastPK, + }) + shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ + Keyspace: "ks", + Shard: "80-", + Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos), + TablePKs: tableLastPK, + }) + vgtid.ShardGtids = shardGtids + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1_copy_resume", + Filter: "select * from t1_copy_resume", + }}, + } + flags := &vtgatepb.VStreamFlags{} + reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) + if err != nil { + t.Fatal(err) + } + require.NotNil(t, reader) + + expectedRowCopyEvents := 5 // id1 and id2 IN(5,6,7,8,9) + expectedCatchupEvents := len(catchupQueries) - 1 // insert into t1 should never reach + rowCopyEvents, replCatchupEvents := 0, 0 + expectedEvents := []string{ + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:1 values:"11"} after:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"55"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"66"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"77"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"88"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:2 values:"990"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:1 values:"99"} after:{lengths:1 lengths:2 values:"990"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + } + var evs []*binlogdatapb.VEvent + for { + e, err := reader.Recv() + switch err { + case nil: + for _, ev := range e { + if ev.Type == binlogdatapb.VEventType_ROW { + evs = append(evs, ev) + if ev.Timestamp == 0 { + rowCopyEvents++ + } else { + replCatchupEvents++ + } + printEvents(evs) // for debugging ci failures + } + } + if expectedCatchupEvents == replCatchupEvents && expectedRowCopyEvents == rowCopyEvents { + sort.Sort(VEventSorter(evs)) + for i, ev := range evs { + require.Regexp(t, expectedEvents[i], ev.String()) + } + t.Logf("TestVStreamCopyResume was successful") + return + } + case io.EOF: + log.Infof("stream ended\n") + cancel() + default: + log.Errorf("Returned err %v", err) + t.Fatalf("remote error: %v\n", err) + } + } +} + func TestVStreamCurrent(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -396,3 +510,31 @@ func printEvents(evs []*binlogdatapb.VEvent) { s += "===END===" + "\n" log.Infof("%s", s) } + +// Sort the VEvents by the first row change's after value bytes primarily, with +// secondary ordering by timestamp (ASC). Note that row copy events do not have +// a timestamp and the value will be 0. +type VEventSorter []*binlogdatapb.VEvent + +func (v VEventSorter) Len() int { + return len(v) +} +func (v VEventSorter) Swap(i, j int) { + v[i], v[j] = v[j], v[i] +} +func (v VEventSorter) Less(i, j int) bool { + valsI := v[i].GetRowEvent().RowChanges[0].After + if valsI == nil { + valsI = v[i].GetRowEvent().RowChanges[0].Before + } + valsJ := v[j].GetRowEvent().RowChanges[0].After + if valsJ == nil { + valsJ = v[j].GetRowEvent().RowChanges[0].Before + } + valI := string(valsI.Values) + valJ := string(valsJ.Values) + if valI == valJ { + return v[i].Timestamp < v[j].Timestamp + } + return valI < valJ +} diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 2590300f7c2..056d5da1822 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -218,7 +218,9 @@ func getQuery(tableName string, filter string) string { query = buf.String() case key.IsKeyRange(filter): buf := sqlparser.NewTrackedBuffer(nil) - buf.Myprintf("select * from %v where in_keyrange(%v)", sqlparser.NewTableIdent(tableName), sqlparser.NewStrLiteral(filter)) + // note: sqlparser.NewTableIdent is renamed to NewIdentifierCS in v15 + buf.Myprintf("select * from %v where in_keyrange(%v)", + sqlparser.NewTableIdent(tableName), sqlparser.NewStrLiteral(filter)) query = buf.String() } return query @@ -229,7 +231,40 @@ func (uvs *uvstreamer) Cancel() { uvs.cancel() } -// during copy phase only send streaming events (during catchup/fastforward) for pks already seen +// We have not yet implemented the logic to check if an event is for a row that is already copied, +// so we always return true so that we send all events for this table and so we don't miss events. +func (uvs *uvstreamer) isRowCopied(tableName string, ev *binlogdatapb.VEvent) bool { + return true +} + +// Only send catchup/fastforward events for tables whose copy phase is complete or in progress. +// This ensures we fulfill the at-least-once delivery semantics for events. +// TODO: filter out events for rows not yet copied. Note that we can only do this as a best-effort +// for comparable PKs. +func (uvs *uvstreamer) shouldSendEventForTable(tableName string, ev *binlogdatapb.VEvent) bool { + table, ok := uvs.plans[tableName] + // Event is for a table which is not in its copy phase. + if !ok { + return true + } + + // if table copy was not started and no tablePK was specified we can ignore catchup/fastforward events for it + if table.tablePK == nil || table.tablePK.Lastpk == nil { + return false + } + + // Table is currently in its copy phase. We have not yet implemented the logic to + // check if an event is for a row that is already copied, so we always return true + // there so that we don't miss events. + // We may send duplicate insert events or update/delete events for rows not yet seen + // to the client for the table being copied. This is ok as the client is expected to be + // idempotent: we only promise at-least-once semantics for VStream API (not exactly-once). + // Aside: vreplication workflows handle at-least-once by adding where clauses that render + // DML queries, related to events for rows not yet copied, as no-ops. + return uvs.isRowCopied(tableName, ev) +} + +// Do not send internal heartbeat events. Filter out events for tables whose copy has not been started. func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb.VEvent { if len(uvs.plans) == 0 { return evs @@ -239,25 +274,21 @@ func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb. var shouldSend bool for _, ev := range evs { - shouldSend = false - tableName = "" switch ev.Type { case binlogdatapb.VEventType_ROW: tableName = ev.RowEvent.TableName case binlogdatapb.VEventType_FIELD: tableName = ev.FieldEvent.TableName + default: + tableName = "" + } + switch ev.Type { case binlogdatapb.VEventType_HEARTBEAT: shouldSend = false default: - shouldSend = true - } - if !shouldSend && tableName != "" { - shouldSend = true - _, ok := uvs.plans[tableName] - if ok { - shouldSend = false - } + shouldSend = uvs.shouldSendEventForTable(tableName, ev) } + if shouldSend { evs2 = append(evs2, ev) } @@ -331,7 +362,9 @@ func (uvs *uvstreamer) setStreamStartPosition() error { } if !curPos.AtLeast(pos) { uvs.vse.errorCounts.Add("GTIDSet Mismatch", 1) - return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "GTIDSet Mismatch: requested source position:%v, current target vrep position: %v", mysql.EncodePosition(pos), mysql.EncodePosition(curPos)) + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, + "GTIDSet Mismatch: requested source position:%v, current target vrep position: %v", + mysql.EncodePosition(pos), mysql.EncodePosition(curPos)) } uvs.pos = pos return nil @@ -346,17 +379,22 @@ func (uvs *uvstreamer) currentPosition() (mysql.Position, error) { return conn.PrimaryPosition() } +// Possible states: +// 1. TablePKs nil, startPos set to gtid or "current" => start replicating from pos +// 2. TablePKs nil, startPos empty => full table copy of tables matching filter +// 3. TablePKs not nil, startPos empty => table copy (for pks > lastPK) +// 4. TablePKs not nil, startPos set => run catchup from startPos, then table copy (for pks > lastPK) func (uvs *uvstreamer) init() error { - if uvs.startPos != "" { - if err := uvs.setStreamStartPosition(); err != nil { + if uvs.startPos == "" /* full copy */ || len(uvs.inTablePKs) > 0 /* resume copy */ { + if err := uvs.buildTablePlan(); err != nil { return err } - } else if uvs.startPos == "" || len(uvs.inTablePKs) > 0 { - if err := uvs.buildTablePlan(); err != nil { + } + if uvs.startPos != "" { + if err := uvs.setStreamStartPosition(); err != nil { return err } } - if uvs.pos.IsZero() && (len(uvs.plans) == 0) { return fmt.Errorf("stream needs a position or a table to copy") } @@ -378,7 +416,8 @@ func (uvs *uvstreamer) Stream() error { } uvs.sendTestEvent("Copy Done") } - vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse) + vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), + uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse) uvs.setVs(vs) return vs.Stream() diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index fdd60b8207f..1ed673ebf90 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -182,6 +182,7 @@ func TestVStreamCopyCompleteFlow(t *testing.T) { uvstreamerTestMode = true defer func() { uvstreamerTestMode = false }() initialize(t) + if err := engine.se.Reload(context.Background()); err != nil { t.Fatal("Error reloading schema") } @@ -190,6 +191,12 @@ func TestVStreamCopyCompleteFlow(t *testing.T) { var tablePKs []*binlogdatapb.TableLastPK for i, table := range testState.tables { rules = append(rules, getRule(table)) + + // for table t2, let tablepk be nil, so that we don't send events for the insert in initTables() + if table == "t2" { + continue + } + tablePKs = append(tablePKs, getTablePK(table, i+1)) } filter := &binlogdatapb.Filter{ @@ -246,7 +253,7 @@ commit;" numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/) numCopyEvents += 2 /* GTID + Test event after all copy is done */ - numCatchupEvents := 3 * 5 /*2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT*/ + numCatchupEvents := 3 * 5 /* 2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT */ numFastForwardEvents := 5 /*t1:FIELD+ROW*/ numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */ numReplicateEvents := 2*5 /* insert into t1/t2 */ + 6 /* begin/field/2 inserts/gtid/commit */