Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into fastCreateTableEn…
Browse files Browse the repository at this point in the history
…hance
  • Loading branch information
GMHDBJD committed Jul 9, 2024
2 parents 2b8e6a7 + 49f1427 commit 4274cef
Show file tree
Hide file tree
Showing 382 changed files with 17,358 additions and 11,065 deletions.
62 changes: 44 additions & 18 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7180,26 +7180,26 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "bd23e08aaab7692e442bb0a8bf5889bca086e50e3c53124ace7d4f476b0b95d4",
strip_prefix = "github.com/tikv/client-go/[email protected].20240626064248-4a72526f6c30",
sha256 = "0093081c01fd5119490fb4145f770eb8a90da6c4e0e02708dae7b1fe24668cb2",
strip_prefix = "github.com/tikv/client-go/[email protected].20240703095801-d73cc1ed6503",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240626064248-4a72526f6c30.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240626064248-4a72526f6c30.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240626064248-4a72526f6c30.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240626064248-4a72526f6c30.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240703095801-d73cc1ed6503.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240703095801-d73cc1ed6503.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240703095801-d73cc1ed6503.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240703095801-d73cc1ed6503.zip",
],
)
go_repository(
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sha256 = "107218d0f9404ed51c23fda05b4fdac1bd3dbe8c8724c63ceddb4cbc7e77ceaf",
strip_prefix = "github.com/tikv/pd/[email protected]20240620115049-049de1761e56",
sha256 = "af957cdaccb24818d126f992ff3677b04adb2e80b53b898d5bab6f134a144120",
strip_prefix = "github.com/tikv/pd/[email protected]20240703065657-6b25787aff4d",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240620115049-049de1761e56.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240620115049-049de1761e56.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240620115049-049de1761e56.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240620115049-049de1761e56.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240703065657-6b25787aff4d.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240703065657-6b25787aff4d.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240703065657-6b25787aff4d.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240703065657-6b25787aff4d.zip",
],
)
go_repository(
Expand Down Expand Up @@ -7644,6 +7644,19 @@ def go_deps():
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/yagipy/maintidx/com_github_yagipy_maintidx-v1.0.0.zip",
],
)
go_repository(
name = "com_github_yangkeao_go_mysql_driver",
build_file_proto_mode = "disable_global",
importpath = "github.com/YangKeao/go-mysql-driver",
sha256 = "66ba9bed8b68899ea4adc729fbaf160bd634fe1afa51621a3dc5b153b538eb57",
strip_prefix = "github.com/YangKeao/[email protected]",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/YangKeao/go-mysql-driver/com_github_yangkeao_go_mysql_driver-v0.0.0-20240627104025-dd5589458cfa.zip",
"http://ats.apps.svc/gomod/github.com/YangKeao/go-mysql-driver/com_github_yangkeao_go_mysql_driver-v0.0.0-20240627104025-dd5589458cfa.zip",
"https://cache.hawkingrei.com/gomod/github.com/YangKeao/go-mysql-driver/com_github_yangkeao_go_mysql_driver-v0.0.0-20240627104025-dd5589458cfa.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/YangKeao/go-mysql-driver/com_github_yangkeao_go_mysql_driver-v0.0.0-20240627104025-dd5589458cfa.zip",
],
)
go_repository(
name = "com_github_yeya24_promlinter",
build_file_proto_mode = "disable_global",
Expand Down Expand Up @@ -9685,6 +9698,19 @@ def go_deps():
"https://storage.googleapis.com/pingcapmirror/gomod/go.etcd.io/gofail/io_etcd_go_gofail-v0.1.0.zip",
],
)
go_repository(
name = "io_filippo_edwards25519",
build_file_proto_mode = "disable_global",
importpath = "filippo.io/edwards25519",
sha256 = "9ac43a686d06fdebd719f7af3866c87eb069302272dfb131007adf471c308b65",
strip_prefix = "filippo.io/[email protected]",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/filippo.io/edwards25519/io_filippo_edwards25519-v1.1.0.zip",
"http://ats.apps.svc/gomod/filippo.io/edwards25519/io_filippo_edwards25519-v1.1.0.zip",
"https://cache.hawkingrei.com/gomod/filippo.io/edwards25519/io_filippo_edwards25519-v1.1.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/filippo.io/edwards25519/io_filippo_edwards25519-v1.1.0.zip",
],
)
go_repository(
name = "io_k8s_api",
build_file_proto_mode = "disable_global",
Expand Down Expand Up @@ -10365,13 +10391,13 @@ def go_deps():
name = "org_golang_x_sys",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/sys",
sha256 = "2588f4e77c83774bc6f168e8594f2dcbd21c1d26849a877f7e6a0d151392e735",
strip_prefix = "golang.org/x/sys@v0.21.0",
sha256 = "2434299f530b049a5c8121d6465751ce58bd62f939afde34c442f79c88e9033c",
strip_prefix = "golang.org/x/sys@v0.22.0",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/golang.org/x/sys/org_golang_x_sys-v0.21.0.zip",
"http://ats.apps.svc/gomod/golang.org/x/sys/org_golang_x_sys-v0.21.0.zip",
"https://cache.hawkingrei.com/gomod/golang.org/x/sys/org_golang_x_sys-v0.21.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/golang.org/x/sys/org_golang_x_sys-v0.21.0.zip",
"http://bazel-cache.pingcap.net:8080/gomod/golang.org/x/sys/org_golang_x_sys-v0.22.0.zip",
"http://ats.apps.svc/gomod/golang.org/x/sys/org_golang_x_sys-v0.22.0.zip",
"https://cache.hawkingrei.com/gomod/golang.org/x/sys/org_golang_x_sys-v0.22.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/golang.org/x/sys/org_golang_x_sys-v0.22.0.zip",
],
)
go_repository(
Expand Down
3 changes: 2 additions & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,13 @@ go_rules_dependencies()
go_download_sdk(
name = "go_sdk",
urls = [
"https://cache.hawkingrei.com/golang/{}",
"http://ats.apps.svc/golang/{}",
"http://bazel-cache.pingcap.net:8080/golang/{}",
"https://mirrors.aliyun.com/golang/{}",
"https://dl.google.com/go/{}",
],
version = "1.21.11",
version = "1.21.12",
)

go_register_toolchains(
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/restore/split/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,8 @@ func (c *pdClient) ScanRegions(ctx context.Context, key, endKey []byte, limit in
failpoint.Return(nil, status.Error(codes.Unavailable, "not leader"))
})

regions, err := c.client.BatchScanRegions(ctx, []pd.KeyRange{{StartKey: key, EndKey: endKey}}, limit)
//nolint:staticcheck
regions, err := c.client.ScanRegions(ctx, key, endKey, limit)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
18 changes: 15 additions & 3 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,22 @@ func autoNewCred(qs *backuppb.S3) (cred *credentials.Credentials, err error) {
func createOssRAMCred() (*credentials.Credentials, error) {
cred, err := aliproviders.NewInstanceMetadataProvider().Retrieve()
if err != nil {
return nil, errors.Annotate(err, "Alibaba RAM Provider Retrieve")
log.Warn("failed to get aliyun ram credential", zap.Error(err))
return nil, nil
}
var aliCred, ok = cred.(*alicred.StsTokenCredential)
if !ok {
return nil, errors.Errorf("invalid credential type %T", cred)
}
newCred := credentials.NewChainCredentials([]credentials.Provider{
&credentials.EnvProvider{},
&credentials.SharedCredentialsProvider{},
&credentials.StaticProvider{Value: credentials.Value{AccessKeyID: aliCred.AccessKeyId, SecretAccessKey: aliCred.AccessKeySecret, SessionToken: aliCred.AccessKeyStsToken, ProviderName: ""}},
})
if _, err := newCred.Get(); err != nil {
return nil, errors.Trace(err)
}
ncred := cred.(*alicred.StsTokenCredential)
return credentials.NewStaticCredentials(ncred.AccessKeyId, ncred.AccessKeySecret, ncred.AccessKeyStsToken), nil
return newCred, nil
}

// NewS3Storage initialize a new s3 storage for metadata.
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/streamhelper/advancer_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ func (c PDRegionScanner) FetchCurrentTS(ctx context.Context) (uint64, error) {
// RegionScan gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned.
func (c PDRegionScanner) RegionScan(ctx context.Context, key, endKey []byte, limit int) ([]RegionWithLeader, error) {
rs, err := c.Client.BatchScanRegions(ctx, []pd.KeyRange{{StartKey: key, EndKey: endKey}}, limit)
//nolint:staticcheck
rs, err := c.Client.ScanRegions(ctx, key, endKey, limit)
if err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion br/pkg/task/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ go_library(
"//pkg/util",
"//pkg/util/cdcutil",
"//pkg/util/collate",
"//pkg/util/engine",
"//pkg/util/mathutil",
"//pkg/util/table-filter",
"@com_github_docker_go_units//:go-units",
Expand All @@ -83,6 +84,7 @@ go_library(
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//http",
"@com_google_cloud_go_storage//:storage",
"@io_etcd_go_etcd_client_pkg_v3//transport",
"@io_etcd_go_etcd_client_v3//:client",
Expand All @@ -108,7 +110,7 @@ go_test(
],
embed = [":task"],
flaky = True,
shard_count = 29,
shard_count = 32,
deps = [
"//br/pkg/backup",
"//br/pkg/config",
Expand Down Expand Up @@ -144,6 +146,7 @@ go_test(
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//http",
"@org_golang_google_grpc//keepalive",
],
)
153 changes: 149 additions & 4 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
"time"

"github.com/docker/go-units"
"github.com/google/uuid"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -38,11 +39,13 @@ import (
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/collate"
"github.com/pingcap/tidb/pkg/util/engine"
"github.com/pingcap/tidb/pkg/util/mathutil"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/http"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -98,6 +101,7 @@ const (
defaultFlagDdlBatchSize = 128
resetSpeedLimitRetryTimes = 3
maxRestoreBatchSizeLimit = 10240
pb = 1024 * 1024 * 1024 * 1024 * 1024
)

const (
Expand Down Expand Up @@ -820,6 +824,12 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
return errors.Annotate(berrors.ErrRestoreInvalidBackup, "contain tables but no databases")
}

if cfg.CheckRequirements {
if err := checkDiskSpace(ctx, mgr, files, tables); err != nil {
return errors.Trace(err)
}
}

archiveSize := reader.ArchiveSize(ctx, files)
g.Record(summary.RestoreDataSize, archiveSize)
//restore from tidb will fetch a general Size issue https://github.com/pingcap/tidb/issues/27247
Expand Down Expand Up @@ -938,10 +948,6 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
return errors.Trace(err)
}

var newTS uint64
if client.IsIncremental() {
newTS = restoreTS
}
ddlJobs := FilterDDLJobs(client.GetDDLJobs(), tables)
ddlJobs = FilterDDLJobByRules(ddlJobs, DDLJobBlockListRule)

Expand Down Expand Up @@ -1000,6 +1006,17 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
return errors.Trace(err)
}

var newTS uint64
if client.IsIncremental() {
// we need to get the new ts after execDDL
// or backfilled data in upstream may not be covered by
// the new ts.
// see https://github.com/pingcap/tidb/issues/54426
newTS, err = restore.GetTSWithRetry(ctx, mgr.GetPDClient())
if err != nil {
return errors.Trace(err)
}
}
// We make bigger errCh so we won't block on multi-part failed.
errCh := make(chan error, 32)

Expand Down Expand Up @@ -1166,6 +1183,134 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
return nil
}

func getMaxReplica(ctx context.Context, mgr *conn.Mgr) (cnt uint64, err error) {
var resp map[string]any
err = utils.WithRetry(ctx, func() error {
resp, err = mgr.GetPDHTTPClient().GetReplicateConfig(ctx)
return err
}, utils.NewPDReqBackoffer())
if err != nil {
return 0, errors.Trace(err)
}

key := "max-replicas"
val, ok := resp[key]
if !ok {
return 0, errors.Errorf("key %s not found in response %v", key, resp)
}
return uint64(val.(float64)), nil
}

func getStores(ctx context.Context, mgr *conn.Mgr) (stores *http.StoresInfo, err error) {
err = utils.WithRetry(ctx, func() error {
stores, err = mgr.GetPDHTTPClient().GetStores(ctx)
return err
}, utils.NewPDReqBackoffer())
if err != nil {
return nil, errors.Trace(err)
}
return stores, nil
}

func EstimateTikvUsage(files []*backuppb.File, maxReplica uint64, storeCnt int) uint64 {
if storeCnt == 0 {
return 0
}
var totalSize uint64 = 0
for _, file := range files {
totalSize += file.GetSize_()
}
return totalSize * maxReplica / uint64(storeCnt)
}

func EstimateTiflashUsage(tables []*metautil.Table, storeCnt int) uint64 {
if storeCnt == 0 {
return 0
}
var tiflashTotal uint64 = 0
for _, table := range tables {
if table.TiFlashReplicas <= 0 {
continue
}
tableBytes := uint64(0)
for _, file := range table.Files {
tableBytes += file.GetSize_()
}
tiflashTotal += tableBytes * uint64(table.TiFlashReplicas)
}
return tiflashTotal / uint64(storeCnt)
}

func CheckStoreSpace(necessary uint64, store *http.StoreInfo) error {
// Be careful editing the message, it is used in DiskCheckBackoffer
available, err := units.RAMInBytes(store.Status.Available)
if err != nil {
return errors.Annotatef(berrors.ErrPDInvalidResponse, "store %d has invalid available space %s", store.Store.ID, store.Status.Available)
}
if available <= 0 {
return errors.Annotatef(berrors.ErrPDInvalidResponse, "store %d has invalid available space %s", store.Store.ID, store.Status.Available)
}
if uint64(available) < necessary {
return errors.Errorf("store %d has no space left on device, available %s, necessary %s",
store.Store.ID, units.BytesSize(float64(available)), units.BytesSize(float64(necessary)))
}
return nil
}

func checkDiskSpace(ctx context.Context, mgr *conn.Mgr, files []*backuppb.File, tables []*metautil.Table) error {
maxReplica, err := getMaxReplica(ctx, mgr)
if err != nil {
return errors.Trace(err)
}
stores, err := getStores(ctx, mgr)
if err != nil {
return errors.Trace(err)
}

tikvCnt, tiflashCnt := 0, 0
for i := range stores.Stores {
store := &stores.Stores[i]
if engine.IsTiFlashHTTPResp(&store.Store) {
tiflashCnt += 1
continue
}
tikvCnt += 1
}

// We won't need to restore more than 1800 PB data at one time, right?
preserve := func(base uint64, ratio float32) uint64 {
if base > 1000*pb {
return base
}
return base * uint64(ratio*10) / 10
}
tikvUsage := preserve(EstimateTikvUsage(files, maxReplica, tikvCnt), 1.1)
tiflashUsage := preserve(EstimateTiflashUsage(tables, tiflashCnt), 1.1)

err = utils.WithRetry(ctx, func() error {
stores, err = getStores(ctx, mgr)
if err != nil {
return errors.Trace(err)
}
for _, store := range stores.Stores {
if engine.IsTiFlashHTTPResp(&store.Store) {
if err := CheckStoreSpace(tiflashUsage, &store); err != nil {
return errors.Trace(err)
}
continue
}
if err := CheckStoreSpace(tikvUsage, &store); err != nil {
return errors.Trace(err)
}
}
return nil
}, utils.NewDiskCheckBackoffer())
if err != nil {
return errors.Trace(err)
}
return nil
}

// Exhaust drains all remaining errors in the channel, into a slice of errors.
func Exhaust(ec <-chan error) []error {
out := make([]error, 0, len(ec))
Expand Down
Loading

0 comments on commit 4274cef

Please sign in to comment.