Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add etcd #247

Merged
merged 2 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ Available Commands:
- MongoDB
- Redis and Redis Cluster
- BoltDB
- etcd

## Database Configuration

Expand Down Expand Up @@ -299,6 +300,17 @@ Common configurations:
|bolt.mmap_flags|0|Set the DB.MmapFlags flag before memory mapping the file|
|bolt.initial_mmap_size|0|The initial mmap size of the database in bytes. If <= 0, the initial map size is 0. If the size is smaller than the previous database, it takes no effect|

### etcd

|field|default value|description|
|-|-|-|
|etcd.endpoints|"localhost:2379"|The etcd endpoint(s), multiple endpoints can be passed separated by comma.|
|etcd.dial_timeout|"2s"|The dial timeout duration passed into the client config.|
|etcd.cert_file|""|When using secure etcd, this should point to the crt file.|
|etcd.key_file|""|When using secure etcd, this should point to the pem file.|
|etcd.cacert_file|""|When using secure etcd, this should point to the ca file.|


## TODO

- [ ] Support more measurement, like HdrHistogram
Expand Down
6 changes: 4 additions & 2 deletions cmd/go-ycsb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package main

import (
"context"
"fmt"
"log"
"net/http"
_ "net/http/pprof"
Expand All @@ -28,15 +29,14 @@ import (

// Register workload

"fmt"
"github.com/spf13/cobra"

"github.com/pingcap/go-ycsb/pkg/client"
"github.com/pingcap/go-ycsb/pkg/measurement"
"github.com/pingcap/go-ycsb/pkg/prop"
"github.com/pingcap/go-ycsb/pkg/util"
_ "github.com/pingcap/go-ycsb/pkg/workload"
"github.com/pingcap/go-ycsb/pkg/ycsb"
"github.com/spf13/cobra"

// Register basic database
_ "github.com/pingcap/go-ycsb/db/basic"
Expand Down Expand Up @@ -70,6 +70,8 @@ import (
_ "github.com/pingcap/go-ycsb/db/minio"
// Register elastic
_ "github.com/pingcap/go-ycsb/db/elasticsearch"
// Register etcd
_ "github.com/pingcap/go-ycsb/db/etcd"
)

var (
Expand Down
163 changes: 163 additions & 0 deletions db/etcd/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package etcd

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"strings"
"time"

clientv3 "go.etcd.io/etcd/client/v3"

"github.com/magiconair/properties"
"go.etcd.io/etcd/client/pkg/v3/transport"

"github.com/pingcap/go-ycsb/pkg/ycsb"
)

// properties
const (
etcdEndpoints = "etcd.endpoints"
etcdDialTimeout = "etcd.dial_timeout"
etcdCertFile = "etcd.cert_file"
etcdKeyFile = "etcd.key_file"
etcdCaFile = "etcd.cacert_file"
)

type etcdCreator struct{}

type etcdDB struct {
p *properties.Properties
client *clientv3.Client
}

func init() {
ycsb.RegisterDBCreator("etcd", etcdCreator{})
}

func (c etcdCreator) Create(p *properties.Properties) (ycsb.DB, error) {
cfg, err := getClientConfig(p)
if err != nil {
return nil, err
}

client, err := clientv3.New(*cfg)
if err != nil {
return nil, err
}

return &etcdDB{
p: p,
client: client,
}, nil
}

func getClientConfig(p *properties.Properties) (*clientv3.Config, error) {
endpoints := p.GetString(etcdEndpoints, "localhost:2379")
dialTimeout := p.GetDuration(etcdDialTimeout, 2*time.Second)

var tlsConfig *tls.Config
if strings.Contains(endpoints, "https") {
tlsInfo := transport.TLSInfo{
CertFile: p.MustGetString(etcdCertFile),
KeyFile: p.MustGetString(etcdKeyFile),
TrustedCAFile: p.MustGetString(etcdCaFile),
}
c, err := tlsInfo.ClientConfig()
if err != nil {
return nil, err
}
tlsConfig = c
}

return &clientv3.Config{
Endpoints: strings.Split(endpoints, ","),
DialTimeout: dialTimeout,
TLS: tlsConfig,
}, nil
}

func (db *etcdDB) Close() error {
return db.client.Close()
}

func (db *etcdDB) InitThread(ctx context.Context, _ int, _ int) context.Context {
return ctx
}

func (db *etcdDB) CleanupThread(_ context.Context) {
}

func getRowKey(table string, key string) string {
return fmt.Sprintf("%s:%s", table, key)
}

func (db *etcdDB) Read(ctx context.Context, table string, key string, _ []string) (map[string][]byte, error) {
rkey := getRowKey(table, key)
value, err := db.client.Get(ctx, rkey)
if err != nil {
return nil, err
}

if value.Count == 0 {
return nil, fmt.Errorf("could not find value for key [%s]", rkey)
}

var r map[string][]byte
err = json.NewDecoder(bytes.NewReader(value.Kvs[0].Value)).Decode(&r)
if err != nil {
return nil, err
}
return r, nil
}

func (db *etcdDB) Scan(ctx context.Context, table string, startKey string, count int, _ []string) ([]map[string][]byte, error) {
res := make([]map[string][]byte, count)
rkey := getRowKey(table, startKey)
values, err := db.client.Get(ctx, rkey, clientv3.WithFromKey(), clientv3.WithLimit(int64(count)))
if err != nil {
return nil, err
}

if values.Count != int64(count) {
return nil, fmt.Errorf("unexpected number of result for key [%s], expected %d but was %d", rkey, count, values.Count)
}

for _, v := range values.Kvs {
var r map[string][]byte
err = json.NewDecoder(bytes.NewReader(v.Value)).Decode(&r)
if err != nil {
return nil, err
}
res = append(res, r)
}
return res, nil
}

func (db *etcdDB) Update(ctx context.Context, table string, key string, values map[string][]byte) error {
rkey := getRowKey(table, key)
data, err := json.Marshal(values)
if err != nil {
return err
}
_, err = db.client.Put(ctx, rkey, string(data))
if err != nil {
return err
}

return nil
}

func (db *etcdDB) Insert(ctx context.Context, table string, key string, values map[string][]byte) error {
return db.Update(ctx, table, key, values)
}

func (db *etcdDB) Delete(ctx context.Context, table string, key string) error {
_, err := db.client.Delete(ctx, getRowKey(table, key))
if err != nil {
return err
}
return nil
}
3 changes: 3 additions & 0 deletions db/etcd/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package etcd

// If you want to use etcd, please follow the [Getting Started](https://github.com/etcd-io/etcd#getting-etcd) guide to install it.