
load data: physical mode part 4 (#43162)
ref #42930
D3Hunter authored Apr 19, 2023
1 parent fc5392e commit 2aa72b7
Showing 21 changed files with 320 additions and 34 deletions.
9 changes: 9 additions & 0 deletions br/pkg/lightning/backend/kv/base.go
@@ -116,6 +116,9 @@ type BaseKVEncoder struct {

logger *zap.Logger
recordCache []types.Datum
// the first auto-generated ID in the current encoder.
// if there's no auto-generated id column or the column value is not auto-generated, it will be 0.
LastInsertID uint64
}

// NewBaseKVEncoder creates a new BaseKVEncoder.
@@ -241,6 +244,9 @@ func (e *BaseKVEncoder) getActualDatum(col *table.Column, rowID int64, inputDatu
// we still need a conversion, e.g. to catch overflow with a TINYINT column.
value, err = table.CastValue(e.SessionCtx,
types.NewIntDatum(rowID), col.ToInfo(), false, false)
if err == nil && e.LastInsertID == 0 {
e.LastInsertID = value.GetUint64()
}
case e.IsAutoRandomCol(col.ToInfo()):
var val types.Datum
realRowID := e.AutoIDFn(rowID)
@@ -250,6 +256,9 @@ func (e *BaseKVEncoder) getActualDatum(col *table.Column, rowID int64, inputDatu
val = types.NewIntDatum(realRowID)
}
value, err = table.CastValue(e.SessionCtx, val, col.ToInfo(), false, false)
if err == nil && e.LastInsertID == 0 {
e.LastInsertID = value.GetUint64()
}
case col.IsGenerated():
// inject some dummy value for gen col so that MutRowFromDatums below sees a real value instead of nil.
// if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic.
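
Both new hunks guard the assignment with e.LastInsertID == 0, so only the first auto-generated ID produced by an encoder is kept. A minimal standalone sketch of that idiom, with a stand-in type rather than the real BaseKVEncoder:

package main

import "fmt"

// sketchEncoder mimics the bookkeeping added above: the zero value means
// "nothing recorded yet", so only the first auto-generated ID sticks.
type sketchEncoder struct {
	lastInsertID uint64
}

func (e *sketchEncoder) onAutoID(id uint64) {
	if e.lastInsertID == 0 {
		e.lastInsertID = id
	}
}

func main() {
	e := &sketchEncoder{}
	for _, id := range []uint64{7, 8, 9} { // rows encoded in order
		e.onAutoID(id)
	}
	fmt.Println(e.lastInsertID) // 7
}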
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
@@ -34,6 +34,7 @@ go_library(
"//br/pkg/lightning/metric",
"//br/pkg/lightning/mydump",
"//br/pkg/lightning/tikv",
"//br/pkg/lightning/verification",
"//br/pkg/logutil",
"//br/pkg/membuf",
"//br/pkg/pdutil",
8 changes: 8 additions & 0 deletions br/pkg/lightning/backend/local/checksum.go
@@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/metric"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tipb/go-tipb"
@@ -60,6 +61,13 @@ type RemoteChecksum struct {
TotalBytes uint64
}

// IsEqual checks whether the checksum is equal to the other.
func (rc *RemoteChecksum) IsEqual(other *verification.KVChecksum) bool {
return rc.Checksum == other.Sum() &&
rc.TotalKVs == other.SumKVS() &&
rc.TotalBytes == other.SumSize()
}

// ChecksumManager is a manager that manages checksums.
type ChecksumManager interface {
Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error)
5 changes: 5 additions & 0 deletions br/pkg/lightning/backend/local/local.go
@@ -1726,6 +1726,11 @@ func (local *Backend) EngineFileSizes() (res []backend.EngineFileSize) {
return
}

// GetPDClient returns the PD client.
func (local *Backend) GetPDClient() pd.Client {
return local.pdCtl.GetPDClient()
}

var getSplitConfFromStoreFunc = getSplitConfFromStore

// return region split size, region split keys, error
2 changes: 1 addition & 1 deletion br/pkg/lightning/verification/checksum.go
@@ -123,7 +123,7 @@ func (c *KVChecksum) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
}

// MarshalJSON implements the json.Marshaler interface.
func (c KVChecksum) MarshalJSON() ([]byte, error) {
func (c *KVChecksum) MarshalJSON() ([]byte, error) {
result := fmt.Sprintf(`{"checksum":%d,"size":%d,"kvs":%d}`, c.checksum, c.bytes, c.kvs)
return []byte(result), nil
}
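
Switching MarshalJSON from a value to a pointer receiver matters because encoding/json only invokes a custom marshaler whose method set matches the value it is handed: with a pointer receiver, marshaling a *KVChecksum uses this method, while a bare value falls back to default struct encoding. A minimal sketch of the distinction with a stand-in type, not the real KVChecksum:

package main

import (
	"encoding/json"
	"fmt"
)

type sum struct{ N int }

// pointer receiver: only *sum implements json.Marshaler
func (s *sum) MarshalJSON() ([]byte, error) {
	return []byte(fmt.Sprintf(`{"n":%d}`, s.N)), nil
}

func main() {
	s := sum{N: 1}
	byPtr, _ := json.Marshal(&s) // custom marshaler: {"n":1}
	byVal, _ := json.Marshal(s)  // default encoding:  {"N":1}
	fmt.Println(string(byPtr), string(byVal))
}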
3 changes: 2 additions & 1 deletion disttask/loaddata/dispatcher.go
@@ -81,7 +81,8 @@ func generateSubtaskMetas(ctx context.Context, taskMeta *TaskMeta) ([]*SubtaskMe
if err != nil {
return nil, err
}
controller, err := importer.NewLoadDataController(&taskMeta.Plan, tbl)
// todo: use real session context
controller, err := importer.NewLoadDataController(nil, &taskMeta.Plan, tbl)
if err != nil {
return nil, err
}
3 changes: 2 additions & 1 deletion disttask/loaddata/scheduler.go
@@ -46,7 +46,8 @@ func (s *ImportScheduler) InitSubtaskExecEnv(ctx context.Context) error {
if err != nil {
return err
}
controller, err := importer.NewLoadDataController(&s.taskMeta.Plan, tbl)
// todo: use real session context
controller, err := importer.NewLoadDataController(nil, &s.taskMeta.Plan, tbl)
if err != nil {
return err
}
1 change: 1 addition & 0 deletions executor/BUILD.bazel
@@ -311,6 +311,7 @@ go_test(
"join_pkg_test.go",
"join_test.go",
"joiner_test.go",
"load_data_test.go",
"main_test.go",
"memtable_reader_test.go",
"merge_join_test.go",
3 changes: 3 additions & 0 deletions executor/asyncloaddata/progress.go
@@ -31,6 +31,9 @@ type LogicalImportProgress struct {

// PhysicalImportProgress is the progress info of the physical import mode.
type PhysicalImportProgress struct {
// ReadRowCnt is the number of rows read from data files.
// Lines ignored by the IGNORE N LINES clause are not included.
ReadRowCnt atomic.Uint64
// EncodeFileSize is the size of the file that has finished KV encoding in bytes.
// it should eventually equal SourceFileSize.
EncodeFileSize atomic.Int64
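
ReadRowCnt counts rows as they are parsed and encoded, while the existing LoadedRowCnt only advances once a row's KV pairs have been delivered, so the two can diverge while an import is in flight; Result() in table_import.go below reports the former as Records and the latter as affected rows. A small sketch of the pairing, assuming the go.uber.org/atomic types these progress structs use:

package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// stand-in for the progress structs in this package
type progress struct {
	ReadRowCnt   atomic.Uint64 // encode loop: rows parsed from data files
	LoadedRowCnt atomic.Uint64 // deliver loop: rows whose KVs were written
}

func main() {
	var p progress
	p.ReadRowCnt.Inc()   // a row was read and encoded
	p.LoadedRowCnt.Inc() // ...and its KV pairs were delivered
	fmt.Println(p.ReadRowCnt.Load(), p.LoadedRowCnt.Load()) // 1 1
}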
6 changes: 4 additions & 2 deletions executor/asyncloaddata/progress_test.go
@@ -40,16 +40,18 @@ func TestProgressMarshalUnmarshal(t *testing.T) {
p = NewProgress(false)
require.Nil(t, p.LogicalImportProgress)
p.SourceFileSize = 123
p.ReadRowCnt.Store(790)
p.EncodeFileSize.Store(100)
p.LoadedRowCnt.Store(789)

s = p.String()
require.Equal(t, `{"SourceFileSize":123,"EncodeFileSize":100,"LoadedRowCnt":789}`, s)
require.Equal(t, `{"SourceFileSize":123,"ReadRowCnt":790,"EncodeFileSize":100,"LoadedRowCnt":789}`, s)

s2 = `{"SourceFileSize":111,"EncodeFileSize":222,"LoadedRowCnt":333}`
s2 = `{"SourceFileSize":111,"ReadRowCnt":790,"EncodeFileSize":222,"LoadedRowCnt":333}`
p2, err = ProgressFromJSON([]byte(s2))
require.NoError(t, err)
require.Equal(t, int64(111), p2.SourceFileSize)
require.Equal(t, uint64(790), p2.ReadRowCnt.Load())
require.Equal(t, int64(222), p2.EncodeFileSize.Load())
require.Equal(t, uint64(333), p2.LoadedRowCnt.Load())
}
5 changes: 3 additions & 2 deletions executor/importer/chunk_process.go
@@ -230,6 +230,7 @@ func (p *chunkProcessor) encodeLoop(ctx context.Context, deliverCompleteCh <-cha
return err
}

p.progress.ReadRowCnt.Inc()
rowBatch = append(rowBatch, deliveredRow{kvs: kvs, offset: newOffset})
kvSize += kvs.Size()
// pebble cannot allow > 4.0G kv in one batch.
@@ -299,8 +300,8 @@ func (p *chunkProcessor) deliverLoop(ctx context.Context) error {
p.progress.EncodeFileSize.Add(currOffset - prevOffset)
prevOffset = currOffset

p.checksum.Add(kvBatch.dataChecksum)
p.checksum.Add(kvBatch.indexChecksum)
p.chunkInfo.Checksum.Add(kvBatch.dataChecksum)
p.chunkInfo.Checksum.Add(kvBatch.indexChecksum)

kvBatch.reset()
}
1 change: 1 addition & 0 deletions executor/importer/engine_process.go
@@ -152,6 +152,7 @@ func ProcessChunk(
}
// chunk process is responsible for closing the data/index writers
cp.close(ctx)
tableImporter.setLastInsertID(encoder.GetLastInsertID())
return nil
}

13 changes: 12 additions & 1 deletion executor/importer/import.go
@@ -161,6 +161,8 @@ type Plan struct {
SplitFile bool
MaxRecordedErrors int64
Detached bool

DistSQLScanConcurrency int
}

// LoadDataController load data controller.
Expand Down Expand Up @@ -220,6 +222,10 @@ type LoadDataController struct {
dataFiles []*mydump.SourceFileMeta
// total data file size in bytes, only initialized when load from remote.
TotalFileSize int64
// user session context. DO NOT use it if load is in DETACHED mode.
UserCtx sessionctx.Context
// used for checksum in physical mode
distSQLScanConcurrency int
}

func getImportantSysVars(sctx sessionctx.Context) map[string]string {
@@ -284,6 +290,8 @@ func NewPlan(userSctx sessionctx.Context, plan *plannercore.LoadData, tbl table.
SQLMode: userSctx.GetSessionVars().SQLMode,
Charset: charset,
ImportantSysVars: getImportantSysVars(userSctx),

DistSQLScanConcurrency: userSctx.GetSessionVars().DistSQLScanConcurrency(),
}
if err := p.initOptions(userSctx, plan.Options); err != nil {
return nil, err
@@ -292,7 +300,7 @@ func NewPlan(userSctx sessionctx.Context, plan *plannercore.LoadData, tbl table.
}

// NewLoadDataController create new controller.
func NewLoadDataController(plan *Plan, tbl table.Table) (*LoadDataController, error) {
func NewLoadDataController(userCtx sessionctx.Context, plan *Plan, tbl table.Table) (*LoadDataController, error) {
fullTableName := common.UniqueTable(plan.TableName.Schema.L, plan.TableName.Name.L)
logger := log.L().With(zap.String("table", fullTableName))
c := &LoadDataController{
@@ -324,6 +332,9 @@ func NewLoadDataController(plan *Plan, tbl table.Table) (*LoadDataController, er
sqlMode: plan.SQLMode,
charset: plan.Charset,
importantSysVars: plan.ImportantSysVars,
UserCtx: userCtx,

distSQLScanConcurrency: plan.DistSQLScanConcurrency,
}
if err := c.initFieldParams(plan); err != nil {
return nil, err
8 changes: 8 additions & 0 deletions executor/importer/kv_encode.go
@@ -34,6 +34,9 @@ import (

type kvEncoder interface {
Encode(row []types.Datum, rowID int64) (*kv.Pairs, error)
// GetLastInsertID returns the first auto-generated ID in the current encoder.
// if there's no auto-generated id column or the column value is not auto-generated, it will be 0.
GetLastInsertID() uint64
io.Closer
}

@@ -82,6 +85,11 @@ func (en *tableKVEncoder) Encode(row []types.Datum, rowID int64) (*kv.Pairs, err
return en.Record2KV(record, row, rowID)
}

// GetLastInsertID implements the kvEncoder interface.
func (en *tableKVEncoder) GetLastInsertID() uint64 {
return en.LastInsertID
}

// todo merge with code in load_data.go
func (en *tableKVEncoder) parserData2TableData(parserData []types.Datum, rowID int64) ([]types.Datum, error) {
row := make([]types.Datum, 0, len(en.insertColumns))
Expand Down
76 changes: 71 additions & 5 deletions executor/importer/table_import.go
@@ -35,21 +35,22 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
verify "github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/storage"
tidb "github.com/pingcap/tidb/config"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util"
"go.uber.org/multierr"
"go.uber.org/zap"
)

func prepareSortDir(e *LoadDataController) (string, error) {
func prepareSortDir(e *LoadDataController, jobID int64) (string, error) {
tidbCfg := tidb.GetGlobalConfig()
// todo: add job id too
sortPathSuffix := "import-" + strconv.Itoa(int(tidbCfg.Port))
sortPath := filepath.Join(tidbCfg.TempDir, sortPathSuffix)
sortPath := filepath.Join(tidbCfg.TempDir, sortPathSuffix, strconv.FormatInt(jobID, 10))

if info, err := os.Stat(sortPath); err != nil {
if !os.IsNotExist(err) {
@@ -83,7 +84,7 @@ func NewTableImporter(param *JobImportParam, e *LoadDataController) (ti *TableIm
}

tidbCfg := tidb.GetGlobalConfig()
dir, err := prepareSortDir(e)
dir, err := prepareSortDir(e, param.Job.ID)
if err != nil {
return nil, err
}
@@ -197,6 +198,9 @@ type TableImporter struct {
logger *zap.Logger
regionSplitSize int64
regionSplitKeys int64
// the smallest auto-generated ID in the current import.
// if there's no auto-generated id column or the column value is not auto-generated, it will be 0.
lastInsertID uint64
}

var _ JobImporter = &TableImporter{}
@@ -216,7 +220,23 @@ func (ti *TableImporter) Import() {

// Result implements JobImporter.Result.
func (ti *TableImporter) Result() string {
return ""
var (
numWarnings uint64
numRecords uint64
numDeletes uint64
numSkipped uint64
)
numRecords = ti.Progress.ReadRowCnt.Load()
// todo: we don't have a strict REPLACE or IGNORE mode in physical mode, so we can't get the numDeletes/numSkipped.
// we can have it when there's duplicate detection.
msg := fmt.Sprintf(mysql.MySQLErrName[mysql.ErrLoadInfo].Raw, numRecords, numDeletes, numSkipped, numWarnings)
if !ti.Detached {
userStmtCtx := ti.UserCtx.GetSessionVars().StmtCtx
userStmtCtx.SetMessage(msg)
userStmtCtx.SetAffectedRows(ti.Progress.LoadedRowCnt.Load())
userStmtCtx.LastInsertID = ti.lastInsertID
}
return msg
}
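
The message it assembles renders like MySQL's classic LOAD DATA summary line. A standalone sketch, assuming the format string carried by mysql.MySQLErrName[mysql.ErrLoadInfo].Raw matches MySQL's ER_LOAD_INFO:

package main

import "fmt"

func main() {
	// assumed to match mysql.MySQLErrName[mysql.ErrLoadInfo].Raw
	const errLoadInfo = "Records: %d  Deleted: %d  Skipped: %d  Warnings: %d"
	fmt.Printf(errLoadInfo+"\n", 790, 0, 0, 0)
	// Output: Records: 790  Deleted: 0  Skipped: 0  Warnings: 0
}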

func (ti *TableImporter) getParser(ctx context.Context, chunk *checkpoints.ChunkCheckpoint) (mydump.Parser, error) {
@@ -267,7 +287,44 @@ func (ti *TableImporter) importTable(ctx context.Context) error {
if err := ti.importEngines(ctx); err != nil {
return err
}
return ti.postProcess(ctx)
}

func (ti *TableImporter) postProcess(ctx context.Context) error {
// todo: post process
if ti.checksum != config.OpLevelOff {
return ti.checksumTable(ctx)
}
return nil
}

func (ti *TableImporter) checksumTable(ctx context.Context) error {
var localChecksum verify.KVChecksum
for _, engine := range ti.tableCp.Engines {
for _, chunk := range engine.Chunks {
localChecksum.Add(&chunk.Checksum)
}
}
ti.logger.Info("local checksum", zap.Object("checksum", &localChecksum))
manager := local.NewTiKVChecksumManager(ti.kvStore.GetClient(), ti.backend.GetPDClient(), uint(ti.distSQLScanConcurrency))
remoteChecksum, err := manager.Checksum(ctx, ti.tableInfo)
if err != nil {
return err
}
if remoteChecksum.IsEqual(&localChecksum) {
ti.logger.Info("checksum pass", zap.Object("local", &localChecksum))
} else {
err2 := common.ErrChecksumMismatch.GenWithStackByArgs(
remoteChecksum.Checksum, localChecksum.Sum(),
remoteChecksum.TotalKVs, localChecksum.SumKVS(),
remoteChecksum.TotalBytes, localChecksum.SumSize(),
)
if ti.checksum == config.OpLevelOptional {
ti.logger.Warn("compare checksum failed, will skip this error and go on", log.ShortError(err2))
err2 = nil
}
return err2
}
return nil
}
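
postProcess and checksumTable together implement a three-level policy: off skips the remote checksum entirely, optional downgrades a mismatch to a warning, and required fails the import. A compact standalone sketch of just that policy, with stand-in names for the lightning config levels:

package main

import (
	"errors"
	"fmt"
)

type checksumLevel int

const (
	levelOff checksumLevel = iota
	levelOptional
	levelRequired
)

// verifyChecksum mirrors the control flow above: off skips the check,
// optional logs a mismatch and goes on, required propagates the error.
func verifyChecksum(level checksumLevel, match bool) error {
	if level == levelOff || match {
		return nil
	}
	err := errors.New("checksum mismatch")
	if level == levelOptional {
		fmt.Println("warn:", err)
		return nil
	}
	return err
}

func main() {
	fmt.Println(verifyChecksum(levelOptional, false)) // <nil> (warned)
	fmt.Println(verifyChecksum(levelRequired, false)) // checksum mismatch
}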

Expand Down Expand Up @@ -442,3 +499,12 @@ func (ti *TableImporter) Close() error {
ti.backend.Close()
return nil
}

func (ti *TableImporter) setLastInsertID(id uint64) {
if id == 0 {
return
}
if ti.lastInsertID == 0 || id < ti.lastInsertID {
ti.lastInsertID = id
}
}
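
setLastInsertID keeps the smallest non-zero ID reported by any chunk encoder, which lines up with MySQL's convention that LAST_INSERT_ID() reports the first ID generated by a statement. A standalone sketch of the reduction across encoders:

package main

import "fmt"

// merge mirrors setLastInsertID above: zero means "no auto-generated ID",
// otherwise the smallest candidate wins.
func merge(current, id uint64) uint64 {
	if id == 0 {
		return current
	}
	if current == 0 || id < current {
		return id
	}
	return current
}

func main() {
	var last uint64
	for _, id := range []uint64{0, 42, 17, 99} { // per-encoder LastInsertIDs
		last = merge(last, id)
	}
	fmt.Println(last) // 17
}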