*: refine pipelined dml benchmarks (#54844)
ref #50215
ekexium committed Jul 25, 2024
1 parent cf4bb51 commit b41ad70
Showing 2 changed files with 71 additions and 34 deletions.
44 changes: 43 additions & 1 deletion pkg/session/bench_test.go
@@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
_ "github.com/pingcap/tidb/pkg/autoid_service"
"github.com/pingcap/tidb/pkg/config"
@@ -37,6 +38,7 @@ import (
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
@@ -1949,6 +1951,8 @@ var batchNum = 100
var batchSize = 100

func BenchmarkPipelinedSimpleInsert(b *testing.B) {
require.NoError(b, failpoint.Enable("tikvclient/pipelinedSkipResolveLock", "return"))
defer func() { require.NoError(b, failpoint.Disable("tikvclient/pipelinedSkipResolveLock")) }()
logutil.InitLogger(&logutil.LogConfig{Config: log.Config{Level: "fatal"}})
se, do, st := prepareBenchSession()
defer func() {
@@ -1977,6 +1981,8 @@ func BenchmarkPipelinedSimpleInsert(b *testing.B) {
}

func BenchmarkPipelinedInsertIgnoreNoDuplicates(b *testing.B) {
require.NoError(b, failpoint.Enable("tikvclient/pipelinedSkipResolveLock", "return"))
defer func() { require.NoError(b, failpoint.Disable("tikvclient/pipelinedSkipResolveLock")) }()
logutil.InitLogger(&logutil.LogConfig{Config: log.Config{Level: "fatal"}})
se, do, st := prepareBenchSession()
defer func() {
@@ -2058,7 +2064,7 @@ func BenchmarkPipelinedDelete(b *testing.B) {
}

se.GetSessionVars().BulkDMLEnabled = true
se.GetSessionVars().StmtCtx.InInsertStmt = true
se.GetSessionVars().StmtCtx.InDeleteStmt = true
b.StopTimer()
b.ResetTimer()
for i := 0; i < b.N; i++ {
@@ -2072,6 +2078,8 @@ func BenchmarkPipelinedReplaceNoDuplicates(b *testing.B) {
}

func BenchmarkPipelinedReplaceNoDuplicates(b *testing.B) {
require.NoError(b, failpoint.Enable("tikvclient/pipelinedSkipResolveLock", "return"))
defer func() { require.NoError(b, failpoint.Disable("tikvclient/pipelinedSkipResolveLock")) }()
logutil.InitLogger(&logutil.LogConfig{Config: log.Config{Level: "fatal"}})
se, do, st := prepareBenchSession()
defer func() {
@@ -2099,3 +2107,37 @@ func BenchmarkPipelinedReplaceNoDuplicates(b *testing.B) {
b.StopTimer()
b.ReportMetric(float64(b.Elapsed().Nanoseconds()/int64(b.N*batchSize*batchNum)), "ns/row")
}

func BenchmarkPipelinedUpdate(b *testing.B) {
logutil.InitLogger(&logutil.LogConfig{Config: log.Config{Level: "fatal"}})
se, do, st := prepareBenchSession()
defer func() {
se.Close()
do.Close()
st.Close()
}()
mustExecute(se, `create table src (id int, dt varchar(128))`)
for i := 0; i < batchNum; i++ {
mustExecute(se, "begin")
for lines := 0; lines < batchSize; lines++ {
mustExecute(se, "insert into src values (42, repeat('x', 128))")
}
mustExecute(se, "commit")
}

se.GetSessionVars().BulkDMLEnabled = true
se.GetSessionVars().StmtCtx.InUpdateStmt = true

b.StopTimer()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StartTimer()
if i%2 == 0 {
se.Execute(context.Background(), "update src set dt = left(concat('y', dt), 128)")
} else {
se.Execute(context.Background(), "update src set dt = left(concat('z', dt), 128)")
}
b.StopTimer()
}
b.ReportMetric(float64(b.Elapsed().Nanoseconds()/int64(b.N*batchSize*batchNum)), "ns/row")
}
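
The session-level benchmarks in this file follow a common skeleton, which this commit refines: several of them now enable the tikvclient/pipelinedSkipResolveLock failpoint up front (judging by its name, it skips resolve-lock work in the pipelined path so that work does not distort the timing), BulkDMLEnabled is switched on for the session, one bulk statement runs per timed iteration, and the cost is reported as a per-row latency, ns/row = elapsed nanoseconds / (b.N * batchSize * batchNum), with batchNum = batchSize = 100. The sketch below condenses that shape; it is illustrative only, not part of the commit, and it leans on prepareBenchSession, mustExecute, batchSize and batchNum plus the imports already declared in pkg/session/bench_test.go.

// Illustrative sketch of the skeleton the pipelined DML benchmarks share.
// setup creates and fills the tables; stmt is the bulk DML statement under test.
func benchmarkPipelinedSkeleton(b *testing.B, setup []string, stmt string) {
    // Skip resolve-lock work in the pipelined path for the whole benchmark.
    require.NoError(b, failpoint.Enable("tikvclient/pipelinedSkipResolveLock", "return"))
    defer func() { require.NoError(b, failpoint.Disable("tikvclient/pipelinedSkipResolveLock")) }()
    logutil.InitLogger(&logutil.LogConfig{Config: log.Config{Level: "fatal"}})

    se, do, st := prepareBenchSession()
    defer func() {
        se.Close()
        do.Close()
        st.Close()
    }()
    for _, sql := range setup {
        mustExecute(se, sql)
    }

    // The real benchmarks also set the matching StmtCtx flag here
    // (InInsertStmt, InDeleteStmt or InUpdateStmt).
    se.GetSessionVars().BulkDMLEnabled = true

    b.StopTimer()
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        b.StartTimer()
        // One bulk statement per iteration; its cost is normalised per row below.
        se.Execute(context.Background(), stmt)
        b.StopTimer()
    }
    // Per-row latency: timed nanoseconds divided by total rows touched.
    b.ReportMetric(float64(b.Elapsed().Nanoseconds()/int64(b.N*batchSize*batchNum)), "ns/row")
}

Stopping the timer around everything except the statement itself is what keeps the ns/row figure comparable across the insert, delete, replace and update variants.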
61 changes: 28 additions & 33 deletions pkg/table/tables/bench_test.go
@@ -30,7 +30,7 @@ import (
"github.com/stretchr/testify/require"
)

const batchSize = 10000
const batchSize = 5000

func BenchmarkAddRecordInPipelinedDML(b *testing.B) {
logutil.InitLogger(&logutil.LogConfig{Config: log.Config{Level: "fatal"}})
@@ -94,7 +94,7 @@ func BenchmarkRemoveRecordInPipelinedDML(b *testing.B) {
// Create the table
_, err := tk.Session().Execute(
context.Background(),
"CREATE TABLE IF NOT EXISTS test.t (a int primary key auto_increment, b varchar(255))",
"CREATE TABLE IF NOT EXISTS test.t (a int primary key clustered, b varchar(255))",
)
require.NoError(b, err)
tb, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
@@ -108,39 +108,37 @@ func BenchmarkRemoveRecordInPipelinedDML(b *testing.B) {
records[j] = types.MakeDatums(j, "test")
}

// Add initial records
se := tk.Session()
for j := 0; j < batchSize; j++ {
tk.MustExec("INSERT INTO test.t VALUES (?, ?)", j, "test")
}

b.StopTimer()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Reset environment for each batch
b.StopTimer()

ctx := tk.Session()
vars := ctx.GetSessionVars()
vars := se.GetSessionVars()
vars.BulkDMLEnabled = true
vars.TxnCtx.EnableMDL = true
vars.StmtCtx.InUpdateStmt = true
require.Nil(b, sessiontxn.NewTxn(context.Background(), tk.Session()))
txn, _ := ctx.Txn(true)
txn, _ := se.Txn(true)
require.True(b, txn.IsPipelined())

// Add initial records
for j := 0; j < batchSize; j++ {
_, err := tb.AddRecord(ctx.GetTableCtx(), records[j])
require.NoError(b, err)
}

b.StartTimer()
for j := 0; j < batchSize; j++ {
// Remove record
handle := kv.IntHandle(j)
err := tb.RemoveRecord(ctx.GetTableCtx(), handle, records[j])
err := tb.RemoveRecord(se.GetTableCtx(), handle, records[j])
if err != nil {
b.Fatal(err)
}
}
b.StopTimer()

// Rollback the transaction to avoid interference between batches
ctx.RollbackTxn(context.Background())
se.RollbackTxn(context.Background())
require.NoError(b, err)
}

@@ -157,7 +155,7 @@ func BenchmarkUpdateRecordInPipelinedDML(b *testing.B) {
// Create the table
_, err := tk.Session().Execute(
context.Background(),
"CREATE TABLE IF NOT EXISTS test.t (a int primary key auto_increment, b varchar(255))",
"CREATE TABLE IF NOT EXISTS test.t (a int primary key clustered, b varchar(255))",
)
require.NoError(b, err)
tb, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
@@ -175,43 +173,40 @@
newData[j] = types.MakeDatums(j, "updated")
}

// Add initial records
se := tk.Session()
for j := 0; j < batchSize; j++ {
tk.MustExec("INSERT INTO test.t VALUES (?, ?)", j, "test")
}

touched := make([]bool, len(tb.Meta().Columns))
touched[1] = true

b.StopTimer()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Reset environment for each batch
b.StopTimer()

ctx := tk.Session()
vars := ctx.GetSessionVars()
vars := se.GetSessionVars()
vars.BulkDMLEnabled = true
vars.TxnCtx.EnableMDL = true
vars.StmtCtx.InUpdateStmt = true
require.Nil(b, sessiontxn.NewTxn(context.Background(), tk.Session()))

txn, _ := ctx.Txn(true)
txn, _ := se.Txn(true)
require.True(b, txn.IsPipelined())

// Add initial records
for j := 0; j < batchSize; j++ {
_, err := tb.AddRecord(ctx.GetTableCtx(), records[j])
require.NoError(b, err)
}

touched := make([]bool, len(tb.Meta().Columns))
touched[1] = true

b.StartTimer()
for j := 0; j < batchSize; j++ {
// Update record
handle := kv.IntHandle(j)
err := tb.UpdateRecord(context.TODO(), ctx.GetTableCtx(), handle, records[j], newData[j], touched)
err := tb.UpdateRecord(context.TODO(), se.GetTableCtx(), handle, records[j], newData[j], touched)
if err != nil {
b.Fatal(err)
}
}
b.StopTimer()

// Rollback the transaction to avoid interference between batches
ctx.RollbackTxn(context.Background())
se.RollbackTxn(context.Background())
require.NoError(b, err)
}
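
For the table-level benchmarks the refinement is structural: batchSize drops from 10000 to 5000, the test table uses a clustered int primary key instead of auto_increment, the initial rows are inserted once through SQL before the loop (rather than re-added with AddRecord inside every iteration, as the removed lines suggest), and the touched slice is built once up front. Each iteration still rebuilds its pipelined transaction with the timer stopped and rolls it back afterwards, so every batch starts from the same table contents without re-inserting the rows. A self-contained sketch of that per-batch shape follows; it is illustrative only, with plain Go stand-ins where the real code calls sessiontxn.NewTxn, tb.RemoveRecord / tb.UpdateRecord and se.RollbackTxn.

// Illustrative, self-contained sketch (not part of the commit) of the per-batch
// shape BenchmarkRemoveRecordInPipelinedDML and BenchmarkUpdateRecordInPipelinedDML
// follow after this change. TiDB-specific calls are replaced by plain stand-ins;
// the comments name the real counterparts.
package bench

import "testing"

const batchSize = 5000 // mirrors the reduced constant in pkg/table/tables/bench_test.go

func BenchmarkPerBatchShape(b *testing.B) {
    // One-time setup outside the measured region: in the real benchmark this is
    // the SQL loop that inserts batchSize rows into test.t.
    rows := make([]int, batchSize)
    for i := range rows {
        rows[i] = i
    }

    b.StopTimer()
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        // Per-batch reset with the timer stopped: the real benchmark enables
        // BulkDMLEnabled, starts a fresh transaction via sessiontxn.NewTxn and
        // asserts txn.IsPipelined() here.
        scratch := make(map[int]struct{}, batchSize)

        b.StartTimer()
        for _, h := range rows {
            // Stand-in for the timed operation (tb.RemoveRecord / tb.UpdateRecord).
            scratch[h] = struct{}{}
        }
        b.StopTimer()

        // Stand-in for se.RollbackTxn: discard per-batch state so the next batch
        // starts from the same initial rows.
        clear(scratch)
    }
}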

