diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go index c7d91af5b6c..e369ff2aa12 100644 --- a/go/vt/vttablet/tabletserver/dt_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -24,7 +24,9 @@ import ( "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tabletserver/rules" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/tx" ) @@ -34,13 +36,15 @@ type DTExecutor struct { ctx context.Context logStats *tabletenv.LogStats te *TxEngine + qe *QueryEngine } // NewDTExecutor creates a new distributed transaction executor. -func NewDTExecutor(ctx context.Context, te *TxEngine, logStats *tabletenv.LogStats) *DTExecutor { +func NewDTExecutor(ctx context.Context, te *TxEngine, qe *QueryEngine, logStats *tabletenv.LogStats) *DTExecutor { return &DTExecutor{ ctx: ctx, te: te, + qe: qe, logStats: logStats, } } @@ -78,6 +82,17 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error { return vterrors.VT10002("cannot prepare the transaction on a reserved connection") } + for _, query := range conn.TxProperties().Queries { + qr := dte.qe.queryRuleSources.FilterByPlan(query, 0) + if qr != nil { + act, _, _, _ := qr.GetAction("", "", nil, sqlparser.MarginComments{}) + if act != rules.QRContinue { + dte.te.txPool.RollbackAndRelease(dte.ctx, conn) + return vterrors.VT10002("cannot prepare the transaction due to query rule") + } + } + } + err = dte.te.preparedPool.Put(conn, dtid) if err != nil { dte.te.txPool.RollbackAndRelease(dte.ctx, conn) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index e3e951892b7..3cedc9e8068 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -646,7 +646,7 @@ func (tsv *TabletServer) Prepare(ctx context.Context, target *querypb.Target, tr "Prepare", "prepare", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, logStats) + txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) return txe.Prepare(transactionID, dtid) }, ) @@ -659,7 +659,7 @@ func (tsv *TabletServer) CommitPrepared(ctx context.Context, target *querypb.Tar "CommitPrepared", "commit_prepared", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, logStats) + txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) if DebugTwoPc { commitPreparedDelayForTest(tsv) } @@ -675,7 +675,7 @@ func (tsv *TabletServer) RollbackPrepared(ctx context.Context, target *querypb.T "RollbackPrepared", "rollback_prepared", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, logStats) + txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) return txe.RollbackPrepared(dtid, originalID) }, ) @@ -688,7 +688,7 @@ func (tsv *TabletServer) CreateTransaction(ctx context.Context, target *querypb. "CreateTransaction", "create_transaction", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, logStats) + txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) return txe.CreateTransaction(dtid, participants) }, ) @@ -702,7 +702,7 @@ func (tsv *TabletServer) StartCommit(ctx context.Context, target *querypb.Target "StartCommit", "start_commit", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, logStats) + txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) return txe.StartCommit(transactionID, dtid) }, ) @@ -716,7 +716,7 @@ func (tsv *TabletServer) SetRollback(ctx context.Context, target *querypb.Target "SetRollback", "set_rollback", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, logStats) + txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) return txe.SetRollback(dtid, transactionID) }, ) @@ -730,7 +730,7 @@ func (tsv *TabletServer) ConcludeTransaction(ctx context.Context, target *queryp "ConcludeTransaction", "conclude_transaction", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, logStats) + txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) return txe.ConcludeTransaction(dtid) }, ) @@ -743,7 +743,7 @@ func (tsv *TabletServer) ReadTransaction(ctx context.Context, target *querypb.Ta "ReadTransaction", "read_transaction", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, logStats) + txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) metadata, err = txe.ReadTransaction(dtid) return err }, @@ -758,7 +758,7 @@ func (tsv *TabletServer) UnresolvedTransactions(ctx context.Context, target *que "UnresolvedTransactions", "unresolved_transaction", nil, target, nil, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, logStats) + txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) transactions, err = txe.UnresolvedTransactions() return err }, @@ -1771,7 +1771,7 @@ func (tsv *TabletServer) registerQueryListHandlers(queryLists []*QueryList) { func (tsv *TabletServer) registerTwopczHandler() { tsv.exporter.HandleFunc("/twopcz", func(w http.ResponseWriter, r *http.Request) { ctx := tabletenv.LocalContext() - txe := NewDTExecutor(ctx, tsv.te, tabletenv.NewLogStats(ctx, "twopcz")) + txe := NewDTExecutor(ctx, tsv.te, tsv.qe, tabletenv.NewLogStats(ctx, "twopcz")) twopczHandler(txe, w, r) }) }