Skip to content

Commit

Permalink
Introduce metrics bus (#2351)
Browse files Browse the repository at this point in the history
  • Loading branch information
FinalT committed Jul 7, 2023
1 parent 2a4625d commit 12bcf73
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 0 deletions.
5 changes: 5 additions & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,3 +404,8 @@ const (
LoggerFileLocalTimeKey = "logger.file.local-time"
LoggerFileCompressKey = "logger.file.compress"
)

// metrics key
const (
MetricsRegistry = "dubbo.metrics.registry"
)
65 changes: 65 additions & 0 deletions metrics/bus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package metrics

import (
"sync"
)

// eventListener is a struct that encapsulates the listener map and provides thread-safe access to it.
type eventListener struct {
mu sync.RWMutex
listener map[string]chan MetricsEvent
}

var listener = &eventListener{
listener: make(map[string]chan MetricsEvent),
}

// Publish publishes an event to all subscribers of the same type.
func Publish(event MetricsEvent) {
listener.mu.RLock()
defer listener.mu.RUnlock()

if ch, ok := listener.listener[event.Type()]; ok {
select {
case ch <- event:
default:
// If the channel is full, drop the event to avoid blocking.
}
}
}

// Subscribe subscribes to events of the given type.
func Subscribe(typ string, ch chan MetricsEvent) {
listener.mu.Lock()
defer listener.mu.Unlock()

listener.listener[typ] = ch
}

// Unsubscribe unsubscribes from events of the given type.
func Unsubscribe(typ string) {
listener.mu.Lock()
defer listener.mu.Unlock()

if ch, ok := listener.listener[typ]; ok {
close(ch)
delete(listener.listener, typ)
}
}
51 changes: 51 additions & 0 deletions metrics/bus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package metrics

import (
"github.com/stretchr/testify/assert"
"testing"
)

var mockChan = make(chan MetricsEvent, 16)

type MockEvent struct {
}

func (m MockEvent) Type() string {
return "dubbo.metrics.mock"
}

func NewEmptyMockEvent() *MockEvent {
return &MockEvent{}
}

func init() {
Subscribe("dubbo.metrics.mock", mockChan)
Publish(NewEmptyMockEvent())
}

func TestBusPublish(t *testing.T) {
t.Run("testBusPublish", func(t *testing.T) {
event := <-mockChan

if event, ok := event.(MockEvent); ok {
assert.Equal(t, event, NewEmptyMockEvent())
}
})
}
23 changes: 23 additions & 0 deletions metrics/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package metrics

// MetricsEvent represents an event that can be published and subscribed to.
type MetricsEvent interface {
Type() string
}
51 changes: 51 additions & 0 deletions metrics/registry/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package registry

import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
)

type RegistryMetricsEvent struct {
//Contains some information, such as time, success, failure

// PostType MetricKey
// FinishType MetricKey
// ErrorType MetricKey
// Level MetricsLevel

// Time
// Start time.time
// End time.time
}

func (r RegistryMetricsEvent) Type() string {
return constant.MetricsRegistry
}

// NewRegistryEvent

// NewSubscribeEvent

// NewNotifyEvent

// NewDirectoryEvent

// NewServerRegistryEvent

// NewServerSubscribeEvent
98 changes: 98 additions & 0 deletions metrics/registry/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package registry

import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/metrics"
)

var (
//regRegistry metrics.MetricRegistry
registryChan = make(chan metrics.MetricsEvent, 128)
handlers []func(event *RegistryMetricsEvent)
)

// func Collector(m metrics.MetricRegistry, r *metrics.ReporterConfig) {
// regRegistry = m
//
// // init related metrics
// }
func init() {
AddHandler(regHandler, subHandler, notifyHandler, directoryHandler, serverSubHandler, serverRegHandler)
//metrics.AddCollector(Collector)
metrics.Subscribe(constant.MetricsRegistry, registryChan)
go receiveEvent()
}

func receiveEvent() {
for event := range registryChan {
registryEvent, ok := event.(*RegistryMetricsEvent)
if !ok {
continue
}
for _, handler := range handlers {
go func(handler func(event *RegistryMetricsEvent)) {
handler(registryEvent)
}(handler)
}
}
}

func AddHandler(handler ...func(event *RegistryMetricsEvent)) {
for _, h := range handler {
handlers = append(handlers, h)
}
}

func regHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics

// Save metrics to the MetricRegistry
}

func subHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics

// Save metrics to the MetricRegistry

}

func notifyHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics

// Save metrics to the MetricRegistry
}

func directoryHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics

// Save metrics to the MetricRegistry
}

func serverRegHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics

// Save metrics to the MetricRegistry
}

func serverSubHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics

// Save metrics to the MetricRegistry
}

0 comments on commit 12bcf73

Please sign in to comment.