Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

table/tables: add StateRemote interface for the cached table #29152

Merged
merged 21 commits into from
Nov 25, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions table/tables/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (c *cachedTable) UpdateLockForRead(store kv.Storage, ts uint64) error {
// Load data from original table and the update lock information.
tid := c.Meta().ID
lease := leaseFromTS(ts)
succ, err := c.handle.LockForRead(tid, ts, lease)
succ, err := c.handle.LockForRead(context.Background(), tid, ts, lease)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -169,7 +169,7 @@ func (c *cachedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ..
return nil, err
}
now := txn.StartTS()
err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now))
err = c.handle.LockForWrite(context.Background(), c.Meta().ID, now, leaseFromTS(now))
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -183,7 +183,7 @@ func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context,
return err
}
now := txn.StartTS()
err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now))
err = c.handle.LockForWrite(ctx, c.Meta().ID, now, leaseFromTS(now))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -197,7 +197,7 @@ func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type
return err
}
now := txn.StartTS()
err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now))
err = c.handle.LockForWrite(context.Background(), c.Meta().ID, now, leaseFromTS(now))
if err != nil {
return errors.Trace(err)
}
Expand Down
268 changes: 259 additions & 9 deletions table/tables/state_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
package tables

import (
"context"
"fmt"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/tikv/client-go/v2/oracle"
)

Expand All @@ -37,43 +42,59 @@ const (
CachedTableLockWrite
)

// StateRemote Indicates the remote status information of the read-write lock
func (l CachedTableLockType) String() string {
switch l {
case CachedTableLockNone:
return "NONE"
case CachedTableLockRead:
return "READ"
case CachedTableLockIntend:
return "INTEND"
case CachedTableLockWrite:
return "WRITE"
}
panic("invalid CachedTableLockType value")
}

// StateRemote is the interface to control the remote state of the cached table's lock meta information.
type StateRemote interface {
// Load obtain the corresponding lock type and lease value according to the tableID
Load(tid int64) (CachedTableLockType, uint64, error)
Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error)

// LockForRead try to add a read lock to the table with the specified tableID
LockForRead(tid int64, now, ts uint64) (bool, error)
LockForRead(ctx context.Context, tid int64, now, ts uint64) (bool, error)

// LockForWrite try to add a write lock to the table with the specified tableID
LockForWrite(tid int64, now, ts uint64) error
LockForWrite(ctx context.Context, tid int64, now, ts uint64) error

// RenewLease attempt to renew the read lock on the table with the specified tableID
RenewLease(tid int64, ts uint64) (bool, error)
RenewLease(ctx context.Context, tid int64, ts uint64) (bool, error)
}

// mockStateRemoteHandle implement the StateRemote interface.
type mockStateRemoteHandle struct {
ch chan remoteTask
}

