Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvclient,server: propagate pprof labels for BatchRequests #101404

Merged
merged 2 commits into from
Apr 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/netutil",
"//pkg/util/pprofutil",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/retry",
Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"fmt"
"runtime"
"runtime/pprof"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -748,6 +749,15 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ
return kvpb.NewErrorf("unknown wait policy %s", ba.WaitPolicy)
}

// If the context has any pprof labels, attach them to the BatchRequest.
// These labels will be applied to the root context processing the request
// server-side, if the node processing the request is collecting a CPU
// profile with labels.
pprof.ForLabels(ctx, func(key, value string) bool {
ba.ProfileLabels = append(ba.ProfileLabels, key, value)
return true
})

return nil
}

Expand Down
50 changes: 50 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -3496,6 +3497,55 @@ func TestSenderTransport(t *testing.T) {
}
}

// TestPProfLabelsAppliedToBatchRequestHeader tests that pprof labels on the
// sender's context are copied to the BatchRequest.Header.
func TestPProfLabelsAppliedToBatchRequestHeader(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
g := makeGossip(t, stopper, rpcContext)

observedLabels := make(map[string]string)
testFn := func(ctx context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
for i := 0; i < len(ba.Header.ProfileLabels)-1; i += 2 {
observedLabels[ba.Header.ProfileLabels[i]] = ba.Header.ProfileLabels[i+1]
}
return ba.CreateReply(), nil
}

cfg := DistSenderConfig{
AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(),
Clock: clock,
NodeDescs: g,
RPCContext: rpcContext,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(testFn),
},
RangeDescriptorDB: defaultMockRangeDescriptorDB,
Settings: cluster.MakeTestingClusterSettings(),
}
ds := NewDistSender(cfg)
ba := &kvpb.BatchRequest{}
ba.Add(kvpb.NewPut(roachpb.Key("a"), roachpb.MakeValueFromString("value")))
expectedLabels := map[string]string{"key": "value", "key2": "value2"}
var labels []string
for k, v := range expectedLabels {
labels = append(labels, k, v)
}
var undo func()
ctx, undo = pprofutil.SetProfilerLabels(ctx, labels...)
defer undo()
if _, err := ds.Send(ctx, ba); err != nil {
t.Fatalf("put encountered error: %s", err)
}
require.Equal(t, expectedLabels, observedLabels)
}

func TestGatewayNodeID(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
12 changes: 12 additions & 0 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2711,7 +2711,19 @@ message Header {
// RESUME_ELASTIC_CPU_LIMIT.
bool return_elastic_cpu_resume_spans = 30 [(gogoproto.customname) = "ReturnElasticCPUResumeSpans"];

// ProfileLabels are the pprof labels set on the context that is sending the
// BatchRequest.
//
// If the node processing the BatchRequest is collecting a CPU profile with
// labels, then these profile labels will be applied to the root context
// processing the BatchRequest on the server-side. Propagating these labels
// across RPC boundaries will help correlate server CPU profile samples to the
// sender.
repeated string profile_labels = 31 [(gogoproto.customname) = "ProfileLabels"];

reserved 7, 10, 12, 14, 20;

// Next ID: 32
}

// BoundedStalenessHeader contains configuration values pertaining to bounded
Expand Down
12 changes: 11 additions & 1 deletion pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1248,7 +1248,17 @@ func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchR
tenantID = roachpb.SystemTenantID
} else {
// We had this tag before the ResetAndAnnotateCtx() call above.
ctx = logtags.AddTag(ctx, "tenant", tenantID.String())
ctx = logtags.AddTag(ctx, "tenant", tenantID)
}

// If the node is collecting a CPU profile with labels, and the sender has set
// pprof labels in the BatchRequest, then we apply them to the context that is
// going to execute the BatchRequest. These labels will help correlate server
// side CPU profile samples to the sender.
if len(args.ProfileLabels) != 0 && n.execCfg.Settings.CPUProfileType() == cluster.CPUProfileWithLabels {
var undo func()
ctx, undo = pprofutil.SetProfilerLabels(ctx, args.ProfileLabels...)
defer undo()
}

// Requests from tenants don't have gateway node id set but are required for
Expand Down
69 changes: 69 additions & 0 deletions pkg/server/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"context"
"fmt"
"reflect"
"runtime/pprof"
"sort"
"testing"

Expand All @@ -30,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/status"
"github.com/cockroachdb/cockroach/pkg/server/status/statuspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -658,6 +660,73 @@ func TestNodeSendUnknownBatchRequest(t *testing.T) {
}
}

// TestNodeBatchRequestPProfLabels tests that node.Batch copies pprof labels
// from the BatchRequest and applies them to the root context if CPU profiling
// with labels is enabled.
func TestNodeBatchRequestPProfLabels(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

observedProfileLabels := make(map[string]string)
srv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingResponseFilter: func(ctx context.Context, ba *kvpb.BatchRequest, _ *kvpb.BatchResponse) *kvpb.Error {
var foundBatch bool
for _, ru := range ba.Requests {
switch r := ru.GetInner().(type) {
case *kvpb.PutRequest:
if r.Header().Key.Equal(roachpb.Key("a")) {
foundBatch = true
}
}
}
if foundBatch {
pprof.ForLabels(ctx, func(key, value string) bool {
observedProfileLabels[key] = value
return true
})
}
return nil
},
},
},
})
defer srv.Stopper().Stop(context.Background())
ts := srv.(*TestServer)
n := ts.GetNode()

var ba kvpb.BatchRequest
ba.RangeID = 1
ba.Replica.StoreID = 1
expectedProfileLabels := map[string]string{"key": "value", "key2": "value2"}
ba.ProfileLabels = func() []string {
var labels []string
for k, v := range expectedProfileLabels {
labels = append(labels, k, v)
}
return labels
}()

gr := kvpb.NewGet(roachpb.Key("a"), false)
pr := kvpb.NewPut(gr.Header().Key, roachpb.Value{})
ba.Add(gr, pr)

// If CPU profiling with labels is not enabled, we should not observe any
// pprof labels on the context.
ctx := context.Background()
_, _ = n.Batch(ctx, &ba)
require.Equal(t, map[string]string{}, observedProfileLabels)

require.NoError(t, ts.ClusterSettings().SetCPUProfiling(cluster.CPUProfileWithLabels))
_, _ = n.Batch(ctx, &ba)

require.Len(t, observedProfileLabels, 3)
// Delete the labels for the range_str.
delete(observedProfileLabels, "range_str")
require.Equal(t, expectedProfileLabels, observedProfileLabels)
}

func TestNodeBatchRequestMetricsInc(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down