Skip to content

Commit

Permalink
gpool: register gpool into resource manager (#40410)
Browse files Browse the repository at this point in the history
ref #40412
  • Loading branch information
hawkingrei authored Jan 10, 2023
1 parent e2a14ce commit f5362f9
Show file tree
Hide file tree
Showing 13 changed files with 58 additions and 35 deletions.
2 changes: 1 addition & 1 deletion resourcemanager/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (*ResourceManager) exec(pool *util.PoolContainer, cmd scheduler.Command) {
if cmd == scheduler.Hold {
return
}
if time.Since(pool.Pool.LastTunerTs()) > 200*time.Millisecond {
if time.Since(pool.Pool.LastTunerTs()) > util.MinSchedulerInterval.Load() {
con := pool.Pool.Cap()
switch cmd {
case scheduler.Downclock:
Expand Down
1 change: 0 additions & 1 deletion resourcemanager/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,5 @@ go_library(
deps = [
"//resourcemanager/util",
"//util/cpu",
"@org_uber_go_atomic//:atomic",
],
)
2 changes: 1 addition & 1 deletion resourcemanager/scheduler/cpu_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewCPUScheduler() *CPUScheduler {

// Tune is to tune the goroutine pool
func (*CPUScheduler) Tune(_ util.Component, pool util.GorotinuePool) Command {
if time.Since(pool.LastTunerTs()) < minCPUSchedulerInterval.Load() {
if time.Since(pool.LastTunerTs()) < util.MinSchedulerInterval.Load() {
return Hold
}
if cpu.GetCPUUsage() < 0.5 {
Expand Down
7 changes: 0 additions & 7 deletions resourcemanager/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,7 @@
package scheduler

import (
"time"

"github.com/pingcap/tidb/resourcemanager/util"
"go.uber.org/atomic"
)

var (
minCPUSchedulerInterval = atomic.NewDuration(time.Minute)
)

// Command is the command for scheduler
Expand Down
5 changes: 4 additions & 1 deletion resourcemanager/util/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ go_library(
],
importpath = "github.com/pingcap/tidb/resourcemanager/util",
visibility = ["//visibility:public"],
deps = ["@com_github_pingcap_errors//:errors"],
deps = [
"@com_github_pingcap_errors//:errors",
"@org_uber_go_atomic//:atomic",
],
)

go_test(
Expand Down
4 changes: 2 additions & 2 deletions resourcemanager/util/mock_gpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func NewMockGPool(name string) *MockGPool {
return &MockGPool{name: name}
}

// Release is only for test
func (*MockGPool) Release() {
// ReleaseAndWait is only for test
func (*MockGPool) ReleaseAndWait() {
panic("implement me")
}

Expand Down
23 changes: 11 additions & 12 deletions resourcemanager/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,23 @@

package util

import "time"
import (
"time"

"go.uber.org/atomic"
)

var (
// MinSchedulerInterval is the minimum interval between two scheduling.
MinSchedulerInterval = atomic.NewDuration(200 * time.Millisecond)
)

// GorotinuePool is a pool interface
type GorotinuePool interface {
Release()
ReleaseAndWait()
Tune(size int)
LastTunerTs() time.Time
MaxInFlight() int64
InFlight() int64
MinRT() uint64
MaxPASS() uint64
Cap() int
// LongRTT is to represent the baseline latency by tracking a measurement of the long term, less volatile RTT.
LongRTT() float64
UpdateLongRTT(f func(float64) float64)
// ShortRTT is to represent the current system latency by tracking a measurement of the short time, and more volatile RTT.
ShortRTT() uint64
GetQueueSize() int64
Running() int
Name() string
}
Expand Down
1 change: 1 addition & 0 deletions util/gpool/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ go_library(
],
importpath = "github.com/pingcap/tidb/util/gpool",
visibility = ["//visibility:public"],
deps = ["@org_uber_go_atomic//:atomic"],
)
21 changes: 18 additions & 3 deletions util/gpool/gpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"errors"
"sync/atomic"
"time"

atomicutil "go.uber.org/atomic"
)

const (
Expand All @@ -44,13 +46,16 @@ var (

// BasePool is base class of pool
type BasePool struct {
name string
generator atomic.Uint64
name string
lastTuneTs atomicutil.Time
generator atomic.Uint64
}

// NewBasePool is to create a new BasePool.
func NewBasePool() BasePool {
return BasePool{}
return BasePool{
lastTuneTs: *atomicutil.NewTime(time.Now()),
}
}

// SetName is to set name.
Expand All @@ -67,3 +72,13 @@ func (p *BasePool) Name() string {
func (p *BasePool) NewTaskID() uint64 {
return p.generator.Add(1)
}

// LastTunerTs returns the last time when the pool was tuned.
func (p *BasePool) LastTunerTs() time.Time {
return p.lastTuneTs.Load()
}

// SetLastTuneTs sets the last time when the pool was tuned.
func (p *BasePool) SetLastTuneTs(t time.Time) {
p.lastTuneTs.Store(t)
}
3 changes: 3 additions & 0 deletions util/gpool/spmc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ go_library(
importpath = "github.com/pingcap/tidb/util/gpool/spmc",
visibility = ["//visibility:public"],
deps = [
"//resourcemanager",
"//resourcemanager/pooltask",
"//resourcemanager/util",
"//util/gpool",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
Expand All @@ -33,6 +35,7 @@ go_test(
race = "on",
deps = [
"//resourcemanager/pooltask",
"//resourcemanager/util",
"//testkit/testsetup",
"//util",
"//util/gpool",
Expand Down
9 changes: 8 additions & 1 deletion util/gpool/spmc/spmcpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/resourcemanager"
"github.com/pingcap/tidb/resourcemanager/pooltask"
"github.com/pingcap/tidb/resourcemanager/util"
"github.com/pingcap/tidb/util/gpool"
"github.com/pingcap/tidb/util/logutil"
atomicutil "go.uber.org/atomic"
Expand Down Expand Up @@ -55,7 +57,7 @@ type Pool[T any, U any, C any, CT any, TF pooltask.Context[CT]] struct {
}

// NewSPMCPool create a single producer, multiple consumer goroutine pool.
func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name string, size int32, options ...Option) (*Pool[T, U, C, CT, TF], error) {
func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name string, size int32, component util.Component, options ...Option) (*Pool[T, U, C, CT, TF], error) {
opts := loadOptions(options...)
if expiry := opts.ExpiryDuration; expiry <= 0 {
opts.ExpiryDuration = gpool.DefaultCleanIntervalTime
Expand All @@ -77,6 +79,10 @@ func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name stri
result.capacity.Add(size)
result.workers = newWorkerLoopQueue[T, U, C, CT, TF](int(size))
result.cond = sync.NewCond(result.lock)
err := resourcemanager.GlobalResourceManager.Register(result, name, component)
if err != nil {
return nil, err
}
// Start a goroutine to clean up expired workers periodically.
go result.purgePeriodically()
return result, nil
Expand Down Expand Up @@ -129,6 +135,7 @@ func (p *Pool[T, U, C, CT, TF]) Tune(size int) {
if capacity == -1 || size <= 0 || size == capacity {
return
}
p.SetLastTuneTs(time.Now())
p.capacity.Store(int32(size))
if size > capacity {
// boost
Expand Down
3 changes: 2 additions & 1 deletion util/gpool/spmc/spmcpool_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/tidb/resourcemanager/pooltask"
rmutil "github.com/pingcap/tidb/resourcemanager/util"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/gpool"
)
Expand All @@ -29,7 +30,7 @@ const (
)

func BenchmarkGPool(b *testing.B) {
p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("test", 10)
p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("test", 10, rmutil.UNKNOWN)
if err != nil {
b.Fatal(err)
}
Expand Down
12 changes: 7 additions & 5 deletions util/gpool/spmc/spmcpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/pingcap/tidb/resourcemanager/pooltask"
rmutil "github.com/pingcap/tidb/resourcemanager/util"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/gpool"
"github.com/stretchr/testify/require"
Expand All @@ -32,7 +33,7 @@ func TestPool(t *testing.T) {
myArgs := ConstArgs{a: 10}
// init the pool
// input type, output type, constArgs type
pool, err := NewSPMCPool[int, int, ConstArgs, any, pooltask.NilContext]("TestPool", 10)
pool, err := NewSPMCPool[int, int, ConstArgs, any, pooltask.NilContext]("TestPool", 10, rmutil.UNKNOWN)
require.NoError(t, err)
pool.SetConsumerFunc(func(task int, constArgs ConstArgs, ctx any) int {
return task + constArgs.a
Expand Down Expand Up @@ -76,7 +77,7 @@ func TestPoolWithEnoughCapacity(t *testing.T) {
poolsize = 30
concurrency = 6
)
p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithEnoughCapa", poolsize, WithExpiryDuration(DefaultExpiredTime))
p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithEnoughCapa", poolsize, rmutil.UNKNOWN, WithExpiryDuration(DefaultExpiredTime))
require.NoError(t, err)
defer p.ReleaseAndWait()
p.SetConsumerFunc(func(a struct{}, b int, c any) struct{} {
Expand Down Expand Up @@ -128,7 +129,7 @@ func TestPoolWithoutEnoughCapacity(t *testing.T) {
concurrency = 2
poolsize = 2
)
p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithoutEnoughCapa", poolsize,
p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithoutEnoughCapacity", poolsize, rmutil.UNKNOWN,
WithExpiryDuration(DefaultExpiredTime))
require.NoError(t, err)
defer p.ReleaseAndWait()
Expand Down Expand Up @@ -184,7 +185,7 @@ func TestPoolWithoutEnoughCapacityParallel(t *testing.T) {
concurrency = 2
poolsize = 2
)
p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithoutEnoughCapa", poolsize,
p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithoutEnoughCapacityParallel", poolsize, rmutil.UNKNOWN,
WithExpiryDuration(DefaultExpiredTime), WithNonblocking(true))
require.NoError(t, err)
defer p.ReleaseAndWait()
Expand Down Expand Up @@ -236,7 +237,8 @@ func TestPoolWithoutEnoughCapacityParallel(t *testing.T) {
}

func TestBenchPool(t *testing.T) {
p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestBenchPool", 10, WithExpiryDuration(DefaultExpiredTime))
p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestBenchPool", 10,
rmutil.UNKNOWN, WithExpiryDuration(DefaultExpiredTime))
require.NoError(t, err)
defer p.ReleaseAndWait()
p.SetConsumerFunc(func(a struct{}, b int, c any) struct{} {
Expand Down

0 comments on commit f5362f9

Please sign in to comment.