Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

slack-15.0: VStreams fix backports/patches, pt. 2 #380

Merged
merged 7 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ func testVStreamCellFlag(t *testing.T) {
flags := &vtgatepb.VStreamFlags{}
if tc.cells != "" {
flags.Cells = tc.cells
flags.CellPreference = "onlyspecified"
}

ctx2, cancel := context.WithTimeout(ctx, 30*time.Second)
Expand Down
267 changes: 236 additions & 31 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,40 @@ import (
"vitess.io/vitess/go/vt/vterrors"
)

type TabletPickerCellPreference int

const (
// PreferLocalWithAlias gives preference to the local cell first, then specified cells, if any.
// This is the default when no other option is provided.
TabletPickerCellPreference_PreferLocalWithAlias TabletPickerCellPreference = iota
// OnlySpecified only picks tablets from the list of cells given.
TabletPickerCellPreference_OnlySpecified
)

type TabletPickerTabletOrder int

const (
// All provided tablet types are given equal priority. This is the default.
TabletPickerTabletOrder_Any TabletPickerTabletOrder = iota
// Provided tablet types are expected to be prioritized in the given order.
TabletPickerTabletOrder_InOrder
)

var (
tabletPickerRetryDelay = 30 * time.Second
muTabletPickerRetryDelay sync.Mutex
globalTPStats *tabletPickerStats
inOrderHint = "in_order:"

tabletPickerCellPreferenceMap = map[string]TabletPickerCellPreference{
"preferlocalwithalias": TabletPickerCellPreference_PreferLocalWithAlias,
"onlyspecified": TabletPickerCellPreference_OnlySpecified,
}

tabletPickerTabletOrderMap = map[string]TabletPickerTabletOrder{
"any": TabletPickerTabletOrder_Any,
"inorder": TabletPickerTabletOrder_InOrder,
}
)

// GetTabletPickerRetryDelay synchronizes changes to tabletPickerRetryDelay. Used in tests only at the moment
Expand All @@ -62,18 +91,66 @@ func SetTabletPickerRetryDelay(delay time.Duration) {
tabletPickerRetryDelay = delay
}

type TabletPickerOptions struct {
CellPreference string
TabletOrder string
}

func parseTabletPickerCellPreferenceString(str string) (TabletPickerCellPreference, error) {
// return default if blank
if str == "" {
return TabletPickerCellPreference_PreferLocalWithAlias, nil
}

if c, ok := tabletPickerCellPreferenceMap[strings.ToLower(str)]; ok {
return c, nil
}

return -1, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid cell preference: %v", str)
}

func parseTabletPickerTabletOrderString(str string) (TabletPickerTabletOrder, error) {
// return default if blank
if str == "" {
return TabletPickerTabletOrder_Any, nil
}

if o, ok := tabletPickerTabletOrderMap[strings.ToLower(str)]; ok {
return o, nil
}

return -1, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid tablet order type: %v", str)
}

type localCellInfo struct {
localCell string
cellsInAlias map[string]string
}

// TabletPicker gives a simplified API for picking tablets.
type TabletPicker struct {
ts *topo.Server
cells []string
keyspace string
shard string
tabletTypes []topodatapb.TabletType
inOrder bool
ts *topo.Server
cells []string
keyspace string
shard string
tabletTypes []topodatapb.TabletType
inOrder bool
cellPref TabletPickerCellPreference
localCellInfo localCellInfo
// This map is keyed on the results of TabletAlias.String().
ignoreTablets map[string]struct{}
}

// NewTabletPicker returns a TabletPicker.
func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTypesStr string) (*TabletPicker, error) {
func NewTabletPicker(
ctx context.Context,
ts *topo.Server,
cells []string,
localCell, keyspace, shard, tabletTypesStr string,
options TabletPickerOptions,
ignoreTablets ...*topodatapb.TabletAlias,
) (*TabletPicker, error) {
// Keep inOrder parsing here for backward compatability until TabletPickerTabletOrder is fully adopted.
tabletTypes, inOrder, err := ParseTabletTypesAndOrder(tabletTypesStr)
if err != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "failed to parse list of tablet types: %v", tabletTypesStr)
Expand All @@ -92,19 +169,131 @@ func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTyp
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
fmt.Sprintf("Missing required field(s) for tablet picker: %s", strings.Join(missingFields, ", ")))
}
return &TabletPicker{
ts: ts,
cells: cells,
keyspace: keyspace,
shard: shard,
tabletTypes: tabletTypes,
inOrder: inOrder,
}, nil

