Skip to content

Commit

Permalink
Merge branch 'master' into scatter_bug
Browse files Browse the repository at this point in the history
  • Loading branch information
bufferflies authored Aug 31, 2023
2 parents 8412f8c + 39cff3b commit 9783c53
Show file tree
Hide file tree
Showing 77 changed files with 2,487 additions and 918 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ linters-settings:
excludes:
- G402
- G404
- G601
18 changes: 11 additions & 7 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,18 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/client/tlsutil"
"go.uber.org/zap"
"google.golang.org/grpc"
)

const (
globalDCLocation = "global"
memberUpdateInterval = time.Minute
serviceModeUpdateInterval = 3 * time.Second
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
globalDCLocation = "global"
memberUpdateInterval = time.Minute
serviceModeUpdateInterval = 3 * time.Second
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
updateMemberBackOffBaseTime = 100 * time.Millisecond
)

type serviceType int
Expand Down Expand Up @@ -239,17 +241,19 @@ func (c *pdServiceDiscovery) updateMemberLoop() {
ticker := time.NewTicker(memberUpdateInterval)
defer ticker.Stop()

bo := retry.InitialBackOffer(updateMemberBackOffBaseTime, updateMemberTimeout)
for {
select {
case <-ctx.Done():
log.Info("[pd] exit member loop due to context canceled")
return
case <-ticker.C:
case <-c.checkMembershipCh:
}
failpoint.Inject("skipUpdateMember", func() {
failpoint.Continue()
})
if err := c.updateMember(); err != nil {
if err := bo.Exec(ctx, c.updateMember); err != nil {
log.Error("[pd] failed to update member", zap.Strings("urls", c.GetServiceURLs()), errs.ZapError(err))
}
}
Expand Down Expand Up @@ -319,7 +323,7 @@ func (c *pdServiceDiscovery) GetKeyspaceGroupID() uint32 {
return defaultKeySpaceGroupID
}

// DiscoverServiceURLs discovers the microservice with the specified type and returns the server urls.
// DiscoverMicroservice discovers the microservice with the specified type and returns the server urls.
func (c *pdServiceDiscovery) DiscoverMicroservice(svcType serviceType) (urls []string, err error) {
switch svcType {
case apiService:
Expand Down Expand Up @@ -386,7 +390,7 @@ func (c *pdServiceDiscovery) ScheduleCheckMemberChanged() {
}
}

// Immediately check if there is any membership change among the leader/followers in a
// CheckMemberChanged immediately checks whether there is any membership change among the leader/followers in a
// quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster.
func (c *pdServiceDiscovery) CheckMemberChanged() error {
return c.updateMember()
Expand Down
5 changes: 5 additions & 0 deletions client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,16 @@ func (c *client) createTokenDispatcher() {
tokenBatchController: newTokenBatchController(
make(chan *tokenRequest, 1)),
}
c.wg.Add(1)
go c.handleResourceTokenDispatcher(dispatcherCtx, dispatcher.tokenBatchController)
c.tokenDispatcher = dispatcher
}

func (c *client) handleResourceTokenDispatcher(dispatcherCtx context.Context, tbc *tokenBatchController) {
defer func() {
log.Info("[resource manager] exit resource token dispatcher")
c.wg.Done()
}()
var (
connection resourceManagerConnectionContext
firstRequest *tokenRequest
Expand Down
86 changes: 86 additions & 0 deletions client/retry/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"context"
"time"

"github.com/pingcap/failpoint"
)

// BackOffer is a backoff policy for retrying operations.
// It tracks the interval to wait after the next failure; the interval doubles
// on every use until it reaches max and is restored to base on reset.
type BackOffer struct {
// max is the upper bound applied to next after it has been doubled.
max time.Duration
// next is the interval that the next call to exponentialInterval returns.
next time.Duration
// base is the initial interval, and the value next is restored to on reset.
base time.Duration
}

// Exec runs fn once and applies the backoff policy to its outcome.
//
// If fn succeeds, the backoff interval is reset to its base value and nil is
// returned. If fn fails, Exec blocks until the current backoff interval has
// elapsed or ctx is done, whichever happens first, and then returns the error
// produced by fn. Exec never calls fn a second time; callers retry by calling
// Exec again, and every failed call advances the interval.
func (bo *BackOffer) Exec(
	ctx context.Context,
	fn func() error,
) error {
	err := fn()
	if err == nil {
		// reset backoff when fn() succeed.
		bo.resetBackoff()
		return nil
	}

	// Use an explicit timer instead of time.After so that it is released as
	// soon as Exec returns, even when ctx is cancelled before it fires.
	timer := time.NewTimer(bo.nextInterval())
	defer timer.Stop()

	select {
	case <-ctx.Done():
	case <-timer.C:
		failpoint.Inject("backOffExecute", func() {
			testBackOffExecuteFlag = true
		})
	}
	return err
}

// InitialBackOffer builds a BackOffer in its initial state: the first interval
// handed out is base, and the doubling of later intervals is capped at max.
func InitialBackOffer(base, max time.Duration) BackOffer {
	var bo BackOffer
	bo.base = base
	bo.next = base
	bo.max = max
	return bo
}

// nextInterval returns the duration to wait before the next retry.
// The only strategy implemented so far is the exponential one.
func (bo *BackOffer) nextInterval() time.Duration {
	interval := bo.exponentialInterval()
	return interval
}

// exponentialInterval hands out the currently stored interval and prepares the
// following one by doubling it, capped at bo.max.
func (bo *BackOffer) exponentialInterval() time.Duration {
	current := bo.next
	doubled := current * 2
	if doubled > bo.max {
		doubled = bo.max
	}
	bo.next = doubled
	return current
}

// resetBackoff puts the policy back into its initial state, so that the next
// interval handed out is the base interval again.
func (bo *BackOffer) resetBackoff() {
	initial := bo.base
	bo.next = initial
}

// testBackOffExecuteFlag is set to true by the "backOffExecute" failpoint in Exec.
// Only used for test.
var testBackOffExecuteFlag bool

// TestBackOffExecute reports the current value of testBackOffExecuteFlag.
// Only used for test.
func TestBackOffExecute() bool {
	fired := testBackOffExecuteFlag
	return fired
}
47 changes: 47 additions & 0 deletions client/retry/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"context"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/stretchr/testify/require"
)

// TestExponentialBackoff checks that the intervals start at the base value,
// double on every use, are capped at the maximum, and that Exec advances the
// interval on failure and resets it on success.
func TestExponentialBackoff(t *testing.T) {
	re := require.New(t)

	baseBackoff := 100 * time.Millisecond
	maxBackoff := 1 * time.Second

	backoff := InitialBackOffer(baseBackoff, maxBackoff)
	// require.Equal takes (expected, actual); keep that order so failure
	// messages label the two values correctly.
	re.Equal(baseBackoff, backoff.nextInterval())
	re.Equal(2*baseBackoff, backoff.nextInterval())

	for i := 0; i < 10; i++ {
		re.LessOrEqual(backoff.nextInterval(), maxBackoff)
	}
	re.Equal(maxBackoff, backoff.nextInterval())

	// Reset backoff
	backoff.resetBackoff()
	err := backoff.Exec(context.Background(), func() error {
		return errors.New("test")
	})
	re.Error(err)
	// The failed Exec consumed the base interval, so the next one is doubled.
	re.Equal(2*baseBackoff, backoff.nextInterval())

	// A successful Exec resets the backoff to the base interval.
	re.NoError(backoff.Exec(context.Background(), func() error {
		return nil
	}))
	re.Equal(baseBackoff, backoff.nextInterval())
}
4 changes: 3 additions & 1 deletion client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/client/timerpool"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -389,6 +390,7 @@ func (c *tsoClient) handleDispatcher(
// Loop through each batch of TSO requests and send them for processing.
streamLoopTimer := time.NewTimer(c.option.timeout)
defer streamLoopTimer.Stop()
bo := retry.InitialBackOffer(updateMemberBackOffBaseTime, updateMemberTimeout)
tsoBatchLoop:
for {
select {
Expand Down Expand Up @@ -498,7 +500,7 @@ tsoBatchLoop:
stream = nil
// Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP.
if IsLeaderChange(err) {
if err := c.svcDiscovery.CheckMemberChanged(); err != nil {
if err := bo.Exec(dispatcherCtx, c.svcDiscovery.CheckMemberChanged); err != nil {
select {
case <-dispatcherCtx.Done():
return
Expand Down
2 changes: 1 addition & 1 deletion client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func (c *tsoServiceDiscovery) ScheduleCheckMemberChanged() {
}
}

// Immediately check if there is any membership change among the primary/secondaries in
// CheckMemberChanged immediately checks whether there is any membership change among the primary/secondaries in
// a primary/secondary configured cluster.
func (c *tsoServiceDiscovery) CheckMemberChanged() error {
c.apiSvcDiscovery.CheckMemberChanged()
Expand Down
5 changes: 5 additions & 0 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ import (
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/join"
"go.uber.org/zap"

// register microservice API
_ "github.com/tikv/pd/pkg/mcs/resourcemanager/server/install"
_ "github.com/tikv/pd/pkg/mcs/scheduling/server/install"
_ "github.com/tikv/pd/pkg/mcs/tso/server/install"
)

func main() {
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,11 @@ error = '''
leader is nil
'''

["PD:server:ErrRateLimitExceeded"]
error = '''
rate limit exceeded
'''

["PD:server:ErrServerNotStarted"]
error = '''
server not started
Expand Down
8 changes: 4 additions & 4 deletions metrics/grafana/pd.json
Original file line number Diff line number Diff line change
Expand Up @@ -7091,14 +7091,14 @@
"steppedLine": false,
"targets": [
{
"expr": "-sum(delta(pd_scheduler_balance_leader{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store-out\", instance=\"$instance\", type=\"move-leader\"}[1m])) by (store)",
"expr": "-sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-leader-scheduler\"}[1m])) by (source)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}",
"refId": "A"
},
{
"expr": "sum(delta(pd_scheduler_balance_leader{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store-in\", instance=\"$instance\", type=\"move-leader\"}[1m])) by (store)",
"expr": "sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-leader-scheduler\"}[1m])) by (target)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}",
Expand Down Expand Up @@ -7195,14 +7195,14 @@
"steppedLine": false,
"targets": [
{
"expr": "-sum(delta(pd_scheduler_balance_region{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store-out\", instance=\"$instance\", type=\"move-peer\"}[1m])) by (store)",
"expr": "-sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-region-scheduler\"}[1m])) by (source)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}",
"refId": "A"
},
{
"expr": "sum(delta(pd_scheduler_balance_region{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store-in\", instance=\"$instance\", type=\"move-peer\"}[1m])) by (store)",
"expr": "sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-region-scheduler\"}[1m])) by (target)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}",
Expand Down
8 changes: 2 additions & 6 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1660,15 +1660,11 @@ func DiffRegionKeyInfo(origin *RegionInfo, other *RegionInfo) string {
}

// String converts slice of bytes to string without copy.
func String(b []byte) (s string) {
func String(b []byte) string {
if len(b) == 0 {
return ""
}
pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b))
pstring := (*reflect.StringHeader)(unsafe.Pointer(&s))
pstring.Data = pbytes.Data
pstring.Len = pbytes.Len
return
return unsafe.String(unsafe.SliceData(b), len(b))
}

// ToUpperASCIIInplace bytes.ToUpper but zero-cost
Expand Down
Loading

0 comments on commit 9783c53

Please sign in to comment.