diff --git a/common/host_util.go b/common/host_util.go index ba36c0c8a6..5c411ab4a9 100644 --- a/common/host_util.go +++ b/common/host_util.go @@ -23,6 +23,7 @@ import ( ) import ( + "github.com/dubbogo/gost/log/logger" gxnet "github.com/dubbogo/gost/net" ) @@ -31,6 +32,7 @@ import ( ) var localIp string +var localHostname string func GetLocalIp() string { if len(localIp) != 0 { @@ -40,6 +42,18 @@ func GetLocalIp() string { return localIp } +func GetLocalHostName() string { + if len(localHostname) != 0 { + return localHostname + } + hostname, err := os.Hostname() + if err != nil { + logger.Errorf("can not get local hostname") + } + localHostname = hostname + return localHostname +} + func HandleRegisterIPAndPort(url *URL) { // if developer define registry port and ip, use it first. if ipToRegistry := os.Getenv(constant.DubboIpToRegistryKey); len(ipToRegistry) > 0 { diff --git a/config/metric_config.go b/config/metric_config.go index b47d48a563..02bce7d5f9 100644 --- a/config/metric_config.go +++ b/config/metric_config.go @@ -22,9 +22,12 @@ import ( "github.com/dubbogo/gost/log/logger" + "github.com/pkg/errors" +) + +import ( "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/metrics" - "github.com/pkg/errors" ) // MetricConfig This is the config struct for all metrics implementation @@ -36,6 +39,7 @@ type MetricConfig struct { Path string `default:"/metrics" yaml:"path" json:"path,omitempty" property:"path"` PushGatewayAddress string `default:"" yaml:"push-gateway-address" json:"push-gateway-address,omitempty" property:"push-gateway-address"` SummaryMaxAge int64 `default:"600000000000" yaml:"summary-max-age" json:"summary-max-age,omitempty" property:"summary-max-age"` + Protocol string `default:"prometheus" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` } func (mc *MetricConfig) ToReporterConfig() *metrics.ReporterConfig { @@ -65,7 +69,9 @@ func (mc *MetricConfig) Init() error { if err := verify(mc); err != nil { return err } - extension.GetMetricReporter("prometheus", mc.ToReporterConfig()) + config := mc.ToReporterConfig() + extension.GetMetricReporter(mc.Protocol, config) + metrics.Init(config) return nil } @@ -88,7 +94,7 @@ func (mc *MetricConfig) DynamicUpdateProperties(newMetricConfig *MetricConfig) { mc.Enable = newMetricConfig.Enable logger.Infof("MetricConfig's Enable was dynamically updated, new value:%v", mc.Enable) - extension.GetMetricReporter("prometheus", mc.ToReporterConfig()) + extension.GetMetricReporter(mc.Protocol, mc.ToReporterConfig()) } } } diff --git a/imports/imports.go b/imports/imports.go index c5868969bd..71dd06b246 100644 --- a/imports/imports.go +++ b/imports/imports.go @@ -64,6 +64,7 @@ import ( _ "dubbo.apache.org/dubbo-go/v3/metadata/service/exporter/configurable" _ "dubbo.apache.org/dubbo-go/v3/metadata/service/local" _ "dubbo.apache.org/dubbo-go/v3/metadata/service/remote" + _ "dubbo.apache.org/dubbo-go/v3/metrics/app_info" _ "dubbo.apache.org/dubbo-go/v3/metrics/prometheus" _ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo" _ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3" diff --git a/metrics/api.go b/metrics/api.go new file mode 100644 index 0000000000..c9d57bed6d --- /dev/null +++ b/metrics/api.go @@ -0,0 +1,161 @@ +package metrics + +var registries = make(map[string]func(*ReporterConfig) MetricRegistry) +var collectors = make([]CollectorFunc, 0) +var registry MetricRegistry + +// CollectorFunc 各个指标处理模块扩展 +type CollectorFunc func(MetricRegistry, *ReporterConfig) + +// Init 整个 Metrics 模块初始化入口 +func Init(config *ReporterConfig) { + // config.extention = prometheus + regFunc, ok := registries[config.Protocol] + if !ok { + regFunc = registries["prometheus"] // default + } + registry = regFunc(config) + for _, co := range collectors { + co(registry, config) + } + registry.Export() +} + +// SetRegistry 扩展其他数据容器,暴露方式,内置 Prometheus 实现 +func SetRegistry(name string, v func(*ReporterConfig) MetricRegistry) { + registries[name] = v +} + +// AddCollector 扩展指标收集器,例如 metadata、耗时、配置中心等 +func AddCollector(name string, fun func(MetricRegistry, *ReporterConfig)) { + collectors = append(collectors, fun) +} + +// MetricRegistry 数据指标容器,指标计算、指标暴露、聚合 +type MetricRegistry interface { + Counter(*MetricId) CounterMetric // add or update a counter + Gauge(*MetricId) GaugeMetric // add or update a gauge + Histogram(*MetricId) HistogramMetric // add a metric num to a histogram + Summary(*MetricId) SummaryMetric // add a metric num to a summary + Export() // 数据暴露, 如 Prometheus 是 http 暴露 + // GetMetrics() []*MetricSample // 获取所有指标数据 + // GetMetricsString() (string, error) // 如需复用端口则加一下这个接口 +} + +// 组合暴露方式,参考 micrometer CompositeMeterRegistry +//type CompositeRegistry struct { +// rs []MetricRegistry +//} + +// Type 指标类型,暂定和 micrometer 一致 +type Type uint8 + +const ( + Counter Type = iota + Gauge + LongTaskTimer + Timer + DistributionSummary + Other +) + +// MetricId +// # HELP dubbo_metadata_store_provider_succeed_total Succeed Store Provider Metadata +// # TYPE dubbo_metadata_store_provider_succeed_total gauge +// dubbo_metadata_store_provider_succeed_total{application_name="provider",hostname="localhost",interface="org.example.DemoService",ip="10.252.156.213",} 1.0 +// 除值以外的其他属性 +type MetricId struct { + Name string + Desc string + Tags map[string]string + Type Type +} + +func (m *MetricId) TagKeys() []string { + keys := make([]string, 0, len(m.Tags)) + for k := range m.Tags { + keys = append(keys, k) + } + return keys +} + +// MetricSample 一个指标的完整定义,包含值,这是指标的最终呈现,不是中间值(如 summary,histogram 他们统计完后会导出为一组 MetricSample) +type MetricSample struct { + *MetricId + value float64 +} + +// CounterMetric 指标抽象接口 +type CounterMetric interface { + Inc() + Add(float64) +} + +// GaugeMetric 指标抽象接口 +type GaugeMetric interface { + Set(float64) + // Inc() + // Dec() + // Add(float64) + // Sub(float64) +} + +// HistogramMetric 指标抽象接口 +type HistogramMetric interface { + Record(float64) +} + +// SummaryMetric 指标抽象接口 +type SummaryMetric interface { + Record(float64) +} + +// StatesMetrics 综合指标,包括总数、成功数,失败数,调用 MetricsRegistry 实现最终暴露 +type StatesMetrics interface { + Success() + AddSuccess(float64) + Fail() + AddFailed(float64) +} + +func NewStatesMetrics(total *MetricId, succ *MetricId, fail *MetricId) StatesMetrics { + return &DefaultStatesMetric{total: total, succ: succ, fail: fail, r: registry} +} + +// TimeMetrics 综合指标, 包括 min(Gauge)、max(Gauge)、avg(Gauge)、sum(Gauge)、last(Gauge),调用 MetricRegistry 实现最终暴露 +// 参见 dubbo-java org.apache.dubbo.metrics.aggregate.TimeWindowAggregator 类实现 +type TimeMetrics interface { + Record(float64) +} + +// NewTimeMetrics init and write all data to registry +func NewTimeMetrics(min *MetricId, avg *MetricId, max *MetricId, last *MetricId, sum *MetricId) { + +} + +type DefaultStatesMetric struct { + r MetricRegistry + total *MetricId + succ *MetricId + fail *MetricId +} + +func (c DefaultStatesMetric) Success() { + c.r.Counter(c.total).Inc() + c.r.Counter(c.succ).Inc() +} + +func (c DefaultStatesMetric) AddSuccess(v float64) { + c.r.Counter(c.total).Add(v) + c.r.Counter(c.succ).Add(v) +} + +func (c DefaultStatesMetric) Fail() { + c.r.Counter(c.total).Inc() + c.r.Counter(c.fail).Inc() +} + +func (c DefaultStatesMetric) AddFailed(v float64) { + c.r.Counter(c.total).Add(v) + c.r.Counter(c.fail).Add(v) +} diff --git a/metrics/app_info/application_info.go b/metrics/app_info/application_info.go new file mode 100644 index 0000000000..8b95683cba --- /dev/null +++ b/metrics/app_info/application_info.go @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app_info + +import ( + "dubbo.apache.org/dubbo-go/v3/metrics" + "dubbo.apache.org/dubbo-go/v3/metrics/common" +) + +/* + * # HELP dubbo_application_info_total Total Application Info + * # TYPE dubbo_application_info_total counter + * dubbo_application_info_total{application_name="metrics-provider",application_version="3.2.1",git_commit_id="20de8b22ffb2a23531f6d9494a4963fcabd52561",hostname="localhost",ip="127.0.0.1",} 1.0 + */ +var info = common.NewMetricKey("dubbo_application_info_total", "Total Application Info") // Total Application Info include application name、version etc + +func init() { + metrics.AddCollector("application_info", func(mr metrics.MetricRegistry, config *metrics.ReporterConfig) { + mr.Counter(&metrics.MetricId{Name: info.Name, Desc: info.Desc, Tags: common.NewApplicationLevel().Tags()}).Inc() + }) +} diff --git a/metrics/common/common.go b/metrics/common/common.go new file mode 100644 index 0000000000..c8acefebc4 --- /dev/null +++ b/metrics/common/common.go @@ -0,0 +1,101 @@ +package common + +import ( + "dubbo.apache.org/dubbo-go/v3/common" + "dubbo.apache.org/dubbo-go/v3/config" +) + +const ( + TagIp = "ip" + TagPid = "pid" + TagHostname = "hostname" + TagApplicationName = "application_name" + TagApplicationModule = "application_module_id" + TagInterfaceKey = "interface" + TagMethodKey = "method" + TagGroupKey = "group" + TagVersionKey = "version" + TagApplicationVersionKey = "application_version" + TagKeyKey = "key" + TagConfigCenter = "config_center" + TagChangeType = "change_type" + TagThreadName = "thread_pool_name" + TagGitCommitId = "git_commit_id" +) + +type MetricKey struct { + Name string + Desc string +} + +func NewMetricKey(name string, desc string) *MetricKey { + return &MetricKey{Name: name, Desc: desc} +} + +type MetricLevel interface { + Tags() map[string]string +} + +type ApplicationMetricLevel struct { + ApplicationName string + Version string + GitCommitId string + Ip string + HostName string +} + +var appLevel *ApplicationMetricLevel + +func NewApplicationLevel() *ApplicationMetricLevel { + if appLevel == nil { + var rootConfig = config.GetRootConfig() + appLevel = &ApplicationMetricLevel{ + ApplicationName: rootConfig.Application.Name, + Version: rootConfig.Application.Version, + Ip: common.GetLocalIp(), + HostName: common.GetLocalHostName(), + GitCommitId: "", + } + } + return appLevel +} + +func (m *ApplicationMetricLevel) Tags() map[string]string { + tags := make(map[string]string) + tags[TagIp] = m.Ip + tags[TagHostname] = m.HostName + tags[TagApplicationName] = m.ApplicationName + tags[TagApplicationVersionKey] = m.Version + tags[TagGitCommitId] = m.GitCommitId + return tags +} + +type ServiceMetricLevel struct { + *ApplicationMetricLevel + Interface string +} + +func NewServiceMetric(interfaceName string) *ServiceMetricLevel { + return &ServiceMetricLevel{ApplicationMetricLevel: NewApplicationLevel(), Interface: interfaceName} +} + +func (m ServiceMetricLevel) Tags() map[string]string { + tags := m.ApplicationMetricLevel.Tags() + tags[TagInterfaceKey] = m.Interface + return tags +} + +type MethodMetricLevel struct { + *ServiceMetricLevel + Method string + Group string + Version string +} + +func (m MethodMetricLevel) Tags() map[string]string { + tags := m.ServiceMetricLevel.Tags() + tags[TagMethodKey] = m.Method + tags[TagGroupKey] = m.Group + tags[TagVersionKey] = m.Version + return tags +} diff --git a/metrics/prometheus/registry.go b/metrics/prometheus/registry.go new file mode 100644 index 0000000000..977d9d216e --- /dev/null +++ b/metrics/prometheus/registry.go @@ -0,0 +1,172 @@ +package prometheus + +import ( + "bytes" + "sync" +) + +import ( + prom "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/prometheus/common/expfmt" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/metrics" +) + +func init() { + metrics.SetRegistry("prometheus", func(rc *metrics.ReporterConfig) metrics.MetricRegistry { + return &promMetricRegistry{ + cvm: make(map[string]*prom.CounterVec), + gvm: make(map[string]*prom.GaugeVec), + hvm: make(map[string]*prom.HistogramVec), + svm: make(map[string]*prom.SummaryVec), + } + }) +} + +type promMetricRegistry struct { + mtx sync.RWMutex // Protects metrics. + cvm map[string]*prom.CounterVec // prom.CounterVec + gvm map[string]*prom.GaugeVec // prom.GaugeVec + hvm map[string]*prom.HistogramVec // prom.HistogramVec + svm map[string]*prom.SummaryVec // prom.SummaryVec +} + +func (p *promMetricRegistry) Counter(m *metrics.MetricId) metrics.CounterMetric { + p.mtx.RLock() + vec, ok := p.cvm[m.Name] + p.mtx.RUnlock() + if !ok { + p.mtx.Lock() + vec = promauto.NewCounterVec(prom.CounterOpts{ + Name: m.Name, + Help: m.Desc, + }, m.TagKeys()) + p.cvm[m.Name] = vec + p.mtx.Unlock() + } + c := vec.With(m.Tags) + return &counter{pc: c} +} + +func (p *promMetricRegistry) Gauge(m *metrics.MetricId) metrics.GaugeMetric { + p.mtx.RLock() + vec, ok := p.gvm[m.Name] + p.mtx.RUnlock() + if !ok { + p.mtx.Lock() + vec = promauto.NewGaugeVec(prom.GaugeOpts{ + Name: m.Name, + Help: m.Desc, + }, m.TagKeys()) + p.gvm[m.Name] = vec + p.mtx.Unlock() + } + g := vec.With(m.Tags) + return &gauge{pg: g} +} + +func (p *promMetricRegistry) Histogram(m *metrics.MetricId) metrics.HistogramMetric { + p.mtx.RLock() + vec, ok := p.hvm[m.Name] + p.mtx.RUnlock() + if !ok { + p.mtx.Lock() + vec = promauto.NewHistogramVec(prom.HistogramOpts{ + Name: m.Name, + Help: m.Desc, + }, m.TagKeys()) + p.hvm[m.Name] = vec + p.mtx.Unlock() + } + h := vec.With(m.Tags) + return &histogram{ph: h.(prom.Histogram)} +} + +func (p *promMetricRegistry) Summary(m *metrics.MetricId) metrics.SummaryMetric { + p.mtx.RLock() + vec, ok := p.svm[m.Name] + p.mtx.RUnlock() + if !ok { + p.mtx.Lock() + vec = promauto.NewSummaryVec(prom.SummaryOpts{ + Name: m.Name, + Help: m.Desc, + }, m.TagKeys()) + p.svm[m.Name] = vec + p.mtx.Unlock() + } + s := vec.With(m.Tags) + return &summary{ps: s.(prom.Summary)} +} + +func (p *promMetricRegistry) Export() { + +} + +func (p *promMetricRegistry) Scrape() (string, error) { + r := prom.DefaultRegisterer.(*prom.Registry) + gathering, err := r.Gather() + if err != nil { + return "", err + } + out := &bytes.Buffer{} + for _, mf := range gathering { + if _, err := expfmt.MetricFamilyToText(out, mf); err != nil { + return "", err + } + } + return out.String(), nil +} + +type counter struct { + pc prom.Counter +} + +func (c *counter) Inc() { + c.pc.Inc() +} +func (c *counter) Add(v float64) { + c.pc.Add(v) +} + +type gauge struct { + pg prom.Gauge +} + +// func (g *gauge) Inc() { +// g.pg.Inc() +// } +// +// func (g *gauge) Dec() { +// g.pg.Dec() +// } +func (g *gauge) Set(v float64) { + g.pg.Set(v) +} + +// func (g *gauge) Add(v float64) { +// g.pg.Add(v) +// } +// func (g *gauge) Sub(v float64) { +// g.pg.Sub(v) +// } + +type histogram struct { + ph prom.Histogram +} + +func (h *histogram) Record(v float64) { + h.ph.Observe(v) +} + +type summary struct { + ps prom.Summary +} + +func (s *summary) Record(v float64) { + s.ps.Observe(v) +} diff --git a/metrics/reporter.go b/metrics/reporter.go index 604d412bea..bf9693bb14 100644 --- a/metrics/reporter.go +++ b/metrics/reporter.go @@ -36,6 +36,7 @@ type ReporterConfig struct { Path string PushGatewayAddress string SummaryMaxAge int64 + Protocol string // MetricsRegistry 扩展配置 ,如:prometheus } type ReportMode string