Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TxThrottler support for transactions outside BEGIN/COMMIT #13040

Merged
4 changes: 4 additions & 0 deletions changelog/17.0/17.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ Prior to v17, this asynchronous process could run indefinitely in the background
this behavior was changed to use a context with a timeout of `action_timeout`. If you are using VtctldClient to initiate a restore, make sure you provide an appropriate value for action_timeout to give enough
time for the restore process to complete. Otherwise, the restore will throw an error if the context expires before it completes.

### <a id="Vttablet-TxThrottler"> Vttablet's transaction throttler now also throttles DML outside of `BEGIN; ...; COMMIT;` blocks
Prior to v17, `vttablet`'s transaction throttler (enabled with `--enable-tx-throttler`) would only throttle requests done inside an explicit transaction, i.e., a `BEGIN; ...; COMMIT;` block.
In v17 [PR#13040](https://github.com/vitessio/vitess/issues/13037), this behavior was being changed so that it also throttles work outside of explicit transactions for `INSERT/UPDATE/DELETE/LOAD` queries.

### <a id="new-flag"/> New command line flags and behavior

#### <a id="builtin-backup-read-buffering-flags" /> Backup --builtinbackup-file-read-buffer-size and --builtinbackup-file-write-buffer-size
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ var (
Type: sqltypes.Int64,
},
}
errTxThrottled = vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "Transaction throttled")
)

func returnStreamResult(result *sqltypes.Result) error {
Expand Down Expand Up @@ -220,6 +221,10 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}
qre.options.TransactionIsolation = querypb.ExecuteOptions_AUTOCOMMIT

if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) {
return nil, errTxThrottled
}

conn, _, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting)

if err != nil {
Expand All @@ -231,6 +236,9 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}

