Skip to content

Commit

Permalink
implement the external timestamp client
Browse files Browse the repository at this point in the history
Signed-off-by: YangKeao <[email protected]>
  • Loading branch information
YangKeao committed Oct 26, 2022
1 parent 6c9c7c7 commit 17cca1f
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 17 deletions.
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20221014081430-26e28e6a281a
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/client_model v0.2.0
Expand Down Expand Up @@ -58,3 +58,5 @@ require (
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
stathat.com/c/consistent v1.0.0 // indirect
)

replace github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db => github.com/lhy1024/pd/client v0.0.0-20221026120600-7cf30bf5ce7d
15 changes: 6 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lhy1024/pd/client v0.0.0-20221026120600-7cf30bf5ce7d h1:Fa9UKRpXYynrEZdb0Vzshj+L/x6f/qPq5Sq0wM7VFgc=
github.com/lhy1024/pd/client v0.0.0-20221026120600-7cf30bf5ce7d/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand Down Expand Up @@ -154,13 +156,10 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZ
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20220510035547-0e2f26c0a46a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c h1:ceg4xjEEXNgPsScTQ5dtidiltLF4h17Y/jUqfyLAy9E=
github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221014081430-26e28e6a281a h1:McYxPhA8SHqfUtLfQHHN0fQl4dy93IkhlX4Pp2MKIFA=
github.com/pingcap/kvproto v0.0.0-20221014081430-26e28e6a281a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172 h1:FYgKV9znRQmzVrrJDZ0gUfMIvKLAMU1tu1UKJib8bEQ=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 h1:URLoJ61DmmY++Sa/yyPEQHG2s/ZBeV1FbIswHEMrdoY=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -203,8 +202,6 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db h1:r1eMh9Rny3hfWuBuxOnbsCRrR4FhthiNxLQ5rAUtaww=
github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db/go.mod h1:ew8kS0yIcEaSetuuywkTLIUBR+sz3J5XvAYRae11qwc=
github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=
github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/kvproto v0.0.0-20221014081430-26e28e6a281a
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172
github.com/pingcap/tidb v1.1.0-beta.0.20220902042024-0482b2e83ed2
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.0
Expand Down Expand Up @@ -47,7 +47,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/badger v1.5.1-0.20220314162537-ab58fbf40580 // indirect
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect
github.com/pingcap/log v1.1.0 // indirect
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 // indirect
github.com/pingcap/tidb/parser v0.0.0-20220724090709-5484002f1963 // indirect
github.com/pingcap/tipb v0.0.0-20220824081009-0714a57aff1d // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,12 @@ github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 h1:Pe2LbxRmbTfAoKJ65bZL
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20220510035547-0e2f26c0a46a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221014081430-26e28e6a281a h1:McYxPhA8SHqfUtLfQHHN0fQl4dy93IkhlX4Pp2MKIFA=
github.com/pingcap/kvproto v0.0.0-20221014081430-26e28e6a281a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172 h1:FYgKV9znRQmzVrrJDZ0gUfMIvKLAMU1tu1UKJib8bEQ=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.0 h1:ELiPxACz7vdo1qAvvaWJg1NrYFoY6gqAh/+Uo6aXdD8=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 h1:URLoJ61DmmY++Sa/yyPEQHG2s/ZBeV1FbIswHEMrdoY=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 h1:HYbcxtnkN3s5tqrZ/z3eJS4j3Db8wMphEm1q10lY/TM=
github.com/pingcap/tidb v1.1.0-beta.0.20220902042024-0482b2e83ed2 h1:DDUcT2xQn615E32PjtAxjw3ny1W9ttIXmm0zuCbbpRM=
github.com/pingcap/tidb v1.1.0-beta.0.20220902042024-0482b2e83ed2/go.mod h1:30/HzqtWK9Epm8XEYHn7xrVR6LDQtU0wn9fjx/9Fsnc=
Expand Down
30 changes: 30 additions & 0 deletions internal/mockstore/mocktikv/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pkg/errors"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
)

Expand All @@ -63,6 +65,8 @@ type pdClient struct {
// in GC.
serviceSafePoints map[string]uint64
gcSafePointMu sync.Mutex

externalTimestamp atomic.Uint64
}

// NewPDClient creates a mock pd.Client that uses local timestamp and meta data
Expand Down Expand Up @@ -116,6 +120,32 @@ func (c *pdClient) GetLocalTSAsync(ctx context.Context, dcLocation string) pd.TS
return c.GetTSAsync(ctx)
}

