Skip to content

Commit

Permalink
[Jaeger] Add gRPC sampling endpoint
Browse files Browse the repository at this point in the history
Support fetching Jaeger gRPC sampling strategy from APM
server. Only probabilistic sampling is supported. Sampling
rates are configured via Kibana agent config management.

implements elastic#3487
  • Loading branch information
simitt committed Mar 16, 2020
1 parent 98001fd commit ee24b29
Show file tree
Hide file tree
Showing 15 changed files with 448 additions and 62 deletions.
10 changes: 10 additions & 0 deletions _meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,16 @@ apm-server:
# does not apply to Jaeger agents.
#auth_tag: ""

# Enable fetching the sampling rate from the gRPC endpoint. Only probabilistic
# sampling is supported. Sampling is disabled by default.
# No auth token can be configured for this endpoint.
#sampling:
#enabled: false
# Set the default sampling rate used when no service specific
# sampling rate is configured.
# Valid range from 0 to 1.0. Defaults to 1.0.
#default_rate: 1.0

#http:
# Set to true to enable the Jaeger HTTP collector endpoint.
#enabled: false
Expand Down
10 changes: 10 additions & 0 deletions apm-server.docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,16 @@ apm-server:
# does not apply to Jaeger agents.
#auth_tag: ""

# Enable fetching the sampling rate from the gRPC endpoint. Only probabilistic
# sampling is supported. Sampling is disabled by default.
# No auth token can be configured for this endpoint.
#sampling:
#enabled: false
# Set the default sampling rate used when no service specific
# sampling rate is configured.
# Valid range from 0 to 1.0. Defaults to 1.0.
#default_rate: 1.0

#http:
# Set to true to enable the Jaeger HTTP collector endpoint.
#enabled: false
Expand Down
10 changes: 10 additions & 0 deletions apm-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,16 @@ apm-server:
# does not apply to Jaeger agents.
#auth_tag: ""

# Enable fetching the sampling rate from the gRPC endpoint. Only probabilistic
# sampling is supported. Sampling is disabled by default.
# No auth token can be configured for this endpoint.
#sampling:
#enabled: false
# Set the default sampling rate used when no service specific
# sampling rate is configured.
# Valid range from 0 to 1.0. Defaults to 1.0.
#default_rate: 1.0

#http:
# Set to true to enable the Jaeger HTTP collector endpoint.
#enabled: false
Expand Down
9 changes: 3 additions & 6 deletions beater/api/config/agent/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/monitoring"

"github.com/elastic/apm-server/agentcfg"
Expand Down Expand Up @@ -56,9 +55,7 @@ var (

errMsgKibanaDisabled = errors.New(msgKibanaDisabled)
errMsgNoKibanaConnection = errors.New(msgNoKibanaConnection)

minKibanaVersion = common.MustNewVersion("7.5.0")
errCacheControl = fmt.Sprintf("max-age=%v, must-revalidate", errMaxAgeDuration.Seconds())
errCacheControl = fmt.Sprintf("max-age=%v, must-revalidate", errMaxAgeDuration.Seconds())
)

// Handler returns a request.Handler for managing agent central configuration requests.
Expand Down Expand Up @@ -121,7 +118,7 @@ func validateClient(c *request.Context, client kibana.Client, withAuth bool) boo
return false
}

