Skip to content

Commit

Permalink
update etcd namespace by keyspace prefix (pingcap#35)
Browse files Browse the repository at this point in the history
* update etcd namespace by keyspace prefix

Signed-off-by: ystaticy <[email protected]>
  • Loading branch information
ystaticy committed Aug 30, 2022
1 parent a254dd6 commit 910a86e
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 4 deletions.
9 changes: 9 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/domainutil"
"github.com/pingcap/tidb/util/etcd"
"github.com/pingcap/tidb/util/expensivequery"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -793,6 +794,14 @@ func (do *Domain) Init(ddlLease time.Duration, sysExecutorFactory func(*Domain)
},
TLS: ebd.TLSConfig(),
})

// If keyspace has been set in KvStorage
if isKvStorageKeyspaceSet(do.store) {
keyspaceId := KeyspaceIdBytesToUint32(do.store.GetCodec().GetKeyspaceID())
etcdPathPrefix := GetKeyspacePathPrefix(keyspaceId)
etcd.SetEtcdCliByNamespace(cli, etcdPathPrefix)
}

if err != nil {
return errors.Trace(err)
}
Expand Down
29 changes: 28 additions & 1 deletion domain/keyspace.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
package domain

import (
"encoding/binary"
"fmt"
"os"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
)

const EnvVarKeyspaceName = "KEYSPACE_NAME"
const (

// EnvVarKeyspaceName is the system env name for keyspace name.
EnvVarKeyspaceName = "KEYSPACE_NAME"

// tidbKeyspaceEtcdPathPrefix is the keyspace prefix for etcd namespace
tidbKeyspaceEtcdPathPrefix = "/keyspaces/tidb/"
)

