diff --git a/go/vt/vttablet/tabletmanager/vreplication/player_plan.go b/go/vt/vttablet/tabletmanager/vreplication/player_plan.go index 77ecde44d5d..cc3ba11e5f7 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/player_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/player_plan.go @@ -20,6 +20,7 @@ import ( "fmt" "strings" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/sqlparser" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -37,7 +38,7 @@ type PlayerPlan struct { // while analyzing the inital stream request. A tentative plan is built // without knowing the table info. The second incarnation is built when // we receive the field info for a table. At that time, we copy the -// original TablePlan into a separtae map and populate the Fields and +// original TablePlan into a separate map and populate the Fields and // PKCols members. type TablePlan struct { Name string @@ -48,15 +49,6 @@ type TablePlan struct { PKCols []*ColExpr `json:",omitempty"` } -func (tp *TablePlan) findCol(name sqlparser.ColIdent) *ColExpr { - for _, cExpr := range tp.ColExprs { - if cExpr.ColName.Equal(name) { - return cExpr - } - } - return nil -} - // ColExpr describes the processing to be performed to // compute the value of the target table column. type ColExpr struct { @@ -98,12 +90,12 @@ func buildPlayerPlan(filter *binlogdatapb.Filter) (*PlayerPlan, error) { plan.VStreamFilter.Rules[i] = rule continue } - sendRule, tplan, err := buildTablePlan(rule) + sendRule, tp, err := buildTablePlan(rule) if err != nil { return nil, err } plan.VStreamFilter.Rules[i] = sendRule - plan.TablePlans[sendRule.Match] = tplan + plan.TablePlans[sendRule.Match] = tp } return plan, nil } @@ -143,7 +135,7 @@ func buildTablePlan(rule *binlogdatapb.Rule) (*binlogdatapb.Rule, *TablePlan, er return sendRule, &TablePlan{Name: rule.Match}, nil } - tplan := &TablePlan{ + tp := &TablePlan{ Name: rule.Match, } sendSelect := &sqlparser.Select{ @@ -159,17 +151,17 @@ func buildTablePlan(rule *binlogdatapb.Rule) (*binlogdatapb.Rule, *TablePlan, er sendSelect.SelectExprs = append(sendSelect.SelectExprs, selExpr) cExpr.ColNum = len(sendSelect.SelectExprs) - 1 } - tplan.ColExprs = append(tplan.ColExprs, cExpr) + tp.ColExprs = append(tp.ColExprs, cExpr) } if sel.GroupBy != nil { - if err := analyzeGroupBy(sel.GroupBy, tplan); err != nil { + if err := analyzeGroupBy(sel.GroupBy, tp); err != nil { return nil, nil, err } - tplan.OnInsert = InsertIgnore - for _, cExpr := range tplan.ColExprs { + tp.OnInsert = InsertIgnore + for _, cExpr := range tp.ColExprs { if !cExpr.IsGrouped { - tplan.OnInsert = InsertOndup + tp.OnInsert = InsertOndup break } } @@ -178,7 +170,7 @@ func buildTablePlan(rule *binlogdatapb.Rule) (*binlogdatapb.Rule, *TablePlan, er Match: fromTable.String(), Filter: sqlparser.String(sendSelect), } - return sendRule, tplan, nil + return sendRule, tp, nil } func analyzeExpr(selExpr sqlparser.SelectExpr) (sqlparser.SelectExpr, *ColExpr, error) { @@ -226,13 +218,13 @@ func analyzeExpr(selExpr sqlparser.SelectExpr) (sqlparser.SelectExpr, *ColExpr, } } -func analyzeGroupBy(groupBy sqlparser.GroupBy, tplan *TablePlan) error { +func analyzeGroupBy(groupBy sqlparser.GroupBy, tp *TablePlan) error { for _, expr := range groupBy { colname, ok := expr.(*sqlparser.ColName) if !ok { return fmt.Errorf("unexpected: %v", sqlparser.String(expr)) } - cExpr := tplan.findCol(colname.Name) + cExpr := tp.FindCol(colname.Name) if cExpr == nil { return fmt.Errorf("group by expression does not reference an alias in the select list: %v", sqlparser.String(expr)) } @@ -243,3 +235,175 @@ func analyzeGroupBy(groupBy sqlparser.GroupBy, tplan *TablePlan) error { } return nil } + +//-------------------------------------------------------------- +// TablePlan support functions. + +// FindCol finds the ColExpr. It returns nil if not found. +func (tp *TablePlan) FindCol(name sqlparser.ColIdent) *ColExpr { + for _, cExpr := range tp.ColExprs { + if cExpr.ColName.Equal(name) { + return cExpr + } + } + return nil +} + +// GenerateStatement must be called only after Fields and PKCols have been populated. +func (tp *TablePlan) GenerateStatement(rowChange *binlogdatapb.RowChange) string { + // MakeRowTrusted is needed here because because Proto3ToResult is not convenient. + var before, after []sqltypes.Value + if rowChange.Before != nil { + before = sqltypes.MakeRowTrusted(tp.Fields, rowChange.Before) + } + if rowChange.After != nil { + after = sqltypes.MakeRowTrusted(tp.Fields, rowChange.After) + } + var query string + switch { + case before == nil && after != nil: + query = tp.generateInsert(after) + case before != nil && after != nil: + query = tp.generateUpdate(before, after) + case before != nil && after == nil: + query = tp.generateDelete(before) + case before == nil && after == nil: + // unreachable + } + return query +} + +func (tp *TablePlan) generateInsert(after []sqltypes.Value) string { + sql := sqlparser.NewTrackedBuffer(nil) + if tp.OnInsert == InsertIgnore { + sql.Myprintf("insert ignore into %v set ", sqlparser.NewTableIdent(tp.Name)) + } else { + sql.Myprintf("insert into %v set ", sqlparser.NewTableIdent(tp.Name)) + } + tp.generateInsertValues(sql, after) + if tp.OnInsert == InsertOndup { + sql.Myprintf(" on duplicate key update ") + _ = tp.generateUpdateValues(sql, nil, after) + } + return sql.String() +} + +func (tp *TablePlan) generateUpdate(before, after []sqltypes.Value) string { + if tp.OnInsert == InsertIgnore { + return tp.generateInsert(after) + } + sql := sqlparser.NewTrackedBuffer(nil) + sql.Myprintf("update %v set ", sqlparser.NewTableIdent(tp.Name)) + if ok := tp.generateUpdateValues(sql, before, after); !ok { + return "" + } + sql.Myprintf(" where ") + tp.generateWhereValues(sql, before) + return sql.String() +} + +func (tp *TablePlan) generateDelete(before []sqltypes.Value) string { + sql := sqlparser.NewTrackedBuffer(nil) + switch tp.OnInsert { + case InsertOndup: + return tp.generateUpdate(before, nil) + case InsertIgnore: + return "" + default: // insertNormal + sql.Myprintf("delete from %v where ", sqlparser.NewTableIdent(tp.Name)) + tp.generateWhereValues(sql, before) + } + return sql.String() +} + +func (tp *TablePlan) generateInsertValues(sql *sqlparser.TrackedBuffer, after []sqltypes.Value) { + separator := "" + for _, cExpr := range tp.ColExprs { + sql.Myprintf("%s%v=", separator, cExpr.ColName) + separator = ", " + if cExpr.Operation == OpCount { + sql.WriteString("1") + } else { + if cExpr.Operation == OpSum && after[cExpr.ColNum].IsNull() { + sql.WriteString("0") + } else { + encodeValue(sql, after[cExpr.ColNum]) + } + } + } +} + +// generateUpdateValues returns true if at least one value was set. Otherwise, it returns false. +func (tp *TablePlan) generateUpdateValues(sql *sqlparser.TrackedBuffer, before, after []sqltypes.Value) bool { + separator := "" + hasSet := false + for _, cExpr := range tp.ColExprs { + if cExpr.IsGrouped { + continue + } + if len(before) != 0 && len(after) != 0 { + if cExpr.Operation == OpCount { + continue + } + bef := before[cExpr.ColNum] + aft := after[cExpr.ColNum] + // If both are null, there's no change + if bef.IsNull() && aft.IsNull() { + continue + } + // If any one of them is null, something has changed. + if bef.IsNull() || aft.IsNull() { + goto mustSet + } + // Compare content only if none are null. + if bef.ToString() == aft.ToString() { + continue + } + } + mustSet: + sql.Myprintf("%s%v=", separator, cExpr.ColName) + separator = ", " + hasSet = true + if cExpr.Operation == OpCount || cExpr.Operation == OpSum { + sql.Myprintf("%v", cExpr.ColName) + } + if len(before) != 0 { + switch cExpr.Operation { + case OpNone: + if len(after) == 0 { + sql.WriteString("NULL") + } + case OpCount: + sql.WriteString("-1") + case OpSum: + if !before[cExpr.ColNum].IsNull() { + sql.WriteString("-") + encodeValue(sql, before[cExpr.ColNum]) + } + } + } + if len(after) != 0 { + switch cExpr.Operation { + case OpNone: + encodeValue(sql, after[cExpr.ColNum]) + case OpCount: + sql.WriteString("+1") + case OpSum: + if !after[cExpr.ColNum].IsNull() { + sql.WriteString("+") + encodeValue(sql, after[cExpr.ColNum]) + } + } + } + } + return hasSet +} + +func (tp *TablePlan) generateWhereValues(sql *sqlparser.TrackedBuffer, before []sqltypes.Value) { + separator := "" + for _, cExpr := range tp.PKCols { + sql.Myprintf("%s%v=", separator, cExpr.ColName) + separator = " and " + encodeValue(sql, before[cExpr.ColNum]) + } +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/retryable_client.go b/go/vt/vttablet/tabletmanager/vreplication/retryable_client.go deleted file mode 100644 index f5e8eaa4efc..00000000000 --- a/go/vt/vttablet/tabletmanager/vreplication/retryable_client.go +++ /dev/null @@ -1,84 +0,0 @@ -/* -Copyright 2019 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package vreplication - -import ( - "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/binlog/binlogplayer" -) - -// retryableClient is a wrapper on binlogplayer.DBClient. -// It allows us to retry a failed transactions on lock errors. -type retryableClient struct { - binlogplayer.DBClient - InTransaction bool - queries []string -} - -func (rt *retryableClient) Begin() error { - if rt.InTransaction { - return nil - } - if err := rt.DBClient.Begin(); err != nil { - return err - } - rt.queries = append(rt.queries, "begin") - rt.InTransaction = true - return nil -} - -func (rt *retryableClient) Commit() error { - if err := rt.DBClient.Commit(); err != nil { - return err - } - rt.InTransaction = false - rt.queries = nil - return nil -} - -func (rt *retryableClient) Rollback() error { - if err := rt.DBClient.Rollback(); err != nil { - return err - } - rt.InTransaction = false - // Don't reset queries to allow for vplayer to retry. - return nil -} - -func (rt *retryableClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) { - if !rt.InTransaction { - rt.queries = []string{query} - } else { - rt.queries = append(rt.queries, query) - } - return rt.DBClient.ExecuteFetch(query, maxrows) -} - -func (rt *retryableClient) Retry() error { - for _, q := range rt.queries { - if q == "begin" { - if err := rt.Begin(); err != nil { - return err - } - continue - } - if _, err := rt.DBClient.ExecuteFetch(q, 10000); err != nil { - return err - } - } - return nil -} diff --git a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go new file mode 100644 index 00000000000..dcfa3220df6 --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go @@ -0,0 +1,99 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vreplication + +import ( + "time" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/binlog/binlogplayer" +) + +// vdbClient is a wrapper on binlogplayer.DBClient. +// It allows us to retry a failed transactions on lock errors. +type vdbClient struct { + binlogplayer.DBClient + stats *binlogplayer.Stats + InTransaction bool + startTime time.Time + queries []string +} + +func newVDBClient(dbclient binlogplayer.DBClient, stats *binlogplayer.Stats) *vdbClient { + return &vdbClient{ + DBClient: dbclient, + stats: stats, + } +} + +func (vc *vdbClient) Begin() error { + if vc.InTransaction { + return nil + } + if err := vc.DBClient.Begin(); err != nil { + return err + } + vc.queries = append(vc.queries, "begin") + vc.InTransaction = true + vc.startTime = time.Now() + return nil +} + +func (vc *vdbClient) Commit() error { + if err := vc.DBClient.Commit(); err != nil { + return err + } + vc.InTransaction = false + vc.queries = nil + vc.stats.Timings.Record(binlogplayer.BlplTransaction, vc.startTime) + return nil +} + +func (vc *vdbClient) Rollback() error { + if err := vc.DBClient.Rollback(); err != nil { + return err + } + vc.InTransaction = false + // Don't reset queries to allow for vplayer to retry. + return nil +} + +func (vc *vdbClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) { + defer vc.stats.Timings.Record(binlogplayer.BlplQuery, time.Now()) + + if !vc.InTransaction { + vc.queries = []string{query} + } else { + vc.queries = append(vc.queries, query) + } + return vc.DBClient.ExecuteFetch(query, maxrows) +} + +func (vc *vdbClient) Retry() error { + for _, q := range vc.queries { + if q == "begin" { + if err := vc.Begin(); err != nil { + return err + } + continue + } + if _, err := vc.DBClient.ExecuteFetch(q, 10000); err != nil { + return err + } + } + return nil +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 1add35a937c..f8a0520d868 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -50,7 +50,7 @@ type vplayer struct { source *binlogdatapb.BinlogSource sourceTablet *topodatapb.Tablet stats *binlogplayer.Stats - dbClient *retryableClient + dbClient *vdbClient // mysqld is used to fetch the local schema. mysqld mysqlctl.MysqlDaemon @@ -76,7 +76,7 @@ func newVPlayer(id uint32, source *binlogdatapb.BinlogSource, sourceTablet *topo source: source, sourceTablet: sourceTablet, stats: stats, - dbClient: &retryableClient{DBClient: dbClient}, + dbClient: newVDBClient(dbClient, stats), mysqld: mysqld, timeLastSaved: time.Now(), tplans: make(map[string]*TablePlan), @@ -426,172 +426,13 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row return fmt.Errorf("unexpected event on table %s", rowEvent.TableName) } for _, change := range rowEvent.RowChanges { - if err := vp.applyRowChange(ctx, tplan, change); err != nil { - return err - } - } - return nil -} - -func (vp *vplayer) applyRowChange(ctx context.Context, tplan *TablePlan, rowChange *binlogdatapb.RowChange) error { - // MakeRowTrusted is needed here because because Proto3ToResult is not convenient. - var before, after []sqltypes.Value - if rowChange.Before != nil { - before = sqltypes.MakeRowTrusted(tplan.Fields, rowChange.Before) - } - if rowChange.After != nil { - after = sqltypes.MakeRowTrusted(tplan.Fields, rowChange.After) - } - var query string - switch { - case before == nil && after != nil: - query = vp.generateInsert(tplan, after) - case before != nil && after != nil: - query = vp.generateUpdate(tplan, before, after) - case before != nil && after == nil: - query = vp.generateDelete(tplan, before) - case before == nil && after == nil: - // unreachable - } - if query == "" { - return nil - } - return vp.exec(ctx, query) -} - -func (vp *vplayer) generateInsert(tplan *TablePlan, after []sqltypes.Value) string { - sql := sqlparser.NewTrackedBuffer(nil) - if tplan.OnInsert == InsertIgnore { - sql.Myprintf("insert ignore into %v set ", sqlparser.NewTableIdent(tplan.Name)) - } else { - sql.Myprintf("insert into %v set ", sqlparser.NewTableIdent(tplan.Name)) - } - vp.writeInsertValues(sql, tplan, after) - if tplan.OnInsert == InsertOndup { - sql.Myprintf(" on duplicate key update ") - _ = vp.writeUpdateValues(sql, tplan, nil, after) - } - return sql.String() -} - -func (vp *vplayer) generateUpdate(tplan *TablePlan, before, after []sqltypes.Value) string { - if tplan.OnInsert == InsertIgnore { - return vp.generateInsert(tplan, after) - } - sql := sqlparser.NewTrackedBuffer(nil) - sql.Myprintf("update %v set ", sqlparser.NewTableIdent(tplan.Name)) - if ok := vp.writeUpdateValues(sql, tplan, before, after); !ok { - return "" - } - sql.Myprintf(" where ") - vp.writeWhereValues(sql, tplan, before) - return sql.String() -} - -func (vp *vplayer) generateDelete(tplan *TablePlan, before []sqltypes.Value) string { - sql := sqlparser.NewTrackedBuffer(nil) - switch tplan.OnInsert { - case InsertOndup: - return vp.generateUpdate(tplan, before, nil) - case InsertIgnore: - return "" - default: // insertNormal - sql.Myprintf("delete from %v where ", sqlparser.NewTableIdent(tplan.Name)) - vp.writeWhereValues(sql, tplan, before) - } - return sql.String() -} - -func (vp *vplayer) writeInsertValues(sql *sqlparser.TrackedBuffer, tplan *TablePlan, after []sqltypes.Value) { - separator := "" - for _, cExpr := range tplan.ColExprs { - sql.Myprintf("%s%v=", separator, cExpr.ColName) - separator = ", " - if cExpr.Operation == OpCount { - sql.WriteString("1") - } else { - if cExpr.Operation == OpSum && after[cExpr.ColNum].IsNull() { - sql.WriteString("0") - } else { - encodeValue(sql, after[cExpr.ColNum]) - } - } - } -} - -// writeUpdateValues returns true if at least one value was set. Otherwise, it returns false. -func (vp *vplayer) writeUpdateValues(sql *sqlparser.TrackedBuffer, tplan *TablePlan, before, after []sqltypes.Value) bool { - separator := "" - hasSet := false - for _, cExpr := range tplan.ColExprs { - if cExpr.IsGrouped { - continue - } - if len(before) != 0 && len(after) != 0 { - if cExpr.Operation == OpCount { - continue - } - bef := before[cExpr.ColNum] - aft := after[cExpr.ColNum] - // If both are null, there's no change - if bef.IsNull() && aft.IsNull() { - continue - } - // If any one of them is null, something has changed. - if bef.IsNull() || aft.IsNull() { - goto mustSet - } - // Compare content only if none are null. - if bef.ToString() == aft.ToString() { - continue - } - } - mustSet: - sql.Myprintf("%s%v=", separator, cExpr.ColName) - separator = ", " - hasSet = true - if cExpr.Operation == OpCount || cExpr.Operation == OpSum { - sql.Myprintf("%v", cExpr.ColName) - } - if len(before) != 0 { - switch cExpr.Operation { - case OpNone: - if len(after) == 0 { - sql.WriteString("NULL") - } - case OpCount: - sql.WriteString("-1") - case OpSum: - if !before[cExpr.ColNum].IsNull() { - sql.WriteString("-") - encodeValue(sql, before[cExpr.ColNum]) - } - } - } - if len(after) != 0 { - switch cExpr.Operation { - case OpNone: - encodeValue(sql, after[cExpr.ColNum]) - case OpCount: - sql.WriteString("+1") - case OpSum: - if !after[cExpr.ColNum].IsNull() { - sql.WriteString("+") - encodeValue(sql, after[cExpr.ColNum]) - } + if query := tplan.GenerateStatement(change); query != "" { + if err := vp.exec(ctx, query); err != nil { + return err } } } - return hasSet -} - -func (vp *vplayer) writeWhereValues(sql *sqlparser.TrackedBuffer, tplan *TablePlan, before []sqltypes.Value) { - separator := "" - for _, cExpr := range tplan.PKCols { - sql.Myprintf("%s%v=", separator, cExpr.ColName) - separator = " and " - encodeValue(sql, before[cExpr.ColNum]) - } + return nil } func (vp *vplayer) updatePos(ts int64) error { @@ -602,6 +443,10 @@ func (vp *vplayer) updatePos(ts int64) error { } vp.unsavedGTID = nil vp.timeLastSaved = time.Now() + vp.stats.SetLastPosition(vp.pos) + if ts != 0 { + vp.stats.SecondsBehindMaster.Set(vp.timeLastSaved.Unix() - ts) + } return nil }