Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Broadcast min_commit_ts for pipelined transactions #1458

Merged
merged 24 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
75892ad
feat: periodic updates of min_commit_ts and broadcast
ekexium Sep 4, 2024
5588fdb
feat: periodic updates of full store list
ekexium Sep 5, 2024
294e614
test: unit test for minCommitTsManager
ekexium Sep 5, 2024
516aa22
fix: set cluster id for broadcast requests
ekexium Sep 5, 2024
fba423c
fix: set resource group related context
ekexium Sep 6, 2024
5adc6c9
limit the update to non-async-commit and non-1pc txns
ekexium Sep 11, 2024
61545b0
broadcast txn status when commit and rollback
ekexium Sep 18, 2024
f3909ea
rename minCommiTS to minCommitTSMgr; refactor broadcastToAllStores
ekexium Sep 19, 2024
ad0302e
fix test
ekexium Sep 19, 2024
d2213c5
Merge branch 'master' of github.com:tikv/client-go into feat-resolved…
ekexium Sep 19, 2024
a98a443
Update txnkv/transaction/2pc.go
ekexium Sep 19, 2024
7bad9fd
refactor broadcastToAllStores to use store goroutine pool
ekexium Sep 19, 2024
4f23a9e
set refreshStoreListInterval to 10s
ekexium Sep 19, 2024
1a9447b
fix: change a read lock to write lock
ekexium Sep 20, 2024
e611350
Apply suggestions from code review
ekexium Sep 23, 2024
f165547
fix mistaken renaming
ekexium Sep 23, 2024
ecb33d5
Update txnkv/transaction/pipelined_flush.go
ekexium Sep 23, 2024
9de2ac9
fix rpc code
ekexium Sep 23, 2024
5fece2a
Merge branch 'feat-resolved-ts-for-large-txn' of github.com:ekexium/c…
ekexium Sep 23, 2024
ca93ef6
comment the check of return values of GetAllStores
ekexium Sep 24, 2024
4a9b779
update kvproto
ekexium Sep 24, 2024
4aada7a
apply suggestions from review
ekexium Sep 24, 2024
012425a
Merge branch 'master' of github.com:tikv/client-go into feat-resolved…
ekexium Sep 24, 2024
ea8180b
update integration_tests dependency
ekexium Sep 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.18.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 h1:6aIKNB2YGAec4IUDLw6G2eDECiGiufZcgEbZSCELBx0=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d h1:vSdKTrF6kpcd56G5BLP0Bz88Nho2tDo7IR1+oSsBAfc=
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f
github.com/pingcap/failpoint v0.0.0-20240527053858-9b3b6e34194a
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d
github.com/pingcap/tidb v1.1.0-beta.0.20240703042657-230bbc2ef5ef
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.9.0
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 h1:6aIKNB2YGAec4IUDLw6G2eDECiGiufZcgEbZSCELBx0=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d h1:vSdKTrF6kpcd56G5BLP0Bz88Nho2tDo7IR1+oSsBAfc=
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d h1:y3EueKVfVykdpTyfUnQGqft0ud+xVFuCdp1XkVL0X1E=
Expand Down
39 changes: 39 additions & 0 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,9 +726,47 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
// cache GC is incompatible with cache refresh
c.bg.schedule(c.gcRoundFunc(cleanRegionNumPerRound), cleanCacheInterval)
}
c.bg.schedule(
func(ctx context.Context, _ time.Time) bool {
refreshFullStoreList(ctx, c.stores)
return false
}, refreshStoreListInterval,
)
return c
}

// refreshFullStoreList fetches the full store list through stores.fetchAllStores
// and registers every store that is not yet present in the store cache.
// It is best-effort: a fetch error is logged and otherwise ignored.
func refreshFullStoreList(ctx context.Context, stores storeCache) {
	allStores, err := stores.fetchAllStores(ctx)
	if err != nil {
		logutil.Logger(ctx).Info("refresh full store list failed", zap.Error(err))
		return
	}
	for _, meta := range allStores {
		// Stores already tracked by the cache are left untouched.
		if _, cached := stores.get(meta.GetId()); cached {
			continue
		}
		// GetAllStores is supposed to return only Up and Offline stores.
		// The nil/Tombstone guard is defensive and keeps this path consistent
		// with the store resolve code.
		if meta == nil || meta.GetState() == metapb.StoreState_Tombstone {
			continue
		}
		// Skip stores that report an empty address.
		address := meta.GetAddress()
		if address == "" {
			continue
		}
		// TODO: maybe refactor this, together with other places initializing Store
		entry := stores.getOrInsertDefault(meta.GetId())
		entry.addr = address
		entry.peerAddr = meta.GetPeerAddress()
		entry.saddr = meta.GetStatusAddress()
		entry.storeType = tikvrpc.GetStoreTypeByMeta(meta)
		entry.labels = meta.GetLabels()
		// NOTE(review): presumably moves the entry from unresolved to resolved
		// only if it is still unresolved — confirm against changeResolveStateTo.
		entry.changeResolveStateTo(unresolved, resolved)
	}
}