func GetKeyspaceNameBySettings() (keyspaceName string) {

Expand All @@ -23,3 +33,20 @@ func GetKeyspaceNameBySettings() (keyspaceName string) {
func IsKeyspaceNameEmpty(keyspaceName string) bool {
return keyspaceName == ""
}

// GetKeyspacePathPrefix return the keyspace prefix path for etcd namespace
func GetKeyspacePathPrefix(keyspaceId uint32) string {
path := fmt.Sprintf(tidbKeyspaceEtcdPathPrefix+"%d"+"/", keyspaceId)
return path
}

// KeyspaceIdBytesToUint32 is used to convert byte array to uint32
func KeyspaceIdBytesToUint32(b []byte) uint32 {
c := make([]byte, 4)
copy(c[1:4], b[0:3])
return binary.BigEndian.Uint32(c)
}

func isKvStorageKeyspaceSet(store kv.Storage) bool {
return store.GetCodec().GetKeyspaceID() != nil
}
13 changes: 13 additions & 0 deletions domain/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package domain

import (
"encoding/binary"
"os"
"testing"

Expand Down Expand Up @@ -78,3 +79,15 @@ func (k *keyspaceSuite) TestNoKeyspaceNameSet() {
k.Equal(true, IsKeyspaceNameEmpty(getKeyspaceName))

}

func (k *keyspaceSuite) TestID2Uint32() {

expectId := uint32(1)
expectBytes := make([]byte, 4)
binary.BigEndian.PutUint32(expectBytes, expectId)

testBytes := expectBytes[1:]
testId := KeyspaceIdBytesToUint32(testBytes)
k.Equal(expectId, testId)

}
49 changes: 46 additions & 3 deletions tests/realtikvtest/testkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ package realtikvtest
import (
"context"
"flag"
"fmt"
"github.com/pingcap/tidb/tablecodec"
"go.uber.org/zap"
"os"
"sync/atomic"
"testing"
"time"
Expand All @@ -40,12 +44,26 @@ import (
)

// WithRealTiKV is a flag identify whether tests run with real TiKV
var WithRealTiKV = flag.Bool("with-real-tikv", false, "whether tests run with real TiKV")
var (
WithRealTiKV = flag.Bool("with-real-tikv", false, "whether tests run with real TiKV")
// test tikv
tikvPath = flag.String("tikv-path", "tikv://127.0.0.1:2379?disableGC=true", "TiKV addr")

// KeyspaceName is an option to specify the name of keyspace that the tests run on,
// this option is only valid while the flag WithRealTiKV is set.
KeyspaceName = flag.String("keyspace-name", "", "the name of keyspace that the tests run on")
)

// RunTestMain run common setups for all real tikv tests.
func RunTestMain(m *testing.M) {
testsetup.SetupForCommonTest()
flag.Parse()

if !*WithRealTiKV && *KeyspaceName != "" {
_, _ = fmt.Fprintf(os.Stderr, "could not set -keyspace-name while not running with real TiKVs.")
os.Exit(-1)
}

session.SetSchemaLease(5 * time.Second)
config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.AsyncCommit.SafeWindow = 0
Expand Down Expand Up @@ -74,14 +92,35 @@ func RunTestMain(m *testing.M) {
}

func clearTiKVStorage(t *testing.T, store kv.Storage) {
// clear table data.
tableStart := tablecodec.TablePrefix()
deleteRange(t, store, tableStart, endOfRange(tableStart))

// clear meta data.
metaStart := tablecodec.MetaPrefix()
deleteRange(t, store, metaStart, endOfRange(metaStart))
}

func endOfRange(start []byte) []byte {
return []byte{start[0] + 1}
}

func deleteRange(t *testing.T, store kv.Storage, start, end []byte) {

fmt.Println("deleteRange:", zap.Binary("start:", start), zap.Binary("end:", end))
txn, err := store.Begin()
require.NoError(t, err)
iter, err := txn.Iter(nil, nil)

// Clean all table data.
iter, err := txn.Iter(start, end)

//iter, err := txn.Iter(nil, nil)
require.NoError(t, err)
for iter.Valid() {
require.NoError(t, txn.Delete(iter.Key()))
require.NoError(t, iter.Next())
}

require.NoError(t, txn.Commit(context.Background()))
}

Expand Down Expand Up @@ -127,12 +166,16 @@ func CreateMockStoreAndDomainAndSetup(t *testing.T, opts ...mockstore.MockTiKVSt
var dom *domain.Domain
var err error

println("WithRealTiKV:", *WithRealTiKV)
println("tikvPath:", *tikvPath)

if *WithRealTiKV {
var d driver.TiKVDriver
config.UpdateGlobal(func(conf *config.Config) {
conf.TxnLocalLatches.Enabled = false
conf.KeyspaceName = *KeyspaceName
})
store, err = d.Open("tikv://127.0.0.1:2379?disableGC=true")
store, err = d.Open(*tikvPath)
require.NoError(t, err)

clearTiKVStorage(t, store)
Expand Down
8 changes: 8 additions & 0 deletions util/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/pingcap/errors"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/namespace"
)

// Node organizes the ectd query result as a Trie tree
Expand Down Expand Up @@ -93,6 +94,13 @@ func NewClientFromCfg(endpoints []string, dialTimeout time.Duration, root string
}, nil
}

func SetEtcdCliByNamespace(cli *clientv3.Client, namespacePrefix string) {
cli.KV = namespace.NewKV(cli.KV, namespacePrefix)
cli.Watcher = namespace.NewWatcher(cli.Watcher, namespacePrefix)
cli.Lease = namespace.NewLease(cli.Lease, namespacePrefix)

}

// Close shutdowns the connection to etcd
func (e *Client) Close() error {
if err := e.client.Close(); err != nil {
Expand Down
28 changes: 28 additions & 0 deletions util/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,3 +395,31 @@ func testSetup(t *testing.T) (context.Context, *Client, *integration.ClusterV3)
etcd := NewClient(cluster.RandClient(), "binlog")
return context.Background(), etcd, cluster
}

func testSetupOriginal(t *testing.T) (context.Context, *clientv3.Client, *integration.ClusterV3) {
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
return context.Background(), cluster.RandClient(), cluster
}

func TestSetEtcdCliByNamespace(t *testing.T) {
integration.BeforeTest(t)
ctx, origEtcdCli, etcdMockCluster := testSetupOriginal(t)
defer etcdMockCluster.Terminate(t)

namespacePrefix := "testNamespace/"
key := "testkey"
obj := "test"

unprefixedKV := origEtcdCli.KV
cliNamespace := origEtcdCli
SetEtcdCliByNamespace(cliNamespace, namespacePrefix)

_, err := cliNamespace.Put(ctx, key, obj)
require.NoError(t, err)

// verify that kv pair is empty before set
getResp, err := unprefixedKV.Get(ctx, namespacePrefix+key)
require.NoError(t, err)
require.Len(t, getResp.Kvs, 1)

}

0 comments on commit 910a86e

Please sign in to comment.