Skip to content

Commit

Permalink
Disallow Insert with Duplicate key update and Replace Into queries on…
Browse files Browse the repository at this point in the history
… foreign key column, set locks on fk queries (#13953)

Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal authored Sep 13, 2023
1 parent 91f9f35 commit 7d79453
Show file tree
Hide file tree
Showing 20 changed files with 1,442 additions and 1,130 deletions.
1 change: 1 addition & 0 deletions go/vt/sqlparser/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type (
GetOrderBy() OrderBy
GetLimit() *Limit
SetLimit(*Limit)
GetLock() Lock
SetLock(lock Lock)
SetInto(into *SelectInto)
SetWith(with *With)
Expand Down
35 changes: 27 additions & 8 deletions go/vt/sqlparser/ast_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,11 @@ func (node *Select) GetLimit() *Limit {
return node.Limit
}

// GetLock returns the lock clause
func (node *Select) GetLock() Lock {
return node.Lock
}

// SetLock sets the lock clause
func (node *Select) SetLock(lock Lock) {
node.Lock = lock
Expand Down Expand Up @@ -1196,6 +1201,11 @@ func (node *Union) GetColumns() SelectExprs {
return node.Left.GetColumns()
}

// GetLock returns the lock clause
func (node *Union) GetLock() Lock {
return node.Lock
}

// SetLock sets the lock clause
func (node *Union) SetLock(lock Lock) {
node.Lock = lock
Expand Down Expand Up @@ -2533,13 +2543,22 @@ func (v *visitor) visitAllSelects(in SelectStatement, f func(p *Select, idx int)
panic("switch should be exhaustive")
}

func IsNonLiteral(updExprs UpdateExprs) bool {
for _, updateExpr := range updExprs {
switch updateExpr.Expr.(type) {
case *Argument, *NullVal, BoolVal, *Literal:
default:
return true
}
// IsRestrict returns true if the reference action is of restrict type.
func (ra ReferenceAction) IsRestrict() bool {
switch ra {
case Restrict, NoAction, DefaultAction:
return true
default:
return false
}
}

// IsLiteral returns true if the expression is of a literal type.
func IsLiteral(expr Expr) bool {
switch expr.(type) {
case *Argument, *NullVal, BoolVal, *Literal:
return true
default:
return false
}
return false
}
7 changes: 7 additions & 0 deletions go/vt/sqlparser/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,13 @@ func (c *ParsedComments) Length() int {
return len(c.comments)
}

func (c *ParsedComments) GetComments() Comments {
if c != nil {
return c.comments
}
return nil
}

func (c *ParsedComments) Prepend(comment string) Comments {
if c == nil {
return Comments{comment}
Expand Down
55 changes: 49 additions & 6 deletions go/vt/vtgate/planbuilder/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package planbuilder

import (
querypb "vitess.io/vitess/go/vt/proto/query"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
Expand All @@ -43,18 +45,24 @@ func gen4InsertStmtPlanner(version querypb.ExecuteOptions_PlannerVersion, insStm
// Check single unsharded. Even if the table is for single unsharded but sequence table is used.
// We cannot shortcut here as sequence column needs additional planning.
ks, tables := ctx.SemTable.SingleUnshardedKeyspace()
if ks != nil && tables[0].AutoIncrement == nil {
plan := insertUnshardedShortcut(insStmt, ks, tables)
plan = pushCommentDirectivesOnPlan(plan, insStmt)
return newPlanResult(plan.Primitive(), operators.QualifiedTables(ks, tables)...), nil
fkPlanNeeded := false
if ks != nil {
noAutoInc := tables[0].AutoIncrement == nil
fkPlanNeeded = fkManagementRequiredForInsert(ctx, tables[0], sqlparser.UpdateExprs(insStmt.OnDup), insStmt.Action == sqlparser.ReplaceAct)
if noAutoInc && !fkPlanNeeded {
plan := insertUnshardedShortcut(insStmt, ks, tables)
plan = pushCommentDirectivesOnPlan(plan, insStmt)
return newPlanResult(plan.Primitive(), operators.QualifiedTables(ks, tables)...), nil
}
}

tblInfo, err := ctx.SemTable.TableInfoFor(ctx.SemTable.TableSetFor(insStmt.Table))
if err != nil {
return nil, err
}
if tblInfo.GetVindexTable().Keyspace.Sharded && ctx.SemTable.NotUnshardedErr != nil {
return nil, ctx.SemTable.NotUnshardedErr

if err = errOutIfPlanCannotBeConstructed(ctx, tblInfo.GetVindexTable(), insStmt, fkPlanNeeded); err != nil {
return nil, err
}

err = queryRewrite(ctx.SemTable, reservedVars, insStmt)
Expand Down Expand Up @@ -83,6 +91,41 @@ func gen4InsertStmtPlanner(version querypb.ExecuteOptions_PlannerVersion, insStm
return newPlanResult(plan.Primitive(), operators.TablesUsed(op)...), nil
}

func errOutIfPlanCannotBeConstructed(ctx *plancontext.PlanningContext, vTbl *vindexes.Table, insStmt *sqlparser.Insert, fkPlanNeeded bool) error {
if vTbl.Keyspace.Sharded && ctx.SemTable.NotUnshardedErr != nil {
return ctx.SemTable.NotUnshardedErr
}
if insStmt.Action != sqlparser.ReplaceAct {
return nil
}
if fkPlanNeeded {
return vterrors.VT12001("REPLACE INTO with foreign keys")
}
return nil
}

// TODO: Handle all this in semantic analysis.
func fkManagementRequiredForInsert(ctx *plancontext.PlanningContext, vTbl *vindexes.Table, updateExprs sqlparser.UpdateExprs, replace bool) bool {
ksMode, err := ctx.VSchema.ForeignKeyMode(vTbl.Keyspace.Name)
if err != nil || ksMode != vschemapb.Keyspace_FK_MANAGED {
return false
}

if len(vTbl.ParentFKsNeedsHandling(ctx.VerifyAllFKs, "")) > 0 {
return true
}

childFks := vTbl.ChildFKsNeedsHandling(ctx.VerifyAllFKs, vindexes.UpdateAction)
if len(childFks) > 0 && replace {
return true
}

// Check if any column in the parent table is being updated which has a child foreign key.
return columnModified(updateExprs, func(expr *sqlparser.UpdateExpr) ([]vindexes.ParentFKInfo, []vindexes.ChildFKInfo) {
return nil, childFks
})
}

func insertUnshardedShortcut(stmt *sqlparser.Insert, ks *vindexes.Keyspace, tables []*vindexes.Table) logicalPlan {
eIns := &engine.Insert{}
eIns.Keyspace = ks
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vtgate/planbuilder/operator_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,15 @@ func transformRoutePlan(ctx *plancontext.PlanningContext, op *operators.Route) (

replaceSubQuery(ctx, stmt)

if stmtWithComments, ok := stmt.(sqlparser.Commented); ok && op.Comments != nil {
stmtWithComments.SetComments(op.Comments.GetComments())
}

switch stmt := stmt.(type) {
case sqlparser.SelectStatement:
if op.Lock != sqlparser.NoLock {
stmt.SetLock(op.Lock)
}
return buildRouteLogicalPlan(ctx, op, stmt)
case *sqlparser.Update:
return buildUpdateLogicalPlan(ctx, op, dmlOp, stmt)
Expand Down
Loading

0 comments on commit 7d79453

Please sign in to comment.