func (r *mockStateRemoteHandle) Load(tid int64) (CachedTableLockType, uint64, error) {
var _ StateRemote = &mockStateRemoteHandle{}

func (r *mockStateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) {
op := &loadOP{tid: tid}
op.Add(1)
r.ch <- op
op.Wait()
return op.lockType, op.lease, op.err
}

func (r *mockStateRemoteHandle) LockForRead(tid int64, now, ts uint64) (bool, error) {
func (r *mockStateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts uint64) (bool, error) {
op := &lockForReadOP{tid: tid, now: now, ts: ts}
op.Add(1)
r.ch <- op
op.Wait()
return op.succ, op.err
}

func (r *mockStateRemoteHandle) LockForWrite(tid int64, now, ts uint64) error {
func (r *mockStateRemoteHandle) LockForWrite(ctx context.Context, tid int64, now, ts uint64) error {
op := &lockForWriteOP{tid: tid, now: now, ts: ts}
op.Add(1)
r.ch <- op
Expand Down Expand Up @@ -101,7 +122,7 @@ func (r *mockStateRemoteHandle) LockForWrite(tid int64, now, ts uint64) error {
return op.err
}

func (r *mockStateRemoteHandle) RenewLease(tid int64, ts uint64) (bool, error) {
func (r *mockStateRemoteHandle) RenewLease(ctx context.Context, tid int64, ts uint64) (bool, error) {
return false, errors.New("not implemented yet")
}

Expand Down Expand Up @@ -276,3 +297,232 @@ func (r *mockStateRemoteData) LockForWrite(tid int64, now, ts uint64) (uint64, e
}
return 0, nil
}

type sqlExec interface {
AffectedRows() uint64
ExecuteInternal(context.Context, string, ...interface{}) (sqlexec.RecordSet, error)
GetStore() kv.Storage
}

type stateRemoteHandle struct {
exec sqlExec
sync.Mutex
}

// NewStateRemote creates a StateRemote object.
func NewStateRemote(exec sqlExec) *stateRemoteHandle {
return &stateRemoteHandle{
exec: exec,
}
}

var _ StateRemote = &stateRemoteHandle{}

func (h *stateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) {
lockType, lease, _, err := h.loadRow(ctx, tid)
return lockType, lease, err
}

func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts uint64) ( /*succ*/ bool, error) {
h.Lock()
defer h.Unlock()
succ := false
err := h.runInTxn(ctx, func(ctx context.Context) error {
lockType, lease, _, err := h.loadRow(ctx, tid)
if err != nil {
return errors.Trace(err)
}
// The old lock is outdated, clear orphan lock.
if now > lease {
succ = true
if err := h.updateRow(ctx, tid, "READ", ts); err != nil {
return errors.Trace(err)
}
return nil
}

switch lockType {
case CachedTableLockNone:
case CachedTableLockRead:
case CachedTableLockWrite, CachedTableLockIntend:
return nil
}
succ = true
if err := h.updateRow(ctx, tid, "READ", ts); err != nil {
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
return errors.Trace(err)
}

return nil
})
return succ, err
}

func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, now, ts uint64) error {
lcwangchao marked this conversation as resolved.
Show resolved Hide resolved
h.Lock()
defer h.Unlock()
for {
retry, err := h.lockForWriteOnce(ctx, tid, now, ts)
if err != nil {
return err
}
if !retry {
break
}

store := h.exec.GetStore()
o := store.GetOracle()
newTS, err := o.GetTimestamp(ctx, &oracle.Option{TxnScope: kv.GlobalTxnScope})
if err != nil {
return errors.Trace(err)
}
now, ts = newTS, leaseFromTS(newTS)
lcwangchao marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, now, ts uint64) ( /*retry*/ bool, error) {
err := h.beginTxn(ctx)
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return false, errors.Trace(err)
}
defer func() {
if err != nil {
terror.Log(h.rollbackTxn(ctx))
}
}()

lockType, lease, oldReadLease, err := h.loadRow(ctx, tid)
if err != nil {
return false, errors.Trace(err)
}
// The lease is outdated, so lock is invalid, clear orphan lock of any kind.
if now > lease {
if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil {
lcwangchao marked this conversation as resolved.
Show resolved Hide resolved
return false, errors.Trace(err)
}
return false, h.commitTxn(ctx)
}

// The lease is valid.
switch lockType {
case CachedTableLockNone:
if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil {
return false, errors.Trace(err)
}
return false, h.commitTxn(ctx)
case CachedTableLockRead:
// Change from READ to INTEND
if _, err = h.execSQL(ctx, "update mysql.table_cache_meta set lock_type='INTEND', oldReadLease=%?, lease=%? where tid=%?", lease, ts, tid); err != nil {
return false, errors.Trace(err)
}
if err = h.commitTxn(ctx); err != nil {
return false, errors.Trace(err)
}

// Wait for lease to expire, and then retry.
waitForLeaseExpire(oldReadLease, now)
return true, nil
case CachedTableLockIntend, CachedTableLockWrite:
// `now` exceed `oldReadLease` means wait for READ lock lease is done, it's safe to read here.
if now > oldReadLease {
if lockType == CachedTableLockIntend {
if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil {
return false, errors.Trace(err)
}
}
return false, h.commitTxn(ctx)
}

// Otherwise, the WRITE should wait for the READ lease expire.
terror.Log(h.rollbackTxn(ctx))
waitForLeaseExpire(oldReadLease, now)
// And then retry change the lock to WRITE
return true, nil
}
return false, errors.New("should never run here")
}

func waitForLeaseExpire(oldReadLease, now uint64) {
if oldReadLease >= now {
t1 := oracle.GetTimeFromTS(oldReadLease)
t2 := oracle.GetTimeFromTS(now)
waitDuration := t1.Sub(t2)
time.Sleep(waitDuration)
lcwangchao marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, ts uint64) (bool, error) {
h.Lock()
defer h.Unlock()
_, err := h.execSQL(ctx, "update mysql.table_cache_meta set lease = %? where tid = %? and lock_type ='READ'", ts, tid)
if err != nil {
return false, errors.Trace(err)
}
succ := h.exec.AffectedRows() > 0
return succ, err
}

func (h *stateRemoteHandle) beginTxn(ctx context.Context) error {
_, err := h.execSQL(ctx, "begin")
return err
}

func (h *stateRemoteHandle) commitTxn(ctx context.Context) error {
_, err := h.execSQL(ctx, "commit")
return err
}

func (h *stateRemoteHandle) rollbackTxn(ctx context.Context) error {
_, err := h.execSQL(ctx, "rollback")
return err
}

func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Context) error) error {
err := h.beginTxn(ctx)
if err != nil {
return errors.Trace(err)
}

err = fn(ctx)
if err != nil {
terror.Log(h.rollbackTxn(ctx))
return errors.Trace(err)
}

return h.commitTxn(ctx)
}

func (h *stateRemoteHandle) loadRow(ctx context.Context, tid int64) (CachedTableLockType, uint64, uint64, error) {
chunkRows, err := h.execSQL(ctx, "select lock_type, lease, oldReadLease from mysql.table_cache_meta where tid = %? for update", tid)
if err != nil {
return 0, 0, 0, errors.Trace(err)
}
if len(chunkRows) != 1 {
return 0, 0, 0, errors.Errorf("table_cache_meta tid not exist %d", tid)
}
col1 := chunkRows[0].GetEnum(0)
// Note, the MySQL enum value start from 1 rather than 0
lockType := CachedTableLockType(col1.Value - 1)
lease := chunkRows[0].GetUint64(1)
oldReadLease := chunkRows[0].GetUint64(2)
return lockType, lease, oldReadLease, nil
}

func (h *stateRemoteHandle) updateRow(ctx context.Context, tid int64, lockType string, lease uint64) error {
_, err := h.execSQL(ctx, "update mysql.table_cache_meta set lock_type = %?, lease = %? where tid = %?", lockType, lease, tid)
return err
}

func (h *stateRemoteHandle) execSQL(ctx context.Context, sql string, args ...interface{}) ([]chunk.Row, error) {
rs, err := h.exec.ExecuteInternal(ctx, sql, args...)
if rs != nil {
defer rs.Close()
}
if err != nil {
return nil, errors.Trace(err)
}
if rs != nil {
return sqlexec.DrainRecordSet(ctx, rs, 1)
}
return nil, nil
}
Loading