Skip to content

Commit

Permalink
feat: add volume attachment time & csi grpc execution time metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
iltyty committed Aug 29, 2024
1 parent d8b52a6 commit c688fbe
Show file tree
Hide file tree
Showing 14 changed files with 420 additions and 116 deletions.
6 changes: 3 additions & 3 deletions pkg/common/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ func ParseEndpoint(ep string) (string, string, error) {
return "", "", fmt.Errorf("Invalid endpoint: %v", ep)
}

func RunCSIServer(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
ns = WrapNodeServerWithValidator(ns)
cs = WrapControllerServerWithValidator(cs)
func RunCSIServer(driverType, endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
ns = WrapNodeServerWithValidator(WrapNodeServerWithMetricRecorder(driverType, ns))
cs = WrapControllerServerWithValidator(WrapControllerServerWithMetricRecorder(driverType, cs))

proto, addr, err := ParseEndpoint(endpoint)
if err != nil {
Expand Down
78 changes: 78 additions & 0 deletions pkg/common/volume_stats_controllerserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package common

import (
"context"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/metric"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func WrapControllerServerWithMetricRecorder(driverType string, server csi.ControllerServer) csi.ControllerServer {
return &ControllerServerWithMetricRecorder{driverType, server}
}

type ControllerServerWithMetricRecorder struct {
driverType string
csi.ControllerServer
}

func (cs *ControllerServerWithMetricRecorder) observeExecTime(time int64, statType metric.VolumeStatType, volumeId string, err error) {
errCode := getCodeFromError(err).String()
metric.VolumeStatCollector.Metrics[statType].With(prometheus.Labels{
metric.VolumeStatTypeLabelName: cs.driverType,
metric.VolumeStatIdLabelName: volumeId,
metric.VolumeStatErrCodeLabelName: errCode,
}).Observe(float64(time))
}

func getCodeFromError(err error) codes.Code {
status, ok := status.FromError(err)
if ok {
return status.Code()
}
return codes.Unknown
}

func (cs *ControllerServerWithMetricRecorder) CreateVolume(context context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
return cs.ControllerServer.CreateVolume(context, req)
}

func (cs *ControllerServerWithMetricRecorder) DeleteVolume(context context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
return cs.ControllerServer.DeleteVolume(context, req)
}

func (cs *ControllerServerWithMetricRecorder) ControllerPublishVolume(context context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
startTime := time.Now()
resp, err := cs.ControllerServer.ControllerPublishVolume(context, req)
execTime := time.Since(startTime).Milliseconds()
cs.observeExecTime(execTime, metric.ControllerPublishExecTimeStat, req.GetVolumeId(), err)
return resp, err
}

func (cs *ControllerServerWithMetricRecorder) ControllerUnpublishVolume(context context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
startTime := time.Now()
resp, err := cs.ControllerServer.ControllerUnpublishVolume(context, req)
execTime := time.Since(startTime).Milliseconds()
cs.observeExecTime(execTime, metric.ControllerUnpublishExecTimeStat, req.GetVolumeId(), err)
return resp, err
}

func (cs *ControllerServerWithMetricRecorder) ValidateVolumeCapabilities(context context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
return cs.ControllerServer.ValidateVolumeCapabilities(context, req)
}

func (cs *ControllerServerWithMetricRecorder) CreateSnapshot(context context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
return cs.ControllerServer.CreateSnapshot(context, req)
}

func (cs *ControllerServerWithMetricRecorder) DeleteSnapshot(context context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
return cs.ControllerServer.DeleteSnapshot(context, req)
}

func (cs *ControllerServerWithMetricRecorder) ControllerExpandVolume(context context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
return cs.ControllerServer.ControllerExpandVolume(context, req)
}
129 changes: 129 additions & 0 deletions pkg/common/volume_stats_nodeserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package common

import (
"context"
"fmt"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/metric"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/options"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)

const (
podNameKey = "csi.storage.k8s.io/pod.name"
podNamespaceKey = "csi.storage.k8s.io/pod.namespace"
)

func WrapNodeServerWithMetricRecorder(driverType string, server csi.NodeServer) csi.NodeServer {
config, err := clientcmd.BuildConfigFromFlags(options.MasterURL, options.Kubeconfig)
if err != nil {
logrus.Errorf("initializing kubernetes config for node server with metric recorder failed: %s", err)
return &NodeServerWithMetricRecorder{server, driverType, nil}
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
logrus.Errorf("initializing kubernetes clientset for node server with metric recorder failed: %s", err)
return &NodeServerWithMetricRecorder{server, driverType, nil}
}
return &NodeServerWithMetricRecorder{server, driverType, clientset}
}

type NodeServerWithMetricRecorder struct {
csi.NodeServer
driverType string
clientset *kubernetes.Clientset
}

func (ns *NodeServerWithMetricRecorder) observeExecTime(time int64, statType metric.VolumeStatType, volumeId string, err error) {
errCode := getCodeFromError(err).String()
metric.VolumeStatCollector.Metrics[statType].With(prometheus.Labels{
metric.VolumeStatTypeLabelName: ns.driverType,
metric.VolumeStatIdLabelName: volumeId,
metric.VolumeStatErrCodeLabelName: errCode,
}).Observe(float64(time))
}

func (ns *NodeServerWithMetricRecorder) observeVolumeAttachmentTime(curTime int64, req *csi.NodePublishVolumeRequest, err error) {
errCode := getCodeFromError(err).String()
if ns.clientset == nil {
return
}
podName, podNamespace := req.VolumeContext[podNameKey], req.VolumeContext[podNamespaceKey]
if podName == "" || podNamespace == "" {
logrus.Warnf("observeVolumeAttachmentTime: empty pod name/namespace: %s, %s", podName, podNamespace)
return
}
pod, err := ns.clientset.CoreV1().Pods(podNamespace).Get(context.Background(), podName, metav1.GetOptions{})
if err != nil {
logrus.Errorf("error getting pod %s/%s when observing volume attachment time for volume %s", podNamespace, podName, req.GetVolumeId())
return
}
podStartTime, err := getPodStartTimestamp(pod)
if err != nil {
logrus.Errorf("error getting scheduled time for pod %s/%s when observing volume attachment time for volume %s", podNamespace, podName, req.GetVolumeId())
return
}

metric.VolumeStatCollector.Metrics[metric.VolumeAttachTimeStat].With(prometheus.Labels{
metric.VolumeStatTypeLabelName: ns.driverType,
metric.VolumeStatIdLabelName: req.GetVolumeId(),
metric.VolumeStatErrCodeLabelName: errCode,
}).Observe(float64(curTime) - float64(podStartTime))
}

func getPodStartTimestamp(pod *v1.Pod) (int64, error) {
startTime := pod.Status.StartTime
if startTime == nil {
return 0, fmt.Errorf("no start time found for pod %s/%s ", pod.GetNamespace(), pod.GetName())
}
return startTime.Time.UnixMilli(), nil
}

func (ns *NodeServerWithMetricRecorder) NodeStageVolume(context context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
startTime := time.Now()
resp, err := ns.NodeServer.NodeStageVolume(context, req)
execTime := time.Since(startTime).Milliseconds()
ns.observeExecTime(execTime, metric.NodeStageExecTimeStat, req.GetVolumeId(), err)
return resp, err
}

func (ns *NodeServerWithMetricRecorder) NodeUnstageVolume(context context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
startTime := time.Now()
resp, err := ns.NodeServer.NodeUnstageVolume(context, req)
execTime := time.Since(startTime).Milliseconds()
ns.observeExecTime(execTime, metric.NodeUnstageExecTimeStat, req.GetVolumeId(), err)
return resp, err
}

func (ns *NodeServerWithMetricRecorder) NodePublishVolume(context context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
startTime := time.Now()
resp, err := ns.NodeServer.NodePublishVolume(context, req)
curTime := time.Now()
execTime := curTime.Sub(startTime)
ns.observeExecTime(execTime.Milliseconds(), metric.NodePublishExecTimeStat, req.GetVolumeId(), err)
ns.observeVolumeAttachmentTime(curTime.UnixMilli(), req, err)
return resp, err
}

func (ns *NodeServerWithMetricRecorder) NodeUnpublishVolume(context context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
startTime := time.Now()
resp, err := ns.NodeServer.NodeUnpublishVolume(context, req)
time := time.Since(startTime).Milliseconds()
ns.observeExecTime(time, metric.NodeUnpublishExecTimeStat, req.GetVolumeId(), err)
return resp, err
}

func (ns *NodeServerWithMetricRecorder) NodeGetVolumeStats(context context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
return ns.NodeServer.NodeGetVolumeStats(context, req)
}

func (ns *NodeServerWithMetricRecorder) NodeExpandVolume(context context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
return ns.NodeServer.NodeExpandVolume(context, req)
}
3 changes: 2 additions & 1 deletion pkg/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (

// PluginFolder defines the location of diskplugin
const (
driverType = "disk"
driverName = "diskplugin.csi.alibabacloud.com"
TopologyZoneKey = "topology." + driverName + "/zone"
TopologyMultiZonePrefix = TopologyZoneKey + "-"
Expand Down Expand Up @@ -132,7 +133,7 @@ func NewDriver(m metadata.MetadataProvider, endpoint string, runAsController boo
// Run start a new NodeServer
func (disk *DISK) Run() {
log.Infof("Starting csi-plugin Driver: %v version: %v", driverName, version.VERSION)
common.RunCSIServer(disk.endpoint, disk.idServer, disk.controllerServer, disk.nodeServer)
common.RunCSIServer(driverType, disk.endpoint, disk.idServer, disk.controllerServer, disk.nodeServer)
}

// GlobalConfigSet set Global Config
Expand Down
3 changes: 2 additions & 1 deletion pkg/ens/ens.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
)

const (
driverType = "ens"
driverName = "ensplugin.csi.alibabacloud.com"
ensInstanceIDLabelKey = "alibabacloud.com/ens-instance-id"
clusterProfileKey = "ack-cluster-profile"
Expand Down Expand Up @@ -76,7 +77,7 @@ func NewDriver(nodeID, endpoint string) *ENS {

func (ens *ENS) Run() {
log.Infof("Run: start csi-plugin driver: %s, version %s", driverName, version.VERSION)
common.RunCSIServer(ens.endpoint, ens.idServer, ens.controllerServer, ens.nodeServer)
common.RunCSIServer(driverType, ens.endpoint, ens.idServer, ens.controllerServer, ens.nodeServer)
}

func NewGlobalConfig() {
Expand Down
6 changes: 3 additions & 3 deletions pkg/metric/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import (

var (
scrapeDurationDesc = prometheus.NewDesc(
prometheus.BuildFQName(clusterNamespace, scrapeSubSystem, "collector_duration_seconds"),
prometheus.BuildFQName(clusterNamespace, scrapeSubsystem, "collector_duration_seconds"),
"csi_metric: Duration of a collector scrape.",
[]string{"collector"},
nil,
)
scrapeSuccessDesc = prometheus.NewDesc(
prometheus.BuildFQName(clusterNamespace, scrapeSubSystem, "collector_success"),
prometheus.BuildFQName(clusterNamespace, scrapeSubsystem, "collector_success"),
"csi_metric: Whether a collector succeeded.",
[]string{"collector"},
nil,
Expand Down Expand Up @@ -71,8 +71,8 @@ func newCSICollector(metricType string, driverNames []string) error {
collectors[reg.Name] = collector
}
}

}
collectors[VolumeStatCollectorName] = &VolumeStatCollector
csiCollectorInstance = &CSICollector{Collectors: collectors}

return nil
Expand Down
34 changes: 18 additions & 16 deletions pkg/metric/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,24 @@ const (
)

const (
clusterNamespace string = "cluster"
nodeNamespace string = "node"
scrapeSubSystem string = "scrape"
volumeSubSystem string = "volume"
latencySwitch = "latency"
capacitySwitch = "capacity"
diskSectorSize = 512
diskDefaultsLantencyThreshold = 10
diskDefaultsCapacityPercentageThreshold = 85
nfsDefaultsCapacityPercentageThreshold = 85
float64EqualityThreshold = 1e-9
diskStatsFileName = "diskstats"
nfsStatsFileName = "/proc/self/mountstats"
latencyTooHigh = "LatencyTooHigh"
capacityNotEnough = "NotEnoughDiskSpace"
ioHang = "IOHang"
clusterNamespace = "cluster"
csiNamespace = "csi"
nodeNamespace = "node"
execTimeSubsystem = "exec_time"
scrapeSubsystem = "scrape"
volumeSubsystem = "volume"
latencySwitch = "latency"
capacitySwitch = "capacity"
diskSectorSize = 512
diskDefaultsLantencyThreshold = 10
diskDefaultsCapacityPercentageThreshold = 85
nfsDefaultsCapacityPercentageThreshold = 85
float64EqualityThreshold = 1e-9
diskStatsFileName = "diskstats"
nfsStatsFileName = "/proc/self/mountstats"
latencyTooHigh = "LatencyTooHigh"
capacityNotEnough = "NotEnoughDiskSpace"
ioHang = "IOHang"
)

const (
Expand Down
Loading

0 comments on commit c688fbe

Please sign in to comment.