// Resolve tablet picker options
cellPref, err := parseTabletPickerCellPreferenceString(options.CellPreference)
if err != nil {
return nil, err
}

// For backward compatibility only parse the options for tablet ordering
// if the in_order hint wasn't already specified. Otherwise it could be overridden.
// We can remove this check once the in_order hint is deprecated.
if !inOrder {
order, err := parseTabletPickerTabletOrderString(options.TabletOrder)
if err != nil {
return nil, err
}
switch order {
case TabletPickerTabletOrder_Any:
inOrder = false
case TabletPickerTabletOrder_InOrder:
inOrder = true
}
}

aliasCellMap := make(map[string]string)
if cellPref == TabletPickerCellPreference_PreferLocalWithAlias {
if localCell == "" {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot have local cell preference without local cell")
}

// Add local cell to the list of cells for tablet picking.
// This will be de-duped later if the local cell already exists in the original list - see: dedupeCells()
cells = append(cells, localCell)
aliasName := topo.GetAliasByCell(ctx, ts, localCell)

// If an alias exists
if aliasName != localCell {
alias, err := ts.GetCellsAlias(ctx, aliasName, false)
if err != nil {
return nil, vterrors.Wrap(err, "error fetching local cell alias")
}

// Add the aliasName to the list of cells for tablet picking.
cells = append(cells, aliasName)

// Create a map of the cells in the alias to make lookup faster later when we're giving preference to these.
// see prioritizeTablets()
for _, c := range alias.Cells {
aliasCellMap[c] = c
}
}
}

tp := &TabletPicker{
ts: ts,
cells: dedupeCells(cells),
localCellInfo: localCellInfo{localCell: localCell, cellsInAlias: aliasCellMap},
keyspace: keyspace,
shard: shard,
tabletTypes: tabletTypes,
inOrder: inOrder,
cellPref: cellPref,
ignoreTablets: make(map[string]struct{}, len(ignoreTablets)),
}

for _, ignoreTablet := range ignoreTablets {
tp.ignoreTablets[ignoreTablet.String()] = struct{}{}
}

return tp, nil

}

// dedupeCells is used to remove duplicates in the cell list in case it is passed in
// and exists in the local cell's alias. Can happen if CellPreference is PreferLocalWithAlias.
func dedupeCells(cells []string) []string {
keys := make(map[string]bool)
dedupedCells := []string{}

for _, c := range cells {
if _, value := keys[c]; !value {
keys[c] = true
dedupedCells = append(dedupedCells, c)
}
}
return dedupedCells
}

// prioritizeTablets orders the candidate pool of tablets based on CellPreference.
// If CellPreference is PreferLocalWithAlias then tablets in the local cell will be prioritized for selection,
// followed by the tablets within the local cell's alias, and finally any others specified by the client.
// If CellPreference is OnlySpecified, then tablets will only be selected randomly from the cells specified by the client.
func (tp *TabletPicker) prioritizeTablets(candidates []*topo.TabletInfo) (sameCell, sameAlias, allOthers []*topo.TabletInfo) {
for _, c := range candidates {
if c.Alias.Cell == tp.localCellInfo.localCell {
sameCell = append(sameCell, c)
} else if _, ok := tp.localCellInfo.cellsInAlias[c.Alias.Cell]; ok {
sameAlias = append(sameAlias, c)
} else {
allOthers = append(allOthers, c)
}
}

return sameCell, sameAlias, allOthers
}

