ddl: batch check the constraints when we add a unique-index. (#7132) #7562

Merged on Sep 6, 2018 (2 commits)
1 change: 0 additions & 1 deletion ddl/ddl_db_test.go
@@ -322,7 +322,6 @@ LOOP:
s.mustExec(c, "delete from t1 where c1 = ?", i+10)
}
sessionExec(c, s.store, "create index c3_index on t1 (c3)")

s.mustExec(c, "drop table t1")
}

93 changes: 87 additions & 6 deletions ddl/index.go
@@ -406,6 +406,7 @@ type indexRecord struct {
handle int64
key []byte // It's used to lock a record. Record it to reduce the encoding time.
vals []types.Datum // It's the index values.
skip bool // skip indicates that the index key already exists, so we should not add it again.
}

type addIndexWorker struct {
@@ -420,9 +421,13 @@ type addIndexWorker struct {
colFieldMap map[int64]*types.FieldType
closed bool

defaultVals []types.Datum // It's used to reduce the number of new slices.
idxRecords []*indexRecord // It's used to reduce the number of new slices.
rowMap map[int64]types.Datum // It's the index column values map. It is used to reduce the number of map allocations.
// The following attributes are used to reduce memory allocation.
defaultVals []types.Datum
idxRecords []*indexRecord
rowMap map[int64]types.Datum
idxKeyBufs [][]byte
batchCheckKeys []kv.Key
distinctCheckFlags []bool
}

type reorgIndexTask struct {
@@ -452,7 +457,6 @@ func newAddIndexWorker(sessCtx sessionctx.Context, d *ddl, id int, t table.Table
index: index,
table: t,
colFieldMap: colFieldMap,

defaultVals: make([]types.Datum, len(t.Cols())),
rowMap: make(map[int64]types.Datum, len(colFieldMap)),
}
@@ -582,6 +586,71 @@ func (w *addIndexWorker) logSlowOperations(elapsed time.Duration, slowMsg string
}
}

func (w *addIndexWorker) initBatchCheckBufs(batchCount int) {
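// Grow the reusable key encode buffers only when this batch is larger than any previous one; the slices below are truncated so that their backing arrays are reused.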
if len(w.idxKeyBufs) < batchCount {
w.idxKeyBufs = make([][]byte, batchCount)
}

w.batchCheckKeys = w.batchCheckKeys[:0]
w.distinctCheckFlags = w.distinctCheckFlags[:0]
}

func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*indexRecord) error {
idxInfo := w.index.Meta()
if !idxInfo.Unique {
// A non-unique key does not need this check; just overwrite it,
// because in most cases the index entries being backfilled do not exist yet.
return nil
}

w.initBatchCheckBufs(len(idxRecords))
stmtCtx := w.sessCtx.GetSessionVars().StmtCtx
for i, record := range idxRecords {
idxKey, distinct, err := w.index.GenIndexKey(stmtCtx, record.vals, record.handle, w.idxKeyBufs[i])
if err != nil {
return errors.Trace(err)
}
// save the buffer to reduce memory allocations.
w.idxKeyBufs[i] = idxKey

w.batchCheckKeys = append(w.batchCheckKeys, idxKey)
w.distinctCheckFlags = append(w.distinctCheckFlags, distinct)
}

batchVals, err := kv.BatchGetValues(txn, w.batchCheckKeys)
if err != nil {
return errors.Trace(err)
}

// 1. The unique key is a duplicate and the handle is equal: skip it.
// 2. The unique key is a duplicate and the handle is not equal: return a duplicate-key error.
// 3. The non-unique key is a duplicate: skip it.
for i, key := range w.batchCheckKeys {
if val, found := batchVals[string(key)]; found {
if w.distinctCheckFlags[i] {
handle, err1 := tables.DecodeHandle(val)
if err1 != nil {
return errors.Trace(err1)
}

if handle != idxRecords[i].handle {
return errors.Trace(kv.ErrKeyExists)
}
}
idxRecords[i].skip = true
} else {
// The keys in w.batchCheckKeys may themselves contain duplicates,
// so we write the not-found key back into the `batchVals` map to catch
// a later duplicate within the same batch.
if w.distinctCheckFlags[i] {
batchVals[string(key)] = tables.EncodeHandle(idxRecords[i].handle)
}
}
}
// Constraints have already been checked for this batch.
stmtCtx.BatchCheck = true
return nil
}

// backfillIndexInTxn will backfill the table index in a transaction and lock the corresponding rowKey. If the value of the rowKey changes,
// it indicates that the index column values may have changed too, so the index entry must not be added; the txn will roll back and retry.
// backfillIndexInTxn adds w.batchCnt index entries at a time; the default value of w.batchCnt is 128.
@@ -601,15 +670,27 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (nextHan
return errors.Trace(err)
}

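// Mark the records whose index key already exists so that the loop below skips them instead of re-adding the entries.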
err = w.batchCheckUniqueKey(txn, idxRecords)
if err != nil {
return errors.Trace(err)
}

for _, idxRecord := range idxRecords {
scanCount++
// The index entry already exists, so we skip it; there is no need to backfill it.
// TiDB handles subsequent updates, deletes, and inserts on these rows correctly.
if idxRecord.skip {
continue
}

// Lock the row key so that we are notified when someone deletes or updates the row;
// in that case we should not backfill its index, otherwise the added index entry would be redundant.
err := txn.LockKeys(idxRecord.key)
if err != nil {
return errors.Trace(err)
}
scanCount++

// Create the index.
// TODO: backfilling a unique key checks the constraint for every row; we can speed up this case by using a batch check.
handle, err := w.index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle)
if err != nil {
if kv.ErrKeyExists.Equal(err) && idxRecord.handle == handle {
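
To make the batch-check logic above easier to follow outside the TiDB code base, here is a minimal, self-contained Go sketch of the same idea. The record type, the encodeHandle/decodeHandle helpers, and errKeyExists are simplified stand-ins rather than TiDB's actual tables and kv APIs, and the stored map plays the role of the values returned by kv.BatchGetValues.

package sketch

import (
	"encoding/binary"
	"errors"
)

// record is a simplified stand-in for the indexRecord type in this diff.
type record struct {
	handle int64
	skip   bool
}

var errKeyExists = errors.New("key already exists")

// encodeHandle and decodeHandle are hypothetical stand-ins for
// tables.EncodeHandle and tables.DecodeHandle: the handle is stored
// as 8 big-endian bytes.
func encodeHandle(h int64) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(h))
	return buf
}

func decodeHandle(v []byte) int64 {
	return int64(binary.BigEndian.Uint64(v))
}

// checkBatch mirrors the three cases handled in batchCheckUniqueKey:
// a duplicate unique key with an equal handle marks the record as skip,
// a duplicate unique key with a different handle returns a duplicate error,
// and a distinct key that is not found is written back into stored so that
// a later duplicate inside the same batch is still detected.
func checkBatch(records []*record, keys [][]byte, distinct []bool, stored map[string][]byte) error {
	for i, key := range keys {
		if val, ok := stored[string(key)]; ok {
			if distinct[i] && decodeHandle(val) != records[i].handle {
				return errKeyExists
			}
			records[i].skip = true
		} else if distinct[i] {
			stored[string(key)] = encodeHandle(records[i].handle)
		}
	}
	return nil
}

As in batchCheckUniqueKey, writing the not-found keys back into the map is what lets a single batch detect duplicates among its own rows before anything is written to the store.
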
16 changes: 16 additions & 0 deletions ddl/integration_test.go
@@ -89,6 +89,22 @@ func (s *testIntegrationSuite) TestCreateTableIfNotExists(c *C) {
c.Assert(terror.ErrorEqual(infoschema.ErrTableExists, lastWarn.Err), IsTrue)
}

func (s *testIntegrationSuite) TestUniquekeyNullValue(c *C) {
tk := testkit.NewTestKit(c, s.store)

tk.MustExec("USE test")

tk.MustExec("create table t(a int primary key, b varchar(255))")

tk.MustExec("insert into t values(1, NULL)")
tk.MustExec("insert into t values(2, NULL)")
tk.MustExec("alter table t add unique index b(b);")
res := tk.MustQuery("select count(*) from t use index(b);")
res.Check(testkit.Rows("2"))
tk.MustExec("admin check table t")
tk.MustExec("admin check index t b")
}

func (s *testIntegrationSuite) TestEndIncluded(c *C) {
tk := testkit.NewTestKit(c, s.store)
