Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: move binlog logic to txn_driver #23873

Merged
merged 10 commits into from
Apr 12, 2021
4 changes: 2 additions & 2 deletions sessionctx/binloginfo/binloginfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,9 +416,9 @@ func mutationRowsToRows(c *C, mutationRows [][]byte, columnValueOffsets ...int)
}

func (s *testBinlogSuite) TestBinlogForSequence(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockSyncBinlogCommit", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/driver/txn/mockSyncBinlogCommit", `return(true)`), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/mockSyncBinlogCommit"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/driver/txn/mockSyncBinlogCommit"), IsNil)
}()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
89 changes: 89 additions & 0 deletions store/driver/txn/binlog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package txn

import (
"context"
"sync"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tipb/go-binlog"
"go.uber.org/zap"
)

type binlogExecutor struct {
txn *tikv.KVTxn
binInfo *binloginfo.BinlogInfo
}

func (e *binlogExecutor) Skip() {
binloginfo.RemoveOneSkippedCommitter()
}

func (e *binlogExecutor) Prewrite(ctx context.Context, primary []byte) <-chan tikv.BinlogWriteResult {
ch := make(chan tikv.BinlogWriteResult, 1)
go func() {
logutil.Eventf(ctx, "start prewrite binlog")
bin := e.binInfo.Data
bin.StartTs = int64(e.txn.StartTS())
if bin.Tp == binlog.BinlogType_Prewrite {
bin.PrewriteKey = primary
}
wr := e.binInfo.WriteBinlog(e.txn.GetClusterID())
if wr.Skipped() {
e.binInfo.Data.PrewriteValue = nil
binloginfo.AddOneSkippedCommitter()
}
logutil.Eventf(ctx, "finish prewrite binlog")
ch <- wr
}()
return ch
}

func (e *binlogExecutor) Commit(ctx context.Context, commitTS int64) {
e.binInfo.Data.Tp = binlog.BinlogType_Commit
if commitTS == 0 {
e.binInfo.Data.Tp = binlog.BinlogType_Rollback
}
e.binInfo.Data.CommitTs = commitTS
e.binInfo.Data.PrewriteValue = nil

wg := sync.WaitGroup{}
mock := false
failpoint.Inject("mockSyncBinlogCommit", func(val failpoint.Value) {
if val.(bool) {
wg.Add(1)
mock = true
}
})
go func() {
logutil.Eventf(ctx, "start write finish binlog")
binlogWriteResult := e.binInfo.WriteBinlog(e.txn.GetClusterID())
err := binlogWriteResult.GetError()
if err != nil {
logutil.BgLogger().Error("failed to write binlog",
zap.Error(err))
}
logutil.Eventf(ctx, "finish write finish binlog")
if mock {
wg.Done()
}
}()
if mock {
wg.Wait()
}
}
13 changes: 13 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/unionstore"
Expand Down Expand Up @@ -67,6 +68,18 @@ func (txn *tikvTxn) GetUnionStore() kv.UnionStore {
return &tikvUnionStore{txn.KVTxn.GetUnionStore()}
}

func (txn *tikvTxn) SetOption(opt int, val interface{}) {
switch opt {
case tikvstore.BinlogInfo:
txn.SetBinlogExecutor(&binlogExecutor{
txn: txn.KVTxn,
binInfo: val.(*binloginfo.BinlogInfo), // val cannot be other type.
})
default:
txn.KVTxn.SetOption(opt, val)
}
}

func (txn *tikvTxn) extractKeyErr(err error) error {
if e, ok := errors.Cause(err).(*tikvstore.ErrKeyExist); ok {
return txn.extractKeyExistsErr(e.GetKey())
Expand Down
6 changes: 2 additions & 4 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,7 @@ func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, err
ch: make(chan struct{}),
},
isPessimistic: txn.IsPessimistic(),
binlog: &binlogExecutor{
txn: txn,
},
binlog: txn.binlog,
}, nil
}