func (c *pdClient) SetExternalTimestamp(ctx context.Context, newTimestamp uint64) error {
p, l, err := c.GetTS(ctx)
if err != nil {
return err
}

currentTSO := oracle.ComposeTS(p, l)
if newTimestamp > currentTSO {
return errors.New("external timestamp is greater than global tso")
}
for {
externalTimestamp := c.externalTimestamp.Load()
if externalTimestamp > newTimestamp {
return errors.New("cannot decrease the external timestamp")
}

if c.externalTimestamp.CompareAndSwap(externalTimestamp, newTimestamp) {
return nil
}
}
}

func (c *pdClient) GetExternalTimestamp(ctx context.Context) (uint64, error) {
return c.externalTimestamp.Load(), nil
}

type mockTSFuture struct {
pdc *pdClient
ctx context.Context
Expand Down
3 changes: 3 additions & 0 deletions oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ type Oracle interface {
IsExpired(lockTimestamp, TTL uint64, opt *Option) bool
UntilExpired(lockTimeStamp, TTL uint64, opt *Option) int64
Close()

GetExternalTimestamp(ctx context.Context) (uint64, error)
SetExternalTimestamp(ctx context.Context, ts uint64) error
}

// Future is a future which promises to return a timestamp.
Expand Down
10 changes: 10 additions & 0 deletions oracle/oracles/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type localOracle struct {
hook *struct {
currentTime time.Time
}

localExternalTimestamp
}

// NewLocalOracle creates an Oracle that uses local time as data source.
Expand Down Expand Up @@ -124,3 +126,11 @@ func (l *localOracle) UntilExpired(lockTimeStamp, TTL uint64, opt *oracle.Option

func (l *localOracle) Close() {
}

func (l *localOracle) SetExternalTimestamp(ctx context.Context, newTimestamp uint64) error {
return l.setExternalTimestamp(ctx, l, newTimestamp)
}

func (l *localOracle) GetExternalTimestamp(ctx context.Context) (uint64, error) {
return l.getExternalTimestamp(ctx)
}
52 changes: 52 additions & 0 deletions oracle/oracles/local_external_timestamp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2022 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package oracles

import (
"context"
"sync/atomic"

"github.com/pkg/errors"
"github.com/tikv/client-go/v2/oracle"
)

type localExternalTimestamp struct {
externalTimestamp atomic.Uint64
}

func (l *localExternalTimestamp) setExternalTimestamp(ctx context.Context, o oracle.Oracle, newTimestamp uint64) error {
currentTSO, err := o.GetTimestamp(ctx, nil)
if err != nil {
return err
}

if newTimestamp > currentTSO {
return errors.New("external timestamp is greater than global tso")
}
for {
externalTimestamp := l.externalTimestamp.Load()
if externalTimestamp > newTimestamp {
return errors.New("cannot decrease the external timestamp")
}

if l.externalTimestamp.CompareAndSwap(externalTimestamp, newTimestamp) {
return nil
}
}
}

func (l *localExternalTimestamp) getExternalTimestamp(ctx context.Context) (uint64, error) {
return l.externalTimestamp.Load(), nil
}
12 changes: 12 additions & 0 deletions oracle/oracles/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type MockOracle struct {
stop bool
offset time.Duration
lastTS uint64

localExternalTimestamp
}

// Enable enables the Oracle
Expand Down Expand Up @@ -140,3 +142,13 @@ func (o *MockOracle) UntilExpired(lockTimeStamp, TTL uint64, _ *oracle.Option) i
func (o *MockOracle) Close() {

}

// GetExternalTimestamp implement oracle.Oracle interface.
func (o *MockOracle) GetExternalTimestamp(ctx context.Context) (uint64, error) {
return o.getExternalTimestamp(ctx)
}

// SetExternalTimestamp implement oracle.Oracle interface.
func (o *MockOracle) SetExternalTimestamp(ctx context.Context, newTimestamp uint64) error {
return o.setExternalTimestamp(ctx, o, newTimestamp)
}
8 changes: 8 additions & 0 deletions oracle/oracles/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,11 @@ func (o *pdOracle) GetStaleTimestamp(ctx context.Context, txnScope string, prevS
}
return ts, nil
}

func (o *pdOracle) SetExternalTimestamp(ctx context.Context, ts uint64) error {
return o.c.SetExternalTimestamp(ctx, ts)
}

func (o *pdOracle) GetExternalTimestamp(ctx context.Context) (uint64, error) {
return o.c.GetExternalTimestamp(ctx)
}

0 comments on commit 17cca1f

Please sign in to comment.