ddl: directly use BackendConfig rather than use lightning config (#55433)

ref #54436
lance6716 authored Aug 20, 2024
1 parent 8f75ed3 commit 2621a4c
Showing 8 changed files with 94 additions and 112 deletions.
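The shape of the change, condensed: the DDL ingest path used to fill a full lightning *config.Config and convert it with local.NewBackendConfig; it now populates *local.BackendConfig directly, dropping the errormanager dependency and the TiDB host/status-port/TLS fields the lightning config carried along. A minimal sketch of the new construction (illustrative only — the function name is invented, but every field and value mirrors the config.go diff below):

```go
package ingestsketch

import (
	"runtime"

	tidb "github.com/pingcap/tidb/pkg/config"
	"github.com/pingcap/tidb/pkg/lightning/backend/local"
	lightning "github.com/pingcap/tidb/pkg/lightning/config"
	kvutil "github.com/tikv/client-go/v2/util"
)

// backendConfigSketch condenses the new genConfig: local.BackendConfig is
// built directly instead of going through a full lightning config first.
func backendConfigSketch(jobSortPath, resourceGroup string, concurrency int) *local.BackendConfig {
	return &local.BackendConfig{
		LocalStoreDir:     jobSortPath,
		ResourceGroupName: resourceGroup,
		MaxConnPerStore:   concurrency,
		// Stands in for the old TikvImporter.RangeConcurrency*2.
		WorkerConcurrency:      concurrency * 2,
		KeyspaceName:           tidb.GetGlobalKeyspaceName(),
		CheckpointEnabled:      true,
		RegionSplitConcurrency: runtime.GOMAXPROCS(0),
		MemTableSize:           lightning.DefaultEngineMemCacheSize,
		TaskType:               kvutil.ExplicitTypeDDL,
	}
}
```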
1 change: 0 additions & 1 deletion pkg/ddl/ingest/BUILD.bazel
@@ -31,7 +31,6 @@ go_library(
"//pkg/lightning/checkpoints",
"//pkg/lightning/common",
"//pkg/lightning/config",
"//pkg/lightning/errormanager",
"//pkg/lightning/log",
"//pkg/meta",
"//pkg/parser/model",
6 changes: 2 additions & 4 deletions pkg/ddl/ingest/backend.go
@@ -29,7 +29,6 @@ import (
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/lightning/common"
lightning "github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/errormanager"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
@@ -95,7 +94,7 @@ type litBackendCtx struct {
tbl table.Table
backend *local.Backend
ctx context.Context
cfg *lightning.Config
cfg *local.BackendConfig
sysVars map[string]string

flushing atomic.Bool
@@ -148,8 +147,7 @@ func (bc *litBackendCtx) CollectRemoteDuplicateRows(indexID int64, tbl table.Tab
}

func (bc *litBackendCtx) collectRemoteDuplicateRows(indexID int64, tbl table.Table) error {
errorMgr := errormanager.New(nil, bc.cfg, log.Logger{Logger: logutil.Logger(bc.ctx)})
dupeController := bc.backend.GetDupeController(bc.cfg.TikvImporter.RangeConcurrency*2, errorMgr)
dupeController := bc.backend.GetDupeController(bc.cfg.WorkerConcurrency, nil)
hasDupe, err := dupeController.CollectRemoteDuplicateRows(bc.ctx, tbl, tbl.Meta().Name.L, &encode.SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
SysVars: bc.sysVars,
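Worth noting on the hunk above: genConfig (config.go below) sets WorkerConcurrency to concurrency*2, so passing bc.cfg.WorkerConcurrency keeps the effective dupe-controller concurrency identical to the old RangeConcurrency*2, while the errormanager argument becomes a plain nil. A minimal sketch of the resulting wiring (the function name is invented; local.DupeController as the return type is an assumption about the exported API):

```go
package ingestsketch

import (
	"github.com/pingcap/tidb/pkg/lightning/backend/local"
	"github.com/pingcap/tidb/pkg/lightning/common"
)

// dupeControllerSketch shows duplicate detection after this commit: the knobs
// live on local.BackendConfig, and GetDupeController no longer needs an
// errormanager (nil is passed instead).
func dupeControllerSketch(be *local.Backend, concurrency int, unique bool) *local.DupeController {
	cfg := &local.BackendConfig{
		WorkerConcurrency: concurrency * 2, // matches the old RangeConcurrency*2
		DupeDetectEnabled: unique,
	}
	if unique {
		// Surface an error as soon as a duplicate key is found.
		cfg.DuplicateDetectOpt = common.DupDetectOpt{ReportErrOnDup: true}
	}
	return be.GetDupeController(cfg.WorkerConcurrency, nil)
}
```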
30 changes: 17 additions & 13 deletions pkg/ddl/ingest/backend_mgr.go
@@ -17,18 +17,19 @@ package ingest
import (
"context"
"math"
"net"
"os"
"path/filepath"
"strconv"
"sync"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
ddllogutil "github.com/pingcap/tidb/pkg/ddl/logutil"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/util/logutil"
kvutil "github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
@@ -152,7 +153,7 @@ func (m *litBackendCtxMgr) Register(
return nil, err
}

bcCtx := newBackendContext(ctx, jobID, bd, cfg.lightning, defaultImportantVariables, m.memRoot, m.diskRoot, etcdClient)
bcCtx := newBackendContext(ctx, jobID, bd, cfg, defaultImportantVariables, m.memRoot, m.diskRoot, etcdClient)
m.backends.m[jobID] = bcCtx
m.memRoot.Consume(structSizeBackendCtx)
m.backends.mu.Unlock()
@@ -171,23 +172,26 @@ func (m *litBackendCtxMgr) EncodeJobSortPath(jobID int64) string {

func createLocalBackend(
ctx context.Context,
cfg *litConfig,
cfg *local.BackendConfig,
pdSvcDiscovery pd.ServiceDiscovery,
) (*local.Backend, error) {
tls, err := cfg.lightning.ToTLS()
tidbCfg := config.GetGlobalConfig()
tls, err := common.NewTLS(
tidbCfg.Security.ClusterSSLCA,
tidbCfg.Security.ClusterSSLCert,
tidbCfg.Security.ClusterSSLKey,
net.JoinHostPort("127.0.0.1", strconv.Itoa(int(tidbCfg.Status.StatusPort))),
nil, nil, nil,
)
if err != nil {
logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Error(err))
return nil, err
}

ddllogutil.DDLIngestLogger().Info("create local backend for adding index",
zap.String("sortDir", cfg.lightning.TikvImporter.SortedKVDir),
zap.String("keyspaceName", cfg.keyspaceName))
// We disable the switch TiKV mode feature for now,
// because the impact is not fully tested.
var raftKV2SwitchModeDuration time.Duration
backendConfig := local.NewBackendConfig(cfg.lightning, int(litRLimit), cfg.keyspaceName, cfg.resourceGroup, kvutil.ExplicitTypeDDL, raftKV2SwitchModeDuration)
return local.NewBackend(ctx, tls, backendConfig, pdSvcDiscovery)
zap.String("sortDir", cfg.LocalStoreDir),
zap.String("keyspaceName", cfg.KeyspaceName))
return local.NewBackend(ctx, tls, *cfg, pdSvcDiscovery)
}

const checkpointUpdateInterval = 10 * time.Minute
@@ -196,7 +200,7 @@ func newBackendContext(
ctx context.Context,
jobID int64,
be *local.Backend,
cfg *config.Config,
cfg *local.BackendConfig,
vars map[string]string,
memRoot MemRoot,
diskRoot DiskRoot,
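Restating the TLS part of createLocalBackend above: with the lightning config gone, its ToTLS() helper goes too, so the backend's TLS is now assembled from the TiDB global config and pointed at the local status port. A self-contained sketch under those assumptions (the function name is invented; the common.NewTLS call is copied from the hunk above):

```go
package ingestsketch

import (
	"net"
	"strconv"

	"github.com/pingcap/tidb/pkg/config"
	"github.com/pingcap/tidb/pkg/lightning/common"
)

// newDDLTLS builds the TLS settings for the local backend from the TiDB
// global config, replacing the old cfg.lightning.ToTLS() path.
func newDDLTLS() (*common.TLS, error) {
	tidbCfg := config.GetGlobalConfig()
	return common.NewTLS(
		tidbCfg.Security.ClusterSSLCA,
		tidbCfg.Security.ClusterSSLCert,
		tidbCfg.Security.ClusterSSLKey,
		// The peer is the local TiDB status port.
		net.JoinHostPort("127.0.0.1", strconv.Itoa(int(tidbCfg.Status.StatusPort))),
		nil, nil, nil,
	)
}
```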
107 changes: 50 additions & 57 deletions pkg/ddl/ingest/config.go
@@ -17,78 +17,71 @@ package ingest
import (
"context"
"net"
"runtime"
"strconv"
"sync/atomic"

tidb "github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/lightning/backend"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/pkg/lightning/common"
lightning "github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)

// ImporterRangeConcurrencyForTest is only used for test.
var ImporterRangeConcurrencyForTest *atomic.Int32

// litConfig is the configuration for the lightning local backend used in DDL.
type litConfig struct {
lightning *lightning.Config
keyspaceName string
isRaftKV2 bool
resourceGroup string
}

func genConfig(
ctx context.Context,
jobSortPath string,
memRoot MemRoot,
unique bool,
resourceGroup string,
concurrency int,
) (*litConfig, error) {
tidbCfg := tidb.GetGlobalConfig()
cfg := lightning.NewConfig()
cfg.TikvImporter.Backend = lightning.BackendLocal
) (*local.BackendConfig, error) {
cfg := &local.BackendConfig{
LocalStoreDir: jobSortPath,
ResourceGroupName: resourceGroup,
MaxConnPerStore: concurrency,
WorkerConcurrency: concurrency * 2,
KeyspaceName: tidb.GetGlobalKeyspaceName(),
// We disable the switch TiKV mode feature for now, because the impact is not
// fully tested.
ShouldCheckWriteStall: true,

// lightning default values
CheckpointEnabled: true,
BlockSize: lightning.DefaultBlockSize,
KVWriteBatchSize: lightning.KVWriteBatchSize,
RegionSplitBatchSize: lightning.DefaultRegionSplitBatchSize,
RegionSplitConcurrency: runtime.GOMAXPROCS(0),
MemTableSize: lightning.DefaultEngineMemCacheSize,
LocalWriterMemCacheSize: lightning.DefaultLocalWriterMemCacheSize,
ShouldCheckTiKV: true,
MaxOpenFiles: int(litRLimit),
PausePDSchedulerScope: lightning.PausePDSchedulerScopeTable,
TaskType: kvutil.ExplicitTypeDDL,
DisableAutomaticCompactions: true,
}
// Each backend will build a single dir in lightning dir.
cfg.TikvImporter.SortedKVDir = jobSortPath
cfg.TikvImporter.RangeConcurrency = concurrency
if ImporterRangeConcurrencyForTest != nil {
cfg.TikvImporter.RangeConcurrency = int(ImporterRangeConcurrencyForTest.Load())
cfg.WorkerConcurrency = int(ImporterRangeConcurrencyForTest.Load()) * 2
}
err := cfg.AdjustForDDL()
if err != nil {
logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Error(err))
return nil, err
}
adjustImportMemory(ctx, memRoot, cfg)
cfg.Checkpoint.Enable = true
if unique {
cfg.Conflict.Strategy = lightning.ErrorOnDup
cfg.Conflict.Threshold = lightning.DefaultRecordDuplicateThreshold
cfg.DupeDetectEnabled = true
cfg.DuplicateDetectOpt = common.DupDetectOpt{ReportErrOnDup: true}
} else {
cfg.Conflict.Strategy = lightning.NoneOnDup
cfg.DupeDetectEnabled = false
}
cfg.TiDB.Host = "127.0.0.1"
cfg.TiDB.StatusPort = int(tidbCfg.Status.StatusPort)
// Set TLS related information
cfg.Security.CAPath = tidbCfg.Security.ClusterSSLCA
cfg.Security.CertPath = tidbCfg.Security.ClusterSSLCert
cfg.Security.KeyPath = tidbCfg.Security.ClusterSSLKey
// in DDL scenario, we don't switch import mode
cfg.Cron.SwitchMode = lightning.Duration{Duration: 0}

c := &litConfig{
lightning: cfg,
keyspaceName: tidb.GetGlobalKeyspaceName(),
isRaftKV2: false,
resourceGroup: resourceGroup,
}

return c, nil
return cfg, nil
}

// CopReadBatchSize is the batch size of coprocessor read.
@@ -144,19 +137,19 @@ func generateLocalEngineConfig(ts uint64) *backend.EngineConfig {
}

// adjustImportMemory adjusts the lightning memory parameters according to the memory root's max limitation.
func adjustImportMemory(ctx context.Context, memRoot MemRoot, cfg *lightning.Config) {
func adjustImportMemory(ctx context.Context, memRoot MemRoot, cfg *local.BackendConfig) {
var scale int64
// If aggressive resource usage succeeds, there is nothing to adjust.
if tryAggressiveMemory(ctx, memRoot, cfg) {
return
}

defaultMemSize := int64(cfg.TikvImporter.LocalWriterMemCacheSize) * int64(cfg.TikvImporter.RangeConcurrency)
defaultMemSize += 4 * int64(cfg.TikvImporter.EngineMemCacheSize)
defaultMemSize := int64(int(cfg.LocalWriterMemCacheSize) * cfg.WorkerConcurrency / 2)
defaultMemSize += 4 * int64(cfg.MemTableSize)
logutil.Logger(ctx).Info(LitInfoInitMemSetting,
zap.Int64("local writer memory cache size", int64(cfg.TikvImporter.LocalWriterMemCacheSize)),
zap.Int64("engine memory cache size", int64(cfg.TikvImporter.EngineMemCacheSize)),
zap.Int("range concurrency", cfg.TikvImporter.RangeConcurrency))
zap.Int64("local writer memory cache size", cfg.LocalWriterMemCacheSize),
zap.Int("engine memory cache size", cfg.MemTableSize),
zap.Int("worker concurrency", cfg.WorkerConcurrency))

maxLimit := memRoot.MaxMemoryQuota()
scale = defaultMemSize / maxLimit
@@ -165,28 +158,28 @@ func adjustImportMemory(ctx context.Context, memRoot MemRoot, cfg *lightning.Con
return
}

cfg.TikvImporter.LocalWriterMemCacheSize /= lightning.ByteSize(scale)
cfg.TikvImporter.EngineMemCacheSize /= lightning.ByteSize(scale)
cfg.LocalWriterMemCacheSize /= scale
cfg.MemTableSize /= int(scale)

logutil.Logger(ctx).Info(LitInfoChgMemSetting,
zap.Int64("local writer memory cache size", int64(cfg.TikvImporter.LocalWriterMemCacheSize)),
zap.Int64("engine memory cache size", int64(cfg.TikvImporter.EngineMemCacheSize)),
zap.Int("range concurrency", cfg.TikvImporter.RangeConcurrency))
zap.Int64("local writer memory cache size", cfg.LocalWriterMemCacheSize),
zap.Int("engine memory cache size", cfg.MemTableSize),
zap.Int("worker concurrency", cfg.WorkerConcurrency))
}

// tryAggressiveMemory tries the aggressive lightning memory parameters, if the memory root's max limitation can accommodate them.
func tryAggressiveMemory(ctx context.Context, memRoot MemRoot, cfg *lightning.Config) bool {
func tryAggressiveMemory(ctx context.Context, memRoot MemRoot, cfg *local.BackendConfig) bool {
var defaultMemSize int64
defaultMemSize = int64(int(cfg.TikvImporter.LocalWriterMemCacheSize) * cfg.TikvImporter.RangeConcurrency)
defaultMemSize += int64(cfg.TikvImporter.EngineMemCacheSize)
defaultMemSize = int64(int(cfg.LocalWriterMemCacheSize) * cfg.WorkerConcurrency / 2)
defaultMemSize += int64(cfg.MemTableSize)

if (defaultMemSize + memRoot.CurrentUsage()) > memRoot.MaxMemoryQuota() {
return false
}
logutil.Logger(ctx).Info(LitInfoChgMemSetting,
zap.Int64("local writer memory cache size", int64(cfg.TikvImporter.LocalWriterMemCacheSize)),
zap.Int64("engine memory cache size", int64(cfg.TikvImporter.EngineMemCacheSize)),
zap.Int("range concurrency", cfg.TikvImporter.RangeConcurrency))
zap.Int64("local writer memory cache size", cfg.LocalWriterMemCacheSize),
zap.Int("engine memory cache size", cfg.MemTableSize),
zap.Int("worker concurrency", cfg.WorkerConcurrency))
return true
}

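The budget arithmetic in adjustImportMemory and tryAggressiveMemory above is easier to follow with numbers. A worked example, assuming 128 MiB for LocalWriterMemCacheSize and 512 MiB for MemTableSize (lightning's customary defaults, but assumptions here, not values from this diff) and WorkerConcurrency = 8:

```go
package main

import "fmt"

func main() {
	const mib = int64(1 << 20)
	// Assumed values; see the lead-in above.
	localWriterMemCacheSize := 128 * mib
	memTableSize := 512 * mib
	workerConcurrency := int64(8) // genConfig's concurrency*2 with concurrency=4

	// defaultMemSize = LocalWriterMemCacheSize*WorkerConcurrency/2 + 4*MemTableSize
	defaultMemSize := localWriterMemCacheSize*workerConcurrency/2 + 4*memTableSize
	fmt.Println(defaultMemSize / mib) // 2560 (MiB)

	// If the mem root's quota were 1 GiB, scale = 2560/1024 = 2 by integer
	// division, and both cache sizes get divided by 2.
	maxLimit := 1024 * mib
	fmt.Println(defaultMemSize / maxLimit) // 2
}
```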
4 changes: 0 additions & 4 deletions pkg/ddl/ingest/engine.go
@@ -23,7 +23,6 @@ import (
"github.com/pingcap/tidb/pkg/lightning/backend"
"github.com/pingcap/tidb/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/util/generic"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
@@ -55,7 +54,6 @@ type engineInfo struct {

uuid uuid.UUID
cfg *backend.EngineConfig
litCfg *config.Config
writerCache generic.SyncMap[int, backend.EngineWriter]
memRoot MemRoot
flushLock *sync.RWMutex
@@ -67,7 +65,6 @@ func newEngineInfo(
jobID, indexID int64,
unique bool,
cfg *backend.EngineConfig,
litCfg *config.Config,
en *backend.OpenedEngine,
uuid uuid.UUID,
memRoot MemRoot,
Expand All @@ -78,7 +75,6 @@ func newEngineInfo(
indexID: indexID,
unique: unique,
cfg: cfg,
litCfg: litCfg,
openedEngine: en,
uuid: uuid,
writerCache: generic.NewSyncMap[int, backend.EngineWriter](4),
1 change: 0 additions & 1 deletion pkg/ddl/ingest/engine_mgr.go
@@ -79,7 +79,6 @@ func (bc *litBackendCtx) Register(indexIDs []int64, uniques []bool, tbl table.Ta
indexID,
uniques[i],
cfg,
bc.cfg,
openedEngine,
openedEngine.GetEngineUUID(),
bc.memRoot,