Skip to content

Commit

Permalink
Merge #101404
Browse files Browse the repository at this point in the history
101404: kvclient,server: propagate pprof labels for BatchRequests r=dt a=adityamaru

This change teaches DistSender to populate a BatchRequest's header with the pprof labels set on the sender's context. If the node processing the request on the server side has CPU profiling with labels enabled, then the labels stored in the BatchRequest will be applied to the root context of the goroutine executing the request on the server. Doing so will ensure that all server-side CPU samples of the root goroutine and all its spawned goroutine will be labeled correctly.

Propagating these labels across RPC boundaries is useful to correlate server side samples with the sender. For example, in a CPU profile generated with this change, we will be able to identify which backup job sent an ExportRequest that is causing a CPU hotspot on a remote node.

Fixes: #100166
Release note: None

Co-authored-by: adityamaru <[email protected]>
  • Loading branch information
craig[bot] and adityamaru committed Apr 22, 2023
2 parents b18455a + fc6dc7f commit 1d8a8a0
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 1 deletion.
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/netutil",
"//pkg/util/pprofutil",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/retry",
Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"fmt"
"runtime"
"runtime/pprof"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -748,6 +749,15 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ
return kvpb.NewErrorf("unknown wait policy %s", ba.WaitPolicy)
}

// If the context has any pprof labels, attach them to the BatchRequest.
// These labels will be applied to the root context processing the request
// server-side, if the node processing the request is collecting a CPU
// profile with labels.
pprof.ForLabels(ctx, func(key, value string) bool {
ba.ProfileLabels = append(ba.ProfileLabels, key, value)
return true
})

return nil
}

Expand Down
50 changes: 50 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -3496,6 +3497,55 @@ func TestSenderTransport(t *testing.T) {
}
}

// TestPProfLabelsAppliedToBatchRequestHeader tests that pprof labels on the
// sender's context are copied to the BatchRequest.Header.
func TestPProfLabelsAppliedToBatchRequestHeader(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
g := makeGossip(t, stopper, rpcContext)

observedLabels := make(map[string]string)
testFn := func(ctx context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
for i := 0; i < len(ba.Header.ProfileLabels)-1; i += 2 {
observedLabels[ba.Header.ProfileLabels[i]] = ba.Header.ProfileLabels[i+1]
}
return ba.CreateReply(), nil
}

cfg := DistSenderConfig{
AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(),
Clock: clock,
NodeDescs: g,
RPCContext: rpcContext,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(testFn),
},
RangeDescriptorDB: defaultMockRangeDescriptorDB,
Settings: cluster.MakeTestingClusterSettings(),
}
ds := NewDistSender(cfg)
ba := &kvpb.BatchRequest{}
ba.Add(kvpb.NewPut(roachpb.Key("a"), roachpb.MakeValueFromString("value")))
expectedLabels := map[string]string{"key": "value", "key2": "value2"}
var labels []string
for k, v := range expectedLabels {
labels = append(labels, k, v)
}
var undo func()
ctx, undo = pprofutil.SetProfilerLabels(ctx, labels...)
defer undo()
if _, err := ds.Send(ctx, ba); err != nil {
t.Fatalf("put encountered error: %s", err)
}
require.Equal(t, expectedLabels, observedLabels)
}

func TestGatewayNodeID(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
12 changes: 12 additions & 0 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2711,7 +2711,19 @@ message Header {
// RESUME_ELASTIC_CPU_LIMIT.
bool return_elastic_cpu_resume_spans = 30 [(gogoproto.customname) = "ReturnElasticCPUResumeSpans"];

// ProfileLabels are the pprof labels set on the context that is sending the
// BatchRequest.
//
// If the node processing the BatchRequest is collecting a CPU profile with
// labels, then these profile labels will be applied to the root context
// processing the BatchRequest on the server-side. Propagating these labels
// across RPC boundaries will help correlate server CPU profile samples to the
// sender.
repeated string profile_labels = 31 [(gogoproto.customname) = "ProfileLabels"];

reserved 7, 10, 12, 14, 20;

// Next ID: 32
}

// BoundedStalenessHeader contains configuration values pertaining to bounded
Expand Down
12 changes: 11 additions & 1 deletion pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1248,7 +1248,17 @@ func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchR
tenantID = roachpb.SystemTenantID
} else {
// We had this tag before the ResetAndAnnotateCtx() call above.
ctx = logtags.AddTag(ctx, "tenant", tenantID.String())
ctx = logtags.AddTag(ctx, "tenant", tenantID)
}

// If the node is collecting a CPU profile with labels, and the sender has set
// pprof labels in the BatchRequest, then we apply them to the context that is
// going to execute the BatchRequest. These labels will help correlate server
// side CPU profile samples to the sender.
if len(args.ProfileLabels) != 0 && n.execCfg.Settings.CPUProfileType() == cluster.CPUProfileWithLabels {
var undo func()
ctx, undo = pprofutil.SetProfilerLabels(ctx, args.ProfileLabels...)
defer undo()
}

// Requests from tenants don't have gateway node id set but are required for
Expand Down
69 changes: 69 additions & 0 deletions pkg/server/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"context"
"fmt"
"reflect"
"runtime/pprof"
"sort"
"testing"

Expand All @@ -30,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/status"
"github.com/cockroachdb/cockroach/pkg/server/status/statuspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -658,6 +660,73 @@ func TestNodeSendUnknownBatchRequest(t *testing.T) {
}
}

// TestNodeBatchRequestPProfLabels tests that node.Batch copies pprof labels
// from the BatchRequest and applies them to the root context if CPU profiling
// with labels is enabled.
func TestNodeBatchRequestPProfLabels(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

observedProfileLabels := make(map[string]string)
srv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingResponseFilter: func(ctx context.Context, ba *kvpb.BatchRequest, _ *kvpb.BatchResponse) *kvpb.Error {
var foundBatch bool
for _, ru := range ba.Requests {
switch r := ru.GetInner().(type) {
case *kvpb.PutRequest:
if r.Header().Key.Equal(roachpb.Key("a")) {
foundBatch = true
}
}
}
if foundBatch {
pprof.ForLabels(ctx, func(key, value string) bool {
observedProfileLabels[key] = value
return true
})
}
return nil
},
},
},
})
defer srv.Stopper().Stop(context.Background())
ts := srv.(*TestServer)
n := ts.GetNode()

var ba kvpb.BatchRequest
ba.RangeID = 1
ba.Replica.StoreID = 1
expectedProfileLabels := map[string]string{"key": "value", "key2": "value2"}
ba.ProfileLabels = func() []string {
var labels []string
for k, v := range expectedProfileLabels {
labels = append(labels, k, v)
}
return labels
}()

gr := kvpb.NewGet(roachpb.Key("a"), false)
pr := kvpb.NewPut(gr.Header().Key, roachpb.Value{})
ba.Add(gr, pr)

// If CPU profiling with labels is not enabled, we should not observe any
// pprof labels on the context.
ctx := context.Background()
_, _ = n.Batch(ctx, &ba)
require.Equal(t, map[string]string{}, observedProfileLabels)

require.NoError(t, ts.ClusterSettings().SetCPUProfiling(cluster.CPUProfileWithLabels))
_, _ = n.Batch(ctx, &ba)

require.Len(t, observedProfileLabels, 3)
// Delete the labels for the range_str.
delete(observedProfileLabels, "range_str")
require.Equal(t, expectedProfileLabels, observedProfileLabels)
}

func TestNodeBatchRequestMetricsInc(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down

0 comments on commit 1d8a8a0

Please sign in to comment.