Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: rewrite RPC metrics into the event pattern #2392

Merged
merged 14 commits into from
Aug 30, 2023
Merged
1 change: 1 addition & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ const (

// metrics key
const (
MetricsRpc = "dubbo.metrics.rpc"
MetricsRegistry = "dubbo.metrics.registry"
MetricsMetadata = "dubbo.metrics.metadata"
MetricApp = "dubbo.metrics.app"
Expand Down
7 changes: 2 additions & 5 deletions common/extension/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package extension

import (
"context"
"testing"
"time"
)

import (
Expand All @@ -29,7 +27,6 @@ import (

import (
"dubbo.apache.org/dubbo-go/v3/metrics"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

func TestGetMetricReporter(t *testing.T) {
Expand All @@ -45,8 +42,8 @@ func TestGetMetricReporter(t *testing.T) {
type mockReporter struct{}

// implement the interface of Reporter
func (m *mockReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
func (m *mockReporter) StartServer(config *metrics.ReporterConfig) {
}

func (m *mockReporter) ReportBeforeInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) {
func (m *mockReporter) ShutdownServer() {
}
42 changes: 14 additions & 28 deletions filter/metrics/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* limitations under the License.
*/

// Package metrics provides metrics collection filter.
package metrics

import (
Expand All @@ -28,56 +27,43 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/filter"
"dubbo.apache.org/dubbo-go/v3/metrics"
"dubbo.apache.org/dubbo-go/v3/metrics/rpc"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

// must initialize before using the filter and after loading configuration
var metricFilterInstance *Filter
var metricFilterInstance *metricsFilter

func init() {
extension.SetFilter(constant.MetricsFilterKey, newFilter)
}

// Filter will calculate the invocation's duration and the report to the reporters
// more info please take a look at dubbo-samples projects
type Filter struct {
reporters []metrics.Reporter
}
// metricsFilter will report RPC metrics to the metrics bus and implements the filter.Filter interface
type metricsFilter struct{}

// Invoke collect the duration of invocation and then report the duration by using goroutine
func (p *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
go func() {
for _, reporter := range p.reporters {
reporter.ReportBeforeInvocation(ctx, invoker, invocation)
}
}()
// Invoke publish the BeforeInvokeEvent and AfterInvokeEvent to metrics bus
func (mf *metricsFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
metrics.Publish(rpc.NewBeforeInvokeEvent(invoker, invocation))
start := time.Now()
res := invoker.Invoke(ctx, invocation)
end := time.Now()
duration := end.Sub(start)
go func() {
for _, reporter := range p.reporters {
reporter.ReportAfterInvocation(ctx, invoker, invocation, duration, res)
}
}()
metrics.Publish(rpc.NewAfterInvokeEvent(invoker, invocation, duration, res))
return res
}

// OnResponse do nothing and return the result
func (p *Filter) OnResponse(ctx context.Context, res protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
func (mf *metricsFilter) OnResponse(ctx context.Context, res protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return res
}

// newFilter the Filter is singleton.
// it's lazy initialization
// make sure that the configuration had been loaded before invoking this method.
// newFilter creates a new metricsFilter instance.
//
// It's lazy initialization,
// and make sure that the configuration had been loaded before invoking this method.
func newFilter() filter.Filter {
if metricFilterInstance == nil {
reporters := make([]metrics.Reporter, 0, 1)
reporters = append(reporters, extension.GetMetricReporter("prometheus", metrics.NewReporterConfig()))
metricFilterInstance = &Filter{
reporters: reporters,
}
metricFilterInstance = &metricsFilter{}
}
return metricFilterInstance
}
43 changes: 9 additions & 34 deletions filter/metrics/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,25 @@ package metrics

import (
"context"
"sync"
"testing"
"time"
)

import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/metrics"
_ "dubbo.apache.org/dubbo-go/v3/metrics/prometheus"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)

func TestMetricsFilterInvoke(t *testing.T) {
// prepare the mock reporter
mk := &mockReporter{}
extension.SetMetricReporter("mock", func(config *metrics.ReporterConfig) metrics.Reporter {
return mk
})

instance := newFilter()
mockChan := make(chan metrics.MetricsEvent, 10)
defer close(mockChan)
metrics.Subscribe(constant.MetricsRpc, mockChan)

url, _ := common.NewURL(
"dubbo://:20000/UserProvider?app.version=0.0.1&application=BDTService&bean.name=UserProvider" +
Expand All @@ -57,31 +49,14 @@ func TestMetricsFilterInvoke(t *testing.T) {

attach := make(map[string]interface{}, 10)
inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach)

ctx := context.Background()

mk.On("Report", ctx, invoker, inv).Return(true, nil)

mk.wg.Add(1)
result := instance.Invoke(ctx, invoker, inv)
filter := newFilter()
result := filter.Invoke(ctx, invoker, inv)
assert.NotNil(t, result)
mk.AssertNotCalled(t, "Report", 1)
// it will do nothing
result = instance.OnResponse(ctx, nil, invoker, inv)
result = filter.OnResponse(ctx, nil, invoker, inv)
assert.Nil(t, result)
}

type mockReporter struct {
mock.Mock
wg sync.WaitGroup
}

func (m *mockReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
m.Called(ctx, invoker, invocation)
m.wg.Done()
}

func (m *mockReporter) ReportBeforeInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) {
m.Called(ctx, invoker, invocation)
m.wg.Done()
assert.Equal(t, 2, len(mockChan))
assert.Equal(t, constant.MetricsRpc, (<-mockChan).Type())
}
Loading
Loading