From 458d29f58e41b17baf5d2ce72347aacd7085031f Mon Sep 17 00:00:00 2001 From: disksing Date: Tue, 6 Apr 2021 20:43:53 +0800 Subject: [PATCH 1/4] store/tikv: move binlog logic to txn_driver Signed-off-by: disksing --- store/driver/txn/binlog.go | 91 ++++++++++++++++++++++++++++++++++ store/driver/txn/txn_driver.go | 1 + store/tikv/2pc.go | 6 +-- store/tikv/binlog.go | 72 --------------------------- store/tikv/txn.go | 12 +++++ 5 files changed, 106 insertions(+), 76 deletions(-) create mode 100644 store/driver/txn/binlog.go diff --git a/store/driver/txn/binlog.go b/store/driver/txn/binlog.go new file mode 100644 index 0000000000000..67fbaf9b1ea7a --- /dev/null +++ b/store/driver/txn/binlog.go @@ -0,0 +1,91 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package txn + +import ( + "context" + "sync" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/sessionctx/binloginfo" + "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/tidb/store/tikv/kv" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tipb/go-binlog" + "go.uber.org/zap" +) + +type binlogExecutor struct { + txn *tikv.KVTxn +} + +func (e *binlogExecutor) Skip() { + binloginfo.RemoveOneSkippedCommitter() +} + +func (e *binlogExecutor) Prewrite(ctx context.Context, primary []byte) <-chan tikv.BinlogWriteResult { + ch := make(chan tikv.BinlogWriteResult, 1) + go func() { + logutil.Eventf(ctx, "start prewrite binlog") + binInfo := e.txn.GetUnionStore().GetOption(kv.BinlogInfo).(*binloginfo.BinlogInfo) + bin := binInfo.Data + bin.StartTs = int64(e.txn.StartTS()) + if bin.Tp == binlog.BinlogType_Prewrite { + bin.PrewriteKey = primary + } + wr := binInfo.WriteBinlog(e.txn.GetClusterID()) + if wr.Skipped() { + binInfo.Data.PrewriteValue = nil + binloginfo.AddOneSkippedCommitter() + } + logutil.Eventf(ctx, "finish prewrite binlog") + ch <- wr + }() + return ch +} + +func (e *binlogExecutor) Commit(ctx context.Context, commitTS int64) { + binInfo := e.txn.GetUnionStore().GetOption(kv.BinlogInfo).(*binloginfo.BinlogInfo) + binInfo.Data.Tp = binlog.BinlogType_Commit + if commitTS == 0 { + binInfo.Data.Tp = binlog.BinlogType_Rollback + } + binInfo.Data.CommitTs = commitTS + binInfo.Data.PrewriteValue = nil + + wg := sync.WaitGroup{} + mock := false + failpoint.Inject("mockSyncBinlogCommit", func(val failpoint.Value) { + if val.(bool) { + wg.Add(1) + mock = true + } + }) + go func() { + logutil.Eventf(ctx, "start write finish binlog") + binlogWriteResult := binInfo.WriteBinlog(e.txn.GetClusterID()) + err := binlogWriteResult.GetError() + if err != nil { + logutil.BgLogger().Error("failed to write binlog", + zap.Error(err)) + } + logutil.Eventf(ctx, "finish write finish binlog") + if mock { + wg.Done() + } + }() + if mock { + wg.Wait() + } +} diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 5e2b7de1883c2..3d1bab4b8e05e 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -40,6 +40,7 @@ type tikvTxn struct { // NewTiKVTxn returns a new Transaction. func NewTiKVTxn(txn *tikv.KVTxn) kv.Transaction { + txn.SetBinlogExecutor(&binlogExecutor{txn: txn}) return &tikvTxn{txn, make(map[int64]*model.TableInfo)} } diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 960dca0c60fa8..b797985727ec4 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -304,9 +304,7 @@ func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, err ch: make(chan struct{}), }, isPessimistic: txn.IsPessimistic(), - binlog: &binlogExecutor{ - txn: txn, - }, + binlog: txn.binlog, }, nil } @@ -1456,7 +1454,7 @@ func (c *twoPhaseCommitter) calculateMaxCommitTS(ctx context.Context) error { } func (c *twoPhaseCommitter) shouldWriteBinlog() bool { - return c.txn.us.GetOption(kv.BinlogInfo) != nil + return c.txn.us.GetOption(kv.BinlogInfo) != nil && c.binlog != nil } // TiKV recommends each RPC packet should be less than ~1MB. We keep each packet's diff --git a/store/tikv/binlog.go b/store/tikv/binlog.go index c30fa27cc0216..7f5616999e5d3 100644 --- a/store/tikv/binlog.go +++ b/store/tikv/binlog.go @@ -15,14 +15,6 @@ package tikv import ( "context" - "sync" - - "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/sessionctx/binloginfo" - "github.com/pingcap/tidb/store/tikv/kv" - "github.com/pingcap/tidb/store/tikv/logutil" - "github.com/pingcap/tipb/go-binlog" - zap "go.uber.org/zap" ) // BinlogExecutor defines the logic to replicate binlogs during transaction commit. @@ -37,67 +29,3 @@ type BinlogWriteResult interface { Skipped() bool GetError() error } - -type binlogExecutor struct { - txn *KVTxn -} - -func (e *binlogExecutor) Skip() { - binloginfo.RemoveOneSkippedCommitter() -} - -func (e *binlogExecutor) Prewrite(ctx context.Context, primary []byte) <-chan BinlogWriteResult { - ch := make(chan BinlogWriteResult, 1) - go func() { - logutil.Eventf(ctx, "start prewrite binlog") - binInfo := e.txn.us.GetOption(kv.BinlogInfo).(*binloginfo.BinlogInfo) - bin := binInfo.Data - bin.StartTs = int64(e.txn.startTS) - if bin.Tp == binlog.BinlogType_Prewrite { - bin.PrewriteKey = primary - } - wr := binInfo.WriteBinlog(e.txn.store.clusterID) - if wr.Skipped() { - binInfo.Data.PrewriteValue = nil - binloginfo.AddOneSkippedCommitter() - } - logutil.Eventf(ctx, "finish prewrite binlog") - ch <- wr - }() - return ch -} - -func (e *binlogExecutor) Commit(ctx context.Context, commitTS int64) { - binInfo := e.txn.us.GetOption(kv.BinlogInfo).(*binloginfo.BinlogInfo) - binInfo.Data.Tp = binlog.BinlogType_Commit - if commitTS == 0 { - binInfo.Data.Tp = binlog.BinlogType_Rollback - } - binInfo.Data.CommitTs = commitTS - binInfo.Data.PrewriteValue = nil - - wg := sync.WaitGroup{} - mock := false - failpoint.Inject("mockSyncBinlogCommit", func(val failpoint.Value) { - if val.(bool) { - wg.Add(1) - mock = true - } - }) - go func() { - logutil.Eventf(ctx, "start write finish binlog") - binlogWriteResult := binInfo.WriteBinlog(e.txn.store.clusterID) - err := binlogWriteResult.GetError() - if err != nil { - logutil.BgLogger().Error("failed to write binlog", - zap.Error(err)) - } - logutil.Eventf(ctx, "finish write finish binlog") - if mock { - wg.Done() - } - }() - if mock { - wg.Wait() - } -} diff --git a/store/tikv/txn.go b/store/tikv/txn.go index d05a05ca346b3..79c02227ecaba 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -69,6 +69,8 @@ type KVTxn struct { schemaAmender SchemaAmender // commitCallback is called after current transaction gets committed commitCallback func(info tidbkv.TxnInfo, err error) + + binlog BinlogExecutor } func newTiKVTxn(store *KVStore, txnScope string) (*KVTxn, error) { @@ -634,3 +636,13 @@ func (txn *KVTxn) GetMemBuffer() *unionstore.MemDB { func (txn *KVTxn) GetSnapshot() *KVSnapshot { return txn.snapshot } + +// SetBindlogExecutor sets the method to perform binlong synchronization. +func (txn *KVTxn) SetBinlogExecutor(binlog BinlogExecutor) { + txn.binlog = binlog +} + +// GetClusterID returns store's cluster id. +func (txn *KVTxn) GetClusterID() uint64 { + return txn.store.clusterID +} From 4123e3fc43c0f39ef4e4f2b0541b8f4487324f03 Mon Sep 17 00:00:00 2001 From: disksing Date: Tue, 6 Apr 2021 20:56:01 +0800 Subject: [PATCH 2/4] fix typo Signed-off-by: disksing --- store/tikv/txn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 79c02227ecaba..7efdd011db4d3 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -637,7 +637,7 @@ func (txn *KVTxn) GetSnapshot() *KVSnapshot { return txn.snapshot } -// SetBindlogExecutor sets the method to perform binlong synchronization. +// SetBinlogExecutor sets the method to perform binlong synchronization. func (txn *KVTxn) SetBinlogExecutor(binlog BinlogExecutor) { txn.binlog = binlog } From 7fe7138788a1e3c7232f913ccd52c64df6be3aef Mon Sep 17 00:00:00 2001 From: disksing Date: Wed, 7 Apr 2021 09:04:52 +0800 Subject: [PATCH 3/4] update failpoint Signed-off-by: disksing --- sessionctx/binloginfo/binloginfo_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sessionctx/binloginfo/binloginfo_test.go b/sessionctx/binloginfo/binloginfo_test.go index c059288228ba2..78cb30ada9a9c 100644 --- a/sessionctx/binloginfo/binloginfo_test.go +++ b/sessionctx/binloginfo/binloginfo_test.go @@ -416,9 +416,9 @@ func mutationRowsToRows(c *C, mutationRows [][]byte, columnValueOffsets ...int) } func (s *testBinlogSuite) TestBinlogForSequence(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockSyncBinlogCommit", `return(true)`), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/driver/txn/mockSyncBinlogCommit", `return(true)`), IsNil) defer func() { - c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/mockSyncBinlogCommit"), IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/driver/txn/mockSyncBinlogCommit"), IsNil) }() tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") From 9fbbe13184609092aeeac7b0ed2b83d7cf628155 Mon Sep 17 00:00:00 2001 From: disksing Date: Fri, 9 Apr 2021 22:26:23 +0800 Subject: [PATCH 4/4] address comment Signed-off-by: disksing --- store/driver/txn/binlog.go | 22 ++++++++++------------ store/driver/txn/txn_driver.go | 14 +++++++++++++- store/tikv/2pc.go | 2 +- 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/store/driver/txn/binlog.go b/store/driver/txn/binlog.go index 67fbaf9b1ea7a..3459149c3fffc 100644 --- a/store/driver/txn/binlog.go +++ b/store/driver/txn/binlog.go @@ -20,14 +20,14 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/store/tikv" - "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tipb/go-binlog" "go.uber.org/zap" ) type binlogExecutor struct { - txn *tikv.KVTxn + txn *tikv.KVTxn + binInfo *binloginfo.BinlogInfo } func (e *binlogExecutor) Skip() { @@ -38,15 +38,14 @@ func (e *binlogExecutor) Prewrite(ctx context.Context, primary []byte) <-chan ti ch := make(chan tikv.BinlogWriteResult, 1) go func() { logutil.Eventf(ctx, "start prewrite binlog") - binInfo := e.txn.GetUnionStore().GetOption(kv.BinlogInfo).(*binloginfo.BinlogInfo) - bin := binInfo.Data + bin := e.binInfo.Data bin.StartTs = int64(e.txn.StartTS()) if bin.Tp == binlog.BinlogType_Prewrite { bin.PrewriteKey = primary } - wr := binInfo.WriteBinlog(e.txn.GetClusterID()) + wr := e.binInfo.WriteBinlog(e.txn.GetClusterID()) if wr.Skipped() { - binInfo.Data.PrewriteValue = nil + e.binInfo.Data.PrewriteValue = nil binloginfo.AddOneSkippedCommitter() } logutil.Eventf(ctx, "finish prewrite binlog") @@ -56,13 +55,12 @@ func (e *binlogExecutor) Prewrite(ctx context.Context, primary []byte) <-chan ti } func (e *binlogExecutor) Commit(ctx context.Context, commitTS int64) { - binInfo := e.txn.GetUnionStore().GetOption(kv.BinlogInfo).(*binloginfo.BinlogInfo) - binInfo.Data.Tp = binlog.BinlogType_Commit + e.binInfo.Data.Tp = binlog.BinlogType_Commit if commitTS == 0 { - binInfo.Data.Tp = binlog.BinlogType_Rollback + e.binInfo.Data.Tp = binlog.BinlogType_Rollback } - binInfo.Data.CommitTs = commitTS - binInfo.Data.PrewriteValue = nil + e.binInfo.Data.CommitTs = commitTS + e.binInfo.Data.PrewriteValue = nil wg := sync.WaitGroup{} mock := false @@ -74,7 +72,7 @@ func (e *binlogExecutor) Commit(ctx context.Context, commitTS int64) { }) go func() { logutil.Eventf(ctx, "start write finish binlog") - binlogWriteResult := binInfo.WriteBinlog(e.txn.GetClusterID()) + binlogWriteResult := e.binInfo.WriteBinlog(e.txn.GetClusterID()) err := binlogWriteResult.GetError() if err != nil { logutil.BgLogger().Error("failed to write binlog", diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 3d1bab4b8e05e..7ea145a09a775 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/store/tikv" tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/logutil" @@ -40,7 +41,6 @@ type tikvTxn struct { // NewTiKVTxn returns a new Transaction. func NewTiKVTxn(txn *tikv.KVTxn) kv.Transaction { - txn.SetBinlogExecutor(&binlogExecutor{txn: txn}) return &tikvTxn{txn, make(map[int64]*model.TableInfo)} } @@ -76,6 +76,18 @@ func (txn *tikvTxn) GetUnionStore() kv.UnionStore { return &tikvUnionStore{txn.KVTxn.GetUnionStore()} } +func (txn *tikvTxn) SetOption(opt int, val interface{}) { + switch opt { + case tikvstore.BinlogInfo: + txn.SetBinlogExecutor(&binlogExecutor{ + txn: txn.KVTxn, + binInfo: val.(*binloginfo.BinlogInfo), // val cannot be other type. + }) + default: + txn.KVTxn.SetOption(opt, val) + } +} + func (txn *tikvTxn) extractKeyErr(err error) error { if e, ok := errors.Cause(err).(*tikvstore.ErrKeyExist); ok { return txn.extractKeyExistsErr(e.GetKey()) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index b797985727ec4..70ef8e23fb7b7 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -1454,7 +1454,7 @@ func (c *twoPhaseCommitter) calculateMaxCommitTS(ctx context.Context) error { } func (c *twoPhaseCommitter) shouldWriteBinlog() bool { - return c.txn.us.GetOption(kv.BinlogInfo) != nil && c.binlog != nil + return c.binlog != nil } // TiKV recommends each RPC packet should be less than ~1MB. We keep each packet's