Skip to content

Commit

Permalink
begin viperizing tabletserver - max/warn result size, cache memory
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Mason <[email protected]>
  • Loading branch information
Andrew Mason committed Nov 27, 2023
1 parent 162f346 commit 2415efc
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 34 deletions.
17 changes: 9 additions & 8 deletions go/vt/vttablet/tabletserver/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"vitess.io/vitess/go/streamlog"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/viperutil"
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
Expand Down Expand Up @@ -170,8 +171,8 @@ type QueryEngine struct {
txSerializer *txserializer.TxSerializer

// Vars
maxResultSize atomic.Int64
warnResultSize atomic.Int64
maxResultSize viperutil.Value[int64]
warnResultSize viperutil.Value[int64]
streamBufferSize atomic.Int64
// tableaclExemptCount count the number of accesses allowed
// based on membership in the superuser ACL
Expand Down Expand Up @@ -211,12 +212,12 @@ func NewQueryEngine(env tabletenv.Env, se *schema.Engine) *QueryEngine {

// Cache for query plans: user configured size with a doorkeeper by default to prevent one-off queries
// from thrashing the cache.
qe.plans = theine.NewStore[PlanCacheKey, *TabletPlan](config.QueryCacheMemory, config.QueryCacheDoorkeeper)
qe.plans = theine.NewStore[PlanCacheKey, *TabletPlan](config.QueryCacheMemory.Get(), config.QueryCacheDoorkeeper)

// cache for connection settings: default to 1/4th of the size for the query cache and do
// not use a doorkeeper because custom connection settings are rarely one-off and we always
// want to cache them
var settingsCacheMemory = config.QueryCacheMemory / 4
var settingsCacheMemory = config.QueryCacheMemory.Get() / 4
qe.settings = theine.NewStore[SettingsCacheKey, *smartconnpool.Setting](settingsCacheMemory, false)

qe.schema.Store(&currentSchema{
Expand Down Expand Up @@ -255,16 +256,16 @@ func NewQueryEngine(env tabletenv.Env, se *schema.Engine) *QueryEngine {
}
}

qe.maxResultSize.Store(int64(config.Oltp.MaxRows))
qe.warnResultSize.Store(int64(config.Oltp.WarnRows))
qe.maxResultSize = config.Oltp.MaxRows
qe.warnResultSize = config.Oltp.WarnRows
qe.streamBufferSize.Store(int64(config.StreamBufferSize))

planbuilder.PassthroughDMLs = config.PassthroughDML

qe.accessCheckerLogger = logutil.NewThrottledLogger("accessChecker", 1*time.Second)

env.Exporter().NewGaugeFunc("MaxResultSize", "Query engine max result size", qe.maxResultSize.Load)
env.Exporter().NewGaugeFunc("WarnResultSize", "Query engine warn result size", qe.warnResultSize.Load)
env.Exporter().NewGaugeFunc("MaxResultSize", "Query engine max result size", qe.maxResultSize.Get)
env.Exporter().NewGaugeFunc("WarnResultSize", "Query engine warn result size", qe.warnResultSize.Get)
env.Exporter().NewGaugeFunc("StreamBufferSize", "Query engine stream buffer size", qe.streamBufferSize.Load)
env.Exporter().NewCounterFunc("TableACLExemptCount", "Query engine table ACL exempt count", qe.tableaclExemptCount.Load)

Expand Down
10 changes: 5 additions & 5 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) {
}

func (qre *QueryExecutor) execDMLLimit(conn *StatefulConnection) (*sqltypes.Result, error) {
maxrows := qre.tsv.qe.maxResultSize.Load()
maxrows := qre.tsv.qe.maxResultSize.Get()
qre.bindVars["#maxLimit"] = sqltypes.Int64BindVariable(maxrows + 1)
result, err := qre.txFetch(conn, true)
if err != nil {
Expand All @@ -750,7 +750,7 @@ func (qre *QueryExecutor) verifyRowCount(count, maxrows int64) error {
callerID := callerid.ImmediateCallerIDFromContext(qre.ctx)
return vterrors.Errorf(vtrpcpb.Code_ABORTED, "caller id: %s: row count exceeded %d", callerID.Username, maxrows)
}
warnThreshold := qre.tsv.qe.warnResultSize.Load()
warnThreshold := qre.tsv.qe.warnResultSize.Get()
if warnThreshold > 0 && count > warnThreshold {
callerID := callerid.ImmediateCallerIDFromContext(qre.ctx)
qre.tsv.Stats().Warnings.Add("ResultsExceeded", 1)
Expand Down Expand Up @@ -1060,7 +1060,7 @@ func (qre *QueryExecutor) drainResultSetOnConn(conn *connpool.Conn) error {
}

func (qre *QueryExecutor) getSelectLimit() int64 {
return qre.tsv.qe.maxResultSize.Load()
return qre.tsv.qe.maxResultSize.Get()
}

func (qre *QueryExecutor) execDBConn(conn *connpool.Conn, sql string, wantfields bool) (*sqltypes.Result, error) {
Expand All @@ -1073,7 +1073,7 @@ func (qre *QueryExecutor) execDBConn(conn *connpool.Conn, sql string, wantfields
qre.tsv.statelessql.Add(qd)
defer qre.tsv.statelessql.Remove(qd)

return conn.Exec(ctx, sql, int(qre.tsv.qe.maxResultSize.Load()), wantfields)
return conn.Exec(ctx, sql, int(qre.tsv.qe.maxResultSize.Get()), wantfields)
}

func (qre *QueryExecutor) execStatefulConn(conn *StatefulConnection, sql string, wantfields bool) (*sqltypes.Result, error) {
Expand All @@ -1086,7 +1086,7 @@ func (qre *QueryExecutor) execStatefulConn(conn *StatefulConnection, sql string,
qre.tsv.statefulql.Add(qd)
defer qre.tsv.statefulql.Remove(qd)

return conn.Exec(ctx, sql, int(qre.tsv.qe.maxResultSize.Load()), wantfields)
return conn.Exec(ctx, sql, int(qre.tsv.qe.maxResultSize.Get()), wantfields)
}

func (qre *QueryExecutor) execStreamSQL(conn *connpool.PooledConn, isTransaction bool, sql string, callback func(*sqltypes.Result) error) error {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1478,7 +1478,7 @@ func newTestTabletServer(ctx context.Context, flags executorFlags, db *fakesqldb
config.TwoPCAbandonAge = 10
}
if flags&smallResultSize > 0 {
config.Oltp.MaxRows = 2
config.Oltp.MaxRows.Set(2)
}
if flags&enableConsolidator > 0 {
config.Consolidator = tabletenv.Enable
Expand Down
50 changes: 36 additions & 14 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/streamlog"
"vitess.io/vitess/go/viperutil"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
Expand Down Expand Up @@ -126,13 +127,29 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
fs.Var(&currentConfig.Oltp.TxTimeoutSeconds, currentConfig.Oltp.TxTimeoutSeconds.Name(), "query server transaction timeout (in seconds), a transaction will be killed if it takes longer than this value")
currentConfig.GracePeriods.ShutdownSeconds = flagutil.NewDeprecatedFloat64Seconds(defaultConfig.GracePeriods.ShutdownSeconds.Name(), defaultConfig.GracePeriods.TransitionSeconds.Get())
fs.Var(&currentConfig.GracePeriods.ShutdownSeconds, currentConfig.GracePeriods.ShutdownSeconds.Name(), "how long to wait (in seconds) for queries and transactions to complete during graceful shutdown.")
fs.IntVar(&currentConfig.Oltp.MaxRows, "queryserver-config-max-result-size", defaultConfig.Oltp.MaxRows, "query server max result size, maximum number of rows allowed to return from vttablet for non-streaming queries.")
fs.IntVar(&currentConfig.Oltp.WarnRows, "queryserver-config-warn-result-size", defaultConfig.Oltp.WarnRows, "query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this")

currentConfig.Oltp.MaxRows = viperutil.Configure("vttablet.queryserver.max-result-size", viperutil.Options[int64]{
FlagName: "queryserver-config-max-result-size",
Default: 10000,
Dynamic: true,
})
fs.Int64("queryserver-config-max-result-size", currentConfig.Oltp.MaxRows.Default(), "query server max result size, maximum number of rows allowed to return from vttablet for non-streaming queries.")

currentConfig.Oltp.WarnRows = viperutil.Configure("vttalbet.queryserver.warn-result-size", viperutil.Options[int64]{
FlagName: "queryserver-config-warn-result-size",
Dynamic: true,
})
fs.Int64("queryserver-config-warn-result-size", currentConfig.Oltp.WarnRows.Default(), "query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this")

fs.BoolVar(&currentConfig.PassthroughDML, "queryserver-config-passthrough-dmls", defaultConfig.PassthroughDML, "query server pass through all dml statements without rewriting")

fs.IntVar(&currentConfig.StreamBufferSize, "queryserver-config-stream-buffer-size", defaultConfig.StreamBufferSize, "query server stream buffer size, the maximum number of bytes sent from vttablet for each stream call. It's recommended to keep this value in sync with vtgate's stream_buffer_size.")

fs.Int64Var(&currentConfig.QueryCacheMemory, "queryserver-config-query-cache-memory", defaultConfig.QueryCacheMemory, "query server query cache size in bytes, maximum amount of memory to be used for caching. vttablet analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache.")
currentConfig.QueryCacheMemory = viperutil.Configure("vttablet.queryserver.query-cache.memory", viperutil.Options[int64]{
FlagName: "queryserver-config-query-cache-memory",
Default: 32 * 1024 * 1024, // 32 mb for our query cache
})
fs.Int64("queryserver-config-query-cache-memory", currentConfig.QueryCacheMemory.Default(), "query server query cache size in bytes, maximum amount of memory to be used for caching. vttablet analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache.")

currentConfig.SchemaReloadIntervalSeconds = defaultConfig.SchemaReloadIntervalSeconds.Clone()
fs.Var(&currentConfig.SchemaReloadIntervalSeconds, currentConfig.SchemaReloadIntervalSeconds.Name(), "query server schema reload time, how often vttablet reloads schemas from underlying MySQL instance in seconds. vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time.")
Expand Down Expand Up @@ -221,6 +238,12 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
fs.BoolVar(&currentConfig.EnableViews, "queryserver-enable-views", false, "Enable views support in vttablet.")

fs.BoolVar(&currentConfig.EnablePerWorkloadTableMetrics, "enable-per-workload-table-metrics", defaultConfig.EnablePerWorkloadTableMetrics, "If true, query counts and query error metrics include a label that identifies the workload")

viperutil.BindFlags(fs,
currentConfig.QueryCacheMemory,
currentConfig.Oltp.MaxRows,
currentConfig.Oltp.WarnRows,
)
}

var (
Expand Down Expand Up @@ -325,7 +348,7 @@ type TabletConfig struct {
StreamBufferSize int `json:"streamBufferSize,omitempty"`
ConsolidatorStreamTotalSize int64 `json:"consolidatorStreamTotalSize,omitempty"`
ConsolidatorStreamQuerySize int64 `json:"consolidatorStreamQuerySize,omitempty"`
QueryCacheMemory int64 `json:"queryCacheMemory,omitempty"`
QueryCacheMemory viperutil.Value[int64] `json:"queryCacheMemory,omitempty"`
QueryCacheDoorkeeper bool `json:"queryCacheDoorkeeper,omitempty"`
SchemaReloadIntervalSeconds flagutil.DeprecatedFloat64Seconds `json:"schemaReloadIntervalSeconds,omitempty"`
SignalSchemaChangeReloadIntervalSeconds flagutil.DeprecatedFloat64Seconds `json:"signalSchemaChangeReloadIntervalSeconds,omitempty"`
Expand Down Expand Up @@ -456,21 +479,21 @@ func (cfg *OlapConfig) MarshalJSON() ([]byte, error) {

// OltpConfig contains the config for oltp settings.
type OltpConfig struct {
QueryTimeoutSeconds flagutil.DeprecatedFloat64Seconds `json:"queryTimeoutSeconds,omitempty"`
TxTimeoutSeconds flagutil.DeprecatedFloat64Seconds `json:"txTimeoutSeconds,omitempty"`
MaxRows int `json:"maxRows,omitempty"`
WarnRows int `json:"warnRows,omitempty"`
QueryTimeoutSeconds flagutil.DeprecatedFloat64Seconds
TxTimeoutSeconds flagutil.DeprecatedFloat64Seconds
MaxRows viperutil.Value[int64]
WarnRows viperutil.Value[int64]
}

func (cfg *OltpConfig) MarshalJSON() ([]byte, error) {
type Proxy OltpConfig

tmp := struct {
Proxy
MaxRows int64 `json:"maxRows,omitempty"`
WarnRows int64 `json:"warnRows,omitempty"`
QueryTimeoutSeconds string `json:"queryTimeoutSeconds,omitempty"`
TxTimeoutSeconds string `json:"txTimeoutSeconds,omitempty"`
}{
Proxy: Proxy(*cfg),
MaxRows: cfg.MaxRows.Get(),
WarnRows: cfg.WarnRows.Get(),
}

if d := cfg.QueryTimeoutSeconds.Get(); d != 0 {
Expand Down Expand Up @@ -766,7 +789,6 @@ var defaultConfig = TabletConfig{
Oltp: OltpConfig{
QueryTimeoutSeconds: flagutil.NewDeprecatedFloat64Seconds("queryserver-config-query-timeout", 30*time.Second),
TxTimeoutSeconds: flagutil.NewDeprecatedFloat64Seconds("queryserver-config-transaction-timeout", 30*time.Second),
MaxRows: 10000,
},
Healthcheck: HealthcheckConfig{
IntervalSeconds: flagutil.NewDeprecatedFloat64Seconds("health_check_interval", 20*time.Second),
Expand Down Expand Up @@ -804,7 +826,7 @@ var defaultConfig = TabletConfig{
// great (the overhead makes the final packets on the wire about twice
// bigger than this).
StreamBufferSize: 32 * 1024,
QueryCacheMemory: 32 * 1024 * 1024, // 32 mb for our query cache
// QueryCacheMemory: 32 * 1024 * 1024, // 32 mb for our query cache
// The doorkeeper for the plan cache is disabled by default in endtoend tests to ensure
// results are consistent between runs.
QueryCacheDoorkeeper: !servenv.TestingEndtoend,
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1972,22 +1972,22 @@ func (tsv *TabletServer) QueryPlanCacheLen() int {

// SetMaxResultSize changes the max result size to the specified value.
func (tsv *TabletServer) SetMaxResultSize(val int) {
tsv.qe.maxResultSize.Store(int64(val))
tsv.qe.maxResultSize.Set(int64(val))
}

// MaxResultSize returns the max result size.
func (tsv *TabletServer) MaxResultSize() int {
return int(tsv.qe.maxResultSize.Load())
return int(tsv.qe.maxResultSize.Get())
}

// SetWarnResultSize changes the warn result size to the specified value.
func (tsv *TabletServer) SetWarnResultSize(val int) {
tsv.qe.warnResultSize.Store(int64(val))
tsv.qe.warnResultSize.Set(int64(val))
}

// WarnResultSize returns the warn result size.
func (tsv *TabletServer) WarnResultSize() int {
return int(tsv.qe.warnResultSize.Load())
return int(tsv.qe.warnResultSize.Get())
}

// SetThrottleMetricThreshold changes the throttler metric threshold
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/tabletserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2010,15 +2010,15 @@ func TestConfigChanges(t *testing.T) {
if val := tsv.MaxResultSize(); val != newSize {
t.Errorf("MaxResultSize: %d, want %d", val, newSize)
}
if val := int(tsv.qe.maxResultSize.Load()); val != newSize {
if val := int(tsv.qe.maxResultSize.Get()); val != newSize {
t.Errorf("tsv.qe.maxResultSize.Get: %d, want %d", val, newSize)
}

tsv.SetWarnResultSize(newSize)
if val := tsv.WarnResultSize(); val != newSize {
t.Errorf("WarnResultSize: %d, want %d", val, newSize)
}
if val := int(tsv.qe.warnResultSize.Load()); val != newSize {
if val := int(tsv.qe.warnResultSize.Get()); val != newSize {
t.Errorf("tsv.qe.warnResultSize.Get: %d, want %d", val, newSize)
}
}
Expand Down

0 comments on commit 2415efc

Please sign in to comment.