
collecting the RU information by passing pointer through context.Value #1032

Merged
6 commits merged on Nov 6, 2023
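The change in a nutshell: the per-store `StartTS -> RURuntimeStats` map is removed, and RU consumption is instead accumulated into a `*util.RUDetails` that the caller attaches to the request context under `util.RUDetailsCtxKey`; the resource-control interceptor looks it up there and updates it on every request and response. Below is a minimal caller-side sketch based only on the API added in this diff; the actual KV request is elided and marked with a placeholder comment.

```go
package main

import (
	"context"
	"fmt"

	"github.com/tikv/client-go/v2/util"
)

func main() {
	// Allocate one RUDetails per statement/transaction and attach it to the
	// request context. The resource-control interceptor reads it back via
	// util.RUDetailsCtxKey and folds every consumption report into it.
	ruDetails := util.NewRUDetails()
	ctx := context.WithValue(context.Background(), util.RUDetailsCtxKey, ruDetails)

	// ... issue KV requests with ctx through a KVStore / txn snapshot here ...
	_ = ctx

	// After the requests finish, the accumulated RU can be read back.
	fmt.Printf("RRU: %.2f, WRU: %.2f\n", ruDetails.RRU(), ruDetails.WRU())
}
```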
1 change: 1 addition & 0 deletions .github/workflows/compatibility.yml
@@ -26,6 +26,7 @@ jobs:
with:
repository: pingcap/tidb
path: tidb
ref: release-7.1

- name: Check build
run: |
32 changes: 14 additions & 18 deletions internal/client/client_interceptor.go
@@ -16,7 +16,6 @@ package client

import (
"context"
"sync"
"sync/atomic"
"time"

@@ -35,17 +34,16 @@ var _ Client = interceptedClient{}

type interceptedClient struct {
Client
ruRuntimeStatsMap *sync.Map
}

// NewInterceptedClient creates a Client which can execute interceptor.
func NewInterceptedClient(client Client, ruRuntimeStatsMap *sync.Map) Client {
return interceptedClient{client, ruRuntimeStatsMap}
func NewInterceptedClient(client Client) Client {
return interceptedClient{client}
}

func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
// Build the resource control interceptor.
var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req, r.getRURuntimeStats(req.GetStartTS()))
var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req)
// Chain the interceptors if there are multiple interceptors.
if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil {
if finalInterceptor != nil {
@@ -62,16 +60,6 @@ func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *ti
return r.Client.SendRequest(ctx, addr, req, timeout)
}

func (r interceptedClient) getRURuntimeStats(startTS uint64) *util.RURuntimeStats {
if r.ruRuntimeStatsMap == nil || startTS == 0 {
return nil
}
if v, ok := r.ruRuntimeStatsMap.Load(startTS); ok {
return v.(*util.RURuntimeStats)
}
return nil
}

var (
// ResourceControlSwitch is used to control whether to enable the resource control.
ResourceControlSwitch atomic.Value
@@ -84,7 +72,6 @@ var (
func buildResourceControlInterceptor(
ctx context.Context,
req *tikvrpc.Request,
ruRuntimeStats *util.RURuntimeStats,
) interceptor.RPCInterceptor {
if !ResourceControlSwitch.Load().(bool) {
return nil
@@ -102,6 +89,8 @@ }
}
resourceControlInterceptor := *rcInterceptor

ruDetails := ctx.Value(util.RUDetailsCtxKey)

// Make the request info.
reqInfo := resourcecontrol.MakeRequestInfo(req)
// Build the interceptor.
@@ -121,15 +110,22 @@ func buildResourceControlInterceptor(
return nil, err
}
req.GetResourceControlContext().Penalty = penalty
ruRuntimeStats.Update(consumption)
if ruDetails != nil {
detail := ruDetails.(*util.RUDetails)
detail.Update(consumption)
}

resp, err := next(target, req)
if resp != nil {
respInfo := resourcecontrol.MakeResponseInfo(resp)
consumption, err = resourceControlInterceptor.OnResponse(resourceGroupName, reqInfo, respInfo)
if err != nil {
return nil, err
}
ruRuntimeStats.Update(consumption)
if ruDetails != nil {
detail := ruDetails.(*util.RUDetails)
detail.Update(consumption)
}
}
return resp, err
}
4 changes: 2 additions & 2 deletions internal/client/client_interceptor_test.go
@@ -41,7 +41,7 @@ func (c emptyClient) CloseAddr(addr string) error {

func TestInterceptedClient(t *testing.T) {
executed := false
client := NewInterceptedClient(emptyClient{}, nil)
client := NewInterceptedClient(emptyClient{})
ctx := interceptor.WithRPCInterceptor(context.Background(), interceptor.NewRPCInterceptor("test", func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc {
return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
executed = true
@@ -54,7 +54,7 @@ func TestInterceptedClient(t *testing.T) {

func TestAppendChainedInterceptor(t *testing.T) {
executed := make([]int, 0, 10)
client := NewInterceptedClient(emptyClient{}, nil)
client := NewInterceptedClient(emptyClient{})

mkInterceptorFn := func(i int) interceptor.RPCInterceptor {
return interceptor.NewRPCInterceptor(fmt.Sprintf("%d", i), func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc {
50 changes: 2 additions & 48 deletions tikv/kv.go
@@ -79,10 +79,6 @@ const (
// DCLabelKey indicates the key of label which represents the dc for Store.
DCLabelKey = "zone"
safeTSUpdateInterval = time.Second * 2
// Since the default max transaction TTL is 1 hour, we can use this to
// clean up the RU runtime stats as well.
ruRuntimeStatsCleanThreshold = time.Hour
ruRuntimeStatsCleanInterval = ruRuntimeStatsCleanThreshold / 2
)

func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) {
@@ -138,9 +134,6 @@ type KVStore struct {

replicaReadSeed uint32 // this is used to load balance followers / learners when replica read is enabled

// StartTS -> RURuntimeStats, stores the RU runtime stats for certain transaction.
ruRuntimeStatsMap sync.Map

ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
@@ -230,14 +223,13 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl
cancel: cancel,
gP: util.NewSpool(128, 10*time.Second),
}
store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient, &store.ruRuntimeStatsMap))
store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient))
store.lockResolver = txnlock.NewLockResolver(store)
loadOption(store, opt...)

store.wg.Add(3)
store.wg.Add(2)
go store.runSafePointChecker()
go store.safeTSUpdater()
go store.ruRuntimeStatsMapCleaner()

return store, nil
}
@@ -695,44 +687,6 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
wg.Wait()
}

func (s *KVStore) ruRuntimeStatsMapCleaner() {
Review comment (Contributor): Why remove this code?

defer s.wg.Done()
t := time.NewTicker(ruRuntimeStatsCleanInterval)
defer t.Stop()
ctx, cancel := context.WithCancel(s.ctx)
ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC)
defer cancel()

cleanThreshold := ruRuntimeStatsCleanThreshold
if _, e := util.EvalFailpoint("mockFastRURuntimeStatsMapClean"); e == nil {
t.Reset(time.Millisecond * 100)
cleanThreshold = time.Millisecond
}

for {
select {
case <-ctx.Done():
return
case now := <-t.C:
s.ruRuntimeStatsMap.Range(
func(key, _ interface{}) bool {
startTSTime := oracle.GetTimeFromTS(key.(uint64))
if now.Sub(startTSTime) >= cleanThreshold {
s.ruRuntimeStatsMap.Delete(key)
}
return true
},
)
}
}
}

// CreateRURuntimeStats creates a RURuntimeStats for the startTS and returns it.
func (s *KVStore) CreateRURuntimeStats(startTS uint64) *util.RURuntimeStats {
rrs, _ := s.ruRuntimeStatsMap.LoadOrStore(startTS, util.NewRURuntimeStats())
return rrs.(*util.RURuntimeStats)
}

// EnableResourceControl enables the resource control.
func EnableResourceControl() {
client.ResourceControlSwitch.Store(true)
30 changes: 0 additions & 30 deletions tikv/kv_test.go
@@ -21,10 +21,8 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/oracle"
@@ -126,31 +124,3 @@ func (s *testKVSuite) TestMinSafeTs() {
s.Require().GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(2))
s.Require().Equal(uint64(80), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
}

func TestRURuntimeStatsCleanUp(t *testing.T) {
util.EnableFailpoints()
require := require.New(t)
require.Nil(failpoint.Enable("tikvclient/mockFastRURuntimeStatsMapClean", `return()`))
defer func() {
require.Nil(failpoint.Disable("tikvclient/mockFastRURuntimeStatsMapClean"))
}()

client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
require.Nil(err)
testutils.BootstrapWithSingleStore(cluster)
store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0)
require.Nil(err)
defer store.Close()

// Create a ruRuntimeStats first.
startTS := oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0)
ruRuntimeStats := store.CreateRURuntimeStats(startTS)
require.NotNil(ruRuntimeStats)
// Wait for the cleanup goroutine to clean up the ruRuntimeStatsMap.
time.Sleep(time.Millisecond * 150)
// The ruRuntimeStatsMap should be cleaned up.
store.ruRuntimeStatsMap.Range(func(key, value interface{}) bool {
require.Fail("ruRuntimeStatsMap should be cleaned up")
return true
})
}
48 changes: 26 additions & 22 deletions util/execdetails.go
@@ -52,6 +52,7 @@ import (
type commitDetailCtxKeyType struct{}
type lockKeysDetailCtxKeyType struct{}
type execDetailsCtxKeyType struct{}
type ruDetailsCtxKeyType struct{}
type traceExecDetailsCtxKeyType struct{}

var (
@@ -64,6 +65,9 @@ var (
// ExecDetailsKey presents ExecDetail info key in context.
ExecDetailsKey = execDetailsCtxKeyType{}

// RUDetailsCtxKey presents RUDetails info key in context.
RUDetailsCtxKey = ruDetailsCtxKeyType{}

// traceExecDetailsKey is a context key whose value indicates whether to add ExecDetails to trace.
traceExecDetailsKey = traceExecDetailsCtxKeyType{}
)
@@ -682,54 +686,54 @@ func (rd *ResolveLockDetail) Merge(resolveLock *ResolveLockDetail) {
rd.ResolveLockTime += resolveLock.ResolveLockTime
}

// RURuntimeStats is the runtime stats collector for RU.
type RURuntimeStats struct {
// RUDetails contains RU detail info.
type RUDetails struct {
readRU *uatomic.Float64
writeRU *uatomic.Float64
}

// NewRURuntimeStats creates a new RURuntimeStats.
func NewRURuntimeStats() *RURuntimeStats {
return &RURuntimeStats{
// NewRUDetails creates a new RUDetails.
func NewRUDetails() *RUDetails {
return &RUDetails{
readRU: uatomic.NewFloat64(0),
writeRU: uatomic.NewFloat64(0),
}
}

// Clone implements the RuntimeStats interface.
func (rs *RURuntimeStats) Clone() *RURuntimeStats {
return &RURuntimeStats{
readRU: uatomic.NewFloat64(rs.readRU.Load()),
writeRU: uatomic.NewFloat64(rs.writeRU.Load()),
func (rd *RUDetails) Clone() *RUDetails {
return &RUDetails{
readRU: uatomic.NewFloat64(rd.readRU.Load()),
writeRU: uatomic.NewFloat64(rd.writeRU.Load()),
}
}

// Merge implements the RuntimeStats interface.
func (rs *RURuntimeStats) Merge(other *RURuntimeStats) {
rs.readRU.Add(other.readRU.Load())
rs.writeRU.Add(other.writeRU.Load())
func (rd *RUDetails) Merge(other *RUDetails) {
rd.readRU.Add(other.readRU.Load())
rd.writeRU.Add(other.writeRU.Load())
}

// String implements fmt.Stringer interface.
func (rs *RURuntimeStats) String() string {
return fmt.Sprintf("RRU:%f, WRU:%f", rs.readRU.Load(), rs.writeRU.Load())
func (rd *RUDetails) String() string {
return fmt.Sprintf("RRU:%f, WRU:%f", rd.readRU.Load(), rd.writeRU.Load())
}

// RRU returns the read RU.
func (rs RURuntimeStats) RRU() float64 {
return rs.readRU.Load()
func (rd *RUDetails) RRU() float64 {
return rd.readRU.Load()
}

// WRU returns the write RU.
func (rs RURuntimeStats) WRU() float64 {
return rs.writeRU.Load()
func (rd *RUDetails) WRU() float64 {
return rd.writeRU.Load()
}

// Update updates the RU runtime stats with the given consumption info.
func (rs *RURuntimeStats) Update(consumption *rmpb.Consumption) {
if rs == nil || consumption == nil {
func (rd *RUDetails) Update(consumption *rmpb.Consumption) {
if rd == nil || consumption == nil {
return
}
rs.readRU.Add(consumption.RRU)
rs.writeRU.Add(consumption.WRU)
rd.readRU.Add(consumption.RRU)
rd.writeRU.Add(consumption.WRU)
}
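For reference, a small sketch of how the renamed `RUDetails` helpers compose: `Update` folds in one consumption report, `Merge` combines per-request totals, and `String` formats them. The `rmpb` import path and the `Consumption` literal are assumptions for illustration; only the RRU/WRU fields referenced in this diff are used.

```go
package main

import (
	"fmt"

	rmpb "github.com/pingcap/kvproto/pkg/resource_manager" // assumed import path for rmpb
	"github.com/tikv/client-go/v2/util"
)

func main() {
	total := util.NewRUDetails()
	perRequest := util.NewRUDetails()

	// Update accumulates a single consumption report; it is nil-safe on both sides.
	perRequest.Update(&rmpb.Consumption{RRU: 1.5, WRU: 0.5})

	// Merge folds one RUDetails into another, e.g. per-request into per-statement totals.
	total.Merge(perRequest)

	fmt.Println(total) // RRU:1.500000, WRU:0.500000
}
```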