diff --git a/metrics/prometheus/constant.go b/metrics/prometheus/constant.go index 275fe25ca4..41904dba3e 100644 --- a/metrics/prometheus/constant.go +++ b/metrics/prometheus/constant.go @@ -36,6 +36,7 @@ const ( providerField = "provider" consumerField = "consumer" + qpsField = "qps" requestsField = "requests" rtField = "rt" @@ -48,6 +49,7 @@ const ( lastField = "last" totalField = "total" + aggregateField = "aggregate" processingField = "processing" succeedField = "succeed" ) diff --git a/metrics/prometheus/metric_set.go b/metrics/prometheus/metric_set.go index 6c661c7a9a..5d3caf2736 100644 --- a/metrics/prometheus/metric_set.go +++ b/metrics/prometheus/metric_set.go @@ -43,15 +43,19 @@ func (ms *metricSet) init(reporterConfig *metrics.ReporterConfig) { } type rpcCommonMetrics struct { - requestsTotal *prometheus.CounterVec - requestsProcessingTotal *prometheus.GaugeVec - requestsSucceedTotal *prometheus.CounterVec - rtMillisecondsMin *GaugeVecWithSyncMap - rtMillisecondsMax *GaugeVecWithSyncMap - rtMillisecondsSum *prometheus.CounterVec - rtMillisecondsAvg *GaugeVecWithSyncMap - rtMillisecondsLast *prometheus.GaugeVec - rtMillisecondsQuantiles *quantileGaugeVec + qpsTotal *qpsGaugeVec + requestsTotal *prometheus.CounterVec + requestsTotalAggregate *aggregateCounterGaugeVec + requestsProcessingTotal *prometheus.GaugeVec + requestsSucceedTotal *prometheus.CounterVec + requestsSucceedTotalAggregate *aggregateCounterGaugeVec + rtMillisecondsMin *GaugeVecWithSyncMap + rtMillisecondsMax *GaugeVecWithSyncMap + rtMillisecondsSum *prometheus.CounterVec + rtMillisecondsAvg *GaugeVecWithSyncMap + rtMillisecondsLast *prometheus.GaugeVec + rtMillisecondsQuantiles *quantileGaugeVec + rtMillisecondsAggregate *aggregateFunctionsGaugeVec } type providerMetrics struct { @@ -59,15 +63,25 @@ type providerMetrics struct { } func (pm *providerMetrics) init(reporterConfig *metrics.ReporterConfig) { + pm.qpsTotal = newQpsGaugeVec(buildMetricsName(providerField, qpsField, totalField), reporterConfig.Namespace, labelNames) pm.requestsTotal = newAutoCounterVec(buildMetricsName(providerField, requestsField, totalField), reporterConfig.Namespace, labelNames) + pm.requestsTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(providerField, requestsField, totalField, aggregateField), reporterConfig.Namespace, labelNames) pm.requestsProcessingTotal = newAutoGaugeVec(buildMetricsName(providerField, requestsField, processingField, totalField), reporterConfig.Namespace, labelNames) pm.requestsSucceedTotal = newAutoCounterVec(buildMetricsName(providerField, requestsField, succeedField, totalField), reporterConfig.Namespace, labelNames) + pm.requestsSucceedTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(providerField, requestsField, succeedField, totalField, aggregateField), reporterConfig.Namespace, labelNames) pm.rtMillisecondsMin = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, minField), reporterConfig.Namespace, labelNames) pm.rtMillisecondsMax = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, maxField), reporterConfig.Namespace, labelNames) pm.rtMillisecondsSum = newAutoCounterVec(buildMetricsName(providerField, rtField, milliSecondsField, sumField), reporterConfig.Namespace, labelNames) pm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames) pm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(providerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames) pm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(providerField, quantiles), reporterConfig.Namespace, labelNames, quantiles) + pm.rtMillisecondsAggregate = newAggregateFunctionsGaugeVec( + buildMetricsName(providerField, rtField, minField, milliSecondsField, aggregateField), + buildMetricsName(providerField, rtField, maxField, milliSecondsField, aggregateField), + buildMetricsName(providerField, rtField, avgField, milliSecondsField, aggregateField), + reporterConfig.Namespace, + labelNames, + ) } type consumerMetrics struct { @@ -75,15 +89,25 @@ type consumerMetrics struct { } func (cm *consumerMetrics) init(reporterConfig *metrics.ReporterConfig) { + cm.qpsTotal = newQpsGaugeVec(buildMetricsName(consumerField, qpsField, totalField), reporterConfig.Namespace, labelNames) cm.requestsTotal = newAutoCounterVec(buildMetricsName(consumerField, requestsField, totalField), reporterConfig.Namespace, labelNames) + cm.requestsTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(consumerField, requestsField, totalField, aggregateField), reporterConfig.Namespace, labelNames) cm.requestsProcessingTotal = newAutoGaugeVec(buildMetricsName(consumerField, requestsField, processingField, totalField), reporterConfig.Namespace, labelNames) cm.requestsSucceedTotal = newAutoCounterVec(buildMetricsName(consumerField, requestsField, succeedField, totalField), reporterConfig.Namespace, labelNames) + cm.requestsSucceedTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(consumerField, requestsField, succeedField, totalField, aggregateField), reporterConfig.Namespace, labelNames) cm.rtMillisecondsMin = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, minField), reporterConfig.Namespace, labelNames) cm.rtMillisecondsMax = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, maxField), reporterConfig.Namespace, labelNames) cm.rtMillisecondsSum = newAutoCounterVec(buildMetricsName(consumerField, rtField, milliSecondsField, sumField), reporterConfig.Namespace, labelNames) cm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames) cm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(consumerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames) cm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(consumerField, quantiles), reporterConfig.Namespace, labelNames, quantiles) + cm.rtMillisecondsAggregate = newAggregateFunctionsGaugeVec( + buildMetricsName(consumerField, rtField, minField, milliSecondsField, aggregateField), + buildMetricsName(consumerField, rtField, maxField, milliSecondsField, aggregateField), + buildMetricsName(consumerField, rtField, avgField, milliSecondsField, aggregateField), + reporterConfig.Namespace, + labelNames, + ) } // buildMetricsName builds metrics name split by "_". diff --git a/metrics/prometheus/model.go b/metrics/prometheus/model.go index 7ee94f27e9..3efd2149ce 100644 --- a/metrics/prometheus/model.go +++ b/metrics/prometheus/model.go @@ -298,3 +298,92 @@ func (gv *quantileGaugeVec) updateQuantile(labels *prometheus.Labels, curValue i updateFunc(cur) } } + +type qpsGaugeVec struct { + gaugeVec *prometheus.GaugeVec + syncMap *sync.Map // key: labels string, value: TimeWindowCounter +} + +func newQpsGaugeVec(name, namespace string, labels []string) *qpsGaugeVec { + return &qpsGaugeVec{ + gaugeVec: newAutoGaugeVec(name, namespace, labels), + syncMap: &sync.Map{}, + } +} + +func (gv *qpsGaugeVec) updateQps(labels *prometheus.Labels) { + key := convertLabelsToMapKey(*labels) + cur := aggregate.NewTimeWindowCounter(10, 120) + cur.Inc() + + if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded { + store := actual.(*aggregate.TimeWindowCounter) + store.Inc() + gv.gaugeVec.With(*labels).Set(store.Count() / float64(store.LivedSeconds())) + } else { + gv.gaugeVec.With(*labels).Set(cur.Count() / float64(cur.LivedSeconds())) + } +} + +type aggregateCounterGaugeVec struct { + gaugeVec *prometheus.GaugeVec + syncMap *sync.Map // key: labels string, value: TimeWindowCounter +} + +func newAggregateCounterGaugeVec(name, namespace string, labels []string) *aggregateCounterGaugeVec { + return &aggregateCounterGaugeVec{ + gaugeVec: newAutoGaugeVec(name, namespace, labels), + syncMap: &sync.Map{}, + } +} + +func (gv *aggregateCounterGaugeVec) inc(labels *prometheus.Labels) { + key := convertLabelsToMapKey(*labels) + cur := aggregate.NewTimeWindowCounter(10, 120) + cur.Inc() + + if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded { + store := actual.(*aggregate.TimeWindowCounter) + store.Inc() + gv.gaugeVec.With(*labels).Set(store.Count()) + } else { + gv.gaugeVec.With(*labels).Set(cur.Count()) + } +} + +type aggregateFunctionsGaugeVec struct { + min *prometheus.GaugeVec + max *prometheus.GaugeVec + avg *prometheus.GaugeVec + syncMap *sync.Map // key: labels string, value: TimeWindowAggregator +} + +func newAggregateFunctionsGaugeVec(minName, maxName, avgName, namespace string, labels []string) *aggregateFunctionsGaugeVec { + return &aggregateFunctionsGaugeVec{ + min: newAutoGaugeVec(minName, namespace, labels), + max: newAutoGaugeVec(maxName, namespace, labels), + avg: newAutoGaugeVec(avgName, namespace, labels), + syncMap: &sync.Map{}, + } +} + +func (gv *aggregateFunctionsGaugeVec) update(labels *prometheus.Labels, curValue int64) { + key := convertLabelsToMapKey(*labels) + cur := aggregate.NewTimeWindowAggregator(10, 120) + cur.Add(float64(curValue)) + + updateFunc := func(aggregator *aggregate.TimeWindowAggregator) { + result := aggregator.Result() + gv.min.With(*labels).Set(result.Min) + gv.max.With(*labels).Set(result.Max) + gv.avg.With(*labels).Set(result.Avg) + } + + if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded { + store := actual.(*aggregate.TimeWindowAggregator) + store.Add(float64(curValue)) + updateFunc(store) + } else { + updateFunc(cur) + } +} diff --git a/metrics/prometheus/reporter.go b/metrics/prometheus/reporter.go index c123df33d1..715fd97ca8 100644 --- a/metrics/prometheus/reporter.go +++ b/metrics/prometheus/reporter.go @@ -116,7 +116,7 @@ func (reporter *PrometheusReporter) ReportBeforeInvocation(ctx context.Context, return } labels := buildLabels(url) - + reporter.incQpsTotal(role, &labels) reporter.incRequestsProcessingTotal(role, &labels) } @@ -142,12 +142,23 @@ func (reporter *PrometheusReporter) ReportAfterInvocation(ctx context.Context, i } } +func (reporter *PrometheusReporter) incQpsTotal(role string, labels *prometheus.Labels) { + switch role { + case providerField: + reporter.provider.qpsTotal.updateQps(labels) + case consumerField: + reporter.consumer.qpsTotal.updateQps(labels) + } +} + func (reporter *PrometheusReporter) incRequestsTotal(role string, labels *prometheus.Labels) { switch role { case providerField: reporter.provider.requestsTotal.With(*labels).Inc() + reporter.provider.requestsTotalAggregate.inc(labels) case consumerField: reporter.consumer.requestsTotal.With(*labels).Inc() + reporter.consumer.requestsTotalAggregate.inc(labels) } } @@ -173,8 +184,10 @@ func (reporter *PrometheusReporter) incRequestsSucceedTotal(role string, labels switch role { case providerField: reporter.provider.requestsSucceedTotal.With(*labels).Inc() + reporter.provider.requestsSucceedTotalAggregate.inc(labels) case consumerField: reporter.consumer.requestsSucceedTotal.With(*labels).Inc() + reporter.consumer.requestsSucceedTotalAggregate.inc(labels) } } @@ -187,6 +200,7 @@ func (reporter *PrometheusReporter) reportRTMilliseconds(role string, labels *pr go reporter.provider.rtMillisecondsMax.updateMax(labels, costMs) go reporter.provider.rtMillisecondsAvg.updateAvg(labels, costMs) go reporter.provider.rtMillisecondsQuantiles.updateQuantile(labels, costMs) + go reporter.provider.rtMillisecondsAggregate.update(labels, costMs) case consumerField: go reporter.consumer.rtMillisecondsLast.With(*labels).Set(float64(costMs)) go reporter.consumer.rtMillisecondsSum.With(*labels).Add(float64(costMs)) @@ -194,5 +208,6 @@ func (reporter *PrometheusReporter) reportRTMilliseconds(role string, labels *pr go reporter.consumer.rtMillisecondsMax.updateMax(labels, costMs) go reporter.consumer.rtMillisecondsAvg.updateAvg(labels, costMs) go reporter.consumer.rtMillisecondsQuantiles.updateQuantile(labels, costMs) + go reporter.consumer.rtMillisecondsAggregate.update(labels, costMs) } }