From a5ae5c640826b72de4e638749f4a0f72fd8d9f61 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Wed, 7 Jun 2023 15:52:01 -0700 Subject: [PATCH] weightedroundrobin: cherry-pick 2 commits from master (#6360) --- balancer/weightedroundrobin/balancer.go | 8 +- balancer/weightedroundrobin/balancer_test.go | 109 +++++++----- examples/go.mod | 2 +- examples/go.sum | 4 +- gcp/observability/go.sum | 2 +- go.mod | 2 +- go.sum | 4 +- interop/observability/go.mod | 2 +- interop/observability/go.sum | 4 +- orca/call_metrics_test.go | 32 ++-- orca/orca_test.go | 14 +- orca/producer_test.go | 28 +-- orca/server_metrics.go | 127 +++++++++++--- orca/server_metrics_test.go | 175 +++++++++++++++++++ orca/service_test.go | 13 +- stats/opencensus/go.sum | 2 +- 16 files changed, 408 insertions(+), 120 deletions(-) create mode 100644 orca/server_metrics_test.go diff --git a/balancer/weightedroundrobin/balancer.go b/balancer/weightedroundrobin/balancer.go index e957b91b1966..a164d1bedd7e 100644 --- a/balancer/weightedroundrobin/balancer.go +++ b/balancer/weightedroundrobin/balancer.go @@ -419,7 +419,11 @@ func (w *weightedSubConn) OnLoadReport(load *v3orcapb.OrcaLoadReport) { w.logger.Infof("Received load report for subchannel %v: %v", w.SubConn, load) } // Update weights of this subchannel according to the reported load - if load.CpuUtilization == 0 || load.RpsFractional == 0 { + utilization := load.ApplicationUtilization + if utilization == 0 { + utilization = load.CpuUtilization + } + if utilization == 0 || load.RpsFractional == 0 { if w.logger.V(2) { w.logger.Infof("Ignoring empty load report for subchannel %v", w.SubConn) } @@ -430,7 +434,7 @@ func (w *weightedSubConn) OnLoadReport(load *v3orcapb.OrcaLoadReport) { defer w.mu.Unlock() errorRate := load.Eps / load.RpsFractional - w.weightVal = load.RpsFractional / (load.CpuUtilization + errorRate*w.cfg.ErrorUtilizationPenalty) + w.weightVal = load.RpsFractional / (utilization + errorRate*w.cfg.ErrorUtilizationPenalty) if w.logger.V(2) { 
w.logger.Infof("New weight for subchannel %v: %v", w.SubConn, w.weightVal) } diff --git a/balancer/weightedroundrobin/balancer_test.go b/balancer/weightedroundrobin/balancer_test.go index a0a84a7f057b..1d67bcf1f008 100644 --- a/balancer/weightedroundrobin/balancer_test.go +++ b/balancer/weightedroundrobin/balancer_test.go @@ -110,7 +110,7 @@ func startServer(t *testing.T, r reportType) *testServer { if r := orca.CallMetricsRecorderFromContext(ctx); r != nil { // Copy metrics from what the test set in cmr into r. sm := cmr.(orca.ServerMetricsProvider).ServerMetrics() - r.SetCPUUtilization(sm.CPUUtilization) + r.SetApplicationUtilization(sm.AppUtilization) r.SetQPS(sm.QPS) r.SetEPS(sm.EPS) } @@ -230,10 +230,10 @@ func (s) TestBalancer_TwoAddresses_ReportingEnabledPerCall(t *testing.T) { // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed // disproportionately to srv2 (10:1). srv1.callMetrics.SetQPS(10.0) - srv1.callMetrics.SetCPUUtilization(1.0) + srv1.callMetrics.SetApplicationUtilization(1.0) srv2.callMetrics.SetQPS(10.0) - srv2.callMetrics.SetCPUUtilization(.1) + srv2.callMetrics.SetApplicationUtilization(.1) sc := svcConfig(t, perCallConfig) if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { @@ -253,33 +253,58 @@ func (s) TestBalancer_TwoAddresses_ReportingEnabledPerCall(t *testing.T) { // Tests two addresses with OOB ORCA reporting enabled. Checks the backends // are called in the appropriate ratios. 
func (s) TestBalancer_TwoAddresses_ReportingEnabledOOB(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() + testCases := []struct { + name string + utilSetter func(orca.ServerMetricsRecorder, float64) + }{{ + name: "application_utilization", + utilSetter: func(smr orca.ServerMetricsRecorder, val float64) { + smr.SetApplicationUtilization(val) + }, + }, { + name: "cpu_utilization", + utilSetter: func(smr orca.ServerMetricsRecorder, val float64) { + smr.SetCPUUtilization(val) + }, + }, { + name: "application over cpu", + utilSetter: func(smr orca.ServerMetricsRecorder, val float64) { + smr.SetApplicationUtilization(val) + smr.SetCPUUtilization(2.0) // ignored because ApplicationUtilization is set + }, + }} - srv1 := startServer(t, reportOOB) - srv2 := startServer(t, reportOOB) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() - // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed - // disproportionately to srv2 (10:1). - srv1.oobMetrics.SetQPS(10.0) - srv1.oobMetrics.SetCPUUtilization(1.0) + srv1 := startServer(t, reportOOB) + srv2 := startServer(t, reportOOB) - srv2.oobMetrics.SetQPS(10.0) - srv2.oobMetrics.SetCPUUtilization(.1) + // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed + // disproportionately to srv2 (10:1). + srv1.oobMetrics.SetQPS(10.0) + tc.utilSetter(srv1.oobMetrics, 1.0) - sc := svcConfig(t, oobConfig) - if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { - t.Fatalf("Error starting client: %v", err) - } - addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}} - srv1.R.UpdateState(resolver.State{Addresses: addrs}) + srv2.oobMetrics.SetQPS(10.0) + tc.utilSetter(srv2.oobMetrics, 0.1) - // Call each backend once to ensure the weights have been received. 
- ensureReached(ctx, t, srv1.Client, 2) + sc := svcConfig(t, oobConfig) + if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { + t.Fatalf("Error starting client: %v", err) + } + addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}} + srv1.R.UpdateState(resolver.State{Addresses: addrs}) - // Wait for the weight update period to allow the new weights to be processed. - time.Sleep(weightUpdatePeriod) - checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}) + // Call each backend once to ensure the weights have been received. + ensureReached(ctx, t, srv1.Client, 2) + + // Wait for the weight update period to allow the new weights to be processed. + time.Sleep(weightUpdatePeriod) + checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}) + }) + } } // Tests two addresses with OOB ORCA reporting enabled, where the reports @@ -295,10 +320,10 @@ func (s) TestBalancer_TwoAddresses_UpdateLoads(t *testing.T) { // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed // disproportionately to srv2 (10:1). srv1.oobMetrics.SetQPS(10.0) - srv1.oobMetrics.SetCPUUtilization(1.0) + srv1.oobMetrics.SetApplicationUtilization(1.0) srv2.oobMetrics.SetQPS(10.0) - srv2.oobMetrics.SetCPUUtilization(.1) + srv2.oobMetrics.SetApplicationUtilization(.1) sc := svcConfig(t, oobConfig) if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { @@ -317,10 +342,10 @@ func (s) TestBalancer_TwoAddresses_UpdateLoads(t *testing.T) { // Update the loads so srv2 is loaded and srv1 is not; ensure RPCs are // routed disproportionately to srv1. srv1.oobMetrics.SetQPS(10.0) - srv1.oobMetrics.SetCPUUtilization(.1) + srv1.oobMetrics.SetApplicationUtilization(.1) srv2.oobMetrics.SetQPS(10.0) - srv2.oobMetrics.SetCPUUtilization(1.0) + srv2.oobMetrics.SetApplicationUtilization(1.0) // Wait for the weight update period to allow the new weights to be processed. 
time.Sleep(weightUpdatePeriod + oobReportingInterval) @@ -340,19 +365,19 @@ func (s) TestBalancer_TwoAddresses_OOBThenPerCall(t *testing.T) { // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed // disproportionately to srv2 (10:1). srv1.oobMetrics.SetQPS(10.0) - srv1.oobMetrics.SetCPUUtilization(1.0) + srv1.oobMetrics.SetApplicationUtilization(1.0) srv2.oobMetrics.SetQPS(10.0) - srv2.oobMetrics.SetCPUUtilization(.1) + srv2.oobMetrics.SetApplicationUtilization(.1) // For per-call metrics (not used initially), srv2 reports that it is // loaded and srv1 reports low load. After confirming OOB works, switch to // per-call and confirm the new routing weights are applied. srv1.callMetrics.SetQPS(10.0) - srv1.callMetrics.SetCPUUtilization(.1) + srv1.callMetrics.SetApplicationUtilization(.1) srv2.callMetrics.SetQPS(10.0) - srv2.callMetrics.SetCPUUtilization(1.0) + srv2.callMetrics.SetApplicationUtilization(1.0) sc := svcConfig(t, oobConfig) if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { @@ -396,13 +421,13 @@ func (s) TestBalancer_TwoAddresses_ErrorPenalty(t *testing.T) { // to 0.9 which will cause the weights to be equal and RPCs to be routed // 50/50. srv1.oobMetrics.SetQPS(10.0) - srv1.oobMetrics.SetCPUUtilization(1.0) + srv1.oobMetrics.SetApplicationUtilization(1.0) srv1.oobMetrics.SetEPS(0) // srv1 weight before: 10.0 / 1.0 = 10.0 // srv1 weight after: 10.0 / 1.0 = 10.0 srv2.oobMetrics.SetQPS(10.0) - srv2.oobMetrics.SetCPUUtilization(.1) + srv2.oobMetrics.SetApplicationUtilization(.1) srv2.oobMetrics.SetEPS(10.0) // srv2 weight before: 10.0 / 0.1 = 100.0 // srv2 weight after: 10.0 / 1.0 = 10.0 @@ -476,10 +501,10 @@ func (s) TestBalancer_TwoAddresses_BlackoutPeriod(t *testing.T) { // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed // disproportionately to srv2 (10:1). 
srv1.oobMetrics.SetQPS(10.0) - srv1.oobMetrics.SetCPUUtilization(1.0) + srv1.oobMetrics.SetApplicationUtilization(1.0) srv2.oobMetrics.SetQPS(10.0) - srv2.oobMetrics.SetCPUUtilization(.1) + srv2.oobMetrics.SetApplicationUtilization(.1) cfg := oobConfig cfg.BlackoutPeriod = tc.blackoutPeriodCfg @@ -544,10 +569,10 @@ func (s) TestBalancer_TwoAddresses_WeightExpiration(t *testing.T) { // is 1 minute but the weights expire in 1 second, routing will go to 50/50 // after the weights expire. srv1.oobMetrics.SetQPS(10.0) - srv1.oobMetrics.SetCPUUtilization(1.0) + srv1.oobMetrics.SetApplicationUtilization(1.0) srv2.oobMetrics.SetQPS(10.0) - srv2.oobMetrics.SetCPUUtilization(.1) + srv2.oobMetrics.SetApplicationUtilization(.1) cfg := oobConfig cfg.OOBReportingPeriod = stringp("60s") @@ -594,16 +619,16 @@ func (s) TestBalancer_AddressesChanging(t *testing.T) { // srv1: weight 10 srv1.oobMetrics.SetQPS(10.0) - srv1.oobMetrics.SetCPUUtilization(1.0) + srv1.oobMetrics.SetApplicationUtilization(1.0) // srv2: weight 100 srv2.oobMetrics.SetQPS(10.0) - srv2.oobMetrics.SetCPUUtilization(.1) + srv2.oobMetrics.SetApplicationUtilization(.1) // srv3: weight 20 srv3.oobMetrics.SetQPS(20.0) - srv3.oobMetrics.SetCPUUtilization(1.0) + srv3.oobMetrics.SetApplicationUtilization(1.0) // srv4: weight 200 srv4.oobMetrics.SetQPS(20.0) - srv4.oobMetrics.SetCPUUtilization(.1) + srv4.oobMetrics.SetApplicationUtilization(.1) sc := svcConfig(t, oobConfig) if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { diff --git a/examples/go.mod b/examples/go.mod index c631aae7da39..02954425f2fb 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -3,7 +3,7 @@ module google.golang.org/grpc/examples go 1.17 require ( - github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195 + github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 github.com/golang/protobuf v1.5.3 golang.org/x/oauth2 v0.7.0 google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 diff --git a/examples/go.sum 
b/examples/go.sum index d257912f6717..3d05e57fc6bc 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -627,8 +627,8 @@ github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195 h1:58f1tJ1ra+zFINPlwLWvQsR9CzAKt2e+EWV2yX9oXQ4= -github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+gqO04wryn5h75LSazbRlnya1k= +github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/gcp/observability/go.sum b/gcp/observability/go.sum index 4e70d82da940..d2f71f39e42b 100644 --- a/gcp/observability/go.sum +++ b/gcp/observability/go.sum @@ -638,7 +638,7 @@ github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go 
v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/go.mod b/go.mod index 088c703575da..f61c829a5bcc 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.17 require ( github.com/cespare/xxhash/v2 v2.2.0 github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe - github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195 + github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 github.com/envoyproxy/go-control-plane v0.11.1-0.20230524094728-9239064ad72f github.com/golang/glog v1.1.0 github.com/golang/protobuf v1.5.3 diff --git a/go.sum b/go.sum index 4e7adc822040..492dcc80e284 100644 --- a/go.sum +++ b/go.sum @@ -13,8 +13,8 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe h1:QQ3GSy+MqSHxm/d8nCtnAiZdYFd45cYZPs8vOOIYKfk= github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195 h1:58f1tJ1ra+zFINPlwLWvQsR9CzAKt2e+EWV2yX9oXQ4= -github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+gqO04wryn5h75LSazbRlnya1k= +github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= 
github.com/envoyproxy/go-control-plane v0.11.1-0.20230524094728-9239064ad72f h1:7T++XKzy4xg7PKy+bM+Sa9/oe1OC88yz2hXQUISoXfA= diff --git a/interop/observability/go.mod b/interop/observability/go.mod index 784ea504d1a0..a0f28b7976fb 100644 --- a/interop/observability/go.mod +++ b/interop/observability/go.mod @@ -18,7 +18,7 @@ require ( contrib.go.opencensus.io/exporter/stackdriver v0.13.12 // indirect github.com/aws/aws-sdk-go v1.44.162 // indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect - github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195 // indirect + github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 // indirect github.com/envoyproxy/protoc-gen-validate v0.10.1 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect diff --git a/interop/observability/go.sum b/interop/observability/go.sum index b21857e14a68..15c59443ba83 100644 --- a/interop/observability/go.sum +++ b/interop/observability/go.sum @@ -638,8 +638,8 @@ github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195 h1:58f1tJ1ra+zFINPlwLWvQsR9CzAKt2e+EWV2yX9oXQ4= -github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+gqO04wryn5h75LSazbRlnya1k= +github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= 
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/orca/call_metrics_test.go b/orca/call_metrics_test.go index 4374b593b9f1..b0e6af646c91 100644 --- a/orca/call_metrics_test.go +++ b/orca/call_metrics_test.go @@ -23,13 +23,11 @@ import ( "errors" "io" "testing" - "time" "github.com/golang/protobuf/proto" "github.com/google/go-cmp/cmp" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/metadata" @@ -41,16 +39,6 @@ import ( testpb "google.golang.org/grpc/interop/grpc_testing" ) -type s struct { - grpctest.Tester -} - -func Test(t *testing.T) { - grpctest.RunSubTests(t, s{}) -} - -const defaultTestTimeout = 5 * time.Second - // TestE2ECallMetricsUnary tests the injection of custom backend metrics from // the server application for a unary RPC, and verifies that expected load // reports are received at the client. @@ -65,9 +53,9 @@ func (s) TestE2ECallMetricsUnary(t *testing.T) { injectMetrics: true, wantProto: &v3orcapb.OrcaLoadReport{ CpuUtilization: 1.0, - MemUtilization: 50.0, + MemUtilization: 0.9, RequestCost: map[string]float64{"queryCost": 25.0}, - Utilization: map[string]float64{"queueSize": 75.0}, + Utilization: map[string]float64{"queueSize": 0.75}, }, }, { @@ -92,7 +80,7 @@ func (s) TestE2ECallMetricsUnary(t *testing.T) { t.Error(err) return nil, err } - recorder.SetMemoryUtilization(50.0) + recorder.SetMemoryUtilization(0.9) // This value will be overwritten by a write to the same metric // from the server handler. 
recorder.SetNamedUtilization("queueSize", 1.0) @@ -114,7 +102,7 @@ func (s) TestE2ECallMetricsUnary(t *testing.T) { return nil, err } recorder.SetRequestCost("queryCost", 25.0) - recorder.SetNamedUtilization("queueSize", 75.0) + recorder.SetNamedUtilization("queueSize", 0.75) return &testpb.Empty{}, nil }, } @@ -171,9 +159,9 @@ func (s) TestE2ECallMetricsStreaming(t *testing.T) { injectMetrics: true, wantProto: &v3orcapb.OrcaLoadReport{ CpuUtilization: 1.0, - MemUtilization: 50.0, - RequestCost: map[string]float64{"queryCost": 25.0}, - Utilization: map[string]float64{"queueSize": 75.0}, + MemUtilization: 0.5, + RequestCost: map[string]float64{"queryCost": 0.25}, + Utilization: map[string]float64{"queueSize": 0.75}, }, }, { @@ -198,7 +186,7 @@ func (s) TestE2ECallMetricsStreaming(t *testing.T) { t.Error(err) return err } - recorder.SetMemoryUtilization(50.0) + recorder.SetMemoryUtilization(0.5) // This value will be overwritten by a write to the same metric // from the server handler. recorder.SetNamedUtilization("queueSize", 1.0) @@ -217,8 +205,8 @@ func (s) TestE2ECallMetricsStreaming(t *testing.T) { t.Error(err) return err } - recorder.SetRequestCost("queryCost", 25.0) - recorder.SetNamedUtilization("queueSize", 75.0) + recorder.SetRequestCost("queryCost", 0.25) + recorder.SetNamedUtilization("queueSize", 0.75) } // Streaming implementation replies with a dummy response until the diff --git a/orca/orca_test.go b/orca/orca_test.go index 096b54907148..4f85e7b01592 100644 --- a/orca/orca_test.go +++ b/orca/orca_test.go @@ -20,9 +20,11 @@ package orca_test import ( "testing" + "time" "github.com/golang/protobuf/proto" "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/metadata" "google.golang.org/grpc/orca/internal" @@ -30,7 +32,17 @@ import ( v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" ) -func TestToLoadReport(t *testing.T) { +type s struct { + grpctest.Tester +} + +func 
Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +const defaultTestTimeout = 5 * time.Second + +func (s) TestToLoadReport(t *testing.T) { goodReport := &v3orcapb.OrcaLoadReport{ CpuUtilization: 1.0, MemUtilization: 50.0, diff --git a/orca/producer_test.go b/orca/producer_test.go index ce376e7405e2..212cf2500f6b 100644 --- a/orca/producer_test.go +++ b/orca/producer_test.go @@ -158,12 +158,12 @@ func (s) TestProducer(t *testing.T) { // Set a few metrics and wait for them on the client side. smr.SetCPUUtilization(10) - smr.SetMemoryUtilization(100) - smr.SetNamedUtilization("bob", 555) + smr.SetMemoryUtilization(0.1) + smr.SetNamedUtilization("bob", 0.555) loadReportWant := &v3orcapb.OrcaLoadReport{ CpuUtilization: 10, - MemUtilization: 100, - Utilization: map[string]float64{"bob": 555}, + MemUtilization: 0.1, + Utilization: map[string]float64{"bob": 0.555}, } testReport: @@ -181,13 +181,13 @@ testReport: } // Change and add metrics and wait for them on the client side. - smr.SetCPUUtilization(50) - smr.SetMemoryUtilization(200) - smr.SetNamedUtilization("mary", 321) + smr.SetCPUUtilization(0.5) + smr.SetMemoryUtilization(0.2) + smr.SetNamedUtilization("mary", 0.321) loadReportWant = &v3orcapb.OrcaLoadReport{ - CpuUtilization: 50, - MemUtilization: 200, - Utilization: map[string]float64{"bob": 555, "mary": 321}, + CpuUtilization: 0.5, + MemUtilization: 0.2, + Utilization: map[string]float64{"bob": 0.555, "mary": 0.321}, } for { @@ -322,8 +322,8 @@ func (s) TestProducerBackoff(t *testing.T) { // Define a load report to send and expect the client to see. loadReportWant := &v3orcapb.OrcaLoadReport{ CpuUtilization: 10, - MemUtilization: 100, - Utilization: map[string]float64{"bob": 555}, + MemUtilization: 0.1, + Utilization: map[string]float64{"bob": 0.555}, } // Unblock the fake. @@ -444,8 +444,8 @@ func (s) TestProducerMultipleListeners(t *testing.T) { // Define a load report to send and expect the client to see. 
loadReportWant := &v3orcapb.OrcaLoadReport{ CpuUtilization: 10, - MemUtilization: 100, - Utilization: map[string]float64{"bob": 555}, + MemUtilization: 0.1, + Utilization: map[string]float64{"bob": 0.555}, } // Receive reports and update counts for the three listeners. diff --git a/orca/server_metrics.go b/orca/server_metrics.go index 6b63d3d252bf..f2cdb9b0b26f 100644 --- a/orca/server_metrics.go +++ b/orca/server_metrics.go @@ -27,8 +27,9 @@ import ( // ServerMetrics is the data returned from a server to a client to describe the // current state of the server and/or the cost of a request when used per-call. type ServerMetrics struct { - CPUUtilization float64 // CPU utilization: [0, 1.0]; unset=-1 + CPUUtilization float64 // CPU utilization: [0, inf); unset=-1 MemUtilization float64 // Memory utilization: [0, 1.0]; unset=-1 + AppUtilization float64 // Application utilization: [0, inf); unset=-1 QPS float64 // queries per second: [0, inf); unset=-1 EPS float64 // errors per second: [0, inf); unset=-1 @@ -52,6 +53,9 @@ func (sm *ServerMetrics) toLoadReportProto() *v3orcapb.OrcaLoadReport { if sm.MemUtilization != -1 { ret.MemUtilization = sm.MemUtilization } + if sm.AppUtilization != -1 { + ret.ApplicationUtilization = sm.AppUtilization + } if sm.QPS != -1 { ret.RpsFractional = sm.QPS } @@ -63,21 +67,24 @@ func (sm *ServerMetrics) toLoadReportProto() *v3orcapb.OrcaLoadReport { // merge merges o into sm, overwriting any values present in both. 
func (sm *ServerMetrics) merge(o *ServerMetrics) { + mergeMap(sm.Utilization, o.Utilization) + mergeMap(sm.RequestCost, o.RequestCost) + mergeMap(sm.NamedMetrics, o.NamedMetrics) if o.CPUUtilization != -1 { sm.CPUUtilization = o.CPUUtilization } if o.MemUtilization != -1 { sm.MemUtilization = o.MemUtilization } + if o.AppUtilization != -1 { + sm.AppUtilization = o.AppUtilization + } if o.QPS != -1 { sm.QPS = o.QPS } if o.EPS != -1 { sm.EPS = o.EPS } - mergeMap(sm.Utilization, o.Utilization) - mergeMap(sm.RequestCost, o.RequestCost) - mergeMap(sm.NamedMetrics, o.NamedMetrics) } func mergeMap(a, b map[string]float64) { @@ -91,34 +98,46 @@ func mergeMap(a, b map[string]float64) { type ServerMetricsRecorder interface { ServerMetricsProvider - // SetCPUUtilization sets the relevant server metric. + // SetCPUUtilization sets the CPU utilization server metric. Must be + // greater than or equal to zero. SetCPUUtilization(float64) - // DeleteCPUUtilization deletes the relevant server metric to prevent it - // from being sent. + // DeleteCPUUtilization deletes the CPU utilization server metric to + // prevent it from being sent. DeleteCPUUtilization() - // SetMemoryUtilization sets the relevant server metric. + // SetMemoryUtilization sets the memory utilization server metric. Must be + // in the range [0, 1]. SetMemoryUtilization(float64) - // DeleteMemoryUtilization deletes the relevant server metric to prevent it - // from being sent. + // DeleteMemoryUtilization deletes the memory utilization server metric to + // prevent it from being sent. DeleteMemoryUtilization() - // SetQPS sets the relevant server metric. + // SetApplicationUtilization sets the application utilization server + // metric. Must be greater than or equal to zero. SetApplicationUtilization(float64) + // DeleteApplicationUtilization deletes the application utilization server + // metric to prevent it from being sent. DeleteApplicationUtilization() + + // SetQPS sets the Queries Per Second server metric.
Must be greater than + // or equal to zero. SetQPS(float64) - // DeleteQPS deletes the relevant server metric to prevent it from being - // sent. + // DeleteQPS deletes the Queries Per Second server metric to prevent it + // from being sent. DeleteQPS() - // SetEPS sets the relevant server metric. + // SetEPS sets the Errors Per Second server metric. Must be greater than + // or equal to zero. SetEPS(float64) - // DeleteEPS deletes the relevant server metric to prevent it from being - // sent. + // DeleteEPS deletes the Errors Per Second server metric to prevent it from + // being sent. DeleteEPS() - // SetNamedUtilization sets the relevant server metric. + // SetNamedUtilization sets the named utilization server metric for the + // name provided. val must be in the range [0, 1]. SetNamedUtilization(name string, val float64) - // DeleteNamedUtilization deletes the relevant server metric to prevent it - // from being sent. + // DeleteNamedUtilization deletes the named utilization server metric for + // the name provided to prevent it from being sent. DeleteNamedUtilization(name string) } @@ -139,6 +158,7 @@ func newServerMetricsRecorder() *serverMetricsRecorder { state: &ServerMetrics{ CPUUtilization: -1, MemUtilization: -1, + AppUtilization: -1, QPS: -1, EPS: -1, Utilization: make(map[string]float64), @@ -155,6 +175,7 @@ func (s *serverMetricsRecorder) ServerMetrics() *ServerMetrics { return &ServerMetrics{ CPUUtilization: s.state.CPUUtilization, MemUtilization: s.state.MemUtilization, + AppUtilization: s.state.AppUtilization, QPS: s.state.QPS, EPS: s.state.EPS, Utilization: copyMap(s.state.Utilization), @@ -173,6 +194,12 @@ func copyMap(m map[string]float64) map[string]float64 { // SetCPUUtilization records a measurement for the CPU utilization metric.
func (s *serverMetricsRecorder) SetCPUUtilization(val float64) { + if val < 0 { + if logger.V(2) { + logger.Infof("Ignoring CPU Utilization value out of range: %v", val) + } + return + } s.mu.Lock() defer s.mu.Unlock() s.state.CPUUtilization = val @@ -181,11 +208,19 @@ func (s *serverMetricsRecorder) SetCPUUtilization(val float64) { // DeleteCPUUtilization deletes the relevant server metric to prevent it from // being sent. func (s *serverMetricsRecorder) DeleteCPUUtilization() { - s.SetCPUUtilization(-1) + s.mu.Lock() + defer s.mu.Unlock() + s.state.CPUUtilization = -1 } // SetMemoryUtilization records a measurement for the memory utilization metric. func (s *serverMetricsRecorder) SetMemoryUtilization(val float64) { + if val < 0 || val > 1 { + if logger.V(2) { + logger.Infof("Ignoring Memory Utilization value out of range: %v", val) + } + return + } s.mu.Lock() defer s.mu.Unlock() s.state.MemUtilization = val @@ -194,11 +229,41 @@ func (s *serverMetricsRecorder) SetMemoryUtilization(val float64) { // DeleteMemoryUtilization deletes the relevant server metric to prevent it // from being sent. func (s *serverMetricsRecorder) DeleteMemoryUtilization() { - s.SetMemoryUtilization(-1) + s.mu.Lock() + defer s.mu.Unlock() + s.state.MemUtilization = -1 +} + +// SetApplicationUtilization records a measurement for a generic utilization +// metric. +func (s *serverMetricsRecorder) SetApplicationUtilization(val float64) { + if val < 0 { + if logger.V(2) { + logger.Infof("Ignoring Application Utilization value out of range: %v", val) + } + return + } + s.mu.Lock() + defer s.mu.Unlock() + s.state.AppUtilization = val +} + +// DeleteApplicationUtilization deletes the relevant server metric to prevent +// it from being sent. +func (s *serverMetricsRecorder) DeleteApplicationUtilization() { + s.mu.Lock() + defer s.mu.Unlock() + s.state.AppUtilization = -1 } // SetQPS records a measurement for the QPS metric. 
func (s *serverMetricsRecorder) SetQPS(val float64) { + if val < 0 { + if logger.V(2) { + logger.Infof("Ignoring QPS value out of range: %v", val) + } + return + } s.mu.Lock() defer s.mu.Unlock() s.state.QPS = val @@ -206,11 +271,19 @@ func (s *serverMetricsRecorder) SetQPS(val float64) { // DeleteQPS deletes the relevant server metric to prevent it from being sent. func (s *serverMetricsRecorder) DeleteQPS() { - s.SetQPS(-1) + s.mu.Lock() + defer s.mu.Unlock() + s.state.QPS = -1 } // SetEPS records a measurement for the EPS metric. func (s *serverMetricsRecorder) SetEPS(val float64) { + if val < 0 { + if logger.V(2) { + logger.Infof("Ignoring EPS value out of range: %v", val) + } + return + } s.mu.Lock() defer s.mu.Unlock() s.state.EPS = val @@ -218,12 +291,20 @@ func (s *serverMetricsRecorder) SetEPS(val float64) { // DeleteEPS deletes the relevant server metric to prevent it from being sent. func (s *serverMetricsRecorder) DeleteEPS() { - s.SetEPS(-1) + s.mu.Lock() + defer s.mu.Unlock() + s.state.EPS = -1 } // SetNamedUtilization records a measurement for a utilization metric uniquely // identifiable by name. func (s *serverMetricsRecorder) SetNamedUtilization(name string, val float64) { + if val < 0 || val > 1 { + if logger.V(2) { + logger.Infof("Ignoring Named Utilization value out of range: %v", val) + } + return + } s.mu.Lock() defer s.mu.Unlock() s.state.Utilization[name] = val diff --git a/orca/server_metrics_test.go b/orca/server_metrics_test.go new file mode 100644 index 000000000000..ecc80d0e584b --- /dev/null +++ b/orca/server_metrics_test.go @@ -0,0 +1,175 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package orca + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/internal/grpctest" +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +func (s) TestServerMetrics_Setters(t *testing.T) { + smr := NewServerMetricsRecorder() + + smr.SetCPUUtilization(0.1) + smr.SetMemoryUtilization(0.2) + smr.SetApplicationUtilization(0.3) + smr.SetQPS(0.4) + smr.SetEPS(0.5) + smr.SetNamedUtilization("x", 0.6) + + want := &ServerMetrics{ + CPUUtilization: 0.1, + MemUtilization: 0.2, + AppUtilization: 0.3, + QPS: 0.4, + EPS: 0.5, + Utilization: map[string]float64{"x": 0.6}, + NamedMetrics: map[string]float64{}, + RequestCost: map[string]float64{}, + } + + got := smr.ServerMetrics() + if d := cmp.Diff(got, want); d != "" { + t.Fatalf("unexpected server metrics: -got +want: %v", d) + } +} + +func (s) TestServerMetrics_Deleters(t *testing.T) { + smr := NewServerMetricsRecorder() + + smr.SetCPUUtilization(0.1) + smr.SetMemoryUtilization(0.2) + smr.SetApplicationUtilization(0.3) + smr.SetQPS(0.4) + smr.SetEPS(0.5) + smr.SetNamedUtilization("x", 0.6) + smr.SetNamedUtilization("y", 0.7) + + // Now delete everything except named_utilization "y". 
+ smr.DeleteCPUUtilization() + smr.DeleteMemoryUtilization() + smr.DeleteApplicationUtilization() + smr.DeleteQPS() + smr.DeleteEPS() + smr.DeleteNamedUtilization("x") + + want := &ServerMetrics{ + CPUUtilization: -1, + MemUtilization: -1, + AppUtilization: -1, + QPS: -1, + EPS: -1, + Utilization: map[string]float64{"y": 0.7}, + NamedMetrics: map[string]float64{}, + RequestCost: map[string]float64{}, + } + + got := smr.ServerMetrics() + if d := cmp.Diff(got, want); d != "" { + t.Fatalf("unexpected server metrics: -got +want: %v", d) + } +} + +func (s) TestServerMetrics_Setters_Range(t *testing.T) { + smr := NewServerMetricsRecorder() + + smr.SetCPUUtilization(0.1) + smr.SetMemoryUtilization(0.2) + smr.SetApplicationUtilization(0.3) + smr.SetQPS(0.4) + smr.SetEPS(0.5) + smr.SetNamedUtilization("x", 0.6) + + // Negatives for all these fields should be ignored. + smr.SetCPUUtilization(-2) + smr.SetMemoryUtilization(-3) + smr.SetApplicationUtilization(-4) + smr.SetQPS(-0.1) + smr.SetEPS(-0.6) + smr.SetNamedUtilization("x", -2) + + // Memory and named utilizations over 1 are ignored. 
+ smr.SetMemoryUtilization(1.1) + smr.SetNamedUtilization("x", 1.1) + + want := &ServerMetrics{ + CPUUtilization: 0.1, + MemUtilization: 0.2, + AppUtilization: 0.3, + QPS: 0.4, + EPS: 0.5, + Utilization: map[string]float64{"x": 0.6}, + NamedMetrics: map[string]float64{}, + RequestCost: map[string]float64{}, + } + + got := smr.ServerMetrics() + if d := cmp.Diff(got, want); d != "" { + t.Fatalf("unexpected server metrics: -got +want: %v", d) + } +} + +func (s) TestServerMetrics_Merge(t *testing.T) { + sm1 := &ServerMetrics{ + CPUUtilization: 0.1, + MemUtilization: 0.2, + AppUtilization: 0.3, + QPS: -1, + EPS: 0, + Utilization: map[string]float64{"x": 0.6}, + NamedMetrics: map[string]float64{"y": 0.2}, + RequestCost: map[string]float64{"a": 0.1}, + } + + sm2 := &ServerMetrics{ + CPUUtilization: -1, + AppUtilization: 0, + QPS: 0.9, + EPS: 20, + Utilization: map[string]float64{"x": 0.5, "y": 0.4}, + NamedMetrics: map[string]float64{"x": 0.1}, + RequestCost: map[string]float64{"a": 0.2}, + } + + want := &ServerMetrics{ + CPUUtilization: 0.1, + MemUtilization: 0, + AppUtilization: 0, + QPS: 0.9, + EPS: 20, + Utilization: map[string]float64{"x": 0.5, "y": 0.4}, + NamedMetrics: map[string]float64{"x": 0.1, "y": 0.2}, + RequestCost: map[string]float64{"a": 0.2}, + } + + sm1.merge(sm2) + if d := cmp.Diff(sm1, want); d != "" { + t.Fatalf("unexpected server metrics: -got +want: %v", d) + } +} diff --git a/orca/service_test.go b/orca/service_test.go index 73ad28430264..9c4defbe266b 100644 --- a/orca/service_test.go +++ b/orca/service_test.go @@ -60,9 +60,10 @@ func (t *testServiceImpl) UnaryCall(context.Context, *testpb.SimpleRequest) (*te t.requests++ t.mu.Unlock() - t.smr.SetNamedUtilization(requestsMetricKey, float64(t.requests)) + t.smr.SetNamedUtilization(requestsMetricKey, float64(t.requests)*0.01) t.smr.SetCPUUtilization(50.0) - t.smr.SetMemoryUtilization(99.0) + t.smr.SetMemoryUtilization(0.9) + t.smr.SetApplicationUtilization(1.2) return &testpb.SimpleResponse{}, nil } 
@@ -70,6 +71,7 @@ func (t *testServiceImpl) EmptyCall(context.Context, *testpb.Empty) (*testpb.Emp t.smr.DeleteNamedUtilization(requestsMetricKey) t.smr.SetCPUUtilization(0) t.smr.SetMemoryUtilization(0) + t.smr.DeleteApplicationUtilization() return &testpb.Empty{}, nil } @@ -150,9 +152,10 @@ func (s) TestE2E_CustomBackendMetrics_OutOfBand(t *testing.T) { } wantProto := &v3orcapb.OrcaLoadReport{ - CpuUtilization: 50.0, - MemUtilization: 99.0, - Utilization: map[string]float64{requestsMetricKey: numRequests}, + CpuUtilization: 50.0, + MemUtilization: 0.9, + ApplicationUtilization: 1.2, + Utilization: map[string]float64{requestsMetricKey: numRequests * 0.01}, } gotProto, err := stream.Recv() if err != nil { diff --git a/stats/opencensus/go.sum b/stats/opencensus/go.sum index 48faad4b66dd..bb94d12a7baa 100644 --- a/stats/opencensus/go.sum +++ b/stats/opencensus/go.sum @@ -622,7 +622,7 @@ github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=