Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importinto: executor part of import from select #50341

Merged
merged 9 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions br/pkg/lightning/backend/local/engine_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func newEngineManager(config BackendConfig, storeHelper StoreHelper, logger log.
}
}()

if err = prepareSortDir(config); err != nil {
return nil, err
}

keyAdapter := common.KeyAdapter(common.NoopKeyAdapter{})
if config.DupeDetectEnabled {
duplicateDB, err = openDuplicateDB(config.LocalStoreDir)
Expand Down Expand Up @@ -525,6 +529,15 @@ func (em *engineManager) close() {
}
em.duplicateDB = nil
}

	// if checkpoint is disabled, or we finished loading all data successfully, then
	// the files in this dir are useless, so we clean up the dir and all files in it.
if !em.CheckpointEnabled || common.IsEmptyDir(em.LocalStoreDir) {
err := os.RemoveAll(em.LocalStoreDir)
if err != nil {
em.logger.Warn("remove local db file failed", zap.Error(err))
}
}
}

func (em *engineManager) getExternalEngine(uuid uuid.UUID) (common.Engine, bool) {
Expand Down Expand Up @@ -566,3 +579,24 @@ func openDuplicateDB(storeDir string) (*pebble.DB, error) {
}
return pebble.Open(dbPath, opts)
}

func prepareSortDir(config BackendConfig) error {
shouldCreate := true
if config.CheckpointEnabled {
if info, err := os.Stat(config.LocalStoreDir); err != nil {
if !os.IsNotExist(err) {
return err
}
} else if info.IsDir() {
shouldCreate = false
}
}

if shouldCreate {
err := os.Mkdir(config.LocalStoreDir, 0o700)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use MkdirAll?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copied from local backend ctor

if err != nil {
return common.ErrInvalidSortedKVDir.Wrap(err).GenWithStackByArgs(config.LocalStoreDir)
}
}
return nil
}
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/local/engine_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"io"
"os"
"path"
"sync"
"testing"

