Skip to content

Commit

Permalink
Merge pull request pingcap#8 from lichunzhu/hack53addTSO
Browse files Browse the repository at this point in the history
resolve dml problem and unique index problem
  • Loading branch information
lichunzhu committed Jan 9, 2022
2 parents cc662a4 + 13323db commit 9438b99
Show file tree
Hide file tree
Showing 15 changed files with 153 additions and 119 deletions.
3 changes: 1 addition & 2 deletions br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,12 @@ type KvPairs struct {
memBuf *kvMemBuf
}

func NewKvPairs(kvs []common.KvPair)*KvPairs{
func NewKvPairs(kvs []common.KvPair) *KvPairs {
return &KvPairs{
pairs: kvs,
}
}


// MakeRowsFromKvPairs converts a KvPair slice into a Rows instance. This is
// mainly used for testing only. The resulting Rows instance should only be used
// for the importer backend.
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type DuplicateManager struct {
keyAdapter KeyAdapter
remoteWorkerPool *utilpool.WorkerPool
opts *kv.SessionOptions
duplicateAbort bool
}

type pendingIndexHandles struct {
Expand Down Expand Up @@ -192,6 +193,7 @@ func NewDuplicateManager(local *local, ts uint64, opts *kv.SessionOptions) (*Dup
regionConcurrency: local.tcpConcurrency,
splitCli: local.splitCli,
tikvCli: local.tikvCli,
duplicateAbort: local.duplicateAbort,
keyAdapter: duplicateKeyAdapter{},
ts: ts,
connPool: common.NewGRPCConns(),
Expand Down Expand Up @@ -327,6 +329,10 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context,
}
hasErr = true
}
if manager.duplicateAbort && len(resp.Pairs) > 0 {
hasDupe.Store(true)
return tidbkv.ErrKeyExists.FastGenByArgs(resp.Pairs[0].String(), "unknown")
}

if hasErr || resp.GetKeyError() != nil {
r, err := manager.splitCli.GetRegionByID(ctx, region.Region.GetId())
Expand Down
21 changes: 15 additions & 6 deletions br/pkg/lightning/backend/local/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/logutil"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/codec"
)

Expand Down Expand Up @@ -60,6 +61,7 @@ type duplicateIter struct {
writeBatch *pebble.Batch
writeBatchSize int64
logger log.Logger
duplicateAbort bool
}

func (d *duplicateIter) Seek(key []byte) bool {
Expand Down Expand Up @@ -118,6 +120,7 @@ func (d *duplicateIter) Next() bool {
if d.err != nil {
return false
}
//d.logger.Debug("[detect-debug] current key", zap.ByteString("curKey", d.curKey))
if !bytes.Equal(d.nextKey, d.curKey) {
d.curKey, d.nextKey = d.nextKey, d.curKey[:0]
d.curRawKey = append(d.curRawKey[:0], d.iter.Key()...)
Expand All @@ -128,6 +131,10 @@ func (d *duplicateIter) Next() bool {
logutil.Key("key", d.curKey),
logutil.Key("prevValue", d.curVal),
logutil.Key("value", d.iter.Value()))
if d.duplicateAbort {
d.err = tidbkv.ErrKeyExists.FastGenByArgs(d.curKey, "unknown")
return false
}
if !recordFirst {
d.record(d.curRawKey, d.curVal)
recordFirst = true
Expand Down Expand Up @@ -183,12 +190,13 @@ func newDuplicateIter(ctx context.Context, engineFile *File, opts *pebble.IterOp
zap.Int64("tableID", engineFile.tableInfo.ID),
zap.Stringer("engineUUID", engineFile.UUID))
return &duplicateIter{
ctx: ctx,
iter: engineFile.db.NewIter(newOpts),
engineFile: engineFile,
keyAdapter: engineFile.keyAdapter,
writeBatch: engineFile.duplicateDB.NewBatch(),
logger: logger,
ctx: ctx,
iter: engineFile.db.NewIter(newOpts),
engineFile: engineFile,
keyAdapter: engineFile.keyAdapter,
writeBatch: engineFile.duplicateDB.NewBatch(),
logger: logger,
duplicateAbort: engineFile.duplicateAbort,
}
}

Expand All @@ -198,6 +206,7 @@ func newKeyIter(ctx context.Context, engineFile *File, opts *pebble.IterOptions)
newOpts.LowerBound = normalIterStartKey
opts = &newOpts
}
//log.L().Info("duplicate detectStatus", zap.Bool("detect", engineFile.duplicateDetection), zap.Bool("abort", engineFile.duplicateAbort))
if !engineFile.duplicateDetection {
return pebbleIter{Iterator: engineFile.db.NewIter(opts)}
}
Expand Down
16 changes: 14 additions & 2 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -74,6 +75,7 @@ import (
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/utils/utilmath"
"github.com/pingcap/tidb/br/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -205,6 +207,7 @@ type File struct {

keyAdapter KeyAdapter
duplicateDetection bool
duplicateAbort bool
duplicateDB *pebble.DB
errorMgr *errormanager.ErrorManager
}
Expand Down Expand Up @@ -825,6 +828,7 @@ type local struct {

checkTiKVAvaliable bool
duplicateDetection bool
duplicateAbort bool
duplicateDB *pebble.DB
errorMgr *errormanager.ErrorManager
}
Expand Down Expand Up @@ -976,6 +980,7 @@ func NewLocalBackend(
engineMemCacheSize: int(cfg.TikvImporter.EngineMemCacheSize),
localWriterMemCacheSize: int64(cfg.TikvImporter.LocalWriterMemCacheSize),
duplicateDetection: cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone,
duplicateAbort: cfg.TikvImporter.DuplicateResolution == config.DupeResAlgAbort,
checkTiKVAvaliable: cfg.App.CheckRequirements,
duplicateDB: duplicateDB,
errorMgr: errorMgr,
Expand Down Expand Up @@ -1304,12 +1309,14 @@ func (local *local) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, e
config: engineCfg,
tableInfo: cfg.TableInfo,
duplicateDetection: local.duplicateDetection,
duplicateAbort: local.duplicateAbort,
duplicateDB: local.duplicateDB,
errorMgr: local.errorMgr,
keyAdapter: keyAdapter,
})
engine := e.(*File)
engine.db = db
engine.TS = cfg.TableInfo.TSO
engine.sstIngester = dbSSTIngester{e: engine}
if err = engine.loadEngineMeta(); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -1352,6 +1359,7 @@ func (local *local) CloseEngine(ctx context.Context, cfg *backend.EngineConfig,
sstMetasChan: make(chan metaOrFlush),
tableInfo: cfg.TableInfo,
duplicateDetection: local.duplicateDetection,
duplicateAbort: local.duplicateAbort,
duplicateDB: local.duplicateDB,
errorMgr: local.errorMgr,
}
Expand Down Expand Up @@ -1531,6 +1539,8 @@ func (local *local) WriteToTiKV(
}
count++
totalCount++
//log.L().Info("[debug-iter] iterator count", zap.Int("count", count), zap.Int64("totalCount", totalCount),
// zap.ByteString("key", iter.Key()), zap.ByteString("value", iter.Value()))

if count >= local.batchWriteKVPairs {
for i := range clients {
Expand Down Expand Up @@ -1792,7 +1802,7 @@ WriteAndIngest:
err = local.writeAndIngestPairs(ctx, engineFile, region, pairStart, end, regionSplitSize, regionSplitKeys)
local.ingestConcurrency.Recycle(w)
if err != nil {
if common.IsContextCanceledError(err) {
if common.IsContextCanceledError(err) || tidbkv.ErrKeyExists.Equal(err) || strings.Contains(err.Error(), strconv.FormatInt(mysql.ErrDupEntry, 10)) {
return err
}
_, regionStart, _ := codec.DecodeBytes(region.Region.StartKey, []byte{})
Expand Down Expand Up @@ -1839,7 +1849,7 @@ loopWrite:
var rangeStats rangeStats
metas, finishedRange, rangeStats, err = local.WriteToTiKV(ctx, engineFile, region, start, end, regionSplitSize, regionSplitKeys)
if err != nil {
if common.IsContextCanceledError(err) {
if common.IsContextCanceledError(err) || tidbkv.ErrKeyExists.Equal(err) || strings.Contains(err.Error(), strconv.FormatInt(mysql.ErrDupEntry, 10)) {
return err
}

Expand Down Expand Up @@ -1979,6 +1989,8 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engineFile *File
err = local.writeAndIngestByRange(ctx, engineFile, startKey, endKey, regionSplitSize, regionSplitKeys)
if err == nil || common.IsContextCanceledError(err) {
return
} else if tidbkv.ErrKeyExists.Equal(err) || strings.Contains(err.Error(), strconv.FormatInt(mysql.ErrDupEntry, 10)) {
break
}
log.L().Warn("write and ingest by range failed",
zap.Int("retry time", i+1), log.ShortError(err))
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/checkpoints/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type TidbDBInfo struct {

type TidbTableInfo struct {
ID int64
TSO uint64
DB string
Name string
Core *model.TableInfo
Expand Down
9 changes: 7 additions & 2 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ import (
"github.com/pingcap/errors"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
tidbcfg "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/parser/mysql"
"go.uber.org/atomic"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -367,6 +368,10 @@ const (
// DupeResAlgRemove records all duplicate records like the 'record' algorithm and remove all information related to the
// duplicated rows. Users need to analyze the lightning_task_info.conflict_error_v1 table to add back the correct rows.
DupeResAlgRemove

// DupeResAlgAbort records all duplicate records like the 'record' algorithm and remove all information related to the
// duplicated rows. Users need to analyze the lightning_task_info.conflict_error_v1 table to add back the correct rows.
DupeResAlgAbort
)

func (dra *DuplicateResolutionAlgorithm) UnmarshalTOML(v interface{}) error {
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/utils/utildb/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"google.golang.org/grpc/status"

tmysql "github.com/pingcap/tidb/errno"
tidbkv "github.com/pingcap/tidb/kv"
)

var retryableServerError = []string{
Expand Down Expand Up @@ -111,6 +112,9 @@ func isSingleRetryableError(err error) bool {
case nil, context.Canceled, context.DeadlineExceeded, io.EOF, sql.ErrNoRows:
return false
}
if tidbkv.ErrKeyExists.Equal(err) || strings.Contains(err.Error(), "1062") {
return false
}

switch nerr := err.(type) {
case net.Error:
Expand Down
5 changes: 3 additions & 2 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"

ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
Expand All @@ -39,8 +42,6 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
decoder "github.com/pingcap/tidb/util/rowDecoder"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

type backfillWorkerType byte
Expand Down
11 changes: 9 additions & 2 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"

"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
Expand All @@ -41,8 +44,6 @@ import (
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/topsql"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -774,8 +775,14 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = onSetDefaultValue(t, job)
case model.ActionAddIndex:
ver, err = w.onCreateIndex(d, t, job, false)
if err != nil {
logutil.BgLogger().Error("[ddl] fail to run add index job", zap.Error(err))
}
case model.ActionAddPrimaryKey:
ver, err = w.onCreateIndex(d, t, job, true)
if err != nil {
logutil.BgLogger().Error("[ddl] fail to run add index job", zap.Error(err))
}
case model.ActionDropIndex, model.ActionDropPrimaryKey:
ver, err = onDropIndex(t, job)
case model.ActionDropIndexes:
Expand Down
Loading

0 comments on commit 9438b99

Please sign in to comment.