Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: migrate old RT metric impl to new RT impl #2390

Merged
merged 1 commit into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 7 additions & 104 deletions metrics/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,6 @@

package metrics

import (
"sync"
)

import (
"dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate"
)

var (
registries = make(map[string]func(*ReporterConfig) MetricRegistry)
collectors = make([]CollectorFunc, 0)
Expand Down Expand Up @@ -145,104 +137,15 @@ type ObservableMetric interface {
Observe(float64)
}

// StatesMetrics multi metrics,include total,success num, fail num,call MetricsRegistry save data
type StatesMetrics interface {
Success()
AddSuccess(float64)
Fail()
AddFailed(float64)
Inc(succ bool)
type BaseCollector struct {
R MetricRegistry
}

func NewStatesMetrics(total *MetricId, succ *MetricId, fail *MetricId, reg MetricRegistry) StatesMetrics {
return &DefaultStatesMetric{total: total, succ: succ, fail: fail, r: reg}
}

type DefaultStatesMetric struct {
r MetricRegistry
total, succ, fail *MetricId
}

func (c DefaultStatesMetric) Inc(succ bool) {
if succ {
c.Success()
func (c *BaseCollector) StateCount(total, succ, fail *MetricKey, level MetricLevel, succed bool) {
c.R.Counter(NewMetricId(total, level)).Inc()
if succed {
c.R.Counter(NewMetricId(succ, level)).Inc()
} else {
c.Fail()
}
}
func (c DefaultStatesMetric) Success() {
c.r.Counter(c.total).Inc()
c.r.Counter(c.succ).Inc()
}

func (c DefaultStatesMetric) AddSuccess(v float64) {
c.r.Counter(c.total).Add(v)
c.r.Counter(c.succ).Add(v)
}

func (c DefaultStatesMetric) Fail() {
c.r.Counter(c.total).Inc()
c.r.Counter(c.fail).Inc()
}

func (c DefaultStatesMetric) AddFailed(v float64) {
c.r.Counter(c.total).Add(v)
c.r.Counter(c.fail).Add(v)
}

// TimeMetric muliti metrics, include min(Gauge)、max(Gauge)、avg(Gauge)、sum(Gauge)、last(Gauge),call MetricRegistry to expose
// see dubbo-java org.apache.dubbo.metrics.aggregate.TimeWindowAggregator
type TimeMetric interface {
Record(float64)
}

const (
defaultBucketNum = 10
defaultTimeWindowSeconds = 120
)

// NewTimeMetric init and write all data to registry
func NewTimeMetric(min, max, avg, sum, last *MetricId, mr MetricRegistry) TimeMetric {
return &DefaultTimeMetric{r: mr, min: min, max: max, avg: avg, sum: sum, last: last,
agg: aggregate.NewTimeWindowAggregator(defaultBucketNum, defaultTimeWindowSeconds)}
}

type DefaultTimeMetric struct {
r MetricRegistry
agg *aggregate.TimeWindowAggregator
min, max, avg, sum, last *MetricId
}

func (m *DefaultTimeMetric) Record(v float64) {
m.agg.Add(v)
result := m.agg.Result()
m.r.Gauge(m.max).Set(result.Max)
m.r.Gauge(m.min).Set(result.Min)
m.r.Gauge(m.avg).Set(result.Avg)
m.r.Gauge(m.sum).Set(result.Total)
m.r.Gauge(m.last).Set(v)
}

// cache if needed, TimeMetrics must cached
var metricsCache map[string]interface{} = make(map[string]interface{})
var metricsCacheMutex sync.RWMutex

func ComputeIfAbsentCache(key string, supplier func() interface{}) interface{} {
metricsCacheMutex.RLock()
v, ok := metricsCache[key]
metricsCacheMutex.RUnlock()
if ok {
return v
} else {
metricsCacheMutex.Lock()
defer metricsCacheMutex.Unlock()
v, ok = metricsCache[key] // double check,avoid overwriting
if ok {
return v
} else {
n := supplier()
metricsCache[key] = n
return n
}
c.R.Counter(NewMetricId(fail, level)).Inc()
}
}
2 changes: 1 addition & 1 deletion metrics/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (m *ApplicationMetricLevel) Tags() map[string]string {
tags := make(map[string]string)
tags[constant.IpKey] = m.Ip
tags[constant.HostnameKey] = m.HostName
tags[constant.ApplicationKey] = m.ApplicationName
tags[constant.ApplicationNameKey] = m.ApplicationName
tags[constant.ApplicationVersionKey] = m.Version
tags[constant.GitCommitIdKey] = m.GitCommitId
return tags
Expand Down
59 changes: 13 additions & 46 deletions metrics/metadata/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ var ch = make(chan metrics.MetricsEvent, 10)

func init() {
metrics.AddCollector("metadata", func(mr metrics.MetricRegistry, rc *metrics.ReporterConfig) {
l := &MetadataMetricCollector{r: mr}
l := &MetadataMetricCollector{metrics.BaseCollector{R: mr}}
l.start()
})
}

type MetadataMetricCollector struct {
r metrics.MetricRegistry
metrics.BaseCollector
}

func (c *MetadataMetricCollector) start() {
Expand All @@ -63,59 +63,26 @@ func (c *MetadataMetricCollector) start() {
}

func (c *MetadataMetricCollector) handleMetadataPush(event *MetadataMetricEvent) {
m := metrics.ComputeIfAbsentCache(dubboMetadataPush, func() interface{} {
return newStatesMetricFunc(metadataPushNum, metadataPushNumSucceed, metadataPushNumFailed, metrics.GetApplicationLevel(), c.r)
}).(metrics.StatesMetrics)
m.Inc(event.Succ)
metric := metrics.ComputeIfAbsentCache(dubboPushRt, func() interface{} {
return newTimeMetrics(pushRtMin, pushRtMax, pushRtAvg, pushRtSum, pushRtLast, metrics.GetApplicationLevel(), c.r)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
level := metrics.GetApplicationLevel()
c.StateCount(metadataPushNum, metadataPushSucceed, metadataPushFailed, level, event.Succ)
c.R.Rt(metrics.NewMetricId(pushRt, level), &metrics.RtOpts{}).Observe(event.CostMs())
}

func (c *MetadataMetricCollector) handleMetadataSub(event *MetadataMetricEvent) {
m := metrics.ComputeIfAbsentCache(dubboMetadataSubscribe, func() interface{} {
return newStatesMetricFunc(metadataSubNum, metadataSubNumSucceed, metadataSubNumFailed, metrics.GetApplicationLevel(), c.r)
}).(metrics.StatesMetrics)
m.Inc(event.Succ)
metric := metrics.ComputeIfAbsentCache(dubboSubscribeRt, func() interface{} {
return newTimeMetrics(subscribeRtMin, subscribeRtMax, subscribeRtAvg, subscribeRtSum, subscribeRtLast, metrics.GetApplicationLevel(), c.r)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
level := metrics.GetApplicationLevel()
c.StateCount(metadataSubNum, metadataSubSucceed, metadataSubFailed, level, event.Succ)
c.R.Rt(metrics.NewMetricId(subscribeRt, level), &metrics.RtOpts{}).Observe(event.CostMs())
}

func (c *MetadataMetricCollector) handleStoreProvider(event *MetadataMetricEvent) {
interfaceName := event.Attachment[constant.InterfaceKey]
m := metrics.ComputeIfAbsentCache(dubboMetadataStoreProvider+":"+interfaceName, func() interface{} {
return newStatesMetricFunc(metadataStoreProvider, metadataStoreProviderSucceed, metadataStoreProviderFailed,
metrics.NewServiceMetric(interfaceName), c.r)
}).(metrics.StatesMetrics)
m.Inc(event.Succ)
metric := metrics.ComputeIfAbsentCache(dubboStoreProviderInterfaceRt+":"+interfaceName, func() interface{} {
return newTimeMetrics(storeProviderInterfaceRtMin, storeProviderInterfaceRtMax, storeProviderInterfaceRtAvg,
storeProviderInterfaceRtSum, storeProviderInterfaceRtLast, metrics.NewServiceMetric(interfaceName), c.r)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
level := metrics.NewServiceMetric(event.Attachment[constant.InterfaceKey])
c.StateCount(metadataStoreProviderNum, metadataStoreProviderSucceed, metadataStoreProviderFailed, level, event.Succ)
c.R.Rt(metrics.NewMetricId(storeProviderInterfaceRt, level), &metrics.RtOpts{}).Observe(event.CostMs())
}

func (c *MetadataMetricCollector) handleSubscribeService(event *MetadataMetricEvent) {
interfaceName := event.Attachment[constant.InterfaceKey]
metric := metrics.ComputeIfAbsentCache(dubboSubscribeServiceRt+":"+interfaceName, func() interface{} {
return newTimeMetrics(subscribeServiceRtMin, subscribeServiceRtMax, subscribeServiceRtAvg, subscribeServiceRtSum,
subscribeServiceRtLast, metrics.NewServiceMetric(interfaceName), c.r)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
}

func newStatesMetricFunc(total *metrics.MetricKey, succ *metrics.MetricKey, fail *metrics.MetricKey,
level metrics.MetricLevel, reg metrics.MetricRegistry) metrics.StatesMetrics {
return metrics.NewStatesMetrics(metrics.NewMetricId(total, level), metrics.NewMetricId(succ, level),
metrics.NewMetricId(fail, level), reg)
}

func newTimeMetrics(min, max, avg, sum, last *metrics.MetricKey, level metrics.MetricLevel, mr metrics.MetricRegistry) metrics.TimeMetric {
return metrics.NewTimeMetric(metrics.NewMetricId(min, level), metrics.NewMetricId(max, level), metrics.NewMetricId(avg, level),
metrics.NewMetricId(sum, level), metrics.NewMetricId(last, level), mr)
level := metrics.NewServiceMetric(event.Attachment[constant.InterfaceKey])
c.R.Rt(metrics.NewMetricId(subscribeServiceRt, level), &metrics.RtOpts{}).Observe(event.CostMs())
}

type MetadataMetricEvent struct {
Expand Down
43 changes: 11 additions & 32 deletions metrics/metadata/metric_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,34 +47,21 @@ const (
totalSuffix = "_total"
succSuffix = "_succeed_total"
failedSuffix = "_failed_total"
sumSuffix = "_sum"
lastSuffix = "_last"
minSuffix = "_min"
maxSuffix = "_max"
avgSuffix = "_avg"
)

var (
// app level
metadataPushNum = metrics.NewMetricKey(dubboMetadataPush+totalSuffix, "Total Num")
metadataPushNumSucceed = metrics.NewMetricKey(dubboMetadataPush+succSuffix, "Succeed Push Num")
metadataPushNumFailed = metrics.NewMetricKey(dubboMetadataPush+failedSuffix, "Failed Push Num")
metadataPushNum = metrics.NewMetricKey(dubboMetadataPush+totalSuffix, "Total Num")
metadataPushSucceed = metrics.NewMetricKey(dubboMetadataPush+succSuffix, "Succeed Push Num")
metadataPushFailed = metrics.NewMetricKey(dubboMetadataPush+failedSuffix, "Failed Push Num")
// app level
metadataSubNum = metrics.NewMetricKey(dubboMetadataSubscribe+totalSuffix, "Total Metadata Subscribe Num")
metadataSubNumSucceed = metrics.NewMetricKey(dubboMetadataSubscribe+succSuffix, "Succeed Metadata Subscribe Num")
metadataSubNumFailed = metrics.NewMetricKey(dubboMetadataSubscribe+failedSuffix, "Failed Metadata Subscribe Num")
metadataSubNum = metrics.NewMetricKey(dubboMetadataSubscribe+totalSuffix, "Total Metadata Subscribe Num")
metadataSubSucceed = metrics.NewMetricKey(dubboMetadataSubscribe+succSuffix, "Succeed Metadata Subscribe Num")
metadataSubFailed = metrics.NewMetricKey(dubboMetadataSubscribe+failedSuffix, "Failed Metadata Subscribe Num")
// app level
pushRtSum = metrics.NewMetricKey(dubboPushRt+sumSuffix, "Sum Response Time")
pushRtLast = metrics.NewMetricKey(dubboPushRt+lastSuffix, "Last Response Time")
pushRtMin = metrics.NewMetricKey(dubboPushRt+minSuffix, "Min Response Time")
pushRtMax = metrics.NewMetricKey(dubboPushRt+maxSuffix, "Max Response Time")
pushRtAvg = metrics.NewMetricKey(dubboPushRt+avgSuffix, "Average Response Time")
pushRt = metrics.NewMetricKey(dubboPushRt, "Response Time")
// app level
subscribeRtSum = metrics.NewMetricKey(dubboSubscribeRt+sumSuffix, "Sum Response Time")
subscribeRtLast = metrics.NewMetricKey(dubboSubscribeRt+lastSuffix, "Last Response Time")
subscribeRtMin = metrics.NewMetricKey(dubboSubscribeRt+minSuffix, "Min Response Time")
subscribeRtMax = metrics.NewMetricKey(dubboSubscribeRt+maxSuffix, "Max Response Time")
subscribeRtAvg = metrics.NewMetricKey(dubboSubscribeRt+avgSuffix, "Average Response Time")
subscribeRt = metrics.NewMetricKey(dubboSubscribeRt, "Response Time")

/*
# HELP dubbo_metadata_store_provider_succeed_total Succeed Store Provider Metadata
Expand All @@ -85,7 +72,7 @@ var (
// service level
metadataStoreProviderFailed = metrics.NewMetricKey(dubboMetadataStoreProvider+failedSuffix, "Total Failed Provider Metadata Store")
metadataStoreProviderSucceed = metrics.NewMetricKey(dubboMetadataStoreProvider+succSuffix, "Total Succeed Provider Metadata Store")
metadataStoreProvider = metrics.NewMetricKey(dubboMetadataStoreProvider+totalSuffix, "Total Provider Metadata Store")
metadataStoreProviderNum = metrics.NewMetricKey(dubboMetadataStoreProvider+totalSuffix, "Total Provider Metadata Store")

/*
# HELP dubbo_store_provider_interface_rt_milliseconds_avg Average Response Time
Expand All @@ -94,15 +81,7 @@ var (
dubbo_store_provider_interface_rt_milliseconds_avg{application_name="metrics-provider",application_version="3.2.1",git_commit_id="20de8b22ffb2a23531f6d9494a4963fcabd52561",hostname="localhost",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService2",ip="10.252.156.213",} 10837.0
*/
// service level
storeProviderInterfaceRtAvg = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+avgSuffix, "Average Store Provider Interface Time")
storeProviderInterfaceRtLast = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+lastSuffix, "Last Store Provider Interface Time")
storeProviderInterfaceRtMax = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+maxSuffix, "Max Store Provider Interface Time")
storeProviderInterfaceRtMin = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+minSuffix, "Min Store Provider Interface Time")
storeProviderInterfaceRtSum = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+sumSuffix, "Sum Store Provider Interface Time")
storeProviderInterfaceRt = metrics.NewMetricKey(dubboStoreProviderInterfaceRt, "Store Provider Interface Time")

subscribeServiceRtLast = metrics.NewMetricKey(dubboSubscribeServiceRt+lastSuffix, "Last Subscribe Service Time")
subscribeServiceRtMax = metrics.NewMetricKey(dubboSubscribeServiceRt+maxSuffix, "Max Subscribe Service Time")
subscribeServiceRtMin = metrics.NewMetricKey(dubboSubscribeServiceRt+minSuffix, "Min Subscribe Service Time")
subscribeServiceRtSum = metrics.NewMetricKey(dubboSubscribeServiceRt+sumSuffix, "Sum Subscribe Service Time")
subscribeServiceRtAvg = metrics.NewMetricKey(dubboSubscribeServiceRt+avgSuffix, "Average Subscribe Service Time")
subscribeServiceRt = metrics.NewMetricKey(dubboSubscribeServiceRt, "Subscribe Service Time")
)
Loading
Loading