diff --git a/table/tables/cache.go b/table/tables/cache.go index c7e4cea2e8814..1e27059445e97 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -171,7 +171,7 @@ func (c *cachedTable) UpdateLockForRead(store kv.Storage, ts uint64) error { // Load data from original table and the update lock information. tid := c.Meta().ID lease := leaseFromTS(ts) - succ, err := c.handle.LockForRead(tid, ts, lease) + succ, err := c.handle.LockForRead(context.Background(), tid, ts, lease) if err != nil { return errors.Trace(err) } @@ -198,7 +198,7 @@ func (c *cachedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts .. return nil, err } now := txn.StartTS() - err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now)) + err = c.handle.LockForWrite(context.Background(), c.Meta().ID, now, leaseFromTS(now)) if err != nil { return nil, errors.Trace(err) } @@ -212,7 +212,7 @@ func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, return err } now := txn.StartTS() - err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now)) + err = c.handle.LockForWrite(ctx, c.Meta().ID, now, leaseFromTS(now)) if err != nil { return errors.Trace(err) } @@ -226,7 +226,7 @@ func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type return err } now := txn.StartTS() - err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now)) + err = c.handle.LockForWrite(context.Background(), c.Meta().ID, now, leaseFromTS(now)) if err != nil { return errors.Trace(err) } @@ -237,7 +237,7 @@ func (c *cachedTable) renewLease(ts uint64, op RenewLeaseType, data *cacheData) return func() { tid := c.Meta().ID lease := leaseFromTS(ts) - succ, err := c.handle.RenewLease(tid, ts, lease, op) + succ, err := c.handle.RenewLease(context.Background(), tid, ts, lease, op) if err != nil { log.Warn("Renew read lease error") } diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 46a304127ec5e..938648f458c7d 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -15,11 +15,16 @@ package tables import ( + "context" "fmt" "sync" "time" "github.com/pingcap/errors" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/terror" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/sqlexec" "github.com/tikv/client-go/v2/oracle" ) @@ -37,19 +42,40 @@ const ( CachedTableLockWrite ) -// StateRemote Indicates the remote status information of the read-write lock +func (l CachedTableLockType) String() string { + switch l { + case CachedTableLockNone: + return "NONE" + case CachedTableLockRead: + return "READ" + case CachedTableLockIntend: + return "INTEND" + case CachedTableLockWrite: + return "WRITE" + } + panic("invalid CachedTableLockType value") +} + +// StateRemote is the interface to control the remote state of the cached table's lock meta information. type StateRemote interface { // Load obtain the corresponding lock type and lease value according to the tableID - Load(tid int64) (CachedTableLockType, uint64, error) - - // LockForRead try to add a read lock to the table with the specified tableID - LockForRead(tid int64, now, ts uint64) (bool, error) + Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) + + // LockForRead try to add a read lock to the table with the specified tableID. + // If this operation succeed, according to the protocol, the TiKV data will not be + // modified until the lease expire. It's safe for the caller to load the table data, + // cache and use the data. + // The parameter `now` means the current tso. Because the tso is get from PD, in + // the TiDB side, its value lags behind the real one forever, this doesn't matter. + // Because `now` is only used to clean up the orphan lock, as long as it's smaller + // than the real one, the correctness of the algorithm is not violated. + LockForRead(ctx context.Context, tid int64, now, lease uint64) (bool, error) // LockForWrite try to add a write lock to the table with the specified tableID - LockForWrite(tid int64, now, ts uint64) error + LockForWrite(ctx context.Context, tid int64, now, ts uint64) error // RenewLease attempt to renew the read / write lock on the table with the specified tableID - RenewLease(tid int64, oldTs uint64, newTs uint64, op RenewLeaseType) (bool, error) + RenewLease(ctx context.Context, tid int64, oldTs uint64, newTs uint64, op RenewLeaseType) (bool, error) } // mockStateRemoteHandle implement the StateRemote interface. @@ -57,7 +83,9 @@ type mockStateRemoteHandle struct { ch chan remoteTask } -func (r *mockStateRemoteHandle) Load(tid int64) (CachedTableLockType, uint64, error) { +var _ StateRemote = &mockStateRemoteHandle{} + +func (r *mockStateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) { op := &loadOP{tid: tid} op.Add(1) r.ch <- op @@ -65,7 +93,7 @@ func (r *mockStateRemoteHandle) Load(tid int64) (CachedTableLockType, uint64, er return op.lockType, op.lease, op.err } -func (r *mockStateRemoteHandle) LockForRead(tid int64, now, ts uint64) (bool, error) { +func (r *mockStateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts uint64) (bool, error) { op := &lockForReadOP{tid: tid, now: now, ts: ts} op.Add(1) r.ch <- op @@ -73,7 +101,7 @@ func (r *mockStateRemoteHandle) LockForRead(tid int64, now, ts uint64) (bool, er return op.succ, op.err } -func (r *mockStateRemoteHandle) LockForWrite(tid int64, now, ts uint64) error { +func (r *mockStateRemoteHandle) LockForWrite(ctx context.Context, tid int64, now, ts uint64) error { op := &lockForWriteOP{tid: tid, now: now, ts: ts} op.Add(1) r.ch <- op @@ -101,7 +129,7 @@ func (r *mockStateRemoteHandle) LockForWrite(tid int64, now, ts uint64) error { return op.err } -func (r *mockStateRemoteHandle) RenewLease(tid int64, oldTs uint64, newTs uint64, op RenewLeaseType) (bool, error) { +func (r *mockStateRemoteHandle) RenewLease(ctx context.Context, tid int64, oldTs uint64, newTs uint64, op RenewLeaseType) (bool, error) { switch op { case RenewReadLease: op := &renewLeaseForReadOP{tid: tid, oldTs: oldTs, newTs: newTs} @@ -326,5 +354,225 @@ func (r *mockStateRemoteData) renewLeaseForRead(tid int64, oldTs uint64, newTs u return true, nil } return false, errors.New("The new lease is smaller than the old lease is an illegal contract renewal operation") +} + +type sqlExec interface { + AffectedRows() uint64 + ExecuteInternal(context.Context, string, ...interface{}) (sqlexec.RecordSet, error) + GetStore() kv.Storage +} + +type stateRemoteHandle struct { + exec sqlExec + sync.Mutex +} + +// NewStateRemote creates a StateRemote object. +func NewStateRemote(exec sqlExec) *stateRemoteHandle { + return &stateRemoteHandle{ + exec: exec, + } +} + +var _ StateRemote = &stateRemoteHandle{} + +func (h *stateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) { + lockType, lease, _, err := h.loadRow(ctx, tid) + return lockType, lease, err +} + +// LockForRead try to lock the table, if this operation succeed, the remote data +// is "read locked" and will not be modified according to the protocol, until the lease expire. +func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts uint64) ( /*succ*/ bool, error) { + h.Lock() + defer h.Unlock() + succ := false + err := h.runInTxn(ctx, func(ctx context.Context) error { + lockType, lease, _, err := h.loadRow(ctx, tid) + if err != nil { + return errors.Trace(err) + } + // The old lock is outdated, clear orphan lock. + if now > lease { + succ = true + if err := h.updateRow(ctx, tid, "READ", ts); err != nil { + return errors.Trace(err) + } + return nil + } + + switch lockType { + case CachedTableLockNone: + case CachedTableLockRead: + case CachedTableLockWrite, CachedTableLockIntend: + return nil + } + succ = true + if ts > lease { // Note the check, don't decrease lease value! + if err := h.updateRow(ctx, tid, "READ", ts); err != nil { + return errors.Trace(err) + } + } + + return nil + }) + return succ, err +} + +func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, now, ts uint64) error { + h.Lock() + defer h.Unlock() + for { + waitAndRetry, err := h.lockForWriteOnce(ctx, tid, now, ts) + if err != nil { + return err + } + if waitAndRetry == 0 { + break + } + + time.Sleep(waitAndRetry) + store := h.exec.GetStore() + o := store.GetOracle() + newTS, err := o.GetTimestamp(ctx, &oracle.Option{TxnScope: kv.GlobalTxnScope}) + if err != nil { + return errors.Trace(err) + } + now, ts = newTS, leaseFromTS(newTS) + } + return nil +} +func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, now, ts uint64) (waitAndRetry time.Duration, err error) { + err = h.runInTxn(ctx, func(ctx context.Context) error { + lockType, lease, oldReadLease, err := h.loadRow(ctx, tid) + if err != nil { + return errors.Trace(err) + } + // The lease is outdated, so lock is invalid, clear orphan lock of any kind. + if now > lease { + if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil { + return errors.Trace(err) + } + return nil + } + + // The lease is valid. + switch lockType { + case CachedTableLockNone: + if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil { + return errors.Trace(err) + } + case CachedTableLockRead: + // Change from READ to INTEND + if _, err = h.execSQL(ctx, "update mysql.table_cache_meta set lock_type='INTEND', oldReadLease=%?, lease=%? where tid=%?", lease, ts, tid); err != nil { + return errors.Trace(err) + } + // Wait for lease to expire, and then retry. + waitAndRetry = waitForLeaseExpire(oldReadLease, now) + case CachedTableLockIntend, CachedTableLockWrite: + // `now` exceed `oldReadLease` means wait for READ lock lease is done, it's safe to read here. + if now > oldReadLease { + if lockType == CachedTableLockIntend { + if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil { + return errors.Trace(err) + } + } + return nil + } + // Otherwise, the WRITE should wait for the READ lease expire. + // And then retry changing the lock to WRITE + waitAndRetry = waitForLeaseExpire(oldReadLease, now) + } + return nil + }) + + return +} + +func waitForLeaseExpire(oldReadLease, now uint64) time.Duration { + if oldReadLease >= now { + t1 := oracle.GetTimeFromTS(oldReadLease) + t2 := oracle.GetTimeFromTS(now) + waitDuration := t1.Sub(t2) + return waitDuration + } + return 0 +} + +func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, now, newTs uint64, op RenewLeaseType) (bool, error) { + h.Lock() + defer h.Unlock() + // TODO: `now` should use the real current tso to check the old lease is not expired. + _, err := h.execSQL(ctx, "update mysql.table_cache_meta set lease = %? where tid = %? and lock_type ='READ'", newTs, tid) + if err != nil { + return false, errors.Trace(err) + } + succ := h.exec.AffectedRows() > 0 + return succ, err +} + +func (h *stateRemoteHandle) beginTxn(ctx context.Context) error { + _, err := h.execSQL(ctx, "begin") + return err +} + +func (h *stateRemoteHandle) commitTxn(ctx context.Context) error { + _, err := h.execSQL(ctx, "commit") + return err +} + +func (h *stateRemoteHandle) rollbackTxn(ctx context.Context) error { + _, err := h.execSQL(ctx, "rollback") + return err +} + +func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Context) error) error { + err := h.beginTxn(ctx) + if err != nil { + return errors.Trace(err) + } + + err = fn(ctx) + if err != nil { + terror.Log(h.rollbackTxn(ctx)) + return errors.Trace(err) + } + + return h.commitTxn(ctx) +} + +func (h *stateRemoteHandle) loadRow(ctx context.Context, tid int64) (CachedTableLockType, uint64, uint64, error) { + chunkRows, err := h.execSQL(ctx, "select lock_type, lease, oldReadLease from mysql.table_cache_meta where tid = %? for update", tid) + if err != nil { + return 0, 0, 0, errors.Trace(err) + } + if len(chunkRows) != 1 { + return 0, 0, 0, errors.Errorf("table_cache_meta tid not exist %d", tid) + } + col1 := chunkRows[0].GetEnum(0) + // Note, the MySQL enum value start from 1 rather than 0 + lockType := CachedTableLockType(col1.Value - 1) + lease := chunkRows[0].GetUint64(1) + oldReadLease := chunkRows[0].GetUint64(2) + return lockType, lease, oldReadLease, nil +} + +func (h *stateRemoteHandle) updateRow(ctx context.Context, tid int64, lockType string, lease uint64) error { + _, err := h.execSQL(ctx, "update mysql.table_cache_meta set lock_type = %?, lease = %? where tid = %?", lockType, lease, tid) + return err +} + +func (h *stateRemoteHandle) execSQL(ctx context.Context, sql string, args ...interface{}) ([]chunk.Row, error) { + rs, err := h.exec.ExecuteInternal(ctx, sql, args...) + if rs != nil { + defer rs.Close() + } + if err != nil { + return nil, errors.Trace(err) + } + if rs != nil { + return sqlexec.DrainRecordSet(ctx, rs, 1) + } + return nil, nil } diff --git a/table/tables/state_remote_test.go b/table/tables/state_remote_test.go new file mode 100644 index 0000000000000..969f0fb56b0e0 --- /dev/null +++ b/table/tables/state_remote_test.go @@ -0,0 +1,128 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tables_test + +import ( + "context" + "testing" + + "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/testkit" + "github.com/stretchr/testify/require" +) + +// CreateMetaLockForCachedTable initializes the cached table meta lock information. +func createMetaLockForCachedTable(h session.Session) error { + createTable := "CREATE TABLE IF NOT EXISTS `mysql`.`table_cache_meta` (" + + "`tid` int(11) NOT NULL DEFAULT 0," + + "`lock_type` enum('NONE','READ', 'INTEND', 'WRITE') NOT NULL DEFAULT 'NONE'," + + "`lease` bigint(20) NOT NULL DEFAULT 0," + + "`oldReadLease` bigint(20) NOT NULL DEFAULT 0," + + "PRIMARY KEY (`tid`))" + _, err := h.ExecuteInternal(context.Background(), createTable) + return err +} + +// InitRow add a new record into the cached table meta lock table. +func initRow(ctx context.Context, exec session.Session, tid int) error { + _, err := exec.ExecuteInternal(ctx, "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", tid) + return err +} + +func TestStateRemote(t *testing.T) { + t.Parallel() + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + se := tk.Session() + + ctx := context.Background() + h := tables.NewStateRemote(se) + err := createMetaLockForCachedTable(se) + require.NoError(t, err) + require.Equal(t, tables.CachedTableLockNone, tables.CachedTableLockType(0)) + + // Check the initial value. + require.NoError(t, initRow(ctx, se, 5)) + lockType, lease, err := h.Load(ctx, 5) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockNone) + require.Equal(t, lockType.String(), "NONE") + require.Equal(t, lease, uint64(0)) + + // Check read lock. + succ, err := h.LockForRead(ctx, 5, 1234, 1234) + require.NoError(t, err) + require.True(t, succ) + lockType, lease, err = h.Load(ctx, 5) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockRead) + require.Equal(t, lockType.String(), "READ") + require.Equal(t, lease, uint64(1234)) + + // LockForRead when read lock is hold. + // This operation equals to renew lease. + succ, err = h.LockForRead(ctx, 5, 1235, 1235) + require.NoError(t, err) + require.True(t, succ) + lockType, lease, err = h.Load(ctx, 5) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockRead) + require.Equal(t, lockType.String(), "READ") + require.Equal(t, lease, uint64(1235)) + + // Renew read lock lease operation. + succ, err = h.RenewLease(ctx, 5, 0, 1264, tables.RenewReadLease) + require.NoError(t, err) + require.True(t, succ) + lockType, lease, err = h.Load(ctx, 5) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockRead) + require.Equal(t, lockType.String(), "READ") + require.Equal(t, lease, uint64(1264)) + + // Check write lock. + require.NoError(t, h.LockForWrite(ctx, 5, 2234, 2234)) + lockType, lease, err = h.Load(ctx, 5) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockWrite) + require.Equal(t, lockType.String(), "WRITE") + require.Equal(t, lease, uint64(2234)) + + // Lock for write again + require.NoError(t, h.LockForWrite(ctx, 5, 3234, 3234)) + lockType, lease, err = h.Load(ctx, 5) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockWrite) + require.Equal(t, lockType.String(), "WRITE") + require.Equal(t, lease, uint64(3234)) + + // Renew read lock lease should fail when the write lock is hold. + succ, err = h.RenewLease(ctx, 5, 0, 1264, tables.RenewReadLease) + require.NoError(t, err) + require.False(t, succ) + + // Acquire read lock should also fail when the write lock is hold. + succ, err = h.LockForRead(ctx, 5, 1264, 1264) + require.NoError(t, err) + require.False(t, succ) + + // But clear orphan write lock should success. + succ, err = h.LockForRead(ctx, 5, 4234, 4234) + require.NoError(t, err) + require.True(t, succ) +}