Skip to content

Commit

Permalink
kvclient,server: propagate pprof labels for BatchRequests
Browse files Browse the repository at this point in the history
This change teaches DistSender to populate a BatchRequest's
header with the pprof labels set on the sender's context.
If the node processing the request on the server side has CPU
profiling with labels enabled, then the labels stored in the BatchRequest
will be applied to the root context of the goroutine executing the
request on the server. Doing so will ensure that all server-side
CPU samples of the root goroutine and all its spawned goroutine
will be labeled correctly.

Propagating these labels across RPC boundaries is useful to correlate
server side samples with the sender. For example, in a CPU profile
generated with this change, we will be able to identify which backup
job sent an ExportRequest that is causing a CPU hotspot on a remote node.

Fixes: #100166
Release note: None
  • Loading branch information
adityamaru committed Apr 20, 2023
1 parent ac03a55 commit e9883f5
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/netutil",
"//pkg/util/pprofutil",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/retry",
Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"fmt"
"runtime"
"runtime/pprof"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -748,6 +749,15 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ
return kvpb.NewErrorf("unknown wait policy %s", ba.WaitPolicy)
}

// If the context has any pprof labels, attach them to the BatchRequest.
// These labels will be applied to the root context processing the request
// server-side, if the node processing the request is collecting a CPU
// profile with labels.
pprof.ForLabels(ctx, func(key, value string) bool {
ba.ProfileLabels = append(ba.ProfileLabels, key, value)
return true
})

return nil
}

Expand Down
50 changes: 50 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -3496,6 +3497,55 @@ func TestSenderTransport(t *testing.T) {
}
}

// TestPProfLabelsAppliedToBatchRequestHeader tests that pprof labels on the
// sender's context are copied to the BatchRequest.Header.
func TestPProfLabelsAppliedToBatchRequestHeader(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
g := makeGossip(t, stopper, rpcContext)

observedLabels := make(map[string]string)
testFn := func(ctx context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
for i := 0; i < len(ba.Header.ProfileLabels)-1; i += 2 {
observedLabels[ba.Header.ProfileLabels[i]] = ba.Header.ProfileLabels[i+1]
}
return ba.CreateReply(), nil
}

cfg := DistSenderConfig{
AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(),
Clock: clock,
NodeDescs: g,
RPCContext: rpcContext,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(testFn),
},
RangeDescriptorDB: defaultMockRangeDescriptorDB,
Settings: cluster.MakeTestingClusterSettings(),
}
ds := NewDistSender(cfg)
ba := &kvpb.BatchRequest{}
ba.Add(kvpb.NewPut(roachpb.Key("a"), roachpb.MakeValueFromString("value")))
expectedLabels := map[string]string{"key": "value", "key2": "value2"}
var labels []string
for k, v := range expectedLabels {
labels = append(labels, k, v)
}
var undo func()
ctx, undo = pprofutil.SetProfilerLabels(ctx, labels...)
defer undo()
if _, err := ds.Send(ctx, ba); err != nil {
t.Fatalf("put encountered error: %s", err)
}
require.Equal(t, expectedLabels, observedLabels)
}

func TestGatewayNodeID(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
12 changes: 12 additions & 0 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2711,7 +2711,19 @@ message Header {
// RESUME_ELASTIC_CPU_LIMIT.
bool return_elastic_cpu_resume_spans = 30 [(gogoproto.customname) = "ReturnElasticCPUResumeSpans"];

// ProfileLabels are the pprof labels set on the context that is sending the
// BatchRequest.
//
// If the node processing the BatchRequest is collecting a CPU profile with
// labels, then these profile labels will be applied to the root context
// processing the BatchRequest on the server-side. Propagating these labels
// across RPC boundaries will help correlate server CPU profile samples to the
// sender.
repeated string profile_labels = 31 [(gogoproto.customname) = "ProfileLabels"];

reserved 7, 10, 12, 14, 20;

// Next ID: 32
}

// BoundedStalenessHeader contains configuration values pertaining to bounded
Expand Down
10 changes: 10 additions & 0 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1251,6 +1251,16 @@ func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchR
ctx = logtags.AddTag(ctx, "tenant", tenantID.String())
}

// If the node is collecting a CPU profile with labels, and the sender has set
// pprof labels in the BatchRequest, then we apply them to the context that is
// going to execute the BatchRequest. These labels will help correlate server
// side CPU profile samples to the sender.
if len(args.ProfileLabels) != 0 && n.execCfg.Settings.CPUProfileType() == cluster.CPUProfileWithLabels {
var undo func()
ctx, undo = pprofutil.SetProfilerLabels(ctx, args.ProfileLabels...)
defer undo()
}

// Requests from tenants don't have gateway node id set but are required for
// the QPS based rebalancing to work. The GatewayNodeID is used as a proxy
// for the locality of the origin of the request. The replica stats aggregate
Expand Down
69 changes: 69 additions & 0 deletions pkg/server/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"context"
"fmt"
"reflect"
"runtime/pprof"
"sort"
"testing"

Expand All @@ -30,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/status"
"github.com/cockroachdb/cockroach/pkg/server/status/statuspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -658,6 +660,73 @@ func TestNodeSendUnknownBatchRequest(t *testing.T) {
}
}

// TestNodeBatchRequestPProfLabels tests that node.Batch copies pprof labels
// from the BatchRequest and applies them to the root context if CPU profiling
// with labels is enabled.
func TestNodeBatchRequestPProfLabels(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

observedProfileLabels := make(map[string]string)
srv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingResponseFilter: func(ctx context.Context, ba *kvpb.BatchRequest, _ *kvpb.BatchResponse) *kvpb.Error {
var foundBatch bool
for _, ru := range ba.Requests {
switch r := ru.GetInner().(type) {
case *kvpb.PutRequest:
if r.Header().Key.Equal(roachpb.Key("a")) {
foundBatch = true
}
}
}
if foundBatch {
pprof.ForLabels(ctx, func(key, value string) bool {
observedProfileLabels[key] = value
return true
})
}
return nil
},
},
},
})
defer srv.Stopper().Stop(context.Background())
ts := srv.(*TestServer)
n := ts.GetNode()

var ba kvpb.BatchRequest
ba.RangeID = 1
ba.Replica.StoreID = 1
expectedProfileLabels := map[string]string{"key": "value", "key2": "value2"}
ba.ProfileLabels = func() []string {
var labels []string
for k, v := range expectedProfileLabels {
labels = append(labels, k, v)
}
return labels
}()

gr := kvpb.NewGet(roachpb.Key("a"), false)
pr := kvpb.NewPut(gr.Header().Key, roachpb.Value{})
ba.Add(gr, pr)

// If CPU profiling with labels is not enabled, we should not observe any
// pprof labels on the context.
ctx := context.Background()
_, _ = n.Batch(ctx, &ba)
require.Equal(t, map[string]string{}, observedProfileLabels)

require.NoError(t, ts.ClusterSettings().SetCPUProfiling(cluster.CPUProfileWithLabels))
_, _ = n.Batch(ctx, &ba)

require.Len(t, observedProfileLabels, 3)
// Delete the labels for the range_str.
delete(observedProfileLabels, "range_str")
require.Equal(t, expectedProfileLabels, observedProfileLabels)
}

func TestNodeBatchRequestMetricsInc(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down

0 comments on commit e9883f5

Please sign in to comment.