func (qre *QueryExecutor) execAsTransaction(f func(conn *StatefulConnection) (*sqltypes.Result, error)) (*sqltypes.Result, error) {
if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) {
return nil, errTxThrottled
}
conn, beginSQL, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting)
if err != nil {
return nil, err
Expand Down
99 changes: 75 additions & 24 deletions go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ import (
"strings"
"testing"

"vitess.io/vitess/go/vt/sidecardb"

"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -38,13 +34,16 @@ import (
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/callinfo"
"vitess.io/vitess/go/vt/callinfo/fakecallinfo"
"vitess.io/vitess/go/vt/sidecardb"
"vitess.io/vitess/go/vt/tableacl"
"vitess.io/vitess/go/vt/tableacl/simpleacl"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
"vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler"

querypb "vitess.io/vitess/go/vt/proto/query"
tableaclpb "vitess.io/vitess/go/vt/proto/tableacl"
Expand Down Expand Up @@ -82,6 +81,10 @@ func TestQueryExecutorPlans(t *testing.T) {
// inTxWant is the query log we expect if we're in a transation.
// If empty, then we should expect the same as logWant.
inTxWant string
// errorWant is the error we expect to get, if any, and should be nil if no error should be returned
errorWant error
// TxThrottler allows the test case to override the transaction throttler
txThrottler txthrottler.TxThrottler
}{{
input: "select * from t",
dbResponses: []dbResponse{{
Expand Down Expand Up @@ -268,7 +271,25 @@ func TestQueryExecutorPlans(t *testing.T) {
resultWant: emptyResult,
planWant: "Show",
logWant: "show create table mysql.`user`",
}}
}, {
input: "update test_table set a=1",
dbResponses: []dbResponse{{
query: "update test_table set a = 1 limit 10001",
result: dmlResult,
}},
errorWant: errTxThrottled,
txThrottler: &mockTxThrottler{true},
}, {
input: "update test_table set a=1",
passThrough: true,
dbResponses: []dbResponse{{
query: "update test_table set a = 1 limit 10001",
result: dmlResult,
}},
errorWant: errTxThrottled,
txThrottler: &mockTxThrottler{true},
},
}
for _, tcase := range testcases {
t.Run(tcase.input, func(t *testing.T) {
db := setUpQueryExecutorTest(t)
Expand All @@ -278,6 +299,9 @@ func TestQueryExecutorPlans(t *testing.T) {
}
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
if tcase.txThrottler != nil {
tsv.txThrottler = tcase.txThrottler
}
tsv.config.DB.DBName = "ks"
defer tsv.StopService()

Expand All @@ -286,32 +310,39 @@ func TestQueryExecutorPlans(t *testing.T) {
// Test outside a transaction.
qre := newTestQueryExecutor(ctx, tsv, tcase.input, 0)
got, err := qre.Execute()
require.NoError(t, err, tcase.input)
assert.Equal(t, tcase.resultWant, got, tcase.input)
assert.Equal(t, tcase.planWant, qre.logStats.PlanType, tcase.input)
assert.Equal(t, tcase.logWant, qre.logStats.RewrittenSQL(), tcase.input)

if tcase.errorWant == nil {
require.NoError(t, err, tcase.input)
assert.Equal(t, tcase.resultWant, got, tcase.input)
assert.Equal(t, tcase.planWant, qre.logStats.PlanType, tcase.input)
assert.Equal(t, tcase.logWant, qre.logStats.RewrittenSQL(), tcase.input)
} else {
assert.True(t, vterrors.Equals(err, tcase.errorWant))
}
// Wait for the existing query to be processed by the cache
tsv.QueryPlanCacheWait()

// Test inside a transaction.
target := tsv.sm.Target()
state, err := tsv.Begin(ctx, target, nil)
require.NoError(t, err)
require.NotNil(t, state.TabletAlias, "alias should not be nil")
assert.Equal(t, tsv.alias, state.TabletAlias, "Wrong alias returned by Begin")
defer tsv.Commit(ctx, target, state.TransactionID)

qre = newTestQueryExecutor(ctx, tsv, tcase.input, state.TransactionID)
got, err = qre.Execute()
require.NoError(t, err, tcase.input)
assert.Equal(t, tcase.resultWant, got, "in tx: %v", tcase.input)
assert.Equal(t, tcase.planWant, qre.logStats.PlanType, "in tx: %v", tcase.input)
want := tcase.logWant
if tcase.inTxWant != "" {
want = tcase.inTxWant
if tcase.errorWant == nil {
require.NoError(t, err)
require.NotNil(t, state.TabletAlias, "alias should not be nil")
assert.Equal(t, tsv.alias, state.TabletAlias, "Wrong alias returned by Begin")
defer tsv.Commit(ctx, target, state.TransactionID)

qre = newTestQueryExecutor(ctx, tsv, tcase.input, state.TransactionID)
got, err = qre.Execute()
require.NoError(t, err, tcase.input)
assert.Equal(t, tcase.resultWant, got, "in tx: %v", tcase.input)
assert.Equal(t, tcase.planWant, qre.logStats.PlanType, "in tx: %v", tcase.input)
want := tcase.logWant
if tcase.inTxWant != "" {
want = tcase.inTxWant
}
assert.Equal(t, want, qre.logStats.RewrittenSQL(), "in tx: %v", tcase.input)
} else {
assert.True(t, vterrors.Equals(err, tcase.errorWant))
}
assert.Equal(t, want, qre.logStats.RewrittenSQL(), "in tx: %v", tcase.input)
})
}
}
Expand Down Expand Up @@ -1759,3 +1790,23 @@ func TestQueryExecSchemaReloadCount(t *testing.T) {
})
}
}

type mockTxThrottler struct {
throttle bool
}

func (m mockTxThrottler) InitDBConfig(target *querypb.Target) {
panic("implement me")
}

func (m mockTxThrottler) Open() (err error) {
return nil
}

func (m mockTxThrottler) Close() {
return
}

func (m mockTxThrottler) Throttle(priority int) (result bool) {
return m.throttle
}
38 changes: 21 additions & 17 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type TabletServer struct {
tracker *schema.Tracker
watcher *BinlogWatcher
qe *QueryEngine
txThrottler *txthrottler.TxThrottler
txThrottler txthrottler.TxThrottler
te *TxEngine
messager *messager.Engine
hs *healthStreamer
Expand Down Expand Up @@ -491,22 +491,8 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
startTime := time.Now()
priority := tsv.config.TxThrottlerDefaultPriority
if options != nil && options.Priority != "" {
optionsPriority, err := strconv.Atoi(options.Priority)
// This should never error out, as the value for Priority has been validated in the vtgate already.
// Still, handle it just to make sure.
if err != nil {
log.Errorf(
"The value of the %s query directive could not be converted to integer, using the "+
"default value. Error was: %s",
sqlparser.DirectivePriority, priority, err)
} else {
priority = optionsPriority
}
}
if tsv.txThrottler.Throttle(priority) {
return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "Transaction throttled")
if tsv.txThrottler.Throttle(tsv.getPriorityFromOptions(options)) {
return errTxThrottled
}
var connSetting *pools.Setting
if len(settings) > 0 {
Expand Down Expand Up @@ -537,6 +523,24 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save
return state, err
}

func (tsv *TabletServer) getPriorityFromOptions(options *querypb.ExecuteOptions) int {
priority := tsv.config.TxThrottlerDefaultPriority
if options != nil && options.Priority != "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A small nit: in vitess codebase we prefer early returns over indented conditions, so for example we'd do this:

Suggested change
if options != nil && options.Priority != "" {
if options == nil {
return priority
}
if options.Priority == "" {
return priority
}

Mind that if you accept the above suggestion, the code will not compile as we need t remove a closing brace }

optionsPriority, err := strconv.Atoi(options.Priority)
// This should never error out, as the value for Priority has been validated in the vtgate already.
// Still, handle it just to make sure.
if err != nil {
log.Errorf(
"The value of the %s query directive could not be converted to integer, using the "+
"default value. Error was: %s",
sqlparser.DirectivePriority, priority, err)
} else {
priority = optionsPriority
}
}
return priority
}

// Commit commits the specified transaction.
func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, transactionID int64) (newReservedID int64, err error) {
err = tsv.execRequest(
Expand Down
Loading