diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 7f9d015db681..1558d5b5d13d 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -66,7 +66,7 @@ type ResourceGroupKVInterceptor interface { // OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time. OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) // OnResponse is used to consume tokens after receiving response. - OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error) + OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, time.Duration, error) // IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it. IsBackgroundRequest(ctx context.Context, resourceGroupName, requestResource string) bool } @@ -568,7 +568,7 @@ func (c *ResourceGroupsController) OnRequestWait( // OnResponse is used to consume tokens after receiving response func (c *ResourceGroupsController) OnResponse( resourceGroupName string, req RequestInfo, resp ResponseInfo, -) (*rmpb.Consumption, error) { +) (*rmpb.Consumption, time.Duration, error) { tmp, ok := c.groupsController.Load(resourceGroupName) if !ok { log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName)) @@ -1287,11 +1287,12 @@ func (gc *groupCostController) onRequestWait( func (gc *groupCostController) onResponse( req RequestInfo, resp ResponseInfo, -) (*rmpb.Consumption, error) { +) (*rmpb.Consumption, time.Duration, error) { delta := &rmpb.Consumption{} for _, calc := range gc.calculators { calc.AfterKVRequest(delta, req, resp) } + var d time.Duration if !gc.burstable.Load() { switch gc.mode { case rmpb.GroupMode_RawMode: @@ -1303,16 +1304,25 @@ func (gc *groupCostController) onResponse( case rmpb.GroupMode_RUMode: for typ, counter := range gc.run.requestUnitTokens { if v := getRUValueFromConsumption(delta, typ); v > 0 { - // counter.limiter.RemoveTokens(time.Now(), v) + var err error now := time.Now() - res := counter.limiter.Reserve(context.Background(), gc.mainCfg.LTBMaxWaitDuration, now, v) - if d, err := WaitReservations(context.Background(), now, []*Reservation{res}); err != nil { - return nil, err - } else { - gc.successfulRequestDuration.Observe(d.Seconds()) + for i := 0; i < gc.mainCfg.WaitRetryTimes; i++ { + res := counter.limiter.Reserve(context.Background(), gc.mainCfg.LTBMaxWaitDuration/2, now, v) + if d, err = WaitReservations(context.Background(), now, []*Reservation{res}); err != nil { + time.Sleep(gc.mainCfg.WaitRetryInterval) + d+=gc.mainCfg.WaitRetryInterval + continue + } else { + gc.successfulRequestDuration.Observe(d.Seconds()) + break + } + } + if err != nil { + return delta, d, err } } } + } } @@ -1330,7 +1340,7 @@ func (gc *groupCostController) onResponse( add(gc.mu.globalCounter, count) gc.mu.Unlock() - return delta, nil + return delta,d, nil, } // GetActiveResourceGroup is used to get action resource group. diff --git a/pkg/core/region_tree_test.go b/pkg/core/region_tree_test.go index 3f2ca0c1fb8f..b79358cc3fc3 100644 --- a/pkg/core/region_tree_test.go +++ b/pkg/core/region_tree_test.go @@ -15,6 +15,7 @@ package core import ( + "bytes" "fmt" "math/rand" "testing" @@ -413,6 +414,32 @@ func mock1MRegionTree() *mockRegionTreeData { return data } +func compareUsingBytesEqual(origin, region *RegionInfo) bool { + return !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) +} + +func compareUsingStringConversion(origin, region *RegionInfo) bool { + return string(origin.GetStartKey()) != string(region.GetStartKey()) || string(origin.GetEndKey()) != string(region.GetEndKey()) +} + +func BenchmarkCompareUsingBytesEqual(b *testing.B) { + origin := &RegionInfo{meta: &metapb.Region{StartKey: []byte(fmt.Sprintf("%20d", 1)), EndKey: []byte(fmt.Sprintf("%20d", 2))}} + region := &RegionInfo{meta: &metapb.Region{StartKey: []byte(fmt.Sprintf("%20d", 1)), EndKey: []byte(fmt.Sprintf("%20d", 3))}} + + for i := 0; i < b.N; i++ { + _ = compareUsingBytesEqual(origin, region) + } +} + +func BenchmarkCompareUsingStringConversion(b *testing.B) { + origin := &RegionInfo{meta: &metapb.Region{StartKey: []byte(fmt.Sprintf("%20d", 1)), EndKey: []byte(fmt.Sprintf("%20d", 2))}} + region := &RegionInfo{meta: &metapb.Region{StartKey: []byte(fmt.Sprintf("%20d", 1)), EndKey: []byte(fmt.Sprintf("%20d", 3))}} + + for i := 0; i < b.N; i++ { + _ = compareUsingStringConversion(origin, region) + } +} + const MaxCount = 1_000_000 func BenchmarkRegionTreeSequentialInsert(b *testing.B) {