Expand All @@ -36,7 +37,7 @@ func getBackendConfig(t *testing.T) BackendConfig {
MemTableSize: config.DefaultEngineMemCacheSize,
MaxOpenFiles: 1000,
DisableAutomaticCompactions: true,
LocalStoreDir: t.TempDir(),
LocalStoreDir: path.Join(t.TempDir(), "sorted-kv"),
DupeDetectEnabled: false,
DuplicateDetectOpt: common.DupDetectOpt{},
WorkerConcurrency: 8,
Expand Down
54 changes: 27 additions & 27 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"io"
"math"
"net"
"os"
"path/filepath"
"strings"
"sync"
Expand Down Expand Up @@ -517,24 +516,6 @@ func NewBackend(
return nil, common.NormalizeOrWrapErr(common.ErrCreatePDClient, err)
}

shouldCreate := true
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to engine-mgr, below Close part too

if config.CheckpointEnabled {
if info, err := os.Stat(config.LocalStoreDir); err != nil {
if !os.IsNotExist(err) {
return nil, err
}
} else if info.IsDir() {
shouldCreate = false
}
}

if shouldCreate {
err = os.Mkdir(config.LocalStoreDir, 0o700)
if err != nil {
return nil, common.ErrInvalidSortedKVDir.Wrap(err).GenWithStackByArgs(config.LocalStoreDir)
}
}

// The following copies tikv.NewTxnClient without creating yet another pdClient.
spkv, err := tikvclient.NewEtcdSafePointKV(strings.Split(config.PDAddr, ","), tls.TLSConfig())
if err != nil {
Expand Down Expand Up @@ -597,6 +578,27 @@ func NewBackend(
return local, nil
}

// NewBackendForTest creates a new Backend for test.
func NewBackendForTest(ctx context.Context, config BackendConfig, storeHelper StoreHelper) (*Backend, error) {
	config.adjust()

	lg := log.FromContext(ctx)
	mgr, err := newEngineManager(config, storeHelper, lg)
	if err != nil {
		return nil, err
	}
	backend := &Backend{
		BackendConfig: config,
		logger:        lg,
		engineMgr:     mgr,
	}
	if metrics, ok := metric.GetCommonMetric(ctx); ok {
		backend.metrics = metrics
	}
	return backend, nil
}

// TotalMemoryConsume returns the total memory usage of the local backend.
func (local *Backend) TotalMemoryConsume() int64 {
return local.engineMgr.totalMemoryConsume()
Expand Down Expand Up @@ -672,14 +674,6 @@ func (local *Backend) Close() {
local.engineMgr.close()
local.importClientFactory.Close()

// if checkpoint is disabled, or we finish load all data successfully, then files in this
// dir will be useless, so we clean up this dir and all files in it.
if !local.CheckpointEnabled || common.IsEmptyDir(local.LocalStoreDir) {
err := os.RemoveAll(local.LocalStoreDir)
if err != nil {
local.logger.Warn("remove local db file failed", zap.Error(err))
}
}
_ = local.tikvCli.Close()
local.pdHTTPCli.Close()
local.pdCli.Close()
Expand Down Expand Up @@ -1592,6 +1586,12 @@ func (local *Backend) GetTiKVCodec() tikvclient.Codec {
return local.tikvCodec
}

// CloseEngineMgr shuts down the backend's engine manager without closing the
// rest of the backend. It exists only for tests.
func (local *Backend) CloseEngineMgr() {
	local.engineMgr.close()
}

var getSplitConfFromStoreFunc = getSplitConfFromStore

// return region split size, region split keys, error
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"io"
"math"
"math/rand"
"path"
"path/filepath"
"sort"
"strings"
Expand Down Expand Up @@ -1258,6 +1259,7 @@ func TestCheckPeersBusy(t *testing.T) {
supportMultiIngest: true,
BackendConfig: BackendConfig{
ShouldCheckWriteStall: true,
LocalStoreDir: path.Join(t.TempDir(), "sorted-kv"),
},
tikvCodec: keyspace.CodecV1,
}
Expand Down Expand Up @@ -1380,6 +1382,7 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) {
supportMultiIngest: true,
BackendConfig: BackendConfig{
ShouldCheckWriteStall: true,
LocalStoreDir: path.Join(t.TempDir(), "sorted-kv"),
},
tikvCodec: keyspace.CodecV1,
}
Expand Down Expand Up @@ -1477,6 +1480,9 @@ func TestPartialWriteIngestErrorWontPanic(t *testing.T) {
writeLimiter: noopStoreWriteLimiter{},
supportMultiIngest: true,
tikvCodec: keyspace.CodecV1,
BackendConfig: BackendConfig{
LocalStoreDir: path.Join(t.TempDir(), "sorted-kv"),
},
}
var err error
local.engineMgr, err = newEngineManager(local.BackendConfig, local, local.logger)
Expand Down Expand Up @@ -1570,6 +1576,9 @@ func TestPartialWriteIngestBusy(t *testing.T) {
writeLimiter: noopStoreWriteLimiter{},
supportMultiIngest: true,
tikvCodec: keyspace.CodecV1,
BackendConfig: BackendConfig{
LocalStoreDir: path.Join(t.TempDir(), "sorted-kv"),
},
}
var err error
local.engineMgr, err = newEngineManager(local.BackendConfig, local, local.logger)
Expand Down Expand Up @@ -2322,6 +2331,7 @@ func TestExternalEngine(t *testing.T) {
local := &Backend{
BackendConfig: BackendConfig{
WorkerConcurrency: 2,
LocalStoreDir: path.Join(t.TempDir(), "sorted-kv"),
},
splitCli: initTestSplitClient([][]byte{
keys[0], keys[50], endKey,
Expand Down
1 change: 1 addition & 0 deletions pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ go_library(
"@com_github_burntsushi_toml//:toml",
"@com_github_docker_go_units//:go-units",
"@com_github_gogo_protobuf//proto",
"@com_github_google_uuid//:uuid",
"@com_github_ngaut_pools//:pools",
"@com_github_opentracing_basictracer_go//:basictracer-go",
"@com_github_opentracing_opentracing_go//:opentracing-go",
Expand Down
106 changes: 104 additions & 2 deletions pkg/executor/import_into.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package executor

import (
"context"
"fmt"
"sync/atomic"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/log"
Expand Down Expand Up @@ -105,6 +107,11 @@ func (e *ImportIntoExec) Next(ctx context.Context, req *chunk.Chunk) (err error)
e.importPlan = importPlan
e.controller = controller

if e.selectExec != nil {
// `import from select` doesn't return rows, so no need to set dataFilled.
return e.importFromSelect(ctx)
}

if err2 := e.controller.InitDataFiles(ctx); err2 != nil {
return err2
}
Expand Down Expand Up @@ -237,12 +244,107 @@ func (e *ImportIntoExec) doImport(ctx context.Context, se sessionctx.Context, di
// use background, since ctx is canceled already.
return cancelAndWaitImportJob(context.Background(), taskManager, distImporter.JobID())
}
if err2 := flushStats(ctx, se, e.importPlan.TableInfo.ID, distImporter.Result(ctx)); err2 != nil {
importResult := distImporter.Result(ctx)
if err2 := flushStats(ctx, se, e.importPlan.TableInfo.ID, &importResult); err2 != nil {
logutil.Logger(ctx).Error("flush stats failed", zap.Error(err2))
}
return err
}

func (e *ImportIntoExec) importFromSelect(ctx context.Context) error {
e.dataFilled = true
// must use a new session to pre-check, else the stmt in show processlist will be changed.
newSCtx, err2 := CreateSession(e.userSctx)
if err2 != nil {
return err2
}
defer CloseSession(newSCtx)

sqlExec := newSCtx.(sqlexec.SQLExecutor)
if err2 = e.controller.CheckRequirements(ctx, sqlExec); err2 != nil {
return err2
}
if err := e.importPlan.InitTiKVConfigs(ctx, newSCtx); err != nil {
return err
}

// TODO: we didn't use this `group` here, but have to init GroupCtx, refactor this later.
group, groupCtx := errgroup.WithContext(ctx)
param := &importer.JobImportParam{
Job: &importer.Job{},
Group: group,
GroupCtx: groupCtx,
Done: make(chan struct{}),
Progress: importer.NewProgress(),
}
importID := uuid.New().String()
logutil.Logger(ctx).Info("importing data from select statement",
zap.String("import-id", importID), zap.Int("concurrency", e.controller.ThreadCnt),
zap.String("target-table", e.controller.FullTableName()),
zap.Int64("target-table-id", e.controller.TableInfo.ID))
ti, err2 := importer.NewTableImporter(param, e.controller, importID)
if err2 != nil {
return err2
}
defer func() {
if err := ti.Close(); err != nil {
logutil.Logger(ctx).Error("close importer failed", zap.Error(err))
}
}()
selectedRowCh := make(chan importer.QueryRow)
ti.SetSelectedRowCh(selectedRowCh)

var importResult *importer.JobImportResult
eg, egCtx := errgroup.WithContext(ctx)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
eg.Go(func() error {
var err error
importResult, err = ti.ImportSelectedRows(egCtx, newSCtx)
return err
})
eg.Go(func() error {
defer close(selectedRowCh)
fields := exec.RetTypes(e.selectExec)
var idAllocator int64
for {
// rows will be consumed concurrently, we cannot use chunk pool in session ctx.
chk := exec.NewFirstChunk(e.selectExec)
iter := chunk.NewIterator4Chunk(chk)
err := exec.Next(egCtx, e.selectExec, chk)
if err != nil {
return err
}
if chk.NumRows() == 0 {
break
}
for innerChunkRow := iter.Begin(); innerChunkRow != iter.End(); innerChunkRow = iter.Next() {
idAllocator++
select {
case selectedRowCh <- importer.QueryRow{
ID: idAllocator,
Data: innerChunkRow.GetDatumRow(fields),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we can send a whole chk to ImportSelectedRows, after chk is finished reading we can put it back to pool

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the memory footprint is not that large here, let gc handle it now.

}:
case <-egCtx.Done():
return egCtx.Err()
}
}
}
return nil
})
if err := eg.Wait(); err != nil {
return err
}

if err2 = flushStats(ctx, e.userSctx, e.importPlan.TableInfo.ID, importResult); err2 != nil {
logutil.Logger(ctx).Error("flush stats failed", zap.Error(err2))
}

stmtCtx := e.userSctx.GetSessionVars().StmtCtx
stmtCtx.SetAffectedRows(importResult.Affected)
// TODO: change it after spec is ready.
stmtCtx.SetMessage(fmt.Sprintf("Records: %d, ID: %s", importResult.Affected, importID))
return nil
}

// ImportIntoActionExec represents an IMPORT INTO action executor.
type ImportIntoActionExec struct {
exec.BaseExecutor
Expand Down Expand Up @@ -297,7 +399,7 @@ func (e *ImportIntoActionExec) checkPrivilegeAndStatus(ctx context.Context, mana
}

// flushStats flushes the stats of the table.
func flushStats(ctx context.Context, se sessionctx.Context, tableID int64, result importer.JobImportResult) error {
func flushStats(ctx context.Context, se sessionctx.Context, tableID int64, result *importer.JobImportResult) error {
if err := sessiontxn.NewTxn(ctx, se); err != nil {
return err
}
Expand Down
Loading