Skip to content

Commit

Permalink
session: refine non-transactional delete (#34273)
Browse files Browse the repository at this point in the history
ref #33485
  • Loading branch information
ekexium authored May 9, 2022
1 parent 1f4fd07 commit 6c30303
Show file tree
Hide file tree
Showing 11 changed files with 244 additions and 58 deletions.
2 changes: 1 addition & 1 deletion executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error {
}

switch stmt.(type) {
case *ast.LoadDataStmt, *ast.PrepareStmt, *ast.ExecuteStmt, *ast.DeallocateStmt:
case *ast.LoadDataStmt, *ast.PrepareStmt, *ast.ExecuteStmt, *ast.DeallocateStmt, *ast.NonTransactionalDeleteStmt:
return ErrUnsupportedPs
}

Expand Down
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func RegisterMetrics() {
prometheus.MustRegister(CPUProfileCounter)
prometheus.MustRegister(ReadFromTableCacheCounter)
prometheus.MustRegister(LoadTableCacheDurationHistogram)
prometheus.MustRegister(NonTransactionalDeleteCount)

tikvmetrics.InitMetrics(TiDB, TiKVClient)
tikvmetrics.RegisterMetrics()
Expand Down
8 changes: 8 additions & 0 deletions metrics/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@ var (
Name: "validate_read_ts_from_pd_count",
Help: "Counter of validating read ts by getting a timestamp from PD",
})

NonTransactionalDeleteCount = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "session",
Name: "non_transactional_delete_count",
Help: "Counter of non-transactional delete",
})
)

// Label constants.
Expand Down
19 changes: 19 additions & 0 deletions metrics/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,22 @@ func GetCTECounter() CTEUsageCounter {
NonCTEUsed: readCounter(TelemetrySQLCTECnt.With(prometheus.Labels{LblCTEType: "notCTE"})),
}
}

// NonTransactionalStmtCounter records the usages of non-transactional statements.
type NonTransactionalStmtCounter struct {
DeleteCount int64 `json:"delete"`
}

// Sub returns the difference of two counters.
func (n NonTransactionalStmtCounter) Sub(rhs NonTransactionalStmtCounter) NonTransactionalStmtCounter {
return NonTransactionalStmtCounter{
DeleteCount: n.DeleteCount - rhs.DeleteCount,
}
}

// GetNonTransactionalStmtCounter gets the NonTransactionalStmtCounter.
func GetNonTransactionalStmtCounter() NonTransactionalStmtCounter {
return NonTransactionalStmtCounter{
DeleteCount: readCounter(NonTransactionalDeleteCount),
}
}
95 changes: 64 additions & 31 deletions session/nontransactional.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/format"
"github.com/pingcap/tidb/parser/model"
Expand All @@ -35,6 +38,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)
Expand All @@ -45,7 +49,7 @@ type job struct {
end types.Datum
err error
jobID int
jobSize int
jobSize int // it can be inaccurate if there are concurrent writes
sql string
}

Expand All @@ -57,8 +61,11 @@ type statementBuildInfo struct {
originalCondition ast.ExprNode
}

