Skip to content

Commit

Permalink
Add ADS client
Browse files Browse the repository at this point in the history
Signed-off-by: Renuka Fernando <[email protected]>
  • Loading branch information
renuka-fernando committed Oct 27, 2022
1 parent 3f7804d commit f366d18
Showing 1 changed file with 101 additions and 0 deletions.
101 changes: 101 additions & 0 deletions pkg/adsclient/sotw/v3/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2022 Envoyproxy Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package sotw provides an implementation of GRPC SoTW (State of The World) part of XDS client
package sotw

import (
"context"

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/golang/protobuf/ptypes/any"
"google.golang.org/grpc"
)

type AdsClient interface {
// Recv waits for a response from management server and return it.
Recv() (*Response, error)
// Ack acknowledge the validity of the last received response to management server.
Ack() error
// Nack acknowledge the invalidity of the last received response to management server.
Nack() error
// ReConnect reinitialize the gRPC connection with management server.
ReConnect(clientConn grpc.ClientConnInterface, opts ...grpc.CallOption) error
}

type Response struct {
Resources []*any.Any
}

type adsClient struct {
ctx context.Context
nodeID string
typeURL string

// streamClient is the ADS discovery client
streamClient discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient
// lastAckedResponse is the last response acked by the ADS client
lastAckedResponse *discovery.DiscoveryResponse
// lastReceivedResponse is the last response received from management server
lastReceivedResponse *discovery.DiscoveryResponse
}

func NewAdsClient(ctx context.Context, nodeID, typeURL string, clientConn grpc.ClientConnInterface, opts ...grpc.CallOption) (AdsClient, error) {
streamClint, err := discovery.NewAggregatedDiscoveryServiceClient(clientConn).StreamAggregatedResources(ctx, opts...)
if err != nil {
return nil, err
}
return &adsClient{
ctx: ctx,
nodeID: nodeID,
typeURL: typeURL,
streamClient: streamClint,
}, nil
}

func (c *adsClient) Recv() (*Response, error) {
resp, err := c.streamClient.Recv()
if err == nil {
c.lastReceivedResponse = resp
}

return &Response{
Resources: resp.Resources,
}, err
}

func (c *adsClient) Ack() error {
c.lastAckedResponse = c.lastReceivedResponse
return c.send()
}

func (c *adsClient) Nack() error {
return c.send()
}

func (c *adsClient) ReConnect(clientConn grpc.ClientConnInterface, opts ...grpc.CallOption) (err error) {
c.streamClient, err = discovery.NewAggregatedDiscoveryServiceClient(clientConn).StreamAggregatedResources(c.ctx, opts...)
return
}

func (c *adsClient) send() error {
req := &discovery.DiscoveryRequest{
Node: &core.Node{Id: c.nodeID},
VersionInfo: c.lastAckedResponse.GetVersionInfo(),
TypeUrl: c.typeURL,
ResponseNonce: c.lastReceivedResponse.GetNonce(),
}
return c.streamClient.Send(req)
}

0 comments on commit f366d18

Please sign in to comment.