From ab8d40d538baf57432e47178c874eb89ead91e21 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 17 Mar 2022 01:46:20 +0800 Subject: [PATCH 1/2] table/tables: fix some stability issues with cached table --- table/tables/cache.go | 1 + table/tables/state_remote.go | 32 ++++++++++++++++++++++---------- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/table/tables/cache.go b/table/tables/cache.go index 48850bb85d736..4de65204cb944 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -194,6 +194,7 @@ func (c *cachedTable) updateLockForRead(ctx context.Context, store kv.Storage, t if succ { mb, startTS, totalSize, err := c.loadDataFromOriginalTable(store, lease) if err != nil { + log.Info("load data from table", zap.Error(err)) return } diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index a7ebcf860e752..88bcdc9ec7a94 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -103,7 +103,7 @@ func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, newLease h.Lock() defer h.Unlock() succ := false - err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { + err := h.runInTxn(ctx, false, func(ctx context.Context, now uint64) error { lockType, lease, _, err := h.loadRow(ctx, tid) if err != nil { return errors.Trace(err) @@ -155,7 +155,7 @@ func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, leaseDu } func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, leaseDuration time.Duration) (waitAndRetry time.Duration, ts uint64, err error) { - err = h.runInTxn(ctx, func(ctx context.Context, now uint64) error { + err = h.runInTxn(ctx, true, func(ctx context.Context, now uint64) error { lockType, lease, oldReadLease, err := h.loadRow(ctx, tid) if err != nil { return errors.Trace(err) @@ -211,8 +211,11 @@ func waitForLeaseExpire(oldReadLease, now uint64) time.Duration { if oldReadLease >= now { t1 := oracle.GetTimeFromTS(oldReadLease) t2 := oracle.GetTimeFromTS(now) - waitDuration := t1.Sub(t2) - return waitDuration + if t1.After(t2) { + waitDuration := t1.Sub(t2) + return waitDuration + } + return time.Microsecond } return 0 } @@ -220,8 +223,10 @@ func waitForLeaseExpire(oldReadLease, now uint64) time.Duration { // RenewReadLease renew the read lock lease. // Return the current lease value on success, and return 0 on fail. func (h *stateRemoteHandle) RenewReadLease(ctx context.Context, tid int64, oldLocalLease, newValue uint64) (uint64, error) { + h.Lock() + defer h.Unlock() var newLease uint64 - err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { + err := h.runInTxn(ctx, false, func(ctx context.Context, now uint64) error { lockType, remoteLease, _, err := h.loadRow(ctx, tid) if err != nil { return errors.Trace(err) @@ -266,8 +271,10 @@ func (h *stateRemoteHandle) RenewReadLease(ctx context.Context, tid int64, oldLo } func (h *stateRemoteHandle) RenewWriteLease(ctx context.Context, tid int64, newLease uint64) (bool, error) { + h.Lock() + defer h.Unlock() var succ bool - err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { + err := h.runInTxn(ctx, true, func(ctx context.Context, now uint64) error { lockType, oldLease, _, err := h.loadRow(ctx, tid) if err != nil { return errors.Trace(err) @@ -293,8 +300,13 @@ func (h *stateRemoteHandle) RenewWriteLease(ctx context.Context, tid int64, newL return succ, err } -func (h *stateRemoteHandle) beginTxn(ctx context.Context) error { - _, err := h.execSQL(ctx, "begin optimistic") +func (h *stateRemoteHandle) beginTxn(ctx context.Context, pess bool) error { + var err error + if pess { + _, err = h.execSQL(ctx, "begin pessimistic") + } else { + _, err = h.execSQL(ctx, "begin optimistic") + } return err } @@ -308,8 +320,8 @@ func (h *stateRemoteHandle) rollbackTxn(ctx context.Context) error { return err } -func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Context, txnTS uint64) error) error { - err := h.beginTxn(ctx) +func (h *stateRemoteHandle) runInTxn(ctx context.Context, pessimistic bool, fn func(ctx context.Context, txnTS uint64) error) error { + err := h.beginTxn(ctx, pessimistic) if err != nil { return errors.Trace(err) } From 8a0b2e543f1c235e8aca87b1180b4563c54b452b Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 17 Mar 2022 12:26:16 +0800 Subject: [PATCH 2/2] address comment --- table/tables/state_remote.go | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 88bcdc9ec7a94..6abbebf3e30a5 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -103,7 +103,7 @@ func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, newLease h.Lock() defer h.Unlock() succ := false - err := h.runInTxn(ctx, false, func(ctx context.Context, now uint64) error { + err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { lockType, lease, _, err := h.loadRow(ctx, tid) if err != nil { return errors.Trace(err) @@ -155,7 +155,7 @@ func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, leaseDu } func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, leaseDuration time.Duration) (waitAndRetry time.Duration, ts uint64, err error) { - err = h.runInTxn(ctx, true, func(ctx context.Context, now uint64) error { + err = h.runInTxn(ctx, func(ctx context.Context, now uint64) error { lockType, lease, oldReadLease, err := h.loadRow(ctx, tid) if err != nil { return errors.Trace(err) @@ -226,7 +226,7 @@ func (h *stateRemoteHandle) RenewReadLease(ctx context.Context, tid int64, oldLo h.Lock() defer h.Unlock() var newLease uint64 - err := h.runInTxn(ctx, false, func(ctx context.Context, now uint64) error { + err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { lockType, remoteLease, _, err := h.loadRow(ctx, tid) if err != nil { return errors.Trace(err) @@ -274,7 +274,7 @@ func (h *stateRemoteHandle) RenewWriteLease(ctx context.Context, tid int64, newL h.Lock() defer h.Unlock() var succ bool - err := h.runInTxn(ctx, true, func(ctx context.Context, now uint64) error { + err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { lockType, oldLease, _, err := h.loadRow(ctx, tid) if err != nil { return errors.Trace(err) @@ -300,13 +300,8 @@ func (h *stateRemoteHandle) RenewWriteLease(ctx context.Context, tid int64, newL return succ, err } -func (h *stateRemoteHandle) beginTxn(ctx context.Context, pess bool) error { - var err error - if pess { - _, err = h.execSQL(ctx, "begin pessimistic") - } else { - _, err = h.execSQL(ctx, "begin optimistic") - } +func (h *stateRemoteHandle) beginTxn(ctx context.Context) error { + _, err := h.execSQL(ctx, "begin optimistic") return err } @@ -320,8 +315,8 @@ func (h *stateRemoteHandle) rollbackTxn(ctx context.Context) error { return err } -func (h *stateRemoteHandle) runInTxn(ctx context.Context, pessimistic bool, fn func(ctx context.Context, txnTS uint64) error) error { - err := h.beginTxn(ctx, pessimistic) +func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Context, txnTS uint64) error) error { + err := h.beginTxn(ctx) if err != nil { return errors.Trace(err) }