diff --git a/ddl/placement_policy_ddl_test.go b/ddl/placement_policy_ddl_test.go index 64959c4b3a52e..146f77f89febd 100644 --- a/ddl/placement_policy_ddl_test.go +++ b/ddl/placement_policy_ddl_test.go @@ -122,7 +122,7 @@ func TestPlacementPolicyInUse(t *testing.T) { t4.State = model.StatePublic db1.Tables = append(db1.Tables, t4) - builder, err := infoschema.NewBuilder(store, nil, nil).InitWithDBInfos( + builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos( []*model.DBInfo{db1, db2, dbP}, nil, []*model.PolicyInfo{p1, p2, p3, p4, p5}, diff --git a/domain/domain.go b/domain/domain.go index 054e9d537abe0..751b3520d42b5 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -96,7 +96,6 @@ type Domain struct { serverID uint64 serverIDSession *concurrency.Session isLostConnectionToPD atomicutil.Int32 // !0: true, 0: false. - renewLeaseCh chan func() // It is used to call the renewLease function of the cache table. onClose func() sysExecutorFactory func(*Domain) (pools.Resource, error) } @@ -163,7 +162,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i return nil, false, currentSchemaVersion, nil, err } - newISBuilder, err := infoschema.NewBuilder(do.Store(), do.renewLeaseCh, do.sysFacHack).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion) + newISBuilder, err := infoschema.NewBuilder(do.Store(), do.sysFacHack).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion) if err != nil { return nil, false, currentSchemaVersion, nil, err } @@ -285,7 +284,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 } diffs = append(diffs, diff) } - builder := infoschema.NewBuilder(do.Store(), do.renewLeaseCh, do.sysFacHack).InitWithOldInfoSchema(do.infoCache.GetLatest()) + builder := infoschema.NewBuilder(do.Store(), do.sysFacHack).InitWithOldInfoSchema(do.infoCache.GetLatest()) phyTblIDs := make([]int64, 0, len(diffs)) actions := make([]uint64, 0, len(diffs)) for _, diff := range diffs { @@ -729,7 +728,6 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio indexUsageSyncLease: idxUsageSyncLease, planReplayer: &planReplayer{planReplayerGCLease: planReplayerGCLease}, onClose: onClose, - renewLeaseCh: make(chan func(), 10), expiredTimeStamp4PC: types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, types.DefaultFsp), } @@ -862,10 +860,9 @@ func (do *Domain) Init(ddlLease time.Duration, sysExecutorFactory func(*Domain) // Local store needs to get the change information for every DDL state in each session. go do.loadSchemaInLoop(ctx, ddlLease) } - do.wg.Add(4) + do.wg.Add(3) go do.topNSlowQueryLoop() go do.infoSyncerKeeper() - go do.renewLease() go do.globalConfigSyncerKeeper() if !skipRegisterToDashboard { do.wg.Add(1) @@ -1786,22 +1783,6 @@ func (do *Domain) MockInfoCacheAndLoadInfoSchema(is infoschema.InfoSchema) { do.infoCache.Insert(is, 0) } -func (do *Domain) renewLease() { - defer func() { - do.wg.Done() - logutil.BgLogger().Info("renew lease goroutine exited.") - }() - for { - select { - case <-do.exit: - close(do.renewLeaseCh) - return - case op := <-do.renewLeaseCh: - op() - } - } -} - func init() { initByLDFlagsForGlobalKill() } diff --git a/executor/slow_query_test.go b/executor/slow_query_test.go index 7bce5d75fe547..12b885d1883ae 100644 --- a/executor/slow_query_test.go +++ b/executor/slow_query_test.go @@ -54,7 +54,7 @@ func parseLog(retriever *slowQueryRetriever, sctx sessionctx.Context, reader *bu } func newSlowQueryRetriever() (*slowQueryRetriever, error) { - newISBuilder, err := infoschema.NewBuilder(nil, nil, nil).InitWithDBInfos(nil, nil, nil, 0) + newISBuilder, err := infoschema.NewBuilder(nil, nil).InitWithDBInfos(nil, nil, nil, 0) if err != nil { return nil, err } diff --git a/infoschema/builder.go b/infoschema/builder.go index 3475a00e885ed..b8e42344b867c 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -48,9 +48,8 @@ type Builder struct { // TODO: store is only used by autoid allocators // detach allocators from storage, use passed transaction in the feature store kv.Storage - // TODO: renewLeaseCh is only used to pass data between table and domain - renewLeaseCh chan func() - factory func() (pools.Resource, error) + + factory func() (pools.Resource, error) } // ApplyDiff applies SchemaDiff to the new InfoSchema. @@ -711,7 +710,7 @@ func (b *Builder) tableFromMeta(alloc autoid.Allocators, tblInfo *model.TableInf return nil, errors.Trace(err) } - err = t.Init(b.renewLeaseCh, tmp.(sqlexec.SQLExecutor)) + err = t.Init(tmp.(sqlexec.SQLExecutor)) if err != nil { return nil, errors.Trace(err) } @@ -755,7 +754,7 @@ func RegisterVirtualTable(dbInfo *model.DBInfo, tableFromMeta tableFromMetaFunc) } // NewBuilder creates a new Builder with a Handle. -func NewBuilder(store kv.Storage, renewCh chan func(), factory func() (pools.Resource, error)) *Builder { +func NewBuilder(store kv.Storage, factory func() (pools.Resource, error)) *Builder { return &Builder{ store: store, is: &infoSchema{ @@ -764,9 +763,8 @@ func NewBuilder(store kv.Storage, renewCh chan func(), factory func() (pools.Res ruleBundleMap: map[string]*placement.Bundle{}, sortedTablesBuckets: make([]sortedTables, bucketCount), }, - dirtyDB: make(map[string]bool), - renewLeaseCh: renewCh, - factory: factory, + dirtyDB: make(map[string]bool), + factory: factory, } } diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index 5fd05d9942deb..a179a3865bc3b 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -108,7 +108,7 @@ func TestBasic(t *testing.T) { }) require.NoError(t, err) - builder, err := infoschema.NewBuilder(dom.Store(), nil, nil).InitWithDBInfos(dbInfos, nil, nil, 1) + builder, err := infoschema.NewBuilder(dom.Store(), nil).InitWithDBInfos(dbInfos, nil, nil, 1) require.NoError(t, err) txn, err := store.Begin() @@ -254,7 +254,7 @@ func TestInfoTables(t *testing.T) { require.NoError(t, err) }() - builder, err := infoschema.NewBuilder(store, nil, nil).InitWithDBInfos(nil, nil, nil, 0) + builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(nil, nil, nil, 0) require.NoError(t, err) is := builder.Build() @@ -319,7 +319,7 @@ func TestGetBundle(t *testing.T) { require.NoError(t, err) }() - builder, err := infoschema.NewBuilder(store, nil, nil).InitWithDBInfos(nil, nil, nil, 0) + builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(nil, nil, nil, 0) require.NoError(t, err) is := builder.Build() diff --git a/table/table.go b/table/table.go index cc5056986bf5c..8c90b9e7323da 100644 --- a/table/table.go +++ b/table/table.go @@ -257,7 +257,7 @@ var MockTableFromMeta func(tableInfo *model.TableInfo) Table type CachedTable interface { Table - Init(renewCh chan func(), exec sqlexec.SQLExecutor) error + Init(exec sqlexec.SQLExecutor) error // TryReadFromCache checks if the cache table is readable. TryReadFromCache(ts uint64, leaseDuration time.Duration) kv.MemBuffer diff --git a/table/tables/cache.go b/table/tables/cache.go index 73d2f908fcda2..e5ab8d4bc52c8 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -16,7 +16,6 @@ package tables import ( "context" - "sync" "sync/atomic" "time" @@ -51,15 +50,14 @@ type cachedTable struct { TableCommon cacheData atomic.Value handle StateRemote - renewCh chan func() totalSize int64 - mu struct { - sync.RWMutex - lockingForRead bool - } + lockingForRead tokenLimit + renewReadLease tokenLimit } +type tokenLimit = chan struct{} + // cacheData pack the cache data and lease. type cacheData struct { Start uint64 @@ -94,11 +92,10 @@ func (c *cachedTable) TryReadFromCache(ts uint64, leaseDuration time.Duration) k nowTime := oracle.GetTimeFromTS(ts) distance := leaseTime.Sub(nowTime) if distance >= 0 && distance <= leaseDuration/2 { - op := c.renewLease(ts, RenewReadLease, data, leaseDuration) select { - case c.renewCh <- op: + case c.renewReadLease <- struct{}{}: + go c.renewLease(ts, data, leaseDuration) default: - // Skip this time, if the previous renew lease operation hasn't finished. } } return data.MemBuffer @@ -109,15 +106,16 @@ func (c *cachedTable) TryReadFromCache(ts uint64, leaseDuration time.Duration) k // newCachedTable creates a new CachedTable Instance func newCachedTable(tbl *TableCommon) (table.Table, error) { ret := &cachedTable{ - TableCommon: *tbl, + TableCommon: *tbl, + lockingForRead: make(chan struct{}, 1), + renewReadLease: make(chan struct{}, 1), } return ret, nil } // Init is an extra operation for cachedTable after TableFromMeta, // Because cachedTable need some additional parameter that can't be passed in TableFromMeta. -func (c *cachedTable) Init(renewCh chan func(), exec sqlexec.SQLExecutor) error { - c.renewCh = renewCh +func (c *cachedTable) Init(exec sqlexec.SQLExecutor) error { raw, ok := exec.(sqlExec) if !ok { return errors.New("Need sqlExec rather than sqlexec.SQLExecutor") @@ -172,19 +170,12 @@ func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64) } func (c *cachedTable) UpdateLockForRead(ctx context.Context, store kv.Storage, ts uint64, leaseDuration time.Duration) { - c.mu.RLock() - lockingForRead := c.mu.lockingForRead - c.mu.RUnlock() - if lockingForRead { + select { + case c.lockingForRead <- struct{}{}: + go c.updateLockForRead(ctx, store, ts, leaseDuration) + default: // There is a inflight calling already. - return } - - c.mu.Lock() - c.mu.lockingForRead = true - c.mu.Unlock() - - go c.updateLockForRead(ctx, store, ts, leaseDuration) } func (c *cachedTable) updateLockForRead(ctx context.Context, store kv.Storage, ts uint64, leaseDuration time.Duration) { @@ -194,9 +185,7 @@ func (c *cachedTable) updateLockForRead(ctx context.Context, store kv.Storage, t zap.Reflect("r", r), zap.Stack("stack trace")) } - c.mu.Lock() - c.mu.lockingForRead = false - c.mu.Unlock() + <-c.lockingForRead }() // Load data from original table and the update lock information. @@ -260,20 +249,20 @@ func (c *cachedTable) RemoveRecord(sctx sessionctx.Context, h kv.Handle, r []typ return c.TableCommon.RemoveRecord(sctx, h, r) } -func (c *cachedTable) renewLease(ts uint64, op RenewLeaseType, data *cacheData, leaseDuration time.Duration) func() { - return func() { - tid := c.Meta().ID - lease := leaseFromTS(ts, leaseDuration) - succ, err := c.handle.RenewLease(context.Background(), tid, lease, op) - if err != nil { - log.Warn("Renew read lease error", zap.Error(err)) - } - if succ { - c.cacheData.Store(&cacheData{ - Start: data.Start, - Lease: lease, - MemBuffer: data.MemBuffer, - }) - } +func (c *cachedTable) renewLease(ts uint64, data *cacheData, leaseDuration time.Duration) { + defer func() { <-c.renewReadLease }() + + tid := c.Meta().ID + lease := leaseFromTS(ts, leaseDuration) + succ, err := c.handle.RenewLease(context.Background(), tid, lease, RenewReadLease) + if err != nil && !kv.IsTxnRetryableError(err) { + log.Warn("Renew read lease error", zap.Error(err)) + } + if succ { + c.cacheData.Store(&cacheData{ + Start: data.Start, + Lease: lease, + MemBuffer: data.MemBuffer, + }) } } diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 0a829686aec6e..a10856cc5a481 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -284,7 +284,7 @@ func (h *stateRemoteHandle) renewWriteLease(ctx context.Context, tid int64, newL } func (h *stateRemoteHandle) beginTxn(ctx context.Context) error { - _, err := h.execSQL(ctx, "begin") + _, err := h.execSQL(ctx, "begin optimistic") return err } @@ -324,7 +324,7 @@ func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Co } func (h *stateRemoteHandle) loadRow(ctx context.Context, tid int64) (CachedTableLockType, uint64, uint64, error) { - chunkRows, err := h.execSQL(ctx, "select lock_type, lease, oldReadLease from mysql.table_cache_meta where tid = %? for update", tid) + chunkRows, err := h.execSQL(ctx, "select lock_type, lease, oldReadLease from mysql.table_cache_meta where tid = %?", tid) if err != nil { return 0, 0, 0, errors.Trace(err) }