-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
backup: advacned prepare implementation #48439
Changes from 37 commits
e34b096
50cbcde
0390885
b59bf5d
17045f7
4689cb6
45d4c22
a70504b
f388412
7ef7c30
1831260
73d47a5
90d53a2
e221a8d
d434e79
8f391d5
ea8739d
63fc060
d8807d9
5b55f50
26a3e1a
b9ef50c
c895f0a
bf1f201
9e5add6
bfc1486
5211953
694e642
a70403d
440a994
1a27f3a
30220d4
8bdd742
6efcf53
fcff4e6
0b517db
b13969c
b4e0d7c
7a2948e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5862,13 +5862,13 @@ def go_deps(): | |
name = "com_github_pingcap_kvproto", | ||
build_file_proto_mode = "disable_global", | ||
importpath = "github.com/pingcap/kvproto", | ||
sha256 = "69c0e5916c604b358dc854873cb855e053c19da1cab0163fa48797eb119537f9", | ||
strip_prefix = "github.com/pingcap/[email protected]20231226064240-4f28b82c7860", | ||
sha256 = "53da7bf27e06dedfeea1b523941e8adb94d7d7ed5f93dbfb7467cb71b5d19bd6", | ||
strip_prefix = "github.com/pingcap/[email protected]20240109063850-932639606bcf", | ||
urls = [ | ||
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20231226064240-4f28b82c7860.zip", | ||
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20231226064240-4f28b82c7860.zip", | ||
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20231226064240-4f28b82c7860.zip", | ||
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20231226064240-4f28b82c7860.zip", | ||
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240109063850-932639606bcf.zip", | ||
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240109063850-932639606bcf.zip", | ||
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240109063850-932639606bcf.zip", | ||
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240109063850-932639606bcf.zip", | ||
], | ||
) | ||
go_repository( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") | ||
|
||
go_library( | ||
name = "prepare_snap", | ||
srcs = [ | ||
"env.go", | ||
"errors.go", | ||
"prepare.go", | ||
"stream.go", | ||
], | ||
importpath = "github.com/pingcap/tidb/br/pkg/backup/prepare_snap", | ||
visibility = ["//visibility:public"], | ||
deps = [ | ||
"//br/pkg/logutil", | ||
"//br/pkg/utils", | ||
"//pkg/util/engine", | ||
"@com_github_docker_go_units//:go-units", | ||
"@com_github_google_btree//:btree", | ||
"@com_github_pingcap_errors//:errors", | ||
"@com_github_pingcap_kvproto//pkg/brpb", | ||
"@com_github_pingcap_kvproto//pkg/errorpb", | ||
"@com_github_pingcap_kvproto//pkg/metapb", | ||
"@com_github_pingcap_log//:log", | ||
"@com_github_tikv_client_go_v2//tikv", | ||
"@com_github_tikv_pd_client//:client", | ||
"@org_golang_google_grpc//:grpc", | ||
"@org_golang_x_sync//errgroup", | ||
"@org_uber_go_zap//:zap", | ||
"@org_uber_go_zap//zapcore", | ||
], | ||
) | ||
|
||
go_test( | ||
name = "prepare_snap_test", | ||
timeout = "short", | ||
srcs = ["prepare_test.go"], | ||
flaky = True, | ||
shard_count = 7, | ||
deps = [ | ||
":prepare_snap", | ||
"//br/pkg/utils", | ||
"//pkg/store/mockstore/unistore", | ||
"@com_github_pingcap_errors//:errors", | ||
"@com_github_pingcap_kvproto//pkg/brpb", | ||
"@com_github_pingcap_kvproto//pkg/errorpb", | ||
"@com_github_pingcap_kvproto//pkg/metapb", | ||
"@com_github_pingcap_log//:log", | ||
"@com_github_stretchr_testify//require", | ||
"@com_github_tikv_client_go_v2//tikv", | ||
"@com_github_tikv_pd_client//:client", | ||
"@org_uber_go_zap//zapcore", | ||
], | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,171 @@ | ||
// Copyright 2024 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package preparesnap | ||
|
||
import ( | ||
"context" | ||
"slices" | ||
"time" | ||
|
||
"github.com/docker/go-units" | ||
"github.com/pingcap/errors" | ||
brpb "github.com/pingcap/kvproto/pkg/brpb" | ||
"github.com/pingcap/kvproto/pkg/metapb" | ||
"github.com/pingcap/log" | ||
"github.com/pingcap/tidb/br/pkg/logutil" | ||
"github.com/pingcap/tidb/br/pkg/utils" | ||
"github.com/pingcap/tidb/pkg/util/engine" | ||
"github.com/tikv/client-go/v2/tikv" | ||
pd "github.com/tikv/pd/client" | ||
"go.uber.org/zap" | ||
"google.golang.org/grpc" | ||
) | ||
|
||
const ( | ||
// default max gRPC message size is 10MiB. | ||
// split requests to chunks of 1MiB will reduce the possibility of being rejected | ||
// due to max gRPC message size. | ||
maxRequestSize = units.MiB | ||
) | ||
|
||
type Env interface { | ||
ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) | ||
GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) | ||
|
||
LoadRegionsInKeyRange(ctx context.Context, startKey, endKey []byte) (regions []Region, err error) | ||
} | ||
|
||
type PrepareClient interface { | ||
Send(*brpb.PrepareSnapshotBackupRequest) error | ||
Recv() (*brpb.PrepareSnapshotBackupResponse, error) | ||
} | ||
|
||
type SplitRequestClient struct { | ||
PrepareClient | ||
MaxRequestSize int | ||
} | ||
|
||
func (s SplitRequestClient) Send(req *brpb.PrepareSnapshotBackupRequest) error { | ||
// Try best to keeping the request untouched. | ||
rs := req.Regions | ||
if req.Ty == brpb.PrepareSnapshotBackupRequestType_WaitApply && req.Size() > s.MaxRequestSize { | ||
findSplitIndex := func() int { | ||
if len(rs) == 0 { | ||
return -1 | ||
} | ||
|
||
// Select at least one request. | ||
// So we won't get sutck if there were a really huge (!) request. | ||
collected := 0 | ||
lastI := 1 | ||
for i := 2; i < len(rs) && collected+rs[i].Size() < s.MaxRequestSize; i++ { | ||
lastI = i | ||
collected += rs[i].Size() | ||
} | ||
return lastI | ||
} | ||
for splitIdx := findSplitIndex(); splitIdx > 0; splitIdx = findSplitIndex() { | ||
split := &brpb.PrepareSnapshotBackupRequest{ | ||
Ty: brpb.PrepareSnapshotBackupRequestType_WaitApply, | ||
Regions: rs[:splitIdx], | ||
} | ||
rs = rs[splitIdx:] | ||
if err := s.PrepareClient.Send(split); err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} | ||
return s.PrepareClient.Send(req) | ||
} | ||
|
||
type Region interface { | ||
GetMeta() *metapb.Region | ||
GetLeaderStoreID() uint64 | ||
} | ||
|
||
type CliEnv struct { | ||
Cache *tikv.RegionCache | ||
Mgr *utils.StoreManager | ||
} | ||
|
||
func (c CliEnv) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) { | ||
stores, err := c.Cache.PDClient().GetAllStores(ctx, pd.WithExcludeTombstone()) | ||
if err != nil { | ||
return nil, err | ||
} | ||
withoutTiFlash := slices.DeleteFunc(stores, engine.IsTiFlash) | ||
return withoutTiFlash, err | ||
} | ||
|
||
func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) { | ||
var cli brpb.Backup_PrepareSnapshotBackupClient | ||
err := c.Mgr.TryWithConn(ctx, storeID, func(cc *grpc.ClientConn) error { | ||
bcli := brpb.NewBackupClient(cc) | ||
c, err := bcli.PrepareSnapshotBackup(ctx) | ||
if err != nil { | ||
return errors.Annotatef(err, "failed to create prepare backup stream") | ||
} | ||
cli = c | ||
return nil | ||
}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return cli, nil | ||
} | ||
|
||
func (c CliEnv) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) { | ||
bo := tikv.NewBackoffer(ctx, regionCacheMaxBackoffMs) | ||
if len(endKey) == 0 { | ||
// This is encoded [0xff; 8]. | ||
// Workaround for https://github.com/tikv/client-go/issues/1051. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This issue has been fixed, can we remove the workaround and upgrade client-go? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That may not be picked to release version and upgrading |
||
endKey = []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} | ||
} | ||
rs, err := c.Cache.LoadRegionsInKeyRange(bo, startKey, endKey) | ||
if err != nil { | ||
return nil, err | ||
} | ||
rrs := make([]Region, 0, len(rs)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why need we make a local copy? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because the type doesn't match. The interface |
||
for _, r := range rs { | ||
rrs = append(rrs, r) | ||
} | ||
return rrs, nil | ||
} | ||
|
||
type RetryAndSplitRequestEnv struct { | ||
Env | ||
GetBackoffer func() utils.Backoffer | ||
} | ||
|
||
func (r RetryAndSplitRequestEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) { | ||
rs := utils.InitialRetryState(50, 10*time.Second, 10*time.Second) | ||
YuJuncen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
bo := utils.Backoffer(&rs) | ||
if r.GetBackoffer != nil { | ||
bo = r.GetBackoffer() | ||
} | ||
cli, err := utils.WithRetryV2(ctx, bo, func(ctx context.Context) (PrepareClient, error) { | ||
cli, err := r.Env.ConnectToStore(ctx, storeID) | ||
if err != nil { | ||
log.Warn("Failed to connect to store, will retry.", zap.Uint64("store", storeID), logutil.ShortError(err)) | ||
return nil, err | ||
} | ||
return cli, nil | ||
}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return SplitRequestClient{PrepareClient: cli, MaxRequestSize: maxRequestSize}, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
// Copyright 2024 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package preparesnap | ||
|
||
import ( | ||
"github.com/pingcap/errors" | ||
"github.com/pingcap/kvproto/pkg/errorpb" | ||
) | ||
|
||
func convertErr(err *errorpb.Error) error { | ||
if err == nil { | ||
return nil | ||
} | ||
return errors.New(err.Message) | ||
} | ||
|
||
func leaseExpired() error { | ||
return errors.New("the lease has expired") | ||
} | ||
|
||
func unsupported() error { | ||
return errors.New("unsupported operation") | ||
} | ||
|
||
func retryLimitExceeded() error { | ||
return errors.New("the limit of retrying exceeded") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems it canbe moved into
if req.Ty == xxxxx
, :)