Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: fix misuse of PD client's GetStore #23695

Merged
merged 7 commits into from
Mar 31, 2021
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math/rand"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -30,13 +31,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/ddl/placement"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/config"
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
pd "github.com/tikv/pd/client"
atomic2 "go.uber.org/atomic"
"go.uber.org/zap"
Expand All @@ -46,6 +40,14 @@ import (
"google.golang.org/grpc/credentials"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/keepalive"

"github.com/pingcap/tidb/ddl/placement"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/config"
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
)

const (
Expand Down Expand Up @@ -1768,6 +1770,13 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err
}
}

// A quick and dirty solution to found whether an err is caused by StoreNotFound.
// todo: A better solution, maybe some err-code based error handling?
func isStoreNotFoundError(err error) bool {
return err.Error() == "[pd] store field in rpc response not set" ||
(strings.Contains(err.Error(), "invalid store ID") && strings.Contains(err.Error(), "not found"))
longfangsong marked this conversation as resolved.
Show resolved Hide resolved
}

// reResolve try to resolve addr for store that need check. Returns false if the region is in tombstone state or is
// deleted.
func (s *Store) reResolve(c *RegionCache) (bool, error) {
Expand All @@ -1778,16 +1787,20 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
} else {
metrics.RegionCacheCounterWithGetStoreOK.Inc()
}
if err != nil {
// `err` here can mean "load Store from PD failed" or "store not found"
// if load Store from PD is successful but pd didn't found the store
longfangsong marked this conversation as resolved.
Show resolved Hide resolved
// these error should be handled by next `if` instead of here
if err != nil && !isStoreNotFoundError(err) {
logutil.BgLogger().Error("loadStore from PD failed", zap.Uint64("id", s.storeID), zap.Error(err))
// we cannot do backoff in reResolve loop but try check other store and wait tick.
return false, err
}
if store == nil || store.State == metapb.StoreState_Tombstone {
longfangsong marked this conversation as resolved.
Show resolved Hide resolved
if store == nil {
// store has be removed in PD, we should invalidate all regions using those store.
logutil.BgLogger().Info("invalidate regions in removed store",
zap.Uint64("store", s.storeID), zap.String("add", s.addr))
atomic.AddUint32(&s.epoch, 1)
atomic.StoreUint64(&s.state, uint64(deleted))
metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
return false, nil
}
Expand Down