From a80244dca234c4a340501d0b8e9e21cb59172077 Mon Sep 17 00:00:00 2001 From: pbibra Date: Mon, 18 Sep 2023 10:56:36 -0700 Subject: [PATCH 1/7] apply patch 12623 (#132) * apply patch 12623 Signed-off-by: Priya Bibra * fix unit test Signed-off-by: Priya Bibra * revert regex Signed-off-by: Priya Bibra * update regex Signed-off-by: Priya Bibra --------- Signed-off-by: Priya Bibra --- go/vt/vtgate/endtoend/vstream_test.go | 6 ++++++ go/vt/vttablet/tabletserver/vstreamer/copy.go | 2 +- .../tabletserver/vstreamer/uvstreamer_flaky_test.go | 6 +++--- .../vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go | 6 +++--- 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index e7ead3dcd22..710cc50767a 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -481,6 +481,12 @@ func TestVStreamCopyResume(t *testing.T) { } printEvents(evs) // for debugging ci failures } + if ev.Type == binlogdatapb.VEventType_VGTID { + // Validate that the vgtid event the client receives from the vstream copy has a complete TableLastPK proto message. + // Also, to ensure that the client can resume properly, make sure that + // the Fields value is present in the sqltypes.Result field and not missing. + require.Regexp(t, `type:VGTID vgtid:{(shard_gtids:{keyspace:"ks" shard:"(80-|-80)" gtid:".+" table_p_ks:{table_name:"t1_copy_resume" lastpk:{fields:{name:"id1" type:INT64} rows:{lengths:1 values:"[0-9]"}}}})+} keyspace:"ks" shard:"(80-|-80)"`, ev.String()) + } } if expectedCatchupEvents == replCatchupEvents && expectedRowCopyEvents == rowCopyEvents { sort.Sort(VEventSorter(evs)) diff --git a/go/vt/vttablet/tabletserver/vstreamer/copy.go b/go/vt/vttablet/tabletserver/vstreamer/copy.go index 0065555047d..62b93cf5063 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/copy.go +++ b/go/vt/vttablet/tabletserver/vstreamer/copy.go @@ -262,7 +262,7 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error { } newLastPK = sqltypes.CustomProto3ToResult(uvs.pkfields, &querypb.QueryResult{ - Fields: rows.Fields, + Fields: uvs.pkfields, Rows: []*querypb.Row{rows.Lastpk}, }) qrLastPK := sqltypes.ResultToProto3(newLastPK) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index 610b9012f7f..8ca43f008b6 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -477,7 +477,7 @@ var expectedEvents = []string{ "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:1 lengths:2 values:\"880\"}}}", "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:1 lengths:2 values:\"990\"}}}", "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:2 lengths:3 values:\"10100\"}}}", - "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t1\" lastpk:{rows:{lengths:2 values:\"10\"}}}}", + "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t1\" lastpk:{fields:{name:\"id11\" type:INT32} rows:{lengths:2 values:\"10\"}}}}", "type:COMMIT", "type:BEGIN", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t1\"} completed:true}", @@ -506,7 +506,7 @@ var expectedEvents = []string{ "type:ROW row_event:{table_name:\"t2\" row_changes:{after:{lengths:1 lengths:3 values:\"9180\"}}}", "type:ROW row_event:{table_name:\"t2\" row_changes:{after:{lengths:2 lengths:3 values:\"10200\"}}}", "type:ROW row_event:{table_name:\"t2\" row_changes:{after:{lengths:2 lengths:3 values:\"11220\"}}}", - "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2\" lastpk:{rows:{lengths:2 values:\"11\"}}}}", + "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2\" lastpk:{fields:{name:\"id21\" type:INT32} rows:{lengths:2 values:\"11\"}}}}", "type:COMMIT", "type:BEGIN", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2\"} completed:true}", @@ -534,7 +534,7 @@ var expectedEvents = []string{ "type:ROW row_event:{table_name:\"t3\" row_changes:{after:{lengths:1 lengths:3 values:\"8240\"}}}", "type:ROW row_event:{table_name:\"t3\" row_changes:{after:{lengths:1 lengths:3 values:\"9270\"}}}", "type:ROW row_event:{table_name:\"t3\" row_changes:{after:{lengths:2 lengths:3 values:\"10300\"}}}", - "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t3\" lastpk:{rows:{lengths:2 values:\"10\"}}}}", + "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t3\" lastpk:{fields:{name:\"id31\" type:INT32} rows:{lengths:2 values:\"10\"}}}}", "type:COMMIT", "type:BEGIN", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t3\"} completed:true}", diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go index 84d7d0de138..5d518558e87 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go @@ -506,7 +506,7 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id1\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id1\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id2\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:GTID", "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:1 lengths:1 values:\"12\"}}}", - "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t1\" lastpk:{rows:{lengths:1 values:\"1\"}}}}", + "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t1\" lastpk:{fields:{name:\"id1\" type:INT32} rows:{lengths:1 values:\"1\"}}}}", "type:COMMIT", "type:BEGIN", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t1\"} completed:true}", @@ -514,7 +514,7 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { "type:BEGIN", "type:FIELD field_event:{table_name:\"t2a\" fields:{name:\"id1\" type:INT32 table:\"t2a\" org_table:\"t2a\" database:\"vttest\" org_name:\"id1\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id2\" type:INT32 table:\"t2a\" org_table:\"t2a\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:ROW row_event:{table_name:\"t2a\" row_changes:{after:{lengths:1 lengths:1 values:\"14\"}}}", - "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2a\" lastpk:{rows:{lengths:1 values:\"1\"}}}}", + "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2a\" lastpk:{fields:{name:\"id1\" type:INT32} rows:{lengths:1 values:\"1\"}}}}", "type:COMMIT", "type:BEGIN", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2a\"} completed:true}", @@ -523,7 +523,7 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { "type:FIELD field_event:{table_name:\"t2b\" fields:{name:\"id1\" type:VARCHAR table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id1\" column_length:80 charset:45 column_type:\"varchar(20)\"} fields:{name:\"id2\" type:INT32 table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:ROW row_event:{table_name:\"t2b\" row_changes:{after:{lengths:1 lengths:1 values:\"a5\"}}}", "type:ROW row_event:{table_name:\"t2b\" row_changes:{after:{lengths:1 lengths:1 values:\"b6\"}}}", - "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2b\" lastpk:{rows:{lengths:1 values:\"b\"}}}}", + "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2b\" lastpk:{fields:{name:\"id1\" type:VARCHAR} rows:{lengths:1 values:\"b\"}}}}", "type:COMMIT", "type:BEGIN", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2b\"} completed:true}", From 31e9e72957f58d630ef4b082a2cca1fc3298ee1e Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 20 Jul 2023 12:57:27 -0400 Subject: [PATCH 2/7] VReplication: Ensure ROW events are sent within a transaction (#13547) Signed-off-by: Matt Lord --- go/vt/vtgate/endtoend/vstream_test.go | 119 ++++++++++++++++++ go/vt/vttablet/tabletserver/vstreamer/copy.go | 16 +++ .../tabletserver/vstreamer/uvstreamer.go | 18 +-- 3 files changed, 146 insertions(+), 7 deletions(-) diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index 710cc50767a..16eea4c91f2 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -655,6 +655,125 @@ func TestVStreamSharded(t *testing.T) { } +// TestVStreamCopyTransactions tests that we are properly wrapping +// ROW events in the stream with BEGIN and COMMIT events. +func TestVStreamCopyTransactions(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + keyspace := "ks" + shards := []string{"-80", "80-"} + table := "t1_copy_basic" + beginEventSeen, commitEventSeen := false, false + numResultInTrx := 0 + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: keyspace, + Shard: shards[0], + Gtid: "", // Start a vstream copy + }, + { + Keyspace: keyspace, + Shard: shards[1], + Gtid: "", // Start a vstream copy + }, + }, + } + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + }}, + } + + gconn, conn, _, closeConnections := initialize(ctx, t) + defer closeConnections() + + // Clear any existing data. + q := fmt.Sprintf("delete from %s", table) + _, err := conn.ExecuteFetch(q, -1, false) + require.NoError(t, err, "error clearing data: %v", err) + + // Generate some test data. Enough to cross the default + // vstream_packet_size threshold. + for i := 1; i <= 100000; i++ { + values := fmt.Sprintf("(%d, %d)", i, i) + q := fmt.Sprintf("insert into %s (id1, id2) values %s", table, values) + _, err := conn.ExecuteFetch(q, 1, false) + require.NoError(t, err, "error inserting data: %v", err) + } + + // Start a vstream. + reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, nil) + require.NoError(t, err, "error starting vstream: %v", err) + +recvLoop: + for { + vevents, err := reader.Recv() + numResultInTrx++ + eventCount := len(vevents) + t.Logf("------------------ Received %d events in response #%d for the transaction ------------------\n", + eventCount, numResultInTrx) + switch err { + case nil: + for _, event := range vevents { + switch event.Type { + case binlogdatapb.VEventType_BEGIN: + require.False(t, beginEventSeen, "received a second BEGIN event within the transaction: numResultInTrx=%d\n", + numResultInTrx) + beginEventSeen = true + t.Logf("Found BEGIN event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d\n", + beginEventSeen, commitEventSeen, event.Type, numResultInTrx) + require.False(t, commitEventSeen, "received a BEGIN event when expecting a COMMIT event: numResultInTrx=%d\n", + numResultInTrx) + case binlogdatapb.VEventType_VGTID: + t.Logf("Found VGTID event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n", + beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) + case binlogdatapb.VEventType_FIELD: + t.Logf("Found FIELD event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n", + beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) + case binlogdatapb.VEventType_ROW: + // Uncomment if you need to do more debugging. + // t.Logf("Found ROW event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n", + // beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) + case binlogdatapb.VEventType_COMMIT: + commitEventSeen = true + t.Logf("Found COMMIT event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n", + beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) + require.True(t, beginEventSeen, "received COMMIT event before receiving BEGIN event: numResultInTrx=%d\n", + numResultInTrx) + case binlogdatapb.VEventType_COPY_COMPLETED: + t.Logf("Finished vstream copy\n") + t.Logf("-------------------------------------------------------------------\n\n") + cancel() + break recvLoop + default: + t.Logf("Found extraneous event: %+v\n", event) + } + if beginEventSeen && commitEventSeen { + t.Logf("Received both BEGIN and COMMIT, so resetting transactional state\n") + beginEventSeen = false + commitEventSeen = false + numResultInTrx = 0 + } + } + case io.EOF: + t.Logf("vstream ended\n") + t.Logf("-------------------------------------------------------------------\n\n") + cancel() + return + default: + require.FailNowf(t, "unexpected error", "encountered error in vstream: %v", err) + return + } + } + // The last response, when the vstream copy completes, does not + // typically contain ROW events. + if beginEventSeen || commitEventSeen { + require.True(t, (beginEventSeen && commitEventSeen), "did not receive both BEGIN and COMMIT events in the final ROW event set") + } +} + func removeAnyDeprecatedDisplayWidths(orig string) string { var adjusted string baseIntType := "int" diff --git a/go/vt/vttablet/tabletserver/vstreamer/copy.go b/go/vt/vttablet/tabletserver/vstreamer/copy.go index 62b93cf5063..864dbd5d50c 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/copy.go +++ b/go/vt/vttablet/tabletserver/vstreamer/copy.go @@ -255,12 +255,26 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error { log.Infof("sendFieldEvent returned error %v", err) return err } + // sendFieldEvent() sends a BEGIN event first. + uvs.inTransaction = true } + if len(rows.Rows) == 0 { log.V(2).Infof("0 rows returned for table %s", tableName) return nil } + // We are about to send ROW events, so we need to ensure + // that we do so within a transaction. The COMMIT event + // will be sent in sendEventsForRows() below. + if !uvs.inTransaction { + evs := []*binlogdatapb.VEvent{{ + Type: binlogdatapb.VEventType_BEGIN, + }} + uvs.send(evs) + uvs.inTransaction = true + } + newLastPK = sqltypes.CustomProto3ToResult(uvs.pkfields, &querypb.QueryResult{ Fields: uvs.pkfields, Rows: []*querypb.Row{rows.Lastpk}, @@ -271,6 +285,8 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error { log.Infof("sendEventsForRows returned error %v", err) return err } + // sendEventsForRows() sends a COMMIT event last. + uvs.inTransaction = false uvs.setCopyState(tableName, qrLastPK) log.V(2).Infof("NewLastPK: %v", qrLastPK) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 2584a166226..34f13268a06 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -51,13 +51,17 @@ type uvstreamer struct { cancel func() // input parameters - vse *Engine - send func([]*binlogdatapb.VEvent) error - cp dbconfigs.Connector - se *schema.Engine - startPos string - filter *binlogdatapb.Filter - inTablePKs []*binlogdatapb.TableLastPK + vse *Engine + send func([]*binlogdatapb.VEvent) error + cp dbconfigs.Connector + se *schema.Engine + startPos string + // Are we currently in an explicit transaction? + // If we are not, and we're about to send ROW + // events, then we need to send a BEGIN event first. + inTransaction bool + filter *binlogdatapb.Filter + inTablePKs []*binlogdatapb.TableLastPK vschema *localVSchema From 8c458566eea0c40cc63320b03f8f13ca0b0f3bf4 Mon Sep 17 00:00:00 2001 From: pbibra Date: Thu, 18 May 2023 09:21:23 -0700 Subject: [PATCH 3/7] Update tabletpicker to support cell pref and tablet order options (#12282) * update tabletpicker to support cell pref and tablet order options Signed-off-by: Priya Bibra * add tablet picker options in vstream flags Signed-off-by: Priya Bibra * add tablet picker tests Signed-off-by: Priya Bibra * add summary docs Signed-off-by: Priya Bibra * update proto Signed-off-by: Priya Bibra Signed-off-by: 'Priya Bibra' * fix vreplication test Signed-off-by: Priya Bibra Signed-off-by: 'Priya Bibra' * fix doc typos, add function comments, update error handling Signed-off-by: Priya Bibra Signed-off-by: 'Priya Bibra' * fix tests and comments Signed-off-by: Priya Bibra * fix local cell ref Signed-off-by: Priya Bibra * update proto Signed-off-by: Priya Bibra * add log line to debug test Signed-off-by: Priya Bibra * add cell to vre test def Signed-off-by: Priya Bibra * define vrepl engine for controller tests Signed-off-by: Priya Bibra Signed-off-by: 'Priya Bibra' --------- Signed-off-by: Priya Bibra Signed-off-by: pbibra Signed-off-by: 'Priya Bibra' --- .../vreplication/vreplication_test.go | 1 + go/vt/discovery/tablet_picker.go | 244 +++++++- go/vt/discovery/tablet_picker_test.go | 542 ++++++++++-------- go/vt/proto/vtgate/vtgate.pb.go | 147 +++-- go/vt/proto/vtgate/vtgate_vtproto.pb.go | 86 +++ go/vt/vtgate/vstream_manager.go | 8 +- .../tabletmanager/vdiff/table_differ.go | 8 +- .../tabletmanager/vreplication/controller.go | 2 +- .../vreplication/controller_test.go | 23 +- go/vt/wrangler/traffic_switcher.go | 2 +- go/vt/wrangler/traffic_switcher_env_test.go | 4 +- go/vt/wrangler/vdiff.go | 4 +- proto/vtgate.proto | 2 + tools/rowlog/rowlog.go | 13 +- 14 files changed, 736 insertions(+), 350 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 5ae14270318..74e724ad52e 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -367,6 +367,7 @@ func testVStreamCellFlag(t *testing.T) { flags := &vtgatepb.VStreamFlags{} if tc.cells != "" { flags.Cells = tc.cells + flags.CellPreference = "onlyspecified" } ctx2, cancel := context.WithTimeout(ctx, 30*time.Second) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index cb0449c6191..20d4126831a 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -41,11 +41,40 @@ import ( "vitess.io/vitess/go/vt/vterrors" ) +type TabletPickerCellPreference int + +const ( + // PreferLocalWithAlias gives preference to the local cell first, then specified cells, if any. + // This is the default when no other option is provided. + TabletPickerCellPreference_PreferLocalWithAlias TabletPickerCellPreference = iota + // OnlySpecified only picks tablets from the list of cells given. + TabletPickerCellPreference_OnlySpecified +) + +type TabletPickerTabletOrder int + +const ( + // All provided tablet types are given equal priority. This is the default. + TabletPickerTabletOrder_Any TabletPickerTabletOrder = iota + // Provided tablet types are expected to be prioritized in the given order. + TabletPickerTabletOrder_InOrder +) + var ( tabletPickerRetryDelay = 30 * time.Second muTabletPickerRetryDelay sync.Mutex globalTPStats *tabletPickerStats inOrderHint = "in_order:" + + tabletPickerCellPreferenceMap = map[string]TabletPickerCellPreference{ + "preferlocalwithalias": TabletPickerCellPreference_PreferLocalWithAlias, + "onlyspecified": TabletPickerCellPreference_OnlySpecified, + } + + tabletPickerTabletOrderMap = map[string]TabletPickerTabletOrder{ + "any": TabletPickerTabletOrder_Any, + "inorder": TabletPickerTabletOrder_InOrder, + } ) // GetTabletPickerRetryDelay synchronizes changes to tabletPickerRetryDelay. Used in tests only at the moment @@ -62,18 +91,63 @@ func SetTabletPickerRetryDelay(delay time.Duration) { tabletPickerRetryDelay = delay } +type TabletPickerOptions struct { + CellPreference string + TabletOrder string +} + +func parseTabletPickerCellPreferenceString(str string) (TabletPickerCellPreference, error) { + // return default if blank + if str == "" { + return TabletPickerCellPreference_PreferLocalWithAlias, nil + } + + if c, ok := tabletPickerCellPreferenceMap[strings.ToLower(str)]; ok { + return c, nil + } + + return -1, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid cell preference: %v", str) +} + +func parseTabletPickerTabletOrderString(str string) (TabletPickerTabletOrder, error) { + // return default if blank + if str == "" { + return TabletPickerTabletOrder_Any, nil + } + + if o, ok := tabletPickerTabletOrderMap[strings.ToLower(str)]; ok { + return o, nil + } + + return -1, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid tablet order type: %v", str) +} + +type localCellInfo struct { + localCell string + cellsInAlias map[string]string +} + // TabletPicker gives a simplified API for picking tablets. type TabletPicker struct { - ts *topo.Server - cells []string - keyspace string - shard string - tabletTypes []topodatapb.TabletType - inOrder bool + ts *topo.Server + cells []string + keyspace string + shard string + tabletTypes []topodatapb.TabletType + inOrder bool + cellPref TabletPickerCellPreference + localCellInfo localCellInfo } // NewTabletPicker returns a TabletPicker. -func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTypesStr string) (*TabletPicker, error) { +func NewTabletPicker( + ctx context.Context, + ts *topo.Server, + cells []string, + localCell, keyspace, shard, tabletTypesStr string, + options TabletPickerOptions, +) (*TabletPicker, error) { + // Keep inOrder parsing here for backward compatability until TabletPickerTabletOrder is fully adopted. tabletTypes, inOrder, err := ParseTabletTypesAndOrder(tabletTypesStr) if err != nil { return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "failed to parse list of tablet types: %v", tabletTypesStr) @@ -92,19 +166,123 @@ func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTyp return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, fmt.Sprintf("Missing required field(s) for tablet picker: %s", strings.Join(missingFields, ", "))) } + + // Resolve tablet picker options + cellPref, err := parseTabletPickerCellPreferenceString(options.CellPreference) + if err != nil { + return nil, err + } + + // For backward compatibility only parse the options for tablet ordering + // if the in_order hint wasn't already specified. Otherwise it could be overridden. + // We can remove this check once the in_order hint is deprecated. + if !inOrder { + order, err := parseTabletPickerTabletOrderString(options.TabletOrder) + if err != nil { + return nil, err + } + switch order { + case TabletPickerTabletOrder_Any: + inOrder = false + case TabletPickerTabletOrder_InOrder: + inOrder = true + } + } + + aliasCellMap := make(map[string]string) + if cellPref == TabletPickerCellPreference_PreferLocalWithAlias { + if localCell == "" { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot have local cell preference without local cell") + } + + // Add local cell to the list of cells for tablet picking. + // This will be de-duped later if the local cell already exists in the original list - see: dedupeCells() + cells = append(cells, localCell) + aliasName := topo.GetAliasByCell(ctx, ts, localCell) + + // If an alias exists + if aliasName != localCell { + alias, err := ts.GetCellsAlias(ctx, aliasName, false) + if err != nil { + return nil, vterrors.Wrap(err, "error fetching local cell alias") + } + + // Add the aliasName to the list of cells for tablet picking. + cells = append(cells, aliasName) + + // Create a map of the cells in the alias to make lookup faster later when we're giving preference to these. + // see prioritizeTablets() + for _, c := range alias.Cells { + aliasCellMap[c] = c + } + } + } + return &TabletPicker{ - ts: ts, - cells: cells, - keyspace: keyspace, - shard: shard, - tabletTypes: tabletTypes, - inOrder: inOrder, + ts: ts, + cells: dedupeCells(cells), + localCellInfo: localCellInfo{localCell: localCell, cellsInAlias: aliasCellMap}, + keyspace: keyspace, + shard: shard, + tabletTypes: tabletTypes, + inOrder: inOrder, + cellPref: cellPref, }, nil } +// dedupeCells is used to remove duplicates in the cell list in case it is passed in +// and exists in the local cell's alias. Can happen if CellPreference is PreferLocalWithAlias. +func dedupeCells(cells []string) []string { + keys := make(map[string]bool) + dedupedCells := []string{} + + for _, c := range cells { + if _, value := keys[c]; !value { + keys[c] = true + dedupedCells = append(dedupedCells, c) + } + } + return dedupedCells +} + +// prioritizeTablets orders the candidate pool of tablets based on CellPreference. +// If CellPreference is PreferLocalWithAlias then tablets in the local cell will be prioritized for selection, +// followed by the tablets within the local cell's alias, and finally any others specified by the client. +// If CellPreference is OnlySpecified, then tablets will only be selected randomly from the cells specified by the client. +func (tp *TabletPicker) prioritizeTablets(candidates []*topo.TabletInfo) (sameCell, sameAlias, allOthers []*topo.TabletInfo) { + for _, c := range candidates { + if c.Alias.Cell == tp.localCellInfo.localCell { + sameCell = append(sameCell, c) + } else if _, ok := tp.localCellInfo.cellsInAlias[c.Alias.Cell]; ok { + sameAlias = append(sameAlias, c) + } else { + allOthers = append(allOthers, c) + } + } + + return sameCell, sameAlias, allOthers +} + +func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo.TabletInfo { + // Sort candidates slice such that tablets appear in same tablet type order as in tp.tabletTypes + orderMap := map[topodatapb.TabletType]int{} + for i, t := range tp.tabletTypes { + orderMap[t] = i + } + sort.Slice(candidates, func(i, j int) bool { + if orderMap[candidates[i].Type] == orderMap[candidates[j].Type] { + // identical tablet types: randomize order of tablets for this type + return rand.Intn(2) == 0 // 50% chance + } + return orderMap[candidates[i].Type] < orderMap[candidates[j].Type] + }) + + return candidates +} + // PickForStreaming picks an available tablet. -// All tablets that belong to tp.cells are evaluated and one is -// chosen at random. +// Selection is based on CellPreference. +// See prioritizeTablets for prioritization logic. func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) { rand.Seed(time.Now().UnixNano()) // keep trying at intervals (tabletPickerRetryDelay) until a tablet is found @@ -116,19 +294,30 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table default: } candidates := tp.GetMatchingTablets(ctx) - if tp.inOrder { - // Sort candidates slice such that tablets appear in same tablet type order as in tp.tabletTypes - orderMap := map[topodatapb.TabletType]int{} - for i, t := range tp.tabletTypes { - orderMap[t] = i + if tp.cellPref == TabletPickerCellPreference_PreferLocalWithAlias { + sameCellCandidates, sameAliasCandidates, allOtherCandidates := tp.prioritizeTablets(candidates) + + if tp.inOrder { + sameCellCandidates = tp.orderByTabletType(sameCellCandidates) + sameAliasCandidates = tp.orderByTabletType(sameAliasCandidates) + allOtherCandidates = tp.orderByTabletType(allOtherCandidates) + } else { + // Randomize candidates + rand.Shuffle(len(sameCellCandidates), func(i, j int) { + sameCellCandidates[i], sameCellCandidates[j] = sameCellCandidates[j], sameCellCandidates[i] + }) + rand.Shuffle(len(sameAliasCandidates), func(i, j int) { + sameAliasCandidates[i], sameAliasCandidates[j] = sameAliasCandidates[j], sameAliasCandidates[i] + }) + rand.Shuffle(len(allOtherCandidates), func(i, j int) { + allOtherCandidates[i], allOtherCandidates[j] = allOtherCandidates[j], allOtherCandidates[i] + }) } - sort.Slice(candidates, func(i, j int) bool { - if orderMap[candidates[i].Type] == orderMap[candidates[j].Type] { - // identical tablet types: randomize order of tablets for this type - return rand.Intn(2) == 0 // 50% chance - } - return orderMap[candidates[i].Type] < orderMap[candidates[j].Type] - }) + + candidates = append(sameCellCandidates, sameAliasCandidates...) + candidates = append(candidates, allOtherCandidates...) + } else if tp.inOrder { + candidates = tp.orderByTabletType(candidates) } else { // Randomize candidates rand.Shuffle(len(candidates), func(i, j int) { @@ -204,6 +393,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn actualCells = append(actualCells, cell) } } + for _, cell := range actualCells { shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index ed071af13ad..88368c02a60 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -1,12 +1,9 @@ /* Copyright 2019 The Vitess Authors. - Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -31,169 +28,6 @@ import ( "vitess.io/vitess/go/vt/topo/memorytopo" ) -func TestPickSimple(t *testing.T) { - te := newPickerTestEnv(t, []string{"cell"}) - want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(t, te, want) - - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") - require.NoError(t, err) - - tablet, err := tp.PickForStreaming(context.Background()) - require.NoError(t, err) - assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) -} - -func TestPickFromTwoHealthy(t *testing.T) { - te := newPickerTestEnv(t, []string{"cell"}) - want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(t, te, want1) - want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, "cell", true, true) - defer deleteTablet(t, te, want2) - - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica,rdonly") - require.NoError(t, err) - - // In 20 attempts, both tablet types must be picked at least once. - var picked1, picked2 bool - for i := 0; i < 20; i++ { - tablet, err := tp.PickForStreaming(context.Background()) - require.NoError(t, err) - if proto.Equal(tablet, want1) { - picked1 = true - } - if proto.Equal(tablet, want2) { - picked2 = true - } - } - assert.True(t, picked1) - assert.True(t, picked2) -} - -func TestPickInOrder1(t *testing.T) { - te := newPickerTestEnv(t, []string{"cell"}) - want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(t, te, want1) - want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, "cell", true, true) - defer deleteTablet(t, te, want2) - - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "in_order:replica,rdonly") - require.NoError(t, err) - - // In 20 attempts, we always pick the first healthy tablet in order - var picked1, picked2 bool - for i := 0; i < 20; i++ { - tablet, err := tp.PickForStreaming(context.Background()) - require.NoError(t, err) - if proto.Equal(tablet, want1) { - picked1 = true - } - if proto.Equal(tablet, want2) { - picked2 = true - } - } - assert.True(t, picked1) - assert.False(t, picked2) -} - -func TestPickInOrder2(t *testing.T) { - te := newPickerTestEnv(t, []string{"cell"}) - want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(t, te, want1) - want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, "cell", true, true) - defer deleteTablet(t, te, want2) - - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "in_order:rdonly,replica") - require.NoError(t, err) - - // In 20 attempts, we always pick the first healthy tablet in order - var picked1, picked2 bool - for i := 0; i < 20; i++ { - tablet, err := tp.PickForStreaming(context.Background()) - require.NoError(t, err) - if proto.Equal(tablet, want1) { - picked1 = true - } - if proto.Equal(tablet, want2) { - picked2 = true - } - } - assert.False(t, picked1) - assert.True(t, picked2) -} - -func TestPickInOrderMultipleInGroup(t *testing.T) { - te := newPickerTestEnv(t, []string{"cell"}) - want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(t, te, want1) - want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, "cell", true, true) - defer deleteTablet(t, te, want2) - want3 := addTablet(te, 102, topodatapb.TabletType_RDONLY, "cell", true, true) - defer deleteTablet(t, te, want3) - want4 := addTablet(te, 103, topodatapb.TabletType_RDONLY, "cell", true, true) - defer deleteTablet(t, te, want4) - - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "in_order:rdonly,replica") - require.NoError(t, err) - - // In 40 attempts, we pick each of the three RDONLY, but never the REPLICA - var picked1, picked2, picked3, picked4 bool - for i := 0; i < 40; i++ { - tablet, err := tp.PickForStreaming(context.Background()) - require.NoError(t, err) - if proto.Equal(tablet, want1) { - picked1 = true - } - if proto.Equal(tablet, want2) { - picked2 = true - } - if proto.Equal(tablet, want3) { - picked3 = true - } - if proto.Equal(tablet, want4) { - picked4 = true - } - } - assert.False(t, picked1) - assert.True(t, picked2) - assert.True(t, picked3) - assert.True(t, picked4) -} - -func TestPickRespectsTabletType(t *testing.T) { - te := newPickerTestEnv(t, []string{"cell"}) - want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(t, te, want) - dont := addTablet(te, 101, topodatapb.TabletType_PRIMARY, "cell", true, true) - defer deleteTablet(t, te, dont) - - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica,rdonly") - require.NoError(t, err) - - // In 20 attempts, primary tablet must be never picked - for i := 0; i < 20; i++ { - tablet, err := tp.PickForStreaming(context.Background()) - require.NoError(t, err) - require.NotNil(t, tablet) - require.True(t, proto.Equal(tablet, want), "picked wrong tablet type") - } -} - -func TestPickMultiCell(t *testing.T) { - te := newPickerTestEnv(t, []string{"cell", "otherCell"}) - want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(t, te, want) - - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") - require.NoError(t, err) - - ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) - defer cancel() - tablet, err := tp.PickForStreaming(ctx) - require.NoError(t, err) - assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) -} - func TestPickPrimary(t *testing.T) { te := newPickerTestEnv(t, []string{"cell", "otherCell"}) want := addTablet(te, 100, topodatapb.TabletType_PRIMARY, "cell", true, true) @@ -206,7 +40,7 @@ func TestPickPrimary(t *testing.T) { }) require.NoError(t, err) - tp, err := NewTabletPicker(te.topoServ, []string{"otherCell"}, te.keyspace, te.shard, "primary") + tp, err := NewTabletPicker(context.Background(), te.topoServ, []string{"otherCell"}, "cell", te.keyspace, te.shard, "primary", TabletPickerOptions{}) require.NoError(t, err) ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond) @@ -216,67 +50,278 @@ func TestPickPrimary(t *testing.T) { assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) } -func TestPickFromOtherCell(t *testing.T) { - te := newPickerTestEnv(t, []string{"cell", "otherCell"}) - want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "otherCell", true, true) - defer deleteTablet(t, te, want) +func TestPickLocalPreferences(t *testing.T) { + type tablet struct { + id uint32 + typ topodatapb.TabletType + cell string + } - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") - require.NoError(t, err) + type testCase struct { + name string - ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) - defer cancel() - tablet, err := tp.PickForStreaming(ctx) - require.NoError(t, err) - assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) -} - -func TestDontPickFromOtherCell(t *testing.T) { - te := newPickerTestEnv(t, []string{"cell", "otherCell"}) - want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(t, te, want1) - want2 := addTablet(te, 101, topodatapb.TabletType_REPLICA, "otherCell", true, true) - defer deleteTablet(t, te, want2) + //inputs + tablets []tablet + envCells []string + inCells []string + localCell string + inTabletTypes string + options TabletPickerOptions - tp, err := NewTabletPicker(te.topoServ, []string{"cell"}, te.keyspace, te.shard, "replica") - require.NoError(t, err) + //expected + tpCells []string + wantTablets []uint32 + } - ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) - defer cancel() + tcases := []testCase{ + { + name: "pick simple", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + }, + envCells: []string{"cell"}, + inCells: []string{"cell"}, + localCell: "cell", + inTabletTypes: "replica", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "cella"}, + wantTablets: []uint32{100}, + }, { + name: "pick from two healthy", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_RDONLY, "cell"}, + }, + envCells: []string{"cell"}, + inCells: []string{"cell"}, + localCell: "cell", + inTabletTypes: "replica,rdonly", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "cella"}, + wantTablets: []uint32{100, 101}, + }, { + name: "pick in order replica", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_RDONLY, "cell"}, + }, + envCells: []string{"cell"}, + inCells: []string{"cell"}, + localCell: "cell", + inTabletTypes: "in_order:replica,rdonly", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "cella"}, + wantTablets: []uint32{100}, + }, { + name: "pick in order rdonly", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_RDONLY, "cell"}, + }, + envCells: []string{"cell"}, + inCells: []string{"cell"}, + localCell: "cell", + inTabletTypes: "in_order:rdonly,replica", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "cella"}, + wantTablets: []uint32{101}, + }, { + name: "pick in order multiple in group", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_RDONLY, "cell"}, + {102, topodatapb.TabletType_RDONLY, "cell"}, + {103, topodatapb.TabletType_RDONLY, "cell"}, + }, + envCells: []string{"cell"}, + inCells: []string{"cell"}, + localCell: "cell", + inTabletTypes: "in_order:rdonly,replica", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "cella"}, + wantTablets: []uint32{101, 102, 103}, + }, { + // Same test as above, except the in order preference is passed via the new TabletPickerOptions param. + // This will replace the above test when we deprecate the "in_order" hint in the tabletTypeStr + name: "pick in order multiple in group with new picker option", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_RDONLY, "cell"}, + {102, topodatapb.TabletType_RDONLY, "cell"}, + {103, topodatapb.TabletType_RDONLY, "cell"}, + }, + envCells: []string{"cell"}, + inCells: []string{"cell"}, + localCell: "cell", + inTabletTypes: "rdonly,replica", + options: TabletPickerOptions{TabletOrder: "InOrder"}, + tpCells: []string{"cell", "cella"}, + wantTablets: []uint32{101, 102, 103}, + }, { + name: "picker respects tablet type", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_PRIMARY, "cell"}, + }, + envCells: []string{"cell"}, + inCells: []string{"cell"}, + localCell: "cell", + inTabletTypes: "replica,rdonly", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "cella"}, + wantTablets: []uint32{100}, + }, { + name: "pick multi cell", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + }, + envCells: []string{"cell", "otherCell"}, + inCells: []string{"cell", "otherCell"}, + localCell: "cell", + inTabletTypes: "replica", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "otherCell", "cella"}, + wantTablets: []uint32{100}, + }, { + name: "pick from other cell", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "otherCell"}, + }, + envCells: []string{"cell", "otherCell"}, + inCells: []string{"cell", "otherCell"}, + localCell: "cell", + inTabletTypes: "replica", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "otherCell", "cella"}, + wantTablets: []uint32{100}, + }, { + name: "don't pick from other cell", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_REPLICA, "otherCell"}, + }, + envCells: []string{"cell", "otherCell"}, + inCells: []string{"cell"}, + localCell: "cell", + inTabletTypes: "replica", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "cella"}, + wantTablets: []uint32{100}, + }, { + name: "multi cell two tablets, local preference default", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_REPLICA, "otherCell"}, + }, + envCells: []string{"cell", "otherCell"}, + inCells: []string{"cell", "otherCell"}, + localCell: "cell", + inTabletTypes: "replica", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "otherCell", "cella"}, + wantTablets: []uint32{100}, + }, { + name: "multi cell two tablets, only specified cells", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_REPLICA, "otherCell"}, + }, + envCells: []string{"cell", "otherCell"}, + inCells: []string{"cell", "otherCell"}, + localCell: "cell", + inTabletTypes: "replica", + options: TabletPickerOptions{CellPreference: "OnlySpecified"}, + tpCells: []string{"cell", "otherCell"}, + wantTablets: []uint32{100, 101}, + }, { + name: "multi cell two tablet types, local preference default", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_RDONLY, "otherCell"}, + }, + envCells: []string{"cell", "otherCell"}, + inCells: []string{"cell", "otherCell"}, + localCell: "cell", + inTabletTypes: "replica,rdonly", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "otherCell", "cella"}, + wantTablets: []uint32{100}, + }, { + name: "multi cell two tablet types, only specified cells", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_RDONLY, "otherCell"}, + }, + envCells: []string{"cell", "otherCell"}, + inCells: []string{"cell", "otherCell"}, + localCell: "cell", + inTabletTypes: "replica,rdonly", + options: TabletPickerOptions{CellPreference: "OnlySpecified"}, + tpCells: []string{"cell", "otherCell"}, + wantTablets: []uint32{100, 101}, + }, + } - // In 20 attempts, only want1 must be picked because TabletPicker.cells = "cell" - var picked1, picked2 bool - for i := 0; i < 20; i++ { - tablet, err := tp.PickForStreaming(ctx) - require.NoError(t, err) - if proto.Equal(tablet, want1) { - picked1 = true - } - if proto.Equal(tablet, want2) { - picked2 = true - } + ctx := context.Background() + for _, tcase := range tcases { + t.Run(tcase.name, func(t *testing.T) { + te := newPickerTestEnv(t, tcase.envCells) + var testTablets []*topodatapb.Tablet + for _, tab := range tcase.tablets { + testTablets = append(testTablets, addTablet(te, int(tab.id), tab.typ, tab.cell, true, true)) + } + defer func() { + for _, tab := range testTablets { + deleteTablet(t, te, tab) + } + }() + tp, err := NewTabletPicker(context.Background(), te.topoServ, tcase.inCells, tcase.localCell, te.keyspace, te.shard, tcase.inTabletTypes, tcase.options) + require.NoError(t, err) + require.Equal(t, tp.localCellInfo.localCell, tcase.localCell) + require.ElementsMatch(t, tp.cells, tcase.tpCells) + + var selectedTablets []uint32 + selectedTabletMap := make(map[uint32]bool) + for i := 0; i < 40; i++ { + tab, err := tp.PickForStreaming(ctx) + require.NoError(t, err) + selectedTabletMap[tab.Alias.Uid] = true + } + for uid := range selectedTabletMap { + selectedTablets = append(selectedTablets, uid) + } + require.ElementsMatch(t, selectedTablets, tcase.wantTablets) + }) } - assert.True(t, picked1) - assert.False(t, picked2) } -func TestPickMultiCellTwoTablets(t *testing.T) { +func TestPickCellPreferenceLocalCell(t *testing.T) { + // test env puts all cells into an alias called "cella" te := newPickerTestEnv(t, []string{"cell", "otherCell"}) want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) defer deleteTablet(t, te, want1) - want2 := addTablet(te, 101, topodatapb.TabletType_REPLICA, "otherCell", true, true) - defer deleteTablet(t, te, want2) - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") + // Local cell preference is default + tp, err := NewTabletPicker(context.Background(), te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) require.NoError(t, err) - ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) - defer cancel() + ctx1, cancel1 := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel1() + tablet, err := tp.PickForStreaming(ctx1) + require.NoError(t, err) + assert.True(t, proto.Equal(want1, tablet), "Pick: %v, want %v", tablet, want1) + + // create a tablet in the other cell + want2 := addTablet(te, 101, topodatapb.TabletType_REPLICA, "otherCell", true, true) + defer deleteTablet(t, te, want2) + + ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel2() - // In 20 attempts, both tablet types must be picked at least once. + // In 20 attempts, only tablet in "cell" will be picked because we give local cell priority by default var picked1, picked2 bool for i := 0; i < 20; i++ { - tablet, err := tp.PickForStreaming(ctx) + tablet, err := tp.PickForStreaming(ctx2) require.NoError(t, err) if proto.Equal(tablet, want1) { picked1 = true @@ -286,45 +331,32 @@ func TestPickMultiCellTwoTablets(t *testing.T) { } } assert.True(t, picked1) - assert.True(t, picked2) + assert.False(t, picked2) } -func TestPickMultiCellTwoTabletTypes(t *testing.T) { +func TestPickCellPreferenceLocalAlias(t *testing.T) { + // test env puts all cells into an alias called "cella" te := newPickerTestEnv(t, []string{"cell", "otherCell"}) - want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(t, te, want1) - want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, "otherCell", true, true) - defer deleteTablet(t, te, want2) - - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica,rdonly") + tp, err := NewTabletPicker(context.Background(), te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) require.NoError(t, err) + // create a tablet in the other cell, it should be picked + want := addTablet(te, 101, topodatapb.TabletType_REPLICA, "otherCell", true, true) + defer deleteTablet(t, te, want) ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() - - // In 20 attempts, both tablet types must be picked at least once. - var picked1, picked2 bool - for i := 0; i < 20; i++ { - tablet, err := tp.PickForStreaming(ctx) - require.NoError(t, err) - if proto.Equal(tablet, want1) { - picked1 = true - } - if proto.Equal(tablet, want2) { - picked2 = true - } - } - assert.True(t, picked1) - assert.True(t, picked2) + tablet, err := tp.PickForStreaming(ctx) + require.NoError(t, err) + assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) } -func TestPickUsingCellAlias(t *testing.T) { +func TestPickUsingCellAliasOnlySpecified(t *testing.T) { // test env puts all cells into an alias called "cella" te := newPickerTestEnv(t, []string{"cell", "otherCell"}) want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) defer deleteTablet(t, te, want1) - tp, err := NewTabletPicker(te.topoServ, []string{"cella"}, te.keyspace, te.shard, "replica") + tp, err := NewTabletPicker(context.Background(), te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}) require.NoError(t, err) ctx1, cancel1 := context.WithTimeout(context.Background(), 200*time.Millisecond) @@ -348,7 +380,8 @@ func TestPickUsingCellAlias(t *testing.T) { ctx3, cancel3 := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel3() - // In 20 attempts, both tablet types must be picked at least once. + // In 20 attempts each of the tablets should get picked at least once. + // Local cell is not given preference var picked1, picked2 bool for i := 0; i < 20; i++ { tablet, err := tp.PickForStreaming(ctx3) @@ -366,7 +399,7 @@ func TestPickUsingCellAlias(t *testing.T) { func TestTabletAppearsDuringSleep(t *testing.T) { te := newPickerTestEnv(t, []string{"cell"}) - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") + tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) require.NoError(t, err) delay := GetTabletPickerRetryDelay() @@ -392,12 +425,12 @@ func TestTabletAppearsDuringSleep(t *testing.T) { assert.True(t, proto.Equal(want, got), "Pick: %v, want %v", got, want) } -func TestPickError(t *testing.T) { +func TestPickErrorLocalPreferenceDefault(t *testing.T) { te := newPickerTestEnv(t, []string{"cell"}) - _, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "badtype") + _, err := NewTabletPicker(context.Background(), te.topoServ, te.cells, "cell", te.keyspace, te.shard, "badtype", TabletPickerOptions{}) assert.EqualError(t, err, "failed to parse list of tablet types: badtype") - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") + tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) require.NoError(t, err) delay := GetTabletPickerRetryDelay() defer func() { @@ -416,6 +449,33 @@ func TestPickError(t *testing.T) { defer cancel() _, err = tp.PickForStreaming(ctx) require.EqualError(t, err, "context has expired") + // if local preference is selected, tp cells include's the local cell's alias + require.Greater(t, globalTPStats.noTabletFoundError.Counts()["cell_cella.ks.0.replica"], int64(0)) +} + +func TestPickErrorOnlySpecified(t *testing.T) { + te := newPickerTestEnv(t, []string{"cell"}) + + tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}) + require.NoError(t, err) + delay := GetTabletPickerRetryDelay() + defer func() { + SetTabletPickerRetryDelay(delay) + }() + SetTabletPickerRetryDelay(11 * time.Millisecond) + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer cancel() + // no tablets + _, err = tp.PickForStreaming(ctx) + require.EqualError(t, err, "context has expired") + // no tablets of the correct type + defer deleteTablet(t, te, addTablet(te, 200, topodatapb.TabletType_RDONLY, "cell", true, true)) + ctx, cancel = context.WithTimeout(context.Background(), 20*time.Millisecond) + defer cancel() + _, err = tp.PickForStreaming(ctx) + require.EqualError(t, err, "context has expired") + require.Greater(t, globalTPStats.noTabletFoundError.Counts()["cell.ks.0.replica"], int64(0)) } diff --git a/go/vt/proto/vtgate/vtgate.pb.go b/go/vt/proto/vtgate/vtgate.pb.go index 11914d8f3fb..fdc959e903c 100644 --- a/go/vt/proto/vtgate/vtgate.pb.go +++ b/go/vt/proto/vtgate/vtgate.pb.go @@ -988,7 +988,9 @@ type VStreamFlags struct { StopOnReshard bool `protobuf:"varint,3,opt,name=stop_on_reshard,json=stopOnReshard,proto3" json:"stop_on_reshard,omitempty"` // if specified, these cells (comma-separated) are used to pick source tablets from. // defaults to the cell of the vtgate serving the VStream API. - Cells string `protobuf:"bytes,4,opt,name=cells,proto3" json:"cells,omitempty"` + Cells string `protobuf:"bytes,4,opt,name=cells,proto3" json:"cells,omitempty"` + CellPreference string `protobuf:"bytes,5,opt,name=cell_preference,json=cellPreference,proto3" json:"cell_preference,omitempty"` + TabletOrder string `protobuf:"bytes,6,opt,name=tablet_order,json=tabletOrder,proto3" json:"tablet_order,omitempty"` } func (x *VStreamFlags) Reset() { @@ -1051,6 +1053,20 @@ func (x *VStreamFlags) GetCells() string { return "" } +func (x *VStreamFlags) GetCellPreference() string { + if x != nil { + return x.CellPreference + } + return "" +} + +func (x *VStreamFlags) GetTabletOrder() string { + if x != nil { + return x.TabletOrder + } + return "" +} + // VStreamRequest is the payload for VStream. type VStreamRequest struct { state protoimpl.MessageState @@ -1696,7 +1712,7 @@ var file_vtgate_proto_rawDesc = []byte{ 0x0a, 0x04, 0x64, 0x74, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x74, 0x69, 0x64, 0x22, 0x1c, 0x0a, 0x1a, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0xa0, 0x01, 0x0a, 0x0c, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x46, 0x6c, 0x61, 0x67, + 0x22, 0xec, 0x01, 0x0a, 0x0c, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x46, 0x6c, 0x61, 0x67, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x6d, 0x69, 0x6e, 0x69, 0x6d, 0x69, 0x7a, 0x65, 0x5f, 0x73, 0x6b, 0x65, 0x77, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x6d, 0x69, 0x6e, 0x69, 0x6d, 0x69, 0x7a, 0x65, 0x53, 0x6b, 0x65, 0x77, 0x12, 0x2d, 0x0a, 0x12, 0x68, 0x65, 0x61, 0x72, 0x74, 0x62, @@ -1706,67 +1722,72 @@ var file_vtgate_proto_rawDesc = []byte{ 0x5f, 0x72, 0x65, 0x73, 0x68, 0x61, 0x72, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0d, 0x73, 0x74, 0x6f, 0x70, 0x4f, 0x6e, 0x52, 0x65, 0x73, 0x68, 0x61, 0x72, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x65, 0x6c, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x63, 0x65, - 0x6c, 0x6c, 0x73, 0x22, 0xf6, 0x01, 0x0a, 0x0e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, - 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, - 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, - 0x65, 0x72, 0x49, 0x64, 0x12, 0x35, 0x0a, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x5f, 0x74, - 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x74, 0x6f, 0x70, 0x6f, - 0x64, 0x61, 0x74, 0x61, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, - 0x0a, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x27, 0x0a, 0x05, 0x76, - 0x67, 0x74, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x62, 0x69, 0x6e, - 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x47, 0x74, 0x69, 0x64, 0x52, 0x05, 0x76, - 0x67, 0x74, 0x69, 0x64, 0x12, 0x2a, 0x0a, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, - 0x61, 0x2e, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x52, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, - 0x12, 0x2a, 0x0a, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x14, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, - 0x46, 0x6c, 0x61, 0x67, 0x73, 0x52, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x22, 0x3d, 0x0a, 0x0f, - 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x2a, 0x0a, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x12, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x45, 0x76, - 0x65, 0x6e, 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x92, 0x01, 0x0a, 0x0e, - 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, - 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, - 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, - 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, - 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, - 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x27, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x42, - 0x6f, 0x75, 0x6e, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, - 0x22, 0x89, 0x01, 0x0a, 0x0f, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x50, 0x43, 0x45, - 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x29, 0x0a, 0x07, 0x73, - 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, - 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, - 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x24, 0x0a, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, - 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x46, - 0x69, 0x65, 0x6c, 0x64, 0x52, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x22, 0x6e, 0x0a, 0x13, - 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, - 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, - 0x64, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, - 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x3d, 0x0a, 0x14, - 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x50, 0x43, 0x45, - 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x2a, 0x44, 0x0a, 0x0f, 0x54, - 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x0f, - 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, - 0x0a, 0x0a, 0x06, 0x53, 0x49, 0x4e, 0x47, 0x4c, 0x45, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x4d, - 0x55, 0x4c, 0x54, 0x49, 0x10, 0x02, 0x12, 0x09, 0x0a, 0x05, 0x54, 0x57, 0x4f, 0x50, 0x43, 0x10, - 0x03, 0x2a, 0x3c, 0x0a, 0x0b, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x4f, 0x72, 0x64, 0x65, 0x72, - 0x12, 0x0a, 0x0a, 0x06, 0x4e, 0x4f, 0x52, 0x4d, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, - 0x50, 0x52, 0x45, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x50, 0x4f, 0x53, 0x54, 0x10, 0x02, 0x12, - 0x0e, 0x0a, 0x0a, 0x41, 0x55, 0x54, 0x4f, 0x43, 0x4f, 0x4d, 0x4d, 0x49, 0x54, 0x10, 0x03, 0x42, - 0x36, 0x0a, 0x0f, 0x69, 0x6f, 0x2e, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x5a, 0x23, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, - 0x74, 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2f, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6c, 0x6c, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x63, 0x65, 0x6c, 0x6c, 0x5f, 0x70, 0x72, 0x65, 0x66, + 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x63, 0x65, + 0x6c, 0x6c, 0x50, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x12, 0x21, 0x0a, 0x0c, + 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x5f, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x18, 0x06, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x22, + 0xf6, 0x01, 0x0a, 0x0e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, + 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, + 0x12, 0x35, 0x0a, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x74, 0x6f, 0x70, 0x6f, 0x64, 0x61, 0x74, 0x61, + 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0a, 0x74, 0x61, 0x62, + 0x6c, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x27, 0x0a, 0x05, 0x76, 0x67, 0x74, 0x69, 0x64, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, + 0x61, 0x74, 0x61, 0x2e, 0x56, 0x47, 0x74, 0x69, 0x64, 0x52, 0x05, 0x76, 0x67, 0x74, 0x69, 0x64, + 0x12, 0x2a, 0x0a, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x12, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x46, 0x69, + 0x6c, 0x74, 0x65, 0x72, 0x52, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x2a, 0x0a, 0x05, + 0x66, 0x6c, 0x61, 0x67, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x76, 0x74, + 0x67, 0x61, 0x74, 0x65, 0x2e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x46, 0x6c, 0x61, 0x67, + 0x73, 0x52, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x22, 0x3d, 0x0a, 0x0f, 0x56, 0x53, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x06, 0x65, + 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x62, 0x69, + 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, + 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x92, 0x01, 0x0a, 0x0e, 0x50, 0x72, 0x65, 0x70, + 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, + 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, + 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, + 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, + 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, + 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, + 0x69, 0x6f, 0x6e, 0x12, 0x27, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x42, 0x6f, 0x75, 0x6e, 0x64, + 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x22, 0x89, 0x01, 0x0a, + 0x0f, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x25, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x50, 0x43, 0x45, 0x72, 0x72, 0x6f, 0x72, + 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, + 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, + 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, + 0x6f, 0x6e, 0x12, 0x24, 0x0a, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x18, 0x03, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x46, 0x69, 0x65, 0x6c, 0x64, + 0x52, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x22, 0x6e, 0x0a, 0x13, 0x43, 0x6c, 0x6f, 0x73, + 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, + 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x29, 0x0a, + 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, + 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, + 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x3d, 0x0a, 0x14, 0x43, 0x6c, 0x6f, 0x73, + 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x25, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x50, 0x43, 0x45, 0x72, 0x72, 0x6f, 0x72, + 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x2a, 0x44, 0x0a, 0x0f, 0x54, 0x72, 0x61, 0x6e, 0x73, + 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, + 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x53, + 0x49, 0x4e, 0x47, 0x4c, 0x45, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x4d, 0x55, 0x4c, 0x54, 0x49, + 0x10, 0x02, 0x12, 0x09, 0x0a, 0x05, 0x54, 0x57, 0x4f, 0x50, 0x43, 0x10, 0x03, 0x2a, 0x3c, 0x0a, + 0x0b, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x0a, 0x0a, 0x06, + 0x4e, 0x4f, 0x52, 0x4d, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x50, 0x52, 0x45, 0x10, + 0x01, 0x12, 0x08, 0x0a, 0x04, 0x50, 0x4f, 0x53, 0x54, 0x10, 0x02, 0x12, 0x0e, 0x0a, 0x0a, 0x41, + 0x55, 0x54, 0x4f, 0x43, 0x4f, 0x4d, 0x4d, 0x49, 0x54, 0x10, 0x03, 0x42, 0x36, 0x0a, 0x0f, 0x69, + 0x6f, 0x2e, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x23, + 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, + 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, 0x74, 0x67, + 0x61, 0x74, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/go/vt/proto/vtgate/vtgate_vtproto.pb.go b/go/vt/proto/vtgate/vtgate_vtproto.pb.go index 8f05e8d01ab..1ddccaaf169 100644 --- a/go/vt/proto/vtgate/vtgate_vtproto.pb.go +++ b/go/vt/proto/vtgate/vtgate_vtproto.pb.go @@ -910,6 +910,20 @@ func (m *VStreamFlags) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if len(m.TabletOrder) > 0 { + i -= len(m.TabletOrder) + copy(dAtA[i:], m.TabletOrder) + i = encodeVarint(dAtA, i, uint64(len(m.TabletOrder))) + i-- + dAtA[i] = 0x32 + } + if len(m.CellPreference) > 0 { + i -= len(m.CellPreference) + copy(dAtA[i:], m.CellPreference) + i = encodeVarint(dAtA, i, uint64(len(m.CellPreference))) + i-- + dAtA[i] = 0x2a + } if len(m.Cells) > 0 { i -= len(m.Cells) copy(dAtA[i:], m.Cells) @@ -1667,6 +1681,14 @@ func (m *VStreamFlags) SizeVT() (n int) { if l > 0 { n += 1 + l + sov(uint64(l)) } + l = len(m.CellPreference) + if l > 0 { + n += 1 + l + sov(uint64(l)) + } + l = len(m.TabletOrder) + if l > 0 { + n += 1 + l + sov(uint64(l)) + } if m.unknownFields != nil { n += len(m.unknownFields) } @@ -4244,6 +4266,70 @@ func (m *VStreamFlags) UnmarshalVT(dAtA []byte) error { } m.Cells = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CellPreference", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.CellPreference = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TabletOrder", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TabletOrder = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skip(dAtA[iNdEx:]) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index f660cae4be2..03a9d78d32a 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -119,6 +119,8 @@ type vstream struct { eventCh chan []*binlogdatapb.VEvent heartbeatInterval uint32 ts *topo.Server + + tabletPickerOptions discovery.TabletPickerOptions } type journalEvent struct { @@ -176,6 +178,10 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta heartbeatInterval: flags.GetHeartbeatInterval(), ts: ts, copyCompletedShard: make(map[string]struct{}), + tabletPickerOptions: discovery.TabletPickerOptions{ + CellPreference: flags.GetCellPreference(), + TabletOrder: flags.GetTabletOrder(), + }, } return vs.stream(ctx) } @@ -492,7 +498,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha var eventss [][]*binlogdatapb.VEvent var err error cells := vs.getCells() - tp, err := discovery.NewTabletPicker(vs.ts, cells, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()) + tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions) if err != nil { log.Errorf(err.Error()) return err diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index f24d9442ac1..bc3335e305b 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -218,7 +218,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context, cell, tabletTypes stri go func() { defer wg.Done() err1 = td.forEachSource(func(source *migrationSource) error { - tablet, err := pickTablet(ctx, sourceTopoServer, cell, ct.sourceKeyspace, source.shard, tabletTypes) + tablet, err := pickTablet(ctx, sourceTopoServer, cell, ct.vde.thisTablet.Alias.Cell, ct.sourceKeyspace, source.shard, tabletTypes) if err != nil { return err } @@ -230,7 +230,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context, cell, tabletTypes stri wg.Add(1) go func() { defer wg.Done() - tablet, err2 := pickTablet(ctx, ct.ts, td.wd.opts.PickerOptions.TargetCell, ct.vde.thisTablet.Keyspace, + tablet, err2 := pickTablet(ctx, ct.ts, td.wd.opts.PickerOptions.TargetCell, ct.vde.thisTablet.Alias.Cell, ct.vde.thisTablet.Keyspace, ct.vde.thisTablet.Shard, td.wd.opts.PickerOptions.TabletTypes) if err2 != nil { return @@ -248,8 +248,8 @@ func (td *tableDiffer) selectTablets(ctx context.Context, cell, tabletTypes stri return err2 } -func pickTablet(ctx context.Context, ts *topo.Server, cell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) { - tp, err := discovery.NewTabletPicker(ts, []string{cell}, keyspace, shard, tabletTypes) +func pickTablet(ctx context.Context, ts *topo.Server, cell, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) { + tp, err := discovery.NewTabletPicker(ctx, ts, []string{cell}, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}) if err != nil { return nil, err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 60c478a078c..0eed7de71ee 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -123,7 +123,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor return nil, err } } - tp, err := discovery.NewTabletPicker(sourceTopo, cells, ct.source.Keyspace, ct.source.Shard, tabletTypesStr) + tp, err := discovery.NewTabletPicker(ctx, sourceTopo, cells, ct.vre.cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr, discovery.TabletPickerOptions{}) if err != nil { return nil, err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go index c73f6be56ae..ebea9b8225f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go @@ -88,8 +88,9 @@ func TestControllerKeyRange(t *testing.T) { dbClientFactory := func() binlogplayer.DBClient { return dbClient } mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) - ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, nil) + ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, vre) if err != nil { t.Fatal(err) } @@ -149,8 +150,10 @@ func TestControllerTables(t *testing.T) { }, }, } + mysqld.MysqlPort.Set(3306) + vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) - ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, nil) + ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, vre) if err != nil { t.Fatal(err) } @@ -217,8 +220,9 @@ func TestControllerOverrides(t *testing.T) { dbClientFactory := func() binlogplayer.DBClient { return dbClient } mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) - ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil, nil) + ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil, vre) if err != nil { t.Fatal(err) } @@ -232,7 +236,8 @@ func TestControllerOverrides(t *testing.T) { } func TestControllerCanceledContext(t *testing.T) { - defer deleteTablet(addTablet(100)) + wantTablet := addTablet(100) + defer deleteTablet(wantTablet) params := map[string]string{ "id": "1", @@ -242,7 +247,9 @@ func TestControllerCanceledContext(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - ct, err := newController(ctx, params, nil, nil, env.TopoServ, env.Cells[0], "rdonly", nil, nil) + vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, nil, nil, nil, "", nil) + + ct, err := newController(ctx, params, nil, nil, env.TopoServ, env.Cells[0], "rdonly", nil, vre) if err != nil { t.Fatal(err) } @@ -285,8 +292,9 @@ func TestControllerRetry(t *testing.T) { dbClient.ExpectRequest("commit", nil, nil) dbClientFactory := func() binlogplayer.DBClient { return dbClient } mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + vre := NewTestEngine(nil, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) - ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil, nil) + ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil, vre) if err != nil { t.Fatal(err) } @@ -343,8 +351,9 @@ func TestControllerStopPosition(t *testing.T) { dbClientFactory := func() binlogplayer.DBClient { return dbClient } mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) - ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, nil) + ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, vre) if err != nil { t.Fatal(err) } diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 79a967121ae..9a98dfe24b1 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -434,7 +434,7 @@ func (wr *Wrangler) areTabletsAvailableToStreamFrom(ctx context.Context, ts *tra if cells == nil { cells = append(cells, shard.PrimaryAlias.Cell) } - tp, err := discovery.NewTabletPicker(wr.ts, cells, keyspace, shard.ShardName(), tabletTypes) + tp, err := discovery.NewTabletPicker(ctx, wr.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypes, discovery.TabletPickerOptions{}) if err != nil { allErrors.RecordError(err) return diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index af0a6bea2f3..02eb5afb377 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -433,7 +433,7 @@ func (tme *testMigraterEnv) createDBClients(ctx context.Context, t *testing.T) { tme.dbSourceClients = append(tme.dbSourceClients, dbclient) dbClientFactory := func() binlogplayer.DBClient { return dbclient } // Replace existing engine with a new one - primary.TM.VREngine = vreplication.NewTestEngine(tme.ts, "", primary.FakeMysqlDaemon, dbClientFactory, dbClientFactory, dbclient.DBName(), nil) + primary.TM.VREngine = vreplication.NewTestEngine(tme.ts, primary.Tablet.GetAlias().GetCell(), primary.FakeMysqlDaemon, dbClientFactory, dbClientFactory, dbclient.DBName(), nil) primary.TM.VREngine.Open(ctx) } for _, primary := range tme.targetPrimaries { @@ -442,7 +442,7 @@ func (tme *testMigraterEnv) createDBClients(ctx context.Context, t *testing.T) { tme.dbTargetClients = append(tme.dbTargetClients, dbclient) dbClientFactory := func() binlogplayer.DBClient { return dbclient } // Replace existing engine with a new one - primary.TM.VREngine = vreplication.NewTestEngine(tme.ts, "", primary.FakeMysqlDaemon, dbClientFactory, dbClientFactory, dbclient.DBName(), nil) + primary.TM.VREngine = vreplication.NewTestEngine(tme.ts, primary.Tablet.GetAlias().GetCell(), primary.FakeMysqlDaemon, dbClientFactory, dbClientFactory, dbclient.DBName(), nil) primary.TM.VREngine.Open(ctx) } tme.allDBClients = append(tme.dbSourceClients, tme.dbTargetClients...) diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index cfecefaeabd..e31f4c4b8c2 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -692,7 +692,7 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error { if ts.ExternalTopo() != nil { sourceTopo = ts.ExternalTopo() } - tp, err := discovery.NewTabletPicker(sourceTopo, []string{df.sourceCell}, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr) + tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}) if err != nil { return err } @@ -710,7 +710,7 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error { go func() { defer wg.Done() err2 = df.forAll(df.targets, func(shard string, target *shardStreamer) error { - tp, err := discovery.NewTabletPicker(df.ts.TopoServer(), []string{df.targetCell}, df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr) + tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell, df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}) if err != nil { return err } diff --git a/proto/vtgate.proto b/proto/vtgate.proto index 55e94221e5f..6431f58f727 100644 --- a/proto/vtgate.proto +++ b/proto/vtgate.proto @@ -283,6 +283,8 @@ message VStreamFlags { // if specified, these cells (comma-separated) are used to pick source tablets from. // defaults to the cell of the vtgate serving the VStream API. string cells = 4; + string cell_preference = 5; + string tablet_order = 6; } // VStreamRequest is the payload for VStream. diff --git a/tools/rowlog/rowlog.go b/tools/rowlog/rowlog.go index 9593e88d652..369cc68b5db 100644 --- a/tools/rowlog/rowlog.go +++ b/tools/rowlog/rowlog.go @@ -378,7 +378,18 @@ func getFlavor(ctx context.Context, server, keyspace string) string { } func getTablet(ctx context.Context, ts *topo.Server, cells []string, keyspace string) string { - picker, err := discovery.NewTabletPicker(ts, cells, keyspace, "0", "primary") + picker, err := discovery.NewTabletPicker( + ctx, + ts, + cells, + "", + keyspace, + "0", + "primary", + discovery.TabletPickerOptions{ + CellPreference: "OnlySpecified", + }, + ) if err != nil { return "" } From db22c90dab09c9304f534466a08de646d7174b23 Mon Sep 17 00:00:00 2001 From: pbibra Date: Mon, 23 Oct 2023 12:42:32 -0700 Subject: [PATCH 4/7] allow tablet picker to exclude specified tablets from its candidate list (#14224) Signed-off-by: Priya Bibra --- go/vt/discovery/tablet_picker.go | 23 +++- go/vt/discovery/tablet_picker_test.go | 55 +++++++++ go/vt/vtgate/vstream_manager.go | 64 ++++++++-- go/vt/vtgate/vstream_manager_test.go | 171 ++++++++++++++++++++------ 4 files changed, 263 insertions(+), 50 deletions(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index 20d4126831a..c9537d3851e 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -137,6 +137,8 @@ type TabletPicker struct { inOrder bool cellPref TabletPickerCellPreference localCellInfo localCellInfo + // This map is keyed on the results of TabletAlias.String(). + ignoreTablets map[string]struct{} } // NewTabletPicker returns a TabletPicker. @@ -146,6 +148,7 @@ func NewTabletPicker( cells []string, localCell, keyspace, shard, tabletTypesStr string, options TabletPickerOptions, + ignoreTablets ...*topodatapb.TabletAlias, ) (*TabletPicker, error) { // Keep inOrder parsing here for backward compatability until TabletPickerTabletOrder is fully adopted. tabletTypes, inOrder, err := ParseTabletTypesAndOrder(tabletTypesStr) @@ -218,7 +221,7 @@ func NewTabletPicker( } } - return &TabletPicker{ + tp := &TabletPicker{ ts: ts, cells: dedupeCells(cells), localCellInfo: localCellInfo{localCell: localCell, cellsInAlias: aliasCellMap}, @@ -227,7 +230,15 @@ func NewTabletPicker( tabletTypes: tabletTypes, inOrder: inOrder, cellPref: cellPref, - }, nil + ignoreTablets: make(map[string]struct{}, len(ignoreTablets)), + } + + for _, ignoreTablet := range ignoreTablets { + tp.ignoreTablets[ignoreTablet.String()] = struct{}{} + } + + return tp, nil + } // dedupeCells is used to remove duplicates in the cell list in case it is passed in @@ -368,7 +379,9 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn log.Errorf("error getting shard %s/%s: %s", tp.keyspace, tp.shard, err.Error()) return nil } - aliases = append(aliases, si.PrimaryAlias) + if _, ignore := tp.ignoreTablets[si.PrimaryAlias.String()]; !ignore { + aliases = append(aliases, si.PrimaryAlias) + } } else { actualCells := make([]string, 0) for _, cell := range tp.cells { @@ -404,7 +417,9 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn } for _, node := range sri.Nodes { - aliases = append(aliases, node.TabletAlias) + if _, ignore := tp.ignoreTablets[node.TabletAlias.String()]; !ignore { + aliases = append(aliases, node.TabletAlias) + } } } } diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 88368c02a60..c637c6eb4cf 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -350,6 +350,61 @@ func TestPickCellPreferenceLocalAlias(t *testing.T) { assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) } +// TestPickUsingCellAsAlias confirms that when the tablet picker is +// given a cell name that is an alias, it will choose a tablet that +// exists within a cell that is part of the alias. +func TestPickUsingCellAsAlias(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + // The test env puts all cells into an alias called "cella". + // We're also going to specify an optional extraCell that is NOT + // added to the alias. + te := newPickerTestEnv(t, ctx, []string{"cell1", "cell2", "cell3"}, "xtracell") + // Specify the alias as the cell. + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}) + require.NoError(t, err) + + // Create a tablet in one of the main cells, it should be + // picked as it is part of the cella alias. This tablet is + // NOT part of the talbet picker's local cell (cell1) so it + // will not be given local preference. + want := addTablet(ctx, te, 101, topodatapb.TabletType_REPLICA, "cell2", true, true) + defer deleteTablet(t, te, want) + // Create a tablet in an extra cell which is thus NOT part of + // the cella alias so it should NOT be picked. + noWant := addTablet(ctx, te, 102, topodatapb.TabletType_REPLICA, "xtracell", true, true) + defer deleteTablet(t, te, noWant) + // Try it many times to be sure we don't ever pick the wrong one. + for i := 0; i < 100; i++ { + tablet, err := tp.PickForStreaming(ctx) + require.NoError(t, err) + assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) + } +} + +func TestPickWithIgnoreList(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + te := newPickerTestEnv(t, ctx, []string{"cell1", "cell2"}) + + want := addTablet(ctx, te, 101, topodatapb.TabletType_REPLICA, "cell1", true, true) + defer deleteTablet(t, te, want) + + dontWant := addTablet(ctx, te, 102, topodatapb.TabletType_REPLICA, "cell1", true, true) + defer deleteTablet(t, te, dontWant) + + // Specify the alias as the cell. + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, dontWant.GetAlias()) + require.NoError(t, err) + + // Try it many times to be sure we don't ever pick from the ignore list. + for i := 0; i < 100; i++ { + tablet, err := tp.PickForStreaming(ctx) + require.NoError(t, err) + require.False(t, proto.Equal(dontWant, tablet), "Picked the tablet we shouldn't have: %v", dontWant) + } +} + func TestPickUsingCellAliasOnlySpecified(t *testing.T) { // test env puts all cells into an alias called "cella" te := newPickerTestEnv(t, []string{"cell", "otherCell"}) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 03a9d78d32a..73ccfa9bf69 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -25,23 +25,21 @@ import ( "sync" "time" + "google.golang.org/protobuf/proto" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" - querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vterrors" - vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" - - "google.golang.org/protobuf/proto" - - "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/srvtopo" - "vitess.io/vitess/go/vt/vterrors" ) // vstreamManager manages vstream requests. @@ -60,6 +58,10 @@ type vstreamManager struct { // maxSkewTimeoutSeconds is the maximum allowed skew between two streams when the MinimizeSkew flag is set const maxSkewTimeoutSeconds = 10 * 60 +// tabletPickerContextTimeout is the timeout for the child context used to select candidate tablets +// for a vstream +const tabletPickerContextTimeout = 90 * time.Second + // vstream contains the metadata for one VStream request. type vstream struct { // mu protects parts of vgtid, the semantics of a send, and journaler. @@ -131,6 +133,7 @@ type journalEvent struct { func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string, allowVstreamCopy bool) *vstreamManager { exporter := servenv.NewExporter(cell, "VStreamManager") + return &vstreamManager{ resolver: resolver, toposerv: serv, @@ -481,6 +484,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // journalDone is assigned a channel when a journal event is encountered. // It will be closed when all journal events converge. var journalDone chan struct{} + ignoreTablets := make([]*topodatapb.TabletAlias, 0) errCount := 0 for { @@ -498,12 +502,19 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha var eventss [][]*binlogdatapb.VEvent var err error cells := vs.getCells() - tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions) + + tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...) if err != nil { log.Errorf(err.Error()) return err } - tablet, err := tp.PickForStreaming(ctx) + + // Create a child context with a stricter timeout when picking a tablet. + // This will prevent hanging in the case no tablets are found. + tpCtx, tpCancel := context.WithTimeout(ctx, tabletPickerContextTimeout) + defer tpCancel() + + tablet, err := tp.PickForStreaming(tpCtx) if err != nil { log.Errorf(err.Error()) return err @@ -684,11 +695,18 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // Unreachable. err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly") } - if vterrors.Code(err) != vtrpcpb.Code_FAILED_PRECONDITION && vterrors.Code(err) != vtrpcpb.Code_UNAVAILABLE { + + retry, ignoreTablet := vs.shouldRetry(err) + if !retry { log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err) return err } + if ignoreTablet { + ignoreTablets = append(ignoreTablets, tablet.GetAlias()) + } + errCount++ + // Retry, at most, 3 times if the error can be retried. if errCount >= 3 { log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err) return err @@ -697,6 +715,30 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } } +// shouldRetry determines whether we should exit immediately or retry the vstream. +// The first return value determines if the error can be retried, while the second +// indicates whether the tablet with which the error occurred should be ommitted +// from the candidate list of tablets to choose from on the retry. +// +// An error should be retried if it is expected to be transient. +// A tablet should be ignored upon retry if it's likely another tablet will not +// produce the same error. +func (vs *vstream) shouldRetry(err error) (bool, bool) { + errCode := vterrors.Code(err) + + if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE { + return true, false + } + + // If there is a GTIDSet Mismatch on the tablet, omit it from the candidate + // list in the TabletPicker on retry. + if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") { + return true, true + } + + return false, false +} + // sendAll sends a group of events together while holding the lock. func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error { vs.mu.Lock() diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index a9dbc05f203..0a80ef6e3f5 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -389,47 +389,132 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) { assert.Equal(t, wantVStreamsLag, vsm.vstreamsLag.Counts(), "vstreamsLag matches") } -func TestVStreamRetry(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +func TestVStreamRetriableErrors(t *testing.T) { + type testCase struct { + name string + code vtrpcpb.Code + msg string + shouldRetry bool + ignoreTablet bool + } - cell := "aa" - ks := "TestVStream" - _ = createSandbox(ks) - hc := discovery.NewFakeHealthCheck(nil) + tcases := []testCase{ + { + name: "failed precondition", + code: vtrpcpb.Code_FAILED_PRECONDITION, + msg: "", + shouldRetry: true, + ignoreTablet: false, + }, + { + name: "gtid mismatch", + code: vtrpcpb.Code_INVALID_ARGUMENT, + msg: "GTIDSet Mismatch aa", + shouldRetry: true, + ignoreTablet: true, + }, + { + name: "unavailable", + code: vtrpcpb.Code_UNAVAILABLE, + msg: "", + shouldRetry: true, + ignoreTablet: false, + }, + { + name: "should not retry", + code: vtrpcpb.Code_INVALID_ARGUMENT, + msg: "final error", + shouldRetry: false, + ignoreTablet: false, + }, + } - st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, "aa", true) - sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) - addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) commit := []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_COMMIT}, } - sbc0.AddVStreamEvents(commit, nil) - sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "aa")) - sbc0.AddVStreamEvents(commit, nil) - sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "bb")) - sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cc")) - sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "final error")) - var count sync2.AtomicInt32 - count.Set(0) - vgtid := &binlogdatapb.VGtid{ - ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: ks, - Shard: "-20", - Gtid: "pos", - }}, - } - err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { - count.Add(1) - return nil - }) - wantErr := "final error" - if err == nil || !strings.Contains(err.Error(), wantErr) { - t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr) + + want := &binlogdatapb.VStreamResponse{Events: commit} + + for _, tcase := range tcases { + t.Run(tcase.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // aa will be the local cell for this test, but that tablet will have a vstream error. + cells := []string{"aa", "ab"} + + ks := "TestVStream" + _ = createSandbox(ks) + hc := discovery.NewFakeHealthCheck(nil) + + st := getSandboxTopoMultiCell(cells, ks, []string{"-20"}) + + sbc0 := hc.AddTestTablet(cells[0], "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_REPLICA, true, 1, nil) + sbc1 := hc.AddTestTablet(cells[1], "1.1.1.1", 1002, ks, "-20", topodatapb.TabletType_REPLICA, true, 1, nil) + + addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) + addTabletToSandboxTopo(t, st, ks, "-20", sbc1.Tablet()) + + vsm := newTestVStreamManager(hc, st, cells[0], true) + + // Always have the local cell tablet error so it's ignored on retry and we pick the other one + // if the error requires ignoring the tablet on retry. + sbc0.AddVStreamEvents(nil, vterrors.Errorf(tcase.code, tcase.msg)) + + if tcase.ignoreTablet { + sbc1.AddVStreamEvents(commit, nil) + } else { + sbc0.AddVStreamEvents(commit, nil) + } + + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: ks, + Shard: "-20", + Gtid: "pos", + }}, + } + + ch := make(chan *binlogdatapb.VStreamResponse) + done := make(chan struct{}) + go func() { + err := vsm.VStream(ctx, topodatapb.TabletType_REPLICA, vgtid, nil, &vtgatepb.VStreamFlags{Cells: strings.Join(cells, ",")}, func(events []*binlogdatapb.VEvent) error { + ch <- &binlogdatapb.VStreamResponse{Events: events} + return nil + }) + wantErr := "context canceled" + + if !tcase.shouldRetry { + wantErr = tcase.msg + } + + if err == nil || !strings.Contains(err.Error(), wantErr) { + t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr) + } + close(done) + }() + + Loop: + for { + if tcase.shouldRetry { + select { + case event := <-ch: + got := event + if !proto.Equal(got, want) { + t.Errorf("got different vstream event than expected") + } + cancel() + case <-done: + // The goroutine has completed, so break out of the loop + break Loop + } + } else { + <-done + break Loop + } + } + }) } - time.Sleep(100 * time.Millisecond) // wait for goroutine within VStream to finish - assert.Equal(t, int32(2), count.Get()) } func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) { @@ -1314,6 +1399,22 @@ func getSandboxTopo(ctx context.Context, cell string, keyspace string, shards [] return st } +func getSandboxTopoMultiCell(cells []string, keyspace string, shards []string) *sandboxTopo { + st := newSandboxForCells(cells) + ts := st.topoServer + + for _, cell := range cells { + ts.CreateCellInfo(ctx, cell, &topodatapb.CellInfo{}) + } + + ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{}) + + for _, shard := range shards { + ts.CreateShard(ctx, keyspace, shard) + } + return st +} + func addTabletToSandboxTopo(t *testing.T, st *sandboxTopo, ks, shard string, tablet *topodatapb.Tablet) { _, err := st.topoServer.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error { si.PrimaryAlias = tablet.Alias From 24cf103b0ff9535f697626466b4bcb76fcf423bc Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 29 May 2024 03:06:34 +0200 Subject: [PATCH 5/7] goimports Signed-off-by: Tim Vaillancourt --- go/vt/vtgate/vstream_manager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 73ccfa9bf69..1d4b0dcd87c 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -26,6 +26,7 @@ import ( "time" "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" From b3f3b93d0c2bc364cfff2fdae316b032931fe936 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 29 May 2024 03:19:34 +0200 Subject: [PATCH 6/7] fix test in go/vt/discovery Signed-off-by: Tim Vaillancourt --- go/vt/discovery/tablet_picker_test.go | 41 ++++----------------------- 1 file changed, 5 insertions(+), 36 deletions(-) diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index c637c6eb4cf..fd2c1635359 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -350,47 +350,16 @@ func TestPickCellPreferenceLocalAlias(t *testing.T) { assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) } -// TestPickUsingCellAsAlias confirms that when the tablet picker is -// given a cell name that is an alias, it will choose a tablet that -// exists within a cell that is part of the alias. -func TestPickUsingCellAsAlias(t *testing.T) { - ctx := utils.LeakCheckContext(t) - - // The test env puts all cells into an alias called "cella". - // We're also going to specify an optional extraCell that is NOT - // added to the alias. - te := newPickerTestEnv(t, ctx, []string{"cell1", "cell2", "cell3"}, "xtracell") - // Specify the alias as the cell. - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}) - require.NoError(t, err) - - // Create a tablet in one of the main cells, it should be - // picked as it is part of the cella alias. This tablet is - // NOT part of the talbet picker's local cell (cell1) so it - // will not be given local preference. - want := addTablet(ctx, te, 101, topodatapb.TabletType_REPLICA, "cell2", true, true) - defer deleteTablet(t, te, want) - // Create a tablet in an extra cell which is thus NOT part of - // the cella alias so it should NOT be picked. - noWant := addTablet(ctx, te, 102, topodatapb.TabletType_REPLICA, "xtracell", true, true) - defer deleteTablet(t, te, noWant) - // Try it many times to be sure we don't ever pick the wrong one. - for i := 0; i < 100; i++ { - tablet, err := tp.PickForStreaming(ctx) - require.NoError(t, err) - assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) - } -} - func TestPickWithIgnoreList(t *testing.T) { - ctx := utils.LeakCheckContext(t) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + defer cancel() - te := newPickerTestEnv(t, ctx, []string{"cell1", "cell2"}) + te := newPickerTestEnv(t, []string{"cell1", "cell2"}) - want := addTablet(ctx, te, 101, topodatapb.TabletType_REPLICA, "cell1", true, true) + want := addTablet(te, 101, topodatapb.TabletType_REPLICA, "cell1", true, true) defer deleteTablet(t, te, want) - dontWant := addTablet(ctx, te, 102, topodatapb.TabletType_REPLICA, "cell1", true, true) + dontWant := addTablet(te, 102, topodatapb.TabletType_REPLICA, "cell1", true, true) defer deleteTablet(t, te, dontWant) // Specify the alias as the cell. From c63ab9876e8ab2a2e4321bde6475d32b65811835 Mon Sep 17 00:00:00 2001 From: Malcolm Akinje Date: Wed, 20 Dec 2023 15:06:48 -0500 Subject: [PATCH 7/7] add NOT_FOUND vstream error to retry but omit tablet group (#154) --- go/vt/vtgate/vstream_manager.go | 3 ++- go/vt/vtgate/vstream_manager_test.go | 7 +++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 1d4b0dcd87c..9194327d076 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -733,7 +733,8 @@ func (vs *vstream) shouldRetry(err error) (bool, bool) { // If there is a GTIDSet Mismatch on the tablet, omit it from the candidate // list in the TabletPicker on retry. - if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") { + if (errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch")) || + errCode == vtrpc.Code_NOT_FOUND { return true, true } diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 0a80ef6e3f5..dab397d4621 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -427,6 +427,13 @@ func TestVStreamRetriableErrors(t *testing.T) { shouldRetry: false, ignoreTablet: false, }, + { + name: "not found", + code: vtrpcpb.Code_NOT_FOUND, + msg: "", + shouldRetry: true, + ignoreTablet: true, + }, } commit := []*binlogdatapb.VEvent{