Skip to content

Commit

Permalink
server: add hooks to wait for background commit goroutines (#55608)
Browse files Browse the repository at this point in the history
close #55607
  • Loading branch information
YangKeao authored Sep 30, 2024
1 parent 0bcbf8a commit cd33d79
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ const (
SizeLimits
// SessionID marks the connection id, for logging and tracing.
SessionID
// BackgroundGoroutineLifecycleHooks is the hooks to track the start and end of background goroutine
BackgroundGoroutineLifecycleHooks
)

// TxnSizeLimits is the argument type for `SizeLimits` option
Expand Down
33 changes: 33 additions & 0 deletions pkg/server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,39 @@ func TestShutDown(t *testing.T) {
}
}

func TestCommitWaitGroup(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

se, err := session.CreateSession4Test(store)
require.NoError(t, err)
tc := &TiDBContext{Session: se}

cfg := serverutil.NewTestConfig()
cfg.Port = 0
cfg.Status.StatusPort = 0
drv := NewTiDBDriver(store)
srv, err := NewServer(cfg, drv)
require.NoError(t, err)
srv.SetDomain(dom)

cc := &clientConn{server: srv}
cc.SetCtx(tc)
cc.CompareAndSwapStatus(cc.getStatus(), connStatusReading)
cc.getCtx().GetSessionVars().SetInTxn(false)
srv.clients[dom.NextConnID()] = cc

wg := cc.getCtx().GetCommitWaitGroup()
wg.Add(1)
go func() {
time.Sleep(100 * time.Millisecond)
wg.Done()
}()
begin := time.Now()
srv.DrainClients(time.Second, time.Second)
require.Greater(t, time.Since(begin), 100*time.Millisecond)
require.Less(t, time.Since(begin), time.Second)
}

type snapshotCache interface {
SnapCacheHitCount() int
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1021,13 +1021,29 @@ func (s *Server) DrainClients(drainWait time.Duration, cancelWait time.Duration)
for _, conn := range conns {
// Wait for the connections with explicit transaction or an executing auto-commit query.
if conn.getStatus() == connStatusReading && !conn.getCtx().GetSessionVars().InTxn() {
// The waitgroup is not protected by the `quitWaitingForConns`. However, the implementation
// of `client-go` will guarantee this `Wait` will return at least after killing the
// connections. We also wait for a similar `WaitGroup` on the store after killing the connections.
//
// Therefore, it'll not cause goroutine leak. Even if, it's not a big issue when the TiDB is
// going to shutdown.
//
// It should be waited for connections in all status, even if it's not in transactions and is reading
// from the client. Because it may run background commit goroutines at any time.
conn.getCtx().Session.GetCommitWaitGroup().Wait()

continue
}
select {
case <-conn.quit:
case <-quitWaitingForConns:
return
}

// Wait for the commit wait group after waiting for the `conn.quit` channel to make sure the foreground
// process has finished to avoid the situation that after waiting for the wait group, the transaction starts
// a new background goroutine and increase the wait group.
conn.getCtx().Session.GetCommitWaitGroup().Wait()
}
}()

Expand Down
8 changes: 8 additions & 0 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ type session struct {
sandBoxMode bool

cursorTracker cursor.Tracker

// Used to wait for all async commit background jobs to finish.
commitWaitGroup sync.WaitGroup
}

var parserPool = &sync.Pool{New: func() any { return parser.New() }}
Expand Down Expand Up @@ -4604,3 +4607,8 @@ func GetDBNames(seVar *variable.SessionVars) []string {
func (s *session) GetCursorTracker() cursor.Tracker {
return s.cursorTracker
}

// GetCommitWaitGroup returns the internal `sync.WaitGroup` for async commit and secondary key lock cleanup
func (s *session) GetCommitWaitGroup() *sync.WaitGroup {
return &s.commitWaitGroup
}
3 changes: 3 additions & 0 deletions pkg/sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package sessionctx

import (
"context"
"sync"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -207,6 +208,8 @@ type Context interface {
NewStmtIndexUsageCollector() *indexusage.StmtIndexUsageCollector
// GetCursorTracker returns the cursor tracker of the session
GetCursorTracker() cursor.Tracker
// GetCommitWaitGroup returns the wait group for async commit and secondary lock cleanup background goroutines
GetCommitWaitGroup() *sync.WaitGroup
}

// TxnFuture is an interface where implementations have a kv.Transaction field and after
Expand Down
1 change: 1 addition & 0 deletions pkg/sessiontxn/isolation/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ go_library(
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//txnkv/transaction",
"@org_uber_go_zap//:zap",
],
)
Expand Down
14 changes: 14 additions & 0 deletions pkg/sessiontxn/isolation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/tidb/pkg/util/tracing"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/txnkv/transaction"
)

// baseTxnContextProvider is a base class for the transaction context providers that implement `TxnContextProvider` in different isolation.
Expand Down Expand Up @@ -515,6 +516,19 @@ func (p *baseTxnContextProvider) SetOptionsOnTxnActive(txn kv.Transaction) {
}

txn.SetOption(kv.SessionID, p.sctx.GetSessionVars().ConnectionID)

// backgroundGoroutineWaitGroup is pre-initialized before the closure to avoid accessing `p.sctx` in the closure,
// which may cause unexpected race condition.
backgroundGoroutineWaitGroup := p.sctx.GetCommitWaitGroup()
lifecycleHooks := transaction.LifecycleHooks{
Pre: func() {
backgroundGoroutineWaitGroup.Add(1)
},
Post: func() {
backgroundGoroutineWaitGroup.Done()
},
}
txn.SetOption(kv.BackgroundGoroutineLifecycleHooks, lifecycleHooks)
}

func (p *baseTxnContextProvider) SetOptionsBeforeCommit(
Expand Down
3 changes: 3 additions & 0 deletions pkg/store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
"github.com/tikv/client-go/v2/txnkv"
"github.com/tikv/client-go/v2/txnkv/transaction"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -308,6 +309,8 @@ func (txn *tikvTxn) SetOption(opt int, val any) {
txn.KVTxn.GetUnionStore().SetEntrySizeLimit(limits.Entry, limits.Total)
case kv.SessionID:
txn.KVTxn.SetSessionID(val.(uint64))
case kv.BackgroundGoroutineLifecycleHooks:
txn.KVTxn.SetBackgroundGoroutineLifecycleHooks(val.(transaction.LifecycleHooks))
}
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/util/mock/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package mock
import (
"context"
"fmt"
"sync"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -611,6 +612,11 @@ func (*Context) GetCursorTracker() cursor.Tracker {
return nil
}

// GetCommitWaitGroup implements the sessionctx.Context interface
func (*Context) GetCommitWaitGroup() *sync.WaitGroup {
return nil
}

// NewContext creates a new mocked sessionctx.Context.
func NewContext() *Context {
ctx, cancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit cd33d79

Please sign in to comment.