From 3473330dbfd675c8ff11f4f568add3f119e47de9 Mon Sep 17 00:00:00 2001 From: Arenatlx <314806019@qq.com> Date: Fri, 5 Feb 2021 13:23:17 +0800 Subject: [PATCH 1/5] cherry pick #22670 to release-4.0 Signed-off-by: ti-srebot --- ddl/column.go | 509 ++++++++++++++++++++++++++++++++++++++++++++++- ddl/util/util.go | 68 ++++--- 2 files changed, 539 insertions(+), 38 deletions(-) diff --git a/ddl/column.go b/ddl/column.go index d264f669c1f37..a723dd7d6f9a2 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -417,6 +417,496 @@ func (w *worker) doModifyColumn( return ver, errors.Trace(err) } } +<<<<<<< HEAD +======= + job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo) + // Refactor the job args to add the abandoned temporary index ids into delete range table. + idxIDs := make([]int64, 0, len(jobParam.changingIdxs)) + for _, idx := range jobParam.changingIdxs { + idxIDs = append(idxIDs, idx.ID) + } + job.Args = []interface{}{idxIDs, getPartitionIDs(tblInfo)} + return ver, nil +} + +func (w *worker) doModifyColumnTypeWithData( + d *ddlCtx, t *meta.Meta, job *model.Job, + dbInfo *model.DBInfo, tblInfo *model.TableInfo, changingCol, oldCol *model.ColumnInfo, + colName model.CIStr, pos *ast.ColumnPosition, changingIdxs []*model.IndexInfo) (ver int64, _ error) { + var err error + originalState := changingCol.State + switch changingCol.State { + case model.StateNone: + // Column from null to not null. + if !mysql.HasNotNullFlag(oldCol.Flag) && mysql.HasNotNullFlag(changingCol.Flag) { + // Introduce the `mysql.PreventNullInsertFlag` flag to prevent users from inserting or updating null values. + err := modifyColsFromNull2NotNull(w, dbInfo, tblInfo, []*model.ColumnInfo{oldCol}, oldCol.Name, oldCol.Tp != changingCol.Tp) + if err != nil { + if ErrWarnDataTruncated.Equal(err) || errInvalidUseOfNull.Equal(err) { + job.State = model.JobStateRollingback + } + return ver, err + } + } + // none -> delete only + updateChangingInfo(changingCol, changingIdxs, model.StateDeleteOnly) + failpoint.Inject("mockInsertValueAfterCheckNull", func(val failpoint.Value) { + if valStr, ok := val.(string); ok { + var ctx sessionctx.Context + ctx, err := w.sessPool.get() + if err != nil { + failpoint.Return(ver, err) + } + defer w.sessPool.put(ctx) + + stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(context.Background(), valStr) + if err != nil { + job.State = model.JobStateCancelled + failpoint.Return(ver, err) + } + _, _, err = ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedStmt(context.Background(), stmt) + if err != nil { + job.State = model.JobStateCancelled + failpoint.Return(ver, err) + } + } + }) + ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, originalState != changingCol.State) + if err != nil { + return ver, errors.Trace(err) + } + // Make sure job args change after `updateVersionAndTableInfoWithCheck`, otherwise, the job args will + // be updated in `updateDDLJob` even if it meets an error in `updateVersionAndTableInfoWithCheck`. + job.SchemaState = model.StateDeleteOnly + metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn).Set(0) + job.Args = append(job.Args, changingCol, changingIdxs) + case model.StateDeleteOnly: + // Column from null to not null. + if !mysql.HasNotNullFlag(oldCol.Flag) && mysql.HasNotNullFlag(changingCol.Flag) { + // Introduce the `mysql.PreventNullInsertFlag` flag to prevent users from inserting or updating null values. + err := modifyColsFromNull2NotNull(w, dbInfo, tblInfo, []*model.ColumnInfo{oldCol}, oldCol.Name, oldCol.Tp != changingCol.Tp) + if err != nil { + if ErrWarnDataTruncated.Equal(err) || errInvalidUseOfNull.Equal(err) { + job.State = model.JobStateRollingback + } + return ver, err + } + } + // delete only -> write only + updateChangingInfo(changingCol, changingIdxs, model.StateWriteOnly) + ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != changingCol.State) + if err != nil { + return ver, errors.Trace(err) + } + job.SchemaState = model.StateWriteOnly + case model.StateWriteOnly: + // write only -> reorganization + updateChangingInfo(changingCol, changingIdxs, model.StateWriteReorganization) + ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != changingCol.State) + if err != nil { + return ver, errors.Trace(err) + } + // Initialize SnapshotVer to 0 for later reorganization check. + job.SnapshotVer = 0 + job.SchemaState = model.StateWriteReorganization + case model.StateWriteReorganization: + tbl, err := getTable(d.store, dbInfo.ID, tblInfo) + if err != nil { + return ver, errors.Trace(err) + } + + reorgInfo, err := getReorgInfo(d, t, job, tbl, BuildElements(changingCol, changingIdxs)) + if err != nil || reorgInfo.first { + // If we run reorg firstly, we should update the job snapshot version + // and then run the reorg next time. + return ver, errors.Trace(err) + } + + // Inject a failpoint so that we can pause here and do verification on other components. + // With a failpoint-enabled version of TiDB, you can trigger this failpoint by the following command: + // enable: curl -X PUT -d "pause" "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData". + // disable: curl -X DELETE "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData" + failpoint.Inject("mockDelayInModifyColumnTypeWithData", func() {}) + err = w.runReorgJob(t, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) { + defer util.Recover(metrics.LabelDDL, "onModifyColumn", + func() { + addIndexErr = errCancelledDDLJob.GenWithStack("modify table `%v` column `%v` panic", tblInfo.Name, oldCol.Name) + }, false) + return w.updateColumnAndIndexes(tbl, oldCol, changingCol, changingIdxs, reorgInfo) + }) + if err != nil { + if errWaitReorgTimeout.Equal(err) { + // If timeout, we should return, check for the owner and re-wait job done. + return ver, nil + } + if needRollbackData(err) { + if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil { + logutil.BgLogger().Warn("[ddl] run modify column job failed, RemoveDDLReorgHandle failed, can't convert job to rollback", + zap.String("job", job.String()), zap.Error(err1)) + return ver, errors.Trace(err) + } + logutil.BgLogger().Warn("[ddl] run modify column job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err)) + // When encounter these error above, we change the job to rolling back job directly. + job.State = model.JobStateRollingback + return ver, errors.Trace(err) + } + // Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs. + w.reorgCtx.cleanNotifyReorgCancel() + return ver, errors.Trace(err) + } + // Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs. + w.reorgCtx.cleanNotifyReorgCancel() + + // Remove the old column and indexes. Update the relative column name and index names. + oldIdxIDs := make([]int64, 0, len(changingIdxs)) + tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-1] + for _, cIdx := range changingIdxs { + idxName := getChangingIndexOriginName(cIdx) + for i, idx := range tblInfo.Indices { + if strings.EqualFold(idxName, idx.Name.O) { + cIdx.Name = model.NewCIStr(idxName) + tblInfo.Indices[i] = cIdx + oldIdxIDs = append(oldIdxIDs, idx.ID) + break + } + } + } + changingColumnUniqueName := changingCol.Name + changingCol.Name = colName + changingCol.ChangeStateInfo = nil + tblInfo.Indices = tblInfo.Indices[:len(tblInfo.Indices)-len(changingIdxs)] + // Adjust table column offset. + if err = adjustColumnInfoInModifyColumn(job, tblInfo, changingCol, oldCol, pos, changingColumnUniqueName.L); err != nil { + // TODO: Do rollback. + return ver, errors.Trace(err) + } + updateChangingInfo(changingCol, changingIdxs, model.StatePublic) + ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != changingCol.State) + if err != nil { + return ver, errors.Trace(err) + } + + // Finish this job. + job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) + // Refactor the job args to add the old index ids into delete range table. + job.Args = []interface{}{oldIdxIDs, getPartitionIDs(tblInfo)} + asyncNotifyEvent(d, &ddlutil.Event{Tp: model.ActionModifyColumn, TableInfo: tblInfo, ColumnInfos: []*model.ColumnInfo{changingCol}}) + default: + err = ErrInvalidDDLState.GenWithStackByArgs("column", changingCol.State) + } + + return ver, errors.Trace(err) +} + +// needRollbackData indicates whether it needs to rollback data when specific error occurs. +func needRollbackData(err error) bool { + return kv.ErrKeyExists.Equal(err) || errCancelledDDLJob.Equal(err) || errCantDecodeRecord.Equal(err) || + types.ErrOverflow.Equal(err) || types.ErrDataTooLong.Equal(err) || types.ErrTruncated.Equal(err) || + json.ErrInvalidJSONText.Equal(err) || types.ErrBadNumber.Equal(err) || types.ErrInvalidYear.Equal(err) || + types.ErrWrongValue.Equal(err) +} + +// BuildElements is exported for testing. +func BuildElements(changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) []*meta.Element { + elements := make([]*meta.Element, 0, len(changingIdxs)+1) + elements = append(elements, &meta.Element{ID: changingCol.ID, TypeKey: meta.ColumnElementKey}) + for _, idx := range changingIdxs { + elements = append(elements, &meta.Element{ID: idx.ID, TypeKey: meta.IndexElementKey}) + } + return elements +} + +func (w *worker) updatePhysicalTableRow(t table.PhysicalTable, oldColInfo, colInfo *model.ColumnInfo, reorgInfo *reorgInfo) error { + logutil.BgLogger().Info("[ddl] start to update table row", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) + return w.writePhysicalTableRecord(t.(table.PhysicalTable), typeUpdateColumnWorker, nil, oldColInfo, colInfo, reorgInfo) +} + +// updateColumnAndIndexes handles the modify column reorganization state for a table. +func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.ColumnInfo, idxes []*model.IndexInfo, reorgInfo *reorgInfo) error { + // TODO: Support partition tables. + if bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) { + err := w.updatePhysicalTableRow(t.(table.PhysicalTable), oldCol, col, reorgInfo) + if err != nil { + return errors.Trace(err) + } + } + + // Get the original start handle and end handle. + currentVer, err := getValidCurrentVersion(reorgInfo.d.store) + if err != nil { + return errors.Trace(err) + } + originalStartHandle, originalEndHandle, err := getTableRange(reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority) + if err != nil { + return errors.Trace(err) + } + + startElementOffset := 0 + startElementOffsetToResetHandle := -1 + // This backfill job starts with backfilling index data, whose index ID is currElement.ID. + if bytes.Equal(reorgInfo.currElement.TypeKey, meta.IndexElementKey) { + for i, idx := range idxes { + if reorgInfo.currElement.ID == idx.ID { + startElementOffset = i + startElementOffsetToResetHandle = i + break + } + } + } + + for i := startElementOffset; i < len(idxes); i++ { + // This backfill job has been exited during processing. At that time, the element is reorgInfo.elements[i+1] and handle range is [reorgInfo.StartHandle, reorgInfo.EndHandle]. + // Then the handle range of the rest elements' is [originalStartHandle, originalEndHandle]. + if i == startElementOffsetToResetHandle+1 { + reorgInfo.StartKey, reorgInfo.EndKey = originalStartHandle, originalEndHandle + } + + // Update the element in the reorgCtx to keep the atomic access for daemon-worker. + w.reorgCtx.setCurrentElement(reorgInfo.elements[i+1]) + + // Update the element in the reorgInfo for updating the reorg meta below. + reorgInfo.currElement = reorgInfo.elements[i+1] + // Write the reorg info to store so the whole reorganize process can recover from panic. + err := reorgInfo.UpdateReorgMeta(reorgInfo.StartKey) + logutil.BgLogger().Info("[ddl] update column and indexes", + zap.Int64("jobID", reorgInfo.Job.ID), + zap.ByteString("elementType", reorgInfo.currElement.TypeKey), + zap.Int64("elementID", reorgInfo.currElement.ID), + zap.String("startHandle", tryDecodeToHandleString(reorgInfo.StartKey)), + zap.String("endHandle", tryDecodeToHandleString(reorgInfo.EndKey))) + if err != nil { + return errors.Trace(err) + } + err = w.addTableIndex(t, idxes[i], reorgInfo) + if err != nil { + return errors.Trace(err) + } + } + return nil +} + +type updateColumnWorker struct { + *backfillWorker + oldColInfo *model.ColumnInfo + newColInfo *model.ColumnInfo + metricCounter prometheus.Counter + + // The following attributes are used to reduce memory allocation. + rowRecords []*rowRecord + rowDecoder *decoder.RowDecoder + + rowMap map[int64]types.Datum + + // For SQL Mode and warnings. + sqlMode mysql.SQLMode +} + +func newUpdateColumnWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, oldCol, newCol *model.ColumnInfo, decodeColMap map[int64]decoder.Column, sqlMode mysql.SQLMode) *updateColumnWorker { + rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap) + return &updateColumnWorker{ + backfillWorker: newBackfillWorker(sessCtx, worker, id, t), + oldColInfo: oldCol, + newColInfo: newCol, + metricCounter: metrics.BackfillTotalCounter.WithLabelValues("update_col_speed"), + rowDecoder: rowDecoder, + rowMap: make(map[int64]types.Datum, len(decodeColMap)), + sqlMode: sqlMode, + } +} + +func (w *updateColumnWorker) AddMetricInfo(cnt float64) { + w.metricCounter.Add(cnt) +} + +type rowRecord struct { + key []byte // It's used to lock a record. Record it to reduce the encoding time. + vals []byte // It's the record. + warning *terror.Error // It's used to record the cast warning of a record. +} + +// getNextKey gets next handle of entry that we are going to process. +func (w *updateColumnWorker) getNextKey(taskRange reorgBackfillTask, + taskDone bool, lastAccessedHandle kv.Key) (nextHandle kv.Key) { + if !taskDone { + // The task is not done. So we need to pick the last processed entry's handle and add one. + return lastAccessedHandle.Next() + } + + return taskRange.endKey.Next() +} + +func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBackfillTask) ([]*rowRecord, kv.Key, bool, error) { + w.rowRecords = w.rowRecords[:0] + startTime := time.Now() + + // taskDone means that the added handle is out of taskRange.endHandle. + taskDone := false + var lastAccessedHandle kv.Key + oprStartTime := startTime + err := iterateSnapshotRows(w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey, + func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) { + oprEndTime := time.Now() + logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in updateColumnWorker fetchRowColVals", 0) + oprStartTime = oprEndTime + + taskDone = recordKey.Cmp(taskRange.endKey) > 0 + + if taskDone || len(w.rowRecords) >= w.batchCnt { + return false, nil + } + + if err1 := w.getRowRecord(handle, recordKey, rawRow); err1 != nil { + return false, errors.Trace(err1) + } + lastAccessedHandle = recordKey + if recordKey.Cmp(taskRange.endKey) == 0 { + // If taskRange.endIncluded == false, we will not reach here when handle == taskRange.endHandle. + taskDone = true + return false, nil + } + return true, nil + }) + + if len(w.rowRecords) == 0 { + taskDone = true + } + + logutil.BgLogger().Debug("[ddl] txn fetches handle info", zap.Uint64("txnStartTS", txn.StartTS()), zap.String("taskRange", taskRange.String()), zap.Duration("takeTime", time.Since(startTime))) + return w.rowRecords, w.getNextKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err) +} + +func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, rawRow []byte) error { + _, err := w.rowDecoder.DecodeAndEvalRowWithMap(w.sessCtx, handle, rawRow, time.UTC, timeutil.SystemLocation(), w.rowMap) + if err != nil { + return errors.Trace(errCantDecodeRecord.GenWithStackByArgs("column", err)) + } + + if _, ok := w.rowMap[w.newColInfo.ID]; ok { + // The column is already added by update or insert statement, skip it. + w.cleanRowMap() + return nil + } + + var recordWarning *terror.Error + // Since every updateColumnWorker handle their own work individually, we can cache warning in statement context when casting datum. + oldWarn := w.sessCtx.GetSessionVars().StmtCtx.GetWarnings() + if oldWarn == nil { + oldWarn = []stmtctx.SQLWarn{} + } else { + oldWarn = oldWarn[:0] + } + w.sessCtx.GetSessionVars().StmtCtx.SetWarnings(oldWarn) + newColVal, err := table.CastValue(w.sessCtx, w.rowMap[w.oldColInfo.ID], w.newColInfo, false, false) + if err != nil { + return w.reformatErrors(err) + } + if w.sessCtx.GetSessionVars().StmtCtx.GetWarnings() != nil && len(w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()) != 0 { + warn := w.sessCtx.GetSessionVars().StmtCtx.GetWarnings() + recordWarning = errors.Cause(w.reformatErrors(warn[0].Err)).(*terror.Error) + } + + failpoint.Inject("MockReorgTimeoutInOneRegion", func(val failpoint.Value) { + if val.(bool) { + if handle.IntValue() == 3000 && atomic.CompareAndSwapInt32(&TestCheckReorgTimeout, 0, 1) { + failpoint.Return(errors.Trace(errWaitReorgTimeout)) + } + } + }) + + w.rowMap[w.newColInfo.ID] = newColVal + newColumnIDs := make([]int64, 0, len(w.rowMap)) + newRow := make([]types.Datum, 0, len(w.rowMap)) + for colID, val := range w.rowMap { + newColumnIDs = append(newColumnIDs, colID) + newRow = append(newRow, val) + } + sctx, rd := w.sessCtx.GetSessionVars().StmtCtx, &w.sessCtx.GetSessionVars().RowEncoder + newRowVal, err := tablecodec.EncodeRow(sctx, newRow, newColumnIDs, nil, nil, rd) + if err != nil { + return errors.Trace(err) + } + + w.rowRecords = append(w.rowRecords, &rowRecord{key: recordKey, vals: newRowVal, warning: recordWarning}) + w.cleanRowMap() + return nil +} + +// reformatErrors casted error because `convertTo` function couldn't package column name and datum value for some errors. +func (w *updateColumnWorker) reformatErrors(err error) error { + // Since row count is not precious in concurrent reorganization, here we substitute row count with datum value. + if types.ErrTruncated.Equal(err) { + err = types.ErrTruncated.GenWithStack("Data truncated for column '%s', value is '%s'", w.oldColInfo.Name, w.rowMap[w.oldColInfo.ID]) + } + + if types.ErrInvalidYear.Equal(err) { + err = types.ErrInvalidYear.GenWithStack("Invalid year value for column '%s', value is '%s'", w.oldColInfo.Name, w.rowMap[w.oldColInfo.ID]) + } + return err +} + +func (w *updateColumnWorker) cleanRowMap() { + for id := range w.rowMap { + delete(w.rowMap, id) + } +} + +// BackfillDataInTxn will backfill the table record in a transaction, lock corresponding rowKey, if the value of rowKey is changed. +func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) { + oprStartTime := time.Now() + errInTxn = kv.RunInNewTxn(context.Background(), w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { + taskCtx.addedCount = 0 + taskCtx.scanCount = 0 + txn.SetOption(kv.Priority, w.priority) + + rowRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange) + if err != nil { + return errors.Trace(err) + } + taskCtx.nextKey = nextKey + taskCtx.done = taskDone + + warningsMap := make(map[errors.ErrorID]*terror.Error, len(rowRecords)) + warningsCountMap := make(map[errors.ErrorID]int64, len(rowRecords)) + for _, rowRecord := range rowRecords { + taskCtx.scanCount++ + + err = txn.Set(rowRecord.key, rowRecord.vals) + if err != nil { + return errors.Trace(err) + } + taskCtx.addedCount++ + if rowRecord.warning != nil { + if _, ok := warningsCountMap[rowRecord.warning.ID()]; ok { + warningsCountMap[rowRecord.warning.ID()]++ + } else { + warningsCountMap[rowRecord.warning.ID()] = 1 + warningsMap[rowRecord.warning.ID()] = rowRecord.warning + } + } + } + + // Collect the warnings. + taskCtx.warnings, taskCtx.warningsCount = warningsMap, warningsCountMap + + return nil + }) + logSlowOperations(time.Since(oprStartTime), "BackfillDataInTxn", 3000) + + return +} + +func updateChangingInfo(changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo, schemaState model.SchemaState) { + changingCol.State = schemaState + for _, idx := range changingIdxs { + idx.State = schemaState + } +} + +// doModifyColumn updates the column information and reorders all columns. It does not support modifying column data. +func (w *worker) doModifyColumn( + t *meta.Meta, job *model.Job, dbInfo *model.DBInfo, tblInfo *model.TableInfo, + newCol, oldCol *model.ColumnInfo, pos *ast.ColumnPosition) (ver int64, _ error) { +>>>>>>> 75f748568... ddl: migrate part of ddl package code from Execute/ExecRestricted to safe API (1) (#22670) // Column from null to not null. if !mysql.HasNotNullFlag(oldCol.Flag) && mysql.HasNotNullFlag(newCol.Flag) { noPreventNullFlag := !mysql.HasPreventNullInsertFlag(oldCol.Flag) @@ -538,16 +1028,25 @@ func checkAndApplyNewAutoRandomBits(job *model.Job, t *meta.Meta, tblInfo *model // checkForNullValue ensure there are no null values of the column of this table. // `isDataTruncated` indicates whether the new field and the old field type are the same, in order to be compatible with mysql. func checkForNullValue(ctx sessionctx.Context, isDataTruncated bool, schema, table, newCol model.CIStr, oldCols ...*model.ColumnInfo) error { - colsStr := "" + var buf strings.Builder + buf.WriteString("select 1 from %n.%n where ") + paramsList := make([]interface{}, 0, 2+len(oldCols)) + paramsList = append(paramsList, schema.L, table.L) for i, col := range oldCols { if i == 0 { - colsStr += "`" + col.Name.L + "` is null" + buf.WriteString("%n is null") + paramsList = append(paramsList, col.Name.L) } else { - colsStr += " or `" + col.Name.L + "` is null" + buf.WriteString(" or %n is null") + paramsList = append(paramsList, col.Name.L) } } - sql := fmt.Sprintf("select 1 from `%s`.`%s` where %s limit 1;", schema.L, table.L, colsStr) - rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql) + buf.WriteString(" limit 1") + stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(context.Background(), buf.String(), paramsList...) + if err != nil { + return errors.Trace(err) + } + rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedStmt(context.Background(), stmt) if err != nil { return errors.Trace(err) } diff --git a/ddl/util/util.go b/ddl/util/util.go index 69a6d3ced56f7..b4e22b8f80034 100644 --- a/ddl/util/util.go +++ b/ddl/util/util.go @@ -14,11 +14,9 @@ package util import ( - "bytes" "context" "encoding/hex" - "fmt" - "strconv" + "strings" "github.com/pingcap/errors" "github.com/pingcap/parser/terror" @@ -32,12 +30,13 @@ import ( const ( deleteRangesTable = `gc_delete_range` doneDeleteRangesTable = `gc_delete_range_done` - loadDeleteRangeSQL = `SELECT HIGH_PRIORITY job_id, element_id, start_key, end_key FROM mysql.%s WHERE ts < %v` - recordDoneDeletedRangeSQL = `INSERT IGNORE INTO mysql.gc_delete_range_done SELECT * FROM mysql.gc_delete_range WHERE job_id = %d AND element_id = %d` - completeDeleteRangeSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %d AND element_id = %d` - completeDeleteMultiRangesSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %d AND element_id in (%v)` - updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = "%s" WHERE job_id = %d AND element_id = %d AND start_key = "%s"` - deleteDoneRecordSQL = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %d AND element_id = %d` + loadDeleteRangeSQL = `SELECT HIGH_PRIORITY job_id, element_id, start_key, end_key FROM mysql.%n WHERE ts < %?` + recordDoneDeletedRangeSQL = `INSERT IGNORE INTO mysql.gc_delete_range_done SELECT * FROM mysql.gc_delete_range WHERE job_id = %? AND element_id = %?` + completeDeleteRangeSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id = %?` + completeDeleteMultiRangesSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id in (` // + idList + ")" + updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = %? WHERE job_id = %? AND element_id = %? AND start_key = %?` + deleteDoneRecordSQL = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %? AND element_id = %?` + loadGlobalVars = `SELECT HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (` // + nameList + ")" ) // DelRangeTask is for run delete-range command in gc_worker. @@ -62,16 +61,14 @@ func LoadDoneDeleteRanges(ctx sessionctx.Context, safePoint uint64) (ranges []De } func loadDeleteRangesFromTable(ctx sessionctx.Context, table string, safePoint uint64) (ranges []DelRangeTask, _ error) { - sql := fmt.Sprintf(loadDeleteRangeSQL, table, safePoint) - rss, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) - if len(rss) > 0 { - defer terror.Call(rss[0].Close) + rs, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), loadDeleteRangeSQL, table, safePoint) + if rs != nil { + defer terror.Call(rs.Close) } if err != nil { return nil, errors.Trace(err) } - rs := rss[0] req := rs.NewChunk() it := chunk.NewIterator4Chunk(req) for { @@ -106,8 +103,7 @@ func loadDeleteRangesFromTable(ctx sessionctx.Context, table string, safePoint u // CompleteDeleteRange moves a record from gc_delete_range table to gc_delete_range_done table. // NOTE: This function WILL NOT start and run in a new transaction internally. func CompleteDeleteRange(ctx sessionctx.Context, dr DelRangeTask) error { - sql := fmt.Sprintf(recordDoneDeletedRangeSQL, dr.JobID, dr.ElementID) - _, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) + _, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), recordDoneDeletedRangeSQL, dr.JobID, dr.ElementID) if err != nil { return errors.Trace(err) } @@ -117,29 +113,31 @@ func CompleteDeleteRange(ctx sessionctx.Context, dr DelRangeTask) error { // RemoveFromGCDeleteRange is exported for ddl pkg to use. func RemoveFromGCDeleteRange(ctx sessionctx.Context, jobID, elementID int64) error { - sql := fmt.Sprintf(completeDeleteRangeSQL, jobID, elementID) - _, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) + _, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), completeDeleteRangeSQL, jobID, elementID) return errors.Trace(err) } // RemoveMultiFromGCDeleteRange is exported for ddl pkg to use. func RemoveMultiFromGCDeleteRange(ctx sessionctx.Context, jobID int64, elementIDs []int64) error { - var buf bytes.Buffer + var buf strings.Builder + buf.WriteString(completeDeleteMultiRangesSQL) + paramIDs := make([]interface{}, 0, 1+len(elementIDs)) + paramIDs = append(paramIDs, jobID) for i, elementID := range elementIDs { if i > 0 { buf.WriteString(", ") } - buf.WriteString(strconv.FormatInt(elementID, 10)) + buf.WriteString("%?") + paramIDs = append(paramIDs, elementID) } - sql := fmt.Sprintf(completeDeleteMultiRangesSQL, jobID, buf.String()) - _, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) + buf.WriteString(")") + _, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), buf.String(), paramIDs...) return errors.Trace(err) } // DeleteDoneRecord removes a record from gc_delete_range_done table. func DeleteDoneRecord(ctx sessionctx.Context, dr DelRangeTask) error { - sql := fmt.Sprintf(deleteDoneRecordSQL, dr.JobID, dr.ElementID) - _, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) + _, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), deleteDoneRecordSQL, dr.JobID, dr.ElementID) return errors.Trace(err) } @@ -147,8 +145,7 @@ func DeleteDoneRecord(ctx sessionctx.Context, dr DelRangeTask) error { func UpdateDeleteRange(ctx sessionctx.Context, dr DelRangeTask, newStartKey, oldStartKey kv.Key) error { newStartKeyHex := hex.EncodeToString(newStartKey) oldStartKeyHex := hex.EncodeToString(oldStartKey) - sql := fmt.Sprintf(updateDeleteRangeSQL, newStartKeyHex, dr.JobID, dr.ElementID, oldStartKeyHex) - _, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) + _, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), updateDeleteRangeSQL, newStartKeyHex, dr.JobID, dr.ElementID, oldStartKeyHex) return errors.Trace(err) } @@ -162,20 +159,25 @@ func LoadDDLVars(ctx sessionctx.Context) error { return LoadGlobalVars(ctx, []string{variable.TiDBDDLErrorCountLimit}) } -const loadGlobalVarsSQL = "select HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (%s)" - // LoadGlobalVars loads global variable from mysql.global_variables. func LoadGlobalVars(ctx sessionctx.Context, varNames []string) error { if sctx, ok := ctx.(sqlexec.RestrictedSQLExecutor); ok { - nameList := "" + var buf strings.Builder + buf.WriteString(loadGlobalVars) + paramNames := make([]interface{}, 0, len(varNames)) for i, name := range varNames { if i > 0 { - nameList += ", " + buf.WriteString(", ") } - nameList += fmt.Sprintf("'%s'", name) + buf.WriteString("%?") + paramNames = append(paramNames, name) + } + buf.WriteString(")") + stmt, err := sctx.ParseWithParams(context.Background(), buf.String(), paramNames...) + if err != nil { + return errors.Trace(err) } - sql := fmt.Sprintf(loadGlobalVarsSQL, nameList) - rows, _, err := sctx.ExecRestrictedSQL(sql) + rows, _, err := sctx.ExecRestrictedStmt(context.Background(), stmt) if err != nil { return errors.Trace(err) } From a15c38d5f4ec5a52bb67f5e8515b99ddf24b4413 Mon Sep 17 00:00:00 2001 From: ailinkid <314806019@qq.com> Date: Tue, 9 Mar 2021 17:15:39 +0800 Subject: [PATCH 2/5] address comment Signed-off-by: ailinkid <314806019@qq.com> --- ddl/column.go | 490 -------------------------------------------------- 1 file changed, 490 deletions(-) diff --git a/ddl/column.go b/ddl/column.go index a723dd7d6f9a2..f498f6e601051 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -417,496 +417,6 @@ func (w *worker) doModifyColumn( return ver, errors.Trace(err) } } -<<<<<<< HEAD -======= - job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo) - // Refactor the job args to add the abandoned temporary index ids into delete range table. - idxIDs := make([]int64, 0, len(jobParam.changingIdxs)) - for _, idx := range jobParam.changingIdxs { - idxIDs = append(idxIDs, idx.ID) - } - job.Args = []interface{}{idxIDs, getPartitionIDs(tblInfo)} - return ver, nil -} - -func (w *worker) doModifyColumnTypeWithData( - d *ddlCtx, t *meta.Meta, job *model.Job, - dbInfo *model.DBInfo, tblInfo *model.TableInfo, changingCol, oldCol *model.ColumnInfo, - colName model.CIStr, pos *ast.ColumnPosition, changingIdxs []*model.IndexInfo) (ver int64, _ error) { - var err error - originalState := changingCol.State - switch changingCol.State { - case model.StateNone: - // Column from null to not null. - if !mysql.HasNotNullFlag(oldCol.Flag) && mysql.HasNotNullFlag(changingCol.Flag) { - // Introduce the `mysql.PreventNullInsertFlag` flag to prevent users from inserting or updating null values. - err := modifyColsFromNull2NotNull(w, dbInfo, tblInfo, []*model.ColumnInfo{oldCol}, oldCol.Name, oldCol.Tp != changingCol.Tp) - if err != nil { - if ErrWarnDataTruncated.Equal(err) || errInvalidUseOfNull.Equal(err) { - job.State = model.JobStateRollingback - } - return ver, err - } - } - // none -> delete only - updateChangingInfo(changingCol, changingIdxs, model.StateDeleteOnly) - failpoint.Inject("mockInsertValueAfterCheckNull", func(val failpoint.Value) { - if valStr, ok := val.(string); ok { - var ctx sessionctx.Context - ctx, err := w.sessPool.get() - if err != nil { - failpoint.Return(ver, err) - } - defer w.sessPool.put(ctx) - - stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(context.Background(), valStr) - if err != nil { - job.State = model.JobStateCancelled - failpoint.Return(ver, err) - } - _, _, err = ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedStmt(context.Background(), stmt) - if err != nil { - job.State = model.JobStateCancelled - failpoint.Return(ver, err) - } - } - }) - ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, originalState != changingCol.State) - if err != nil { - return ver, errors.Trace(err) - } - // Make sure job args change after `updateVersionAndTableInfoWithCheck`, otherwise, the job args will - // be updated in `updateDDLJob` even if it meets an error in `updateVersionAndTableInfoWithCheck`. - job.SchemaState = model.StateDeleteOnly - metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn).Set(0) - job.Args = append(job.Args, changingCol, changingIdxs) - case model.StateDeleteOnly: - // Column from null to not null. - if !mysql.HasNotNullFlag(oldCol.Flag) && mysql.HasNotNullFlag(changingCol.Flag) { - // Introduce the `mysql.PreventNullInsertFlag` flag to prevent users from inserting or updating null values. - err := modifyColsFromNull2NotNull(w, dbInfo, tblInfo, []*model.ColumnInfo{oldCol}, oldCol.Name, oldCol.Tp != changingCol.Tp) - if err != nil { - if ErrWarnDataTruncated.Equal(err) || errInvalidUseOfNull.Equal(err) { - job.State = model.JobStateRollingback - } - return ver, err - } - } - // delete only -> write only - updateChangingInfo(changingCol, changingIdxs, model.StateWriteOnly) - ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != changingCol.State) - if err != nil { - return ver, errors.Trace(err) - } - job.SchemaState = model.StateWriteOnly - case model.StateWriteOnly: - // write only -> reorganization - updateChangingInfo(changingCol, changingIdxs, model.StateWriteReorganization) - ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != changingCol.State) - if err != nil { - return ver, errors.Trace(err) - } - // Initialize SnapshotVer to 0 for later reorganization check. - job.SnapshotVer = 0 - job.SchemaState = model.StateWriteReorganization - case model.StateWriteReorganization: - tbl, err := getTable(d.store, dbInfo.ID, tblInfo) - if err != nil { - return ver, errors.Trace(err) - } - - reorgInfo, err := getReorgInfo(d, t, job, tbl, BuildElements(changingCol, changingIdxs)) - if err != nil || reorgInfo.first { - // If we run reorg firstly, we should update the job snapshot version - // and then run the reorg next time. - return ver, errors.Trace(err) - } - - // Inject a failpoint so that we can pause here and do verification on other components. - // With a failpoint-enabled version of TiDB, you can trigger this failpoint by the following command: - // enable: curl -X PUT -d "pause" "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData". - // disable: curl -X DELETE "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData" - failpoint.Inject("mockDelayInModifyColumnTypeWithData", func() {}) - err = w.runReorgJob(t, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) { - defer util.Recover(metrics.LabelDDL, "onModifyColumn", - func() { - addIndexErr = errCancelledDDLJob.GenWithStack("modify table `%v` column `%v` panic", tblInfo.Name, oldCol.Name) - }, false) - return w.updateColumnAndIndexes(tbl, oldCol, changingCol, changingIdxs, reorgInfo) - }) - if err != nil { - if errWaitReorgTimeout.Equal(err) { - // If timeout, we should return, check for the owner and re-wait job done. - return ver, nil - } - if needRollbackData(err) { - if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil { - logutil.BgLogger().Warn("[ddl] run modify column job failed, RemoveDDLReorgHandle failed, can't convert job to rollback", - zap.String("job", job.String()), zap.Error(err1)) - return ver, errors.Trace(err) - } - logutil.BgLogger().Warn("[ddl] run modify column job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err)) - // When encounter these error above, we change the job to rolling back job directly. - job.State = model.JobStateRollingback - return ver, errors.Trace(err) - } - // Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs. - w.reorgCtx.cleanNotifyReorgCancel() - return ver, errors.Trace(err) - } - // Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs. - w.reorgCtx.cleanNotifyReorgCancel() - - // Remove the old column and indexes. Update the relative column name and index names. - oldIdxIDs := make([]int64, 0, len(changingIdxs)) - tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-1] - for _, cIdx := range changingIdxs { - idxName := getChangingIndexOriginName(cIdx) - for i, idx := range tblInfo.Indices { - if strings.EqualFold(idxName, idx.Name.O) { - cIdx.Name = model.NewCIStr(idxName) - tblInfo.Indices[i] = cIdx - oldIdxIDs = append(oldIdxIDs, idx.ID) - break - } - } - } - changingColumnUniqueName := changingCol.Name - changingCol.Name = colName - changingCol.ChangeStateInfo = nil - tblInfo.Indices = tblInfo.Indices[:len(tblInfo.Indices)-len(changingIdxs)] - // Adjust table column offset. - if err = adjustColumnInfoInModifyColumn(job, tblInfo, changingCol, oldCol, pos, changingColumnUniqueName.L); err != nil { - // TODO: Do rollback. - return ver, errors.Trace(err) - } - updateChangingInfo(changingCol, changingIdxs, model.StatePublic) - ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != changingCol.State) - if err != nil { - return ver, errors.Trace(err) - } - - // Finish this job. - job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) - // Refactor the job args to add the old index ids into delete range table. - job.Args = []interface{}{oldIdxIDs, getPartitionIDs(tblInfo)} - asyncNotifyEvent(d, &ddlutil.Event{Tp: model.ActionModifyColumn, TableInfo: tblInfo, ColumnInfos: []*model.ColumnInfo{changingCol}}) - default: - err = ErrInvalidDDLState.GenWithStackByArgs("column", changingCol.State) - } - - return ver, errors.Trace(err) -} - -// needRollbackData indicates whether it needs to rollback data when specific error occurs. -func needRollbackData(err error) bool { - return kv.ErrKeyExists.Equal(err) || errCancelledDDLJob.Equal(err) || errCantDecodeRecord.Equal(err) || - types.ErrOverflow.Equal(err) || types.ErrDataTooLong.Equal(err) || types.ErrTruncated.Equal(err) || - json.ErrInvalidJSONText.Equal(err) || types.ErrBadNumber.Equal(err) || types.ErrInvalidYear.Equal(err) || - types.ErrWrongValue.Equal(err) -} - -// BuildElements is exported for testing. -func BuildElements(changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) []*meta.Element { - elements := make([]*meta.Element, 0, len(changingIdxs)+1) - elements = append(elements, &meta.Element{ID: changingCol.ID, TypeKey: meta.ColumnElementKey}) - for _, idx := range changingIdxs { - elements = append(elements, &meta.Element{ID: idx.ID, TypeKey: meta.IndexElementKey}) - } - return elements -} - -func (w *worker) updatePhysicalTableRow(t table.PhysicalTable, oldColInfo, colInfo *model.ColumnInfo, reorgInfo *reorgInfo) error { - logutil.BgLogger().Info("[ddl] start to update table row", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) - return w.writePhysicalTableRecord(t.(table.PhysicalTable), typeUpdateColumnWorker, nil, oldColInfo, colInfo, reorgInfo) -} - -// updateColumnAndIndexes handles the modify column reorganization state for a table. -func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.ColumnInfo, idxes []*model.IndexInfo, reorgInfo *reorgInfo) error { - // TODO: Support partition tables. - if bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) { - err := w.updatePhysicalTableRow(t.(table.PhysicalTable), oldCol, col, reorgInfo) - if err != nil { - return errors.Trace(err) - } - } - - // Get the original start handle and end handle. - currentVer, err := getValidCurrentVersion(reorgInfo.d.store) - if err != nil { - return errors.Trace(err) - } - originalStartHandle, originalEndHandle, err := getTableRange(reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority) - if err != nil { - return errors.Trace(err) - } - - startElementOffset := 0 - startElementOffsetToResetHandle := -1 - // This backfill job starts with backfilling index data, whose index ID is currElement.ID. - if bytes.Equal(reorgInfo.currElement.TypeKey, meta.IndexElementKey) { - for i, idx := range idxes { - if reorgInfo.currElement.ID == idx.ID { - startElementOffset = i - startElementOffsetToResetHandle = i - break - } - } - } - - for i := startElementOffset; i < len(idxes); i++ { - // This backfill job has been exited during processing. At that time, the element is reorgInfo.elements[i+1] and handle range is [reorgInfo.StartHandle, reorgInfo.EndHandle]. - // Then the handle range of the rest elements' is [originalStartHandle, originalEndHandle]. - if i == startElementOffsetToResetHandle+1 { - reorgInfo.StartKey, reorgInfo.EndKey = originalStartHandle, originalEndHandle - } - - // Update the element in the reorgCtx to keep the atomic access for daemon-worker. - w.reorgCtx.setCurrentElement(reorgInfo.elements[i+1]) - - // Update the element in the reorgInfo for updating the reorg meta below. - reorgInfo.currElement = reorgInfo.elements[i+1] - // Write the reorg info to store so the whole reorganize process can recover from panic. - err := reorgInfo.UpdateReorgMeta(reorgInfo.StartKey) - logutil.BgLogger().Info("[ddl] update column and indexes", - zap.Int64("jobID", reorgInfo.Job.ID), - zap.ByteString("elementType", reorgInfo.currElement.TypeKey), - zap.Int64("elementID", reorgInfo.currElement.ID), - zap.String("startHandle", tryDecodeToHandleString(reorgInfo.StartKey)), - zap.String("endHandle", tryDecodeToHandleString(reorgInfo.EndKey))) - if err != nil { - return errors.Trace(err) - } - err = w.addTableIndex(t, idxes[i], reorgInfo) - if err != nil { - return errors.Trace(err) - } - } - return nil -} - -type updateColumnWorker struct { - *backfillWorker - oldColInfo *model.ColumnInfo - newColInfo *model.ColumnInfo - metricCounter prometheus.Counter - - // The following attributes are used to reduce memory allocation. - rowRecords []*rowRecord - rowDecoder *decoder.RowDecoder - - rowMap map[int64]types.Datum - - // For SQL Mode and warnings. - sqlMode mysql.SQLMode -} - -func newUpdateColumnWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, oldCol, newCol *model.ColumnInfo, decodeColMap map[int64]decoder.Column, sqlMode mysql.SQLMode) *updateColumnWorker { - rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap) - return &updateColumnWorker{ - backfillWorker: newBackfillWorker(sessCtx, worker, id, t), - oldColInfo: oldCol, - newColInfo: newCol, - metricCounter: metrics.BackfillTotalCounter.WithLabelValues("update_col_speed"), - rowDecoder: rowDecoder, - rowMap: make(map[int64]types.Datum, len(decodeColMap)), - sqlMode: sqlMode, - } -} - -func (w *updateColumnWorker) AddMetricInfo(cnt float64) { - w.metricCounter.Add(cnt) -} - -type rowRecord struct { - key []byte // It's used to lock a record. Record it to reduce the encoding time. - vals []byte // It's the record. - warning *terror.Error // It's used to record the cast warning of a record. -} - -// getNextKey gets next handle of entry that we are going to process. -func (w *updateColumnWorker) getNextKey(taskRange reorgBackfillTask, - taskDone bool, lastAccessedHandle kv.Key) (nextHandle kv.Key) { - if !taskDone { - // The task is not done. So we need to pick the last processed entry's handle and add one. - return lastAccessedHandle.Next() - } - - return taskRange.endKey.Next() -} - -func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBackfillTask) ([]*rowRecord, kv.Key, bool, error) { - w.rowRecords = w.rowRecords[:0] - startTime := time.Now() - - // taskDone means that the added handle is out of taskRange.endHandle. - taskDone := false - var lastAccessedHandle kv.Key - oprStartTime := startTime - err := iterateSnapshotRows(w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey, - func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) { - oprEndTime := time.Now() - logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in updateColumnWorker fetchRowColVals", 0) - oprStartTime = oprEndTime - - taskDone = recordKey.Cmp(taskRange.endKey) > 0 - - if taskDone || len(w.rowRecords) >= w.batchCnt { - return false, nil - } - - if err1 := w.getRowRecord(handle, recordKey, rawRow); err1 != nil { - return false, errors.Trace(err1) - } - lastAccessedHandle = recordKey - if recordKey.Cmp(taskRange.endKey) == 0 { - // If taskRange.endIncluded == false, we will not reach here when handle == taskRange.endHandle. - taskDone = true - return false, nil - } - return true, nil - }) - - if len(w.rowRecords) == 0 { - taskDone = true - } - - logutil.BgLogger().Debug("[ddl] txn fetches handle info", zap.Uint64("txnStartTS", txn.StartTS()), zap.String("taskRange", taskRange.String()), zap.Duration("takeTime", time.Since(startTime))) - return w.rowRecords, w.getNextKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err) -} - -func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, rawRow []byte) error { - _, err := w.rowDecoder.DecodeAndEvalRowWithMap(w.sessCtx, handle, rawRow, time.UTC, timeutil.SystemLocation(), w.rowMap) - if err != nil { - return errors.Trace(errCantDecodeRecord.GenWithStackByArgs("column", err)) - } - - if _, ok := w.rowMap[w.newColInfo.ID]; ok { - // The column is already added by update or insert statement, skip it. - w.cleanRowMap() - return nil - } - - var recordWarning *terror.Error - // Since every updateColumnWorker handle their own work individually, we can cache warning in statement context when casting datum. - oldWarn := w.sessCtx.GetSessionVars().StmtCtx.GetWarnings() - if oldWarn == nil { - oldWarn = []stmtctx.SQLWarn{} - } else { - oldWarn = oldWarn[:0] - } - w.sessCtx.GetSessionVars().StmtCtx.SetWarnings(oldWarn) - newColVal, err := table.CastValue(w.sessCtx, w.rowMap[w.oldColInfo.ID], w.newColInfo, false, false) - if err != nil { - return w.reformatErrors(err) - } - if w.sessCtx.GetSessionVars().StmtCtx.GetWarnings() != nil && len(w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()) != 0 { - warn := w.sessCtx.GetSessionVars().StmtCtx.GetWarnings() - recordWarning = errors.Cause(w.reformatErrors(warn[0].Err)).(*terror.Error) - } - - failpoint.Inject("MockReorgTimeoutInOneRegion", func(val failpoint.Value) { - if val.(bool) { - if handle.IntValue() == 3000 && atomic.CompareAndSwapInt32(&TestCheckReorgTimeout, 0, 1) { - failpoint.Return(errors.Trace(errWaitReorgTimeout)) - } - } - }) - - w.rowMap[w.newColInfo.ID] = newColVal - newColumnIDs := make([]int64, 0, len(w.rowMap)) - newRow := make([]types.Datum, 0, len(w.rowMap)) - for colID, val := range w.rowMap { - newColumnIDs = append(newColumnIDs, colID) - newRow = append(newRow, val) - } - sctx, rd := w.sessCtx.GetSessionVars().StmtCtx, &w.sessCtx.GetSessionVars().RowEncoder - newRowVal, err := tablecodec.EncodeRow(sctx, newRow, newColumnIDs, nil, nil, rd) - if err != nil { - return errors.Trace(err) - } - - w.rowRecords = append(w.rowRecords, &rowRecord{key: recordKey, vals: newRowVal, warning: recordWarning}) - w.cleanRowMap() - return nil -} - -// reformatErrors casted error because `convertTo` function couldn't package column name and datum value for some errors. -func (w *updateColumnWorker) reformatErrors(err error) error { - // Since row count is not precious in concurrent reorganization, here we substitute row count with datum value. - if types.ErrTruncated.Equal(err) { - err = types.ErrTruncated.GenWithStack("Data truncated for column '%s', value is '%s'", w.oldColInfo.Name, w.rowMap[w.oldColInfo.ID]) - } - - if types.ErrInvalidYear.Equal(err) { - err = types.ErrInvalidYear.GenWithStack("Invalid year value for column '%s', value is '%s'", w.oldColInfo.Name, w.rowMap[w.oldColInfo.ID]) - } - return err -} - -func (w *updateColumnWorker) cleanRowMap() { - for id := range w.rowMap { - delete(w.rowMap, id) - } -} - -// BackfillDataInTxn will backfill the table record in a transaction, lock corresponding rowKey, if the value of rowKey is changed. -func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) { - oprStartTime := time.Now() - errInTxn = kv.RunInNewTxn(context.Background(), w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { - taskCtx.addedCount = 0 - taskCtx.scanCount = 0 - txn.SetOption(kv.Priority, w.priority) - - rowRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange) - if err != nil { - return errors.Trace(err) - } - taskCtx.nextKey = nextKey - taskCtx.done = taskDone - - warningsMap := make(map[errors.ErrorID]*terror.Error, len(rowRecords)) - warningsCountMap := make(map[errors.ErrorID]int64, len(rowRecords)) - for _, rowRecord := range rowRecords { - taskCtx.scanCount++ - - err = txn.Set(rowRecord.key, rowRecord.vals) - if err != nil { - return errors.Trace(err) - } - taskCtx.addedCount++ - if rowRecord.warning != nil { - if _, ok := warningsCountMap[rowRecord.warning.ID()]; ok { - warningsCountMap[rowRecord.warning.ID()]++ - } else { - warningsCountMap[rowRecord.warning.ID()] = 1 - warningsMap[rowRecord.warning.ID()] = rowRecord.warning - } - } - } - - // Collect the warnings. - taskCtx.warnings, taskCtx.warningsCount = warningsMap, warningsCountMap - - return nil - }) - logSlowOperations(time.Since(oprStartTime), "BackfillDataInTxn", 3000) - - return -} - -func updateChangingInfo(changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo, schemaState model.SchemaState) { - changingCol.State = schemaState - for _, idx := range changingIdxs { - idx.State = schemaState - } -} - -// doModifyColumn updates the column information and reorders all columns. It does not support modifying column data. -func (w *worker) doModifyColumn( - t *meta.Meta, job *model.Job, dbInfo *model.DBInfo, tblInfo *model.TableInfo, - newCol, oldCol *model.ColumnInfo, pos *ast.ColumnPosition) (ver int64, _ error) { ->>>>>>> 75f748568... ddl: migrate part of ddl package code from Execute/ExecRestricted to safe API (1) (#22670) // Column from null to not null. if !mysql.HasNotNullFlag(oldCol.Flag) && mysql.HasNotNullFlag(newCol.Flag) { noPreventNullFlag := !mysql.HasPreventNullInsertFlag(oldCol.Flag) From 502089718c6902c72285e97e4d10283ccd387b1e Mon Sep 17 00:00:00 2001 From: ailinkid <314806019@qq.com> Date: Tue, 9 Mar 2021 17:21:05 +0800 Subject: [PATCH 3/5] . Signed-off-by: ailinkid <314806019@qq.com> --- ddl/util/util.go | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/ddl/util/util.go b/ddl/util/util.go index b4e22b8f80034..57e307735d6b7 100644 --- a/ddl/util/util.go +++ b/ddl/util/util.go @@ -36,7 +36,7 @@ const ( completeDeleteMultiRangesSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id in (` // + idList + ")" updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = %? WHERE job_id = %? AND element_id = %? AND start_key = %?` deleteDoneRecordSQL = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %? AND element_id = %?` - loadGlobalVars = `SELECT HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (` // + nameList + ")" + loadGlobalVars = `SELECT HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (%?)` ) // DelRangeTask is for run delete-range command in gc_worker. @@ -162,18 +162,7 @@ func LoadDDLVars(ctx sessionctx.Context) error { // LoadGlobalVars loads global variable from mysql.global_variables. func LoadGlobalVars(ctx sessionctx.Context, varNames []string) error { if sctx, ok := ctx.(sqlexec.RestrictedSQLExecutor); ok { - var buf strings.Builder - buf.WriteString(loadGlobalVars) - paramNames := make([]interface{}, 0, len(varNames)) - for i, name := range varNames { - if i > 0 { - buf.WriteString(", ") - } - buf.WriteString("%?") - paramNames = append(paramNames, name) - } - buf.WriteString(")") - stmt, err := sctx.ParseWithParams(context.Background(), buf.String(), paramNames...) + stmt, err := sctx.ParseWithParams(context.Background(), loadGlobalVars, varNames) if err != nil { return errors.Trace(err) } From e32de448cd7d4f8b374b0b78bda974379fbe8b02 Mon Sep 17 00:00:00 2001 From: ailinkid <314806019@qq.com> Date: Tue, 9 Mar 2021 17:39:48 +0800 Subject: [PATCH 4/5] . Signed-off-by: ailinkid <314806019@qq.com> --- ddl/column.go | 1 + ddl/util/util.go | 18 ++---------------- 2 files changed, 3 insertions(+), 16 deletions(-) diff --git a/ddl/column.go b/ddl/column.go index f498f6e601051..42e5de30efa4f 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -14,6 +14,7 @@ package ddl import ( + "context" "fmt" "math/bits" "strings" diff --git a/ddl/util/util.go b/ddl/util/util.go index 57e307735d6b7..8f9dc337b6449 100644 --- a/ddl/util/util.go +++ b/ddl/util/util.go @@ -16,8 +16,6 @@ package util import ( "context" "encoding/hex" - "strings" - "github.com/pingcap/errors" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/kv" @@ -33,7 +31,7 @@ const ( loadDeleteRangeSQL = `SELECT HIGH_PRIORITY job_id, element_id, start_key, end_key FROM mysql.%n WHERE ts < %?` recordDoneDeletedRangeSQL = `INSERT IGNORE INTO mysql.gc_delete_range_done SELECT * FROM mysql.gc_delete_range WHERE job_id = %? AND element_id = %?` completeDeleteRangeSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id = %?` - completeDeleteMultiRangesSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id in (` // + idList + ")" + completeDeleteMultiRangesSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id in (%?)` updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = %? WHERE job_id = %? AND element_id = %? AND start_key = %?` deleteDoneRecordSQL = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %? AND element_id = %?` loadGlobalVars = `SELECT HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (%?)` @@ -119,19 +117,7 @@ func RemoveFromGCDeleteRange(ctx sessionctx.Context, jobID, elementID int64) err // RemoveMultiFromGCDeleteRange is exported for ddl pkg to use. func RemoveMultiFromGCDeleteRange(ctx sessionctx.Context, jobID int64, elementIDs []int64) error { - var buf strings.Builder - buf.WriteString(completeDeleteMultiRangesSQL) - paramIDs := make([]interface{}, 0, 1+len(elementIDs)) - paramIDs = append(paramIDs, jobID) - for i, elementID := range elementIDs { - if i > 0 { - buf.WriteString(", ") - } - buf.WriteString("%?") - paramIDs = append(paramIDs, elementID) - } - buf.WriteString(")") - _, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), buf.String(), paramIDs...) + _, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), completeDeleteMultiRangesSQL, jobID, elementIDs) return errors.Trace(err) } From 101d3a06720770b54dbb9d55111d6ab0e35c5281 Mon Sep 17 00:00:00 2001 From: ailinkid <314806019@qq.com> Date: Wed, 10 Mar 2021 10:12:34 +0800 Subject: [PATCH 5/5] . Signed-off-by: ailinkid <314806019@qq.com> --- ddl/util/util.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/ddl/util/util.go b/ddl/util/util.go index 8f9dc337b6449..00c0f0e0aa021 100644 --- a/ddl/util/util.go +++ b/ddl/util/util.go @@ -14,6 +14,8 @@ package util import ( + "strings" + "context" "encoding/hex" "github.com/pingcap/errors" @@ -31,7 +33,7 @@ const ( loadDeleteRangeSQL = `SELECT HIGH_PRIORITY job_id, element_id, start_key, end_key FROM mysql.%n WHERE ts < %?` recordDoneDeletedRangeSQL = `INSERT IGNORE INTO mysql.gc_delete_range_done SELECT * FROM mysql.gc_delete_range WHERE job_id = %? AND element_id = %?` completeDeleteRangeSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id = %?` - completeDeleteMultiRangesSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id in (%?)` + completeDeleteMultiRangesSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id in (` // + idList + ")" updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = %? WHERE job_id = %? AND element_id = %? AND start_key = %?` deleteDoneRecordSQL = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %? AND element_id = %?` loadGlobalVars = `SELECT HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (%?)` @@ -117,7 +119,19 @@ func RemoveFromGCDeleteRange(ctx sessionctx.Context, jobID, elementID int64) err // RemoveMultiFromGCDeleteRange is exported for ddl pkg to use. func RemoveMultiFromGCDeleteRange(ctx sessionctx.Context, jobID int64, elementIDs []int64) error { - _, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), completeDeleteMultiRangesSQL, jobID, elementIDs) + var buf strings.Builder + buf.WriteString(completeDeleteMultiRangesSQL) + paramIDs := make([]interface{}, 0, 1+len(elementIDs)) + paramIDs = append(paramIDs, jobID) + for i, elementID := range elementIDs { + if i > 0 { + buf.WriteString(", ") + } + buf.WriteString("%?") + paramIDs = append(paramIDs, elementID) + } + buf.WriteString(")") + _, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), buf.String(), paramIDs...) return errors.Trace(err) }