func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo.TabletInfo {
// Sort candidates slice such that tablets appear in same tablet type order as in tp.tabletTypes
orderMap := map[topodatapb.TabletType]int{}
for i, t := range tp.tabletTypes {
orderMap[t] = i
}
sort.Slice(candidates, func(i, j int) bool {
if orderMap[candidates[i].Type] == orderMap[candidates[j].Type] {
// identical tablet types: randomize order of tablets for this type
return rand.Intn(2) == 0 // 50% chance
}
return orderMap[candidates[i].Type] < orderMap[candidates[j].Type]
})

return candidates
}

// PickForStreaming picks an available tablet.
// All tablets that belong to tp.cells are evaluated and one is
// chosen at random.
// Selection is based on CellPreference.
// See prioritizeTablets for prioritization logic.
func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) {
rand.Seed(time.Now().UnixNano())
// keep trying at intervals (tabletPickerRetryDelay) until a tablet is found
Expand All @@ -116,19 +305,30 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
default:
}
candidates := tp.GetMatchingTablets(ctx)
if tp.inOrder {
// Sort candidates slice such that tablets appear in same tablet type order as in tp.tabletTypes
orderMap := map[topodatapb.TabletType]int{}
for i, t := range tp.tabletTypes {
orderMap[t] = i
if tp.cellPref == TabletPickerCellPreference_PreferLocalWithAlias {
sameCellCandidates, sameAliasCandidates, allOtherCandidates := tp.prioritizeTablets(candidates)

if tp.inOrder {
sameCellCandidates = tp.orderByTabletType(sameCellCandidates)
sameAliasCandidates = tp.orderByTabletType(sameAliasCandidates)
allOtherCandidates = tp.orderByTabletType(allOtherCandidates)
} else {
// Randomize candidates
rand.Shuffle(len(sameCellCandidates), func(i, j int) {
sameCellCandidates[i], sameCellCandidates[j] = sameCellCandidates[j], sameCellCandidates[i]
})
rand.Shuffle(len(sameAliasCandidates), func(i, j int) {
sameAliasCandidates[i], sameAliasCandidates[j] = sameAliasCandidates[j], sameAliasCandidates[i]
})
rand.Shuffle(len(allOtherCandidates), func(i, j int) {
allOtherCandidates[i], allOtherCandidates[j] = allOtherCandidates[j], allOtherCandidates[i]
})
}
sort.Slice(candidates, func(i, j int) bool {
if orderMap[candidates[i].Type] == orderMap[candidates[j].Type] {
// identical tablet types: randomize order of tablets for this type
return rand.Intn(2) == 0 // 50% chance
}
return orderMap[candidates[i].Type] < orderMap[candidates[j].Type]
})

candidates = append(sameCellCandidates, sameAliasCandidates...)
candidates = append(candidates, allOtherCandidates...)
} else if tp.inOrder {
candidates = tp.orderByTabletType(candidates)
} else {
// Randomize candidates
rand.Shuffle(len(candidates), func(i, j int) {
Expand Down Expand Up @@ -179,7 +379,9 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
log.Errorf("error getting shard %s/%s: %s", tp.keyspace, tp.shard, err.Error())
return nil
}
aliases = append(aliases, si.PrimaryAlias)
if _, ignore := tp.ignoreTablets[si.PrimaryAlias.String()]; !ignore {
aliases = append(aliases, si.PrimaryAlias)
}
} else {
actualCells := make([]string, 0)
for _, cell := range tp.cells {
Expand All @@ -204,6 +406,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
actualCells = append(actualCells, cell)
}
}

for _, cell := range actualCells {
shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
Expand All @@ -214,7 +417,9 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
}

for _, node := range sri.Nodes {
aliases = append(aliases, node.TabletAlias)
if _, ignore := tp.ignoreTablets[node.TabletAlias.String()]; !ignore {
aliases = append(aliases, node.TabletAlias)
}
}
}
}
Expand Down
Loading
Loading