func (j job) String() string {
return fmt.Sprintf("job id: %d, job size: %d, range: [%s, %s]", j.jobID, j.jobSize, j.start.String(), j.end.String())
func (j job) String(redacted bool) string {
if redacted {
return fmt.Sprintf("job id: %d, estimated size: %d", j.jobID, j.jobSize)
}
return fmt.Sprintf("job id: %d, estimated size: %d, sql: %s", j.jobID, j.jobSize, j.sql)
}

// HandleNonTransactionalDelete is the entry point for a non-transactional delete
Expand All @@ -70,14 +77,21 @@ func HandleNonTransactionalDelete(ctx context.Context, stmt *ast.NonTransactiona
if err := checkConstraint(ctx, stmt, se); err != nil {
return nil, err
}
metrics.NonTransactionalDeleteCount.Inc()
tableName, selectSQL, shardColumnInfo, err := buildSelectSQL(stmt, se)
if err != nil {
return nil, err
}
if stmt.DryRun == ast.DryRunQuery {
return buildDryRunResults(stmt.DryRun, []string{selectSQL}, se.GetSessionVars().BatchSize.MaxChunkSize)
}
jobs, err := buildShardJobs(ctx, stmt, se, selectSQL, shardColumnInfo)

// TODO: choose an appropriate quota.
// Use the mem-quota-query as a workaround. As a result, a NT-DML may consume 2x of the memory quota.
memTracker := memory.NewTracker(memory.LabelForNonTransactionalDML, se.GetSessionVars().MemQuotaQuery)
memTracker.AttachToGlobalTracker(executor.GlobalMemoryUsageTracker)
defer memTracker.DetachFromGlobalTracker()
jobs, err := buildShardJobs(ctx, stmt, se, selectSQL, shardColumnInfo, memTracker)
if err != nil {
return nil, err
}
Expand All @@ -89,7 +103,7 @@ func HandleNonTransactionalDelete(ctx context.Context, stmt *ast.NonTransactiona
if stmt.DryRun == ast.DryRunSplitDml {
return buildDryRunResults(stmt.DryRun, splitStmts, se.GetSessionVars().BatchSize.MaxChunkSize)
}
return buildExecuteResults(jobs, se.GetSessionVars().BatchSize.MaxChunkSize)
return buildExecuteResults(ctx, jobs, se.GetSessionVars().BatchSize.MaxChunkSize, se.GetSessionVars().EnableRedactLog)
}

func checkConstraint(ctx context.Context, stmt *ast.NonTransactionalDeleteStmt, se Session) error {
Expand All @@ -108,13 +122,19 @@ func checkConstraint(ctx context.Context, stmt *ast.NonTransactionalDeleteStmt,
if sessVars.SnapshotTS != 0 {
return errors.New("can't do non-transactional DML when tidb_snapshot is set")
}
// TODO: return error if there are multiple tables
if stmt.DeleteStmt.TableRefs == nil || stmt.DeleteStmt.TableRefs.TableRefs == nil {

if stmt.DeleteStmt.TableRefs == nil || stmt.DeleteStmt.TableRefs.TableRefs == nil || stmt.DeleteStmt.TableRefs.TableRefs.Left == nil {
return errors.New("table reference is nil")
}
if stmt.DeleteStmt.TableRefs.TableRefs.Right != nil {
return errors.New("Non-transactional delete doesn't support multiple tables")
}
if stmt.DeleteStmt.Limit != nil {
return errors.New("Non-transactional delete doesn't support limit")
}
if stmt.DeleteStmt.Order != nil {
return errors.New("Non-transactional delete doesn't support order by")
}
return nil
}

Expand Down Expand Up @@ -145,7 +165,7 @@ func splitDeleteWorker(ctx context.Context, jobs []job, stmt *ast.NonTransaction
failedJobs := make([]string, 0)
for _, job := range jobs {
if job.err != nil {
failedJobs = append(failedJobs, fmt.Sprintf("job:%s, error: %s", job.String(), job.err.Error()))
failedJobs = append(failedJobs, fmt.Sprintf("job:%s, error: %s", job.String(se.GetSessionVars().EnableRedactLog), job.err.Error()))
}
}
if len(failedJobs) == 0 {
Expand Down Expand Up @@ -177,17 +197,13 @@ func splitDeleteWorker(ctx context.Context, jobs []job, stmt *ast.NonTransaction

// if the first job failed, there is a large chance that all jobs will fail. So return early.
if i == 0 && jobs[i].err != nil {
jobs[i].err = errors.Wrap(jobs[i].err, "Early return: error occurred in the first job. All jobs are canceled")
logutil.Logger(ctx).Error("Non-transactional delete, early return", zap.Error(jobs[i].err))
break
return nil, errors.Annotate(jobs[i].err, "Early return: error occurred in the first job. All jobs are canceled")
}
}
return splitStmts, nil
}

func doOneJob(ctx context.Context, job *job, totalJobCount int, options statementBuildInfo, se Session, dryRun bool) string {
logutil.Logger(ctx).Info("start a Non-transactional delete", zap.String("job", job.String()), zap.Int("totalJobCount", totalJobCount))

var whereCondition ast.ExprNode

if job.start.IsNull() {
Expand Down Expand Up @@ -255,7 +271,7 @@ func doOneJob(ctx context.Context, job *job, totalJobCount int, options statemen
format.RestoreBracketAroundBinaryOperation|
format.RestoreStringWithoutCharset, &sb))
if err != nil {
job.err = err
job.err = errors.Annotate(err, "Failed to restore delete statement")
return ""
}
deleteSQL := sb.String()
Expand All @@ -265,6 +281,15 @@ func doOneJob(ctx context.Context, job *job, totalJobCount int, options statemen
}

job.sql = deleteSQL
logutil.Logger(ctx).Info("start a Non-transactional delete",
zap.String("job", job.String(se.GetSessionVars().EnableRedactLog)), zap.Int("totalJobCount", totalJobCount))
var deleteSQLInLog string
if se.GetSessionVars().EnableRedactLog {
deleteSQLInLog = parser.Normalize(deleteSQL)
} else {
deleteSQLInLog = deleteSQL
}

options.stmt.DeleteStmt.SetText(nil, fmt.Sprintf("/* job %v/%v */ %s", job.jobID, totalJobCount, deleteSQL))
rs, err := se.ExecuteStmt(ctx, options.stmt.DeleteStmt)

Expand All @@ -273,13 +298,11 @@ func doOneJob(ctx context.Context, job *job, totalJobCount int, options statemen
err = errors.New("injected split delete error")
})
if err != nil {
errStr := fmt.Sprintf("Non-transactional delete SQL failed, sql: %s, error: %s, jobID: %d, jobSize: %d. ",
deleteSQL, err.Error(), job.jobID, job.jobSize)
logutil.Logger(ctx).Error(errStr)
logutil.Logger(ctx).Error("Non-transactional delete SQL failed", zap.String("job", deleteSQLInLog), zap.Error(err), zap.Int("jobID", job.jobID), zap.Int("jobSize", job.jobSize))
job.err = err
} else {
logutil.Logger(ctx).Info("Non-transactional delete SQL finished successfully", zap.Int("jobID", job.jobID),
zap.Int("jobSize", job.jobSize), zap.String("deleteSQL", deleteSQL))
zap.Int("jobSize", job.jobSize), zap.String("deleteSQL", deleteSQLInLog))
}
if rs != nil {
rs.Close()
Expand All @@ -288,8 +311,7 @@ func doOneJob(ctx context.Context, job *job, totalJobCount int, options statemen
}

func buildShardJobs(ctx context.Context, stmt *ast.NonTransactionalDeleteStmt, se Session,
selectSQL string, shardColumnInfo *model.ColumnInfo) ([]job, error) {
logutil.Logger(ctx).Info("Non-transactional delete, select SQL", zap.String("selectSQL", selectSQL))
selectSQL string, shardColumnInfo *model.ColumnInfo, memTracker *memory.Tracker) ([]job, error) {
var shardColumnCollate string
if shardColumnInfo != nil {
shardColumnCollate = shardColumnInfo.GetCollate()
Expand Down Expand Up @@ -336,7 +358,7 @@ func buildShardJobs(ctx context.Context, stmt *ast.NonTransactionalDeleteStmt, s
if chk.NumRows() == 0 {
if currentSize > 0 {
// there's remaining work
jobs = append(jobs, job{jobID: jobCount, start: currentStart, end: currentEnd, jobSize: currentSize})
jobs = appendNewJob(jobs, jobCount+1, currentStart, currentEnd, currentSize, memTracker)
}
break
}
Expand All @@ -362,8 +384,8 @@ func buildShardJobs(ctx context.Context, stmt *ast.NonTransactionalDeleteStmt, s
return nil, err
}
if cmp != 0 {
jobs = append(jobs, job{jobID: jobCount, start: *currentStart.Clone(), end: *currentEnd.Clone(), jobSize: currentSize})
jobCount++
jobs = appendNewJob(jobs, jobCount, *currentStart.Clone(), *currentEnd.Clone(), currentSize, memTracker)
currentSize = 0
currentStart = newEnd
}
Expand All @@ -378,6 +400,12 @@ func buildShardJobs(ctx context.Context, stmt *ast.NonTransactionalDeleteStmt, s
return jobs, nil
}

func appendNewJob(jobs []job, id int, start types.Datum, end types.Datum, size int, tracker *memory.Tracker) []job {
jobs = append(jobs, job{jobID: id, start: start, end: end, jobSize: size})
tracker.Consume(start.EstimatedMemUsage() + end.EstimatedMemUsage() + 64)
return jobs
}

func buildSelectSQL(stmt *ast.NonTransactionalDeleteStmt, se Session) (*ast.TableName, string, *model.ColumnInfo, error) {
// only use the first table
tableSource, ok := stmt.DeleteStmt.TableRefs.TableRefs.Left.(*ast.TableSource)
Expand All @@ -390,7 +418,7 @@ func buildSelectSQL(stmt *ast.NonTransactionalDeleteStmt, se Session) (*ast.Tabl
}

// the shard column must be indexed
indexed, shardColumnInfo, err := selectShardColumn(stmt, se, tableName)
indexed, shardColumnInfo, err := selectShardColumn(stmt, se, tableName, tableSource.AsName)
if err != nil {
return nil, "", nil, err
}
Expand All @@ -406,7 +434,7 @@ func buildSelectSQL(stmt *ast.NonTransactionalDeleteStmt, se Session) (*ast.Tabl
format.RestoreBracketAroundBinaryOperation|
format.RestoreStringWithoutCharset, &sb))
if err != nil {
return nil, "", nil, errors.Trace(err)
return nil, "", nil, errors.Annotate(err, "Failed to restore where clause in non-transactional delete")
}
} else {
sb.WriteString("TRUE")
Expand All @@ -419,7 +447,7 @@ func buildSelectSQL(stmt *ast.NonTransactionalDeleteStmt, se Session) (*ast.Tabl

// it attempts to auto-select a shard column from handle if not specified, and fills back the corresponding info in the stmt,
// making it transparent to following steps
func selectShardColumn(stmt *ast.NonTransactionalDeleteStmt, se Session, tableName *ast.TableName) (indexed bool, shardColumnInfo *model.ColumnInfo, err error) {
func selectShardColumn(stmt *ast.NonTransactionalDeleteStmt, se Session, tableName *ast.TableName, tableAsName model.CIStr) (indexed bool, shardColumnInfo *model.ColumnInfo, err error) {
tbl, err := domain.GetDomain(se).InfoSchema().TableByName(tableName.Schema, tableName.Name)
if err != nil {
return false, nil, err
Expand Down Expand Up @@ -453,7 +481,7 @@ func selectShardColumn(stmt *ast.NonTransactionalDeleteStmt, se Session, tableNa
}
stmt.ShardColumn = &ast.ColumnName{
Schema: tableName.Schema,
Table: tableName.Name,
Table: tableAsName, // so that table alias works
Name: model.NewCIStr(shardColumnName),
}
return true, shardColumnInfo, nil
Expand Down Expand Up @@ -519,7 +547,7 @@ func buildDryRunResults(dryRunOption int, results []string, maxChunkSize int) (s
}, nil
}

func buildExecuteResults(jobs []job, maxChunkSize int) (sqlexec.RecordSet, error) {
func buildExecuteResults(ctx context.Context, jobs []job, maxChunkSize int, redactLog bool) (sqlexec.RecordSet, error) {
failedJobs := make([]job, 0)
for _, job := range jobs {
if job.err != nil {
Expand Down Expand Up @@ -574,14 +602,19 @@ func buildExecuteResults(jobs []job, maxChunkSize int) (sqlexec.RecordSet, error
}

rows := make([][]interface{}, 0, len(failedJobs))
var sb strings.Builder
for _, job := range failedJobs {
row := make([]interface{}, 3)
row[0] = job.String()
row[1] = job.sql
row[2] = job.err.Error()
row := make([]interface{}, 2)
row[0] = job.String(false)
row[1] = job.err.Error()
rows = append(rows, row)
sb.WriteString(fmt.Sprintf("%s, %s;\n", job.String(redactLog), job.err.Error()))
}

// log errors here in case the output is too long. There can be thousands of errors.
logutil.Logger(ctx).Warn("Non-transactional delete failed",
zap.Int("num_failed_jobs", len(failedJobs)), zap.String("failed_jobs", sb.String()))

return &sqlexec.SimpleRecordSet{
ResultFields: resultFields,
Rows: rows,
Expand Down
Loading

0 comments on commit 6c30303

Please sign in to comment.