Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support secure gRPC channel between agent and collector #1391

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/agent/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestBingFlags(t *testing.T) {
func TestBindFlags(t *testing.T) {
v := viper.New()
b := &Builder{}
command := cobra.Command{}
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/reporter/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestBingFlags(t *testing.T) {
func TestBindFlags(t *testing.T) {
v := viper.New()
command := cobra.Command{}
flags := &flag.FlagSet{}
Expand Down
27 changes: 26 additions & 1 deletion cmd/agent/app/reporter/grpc/collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
package grpc

import (
"crypto/x509"
"errors"

"github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"

Expand All @@ -37,11 +39,34 @@ type ProxyBuilder struct {
conn *grpc.ClientConn
}

var systemCertPool = x509.SystemCertPool // to allow overriding in unit test

// NewCollectorProxy creates ProxyBuilder
func NewCollectorProxy(o *Options, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) {
if len(o.CollectorHostPort) == 0 {
return nil, errors.New("could not create collector proxy, address is missing")
}
var dialOption grpc.DialOption
if o.TLS { // user requested a secure connection
var creds credentials.TransportCredentials
if len(o.TLSCA) == 0 { // no truststore given, use SystemCertPool
pool, err := systemCertPool()
if err != nil {
return nil, err
}
creds = credentials.NewClientTLSFromCert(pool, o.TLSServerName)
} else { // setup user specified truststore
var err error
creds, err = credentials.NewClientTLSFromFile(o.TLSCA, o.TLSServerName)
if err != nil {
return nil, err
}
}
dialOption = grpc.WithTransportCredentials(creds)
} else { // insecure connection
dialOption = grpc.WithInsecure()
}

var target string
if len(o.CollectorHostPort) > 1 {
r, _ := manual.GenerateAndRegisterManualResolver()
Expand All @@ -56,7 +81,7 @@ func NewCollectorProxy(o *Options, mFactory metrics.Factory, logger *zap.Logger)
}
// It does not return error if the collector is not running
conn, _ := grpc.Dial(target,
grpc.WithInsecure(),
dialOption,
grpc.WithBalancerName(roundrobin.Name),
grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(o.MaxRetry))))
grpcMetrics := mFactory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"protocol": "grpc"}})
Expand Down
97 changes: 90 additions & 7 deletions cmd/agent/app/reporter/grpc/collector_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
package grpc

import (
"crypto/x509"
"errors"
"io/ioutil"
"net"
"os"
"testing"
"time"

Expand All @@ -30,20 +34,99 @@ import (
"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
)

const certPEM = `
-----BEGIN CERTIFICATE-----
MIICBzCCAXCgAwIBAgIQNkTaUtOczDHvL2YT/kqScTANBgkqhkiG9w0BAQsFADAX
MRUwEwYDVQQKEwxqYWdlcnRyYWNpbmcwHhcNMTkwMjA4MDYyODAyWhcNMTkwMjA4
MDcyODAyWjAXMRUwEwYDVQQKEwxqYWdlcnRyYWNpbmcwgZ8wDQYJKoZIhvcNAQEB
BQADgY0AMIGJAoGBAMcOLYflHGbqC1f7+tbnsdfcpd0rEuX65+ab0WzelAgvo988
yD+j7LDLPIE8IPk/tfqaETZ8h0LRUUTn8F2rW/wgrl/G8Onz0utog38N0elfTifG
Mu7GJCr/+aYM5xbQMDj4Brb4vhnkJF8UBe49fWILhIltUcm1SeKqVX3d1FvpAgMB
AAGjVDBSMA4GA1UdDwEB/wQEAwICpDATBgNVHSUEDDAKBggrBgEFBQcDATAPBgNV
HRMBAf8EBTADAQH/MBoGA1UdEQQTMBGCCWxvY2FsaG9zdIcEfwAAATANBgkqhkiG
9w0BAQsFAAOBgQCreFjwpAn1HqJT812JOwoWKrt1NjOKGcz7pvIs1k3DfQVLH2aZ
iPKnCkzNgxMzQtwdgpAOXIAqXyNibvyOAv1C+3QSMLKbuPEHaIxlCuvl1suX/g25
17x1o3Q64AnPCWOLpN2wjkfZqX7gZ84nsxpqb9Sbw1+2+kqX7dSZ3mfVxQ==
-----END CERTIFICATE-----`

func TestProxyBuilderMissingAddress(t *testing.T) {
proxy, err := NewCollectorProxy(&Options{}, metrics.NullFactory, zap.NewNop())
require.Nil(t, proxy)
assert.EqualError(t, err, "could not create collector proxy, address is missing")
}

func TestProxyBuilder(t *testing.T) {
proxy, err := NewCollectorProxy(&Options{CollectorHostPort: []string{"localhost:0000"}}, metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
require.NotNil(t, proxy)
assert.NotNil(t, proxy.GetReporter())
assert.NotNil(t, proxy.GetManager())
assert.Nil(t, proxy.Close())
assert.EqualError(t, proxy.Close(), "rpc error: code = Canceled desc = grpc: the client connection is closing")
tmpfile, err := ioutil.TempFile("", "cert*.pem")
if err != nil {
t.Fatalf("failed to create tempfile: %s", err)
}

defer func() {
tmpfile.Close()
os.Remove(tmpfile.Name())
}()

if _, err := tmpfile.Write([]byte(certPEM)); err != nil {
t.Fatalf("failed to write cert to tempfile: %s", err)
}

tests := []struct {
name string
proxyOptions *Options
expectError bool
}{
{
name: "with insecure grpc connection",
proxyOptions: &Options{CollectorHostPort: []string{"localhost:0000"}},
expectError: false,
},
{
name: "with secure grpc connection",
proxyOptions: &Options{CollectorHostPort: []string{"localhost:0000"}, TLS: true},
expectError: false,
},
{
name: "with secure grpc connection and own CA",
proxyOptions: &Options{CollectorHostPort: []string{"localhost:0000"}, TLS: true, TLSCA: tmpfile.Name()},
expectError: false,
},
{
name: "with secure grpc connection and a CA file which does not exist",
proxyOptions: &Options{CollectorHostPort: []string{"localhost:0000"}, TLS: true, TLSCA: "/not/valid"},
expectError: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
proxy, err := NewCollectorProxy(test.proxyOptions, metrics.NullFactory, zap.NewNop())
if test.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
require.NotNil(t, proxy)

assert.NotNil(t, proxy.GetReporter())
assert.NotNil(t, proxy.GetManager())

assert.Nil(t, proxy.Close())
assert.EqualError(t, proxy.Close(), "rpc error: code = Canceled desc = grpc: the client connection is closing")
}
})
}
}

// This test is only for coverage.
func TestSystemCertPoolError(t *testing.T) {
fakeErr := errors.New("fake error")
systemCertPool = func() (*x509.CertPool, error) {
return nil, fakeErr
}
_, err := NewCollectorProxy(&Options{
CollectorHostPort: []string{"foo", "bar"},
TLS: true,
}, nil, nil)
assert.Equal(t, fakeErr, err)
}

func TestMultipleCollectors(t *testing.T) {
Expand Down
20 changes: 16 additions & 4 deletions cmd/agent/app/reporter/grpc/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,32 @@ import (
)

const (
gRPCPrefix = "reporter.grpc."
collectorHostPort = gRPCPrefix + "host-port"
retry = gRPCPrefix + "retry.max"
defaultMaxRetry = 3
gRPCPrefix = "reporter.grpc."
collectorHostPort = gRPCPrefix + "host-port"
retry = gRPCPrefix + "retry.max"
defaultMaxRetry = 3
collectorTLS = gRPCPrefix + "tls"
collectorTLSCA = gRPCPrefix + "tls.ca"
collectorTLSServerName = gRPCPrefix + "tls.server-name"
)

// Options Struct to hold configurations
type Options struct {
// CollectorHostPort is list of host:port Jaeger Collectors.
CollectorHostPort []string
MaxRetry uint
TLS bool
TLSCA string
TLSServerName string
}

// AddFlags adds flags for Options.
func AddFlags(flags *flag.FlagSet) {
flags.String(collectorHostPort, "", "Comma-separated string representing host:port of a static list of collectors to connect to directly.")
flags.Uint(retry, defaultMaxRetry, "Sets the maximum number of retries for a call.")
flags.Bool(collectorTLS, false, "Enable TLS.")
flags.String(collectorTLSCA, "", "Path to a TLS CA file. (default use the systems truststore)")
flags.String(collectorTLSServerName, "", "Override the TLS server name.")
}

// InitFromViper initializes Options with properties retrieved from Viper.
Expand All @@ -48,5 +57,8 @@ func (o *Options) InitFromViper(v *viper.Viper) *Options {
o.CollectorHostPort = strings.Split(hostPorts, ",")
}
o.MaxRetry = uint(v.GetInt(retry))
o.TLS = v.GetBool(collectorTLS)
o.TLSCA = v.GetString(collectorTLSCA)
o.TLSServerName = v.GetString(collectorTLSServerName)
return o
}
6 changes: 3 additions & 3 deletions cmd/agent/app/reporter/grpc/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ import (
"github.com/stretchr/testify/require"
)

func TestBingFlags(t *testing.T) {
func TestBindFlags(t *testing.T) {
tests := []struct {
cOpts []string
expected *Options
}{
{cOpts: []string{"--reporter.grpc.host-port=localhost:1111", "--reporter.grpc.retry.max=15"},
expected: &Options{CollectorHostPort: []string{"localhost:1111"}, MaxRetry:15}},
expected: &Options{CollectorHostPort: []string{"localhost:1111"}, MaxRetry: 15}},
{cOpts: []string{"--reporter.grpc.host-port=localhost:1111,localhost:2222"},
expected: &Options{CollectorHostPort: []string{"localhost:1111", "localhost:2222"}, MaxRetry:defaultMaxRetry}},
expected: &Options{CollectorHostPort: []string{"localhost:1111", "localhost:2222"}, MaxRetry: defaultMaxRetry}},
}
for _, test := range tests {
v := viper.New()
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/reporter/tchannel/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"go.uber.org/zap"
)

func TestBingFlags(t *testing.T) {
func TestBindFlags(t *testing.T) {
tests := []struct {
flags []string
builder Builder
Expand Down
15 changes: 15 additions & 0 deletions cmd/collector/app/builder/builder_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const (
collectorPort = "collector.port"
collectorHTTPPort = "collector.http-port"
collectorGRPCPort = "collector.grpc-port"
collectorGRPCTLS = "collector.grpc.tls"
collectorGRPCCert = "collector.grpc.tls.cert"
collectorGRPCKey = "collector.grpc.tls.key"
collectorZipkinHTTPort = "collector.zipkin.http-port"

defaultTChannelPort = 14267
Expand All @@ -49,6 +52,12 @@ type CollectorOptions struct {
CollectorHTTPPort int
// CollectorGRPCPort is the port that the collector service listens in on for gRPC requests
CollectorGRPCPort int
// CollectorGRPCTLS defines if the server is setup with TLS
CollectorGRPCTLS bool
// CollectorGRPCCert is the path to a TLS certificate file for the server
CollectorGRPCCert string
// CollectorGRPCKey is the path to a TLS key file for the server
CollectorGRPCKey string
// CollectorZipkinHTTPPort is the port that the Zipkin collector service listens in on for http requests
CollectorZipkinHTTPPort int
}
Expand All @@ -61,6 +70,9 @@ func AddFlags(flags *flag.FlagSet) {
flags.Int(collectorHTTPPort, defaultHTTPPort, "The HTTP port for the collector service")
flags.Int(collectorGRPCPort, defaultGRPCPort, "The gRPC port for the collector service")
flags.Int(collectorZipkinHTTPort, 0, "The HTTP port for the Zipkin collector service e.g. 9411")
flags.Bool(collectorGRPCTLS, false, "Enable TLS")
flags.String(collectorGRPCCert, "", "Path to TLS certificate file")
flags.String(collectorGRPCKey, "", "Path to TLS key file")
}

// InitFromViper initializes CollectorOptions with properties from viper
Expand All @@ -70,6 +82,9 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) *CollectorOptions {
cOpts.CollectorPort = v.GetInt(collectorPort)
cOpts.CollectorHTTPPort = v.GetInt(collectorHTTPPort)
cOpts.CollectorGRPCPort = v.GetInt(collectorGRPCPort)
cOpts.CollectorGRPCTLS = v.GetBool(collectorGRPCTLS)
cOpts.CollectorGRPCCert = v.GetString(collectorGRPCCert)
cOpts.CollectorGRPCKey = v.GetString(collectorGRPCKey)
cOpts.CollectorZipkinHTTPPort = v.GetInt(collectorZipkinHTTPort)
return cOpts
}
25 changes: 21 additions & 4 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/uber/tchannel-go/thrift"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

basicB "github.com/jaegertracing/jaeger/cmd/builder"
"github.com/jaegertracing/jaeger/cmd/collector/app"
Expand Down Expand Up @@ -140,7 +141,7 @@ func main() {
ch.Serve(listener)
}

server, err := startGRPCServer(builderOpts.CollectorGRPCPort, grpcHandler, strategyStore, logger)
server, err := startGRPCServer(builderOpts, grpcHandler, strategyStore, logger)
if err != nil {
logger.Fatal("Could not start gRPC collector", zap.Error(err))
}
Expand Down Expand Up @@ -207,13 +208,29 @@ func main() {
}

func startGRPCServer(
port int,
opts *builder.CollectorOptions,
handler *app.GRPCHandler,
samplingStore strategystore.StrategyStore,
logger *zap.Logger,
) (*grpc.Server, error) {
server := grpc.NewServer()
_, err := grpcserver.StartGRPCCollector(port, server, handler, samplingStore, logger, func(err error) {
var server *grpc.Server

if opts.CollectorGRPCTLS { // user requested a server with TLS, setup creds
if opts.CollectorGRPCCert == "" || opts.CollectorGRPCKey == "" {
return nil, fmt.Errorf("you requested TLS but configuration does not include a path to cert and/or key")
}
creds, err := credentials.NewServerTLSFromFile(
opts.CollectorGRPCCert,
opts.CollectorGRPCKey,
)
if err != nil {
return nil, fmt.Errorf("failed to load TLS keys: %s", err)
}
server = grpc.NewServer(grpc.Creds(creds))
} else { // server without TLS
server = grpc.NewServer()
}
_, err := grpcserver.StartGRPCCollector(opts.CollectorGRPCPort, server, handler, samplingStore, logger, func(err error) {
logger.Fatal("gRPC collector failed", zap.Error(err))
})
if err != nil {
Expand Down