From d82cc9a38a67b89047ab5c4f1c2035f5a1382fe0 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 11 Jan 2024 17:25:58 +0800 Subject: [PATCH 1/9] change --- br/pkg/lightning/backend/local/engine_mgr.go | 34 ++ br/pkg/lightning/backend/local/local.go | 54 ++-- pkg/executor/import_into.go | 102 +++++- pkg/executor/importer/chunk_process.go | 300 +++++++++++++----- .../importer/chunk_process_testkit_test.go | 2 +- pkg/executor/importer/engine_process.go | 39 ++- pkg/executor/importer/import.go | 70 +++- pkg/executor/importer/import_test.go | 29 +- .../importer/importer_testkit_test.go | 12 +- pkg/executor/importer/main_test.go | 36 +++ pkg/executor/importer/precheck.go | 6 +- pkg/executor/importer/precheck_test.go | 3 +- pkg/executor/importer/table_import.go | 194 ++++++++++- .../importer/table_import_testkit_test.go | 127 ++++++++ 14 files changed, 870 insertions(+), 138 deletions(-) create mode 100644 pkg/executor/importer/main_test.go create mode 100644 pkg/executor/importer/table_import_testkit_test.go diff --git a/br/pkg/lightning/backend/local/engine_mgr.go b/br/pkg/lightning/backend/local/engine_mgr.go index 260b3d407d805..dfeb4f4726f76 100644 --- a/br/pkg/lightning/backend/local/engine_mgr.go +++ b/br/pkg/lightning/backend/local/engine_mgr.go @@ -74,6 +74,10 @@ func newEngineManager(config BackendConfig, storeHelper StoreHelper, logger log. } }() + if err = prepareSortDir(config); err != nil { + return nil, err + } + keyAdapter := common.KeyAdapter(common.NoopKeyAdapter{}) if config.DupeDetectEnabled { duplicateDB, err = openDuplicateDB(config.LocalStoreDir) @@ -525,6 +529,15 @@ func (em *engineManager) close() { } em.duplicateDB = nil } + + // if checkpoint is disabled, or we finish load all data successfully, then files in this + // dir will be useless, so we clean up this dir and all files in it. 
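+	// note: an empty dir here means all engines were already imported and
+	// cleaned up, so the leftover directory is safe to remove even when
+	// checkpoint is enabled.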
+ if !em.CheckpointEnabled || common.IsEmptyDir(em.LocalStoreDir) { + err := os.RemoveAll(em.LocalStoreDir) + if err != nil { + em.logger.Warn("remove local db file failed", zap.Error(err)) + } + } } func (em *engineManager) getExternalEngine(uuid uuid.UUID) (common.Engine, bool) { @@ -566,3 +579,24 @@ func openDuplicateDB(storeDir string) (*pebble.DB, error) { } return pebble.Open(dbPath, opts) } + +func prepareSortDir(config BackendConfig) error { + shouldCreate := true + if config.CheckpointEnabled { + if info, err := os.Stat(config.LocalStoreDir); err != nil { + if !os.IsNotExist(err) { + return err + } + } else if info.IsDir() { + shouldCreate = false + } + } + + if shouldCreate { + err := os.Mkdir(config.LocalStoreDir, 0o700) + if err != nil { + return common.ErrInvalidSortedKVDir.Wrap(err).GenWithStackByArgs(config.LocalStoreDir) + } + } + return nil +} diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 79f3b4e1a96d4..5f6081fe9a64e 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -22,7 +22,6 @@ import ( "io" "math" "net" - "os" "path/filepath" "strings" "sync" @@ -517,24 +516,6 @@ func NewBackend( return nil, common.NormalizeOrWrapErr(common.ErrCreatePDClient, err) } - shouldCreate := true - if config.CheckpointEnabled { - if info, err := os.Stat(config.LocalStoreDir); err != nil { - if !os.IsNotExist(err) { - return nil, err - } - } else if info.IsDir() { - shouldCreate = false - } - } - - if shouldCreate { - err = os.Mkdir(config.LocalStoreDir, 0o700) - if err != nil { - return nil, common.ErrInvalidSortedKVDir.Wrap(err).GenWithStackByArgs(config.LocalStoreDir) - } - } - // The following copies tikv.NewTxnClient without creating yet another pdClient. spkv, err := tikvclient.NewEtcdSafePointKV(strings.Split(config.PDAddr, ","), tls.TLSConfig()) if err != nil { @@ -597,6 +578,27 @@ func NewBackend( return local, nil } +// NewBackendForTest creates a new Backend for test. +func NewBackendForTest(ctx context.Context, config BackendConfig, storeHelper StoreHelper) (*Backend, error) { + config.adjust() + + logger := log.FromContext(ctx) + engineMgr, err := newEngineManager(config, storeHelper, logger) + if err != nil { + return nil, err + } + local := &Backend{ + BackendConfig: config, + logger: logger, + engineMgr: engineMgr, + } + if m, ok := metric.GetCommonMetric(ctx); ok { + local.metrics = m + } + + return local, nil +} + // TotalMemoryConsume returns the total memory usage of the local backend. func (local *Backend) TotalMemoryConsume() int64 { return local.engineMgr.totalMemoryConsume() @@ -672,14 +674,6 @@ func (local *Backend) Close() { local.engineMgr.close() local.importClientFactory.Close() - // if checkpoint is disabled, or we finish load all data successfully, then files in this - // dir will be useless, so we clean up this dir and all files in it. - if !local.CheckpointEnabled || common.IsEmptyDir(local.LocalStoreDir) { - err := os.RemoveAll(local.LocalStoreDir) - if err != nil { - local.logger.Warn("remove local db file failed", zap.Error(err)) - } - } _ = local.tikvCli.Close() local.pdHTTPCli.Close() local.pdCli.Close() @@ -1592,6 +1586,12 @@ func (local *Backend) GetTiKVCodec() tikvclient.Codec { return local.tikvCodec } +// CloseEngineMgr close the engine manager. +// This function is used for test. 
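+// Backends created by NewBackendForTest have no PD/TiKV clients, so tests
+// call this instead of Close to release only the engine manager.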
+func (local *Backend) CloseEngineMgr() { + local.engineMgr.close() +} + var getSplitConfFromStoreFunc = getSplitConfFromStore // return region split size, region split keys, error diff --git a/pkg/executor/import_into.go b/pkg/executor/import_into.go index 013ccf79e0109..71f74a7c1f743 100644 --- a/pkg/executor/import_into.go +++ b/pkg/executor/import_into.go @@ -16,8 +16,10 @@ package executor import ( "context" + "fmt" "sync/atomic" + "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/log" @@ -105,6 +107,10 @@ func (e *ImportIntoExec) Next(ctx context.Context, req *chunk.Chunk) (err error) e.importPlan = importPlan e.controller = controller + if e.selectExec != nil { + return e.importFromSelect(ctx) + } + if err2 := e.controller.InitDataFiles(ctx); err2 != nil { return err2 } @@ -237,12 +243,104 @@ func (e *ImportIntoExec) doImport(ctx context.Context, se sessionctx.Context, di // use background, since ctx is canceled already. return cancelAndWaitImportJob(context.Background(), taskManager, distImporter.JobID()) } - if err2 := flushStats(ctx, se, e.importPlan.TableInfo.ID, distImporter.Result(ctx)); err2 != nil { + importResult := distImporter.Result(ctx) + if err2 := flushStats(ctx, se, e.importPlan.TableInfo.ID, &importResult); err2 != nil { logutil.Logger(ctx).Error("flush stats failed", zap.Error(err2)) } return err } +func (e *ImportIntoExec) importFromSelect(ctx context.Context) error { + e.dataFilled = true + // must use a new session to pre-check, else the stmt in show processlist will be changed. + newSCtx, err2 := CreateSession(e.userSctx) + if err2 != nil { + return err2 + } + defer CloseSession(newSCtx) + + sqlExec := newSCtx.(sqlexec.SQLExecutor) + if err2 = e.controller.CheckRequirements(ctx, sqlExec); err2 != nil { + return err2 + } + if err := e.importPlan.InitTiKVConfigs(ctx, newSCtx); err != nil { + return err + } + + group, groupCtx := errgroup.WithContext(ctx) + param := &importer.JobImportParam{ + Job: &importer.Job{}, + Group: group, + GroupCtx: groupCtx, + Done: make(chan struct{}), + Progress: importer.NewProgress(), + } + importID := uuid.New().String() + logutil.Logger(ctx).Info("importing data from select statement", + zap.String("importID", importID), zap.Int("concurrency", e.controller.ThreadCnt)) + ti, err2 := importer.NewTableImporter(param, e.controller, importID) + if err2 != nil { + return err2 + } + defer func() { + if err := ti.Close(); err != nil { + logutil.Logger(ctx).Error("close importer failed", zap.Error(err)) + } + }() + selectedRowCh := make(chan importer.QueryRow) + ti.SetSelectedRowCh(selectedRowCh) + + var importResult *importer.JobImportResult + eg, egCtx := errgroup.WithContext(ctx) + eg.Go(func() error { + var err error + importResult, err = ti.ImportSelectedRows(egCtx, newSCtx) + return err + }) + eg.Go(func() error { + defer close(selectedRowCh) + fields := exec.RetTypes(e.selectExec) + var idAllocator int64 + for { + // rows will be consumed concurrently, we cannot use chunk pool in session ctx. 
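+			// the datums from GetDatumRow may reference this chunk's memory, so
+			// a fresh chunk is allocated per batch and must stay alive until the
+			// importer has consumed every row sent from it.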
+ chk := exec.NewFirstChunk(e.selectExec) + iter := chunk.NewIterator4Chunk(chk) + err := exec.Next(egCtx, e.selectExec, chk) + if err != nil { + return err + } + if chk.NumRows() == 0 { + break + } + for innerChunkRow := iter.Begin(); innerChunkRow != iter.End(); innerChunkRow = iter.Next() { + idAllocator++ + select { + case selectedRowCh <- importer.QueryRow{ + ID: idAllocator, + Data: innerChunkRow.GetDatumRow(fields), + }: + case <-egCtx.Done(): + return egCtx.Err() + } + } + } + return nil + }) + if err := eg.Wait(); err != nil { + return err + } + + if err2 = flushStats(ctx, e.userSctx, e.importPlan.TableInfo.ID, importResult); err2 != nil { + logutil.Logger(ctx).Error("flush stats failed", zap.Error(err2)) + } + + stmtCtx := e.userSctx.GetSessionVars().StmtCtx + stmtCtx.SetAffectedRows(importResult.Affected) + // TODO: change it after spec is ready. + stmtCtx.SetMessage(fmt.Sprintf("Records: %d, ID: %s", importResult.Affected, importID)) + return nil +} + // ImportIntoActionExec represents a import into action executor. type ImportIntoActionExec struct { exec.BaseExecutor @@ -297,7 +395,7 @@ func (e *ImportIntoActionExec) checkPrivilegeAndStatus(ctx context.Context, mana } // flushStats flushes the stats of the table. -func flushStats(ctx context.Context, se sessionctx.Context, tableID int64, result importer.JobImportResult) error { +func flushStats(ctx context.Context, se sessionctx.Context, tableID int64, result *importer.JobImportResult) error { if err := sessiontxn.NewTxn(ctx, se); err != nil { return err } diff --git a/pkg/executor/importer/chunk_process.go b/pkg/executor/importer/chunk_process.go index aa559475a4462..3cef5c4a62281 100644 --- a/pkg/executor/importer/chunk_process.go +++ b/pkg/executor/importer/chunk_process.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/mydump" verify "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/pkg/tablecodec" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/syncutil" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/client-go/v2/tikv" @@ -100,8 +101,14 @@ func (b *deliverKVBatch) add(kvs *kv.Pairs) { } } -// chunkEncoder encode data chunk(either a data file or part of a file). -type chunkEncoder struct { +type chunkEncoder interface { + init() error + encodeLoop(ctx context.Context) error + logFields() []zap.Field +} + +// fileChunkEncoder encode data chunk(either a data file or part of a file). +type fileChunkEncoder struct { parser mydump.Parser chunkInfo *checkpoints.ChunkCheckpoint logger *zap.Logger @@ -116,10 +123,11 @@ type chunkEncoder struct { // total duration takes by read/encode/deliver. 
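+	// (the deliver duration is tracked separately in dataDeliver)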
readTotalDur time.Duration encodeTotalDur time.Duration - metrics *metric.Common } -func (p *chunkEncoder) initProgress() error { +var _ chunkEncoder = (*fileChunkEncoder)(nil) + +func (p *fileChunkEncoder) init() error { // we might skip N rows or start from checkpoint offset, err := p.parser.ScannedPos() if err != nil { @@ -129,16 +137,17 @@ func (p *chunkEncoder) initProgress() error { return nil } -func (p *chunkEncoder) encodeLoop(ctx context.Context) error { +func (p *fileChunkEncoder) encodeLoop(ctx context.Context) error { var err error reachEOF := false prevOffset, currOffset := p.startOffset, p.startOffset var encodedBytesCounter, encodedRowsCounter prometheus.Counter - if p.metrics != nil { - encodedBytesCounter = p.metrics.BytesCounter.WithLabelValues(metric.StateRestored) + metrics, _ := metric.GetCommonMetric(ctx) + if metrics != nil { + encodedBytesCounter = metrics.BytesCounter.WithLabelValues(metric.StateRestored) // table name doesn't matter here, all those metrics will have task-id label. - encodedRowsCounter = p.metrics.RowsCounter.WithLabelValues(metric.StateRestored, "") + encodedRowsCounter = metrics.RowsCounter.WithLabelValues(metric.StateRestored, "") } for !reachEOF { @@ -199,9 +208,9 @@ func (p *chunkEncoder) encodeLoop(ctx context.Context) error { p.encodeTotalDur += encodeDur p.readTotalDur += readDur - if p.metrics != nil { - p.metrics.RowEncodeSecondsHistogram.Observe(encodeDur.Seconds()) - p.metrics.RowReadSecondsHistogram.Observe(readDur.Seconds()) + if metrics != nil { + metrics.RowEncodeSecondsHistogram.Observe(encodeDur.Seconds()) + metrics.RowReadSecondsHistogram.Observe(readDur.Seconds()) // if we're using split_file, this metric might larger than total // source file size, as the offset we're using is the reader offset, // not parser offset, and we'll buffer data. @@ -219,6 +228,13 @@ func (p *chunkEncoder) encodeLoop(ctx context.Context) error { return nil } +func (p *fileChunkEncoder) logFields() []zap.Field { + return []zap.Field{ + zap.Duration("readDur", p.readTotalDur), + zap.Duration("encodeDur", p.encodeTotalDur), + } +} + // ChunkProcessor is used to process a chunk of data, include encode data to KV // and deliver KV to local or global storage. type ChunkProcessor interface { @@ -226,70 +242,44 @@ type ChunkProcessor interface { } type baseChunkProcessor struct { - enc *chunkEncoder - logger *zap.Logger - kvCodec tikv.Codec - deliverLoop func(ctx context.Context) error - encodeDone func(ctx context.Context) - - // initialized when Process - metrics *metric.Common - - checksum verify.KVChecksum - deliverTotalDur time.Duration + sourceType dataSourceType + enc chunkEncoder + deliver *dataDeliver + logger *zap.Logger + chunkInfo *checkpoints.ChunkCheckpoint } func (p *baseChunkProcessor) Process(ctx context.Context) (err error) { task := log.BeginTask(p.logger, "process chunk") defer func() { - task.End(zap.ErrorLevel, err, - zap.Duration("readDur", p.enc.readTotalDur), - zap.Duration("encodeDur", p.enc.encodeTotalDur), - zap.Duration("deliverDur", p.deliverTotalDur), - zap.Object("checksum", &p.checksum), - ) - if err == nil && p.metrics != nil { - p.metrics.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Inc() + logFields := append(p.enc.logFields(), p.deliver.logFields()...) + logFields = append(logFields, zap.Stringer("type", p.sourceType)) + task.End(zap.ErrorLevel, err, logFields...) 
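+		// count the chunk as finished only when processing succeeded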
+ if metrics, ok := metric.GetCommonMetric(ctx); ok && err == nil { + metrics.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Inc() } }() - if err2 := p.enc.initProgress(); err2 != nil { + if err2 := p.enc.init(); err2 != nil { return err2 } - if metrics, ok := metric.GetCommonMetric(ctx); ok { - p.enc.metrics = metrics - p.metrics = p.enc.metrics - } - group, gCtx := errgroup.WithContext(ctx) group.Go(func() error { - return p.deliverLoop(gCtx) + return p.deliver.deliverLoop(gCtx) }) group.Go(func() error { - defer p.encodeDone(gCtx) + defer p.deliver.encodeDone() return p.enc.encodeLoop(gCtx) }) err2 := group.Wait() - p.enc.chunkInfo.Checksum.Add(&p.checksum) + p.chunkInfo.Checksum.Add(&p.deliver.checksum) return err2 } -// localSortChunkProcessor encode and sort kv, then write to local storage. -// each chunk processor will have a pair of encode and deliver routine. -type localSortChunkProcessor struct { - *baseChunkProcessor - kvsCh chan []deliveredRow - diskQuotaLock *syncutil.RWMutex - dataWriter backend.EngineWriter - indexWriter backend.EngineWriter -} - -var _ ChunkProcessor = &localSortChunkProcessor{} - -// NewLocalSortChunkProcessor creates a new local sort chunk processor. +// NewFileChunkProcessor creates a new local sort chunk processor. // exported for test. -func NewLocalSortChunkProcessor( +func NewFileChunkProcessor( parser mydump.Parser, encoder KVEncoder, kvCodec tikv.Codec, @@ -300,34 +290,47 @@ func NewLocalSortChunkProcessor( indexWriter backend.EngineWriter, ) ChunkProcessor { chunkLogger := logger.With(zap.String("key", chunk.GetKey())) - cp := &localSortChunkProcessor{ + deliver := &dataDeliver{ + logger: chunkLogger, + kvCodec: kvCodec, diskQuotaLock: diskQuotaLock, kvsCh: make(chan []deliveredRow, maxKVQueueSize), dataWriter: dataWriter, indexWriter: indexWriter, } - cp.baseChunkProcessor = &baseChunkProcessor{ - enc: &chunkEncoder{ + return &baseChunkProcessor{ + sourceType: DataSourceTypeFile, + deliver: deliver, + enc: &fileChunkEncoder{ parser: parser, chunkInfo: chunk, logger: chunkLogger, encoder: encoder, kvCodec: kvCodec, - sendFn: cp.sendEncodedData, + sendFn: deliver.sendEncodedData, }, - logger: chunkLogger, - kvCodec: kvCodec, - deliverLoop: cp.deliverLoop, - encodeDone: cp.encodeDone, + logger: chunkLogger, + chunkInfo: chunk, } - return cp } -func (p *localSortChunkProcessor) encodeDone(context.Context) { +type dataDeliver struct { + logger *zap.Logger + kvCodec tikv.Codec + kvsCh chan []deliveredRow + diskQuotaLock *syncutil.RWMutex + dataWriter backend.EngineWriter + indexWriter backend.EngineWriter + + checksum verify.KVChecksum + deliverTotalDur time.Duration +} + +func (p *dataDeliver) encodeDone() { close(p.kvsCh) } -func (p *localSortChunkProcessor) sendEncodedData(ctx context.Context, kvs []deliveredRow) error { +func (p *dataDeliver) sendEncodedData(ctx context.Context, kvs []deliveredRow) error { select { case p.kvsCh <- kvs: return nil @@ -336,7 +339,7 @@ func (p *localSortChunkProcessor) sendEncodedData(ctx context.Context, kvs []del } } -func (p *localSortChunkProcessor) deliverLoop(ctx context.Context) error { +func (p *dataDeliver) deliverLoop(ctx context.Context) error { kvBatch := newDeliverKVBatch(p.kvCodec) var ( @@ -344,12 +347,14 @@ func (p *localSortChunkProcessor) deliverLoop(ctx context.Context) error { dataKVPairsHist, indexKVPairsHist prometheus.Observer deliverBytesCounter prometheus.Counter ) - if p.metrics != nil { - dataKVBytesHist = 
p.metrics.BlockDeliverBytesHistogram.WithLabelValues(metric.BlockDeliverKindData) - indexKVBytesHist = p.metrics.BlockDeliverBytesHistogram.WithLabelValues(metric.BlockDeliverKindIndex) - dataKVPairsHist = p.metrics.BlockDeliverKVPairsHistogram.WithLabelValues(metric.BlockDeliverKindData) - indexKVPairsHist = p.metrics.BlockDeliverKVPairsHistogram.WithLabelValues(metric.BlockDeliverKindIndex) - deliverBytesCounter = p.metrics.BytesCounter.WithLabelValues(metric.StateRestoreWritten) + + metrics, _ := metric.GetCommonMetric(ctx) + if metrics != nil { + dataKVBytesHist = metrics.BlockDeliverBytesHistogram.WithLabelValues(metric.BlockDeliverKindData) + indexKVBytesHist = metrics.BlockDeliverBytesHistogram.WithLabelValues(metric.BlockDeliverKindIndex) + dataKVPairsHist = metrics.BlockDeliverKVPairsHistogram.WithLabelValues(metric.BlockDeliverKindData) + indexKVPairsHist = metrics.BlockDeliverKVPairsHistogram.WithLabelValues(metric.BlockDeliverKindIndex) + deliverBytesCounter = metrics.BytesCounter.WithLabelValues(metric.StateRestoreWritten) } for { @@ -392,8 +397,8 @@ func (p *localSortChunkProcessor) deliverLoop(ctx context.Context) error { deliverDur := time.Since(start) p.deliverTotalDur += deliverDur - if p.metrics != nil { - p.metrics.BlockDeliverSecondsHistogram.Observe(deliverDur.Seconds()) + if metrics != nil { + metrics.BlockDeliverSecondsHistogram.Observe(deliverDur.Seconds()) dataKVBytesHist.Observe(float64(kvBatch.dataChecksum.SumSize())) indexKVBytesHist.Observe(float64(kvBatch.indexChecksum.SumSize())) dataKVPairsHist.Observe(float64(kvBatch.dataChecksum.SumKVS())) @@ -405,7 +410,7 @@ func (p *localSortChunkProcessor) deliverLoop(ctx context.Context) error { return err } - if p.metrics != nil { + if metrics != nil { deliverBytesCounter.Add(float64(kvBatch.size())) } @@ -418,6 +423,153 @@ func (p *localSortChunkProcessor) deliverLoop(ctx context.Context) error { return nil } +func (p *dataDeliver) logFields() []zap.Field { + return []zap.Field{ + zap.Duration("deliverDur", p.deliverTotalDur), + zap.Object("checksum", &p.checksum), + } +} + +// fileChunkEncoder encode data chunk(either a data file or part of a file). +type queryChunkEncoder struct { + rowCh chan QueryRow + chunkInfo *checkpoints.ChunkCheckpoint + logger *zap.Logger + encoder KVEncoder + sendFn func(ctx context.Context, kvs []deliveredRow) error + + // total duration takes by read/encode/deliver. + readTotalDur time.Duration + encodeTotalDur time.Duration +} + +var _ chunkEncoder = (*queryChunkEncoder)(nil) + +func (e *queryChunkEncoder) init() error { + return nil +} + +func (e *queryChunkEncoder) encodeLoop(ctx context.Context) error { + var err error + reachEOF := false + var encodedRowsCounter prometheus.Counter + metrics, _ := metric.GetCommonMetric(ctx) + if metrics != nil { + // table name doesn't matter here, all those metrics will have task-id label. 
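+		// unlike fileChunkEncoder there is no bytes counter here: rows come
+		// from a query in memory, so there is no source-file offset to report.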
+ encodedRowsCounter = metrics.RowsCounter.WithLabelValues(metric.StateRestored, "") + } + + for !reachEOF { + var readDur, encodeDur time.Duration + canDeliver := false + rowBatch := make([]deliveredRow, 0, MinDeliverRowCnt) + var rowCount, kvSize uint64 + outLoop: + for !canDeliver { + readDurStart := time.Now() + var ( + lastRow QueryRow + rowID int64 + ok bool + ) + select { + case lastRow, ok = <-e.rowCh: + if !ok { + reachEOF = true + break outLoop + } + case <-ctx.Done(): + return ctx.Err() + } + readDur += time.Since(readDurStart) + encodeDurStart := time.Now() + // sql -> kv + kvs, encodeErr := e.encoder.Encode(lastRow.Data, lastRow.ID) + encodeDur += time.Since(encodeDurStart) + + if encodeErr != nil { + err = common.ErrEncodeKV.Wrap(encodeErr).GenWithStackByArgs(e.chunkInfo.GetKey(), rowID) + } + if err != nil { + return err + } + + rowBatch = append(rowBatch, deliveredRow{kvs: kvs}) + kvSize += kvs.Size() + rowCount++ + // pebble cannot allow > 4.0G kv in one batch. + // we will meet pebble panic when import sql file and each kv has the size larger than 4G / maxKvPairsCnt. + // so add this check. + if kvSize >= MinDeliverBytes || len(rowBatch) >= MinDeliverRowCnt { + canDeliver = true + } + } + + e.encodeTotalDur += encodeDur + e.readTotalDur += readDur + if metrics != nil { + metrics.RowEncodeSecondsHistogram.Observe(encodeDur.Seconds()) + metrics.RowReadSecondsHistogram.Observe(readDur.Seconds()) + encodedRowsCounter.Add(float64(rowCount)) + } + + if len(rowBatch) > 0 { + if err = e.sendFn(ctx, rowBatch); err != nil { + return err + } + } + } + + return nil +} + +func (e *queryChunkEncoder) logFields() []zap.Field { + return []zap.Field{ + zap.Duration("readDur", e.readTotalDur), + zap.Duration("encodeDur", e.encodeTotalDur), + } +} + +// QueryRow is a row from query result. +type QueryRow struct { + ID int64 + Data []types.Datum +} + +func newQueryChunkProcessor( + rowCh chan QueryRow, + encoder KVEncoder, + kvCodec tikv.Codec, + chunk *checkpoints.ChunkCheckpoint, + logger *zap.Logger, + diskQuotaLock *syncutil.RWMutex, + dataWriter backend.EngineWriter, + indexWriter backend.EngineWriter, +) ChunkProcessor { + chunkLogger := logger.With(zap.String("key", chunk.GetKey())) + deliver := &dataDeliver{ + logger: chunkLogger, + kvCodec: kvCodec, + diskQuotaLock: diskQuotaLock, + kvsCh: make(chan []deliveredRow, maxKVQueueSize), + dataWriter: dataWriter, + indexWriter: indexWriter, + } + return &baseChunkProcessor{ + sourceType: DataSourceTypeQuery, + deliver: deliver, + enc: &queryChunkEncoder{ + rowCh: rowCh, + chunkInfo: chunk, + logger: chunkLogger, + encoder: encoder, + sendFn: deliver.sendEncodedData, + }, + logger: chunkLogger, + chunkInfo: chunk, + } +} + // IndexRouteWriter is a writer for index when using global sort. // we route kvs of different index to different writer in order to make // merge sort easier, else kv data of all subtasks will all be overlapped. 
diff --git a/pkg/executor/importer/chunk_process_testkit_test.go b/pkg/executor/importer/chunk_process_testkit_test.go index cc3f18af27a93..4dabe52f7073a 100644 --- a/pkg/executor/importer/chunk_process_testkit_test.go +++ b/pkg/executor/importer/chunk_process_testkit_test.go @@ -111,7 +111,7 @@ func TestLocalSortChunkProcess(t *testing.T) { diskQuotaLock := &syncutil.RWMutex{} codec := tikv.NewCodecV1(tikv.ModeRaw) - processor := importer.NewLocalSortChunkProcessor( + processor := importer.NewFileChunkProcessor( csvParser, encoder, codec, chunkInfo, logger.Logger, diskQuotaLock, engineWriter, engineWriter, ) diff --git a/pkg/executor/importer/engine_process.go b/pkg/executor/importer/engine_process.go index bb87df7019339..51803023ac8fd 100644 --- a/pkg/executor/importer/engine_process.go +++ b/pkg/executor/importer/engine_process.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/lightning/mydump" "go.uber.org/zap" ) @@ -77,15 +78,21 @@ func ProcessChunkWith( progress *Progress, logger *zap.Logger, ) error { - parser, err := tableImporter.getParser(ctx, chunk) - if err != nil { - return err - } - defer func() { - if err2 := parser.Close(); err2 != nil { - logger.Warn("close parser failed", zap.Error(err2)) + var ( + err error + parser mydump.Parser + ) + if tableImporter.DataSourceType == DataSourceTypeFile { + parser, err = tableImporter.getParser(ctx, chunk) + if err != nil { + return err } - }() + defer func() { + if err2 := parser.Close(); err2 != nil { + logger.Warn("close parser failed", zap.Error(err2)) + } + }() + } encoder, err := tableImporter.getKVEncoder(chunk) if err != nil { return err @@ -98,10 +105,18 @@ func ProcessChunkWith( // TODO: right now we use this chunk processor for global sort too, will // impl another one for it later. - cp := NewLocalSortChunkProcessor( - parser, encoder, tableImporter.kvStore.GetCodec(), chunk, logger, - tableImporter.diskQuotaLock, dataWriter, indexWriter, - ) + var cp ChunkProcessor + if tableImporter.DataSourceType == DataSourceTypeQuery { + cp = newQueryChunkProcessor( + tableImporter.rowCh, encoder, tableImporter.kvStore.GetCodec(), chunk, logger, + tableImporter.diskQuotaLock, dataWriter, indexWriter, + ) + } else { + cp = NewFileChunkProcessor( + parser, encoder, tableImporter.kvStore.GetCodec(), chunk, logger, + tableImporter.diskQuotaLock, dataWriter, indexWriter, + ) + } err = cp.Process(ctx) if err != nil { return err diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go index ea282de3b6b8d..30702689a57fb 100644 --- a/pkg/executor/importer/import.go +++ b/pkg/executor/importer/import.go @@ -101,6 +101,7 @@ const ( ) var ( + // all supported options. // name -> whether the option has value supportedOptions = map[string]bool{ characterSetOption: true, @@ -133,6 +134,10 @@ var ( splitFileOption: {}, } + importFromQueryOptions = map[string]struct{}{ + threadOption: {}, + } + // LoadDataReadBlockSize is exposed for test. LoadDataReadBlockSize = int64(config.ReadBlockSize) @@ -144,6 +149,20 @@ var ( } ) +type dataSourceType string + +const ( + // DataSourceTypeFile represents the data source of IMPORT INTO is file. + // exported for test. + DataSourceTypeFile dataSourceType = "file" + // DataSourceTypeQuery represents the data source of IMPORT INTO is query. 
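+	// i.e. IMPORT INTO ... FROM SELECT, see getDataSourceType.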
+ DataSourceTypeQuery dataSourceType = "query" +) + +func (t dataSourceType) String() string { + return string(t) +} + // GetKVStore returns a kv.Storage. // kv encoder of physical mode needs it. var GetKVStore func(path string, tls kvconfig.Security) (tidbkv.Storage, error) @@ -174,7 +193,8 @@ type Plan struct { // after import. DesiredTableInfo *model.TableInfo - Path string + Path string + // only effective when data source is file. Format string // Data interpretation is restrictive if the SQL mode is restrictive and neither // the IGNORE nor the LOCAL modifier is specified. Errors terminate the load @@ -213,7 +233,8 @@ type Plan struct { DistSQLScanConcurrency int // todo: remove it when load data code is reverted. - InImportInto bool + InImportInto bool + DataSourceType dataSourceType // only initialized for IMPORT INTO, used when creating job. Parameters *ImportParameters `json:"-"` // the user who executes the statement, in the form of user@host @@ -344,6 +365,7 @@ func NewPlanFromLoadDataPlan(userSctx sessionctx.Context, plan *plannercore.Load ImportantSysVars: getImportantSysVars(userSctx), DistSQLScanConcurrency: userSctx.GetSessionVars().DistSQLScanConcurrency(), + DataSourceType: DataSourceTypeFile, }, nil } @@ -385,6 +407,7 @@ func NewImportPlan(ctx context.Context, userSctx sessionctx.Context, plan *plann DistSQLScanConcurrency: userSctx.GetSessionVars().DistSQLScanConcurrency(), InImportInto: true, + DataSourceType: getDataSourceType(plan), User: userSctx.GetSessionVars().User.String(), } if err := p.initOptions(ctx, userSctx, plan.Options); err != nil { @@ -471,7 +494,7 @@ func NewLoadDataController(plan *Plan, tbl table.Table, astArgs *ASTArgs) (*Load } func (e *LoadDataController) checkFieldParams() error { - if e.Path == "" { + if e.DataSourceType == DataSourceTypeFile && e.Path == "" { return exeerrors.ErrLoadDataEmptyPath } if e.InImportInto { @@ -502,6 +525,10 @@ func (e *LoadDataController) checkFieldParams() error { func (p *Plan) initDefaultOptions(targetNodeCPUCnt int) { threadCnt := int(math.Max(1, float64(targetNodeCPUCnt)*0.5)) + if p.DataSourceType == DataSourceTypeQuery { + // TODO: change after spec is ready. + threadCnt = 1 + } p.Checksum = config.OpLevelRequired p.ThreadCnt = threadCnt @@ -518,7 +545,7 @@ func (p *Plan) initDefaultOptions(targetNodeCPUCnt int) { } func (p *Plan) initOptions(ctx context.Context, seCtx sessionctx.Context, options []*plannercore.LoadDataOpt) error { - targetNodeCPUCnt, err := GetTargetNodeCPUCnt(ctx, p.Path) + targetNodeCPUCnt, err := GetTargetNodeCPUCnt(ctx, p.DataSourceType, p.Path) if err != nil { return err } @@ -546,6 +573,13 @@ func (p *Plan) initOptions(ctx context.Context, seCtx sessionctx.Context, option } } } + if p.DataSourceType == DataSourceTypeQuery { + for k := range specifiedOptions { + if _, ok := importFromQueryOptions[k]; !ok { + return exeerrors.ErrLoadDataUnsupportedOption.FastGenByArgs(k, "import from query") + } + } + } optAsString := func(opt *plannercore.LoadDataOpt) (string, error) { if opt.Value.GetType().GetType() != mysql.TypeVarString { @@ -715,10 +749,17 @@ func (p *Plan) initOptions(ctx context.Context, seCtx sessionctx.Context, option } func (p *Plan) adjustOptions(targetNodeCPUCnt int) { + limit := targetNodeCPUCnt + if p.DataSourceType == DataSourceTypeQuery { + // for query, row is produced using 1 thread, the max cpu used is much + // lower than import from file, so we set limit to 2*targetNodeCPUCnt. 
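+		// e.g. with 16 CPUs, THREAD can be raised to at most 32 for queries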
+ limit *= 2 + } // max value is cpu-count - if p.ThreadCnt > targetNodeCPUCnt { - log.L().Info("IMPORT INTO thread count is larger than cpu-count, set to cpu-count") - p.ThreadCnt = targetNodeCPUCnt + if p.ThreadCnt > limit { + log.L().Info("adjust IMPORT INTO thread count", + zap.Int("before", p.ThreadCnt), zap.Int("after", limit)) + p.ThreadCnt = limit } } @@ -1287,6 +1328,13 @@ func (e *LoadDataController) getLocalBackendCfg(pdAddr, dataDir string) local.Ba return backendConfig } +func getDataSourceType(p *plannercore.ImportInto) dataSourceType { + if p.SelectPlan != nil { + return DataSourceTypeQuery + } + return DataSourceTypeFile +} + // JobImportParam is the param of the job import. type JobImportParam struct { Job *Job @@ -1337,10 +1385,14 @@ func GetMsgFromBRError(err error) string { } // GetTargetNodeCPUCnt get cpu count of target node where the import into job will be executed. -// target node is current node if it's server-disk import or disttask is disabled, +// target node is current node if it's server-disk import, import from query or disttask is disabled, // else it's the node managed by disttask. // exported for testing. -func GetTargetNodeCPUCnt(ctx context.Context, path string) (int, error) { +func GetTargetNodeCPUCnt(ctx context.Context, sourceType dataSourceType, path string) (int, error) { + if sourceType == DataSourceTypeQuery { + return cpu.GetCPUCount(), nil + } + u, err2 := storage.ParseRawURL(path) if err2 != nil { return 0, exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(plannercore.ImportIntoDataSource, diff --git a/pkg/executor/importer/import_test.go b/pkg/executor/importer/import_test.go index 2d88ed82f73ab..12a65805222e6 100644 --- a/pkg/executor/importer/import_test.go +++ b/pkg/executor/importer/import_test.go @@ -48,7 +48,15 @@ import ( ) func TestInitDefaultOptions(t *testing.T) { - plan := &Plan{} + plan := &Plan{ + DataSourceType: DataSourceTypeQuery, + } + plan.initDefaultOptions(10) + require.Equal(t, 1, plan.ThreadCnt) + + plan = &Plan{ + DataSourceType: DataSourceTypeFile, + } variable.CloudStorageURI.Store("s3://bucket/path") t.Cleanup(func() { variable.CloudStorageURI.Store("") @@ -170,13 +178,19 @@ func TestInitOptionsPositiveCase(t *testing.T) { func TestAdjustOptions(t *testing.T) { plan := &Plan{ - DiskQuota: 1, - ThreadCnt: 100000000, - MaxWriteSpeed: 10, + DiskQuota: 1, + ThreadCnt: 100000000, + MaxWriteSpeed: 10, + DataSourceType: DataSourceTypeFile, } plan.adjustOptions(16) require.Equal(t, 16, plan.ThreadCnt) require.Equal(t, config.ByteSize(10), plan.MaxWriteSpeed) // not adjusted + + plan.ThreadCnt = 100000000 + plan.DataSourceType = DataSourceTypeQuery + plan.adjustOptions(16) + require.Equal(t, 32, plan.ThreadCnt) } func TestAdjustDiskQuota(t *testing.T) { @@ -398,3 +412,10 @@ func TestSupportedSuffixForServerDisk(t *testing.T) { c.Path = path.Join(tempDir, "server-*.csv") require.NoError(t, c.InitDataFiles(ctx)) } + +func TestGetDataSourceType(t *testing.T) { + require.Equal(t, DataSourceTypeQuery, getDataSourceType(&plannercore.ImportInto{ + SelectPlan: &plannercore.PhysicalSelection{}, + })) + require.Equal(t, DataSourceTypeFile, getDataSourceType(&plannercore.ImportInto{})) +} diff --git a/pkg/executor/importer/importer_testkit_test.go b/pkg/executor/importer/importer_testkit_test.go index fbc982d6789ab..faa4b968cf146 100644 --- a/pkg/executor/importer/importer_testkit_test.go +++ b/pkg/executor/importer/importer_testkit_test.go @@ -81,21 +81,25 @@ func TestGetTargetNodeCpuCnt(t *testing.T) { require.NoError(t, 
tm.InitMeta(ctx, "tidb1", "")) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(8)")) + targetNodeCPUCnt, err := importer.GetTargetNodeCPUCnt(ctx, importer.DataSourceTypeQuery, "") + require.NoError(t, err) + require.Equal(t, 8, targetNodeCPUCnt) + // invalid path - _, err := importer.GetTargetNodeCPUCnt(ctx, ":xx") + _, err = importer.GetTargetNodeCPUCnt(ctx, importer.DataSourceTypeFile, ":xx") require.ErrorIs(t, err, exeerrors.ErrLoadDataInvalidURI) // server disk import - targetNodeCPUCnt, err := importer.GetTargetNodeCPUCnt(ctx, "/path/to/xxx.csv") + targetNodeCPUCnt, err = importer.GetTargetNodeCPUCnt(ctx, importer.DataSourceTypeFile, "/path/to/xxx.csv") require.NoError(t, err) require.Equal(t, 8, targetNodeCPUCnt) // disttask disabled - targetNodeCPUCnt, err = importer.GetTargetNodeCPUCnt(ctx, "s3://path/to/xxx.csv") + targetNodeCPUCnt, err = importer.GetTargetNodeCPUCnt(ctx, importer.DataSourceTypeFile, "s3://path/to/xxx.csv") require.NoError(t, err) require.Equal(t, 8, targetNodeCPUCnt) // disttask enabled variable.EnableDistTask.Store(true) - targetNodeCPUCnt, err = importer.GetTargetNodeCPUCnt(ctx, "s3://path/to/xxx.csv") + targetNodeCPUCnt, err = importer.GetTargetNodeCPUCnt(ctx, importer.DataSourceTypeFile, "s3://path/to/xxx.csv") require.NoError(t, err) require.Equal(t, 16, targetNodeCPUCnt) } diff --git a/pkg/executor/importer/main_test.go b/pkg/executor/importer/main_test.go new file mode 100644 index 0000000000000..3a0a32f8d62ea --- /dev/null +++ b/pkg/executor/importer/main_test.go @@ -0,0 +1,36 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package importer + +import ( + "testing" + + "github.com/pingcap/tidb/pkg/testkit/testsetup" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + testsetup.SetupForCommonTest() + + opts := []goleak.Option{ + goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), + goleak.IgnoreTopFunction("github.com/bazelbuild/rules_go/go/tools/bzltestutil.RegisterTimeoutHandler.func1"), + goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), + goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + goleak.IgnoreTopFunction("syscall.syscall"), + } + goleak.VerifyTestMain(m, opts...) +} diff --git a/pkg/executor/importer/precheck.go b/pkg/executor/importer/precheck.go index 1cb1c6f75a445..baedc8243bd8f 100644 --- a/pkg/executor/importer/precheck.go +++ b/pkg/executor/importer/precheck.go @@ -49,8 +49,10 @@ var GetEtcdClient = getEtcdClient // todo: check if there's running lightning tasks? // we check them one by one, and return the first error we meet. 
func (e *LoadDataController) CheckRequirements(ctx context.Context, conn sqlexec.SQLExecutor) error { - if err := e.checkTotalFileSize(); err != nil { - return err + if e.DataSourceType == DataSourceTypeFile { + if err := e.checkTotalFileSize(); err != nil { + return err + } } if err := e.checkTableEmpty(ctx, conn); err != nil { return err diff --git a/pkg/executor/importer/precheck_test.go b/pkg/executor/importer/precheck_test.go index 9645365fbf055..ade4bdc09fce2 100644 --- a/pkg/executor/importer/precheck_test.go +++ b/pkg/executor/importer/precheck_test.go @@ -81,7 +81,8 @@ func TestCheckRequirements(t *testing.T) { c := &importer.LoadDataController{ Plan: &importer.Plan{ - DBName: "test", + DBName: "test", + DataSourceType: importer.DataSourceTypeFile, }, Table: tableObj, } diff --git a/pkg/executor/importer/table_import.go b/pkg/executor/importer/table_import.go index 2c3259febfc7b..594585792f1d0 100644 --- a/pkg/executor/importer/table_import.go +++ b/pkg/executor/importer/table_import.go @@ -39,21 +39,27 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/log" + "github.com/pingcap/tidb/br/pkg/lightning/metric" "github.com/pingcap/tidb/br/pkg/lightning/mydump" verify "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/br/pkg/storage" tidb "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/keyspace" tidbkv "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta/autoid" + tidbmetrics "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tables" + tidbutil "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/etcd" "github.com/pingcap/tidb/pkg/util/mathutil" + "github.com/pingcap/tidb/pkg/util/promutil" "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/pingcap/tidb/pkg/util/syncutil" + "github.com/prometheus/client_golang/prometheus" "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" clientv3 "go.etcd.io/etcd/client/v3" @@ -83,8 +89,7 @@ var ( // prepareSortDir creates a new directory for import, remove previous sort directory if exists. func prepareSortDir(e *LoadDataController, id string, tidbCfg *tidb.Config) (string, error) { - sortPathSuffix := "import-" + strconv.Itoa(int(tidbCfg.Port)) - importDir := filepath.Join(tidbCfg.TempDir, sortPathSuffix) + importDir := GetImportRootDir(tidbCfg) sortDir := filepath.Join(importDir, id) if info, err := os.Stat(importDir); err != nil || !info.IsDir() { @@ -267,6 +272,46 @@ type TableImporter struct { regionSplitKeys int64 diskQuota int64 diskQuotaLock *syncutil.RWMutex + + rowCh chan QueryRow +} + +// NewTableImporterForTest creates a new table importer for test. 
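+// It creates the backend with NewBackendForTest, so no PD or TiKV connection
+// is set up.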
+func NewTableImporterForTest(param *JobImportParam, e *LoadDataController, id string, store tidbkv.Storage, helper local.StoreHelper) (*TableImporter, error) { + idAlloc := kv.NewPanickingAllocators(0) + tbl, err := tables.TableFromMeta(idAlloc, e.Table.Meta()) + if err != nil { + return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", e.Table.Meta().Name) + } + + tidbCfg := tidb.GetGlobalConfig() + dir, err := prepareSortDir(e, id, tidbCfg) + if err != nil { + return nil, err + } + + backendConfig := e.getLocalBackendCfg(tidbCfg.Path, dir) + localBackend, err := local.NewBackendForTest(param.GroupCtx, backendConfig, helper) + if err != nil { + return nil, err + } + + return &TableImporter{ + JobImportParam: param, + LoadDataController: e, + id: id, + backend: localBackend, + tableInfo: &checkpoints.TidbTableInfo{ + ID: e.Table.Meta().ID, + Name: e.Table.Meta().Name.O, + Core: e.Table.Meta(), + }, + encTable: tbl, + dbID: e.DBID, + kvStore: store, + logger: e.logger.With(zap.String("import-id", id)), + diskQuotaLock: new(syncutil.RWMutex), + }, nil } func (ti *TableImporter) getParser(ctx context.Context, chunk *checkpoints.ChunkCheckpoint) (mydump.Parser, error) { @@ -598,6 +643,116 @@ func (ti *TableImporter) CheckDiskQuota(ctx context.Context) { } } +// SetSelectedRowCh sets the channel to receive selected rows. +func (ti *TableImporter) SetSelectedRowCh(ch chan QueryRow) { + ti.rowCh = ch +} + +func (ti *TableImporter) closeAndCleanupEngine(engine *backend.OpenedEngine) { + // outer context might be done, so we create a new context here. + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + closedEngine, err := engine.Close(ctx) + if err != nil { + ti.logger.Error("close engine failed", zap.Error(err)) + return + } + if err = closedEngine.Cleanup(ctx); err != nil { + ti.logger.Error("cleanup engine failed", zap.Error(err)) + } +} + +// ImportSelectedRows imports selected rows. 
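+// It consumes rowCh with ThreadCnt concurrent encoders, sorts the encoded KVs
+// into one data engine and one index engine, ingests both into TiKV, then
+// rebases the ID allocators and verifies the checksum via postProcess.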
+func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.Context) (*JobImportResult, error) { + var ( + err error + dataEngine, indexEngine *backend.OpenedEngine + ) + metrics := tidbmetrics.GetRegisteredImportMetrics(promutil.NewDefaultFactory(), + prometheus.Labels{ + proto.TaskIDLabelName: ti.id, + }) + ctx = metric.WithCommonMetric(ctx, metrics) + defer func() { + tidbmetrics.UnregisterImportMetrics(metrics) + if dataEngine != nil { + ti.closeAndCleanupEngine(dataEngine) + } + if indexEngine != nil { + ti.closeAndCleanupEngine(indexEngine) + } + }() + + dataEngine, err = ti.OpenDataEngine(ctx, 1) + if err != nil { + return nil, err + } + indexEngine, err = ti.OpenIndexEngine(ctx, common.IndexEngineID) + if err != nil { + return nil, err + } + + var ( + mu sync.Mutex + checksum verify.KVChecksum + colSizeMap = make(map[int64]int64) + ) + eg, egCtx := tidbutil.NewErrorGroupWithRecoverWithCtx(ctx) + for i := 0; i < int(ti.ThreadCnt); i++ { + eg.Go(func() error { + chunkCheckpoint := checkpoints.ChunkCheckpoint{} + progress := NewProgress() + defer func() { + mu.Lock() + defer mu.Unlock() + checksum.Add(&chunkCheckpoint.Checksum) + for k, v := range progress.GetColSize() { + colSizeMap[k] += v + } + }() + return ProcessChunk(egCtx, &chunkCheckpoint, ti, dataEngine, indexEngine, progress, ti.logger) + }) + } + if err = eg.Wait(); err != nil { + return nil, err + } + + closedDataEngine, err := dataEngine.Close(ctx) + if err != nil { + return nil, err + } + failpoint.Inject("mockImportFromSelectErr", func() { + failpoint.Return(nil, errors.New("mock import from select error")) + }) + if err = closedDataEngine.Import(ctx, ti.regionSplitSize, ti.regionSplitKeys); err != nil { + return nil, err + } + dataKVCount := ti.backend.GetImportedKVCount(closedDataEngine.GetUUID()) + + closedIndexEngine, err := indexEngine.Close(ctx) + if err != nil { + return nil, err + } + if err = closedIndexEngine.Import(ctx, ti.regionSplitSize, ti.regionSplitKeys); err != nil { + return nil, err + } + + allocators := ti.Allocators() + maxIDs := map[autoid.AllocatorType]int64{ + autoid.RowIDAllocType: allocators.Get(autoid.RowIDAllocType).Base(), + autoid.AutoIncrementType: allocators.Get(autoid.AutoIncrementType).Base(), + autoid.AutoRandomType: allocators.Get(autoid.AutoRandomType).Base(), + } + if err = postProcess(ctx, se, maxIDs, ti.Plan, checksum, ti.logger); err != nil { + return nil, err + } + + return &JobImportResult{ + Affected: uint64(dataKVCount), + ColSizeMap: colSizeMap, + }, nil +} + func adjustDiskQuota(diskQuota int64, sortDir string, logger *zap.Logger) int64 { sz, err := common.GetStorageSize(sortDir) if err != nil { @@ -624,6 +779,27 @@ func adjustDiskQuota(diskQuota int64, sortDir string, logger *zap.Logger) int64 } } +// postProcess does the post-processing for the task. 
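+// i.e. rebase the allocator bases and verify the local checksum against TiKV.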
+func postProcess( + ctx context.Context, + se sessionctx.Context, + maxIDs map[autoid.AllocatorType]int64, + plan *Plan, + localChecksum verify.KVChecksum, + logger *zap.Logger, +) (err error) { + callLog := log.BeginTask(logger.With(zap.Any("checksum", localChecksum)), "post process") + defer func() { + callLog.End(zap.ErrorLevel, err) + }() + + if err = RebaseAllocatorBases(ctx, maxIDs, plan, logger); err != nil { + return err + } + + return VerifyChecksum(ctx, plan, localChecksum, se, logger) +} + type autoIDRequirement struct { store tidbkv.Storage autoidCli *autoid.ClientDiscover @@ -796,3 +972,17 @@ func setBackoffWeight(se sessionctx.Context, plan *Plan, logger *zap.Logger) err logger.Info("set backoff weight", zap.Int("weight", backoffWeight)) return se.GetSessionVars().SetSystemVar(variable.TiDBBackOffWeight, strconv.Itoa(backoffWeight)) } + +// GetImportRootDir returns the root directory for import. +// The directory structure is like: +// +// -> /path/to/tidb-tmpdir +// -> import-4000 +// -> 1 +// -> some-uuid +// +// exported for testing. +func GetImportRootDir(tidbCfg *tidb.Config) string { + sortPathSuffix := "import-" + strconv.Itoa(int(tidbCfg.Port)) + return filepath.Join(tidbCfg.TempDir, sortPathSuffix) +} diff --git a/pkg/executor/importer/table_import_testkit_test.go b/pkg/executor/importer/table_import_testkit_test.go new file mode 100644 index 0000000000000..199952efc43d1 --- /dev/null +++ b/pkg/executor/importer/table_import_testkit_test.go @@ -0,0 +1,127 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package importer_test + +import ( + "context" + "os" + "testing" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/br/pkg/lightning/backend/local" + tidb "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/executor/importer" + tidbkv "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/parser/model" + plannercore "github.com/pingcap/tidb/pkg/planner/core" + "github.com/pingcap/tidb/pkg/session" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/tikv" +) + +type storeHelper struct { + kvStore tidbkv.Storage +} + +func (*storeHelper) GetTS(_ context.Context) (physical, logical int64, err error) { + return 0, 0, nil +} + +func (s *storeHelper) GetTiKVCodec() tikv.Codec { + return s.kvStore.GetCodec() +} + +var _ local.StoreHelper = (*storeHelper)(nil) + +func checkImportDirEmpty(t *testing.T) { + tidbCfg := tidb.GetGlobalConfig() + importDir := importer.GetImportRootDir(tidbCfg) + if _, err := os.Stat(importDir); err != nil { + require.True(t, os.IsNotExist(err), importDir) + } else { + entries, err := os.ReadDir(importDir) + require.NoError(t, err) + require.Empty(t, entries) + } +} + +func TestImportFromSelectCleanup(t *testing.T) { + ctx := context.Background() + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tidbCfg := tidb.GetGlobalConfig() + tidbCfg.TempDir = t.TempDir() + checkImportDirEmpty(t) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/importer/mockImportFromSelectErr", `return(true)`)) + t.Cleanup(func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/importer/mockImportFromSelectErr")) + }) + + tk.MustExec("use test") + tk.MustExec("create table t(a int)") + do, err := session.GetDomain(store) + require.NoError(t, err) + dbInfo, ok := do.InfoSchema().SchemaByName(model.NewCIStr("test")) + require.True(t, ok) + table, err := do.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + plan, err := importer.NewImportPlan(ctx, tk.Session(), &plannercore.ImportInto{ + Table: &ast.TableName{ + Name: model.NewCIStr("t"), + DBInfo: &model.DBInfo{ + Name: model.NewCIStr("test"), + ID: dbInfo.ID, + }, + }, + SelectPlan: &plannercore.PhysicalSelection{}, + }, table) + require.NoError(t, err) + controller, err := importer.NewLoadDataController(plan, table, &importer.ASTArgs{}) + require.NoError(t, err) + ti, err := importer.NewTableImporterForTest( + &importer.JobImportParam{ + GroupCtx: ctx, + }, + controller, + "11", + store, + &storeHelper{kvStore: store}, + ) + require.NoError(t, err) + ch := make(chan importer.QueryRow) + ti.SetSelectedRowCh(ch) + var wg util.WaitGroupWrapper + wg.Run(func() { + defer close(ch) + for i := 1; i <= 3; i++ { + ch <- importer.QueryRow{ + ID: int64(i), + Data: []types.Datum{ + types.NewIntDatum(int64(i)), + }, + } + } + }) + _, err = ti.ImportSelectedRows(ctx, tk.Session()) + require.ErrorContains(t, err, "mock import from select error") + wg.Wait() + ti.Backend().CloseEngineMgr() + checkImportDirEmpty(t) +} From 0c8051fc96be9c55e80ab02d5cddc9dabb8826b5 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 11 Jan 2024 18:33:05 +0800 Subject: [PATCH 2/9] i t --- pkg/executor/importer/chunk_process.go | 2 +- pkg/executor/importer/import.go | 15 +- .../importintotest2/from_select_test.go | 52 ++++ 
.../write_after_import_test.go | 283 ++++++++++++------ 4 files changed, 253 insertions(+), 99 deletions(-) create mode 100644 tests/realtikvtest/importintotest2/from_select_test.go diff --git a/pkg/executor/importer/chunk_process.go b/pkg/executor/importer/chunk_process.go index 3cef5c4a62281..5ecd67f144816 100644 --- a/pkg/executor/importer/chunk_process.go +++ b/pkg/executor/importer/chunk_process.go @@ -242,7 +242,7 @@ type ChunkProcessor interface { } type baseChunkProcessor struct { - sourceType dataSourceType + sourceType DataSourceType enc chunkEncoder deliver *dataDeliver logger *zap.Logger diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go index 30702689a57fb..cff8d2a811677 100644 --- a/pkg/executor/importer/import.go +++ b/pkg/executor/importer/import.go @@ -149,17 +149,18 @@ var ( } ) -type dataSourceType string +// DataSourceType indicates the data source type of IMPORT INTO. +type DataSourceType string const ( // DataSourceTypeFile represents the data source of IMPORT INTO is file. // exported for test. - DataSourceTypeFile dataSourceType = "file" + DataSourceTypeFile DataSourceType = "file" // DataSourceTypeQuery represents the data source of IMPORT INTO is query. - DataSourceTypeQuery dataSourceType = "query" + DataSourceTypeQuery DataSourceType = "query" ) -func (t dataSourceType) String() string { +func (t DataSourceType) String() string { return string(t) } @@ -234,7 +235,7 @@ type Plan struct { // todo: remove it when load data code is reverted. InImportInto bool - DataSourceType dataSourceType + DataSourceType DataSourceType // only initialized for IMPORT INTO, used when creating job. Parameters *ImportParameters `json:"-"` // the user who executes the statement, in the form of user@host @@ -1328,7 +1329,7 @@ func (e *LoadDataController) getLocalBackendCfg(pdAddr, dataDir string) local.Ba return backendConfig } -func getDataSourceType(p *plannercore.ImportInto) dataSourceType { +func getDataSourceType(p *plannercore.ImportInto) DataSourceType { if p.SelectPlan != nil { return DataSourceTypeQuery } @@ -1388,7 +1389,7 @@ func GetMsgFromBRError(err error) string { // target node is current node if it's server-disk import, import from query or disttask is disabled, // else it's the node managed by disttask. // exported for testing. -func GetTargetNodeCPUCnt(ctx context.Context, sourceType dataSourceType, path string) (int, error) { +func GetTargetNodeCPUCnt(ctx context.Context, sourceType DataSourceType, path string) (int, error) { if sourceType == DataSourceTypeQuery { return cpu.GetCPUCount(), nil } diff --git a/tests/realtikvtest/importintotest2/from_select_test.go b/tests/realtikvtest/importintotest2/from_select_test.go new file mode 100644 index 0000000000000..9fdcb1ddeecbc --- /dev/null +++ b/tests/realtikvtest/importintotest2/from_select_test.go @@ -0,0 +1,52 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package importintotest + +import ( + "github.com/pingcap/tidb/pkg/executor/importer" + "github.com/pingcap/tidb/pkg/planner/core" + "github.com/pingcap/tidb/pkg/testkit" +) + +func (s *mockGCSSuite) TestImportFromSelectBasic() { + s.prepareAndUseDB("from_select") + s.tk.MustExec("create table src(id int, v varchar(64))") + s.tk.MustExec("create table dst(id int, v varchar(64))") + s.tk.MustExec("insert into src values(4, 'aaaaaa'), (5, 'bbbbbb'), (6, 'cccccc'), (7, 'dddddd')") + + s.ErrorIs(s.tk.ExecToErr(`import into dst FROM select id from src`), core.ErrWrongValueCountOnRow) + s.ErrorIs(s.tk.ExecToErr(`import into dst(id) FROM select * from src`), core.ErrWrongValueCountOnRow) + + s.tk.MustExec(`import into dst FROM select * from src`) + s.Equal(uint64(4), s.tk.Session().GetSessionVars().StmtCtx.AffectedRows()) + s.Contains(s.tk.Session().LastMessage(), "Records: 4,") + s.tk.MustQuery("select * from dst").Check(testkit.Rows("4 aaaaaa", "5 bbbbbb", "6 cccccc", "7 dddddd")) + + // non-empty table + s.ErrorContains(s.tk.ExecToErr(`import into dst FROM select * from src`), "target table is not empty") + + s.tk.MustExec("truncate table dst") + s.tk.MustExec(`import into dst FROM select * from src where id > 5 with thread = 4`) + s.Equal(uint64(2), s.tk.Session().GetSessionVars().StmtCtx.AffectedRows()) + s.Contains(s.tk.Session().LastMessage(), "Records: 2,") + s.tk.MustQuery("select * from dst").Check(testkit.Rows("6 cccccc", "7 dddddd")) +} + +func (s *mockGCSSuite) TestWriteAfterImportFromSelect() { + s.prepareAndUseDB("from_select") + s.tk.MustExec("create table dt(id int, v varchar(64))") + s.tk.MustExec("insert into dt values(4, 'aaaaaa'), (5, 'bbbbbb'), (6, 'cccccc'), (7, 'dddddd')") + s.testWriteAfterImport(`import into t FROM select * from from_select.dt`, importer.DataSourceTypeQuery) +} diff --git a/tests/realtikvtest/importintotest2/write_after_import_test.go b/tests/realtikvtest/importintotest2/write_after_import_test.go index dd3591366c291..34605ae995a03 100644 --- a/tests/realtikvtest/importintotest2/write_after_import_test.go +++ b/tests/realtikvtest/importintotest2/write_after_import_test.go @@ -21,142 +21,243 @@ import ( "github.com/fsouza/fake-gcs-server/fakestorage" "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/pkg/domain" + "github.com/pingcap/tidb/pkg/executor/importer" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/testkit" ) -func (s *mockGCSSuite) TestWriteAfterImport() { - // 2 files, each with 18 bytes, divide by column count 2, the calculated id - // range is [1, 9], [10, 18], the max id if it's used during encoding will be 11. 
- s.server.CreateObject(fakestorage.Object{ - ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "write_after_import", Name: "1.csv"}, - Content: []byte("4,aaaaaa\n5,bbbbbb\n"), - }) - s.server.CreateObject(fakestorage.Object{ - ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "write_after_import", Name: "2.csv"}, - Content: []byte("6,cccccc\n7,dddddd\n"), - }) - cases := []struct { - createTableSQL string - insertSQL string - insertedData string - nextGlobalAutoID []int64 - autoIDCache1 bool - }{ +type caseResult struct { + insertedData string + nextGlobalAutoID []int64 +} + +type writeAfterImportCase struct { + createTableSQL string + insertSQL string + caseResults map[importer.DataSourceType]caseResult + autoIDCache1 bool +} + +func (s *mockGCSSuite) testWriteAfterImport(importSQL string, sourceType importer.DataSourceType) { + cases := []writeAfterImportCase{ // with auto_increment { - createTableSQL: "CREATE TABLE t (id int AUTO_INCREMENT PRIMARY KEY CLUSTERED, v varchar(64))", - insertSQL: "insert into t(v) values(1)", - insertedData: "8 1", - nextGlobalAutoID: []int64{8}, + createTableSQL: "CREATE TABLE t (id int AUTO_INCREMENT PRIMARY KEY CLUSTERED, v varchar(64))", + insertSQL: "insert into t(v) values(1)", + caseResults: map[importer.DataSourceType]caseResult{ + importer.DataSourceTypeFile: { + insertedData: "8 1", + nextGlobalAutoID: []int64{8}, + }, + importer.DataSourceTypeQuery: { + insertedData: "8 1", + nextGlobalAutoID: []int64{8}, + }, + }, }, { - createTableSQL: "CREATE TABLE t (id int AUTO_INCREMENT PRIMARY KEY CLUSTERED, v varchar(64)) AUTO_ID_CACHE 1", - insertSQL: "insert into t(v) values(1)", - insertedData: "8 1", - nextGlobalAutoID: []int64{8, 8}, - autoIDCache1: true, + createTableSQL: "CREATE TABLE t (id int AUTO_INCREMENT PRIMARY KEY CLUSTERED, v varchar(64)) AUTO_ID_CACHE 1", + insertSQL: "insert into t(v) values(1)", + caseResults: map[importer.DataSourceType]caseResult{ + importer.DataSourceTypeFile: { + insertedData: "8 1", + nextGlobalAutoID: []int64{8, 8}, + }, + importer.DataSourceTypeQuery: { + insertedData: "8 1", + nextGlobalAutoID: []int64{8, 8}, + }, + }, + autoIDCache1: true, }, { - createTableSQL: "CREATE TABLE t (id int AUTO_INCREMENT PRIMARY KEY NONCLUSTERED, v varchar(64))", - insertSQL: "insert into t(v) values(1)", - insertedData: "12 1", - nextGlobalAutoID: []int64{12}, + createTableSQL: "CREATE TABLE t (id int AUTO_INCREMENT PRIMARY KEY NONCLUSTERED, v varchar(64))", + insertSQL: "insert into t(v) values(1)", + caseResults: map[importer.DataSourceType]caseResult{ + importer.DataSourceTypeFile: { + insertedData: "12 1", + nextGlobalAutoID: []int64{12}, + }, + importer.DataSourceTypeQuery: { + insertedData: "8 1", + nextGlobalAutoID: []int64{8}, + }, + }, }, { - createTableSQL: "CREATE TABLE t (id int AUTO_INCREMENT PRIMARY KEY NONCLUSTERED, v varchar(64)) AUTO_ID_CACHE 1", - insertSQL: "insert into t(v) values(1)", - insertedData: "12 1", - nextGlobalAutoID: []int64{12, 12}, - autoIDCache1: true, + createTableSQL: "CREATE TABLE t (id int AUTO_INCREMENT PRIMARY KEY NONCLUSTERED, v varchar(64)) AUTO_ID_CACHE 1", + insertSQL: "insert into t(v) values(1)", + caseResults: map[importer.DataSourceType]caseResult{ + importer.DataSourceTypeFile: { + insertedData: "12 1", + nextGlobalAutoID: []int64{12, 12}, + }, + importer.DataSourceTypeQuery: { + insertedData: "8 1", + nextGlobalAutoID: []int64{8, 8}, + }, + }, + autoIDCache1: true, }, // without auto_increment { createTableSQL: "CREATE TABLE t (id int PRIMARY KEY CLUSTERED, v varchar(64))", 
insertSQL: "insert into t values(1,1)", - insertedData: "1 1", + caseResults: map[importer.DataSourceType]caseResult{ + importer.DataSourceTypeFile: { + insertedData: "1 1", + }, + importer.DataSourceTypeQuery: { + insertedData: "1 1", + }, + }, }, { createTableSQL: "CREATE TABLE t (id int PRIMARY KEY CLUSTERED, v varchar(64)) AUTO_ID_CACHE 1", insertSQL: "insert into t values(1,1)", - insertedData: "1 1", - autoIDCache1: true, + caseResults: map[importer.DataSourceType]caseResult{ + importer.DataSourceTypeFile: { + insertedData: "1 1", + }, + importer.DataSourceTypeQuery: { + insertedData: "1 1", + }, + }, + autoIDCache1: true, }, { - createTableSQL: "CREATE TABLE t (id int, v varchar(64))", - insertSQL: "insert into t values(1,1)", - insertedData: "1 1", - nextGlobalAutoID: []int64{12}, + createTableSQL: "CREATE TABLE t (id int, v varchar(64))", + insertSQL: "insert into t values(1,1)", + caseResults: map[importer.DataSourceType]caseResult{ + importer.DataSourceTypeFile: { + insertedData: "1 1", + nextGlobalAutoID: []int64{12}, + }, + importer.DataSourceTypeQuery: { + insertedData: "1 1", + nextGlobalAutoID: []int64{5}, + }, + }, }, { - createTableSQL: "CREATE TABLE t (id int, v varchar(64)) AUTO_ID_CACHE 1", - insertSQL: "insert into t values(1,1)", - insertedData: "1 1", - nextGlobalAutoID: []int64{12}, - autoIDCache1: true, + createTableSQL: "CREATE TABLE t (id int, v varchar(64)) AUTO_ID_CACHE 1", + insertSQL: "insert into t values(1,1)", + caseResults: map[importer.DataSourceType]caseResult{ + importer.DataSourceTypeFile: { + insertedData: "1 1", + nextGlobalAutoID: []int64{12}, + }, + importer.DataSourceTypeQuery: { + insertedData: "1 1", + nextGlobalAutoID: []int64{5}, + }, + }, + autoIDCache1: true, }, { - createTableSQL: "CREATE TABLE t (id int PRIMARY KEY NONCLUSTERED, v varchar(64))", - insertSQL: "insert into t values(1,1)", - insertedData: "1 1", - nextGlobalAutoID: []int64{12}, + createTableSQL: "CREATE TABLE t (id int PRIMARY KEY NONCLUSTERED, v varchar(64))", + insertSQL: "insert into t values(1,1)", + caseResults: map[importer.DataSourceType]caseResult{ + importer.DataSourceTypeFile: { + insertedData: "1 1", + nextGlobalAutoID: []int64{12}, + }, + importer.DataSourceTypeQuery: { + insertedData: "1 1", + nextGlobalAutoID: []int64{5}, + }, + }, }, { - createTableSQL: "CREATE TABLE t (id int PRIMARY KEY NONCLUSTERED, v varchar(64)) AUTO_ID_CACHE 1", - insertSQL: "insert into t values(1,1)", - insertedData: "1 1", - nextGlobalAutoID: []int64{12}, - autoIDCache1: true, + createTableSQL: "CREATE TABLE t (id int PRIMARY KEY NONCLUSTERED, v varchar(64)) AUTO_ID_CACHE 1", + insertSQL: "insert into t values(1,1)", + caseResults: map[importer.DataSourceType]caseResult{ + importer.DataSourceTypeFile: { + insertedData: "1 1", + nextGlobalAutoID: []int64{12}, + }, + importer.DataSourceTypeQuery: { + insertedData: "1 1", + nextGlobalAutoID: []int64{5}, + }, + }, + autoIDCache1: true, }, // with auto_random { - createTableSQL: "CREATE TABLE t (id bigint PRIMARY KEY auto_random, v varchar(64))", - insertSQL: "insert into t(v) values(1)", - insertedData: "8 1", - nextGlobalAutoID: []int64{8}, - autoIDCache1: true, + createTableSQL: "CREATE TABLE t (id bigint PRIMARY KEY auto_random, v varchar(64))", + insertSQL: "insert into t(v) values(1)", + caseResults: map[importer.DataSourceType]caseResult{ + importer.DataSourceTypeFile: { + insertedData: "8 1", + nextGlobalAutoID: []int64{8}, + }, + importer.DataSourceTypeQuery: { + insertedData: "8 1", + nextGlobalAutoID: []int64{8}, + }, + }, + 
autoIDCache1: true, }, } allData := []string{"4 aaaaaa", "5 bbbbbb", "6 cccccc", "7 dddddd"} s.prepareAndUseDB("write_after_import") - loadDataSQL := fmt.Sprintf( - `import into t FROM 'gs://write_after_import/*.csv?endpoint=%s'`, gcsEndpoint) s.T().Cleanup(func() { s.tk.MustExec("drop table if exists t;") }) - for _, c := range cases { - fmt.Println("current case ", c.createTableSQL) - s.tk.MustExec("drop table if exists t;") - s.tk.MustExec(c.createTableSQL) - s.tk.MustQuery(loadDataSQL) - querySQL := "SELECT * FROM t;" - s.tk.MustQuery(querySQL).Check(testkit.Rows(allData...)) + for i, c := range cases { + s.Run(fmt.Sprintf("case-%d", i), func() { + fmt.Println("current case ", c.createTableSQL) + s.tk.MustExec("drop table if exists t;") + s.tk.MustExec(c.createTableSQL) + if sourceType == importer.DataSourceTypeFile { + s.tk.MustQuery(importSQL) + } else { + s.tk.MustExec(importSQL) + } + querySQL := "SELECT * FROM t;" + s.tk.MustQuery(querySQL).Check(testkit.Rows(allData...)) - is := s.tk.Session().GetDomainInfoSchema().(infoschema.InfoSchema) - dbInfo, ok := is.SchemaByName(model.NewCIStr("write_after_import")) - s.True(ok) - tableObj, err := is.TableByName(model.NewCIStr("write_after_import"), model.NewCIStr("t")) - s.NoError(err) - if common.TableHasAutoID(tableObj.Meta()) { - allocators, err := common.GetGlobalAutoIDAlloc(domain.GetDomain(s.tk.Session()), dbInfo.ID, tableObj.Meta()) + is := s.tk.Session().GetDomainInfoSchema().(infoschema.InfoSchema) + dbInfo, ok := is.SchemaByName(model.NewCIStr("write_after_import")) + s.True(ok) + tableObj, err := is.TableByName(model.NewCIStr("write_after_import"), model.NewCIStr("t")) s.NoError(err) - var nextGlobalAutoID []int64 - for _, alloc := range allocators { - id, err := alloc.NextGlobalAutoID() + if common.TableHasAutoID(tableObj.Meta()) { + allocators, err := common.GetGlobalAutoIDAlloc(domain.GetDomain(s.tk.Session()), dbInfo.ID, tableObj.Meta()) s.NoError(err) - nextGlobalAutoID = append(nextGlobalAutoID, id) + var nextGlobalAutoID []int64 + for _, alloc := range allocators { + id, err := alloc.NextGlobalAutoID() + s.NoError(err) + nextGlobalAutoID = append(nextGlobalAutoID, id) + } + s.Equal(c.caseResults[sourceType].nextGlobalAutoID, nextGlobalAutoID) } - s.Equal(c.nextGlobalAutoID, nextGlobalAutoID) - } - // when autoIDCache1=true, the id service is not started in real-tikv-test, cannot insert. - if !c.autoIDCache1 { - s.tk.MustExec(c.insertSQL) - newAllData := append(allData, c.insertedData) - slices.Sort(newAllData) - s.tk.MustQuery(querySQL).Sort().Check(testkit.Rows(newAllData...)) - } + // when autoIDCache1=true, the id service is not started in real-tikv-test, cannot insert. + if !c.autoIDCache1 { + s.tk.MustExec(c.insertSQL) + newAllData := append(allData, c.caseResults[sourceType].insertedData) + slices.Sort(newAllData) + s.tk.MustQuery(querySQL).Sort().Check(testkit.Rows(newAllData...)) + } + }) } } +func (s *mockGCSSuite) TestWriteAfterImportFromFile() { + // 2 files, each with 18 bytes, divide by column count 2, the calculated id + // range is [1, 9], [10, 18], the max id if it's used during encoding will be 11. 
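// Annotation: the arithmetic behind the comment above, as a sketch (the
// estimator is reconstructed from these numbers and is an assumption, not
// code lifted from the importer):
//
//	// Each 18-byte file divided by the 2-column count yields 9 candidate
//	// row ids, so the two files get the ranges [1,9] and [10,18]. The
//	// second file's two rows can consume at most ids 10 and 11, hence
//	// "the max id ... will be 11" and the nextGlobalAutoID of 12 seen in
//	// the NONCLUSTERED file-source cases.
//	func estimateRowIDRange(base, fileSize, colCnt int64) (lo, hi int64) {
//		perFile := fileSize / colCnt // 18 / 2 = 9
//		return base + 1, base + perFile
//	}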
+ s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "write_after_import", Name: "1.csv"}, + Content: []byte("4,aaaaaa\n5,bbbbbb\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "write_after_import", Name: "2.csv"}, + Content: []byte("6,cccccc\n7,dddddd\n"), + }) + importSQL := fmt.Sprintf(`import into t FROM 'gs://write_after_import/*.csv?endpoint=%s'`, gcsEndpoint) + s.testWriteAfterImport(importSQL, importer.DataSourceTypeFile) +} From 4ddb22dde9b9ba1d5be8f4a6c754d05774eb3d3e Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 11 Jan 2024 18:44:59 +0800 Subject: [PATCH 3/9] test --- .../importintotest2/from_select_test.go | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/tests/realtikvtest/importintotest2/from_select_test.go b/tests/realtikvtest/importintotest2/from_select_test.go index 9fdcb1ddeecbc..d2069ce69cf0d 100644 --- a/tests/realtikvtest/importintotest2/from_select_test.go +++ b/tests/realtikvtest/importintotest2/from_select_test.go @@ -15,6 +15,10 @@ package importintotest import ( + "fmt" + "slices" + "strings" + "github.com/pingcap/tidb/pkg/executor/importer" "github.com/pingcap/tidb/pkg/planner/core" "github.com/pingcap/tidb/pkg/testkit" @@ -37,11 +41,30 @@ func (s *mockGCSSuite) TestImportFromSelectBasic() { // non-empty table s.ErrorContains(s.tk.ExecToErr(`import into dst FROM select * from src`), "target table is not empty") + // with where s.tk.MustExec("truncate table dst") - s.tk.MustExec(`import into dst FROM select * from src where id > 5 with thread = 4`) + s.tk.MustExec(`import into dst FROM select * from src where id > 5`) s.Equal(uint64(2), s.tk.Session().GetSessionVars().StmtCtx.AffectedRows()) s.Contains(s.tk.Session().LastMessage(), "Records: 2,") s.tk.MustQuery("select * from dst").Check(testkit.Rows("6 cccccc", "7 dddddd")) + + // parallel + s.enableFailpoint("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", `return(8)`) + s.tk.MustExec("truncate table src") + s.tk.MustExec("truncate table dst") + var count = 5000 + values := make([]string, 0, count) + queryResult := make([]string, 0, count) + for i := 0; i < count; i++ { + values = append(values, fmt.Sprintf("(%d, 'abc-%d')", i, i)) + queryResult = append(queryResult, fmt.Sprintf("%d abc-%d", i, i)) + } + slices.Sort(queryResult) + s.tk.MustExec("insert into src values " + strings.Join(values, ",")) + s.tk.MustExec(`import into dst FROM select * from src with thread = 8`) + s.Equal(uint64(count), s.tk.Session().GetSessionVars().StmtCtx.AffectedRows()) + s.Contains(s.tk.Session().LastMessage(), fmt.Sprintf("Records: %d,", count)) + s.tk.MustQuery("select * from dst").Sort().Check(testkit.Rows(queryResult...)) } func (s *mockGCSSuite) TestWriteAfterImportFromSelect() { From 84aa013646f56a1cf575a20387a7d00662c3ff2a Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 11 Jan 2024 18:46:07 +0800 Subject: [PATCH 4/9] bazel --- pkg/executor/BUILD.bazel | 1 + pkg/executor/importer/BUILD.bazel | 10 +++++++++- tests/realtikvtest/importintotest2/BUILD.bazel | 3 +++ 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/pkg/executor/BUILD.bazel b/pkg/executor/BUILD.bazel index 80e4056ad82ba..db69ba5212c79 100644 --- a/pkg/executor/BUILD.bazel +++ b/pkg/executor/BUILD.bazel @@ -242,6 +242,7 @@ go_library( "@com_github_burntsushi_toml//:toml", "@com_github_docker_go_units//:go-units", "@com_github_gogo_protobuf//proto", + "@com_github_google_uuid//:uuid", 
"@com_github_ngaut_pools//:pools", "@com_github_opentracing_basictracer_go//:basictracer-go", "@com_github_opentracing_opentracing_go//:opentracing-go", diff --git a/pkg/executor/importer/BUILD.bazel b/pkg/executor/importer/BUILD.bazel index 33cba79b31ef4..2472ce71cdcc7 100644 --- a/pkg/executor/importer/BUILD.bazel +++ b/pkg/executor/importer/BUILD.bazel @@ -33,10 +33,12 @@ go_library( "//pkg/config", "//pkg/ddl/util", "//pkg/disttask/framework/handle", + "//pkg/disttask/framework/proto", "//pkg/expression", "//pkg/keyspace", "//pkg/kv", "//pkg/meta/autoid", + "//pkg/metrics", "//pkg/parser", "//pkg/parser/ast", "//pkg/parser/format", @@ -61,6 +63,7 @@ go_library( "//pkg/util/intest", "//pkg/util/logutil", "//pkg/util/mathutil", + "//pkg/util/promutil", "//pkg/util/sqlexec", "//pkg/util/stringutil", "//pkg/util/syncutil", @@ -88,13 +91,15 @@ go_test( "import_test.go", "importer_testkit_test.go", "job_test.go", + "main_test.go", "precheck_test.go", "table_import_test.go", + "table_import_testkit_test.go", ], embed = [":importer"], flaky = True, race = "on", - shard_count = 22, + shard_count = 24, deps = [ "//br/pkg/errors", "//br/pkg/lightning/backend/encode", @@ -120,7 +125,9 @@ go_test( "//pkg/sessionctx", "//pkg/sessionctx/variable", "//pkg/testkit", + "//pkg/testkit/testsetup", "//pkg/types", + "//pkg/util", "//pkg/util/dbterror/exeerrors", "//pkg/util/etcd", "//pkg/util/logutil", @@ -138,6 +145,7 @@ go_test( "@com_github_tikv_client_go_v2//util", "@io_etcd_go_etcd_client_v3//:client", "@io_etcd_go_etcd_server_v3//embed", + "@org_uber_go_goleak//:goleak", "@org_uber_go_mock//gomock", "@org_uber_go_zap//:zap", ], diff --git a/tests/realtikvtest/importintotest2/BUILD.bazel b/tests/realtikvtest/importintotest2/BUILD.bazel index 02337087b1329..eeb95a299f22e 100644 --- a/tests/realtikvtest/importintotest2/BUILD.bazel +++ b/tests/realtikvtest/importintotest2/BUILD.bazel @@ -4,6 +4,7 @@ go_test( name = "importintotest2_test", timeout = "moderate", srcs = [ + "from_select_test.go", "main_test.go", "write_after_import_test.go", ], @@ -13,9 +14,11 @@ go_test( "//br/pkg/lightning/common", "//pkg/config", "//pkg/domain", + "//pkg/executor/importer", "//pkg/infoschema", "//pkg/kv", "//pkg/parser/model", + "//pkg/planner/core", "//pkg/testkit", "//tests/realtikvtest", "@com_github_fsouza_fake_gcs_server//fakestorage", From f79fd0399c094aa40b02dc1d36240caf01e0dbf2 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 11 Jan 2024 19:10:50 +0800 Subject: [PATCH 5/9] change --- pkg/executor/import_into.go | 5 ++++- pkg/executor/importer/import.go | 10 ++++++++-- pkg/executor/importer/table_import.go | 9 ++------- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/pkg/executor/import_into.go b/pkg/executor/import_into.go index 71f74a7c1f743..68c8e9d0ee4de 100644 --- a/pkg/executor/import_into.go +++ b/pkg/executor/import_into.go @@ -108,6 +108,7 @@ func (e *ImportIntoExec) Next(ctx context.Context, req *chunk.Chunk) (err error) e.controller = controller if e.selectExec != nil { + // `import from select` doesn't return rows, so no need to set dataFilled. 
return e.importFromSelect(ctx) } @@ -277,7 +278,9 @@ func (e *ImportIntoExec) importFromSelect(ctx context.Context) error { } importID := uuid.New().String() logutil.Logger(ctx).Info("importing data from select statement", - zap.String("importID", importID), zap.Int("concurrency", e.controller.ThreadCnt)) + zap.String("import-id", importID), zap.Int("concurrency", e.controller.ThreadCnt), + zap.String("target-table", e.controller.FullTableName()), + zap.Int64("target-table-id", e.controller.TableInfo.ID)) ti, err2 := importer.NewTableImporter(param, e.controller, importID) if err2 != nil { return err2 diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go index cff8d2a811677..e072c02e7f188 100644 --- a/pkg/executor/importer/import.go +++ b/pkg/executor/importer/import.go @@ -134,7 +134,7 @@ var ( splitFileOption: {}, } - importFromQueryOptions = map[string]struct{}{ + allowedOptionsOfImportFromQuery = map[string]struct{}{ threadOption: {}, } @@ -576,7 +576,7 @@ func (p *Plan) initOptions(ctx context.Context, seCtx sessionctx.Context, option } if p.DataSourceType == DataSourceTypeQuery { for k := range specifiedOptions { - if _, ok := importFromQueryOptions[k]; !ok { + if _, ok := allowedOptionsOfImportFromQuery[k]; !ok { return exeerrors.ErrLoadDataUnsupportedOption.FastGenByArgs(k, "import from query") } } @@ -754,6 +754,7 @@ func (p *Plan) adjustOptions(targetNodeCPUCnt int) { if p.DataSourceType == DataSourceTypeQuery { // for query, row is produced using 1 thread, the max cpu used is much // lower than import from file, so we set limit to 2*targetNodeCPUCnt. + // TODO: adjust after spec is ready. limit *= 2 } // max value is cpu-count @@ -1329,6 +1330,11 @@ func (e *LoadDataController) getLocalBackendCfg(pdAddr, dataDir string) local.Ba return backendConfig } +// FullTableName return FQDN of the table. +func (e *LoadDataController) FullTableName() string { + return common.UniqueTable(e.DBName, e.Table.Meta().Name.O) +} + func getDataSourceType(p *plannercore.ImportInto) DataSourceType { if p.SelectPlan != nil { return DataSourceTypeQuery diff --git a/pkg/executor/importer/table_import.go b/pkg/executor/importer/table_import.go index 594585792f1d0..49b9fe4d43a4a 100644 --- a/pkg/executor/importer/table_import.go +++ b/pkg/executor/importer/table_import.go @@ -511,7 +511,7 @@ func (ti *TableImporter) OpenIndexEngine(ctx context.Context, engineID int32) (* CompactThreshold: threshold, BlockSize: 16 * 1024, } - fullTableName := ti.fullTableName() + fullTableName := ti.FullTableName() // todo: cleanup all engine data on any error since we don't support checkpoint for now // some return path, didn't make sure all data engine and index engine are cleaned up. // maybe we can add this in upper level to clean the whole local-sort directory @@ -532,7 +532,7 @@ func (ti *TableImporter) OpenDataEngine(ctx context.Context, engineID int32) (*b // dataEngineCfg.Local.CompactThreshold = local.CompactionUpperThreshold //} mgr := backend.MakeEngineManager(ti.backend) - return mgr.OpenEngine(ctx, dataEngineCfg, ti.fullTableName(), engineID) + return mgr.OpenEngine(ctx, dataEngineCfg, ti.FullTableName(), engineID) } // ImportAndCleanup imports the engine and cleanup the engine data. @@ -548,11 +548,6 @@ func (ti *TableImporter) ImportAndCleanup(ctx context.Context, closedEngine *bac return kvCount, multierr.Combine(importErr, cleanupErr) } -// FullTableName return FQDN of the table. 
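// Annotation: fullTableName is removed from TableImporter here because the
// import.go hunk above hoisted it onto LoadDataController as the exported
// FullTableName, so the executor's logging and the engine naming share one
// helper. For reference, common.UniqueTable renders a backtick-quoted
// qualified name; an illustrative call (values assumed):
//
//	common.UniqueTable("from_select", "dst") // "`from_select`.`dst`"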
-func (ti *TableImporter) fullTableName() string { - return common.UniqueTable(ti.DBName, ti.Table.Meta().Name.O) -} - // Backend returns the backend of the importer. func (ti *TableImporter) Backend() *local.Backend { return ti.backend From 7659774de68b70e61b222e17a320c3efb74efbcb Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Fri, 12 Jan 2024 11:01:33 +0800 Subject: [PATCH 6/9] more test --- pkg/executor/importer/chunk_process.go | 2 +- pkg/executor/importer/table_import.go | 2 +- tests/realtikvtest/importintotest/util_test.go | 2 +- .../importintotest2/from_select_test.go | 15 +++++++++++++++ tests/realtikvtest/importintotest2/main_test.go | 2 +- tests/realtikvtest/importintotest3/main_test.go | 2 +- tests/realtikvtest/importintotest4/main_test.go | 2 +- 7 files changed, 21 insertions(+), 6 deletions(-) diff --git a/pkg/executor/importer/chunk_process.go b/pkg/executor/importer/chunk_process.go index 5ecd67f144816..dc793fc7fd917 100644 --- a/pkg/executor/importer/chunk_process.go +++ b/pkg/executor/importer/chunk_process.go @@ -445,7 +445,7 @@ type queryChunkEncoder struct { var _ chunkEncoder = (*queryChunkEncoder)(nil) -func (e *queryChunkEncoder) init() error { +func (*queryChunkEncoder) init() error { return nil } diff --git a/pkg/executor/importer/table_import.go b/pkg/executor/importer/table_import.go index 49b9fe4d43a4a..92facd037aed4 100644 --- a/pkg/executor/importer/table_import.go +++ b/pkg/executor/importer/table_import.go @@ -693,7 +693,7 @@ func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.C colSizeMap = make(map[int64]int64) ) eg, egCtx := tidbutil.NewErrorGroupWithRecoverWithCtx(ctx) - for i := 0; i < int(ti.ThreadCnt); i++ { + for i := 0; i < ti.ThreadCnt; i++ { eg.Go(func() error { chunkCheckpoint := checkpoints.ChunkCheckpoint{} progress := NewProgress() diff --git a/tests/realtikvtest/importintotest/util_test.go b/tests/realtikvtest/importintotest/util_test.go index 2afa9a2c5d41d..5e594fa62865e 100644 --- a/tests/realtikvtest/importintotest/util_test.go +++ b/tests/realtikvtest/importintotest/util_test.go @@ -47,7 +47,7 @@ var ( maxWaitTime = 30 * time.Second ) -func TestLoadRemote(t *testing.T) { +func TestImportInto(t *testing.T) { suite.Run(t, &mockGCSSuite{}) } diff --git a/tests/realtikvtest/importintotest2/from_select_test.go b/tests/realtikvtest/importintotest2/from_select_test.go index d2069ce69cf0d..d0d8012df2a0e 100644 --- a/tests/realtikvtest/importintotest2/from_select_test.go +++ b/tests/realtikvtest/importintotest2/from_select_test.go @@ -67,6 +67,21 @@ func (s *mockGCSSuite) TestImportFromSelectBasic() { s.tk.MustQuery("select * from dst").Sort().Check(testkit.Rows(queryResult...)) } +func (s *mockGCSSuite) TestImportFromSelectColumnList() { + s.prepareAndUseDB("from_select") + s.tk.MustExec("create table src(id int, a varchar(64))") + s.tk.MustExec("create table dst(id int auto_increment primary key, a varchar(64), b int default 10, c int)") + s.tk.MustExec("insert into src values(4, 'aaaaaa'), (5, 'bbbbbb'), (6, 'cccccc'), (7, 'dddddd')") + s.tk.MustExec(`import into dst(c, a) FROM select * from src order by id`) + s.tk.MustQuery("select * from dst").Check(testkit.Rows("1 aaaaaa 10 4", "2 bbbbbb 10 5", "3 cccccc 10 6", "4 dddddd 10 7")) + + s.tk.MustExec("truncate table dst") + s.tk.MustExec("create table src2(id int, a varchar(64))") + s.tk.MustExec("insert into src2 values(4, 'four'), (5, 'five')") + s.tk.MustExec(`import into dst(c, a) FROM select y.id, y.a from src x join src2 y on x.id = y.id order by y.id`) + 
s.tk.MustQuery("select * from dst").Check(testkit.Rows("1 four 10 4", "2 five 10 5")) +} + func (s *mockGCSSuite) TestWriteAfterImportFromSelect() { s.prepareAndUseDB("from_select") s.tk.MustExec("create table dt(id int, v varchar(64))") diff --git a/tests/realtikvtest/importintotest2/main_test.go b/tests/realtikvtest/importintotest2/main_test.go index 645422f2a68cb..c6eb2e0c3bd40 100644 --- a/tests/realtikvtest/importintotest2/main_test.go +++ b/tests/realtikvtest/importintotest2/main_test.go @@ -45,7 +45,7 @@ var ( gcsEndpoint = fmt.Sprintf(gcsEndpointFormat, gcsHost, gcsPort) ) -func TestLoadRemote(t *testing.T) { +func TestImportInto(t *testing.T) { suite.Run(t, &mockGCSSuite{}) } diff --git a/tests/realtikvtest/importintotest3/main_test.go b/tests/realtikvtest/importintotest3/main_test.go index 645422f2a68cb..c6eb2e0c3bd40 100644 --- a/tests/realtikvtest/importintotest3/main_test.go +++ b/tests/realtikvtest/importintotest3/main_test.go @@ -45,7 +45,7 @@ var ( gcsEndpoint = fmt.Sprintf(gcsEndpointFormat, gcsHost, gcsPort) ) -func TestLoadRemote(t *testing.T) { +func TestImportInto(t *testing.T) { suite.Run(t, &mockGCSSuite{}) } diff --git a/tests/realtikvtest/importintotest4/main_test.go b/tests/realtikvtest/importintotest4/main_test.go index 645422f2a68cb..c6eb2e0c3bd40 100644 --- a/tests/realtikvtest/importintotest4/main_test.go +++ b/tests/realtikvtest/importintotest4/main_test.go @@ -45,7 +45,7 @@ var ( gcsEndpoint = fmt.Sprintf(gcsEndpointFormat, gcsHost, gcsPort) ) -func TestLoadRemote(t *testing.T) { +func TestImportInto(t *testing.T) { suite.Run(t, &mockGCSSuite{}) } From f3748a693d46b3bae7394db30ee6518999da47d4 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Fri, 12 Jan 2024 11:37:18 +0800 Subject: [PATCH 7/9] fix test --- br/pkg/lightning/backend/local/engine_mgr_test.go | 3 ++- br/pkg/lightning/backend/local/local_test.go | 10 ++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/br/pkg/lightning/backend/local/engine_mgr_test.go b/br/pkg/lightning/backend/local/engine_mgr_test.go index 0a117cad54809..e73afefd6b642 100644 --- a/br/pkg/lightning/backend/local/engine_mgr_test.go +++ b/br/pkg/lightning/backend/local/engine_mgr_test.go @@ -18,6 +18,7 @@ import ( "context" "io" "os" + "path" "sync" "testing" @@ -36,7 +37,7 @@ func getBackendConfig(t *testing.T) BackendConfig { MemTableSize: config.DefaultEngineMemCacheSize, MaxOpenFiles: 1000, DisableAutomaticCompactions: true, - LocalStoreDir: t.TempDir(), + LocalStoreDir: path.Join(t.TempDir(), "sorted-kv"), DupeDetectEnabled: false, DuplicateDetectOpt: common.DupDetectOpt{}, WorkerConcurrency: 8, diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index 180e3e7cdc5b3..45220e6cd74c3 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -22,6 +22,7 @@ import ( "io" "math" "math/rand" + "path" "path/filepath" "sort" "strings" @@ -1258,6 +1259,7 @@ func TestCheckPeersBusy(t *testing.T) { supportMultiIngest: true, BackendConfig: BackendConfig{ ShouldCheckWriteStall: true, + LocalStoreDir: path.Join(t.TempDir(), "sorted-kv"), }, tikvCodec: keyspace.CodecV1, } @@ -1380,6 +1382,7 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) { supportMultiIngest: true, BackendConfig: BackendConfig{ ShouldCheckWriteStall: true, + LocalStoreDir: path.Join(t.TempDir(), "sorted-kv"), }, tikvCodec: keyspace.CodecV1, } @@ -1477,6 +1480,9 @@ func TestPartialWriteIngestErrorWontPanic(t *testing.T) { writeLimiter: 
noopStoreWriteLimiter{}, supportMultiIngest: true, tikvCodec: keyspace.CodecV1, + BackendConfig: BackendConfig{ + LocalStoreDir: path.Join(t.TempDir(), "sorted-kv"), + }, } var err error local.engineMgr, err = newEngineManager(local.BackendConfig, local, local.logger) @@ -1570,6 +1576,9 @@ func TestPartialWriteIngestBusy(t *testing.T) { writeLimiter: noopStoreWriteLimiter{}, supportMultiIngest: true, tikvCodec: keyspace.CodecV1, + BackendConfig: BackendConfig{ + LocalStoreDir: path.Join(t.TempDir(), "sorted-kv"), + }, } var err error local.engineMgr, err = newEngineManager(local.BackendConfig, local, local.logger) @@ -2322,6 +2331,7 @@ func TestExternalEngine(t *testing.T) { local := &Backend{ BackendConfig: BackendConfig{ WorkerConcurrency: 2, + LocalStoreDir: path.Join(t.TempDir(), "sorted-kv"), }, splitCli: initTestSplitClient([][]byte{ keys[0], keys[50], endKey, From 87c73b0fb6ce5c5cf6d792fae7d8cdf7e57dcc8c Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 17 Jan 2024 11:30:46 +0800 Subject: [PATCH 8/9] add comments --- pkg/executor/import_into.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/executor/import_into.go b/pkg/executor/import_into.go index 68c8e9d0ee4de..bbbb1e6fccb88 100644 --- a/pkg/executor/import_into.go +++ b/pkg/executor/import_into.go @@ -268,6 +268,7 @@ func (e *ImportIntoExec) importFromSelect(ctx context.Context) error { return err } + // TODO: we didn't use this `group` here, but have to init GroupCtx, refactor this later. group, groupCtx := errgroup.WithContext(ctx) param := &importer.JobImportParam{ Job: &importer.Job{}, From 6c1c867d973b3a423134c128b9fb16dbbde0632c Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 17 Jan 2024 13:36:30 +0800 Subject: [PATCH 9/9] fix comments --- pkg/executor/importer/chunk_process.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/executor/importer/chunk_process.go b/pkg/executor/importer/chunk_process.go index dc793fc7fd917..636f1048212f9 100644 --- a/pkg/executor/importer/chunk_process.go +++ b/pkg/executor/importer/chunk_process.go @@ -104,7 +104,7 @@ func (b *deliverKVBatch) add(kvs *kv.Pairs) { type chunkEncoder interface { init() error encodeLoop(ctx context.Context) error - logFields() []zap.Field + summaryFields() []zap.Field } // fileChunkEncoder encode data chunk(either a data file or part of a file). @@ -228,7 +228,7 @@ func (p *fileChunkEncoder) encodeLoop(ctx context.Context) error { return nil } -func (p *fileChunkEncoder) logFields() []zap.Field { +func (p *fileChunkEncoder) summaryFields() []zap.Field { return []zap.Field{ zap.Duration("readDur", p.readTotalDur), zap.Duration("encodeDur", p.encodeTotalDur), @@ -252,7 +252,7 @@ type baseChunkProcessor struct { func (p *baseChunkProcessor) Process(ctx context.Context) (err error) { task := log.BeginTask(p.logger, "process chunk") defer func() { - logFields := append(p.enc.logFields(), p.deliver.logFields()...) + logFields := append(p.enc.summaryFields(), p.deliver.logFields()...) logFields = append(logFields, zap.Stringer("type", p.sourceType)) task.End(zap.ErrorLevel, err, logFields...) if metrics, ok := metric.GetCommonMetric(ctx); ok && err == nil { @@ -449,6 +449,7 @@ func (*queryChunkEncoder) init() error { return nil } +// TODO logic is very similar to fileChunkEncoder, consider merge them. 
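// Annotation: a schematic of the shared loop shape the TODO above refers to,
// reconstructed from both encoders' summaryFields (readDur/encodeDur) and the
// deliver pipeline in this patch; the helper names are placeholders, not
// actual methods:
//
//	for !reachEOF {
//		rows, err := readSourceRows()  // accumulates readTotalDur
//		if err != nil {
//			return err
//		}
//		pairs, err := encodeRows(rows) // accumulates encodeTotalDur
//		if err != nil {
//			return err
//		}
//		if err := deliverKVBatch(pairs); err != nil {
//			return err
//		}
//	}
//
// fileChunkEncoder reads from a parsed file chunk while queryChunkEncoder
// drains rows produced by the SELECT, which is presumably why only the read
// step differs between the two.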
func (e *queryChunkEncoder) encodeLoop(ctx context.Context) error { var err error reachEOF := false @@ -523,7 +524,7 @@ func (e *queryChunkEncoder) encodeLoop(ctx context.Context) error { return nil } -func (e *queryChunkEncoder) logFields() []zap.Field { +func (e *queryChunkEncoder) summaryFields() []zap.Field { return []zap.Field{ zap.Duration("readDur", e.readTotalDur), zap.Duration("encodeDur", e.encodeTotalDur),