if supported, err := client.SupportsVersion(c.Request.Context(), minKibanaVersion, true); !supported {
if supported, err := client.SupportsVersion(c.Request.Context(), kibana.AgentConfigMinVersion, true); !supported {
if err != nil {
c.Result.Set(request.IDResponseErrorsServiceUnavailable,
http.StatusServiceUnavailable,
Expand All @@ -134,7 +131,7 @@ func validateClient(c *request.Context, client kibana.Client, withAuth bool) boo
version, _ := client.GetVersion(c.Request.Context())

errMsg := fmt.Sprintf("%s: min version %+v, configured version %+v",
msgKibanaVersionNotCompatible, minKibanaVersion, version.String())
msgKibanaVersionNotCompatible, kibana.AgentConfigMinVersion, version.String())
body := authErrMsg(errMsg, msgKibanaVersionNotCompatible, withAuth)
c.Result.Set(request.IDResponseErrorsServiceUnavailable,
http.StatusServiceUnavailable,
Expand Down
16 changes: 10 additions & 6 deletions beater/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,14 @@ func Test_UnpackConfig(t *testing.T) {
},
},
},
"kibana": map[string]interface{}{"enabled": "true"},
"agent.config.cache.expiration": "2m",
"jaeger.grpc.enabled": true,
"jaeger.grpc.host": "localhost:12345",
"jaeger.http.enabled": true,
"jaeger.http.host": "localhost:6789",
"kibana": map[string]interface{}{"enabled": "true"},
"agent.config.cache.expiration": "2m",
"jaeger.grpc.enabled": true,
"jaeger.grpc.host": "localhost:12345",
"jaeger.grpc.sampling.enabled": true,
"jaeger.grpc.sampling.default_rate": 0.8,
"jaeger.http.enabled": true,
"jaeger.http.host": "localhost:6789",
"api_key": map[string]interface{}{
"enabled": true,
"limit": 200,
Expand Down Expand Up @@ -183,6 +185,7 @@ func Test_UnpackConfig(t *testing.T) {
require.NoError(t, err)
return tlsServerConfig.BuildModuleConfig("localhost:12345")
}(),
Sampling: Sampling{DefaultRate: 0.8, Enabled: true},
},
HTTP: JaegerHTTPConfig{
Enabled: true,
Expand Down Expand Up @@ -290,6 +293,7 @@ func Test_UnpackConfig(t *testing.T) {
require.NoError(t, err)
return tlsServerConfig.BuildModuleConfig("localhost:14250")
}(),
Sampling: Sampling{DefaultRate: 1.0, Enabled: false},
},
HTTP: JaegerHTTPConfig{
Enabled: false,
Expand Down
21 changes: 15 additions & 6 deletions beater/config/jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,18 @@ type JaegerConfig struct {

// JaegerGRPCConfig holds configuration for the Jaeger gRPC server.
type JaegerGRPCConfig struct {
AuthTag string `config:"auth_tag"`
Enabled bool `config:"enabled"`
Host string `config:"host"`
TLS *tls.Config `config:"-"`
AuthTag string `config:"auth_tag"`
Enabled bool `config:"enabled"`
Host string `config:"host"`
TLS *tls.Config `config:"-"`
Sampling Sampling `config:"sampling"`
}

// Sampling defines if the sampling endpoint is enabled
// and the default rate for probabilistic sampling.
type Sampling struct {
// Enabled switches the sampling endpoint on or off (config key `enabled`).
// The default Jaeger configuration sets it to false.
Enabled bool `config:"enabled"`
// DefaultRate is the sampling rate used when no service specific rate is
// configured (config key `default_rate`). The documented valid range is
// 0 to 1.0; the default Jaeger configuration sets it to 1.
DefaultRate float64 `config:"default_rate"`
}

// JaegerHTTPConfig holds configuration for the Jaeger HTTP server.
Expand All @@ -65,8 +73,9 @@ func (c *JaegerConfig) setup(cfg *Config) error {
func defaultJaeger() JaegerConfig {
return JaegerConfig{
GRPC: JaegerGRPCConfig{
Enabled: false,
Host: defaultJaegerGRPCHost,
Enabled: false,
Host: defaultJaegerGRPCHost,
Sampling: Sampling{Enabled: false, DefaultRate: 1},
},
HTTP: JaegerHTTPConfig{
Enabled: false,
Expand Down
5 changes: 3 additions & 2 deletions beater/config/jaeger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import (
func TestJaeger_default(t *testing.T) {
expected := JaegerConfig{
GRPC: JaegerGRPCConfig{
Enabled: false,
Host: "localhost:14250",
Enabled: false,
Host: "localhost:14250",
Sampling: Sampling{Enabled: false, DefaultRate: 1},
},
HTTP: JaegerHTTPConfig{
Enabled: false,
Expand Down
95 changes: 87 additions & 8 deletions beater/jaeger/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,33 @@ package jaeger

import (
"context"
"fmt"
"strconv"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/open-telemetry/opentelemetry-collector/consumer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"

"github.com/elastic/apm-server/agentcfg"
"github.com/elastic/apm-server/beater/request"
"github.com/elastic/apm-server/kibana"
)

var (
gRPCRegistry = monitoring.Default.NewRegistry("apm-server.jaeger.grpc", monitoring.PublishExpvar)
gRPCMonitoringMap monitoringMap = request.MonitoringMapForRegistry(gRPCRegistry, monitoringKeys)
gRPCCollectorRegistry = monitoring.Default.NewRegistry("apm-server.jaeger.grpc.collect", monitoring.PublishExpvar)
gRPCCollectorMonitoringMap monitoringMap = request.MonitoringMapForRegistry(gRPCCollectorRegistry, monitoringKeys)
)

// grpcCollector implements Jaeger api_v2 protocol for receiving tracing data
type grpcCollector struct {
// log is used to report errors that occur while handling PostSpans requests.
log *logp.Logger
// auth is called with the request context and the incoming batch before the
// batch is consumed; a non-nil error is returned to the caller as
// codes.Unauthenticated.
auth authFunc
// consumer receives the batches that passed the auth check.
consumer consumer.TraceConsumer
}
Expand All @@ -47,21 +55,92 @@ type grpcCollector struct {
// The implementation of the protobuf contract is based on the open-telemetry implementation at
// https://github.com/open-telemetry/opentelemetry-collector/tree/master/receiver/jaegerreceiver
func (c grpcCollector) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
gRPCMonitoringMap.inc(request.IDRequestCount)
defer gRPCMonitoringMap.inc(request.IDResponseCount)
gRPCCollectorMonitoringMap.inc(request.IDRequestCount)
defer gRPCCollectorMonitoringMap.inc(request.IDResponseCount)

if err := c.postSpans(ctx, r.Batch); err != nil {
gRPCMonitoringMap.inc(request.IDResponseErrorsCount)
gRPCCollectorMonitoringMap.inc(request.IDResponseErrorsCount)
c.log.With(zap.Error(err)).Error("error gRPC PostSpans")
return nil, err
}
gRPCMonitoringMap.inc(request.IDResponseValidCount)
gRPCCollectorMonitoringMap.inc(request.IDResponseValidCount)
return &api_v2.PostSpansResponse{}, nil
}

func (c grpcCollector) postSpans(ctx context.Context, batch model.Batch) error {
if err := c.auth(ctx, batch); err != nil {
gRPCMonitoringMap.inc(request.IDResponseErrorsUnauthorized)
gRPCCollectorMonitoringMap.inc(request.IDResponseErrorsUnauthorized)
return status.Error(codes.Unauthenticated, err.Error())
}
return consumeBatch(ctx, batch, c.consumer, gRPCMonitoringMap)
return consumeBatch(ctx, batch, c.consumer, gRPCCollectorMonitoringMap)
}

// samplingRateKey is the key under which the sampling rate is looked up in
// the settings of a fetched agent configuration.
const samplingRateKey = "transaction_sample_rate"

// Monitoring registry and counter map for the gRPC sampling endpoint.
// The counters are incremented by grpcSampler.GetSamplingStrategy.
var (
gRPCSamplingRegistry = monitoring.Default.NewRegistry("apm-server.jaeger.grpc.sampling", monitoring.PublishExpvar)
gRPCSamplingMonitoringMap monitoringMap = request.MonitoringMapForRegistry(gRPCSamplingRegistry, monitoringKeys)
)

// grpcSampler serves sampling strategies to Jaeger clients, using sampling
// rates fetched from agent configuration via Kibana.
type grpcSampler struct {
// log reports fetch failures (error level) and missing rates (debug level).
log *logp.Logger
// defaultRate is returned when fetching the rate fails or when no sampling
// rate is configured for the requested service.
defaultRate float64
// client is used to check that the connected Kibana version meets
// kibana.AgentConfigMinVersion before a rate is fetched.
client kibana.Client
// fetcher retrieves the agent configuration for a service.
fetcher *agentcfg.Fetcher
}

// GetSamplingStrategy serves the sampling endpoint defined in api_v2/sampling.proto.
//
// The sampling rate for params.ServiceName is looked up in the central
// configuration management. If the lookup fails, the failure is logged and
// the configured default rate is used instead, so the call itself never
// returns an error. The response always carries a probabilistic strategy,
// the only strategy type supported.
func (s grpcSampler) GetSamplingStrategy(
	ctx context.Context,
	params *api_v2.SamplingStrategyParameters,
) (*api_v2.SamplingStrategyResponse, error) {
	gRPCSamplingMonitoringMap.inc(request.IDRequestCount)
	defer gRPCSamplingMonitoringMap.inc(request.IDResponseCount)

	rate, fetchErr := s.fetchSamplingRate(ctx, params.ServiceName)
	if fetchErr != nil {
		// A failed lookup is not fatal: answer with the default rate.
		s.log.With(zap.Error(fetchErr)).Error("Fetching sampling rate failed, falling back to default value.")
		rate = s.defaultRate
	}

	gRPCSamplingMonitoringMap.inc(request.IDResponseValidCount)

	probabilistic := &api_v2.ProbabilisticSamplingStrategy{SamplingRate: rate}
	response := &api_v2.SamplingStrategyResponse{
		StrategyType:          api_v2.SamplingStrategyType_PROBABILISTIC,
		ProbabilisticSampling: probabilistic,
	}
	return response, nil
}

// fetchSamplingRate returns the sampling rate configured for the given service.
//
// It first verifies that the Kibana client supports agent configuration and
// then fetches the configuration for the service. When the fetched settings
// contain no sampling rate, the sampler's default rate is returned.
//
// An error is returned when the Kibana version check fails, when fetching
// fails, when the configured value cannot be parsed as a float, or when the
// parsed value is not a valid probability in the range [0, 1].
func (s grpcSampler) fetchSamplingRate(ctx context.Context, service string) (float64, error) {
	if err := s.validateKibanaClient(ctx); err != nil {
		return 0, err
	}
	result, err := s.fetcher.Fetch(ctx, agentcfg.NewQuery(service, ""))
	if err != nil {
		return 0, fmt.Errorf("fetching sampling rate from Kibana client failed: %w", err)
	}

	sr, ok := result.Source.Settings[samplingRateKey]
	if !ok {
		s.log.Debugf("No sampling rate found for %v, falling back to default value.", service)
		return s.defaultRate, nil
	}

	rate, err := strconv.ParseFloat(sr, 64)
	if err != nil {
		return 0, fmt.Errorf("parsing error for sample rate `%v`: %w", sr, err)
	}
	// ParseFloat accepts "NaN" as well as arbitrary finite values, none of
	// which are usable as a probabilistic sampling rate. The condition is
	// written in negated form so that NaN, for which every comparison is
	// false, is rejected too.
	if !(rate >= 0 && rate <= 1) {
		return 0, fmt.Errorf("sample rate `%v` is outside the valid range [0, 1]", sr)
	}
	return rate, nil
}

// validateKibanaClient checks that the connected Kibana instance satisfies
// kibana.AgentConfigMinVersion.
//
// It returns an error when the version check itself fails, or when the
// Kibana version is not supported. In the latter case the error message
// includes the detected Kibana version, or "unknown" if it cannot be
// retrieved.
func (s grpcSampler) validateKibanaClient(ctx context.Context) error {
	supported, err := s.client.SupportsVersion(ctx, kibana.AgentConfigMinVersion, true)
	if err != nil {
		return fmt.Errorf("error checking kibana version: %w", err)
	}
	if supported {
		return nil
	}

	// Best effort: the version is only used to enrich the error message.
	version := "unknown"
	if ver, err := s.client.GetVersion(ctx); err == nil {
		version = ver.String()
	}
	return fmt.Errorf("not supported by Kibana version %v, min Kibana version: %v",
		version,
		kibana.AgentConfigMinVersion)
}
Loading

0 comments on commit ee24b29

Please sign in to comment.