Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Jaeger's GRPC handler to flow the tenant from an HTTP header #3688

Merged
merged 2 commits into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cmd/collector/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/ports"
)
Expand Down Expand Up @@ -152,6 +153,8 @@ type GRPCOptions struct {
// MaxConnectionAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed.
// See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace.
MaxConnectionAgeGrace time.Duration
// Tenancy configures tenancy for endpoints that collect spans
Tenancy tenancy.Options
}

// AddFlags adds flags for CollectorOptions
Expand All @@ -172,6 +175,8 @@ func AddFlags(flags *flag.FlagSet) {
flags.String(flagZipkinAllowedOrigins, "*", "Comma separated list of allowed origins for the Zipkin collector service, default accepts all")
flags.String(flagZipkinHTTPHostPort, "", "The host:port (e.g. 127.0.0.1:9411 or :9411) of the collector's Zipkin server (disabled by default)")
tlsZipkinFlagsConfig.AddFlags(flags)

tenancy.AddFlags(flags)
}

func addHTTPFlags(flags *flag.FlagSet, cfg serverFlagsConfig, defaultHostPort string) {
Expand Down Expand Up @@ -219,6 +224,11 @@ func (opts *GRPCOptions) initFromViper(v *viper.Viper, logger *zap.Logger, cfg s
} else {
return fmt.Errorf("failed to parse gRPC TLS options: %w", err)
}
if tenancy, err := tenancy.InitFromViper(v); err == nil {
opts.Tenancy = tenancy
} else {
return fmt.Errorf("failed to parse Tenancy options: %w", err)
}

return nil
}
Expand Down
36 changes: 36 additions & 0 deletions cmd/collector/app/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,39 @@ func TestCollectorOptionsWithFlags_CheckMaxConnectionAge(t *testing.T) {
assert.Equal(t, 5*time.Minute, c.GRPC.MaxConnectionAge)
assert.Equal(t, time.Minute, c.GRPC.MaxConnectionAgeGrace)
}

func TestCollectorOptionsWithFlags_CheckNoTenancy(t *testing.T) {
c := &CollectorOptions{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{})
c.InitFromViper(v, zap.NewNop())

assert.Equal(t, false, c.GRPC.Tenancy.Enabled)
}

func TestCollectorOptionsWithFlags_CheckSimpleTenancy(t *testing.T) {
c := &CollectorOptions{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--multi_tenancy.enabled=true",
})
c.InitFromViper(v, zap.NewNop())

assert.Equal(t, true, c.GRPC.Tenancy.Enabled)
assert.Equal(t, "x-tenant", c.GRPC.Tenancy.Header)
}

func TestCollectorOptionsWithFlags_CheckFullTenancy(t *testing.T) {
c := &CollectorOptions{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--multi_tenancy.enabled=true",
"--multi_tenancy.header=custom-tenant-header",
"--multi_tenancy.tenants=acme,hardware-store",
})
c.InitFromViper(v, zap.NewNop())

assert.Equal(t, true, c.GRPC.Tenancy.Enabled)
assert.Equal(t, "custom-tenant-header", c.GRPC.Tenancy.Header)
assert.Equal(t, []string{"acme", "hardware-store"}, c.GRPC.Tenancy.Tenants)
}
41 changes: 38 additions & 3 deletions cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,24 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/encoding/gzip" // register zip encoding
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

// GRPCHandler implements gRPC CollectorService.
type GRPCHandler struct {
logger *zap.Logger
batchConsumer batchConsumer
tenancyConfig *tenancy.TenancyConfig
}

// NewGRPCHandler registers routes for this handler on the given router.
func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor) *GRPCHandler {
func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor, tenancyConfig *tenancy.TenancyConfig) *GRPCHandler {
return &GRPCHandler{
logger: logger,
batchConsumer: batchConsumer{
Expand All @@ -45,13 +48,20 @@ func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor) *
SpanFormat: processor.ProtoSpanFormat,
},
},
tenancyConfig: tenancyConfig,
}
}

// PostSpans implements gRPC CollectorService.
func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
tenant, err := g.validateTenant(ctx)
if err != nil {
g.logger.Error("rejecting spans (tenancy)", zap.Error(err))
return nil, err
}

batch := &r.Batch
err := g.batchConsumer.consume(batch)
err = g.batchConsumer.consume(batch, tenant)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest changing the signature to consume(context, batch) and handling tenancy inside. This way it can be reused in OTLP receiver.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(ok to do in another PR if you prefer)

return &api_v2.PostSpansResponse{}, err
}

Expand All @@ -61,7 +71,7 @@ type batchConsumer struct {
spanOptions processor.SpansOptions
}

func (c *batchConsumer) consume(batch *model.Batch) error {
func (c *batchConsumer) consume(batch *model.Batch, tenant string) error {
for _, span := range batch.Spans {
if span.GetProcess() == nil {
span.Process = batch.Process
Expand All @@ -70,6 +80,7 @@ func (c *batchConsumer) consume(batch *model.Batch) error {
_, err := c.spanProcessor.ProcessSpans(batch.Spans, processor.SpansOptions{
InboundTransport: processor.GRPCTransport,
SpanFormat: processor.ProtoSpanFormat,
Tenant: tenant,
})
if err != nil {
if err == processor.ErrBusy {
Expand All @@ -80,3 +91,27 @@ func (c *batchConsumer) consume(batch *model.Batch) error {
}
return nil
}

func (g *GRPCHandler) validateTenant(ctx context.Context) (string, error) {
if !g.tenancyConfig.Enabled {
return "", nil
}

md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
}

tenants := md[g.tenancyConfig.Header]
if len(tenants) < 1 {
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
} else if len(tenants) > 1 {
return "", status.Errorf(codes.PermissionDenied, "extra tenant header")
}

if !g.tenancyConfig.Valid(tenants[0]) {
return "", status.Errorf(codes.PermissionDenied, "unknown tenant")
}

return tenants[0], nil
}
Loading