// only used for test.
func newTestRegionCache() *RegionCache {
c := &RegionCache{}
Expand Down Expand Up @@ -2649,6 +2687,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV

const (
	// cleanCacheInterval is the interval at which the region cache GC round is scheduled.
	cleanCacheInterval = time.Second
	// cleanRegionNumPerRound is the value handed to gcRoundFunc for each GC round.
	cleanRegionNumPerRound = 50
	// refreshStoreListInterval is the interval at which the full store list refresh is scheduled.
	refreshStoreListInterval = 10 * time.Second
)

// gcScanItemHook is only used for testing
var gcScanItemHook = new(atomic.Pointer[func(*btreeItem)])
Expand Down
4 changes: 4 additions & 0 deletions internal/locate/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,10 @@ func (s *mockTikvGrpcServer) GetHealthFeedback(ctx context.Context, request *kvr
return nil, errors.New("unreachable")
}

// BroadcastTxnStatus is a stub on the mock gRPC server: it never produces a
// response and always returns an "unreachable" error.
func (s *mockTikvGrpcServer) BroadcastTxnStatus(ctx context.Context, request *kvrpcpb.BroadcastTxnStatusRequest) (*kvrpcpb.BroadcastTxnStatusResponse, error) {
return nil, errors.New("unreachable")
}

func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCanceled() {
// prepare a mock tikv grpc server
addr := "localhost:56341"
Expand Down
4 changes: 2 additions & 2 deletions tikv/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ const unsafeDestroyRangeTimeout = 5 * time.Minute
// multiple times on a single range.
func (s *KVStore) UnsafeDestroyRange(ctx context.Context, startKey []byte, endKey []byte) error {
// Get all stores every time deleting a region. So the store list is less likely to be stale.
stores, err := s.listStoresForUnsafeDestory(ctx)
stores, err := s.listStoresForUnsafeDestroy(ctx)
if err != nil {
metrics.TiKVUnsafeDestroyRangeFailuresCounterVec.WithLabelValues("get_stores").Inc()
return err
Expand Down Expand Up @@ -366,7 +366,7 @@ func (s *KVStore) UnsafeDestroyRange(ctx context.Context, startKey []byte, endKe
return nil
}

func (s *KVStore) listStoresForUnsafeDestory(ctx context.Context) ([]*metapb.Store, error) {
func (s *KVStore) listStoresForUnsafeDestroy(ctx context.Context) ([]*metapb.Store, error) {
stores, err := s.pdClient.GetAllStores(ctx)
if err != nil {
return nil, errors.WithStack(err)
Expand Down
1 change: 0 additions & 1 deletion tikv/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ type testKVSuite struct {
func (s *testKVSuite) SetupTest() {
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
s.Require().Nil(err)
testutils.BootstrapWithSingleStore(cluster)
s.setGetMinResolvedTSByStoresIDs(func(ctx context.Context, ids []uint64) (uint64, map[uint64]uint64, error) {
return 0, nil, nil
})
Expand Down
13 changes: 13 additions & 0 deletions tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ const (
CmdLockWaitInfo

CmdGetHealthFeedback
CmdBroadcastTxnStatus

CmdCop CmdType = 512 + iota
CmdCopStream
Expand Down Expand Up @@ -221,6 +222,8 @@ func (t CmdType) String() string {
return "LockWaitInfo"
case CmdGetHealthFeedback:
return "GetHealthFeedback"
case CmdBroadcastTxnStatus:
return "BroadcastTxnStatus"
case CmdFlashbackToVersion:
return "FlashbackToVersion"
case CmdPrepareFlashbackToVersion:
Expand Down Expand Up @@ -568,6 +571,10 @@ func (req *Request) GetHealthFeedback() *kvrpcpb.GetHealthFeedbackRequest {
return req.Req.(*kvrpcpb.GetHealthFeedbackRequest)
}

// BroadcastTxnStatus returns BroadcastTxnStatusRequest in request.
// The unchecked type assertion panics if req.Req holds any other type.
func (req *Request) BroadcastTxnStatus() *kvrpcpb.BroadcastTxnStatusRequest {
return req.Req.(*kvrpcpb.BroadcastTxnStatusRequest)
}

// FlashbackToVersion returns FlashbackToVersionRequest in request.
func (req *Request) FlashbackToVersion() *kvrpcpb.FlashbackToVersionRequest {
return req.Req.(*kvrpcpb.FlashbackToVersionRequest)
Expand Down Expand Up @@ -653,6 +660,8 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_BufferBatchGet{BufferBatchGet: req.BufferBatchGet()}}
case CmdGetHealthFeedback:
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_GetHealthFeedback{GetHealthFeedback: req.GetHealthFeedback()}}
case CmdBroadcastTxnStatus:
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_BroadcastTxnStatus{BroadcastTxnStatus: req.BroadcastTxnStatus()}}
}
return nil
}
Expand Down Expand Up @@ -730,6 +739,8 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) (*Res
return &Response{Resp: res.BufferBatchGet}, nil
case *tikvpb.BatchCommandsResponse_Response_GetHealthFeedback:
return &Response{Resp: res.GetHealthFeedback}, nil
case *tikvpb.BatchCommandsResponse_Response_BroadcastTxnStatus:
return &Response{Resp: res.BroadcastTxnStatus}, nil
}
panic("unreachable")
}
Expand Down Expand Up @@ -1143,6 +1154,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp
resp.Resp, err = client.KvBufferBatchGet(ctx, req.BufferBatchGet())
case CmdGetHealthFeedback:
resp.Resp, err = client.GetHealthFeedback(ctx, req.GetHealthFeedback())
case CmdBroadcastTxnStatus:
resp.Resp, err = client.BroadcastTxnStatus(ctx, req.BroadcastTxnStatus())
default:
return nil, errors.Errorf("invalid request type: %v", req.Type)
}
Expand Down
Loading
Loading