diff --git a/etcdctl/ctlv3/command/check.go b/etcdctl/ctlv3/command/check.go new file mode 100644 index 00000000000..0a40f0b6f2c --- /dev/null +++ b/etcdctl/ctlv3/command/check.go @@ -0,0 +1,217 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "context" + "encoding/binary" + "fmt" + "math" + "math/rand" + "os" + "sync" + "time" + + v3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/pkg/report" + + "github.com/spf13/cobra" + "golang.org/x/time/rate" + "gopkg.in/cheggaaa/pb.v1" +) + +var ( + checkPerfLoad string + checkPerfPrefix string +) + +type checkPerfCfg struct { + limit int + clients int + duration int +} + +var checkPerfCfgMap = map[string]checkPerfCfg{ + // TODO: support read limit + "s": { + limit: 150, + clients: 50, + duration: 60, + }, + "m": { + limit: 1000, + clients: 200, + duration: 60, + }, + "l": { + limit: 8000, + clients: 500, + duration: 60, + }, + "xl": { + limit: 15000, + clients: 1000, + duration: 60, + }, +} + +// NewCheckCommand returns the cobra command for "check". +func NewCheckCommand() *cobra.Command { + cc := &cobra.Command{ + Use: "check ", + Short: "commands for checking properties of the etcd cluster", + } + + cc.AddCommand(NewCheckPerfCommand()) + + return cc +} + +// NewCheckPerfCommand returns the cobra command for "check perf". +func NewCheckPerfCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "perf [options]", + Short: "Check the performance of the etcd cluster", + Run: newCheckPerfCommand, + } + + // TODO: support customized configuration + cmd.Flags().StringVar(&checkPerfLoad, "load", "s", "The performance check's workload model. Accepted workloads: s(small), m(medium), l(large), xl(xLarge)") + cmd.Flags().StringVar(&checkPerfPrefix, "prefix", "/etcdctl-check-perf/", "The prefix for writing the performance check's keys.") + + return cmd +} + +// newCheckPerfCommand executes the "check perf" command. +func newCheckPerfCommand(cmd *cobra.Command, args []string) { + var checkPerfAlias = map[string]string{ + "s": "s", "small": "s", + "m": "m", "medium": "m", + "l": "l", "large": "l", + "xl": "xl", "xLarge": "xl", + } + + model, ok := checkPerfAlias[checkPerfLoad] + if !ok { + ExitWithError(ExitBadFeature, fmt.Errorf("unknown load option %v", checkPerfLoad)) + } + cfg := checkPerfCfgMap[model] + + requests := make(chan v3.Op, cfg.clients) + limit := rate.NewLimiter(rate.Limit(cfg.limit), 1) + + var clients []*v3.Client + for i := 0; i < cfg.clients; i++ { + clients = append(clients, mustClientFromCmd(cmd)) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.duration)*time.Second) + resp, err := clients[0].Get(ctx, checkPerfPrefix, v3.WithPrefix(), v3.WithLimit(1)) + cancel() + if err != nil { + ExitWithError(ExitError, err) + } + if len(resp.Kvs) > 0 { + ExitWithError(ExitInvalidInput, fmt.Errorf("prefix %q has keys. Delete with etcdctl del --prefix %s first.", checkPerfPrefix, checkPerfPrefix)) + } + + ksize, vsize := 256, 1024 + k, v := make([]byte, ksize), string(make([]byte, vsize)) + + bar := pb.New(cfg.duration) + bar.Format("Bom !") + bar.Start() + + r := report.NewReport("%4.4f") + var wg sync.WaitGroup + + wg.Add(len(clients)) + for i := range clients { + go func(c *v3.Client) { + defer wg.Done() + for op := range requests { + st := time.Now() + _, derr := c.Do(context.Background(), op) + r.Results() <- report.Result{Err: derr, Start: st, End: time.Now()} + } + }(clients[i]) + } + + go func() { + cctx, _ := context.WithTimeout(context.Background(), time.Duration(cfg.duration)*time.Second) + + for limit.Wait(cctx) == nil { + binary.PutVarint(k, int64(rand.Int63n(math.MaxInt64))) + requests <- v3.OpPut(checkPerfPrefix+string(k), v) + } + close(requests) + }() + + go func() { + for i := 0; i < cfg.duration; i++ { + time.Sleep(time.Second) + bar.Add(1) + } + bar.Finish() + }() + + sc := r.Stats() + wg.Wait() + close(r.Results()) + + s := <-sc + + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + _, err = clients[0].Delete(ctx, checkPerfPrefix, v3.WithPrefix()) + cancel() + if err != nil { + ExitWithError(ExitError, err) + } + + ok = true + if len(s.ErrorDist) != 0 { + fmt.Println("FAIL: too many errors") + for k, v := range s.ErrorDist { + fmt.Printf("FAIL: ERROR(%v) -> %d\n", k, v) + } + ok = false + } + + if s.RPS/float64(cfg.limit) <= 0.9 { + fmt.Printf("FAIL: Throughput too low: %d writes/s\n", int(s.RPS)+1) + ok = false + } else { + fmt.Printf("PASS: Throughput is %d writes/s\n", int(s.RPS)+1) + } + if s.Slowest > 0.5 { // slowest request > 500ms + fmt.Printf("Slowest request took too long: %fs\n", s.Slowest) + ok = false + } else { + fmt.Printf("PASS: Slowest request took %fs\n", s.Slowest) + } + if s.Stddev > 0.1 { // stddev > 100ms + fmt.Printf("Stddev too high: %fs\n", s.Stddev) + ok = false + } else { + fmt.Printf("PASS: Stddev is %fs\n", s.Stddev) + } + + if ok { + fmt.Println("PASS") + } else { + fmt.Println("FAIL") + os.Exit(ExitError) + } +} diff --git a/etcdctl/ctlv3/ctl.go b/etcdctl/ctlv3/ctl.go index 4875b527839..68ca6e0f250 100644 --- a/etcdctl/ctlv3/ctl.go +++ b/etcdctl/ctlv3/ctl.go @@ -80,6 +80,7 @@ func init() { command.NewAuthCommand(), command.NewUserCommand(), command.NewRoleCommand(), + command.NewCheckCommand(), ) }