Skip to content

Commit

Permalink
Fixed tls connection issue (#220)
Browse files Browse the repository at this point in the history
* Fixed Lookup service.
  • Loading branch information
shustsud authored Apr 14, 2020
1 parent 4e52758 commit 6e5c7d3
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func newClient(options ClientOptions) (Client, error) {
cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout),
}
c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout)
c.lookupService = internal.NewLookupService(c.rpcClient, url)
c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig != nil)
c.handlers = internal.NewClientHandlers()
return c, nil
}
Expand Down
11 changes: 9 additions & 2 deletions pulsar/internal/lookup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,26 @@ type LookupService interface {
type lookupService struct {
rpcClient RPCClient
serviceURL *url.URL
tlsEnabled bool
}

// NewLookupService init a lookup service struct and return an object of LookupService.
func NewLookupService(rpcClient RPCClient, serviceURL *url.URL) LookupService {
func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, tlsEnabled bool) LookupService {
return &lookupService{
rpcClient: rpcClient,
serviceURL: serviceURL,
tlsEnabled: tlsEnabled,
}
}

func (ls *lookupService) getBrokerAddress(lr *pb.CommandLookupTopicResponse) (logicalAddress *url.URL,
physicalAddress *url.URL, err error) {
logicalAddress, err = url.ParseRequestURI(lr.GetBrokerServiceUrl())
if ls.tlsEnabled {
logicalAddress, err = url.ParseRequestURI(lr.GetBrokerServiceUrlTls())
} else {
logicalAddress, err = url.ParseRequestURI(lr.GetBrokerServiceUrl())
}

if err != nil {
return nil, nil, err
}
Expand Down
119 changes: 114 additions & 5 deletions pulsar/internal/lookup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestLookupSuccess(t *testing.T) {
BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
},
},
}, url)
}, url, false)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -137,6 +137,38 @@ func TestLookupSuccess(t *testing.T) {
assert.Equal(t, "pulsar://broker-1:6650", lr.PhysicalAddr.String())
}

func TestTlsLookupSuccess(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)

ls := NewLookupService(&mockedRPCClient{
t: t,

expectedRequests: []pb.CommandLookupTopic{
{
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
{
RequestId: proto.Uint64(1),
Response: responseType(pb.CommandLookupTopicResponse_Connect),
Authoritative: proto.Bool(true),
BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"),
},
},
}, url, true)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
assert.NotNil(t, lr)

assert.Equal(t, "pulsar+ssl://broker-1:6651", lr.LogicalAddr.String())
assert.Equal(t, "pulsar+ssl://broker-1:6651", lr.PhysicalAddr.String())
}

func TestLookupWithProxy(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
Expand All @@ -160,7 +192,7 @@ func TestLookupWithProxy(t *testing.T) {
ProxyThroughServiceUrl: proto.Bool(true),
},
},
}, url)
}, url, false)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -170,6 +202,39 @@ func TestLookupWithProxy(t *testing.T) {
assert.Equal(t, "pulsar://example:6650", lr.PhysicalAddr.String())
}

func TestTlsLookupWithProxy(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)

ls := NewLookupService(&mockedRPCClient{
t: t,

expectedRequests: []pb.CommandLookupTopic{
{
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
{
RequestId: proto.Uint64(1),
Response: responseType(pb.CommandLookupTopicResponse_Connect),
Authoritative: proto.Bool(true),
BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"),
ProxyThroughServiceUrl: proto.Bool(true),
},
},
}, url, true)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
assert.NotNil(t, lr)

assert.Equal(t, "pulsar+ssl://broker-1:6651", lr.LogicalAddr.String())
assert.Equal(t, "pulsar+ssl://example:6651", lr.PhysicalAddr.String())
}

func TestLookupWithRedirect(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
Expand Down Expand Up @@ -204,7 +269,7 @@ func TestLookupWithRedirect(t *testing.T) {
BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
},
},
}, url)
}, url, false)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -214,6 +279,50 @@ func TestLookupWithRedirect(t *testing.T) {
assert.Equal(t, "pulsar://broker-1:6650", lr.PhysicalAddr.String())
}

func TestTlsLookupWithRedirect(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)

ls := NewLookupService(&mockedRPCClient{
t: t,
expectedURL: "pulsar+ssl://broker-2:6651",

expectedRequests: []pb.CommandLookupTopic{
{
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
},
{
RequestId: proto.Uint64(2),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(true),
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
{
RequestId: proto.Uint64(1),
Response: responseType(pb.CommandLookupTopicResponse_Redirect),
Authoritative: proto.Bool(true),
BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-2:6651"),
},
{
RequestId: proto.Uint64(2),
Response: responseType(pb.CommandLookupTopicResponse_Connect),
Authoritative: proto.Bool(true),
BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"),
},
},
}, url, true)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
assert.NotNil(t, lr)

assert.Equal(t, "pulsar+ssl://broker-1:6651", lr.LogicalAddr.String())
assert.Equal(t, "pulsar+ssl://broker-1:6651", lr.PhysicalAddr.String())
}

func TestLookupWithInvalidUrlResponse(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
Expand All @@ -237,7 +346,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
ProxyThroughServiceUrl: proto.Bool(false),
},
},
}, url)
}, url, false)

lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
Expand Down Expand Up @@ -265,7 +374,7 @@ func TestLookupWithLookupFailure(t *testing.T) {
Authoritative: proto.Bool(true),
},
},
}, url)
}, url, false)

lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
Expand Down

0 comments on commit 6e5c7d3

Please sign in to comment.