Skip to content

Commit

Permalink
Report the volume of etcd writes via a diagnostic
Browse files Browse the repository at this point in the history
New EtcdWriteVolume diagnostic measures the number of writes in a time
period to determine where significant write volume is going.
  • Loading branch information
smarterclayton committed Jun 15, 2017
1 parent acb8636 commit 68e1ede
Show file tree
Hide file tree
Showing 5 changed files with 325 additions and 7 deletions.
15 changes: 14 additions & 1 deletion pkg/cmd/admin/diagnostics/diagnostics.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (o *DiagnosticsOptions) Complete(args []string) error {

o.RequestedDiagnostics = append(o.RequestedDiagnostics, args...)
if len(o.RequestedDiagnostics) == 0 {
o.RequestedDiagnostics = availableDiagnostics().List()
o.RequestedDiagnostics = availableDiagnostics().Difference(defaultSkipDiagnostics()).List()
}

return nil
Expand Down Expand Up @@ -248,10 +248,17 @@ func availableDiagnostics() sets.String {
available := sets.NewString()
available.Insert(availableClientDiagnostics.List()...)
available.Insert(availableClusterDiagnostics.List()...)
available.Insert(availableEtcdDiagnostics.List()...)
available.Insert(availableHostDiagnostics.List()...)
return available
}

// defaultSkipDiagnostics returns the names of the diagnostics that are left
// out of a run unless they are requested explicitly. It currently gathers
// only the etcd diagnostics marked as skipped by default.
func defaultSkipDiagnostics() sets.String {
	skipped := sets.NewString()
	skipped.Insert(defaultSkipEtcdDiagnostics.List()...)
	return skipped
}

// RunDiagnostics builds diagnostics based on the options and executes them, returning a summary.
func (o DiagnosticsOptions) RunDiagnostics() (bool, error, int, int) {
failed := false
Expand Down Expand Up @@ -299,6 +306,12 @@ func (o DiagnosticsOptions) RunDiagnostics() (bool, error, int, int) {
}
}

etcdDiags, ok, err := o.buildEtcdDiagnostics()
failed = failed || !ok
if ok {
diagnostics = append(diagnostics, etcdDiags...)
}

hostDiags, ok, err := o.buildHostDiagnostics()
failed = failed || !ok
if ok {
Expand Down
91 changes: 91 additions & 0 deletions pkg/cmd/admin/diagnostics/etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package diagnostics

import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/util/sets"

etcdclient "github.com/coreos/etcd/client"
"github.com/coreos/etcd/clientv3"

"github.com/openshift/origin/pkg/cmd/server/etcd"
clustdiags "github.com/openshift/origin/pkg/diagnostics/cluster"
"github.com/openshift/origin/pkg/diagnostics/host"
"github.com/openshift/origin/pkg/diagnostics/types"
)

var (
	// availableEtcdDiagnostics is the set of etcd diagnostic names that can be
	// executed during a single run of diagnostics. Add more names to the set as
	// new etcd diagnostics are defined.
	availableEtcdDiagnostics = sets.NewString(clustdiags.EtcdWriteName)

	// defaultSkipEtcdDiagnostics is the set of etcd diagnostic names to skip
	// by default.
	defaultSkipEtcdDiagnostics = sets.NewString(clustdiags.EtcdWriteName)
)

// buildEtcdDiagnostics builds the requested etcd Diagnostic objects, provided
// etcd clients can be configured from the master config.
//
// The boolean result is false only when a requested name has no matching
// diagnostic. When no etcd diagnostic was requested, or the etcd clients could
// not be created, it returns no diagnostics together with true (and, in the
// latter case, the error from client discovery).
func (o DiagnosticsOptions) buildEtcdDiagnostics() ([]types.Diagnostic, bool, error) {
	wanted := sets.NewString(o.RequestedDiagnostics...)
	names := availableEtcdDiagnostics.Intersection(wanted).List()
	if len(names) == 0 {
		// Nothing to run here, so don't waste time on client discovery.
		return nil, true, nil
	}

	v2Client, v3Client, found, err := o.findEtcdClients(o.MasterConfigLocation)
	if !found {
		o.Logger.Notice("DE2000", "Could not configure etcd clients against the current config, so etcd diagnostics will be skipped")
		return nil, true, err
	}

	built := []types.Diagnostic{}
	for _, name := range names {
		switch name {
		case clustdiags.EtcdWriteName:
			built = append(built, &clustdiags.EtcdWriteVolume{V2Client: v2Client, V3Client: v3Client})
		default:
			return nil, false, fmt.Errorf("unknown diagnostic: %v", name)
		}
	}
	return built, true, nil
}

// findEtcdClients loads the master config from configFile and creates etcd v2
// and v3 clients from its EtcdClientInfo.
//
// On success it returns both clients and true. On any failure it logs the
// problem through o.Logger (codes DE2001 through DE2005) and returns nil
// clients, false, and an error describing what went wrong.
func (o DiagnosticsOptions) findEtcdClients(configFile string) (etcdclient.Client, *clientv3.Client, bool, error) {
	// fail logs the problem under the given code and builds the failure result.
	fail := func(code string, problem error) (etcdclient.Client, *clientv3.Client, bool, error) {
		o.Logger.Error(code, problem.Error())
		return nil, nil, false, problem
	}

	// r is handed to GetMasterConfig but never read again in this function,
	// so the load error itself is included in the DE2001 message; otherwise
	// the reason the config is unreadable would be lost.
	r := types.NewDiagnosticResult("")
	masterConfig, err := host.GetMasterConfig(r, configFile)
	if err != nil {
		return fail("DE2001", fmt.Errorf("Unreadable master config (%v); skipping this diagnostic.", err))
	}
	if len(masterConfig.EtcdClientInfo.URLs) == 0 {
		return fail("DE2002", fmt.Errorf("No etcdClientInfo.urls defined; can't contact etcd"))
	}

	v2Client, err := etcd.MakeEtcdClient(masterConfig.EtcdClientInfo)
	if err != nil {
		return fail("DE2003", fmt.Errorf("Unable to create an etcd v2 client: %v", err))
	}

	config, err := etcd.MakeEtcdClientV3Config(masterConfig.EtcdClientInfo)
	if err != nil {
		return fail("DE2004", fmt.Errorf("Unable to create an etcd v3 client config: %v", err))
	}
	// Override whatever dial timeout the generated config carries with a
	// five second limit for this diagnostic.
	config.DialTimeout = 5 * time.Second
	v3Client, err := clientv3.New(*config)
	if err != nil {
		return fail("DE2005", fmt.Errorf("Unable to create an etcd v3 client: %v", err))
	}
	return v2Client, v3Client, true, nil
}
19 changes: 13 additions & 6 deletions pkg/cmd/server/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func GetAndTestEtcdClientV3(etcdClientInfo configapi.EtcdConnectionInfo) (*clien
return etcdClient, nil
}

// MakeEtcdClient creates an etcd client based on the provided config.
func MakeEtcdClientV3(etcdClientInfo configapi.EtcdConnectionInfo) (*clientv3.Client, error) {
// MakeEtcdClientV3Config creates client configuration based on the configapi.
func MakeEtcdClientV3Config(etcdClientInfo configapi.EtcdConnectionInfo) (*clientv3.Config, error) {
tlsConfig, err := restclient.TLSConfigFor(&restclient.Config{
TLSClientConfig: restclient.TLSClientConfig{
CertFile: etcdClientInfo.ClientCert.CertFile,
Expand All @@ -108,16 +108,23 @@ func MakeEtcdClientV3(etcdClientInfo configapi.EtcdConnectionInfo) (*clientv3.Cl
return nil, err
}

cfg := clientv3.Config{
return &clientv3.Config{
Endpoints: etcdClientInfo.URLs,
DialTimeout: 30 * time.Second,
TLS: tlsConfig,
}
}, nil
}

return clientv3.New(cfg)
// MakeEtcdClientV3 creates an etcd v3 client from the provided connection
// info. It builds the client configuration with MakeEtcdClientV3Config and
// returns that function's error unchanged if the configuration cannot be built.
func MakeEtcdClientV3(etcdClientInfo configapi.EtcdConnectionInfo) (*clientv3.Client, error) {
	config, err := MakeEtcdClientV3Config(etcdClientInfo)
	if err != nil {
		return nil, err
	}
	return clientv3.New(*config)
}

// TestEtcdClient verifies a client is functional. It will attempt to
// TestEtcdClientV3 verifies a client is functional. It will attempt to
// connect to the etcd server and block until the server responds at least once, or return an
// error if the server never responded.
func TestEtcdClientV3(etcdClient *clientv3.Client) error {
Expand Down
189 changes: 189 additions & 0 deletions pkg/diagnostics/cluster/etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package cluster

import (
"context"
"fmt"
"os"
"sort"
"strings"
"sync"
"text/tabwriter"
"time"

etcdclient "github.com/coreos/etcd/client"
"github.com/coreos/etcd/clientv3"

"bytes"

"github.com/openshift/origin/pkg/diagnostics/types"
)

// EtcdWriteVolume is a Diagnostic that observes the writes occurring against
// etcd and organizes them by volume.
type EtcdWriteVolume struct {
	// V2Client is the etcd v2 client used to watch v2 keys; CanRun reports an
	// error when it is nil.
	V2Client etcdclient.Client
	// V3Client is the etcd v3 client used to watch v3 keys; CanRun reports an
	// error when it is nil.
	V3Client *clientv3.Client
}

const (
	// EtcdWriteName is the name under which the EtcdWriteVolume diagnostic is
	// registered and reports its results.
	EtcdWriteName = "EtcdWriteVolume"
)

// duration returns how long the diagnostic should observe etcd. The value is
// read from the ETCD_WRITE_VOLUME_DURATION environment variable in
// time.ParseDuration syntax and defaults to one minute when the variable is
// unset or empty. It panics if the value cannot be parsed.
func (d *EtcdWriteVolume) duration() time.Duration {
	raw := os.Getenv("ETCD_WRITE_VOLUME_DURATION")
	if raw == "" {
		raw = "1m"
	}
	window, err := time.ParseDuration(raw)
	if err == nil {
		return window
	}
	panic(fmt.Errorf("ETCD_WRITE_VOLUME_DURATION could not be parsed: %v", err))
}

// Name returns the name of this diagnostic, EtcdWriteName.
func (d *EtcdWriteVolume) Name() string {
	return EtcdWriteName
}

// Description returns a one-line summary of what the diagnostic does,
// including the observation window reported by duration.
func (d *EtcdWriteVolume) Description() string {
	window := d.duration()
	return fmt.Sprintf("Check the volume of writes against etcd and classify them by operation and key for %s", window)
}

// CanRun reports whether the diagnostic has everything it needs. It returns
// false and an error naming the missing client when either the v2 or the v3
// etcd client is nil; the v2 client is checked first.
func (d *EtcdWriteVolume) CanRun() (bool, error) {
	switch {
	case d.V2Client == nil:
		return false, fmt.Errorf("must have a V2 etcd client")
	case d.V3Client == nil:
		return false, fmt.Errorf("must have a V3 etcd client")
	default:
		return true, nil
	}
}

// Check watches the "/" prefix through both the etcd v2 and v3 clients for the
// configured duration, counts every observed event by action and key path,
// and reports the totals as a single informational message (DEw2004).
//
// Each event is recorded under "<api>:<action>/<key segments...>", so the
// report contains one line for every prefix of every touched key, with its
// count and its share of all observed events. A v2 watch failure other than
// the deadline expiring is reported as error DEw2001 and ends the v2 watch
// early; the v3 watch runs until its channel is closed.
func (d *EtcdWriteVolume) Check() types.DiagnosticResult {
	r := types.NewDiagnosticResult(EtcdWriteName)

	duration := d.duration()
	ctx, cancel := context.WithTimeout(context.Background(), duration)
	defer cancel()

	keyStats := &keyCounter{}
	stats := &lockedKeyCounter{KeyCounter: keyStats}
	// record files one event under its action followed by the segments of
	// its slash-separated key.
	record := func(action, key string) {
		stats.Inc(strings.Split(action+"/"+strings.TrimPrefix(key, "/"), "/"))
	}

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		keys := etcdclient.NewKeysAPI(d.V2Client)
		w := keys.Watcher("/", &etcdclient.WatcherOptions{Recursive: true})
		for {
			evt, err := w.Next(ctx)
			if err != nil {
				if err != context.DeadlineExceeded {
					r.Error("DEw2001", err, fmt.Sprintf("Unable to get a v2 watch event, stopping early: %v", err))
				}
				return
			}
			// Fall back to the previous node when the event carries no
			// current node, and skip events that carry neither.
			node := evt.Node
			if node == nil {
				node = evt.PrevNode
			}
			if node == nil {
				continue
			}
			// Read the key from the node selected above: evt.Node may be
			// nil here, so dereferencing it directly would panic.
			record(fmt.Sprintf("v2:%s", evt.Action), node.Key)
		}
	}()
	go func() {
		defer wg.Done()
		ch := d.V3Client.Watch(ctx, "/", clientv3.WithKeysOnly(), clientv3.WithPrefix())
		for resource := range ch {
			for _, evt := range resource.Events {
				if evt.Kv == nil {
					continue
				}
				record(fmt.Sprintf("v3:%s", evt.Type), string(evt.Kv.Key))
			}
		}
	}()
	// Both watchers have stopped after Wait, so keyStats can be read without
	// taking the lock.
	wg.Wait()

	bins := keyStats.Bins("", "/")
	sort.Sort(DescendingBins(bins))

	buf := &bytes.Buffer{}
	tw := tabwriter.NewWriter(buf, 0, 0, 1, ' ', 0)
	fmt.Fprintf(tw, "/\t%6d\t100.0%%\n", keyStats.count)
	for _, b := range bins {
		fmt.Fprintf(tw, "%s\t%6d\t%5.1f%%\n", b.Name, b.Count, float64(b.Count)/float64(keyStats.count)*100)
	}
	tw.Flush()

	// Use fractional seconds so that durations which are not a whole number
	// of seconds are not truncated, and avoid dividing by zero when the
	// configured duration is not positive.
	rate := 0.0
	if seconds := duration.Seconds(); seconds > 0 {
		rate = float64(keyStats.count) / seconds
	}
	r.Info("DEw2004", fmt.Sprintf("Measured %.1f writes/sec\n", rate)+buf.String())

	return r
}

// KeyCounter records one occurrence of a key given as a sequence of path
// segments.
type KeyCounter interface {
	Inc(key []string)
}

// lockedKeyCounter wraps a KeyCounter and serializes calls to Inc with a
// mutex so that several goroutines can share the wrapped counter.
type lockedKeyCounter struct {
	KeyCounter
	lock sync.Mutex
}

// Inc forwards key to the wrapped KeyCounter while holding the lock. The lock
// is released when the wrapped call returns.
func (l *lockedKeyCounter) Inc(key []string) {
	l.lock.Lock()
	defer l.lock.Unlock()
	l.KeyCounter.Inc(key)
}

// keyCounter is a node in a tree of counters keyed by path segment. count is
// the number of recorded keys that pass through this node; children holds one
// sub-counter per next segment and is allocated on first use.
type keyCounter struct {
	count    int
	children map[string]*keyCounter
}

// Inc records one occurrence of key. It increments the count of this node and
// of the node for every successive prefix of key, creating nodes as needed.
func (b *keyCounter) Inc(key []string) {
	node := b
	node.count++
	for _, segment := range key {
		if node.children == nil {
			node.children = make(map[string]*keyCounter)
		}
		next, found := node.children[segment]
		if !found {
			next = &keyCounter{}
			node.children[segment] = next
		}
		next.count++
		node = next
	}
}

// Bin is one line of a counter report: the full name of a key prefix and the
// number of recorded keys under it.
type Bin struct {
	Name  string
	Count int
}

// Bins flattens the counter tree below b into a slice of Bin values. Each
// descendant is named by joining parent and the segments leading to it with
// separator, and every node is emitted before its own descendants. Siblings
// appear in map iteration order, so callers that need a stable order must
// sort the result. It returns nil when b has no children.
func (b *keyCounter) Bins(parent, separator string) []Bin {
	var collected []Bin
	prefix := parent + separator
	for segment, child := range b.children {
		name := prefix + segment
		collected = append(collected, Bin{Name: name, Count: child.count})
		collected = append(collected, child.Bins(name, separator)...)
	}
	return collected
}

// DescendingBins implements sort.Interface over a slice of Bin. Despite its
// name, the ordering it defines is ascending by Name; counts are not
// consulted.
type DescendingBins []Bin

// Len returns the number of bins.
func (m DescendingBins) Len() int { return len(m) }

// Swap exchanges the bins at positions i and j.
func (m DescendingBins) Swap(i, j int) { m[i], m[j] = m[j], m[i] }

// Less reports whether the name of bin i sorts strictly before the name of
// bin j.
func (m DescendingBins) Less(i, j int) bool {
	return m[i].Name < m[j].Name
}
Loading

0 comments on commit 68e1ede

Please sign in to comment.