diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go index c31b65be8dd..55d0acaae74 100644 --- a/go/vt/vttablet/tabletserver/query_engine.go +++ b/go/vt/vttablet/tabletserver/query_engine.go @@ -36,6 +36,7 @@ import ( "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/trace" + "vitess.io/vitess/go/viperutil" "vitess.io/vitess/go/vt/dbconnpool" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" @@ -170,8 +171,8 @@ type QueryEngine struct { txSerializer *txserializer.TxSerializer // Vars - maxResultSize atomic.Int64 - warnResultSize atomic.Int64 + maxResultSize viperutil.Value[int64] + warnResultSize viperutil.Value[int64] streamBufferSize atomic.Int64 // tableaclExemptCount count the number of accesses allowed // based on membership in the superuser ACL @@ -211,12 +212,12 @@ func NewQueryEngine(env tabletenv.Env, se *schema.Engine) *QueryEngine { // Cache for query plans: user configured size with a doorkeeper by default to prevent one-off queries // from thrashing the cache. - qe.plans = theine.NewStore[PlanCacheKey, *TabletPlan](config.QueryCacheMemory, config.QueryCacheDoorkeeper) + qe.plans = theine.NewStore[PlanCacheKey, *TabletPlan](config.QueryCacheMemory.Get(), config.QueryCacheDoorkeeper) // cache for connection settings: default to 1/4th of the size for the query cache and do // not use a doorkeeper because custom connection settings are rarely one-off and we always // want to cache them - var settingsCacheMemory = config.QueryCacheMemory / 4 + var settingsCacheMemory = config.QueryCacheMemory.Get() / 4 qe.settings = theine.NewStore[SettingsCacheKey, *smartconnpool.Setting](settingsCacheMemory, false) qe.schema.Store(¤tSchema{ @@ -255,16 +256,16 @@ func NewQueryEngine(env tabletenv.Env, se *schema.Engine) *QueryEngine { } } - qe.maxResultSize.Store(int64(config.Oltp.MaxRows)) - qe.warnResultSize.Store(int64(config.Oltp.WarnRows)) + qe.maxResultSize = config.Oltp.MaxRows + qe.warnResultSize = config.Oltp.WarnRows qe.streamBufferSize.Store(int64(config.StreamBufferSize)) planbuilder.PassthroughDMLs = config.PassthroughDML qe.accessCheckerLogger = logutil.NewThrottledLogger("accessChecker", 1*time.Second) - env.Exporter().NewGaugeFunc("MaxResultSize", "Query engine max result size", qe.maxResultSize.Load) - env.Exporter().NewGaugeFunc("WarnResultSize", "Query engine warn result size", qe.warnResultSize.Load) + env.Exporter().NewGaugeFunc("MaxResultSize", "Query engine max result size", qe.maxResultSize.Get) + env.Exporter().NewGaugeFunc("WarnResultSize", "Query engine warn result size", qe.warnResultSize.Get) env.Exporter().NewGaugeFunc("StreamBufferSize", "Query engine stream buffer size", qe.streamBufferSize.Load) env.Exporter().NewCounterFunc("TableACLExemptCount", "Query engine table ACL exempt count", qe.tableaclExemptCount.Load) diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 63dcd42d0a8..55ae9da2f1d 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -731,7 +731,7 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) { } func (qre *QueryExecutor) execDMLLimit(conn *StatefulConnection) (*sqltypes.Result, error) { - maxrows := qre.tsv.qe.maxResultSize.Load() + maxrows := qre.tsv.qe.maxResultSize.Get() qre.bindVars["#maxLimit"] = sqltypes.Int64BindVariable(maxrows + 1) result, err := qre.txFetch(conn, true) if err != nil { @@ -750,7 +750,7 @@ func (qre *QueryExecutor) verifyRowCount(count, maxrows int64) error { callerID := callerid.ImmediateCallerIDFromContext(qre.ctx) return vterrors.Errorf(vtrpcpb.Code_ABORTED, "caller id: %s: row count exceeded %d", callerID.Username, maxrows) } - warnThreshold := qre.tsv.qe.warnResultSize.Load() + warnThreshold := qre.tsv.qe.warnResultSize.Get() if warnThreshold > 0 && count > warnThreshold { callerID := callerid.ImmediateCallerIDFromContext(qre.ctx) qre.tsv.Stats().Warnings.Add("ResultsExceeded", 1) @@ -1060,7 +1060,7 @@ func (qre *QueryExecutor) drainResultSetOnConn(conn *connpool.Conn) error { } func (qre *QueryExecutor) getSelectLimit() int64 { - return qre.tsv.qe.maxResultSize.Load() + return qre.tsv.qe.maxResultSize.Get() } func (qre *QueryExecutor) execDBConn(conn *connpool.Conn, sql string, wantfields bool) (*sqltypes.Result, error) { @@ -1073,7 +1073,7 @@ func (qre *QueryExecutor) execDBConn(conn *connpool.Conn, sql string, wantfields qre.tsv.statelessql.Add(qd) defer qre.tsv.statelessql.Remove(qd) - return conn.Exec(ctx, sql, int(qre.tsv.qe.maxResultSize.Load()), wantfields) + return conn.Exec(ctx, sql, int(qre.tsv.qe.maxResultSize.Get()), wantfields) } func (qre *QueryExecutor) execStatefulConn(conn *StatefulConnection, sql string, wantfields bool) (*sqltypes.Result, error) { @@ -1086,7 +1086,7 @@ func (qre *QueryExecutor) execStatefulConn(conn *StatefulConnection, sql string, qre.tsv.statefulql.Add(qd) defer qre.tsv.statefulql.Remove(qd) - return conn.Exec(ctx, sql, int(qre.tsv.qe.maxResultSize.Load()), wantfields) + return conn.Exec(ctx, sql, int(qre.tsv.qe.maxResultSize.Get()), wantfields) } func (qre *QueryExecutor) execStreamSQL(conn *connpool.PooledConn, isTransaction bool, sql string, callback func(*sqltypes.Result) error) error { diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index 6ea3c90d989..c8f27263511 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -1478,7 +1478,7 @@ func newTestTabletServer(ctx context.Context, flags executorFlags, db *fakesqldb config.TwoPCAbandonAge = 10 } if flags&smallResultSize > 0 { - config.Oltp.MaxRows = 2 + config.Oltp.MaxRows.Set(2) } if flags&enableConsolidator > 0 { config.Consolidator = tabletenv.Enable diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index ac2629709b9..35e75127426 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -28,6 +28,7 @@ import ( "vitess.io/vitess/go/flagutil" "vitess.io/vitess/go/streamlog" + "vitess.io/vitess/go/viperutil" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/servenv" @@ -126,13 +127,29 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) { fs.Var(¤tConfig.Oltp.TxTimeoutSeconds, currentConfig.Oltp.TxTimeoutSeconds.Name(), "query server transaction timeout (in seconds), a transaction will be killed if it takes longer than this value") currentConfig.GracePeriods.ShutdownSeconds = flagutil.NewDeprecatedFloat64Seconds(defaultConfig.GracePeriods.ShutdownSeconds.Name(), defaultConfig.GracePeriods.TransitionSeconds.Get()) fs.Var(¤tConfig.GracePeriods.ShutdownSeconds, currentConfig.GracePeriods.ShutdownSeconds.Name(), "how long to wait (in seconds) for queries and transactions to complete during graceful shutdown.") - fs.IntVar(¤tConfig.Oltp.MaxRows, "queryserver-config-max-result-size", defaultConfig.Oltp.MaxRows, "query server max result size, maximum number of rows allowed to return from vttablet for non-streaming queries.") - fs.IntVar(¤tConfig.Oltp.WarnRows, "queryserver-config-warn-result-size", defaultConfig.Oltp.WarnRows, "query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this") + + currentConfig.Oltp.MaxRows = viperutil.Configure("vttablet.queryserver.max-result-size", viperutil.Options[int64]{ + FlagName: "queryserver-config-max-result-size", + Default: 10000, + Dynamic: true, + }) + fs.Int64("queryserver-config-max-result-size", currentConfig.Oltp.MaxRows.Default(), "query server max result size, maximum number of rows allowed to return from vttablet for non-streaming queries.") + + currentConfig.Oltp.WarnRows = viperutil.Configure("vttalbet.queryserver.warn-result-size", viperutil.Options[int64]{ + FlagName: "queryserver-config-warn-result-size", + Dynamic: true, + }) + fs.Int64("queryserver-config-warn-result-size", currentConfig.Oltp.WarnRows.Default(), "query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this") + fs.BoolVar(¤tConfig.PassthroughDML, "queryserver-config-passthrough-dmls", defaultConfig.PassthroughDML, "query server pass through all dml statements without rewriting") fs.IntVar(¤tConfig.StreamBufferSize, "queryserver-config-stream-buffer-size", defaultConfig.StreamBufferSize, "query server stream buffer size, the maximum number of bytes sent from vttablet for each stream call. It's recommended to keep this value in sync with vtgate's stream_buffer_size.") - fs.Int64Var(¤tConfig.QueryCacheMemory, "queryserver-config-query-cache-memory", defaultConfig.QueryCacheMemory, "query server query cache size in bytes, maximum amount of memory to be used for caching. vttablet analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache.") + currentConfig.QueryCacheMemory = viperutil.Configure("vttablet.queryserver.query-cache.memory", viperutil.Options[int64]{ + FlagName: "queryserver-config-query-cache-memory", + Default: 32 * 1024 * 1024, // 32 mb for our query cache + }) + fs.Int64("queryserver-config-query-cache-memory", currentConfig.QueryCacheMemory.Default(), "query server query cache size in bytes, maximum amount of memory to be used for caching. vttablet analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache.") currentConfig.SchemaReloadIntervalSeconds = defaultConfig.SchemaReloadIntervalSeconds.Clone() fs.Var(¤tConfig.SchemaReloadIntervalSeconds, currentConfig.SchemaReloadIntervalSeconds.Name(), "query server schema reload time, how often vttablet reloads schemas from underlying MySQL instance in seconds. vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time.") @@ -221,6 +238,12 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) { fs.BoolVar(¤tConfig.EnableViews, "queryserver-enable-views", false, "Enable views support in vttablet.") fs.BoolVar(¤tConfig.EnablePerWorkloadTableMetrics, "enable-per-workload-table-metrics", defaultConfig.EnablePerWorkloadTableMetrics, "If true, query counts and query error metrics include a label that identifies the workload") + + viperutil.BindFlags(fs, + currentConfig.QueryCacheMemory, + currentConfig.Oltp.MaxRows, + currentConfig.Oltp.WarnRows, + ) } var ( @@ -325,7 +348,7 @@ type TabletConfig struct { StreamBufferSize int `json:"streamBufferSize,omitempty"` ConsolidatorStreamTotalSize int64 `json:"consolidatorStreamTotalSize,omitempty"` ConsolidatorStreamQuerySize int64 `json:"consolidatorStreamQuerySize,omitempty"` - QueryCacheMemory int64 `json:"queryCacheMemory,omitempty"` + QueryCacheMemory viperutil.Value[int64] `json:"queryCacheMemory,omitempty"` QueryCacheDoorkeeper bool `json:"queryCacheDoorkeeper,omitempty"` SchemaReloadIntervalSeconds flagutil.DeprecatedFloat64Seconds `json:"schemaReloadIntervalSeconds,omitempty"` SignalSchemaChangeReloadIntervalSeconds flagutil.DeprecatedFloat64Seconds `json:"signalSchemaChangeReloadIntervalSeconds,omitempty"` @@ -456,21 +479,21 @@ func (cfg *OlapConfig) MarshalJSON() ([]byte, error) { // OltpConfig contains the config for oltp settings. type OltpConfig struct { - QueryTimeoutSeconds flagutil.DeprecatedFloat64Seconds `json:"queryTimeoutSeconds,omitempty"` - TxTimeoutSeconds flagutil.DeprecatedFloat64Seconds `json:"txTimeoutSeconds,omitempty"` - MaxRows int `json:"maxRows,omitempty"` - WarnRows int `json:"warnRows,omitempty"` + QueryTimeoutSeconds flagutil.DeprecatedFloat64Seconds + TxTimeoutSeconds flagutil.DeprecatedFloat64Seconds + MaxRows viperutil.Value[int64] + WarnRows viperutil.Value[int64] } func (cfg *OltpConfig) MarshalJSON() ([]byte, error) { - type Proxy OltpConfig - tmp := struct { - Proxy + MaxRows int64 `json:"maxRows,omitempty"` + WarnRows int64 `json:"warnRows,omitempty"` QueryTimeoutSeconds string `json:"queryTimeoutSeconds,omitempty"` TxTimeoutSeconds string `json:"txTimeoutSeconds,omitempty"` }{ - Proxy: Proxy(*cfg), + MaxRows: cfg.MaxRows.Get(), + WarnRows: cfg.WarnRows.Get(), } if d := cfg.QueryTimeoutSeconds.Get(); d != 0 { @@ -766,7 +789,6 @@ var defaultConfig = TabletConfig{ Oltp: OltpConfig{ QueryTimeoutSeconds: flagutil.NewDeprecatedFloat64Seconds("queryserver-config-query-timeout", 30*time.Second), TxTimeoutSeconds: flagutil.NewDeprecatedFloat64Seconds("queryserver-config-transaction-timeout", 30*time.Second), - MaxRows: 10000, }, Healthcheck: HealthcheckConfig{ IntervalSeconds: flagutil.NewDeprecatedFloat64Seconds("health_check_interval", 20*time.Second), @@ -804,7 +826,7 @@ var defaultConfig = TabletConfig{ // great (the overhead makes the final packets on the wire about twice // bigger than this). StreamBufferSize: 32 * 1024, - QueryCacheMemory: 32 * 1024 * 1024, // 32 mb for our query cache + // QueryCacheMemory: 32 * 1024 * 1024, // 32 mb for our query cache // The doorkeeper for the plan cache is disabled by default in endtoend tests to ensure // results are consistent between runs. QueryCacheDoorkeeper: !servenv.TestingEndtoend, diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 6c1a60928de..254761c499f 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -1972,22 +1972,22 @@ func (tsv *TabletServer) QueryPlanCacheLen() int { // SetMaxResultSize changes the max result size to the specified value. func (tsv *TabletServer) SetMaxResultSize(val int) { - tsv.qe.maxResultSize.Store(int64(val)) + tsv.qe.maxResultSize.Set(int64(val)) } // MaxResultSize returns the max result size. func (tsv *TabletServer) MaxResultSize() int { - return int(tsv.qe.maxResultSize.Load()) + return int(tsv.qe.maxResultSize.Get()) } // SetWarnResultSize changes the warn result size to the specified value. func (tsv *TabletServer) SetWarnResultSize(val int) { - tsv.qe.warnResultSize.Store(int64(val)) + tsv.qe.warnResultSize.Set(int64(val)) } // WarnResultSize returns the warn result size. func (tsv *TabletServer) WarnResultSize() int { - return int(tsv.qe.warnResultSize.Load()) + return int(tsv.qe.warnResultSize.Get()) } // SetThrottleMetricThreshold changes the throttler metric threshold diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index d2fb10e5a77..aaee5946106 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -2010,7 +2010,7 @@ func TestConfigChanges(t *testing.T) { if val := tsv.MaxResultSize(); val != newSize { t.Errorf("MaxResultSize: %d, want %d", val, newSize) } - if val := int(tsv.qe.maxResultSize.Load()); val != newSize { + if val := int(tsv.qe.maxResultSize.Get()); val != newSize { t.Errorf("tsv.qe.maxResultSize.Get: %d, want %d", val, newSize) } @@ -2018,7 +2018,7 @@ func TestConfigChanges(t *testing.T) { if val := tsv.WarnResultSize(); val != newSize { t.Errorf("WarnResultSize: %d, want %d", val, newSize) } - if val := int(tsv.qe.warnResultSize.Load()); val != newSize { + if val := int(tsv.qe.warnResultSize.Get()); val != newSize { t.Errorf("tsv.qe.warnResultSize.Get: %d, want %d", val, newSize) } }