Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support foreign key on update cascade and set null when execute update statement #38652

Merged
merged 11 commits into from
Nov 1, 2022
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func (a *ExecStmt) handleForeignKeyTrigger(ctx context.Context, e Executor, dept
}

func (a *ExecStmt) handleForeignKeyCascade(ctx context.Context, fkc *FKCascadeExec, depth int) error {
if len(fkc.fkValues) == 0 {
if len(fkc.fkValues) == 0 && len(fkc.fkUpdatedValuesMap) == 0 {
return nil
}
if depth > maxForeignKeyCascadeDepth {
Expand Down
4 changes: 4 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2257,6 +2257,10 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
if b.err != nil {
return nil
}
updateExec.fkCascades, b.err = b.buildTblID2FKCascadeExecs(tblID2table, v.FKCascades)
if b.err != nil {
return nil
}
return updateExec
}

Expand Down
360 changes: 358 additions & 2 deletions executor/fktest/foreign_key_test.go

Large diffs are not rendered by default.

133 changes: 114 additions & 19 deletions executor/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,17 @@ type FKCascadeExec struct {
referredFK *model.ReferredFKInfo
childTable *model.TableInfo
fk *model.FKInfo
fkValues [][]types.Datum
// On delete statement, fkValues stores the delete foreign key values.
// On update statement and the foreign key cascade is `SET NULL`, fkValues stores the old foreign key values.
fkValues [][]types.Datum
// new-value-key => UpdatedValuesCouple
fkUpdatedValuesMap map[string]*UpdatedValuesCouple
}

// UpdatedValuesCouple contains the updated new row the old rows, exporting for test.
type UpdatedValuesCouple struct {
NewValues []types.Datum
OldValuesList [][]types.Datum
}

func buildTblID2FKCheckExecs(sctx sessionctx.Context, tblID2Table map[int64]table.Table, tblID2FKChecks map[int64][]*plannercore.FKCheck) (map[int64][]*FKCheckExec, error) {
Expand Down Expand Up @@ -561,12 +571,13 @@ func (b *executorBuilder) buildFKCascadeExec(tbl table.Table, fkCascade *planner
fkValuesSet: set.NewStringSet(),
}
return &FKCascadeExec{
b: b,
fkValueHelper: helper,
tp: fkCascade.Tp,
referredFK: fkCascade.ReferredFK,
childTable: fkCascade.ChildTable.Meta(),
fk: fkCascade.FK,
b: b,
fkValueHelper: helper,
tp: fkCascade.Tp,
referredFK: fkCascade.ReferredFK,
childTable: fkCascade.ChildTable.Meta(),
fk: fkCascade.FK,
fkUpdatedValuesMap: make(map[string]*UpdatedValuesCouple),
}, nil
}

Expand All @@ -579,6 +590,34 @@ func (fkc *FKCascadeExec) onDeleteRow(sc *stmtctx.StatementContext, row []types.
return nil
}

func (fkc *FKCascadeExec) onUpdateRow(sc *stmtctx.StatementContext, oldRow, newRow []types.Datum) error {
oldVals, err := fkc.fetchFKValuesWithCheck(sc, oldRow)
if err != nil || len(oldVals) == 0 {
return err
}
if model.ReferOptionType(fkc.fk.OnUpdate) == model.ReferOptionSetNull {
fkc.fkValues = append(fkc.fkValues, oldVals)
return nil
}
newVals, err := fkc.fetchFKValues(newRow)
if err != nil {
return err
}
newValsKey, err := codec.EncodeKey(sc, nil, newVals...)
if err != nil {
return err
}
couple := fkc.fkUpdatedValuesMap[string(newValsKey)]
if couple == nil {
couple = &UpdatedValuesCouple{
NewValues: newVals,
}
}
couple.OldValuesList = append(couple.OldValuesList, oldVals)
fkc.fkUpdatedValuesMap[string(newValsKey)] = couple
return nil
}

func (fkc *FKCascadeExec) buildExecutor(ctx context.Context) (Executor, error) {
p, err := fkc.buildFKCascadePlan(ctx)
if err != nil || p == nil {
Expand All @@ -591,17 +630,9 @@ func (fkc *FKCascadeExec) buildExecutor(ctx context.Context) (Executor, error) {
var maxHandleFKValueInOneCascade = 1024

func (fkc *FKCascadeExec) buildFKCascadePlan(ctx context.Context) (plannercore.Plan, error) {
if len(fkc.fkValues) == 0 {
if len(fkc.fkValues) == 0 && len(fkc.fkUpdatedValuesMap) == 0 {
return nil, nil
}
var fkValues [][]types.Datum
if len(fkc.fkValues) <= maxHandleFKValueInOneCascade {
fkValues = fkc.fkValues
fkc.fkValues = nil
} else {
fkValues = fkc.fkValues[:maxHandleFKValueInOneCascade]
fkc.fkValues = fkc.fkValues[maxHandleFKValueInOneCascade:]
}
var indexName model.CIStr
indexForFK := model.FindIndexByColumns(fkc.childTable, fkc.fk.Cols...)
if indexForFK != nil {
Expand All @@ -611,12 +642,24 @@ func (fkc *FKCascadeExec) buildFKCascadePlan(ctx context.Context) (plannercore.P
var err error
switch fkc.tp {
case plannercore.FKCascadeOnDelete:
fkValues := fkc.fetchOnDeleteOrUpdateFKValues()
switch model.ReferOptionType(fkc.fk.OnDelete) {
case model.ReferOptionCascade:
sqlStr, err = GenCascadeDeleteSQL(fkc.referredFK.ChildSchema, fkc.childTable.Name, indexName, fkc.fk, fkValues)
case model.ReferOptionSetNull:
sqlStr, err = GenCascadeSetNullSQL(fkc.referredFK.ChildSchema, fkc.childTable.Name, indexName, fkc.fk, fkValues)
}
case plannercore.FKCascadeOnUpdate:
switch model.ReferOptionType(fkc.fk.OnUpdate) {
case model.ReferOptionCascade:
couple := fkc.fetchUpdatedValuesCouple()
if couple != nil && len(couple.NewValues) != 0 {
sqlStr, err = GenCascadeUpdateSQL(fkc.referredFK.ChildSchema, fkc.childTable.Name, indexName, fkc.fk, couple)
}
case model.ReferOptionSetNull:
fkValues := fkc.fetchOnDeleteOrUpdateFKValues()
sqlStr, err = GenCascadeSetNullSQL(fkc.referredFK.ChildSchema, fkc.childTable.Name, indexName, fkc.fk, fkValues)
}
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -646,6 +689,34 @@ func (fkc *FKCascadeExec) buildFKCascadePlan(ctx context.Context) (plannercore.P
return finalPlan, err
}

func (fkc *FKCascadeExec) fetchOnDeleteOrUpdateFKValues() [][]types.Datum {
var fkValues [][]types.Datum
if len(fkc.fkValues) <= maxHandleFKValueInOneCascade {
fkValues = fkc.fkValues
fkc.fkValues = nil
} else {
fkValues = fkc.fkValues[:maxHandleFKValueInOneCascade]
fkc.fkValues = fkc.fkValues[maxHandleFKValueInOneCascade:]
}
return fkValues
}

func (fkc *FKCascadeExec) fetchUpdatedValuesCouple() *UpdatedValuesCouple {
for k, couple := range fkc.fkUpdatedValuesMap {
if len(couple.OldValuesList) <= maxHandleFKValueInOneCascade {
delete(fkc.fkUpdatedValuesMap, k)
return couple
}
result := &UpdatedValuesCouple{
NewValues: couple.NewValues,
OldValuesList: couple.OldValuesList[:maxHandleFKValueInOneCascade],
}
couple.OldValuesList = couple.OldValuesList[maxHandleFKValueInOneCascade:]
return result
}
return nil
}

// GenCascadeDeleteSQL uses to generate cascade delete SQL, export for test.
func GenCascadeDeleteSQL(schema, table, idx model.CIStr, fk *model.FKInfo, fkValues [][]types.Datum) (string, error) {
buf := bytes.NewBuffer(make([]byte, 0, 48+8*len(fkValues)))
Expand All @@ -669,6 +740,19 @@ func GenCascadeDeleteSQL(schema, table, idx model.CIStr, fk *model.FKInfo, fkVal

// GenCascadeSetNullSQL uses to generate foreign key `SET NULL` SQL, export for test.
func GenCascadeSetNullSQL(schema, table, idx model.CIStr, fk *model.FKInfo, fkValues [][]types.Datum) (string, error) {
newValues := make([]types.Datum, len(fk.Cols))
for i := range fk.Cols {
newValues[i] = types.NewDatum(nil)
}
couple := &UpdatedValuesCouple{
NewValues: newValues,
OldValuesList: fkValues,
}
return GenCascadeUpdateSQL(schema, table, idx, fk, couple)
}

// GenCascadeUpdateSQL uses to generate cascade update SQL, export for test.
func GenCascadeUpdateSQL(schema, table, idx model.CIStr, fk *model.FKInfo, couple *UpdatedValuesCouple) (string, error) {
buf := bytes.NewBuffer(nil)
buf.WriteString("UPDATE `")
buf.WriteString(schema.L)
Expand All @@ -686,10 +770,15 @@ func GenCascadeSetNullSQL(schema, table, idx model.CIStr, fk *model.FKInfo, fkVa
if i > 0 {
buf.WriteString(", ")
}
buf.WriteString("`" + col.L + "`")
buf.WriteString("=NULL")
buf.WriteString("`" + col.L)
buf.WriteString("` = ")
val, err := genFKValueString(couple.NewValues[i])
if err != nil {
return "", err
}
buf.WriteString(val)
}
err := genCascadeSQLWhereCondition(buf, fk, fkValues)
err := genCascadeSQLWhereCondition(buf, fk, couple.OldValuesList)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -728,6 +817,12 @@ func genCascadeSQLWhereCondition(buf *bytes.Buffer, fk *model.FKInfo, fkValues [
}

func genFKValueString(v types.Datum) (string, error) {
switch v.Kind() {
case types.KindNull:
return "NULL", nil
case types.KindMysqlBit:
return v.GetBinaryLiteral().ToBitLiteralString(true), nil
}
val, err := v.ToString()
if err != nil {
return "", err
Expand Down
2 changes: 1 addition & 1 deletion executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRo
}

newData := e.row4Update[:len(oldRow)]
_, err := updateRecord(ctx, e.ctx, handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker, e.fkChecks)
_, err := updateRecord(ctx, e.ctx, handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker, e.fkChecks, e.fkCascades)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ type InsertValues struct {
isLoadData bool
txnInUse sync.Mutex
// fkChecks contains the foreign key checkers.
fkChecks []*FKCheckExec
fkChecks []*FKCheckExec
fkCascades []*FKCascadeExec
}

type defaultVal struct {
Expand Down
13 changes: 10 additions & 3 deletions executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type UpdateExec struct {
matches []bool
// fkChecks contains the foreign key checkers. the map is tableID -> []*FKCheckExec
fkChecks map[int64][]*FKCheckExec
// fkCascades contains the foreign key cascade. the map is tableID -> []*FKCascadeExec
fkCascades map[int64][]*FKCascadeExec
}

// prepare `handles`, `tableUpdatable`, `changed` to avoid re-computations.
Expand Down Expand Up @@ -194,7 +196,8 @@ func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema, row, n

// Update row
fkChecks := e.fkChecks[content.TblID]
changed, err1 := updateRecord(ctx, e.ctx, handle, oldData, newTableData, flags, tbl, false, e.memTracker, fkChecks)
fkCascades := e.fkCascades[content.TblID]
changed, err1 := updateRecord(ctx, e.ctx, handle, oldData, newTableData, flags, tbl, false, e.memTracker, fkChecks, fkCascades)
if err1 == nil {
_, exist := e.updatedRowKeys[content.Start].Get(handle)
memDelta := e.updatedRowKeys[content.Start].Set(handle, changed)
Expand Down Expand Up @@ -546,10 +549,14 @@ func (e *UpdateExec) GetFKChecks() []*FKCheckExec {

// GetFKCascades implements WithForeignKeyTrigger interface.
func (e *UpdateExec) GetFKCascades() []*FKCascadeExec {
return nil
fkCascades := make([]*FKCascadeExec, 0, len(e.fkChecks))
for _, fkc := range e.fkCascades {
fkCascades = append(fkCascades, fkc...)
}
return fkCascades
}

// HasFKCascades implements WithForeignKeyTrigger interface.
func (e *UpdateExec) HasFKCascades() bool {
return false
return len(e.fkCascades) > 0
}
8 changes: 7 additions & 1 deletion executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var (
// 1. changed (bool) : does the update really change the row values. e.g. update set i = 1 where i = 1;
// 2. err (error) : error in the update.
func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, modified []bool, t table.Table,
onDup bool, memTracker *memory.Tracker, fkChecks []*FKCheckExec) (bool, error) {
onDup bool, memTracker *memory.Tracker, fkChecks []*FKCheckExec, fkCascades []*FKCascadeExec) (bool, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.updateRecord", opentracing.ChildOf(span.Context()))
defer span1.Finish()
Expand Down Expand Up @@ -220,6 +220,12 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
return false, err
}
}
for _, fkc := range fkCascades {
err := fkc.onUpdateRow(sc, oldData, newData)
if err != nil {
return false, err
}
}
if onDup {
sc.AddAffectedRows(2)
} else {
Expand Down
3 changes: 2 additions & 1 deletion planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,8 @@ type Update struct {

tblID2Table map[int64]table.Table

FKChecks map[int64][]*FKCheck
FKChecks map[int64][]*FKCheck
FKCascades map[int64][]*FKCascade
}

// MemoryUsage return the memory usage of Update
Expand Down
Loading