Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vreplication: reference tables #4839

Merged
merged 3 commits into from
Apr 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go/vt/topotools/rebuild_vschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ func TestRebuildVSchema(t *testing.T) {
},
Tables: map[string]*vschemapb.Table{
"table1": {
Type: "sequence",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did this actually work earlier? I would have expected it to throw an error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It worked earlier, and two users ran into issues when they accidentally deployed sequences on a sharded keyspace. So, I fixed it as part of this change.

ColumnVindexes: []*vschemapb.ColumnVindex{
{
Column: "column1",
Expand Down
42 changes: 21 additions & 21 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ const (
SelectNext
// SelectDBA is for executing a DBA statement.
SelectDBA
// SelectReference is for fetching from a reference table.
SelectReference
)

var routeName = map[RouteOpcode]string{
Expand All @@ -181,6 +183,7 @@ var routeName = map[RouteOpcode]string{
SelectScatter: "SelectScatter",
SelectNext: "SelectNext",
SelectDBA: "SelectDBA",
SelectReference: "SelectReference",
}

var (
Expand Down Expand Up @@ -212,16 +215,13 @@ func (route *Route) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVa
}

func (route *Route) execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
switch route.Opcode {
case SelectNext, SelectDBA:
return execAnyShard(vcursor, route.Query, bindVars, route.Keyspace)
}

var rss []*srvtopo.ResolvedShard
var bvs []map[string]*querypb.BindVariable
var err error
switch route.Opcode {
case SelectUnsharded, SelectScatter:
case SelectUnsharded, SelectNext, SelectDBA, SelectReference:
rss, bvs, err = route.paramsAnyShard(vcursor, bindVars)
case SelectScatter:
rss, bvs, err = route.paramsAllShards(vcursor, bindVars)
case SelectEqual, SelectEqualUnique:
rss, bvs, err = route.paramsSelectEqual(vcursor, bindVars)
Expand Down Expand Up @@ -278,7 +278,9 @@ func (route *Route) StreamExecute(vcursor VCursor, bindVars map[string]*querypb.
defer cancel()
}
switch route.Opcode {
case SelectUnsharded, SelectScatter:
case SelectUnsharded, SelectNext, SelectDBA, SelectReference:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is adding SelectNext and SelectDBA here enabled by vreplication, or is this an incidental change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are essentially unreachable code paths or impractical use cases. But I decided to add these for the sake of correctness and completeness. Otherwise, it becomes hard to reason about why these got left out.

rss, bvs, err = route.paramsAnyShard(vcursor, bindVars)
case SelectScatter:
rss, bvs, err = route.paramsAllShards(vcursor, bindVars)
case SelectEqual, SelectEqualUnique:
rss, bvs, err = route.paramsSelectEqual(vcursor, bindVars)
Expand Down Expand Up @@ -343,6 +345,18 @@ func (route *Route) paramsAllShards(vcursor VCursor, bindVars map[string]*queryp
return rss, multiBindVars, nil
}

func (route *Route) paramsAnyShard(vcursor VCursor, bindVars map[string]*querypb.BindVariable) ([]*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, error) {
rss, _, err := vcursor.ResolveDestinations(route.Keyspace.Name, nil, []key.Destination{key.DestinationAnyShard{}})
if err != nil {
return nil, nil, vterrors.Wrap(err, "paramsAnyShard")
}
multiBindVars := make([]map[string]*querypb.BindVariable, len(rss))
for i := range multiBindVars {
multiBindVars[i] = bindVars
}
return rss, multiBindVars, nil
}

func (route *Route) paramsSelectEqual(vcursor VCursor, bindVars map[string]*querypb.BindVariable) ([]*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, error) {
key, err := route.Values[0].ResolveValue(bindVars)
if err != nil {
Expand Down Expand Up @@ -453,20 +467,6 @@ func resolveSingleShard(vcursor VCursor, vindex vindexes.Vindex, keyspace *vinde
return rss[0], ksid, nil
}

func execAnyShard(vcursor VCursor, query string, bindVars map[string]*querypb.BindVariable, keyspace *vindexes.Keyspace) (*sqltypes.Result, error) {
rss, _, err := vcursor.ResolveDestinations(keyspace.Name, nil, []key.Destination{key.DestinationAnyShard{}})
if err != nil {
// TODO(alainjobart): this eats the error code. Use vterrors.Wrapf instead.
// And audit the entire file for it.
return nil, fmt.Errorf("execAnyShard: %v", err)
}
if len(rss) != 1 {
// This code is unreachable. It's just a sanity check.
return nil, fmt.Errorf("no shards for keyspace: %s", keyspace.Name)
}
return vcursor.ExecuteStandalone(query, bindVars, rss[0])
}

func execShard(vcursor VCursor, query string, bindVars map[string]*querypb.BindVariable, rs *srvtopo.ResolvedShard, isDML, canAutocommit bool) (*sqltypes.Result, error) {
autocommit := canAutocommit && vcursor.AutocommitApproval()
result, errs := vcursor.ExecuteMultiShard([]*srvtopo.ResolvedShard{rs}, []*querypb.BoundQuery{
Expand Down
70 changes: 56 additions & 14 deletions go/vt/vtgate/engine/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestSelectUnsharded(t *testing.T) {
t.Fatal(err)
}
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationAllShards()`,
`ResolveDestinations ks [] Destinations:DestinationAnyShard()`,
`ExecuteMultiShard ks.0: dummy_select {} false false`,
})
expectResult(t, "sel.Execute", result, defaultSelectResult)
Expand All @@ -65,7 +65,7 @@ func TestSelectUnsharded(t *testing.T) {
t.Fatal(err)
}
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationAllShards()`,
`ResolveDestinations ks [] Destinations:DestinationAnyShard()`,
`StreamExecuteMulti dummy_select ks.0: {} `,
})
expectResult(t, "sel.StreamExecute", result, defaultSelectResult)
Expand Down Expand Up @@ -446,13 +446,17 @@ func TestSelectNext(t *testing.T) {
}
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationAnyShard()`,
`ExecuteStandalone dummy_select ks -20`,
`ExecuteMultiShard ks.-20: dummy_select {} false false`,
})
expectResult(t, "sel.Execute", result, defaultSelectResult)

vc.Rewind()
_, err = wrapStreamExecute(sel, vc, map[string]*querypb.BindVariable{}, false)
expectError(t, "sel.StreamExecute", err, `query "dummy_select" cannot be used for streaming`)
result, _ = wrapStreamExecute(sel, vc, map[string]*querypb.BindVariable{}, false)
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationAnyShard()`,
`StreamExecuteMulti dummy_select ks.-20: {} `,
})
expectResult(t, "sel.StreamExecute", result, defaultSelectResult)
}

func TestSelectDBA(t *testing.T) {
Expand All @@ -476,13 +480,51 @@ func TestSelectDBA(t *testing.T) {
}
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationAnyShard()`,
`ExecuteStandalone dummy_select ks -20`,
`ExecuteMultiShard ks.-20: dummy_select {} false false`,
})
expectResult(t, "sel.Execute", result, defaultSelectResult)

vc.Rewind()
_, err = wrapStreamExecute(sel, vc, map[string]*querypb.BindVariable{}, false)
expectError(t, "sel.StreamExecute", err, `query "dummy_select" cannot be used for streaming`)
result, _ = wrapStreamExecute(sel, vc, map[string]*querypb.BindVariable{}, false)
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationAnyShard()`,
`StreamExecuteMulti dummy_select ks.-20: {} `,
})
expectResult(t, "sel.StreamExecute", result, defaultSelectResult)
}

func TestSelectReference(t *testing.T) {
sel := NewRoute(
SelectReference,
&vindexes.Keyspace{
Name: "ks",
Sharded: true,
},
"dummy_select",
"dummy_select_field",
)

vc := &loggingVCursor{
shards: []string{"-20", "20-"},
results: []*sqltypes.Result{defaultSelectResult},
}
result, err := sel.Execute(vc, map[string]*querypb.BindVariable{}, false)
if err != nil {
t.Fatal(err)
}
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationAnyShard()`,
`ExecuteMultiShard ks.-20: dummy_select {} false false`,
})
expectResult(t, "sel.Execute", result, defaultSelectResult)

vc.Rewind()
result, _ = wrapStreamExecute(sel, vc, map[string]*querypb.BindVariable{}, false)
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationAnyShard()`,
`StreamExecuteMulti dummy_select ks.-20: {} `,
})
expectResult(t, "sel.StreamExecute", result, defaultSelectResult)
}

func TestRouteGetFields(t *testing.T) {
Expand Down Expand Up @@ -564,7 +606,7 @@ func TestRouteSort(t *testing.T) {
t.Fatal(err)
}
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationAllShards()`,
`ResolveDestinations ks [] Destinations:DestinationAnyShard()`,
`ExecuteMultiShard ks.0: dummy_select {} false false`,
})
wantResult := sqltypes.MakeTestResult(
Expand Down Expand Up @@ -650,7 +692,7 @@ func TestRouteSortTruncate(t *testing.T) {
t.Fatal(err)
}
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationAllShards()`,
`ResolveDestinations ks [] Destinations:DestinationAnyShard()`,
`ExecuteMultiShard ks.0: dummy_select {} false false`,
})
wantResult := sqltypes.MakeTestResult(
Expand Down Expand Up @@ -696,7 +738,7 @@ func TestRouteStreamTruncate(t *testing.T) {
t.Fatal(err)
}
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationAllShards()`,
`ResolveDestinations ks [] Destinations:DestinationAnyShard()`,
`ExecuteMultiShard ks.0: dummy_select {} false false`,
})
wantResult := sqltypes.MakeTestResult(
Expand Down Expand Up @@ -743,7 +785,7 @@ func TestRouteStreamSortTruncate(t *testing.T) {
t.Fatal(err)
}
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationAllShards()`,
`ResolveDestinations ks [] Destinations:DestinationAnyShard()`,
`StreamExecuteMulti dummy_select ks.0: {} `,
})

Expand Down Expand Up @@ -774,11 +816,11 @@ func TestParamsFail(t *testing.T) {

vc := &loggingVCursor{shardErr: errors.New("shard error")}
_, err := sel.Execute(vc, map[string]*querypb.BindVariable{}, false)
expectError(t, "sel.Execute err", err, "paramsAllShards: shard error")
expectError(t, "sel.Execute err", err, "paramsAnyShard: shard error")

vc.Rewind()
_, err = wrapStreamExecute(sel, vc, map[string]*querypb.BindVariable{}, false)
expectError(t, "sel.StreamExecute err", err, "paramsAllShards: shard error")
expectError(t, "sel.StreamExecute err", err, "paramsAnyShard: shard error")
}

func TestExecFail(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/planbuilder/from.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ func (pb *primitiveBuilder) buildTablePrimitive(tableExpr *sqlparser.AliasedTabl
}
var eroute *engine.Route
switch {
case vst.Type == vindexes.TypeSequence:
eroute = engine.NewSimpleRoute(engine.SelectNext, vst.Keyspace)
case vst.Type == vindexes.TypeReference:
eroute = engine.NewSimpleRoute(engine.SelectReference, vst.Keyspace)
case !vst.Keyspace.Sharded:
eroute = engine.NewSimpleRoute(engine.SelectUnsharded, vst.Keyspace)
case vst.Pinned == nil:
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtgate/planbuilder/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ func loadSchema(t *testing.T, filename string) *vindexes.VSchema {
if err != nil {
t.Fatal(err)
}
for _, ks := range vschema.Keyspaces {
if ks.Error != nil {
t.Fatal(ks.Error)
}
}
return vschema
}

Expand Down
5 changes: 4 additions & 1 deletion go/vt/vtgate/planbuilder/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ func (rb *route) SetGroupBy(groupBy sqlparser.GroupBy) error {

// PushOrderBy sets the order by for the route.
func (rb *route) PushOrderBy(order *sqlparser.Order) error {
// By this time, if any single shard options were already present,
// multi-sharded options would have already been removed. So, this
// call is only for checking if the route has single shard options.
if rb.removeMultishardOptions() {
rb.Select.AddOrder(order)
return nil
Expand Down Expand Up @@ -532,7 +535,7 @@ outer:
func (rb *route) removeMultishardOptions() bool {
return rb.removeOptions(func(ro *routeOption) bool {
switch ro.eroute.Opcode {
case engine.SelectUnsharded, engine.SelectDBA, engine.SelectNext, engine.SelectEqualUnique:
case engine.SelectUnsharded, engine.SelectDBA, engine.SelectNext, engine.SelectEqualUnique, engine.SelectReference:
return true
}
return false
Expand Down
11 changes: 10 additions & 1 deletion go/vt/vtgate/planbuilder/route_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ func (ro *routeOption) canMerge(rro *routeOption, customCheck func() bool) bool
if ro.eroute.Keyspace.Name != rro.eroute.Keyspace.Name {
return false
}
if rro.eroute.Opcode == engine.SelectReference {
// Any opcode can join with a reference table.
return true
}
switch ro.eroute.Opcode {
case engine.SelectUnsharded, engine.SelectDBA:
return ro.eroute.Opcode == rro.eroute.Opcode
Expand All @@ -144,6 +148,10 @@ func (ro *routeOption) canMerge(rro *routeOption, customCheck func() bool) bool
if rro.eroute.Opcode == engine.SelectEqualUnique && ro.eroute.Vindex == rro.eroute.Vindex && valEqual(ro.condition, rro.condition) {
return true
}
case engine.SelectReference:
// TODO(sougou): this can be changed to true, but we'll have
// to merge against rro insteal of ro.
return false
case engine.SelectNext:
return false
}
Expand Down Expand Up @@ -188,7 +196,7 @@ func (ro *routeOption) canMergeOnFilter(pb *primitiveBuilder, rro *routeOption,
// the route.
func (ro *routeOption) UpdatePlan(pb *primitiveBuilder, filter sqlparser.Expr) {
switch ro.eroute.Opcode {
case engine.SelectUnsharded, engine.SelectNext, engine.SelectDBA:
case engine.SelectUnsharded, engine.SelectNext, engine.SelectDBA, engine.SelectReference:
return
}
opcode, vindex, values := ro.computePlan(pb, filter)
Expand Down Expand Up @@ -293,6 +301,7 @@ var planCost = map[engine.RouteOpcode]int{
engine.SelectUnsharded: 0,
engine.SelectNext: 0,
engine.SelectDBA: 0,
engine.SelectReference: 0,
engine.SelectEqualUnique: 1,
engine.SelectIN: 2,
engine.SelectEqual: 3,
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vtgate/planbuilder/route_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func TestIsBetterThan(t *testing.T) {
left: engine.SelectUnsharded,
right: engine.SelectDBA,
out: false,
}, {
left: engine.SelectUnsharded,
right: engine.SelectReference,
out: false,
}, {
left: engine.SelectUnsharded,
right: engine.SelectEqualUnique,
Expand All @@ -65,6 +69,10 @@ func TestIsBetterThan(t *testing.T) {
left: engine.SelectDBA,
right: engine.SelectUnsharded,
out: false,
}, {
left: engine.SelectReference,
right: engine.SelectUnsharded,
out: false,
}, {
left: engine.SelectEqualUnique,
right: engine.SelectUnsharded,
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ func (pb *primitiveBuilder) pushSelectRoutes(selectExprs sqlparser.SelectExprs)
return nil, errors.New("unsupported: SELECT NEXT query in cross-shard query")
}
for _, ro := range rb.routeOptions {
if ro.eroute.Opcode != engine.SelectUnsharded {
return nil, errors.New("NEXT used on a sharded table")
if ro.eroute.Opcode != engine.SelectNext {
return nil, errors.New("NEXT used on a non-sequence table")
}
ro.eroute.Opcode = engine.SelectNext
}
Expand Down
15 changes: 15 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/aggr_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,21 @@
}
}

# oreder by on a reference table
"select col from ref order by col"
{
"Original": "select col from ref order by col",
"Instructions": {
"Opcode": "SelectReference",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"Query": "select col from ref order by col asc",
"FieldQuery": "select col from ref where 1 != 1"
}
}

# Group by invalid column number (code is duplicated from symab).
"select id from user group by 1.1"
"column number is not an int"
Expand Down
Loading