From e9883f506f23196d63579584183017b8c29353da Mon Sep 17 00:00:00 2001 From: adityamaru Date: Wed, 12 Apr 2023 20:28:12 -0400 Subject: [PATCH] kvclient,server: propagate pprof labels for BatchRequests This change teaches DistSender to populate a BatchRequest's header with the pprof labels set on the sender's context. If the node processing the request on the server side has CPU profiling with labels enabled, then the labels stored in the BatchRequest will be applied to the root context of the goroutine executing the request on the server. Doing so will ensure that all server-side CPU samples of the root goroutine and all its spawned goroutines will be labeled correctly. Propagating these labels across RPC boundaries is useful to correlate server-side samples with the sender. For example, in a CPU profile generated with this change, we will be able to identify which backup job sent an ExportRequest that is causing a CPU hotspot on a remote node. Fixes: #100166 Release note: None --- pkg/kv/kvclient/kvcoord/BUILD.bazel | 1 + pkg/kv/kvclient/kvcoord/dist_sender.go | 10 +++ pkg/kv/kvclient/kvcoord/dist_sender_test.go | 50 +++++++++++++++ pkg/kv/kvpb/api.proto | 12 ++++ pkg/server/node.go | 10 +++ pkg/server/node_test.go | 69 +++++++++++++++++++++ 6 files changed, 152 insertions(+) diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index 2aec2c3ff649..2458ad64487f 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -209,6 +209,7 @@ go_test( "//pkg/util/log", "//pkg/util/metric", "//pkg/util/netutil", + "//pkg/util/pprofutil", "//pkg/util/protoutil", "//pkg/util/randutil", "//pkg/util/retry", diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 72f5c18beb94..2f8ac8b138db 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -14,6 +14,7 @@ import ( "context" "fmt" "runtime" + "runtime/pprof" "strings" 
"sync" "sync/atomic" @@ -748,6 +749,15 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ return kvpb.NewErrorf("unknown wait policy %s", ba.WaitPolicy) } + // If the context has any pprof labels, attach them to the BatchRequest. + // These labels will be applied to the root context processing the request + // server-side, if the node processing the request is collecting a CPU + // profile with labels. + pprof.ForLabels(ctx, func(key, value string) bool { + ba.ProfileLabels = append(ba.ProfileLabels, key, value) + return true + }) + return nil } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index e7e0f64633c0..b49f6bc18271 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -48,6 +48,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/netutil" + "github.com/cockroachdb/cockroach/pkg/util/pprofutil" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -3496,6 +3497,55 @@ func TestSenderTransport(t *testing.T) { } } +// TestPProfLabelsAppliedToBatchRequestHeader tests that pprof labels on the +// sender's context are copied to the BatchRequest.Header. 
+func TestPProfLabelsAppliedToBatchRequestHeader(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + clock := hlc.NewClockForTesting(nil) + rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) + g := makeGossip(t, stopper, rpcContext) + + observedLabels := make(map[string]string) + testFn := func(ctx context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { + for i := 0; i < len(ba.Header.ProfileLabels)-1; i += 2 { + observedLabels[ba.Header.ProfileLabels[i]] = ba.Header.ProfileLabels[i+1] + } + return ba.CreateReply(), nil + } + + cfg := DistSenderConfig{ + AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), + Clock: clock, + NodeDescs: g, + RPCContext: rpcContext, + TestingKnobs: ClientTestingKnobs{ + TransportFactory: adaptSimpleTransport(testFn), + }, + RangeDescriptorDB: defaultMockRangeDescriptorDB, + Settings: cluster.MakeTestingClusterSettings(), + } + ds := NewDistSender(cfg) + ba := &kvpb.BatchRequest{} + ba.Add(kvpb.NewPut(roachpb.Key("a"), roachpb.MakeValueFromString("value"))) + expectedLabels := map[string]string{"key": "value", "key2": "value2"} + var labels []string + for k, v := range expectedLabels { + labels = append(labels, k, v) + } + var undo func() + ctx, undo = pprofutil.SetProfilerLabels(ctx, labels...) + defer undo() + if _, err := ds.Send(ctx, ba); err != nil { + t.Fatalf("put encountered error: %s", err) + } + require.Equal(t, expectedLabels, observedLabels) +} + func TestGatewayNodeID(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index 8bde469ac159..8541d5227061 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -2711,7 +2711,19 @@ message Header { // RESUME_ELASTIC_CPU_LIMIT. 
bool return_elastic_cpu_resume_spans = 30 [(gogoproto.customname) = "ReturnElasticCPUResumeSpans"]; + // ProfileLabels are the pprof labels set on the context that is sending the + // BatchRequest. + // + // If the node processing the BatchRequest is collecting a CPU profile with + // labels, then these profile labels will be applied to the root context + // processing the BatchRequest on the server-side. Propagating these labels + // across RPC boundaries will help correlate server CPU profile samples to the + // sender. + repeated string profile_labels = 31 [(gogoproto.customname) = "ProfileLabels"]; + reserved 7, 10, 12, 14, 20; + + // Next ID: 32 } // BoundedStalenessHeader contains configuration values pertaining to bounded diff --git a/pkg/server/node.go b/pkg/server/node.go index b4728d14dc41..bb009528fd93 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1251,6 +1251,16 @@ func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchR ctx = logtags.AddTag(ctx, "tenant", tenantID.String()) } + // If the node is collecting a CPU profile with labels, and the sender has set + // pprof labels in the BatchRequest, then we apply them to the context that is + // going to execute the BatchRequest. These labels will help correlate server + // side CPU profile samples to the sender. + if len(args.ProfileLabels) != 0 && n.execCfg.Settings.CPUProfileType() == cluster.CPUProfileWithLabels { + var undo func() + ctx, undo = pprofutil.SetProfilerLabels(ctx, args.ProfileLabels...) + defer undo() + } + // Requests from tenants don't have gateway node id set but are required for // the QPS based rebalancing to work. The GatewayNodeID is used as a proxy // for the locality of the origin of the request. 
The replica stats aggregate diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go index cdb41f04ff9a..781e6ca7e578 100644 --- a/pkg/server/node_test.go +++ b/pkg/server/node_test.go @@ -15,6 +15,7 @@ import ( "context" "fmt" "reflect" + "runtime/pprof" "sort" "testing" @@ -30,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/status" "github.com/cockroachdb/cockroach/pkg/server/status/statuspb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -658,6 +660,73 @@ func TestNodeSendUnknownBatchRequest(t *testing.T) { } } +// TestNodeBatchRequestPProfLabels tests that node.Batch copies pprof labels +// from the BatchRequest and applies them to the root context if CPU profiling +// with labels is enabled. +func TestNodeBatchRequestPProfLabels(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + observedProfileLabels := make(map[string]string) + srv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + TestingResponseFilter: func(ctx context.Context, ba *kvpb.BatchRequest, _ *kvpb.BatchResponse) *kvpb.Error { + var foundBatch bool + for _, ru := range ba.Requests { + switch r := ru.GetInner().(type) { + case *kvpb.PutRequest: + if r.Header().Key.Equal(roachpb.Key("a")) { + foundBatch = true + } + } + } + if foundBatch { + pprof.ForLabels(ctx, func(key, value string) bool { + observedProfileLabels[key] = value + return true + }) + } + return nil + }, + }, + }, + }) + defer srv.Stopper().Stop(context.Background()) + ts := srv.(*TestServer) + n := ts.GetNode() + + var ba kvpb.BatchRequest + ba.RangeID = 1 + ba.Replica.StoreID = 1 + expectedProfileLabels := map[string]string{"key": "value", "key2": "value2"} + ba.ProfileLabels = func() []string 
{ + var labels []string + for k, v := range expectedProfileLabels { + labels = append(labels, k, v) + } + return labels + }() + + gr := kvpb.NewGet(roachpb.Key("a"), false) + pr := kvpb.NewPut(gr.Header().Key, roachpb.Value{}) + ba.Add(gr, pr) + + // If CPU profiling with labels is not enabled, we should not observe any + // pprof labels on the context. + ctx := context.Background() + _, _ = n.Batch(ctx, &ba) + require.Equal(t, map[string]string{}, observedProfileLabels) + + require.NoError(t, ts.ClusterSettings().SetCPUProfiling(cluster.CPUProfileWithLabels)) + _, _ = n.Batch(ctx, &ba) + + require.Len(t, observedProfileLabels, 3) + // Delete the labels for the range_str. + delete(observedProfileLabels, "range_str") + require.Equal(t, expectedProfileLabels, observedProfileLabels) +} + func TestNodeBatchRequestMetricsInc(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t)