diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index 5bf3febe5d3a..e0b8ee49c46c 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -45,6 +45,10 @@ const ( // resource group storage endpoint has prefix `resource_group` resourceGroupSettingsPath = "settings" + microserviceKey = "microservice" + tsoServiceKey = "tso" + timestampKey = "timestamp" + // we use uint64 to represent ID, the max length of uint64 is 20. keyLen = 20 ) @@ -211,3 +215,14 @@ func KeyspaceIDAlloc() string { func encodeKeyspaceID(spaceID uint32) string { return fmt.Sprintf("%08d", spaceID) } + +func timestampPath(keyspaceGroupName string, dcLocationKey ...string) string { + if len(dcLocationKey) != 0 { + return path.Join(microserviceKey, tsoServiceKey, keyspaceGroupName, dcLocationKey[0], timestampKey) + } + return path.Join(microserviceKey, tsoServiceKey, keyspaceGroupName, timestampKey) +} + +func timestampPrefix(keyspaceGroupName string) string { + return path.Join(microserviceKey, tsoServiceKey, keyspaceGroupName) + "/" +} diff --git a/pkg/storage/endpoint/resource_group.go b/pkg/storage/endpoint/resource_group.go index e965070cf772..76adb93da8d3 100644 --- a/pkg/storage/endpoint/resource_group.go +++ b/pkg/storage/endpoint/resource_group.go @@ -18,7 +18,7 @@ import ( "github.com/gogo/protobuf/proto" ) -// ResourceGroupStorage defines the storage operations on the rule. +// ResourceGroupStorage defines the storage operations on the resource group. type ResourceGroupStorage interface { LoadResourceGroupSettings(f func(k, v string)) error SaveResourceGroupSetting(name string, msg proto.Message) error diff --git a/pkg/storage/endpoint/tso.go b/pkg/storage/endpoint/tso.go new file mode 100644 index 000000000000..3fa4214cc5db --- /dev/null +++ b/pkg/storage/endpoint/tso.go @@ -0,0 +1,78 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package endpoint + +import ( + "strings" + "time" + + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/utils/typeutil" + "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" +) + +// TSOStorage defines the storage operations on the TSO. +// global tso: key is `/microservice/tso/{keyspaceGroupName}/timestamp` +// local tso: key is `/microservice/tso/{keyspaceGroupName}/lts/{dcLocation}/timestamp` +// FIXME: When we upgrade from the old version, there may be compatibility issues. +type TSOStorage interface { + LoadTimestamp(keyspaceGroupName string, dcLocationKey ...string) (time.Time, error) + SaveTimestamp(keyspaceGroupName string, ts time.Time, dcLocationKey ...string) error +} + +var _ TSOStorage = (*StorageEndpoint)(nil) + +// LoadTimestamp loads a timestamp from the storage according to the keyspaceGroupName and dcLocation. +func (se *StorageEndpoint) LoadTimestamp(keyspaceGroupName string, dcLocationKey ...string) (time.Time, error) { + var prefix string + if len(dcLocationKey) != 0 { + prefix = timestampPrefix(keyspaceGroupName) + dcLocationKey[0] + "/" + } else { + prefix = timestampPrefix(keyspaceGroupName) + } + prefixEnd := clientv3.GetPrefixRangeEnd(prefix) + keys, values, err := se.LoadRange(prefix, prefixEnd, 0) + if err != nil { + return time.Time{}, err + } + if len(keys) == 0 { + return time.Time{}, nil + } + + maxTSWindow := typeutil.ZeroTime + for i, key := range keys { + key := strings.TrimSpace(key) + if !strings.HasSuffix(key, timestampKey) { + continue + } + tsWindow, err := typeutil.ParseTimestamp([]byte(values[i])) + if err != nil { + log.Error("parse timestamp window that from etcd failed", zap.String("ts-window-key", key), zap.Time("max-ts-window", maxTSWindow), zap.Error(err)) + continue + } + if typeutil.SubRealTimeByWallClock(tsWindow, maxTSWindow) > 0 { + maxTSWindow = tsWindow + } + } + return maxTSWindow, nil +} + +// SaveTimestamp saves the timestamp to the storage. +func (se *StorageEndpoint) SaveTimestamp(keyspaceGroupName string, ts time.Time, dcLocationKey ...string) error { + key := timestampPath(keyspaceGroupName, dcLocationKey...) + data := typeutil.Uint64ToBytes(uint64(ts.UnixNano())) + return se.Save(key, string(data)) +} diff --git a/server/storage/storage.go b/server/storage/storage.go index e9fd09498484..f2bfe1f74fb1 100644 --- a/server/storage/storage.go +++ b/server/storage/storage.go @@ -43,6 +43,7 @@ type Storage interface { endpoint.KeyspaceGCSafePointStorage endpoint.KeyspaceStorage endpoint.ResourceGroupStorage + endpoint.TSOStorage } // NewStorageWithMemoryBackend creates a new storage with memory backend. diff --git a/server/storage/storage_tso_test.go b/server/storage/storage_tso_test.go new file mode 100644 index 000000000000..f612e5e2b76b --- /dev/null +++ b/server/storage/storage_tso_test.go @@ -0,0 +1,89 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "path" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/utils/etcdutil" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/embed" +) + +func TestSaveLoadTimestamp(t *testing.T) { + re := require.New(t) + + cfg := etcdutil.NewTestSingleConfig(t) + etcd, err := embed.StartEtcd(cfg) + re.NoError(err) + defer etcd.Close() + + ep := cfg.LCUrls[0].String() + client, err := clientv3.New(clientv3.Config{ + Endpoints: []string{ep}, + }) + re.NoError(err) + rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) + storage := NewStorageWithEtcdBackend(client, rootPath) + + keyspaceGroupName := "test" + expectedTS := time.Now().Round(0) + err = storage.SaveTimestamp(keyspaceGroupName, expectedTS) + re.NoError(err) + ts, err := storage.LoadTimestamp(keyspaceGroupName) + re.NoError(err) + re.Equal(expectedTS, ts) +} + +func TestGlobalLocalTimestamp(t *testing.T) { + re := require.New(t) + + cfg := etcdutil.NewTestSingleConfig(t) + etcd, err := embed.StartEtcd(cfg) + re.NoError(err) + defer etcd.Close() + + ep := cfg.LCUrls[0].String() + client, err := clientv3.New(clientv3.Config{ + Endpoints: []string{ep}, + }) + re.NoError(err) + rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) + storage := NewStorageWithEtcdBackend(client, rootPath) + + keyspaceGroupName := "test" + dc1LocationKey, dc2LocationKey := "lts/dc1", "lts/dc2" + localTS1 := time.Now().Round(0) + err = storage.SaveTimestamp(keyspaceGroupName, localTS1, dc1LocationKey) + re.NoError(err) + globalTS := time.Now().Round(0) + err = storage.SaveTimestamp(keyspaceGroupName, globalTS) + re.NoError(err) + localTS2 := time.Now().Round(0) + err = storage.SaveTimestamp(keyspaceGroupName, localTS2, dc2LocationKey) + re.NoError(err) + // return the max ts between global and local + ts, err := storage.LoadTimestamp(keyspaceGroupName) + re.NoError(err) + re.Equal(localTS2, ts) + // return the local ts for a given dc location + ts, err = storage.LoadTimestamp(keyspaceGroupName, dc1LocationKey) + re.NoError(err) + re.Equal(localTS1, ts) +} diff --git a/server/tso/allocator_manager.go b/server/tso/allocator_manager.go index 2c0fa04524c1..d75d53a47bbc 100644 --- a/server/tso/allocator_manager.go +++ b/server/tso/allocator_manager.go @@ -757,7 +757,7 @@ func (am *AllocatorManager) getOrCreateLocalTSOSuffix(dcLocation string) (int32, if !txnResp.Succeeded { log.Warn("write local tso suffix into etcd failed", zap.String("dc-location", dcLocation), - zap.String("local-tso-surfix", localTSOSuffixValue), + zap.String("local-tso-suffix", localTSOSuffixValue), zap.String("server-name", am.member.Member().Name), zap.Uint64("server-id", am.member.ID())) return -1, errs.ErrEtcdTxnConflict.FastGenByArgs()