Skip to content

Commit

Permalink
refactor: unified metrics names for forwarders (#1290)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy authored Oct 30, 2023
1 parent 17b7b31 commit 32416bf
Show file tree
Hide file tree
Showing 48 changed files with 1,050 additions and 1,245 deletions.
2 changes: 1 addition & 1 deletion config/base/dex/numaflow-dex-server-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ data:
staticClients:
- id: numaflow-server-app
redirectURIs:
- <HOSTNAME>/<base_herf>/login
- <HOSTNAME>/<base_href>/login
name: 'Numaflow Server App'
public: true
connectors:
Expand Down
2 changes: 1 addition & 1 deletion config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16373,7 +16373,7 @@ metadata:
apiVersion: v1
data:
config.yaml: "issuer: <HOSTNAME>/dex\nstorage:\n type: memory\nweb:\n http: 0.0.0.0:5556\nstaticClients:\n
\ - id: numaflow-server-app\n redirectURIs: \n - <HOSTNAME>/<base_herf>/login\n
\ - id: numaflow-server-app\n redirectURIs: \n - <HOSTNAME>/<base_href>/login\n
\ name: 'Numaflow Server App'\n public: true\nconnectors:\n- type: github\n
\ # https://dexidp.io/docs/connectors/github/\n id: github\n name: GitHub\n
\ config:\n clientID: $GITHUB_CLIENT_ID\n clientSecret: $GITHUB_CLIENT_SECRET\n
Expand Down
2 changes: 1 addition & 1 deletion config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16279,7 +16279,7 @@ metadata:
apiVersion: v1
data:
config.yaml: "issuer: <HOSTNAME>/dex\nstorage:\n type: memory\nweb:\n http: 0.0.0.0:5556\nstaticClients:\n
\ - id: numaflow-server-app\n redirectURIs: \n - <HOSTNAME>/<base_herf>/login\n
\ - id: numaflow-server-app\n redirectURIs: \n - <HOSTNAME>/<base_href>/login\n
\ name: 'Numaflow Server App'\n public: true\nconnectors:\n- type: github\n
\ # https://dexidp.io/docs/connectors/github/\n id: github\n name: GitHub\n
\ config:\n clientID: $GITHUB_CLIENT_ID\n clientSecret: $GITHUB_CLIENT_SECRET\n
Expand Down
114 changes: 42 additions & 72 deletions docs/operations/metrics/metrics.md

Large diffs are not rendered by default.

11 changes: 0 additions & 11 deletions pkg/daemon/server/service/rater/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,6 @@ import (

const (
indexNotFound = -1

// The following string set is used to identify the vertex type of pod

// keyVertexTypeReduce is the vertex type string for reduce vertex
keyVertexTypeReduce = "reduce"
// keyVertexTypeSource is the vertex type string for a source vertex
keyVertexTypeSource = "source"
// keyVertexTypeSink is the vertex type string for a sink vertex
keyVertexTypeSink = "sink"
// keyVertexTypeOther is the vertex type string for other vertices
keyVertexTypeOther = "other"
)

// UpdateCount updates the count of processed messages for a pod at a given time
Expand Down
20 changes: 3 additions & 17 deletions pkg/daemon/server/service/rater/pod_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,9 @@ func (pt *PodTracker) trackActivePods(ctx context.Context) {

func (pt *PodTracker) updateActivePods() {
for _, v := range pt.pipeline.Spec.Vertices {
vType := getVertexType(v)
for i := 0; i < int(v.Scale.GetMaxReplicas()); i++ {
podName := fmt.Sprintf("%s-%s-%d", pt.pipeline.Name, v.Name, i)
podKey := pt.getPodKey(i, v.Name, vType)
podKey := pt.getPodKey(i, v.Name)
if pt.isActive(v.Name, podName) {
pt.activePods.PushBack(podKey)
} else {
Expand All @@ -112,19 +111,6 @@ func (pt *PodTracker) updateActivePods() {
pt.log.Debugf("Finished updating the active pod set: %v", pt.activePods.ToString())
}

func getVertexType(v v1alpha1.AbstractVertex) string {
switch {
case v.IsReduceUDF():
return keyVertexTypeReduce
case v.IsASource():
return keyVertexTypeSource
case v.IsASink():
return keyVertexTypeSink
default:
return keyVertexTypeOther
}
}

// LeastRecentlyUsed returns the least recently used pod from the active pod list.
// if there are no active pods, it returns an empty string.
func (pt *PodTracker) LeastRecentlyUsed() string {
Expand All @@ -145,9 +131,9 @@ func (pt *PodTracker) GetActivePodsCount() int {
return pt.activePods.Length()
}

func (pt *PodTracker) getPodKey(index int, vertexName string, vertexType string) string {
func (pt *PodTracker) getPodKey(index int, vertexName string) string {
// podKey is used as a unique identifier for the pod, it is used by worker to determine the count of processed messages of the pod.
return strings.Join([]string{pt.pipeline.Name, vertexName, fmt.Sprintf("%d", index), vertexType}, PodInfoSeparator)
return strings.Join([]string{pt.pipeline.Name, vertexName, fmt.Sprintf("%d", index)}, PodInfoSeparator)
}

func (pt *PodTracker) isActive(vertexName, podName string) bool {
Expand Down
8 changes: 4 additions & 4 deletions pkg/daemon/server/service/rater/pod_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ func TestPodTracker_Start(t *testing.T) {
cancel()
wg.Wait()

assert.Equal(t, "p*v*0*other", tracker.LeastRecentlyUsed())
assert.Equal(t, "p*v*1*other", tracker.LeastRecentlyUsed())
assert.Equal(t, true, tracker.IsActive("p*v*4*other"))
assert.Equal(t, false, tracker.IsActive("p*v*5*other"))
assert.Equal(t, "p*v*0", tracker.LeastRecentlyUsed())
assert.Equal(t, "p*v*1", tracker.LeastRecentlyUsed())
assert.Equal(t, true, tracker.IsActive("p*v*4"))
assert.Equal(t, false, tracker.IsActive("p*v*5"))
}
21 changes: 6 additions & 15 deletions pkg/daemon/server/service/rater/rater.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.uber.org/zap"

"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/shared/logging"
sharedqueue "github.com/numaproj/numaflow/pkg/shared/queue"
)
Expand Down Expand Up @@ -133,15 +134,14 @@ func (r *Rater) monitorOnePod(ctx context.Context, key string, worker int) error
log := logging.FromContext(ctx).With("worker", fmt.Sprint(worker)).With("podKey", key)
log.Debugf("Working on key: %s", key)
podInfo := strings.Split(key, PodInfoSeparator)
if len(podInfo) != 4 {
if len(podInfo) != 3 {
return fmt.Errorf("invalid key %q", key)
}
vertexName := podInfo[1]
vertexType := podInfo[3]
podName := strings.Join([]string{podInfo[0], podInfo[1], podInfo[2]}, "-")
var podReadCount *PodReadCount
if r.podTracker.IsActive(key) {
podReadCount = r.getPodReadCounts(vertexName, vertexType, podName)
podReadCount = r.getPodReadCounts(vertexName, podName)
if podReadCount == nil {
log.Debugf("Failed retrieving total podReadCount for pod %s", podName)
}
Expand Down Expand Up @@ -216,17 +216,8 @@ func sleep(ctx context.Context, duration time.Duration) {

// getPodReadCounts returns the total number of messages read by the pod
// since a pod can read from multiple partitions, we will return a map of partition to read count.
func (r *Rater) getPodReadCounts(vertexName, vertexType, podName string) *PodReadCount {
metricNames := map[string]string{
keyVertexTypeReduce: "reduce_isb_reader_data_read",
keyVertexTypeSource: "source_forwarder_read_total",
keyVertexTypeSink: "sink_forwarder_data_read",
}

readTotalMetricName, ok := metricNames[vertexType]
if !ok {
readTotalMetricName = "forwarder_data_read"
}
func (r *Rater) getPodReadCounts(vertexName, podName string) *PodReadCount {
readTotalMetricName := "forwarder_data_read_total"

// scrape the read total metric from pod metric port
url := fmt.Sprintf("https://%s.%s.%s.svc:%v/metrics", podName, r.pipeline.Name+"-"+vertexName+"-headless", r.pipeline.Namespace, v1alpha1.VertexMetricsPort)
Expand All @@ -250,7 +241,7 @@ func (r *Rater) getPodReadCounts(vertexName, vertexType, podName string) *PodRea
for _, ele := range metricsList {
var partitionName string
for _, label := range ele.Label {
if label.GetName() == "partition_name" {
if label.GetName() == metrics.LabelPartitionName {
partitionName = label.GetValue()
break
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/daemon/server/service/rater/rater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,19 @@ func (m *raterMockHttpClient) Get(url string) (*http.Response, error) {
// the rater scrapes the unified metric "forwarder_data_read_total" for every vertex, regardless of vertex type,
// hence the mocked pod metrics endpoint exposes the read count under that metric name
Body: io.NopCloser(bytes.NewReader([]byte(fmt.Sprintf(`
# HELP forwarder_data_read Total number of Messages Read
# TYPE forwarder_data_read counter
forwarder_data_read{buffer="input",pipeline="simple-pipeline",vertex="input",partition_name="p-v-0"} %d
# HELP forwarder_data_read_total Total number of Messages Read
# TYPE forwarder_data_read_total counter
forwarder_data_read_total{buffer="input",pipeline="simple-pipeline",vertex="input",replica="0",partition_name="p-v-0"} %d
`, m.podOneCount))))}
return resp, nil
} else if url == "https://p-v-1.p-v-headless.default.svc:2469/metrics" {
m.podTwoCount = m.podTwoCount + 60
resp := &http.Response{
StatusCode: 200,
Body: io.NopCloser(bytes.NewReader([]byte(fmt.Sprintf(`
# HELP forwarder_data_read Total number of Messages Read
# TYPE forwarder_data_read counter
forwarder_data_read{buffer="input",pipeline="simple-pipeline",vertex="input", partition_name="p-v-1"} %d
# HELP forwarder_data_read_total Total number of Messages Read
# TYPE forwarder_data_read_total counter
forwarder_data_read_total{buffer="input",pipeline="simple-pipeline",vertex="input",replica="0",partition_name="p-v-1"} %d
`, m.podTwoCount))))}
return resp, nil
} else {
Expand Down
Loading

0 comments on commit 32416bf

Please sign in to comment.