Skip to content

Commit

Permalink
feat: store query plan and tables to tx properties to be used by quer…
Browse files Browse the repository at this point in the history
…y rules

Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Aug 14, 2024
1 parent 8cde8f3 commit 44117cc
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 17 deletions.
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error {
}

for _, query := range conn.TxProperties().Queries {
qr := dte.qe.queryRuleSources.FilterByPlan(query, 0)
qr := dte.qe.queryRuleSources.FilterByPlan(query.Sql, query.PlanType, query.Tables...)
if qr != nil {
act, _, _, _ := qr.GetAction("", "", nil, sqlparser.MarginComments{})
if act != rules.QRContinue {
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,6 @@ func (qre *QueryExecutor) execNextval() (*sqltypes.Result, error) {
newLast += cache
}
query = fmt.Sprintf("update %s set next_id = %d where id = 0", sqlparser.String(tableName), newLast)
conn.TxProperties().RecordQuery(query)
_, err = qre.execStatefulConn(conn, query, false)
if err != nil {
return nil, err
Expand Down Expand Up @@ -807,7 +806,7 @@ func (qre *QueryExecutor) txFetch(conn *StatefulConnection, record bool) (*sqlty
}
// Only record successful queries.
if record {
conn.TxProperties().RecordQuery(sql)
conn.TxProperties().RecordQuery(sql, qre.plan.PlanID, qre.plan.TableNames())
}
return qr, nil
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/twopc.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (tpc *TwoPC) Close() {
}

// SaveRedo saves the statements in the redo log using the supplied connection.
func (tpc *TwoPC) SaveRedo(ctx context.Context, conn *StatefulConnection, dtid string, queries []string) error {
func (tpc *TwoPC) SaveRedo(ctx context.Context, conn *StatefulConnection, dtid string, queries []tx.Query) error {
bindVars := map[string]*querypb.BindVariable{
"dtid": sqltypes.StringBindVariable(dtid),
"state": sqltypes.Int64BindVariable(RedoStatePrepared),
Expand All @@ -185,7 +185,7 @@ func (tpc *TwoPC) SaveRedo(ctx context.Context, conn *StatefulConnection, dtid s
rows[i] = []sqltypes.Value{
sqltypes.NewVarBinary(dtid),
sqltypes.NewInt64(int64(i + 1)),
sqltypes.NewVarBinary(query),
sqltypes.NewVarBinary(query.Sql),
}
}
extras := map[string]sqlparser.Encodable{
Expand Down
32 changes: 22 additions & 10 deletions go/vt/vttablet/tabletserver/tx/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,18 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder"
)

type (
// ConnID as type int64
ConnID = int64

//DTID as type string
// DTID as type string
DTID = string

//EngineStateMachine is used to control the state the transactional engine -
//whether new connections and/or transactions are allowed or not.
// EngineStateMachine is used to control the state the transactional engine -
// whether new connections and/or transactions are allowed or not.
EngineStateMachine interface {
Init() error
AcceptReadWrite() error
Expand All @@ -46,14 +47,14 @@ type (
// ReleaseReason as type int
ReleaseReason int

//Properties contains all information that is related to the currently running
//transaction on the connection
// Properties contains all information that is related to the currently running
// transaction on the connection
Properties struct {
EffectiveCaller *vtrpcpb.CallerID
ImmediateCaller *querypb.VTGateCallerID
StartTime time.Time
EndTime time.Time
Queries []string
Queries []Query
Autocommit bool
Conclusion string
LogToFile bool
Expand All @@ -62,6 +63,12 @@ type (
}
)

type Query struct {
Sql string
PlanType planbuilder.PlanType
Tables []string
}

const (
// TxClose - connection released on close.
TxClose ReleaseReason = iota
Expand Down Expand Up @@ -115,11 +122,15 @@ var txNames = map[ReleaseReason]string{
}

// RecordQuery records the query against this transaction.
func (p *Properties) RecordQuery(query string) {
func (p *Properties) RecordQuery(query string, planType planbuilder.PlanType, tables []string) {
if p == nil {
return
}
p.Queries = append(p.Queries, query)
p.Queries = append(p.Queries, Query{
Sql: query,
PlanType: planType,
Tables: tables,
})
}

// InTransaction returns true as soon as this struct is not nil
Expand All @@ -134,10 +145,11 @@ func (p *Properties) String(sanitize bool, parser *sqlparser.Parser) string {
printQueries := func() string {
sb := strings.Builder{}
for _, query := range p.Queries {
sql := query.Sql
if sanitize {
query, _ = parser.RedactSQLQuery(query)
sql, _ = parser.RedactSQLQuery(sql)
}
sb.WriteString(query)
sb.WriteString(sql)
sb.WriteString(";")
}
return sb.String()
Expand Down
1 change: 0 additions & 1 deletion go/vt/vttablet/tabletserver/tx_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,6 @@ outer:
continue
}
for _, stmt := range preparedTx.Queries {
conn.TxProperties().RecordQuery(stmt)
_, err := conn.Exec(ctx, stmt, 1, false)
if err != nil {
allErr.RecordError(err)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/txlogz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func testHandler(req *http.Request, t *testing.T) {
ImmediateCaller: callerid.NewImmediateCallerID("immediate-caller"),
StartTime: time.Now(),
Conclusion: "unknown",
Queries: []string{"select * from test"},
Queries: []tx.Query{{Sql: "select * from test"}},
},
}
txConn.txProps.EndTime = txConn.txProps.StartTime
Expand Down

0 comments on commit 44117cc

Please sign in to comment.