From 6e5c7d340f7d0d53026ba228379a0a68e06342dd Mon Sep 17 00:00:00 2001 From: shustsud <51769018+shustsud@users.noreply.github.com> Date: Wed, 15 Apr 2020 03:50:43 +0900 Subject: [PATCH] Fixed tls connection issue (#220) * Fixed Lookup service. --- pulsar/client_impl.go | 2 +- pulsar/internal/lookup_service.go | 11 ++- pulsar/internal/lookup_service_test.go | 119 +++++++++++++++++++++++-- 3 files changed, 124 insertions(+), 8 deletions(-) diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go index 6dfda154ee..f768389a64 100644 --- a/pulsar/client_impl.go +++ b/pulsar/client_impl.go @@ -95,7 +95,7 @@ func newClient(options ClientOptions) (Client, error) { cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout), } c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout) - c.lookupService = internal.NewLookupService(c.rpcClient, url) + c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig != nil) c.handlers = internal.NewClientHandlers() return c, nil } diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go index eed65b170d..54d15efb96 100644 --- a/pulsar/internal/lookup_service.go +++ b/pulsar/internal/lookup_service.go @@ -44,19 +44,26 @@ type LookupService interface { type lookupService struct { rpcClient RPCClient serviceURL *url.URL + tlsEnabled bool } // NewLookupService init a lookup service struct and return an object of LookupService. -func NewLookupService(rpcClient RPCClient, serviceURL *url.URL) LookupService { +func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, tlsEnabled bool) LookupService { return &lookupService{ rpcClient: rpcClient, serviceURL: serviceURL, + tlsEnabled: tlsEnabled, } } func (ls *lookupService) getBrokerAddress(lr *pb.CommandLookupTopicResponse) (logicalAddress *url.URL, physicalAddress *url.URL, err error) { - logicalAddress, err = url.ParseRequestURI(lr.GetBrokerServiceUrl()) + if ls.tlsEnabled { + logicalAddress, err = url.ParseRequestURI(lr.GetBrokerServiceUrlTls()) + } else { + logicalAddress, err = url.ParseRequestURI(lr.GetBrokerServiceUrl()) + } + if err != nil { return nil, nil, err } diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go index 5bb17242cf..2eeb566240 100644 --- a/pulsar/internal/lookup_service_test.go +++ b/pulsar/internal/lookup_service_test.go @@ -127,7 +127,7 @@ func TestLookupSuccess(t *testing.T) { BrokerServiceUrl: proto.String("pulsar://broker-1:6650"), }, }, - }, url) + }, url, false) lr, err := ls.Lookup("my-topic") assert.NoError(t, err) @@ -137,6 +137,38 @@ func TestLookupSuccess(t *testing.T) { assert.Equal(t, "pulsar://broker-1:6650", lr.PhysicalAddr.String()) } +func TestTlsLookupSuccess(t *testing.T) { + url, err := url.Parse("pulsar+ssl://example:6651") + assert.NoError(t, err) + + ls := NewLookupService(&mockedRPCClient{ + t: t, + + expectedRequests: []pb.CommandLookupTopic{ + { + RequestId: proto.Uint64(1), + Topic: proto.String("my-topic"), + Authoritative: proto.Bool(false), + }, + }, + mockedResponses: []pb.CommandLookupTopicResponse{ + { + RequestId: proto.Uint64(1), + Response: responseType(pb.CommandLookupTopicResponse_Connect), + Authoritative: proto.Bool(true), + BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"), + }, + }, + }, url, true) + + lr, err := ls.Lookup("my-topic") + assert.NoError(t, err) + assert.NotNil(t, lr) + + assert.Equal(t, "pulsar+ssl://broker-1:6651", lr.LogicalAddr.String()) + assert.Equal(t, "pulsar+ssl://broker-1:6651", lr.PhysicalAddr.String()) +} + func TestLookupWithProxy(t *testing.T) { url, err := url.Parse("pulsar://example:6650") assert.NoError(t, err) @@ -160,7 +192,7 @@ func TestLookupWithProxy(t *testing.T) { ProxyThroughServiceUrl: proto.Bool(true), }, }, - }, url) + }, url, false) lr, err := ls.Lookup("my-topic") assert.NoError(t, err) @@ -170,6 +202,39 @@ func TestLookupWithProxy(t *testing.T) { assert.Equal(t, "pulsar://example:6650", lr.PhysicalAddr.String()) } +func TestTlsLookupWithProxy(t *testing.T) { + url, err := url.Parse("pulsar+ssl://example:6651") + assert.NoError(t, err) + + ls := NewLookupService(&mockedRPCClient{ + t: t, + + expectedRequests: []pb.CommandLookupTopic{ + { + RequestId: proto.Uint64(1), + Topic: proto.String("my-topic"), + Authoritative: proto.Bool(false), + }, + }, + mockedResponses: []pb.CommandLookupTopicResponse{ + { + RequestId: proto.Uint64(1), + Response: responseType(pb.CommandLookupTopicResponse_Connect), + Authoritative: proto.Bool(true), + BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"), + ProxyThroughServiceUrl: proto.Bool(true), + }, + }, + }, url, true) + + lr, err := ls.Lookup("my-topic") + assert.NoError(t, err) + assert.NotNil(t, lr) + + assert.Equal(t, "pulsar+ssl://broker-1:6651", lr.LogicalAddr.String()) + assert.Equal(t, "pulsar+ssl://example:6651", lr.PhysicalAddr.String()) +} + func TestLookupWithRedirect(t *testing.T) { url, err := url.Parse("pulsar://example:6650") assert.NoError(t, err) @@ -204,7 +269,7 @@ func TestLookupWithRedirect(t *testing.T) { BrokerServiceUrl: proto.String("pulsar://broker-1:6650"), }, }, - }, url) + }, url, false) lr, err := ls.Lookup("my-topic") assert.NoError(t, err) @@ -214,6 +279,50 @@ func TestLookupWithRedirect(t *testing.T) { assert.Equal(t, "pulsar://broker-1:6650", lr.PhysicalAddr.String()) } +func TestTlsLookupWithRedirect(t *testing.T) { + url, err := url.Parse("pulsar+ssl://example:6651") + assert.NoError(t, err) + + ls := NewLookupService(&mockedRPCClient{ + t: t, + expectedURL: "pulsar+ssl://broker-2:6651", + + expectedRequests: []pb.CommandLookupTopic{ + { + RequestId: proto.Uint64(1), + Topic: proto.String("my-topic"), + Authoritative: proto.Bool(false), + }, + { + RequestId: proto.Uint64(2), + Topic: proto.String("my-topic"), + Authoritative: proto.Bool(true), + }, + }, + mockedResponses: []pb.CommandLookupTopicResponse{ + { + RequestId: proto.Uint64(1), + Response: responseType(pb.CommandLookupTopicResponse_Redirect), + Authoritative: proto.Bool(true), + BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-2:6651"), + }, + { + RequestId: proto.Uint64(2), + Response: responseType(pb.CommandLookupTopicResponse_Connect), + Authoritative: proto.Bool(true), + BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"), + }, + }, + }, url, true) + + lr, err := ls.Lookup("my-topic") + assert.NoError(t, err) + assert.NotNil(t, lr) + + assert.Equal(t, "pulsar+ssl://broker-1:6651", lr.LogicalAddr.String()) + assert.Equal(t, "pulsar+ssl://broker-1:6651", lr.PhysicalAddr.String()) +} + func TestLookupWithInvalidUrlResponse(t *testing.T) { url, err := url.Parse("pulsar://example:6650") assert.NoError(t, err) @@ -237,7 +346,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) { ProxyThroughServiceUrl: proto.Bool(false), }, }, - }, url) + }, url, false) lr, err := ls.Lookup("my-topic") assert.Error(t, err) @@ -265,7 +374,7 @@ func TestLookupWithLookupFailure(t *testing.T) { Authoritative: proto.Bool(true), }, }, - }, url) + }, url, false) lr, err := ls.Lookup("my-topic") assert.Error(t, err)