Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer committed Feb 10, 2020
1 parent 24a8c2d commit 9e84846
Show file tree
Hide file tree
Showing 6 changed files with 391 additions and 2 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ build_for_integration_test:
GO111MODULE=on go build -race -o bin/locker tests/br_key_locked/*.go
# build gc
GO111MODULE=on go build -race -o bin/gc tests/br_z_gc_safepoint/*.go
# build rawkv client
GO111MODULE=on go build -race -o bin/rawkv tests/br_rawkv/*.go

test:
GO111MODULE=on go test -race -tags leak ./...
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUW
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdcM=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/aws/aws-sdk-go v1.26.1 h1:JGQggXhOiNJIqsmbYUl3cYtJZUffeOWlHtxfzGK7WPI=
Expand Down Expand Up @@ -567,6 +569,7 @@ google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/alecthomas/gometalinter.v2 v2.0.12/go.mod h1:NDRytsqEZyolNuAgTzJkZMkSQM7FIKyzVzGhjB/qfYo=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20180810215634-df19058c872c/go.mod h1:3HH7i1SgMqlzxCcBmUHW657sD4Kvv9sC3HpL3YukzwA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
4 changes: 3 additions & 1 deletion pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,9 @@ func (mgr *Mgr) Close() {

// Gracefully shutdown domain so it does not affect other TiDB DDL.
// Must close domain before closing storage, otherwise it gets stuck forever.
mgr.dom.Close()
if mgr.dom != nil{
mgr.dom.Close()
}

atomic.StoreUint32(&tikv.ShuttingDown, 1)
mgr.storage.Close()
Expand Down
331 changes: 331 additions & 0 deletions tests/br_rawkv/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,331 @@
package main

import (
"bytes"
"encoding/hex"
"flag"
"fmt"
"hash/crc64"
"math/rand"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv"
"github.com/prometheus/common/log"
)

// * Rand Gen
// * Scan checksum
// * Scan print
// * Scan diff
//
// Ref TIDB: https://github.com/pingcap/tidb/commit/7a7b2d82a70858544d7b711c9cb8499bda5d6eb7

var (
pdAddr = flag.String("pd", "127.0.0.1:2379", "Address of PD")
runMode = flag.String("mode", "", "Mode. One of 'rand-gen', 'checksum', 'scan' and 'diff'")
startKeyStr = flag.String("start-key", "", "Start key in hex")
endKeyStr = flag.String("end-key", "", "End key in hex")
keyMaxLen = flag.Int("key-max-len", 32, "Max length of keys for rand-gen mode")
concurrency = flag.Int("concurrency", 32, "Concurrency to run rand-gen")
duration = flag.Int("duration", 10, "duration(second) of rand-gen")
)

func createClient(addr string) (*tikv.RawKVClient, error) {
cli, err := tikv.NewRawKVClient([]string{addr}, config.Security{})
return cli, err
}

func main() {
flag.Parse()

startKey, err := hex.DecodeString(*startKeyStr)
if err != nil {
log.Fatalf("Invalid startKey: %v, err: %+v", startKeyStr, err)
}
endKey, err := hex.DecodeString(*endKeyStr)
if err != nil {
log.Fatalf("Invalid endKey: %v, err: %+v", endKeyStr, err)
}
if len(endKey) == 0 {
log.Fatal("Empty endKey is not supported yet")
}

if *runMode == "test-rand-key" {
testRandKey(startKey, endKey, *keyMaxLen)
return
}

client, err := createClient(*pdAddr)
if err != nil {
log.Fatalf("Failed to create client to %v, err: %+v", *pdAddr, err)
}

switch *runMode {
case "rand-gen":
err = randGenWithDuration(client, startKey, endKey, *keyMaxLen, *concurrency, *duration)
case "checksum":
err = checksum(client, startKey, endKey)
case "scan":
err = scan(client, startKey, endKey)
case "delete":
err = deleteRange(client, startKey, endKey)
}

if err != nil {
log.Fatalf("Error: %+v", err)
}
}

func randGenWithDuration(client *tikv.RawKVClient, startKey, endKey []byte, maxLen int, concurrency int, duration int) error {
var err error
ok := make(chan struct{})
go func() {
err = randGen(client, startKey, endKey, maxLen, concurrency)
ok<- struct{}{}
}()
select {
case <-time.After(time.Second*time.Duration(duration)):
case <-ok:
}
return nil
}

func randGen(client *tikv.RawKVClient, startKey, endKey []byte, maxLen int, concurrency int) error {
log.Infof("Start rand-gen from %v to %v, maxLen %v", hex.EncodeToString(startKey), hex.EncodeToString(endKey), maxLen)
log.Infof("Rand-gen will keep running. Please Ctrl+C to stop manually.")

// Cannot generate shorter key than commonPrefix
commonPrefixLen := 0
for ; commonPrefixLen < len(startKey) && commonPrefixLen < len(endKey) &&
startKey[commonPrefixLen] == endKey[commonPrefixLen]; commonPrefixLen++ {
continue
}

if maxLen < commonPrefixLen {
return errors.Errorf("maxLen (%v) < commonPrefixLen (%v)", maxLen, commonPrefixLen)
}

const batchSize = 32

errCh := make(chan error, concurrency)
for i := 0; i < concurrency; i++ {
go func() {
for {
keys := make([][]byte, 0, batchSize)
values := make([][]byte, 0, batchSize)

for i := 0; i < batchSize; i++ {
key := randKey(startKey, endKey, maxLen)
keys = append(keys, key)
value := randValue()
values = append(values, value)
}

err := client.BatchPut(keys, values)
if err != nil {
errCh <- errors.Trace(err)
}
}
}()
}

err := <-errCh
if err != nil {
return errors.Trace(err)
}

return nil
}

func testRandKey(startKey, endKey []byte, maxLen int) {
for {
k := randKey(startKey, endKey, maxLen)
if bytes.Compare(k, startKey) < 0 || bytes.Compare(k, endKey) >= 0 {
panic(hex.EncodeToString(k))
}
}
}

func randKey(startKey, endKey []byte, maxLen int) []byte {
Retry:
for { // Regenerate on fail
result := make([]byte, 0, maxLen)

upperUnbounded := false
lowerUnbounded := false

for i := 0; i < maxLen; i++ {
upperBound := 256
if !upperUnbounded {
if i >= len(endKey) {
// The generated key is the same as endKey which is invalid. Regenerate it.
continue Retry
}
upperBound = int(endKey[i]) + 1
}

lowerBound := 0
if !lowerUnbounded {
if i >= len(startKey) {
lowerUnbounded = true
} else {
lowerBound = int(startKey[i])
}
}

if lowerUnbounded {
if rand.Intn(257) == 0 {
return result
}
}

value := rand.Intn(upperBound - lowerBound)
value += lowerBound

if value < upperBound-1 {
upperUnbounded = true
}
if value > lowerBound {
lowerUnbounded = true
}

result = append(result, uint8(value))
}

return result
}
}

func randValue() []byte {
result := make([]byte, 0, 512)
for i := 0; i < 512; i++ {
value := rand.Intn(257)
if value == 256 {
if i > 0 {
return result
}
value--
}
result = append(result, uint8(value))
}
return result
}

func checksum(client *tikv.RawKVClient, startKey, endKey []byte) error {
log.Infof("Start checkcum on range %v to %v", hex.EncodeToString(startKey), hex.EncodeToString(endKey))

scanner := newRawKVScanner(client, startKey, endKey)
digest := crc64.New(crc64.MakeTable(crc64.ECMA))

var res uint64

for {
k, v, err := scanner.Next()
if err != nil {
return errors.Trace(err)
}
if len(k) == 0 {
break
}
_, _ = digest.Write(k)
_, _ = digest.Write(v)
res ^= digest.Sum64()
}

fmt.Printf("Checksum result: %016x\n", res)
return nil
}

func deleteRange(client *tikv.RawKVClient, startKey, endKey []byte) error {
log.Infof("Start delete data in range %v to %v", hex.EncodeToString(startKey), hex.EncodeToString(endKey))
return client.DeleteRange(startKey, endKey)
}

func scan(client *tikv.RawKVClient, startKey, endKey []byte) error {
log.Infof("Start scanning data in range %v to %v", hex.EncodeToString(startKey), hex.EncodeToString(endKey))

scanner := newRawKVScanner(client, startKey, endKey)

var key []byte
for {
k, v, err := scanner.Next()
if err != nil {
return errors.Trace(err)
}
if len(k) == 0 {
break
}
fmt.Printf("key: %v, value: %v\n", hex.EncodeToString(k), hex.EncodeToString(v))
if bytes.Compare(key, k) >= 0 {
log.Errorf("Scan result is not in order. "+
"Previous key: %v, Current key: %v",
hex.EncodeToString(key), hex.EncodeToString(k))
}
}

log.Infof("Finished Scanning.")
return nil
}

const defaultScanBatchSize = 128

type rawKVScanner struct {
client *tikv.RawKVClient
batchSize int

currentKey []byte
endKey []byte

bufferKeys [][]byte
bufferValues [][]byte
bufferCursor int
noMore bool
}

func newRawKVScanner(client *tikv.RawKVClient, startKey, endKey []byte) *rawKVScanner {
return &rawKVScanner{
client: client,
batchSize: defaultScanBatchSize,

currentKey: startKey,
endKey: endKey,

noMore: false,
}
}

func (s *rawKVScanner) Next() ([]byte, []byte, error) {
if s.bufferCursor >= len(s.bufferKeys) {
if s.noMore {
return nil, nil, nil
}

s.bufferCursor = 0

batchSize := s.batchSize
var err error
s.bufferKeys, s.bufferValues, err = s.client.Scan(s.currentKey, s.endKey, batchSize)
if err != nil {
return nil, nil, errors.Trace(err)
}

if len(s.bufferKeys) < batchSize {
s.noMore = true
}

if len(s.bufferKeys) == 0 {
return nil, nil, nil
}

bufferKey := s.bufferKeys[len(s.bufferKeys)-1]
bufferKey = append(bufferKey, 0)
s.currentKey = bufferKey
}

key := s.bufferKeys[s.bufferCursor]
value := s.bufferValues[s.bufferCursor]
s.bufferCursor++
return key, value, nil
}
Loading

0 comments on commit 9e84846

Please sign in to comment.