-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
importinto: executor part of import from select #50341
Changes from 5 commits
d82cc9a
0c8051f
4ddb22d
84aa013
f79fd03
7659774
f3748a6
87c73b0
6c1c867
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,6 @@ import ( | |
"io" | ||
"math" | ||
"net" | ||
"os" | ||
"path/filepath" | ||
"strings" | ||
"sync" | ||
|
@@ -517,24 +516,6 @@ func NewBackend( | |
return nil, common.NormalizeOrWrapErr(common.ErrCreatePDClient, err) | ||
} | ||
|
||
shouldCreate := true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. moved to engine-mgr, below |
||
if config.CheckpointEnabled { | ||
if info, err := os.Stat(config.LocalStoreDir); err != nil { | ||
if !os.IsNotExist(err) { | ||
return nil, err | ||
} | ||
} else if info.IsDir() { | ||
shouldCreate = false | ||
} | ||
} | ||
|
||
if shouldCreate { | ||
err = os.Mkdir(config.LocalStoreDir, 0o700) | ||
if err != nil { | ||
return nil, common.ErrInvalidSortedKVDir.Wrap(err).GenWithStackByArgs(config.LocalStoreDir) | ||
} | ||
} | ||
|
||
// The following copies tikv.NewTxnClient without creating yet another pdClient. | ||
spkv, err := tikvclient.NewEtcdSafePointKV(strings.Split(config.PDAddr, ","), tls.TLSConfig()) | ||
if err != nil { | ||
|
@@ -597,6 +578,27 @@ func NewBackend( | |
return local, nil | ||
} | ||
|
||
// NewBackendForTest creates a new Backend for test. | ||
func NewBackendForTest(ctx context.Context, config BackendConfig, storeHelper StoreHelper) (*Backend, error) { | ||
config.adjust() | ||
|
||
logger := log.FromContext(ctx) | ||
engineMgr, err := newEngineManager(config, storeHelper, logger) | ||
if err != nil { | ||
return nil, err | ||
} | ||
local := &Backend{ | ||
BackendConfig: config, | ||
logger: logger, | ||
engineMgr: engineMgr, | ||
} | ||
if m, ok := metric.GetCommonMetric(ctx); ok { | ||
local.metrics = m | ||
} | ||
|
||
return local, nil | ||
} | ||
|
||
// TotalMemoryConsume returns the total memory usage of the local backend. | ||
func (local *Backend) TotalMemoryConsume() int64 { | ||
return local.engineMgr.totalMemoryConsume() | ||
|
@@ -672,14 +674,6 @@ func (local *Backend) Close() { | |
local.engineMgr.close() | ||
local.importClientFactory.Close() | ||
|
||
// if checkpoint is disabled, or we finish load all data successfully, then files in this | ||
// dir will be useless, so we clean up this dir and all files in it. | ||
if !local.CheckpointEnabled || common.IsEmptyDir(local.LocalStoreDir) { | ||
err := os.RemoveAll(local.LocalStoreDir) | ||
if err != nil { | ||
local.logger.Warn("remove local db file failed", zap.Error(err)) | ||
} | ||
} | ||
_ = local.tikvCli.Close() | ||
local.pdHTTPCli.Close() | ||
local.pdCli.Close() | ||
|
@@ -1592,6 +1586,12 @@ func (local *Backend) GetTiKVCodec() tikvclient.Codec { | |
return local.tikvCodec | ||
} | ||
|
||
// CloseEngineMgr close the engine manager. | ||
// This function is used for test. | ||
func (local *Backend) CloseEngineMgr() { | ||
local.engineMgr.close() | ||
} | ||
|
||
var getSplitConfFromStoreFunc = getSplitConfFromStore | ||
|
||
// return region split size, region split keys, error | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,8 +16,10 @@ package executor | |
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync/atomic" | ||
|
||
"github.com/google/uuid" | ||
"github.com/pingcap/errors" | ||
"github.com/pingcap/failpoint" | ||
"github.com/pingcap/tidb/br/pkg/lightning/log" | ||
|
@@ -105,6 +107,11 @@ func (e *ImportIntoExec) Next(ctx context.Context, req *chunk.Chunk) (err error) | |
e.importPlan = importPlan | ||
e.controller = controller | ||
|
||
if e.selectExec != nil { | ||
// `import from select` doesn't return rows, so no need to set dataFilled. | ||
return e.importFromSelect(ctx) | ||
} | ||
|
||
if err2 := e.controller.InitDataFiles(ctx); err2 != nil { | ||
return err2 | ||
} | ||
|
@@ -237,12 +244,106 @@ func (e *ImportIntoExec) doImport(ctx context.Context, se sessionctx.Context, di | |
// use background, since ctx is canceled already. | ||
return cancelAndWaitImportJob(context.Background(), taskManager, distImporter.JobID()) | ||
} | ||
if err2 := flushStats(ctx, se, e.importPlan.TableInfo.ID, distImporter.Result(ctx)); err2 != nil { | ||
importResult := distImporter.Result(ctx) | ||
if err2 := flushStats(ctx, se, e.importPlan.TableInfo.ID, &importResult); err2 != nil { | ||
logutil.Logger(ctx).Error("flush stats failed", zap.Error(err2)) | ||
} | ||
return err | ||
} | ||
|
||
func (e *ImportIntoExec) importFromSelect(ctx context.Context) error { | ||
e.dataFilled = true | ||
// must use a new session to pre-check, else the stmt in show processlist will be changed. | ||
newSCtx, err2 := CreateSession(e.userSctx) | ||
if err2 != nil { | ||
return err2 | ||
} | ||
defer CloseSession(newSCtx) | ||
|
||
sqlExec := newSCtx.(sqlexec.SQLExecutor) | ||
if err2 = e.controller.CheckRequirements(ctx, sqlExec); err2 != nil { | ||
return err2 | ||
} | ||
if err := e.importPlan.InitTiKVConfigs(ctx, newSCtx); err != nil { | ||
return err | ||
} | ||
|
||
group, groupCtx := errgroup.WithContext(ctx) | ||
param := &importer.JobImportParam{ | ||
Job: &importer.Job{}, | ||
Group: group, | ||
GroupCtx: groupCtx, | ||
Done: make(chan struct{}), | ||
Progress: importer.NewProgress(), | ||
} | ||
importID := uuid.New().String() | ||
logutil.Logger(ctx).Info("importing data from select statement", | ||
zap.String("import-id", importID), zap.Int("concurrency", e.controller.ThreadCnt), | ||
zap.String("target-table", e.controller.FullTableName()), | ||
zap.Int64("target-table-id", e.controller.TableInfo.ID)) | ||
ti, err2 := importer.NewTableImporter(param, e.controller, importID) | ||
if err2 != nil { | ||
return err2 | ||
} | ||
defer func() { | ||
if err := ti.Close(); err != nil { | ||
logutil.Logger(ctx).Error("close importer failed", zap.Error(err)) | ||
} | ||
}() | ||
selectedRowCh := make(chan importer.QueryRow) | ||
ti.SetSelectedRowCh(selectedRowCh) | ||
|
||
var importResult *importer.JobImportResult | ||
eg, egCtx := errgroup.WithContext(ctx) | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
eg.Go(func() error { | ||
var err error | ||
importResult, err = ti.ImportSelectedRows(egCtx, newSCtx) | ||
return err | ||
}) | ||
eg.Go(func() error { | ||
defer close(selectedRowCh) | ||
fields := exec.RetTypes(e.selectExec) | ||
var idAllocator int64 | ||
for { | ||
// rows will be consumed concurrently, we cannot use chunk pool in session ctx. | ||
chk := exec.NewFirstChunk(e.selectExec) | ||
iter := chunk.NewIterator4Chunk(chk) | ||
err := exec.Next(egCtx, e.selectExec, chk) | ||
if err != nil { | ||
return err | ||
} | ||
if chk.NumRows() == 0 { | ||
break | ||
} | ||
for innerChunkRow := iter.Begin(); innerChunkRow != iter.End(); innerChunkRow = iter.Next() { | ||
idAllocator++ | ||
select { | ||
case selectedRowCh <- importer.QueryRow{ | ||
ID: idAllocator, | ||
Data: innerChunkRow.GetDatumRow(fields), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if we can send a whole There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the memory footprint is not that large here, let gc handle it now. |
||
}: | ||
case <-egCtx.Done(): | ||
return egCtx.Err() | ||
} | ||
} | ||
} | ||
return nil | ||
}) | ||
if err := eg.Wait(); err != nil { | ||
return err | ||
} | ||
|
||
if err2 = flushStats(ctx, e.userSctx, e.importPlan.TableInfo.ID, importResult); err2 != nil { | ||
logutil.Logger(ctx).Error("flush stats failed", zap.Error(err2)) | ||
} | ||
|
||
stmtCtx := e.userSctx.GetSessionVars().StmtCtx | ||
stmtCtx.SetAffectedRows(importResult.Affected) | ||
// TODO: change it after spec is ready. | ||
stmtCtx.SetMessage(fmt.Sprintf("Records: %d, ID: %s", importResult.Affected, importID)) | ||
return nil | ||
} | ||
|
||
// ImportIntoActionExec represents a import into action executor. | ||
type ImportIntoActionExec struct { | ||
exec.BaseExecutor | ||
|
@@ -297,7 +398,7 @@ func (e *ImportIntoActionExec) checkPrivilegeAndStatus(ctx context.Context, mana | |
} | ||
|
||
// flushStats flushes the stats of the table. | ||
func flushStats(ctx context.Context, se sessionctx.Context, tableID int64, result importer.JobImportResult) error { | ||
func flushStats(ctx context.Context, se sessionctx.Context, tableID int64, result *importer.JobImportResult) error { | ||
if err := sessiontxn.NewTxn(ctx, se); err != nil { | ||
return err | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use MkdirAll?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
copied from
local backend
ctor