Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow thrift reporter even if grpc hosts are not provided #1400

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions cmd/agent/app/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,22 @@ package app

import (
"fmt"
"io/ioutil"
"net/http"
"os"

"github.com/apache/thrift/lib/go/thrift"
"github.com/pkg/errors"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"google.golang.org/grpc/grpclog"

"github.com/jaegertracing/jaeger/cmd/agent/app/configmanager"
"github.com/jaegertracing/jaeger/cmd/agent/app/httpserver"
"github.com/jaegertracing/jaeger/cmd/agent/app/processors"
"github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
"github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc"
"github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel"
"github.com/jaegertracing/jaeger/cmd/agent/app/servers"
"github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp"
zipkinThrift "github.com/jaegertracing/jaeger/thrift-gen/agent"
Expand Down Expand Up @@ -209,3 +214,30 @@ func defaultInt(value int, defaultVal int) int {
}
return value
}

// CreateCollectorProxy creates collector proxy
func CreateCollectorProxy(
opts *reporter.Options,
tchanRep *tchannel.Builder,
grpcRepOpts *grpc.Options,
logger *zap.Logger,
mFactory metrics.Factory,
) (CollectorProxy, error) {
// GRPC type is set as default in viper, but we check for legacy flags
// to keep backward compatibility
if opts.ReporterType == reporter.GRPC &&
len(tchanRep.CollectorHostPorts) > 0 &&
len(grpcRepOpts.CollectorHostPort) == 0 {
logger.Warn("Using deprecated configuration", zap.String("option", "--collector-host.port"))
return tchannel.NewCollectorProxy(tchanRep, mFactory, logger)
}
switch opts.ReporterType {
case reporter.GRPC:
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
return grpc.NewCollectorProxy(grpcRepOpts, mFactory, logger)
case reporter.TCHANNEL:
return tchannel.NewCollectorProxy(tchanRep, mFactory, logger)
default:
return nil, errors.New(fmt.Sprintf("unknown reporter type %s", string(opts.ReporterType)))
}
}
88 changes: 88 additions & 0 deletions cmd/agent/app/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,23 @@ package app

