Skip to content

Commit

Permalink
add tso storage
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Feb 2, 2023
1 parent 6ca9a33 commit 974bccd
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 2 deletions.
15 changes: 15 additions & 0 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ const (
// resource group storage endpoint has prefix `resource_group`
resourceGroupSettingsPath = "settings"

microserviceKey = "microservice"
tsoServiceKey = "tso"
timestampKey = "timestamp"

// we use uint64 to represent ID, the max length of uint64 is 20.
keyLen = 20
)
Expand Down Expand Up @@ -211,3 +215,14 @@ func KeyspaceIDAlloc() string {
func encodeKeyspaceID(spaceID uint32) string {
return fmt.Sprintf("%08d", spaceID)
}

func timestampPath(keyspaceGroupName string, dcLocationKey ...string) string {
if len(dcLocationKey) != 0 {
return path.Join(microserviceKey, tsoServiceKey, keyspaceGroupName, dcLocationKey[0], timestampKey)
}
return path.Join(microserviceKey, tsoServiceKey, keyspaceGroupName, timestampKey)
}

func timestampPrefix(keyspaceGroupName string) string {
return path.Join(microserviceKey, tsoServiceKey, keyspaceGroupName) + "/"
}
2 changes: 1 addition & 1 deletion pkg/storage/endpoint/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/gogo/protobuf/proto"
)

// ResourceGroupStorage defines the storage operations on the rule.
// ResourceGroupStorage defines the storage operations on the resource group.
type ResourceGroupStorage interface {
LoadResourceGroupSettings(f func(k, v string)) error
SaveResourceGroupSetting(name string, msg proto.Message) error
Expand Down
78 changes: 78 additions & 0 deletions pkg/storage/endpoint/tso.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package endpoint

import (
"strings"
"time"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/utils/typeutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

// TSOStorage defines the storage operations on the TSO.
// global tso: key is `/microservice/tso/{keyspaceGroupName}/timestamp`
// local tso: key is `/microservice/tso/{keyspaceGroupName}/lts/{dcLocation}/timestamp`
// FIXME: When we upgrade from the old version, there may be compatibility issues.
type TSOStorage interface {
LoadTimestamp(keyspaceGroupName string, dcLocationKey ...string) (time.Time, error)
SaveTimestamp(keyspaceGroupName string, ts time.Time, dcLocationKey ...string) error
}

var _ TSOStorage = (*StorageEndpoint)(nil)

// LoadTimestamp loads a timestamp from the storage according to the keyspaceGroupName and dcLocation.
func (se *StorageEndpoint) LoadTimestamp(keyspaceGroupName string, dcLocationKey ...string) (time.Time, error) {
var prefix string
if len(dcLocationKey) != 0 {
prefix = timestampPrefix(keyspaceGroupName) + dcLocationKey[0] + "/"
} else {
prefix = timestampPrefix(keyspaceGroupName)
}
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
if err != nil {
return time.Time{}, err
}
if len(keys) == 0 {
return time.Time{}, nil
}

maxTSWindow := typeutil.ZeroTime
for i, key := range keys {
key := strings.TrimSpace(key)
if !strings.HasSuffix(key, timestampKey) {
continue
}
tsWindow, err := typeutil.ParseTimestamp([]byte(values[i]))
if err != nil {
log.Error("parse timestamp window that from etcd failed", zap.String("ts-window-key", key), zap.Time("max-ts-window", maxTSWindow), zap.Error(err))
continue
}
if typeutil.SubRealTimeByWallClock(tsWindow, maxTSWindow) > 0 {
maxTSWindow = tsWindow
}
}
return maxTSWindow, nil
}

// SaveTimestamp saves the timestamp to the storage.
func (se *StorageEndpoint) SaveTimestamp(keyspaceGroupName string, ts time.Time, dcLocationKey ...string) error {
key := timestampPath(keyspaceGroupName, dcLocationKey...)
data := typeutil.Uint64ToBytes(uint64(ts.UnixNano()))
return se.Save(key, string(data))
}
1 change: 1 addition & 0 deletions server/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Storage interface {
endpoint.KeyspaceGCSafePointStorage
endpoint.KeyspaceStorage
endpoint.ResourceGroupStorage
endpoint.TSOStorage
}

// NewStorageWithMemoryBackend creates a new storage with memory backend.
Expand Down
89 changes: 89 additions & 0 deletions server/storage/storage_tso_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
"path"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
)

func TestSaveLoadTimestamp(t *testing.T) {
re := require.New(t)

cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
re.NoError(err)
defer etcd.Close()

ep := cfg.LCUrls[0].String()
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
re.NoError(err)
rootPath := path.Join("/pd", strconv.FormatUint(100, 10))
storage := NewStorageWithEtcdBackend(client, rootPath)

keyspaceGroupName := "test"
expectedTS := time.Now().Round(0)
err = storage.SaveTimestamp(keyspaceGroupName, expectedTS)
re.NoError(err)
ts, err := storage.LoadTimestamp(keyspaceGroupName)
re.NoError(err)
re.Equal(expectedTS, ts)
}

func TestGlobalLocalTimestamp(t *testing.T) {
re := require.New(t)

cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
re.NoError(err)
defer etcd.Close()

ep := cfg.LCUrls[0].String()
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
re.NoError(err)
rootPath := path.Join("/pd", strconv.FormatUint(100, 10))
storage := NewStorageWithEtcdBackend(client, rootPath)

keyspaceGroupName := "test"
dc1LocationKey, dc2LocationKey := "lts/dc1", "lts/dc2"
localTS1 := time.Now().Round(0)
err = storage.SaveTimestamp(keyspaceGroupName, localTS1, dc1LocationKey)
re.NoError(err)
globalTS := time.Now().Round(0)
err = storage.SaveTimestamp(keyspaceGroupName, globalTS)
re.NoError(err)
localTS2 := time.Now().Round(0)
err = storage.SaveTimestamp(keyspaceGroupName, localTS2, dc2LocationKey)
re.NoError(err)
// return the max ts between global and local
ts, err := storage.LoadTimestamp(keyspaceGroupName)
re.NoError(err)
re.Equal(localTS2, ts)
// return the local ts for a given dc location
ts, err = storage.LoadTimestamp(keyspaceGroupName, dc1LocationKey)
re.NoError(err)
re.Equal(localTS1, ts)
}
2 changes: 1 addition & 1 deletion server/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ func (am *AllocatorManager) getOrCreateLocalTSOSuffix(dcLocation string) (int32,
if !txnResp.Succeeded {
log.Warn("write local tso suffix into etcd failed",
zap.String("dc-location", dcLocation),
zap.String("local-tso-surfix", localTSOSuffixValue),
zap.String("local-tso-suffix", localTSOSuffixValue),
zap.String("server-name", am.member.Member().Name),
zap.Uint64("server-id", am.member.ID()))
return -1, errs.ErrEtcdTxnConflict.FastGenByArgs()
Expand Down

0 comments on commit 974bccd

Please sign in to comment.