Skip to content

Commit

Permalink
client/controller: wait for tokens to reduce the debet
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch committed Jun 5, 2024
1 parent 21bc172 commit 363c933
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 10 deletions.
30 changes: 20 additions & 10 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type ResourceGroupKVInterceptor interface {
// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error)
// OnResponse is used to consume tokens after receiving response.
OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, time.Duration, error)
// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
IsBackgroundRequest(ctx context.Context, resourceGroupName, requestResource string) bool
}
Expand Down Expand Up @@ -568,7 +568,7 @@ func (c *ResourceGroupsController) OnRequestWait(
// OnResponse is used to consume tokens after receiving response
func (c *ResourceGroupsController) OnResponse(
resourceGroupName string, req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, error) {
) (*rmpb.Consumption, time.Duration, error) {
tmp, ok := c.groupsController.Load(resourceGroupName)
if !ok {
log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName))
Expand Down Expand Up @@ -1287,11 +1287,12 @@ func (gc *groupCostController) onRequestWait(

func (gc *groupCostController) onResponse(
req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, error) {
) (*rmpb.Consumption, time.Duration, error) {
delta := &rmpb.Consumption{}
for _, calc := range gc.calculators {
calc.AfterKVRequest(delta, req, resp)
}
var d time.Duration
if !gc.burstable.Load() {
switch gc.mode {
case rmpb.GroupMode_RawMode:
Expand All @@ -1303,16 +1304,25 @@ func (gc *groupCostController) onResponse(
case rmpb.GroupMode_RUMode:
for typ, counter := range gc.run.requestUnitTokens {
if v := getRUValueFromConsumption(delta, typ); v > 0 {
// counter.limiter.RemoveTokens(time.Now(), v)
var err error
now := time.Now()
res := counter.limiter.Reserve(context.Background(), gc.mainCfg.LTBMaxWaitDuration, now, v)
if d, err := WaitReservations(context.Background(), now, []*Reservation{res}); err != nil {
return nil, err
} else {
gc.successfulRequestDuration.Observe(d.Seconds())
for i := 0; i < gc.mainCfg.WaitRetryTimes; i++ {
res := counter.limiter.Reserve(context.Background(), gc.mainCfg.LTBMaxWaitDuration/2, now, v)
if d, err = WaitReservations(context.Background(), now, []*Reservation{res}); err != nil {
time.Sleep(gc.mainCfg.WaitRetryInterval)
d+=gc.mainCfg.WaitRetryInterval
continue
} else {
gc.successfulRequestDuration.Observe(d.Seconds())
break
}
}
if err != nil {
return delta, d, err
}
}
}

}
}

Expand All @@ -1330,7 +1340,7 @@ func (gc *groupCostController) onResponse(
add(gc.mu.globalCounter, count)
gc.mu.Unlock()

return delta, nil
return delta,d, nil,
}

// GetActiveResourceGroup is used to get action resource group.
Expand Down
27 changes: 27 additions & 0 deletions pkg/core/region_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package core

import (
"bytes"
"fmt"
"math/rand"
"testing"
Expand Down Expand Up @@ -413,6 +414,32 @@ func mock1MRegionTree() *mockRegionTreeData {
return data
}

func compareUsingBytesEqual(origin, region *RegionInfo) bool {
return !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey())
}

func compareUsingStringConversion(origin, region *RegionInfo) bool {
return string(origin.GetStartKey()) != string(region.GetStartKey()) || string(origin.GetEndKey()) != string(region.GetEndKey())
}

func BenchmarkCompareUsingBytesEqual(b *testing.B) {
origin := &RegionInfo{meta: &metapb.Region{StartKey: []byte(fmt.Sprintf("%20d", 1)), EndKey: []byte(fmt.Sprintf("%20d", 2))}}
region := &RegionInfo{meta: &metapb.Region{StartKey: []byte(fmt.Sprintf("%20d", 1)), EndKey: []byte(fmt.Sprintf("%20d", 3))}}

for i := 0; i < b.N; i++ {
_ = compareUsingBytesEqual(origin, region)
}
}

func BenchmarkCompareUsingStringConversion(b *testing.B) {
origin := &RegionInfo{meta: &metapb.Region{StartKey: []byte(fmt.Sprintf("%20d", 1)), EndKey: []byte(fmt.Sprintf("%20d", 2))}}
region := &RegionInfo{meta: &metapb.Region{StartKey: []byte(fmt.Sprintf("%20d", 1)), EndKey: []byte(fmt.Sprintf("%20d", 3))}}

for i := 0; i < b.N; i++ {
_ = compareUsingStringConversion(origin, region)
}
}

const MaxCount = 1_000_000

func BenchmarkRegionTreeSequentialInsert(b *testing.B) {
Expand Down

0 comments on commit 363c933

Please sign in to comment.