diff --git a/common/constant/key.go b/common/constant/key.go index 383b57e57c..0db06c01df 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -412,6 +412,7 @@ const ( // metrics key const ( + MetricsRpc = "dubbo.metrics.rpc" MetricsRegistry = "dubbo.metrics.registry" MetricsMetadata = "dubbo.metrics.metadata" MetricApp = "dubbo.metrics.app" diff --git a/common/extension/metrics_test.go b/common/extension/metrics_test.go index 573c4dd812..64f76422ca 100644 --- a/common/extension/metrics_test.go +++ b/common/extension/metrics_test.go @@ -18,9 +18,7 @@ package extension import ( - "context" "testing" - "time" ) import ( @@ -29,7 +27,6 @@ import ( import ( "dubbo.apache.org/dubbo-go/v3/metrics" - "dubbo.apache.org/dubbo-go/v3/protocol" ) func TestGetMetricReporter(t *testing.T) { @@ -45,8 +42,8 @@ func TestGetMetricReporter(t *testing.T) { type mockReporter struct{} // implement the interface of Reporter -func (m *mockReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) { +func (m *mockReporter) StartServer(config *metrics.ReporterConfig) { } -func (m *mockReporter) ReportBeforeInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) { +func (m *mockReporter) ShutdownServer() { } diff --git a/filter/metrics/filter.go b/filter/metrics/filter.go index bf59b514c7..0b623d1028 100644 --- a/filter/metrics/filter.go +++ b/filter/metrics/filter.go @@ -15,7 +15,6 @@ * limitations under the License. */ -// Package metrics provides metrics collection filter. package metrics import ( @@ -28,56 +27,43 @@ import ( "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/filter" "dubbo.apache.org/dubbo-go/v3/metrics" + "dubbo.apache.org/dubbo-go/v3/metrics/rpc" "dubbo.apache.org/dubbo-go/v3/protocol" ) // must initialize before using the filter and after loading configuration -var metricFilterInstance *Filter +var metricFilterInstance *metricsFilter func init() { extension.SetFilter(constant.MetricsFilterKey, newFilter) } -// Filter will calculate the invocation's duration and the report to the reporters -// more info please take a look at dubbo-samples projects -type Filter struct { - reporters []metrics.Reporter -} +// metricsFilter will report RPC metrics to the metrics bus and implements the filter.Filter interface +type metricsFilter struct{} -// Invoke collect the duration of invocation and then report the duration by using goroutine -func (p *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { - go func() { - for _, reporter := range p.reporters { - reporter.ReportBeforeInvocation(ctx, invoker, invocation) - } - }() +// Invoke publish the BeforeInvokeEvent and AfterInvokeEvent to metrics bus +func (mf *metricsFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + metrics.Publish(rpc.NewBeforeInvokeEvent(invoker, invocation)) start := time.Now() res := invoker.Invoke(ctx, invocation) end := time.Now() duration := end.Sub(start) - go func() { - for _, reporter := range p.reporters { - reporter.ReportAfterInvocation(ctx, invoker, invocation, duration, res) - } - }() + metrics.Publish(rpc.NewAfterInvokeEvent(invoker, invocation, duration, res)) return res } // OnResponse do nothing and return the result -func (p *Filter) OnResponse(ctx context.Context, res protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (mf *metricsFilter) OnResponse(ctx context.Context, res protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return res } -// newFilter the Filter is singleton. -// it's lazy initialization -// make sure that the configuration had been loaded before invoking this method. +// newFilter creates a new metricsFilter instance. +// +// It's lazy initialization, +// and make sure that the configuration had been loaded before invoking this method. func newFilter() filter.Filter { if metricFilterInstance == nil { - reporters := make([]metrics.Reporter, 0, 1) - reporters = append(reporters, extension.GetMetricReporter("prometheus", metrics.NewReporterConfig())) - metricFilterInstance = &Filter{ - reporters: reporters, - } + metricFilterInstance = &metricsFilter{} } return metricFilterInstance } diff --git a/filter/metrics/filter_test.go b/filter/metrics/filter_test.go index fd5614a943..f1165202e4 100644 --- a/filter/metrics/filter_test.go +++ b/filter/metrics/filter_test.go @@ -19,33 +19,25 @@ package metrics import ( "context" - "sync" "testing" - "time" ) import ( "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" ) import ( "dubbo.apache.org/dubbo-go/v3/common" - "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/metrics" - _ "dubbo.apache.org/dubbo-go/v3/metrics/prometheus" "dubbo.apache.org/dubbo-go/v3/protocol" "dubbo.apache.org/dubbo-go/v3/protocol/invocation" ) func TestMetricsFilterInvoke(t *testing.T) { - // prepare the mock reporter - mk := &mockReporter{} - extension.SetMetricReporter("mock", func(config *metrics.ReporterConfig) metrics.Reporter { - return mk - }) - - instance := newFilter() + mockChan := make(chan metrics.MetricsEvent, 10) + defer close(mockChan) + metrics.Subscribe(constant.MetricsRpc, mockChan) url, _ := common.NewURL( "dubbo://:20000/UserProvider?app.version=0.0.1&application=BDTService&bean.name=UserProvider" + @@ -57,31 +49,14 @@ func TestMetricsFilterInvoke(t *testing.T) { attach := make(map[string]interface{}, 10) inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach) - ctx := context.Background() - mk.On("Report", ctx, invoker, inv).Return(true, nil) - - mk.wg.Add(1) - result := instance.Invoke(ctx, invoker, inv) + filter := newFilter() + result := filter.Invoke(ctx, invoker, inv) assert.NotNil(t, result) - mk.AssertNotCalled(t, "Report", 1) - // it will do nothing - result = instance.OnResponse(ctx, nil, invoker, inv) + result = filter.OnResponse(ctx, nil, invoker, inv) assert.Nil(t, result) -} - -type mockReporter struct { - mock.Mock - wg sync.WaitGroup -} - -func (m *mockReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) { - m.Called(ctx, invoker, invocation) - m.wg.Done() -} -func (m *mockReporter) ReportBeforeInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) { - m.Called(ctx, invoker, invocation) - m.wg.Done() + assert.Equal(t, 2, len(mockChan)) + assert.Equal(t, constant.MetricsRpc, (<-mockChan).Type()) } diff --git a/metrics/api.go b/metrics/api.go index 4c19e63cfd..d0c7f729ea 100644 --- a/metrics/api.go +++ b/metrics/api.go @@ -17,6 +17,25 @@ package metrics +import ( + "encoding/json" + "sync" +) + +import ( + "github.com/dubbogo/gost/log/logger" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate" +) + +const ( + DefaultCompression = 100 + DefaultBucketNum = 10 + DefaultTimeWindowSeconds = 120 +) + var ( registries = make(map[string]func(*ReporterConfig) MetricRegistry) collectors = make([]CollectorFunc, 0) @@ -29,7 +48,7 @@ type CollectorFunc func(MetricRegistry, *ReporterConfig) // Init Metrics module func Init(config *ReporterConfig) { if config.Enable { - // defalut protocol is already set in metricConfig + // default protocol is already set in metricConfig regFunc, ok := registries[config.Protocol] if ok { registry = regFunc(config) @@ -46,8 +65,8 @@ func SetRegistry(name string, v func(*ReporterConfig) MetricRegistry) { registries[name] = v } -// AddCollector add more indicators, like metadata、sla、configcenter etc -func AddCollector(name string, fun func(MetricRegistry, *ReporterConfig)) { +// AddCollector add more indicators, like metadata, sla, config-center etc. +func AddCollector(name string, fun CollectorFunc) { collectors = append(collectors, fun) } @@ -75,7 +94,7 @@ type RtOpts struct { // } // Type metric type, save with micrometer -type Type uint8 +type Type uint8 // TODO check if Type is is useful const ( Counter Type = iota @@ -94,8 +113,8 @@ const ( type MetricId struct { Name string Desc string - Tags map[string]string - Type Type + Tags map[string]string // also named label + Type Type // TODO check if this field is useful } func (m *MetricId) TagKeys() []string { @@ -110,6 +129,11 @@ func NewMetricId(key *MetricKey, level MetricLevel) *MetricId { return &MetricId{Name: key.Name, Desc: key.Desc, Tags: level.Tags()} } +// NewMetricIdByLabels create a MetricId by key and labels +func NewMetricIdByLabels(key *MetricKey, labels map[string]string) *MetricId { + return &MetricId{Name: key.Name, Desc: key.Desc, Tags: labels} +} + // MetricSample a metric sample,This is the final data presentation, // not an intermediate result(like summary,histogram they will export to a set of MetricSample) type MetricSample struct { @@ -126,10 +150,10 @@ type CounterMetric interface { // GaugeMetric gauge metric type GaugeMetric interface { Set(float64) - // Inc() - // Dec() - // Add(float64) - // Sub(float64) + Inc() + Dec() + Add(float64) + Sub(float64) } // histogram summary rt metric @@ -149,3 +173,252 @@ func (c *BaseCollector) StateCount(total, succ, fail *MetricKey, level MetricLev c.R.Counter(NewMetricId(fail, level)).Inc() } } + +// CounterVec means a set of counters with the same metricKey but different labels +type CounterVec interface { + Inc(labels map[string]string) + Add(labels map[string]string, v float64) +} + +// NewCounterVec create a CounterVec default implementation. +func NewCounterVec(metricRegistry MetricRegistry, metricKey *MetricKey) CounterVec { + return &DefaultCounterVec{ + metricRegistry: metricRegistry, + metricKey: metricKey, + } +} + +// DefaultCounterVec is a default CounterVec implementation. +type DefaultCounterVec struct { + metricRegistry MetricRegistry + metricKey *MetricKey +} + +func (d *DefaultCounterVec) Inc(labels map[string]string) { + d.metricRegistry.Counter(NewMetricIdByLabels(d.metricKey, labels)).Inc() +} + +func (d *DefaultCounterVec) Add(labels map[string]string, v float64) { + d.metricRegistry.Counter(NewMetricIdByLabels(d.metricKey, labels)).Add(v) +} + +// GaugeVec means a set of gauges with the same metricKey but different labels +type GaugeVec interface { + Set(labels map[string]string, v float64) + Inc(labels map[string]string) + Dec(labels map[string]string) + Add(labels map[string]string, v float64) + Sub(labels map[string]string, v float64) +} + +// NewGaugeVec create a GaugeVec default implementation. +func NewGaugeVec(metricRegistry MetricRegistry, metricKey *MetricKey) GaugeVec { + return &DefaultGaugeVec{ + metricRegistry: metricRegistry, + metricKey: metricKey, + } +} + +// DefaultGaugeVec is a default GaugeVec implementation. +type DefaultGaugeVec struct { + metricRegistry MetricRegistry + metricKey *MetricKey +} + +func (d *DefaultGaugeVec) Set(labels map[string]string, v float64) { + d.metricRegistry.Gauge(NewMetricIdByLabels(d.metricKey, labels)).Set(v) +} + +func (d *DefaultGaugeVec) Inc(labels map[string]string) { + d.metricRegistry.Gauge(NewMetricIdByLabels(d.metricKey, labels)).Inc() +} + +func (d *DefaultGaugeVec) Dec(labels map[string]string) { + d.metricRegistry.Gauge(NewMetricIdByLabels(d.metricKey, labels)).Dec() +} + +func (d *DefaultGaugeVec) Add(labels map[string]string, v float64) { + d.metricRegistry.Gauge(NewMetricIdByLabels(d.metricKey, labels)).Add(v) +} + +func (d *DefaultGaugeVec) Sub(labels map[string]string, v float64) { + d.metricRegistry.Gauge(NewMetricIdByLabels(d.metricKey, labels)).Sub(v) +} + +// RtVec means a set of rt metrics with the same metricKey but different labels +type RtVec interface { + Record(labels map[string]string, v float64) +} + +// NewRtVec create a RtVec default implementation DefaultRtVec. +func NewRtVec(metricRegistry MetricRegistry, metricKey *MetricKey, rtOpts *RtOpts) RtVec { + return &DefaultRtVec{ + metricRegistry: metricRegistry, + metricKey: metricKey, + rtOpts: rtOpts, + } +} + +// DefaultRtVec is a default RtVec implementation. +// +// If rtOpts.Aggregate is true, it will use the aggregate.TimeWindowAggregator with local aggregation, +// else it will use the aggregate.Result without aggregation. +type DefaultRtVec struct { + metricRegistry MetricRegistry + metricKey *MetricKey + rtOpts *RtOpts +} + +func (d *DefaultRtVec) Record(labels map[string]string, v float64) { + d.metricRegistry.Rt(NewMetricIdByLabels(d.metricKey, labels), d.rtOpts).Observe(v) +} + +// labelsToString convert @labels to json format string for cache key +func labelsToString(labels map[string]string) string { + labelsJson, err := json.Marshal(labels) + if err != nil { + logger.Errorf("json.Marshal(labels) = error:%v", err) + return "" + } + return string(labelsJson) +} + +// QpsMetricVec means a set of qps metrics with the same metricKey but different labels. +type QpsMetricVec interface { + Record(labels map[string]string) +} + +func NewQpsMetricVec(metricRegistry MetricRegistry, metricKey *MetricKey) QpsMetricVec { + return &DefaultQpsMetricVec{ + metricRegistry: metricRegistry, + metricKey: metricKey, + mux: sync.RWMutex{}, + cache: make(map[string]*aggregate.TimeWindowCounter), + } +} + +// DefaultQpsMetricVec is a default QpsMetricVec implementation. +// +// It is concurrent safe, and it uses the aggregate.TimeWindowCounter to store and calculate the qps metrics. +type DefaultQpsMetricVec struct { + metricRegistry MetricRegistry + metricKey *MetricKey + mux sync.RWMutex + cache map[string]*aggregate.TimeWindowCounter // key: metrics labels, value: TimeWindowCounter +} + +func (d *DefaultQpsMetricVec) Record(labels map[string]string) { + key := labelsToString(labels) + if key == "" { + return + } + d.mux.RLock() + twc, ok := d.cache[key] + d.mux.RUnlock() + if !ok { + d.mux.Lock() + twc, ok = d.cache[key] + if !ok { + twc = aggregate.NewTimeWindowCounter(DefaultBucketNum, DefaultTimeWindowSeconds) + d.cache[key] = twc + } + d.mux.Unlock() + } + twc.Inc() + d.metricRegistry.Gauge(NewMetricIdByLabels(d.metricKey, labels)).Set(twc.Count() / float64(twc.LivedSeconds())) +} + +// AggregateCounterVec means a set of aggregate counter metrics with the same metricKey but different labels. +type AggregateCounterVec interface { + Inc(labels map[string]string) +} + +func NewAggregateCounterVec(metricRegistry MetricRegistry, metricKey *MetricKey) AggregateCounterVec { + return &DefaultAggregateCounterVec{ + metricRegistry: metricRegistry, + metricKey: metricKey, + mux: sync.RWMutex{}, + cache: make(map[string]*aggregate.TimeWindowCounter), + } +} + +// DefaultAggregateCounterVec is a default AggregateCounterVec implementation. +// +// It is concurrent safe, and it uses the aggregate.TimeWindowCounter to store and calculate the aggregate counter metrics. +type DefaultAggregateCounterVec struct { + metricRegistry MetricRegistry + metricKey *MetricKey + mux sync.RWMutex + cache map[string]*aggregate.TimeWindowCounter // key: metrics labels, value: TimeWindowCounter +} + +func (d *DefaultAggregateCounterVec) Inc(labels map[string]string) { + key := labelsToString(labels) + if key == "" { + return + } + d.mux.RLock() + twc, ok := d.cache[key] + d.mux.RUnlock() + if !ok { + d.mux.Lock() + twc, ok = d.cache[key] + if !ok { + twc = aggregate.NewTimeWindowCounter(DefaultBucketNum, DefaultTimeWindowSeconds) + d.cache[key] = twc + } + d.mux.Unlock() + } + twc.Inc() + d.metricRegistry.Gauge(NewMetricIdByLabels(d.metricKey, labels)).Set(twc.Count()) +} + +// QuantileMetricVec means a set of quantile metrics with the same metricKey but different labels. +type QuantileMetricVec interface { + Record(labels map[string]string, v float64) +} + +func NewQuantileMetricVec(metricRegistry MetricRegistry, metricKeys []*MetricKey, quantiles []float64) QuantileMetricVec { + return &DefaultQuantileMetricVec{ + metricRegistry: metricRegistry, + metricKeys: metricKeys, + mux: sync.RWMutex{}, + cache: make(map[string]*aggregate.TimeWindowQuantile), + quantiles: quantiles, + } +} + +// DefaultQuantileMetricVec is a default QuantileMetricVec implementation. +// +// It is concurrent safe, and it uses the aggregate.TimeWindowQuantile to store and calculate the quantile metrics. +type DefaultQuantileMetricVec struct { + metricRegistry MetricRegistry + metricKeys []*MetricKey + mux sync.RWMutex + cache map[string]*aggregate.TimeWindowQuantile // key: metrics labels, value: TimeWindowQuantile + quantiles []float64 +} + +func (d *DefaultQuantileMetricVec) Record(labels map[string]string, v float64) { + key := labelsToString(labels) + if key == "" { + return + } + d.mux.RLock() + twq, ok := d.cache[key] + d.mux.RUnlock() + if !ok { + d.mux.Lock() + twq, ok = d.cache[key] + if !ok { + twq = aggregate.NewTimeWindowQuantile(DefaultCompression, DefaultBucketNum, DefaultTimeWindowSeconds) + d.cache[key] = twq + } + d.mux.Unlock() + } + twq.Add(v) + + for i, q := range twq.Quantiles(d.quantiles) { + d.metricRegistry.Gauge(NewMetricIdByLabels(d.metricKeys[i], labels)).Set(q) + } +} diff --git a/metrics/bus_test.go b/metrics/bus_test.go index 185f50896f..4e4a68eecf 100644 --- a/metrics/bus_test.go +++ b/metrics/bus_test.go @@ -18,10 +18,13 @@ package metrics import ( - "github.com/stretchr/testify/assert" "testing" ) +import ( + "github.com/stretchr/testify/assert" +) + var mockChan = make(chan MetricsEvent, 16) type MockEvent struct { diff --git a/metrics/prometheus/api.go b/metrics/prometheus/api.go deleted file mode 100644 index 2d01afdca3..0000000000 --- a/metrics/prometheus/api.go +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package prometheus - -import ( - "sync" -) - -import ( - "github.com/prometheus/client_golang/prometheus" -) - -type syncMaps struct { - userGauge sync.Map - userSummary sync.Map - userCounter sync.Map - userCounterVec sync.Map - userGaugeVec sync.Map - userSummaryVec sync.Map -} - -// setGauge set gauge to target value with given label, if label is not empty, set gauge vec -// if target gauge/gaugevec not exist, just create new gauge and set the value -func (reporter *PrometheusReporter) setGauge(gaugeName string, toSetValue float64, labelMap prometheus.Labels) { - if len(labelMap) == 0 { - // gauge - if val, exist := reporter.userGauge.Load(gaugeName); !exist { - gauge := newGauge(gaugeName, reporter.namespace) - err := prometheus.DefaultRegisterer.Register(gauge) - if err == nil { - reporter.userGauge.Store(gaugeName, gauge) - gauge.Set(toSetValue) - } else if are, ok := err.(prometheus.AlreadyRegisteredError); ok { - // A gauge for that metric has been registered before. - // Use the old gauge from now on. - are.ExistingCollector.(prometheus.Gauge).Set(toSetValue) - } - - } else { - val.(prometheus.Gauge).Set(toSetValue) - } - return - } - - // gauge vec - if val, exist := reporter.userGaugeVec.Load(gaugeName); !exist { - keyList := make([]string, 0) - for k := range labelMap { - keyList = append(keyList, k) - } - gaugeVec := newGaugeVec(gaugeName, reporter.namespace, keyList) - err := prometheus.DefaultRegisterer.Register(gaugeVec) - if err == nil { - reporter.userGaugeVec.Store(gaugeName, gaugeVec) - gaugeVec.With(labelMap).Set(toSetValue) - } else if are, ok := err.(prometheus.AlreadyRegisteredError); ok { - // A gauge for that metric has been registered before. - // Use the old gauge from now on. - are.ExistingCollector.(*prometheus.GaugeVec).With(labelMap).Set(toSetValue) - } - } else { - val.(*prometheus.GaugeVec).With(labelMap).Set(toSetValue) - } -} - -// incCounter inc counter to inc if label is not empty, set counter vec -// if target counter/counterVec not exist, just create new counter and inc the value -func (reporter *PrometheusReporter) incCounter(counterName string, labelMap prometheus.Labels) { - if len(labelMap) == 0 { - // counter - if val, exist := reporter.userCounter.Load(counterName); !exist { - counter := newCounter(counterName, reporter.namespace) - err := prometheus.DefaultRegisterer.Register(counter) - if err == nil { - reporter.userCounter.Store(counterName, counter) - counter.Inc() - } else if are, ok := err.(prometheus.AlreadyRegisteredError); ok { - // A counter for that metric has been registered before. - // Use the old counter from now on. - are.ExistingCollector.(prometheus.Counter).Inc() - } - } else { - val.(prometheus.Counter).Inc() - } - return - } - - // counter vec inc - if val, exist := reporter.userCounterVec.Load(counterName); !exist { - keyList := make([]string, 0) - for k := range labelMap { - keyList = append(keyList, k) - } - counterVec := newCounterVec(counterName, reporter.namespace, keyList) - err := prometheus.DefaultRegisterer.Register(counterVec) - if err == nil { - reporter.userCounterVec.Store(counterName, counterVec) - counterVec.With(labelMap).Inc() - } else if are, ok := err.(prometheus.AlreadyRegisteredError); ok { - // A counter for that metric has been registered before. - // Use the old counter from now on. - are.ExistingCollector.(*prometheus.CounterVec).With(labelMap).Inc() - } - } else { - val.(*prometheus.CounterVec).With(labelMap).Inc() - } -} - -// incSummary inc summary to target value with given label, if label is not empty, set summary vec -// if target summary/summaryVec not exist, just create new summary and set the value -func (reporter *PrometheusReporter) incSummary(summaryName string, toSetValue float64, labelMap prometheus.Labels) { - if len(labelMap) == 0 { - // summary - if val, exist := reporter.userSummary.Load(summaryName); !exist { - summary := newSummary(summaryName, reporter.namespace) - err := prometheus.DefaultRegisterer.Register(summary) - if err == nil { - reporter.userSummary.Store(summaryName, summary) - summary.Observe(toSetValue) - } else if are, ok := err.(prometheus.AlreadyRegisteredError); ok { - // A summary for that metric has been registered before. - // Use the old summary from now on. - are.ExistingCollector.(prometheus.Summary).Observe(toSetValue) - } - } else { - val.(prometheus.Summary).Observe(toSetValue) - } - return - } - - // summary vec - if val, exist := reporter.userSummaryVec.Load(summaryName); !exist { - keyList := make([]string, 0) - for k := range labelMap { - keyList = append(keyList, k) - } - summaryVec := newSummaryVec(summaryName, reporter.namespace, keyList, reporter.reporterConfig.SummaryMaxAge) - err := prometheus.DefaultRegisterer.Register(summaryVec) - if err == nil { - reporter.userSummaryVec.Store(summaryName, summaryVec) - summaryVec.With(labelMap).Observe(toSetValue) - } else if are, ok := err.(prometheus.AlreadyRegisteredError); ok { - // A summary for that metric has been registered before. - // Use the old summary from now on. - are.ExistingCollector.(*prometheus.SummaryVec).With(labelMap).Observe(toSetValue) - } - } else { - val.(*prometheus.SummaryVec).With(labelMap).Observe(toSetValue) - } -} - -func SetGaugeWithLabel(gaugeName string, val float64, label prometheus.Labels) { - if reporterInstance.reporterConfig.Enable { - reporterInstance.setGauge(gaugeName, val, label) - } -} - -func SetGauge(gaugeName string, val float64) { - if reporterInstance.reporterConfig.Enable { - reporterInstance.setGauge(gaugeName, val, make(prometheus.Labels)) - } -} - -func IncCounterWithLabel(counterName string, label prometheus.Labels) { - if reporterInstance.reporterConfig.Enable { - reporterInstance.incCounter(counterName, label) - } -} - -func IncCounter(summaryName string) { - if reporterInstance.reporterConfig.Enable { - reporterInstance.incCounter(summaryName, make(prometheus.Labels)) - } -} - -func IncSummaryWithLabel(counterName string, val float64, label prometheus.Labels) { - if reporterInstance.reporterConfig.Enable { - reporterInstance.incSummary(counterName, val, label) - } -} - -func IncSummary(summaryName string, val float64) { - if reporterInstance.reporterConfig.Enable { - reporterInstance.incSummary(summaryName, val, make(prometheus.Labels)) - } -} diff --git a/metrics/prometheus/metric_set.go b/metrics/prometheus/metric_set.go deleted file mode 100644 index 5d3caf2736..0000000000 --- a/metrics/prometheus/metric_set.go +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package prometheus - -import ( - "fmt" - "strconv" - "strings" -) - -import ( - "github.com/prometheus/client_golang/prometheus" -) - -import ( - "dubbo.apache.org/dubbo-go/v3/metrics" -) - -// metricSet is a set of metrics that are reported to prometheus in dubbo-go -type metricSet struct { - provider providerMetrics - consumer consumerMetrics -} - -func (ms *metricSet) init(reporterConfig *metrics.ReporterConfig) { - ms.provider.init(reporterConfig) - ms.consumer.init(reporterConfig) -} - -type rpcCommonMetrics struct { - qpsTotal *qpsGaugeVec - requestsTotal *prometheus.CounterVec - requestsTotalAggregate *aggregateCounterGaugeVec - requestsProcessingTotal *prometheus.GaugeVec - requestsSucceedTotal *prometheus.CounterVec - requestsSucceedTotalAggregate *aggregateCounterGaugeVec - rtMillisecondsMin *GaugeVecWithSyncMap - rtMillisecondsMax *GaugeVecWithSyncMap - rtMillisecondsSum *prometheus.CounterVec - rtMillisecondsAvg *GaugeVecWithSyncMap - rtMillisecondsLast *prometheus.GaugeVec - rtMillisecondsQuantiles *quantileGaugeVec - rtMillisecondsAggregate *aggregateFunctionsGaugeVec -} - -type providerMetrics struct { - rpcCommonMetrics -} - -func (pm *providerMetrics) init(reporterConfig *metrics.ReporterConfig) { - pm.qpsTotal = newQpsGaugeVec(buildMetricsName(providerField, qpsField, totalField), reporterConfig.Namespace, labelNames) - pm.requestsTotal = newAutoCounterVec(buildMetricsName(providerField, requestsField, totalField), reporterConfig.Namespace, labelNames) - pm.requestsTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(providerField, requestsField, totalField, aggregateField), reporterConfig.Namespace, labelNames) - pm.requestsProcessingTotal = newAutoGaugeVec(buildMetricsName(providerField, requestsField, processingField, totalField), reporterConfig.Namespace, labelNames) - pm.requestsSucceedTotal = newAutoCounterVec(buildMetricsName(providerField, requestsField, succeedField, totalField), reporterConfig.Namespace, labelNames) - pm.requestsSucceedTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(providerField, requestsField, succeedField, totalField, aggregateField), reporterConfig.Namespace, labelNames) - pm.rtMillisecondsMin = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, minField), reporterConfig.Namespace, labelNames) - pm.rtMillisecondsMax = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, maxField), reporterConfig.Namespace, labelNames) - pm.rtMillisecondsSum = newAutoCounterVec(buildMetricsName(providerField, rtField, milliSecondsField, sumField), reporterConfig.Namespace, labelNames) - pm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames) - pm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(providerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames) - pm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(providerField, quantiles), reporterConfig.Namespace, labelNames, quantiles) - pm.rtMillisecondsAggregate = newAggregateFunctionsGaugeVec( - buildMetricsName(providerField, rtField, minField, milliSecondsField, aggregateField), - buildMetricsName(providerField, rtField, maxField, milliSecondsField, aggregateField), - buildMetricsName(providerField, rtField, avgField, milliSecondsField, aggregateField), - reporterConfig.Namespace, - labelNames, - ) -} - -type consumerMetrics struct { - rpcCommonMetrics -} - -func (cm *consumerMetrics) init(reporterConfig *metrics.ReporterConfig) { - cm.qpsTotal = newQpsGaugeVec(buildMetricsName(consumerField, qpsField, totalField), reporterConfig.Namespace, labelNames) - cm.requestsTotal = newAutoCounterVec(buildMetricsName(consumerField, requestsField, totalField), reporterConfig.Namespace, labelNames) - cm.requestsTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(consumerField, requestsField, totalField, aggregateField), reporterConfig.Namespace, labelNames) - cm.requestsProcessingTotal = newAutoGaugeVec(buildMetricsName(consumerField, requestsField, processingField, totalField), reporterConfig.Namespace, labelNames) - cm.requestsSucceedTotal = newAutoCounterVec(buildMetricsName(consumerField, requestsField, succeedField, totalField), reporterConfig.Namespace, labelNames) - cm.requestsSucceedTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(consumerField, requestsField, succeedField, totalField, aggregateField), reporterConfig.Namespace, labelNames) - cm.rtMillisecondsMin = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, minField), reporterConfig.Namespace, labelNames) - cm.rtMillisecondsMax = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, maxField), reporterConfig.Namespace, labelNames) - cm.rtMillisecondsSum = newAutoCounterVec(buildMetricsName(consumerField, rtField, milliSecondsField, sumField), reporterConfig.Namespace, labelNames) - cm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames) - cm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(consumerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames) - cm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(consumerField, quantiles), reporterConfig.Namespace, labelNames, quantiles) - cm.rtMillisecondsAggregate = newAggregateFunctionsGaugeVec( - buildMetricsName(consumerField, rtField, minField, milliSecondsField, aggregateField), - buildMetricsName(consumerField, rtField, maxField, milliSecondsField, aggregateField), - buildMetricsName(consumerField, rtField, avgField, milliSecondsField, aggregateField), - reporterConfig.Namespace, - labelNames, - ) -} - -// buildMetricsName builds metrics name split by "_". -func buildMetricsName(args ...string) string { - sb := strings.Builder{} - for _, arg := range args { - sb.WriteString("_") - sb.WriteString(arg) - } - res := strings.TrimPrefix(sb.String(), "_") - return res -} - -// buildRTQuantilesMetricsNames is only used for building rt quantiles metric names. -func buildRTQuantilesMetricsNames(role string, quantiles []float64) []string { - res := make([]string, 0, len(quantiles)) - for _, q := range quantiles { - quantileField := fmt.Sprintf("p%v", strconv.FormatFloat(q*100, 'f', -1, 64)) - name := buildMetricsName(role, rtField, milliSecondsField, quantileField) - res = append(res, name) - } - return res -} diff --git a/metrics/prometheus/model.go b/metrics/prometheus/model.go deleted file mode 100644 index 3efd2149ce..0000000000 --- a/metrics/prometheus/model.go +++ /dev/null @@ -1,389 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package prometheus - -import ( - "strings" - "sync" - "sync/atomic" - "time" -) - -import ( - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" -) - -import ( - "dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate" -) - -func newHistogramVec(name, namespace string, labels []string) *prometheus.HistogramVec { - return prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: namespace, - Name: name, - Buckets: defaultHistogramBucket, - }, - labels) -} - -func newCounter(name, namespace string) prometheus.Counter { - return prometheus.NewCounter( - prometheus.CounterOpts{ - Namespace: namespace, - Name: name, - }) -} - -func newCounterVec(name, namespace string, labels []string) *prometheus.CounterVec { - return prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: name, - Namespace: namespace, - }, labels) -} - -func newGauge(name, namespace string) prometheus.Gauge { - return prometheus.NewGauge( - prometheus.GaugeOpts{ - Name: name, - Namespace: namespace, - }) -} - -func newGaugeVec(name, namespace string, labels []string) *prometheus.GaugeVec { - return prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: name, - Namespace: namespace, - }, labels) -} - -func newSummary(name, namespace string) prometheus.Summary { - return prometheus.NewSummary( - prometheus.SummaryOpts{ - Name: name, - Namespace: namespace, - }) -} - -// newSummaryVec create SummaryVec, the Namespace is dubbo -// the objectives is from my experience. -func newSummaryVec(name, namespace string, labels []string, maxAge int64) *prometheus.SummaryVec { - return prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - Namespace: namespace, - Name: name, - Objectives: map[float64]float64{ - 0.5: 0.01, - 0.75: 0.01, - 0.90: 0.005, - 0.98: 0.002, - 0.99: 0.001, - 0.999: 0.0001, - }, - MaxAge: time.Duration(maxAge), - }, - labels, - ) -} - -// create an auto register histogram vec -func newAutoHistogramVec(name, namespace string, labels []string) *prometheus.HistogramVec { - return promauto.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: namespace, - Name: name, - Buckets: defaultHistogramBucket, - }, - labels) -} - -// create an auto register counter vec -func newAutoCounterVec(name, namespace string, labels []string) *prometheus.CounterVec { - return promauto.NewCounterVec( - prometheus.CounterOpts{ - Name: name, - Namespace: namespace, - }, labels) -} - -// create an auto register gauge vec -func newAutoGaugeVec(name, namespace string, labels []string) *prometheus.GaugeVec { - return promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Name: name, - Namespace: namespace, - }, labels) -} - -// create an auto register summary vec -func newAutoSummaryVec(name, namespace string, labels []string, maxAge int64) *prometheus.SummaryVec { - return promauto.NewSummaryVec( - prometheus.SummaryOpts{ - Namespace: namespace, - Name: name, - Objectives: map[float64]float64{ - 0.5: 0.01, - 0.75: 0.01, - 0.90: 0.005, - 0.98: 0.002, - 0.99: 0.001, - 0.999: 0.0001, - }, - MaxAge: time.Duration(maxAge), - }, - labels, - ) -} - -type GaugeVecWithSyncMap struct { - GaugeVec *prometheus.GaugeVec - SyncMap *sync.Map // key: labels, value: *atomic.Value -} - -func newAutoGaugeVecWithSyncMap(name, namespace string, labels []string) *GaugeVecWithSyncMap { - return &GaugeVecWithSyncMap{ - GaugeVec: newAutoGaugeVec(name, namespace, labels), - SyncMap: &sync.Map{}, - } -} - -func convertLabelsToMapKey(labels prometheus.Labels) string { - return strings.Join([]string{ - labels[applicationNameKey], - labels[groupKey], - labels[hostnameKey], - labels[interfaceKey], - labels[ipKey], - labels[versionKey], - labels[methodKey], - }, "_") -} - -func (gv *GaugeVecWithSyncMap) updateMin(labels *prometheus.Labels, curValue int64) { - key := convertLabelsToMapKey(*labels) - cur := &atomic.Value{} // for first store - cur.Store(curValue) - for { - if actual, loaded := gv.SyncMap.LoadOrStore(key, cur); loaded { - store := actual.(*atomic.Value) - storeValue := store.Load().(int64) - if curValue < storeValue { - if store.CompareAndSwap(storeValue, curValue) { - // value is not changed, should update - gv.GaugeVec.With(*labels).Set(float64(curValue)) - break - } - // value has changed, continue for loop - } else { - // no need to update - break - } - } else { - // store current curValue as this labels' init value - gv.GaugeVec.With(*labels).Set(float64(curValue)) - break - } - } -} - -func (gv *GaugeVecWithSyncMap) updateMax(labels *prometheus.Labels, curValue int64) { - key := convertLabelsToMapKey(*labels) - cur := &atomic.Value{} // for first store - cur.Store(curValue) - for { - if actual, loaded := gv.SyncMap.LoadOrStore(key, cur); loaded { - store := actual.(*atomic.Value) - storeValue := store.Load().(int64) - if curValue > storeValue { - if store.CompareAndSwap(storeValue, curValue) { - // value is not changed, should update - gv.GaugeVec.With(*labels).Set(float64(curValue)) - break - } - // value has changed, continue for loop - } else { - // no need to update - break - } - } else { - // store current curValue as this labels' init value - gv.GaugeVec.With(*labels).Set(float64(curValue)) - break - } - } -} - -func (gv *GaugeVecWithSyncMap) updateAvg(labels *prometheus.Labels, curValue int64) { - key := convertLabelsToMapKey(*labels) - cur := &atomic.Value{} // for first store - type avgPair struct { - Sum int64 - N int64 - } - cur.Store(avgPair{Sum: curValue, N: 1}) - - for { - if actual, loaded := gv.SyncMap.LoadOrStore(key, cur); loaded { - store := actual.(*atomic.Value) - storeValue := store.Load().(avgPair) - newValue := avgPair{Sum: storeValue.Sum + curValue, N: storeValue.N + 1} - if store.CompareAndSwap(storeValue, newValue) { - // value is not changed, should update - gv.GaugeVec.With(*labels).Set(float64(newValue.Sum / newValue.N)) - break - } - } else { - // store current curValue as this labels' init value - gv.GaugeVec.With(*labels).Set(float64(curValue)) - break - } - } -} - -type quantileGaugeVec struct { - gaugeVecSlice []*prometheus.GaugeVec - quantiles []float64 - syncMap *sync.Map // key: labels string, value: TimeWindowQuantile -} - -// Notice: names and quantiles should be the same length and same order. -func newQuantileGaugeVec(names []string, namespace string, labels []string, quantiles []float64) *quantileGaugeVec { - gvs := make([]*prometheus.GaugeVec, len(names)) - for i, name := range names { - gvs[i] = newAutoGaugeVec(name, namespace, labels) - } - gv := &quantileGaugeVec{ - gaugeVecSlice: gvs, - quantiles: quantiles, - syncMap: &sync.Map{}, - } - return gv -} - -func (gv *quantileGaugeVec) updateQuantile(labels *prometheus.Labels, curValue int64) { - key := convertLabelsToMapKey(*labels) - cur := aggregate.NewTimeWindowQuantile(100, 10, 120) - cur.Add(float64(curValue)) - - updateFunc := func(td *aggregate.TimeWindowQuantile) { - qs := td.Quantiles(gv.quantiles) - for i, q := range qs { - gv.gaugeVecSlice[i].With(*labels).Set(q) - } - } - - if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded { - store := actual.(*aggregate.TimeWindowQuantile) - store.Add(float64(curValue)) - updateFunc(store) - } else { - updateFunc(cur) - } -} - -type qpsGaugeVec struct { - gaugeVec *prometheus.GaugeVec - syncMap *sync.Map // key: labels string, value: TimeWindowCounter -} - -func newQpsGaugeVec(name, namespace string, labels []string) *qpsGaugeVec { - return &qpsGaugeVec{ - gaugeVec: newAutoGaugeVec(name, namespace, labels), - syncMap: &sync.Map{}, - } -} - -func (gv *qpsGaugeVec) updateQps(labels *prometheus.Labels) { - key := convertLabelsToMapKey(*labels) - cur := aggregate.NewTimeWindowCounter(10, 120) - cur.Inc() - - if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded { - store := actual.(*aggregate.TimeWindowCounter) - store.Inc() - gv.gaugeVec.With(*labels).Set(store.Count() / float64(store.LivedSeconds())) - } else { - gv.gaugeVec.With(*labels).Set(cur.Count() / float64(cur.LivedSeconds())) - } -} - -type aggregateCounterGaugeVec struct { - gaugeVec *prometheus.GaugeVec - syncMap *sync.Map // key: labels string, value: TimeWindowCounter -} - -func newAggregateCounterGaugeVec(name, namespace string, labels []string) *aggregateCounterGaugeVec { - return &aggregateCounterGaugeVec{ - gaugeVec: newAutoGaugeVec(name, namespace, labels), - syncMap: &sync.Map{}, - } -} - -func (gv *aggregateCounterGaugeVec) inc(labels *prometheus.Labels) { - key := convertLabelsToMapKey(*labels) - cur := aggregate.NewTimeWindowCounter(10, 120) - cur.Inc() - - if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded { - store := actual.(*aggregate.TimeWindowCounter) - store.Inc() - gv.gaugeVec.With(*labels).Set(store.Count()) - } else { - gv.gaugeVec.With(*labels).Set(cur.Count()) - } -} - -type aggregateFunctionsGaugeVec struct { - min *prometheus.GaugeVec - max *prometheus.GaugeVec - avg *prometheus.GaugeVec - syncMap *sync.Map // key: labels string, value: TimeWindowAggregator -} - -func newAggregateFunctionsGaugeVec(minName, maxName, avgName, namespace string, labels []string) *aggregateFunctionsGaugeVec { - return &aggregateFunctionsGaugeVec{ - min: newAutoGaugeVec(minName, namespace, labels), - max: newAutoGaugeVec(maxName, namespace, labels), - avg: newAutoGaugeVec(avgName, namespace, labels), - syncMap: &sync.Map{}, - } -} - -func (gv *aggregateFunctionsGaugeVec) update(labels *prometheus.Labels, curValue int64) { - key := convertLabelsToMapKey(*labels) - cur := aggregate.NewTimeWindowAggregator(10, 120) - cur.Add(float64(curValue)) - - updateFunc := func(aggregator *aggregate.TimeWindowAggregator) { - result := aggregator.Result() - gv.min.With(*labels).Set(result.Min) - gv.max.With(*labels).Set(result.Max) - gv.avg.With(*labels).Set(result.Avg) - } - - if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded { - store := actual.(*aggregate.TimeWindowAggregator) - store.Add(float64(curValue)) - updateFunc(store) - } else { - updateFunc(cur) - } -} diff --git a/metrics/prometheus/registry.go b/metrics/prometheus/registry.go index 21ebc0d6d3..144a3a0599 100644 --- a/metrics/prometheus/registry.go +++ b/metrics/prometheus/registry.go @@ -95,8 +95,10 @@ func (p *promMetricRegistry) Summary(m *metrics.MetricId) metrics.ObservableMetr } func (p *promMetricRegistry) Rt(m *metrics.MetricId, opts *metrics.RtOpts) metrics.ObservableMetric { + key := m.Name var supplier func() interface{} if opts != nil && opts.Aggregate { + key += "_aggregate" supplier = func() interface{} { // TODO set default aggregate config from config return NewAggRtVec(&RtOpts{ @@ -114,7 +116,7 @@ func (p *promMetricRegistry) Rt(m *metrics.MetricId, opts *metrics.RtOpts) metri }, m.TagKeys()) } } - vec := p.getOrComputeVec(m.Name, supplier).(*RtVec) + vec := p.getOrComputeVec(key, supplier).(*RtVec) return vec.With(m.Tags) } diff --git a/metrics/prometheus/reporter.go b/metrics/prometheus/reporter.go index 0e9fedc822..a5e88eff28 100644 --- a/metrics/prometheus/reporter.go +++ b/metrics/prometheus/reporter.go @@ -21,82 +21,78 @@ import ( "context" "net/http" "sync" - "time" ) import ( "github.com/dubbogo/gost/log/logger" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" ) import ( "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/metrics" - "dubbo.apache.org/dubbo-go/v3/protocol" ) var ( - reporterInstance *PrometheusReporter + reporterInstance *reporter reporterInitOnce sync.Once ) +const ( + reporterName = "prometheus" +) + // should initialize after loading configuration func init() { // newPrometheusReporter() extension.SetMetricReporter(reporterName, newPrometheusReporter) } -// PrometheusReporter will collect the data for Prometheus +// reporter will export the metrics to Prometheus // if you want to use this feature, you need to initialize your prometheus. // https://prometheus.io/docs/guides/go-application/ -type PrometheusReporter struct { +type reporter struct { reporterServer *http.Server reporterConfig *metrics.ReporterConfig - metricSet - syncMaps - namespace string + namespace string } -// newPrometheusReporter create new prometheusReporter -// it will register the metrics into prometheus +// newPrometheusReporter create a new prometheus server or push gateway reporter func newPrometheusReporter(reporterConfig *metrics.ReporterConfig) metrics.Reporter { if reporterInstance == nil { reporterInitOnce.Do(func() { - ms := &metricSet{} - ms.init(reporterConfig) - reporterInstance = &PrometheusReporter{ + reporterInstance = &reporter{ reporterConfig: reporterConfig, namespace: reporterConfig.Namespace, - metricSet: *ms, } }) } if reporterConfig.Enable { if reporterConfig.Mode == metrics.ReportModePull { - go reporterInstance.startupServer(reporterConfig) + go reporterInstance.StartServer(reporterConfig) } // todo pushgateway support } else { - reporterInstance.shutdownServer() + reporterInstance.ShutdownServer() } return reporterInstance } -func (reporter *PrometheusReporter) startupServer(reporterConfig *metrics.ReporterConfig) { +func (r *reporter) StartServer(reporterConfig *metrics.ReporterConfig) { // start server mux := http.NewServeMux() mux.Handle(reporterConfig.Path, promhttp.Handler()) reporterInstance.reporterServer = &http.Server{Addr: ":" + reporterConfig.Port, Handler: mux} + logger.Infof("new prometheus reporter with port = %s, path = %s", reporterConfig.Port, reporterConfig.Path) if err := reporterInstance.reporterServer.ListenAndServe(); err != nil { logger.Warnf("new prometheus reporter with error = %s", err) } } -func (reporter *PrometheusReporter) shutdownServer() { +func (r *reporter) ShutdownServer() { if reporterInstance.reporterServer != nil { err := reporterInstance.reporterServer.Shutdown(context.Background()) if err != nil { @@ -105,110 +101,3 @@ func (reporter *PrometheusReporter) shutdownServer() { } } } - -func (reporter *PrometheusReporter) ReportBeforeInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) { - if !reporter.reporterConfig.Enable { - return - } - url := invoker.GetURL() - - role := getRole(url) - if role == "" { - return - } - labels := buildLabels(url) - reporter.incQpsTotal(role, &labels) - reporter.incRequestsProcessingTotal(role, &labels) -} - -func (reporter *PrometheusReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) { - if !reporter.reporterConfig.Enable { - return - } - url := invoker.GetURL() - - role := getRole(url) - if role == "" { - return - } - labels := buildLabels(url) - - reporter.incRequestsTotal(role, &labels) - reporter.decRequestsProcessingTotal(role, &labels) - reporter.reportRTMilliseconds(role, &labels, cost.Milliseconds()) - - if res != nil && res.Error() == nil { - // succeed - reporter.incRequestsSucceedTotal(role, &labels) - } -} - -func (reporter *PrometheusReporter) incQpsTotal(role string, labels *prometheus.Labels) { - switch role { - case providerField: - reporter.provider.qpsTotal.updateQps(labels) - case consumerField: - reporter.consumer.qpsTotal.updateQps(labels) - } -} - -func (reporter *PrometheusReporter) incRequestsTotal(role string, labels *prometheus.Labels) { - switch role { - case providerField: - reporter.provider.requestsTotal.With(*labels).Inc() - reporter.provider.requestsTotalAggregate.inc(labels) - case consumerField: - reporter.consumer.requestsTotal.With(*labels).Inc() - reporter.consumer.requestsTotalAggregate.inc(labels) - } -} - -func (reporter *PrometheusReporter) incRequestsProcessingTotal(role string, labels *prometheus.Labels) { - switch role { - case providerField: - reporter.provider.requestsProcessingTotal.With(*labels).Inc() - case consumerField: - reporter.consumer.requestsProcessingTotal.With(*labels).Inc() - } -} - -func (reporter *PrometheusReporter) decRequestsProcessingTotal(role string, labels *prometheus.Labels) { - switch role { - case providerField: - reporter.provider.requestsProcessingTotal.With(*labels).Dec() - case consumerField: - reporter.consumer.requestsProcessingTotal.With(*labels).Dec() - } -} - -func (reporter *PrometheusReporter) incRequestsSucceedTotal(role string, labels *prometheus.Labels) { - switch role { - case providerField: - reporter.provider.requestsSucceedTotal.With(*labels).Inc() - reporter.provider.requestsSucceedTotalAggregate.inc(labels) - case consumerField: - reporter.consumer.requestsSucceedTotal.With(*labels).Inc() - reporter.consumer.requestsSucceedTotalAggregate.inc(labels) - } -} - -func (reporter *PrometheusReporter) reportRTMilliseconds(role string, labels *prometheus.Labels, costMs int64) { - switch role { - case providerField: - go reporter.provider.rtMillisecondsLast.With(*labels).Set(float64(costMs)) - go reporter.provider.rtMillisecondsSum.With(*labels).Add(float64(costMs)) - go reporter.provider.rtMillisecondsMin.updateMin(labels, costMs) - go reporter.provider.rtMillisecondsMax.updateMax(labels, costMs) - go reporter.provider.rtMillisecondsAvg.updateAvg(labels, costMs) - go reporter.provider.rtMillisecondsQuantiles.updateQuantile(labels, costMs) - go reporter.provider.rtMillisecondsAggregate.update(labels, costMs) - case consumerField: - go reporter.consumer.rtMillisecondsLast.With(*labels).Set(float64(costMs)) - go reporter.consumer.rtMillisecondsSum.With(*labels).Add(float64(costMs)) - go reporter.consumer.rtMillisecondsMin.updateMin(labels, costMs) - go reporter.consumer.rtMillisecondsMax.updateMax(labels, costMs) - go reporter.consumer.rtMillisecondsAvg.updateAvg(labels, costMs) - go reporter.consumer.rtMillisecondsQuantiles.updateQuantile(labels, costMs) - go reporter.consumer.rtMillisecondsAggregate.update(labels, costMs) - } -} diff --git a/metrics/prometheus/reporter_test.go b/metrics/prometheus/reporter_test.go deleted file mode 100644 index b3ca11c360..0000000000 --- a/metrics/prometheus/reporter_test.go +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package prometheus - -import ( - "context" - "testing" - "time" -) - -import ( - "github.com/stretchr/testify/assert" -) - -import ( - "dubbo.apache.org/dubbo-go/v3/common" - "dubbo.apache.org/dubbo-go/v3/common/extension" - "dubbo.apache.org/dubbo-go/v3/metrics" - "dubbo.apache.org/dubbo-go/v3/protocol" - "dubbo.apache.org/dubbo-go/v3/protocol/invocation" -) - -func TestPrometheusReporter_Report(t *testing.T) { - reporter := extension.GetMetricReporter(reporterName, metrics.NewReporterConfig()) - url, _ := common.NewURL( - "dubbo://:20000/UserProvider?app.version=0.0.1&application=BDTService&bean.name=UserProvider" + - "&cluster=failover&environment=dev&group=&interface=com.ikurento.user.UserProvider&loadbalance=random&methods.GetUser." + - "loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name=" + - "BDTService&organization=ikurento.com&owner=ZX®istry.role=3&retries=&" + - "service.filter=echo%2Ctoken%2Caccesslog×tamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100") - invoker := protocol.NewBaseInvoker(url) - - attach := make(map[string]interface{}, 10) - inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach) - - assert.False(t, isConsumer(url)) - ctx := context.Background() - reporter.ReportBeforeInvocation(ctx, invoker, inv) - reporter.ReportAfterInvocation(ctx, invoker, inv, 100*time.Millisecond, nil) - - // consumer side - url, _ = common.NewURL( - "dubbo://:20000/UserProvider?app.version=0.0.1&application=BDTService&bean.name=UserProvider" + - "&cluster=failover&environment=dev&group=&interface=com.ikurento.user.UserProvider&loadbalance=random&methods.GetUser." + - "loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name=" + - "BDTService&organization=ikurento.com&owner=ZX®istry.role=0&retries=&" + - "service.filter=echo%2Ctoken%2Caccesslog×tamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100") - invoker = protocol.NewBaseInvoker(url) - reporter.ReportBeforeInvocation(ctx, invoker, inv) - reporter.ReportAfterInvocation(ctx, invoker, inv, 100*time.Millisecond, nil) - - // invalid role - url, _ = common.NewURL( - "dubbo://:20000/UserProvider?app.version=0.0.1&application=BDTService&bean.name=UserProvider" + - "&cluster=failover&environment=dev&group=&interface=com.ikurento.user.UserProvider&loadbalance=random&methods.GetUser." + - "loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name=" + - "BDTService&organization=ikurento.com&owner=ZX®istry.role=9&retries=&" + - "service.filter=echo%2Ctoken%2Caccesslog×tamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100") - invoker = protocol.NewBaseInvoker(url) - reporter.ReportBeforeInvocation(ctx, invoker, inv) - reporter.ReportAfterInvocation(ctx, invoker, inv, 100*time.Millisecond, nil) -} diff --git a/metrics/reporter.go b/metrics/reporter.go index bf9693bb14..15ca7e09e9 100644 --- a/metrics/reporter.go +++ b/metrics/reporter.go @@ -17,15 +17,6 @@ package metrics -import ( - "context" - "time" -) - -import ( - "dubbo.apache.org/dubbo-go/v3/protocol" -) - const DefMaxAge = 600000000000 type ReporterConfig struct { @@ -36,7 +27,7 @@ type ReporterConfig struct { Path string PushGatewayAddress string SummaryMaxAge int64 - Protocol string // MetricsRegistry 扩展配置 ,如:prometheus + Protocol string // exporters, like prometheus } type ReportMode string @@ -58,11 +49,8 @@ func NewReporterConfig() *ReporterConfig { } } -// Reporter is the interface which will be used to report the invocation's duration -// -// Report method reports the duration of an invocation. +// Reporter is an interface used to represent the backend of metrics to be exported type Reporter interface { - ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, - cost time.Duration, res protocol.Result) - ReportBeforeInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) + StartServer(config *ReporterConfig) + ShutdownServer() } diff --git a/metrics/rpc/collector.go b/metrics/rpc/collector.go new file mode 100644 index 0000000000..f59779f6b9 --- /dev/null +++ b/metrics/rpc/collector.go @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rpc + +import ( + "github.com/dubbogo/gost/log/logger" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/metrics" +) + +var ( + rpcMetricsChan = make(chan metrics.MetricsEvent, 1024) +) + +// init will add the rpc collectorFunc to metrics.collectors slice, and lazy start the rpc collector goroutine +func init() { + var collectorFunc metrics.CollectorFunc + collectorFunc = func(registry metrics.MetricRegistry, c *metrics.ReporterConfig) { + rc := &rpcCollector{ + registry: registry, + metricSet: buildMetricSet(registry), + } + go rc.start() + } + + metrics.AddCollector("rpc", collectorFunc) +} + +// rpcCollector is a collector which will collect the rpc metrics +type rpcCollector struct { + registry metrics.MetricRegistry + metricSet *metricSet // metricSet is a struct which contains all metrics about rpc +} + +// start will subscribe the rpc.metricsEvent from channel rpcMetricsChan, and handle the event from the channel +func (c *rpcCollector) start() { + metrics.Subscribe(constant.MetricsRpc, rpcMetricsChan) + for event := range rpcMetricsChan { + if rpcEvent, ok := event.(*metricsEvent); ok { + switch rpcEvent.name { + case BeforeInvoke: + c.beforeInvokeHandler(rpcEvent) + case AfterInvoke: + c.afterInvokeHandler(rpcEvent) + default: + } + } else { + logger.Error("Bad metrics event found in RPC collector") + } + } +} + +func (c *rpcCollector) beforeInvokeHandler(event *metricsEvent) { + url := event.invoker.GetURL() + role := getRole(url) + + if role == "" { + return + } + labels := buildLabels(url, event.invocation) + c.recordQps(role, labels) + c.incRequestsProcessingTotal(role, labels) +} + +func (c *rpcCollector) afterInvokeHandler(event *metricsEvent) { + url := event.invoker.GetURL() + role := getRole(url) + + if role == "" { + return + } + labels := buildLabels(url, event.invocation) + c.incRequestsTotal(role, labels) + c.decRequestsProcessingTotal(role, labels) + if event.result != nil { + if event.result.Error() == nil { + c.incRequestsSucceedTotal(role, labels) + } + } + c.reportRTMilliseconds(role, labels, event.costTime.Milliseconds()) +} + +func (c *rpcCollector) recordQps(role string, labels map[string]string) { + switch role { + case providerField: + c.metricSet.provider.qpsTotal.Record(labels) + case consumerField: + c.metricSet.consumer.qpsTotal.Record(labels) + } +} + +func (c *rpcCollector) incRequestsTotal(role string, labels map[string]string) { + switch role { + case providerField: + c.metricSet.provider.requestsTotal.Inc(labels) + c.metricSet.provider.requestsTotalAggregate.Inc(labels) + case consumerField: + c.metricSet.consumer.requestsTotal.Inc(labels) + c.metricSet.consumer.requestsTotalAggregate.Inc(labels) + } +} + +func (c *rpcCollector) incRequestsProcessingTotal(role string, labels map[string]string) { + switch role { + case providerField: + c.metricSet.provider.requestsProcessingTotal.Inc(labels) + case consumerField: + c.metricSet.consumer.requestsProcessingTotal.Inc(labels) + } +} + +func (c *rpcCollector) decRequestsProcessingTotal(role string, labels map[string]string) { + switch role { + case providerField: + c.metricSet.provider.requestsProcessingTotal.Dec(labels) + case consumerField: + c.metricSet.consumer.requestsProcessingTotal.Dec(labels) + } +} + +func (c *rpcCollector) incRequestsSucceedTotal(role string, labels map[string]string) { + switch role { + case providerField: + c.metricSet.provider.requestsSucceedTotal.Inc(labels) + c.metricSet.provider.requestsSucceedTotalAggregate.Inc(labels) + case consumerField: + c.metricSet.consumer.requestsSucceedTotal.Inc(labels) + c.metricSet.consumer.requestsSucceedTotalAggregate.Inc(labels) + } +} + +func (c *rpcCollector) reportRTMilliseconds(role string, labels map[string]string, cost int64) { + switch role { + case providerField: + c.metricSet.provider.rtMilliseconds.Record(labels, float64(cost)) + c.metricSet.provider.rtMillisecondsAggregate.Record(labels, float64(cost)) + c.metricSet.provider.rtMillisecondsQuantiles.Record(labels, float64(cost)) + case consumerField: + c.metricSet.consumer.rtMilliseconds.Record(labels, float64(cost)) + c.metricSet.consumer.rtMillisecondsAggregate.Record(labels, float64(cost)) + c.metricSet.consumer.rtMillisecondsQuantiles.Record(labels, float64(cost)) + } +} diff --git a/metrics/prometheus/constant.go b/metrics/rpc/constant.go similarity index 73% rename from metrics/prometheus/constant.go rename to metrics/rpc/constant.go index 41904dba3e..450febef1b 100644 --- a/metrics/prometheus/constant.go +++ b/metrics/rpc/constant.go @@ -15,14 +15,13 @@ * limitations under the License. */ -package prometheus +package rpc import ( "dubbo.apache.org/dubbo-go/v3/common/constant" ) const ( - reporterName = "prometheus" applicationNameKey = constant.ApplicationNameKey groupKey = constant.GroupKey hostnameKey = constant.HostnameKey @@ -35,25 +34,4 @@ const ( const ( providerField = "provider" consumerField = "consumer" - - qpsField = "qps" - requestsField = "requests" - rtField = "rt" - - milliSecondsField = "milliseconds" - - minField = "min" - maxField = "max" - sumField = "sum" - avgField = "avg" - lastField = "last" - - totalField = "total" - aggregateField = "aggregate" - processingField = "processing" - succeedField = "succeed" -) - -var ( - quantiles = []float64{0.5, 0.9, 0.95, 0.99} ) diff --git a/metrics/rpc/event.go b/metrics/rpc/event.go new file mode 100644 index 0000000000..c56ecae539 --- /dev/null +++ b/metrics/rpc/event.go @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rpc + +import ( + "time" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/metrics" + "dubbo.apache.org/dubbo-go/v3/protocol" +) + +// metricsEvent is the event defined for rpc metrics +type metricsEvent struct { + name metricsName + invoker protocol.Invoker + invocation protocol.Invocation + costTime time.Duration + result protocol.Result +} + +// Type returns the type of the event, it is used for metrics bus to dispatch the event to rpc collector +func (m metricsEvent) Type() string { + return constant.MetricsRpc +} + +type metricsName uint8 + +const ( + BeforeInvoke metricsName = iota + AfterInvoke +) + +func NewBeforeInvokeEvent(invoker protocol.Invoker, invocation protocol.Invocation) metrics.MetricsEvent { + return &metricsEvent{ + name: BeforeInvoke, + invoker: invoker, + invocation: invocation, + } +} + +func NewAfterInvokeEvent(invoker protocol.Invoker, invocation protocol.Invocation, costTime time.Duration, result protocol.Result) metrics.MetricsEvent { + return &metricsEvent{ + name: AfterInvoke, + invoker: invoker, + invocation: invocation, + costTime: costTime, + result: result, + } +} diff --git a/metrics/rpc/metric_set.go b/metrics/rpc/metric_set.go new file mode 100644 index 0000000000..a27d439bc9 --- /dev/null +++ b/metrics/rpc/metric_set.go @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rpc + +import ( + "dubbo.apache.org/dubbo-go/v3/metrics" +) + +// metricSet is the metric set for rpc +type metricSet struct { + provider *providerMetrics + consumer *consumerMetrics +} + +type providerMetrics struct { + rpcCommonMetrics +} + +type consumerMetrics struct { + rpcCommonMetrics +} + +// rpcCommonMetrics is the common metrics for both provider and consumer +type rpcCommonMetrics struct { + qpsTotal metrics.QpsMetricVec + requestsTotal metrics.CounterVec + requestsTotalAggregate metrics.AggregateCounterVec + requestsProcessingTotal metrics.GaugeVec + requestsSucceedTotal metrics.CounterVec + requestsSucceedTotalAggregate metrics.AggregateCounterVec + rtMilliseconds metrics.RtVec + rtMillisecondsQuantiles metrics.QuantileMetricVec + rtMillisecondsAggregate metrics.RtVec +} + +// buildMetricSet will call init functions to initialize the metricSet +func buildMetricSet(registry metrics.MetricRegistry) *metricSet { + ms := &metricSet{ + provider: &providerMetrics{}, + consumer: &consumerMetrics{}, + } + ms.provider.init(registry) + ms.consumer.init(registry) + return ms +} + +func (pm *providerMetrics) init(registry metrics.MetricRegistry) { + pm.qpsTotal = metrics.NewQpsMetricVec(registry, metrics.NewMetricKey("dubbo_provider_qps_total", "The number of requests received by the provider per second")) + pm.requestsTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_total", "The total number of received requests by the provider")) + pm.requestsTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_total_aggregate", "The total number of received requests by the provider under the sliding window")) + pm.requestsProcessingTotal = metrics.NewGaugeVec(registry, metrics.NewMetricKey("dubbo_provider_requests_processing_total", "The number of received requests being processed by the provider")) + pm.requestsSucceedTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_succeed_total", "The number of requests successfully received by the provider")) + pm.requestsSucceedTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_succeed_total_aggregate", "The number of successful requests received by the provider under the sliding window")) + pm.rtMilliseconds = metrics.NewRtVec(registry, + metrics.NewMetricKey("dubbo_provider_rt_milliseconds", "response time among all requests processed by the provider"), + &metrics.RtOpts{Aggregate: false}, + ) + pm.rtMillisecondsAggregate = metrics.NewRtVec(registry, + metrics.NewMetricKey("dubbo_provider_rt_milliseconds", "response time of the provider under the sliding window"), + &metrics.RtOpts{Aggregate: true, BucketNum: metrics.DefaultBucketNum, TimeWindowSeconds: metrics.DefaultTimeWindowSeconds}, + ) + pm.rtMillisecondsQuantiles = metrics.NewQuantileMetricVec(registry, []*metrics.MetricKey{ + metrics.NewMetricKey("dubbo_provider_rt_milliseconds_p50", "The total response time spent by providers processing 50% of requests"), + metrics.NewMetricKey("dubbo_provider_rt_milliseconds_p90", "The total response time spent by providers processing 90% of requests"), + metrics.NewMetricKey("dubbo_provider_rt_milliseconds_p95", "The total response time spent by providers processing 95% of requests"), + metrics.NewMetricKey("dubbo_provider_rt_milliseconds_p99", "The total response time spent by providers processing 99% of requests"), + }, []float64{0.5, 0.9, 0.95, 0.99}) +} + +func (cm *consumerMetrics) init(registry metrics.MetricRegistry) { + cm.qpsTotal = metrics.NewQpsMetricVec(registry, metrics.NewMetricKey("dubbo_consumer_qps_total", "The number of requests sent by consumers per second")) + cm.requestsTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_total", "The total number of requests sent by consumers")) + cm.requestsTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_total_aggregate", "The total number of requests sent by consumers under the sliding window")) + cm.requestsProcessingTotal = metrics.NewGaugeVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_processing_total", "The number of received requests being processed by the consumer")) + cm.requestsSucceedTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_succeed_total", "The number of successful requests sent by consumers")) + cm.requestsSucceedTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_succeed_total_aggregate", "The number of successful requests sent by consumers under the sliding window")) + cm.rtMilliseconds = metrics.NewRtVec(registry, + metrics.NewMetricKey("dubbo_consumer_rt_milliseconds", "response time among all requests from consumers"), + &metrics.RtOpts{Aggregate: false}, + ) + cm.rtMillisecondsAggregate = metrics.NewRtVec(registry, + metrics.NewMetricKey("dubbo_consumer_rt_milliseconds", "response time of the consumer under the sliding window"), + &metrics.RtOpts{Aggregate: true, BucketNum: metrics.DefaultBucketNum, TimeWindowSeconds: metrics.DefaultTimeWindowSeconds}, + ) + cm.rtMillisecondsQuantiles = metrics.NewQuantileMetricVec(registry, []*metrics.MetricKey{ + metrics.NewMetricKey("dubbo_consumer_rt_milliseconds_p50", "The total response time spent by consumers processing 50% of requests"), + metrics.NewMetricKey("dubbo_consumer_rt_milliseconds_p90", "The total response time spent by consumers processing 90% of requests"), + metrics.NewMetricKey("dubbo_consumer_rt_milliseconds_p95", "The total response time spent by consumers processing 95% of requests"), + metrics.NewMetricKey("dubbo_consumer_rt_milliseconds_p99", "The total response time spent by consumers processing 99% of requests"), + }, []float64{0.5, 0.9, 0.95, 0.99}) +} diff --git a/metrics/prometheus/util.go b/metrics/rpc/util.go similarity index 78% rename from metrics/prometheus/util.go rename to metrics/rpc/util.go index 29e5ebf3c9..7422b390f9 100644 --- a/metrics/prometheus/util.go +++ b/metrics/rpc/util.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package prometheus +package rpc import ( "strconv" @@ -24,33 +24,28 @@ import ( import ( "github.com/dubbogo/gost/log/logger" - - "github.com/prometheus/client_golang/prometheus" ) import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/protocol" ) -var ( - labelNames = []string{applicationNameKey, groupKey, hostnameKey, interfaceKey, ipKey, methodKey, versionKey} - defaultHistogramBucket = []float64{10, 50, 100, 200, 500, 1000, 10000} -) - -func buildLabels(url *common.URL) prometheus.Labels { - return prometheus.Labels{ +// buildLabels will build the labels for the rpc metrics +func buildLabels(url *common.URL, invocation protocol.Invocation) map[string]string { + return map[string]string{ applicationNameKey: url.GetParam(constant.ApplicationKey, ""), groupKey: url.Group(), - hostnameKey: "not implemented yet", + hostnameKey: common.GetLocalHostName(), interfaceKey: url.Service(), ipKey: common.GetLocalIp(), versionKey: url.GetParam(constant.AppVersionKey, ""), - methodKey: url.GetParam(constant.MethodKey, ""), + methodKey: invocation.MethodName(), } } -// return the role of the application, provider or consumer, if the url is not a valid one, return empty string +// getRole will get the application role from the url func getRole(url *common.URL) (role string) { if isProvider(url) { role = providerField diff --git a/metrics/util/aggregate/quantile_test.go b/metrics/util/aggregate/quantile_test.go index 5e82431661..d90d09652e 100644 --- a/metrics/util/aggregate/quantile_test.go +++ b/metrics/util/aggregate/quantile_test.go @@ -17,7 +17,9 @@ package aggregate -import "testing" +import ( + "testing" +) func TestAddAndQuantile(t1 *testing.T) { timeWindowQuantile := NewTimeWindowQuantile(100, 10, 1)