diff --git a/common/constant/key.go b/common/constant/key.go index 514f81c998..b786b9951a 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -177,6 +177,7 @@ const ( const ( ApplicationKey = "application" ApplicationNameKey = "application_name" + ApplicationVersionKey = "application_version" HostnameKey = "hostname" IpKey = "ip" OrganizationKey = "organization" @@ -197,6 +198,10 @@ const ( ProvidersCategory = "providers" RouterKey = "router" ExportKey = "export" + GitCommitIdKey = "git_commit_id" + ConfigCenterKey = "config_center" + ChangeTypeKey = "change_type" + KeyKey = "key" ) // config center keys @@ -404,3 +409,10 @@ const ( LoggerFileLocalTimeKey = "logger.file.local-time" LoggerFileCompressKey = "logger.file.compress" ) + +// metrics key +const ( + MetricsRegistry = "dubbo.metrics.registry" + MetricsMetadata = "dubbo.metrics.metadata" + MetricApp = "dubbo.metrics.app" +) diff --git a/common/host_util.go b/common/host_util.go index ba36c0c8a6..5c411ab4a9 100644 --- a/common/host_util.go +++ b/common/host_util.go @@ -23,6 +23,7 @@ import ( ) import ( + "github.com/dubbogo/gost/log/logger" gxnet "github.com/dubbogo/gost/net" ) @@ -31,6 +32,7 @@ import ( ) var localIp string +var localHostname string func GetLocalIp() string { if len(localIp) != 0 { @@ -40,6 +42,18 @@ func GetLocalIp() string { return localIp } +func GetLocalHostName() string { + if len(localHostname) != 0 { + return localHostname + } + hostname, err := os.Hostname() + if err != nil { + logger.Errorf("can not get local hostname") + } + localHostname = hostname + return localHostname +} + func HandleRegisterIPAndPort(url *URL) { // if developer define registry port and ip, use it first. if ipToRegistry := os.Getenv(constant.DubboIpToRegistryKey); len(ipToRegistry) > 0 { diff --git a/config/config_center_config.go b/config/config_center_config.go index 5ed3f7d981..e206b8ae55 100644 --- a/config/config_center_config.go +++ b/config/config_center_config.go @@ -38,6 +38,9 @@ import ( "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/config_center" + "dubbo.apache.org/dubbo-go/v3/metrics" + metricsConfigCenter "dubbo.apache.org/dubbo-go/v3/metrics/config_center" + "dubbo.apache.org/dubbo-go/v3/remoting" ) // CenterConfig is configuration for config center @@ -146,6 +149,7 @@ func startConfigCenter(rc *RootConfig) error { logger.Warnf("[Config Center] Dynamic config center has started, but config may not be initialized, because: %s", err) return nil } + defer metrics.Publish(metricsConfigCenter.NewIncMetricEvent(cc.DataId, cc.Group, remoting.EventTypeAdd, cc.Protocol)) if len(strConf) == 0 { logger.Warnf("[Config Center] Dynamic config center has started, but got empty config with config-center configuration %+v\n"+ "Please check if your config-center config is correct.", cc) diff --git a/config/instance/metadata_report.go b/config/instance/metadata_report.go index 1cb5f639cb..16ba071219 100644 --- a/config/instance/metadata_report.go +++ b/config/instance/metadata_report.go @@ -34,11 +34,10 @@ var ( ) func GetMetadataReportInstance() report.MetadataReport { - if instance != nil { - return instance + if instance == nil { + instance = report.NewPubMetricEventReport(GetMetadataReportByRegistryProtocol("")) } - - return GetMetadataReportByRegistryProtocol("") + return instance } // SetMetadataReportInstance, init metadat report instance @@ -49,7 +48,7 @@ func SetMetadataReportInstance(selectiveUrl ...*common.URL) { url = selectiveUrl[0] fac := extension.GetMetadataReportFactory(url.Protocol) if fac != nil { - instance = fac.CreateMetadataReport(url) + instance = report.NewPubMetricEventReport(fac.CreateMetadataReport(url)) } reportUrl = url } diff --git a/config/metric_config.go b/config/metric_config.go index 3cc65c4ce8..a6b874987f 100644 --- a/config/metric_config.go +++ b/config/metric_config.go @@ -39,6 +39,7 @@ type MetricConfig struct { Path string `default:"/metrics" yaml:"path" json:"path,omitempty" property:"path"` PushGatewayAddress string `default:"" yaml:"push-gateway-address" json:"push-gateway-address,omitempty" property:"push-gateway-address"` SummaryMaxAge int64 `default:"600000000000" yaml:"summary-max-age" json:"summary-max-age,omitempty" property:"summary-max-age"` + Protocol string `default:"prometheus" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` } func (mc *MetricConfig) ToReporterConfig() *metrics.ReporterConfig { @@ -55,6 +56,7 @@ func (mc *MetricConfig) ToReporterConfig() *metrics.ReporterConfig { defaultMetricsReportConfig.Path = mc.Path defaultMetricsReportConfig.PushGatewayAddress = mc.PushGatewayAddress defaultMetricsReportConfig.SummaryMaxAge = mc.SummaryMaxAge + defaultMetricsReportConfig.Protocol = mc.Protocol return defaultMetricsReportConfig } @@ -68,7 +70,10 @@ func (mc *MetricConfig) Init() error { if err := verify(mc); err != nil { return err } - extension.GetMetricReporter("prometheus", mc.ToReporterConfig()) + metrics.InitAppInfo(GetRootConfig().Application.Name, GetRootConfig().Application.Version) + config := mc.ToReporterConfig() + extension.GetMetricReporter(mc.Protocol, config) + metrics.Init(config) return nil } @@ -91,7 +96,7 @@ func (mc *MetricConfig) DynamicUpdateProperties(newMetricConfig *MetricConfig) { mc.Enable = newMetricConfig.Enable logger.Infof("MetricConfig's Enable was dynamically updated, new value:%v", mc.Enable) - extension.GetMetricReporter("prometheus", mc.ToReporterConfig()) + extension.GetMetricReporter(mc.Protocol, mc.ToReporterConfig()) } } } diff --git a/config_center/nacos/listener.go b/config_center/nacos/listener.go index defcafe5ba..58e9f083a3 100644 --- a/config_center/nacos/listener.go +++ b/config_center/nacos/listener.go @@ -31,11 +31,14 @@ import ( import ( "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/config_center" + "dubbo.apache.org/dubbo-go/v3/metrics" + metricsConfigCenter "dubbo.apache.org/dubbo-go/v3/metrics/config_center" "dubbo.apache.org/dubbo-go/v3/remoting" ) -func callback(listener config_center.ConfigurationListener, _, _, dataId, data string) { +func callback(listener config_center.ConfigurationListener, _, group, dataId, data string) { listener.Process(&config_center.ConfigChangeEvent{Key: dataId, Value: data, ConfigType: remoting.EventTypeUpdate}) + metrics.Publish(metricsConfigCenter.NewIncMetricEvent(dataId, group, remoting.EventTypeUpdate, metricsConfigCenter.Nacos)) } func (n *nacosDynamicConfiguration) addListener(key string, listener config_center.ConfigurationListener) { diff --git a/config_center/zookeeper/listener.go b/config_center/zookeeper/listener.go index 3d311799cd..12454d170d 100644 --- a/config_center/zookeeper/listener.go +++ b/config_center/zookeeper/listener.go @@ -25,6 +25,8 @@ import ( import ( "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/config_center" + "dubbo.apache.org/dubbo-go/v3/metrics" + metricsConfigCenter "dubbo.apache.org/dubbo-go/v3/metrics/config_center" "dubbo.apache.org/dubbo-go/v3/remoting" "dubbo.apache.org/dubbo-go/v3/remoting/zookeeper" ) @@ -73,10 +75,12 @@ func (l *CacheListener) DataChange(event remoting.Event) bool { changeType = remoting.EventTypeDel } + key, group := l.pathToKeyGroup(event.Path) + defer metrics.Publish(metricsConfigCenter.NewIncMetricEvent(key, group, changeType, metricsConfigCenter.Zookeeper)) if listeners, ok := l.keyListeners.Load(event.Path); ok { for listener := range listeners.(map[config_center.ConfigurationListener]struct{}) { listener.Process(&config_center.ConfigChangeEvent{ - Key: l.pathToKey(event.Path), + Key: key, Value: event.Content, ConfigType: changeType, }) @@ -86,10 +90,11 @@ func (l *CacheListener) DataChange(event remoting.Event) bool { return false } -func (l *CacheListener) pathToKey(path string) string { +func (l *CacheListener) pathToKeyGroup(path string) (string, string) { if len(path) == 0 { - return path + return path, "" } groupKey := strings.Replace(strings.Replace(path, l.rootPath+constant.PathSeparator, "", -1), constant.PathSeparator, constant.DotSeparator, -1) - return groupKey[strings.Index(groupKey, constant.DotSeparator)+1:] + index := strings.Index(groupKey, constant.DotSeparator) + return groupKey[index+1:], groupKey[0:index] } diff --git a/go.mod b/go.mod index 542118fc64..d661125701 100644 --- a/go.mod +++ b/go.mod @@ -47,6 +47,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/polarismesh/polaris-go v1.3.0 github.com/prometheus/client_golang v1.13.0 + github.com/prometheus/common v0.37.0 github.com/rogpeppe/go-internal v1.8.0 // indirect github.com/sirupsen/logrus v1.7.0 github.com/stretchr/testify v1.8.2 diff --git a/imports/imports.go b/imports/imports.go index c5868969bd..71dd06b246 100644 --- a/imports/imports.go +++ b/imports/imports.go @@ -64,6 +64,7 @@ import ( _ "dubbo.apache.org/dubbo-go/v3/metadata/service/exporter/configurable" _ "dubbo.apache.org/dubbo-go/v3/metadata/service/local" _ "dubbo.apache.org/dubbo-go/v3/metadata/service/remote" + _ "dubbo.apache.org/dubbo-go/v3/metrics/app_info" _ "dubbo.apache.org/dubbo-go/v3/metrics/prometheus" _ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo" _ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3" diff --git a/metadata/report/reporter_metric.go b/metadata/report/reporter_metric.go new file mode 100644 index 0000000000..5ca0995d15 --- /dev/null +++ b/metadata/report/reporter_metric.go @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package report + +import ( + "time" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common" + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/metadata/identifier" + "dubbo.apache.org/dubbo-go/v3/metrics" + "dubbo.apache.org/dubbo-go/v3/metrics/metadata" +) + +type PubMetricEventReport struct { + MetadataReport +} + +func NewPubMetricEventReport(r MetadataReport) MetadataReport { + return &PubMetricEventReport{MetadataReport: r} +} + +func (r *PubMetricEventReport) StoreProviderMetadata(i *identifier.MetadataIdentifier, s string) error { + event := metadata.NewMetadataMetricTimeEvent(metadata.StoreProvider) + err := r.MetadataReport.StoreProviderMetadata(i, s) + event.Succ = err == nil + event.End = time.Now() + event.Attachment[constant.InterfaceKey] = i.ServiceInterface + metrics.Publish(event) + return err +} + +func (r *PubMetricEventReport) GetAppMetadata(i *identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) { + event := metadata.NewMetadataMetricTimeEvent(metadata.MetadataSub) + info, err := r.MetadataReport.GetAppMetadata(i) + event.Succ = err == nil + event.End = time.Now() + metrics.Publish(event) + return info, err +} + +func (r *PubMetricEventReport) PublishAppMetadata(i *identifier.SubscriberMetadataIdentifier, info *common.MetadataInfo) error { + event := metadata.NewMetadataMetricTimeEvent(metadata.MetadataPush) + err := r.MetadataReport.PublishAppMetadata(i, info) + event.Succ = err == nil + event.End = time.Now() + metrics.Publish(event) + return err +} diff --git a/metrics/api.go b/metrics/api.go new file mode 100644 index 0000000000..fdc69c1453 --- /dev/null +++ b/metrics/api.go @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metrics + +import ( + "sync" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate" +) + +var registries = make(map[string]func(*ReporterConfig) MetricRegistry) +var collectors = make([]CollectorFunc, 0) +var registry MetricRegistry + +// CollectorFunc used to extend more indicators +type CollectorFunc func(MetricRegistry, *ReporterConfig) + +// Init Metrics module +func Init(config *ReporterConfig) { + if config.Enable { + // defalut protocol is already set in metricConfig + regFunc, ok := registries[config.Protocol] + if ok { + registry = regFunc(config) + for _, co := range collectors { + co(registry, config) + } + registry.Export() + } + } +} + +// SetRegistry extend more MetricRegistry, default PrometheusRegistry +func SetRegistry(name string, v func(*ReporterConfig) MetricRegistry) { + registries[name] = v +} + +// AddCollector add more indicators, like metadata、sla、configcenter etc +func AddCollector(name string, fun func(MetricRegistry, *ReporterConfig)) { + collectors = append(collectors, fun) +} + +// MetricRegistry data container,data compute、expose、agg +type MetricRegistry interface { + Counter(*MetricId) CounterMetric // add or update a counter + Gauge(*MetricId) GaugeMetric // add or update a gauge + Histogram(*MetricId) HistogramMetric // add a metric num to a histogram + Summary(*MetricId) SummaryMetric // add a metric num to a summary + Export() // expose metric data, such as Prometheus http exporter + // GetMetrics() []*MetricSample // get all metric data + // GetMetricsString() (string, error) // get text format metric data +} + +// multi registry,like micrometer CompositeMeterRegistry +// type CompositeRegistry struct { +// rs []MetricRegistry +// } + +// Type metric type, save with micrometer +type Type uint8 + +const ( + Counter Type = iota + Gauge + LongTaskTimer + Timer + DistributionSummary + Other +) + +// MetricId +// # HELP dubbo_metadata_store_provider_succeed_total Succeed Store Provider Metadata +// # TYPE dubbo_metadata_store_provider_succeed_total gauge +// dubbo_metadata_store_provider_succeed_total{application_name="provider",hostname="localhost",interface="org.example.DemoService",ip="10.252.156.213",} 1.0 +// other properties except value +type MetricId struct { + Name string + Desc string + Tags map[string]string + Type Type +} + +func (m *MetricId) TagKeys() []string { + keys := make([]string, 0, len(m.Tags)) + for k := range m.Tags { + keys = append(keys, k) + } + return keys +} + +func NewMetricId(key *MetricKey, level MetricLevel) *MetricId { + return &MetricId{Name: key.Name, Desc: key.Desc, Tags: level.Tags()} +} + +// MetricSample a metric sample,This is the final data presentation, +// not an intermediate result(like summary,histogram they will export to a set of MetricSample) +type MetricSample struct { + *MetricId + value float64 +} + +// CounterMetric counter metric +type CounterMetric interface { + Inc() + Add(float64) +} + +// GaugeMetric gauge metric +type GaugeMetric interface { + Set(float64) + // Inc() + // Dec() + // Add(float64) + // Sub(float64) +} + +// HistogramMetric histogram metric +type HistogramMetric interface { + Record(float64) +} + +// SummaryMetric summary metric +type SummaryMetric interface { + Record(float64) +} + +// StatesMetrics multi metrics,include total,success num, fail num,call MetricsRegistry save data +type StatesMetrics interface { + Success() + AddSuccess(float64) + Fail() + AddFailed(float64) + Inc(succ bool) +} + +func NewStatesMetrics(total *MetricId, succ *MetricId, fail *MetricId, reg MetricRegistry) StatesMetrics { + return &DefaultStatesMetric{total: total, succ: succ, fail: fail, r: reg} +} + +type DefaultStatesMetric struct { + r MetricRegistry + total, succ, fail *MetricId +} + +func (c DefaultStatesMetric) Inc(succ bool) { + if succ { + c.Success() + } else { + c.Fail() + } +} +func (c DefaultStatesMetric) Success() { + c.r.Counter(c.total).Inc() + c.r.Counter(c.succ).Inc() +} + +func (c DefaultStatesMetric) AddSuccess(v float64) { + c.r.Counter(c.total).Add(v) + c.r.Counter(c.succ).Add(v) +} + +func (c DefaultStatesMetric) Fail() { + c.r.Counter(c.total).Inc() + c.r.Counter(c.fail).Inc() +} + +func (c DefaultStatesMetric) AddFailed(v float64) { + c.r.Counter(c.total).Add(v) + c.r.Counter(c.fail).Add(v) +} + +// TimeMetric muliti metrics, include min(Gauge)、max(Gauge)、avg(Gauge)、sum(Gauge)、last(Gauge),call MetricRegistry to expose +// see dubbo-java org.apache.dubbo.metrics.aggregate.TimeWindowAggregator +type TimeMetric interface { + Record(float64) +} + +const ( + defaultBucketNum = 10 + defalutTimeWindowSeconds = 120 +) + +// NewTimeMetric init and write all data to registry +func NewTimeMetric(min, max, avg, sum, last *MetricId, mr MetricRegistry) TimeMetric { + return &DefaultTimeMetric{r: mr, min: min, max: max, avg: avg, sum: sum, last: last, + agg: aggregate.NewTimeWindowAggregator(defaultBucketNum, defalutTimeWindowSeconds)} +} + +type DefaultTimeMetric struct { + r MetricRegistry + agg *aggregate.TimeWindowAggregator + min, max, avg, sum, last *MetricId +} + +func (m *DefaultTimeMetric) Record(v float64) { + m.agg.Add(v) + result := m.agg.Result() + m.r.Gauge(m.max).Set(result.Max) + m.r.Gauge(m.min).Set(result.Min) + m.r.Gauge(m.avg).Set(result.Avg) + m.r.Gauge(m.sum).Set(result.Total) + m.r.Gauge(m.last).Set(v) +} + +// cache if needed, TimeMetrics must cached +var metricsCache map[string]interface{} = make(map[string]interface{}) +var metricsCacheMutex sync.RWMutex + +func ComputeIfAbsentCache(key string, supplier func() interface{}) interface{} { + metricsCacheMutex.RLock() + v, ok := metricsCache[key] + metricsCacheMutex.RUnlock() + if ok { + return v + } else { + metricsCacheMutex.Lock() + defer metricsCacheMutex.Unlock() + n := supplier() + metricsCache[key] = n + return n + } +} diff --git a/metrics/app_info/collector.go b/metrics/app_info/collector.go new file mode 100644 index 0000000000..a2bae022fe --- /dev/null +++ b/metrics/app_info/collector.go @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app_info + +import ( + "dubbo.apache.org/dubbo-go/v3/metrics" +) + +/* + * # HELP dubbo_application_info_total Total Application Info + * # TYPE dubbo_application_info_total counter + * dubbo_application_info_total{application_name="metrics-provider",application_version="3.2.1",git_commit_id="20de8b22ffb2a23531f6d9494a4963fcabd52561",hostname="localhost",ip="127.0.0.1",} 1.0 + */ +var info = metrics.NewMetricKey("dubbo_application_info_total", "Total Application Info") // Total Application Info include application name、version etc + +func init() { + metrics.AddCollector("application_info", func(mr metrics.MetricRegistry, config *metrics.ReporterConfig) { + mr.Counter(&metrics.MetricId{Name: info.Name, Desc: info.Desc, Tags: metrics.GetApplicationLevel().Tags()}).Inc() + }) +} diff --git a/metrics/bus.go b/metrics/bus.go new file mode 100644 index 0000000000..333d636ab3 --- /dev/null +++ b/metrics/bus.go @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metrics + +import ( + "sync" +) + +// eventListener is a struct that encapsulates the listener map and provides thread-safe access to it. +type eventListener struct { + mu sync.RWMutex + listener map[string]chan MetricsEvent +} + +var listener = &eventListener{ + listener: make(map[string]chan MetricsEvent), +} + +// Publish publishes an event to all subscribers of the same type. +func Publish(event MetricsEvent) { + listener.mu.RLock() + defer listener.mu.RUnlock() + + if ch, ok := listener.listener[event.Type()]; ok { + select { + case ch <- event: + default: + // If the channel is full, drop the event to avoid blocking. + } + } +} + +// Subscribe subscribes to events of the given type. +func Subscribe(typ string, ch chan MetricsEvent) { + listener.mu.Lock() + defer listener.mu.Unlock() + + listener.listener[typ] = ch +} + +// Unsubscribe unsubscribes from events of the given type. +func Unsubscribe(typ string) { + listener.mu.Lock() + defer listener.mu.Unlock() + + if ch, ok := listener.listener[typ]; ok { + close(ch) + delete(listener.listener, typ) + } +} diff --git a/metrics/bus_test.go b/metrics/bus_test.go new file mode 100644 index 0000000000..185f50896f --- /dev/null +++ b/metrics/bus_test.go @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metrics + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +var mockChan = make(chan MetricsEvent, 16) + +type MockEvent struct { +} + +func (m MockEvent) Type() string { + return "dubbo.metrics.mock" +} + +func NewEmptyMockEvent() *MockEvent { + return &MockEvent{} +} + +func init() { + Subscribe("dubbo.metrics.mock", mockChan) + Publish(NewEmptyMockEvent()) +} + +func TestBusPublish(t *testing.T) { + t.Run("testBusPublish", func(t *testing.T) { + event := <-mockChan + + if event, ok := event.(MockEvent); ok { + assert.Equal(t, event, NewEmptyMockEvent()) + } + }) +} diff --git a/metrics/common.go b/metrics/common.go new file mode 100644 index 0000000000..f0ce9cf9d7 --- /dev/null +++ b/metrics/common.go @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metrics + +import ( + "dubbo.apache.org/dubbo-go/v3/common" + "dubbo.apache.org/dubbo-go/v3/common/constant" +) + +type MetricKey struct { + Name string + Desc string +} + +func NewMetricKey(name string, desc string) *MetricKey { + return &MetricKey{Name: name, Desc: desc} +} + +type MetricLevel interface { + Tags() map[string]string +} + +type ApplicationMetricLevel struct { + ApplicationName string + Version string + GitCommitId string + Ip string + HostName string +} + +var applicationName string +var applicationVersion string + +// cannot import rootConfig,may cause cycle import,so be it +func InitAppInfo(appName string, appVersion string) { + applicationName = appName + applicationVersion = appVersion +} + +func GetApplicationLevel() *ApplicationMetricLevel { + return &ApplicationMetricLevel{ + ApplicationName: applicationName, + Version: applicationVersion, + Ip: common.GetLocalIp(), + HostName: common.GetLocalHostName(), + GitCommitId: "", + } +} + +func (m *ApplicationMetricLevel) Tags() map[string]string { + tags := make(map[string]string) + tags[constant.IpKey] = m.Ip + tags[constant.HostnameKey] = m.HostName + tags[constant.ApplicationKey] = m.ApplicationName + tags[constant.ApplicationVersionKey] = m.Version + tags[constant.GitCommitIdKey] = m.GitCommitId + return tags +} + +type ServiceMetricLevel struct { + *ApplicationMetricLevel + Interface string +} + +func NewServiceMetric(interfaceName string) *ServiceMetricLevel { + return &ServiceMetricLevel{ApplicationMetricLevel: GetApplicationLevel(), Interface: interfaceName} +} + +func (m ServiceMetricLevel) Tags() map[string]string { + tags := m.ApplicationMetricLevel.Tags() + tags[constant.InterfaceKey] = m.Interface + return tags +} + +type MethodMetricLevel struct { + *ServiceMetricLevel + Method string + Group string + Version string +} + +func (m MethodMetricLevel) Tags() map[string]string { + tags := m.ServiceMetricLevel.Tags() + tags[constant.MethodKey] = m.Method + tags[constant.GroupKey] = m.Group + tags[constant.VersionKey] = m.Version + return tags +} + +type ConfigCenterLevel struct { + ApplicationName string + Ip string + HostName string + Key string + Group string + ConfigCenter string + ChangeType string +} + +func NewConfigCenterLevel(key string, group string, configCenter string, changeType string) *ConfigCenterLevel { + return &ConfigCenterLevel{ + ApplicationName: applicationName, + Ip: common.GetLocalIp(), + HostName: common.GetLocalHostName(), + Key: key, + Group: group, + ConfigCenter: configCenter, + ChangeType: changeType, + } +} + +func (l ConfigCenterLevel) Tags() map[string]string { + tags := make(map[string]string) + tags[constant.ApplicationKey] = l.ApplicationName + tags[constant.IpKey] = l.Ip + tags[constant.HostnameKey] = l.HostName + tags[constant.KeyKey] = l.Key + tags[constant.GroupKey] = l.Group + tags[constant.ConfigCenterKey] = l.ConfigCenter + tags[constant.ChangeTypeKey] = l.ChangeType + return tags +} diff --git a/metrics/config_center/collector.go b/metrics/config_center/collector.go new file mode 100644 index 0000000000..7236501882 --- /dev/null +++ b/metrics/config_center/collector.go @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metrics + +import ( + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/metrics" + "dubbo.apache.org/dubbo-go/v3/remoting" +) + +const eventType = constant.MetricApp + +var ch = make(chan metrics.MetricsEvent, 10) +var info = metrics.NewMetricKey("dubbo_configcenter_total", "Config Changed Total") + +func init() { + metrics.AddCollector("application_info", func(mr metrics.MetricRegistry, config *metrics.ReporterConfig) { + c := &configCenterCollector{r: mr} + c.start() + }) +} + +type configCenterCollector struct { + r metrics.MetricRegistry +} + +func (c *configCenterCollector) start() { + metrics.Subscribe(eventType, ch) + go func() { + for e := range ch { + if event, ok := e.(*ConfigCenterMetricEvent); ok { + c.handleDataChange(event) + } + } + }() +} + +func (c *configCenterCollector) handleDataChange(event *ConfigCenterMetricEvent) { + id := metrics.NewMetricId(info, metrics.NewConfigCenterLevel(event.key, event.group, event.configCenter, event.getChangeType())) + c.r.Counter(id).Add(event.size) +} + +const ( + Nacos = "nacos" + Apollo = "apollo" + Zookeeper = "zookeeper" +) + +type ConfigCenterMetricEvent struct { + // Name MetricName + key string + group string + configCenter string + changeType remoting.EventType + size float64 +} + +func (e *ConfigCenterMetricEvent) getChangeType() string { + switch e.changeType { + case remoting.EventTypeAdd: + return "added" + case remoting.EventTypeDel: + return "deleted" + case remoting.EventTypeUpdate: + return "modified" + default: + return "" + } +} + +func (*ConfigCenterMetricEvent) Type() string { + return eventType +} + +func NewIncMetricEvent(key, group string, changeType remoting.EventType, c string) *ConfigCenterMetricEvent { + return &ConfigCenterMetricEvent{key: key, group: group, changeType: changeType, configCenter: c, size: 1} +} diff --git a/metrics/event.go b/metrics/event.go new file mode 100644 index 0000000000..f78589af98 --- /dev/null +++ b/metrics/event.go @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metrics + +// MetricsEvent represents an event that can be published and subscribed to. +type MetricsEvent interface { + Type() string +} diff --git a/metrics/metadata/collector.go b/metrics/metadata/collector.go new file mode 100644 index 0000000000..16740d3567 --- /dev/null +++ b/metrics/metadata/collector.go @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metadata + +import ( + "time" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/metrics" +) + +const eventType = constant.MetricsMetadata + +var ch = make(chan metrics.MetricsEvent, 10) + +func init() { + metrics.AddCollector("metadata", func(mr metrics.MetricRegistry, rc *metrics.ReporterConfig) { + l := &MetadataMetricCollector{r: mr} + l.start() + }) +} + +type MetadataMetricCollector struct { + r metrics.MetricRegistry +} + +func (c *MetadataMetricCollector) start() { + metrics.Subscribe(eventType, ch) + go func() { + for e := range ch { + if event, ok := e.(*MetadataMetricEvent); ok { + switch event.Name { + case StoreProvider: + c.handleStoreProvider(event) + case MetadataPush: + c.handleMetadataPush(event) + case MetadataSub: + c.handleMetadataSub(event) + case SubscribeServiceRt: + c.handleSubscribeService(event) + default: + } + } + } + }() +} + +func (c *MetadataMetricCollector) handleMetadataPush(event *MetadataMetricEvent) { + m := metrics.ComputeIfAbsentCache(dubboMetadataPush, func() interface{} { + return newStatesMetricFunc(metadataPushNum, metadataPushNumSucceed, metadataPushNumFailed, metrics.GetApplicationLevel(), c.r) + }).(metrics.StatesMetrics) + m.Inc(event.Succ) + metric := metrics.ComputeIfAbsentCache(dubboPushRt, func() interface{} { + return newTimeMetrics(pushRtMin, pushRtMax, pushRtAvg, pushRtSum, pushRtLast, metrics.GetApplicationLevel(), c.r) + }).(metrics.TimeMetric) + metric.Record(event.CostMs()) +} + +func (c *MetadataMetricCollector) handleMetadataSub(event *MetadataMetricEvent) { + m := metrics.ComputeIfAbsentCache(dubboMetadataSubscribe, func() interface{} { + return newStatesMetricFunc(metadataSubNum, metadataSubNumSucceed, metadataSubNumFailed, metrics.GetApplicationLevel(), c.r) + }).(metrics.StatesMetrics) + m.Inc(event.Succ) + metric := metrics.ComputeIfAbsentCache(dubboSubscribeRt, func() interface{} { + return newTimeMetrics(subscribeRtMin, subscribeRtMax, subscribeRtAvg, subscribeRtSum, subscribeRtLast, metrics.GetApplicationLevel(), c.r) + }).(metrics.TimeMetric) + metric.Record(event.CostMs()) +} + +func (c *MetadataMetricCollector) handleStoreProvider(event *MetadataMetricEvent) { + interfaceName := event.Attachment[constant.InterfaceKey] + m := metrics.ComputeIfAbsentCache(dubboMetadataStoreProvider+":"+interfaceName, func() interface{} { + return newStatesMetricFunc(metadataStoreProvider, metadataStoreProviderSucceed, metadataStoreProviderFailed, + metrics.NewServiceMetric(interfaceName), c.r) + }).(metrics.StatesMetrics) + m.Inc(event.Succ) + metric := metrics.ComputeIfAbsentCache(dubboStoreProviderInterfaceRt+":"+interfaceName, func() interface{} { + return newTimeMetrics(storeProviderInterfaceRtMin, storeProviderInterfaceRtMax, storeProviderInterfaceRtAvg, + storeProviderInterfaceRtSum, storeProviderInterfaceRtLast, metrics.NewServiceMetric(interfaceName), c.r) + }).(metrics.TimeMetric) + metric.Record(event.CostMs()) +} + +func (c *MetadataMetricCollector) handleSubscribeService(event *MetadataMetricEvent) { + interfaceName := event.Attachment[constant.InterfaceKey] + metric := metrics.ComputeIfAbsentCache(dubboSubscribeServiceRt+":"+interfaceName, func() interface{} { + return newTimeMetrics(subscribeServiceRtMin, subscribeServiceRtMax, subscribeServiceRtAvg, subscribeServiceRtSum, + subscribeServiceRtLast, metrics.NewServiceMetric(interfaceName), c.r) + }).(metrics.TimeMetric) + metric.Record(event.CostMs()) +} + +func newStatesMetricFunc(total *metrics.MetricKey, succ *metrics.MetricKey, fail *metrics.MetricKey, + level metrics.MetricLevel, reg metrics.MetricRegistry) metrics.StatesMetrics { + return metrics.NewStatesMetrics(metrics.NewMetricId(total, level), metrics.NewMetricId(succ, level), + metrics.NewMetricId(fail, level), reg) +} + +func newTimeMetrics(min, max, avg, sum, last *metrics.MetricKey, level metrics.MetricLevel, mr metrics.MetricRegistry) metrics.TimeMetric { + return metrics.NewTimeMetric(metrics.NewMetricId(min, level), metrics.NewMetricId(max, level), metrics.NewMetricId(avg, level), + metrics.NewMetricId(sum, level), metrics.NewMetricId(last, level), mr) +} + +type MetadataMetricEvent struct { + Name MetricName + Succ bool + Start time.Time + End time.Time + Attachment map[string]string +} + +func (*MetadataMetricEvent) Type() string { + return eventType +} + +func (e *MetadataMetricEvent) CostMs() float64 { + return float64(e.End.Sub(e.Start)) / float64(time.Millisecond) +} + +func NewMetadataMetricTimeEvent(n MetricName) *MetadataMetricEvent { + return &MetadataMetricEvent{Name: n, Start: time.Now(), Attachment: make(map[string]string)} +} diff --git a/metrics/metadata/metric_set.go b/metrics/metadata/metric_set.go new file mode 100644 index 0000000000..e7ade6e57c --- /dev/null +++ b/metrics/metadata/metric_set.go @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metadata + +import ( + "dubbo.apache.org/dubbo-go/v3/metrics" +) + +type MetricName int8 + +const ( + MetadataPush MetricName = iota + MetadataSub + StoreProvider + // PushRt + // SubscribeRt + // StoreProviderInterfaceRt + SubscribeServiceRt +) + +const ( + dubboMetadataPush = "dubbo_metadata_push_num" + dubboPushRt = "dubbo_push_rt_milliseconds" + dubboMetadataSubscribe = "dubbo_metadata_subscribe_num" + dubboSubscribeRt = "dubbo_subscribe_rt_milliseconds" + dubboMetadataStoreProvider = "dubbo_metadata_store_provider" + dubboStoreProviderInterfaceRt = "dubbo_store_provider_interface_rt_milliseconds" + dubboSubscribeServiceRt = "dubbo_subscribe_service_rt_milliseconds" +) + +const ( + totalSuffix = "_total" + succSuffix = "_succeed_total" + failedSuffix = "_failed_total" + sumSuffix = "_sum" + lastSuffix = "_last" + minSuffix = "_min" + maxSuffix = "_max" + avgSuffix = "_avg" +) + +var ( + // app level + metadataPushNum = metrics.NewMetricKey(dubboMetadataPush+totalSuffix, "Total Num") + metadataPushNumSucceed = metrics.NewMetricKey(dubboMetadataPush+succSuffix, "Succeed Push Num") + metadataPushNumFailed = metrics.NewMetricKey(dubboMetadataPush+failedSuffix, "Failed Push Num") + // app level + metadataSubNum = metrics.NewMetricKey(dubboMetadataSubscribe+totalSuffix, "Total Metadata Subscribe Num") + metadataSubNumSucceed = metrics.NewMetricKey(dubboMetadataSubscribe+succSuffix, "Succeed Metadata Subscribe Num") + metadataSubNumFailed = metrics.NewMetricKey(dubboMetadataSubscribe+failedSuffix, "Failed Metadata Subscribe Num") + // app level + pushRtSum = metrics.NewMetricKey(dubboPushRt+sumSuffix, "Sum Response Time") + pushRtLast = metrics.NewMetricKey(dubboPushRt+lastSuffix, "Last Response Time") + pushRtMin = metrics.NewMetricKey(dubboPushRt+minSuffix, "Min Response Time") + pushRtMax = metrics.NewMetricKey(dubboPushRt+maxSuffix, "Max Response Time") + pushRtAvg = metrics.NewMetricKey(dubboPushRt+avgSuffix, "Average Response Time") + // app level + subscribeRtSum = metrics.NewMetricKey(dubboSubscribeRt+sumSuffix, "Sum Response Time") + subscribeRtLast = metrics.NewMetricKey(dubboSubscribeRt+lastSuffix, "Last Response Time") + subscribeRtMin = metrics.NewMetricKey(dubboSubscribeRt+minSuffix, "Min Response Time") + subscribeRtMax = metrics.NewMetricKey(dubboSubscribeRt+maxSuffix, "Max Response Time") + subscribeRtAvg = metrics.NewMetricKey(dubboSubscribeRt+avgSuffix, "Average Response Time") + + /* + # HELP dubbo_metadata_store_provider_succeed_total Succeed Store Provider Metadata + # TYPE dubbo_metadata_store_provider_succeed_total gauge + dubbo_metadata_store_provider_succeed_total{application_name="metrics-provider",hostname="localhost",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService2",ip="10.252.156.213",} 1.0 + dubbo_metadata_store_provider_succeed_total{application_name="metrics-provider",hostname="localhost",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="10.252.156.213",} 1.0 + */ + // service level + metadataStoreProviderFailed = metrics.NewMetricKey(dubboMetadataStoreProvider+failedSuffix, "Total Failed Provider Metadata Store") + metadataStoreProviderSucceed = metrics.NewMetricKey(dubboMetadataStoreProvider+succSuffix, "Total Succeed Provider Metadata Store") + metadataStoreProvider = metrics.NewMetricKey(dubboMetadataStoreProvider+totalSuffix, "Total Provider Metadata Store") + + /* + # HELP dubbo_store_provider_interface_rt_milliseconds_avg Average Response Time + # TYPE dubbo_store_provider_interface_rt_milliseconds_avg gauge + dubbo_store_provider_interface_rt_milliseconds_avg{application_name="metrics-provider",application_version="3.2.1",git_commit_id="20de8b22ffb2a23531f6d9494a4963fcabd52561",hostname="localhost",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="10.252.156.213",} 504.0 + dubbo_store_provider_interface_rt_milliseconds_avg{application_name="metrics-provider",application_version="3.2.1",git_commit_id="20de8b22ffb2a23531f6d9494a4963fcabd52561",hostname="localhost",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService2",ip="10.252.156.213",} 10837.0 + */ + // service level + storeProviderInterfaceRtAvg = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+avgSuffix, "Average Store Provider Interface Time") + storeProviderInterfaceRtLast = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+lastSuffix, "Last Store Provider Interface Time") + storeProviderInterfaceRtMax = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+maxSuffix, "Max Store Provider Interface Time") + storeProviderInterfaceRtMin = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+minSuffix, "Min Store Provider Interface Time") + storeProviderInterfaceRtSum = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+sumSuffix, "Sum Store Provider Interface Time") + + subscribeServiceRtLast = metrics.NewMetricKey(dubboSubscribeServiceRt+lastSuffix, "Last Subscribe Service Time") + subscribeServiceRtMax = metrics.NewMetricKey(dubboSubscribeServiceRt+maxSuffix, "Max Subscribe Service Time") + subscribeServiceRtMin = metrics.NewMetricKey(dubboSubscribeServiceRt+minSuffix, "Min Subscribe Service Time") + subscribeServiceRtSum = metrics.NewMetricKey(dubboSubscribeServiceRt+sumSuffix, "Sum Subscribe Service Time") + subscribeServiceRtAvg = metrics.NewMetricKey(dubboSubscribeServiceRt+avgSuffix, "Average Subscribe Service Time") +) diff --git a/metrics/prometheus/registry.go b/metrics/prometheus/registry.go new file mode 100644 index 0000000000..70946d6b31 --- /dev/null +++ b/metrics/prometheus/registry.go @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package prometheus + +import ( + "bytes" + "sync" +) + +import ( + prom "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/prometheus/common/expfmt" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/metrics" +) + +func init() { + metrics.SetRegistry("prometheus", func(rc *metrics.ReporterConfig) metrics.MetricRegistry { + return &promMetricRegistry{ + cvm: make(map[string]*prom.CounterVec), + gvm: make(map[string]*prom.GaugeVec), + hvm: make(map[string]*prom.HistogramVec), + svm: make(map[string]*prom.SummaryVec), + } + }) +} + +type promMetricRegistry struct { + mtx sync.RWMutex // Protects metrics. + cvm map[string]*prom.CounterVec // prom.CounterVec + gvm map[string]*prom.GaugeVec // prom.GaugeVec + hvm map[string]*prom.HistogramVec // prom.HistogramVec + svm map[string]*prom.SummaryVec // prom.SummaryVec +} + +func (p *promMetricRegistry) Counter(m *metrics.MetricId) metrics.CounterMetric { + p.mtx.RLock() + vec, ok := p.cvm[m.Name] + p.mtx.RUnlock() + if !ok { + p.mtx.Lock() + vec = promauto.NewCounterVec(prom.CounterOpts{ + Name: m.Name, + Help: m.Desc, + }, m.TagKeys()) + p.cvm[m.Name] = vec + p.mtx.Unlock() + } + c := vec.With(m.Tags) + return &counter{pc: c} +} + +func (p *promMetricRegistry) Gauge(m *metrics.MetricId) metrics.GaugeMetric { + p.mtx.RLock() + vec, ok := p.gvm[m.Name] + p.mtx.RUnlock() + if !ok { + p.mtx.Lock() + vec = promauto.NewGaugeVec(prom.GaugeOpts{ + Name: m.Name, + Help: m.Desc, + }, m.TagKeys()) + p.gvm[m.Name] = vec + p.mtx.Unlock() + } + g := vec.With(m.Tags) + return &gauge{pg: g} +} + +func (p *promMetricRegistry) Histogram(m *metrics.MetricId) metrics.HistogramMetric { + p.mtx.RLock() + vec, ok := p.hvm[m.Name] + p.mtx.RUnlock() + if !ok { + p.mtx.Lock() + vec = promauto.NewHistogramVec(prom.HistogramOpts{ + Name: m.Name, + Help: m.Desc, + }, m.TagKeys()) + p.hvm[m.Name] = vec + p.mtx.Unlock() + } + h := vec.With(m.Tags) + return &histogram{ph: h.(prom.Histogram)} +} + +func (p *promMetricRegistry) Summary(m *metrics.MetricId) metrics.SummaryMetric { + p.mtx.RLock() + vec, ok := p.svm[m.Name] + p.mtx.RUnlock() + if !ok { + p.mtx.Lock() + vec = promauto.NewSummaryVec(prom.SummaryOpts{ + Name: m.Name, + Help: m.Desc, + }, m.TagKeys()) + p.svm[m.Name] = vec + p.mtx.Unlock() + } + s := vec.With(m.Tags) + return &summary{ps: s.(prom.Summary)} +} + +func (p *promMetricRegistry) Export() { + +} + +func (p *promMetricRegistry) Scrape() (string, error) { + r := prom.DefaultRegisterer.(*prom.Registry) + gathering, err := r.Gather() + if err != nil { + return "", err + } + out := &bytes.Buffer{} + for _, mf := range gathering { + if _, err := expfmt.MetricFamilyToText(out, mf); err != nil { + return "", err + } + } + return out.String(), nil +} + +type counter struct { + pc prom.Counter +} + +func (c *counter) Inc() { + c.pc.Inc() +} +func (c *counter) Add(v float64) { + c.pc.Add(v) +} + +type gauge struct { + pg prom.Gauge +} + +// func (g *gauge) Inc() { +// g.pg.Inc() +// } +// +// func (g *gauge) Dec() { +// g.pg.Dec() +// } +func (g *gauge) Set(v float64) { + g.pg.Set(v) +} + +// func (g *gauge) Add(v float64) { +// g.pg.Add(v) +// } +// func (g *gauge) Sub(v float64) { +// g.pg.Sub(v) +// } + +type histogram struct { + ph prom.Histogram +} + +func (h *histogram) Record(v float64) { + h.ph.Observe(v) +} + +type summary struct { + ps prom.Summary +} + +func (s *summary) Record(v float64) { + s.ps.Observe(v) +} diff --git a/metrics/registry/collector.go b/metrics/registry/collector.go new file mode 100644 index 0000000000..7479fd23d5 --- /dev/null +++ b/metrics/registry/collector.go @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package registry + +import ( + "time" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/metrics" +) + +var ( + registryChan = make(chan metrics.MetricsEvent, 128) +) + +func init() { + metrics.AddCollector("registry", func(m metrics.MetricRegistry, c *metrics.ReporterConfig) { + rc := ®istryCollector{regRegistry: m} + go rc.start() + }) +} + +// registryCollector is the registry's metrics collector +type registryCollector struct { + regRegistry metrics.MetricRegistry +} + +func (rc *registryCollector) start() { + metrics.Subscribe(constant.MetricsRegistry, registryChan) + for event := range registryChan { + if registryEvent, ok := event.(*RegistryMetricsEvent); ok { + switch registryEvent.Name { + case Reg: + rc.regHandler(registryEvent) + case Sub: + rc.subHandler(registryEvent) + case Notify: + rc.notifyHandler(registryEvent) + case ServerReg: + rc.serverRegHandler(registryEvent) + case ServerSub: + rc.serverSubHandler(registryEvent) + default: + } + } + } +} + +func newStatesMetricFunc(total *metrics.MetricKey, succ *metrics.MetricKey, fail *metrics.MetricKey, + level metrics.MetricLevel, reg metrics.MetricRegistry) metrics.StatesMetrics { + return metrics.NewStatesMetrics(metrics.NewMetricId(total, level), metrics.NewMetricId(succ, level), + metrics.NewMetricId(fail, level), reg) +} + +func newTimeMetrics(min, max, avg, sum, last *metrics.MetricKey, level metrics.MetricLevel, mr metrics.MetricRegistry) metrics.TimeMetric { + return metrics.NewTimeMetric(metrics.NewMetricId(min, level), metrics.NewMetricId(max, level), metrics.NewMetricId(avg, level), + metrics.NewMetricId(sum, level), metrics.NewMetricId(last, level), mr) +} + +// regHandler handles register metrics +func (rc *registryCollector) regHandler(event *RegistryMetricsEvent) { + // Event is converted to metrics + // Save metrics to the MetricRegistry + m := metrics.ComputeIfAbsentCache(dubboRegNum, func() interface{} { + return newStatesMetricFunc(RegisterMetricRequests, RegisterMetricRequestsSucceed, RegisterMetricRequestsFailed, metrics.GetApplicationLevel(), rc.regRegistry) + }).(metrics.StatesMetrics) + m.Inc(event.Succ) + metric := metrics.ComputeIfAbsentCache(dubboRegRt, func() interface{} { + return newTimeMetrics(RegisterRtMillisecondsMin, RegisterRtMillisecondsMax, RegisterRtMillisecondsAvg, RegisterRtMillisecondsSum, RegisterRtMillisecondsLast, metrics.GetApplicationLevel(), rc.regRegistry) + }).(metrics.TimeMetric) + metric.Record(event.CostMs()) +} + +// subHandler handles subscribe metrics +func (rc *registryCollector) subHandler(event *RegistryMetricsEvent) { + // Event is converted to metrics + // Save metrics to the MetricRegistry + m := newStatesMetricFunc(SubscribeMetricNum, SubscribeMetricNumSucceed, SubscribeMetricNumFailed, metrics.GetApplicationLevel(), rc.regRegistry) + m.Inc(event.Succ) +} + +// notifyHandler handles notify metrics +func (rc *registryCollector) notifyHandler(event *RegistryMetricsEvent) { + // Event is converted to metrics + // Save metrics to the MetricRegistry + rc.regRegistry.Counter(metrics.NewMetricId(NotifyMetricRequests, metrics.GetApplicationLevel())).Inc() + rc.regRegistry.Histogram(metrics.NewMetricId(NotifyMetricNumLast, metrics.GetApplicationLevel())).Record(float64(event.End.UnixNano()) / float64(time.Second)) + metric := metrics.ComputeIfAbsentCache(dubboNotifyRt, func() interface{} { + return newTimeMetrics(NotifyRtMillisecondsMin, NotifyRtMillisecondsMax, NotifyRtMillisecondsAvg, NotifyRtMillisecondsSum, NotifyRtMillisecondsLast, metrics.GetApplicationLevel(), rc.regRegistry) + }).(metrics.TimeMetric) + metric.Record(event.CostMs()) +} + +// directoryHandler handles directory metrics +func (rc *registryCollector) directoryHandler(event *RegistryMetricsEvent) { + // Event is converted to metrics + // Save metrics to the MetricRegistry + level := metrics.GetApplicationLevel() + typ := event.Attachment["DirTyp"] + switch typ { + case NumAllInc: + rc.regRegistry.Counter(metrics.NewMetricId(DirectoryMetricNumAll, level)).Inc() + case NumAllDec: + rc.regRegistry.Counter(metrics.NewMetricId(DirectoryMetricNumAll, level)).Add(-1) + case NumDisableTotal: + rc.regRegistry.Counter(metrics.NewMetricId(DirectoryMetricNumDisable, level)).Inc() + case NumToReconnectTotal: + rc.regRegistry.Counter(metrics.NewMetricId(DirectoryMetricNumToReconnect, level)).Inc() + case NumValidTotal: + rc.regRegistry.Counter(metrics.NewMetricId(DirectoryMetricNumValid, level)).Inc() + default: + } + +} + +// serverRegHandler handles server register metrics +func (rc *registryCollector) serverRegHandler(event *RegistryMetricsEvent) { + // Event is converted to metrics + // Save metrics to the MetricRegistry + m := metrics.ComputeIfAbsentCache(dubboRegServerNum, func() interface{} { + return newStatesMetricFunc(ServiceRegisterMetricRequests, ServiceRegisterMetricRequestsSucceed, ServiceRegisterMetricRequestsFailed, metrics.GetApplicationLevel(), rc.regRegistry) + }).(metrics.StatesMetrics) + m.Inc(event.Succ) + metric := metrics.ComputeIfAbsentCache(dubboRegServerRt, func() interface{} { + return newTimeMetrics(RegisterServiceRtMillisecondsMin, RegisterServiceRtMillisecondsMax, RegisterServiceRtMillisecondsAvg, RegisterServiceRtMillisecondsSum, RegisterServiceRtMillisecondsLast, metrics.GetApplicationLevel(), rc.regRegistry) + }).(metrics.TimeMetric) + metric.Record(event.CostMs()) +} + +// serverSubHandler handles server subscribe metrics +func (rc *registryCollector) serverSubHandler(event *RegistryMetricsEvent) { + // Event is converted to metrics + // Save metrics to the MetricRegistry + m := newStatesMetricFunc(ServiceSubscribeMetricNum, ServiceSubscribeMetricNumSucceed, ServiceSubscribeMetricNumFailed, metrics.GetApplicationLevel(), rc.regRegistry) + m.Inc(event.Succ) +} diff --git a/metrics/registry/event.go b/metrics/registry/event.go new file mode 100644 index 0000000000..87b135aab0 --- /dev/null +++ b/metrics/registry/event.go @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package registry + +import ( + "time" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/metrics" +) + +// RegistryMetricsEvent contains info about register metrics +type RegistryMetricsEvent struct { + Name MetricName + Succ bool + Start time.Time + End time.Time + Attachment map[string]string +} + +func (r RegistryMetricsEvent) Type() string { + return constant.MetricsRegistry +} + +func (r *RegistryMetricsEvent) CostMs() float64 { + return float64(r.End.Sub(r.Start)) / float64(time.Millisecond) +} + +// NewRegisterEvent for register metrics +func NewRegisterEvent(succ bool, start time.Time) metrics.MetricsEvent { + return &RegistryMetricsEvent{ + Name: Reg, + Succ: succ, + Start: start, + End: time.Now(), + } +} + +// NewSubscribeEvent for subscribe metrics +func NewSubscribeEvent(succ bool) metrics.MetricsEvent { + return &RegistryMetricsEvent{ + Name: Sub, + Succ: succ, + } +} + +// NewNotifyEvent for notify metrics +func NewNotifyEvent(start time.Time) metrics.MetricsEvent { + return &RegistryMetricsEvent{ + Name: Notify, + Start: start, + End: time.Now(), + } +} + +// NewDirectoryEvent for directory metrics +func NewDirectoryEvent(dirTyp string) metrics.MetricsEvent { + return &RegistryMetricsEvent{ + Name: Directory, + Attachment: map[string]string{"DirTyp": dirTyp}, + } +} + +// NewServerRegisterEvent for server register metrics +func NewServerRegisterEvent(succ bool, start time.Time) metrics.MetricsEvent { + return &RegistryMetricsEvent{ + Name: ServerReg, + Succ: succ, + Start: start, + End: time.Now(), + } +} + +// NewServerSubscribeEvent for server subscribe metrics +func NewServerSubscribeEvent(succ bool) metrics.MetricsEvent { + return &RegistryMetricsEvent{ + Name: ServerSub, + Succ: succ, + } +} diff --git a/metrics/registry/metric_set.go b/metrics/registry/metric_set.go new file mode 100644 index 0000000000..02408bd153 --- /dev/null +++ b/metrics/registry/metric_set.go @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package registry + +import ( + "dubbo.apache.org/dubbo-go/v3/metrics" +) + +type MetricName int8 + +const ( + Reg MetricName = iota + Sub + Notify + Directory + ServerReg + ServerSub +) + +const ( + NumAllInc = "numAllInc" + NumAllDec = "numAllDec" + NumDisableTotal = "numDisableTotal" + NumToReconnectTotal = "numToReconnectTotal" + NumValidTotal = "numValidTotal" +) + +const ( + dubboRegNum = "dubbo_registry_register_metrics_num" + dubboRegRt = "dubbo_registry_register_metrics_rt" + dubboRegServerNum = "dubbo_registry_register_server_metrics_num" + dubboRegServerRt = "dubbo_registry_register_server_metrics_rt" + dubboNotifyRt = "dubbo_notify_rt" +) + +var ( + // register metrics key + RegisterMetricRequests = metrics.NewMetricKey("dubbo_registry_register_requests_total", "Total Register Requests") + RegisterMetricRequestsSucceed = metrics.NewMetricKey("dubbo_registry_register_requests_succeed_total", "Succeed Register Requests") + RegisterMetricRequestsFailed = metrics.NewMetricKey("dubbo_registry_register_requests_failed_total", "Failed Register Requests") + + // subscribe metrics key + SubscribeMetricNum = metrics.NewMetricKey("dubbo_registry_subscribe_num_total", "Total Subscribe Num") + SubscribeMetricNumSucceed = metrics.NewMetricKey("dubbo_registry_subscribe_num_succeed_total", "Succeed Subscribe Num") + SubscribeMetricNumFailed = metrics.NewMetricKey("dubbo_registry_subscribe_num_failed_total", "Failed Subscribe Num") + + // directory metrics key + DirectoryMetricNumAll = metrics.NewMetricKey("dubbo_registry_directory_num_all", "All Directory Urls") + DirectoryMetricNumValid = metrics.NewMetricKey("dubbo_registry_directory_num_valid_total", "Valid Directory Urls") + DirectoryMetricNumToReconnect = metrics.NewMetricKey("dubbo_registry_directory_num_to_reconnect_total", "ToReconnect Directory Urls") + DirectoryMetricNumDisable = metrics.NewMetricKey("dubbo_registry_directory_num_disable_total", "Disable Directory Urls") + + NotifyMetricRequests = metrics.NewMetricKey("dubbo_registry_notify_requests_total", "Total Notify Requests") + NotifyMetricNumLast = metrics.NewMetricKey("dubbo_registry_notify_num_last", "Last Notify Nums") + + // register service metrics key + ServiceRegisterMetricRequests = metrics.NewMetricKey("dubbo_registry_register_service_total", "Total Service-Level Register Requests") + ServiceRegisterMetricRequestsSucceed = metrics.NewMetricKey("dubbo_registry_register_service_succeed_total", "Succeed Service-Level Register Requests") + ServiceRegisterMetricRequestsFailed = metrics.NewMetricKey("dubbo_registry_register_service_failed_total", "Failed Service-Level Register Requests") + + // subscribe metrics key + ServiceSubscribeMetricNum = metrics.NewMetricKey("dubbo_registry_subscribe_service_num_total", "Total Service-Level Subscribe Num") + ServiceSubscribeMetricNumSucceed = metrics.NewMetricKey("dubbo_registry_subscribe_service_num_succeed_total", "Succeed Service-Level Num") + ServiceSubscribeMetricNumFailed = metrics.NewMetricKey("dubbo_registry_subscribe_service_num_failed_total", "Failed Service-Level Num") + + // register metrics server rt key + RegisterServiceRtMillisecondsAvg = metrics.NewMetricKey("dubbo_register_service_rt_milliseconds_avg", "Average Service Register Time") + RegisterServiceRtMillisecondsLast = metrics.NewMetricKey("dubbo_register_service_rt_milliseconds_last", "Last Service Register Time") + RegisterServiceRtMillisecondsMax = metrics.NewMetricKey("dubbo_register_service_rt_milliseconds_max", "Max Service Register Time") + RegisterServiceRtMillisecondsMin = metrics.NewMetricKey("dubbo_register_service_rt_milliseconds_min", "Min Service Register Time") + RegisterServiceRtMillisecondsSum = metrics.NewMetricKey("dubbo_register_service_rt_milliseconds_sum", "Sum Service Register Time") + + // register metrics rt key + RegisterRtMillisecondsMax = metrics.NewMetricKey("dubbo_register_rt_milliseconds_max", "Max Response Time") + RegisterRtMillisecondsLast = metrics.NewMetricKey("dubbo_register_rt_milliseconds_last", "Last Response Time") + RegisterRtMillisecondsAvg = metrics.NewMetricKey("dubbo_register_rt_milliseconds_avg", "Average Response Time") + RegisterRtMillisecondsSum = metrics.NewMetricKey("dubbo_register_rt_milliseconds_sum", "Sum Response Time") + RegisterRtMillisecondsMin = metrics.NewMetricKey("dubbo_register_rt_milliseconds_min", "Min Response Time") + + // notify rt key + NotifyRtMillisecondsAvg = metrics.NewMetricKey("dubbo_notify_rt_milliseconds_avg", "Average Notify Time") + NotifyRtMillisecondsLast = metrics.NewMetricKey("dubbo_notify_rt_milliseconds_last", "Last Notify Time") + NotifyRtMillisecondsMax = metrics.NewMetricKey("dubbo_notify_rt_milliseconds_max", "Max Notify Time") + NotifyRtMillisecondsMin = metrics.NewMetricKey("dubbo_notify_rt_milliseconds_min", "Min Notify Time") + NotifyRtMillisecondsSum = metrics.NewMetricKey("dubbo_notify_rt_milliseconds_sum", "Sum Notify Time") +) diff --git a/metrics/reporter.go b/metrics/reporter.go index 604d412bea..bf9693bb14 100644 --- a/metrics/reporter.go +++ b/metrics/reporter.go @@ -36,6 +36,7 @@ type ReporterConfig struct { Path string PushGatewayAddress string SummaryMaxAge int64 + Protocol string // MetricsRegistry 扩展配置 ,如:prometheus } type ReportMode string diff --git a/registry/base_registry.go b/registry/base_registry.go index 2e0bba2d4d..8436865c61 100644 --- a/registry/base_registry.go +++ b/registry/base_registry.go @@ -36,6 +36,8 @@ import ( import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/metrics" + metricsRegistry "dubbo.apache.org/dubbo-go/v3/metrics/registry" ) const ( @@ -131,12 +133,14 @@ func (r *BaseRegistry) Destroy() { // Register implement interface registry to register func (r *BaseRegistry) Register(url *common.URL) error { + start := time.Now() // todo bug when provider、consumer simultaneous initialization if _, ok := r.registered.Load(url.Key()); ok { return perrors.Errorf("Service {%s} has been registered", url.Key()) } err := r.register(url) + defer metrics.Publish(metricsRegistry.NewRegisterEvent(err == nil, start)) if err == nil { r.registered.Store(url.Key(), url) @@ -152,8 +156,8 @@ func (r *BaseRegistry) UnRegister(url *common.URL) error { if _, ok := r.registered.Load(url.Key()); !ok { return perrors.Errorf("Service {%s} has not registered", url.Key()) } - err := r.unregister(url) + metrics.Publish(metricsRegistry.NewSubscribeEvent(err == nil)) if err == nil { r.registered.Delete(url.Key()) } else { diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 7534e5fc24..39cd5c7361 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -22,6 +22,7 @@ import ( "net/url" "os" "sync" + "time" ) import ( @@ -41,6 +42,8 @@ import ( "dubbo.apache.org/dubbo-go/v3/config" "dubbo.apache.org/dubbo-go/v3/config_center" _ "dubbo.apache.org/dubbo-go/v3/config_center/configurator" + "dubbo.apache.org/dubbo-go/v3/metrics" + metricsRegistry "dubbo.apache.org/dubbo-go/v3/metrics/registry" "dubbo.apache.org/dubbo-go/v3/protocol" "dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper" "dubbo.apache.org/dubbo-go/v3/registry" @@ -99,7 +102,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (director if err := dir.registry.LoadSubscribeInstances(url.SubURL, dir); err != nil { return nil, err } - + metrics.Publish(metricsRegistry.NewDirectoryEvent(metricsRegistry.NumAllInc)) return dir, nil } @@ -117,7 +120,9 @@ func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) { if event == nil { return } + start := time.Now() dir.refreshInvokers(event) + metrics.Publish(metricsRegistry.NewNotifyEvent(start)) } // NotifyAll notify the events that are complete Service Event List. @@ -339,6 +344,7 @@ func (dir *RegistryDirectory) uncacheInvokerWithClusterID(clusterID string) []pr // uncacheInvoker will return abandoned Invoker, if no Invoker to be abandoned, return nil func (dir *RegistryDirectory) uncacheInvoker(event *registry.ServiceEvent) []protocol.Invoker { + defer metrics.Publish(metricsRegistry.NewDirectoryEvent(metricsRegistry.NumDisableTotal)) if clusterID := event.Service.GetParam(constant.MeshClusterIDKey, ""); event.Service.Location == constant.MeshAnyAddrMatcher && clusterID != "" { dir.uncacheInvokerWithClusterID(clusterID) } @@ -392,6 +398,7 @@ func (dir *RegistryDirectory) doCacheInvoker(newUrl *common.URL, event *registry logger.Warnf("service will be added in cache invokers fail, result is null, invokers url is %+v", newUrl.String()) } } else { + metrics.Publish(metricsRegistry.NewDirectoryEvent(metricsRegistry.NumValidTotal)) // if cached invoker has the same URL with the new URL, then no need to re-refer, and no need to destroy // the old invoker. if common.GetCompareURLEqualFunc()(newUrl, cacheInvoker.(protocol.Invoker).GetURL()) { @@ -433,7 +440,7 @@ func (dir *RegistryDirectory) IsAvailable() bool { return true } } - + metrics.Publish(metricsRegistry.NewDirectoryEvent(metricsRegistry.NumToReconnectTotal)) return false } @@ -457,6 +464,7 @@ func (dir *RegistryDirectory) Destroy() { ivk.Destroy() } }) + metrics.Publish(metricsRegistry.NewDirectoryEvent(metricsRegistry.NumAllDec)) } func (dir *RegistryDirectory) overrideUrl(targetUrl *common.URL) { diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index 911d26ba5c..bfb45754c8 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -38,6 +38,8 @@ import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/metrics" + metricsRegistry "dubbo.apache.org/dubbo-go/v3/metrics/registry" "dubbo.apache.org/dubbo-go/v3/registry" "dubbo.apache.org/dubbo-go/v3/remoting" "dubbo.apache.org/dubbo-go/v3/remoting/nacos" @@ -112,11 +114,13 @@ func createRegisterParam(url *common.URL, serviceName string, groupName string) // Register will register the service @url to its nacos registry center. func (nr *nacosRegistry) Register(url *common.URL) error { + start := time.Now() serviceName := getServiceName(url) groupName := nr.URL.GetParam(constant.NacosGroupKey, defaultGroup) param := createRegisterParam(url, serviceName, groupName) logger.Infof("[Nacos Registry] Registry instance with param = %+v", param) isRegistry, err := nr.namingClient.Client().RegisterInstance(param) + metrics.Publish(metricsRegistry.NewRegisterEvent(err == nil && isRegistry, start)) if err != nil { return err } @@ -173,6 +177,7 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti } listener, err := nr.subscribe(url) + defer metrics.Publish(metricsRegistry.NewSubscribeEvent(err == nil)) if err != nil { if !nr.IsAvailable() { logger.Warnf("event listener game over.") diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 0b719eeaf5..db0986df50 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -21,6 +21,7 @@ import ( "bytes" "strings" "sync" + "time" ) import ( @@ -39,6 +40,9 @@ import ( "dubbo.apache.org/dubbo-go/v3/metadata/mapping" "dubbo.apache.org/dubbo-go/v3/metadata/service" "dubbo.apache.org/dubbo-go/v3/metadata/service/local" + "dubbo.apache.org/dubbo-go/v3/metrics" + metricMetadata "dubbo.apache.org/dubbo-go/v3/metrics/metadata" + metricsRegistry "dubbo.apache.org/dubbo-go/v3/metrics/registry" "dubbo.apache.org/dubbo-go/v3/registry" _ "dubbo.apache.org/dubbo-go/v3/registry/event" "dubbo.apache.org/dubbo-go/v3/registry/servicediscovery/synthesizer" @@ -175,7 +179,10 @@ func (s *ServiceDiscoveryRegistry) Register(url *common.URL) error { return nil } common.HandleRegisterIPAndPort(url) + + start := time.Now() ok, err := s.metaDataService.ExportURL(url) + metrics.Publish(metricsRegistry.NewServerRegisterEvent(ok && err == nil, start)) if err != nil { logger.Errorf("The URL[%s] registry catch error:%s!", url.String(), err.Error()) @@ -244,7 +251,13 @@ func (s *ServiceDiscoveryRegistry) SubscribeURL(url *common.URL, notify registry } s.serviceListeners[serviceNamesKey] = listener listener.AddListenerAndNotify(protocolServiceKey, notify) + event := metricMetadata.NewMetadataMetricTimeEvent(metricMetadata.SubscribeServiceRt) err = s.serviceDiscovery.AddListener(listener) + event.Succ = err != nil + event.End = time.Now() + event.Attachment[constant.InterfaceKey] = url.Interface() + metrics.Publish(event) + metrics.Publish(metricsRegistry.NewServerSubscribeEvent(err == nil)) if err != nil { logger.Errorf("add instance listener catch error,url:%s err:%s", url.String(), err.Error()) }