Skip to content

Commit

Permalink
Merge pull request #4586 from planetscale/ss-vrepl
Browse files Browse the repository at this point in the history
vplayer: add stats tracking and some refactor
  • Loading branch information
rafael authored Feb 17, 2019
2 parents 150102e + ccb2813 commit 66c45d9
Show file tree
Hide file tree
Showing 4 changed files with 294 additions and 270 deletions.
206 changes: 185 additions & 21 deletions go/vt/vttablet/tabletmanager/vreplication/player_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"strings"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/sqlparser"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
Expand All @@ -37,7 +38,7 @@ type PlayerPlan struct {
// while analyzing the inital stream request. A tentative plan is built
// without knowing the table info. The second incarnation is built when
// we receive the field info for a table. At that time, we copy the
// original TablePlan into a separtae map and populate the Fields and
// original TablePlan into a separate map and populate the Fields and
// PKCols members.
type TablePlan struct {
Name string
Expand All @@ -48,15 +49,6 @@ type TablePlan struct {
PKCols []*ColExpr `json:",omitempty"`
}

func (tp *TablePlan) findCol(name sqlparser.ColIdent) *ColExpr {
for _, cExpr := range tp.ColExprs {
if cExpr.ColName.Equal(name) {
return cExpr
}
}
return nil
}

// ColExpr describes the processing to be performed to
// compute the value of the target table column.
type ColExpr struct {
Expand Down Expand Up @@ -98,12 +90,12 @@ func buildPlayerPlan(filter *binlogdatapb.Filter) (*PlayerPlan, error) {
plan.VStreamFilter.Rules[i] = rule
continue
}
sendRule, tplan, err := buildTablePlan(rule)
sendRule, tp, err := buildTablePlan(rule)
if err != nil {
return nil, err
}
plan.VStreamFilter.Rules[i] = sendRule
plan.TablePlans[sendRule.Match] = tplan
plan.TablePlans[sendRule.Match] = tp
}
return plan, nil
}
Expand Down Expand Up @@ -143,7 +135,7 @@ func buildTablePlan(rule *binlogdatapb.Rule) (*binlogdatapb.Rule, *TablePlan, er
return sendRule, &TablePlan{Name: rule.Match}, nil
}

tplan := &TablePlan{
tp := &TablePlan{
Name: rule.Match,
}
sendSelect := &sqlparser.Select{
Expand All @@ -159,17 +151,17 @@ func buildTablePlan(rule *binlogdatapb.Rule) (*binlogdatapb.Rule, *TablePlan, er
sendSelect.SelectExprs = append(sendSelect.SelectExprs, selExpr)
cExpr.ColNum = len(sendSelect.SelectExprs) - 1
}
tplan.ColExprs = append(tplan.ColExprs, cExpr)
tp.ColExprs = append(tp.ColExprs, cExpr)
}

if sel.GroupBy != nil {
if err := analyzeGroupBy(sel.GroupBy, tplan); err != nil {
if err := analyzeGroupBy(sel.GroupBy, tp); err != nil {
return nil, nil, err
}
tplan.OnInsert = InsertIgnore
for _, cExpr := range tplan.ColExprs {
tp.OnInsert = InsertIgnore
for _, cExpr := range tp.ColExprs {
if !cExpr.IsGrouped {
tplan.OnInsert = InsertOndup
tp.OnInsert = InsertOndup
break
}
}
Expand All @@ -178,7 +170,7 @@ func buildTablePlan(rule *binlogdatapb.Rule) (*binlogdatapb.Rule, *TablePlan, er
Match: fromTable.String(),
Filter: sqlparser.String(sendSelect),
}
return sendRule, tplan, nil
return sendRule, tp, nil
}

func analyzeExpr(selExpr sqlparser.SelectExpr) (sqlparser.SelectExpr, *ColExpr, error) {
Expand Down Expand Up @@ -226,13 +218,13 @@ func analyzeExpr(selExpr sqlparser.SelectExpr) (sqlparser.SelectExpr, *ColExpr,
}
}

func analyzeGroupBy(groupBy sqlparser.GroupBy, tplan *TablePlan) error {
func analyzeGroupBy(groupBy sqlparser.GroupBy, tp *TablePlan) error {
for _, expr := range groupBy {
colname, ok := expr.(*sqlparser.ColName)
if !ok {
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
}
cExpr := tplan.findCol(colname.Name)
cExpr := tp.FindCol(colname.Name)
if cExpr == nil {
return fmt.Errorf("group by expression does not reference an alias in the select list: %v", sqlparser.String(expr))
}
Expand All @@ -243,3 +235,175 @@ func analyzeGroupBy(groupBy sqlparser.GroupBy, tplan *TablePlan) error {
}
return nil
}

//--------------------------------------------------------------
// TablePlan support functions.

// FindCol finds the ColExpr. It returns nil if not found.
func (tp *TablePlan) FindCol(name sqlparser.ColIdent) *ColExpr {
for _, cExpr := range tp.ColExprs {
if cExpr.ColName.Equal(name) {
return cExpr
}
}
return nil
}

// GenerateStatement must be called only after Fields and PKCols have been populated.
func (tp *TablePlan) GenerateStatement(rowChange *binlogdatapb.RowChange) string {
// MakeRowTrusted is needed here because because Proto3ToResult is not convenient.
var before, after []sqltypes.Value
if rowChange.Before != nil {
before = sqltypes.MakeRowTrusted(tp.Fields, rowChange.Before)
}
if rowChange.After != nil {
after = sqltypes.MakeRowTrusted(tp.Fields, rowChange.After)
}
var query string
switch {
case before == nil && after != nil:
query = tp.generateInsert(after)
case before != nil && after != nil:
query = tp.generateUpdate(before, after)
case before != nil && after == nil:
query = tp.generateDelete(before)
case before == nil && after == nil:
// unreachable
}
return query
}

func (tp *TablePlan) generateInsert(after []sqltypes.Value) string {
sql := sqlparser.NewTrackedBuffer(nil)
if tp.OnInsert == InsertIgnore {
sql.Myprintf("insert ignore into %v set ", sqlparser.NewTableIdent(tp.Name))
} else {
sql.Myprintf("insert into %v set ", sqlparser.NewTableIdent(tp.Name))
}
tp.generateInsertValues(sql, after)
if tp.OnInsert == InsertOndup {
sql.Myprintf(" on duplicate key update ")
_ = tp.generateUpdateValues(sql, nil, after)
}
return sql.String()
}

func (tp *TablePlan) generateUpdate(before, after []sqltypes.Value) string {
if tp.OnInsert == InsertIgnore {
return tp.generateInsert(after)
}
sql := sqlparser.NewTrackedBuffer(nil)
sql.Myprintf("update %v set ", sqlparser.NewTableIdent(tp.Name))
if ok := tp.generateUpdateValues(sql, before, after); !ok {
return ""
}
sql.Myprintf(" where ")
tp.generateWhereValues(sql, before)
return sql.String()
}

func (tp *TablePlan) generateDelete(before []sqltypes.Value) string {
sql := sqlparser.NewTrackedBuffer(nil)
switch tp.OnInsert {
case InsertOndup:
return tp.generateUpdate(before, nil)
case InsertIgnore:
return ""
default: // insertNormal
sql.Myprintf("delete from %v where ", sqlparser.NewTableIdent(tp.Name))
tp.generateWhereValues(sql, before)
}
return sql.String()
}

func (tp *TablePlan) generateInsertValues(sql *sqlparser.TrackedBuffer, after []sqltypes.Value) {
separator := ""
for _, cExpr := range tp.ColExprs {
sql.Myprintf("%s%v=", separator, cExpr.ColName)
separator = ", "
if cExpr.Operation == OpCount {
sql.WriteString("1")
} else {
if cExpr.Operation == OpSum && after[cExpr.ColNum].IsNull() {
sql.WriteString("0")
} else {
encodeValue(sql, after[cExpr.ColNum])
}
}
}
}

// generateUpdateValues returns true if at least one value was set. Otherwise, it returns false.
func (tp *TablePlan) generateUpdateValues(sql *sqlparser.TrackedBuffer, before, after []sqltypes.Value) bool {
separator := ""
hasSet := false
for _, cExpr := range tp.ColExprs {
if cExpr.IsGrouped {
continue
}
if len(before) != 0 && len(after) != 0 {
if cExpr.Operation == OpCount {
continue
}
bef := before[cExpr.ColNum]
aft := after[cExpr.ColNum]
// If both are null, there's no change
if bef.IsNull() && aft.IsNull() {
continue
}
// If any one of them is null, something has changed.
if bef.IsNull() || aft.IsNull() {
goto mustSet
}
// Compare content only if none are null.
if bef.ToString() == aft.ToString() {
continue
}
}
mustSet:
sql.Myprintf("%s%v=", separator, cExpr.ColName)
separator = ", "
hasSet = true
if cExpr.Operation == OpCount || cExpr.Operation == OpSum {
sql.Myprintf("%v", cExpr.ColName)
}
if len(before) != 0 {
switch cExpr.Operation {
case OpNone:
if len(after) == 0 {
sql.WriteString("NULL")
}
case OpCount:
sql.WriteString("-1")
case OpSum:
if !before[cExpr.ColNum].IsNull() {
sql.WriteString("-")
encodeValue(sql, before[cExpr.ColNum])
}
}
}
if len(after) != 0 {
switch cExpr.Operation {
case OpNone:
encodeValue(sql, after[cExpr.ColNum])
case OpCount:
sql.WriteString("+1")
case OpSum:
if !after[cExpr.ColNum].IsNull() {
sql.WriteString("+")
encodeValue(sql, after[cExpr.ColNum])
}
}
}
}
return hasSet
}

func (tp *TablePlan) generateWhereValues(sql *sqlparser.TrackedBuffer, before []sqltypes.Value) {
separator := ""
for _, cExpr := range tp.PKCols {
sql.Myprintf("%s%v=", separator, cExpr.ColName)
separator = " and "
encodeValue(sql, before[cExpr.ColNum])
}
}
84 changes: 0 additions & 84 deletions go/vt/vttablet/tabletmanager/vreplication/retryable_client.go

This file was deleted.

Loading

0 comments on commit 66c45d9

Please sign in to comment.