Skip to content

Commit

Permalink
reject prepare on query rule
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Aug 12, 2024
1 parent 5b6fd89 commit 8cde8f3
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 11 deletions.
17 changes: 16 additions & 1 deletion go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
)
Expand All @@ -34,13 +36,15 @@ type DTExecutor struct {
ctx context.Context
logStats *tabletenv.LogStats
te *TxEngine
qe *QueryEngine
}

// NewDTExecutor creates a new distributed transaction executor.
func NewDTExecutor(ctx context.Context, te *TxEngine, logStats *tabletenv.LogStats) *DTExecutor {
func NewDTExecutor(ctx context.Context, te *TxEngine, qe *QueryEngine, logStats *tabletenv.LogStats) *DTExecutor {
return &DTExecutor{
ctx: ctx,
te: te,
qe: qe,
logStats: logStats,
}
}
Expand Down Expand Up @@ -78,6 +82,17 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error {
return vterrors.VT10002("cannot prepare the transaction on a reserved connection")
}

for _, query := range conn.TxProperties().Queries {
qr := dte.qe.queryRuleSources.FilterByPlan(query, 0)
if qr != nil {
act, _, _, _ := qr.GetAction("", "", nil, sqlparser.MarginComments{})
if act != rules.QRContinue {
dte.te.txPool.RollbackAndRelease(dte.ctx, conn)
return vterrors.VT10002("cannot prepare the transaction due to query rule")
}
}
}

err = dte.te.preparedPool.Put(conn, dtid)
if err != nil {
dte.te.txPool.RollbackAndRelease(dte.ctx, conn)
Expand Down
20 changes: 10 additions & 10 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ func (tsv *TabletServer) Prepare(ctx context.Context, target *querypb.Target, tr
"Prepare", "prepare", nil,
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, logStats)
txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats)
return txe.Prepare(transactionID, dtid)
},
)
Expand All @@ -659,7 +659,7 @@ func (tsv *TabletServer) CommitPrepared(ctx context.Context, target *querypb.Tar
"CommitPrepared", "commit_prepared", nil,
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, logStats)
txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats)
if DebugTwoPc {
commitPreparedDelayForTest(tsv)
}
Expand All @@ -675,7 +675,7 @@ func (tsv *TabletServer) RollbackPrepared(ctx context.Context, target *querypb.T
"RollbackPrepared", "rollback_prepared", nil,
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, logStats)
txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats)
return txe.RollbackPrepared(dtid, originalID)
},
)
Expand All @@ -688,7 +688,7 @@ func (tsv *TabletServer) CreateTransaction(ctx context.Context, target *querypb.
"CreateTransaction", "create_transaction", nil,
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, logStats)
txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats)
return txe.CreateTransaction(dtid, participants)
},
)
Expand All @@ -702,7 +702,7 @@ func (tsv *TabletServer) StartCommit(ctx context.Context, target *querypb.Target
"StartCommit", "start_commit", nil,
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, logStats)
txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats)
return txe.StartCommit(transactionID, dtid)
},
)
Expand All @@ -716,7 +716,7 @@ func (tsv *TabletServer) SetRollback(ctx context.Context, target *querypb.Target
"SetRollback", "set_rollback", nil,
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, logStats)
txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats)
return txe.SetRollback(dtid, transactionID)
},
)
Expand All @@ -730,7 +730,7 @@ func (tsv *TabletServer) ConcludeTransaction(ctx context.Context, target *queryp
"ConcludeTransaction", "conclude_transaction", nil,
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, logStats)
txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats)
return txe.ConcludeTransaction(dtid)
},
)
Expand All @@ -743,7 +743,7 @@ func (tsv *TabletServer) ReadTransaction(ctx context.Context, target *querypb.Ta
"ReadTransaction", "read_transaction", nil,
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, logStats)
txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats)
metadata, err = txe.ReadTransaction(dtid)
return err
},
Expand All @@ -758,7 +758,7 @@ func (tsv *TabletServer) UnresolvedTransactions(ctx context.Context, target *que
"UnresolvedTransactions", "unresolved_transaction", nil,
target, nil, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, logStats)
txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats)
transactions, err = txe.UnresolvedTransactions()
return err
},
Expand Down Expand Up @@ -1771,7 +1771,7 @@ func (tsv *TabletServer) registerQueryListHandlers(queryLists []*QueryList) {
func (tsv *TabletServer) registerTwopczHandler() {
tsv.exporter.HandleFunc("/twopcz", func(w http.ResponseWriter, r *http.Request) {
ctx := tabletenv.LocalContext()
txe := NewDTExecutor(ctx, tsv.te, tabletenv.NewLogStats(ctx, "twopcz"))
txe := NewDTExecutor(ctx, tsv.te, tsv.qe, tabletenv.NewLogStats(ctx, "twopcz"))
twopczHandler(txe, w, r)
})
}
Expand Down

0 comments on commit 8cde8f3

Please sign in to comment.