Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support merge addIndex #104

Open
wants to merge 4 commits into
base: multi-schema-change
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func setSessCtxLocation(sctx sessionctx.Context, info *reorgInfo) error {
// 4. Wait all these running tasks finished, then continue to step 3, until all tasks is done.
// The above operations are completed in a transaction.
// Finally, update the concurrent processing of the total number of rows, and store the completed handle value.
func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType backfillWorkerType, indexInfo *model.IndexInfo, oldColInfo, colInfo *model.ColumnInfo, reorgInfo *reorgInfo) error {
func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType backfillWorkerType, indexInfo []*model.IndexInfo, oldColInfo, colInfo *model.ColumnInfo, reorgInfo *reorgInfo) error {
job := reorgInfo.Job
totalAddedCount := job.GetRowCount()

Expand Down
2 changes: 1 addition & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,7 +1082,7 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column
if err != nil {
return errors.Trace(err)
}
err = w.addTableIndex(t, idxes[i], reorgInfo)
err = w.addTableIndex(t, []*model.IndexInfo{idxes[i]}, reorgInfo)
if err != nil {
return errors.Trace(err)
}
Expand Down
66 changes: 40 additions & 26 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,50 +325,64 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
// ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled.
case model.ActionAddIndex, model.ActionAddPrimaryKey:
tableID := job.TableID
var indexID int64
var ifExists bool
indexID := make([]int64, 1)
ifExists := make([]bool, 1)
var partitionIDs []int64
if err := job.DecodeArgs(&indexID, &ifExists, &partitionIDs); err != nil {
return errors.Trace(err)
if err := job.DecodeArgs(&indexID[0], &ifExists[0], &partitionIDs); err != nil {
if err = job.DecodeArgs(&indexID, &ifExists, &partitionIDs); err != nil {
return errors.Trace(err)
}
}
if len(partitionIDs) > 0 {
for _, pid := range partitionIDs {
startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1)
elemID := ea.allocForIndexID(pid, indexID)
if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil {
return errors.Trace(err)
for _, idxID := range indexID {
startKey := tablecodec.EncodeTableIndexPrefix(pid, idxID)
endKey := tablecodec.EncodeTableIndexPrefix(pid, idxID+1)
elemID := ea.allocForIndexID(pid, idxID)
if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil {
return errors.Trace(err)
}
}
}
} else {
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
elemID := ea.allocForIndexID(tableID, indexID)
return doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID))
for _, idxID := range indexID {
startKey := tablecodec.EncodeTableIndexPrefix(tableID, idxID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, idxID+1)
elemID := ea.allocForIndexID(tableID, idxID)
return doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID))
}
}
case model.ActionDropIndex, model.ActionDropPrimaryKey:
tableID := job.TableID
var indexName interface{}
var ifExists bool
var indexID int64
var partitionIDs []int64
if err := job.DecodeArgs(&indexName, &ifExists, &indexID, &partitionIDs); err != nil {
return errors.Trace(err)
ifExists := make([]bool, 1)
indexID := make([]int64, 1)
if err := job.DecodeArgs(&indexName, &ifExists[0], &indexID[0], &partitionIDs); err != nil {
if err = job.DecodeArgs(&indexName, &ifExists, &indexID, &partitionIDs); err != nil {
return errors.Trace(err)
}
}
if len(partitionIDs) > 0 {
for _, pid := range partitionIDs {
startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1)
elemID := ea.allocForIndexID(pid, indexID)
if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil {
return errors.Trace(err)
for _, idxID := range indexID {
startKey := tablecodec.EncodeTableIndexPrefix(pid, idxID)
endKey := tablecodec.EncodeTableIndexPrefix(pid, idxID+1)
elemID := ea.allocForIndexID(pid, idxID)
if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil {
return errors.Trace(err)
}
}
}
} else {
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
elemID := ea.allocForIndexID(tableID, indexID)
return doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("index ID is %d", indexID))
for _, idxID := range indexID {
startKey := tablecodec.EncodeTableIndexPrefix(tableID, idxID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, idxID+1)
elemID := ea.allocForIndexID(tableID, idxID)
if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("index ID is %d", idxID)); err != nil {
return errors.Trace(err)
}
}
}
case model.ActionDropColumn:
var colName model.CIStr
Expand Down
Loading