import (
"errors"
"flag"
"fmt"
"strings"
"testing"
"time"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"github.com/uber/jaeger-lib/metrics/metricstest"
"go.uber.org/zap"
"gopkg.in/yaml.v2"

"github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc"
"github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel"
"github.com/jaegertracing/jaeger/cmd/agent/app/configmanager"
"github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
"github.com/jaegertracing/jaeger/thrift-gen/baggage"
Expand Down Expand Up @@ -179,3 +186,84 @@ func (f fakeCollectorProxy) GetSamplingStrategy(serviceName string) (*sampling.S
func (fakeCollectorProxy) GetBaggageRestrictions(serviceName string) ([]*baggage.BaggageRestriction, error) {
return nil, nil
}


func TestCreateCollectorProxy(t *testing.T) {
tests := []struct{
flags []string
err string
metric metricstest.ExpectedMetric
}{
{
err: "could not create collector proxy, address is missing",
},
{
flags: []string{"--collector.host-port=foo"},
metric: metricstest.ExpectedMetric{Name: "reporter.batches.failures", Tags: map[string]string{"protocol": "tchannel", "format": "jaeger"}, Value: 1},
},
{
flags: []string{"--reporter.type=tchannel"},
metric: metricstest.ExpectedMetric{Name: "reporter.batches.failures", Tags: map[string]string{"protocol": "tchannel", "format": "jaeger"}, Value: 1},
},
{
flags: []string{"--reporter.type=tchannel", "--collector.host-port=foo"},
metric: metricstest.ExpectedMetric{Name: "reporter.batches.failures", Tags: map[string]string{"protocol": "tchannel", "format": "jaeger"}, Value: 1},
},
{
flags: []string{"--reporter.type=grpc", "--collector.host-port=foo"},
metric: metricstest.ExpectedMetric{Name: "reporter.batches.failures", Tags: map[string]string{"protocol": "tchannel", "format": "jaeger"}, Value: 1},
},
{
flags: []string{"--reporter.type=grpc", "--collector.host-port=foo"},
metric: metricstest.ExpectedMetric{Name: "reporter.batches.failures", Tags: map[string]string{"protocol": "tchannel", "format": "jaeger"}, Value: 1},
},
{
flags: []string{"--reporter.type=grpc", "--reporter.grpc.host-port=foo", "--collector.host-port=foo"},
metric: metricstest.ExpectedMetric{Name: "reporter.batches.failures", Tags: map[string]string{"protocol": "grpc", "format": "jaeger"}, Value: 1},
},
{
flags: []string{"--reporter.type=grpc", "--reporter.grpc.host-port=foo"},
metric: metricstest.ExpectedMetric{Name: "reporter.batches.failures", Tags: map[string]string{"protocol": "grpc", "format": "jaeger"}, Value: 1},
},
}

for _, test := range tests {
flags := &flag.FlagSet{}
tchannel.AddFlags(flags)
grpc.AddFlags(flags)
reporter.AddFlags(flags)

command := cobra.Command{}
command.PersistentFlags().AddGoFlagSet(flags)
v := viper.New()
v.BindPFlags(command.PersistentFlags())

err := command.ParseFlags(test.flags)
require.NoError(t, err)

rOpts := new(reporter.Options).InitFromViper(v)
tchan := tchannel.NewBuilder().InitFromViper(v, zap.NewNop())
grpcOpts := new(grpc.Options).InitFromViper(v)

metricsFactory := metricstest.NewFactory(time.Microsecond)
proxy, err := CreateCollectorProxy(rOpts, tchan, grpcOpts, zap.NewNop(), metricsFactory)
if test.err != "" {
assert.EqualError(t, err, test.err)
assert.Nil(t, proxy)
} else {
require.NoError(t, err)
proxy.GetReporter().EmitBatch(jaeger.NewBatch())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the batch being sent?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nowhere I just need metrics to log it

metricsFactory.AssertCounterMetrics(t, test.metric)
}
}
}

func TestCreateCollectorProxy_UnknownReporter(t *testing.T) {
rOpts := new(reporter.Options)
tchan := tchannel.NewBuilder()
grpcOpts := new(grpc.Options)

proxy, err := CreateCollectorProxy(rOpts, tchan, grpcOpts, zap.NewNop(), metrics.NullFactory)
assert.Nil(t, proxy)
assert.EqualError(t, err, "unknown reporter type ")
}
3 changes: 2 additions & 1 deletion cmd/agent/app/reporter/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ func TestMetricsReporter(t *testing.T) {
err := reporter.EmitBatch(&jaeger.Batch{Spans: []*jaeger.Span{{}}})
require.Error(t, err)
}, rep: &noopReporter{err: errors.New("foo")}},
{expectedCounters: []metricstest.ExpectedMetric{
{expectedCounters:
[]metricstest.ExpectedMetric{
{Name: "reporter.batches.failures", Tags: map[string]string{"format": "zipkin"}, Value: 1},
{Name: "reporter.spans.failures", Tags: map[string]string{"format": "zipkin"}, Value: 2},
}, expectedGauges: []metricstest.ExpectedMetric{
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/reporter/tchannel/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func AddFlags(flags *flag.FlagSet) {
tchannelPrefix+reportTimeout,
time.Second,
"sets the timeout used when reporting spans")
// TODO remove deprecated in 1.9
// TODO remove deprecated in 2.0
flags.String(
collectorHostPort,
"",
Expand Down
22 changes: 1 addition & 21 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package main
import (
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"os/signal"
Expand All @@ -28,7 +27,6 @@ import (
"github.com/spf13/viper"
jMetrics "github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"google.golang.org/grpc/grpclog"

"github.com/jaegertracing/jaeger/cmd/agent/app"
"github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
Expand Down Expand Up @@ -73,7 +71,7 @@ func main() {
rOpts := new(reporter.Options).InitFromViper(v)
tChanOpts := new(tchannel.Builder).InitFromViper(v, logger)
grpcOpts := new(grpc.Options).InitFromViper(v)
cp, err := createCollectorProxy(rOpts, tChanOpts, grpcOpts, logger, mFactory)
cp, err := app.CreateCollectorProxy(rOpts, tChanOpts, grpcOpts, logger, mFactory)
if err != nil {
logger.Fatal("Could not create collector proxy", zap.Error(err))
}
Expand Down Expand Up @@ -123,21 +121,3 @@ func main() {
os.Exit(1)
}
}

func createCollectorProxy(
opts *reporter.Options,
tchanRep *tchannel.Builder,
grpcRepOpts *grpc.Options,
logger *zap.Logger,
mFactory jMetrics.Factory,
) (app.CollectorProxy, error) {
switch opts.ReporterType {
case reporter.GRPC:
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
return grpc.NewCollectorProxy(grpcRepOpts, mFactory, logger)
case reporter.TCHANNEL:
return tchannel.NewCollectorProxy(tchanRep, mFactory, logger)
default:
return nil, errors.New(fmt.Sprintf("unknown reporter type %s", string(opts.ReporterType)))
}
}
23 changes: 2 additions & 21 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ func startAgent(
) {
metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil})

cp, err := createCollectorProxy(cOpts, repOpts, tchanRep, grpcRepOpts, logger, metricsFactory)
grpcRepOpts.CollectorHostPort = append(grpcRepOpts.CollectorHostPort, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorGRPCPort))
cp, err := agentApp.CreateCollectorProxy(repOpts, tchanRep, grpcRepOpts, logger, metricsFactory)
if err != nil {
logger.Fatal("Could not create collector proxy", zap.Error(err))
}
Expand All @@ -213,26 +214,6 @@ func startAgent(
}
}

func createCollectorProxy(
cOpts *collector.CollectorOptions,
repOpts *agentRep.Options,
tchanRepOpts *agentTchanRep.Builder,
grpcRepOpts *agentGrpcRep.Options,
logger *zap.Logger,
mFactory metrics.Factory,
) (agentApp.CollectorProxy, error) {
switch repOpts.ReporterType {
case agentRep.GRPC:
grpcRepOpts.CollectorHostPort = append(grpcRepOpts.CollectorHostPort, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorGRPCPort))
return agentGrpcRep.NewCollectorProxy(grpcRepOpts, mFactory, logger)
case agentRep.TCHANNEL:
tchanRepOpts.CollectorHostPorts = append(tchanRepOpts.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorPort))
return agentTchanRep.NewCollectorProxy(tchanRepOpts, mFactory, logger)
default:
return nil, errors.New(fmt.Sprintf("unknown reporter type %s", string(repOpts.ReporterType)))
}
}

func startCollector(
cOpts *collector.CollectorOptions,
spanWriter spanstore.Writer,
Expand Down