Expand Down Expand Up @@ -1455,7 +1453,7 @@ func (c *twoPhaseCommitter) calculateMaxCommitTS(ctx context.Context) error {
}

func (c *twoPhaseCommitter) shouldWriteBinlog() bool {
return c.txn.us.GetOption(kv.BinlogInfo) != nil
return c.binlog != nil
}

// TiKV recommends each RPC packet should be less than ~1MB. We keep each packet's
Expand Down
72 changes: 0 additions & 72 deletions store/tikv/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,6 @@ package tikv

import (
"context"
"sync"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tipb/go-binlog"
zap "go.uber.org/zap"
)

// BinlogExecutor defines the logic to replicate binlogs during transaction commit.
Expand All @@ -37,67 +29,3 @@ type BinlogWriteResult interface {
Skipped() bool
GetError() error
}

type binlogExecutor struct {
txn *KVTxn
}

func (e *binlogExecutor) Skip() {
binloginfo.RemoveOneSkippedCommitter()
}

func (e *binlogExecutor) Prewrite(ctx context.Context, primary []byte) <-chan BinlogWriteResult {
ch := make(chan BinlogWriteResult, 1)
go func() {
logutil.Eventf(ctx, "start prewrite binlog")
binInfo := e.txn.us.GetOption(kv.BinlogInfo).(*binloginfo.BinlogInfo)
bin := binInfo.Data
bin.StartTs = int64(e.txn.startTS)
if bin.Tp == binlog.BinlogType_Prewrite {
bin.PrewriteKey = primary
}
wr := binInfo.WriteBinlog(e.txn.store.clusterID)
if wr.Skipped() {
binInfo.Data.PrewriteValue = nil
binloginfo.AddOneSkippedCommitter()
}
logutil.Eventf(ctx, "finish prewrite binlog")
ch <- wr
}()
return ch
}

func (e *binlogExecutor) Commit(ctx context.Context, commitTS int64) {
binInfo := e.txn.us.GetOption(kv.BinlogInfo).(*binloginfo.BinlogInfo)
binInfo.Data.Tp = binlog.BinlogType_Commit
if commitTS == 0 {
binInfo.Data.Tp = binlog.BinlogType_Rollback
}
binInfo.Data.CommitTs = commitTS
binInfo.Data.PrewriteValue = nil

wg := sync.WaitGroup{}
mock := false
failpoint.Inject("mockSyncBinlogCommit", func(val failpoint.Value) {
if val.(bool) {
wg.Add(1)
mock = true
}
})
go func() {
logutil.Eventf(ctx, "start write finish binlog")
binlogWriteResult := binInfo.WriteBinlog(e.txn.store.clusterID)
err := binlogWriteResult.GetError()
if err != nil {
logutil.BgLogger().Error("failed to write binlog",
zap.Error(err))
}
logutil.Eventf(ctx, "finish write finish binlog")
if mock {
wg.Done()
}
}()
if mock {
wg.Wait()
}
}
12 changes: 12 additions & 0 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type KVTxn struct {
schemaAmender SchemaAmender
// commitCallback is called after current transaction gets committed
commitCallback func(info tidbkv.TxnInfo, err error)

binlog BinlogExecutor
}

func newTiKVTxn(store *KVStore, txnScope string) (*KVTxn, error) {
Expand Down Expand Up @@ -637,3 +639,13 @@ func (txn *KVTxn) GetMemBuffer() *unionstore.MemDB {
func (txn *KVTxn) GetSnapshot() *KVSnapshot {
return txn.snapshot
}

// SetBinlogExecutor sets the method to perform binlong synchronization.
func (txn *KVTxn) SetBinlogExecutor(binlog BinlogExecutor) {
txn.binlog = binlog
}

// GetClusterID returns store's cluster id.
func (txn *KVTxn) GetClusterID() uint64 {
return txn.store.clusterID
}