Merge pull request pingcap#25 from 3pointer/split_compacted_start
implement split start key on compacted ssts
3pointer authored Aug 15, 2024
2 parents fdb47eb + 557c02d commit d97015a
Showing 13 changed files with 248 additions and 88 deletions.
24 changes: 12 additions & 12 deletions DEPS.bzl
@@ -6140,13 +6140,13 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sha256 = "8796fef08253250c98b36069fca29d538331068ff0f3f79ab6bc723b8e392bb6",
strip_prefix = "github.com/yujuncen/[email protected]20240709031634-baefbf030e28",
sha256 = "44b76999a3f190a136e0d0d8b2a47d8fb03072afac6e6f4f1cd7e2b66209f8a1",
strip_prefix = "github.com/YuJuncen/[email protected]20240725064730-6d93a3086bf2",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/yujuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20240709031634-baefbf030e28.zip",
"http://ats.apps.svc/gomod/github.com/yujuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20240709031634-baefbf030e28.zip",
"https://cache.hawkingrei.com/gomod/github.com/yujuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20240709031634-baefbf030e28.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/yujuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20240709031634-baefbf030e28.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/YuJuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20240725064730-6d93a3086bf2.zip",
"http://ats.apps.svc/gomod/github.com/YuJuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20240725064730-6d93a3086bf2.zip",
"https://cache.hawkingrei.com/gomod/github.com/YuJuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20240725064730-6d93a3086bf2.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/YuJuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20240725064730-6d93a3086bf2.zip",
],
)
go_repository(
@@ -9637,13 +9637,13 @@ def go_deps():
name = "com_sourcegraph_sourcegraph_appdash_data",
build_file_proto_mode = "disable_global",
importpath = "sourcegraph.com/sourcegraph/appdash-data",
sha256 = "59b71fa8cdb0fe2b1c02739ccf2daeaf28f2e22c4b178cdc8e1b902ad1022bc0",
strip_prefix = "github.com/sourcegraph/[email protected]",
sha256 = "382adefecd62bb79172e2552bcfb7d45f47122f9bd22259b0566b26fb2627b87",
strip_prefix = "sourcegraph.com/sourcegraph/[email protected]",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/sourcegraph/appdash-data/com_github_sourcegraph_appdash_data-v0.0.0-20151005221446-73f23eafcf67.zip",
"http://ats.apps.svc/gomod/github.com/sourcegraph/appdash-data/com_github_sourcegraph_appdash_data-v0.0.0-20151005221446-73f23eafcf67.zip",
"https://cache.hawkingrei.com/gomod/github.com/sourcegraph/appdash-data/com_github_sourcegraph_appdash_data-v0.0.0-20151005221446-73f23eafcf67.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/sourcegraph/appdash-data/com_github_sourcegraph_appdash_data-v0.0.0-20151005221446-73f23eafcf67.zip",
"http://bazel-cache.pingcap.net:8080/gomod/sourcegraph.com/sourcegraph/appdash-data/com_sourcegraph_sourcegraph_appdash_data-v0.0.0-20151005221446-73f23eafcf67.zip",
"http://ats.apps.svc/gomod/sourcegraph.com/sourcegraph/appdash-data/com_sourcegraph_sourcegraph_appdash_data-v0.0.0-20151005221446-73f23eafcf67.zip",
"https://cache.hawkingrei.com/gomod/sourcegraph.com/sourcegraph/appdash-data/com_sourcegraph_sourcegraph_appdash_data-v0.0.0-20151005221446-73f23eafcf67.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/sourcegraph.com/sourcegraph/appdash-data/com_sourcegraph_sourcegraph_appdash_data-v0.0.0-20151005221446-73f23eafcf67.zip",
],
)
go_repository(
13 changes: 7 additions & 6 deletions br/pkg/restore/import_mode_switcher.go
@@ -5,6 +5,7 @@ package restore
import (
"context"
"crypto/tls"
"sync"
"time"

_ "github.com/go-sql-driver/mysql" // mysql driver
@@ -47,8 +48,8 @@ func NewImportModeSwitcher(
}

// SwitchToNormalMode switches the tikv cluster to normal mode.
func (switcher *ImportModeSwitcher) switchToNormalMode(ctx context.Context) error {
close(switcher.switchCh)
func (switcher *ImportModeSwitcher) SwitchToNormalMode(ctx context.Context) error {
sync.OnceFunc(func() { close(switcher.switchCh) })
return switcher.switchTiKVMode(ctx, import_sstpb.SwitchMode_Normal)
}

@@ -113,8 +114,8 @@ func (switcher *ImportModeSwitcher) switchTiKVMode(
return nil
}

// switchToImportMode switch tikv cluster to import mode.
func (switcher *ImportModeSwitcher) switchToImportMode(
// SwitchToImportMode switches the tikv cluster to import mode.
func (switcher *ImportModeSwitcher) SwitchToImportMode(
ctx context.Context,
) {
// tikv automatically switches back to normal mode every 10 minutes
@@ -163,7 +164,7 @@ func RestorePreWork(

if switchToImport {
// Switch TiKV cluster to import mode (adjust rocksdb configuration).
switcher.switchToImportMode(ctx)
switcher.SwitchToImportMode(ctx)
}

return mgr.RemoveSchedulersWithConfig(ctx)
@@ -186,7 +187,7 @@
ctx = context.Background()
}

if err := switcher.switchToNormalMode(ctx); err != nil {
if err := switcher.SwitchToNormalMode(ctx); err != nil {
log.Warn("fail to switch to normal mode", zap.Error(err))
}
if err := restoreSchedulers(ctx); err != nil {
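A note on the `sync.OnceFunc` guard above: `sync.OnceFunc(f)` (Go 1.21+) only returns a once-wrapped function, so unless that returned func is stored and later invoked, `f` never runs and `switchCh` is never closed. Below is a minimal, self-contained sketch of a close that genuinely happens at most once, assuming a hypothetical `closeOnce sync.Once` field (the real struct layout is not shown in this diff):

```go
package main

import (
	"fmt"
	"sync"
)

// switcher is a simplified stand-in for ImportModeSwitcher; the closeOnce
// field is an assumption for this sketch.
type switcher struct {
	switchCh  chan struct{}
	closeOnce sync.Once
}

// switchToNormal closes switchCh at most once; repeated calls are no-ops
// instead of panicking with "close of closed channel".
func (s *switcher) switchToNormal() {
	s.closeOnce.Do(func() { close(s.switchCh) })
}

func main() {
	s := &switcher{switchCh: make(chan struct{})}
	s.switchToNormal()
	s.switchToNormal() // safe: Do runs the closure only on the first call
	_, open := <-s.switchCh
	fmt.Println("switchCh open:", open) // switchCh open: false
}
```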
27 changes: 23 additions & 4 deletions br/pkg/restore/internal/utils/split.go
@@ -16,16 +16,31 @@ import (
"go.uber.org/zap"
)

type SplitOption interface {
apply(*RegionSplitter)
}

type UseStartKeyOption struct{}

func (o *UseStartKeyOption) apply(r *RegionSplitter) {
r.useStartKey = true
}

// RegionSplitter is an executor of region splits by rules.
type RegionSplitter struct {
client split.SplitClient
useStartKey bool
client split.SplitClient
}

// NewRegionSplitter returns a new RegionSplitter.
func NewRegionSplitter(client split.SplitClient) *RegionSplitter {
return &RegionSplitter{
func NewRegionSplitter(client split.SplitClient, opts ...SplitOption) *RegionSplitter {
r := &RegionSplitter{
client: client,
}
for _, opt := range opts {
opt.apply(r)
}
return r
}

// SplitWaitAndScatter exposes the function `SplitWaitAndScatter` of the split client.
@@ -67,7 +82,11 @@ func (rs *RegionSplitter) ExecuteSplit(
sortedKeys := make([][]byte, 0, len(sortedRanges))
totalRangeSize := uint64(0)
for _, r := range sortedRanges {
sortedKeys = append(sortedKeys, r.EndKey)
if rs.useStartKey {
sortedKeys = append(sortedKeys, r.StartKey)
} else {
sortedKeys = append(sortedKeys, r.EndKey)
}
totalRangeSize += r.Size
}
// the range size must be greater than 0 here
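For illustration, a caller inside the br/pkg/restore tree (the package is internal, so imports from outside that tree will not compile) could opt in to start-key splitting like this — a sketch; `client` is assumed to be an existing `split.SplitClient`:

```go
import (
	"github.com/pingcap/tidb/br/pkg/restore/internal/utils"
	"github.com/pingcap/tidb/br/pkg/restore/split"
)

func newSplitters(client split.SplitClient) (byEnd, byStart *utils.RegionSplitter) {
	// Default behavior: ExecuteSplit collects each range's EndKey as a split key.
	byEnd = utils.NewRegionSplitter(client)
	// With the option, ExecuteSplit collects StartKey instead, which is what
	// the compacted-SST restore path relies on.
	byStart = utils.NewRegionSplitter(client, &utils.UseStartKeyOption{})
	return byEnd, byStart
}
```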
4 changes: 3 additions & 1 deletion br/pkg/restore/log_client/BUILD.bazel
@@ -26,9 +26,11 @@ go_library(
"//br/pkg/restore/internal/import_client",
"//br/pkg/restore/internal/log_split",
"//br/pkg/restore/internal/rawkv",
"//br/pkg/restore/snap_client/sstfiles",
"//br/pkg/restore/split",
"//br/pkg/restore/tiflashrec",
"//br/pkg/restore/utils",
"//br/pkg/rtree",
"//br/pkg/storage",
"//br/pkg/stream",
"//br/pkg/summary",
@@ -84,7 +86,7 @@ go_test(
],
embed = [":log_client"],
flaky = True,
shard_count = 39,
shard_count = 40,
deps = [
"//br/pkg/errors",
"//br/pkg/gluetidb",
161 changes: 130 additions & 31 deletions br/pkg/restore/log_client/client.go
@@ -15,12 +15,12 @@
package logclient

import (
"bytes"
"cmp"
"context"
"crypto/tls"
"fmt"
"math"
"os"
"slices"
"strconv"
"strings"
@@ -49,6 +49,7 @@ import (
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/summary"
@@ -61,6 +62,7 @@ import (
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/parser/model"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/codec"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/tikv/client-go/v2/config"
kvutil "github.com/tikv/client-go/v2/util"
@@ -77,16 +79,18 @@ const maxSplitKeysOnce = 10240

// rawKVBatchCount specifies the count of entries that the rawkv client puts into TiKV.
const rawKVBatchCount = 64
const compactedSSTBatchSize = 16

type LogClient struct {
restorer sstfiles.FileRestorer
cipher *backuppb.CipherInfo
pdClient pd.Client
pdHTTPClient pdhttp.Client
clusterID uint64
dom *domain.Domain
tlsConf *tls.Config
keepaliveConf keepalive.ClientParameters
restorer sstfiles.FileRestorer
cipher *backuppb.CipherInfo
pdClient pd.Client
pdHTTPClient pdhttp.Client
clusterID uint64
dom *domain.Domain
tlsConf *tls.Config
keepaliveConf keepalive.ClientParameters
concurrencyPerStore uint

rawKVClient *rawkv.RawKVBatchClient
storage storage.ExternalStorage
@@ -150,42 +154,132 @@ func (rc *LogClient) Close() {
log.Info("Restore client closed")
}

func (rc *LogClient) RestoreCompactedSsts(ctx context.Context, rules map[int64]*restoreutils.RewriteRules, compactionsIter iter.TryNextor[*backuppb.LogFileSubcompaction]) error {
eg, eCtx := errgroup.WithContext(ctx)
for tid, rules := range rules {
log.Info("Using rewrite rules.", zap.Int64("table_id", tid), zap.Stringer("rules", rules))
}
type CompactedItem struct {
files []sstfiles.SstFilesInfo
regionMinKeys [][]byte
}

func (c *CompactedItem) Append(file sstfiles.SstFilesInfo, key []byte) {
c.files = append(c.files, file)
c.regionMinKeys = append(c.regionMinKeys, key)
}

func (c *CompactedItem) Size() int {
return len(c.files)
}

func (rc *LogClient) CollectCompactedSsts(
ctx context.Context,
rules map[int64]*restoreutils.RewriteRules,
compactionsIter iter.TryNextor[*backuppb.LogFileSubcompaction],
) (int, map[uint64]*CompactedItem, error) {
totalSSTCount := 0
regionCompactedMap := make(map[uint64]*CompactedItem)
// read unordered files
for r := compactionsIter.TryNext(ctx); !r.Finished; r = compactionsIter.TryNext(ctx) {
if r.Err != nil {
return r.Err
return 0, nil, r.Err
}
i := r.Item
rewriteRules, ok := rules[i.Meta.TableId]
if !ok {
log.Warn("Skipped excluded tables.", zap.Int64("table_id", i.Meta.TableId))
continue
}
// no need to merge sst files here
ranges, _, err := restoreutils.MergeAndRewriteFileRanges(i.SstOutputs, rewriteRules, 0, 0)

if _, ok := regionCompactedMap[i.Meta.RegionId]; !ok {
c := CompactedItem{
files: make([]sstfiles.SstFilesInfo, 0, compactedSSTBatchSize),
// TODO: when log backup supports recording the region range, use it to split ranges.
// currently, we take the min record key and rewrite it to derive the split key.
regionMinKeys: make([][]byte, 0, compactedSSTBatchSize),
}
regionCompactedMap[i.Meta.RegionId] = &c
}
// translate min key to raw key
_, rawMinKey, err := codec.DecodeBytes(i.Meta.MinKey, nil)
if err != nil {
return errors.Trace(err)
return 0, nil, errors.Trace(err)
}
restoreFiles := sstfiles.SstFilesInfo{
totalSSTCount += len(i.SstOutputs)

regionCompactedMap[i.Meta.RegionId].Append(sstfiles.SstFilesInfo{
TableID: i.Meta.TableId,
Files: i.SstOutputs,
RewriteRules: rewriteRules,
}, rawMinKey)
}
return totalSSTCount, regionCompactedMap, nil
}
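The `DecodeBytes` call above strips TiKV's memcomparable encoding from the stored min key. A small round-trip sketch with the same codec package (the key literal is made up for illustration):

```go
import "github.com/pingcap/tidb/pkg/util/codec"

func decodeMinKey() ([]byte, error) {
	// EncodeBytes produces the memcomparable form that the backup metadata
	// stores; DecodeBytes recovers the raw key. The first return value is the
	// remaining, undecoded suffix of the input, which is empty here.
	encoded := codec.EncodeBytes(nil, []byte("t\x80\x00\x00\x00\x00\x00\x00\x01_r"))
	_, raw, err := codec.DecodeBytes(encoded, nil)
	return raw, err
}
```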

func (rc *LogClient) RestoreCompactedSsts(
ctx context.Context,
regionCompactedMap map[uint64]*CompactedItem,
importModeSwitcher *restore.ImportModeSwitcher,
onProgress func(),
) error {
// need to enter import mode before restoring SST files
// it will be switched back to normal mode afterwards regardless
importModeSwitcher.SwitchToImportMode(ctx)
defer importModeSwitcher.SwitchToNormalMode(ctx)

splitRanges := make([]rtree.Range, 0, compactedSSTBatchSize)

eg, eCtx := errgroup.WithContext(ctx)
// split once every compactedSSTBatchSize regions
cnt := 0
for regionId, CompactedItems := range regionCompactedMap {
cnt += 1
if cnt >= compactedSSTBatchSize {
var regionSplitKey []byte
var splitKeyIndex int
for i, minKey := range CompactedItems.regionMinKeys {
if len(regionSplitKey) == 0 || bytes.Compare(minKey, regionSplitKey) < 0 {
regionSplitKey = minKey
splitKeyIndex = i
}
}
log.Info("find min split key for region",
zap.Uint64("region_id", regionId),
logutil.Key("split_key", regionSplitKey),
zap.Int("index", splitKeyIndex))

// build split ranges
tmpRng := rtree.Range{
StartKey: []byte(regionSplitKey),
// this field is ignored, because we split by start key.
EndKey: []byte(regionSplitKey),
}
// only one range per region needs to be split
rg, err := restoreutils.RewriteRange(&tmpRng, CompactedItems.files[splitKeyIndex].RewriteRules)
if err != nil {
return errors.Trace(err)
}
splitRanges = append(splitRanges, *rg)
cnt = 0
}
// TODO implement update ch
rc.workerPool.ApplyOnErrorGroup(eg, func() error {
if _, ok := os.LookupEnv("br_skip_split"); !ok {
err = rc.restorer.SplitRanges(eCtx, ranges, nil)

if len(splitRanges) > 0 {
rc.workerPool.ApplyOnErrorGroup(eg, func() error {
defer onProgress()
// TODO unify onProgress and updateCh
err := rc.restorer.SplitRanges(eCtx, splitRanges, nil)
if err != nil {
return errors.Trace(err)
}
}
return rc.restorer.RestoreFiles(eCtx, []sstfiles.SstFilesInfo{restoreFiles}, nil)
})
return rc.restorer.RestoreFiles(eCtx, CompactedItems.files, nil)
})
// reset for next batch
splitRanges = nil
} else {
rc.workerPool.ApplyOnErrorGroup(eg, func() error {
defer onProgress()
// restore the remaining files
return rc.restorer.RestoreFiles(eCtx, CompactedItems.files, nil)
})
}
}

return eg.Wait()
}
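The split-key selection above is a plain minimum scan over the batch's region min keys; extracted into standalone form (names are illustrative) it behaves like this:

```go
package main

import (
	"bytes"
	"fmt"
)

// minKey returns the smallest key and its index -- the same selection
// RestoreCompactedSsts performs over regionMinKeys to pick one split key
// (and the matching rewrite rules) per batch of compactedSSTBatchSize regions.
func minKey(keys [][]byte) ([]byte, int) {
	var min []byte
	idx := 0
	for i, k := range keys {
		if len(min) == 0 || bytes.Compare(k, min) < 0 {
			min, idx = k, i
		}
	}
	return min, idx
}

func main() {
	keys := [][]byte{[]byte("t1_r300"), []byte("t1_r100"), []byte("t1_r200")}
	k, i := minKey(keys)
	fmt.Printf("split key %q at index %d\n", k, i) // split key "t1_r100" at index 1
}
```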

@@ -212,6 +306,14 @@ func (rc *LogClient) SetConcurrency(c uint) {
rc.workerPool = tidbutil.NewWorkerPool(c, "file")
}

func (rc *LogClient) SetConcurrencyPerStore(c uint) {
if c == 0 {
c = 128
}
log.Info("download worker pool per store", zap.Uint("size", c))
rc.concurrencyPerStore = c
}

func (rc *LogClient) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions) error {
var err error
rc.storage, err = storage.New(ctx, backend, opts)
@@ -287,17 +389,14 @@ func (rc *LogClient) InitClients(ctx context.Context, backend *backuppb.StorageB
createCallBacks = append(createCallBacks, func(importer *sstfiles.SnapFileImporter) error {
return importer.CheckMultiIngestSupport(ctx, stores)
})
// TODO make a better concurrencyPerStore
log.Info("Initializing client.", zap.Stringer("api", rc.dom.Store().GetCodec().GetAPIVersion()))
snapFileImporter, err := sstfiles.NewSnapFileImporter(
ctx, rc.cipher, rc.dom.Store().GetCodec().GetAPIVersion(), metaClient,
importCli, backend, false, false, stores, sstfiles.RewriteModeKeyspace, 128, createCallBacks, closeCallBacks)
importCli, backend, false, false, stores, sstfiles.RewriteModeKeyspace, rc.concurrencyPerStore, createCallBacks, closeCallBacks)
if err != nil {
log.Fatal("failed to init snap file importer", zap.Error(err))
}
// TODO make a better concurrency
workerPool := tidbutil.NewWorkerPool(128, "sst files")
rc.restorer = sstfiles.NewSimpleFileRestorer(snapFileImporter, metaClient, workerPool)
rc.restorer = sstfiles.NewSimpleFileRestorer(true, snapFileImporter, metaClient, rc.workerPool)
}

func (rc *LogClient) InitCheckpointMetadataForLogRestore(ctx context.Context, taskName string, gcRatio string) (string, error) {
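Since the importer now takes its per-store concurrency from the client instead of the hard-coded 128, callers are expected to configure it before `InitClients`. A hedged caller-side sketch — `cfg` and its field names are assumptions, not part of this diff:

```go
// Configure pool sizes before InitClients so the snap file importer is
// built with them; passing 0 per store falls back to the default of 128.
rc.SetConcurrency(uint(cfg.Concurrency))
rc.SetConcurrencyPerStore(uint(cfg.ConcurrencyPerStore))
rc.InitClients(ctx, backend)
```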