Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new placement Vindex strategy #77

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions go/vt/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,10 @@ func KeyRangeAdd(first, second *topodatapb.KeyRange) (*topodatapb.KeyRange, bool
if first == nil || second == nil {
return nil, false
}
if len(first.End) != 0 && bytes.Equal(first.End, second.Start) {
if len(first.End) != 0 && bytes.Equal(addPadding(first.End), addPadding(second.Start)) {
return &topodatapb.KeyRange{Start: first.Start, End: second.End}, true
}
if len(second.End) != 0 && bytes.Equal(second.End, first.Start) {
if len(second.End) != 0 && bytes.Equal(addPadding(second.End), addPadding(first.Start)) {
return &topodatapb.KeyRange{Start: second.Start, End: first.End}, true
}
return nil, false
Expand All @@ -127,8 +127,8 @@ func KeyRangeContains(kr *topodatapb.KeyRange, id []byte) bool {
if kr == nil {
return true
}
return bytes.Compare(kr.Start, id) <= 0 &&
(len(kr.End) == 0 || bytes.Compare(id, kr.End) < 0)
return bytes.Compare(addPadding(kr.Start), addPadding(id)) <= 0 &&
(len(kr.End) == 0 || bytes.Compare(addPadding(id), addPadding(kr.End)) < 0)
}

// ParseKeyRangeParts parses a start and end hex values and build a proto KeyRange
Expand Down Expand Up @@ -202,7 +202,7 @@ func KeyRangeStartSmaller(left, right *topodatapb.KeyRange) bool {
if right == nil {
return false
}
return bytes.Compare(left.Start, right.Start) < 0
return bytes.Compare(addPadding(left.Start), addPadding(right.Start)) < 0
}

// KeyRangeStartEqual returns true if both key ranges have the same start
Expand Down Expand Up @@ -250,8 +250,8 @@ func KeyRangesIntersect(first, second *topodatapb.KeyRange) bool {
if first == nil || second == nil {
return true
}
return (len(first.End) == 0 || bytes.Compare(second.Start, first.End) < 0) &&
(len(second.End) == 0 || bytes.Compare(first.Start, second.End) < 0)
return (len(first.End) == 0 || bytes.Compare(addPadding(second.Start), addPadding(first.End)) < 0) &&
(len(second.End) == 0 || bytes.Compare(addPadding(first.Start), addPadding(second.End)) < 0)
}

// KeyRangesOverlap returns the overlap between two KeyRanges.
Expand All @@ -270,14 +270,14 @@ func KeyRangesOverlap(first, second *topodatapb.KeyRange) (*topodatapb.KeyRange,
// start with (a,b)
result := proto.Clone(first).(*topodatapb.KeyRange)
// if c > a, then use c
if bytes.Compare(second.Start, first.Start) > 0 {
if bytes.Compare(addPadding(second.Start), addPadding(first.Start)) > 0 {
result.Start = second.Start
}
// if b is maxed out, or
// (d is not maxed out and d < b)
// ^ valid test as neither b nor d are max
// then use d
if len(first.End) == 0 || (len(second.End) != 0 && bytes.Compare(second.End, first.End) < 0) {
if len(first.End) == 0 || (len(second.End) != 0 && bytes.Compare(addPadding(second.End), addPadding(first.End)) < 0) {
result.End = second.End
}
return result, nil
Expand All @@ -297,10 +297,10 @@ func KeyRangeIncludes(big, small *topodatapb.KeyRange) bool {
return len(big.Start) == 0 && len(big.End) == 0
}
// Now we check small.Start >= big.Start, and small.End <= big.End
if len(big.Start) != 0 && bytes.Compare(small.Start, big.Start) < 0 {
if len(big.Start) != 0 && bytes.Compare(addPadding(small.Start), addPadding(big.Start)) < 0 {
return false
}
if len(big.End) != 0 && (len(small.End) == 0 || bytes.Compare(small.End, big.End) > 0) {
if len(big.End) != 0 && (len(small.End) == 0 || bytes.Compare(addPadding(small.End), addPadding(big.End)) > 0) {
return false
}
return true
Expand Down
224 changes: 224 additions & 0 deletions go/vt/vtgate/vindexes/placement.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
/*
Copyright 2022 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*

A Vindex which uses a mapping lookup table `placement_map` to set the first `placement_prefix_bytes` of the Keyspace ID
and another Vindex type `placement_sub_vindex_type` (which must support Hashing) as a sub-Vindex to set the rest.
This is suitable for regional sharding (like region_json or region_experimental) but does not require a mapping file,
and can support non-integer types for the sharding column. All parameters are prefixed with `placement_` so as to avoid
conflict, because the `params` map is passed to the sub-Vindex as well.

*/

package vindexes

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"strconv"
"strings"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
)

var (
// Compile-time assertion that *Placement satisfies the MultiColumn interface.
_ MultiColumn = (*Placement)(nil)

// PlacementRequiredParams lists the parameter names that NewPlacement requires
// to be present with a non-empty value in its params map.
PlacementRequiredParams = []string{
"placement_map",
"placement_prefix_bytes",
"placement_sub_vindex_type",
}
)

// init passes NewPlacement to Register under the vindex type name "placement".
func init() {
Register("placement", NewPlacement)
}

// PlacementMap maps a placement key (the string form of the first column value)
// to the unsigned integer that is encoded as the keyspace ID prefix.
type PlacementMap map[string]uint64

// Placement is a Vindex that derives the leading bytes of a keyspace ID from a
// static lookup map and the remaining bytes from a sub-Vindex.
type Placement struct {
// name is the vindex name given to NewPlacement; returned by String.
name string
// placementMap holds the parsed `placement_map` parameter.
placementMap PlacementMap
// subVindex is the sub-Vindex created by NewPlacement; NewPlacement rejects
// sub-Vindexes that do not implement Hashing.
subVindex Vindex
// subVindexType is the value of the `placement_sub_vindex_type` parameter.
subVindexType string
// subVindexName is the name given to the sub-Vindex: "<name>_sub_vindex".
subVindexName string
// prefixBytes is the number of leading keyspace ID bytes taken from the
// placement value; NewPlacement only accepts values from 1 to 7.
prefixBytes int
}

// parsePlacementMap converts a comma-separated list of key:value entries, such
// as "foo:1,bar:2", into a PlacementMap.
//
// Empty entries (for example those produced by a trailing or doubled comma) are
// ignored. Every other entry must contain exactly one colon with a non-empty
// key on the left and, on the right, a non-empty unsigned integer accepted by
// strconv.ParseUint with base 0 (so prefixes such as "0x" are honoured). The
// first offending entry aborts parsing and is reported in the returned error.
func parsePlacementMap(s string) (*PlacementMap, error) {
	parsed := PlacementMap{}

	for _, pair := range strings.Split(s, ",") {
		if len(pair) == 0 {
			continue
		}

		// Exactly one separator is allowed per entry.
		if strings.Count(pair, ":") != 1 {
			return nil, fmt.Errorf("entry: %v; expected key:value", pair)
		}
		sep := strings.IndexByte(pair, ':')
		placementKey, rawValue := pair[:sep], pair[sep+1:]

		switch {
		case placementKey == "":
			return nil, fmt.Errorf("entry: %v; unexpected empty key", pair)
		case rawValue == "":
			return nil, fmt.Errorf("entry: %v; unexpected empty value", pair)
		}

		number, err := strconv.ParseUint(rawValue, 0, 64)
		if err != nil {
			return nil, fmt.Errorf("entry: %v; %v", pair, err)
		}
		parsed[placementKey] = number
	}

	return &parsed, nil
}

// NewPlacement builds a Placement vindex called name from params.
//
// All parameters listed in PlacementRequiredParams must be present and
// non-empty:
//   - placement_map: comma-separated key:value list parsed by parsePlacementMap.
//   - placement_prefix_bytes: integer from 1 to 7, the number of leading
//     keyspace ID bytes taken from the mapped value.
//   - placement_sub_vindex_type: type name passed to CreateVindex; the
//     resulting Vindex must implement Hashing.
//
// The whole params map is forwarded unchanged to the sub-Vindex, which is named
// "<name>_sub_vindex".
//
// An error is returned when a parameter is missing or malformed, when a mapped
// value does not fit in the configured number of prefix bytes, when the
// sub-Vindex cannot be created, or when it does not implement Hashing.
func NewPlacement(name string, params map[string]string) (Vindex, error) {
	var missingParams []string
	for _, param := range PlacementRequiredParams {
		if params[param] == "" {
			missingParams = append(missingParams, param)
		}
	}
	if len(missingParams) > 0 {
		return nil, fmt.Errorf("missing params: %s", strings.Join(missingParams, ", "))
	}

	placementMap, parseError := parsePlacementMap(params["placement_map"])
	if parseError != nil {
		return nil, fmt.Errorf("malformed placement_map; %v", parseError)
	}

	prefixBytes, prefixError := strconv.Atoi(params["placement_prefix_bytes"])
	if prefixError != nil {
		// Add context so the caller can tell which parameter failed to parse.
		return nil, fmt.Errorf("invalid placement_prefix_bytes: %v", prefixError)
	}
	if prefixBytes < 1 || prefixBytes > 7 {
		return nil, fmt.Errorf("invalid placement_prefix_bytes: %v; expected integer between 1 and 7", prefixBytes)
	}

	// Only the low prefixBytes bytes of each mapped value end up in the
	// keyspace ID. Reject larger values here rather than letting them be
	// silently truncated (and possibly collide) at routing time. prefixBytes is
	// at most 7, so the shift below cannot overflow a uint64.
	maxPrefixValue := uint64(1)<<(8*uint(prefixBytes)) - 1
	for placementKey, placementValue := range *placementMap {
		if placementValue > maxPrefixValue {
			return nil, fmt.Errorf("malformed placement_map; entry: %v:%v; value does not fit in %d prefix bytes", placementKey, placementValue, prefixBytes)
		}
	}

	subVindexType := params["placement_sub_vindex_type"]
	subVindexName := fmt.Sprintf("%s_sub_vindex", name)
	subVindex, createVindexError := CreateVindex(subVindexType, subVindexName, params)
	if createVindexError != nil {
		return nil, fmt.Errorf("invalid placement_sub_vindex_type: %v", createVindexError)
	}

	// TODO: Should we support MultiColumn Vindex?
	if _, subVindexSupportsHashing := subVindex.(Hashing); !subVindexSupportsHashing {
		// Report the offending type; createVindexError is always nil here.
		return nil, fmt.Errorf("invalid placement_sub_vindex_type: %v; does not support the Hashing interface", subVindexType)
	}

	return &Placement{
		name:          name,
		placementMap:  *placementMap,
		subVindex:     subVindex,
		subVindexType: subVindexType,
		subVindexName: subVindexName,
		prefixBytes:   prefixBytes,
	}, nil
}

// String returns the name this Placement was created with.
func (p *Placement) String() string {
return p.name
}

// Cost always returns 1.
func (p *Placement) Cost() int {
return 1
}

// IsUnique always returns true; Map produces exactly one destination per input row.
func (p *Placement) IsUnique() bool {
return true
}

// NeedsVCursor always returns false; Map never reads its vcursor argument and
// Verify only forwards it to Map.
func (p *Placement) NeedsVCursor() bool {
return false
}

// PartialVindex always returns true; Map accepts rows that carry only the
// placement column and resolves them from the placement prefix alone.
func (p *Placement) PartialVindex() bool {
return true
}

// makeDestinationPrefix encodes value as an 8-byte big-endian integer and
// returns its trailing (least significant) prefixBytes bytes. When prefixBytes
// is 8 or more the full 8-byte encoding is returned.
func makeDestinationPrefix(value uint64, prefixBytes int) []byte {
	var encoded [8]byte
	binary.BigEndian.PutUint64(encoded[:], value)

	if prefixBytes >= 8 {
		return encoded[:]
	}
	// Drop the leading high-order bytes so only prefixBytes bytes remain.
	return encoded[8-prefixBytes:]
}

// Map resolves each row of column values to a destination, returning one
// destination per row in input order.
//
// Every row must hold one or two values; any other length is an internal
// error. The first value is the placement key, looked up by its string form in
// the placement map:
//   - key not in the map: key.DestinationNone.
//   - one value (partial usage): the result of NewKeyRangeFromPrefix applied to
//     the placement prefix.
//   - two values (full usage): an 8-byte key.DestinationKeyspaceID made of the
//     placement prefix followed by the leading bytes of the sub-Vindex hash of
//     the second value. A hash too short to fill the remaining bytes is
//     zero-padded on the right.
//
// A hashing failure aborts the whole call and is returned as the error. ctx and
// vcursor are not used.
func (p *Placement) Map(ctx context.Context, vcursor VCursor, rowsColValues [][]sqltypes.Value) ([]key.Destination, error) {
	destinations := make([]key.Destination, 0, len(rowsColValues))

	for _, row := range rowsColValues {
		if len(row) != 1 && len(row) != 2 {
			return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of column values were passed: expected 1-2, got %d", len(row))
		}

		// The destination prefix depends only on the placement key, so it is the
		// same for partial and full usage of the Vindex.
		placementDestinationValue, placementMappingFound := p.placementMap[row[0].ToString()]
		if !placementMappingFound {
			destinations = append(destinations, key.DestinationNone{})
			continue
		}
		placementDestinationPrefix := makeDestinationPrefix(placementDestinationValue, p.prefixBytes)

		// Partial Vindex usage with only the placement column provided.
		if len(row) == 1 {
			destinations = append(destinations, NewKeyRangeFromPrefix(placementDestinationPrefix))
			continue
		}

		// Full Vindex usage with the placement column and subVindex column provided.
		hasher, subVindexSupportsHashing := p.subVindex.(Hashing)
		if !subVindexSupportsHashing {
			return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "sub-vindex %s does not support the Hashing interface", p.subVindexName)
		}
		subVindexValue, hashingError := hasher.Hash(row[1])
		if hashingError != nil {
			return nil, hashingError // TODO: Should we be less fatal here and use DestinationNone?
		}

		// Assemble the keyspace ID in a fresh fixed-size buffer. Slicing the
		// result of append to 8 bytes would depend on append's spare capacity
		// whenever the hash is shorter than the bytes left after the prefix.
		keyspaceID := make([]byte, 8)
		written := copy(keyspaceID, placementDestinationPrefix)
		copy(keyspaceID[written:], subVindexValue)
		destinations = append(destinations, key.DestinationKeyspaceID(keyspaceID))
	}

	return destinations, nil
}

// Verify reports, for each row, whether Map resolves it to exactly the keyspace
// ID at the same index of keyspaceIDs.
//
// A row verifies only when Map yields a key.DestinationKeyspaceID that is
// byte-equal to the supplied keyspace ID; rows that map to any other kind of
// destination (no destination, or a key range from partial usage) are false.
//
// An error is returned when fewer keyspace IDs than rows are supplied, or when
// Map itself fails.
func (p *Placement) Verify(ctx context.Context, vcursor VCursor, rowsColValues [][]sqltypes.Value, keyspaceIDs [][]byte) ([]bool, error) {
	// Guard the keyspaceIDs[i] lookups below against an out-of-range panic.
	if len(keyspaceIDs) < len(rowsColValues) {
		return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of keyspace ids were passed: expected %d, got %d", len(rowsColValues), len(keyspaceIDs))
	}

	// Surface mapping failures instead of reporting every row as unverified.
	destinations, mapError := p.Map(ctx, vcursor, rowsColValues)
	if mapError != nil {
		return nil, mapError
	}

	// Entries default to false, which covers every non-keyspace-ID destination.
	result := make([]bool, len(rowsColValues))
	for i, destination := range destinations {
		if keyspaceID, isKeyspaceID := destination.(key.DestinationKeyspaceID); isKeyspaceID {
			result[i] = bytes.Equal(keyspaceID, keyspaceIDs[i])
		}
	}
	return result, nil
}
Loading