TAPA logs and other improvements #485

Merged (3 commits) on Dec 9, 2023
97 changes: 80 additions & 17 deletions pkg/ambassador/tap/conduit/conduit.go
@@ -40,6 +40,8 @@ import (
"github.com/nordix/meridio/pkg/log"
"github.com/nordix/meridio/pkg/networking"
"github.com/nordix/meridio/pkg/retry"
grpcCodes "google.golang.org/grpc/codes"
grpcStatus "google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/util/sets"
)

@@ -62,10 +64,13 @@ type Conduit struct {
StreamFactory StreamFactory
connection *networkservice.Connection
mu sync.Mutex
localIPs []string
// IPs assigned to the NSM connection on this side, fetched so they can be announced as Target IPs to the NSP
localIPs []string
// VIP addresses received from NSP and already configured on the NSM connection
vips []string
logger logr.Logger
monitorConnectionCancel context.CancelFunc
nspEntryTimeout time.Duration
}

// New is the constructor of Conduit.
@@ -81,6 +86,8 @@ func New(conduit *ambassadorAPI.Conduit,
streamRegistry types.Registry,
netUtils networking.Utils,
nspEntryTimeout time.Duration) (*Conduit, error) {
logger := log.Logger.WithValues("class", "Conduit", "conduit", conduit, "namespace", namespace, "node", nodeName)
logger.Info("Create conduit")
c := &Conduit{
TargetName: targetName,
Namespace: namespace,
@@ -92,11 +99,12 @@ func New(conduit *ambassadorAPI.Conduit,
connection: nil,
localIPs: []string{},
vips: []string{},
logger: logger,
nspEntryTimeout: nspEntryTimeout,
}
c.StreamFactory = stream.NewFactory(targetRegistryClient, c)
c.StreamManager = NewStreamManager(configurationManagerClient, targetRegistryClient, streamRegistry, c.StreamFactory, PendingTime, nspEntryTimeout)
c.Configuration = newConfigurationImpl(c.StreamManager.SetStreams, c.Conduit.ToNSP(), configurationManagerClient)
c.logger = log.Logger.WithValues("class", "Conduit", "instance", conduit.Name)
return c, nil
}
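Reviewer note: the constructor now builds the structured logger up front (with class, conduit, namespace, and node values) instead of attaching it after construction. For reference, a minimal, self-contained sketch of the go-logr pattern relied on here; the stdr backend and the key/value names are illustrative only:

```go
package main

import (
	"log"
	"os"

	"github.com/go-logr/stdr"
)

func main() {
	// Build a logr.Logger; meridio wraps this in pkg/log, stdr is
	// just an easy stand-in backend for the sketch.
	logger := stdr.New(log.New(os.Stderr, "", log.LstdFlags))

	// WithValues attaches key/value pairs that every subsequent
	// log line inherits, mirroring the "class"/"conduit" keys above.
	logger = logger.WithValues("class", "Conduit", "instance", "conduit-a")

	logger.Info("Create conduit")          // always emitted
	logger.V(1).Info("debug-level detail") // emitted only when verbosity >= 1
}
```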

@@ -128,11 +136,11 @@ func (c *Conduit) Connect(ctx context.Context) error {
return nil
}

c.logger.Info("Connect")
var request *networkservice.NetworkServiceRequest
var monitoredConnections map[string]*networkservice.Connection
var nsName string = conduit.GetNetworkServiceNameWithProxy(c.Conduit.GetName(), c.Conduit.GetTrench().GetName(), c.Namespace)
var id string = fmt.Sprintf("%s-%s-%d", c.TargetName, nsName, 0)
c.logger.Info("Connect", "ID", id)

// initial request
request = &networkservice.NetworkServiceRequest{
@@ -164,7 +172,7 @@ func (c *Conduit) Connect(ctx context.Context) error {
})
if err != nil {
c.logger.Error(err, "Connect failed to create monitorConnectionClient")
return err
return fmt.Errorf("failed to create monitor connection client: %w", err)
}

event, err := stream.Recv()
@@ -180,7 +188,7 @@ func (c *Conduit) Connect(ctx context.Context) error {
path := conn.GetPath()
// XXX: surprisingly, I still got connections that did not match the filter ID.
if path != nil && path.Index == 1 && path.PathSegments[0].Id == id && conn.Mechanism.Type == request.MechanismPreferences[0].Type {
c.logger.Info("Connect using recovered connection", "conn", conn)
c.logger.Info("Connect recovered connection", "connection", conn)
// Keeping Policy Routes and VIPs (if any) as of now. This can mitigate the impact of a TAPA container crash. (I see no other benefit to keeping them.)
// Note: Might also recover connection that belonged to a recently redeployed trench.
// TODO: Maybe ignore Policy Routes and VIPs if last update time of connection had to be over 50% of our MaxTokenLifetime.
@@ -220,11 +228,12 @@ func (c *Conduit) Connect(ctx context.Context) error {
}

// request the connection
originalRequest := request.Clone()
connection, err := c.NetworkServiceClient.Request(ctx, request)
if err != nil {
return err
err := fmt.Errorf("nsc connection request error: %w", err)
return fmt.Errorf("%w; original request: %v", err, originalRequest)
}
c.logger.Info("Connected", "connection", connection)
c.connection = connection
if len(c.getGateways()) > 0 {
// Filter out any VIPs from local IPs by relying on gateway subnets,
@@ -234,7 +243,7 @@ func (c *Conduit) Connect(ctx context.Context) error {
} else {
c.localIPs = c.connection.GetContext().GetIpContext().GetSrcIpAddrs()
}
c.logger.V(1).Info("Connected", "localIPs", c.localIPs)
c.logger.Info("Connected", "connection", connection, "localIPs", c.localIPs)

c.Configuration.Watch()
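Reviewer note on the reworked error path in Connect: wrapping with %w (rather than %v) keeps errors.Is and errors.As matching through the added context, while the cloned original request is formatted with %v purely for readability. A small sketch with hypothetical names:

```go
package main

import (
	"errors"
	"fmt"
)

var errDial = errors.New("dial timeout") // hypothetical root cause

func request() error {
	// %w preserves the chain so callers can still match errDial;
	// the request itself is context, formatted with %v.
	err := fmt.Errorf("nsc connection request error: %w", errDial)
	return fmt.Errorf("%w; original request: %v", err, "request{...}")
}

func main() {
	fmt.Println(errors.Is(request(), errDial)) // true: %w preserved the chain
}
```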

@@ -261,27 +270,37 @@ func (c *Conduit) Disconnect(ctx context.Context) error {
// Stop the stream manager (close the streams)
errFinal = c.StreamManager.Stop(ctx)
// Close the NSM connection
_, err := c.NetworkServiceClient.Close(ctx, c.connection)
if err != nil {
errFinal = fmt.Errorf("%w; %v", errFinal, err) // todo
if c.isConnected() {
_, err := c.NetworkServiceClient.Close(ctx, c.connection)
if err != nil {
errFinal = fmt.Errorf("%w; nsc connection close error: %w", errFinal, err) // todo
}
}
c.connection = nil
c.vips = []string{}
return errFinal
}
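Reviewer note on Disconnect: the new close-error line uses two %w verbs in one fmt.Errorf, which requires Go 1.20 or later; errors.Join is an equivalent way to aggregate. A sketch, assuming both errors are non-nil:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	errStop := errors.New("stream manager stop failed")
	errClose := errors.New("nsc connection close error")

	// Go 1.20+: multiple %w verbs keep both errors in the chain.
	combined := fmt.Errorf("%w; %w", errStop, errClose)
	fmt.Println(errors.Is(combined, errClose)) // true

	// errors.Join (also Go 1.20+) aggregates without a format string.
	joined := errors.Join(errStop, errClose)
	fmt.Println(errors.Is(joined, errStop)) // true
}
```

Note that when errFinal is still nil, the %w verb renders it as %!w(&lt;nil&gt;); the trailing // todo presumably covers cleaning that up.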

// AddStream creates a stream based on its factory and will open it (in another goroutine)
func (c *Conduit) AddStream(ctx context.Context, strm *ambassadorAPI.Stream) error {
c.logger.Info("AddStream", "stream", strm)
c.logger.Info("Add stream", "stream", strm)
if !c.Equals(strm.GetConduit()) {
return errors.New("invalid stream for this conduit")
}
return c.StreamManager.AddStream(strm)
if err := c.StreamManager.AddStream(strm); err != nil {
return fmt.Errorf("conduit stream manager failed to add stream: %w", err)
}
return nil
}

// RemoveStream closes and removes the stream (if existing), and removes it from the
// stream registry.
func (c *Conduit) RemoveStream(ctx context.Context, strm *ambassadorAPI.Stream) error {
return c.StreamManager.RemoveStream(ctx, strm)
c.logger.Info("Remove stream", "stream", strm)
if err := c.StreamManager.RemoveStream(ctx, strm); err != nil {
return fmt.Errorf("conduit stream manager failed to remove stream: %w", err)
}
return nil
}

// GetStreams returns all streams previously added to this conduit
@@ -314,6 +333,12 @@ func (c *Conduit) SetVIPs(ctx context.Context, vips []string) error {
vipsInGatewaySubnet := getIPsInGatewaySubnet(vips, c.getGateways())
vipsInGatewaySubnetSet := sets.New(vipsInGatewaySubnet...)
vipsSet := sets.New(formatPrefixes(vips)...)
if vipsSet.Equal(sets.New(c.vips...)) {
// Same set of VIPs; skip the NSM connection update.
// Note: NSM heal-related close and reconnect seems to keep the IpContext intact (including the VIP setup).
return nil
}
c.logger.Info("SetVIPs", "VIPs", vips)
vips = vipsSet.Difference(vipsInGatewaySubnetSet).UnsortedList()
// prepare SrcIpAddrs (IPs allocated by the proxy + VIPs)
c.connection.Context.IpContext.SrcIpAddrs = append(c.localIPs, vips...)
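Reviewer note: the early return above leans on the generic set helpers from k8s.io/apimachinery. A minimal sketch of the Equal/Difference calls in play:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	current := sets.New("20.0.0.1/32", "2000::1/128")
	desired := sets.New("2000::1/128", "20.0.0.1/32")

	// Equal is order-insensitive, so a reshuffled VIP list does
	// not trigger a pointless NSM connection update.
	fmt.Println(desired.Equal(current)) // true

	gateways := sets.New("20.0.0.1/32")
	// Difference drops the VIPs already covered by gateway subnets.
	fmt.Println(desired.Difference(gateways).UnsortedList()) // [2000::1/128]
}
```

(The addresses are made up; in SetVIPs the inputs come from the NSP and the connection's gateway subnets.)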
@@ -366,7 +391,7 @@ func (c *Conduit) SetVIPs(ctx context.Context, vips []string) error {
connection, err := c.NetworkServiceClient.Request(ctx, request)
if err != nil {
c.logger.Error(err, "Updating VIPs")
return err
return fmt.Errorf("nsc connection update error: %w", err)
}
c.connection = connection

@@ -404,6 +429,7 @@ func (c *Conduit) monitorConnection(ctx context.Context, initialConnection *networkservice.Connection) {
if c.MonitorConnectionClient == nil {
return
}
logger := c.logger.WithValues("func", "monitorConnection", "ID", initialConnection.GetId())
monitorScope := &networkservice.MonitorScopeSelector{
PathSegments: []*networkservice.PathSegment{
{
@@ -414,23 +440,60 @@ func (c *Conduit) monitorConnection(ctx context.Context, initialConnection *networkservice.Connection) {
_ = retry.Do(func() error {
monitorConnectionsClient, err := c.MonitorConnectionClient.MonitorConnections(ctx, monitorScope)
if err != nil {
return err
return fmt.Errorf("failed to create connection monitor client: %w", err)
}
for {
mccResponse, err := monitorConnectionsClient.Recv()
if err != nil {
s, _ := grpcStatus.FromError(err)
if s.Code() != grpcCodes.Canceled {
// client did not close the connection
// (refer to https://github.com/networkservicemesh/sdk/blob/v1.11.1/pkg/networkservice/common/heal/eventloop.go#L114)
logger.Info("Connection monitor lost contact with local NSMgr")
}
}
if err == io.EOF {
break
}
if err != nil {
return err
return fmt.Errorf("connection monitor client receive error: %w", err)
}
for _, connection := range mccResponse.Connections {
path := connection.GetPath()
if path != nil && len(path.PathSegments) >= 1 && path.PathSegments[0].Id == initialConnection.GetId() {
// Check for control plane down or connection close events to log them
//
// TODO: Check if it'd make sense closing the streams when the connection is
// closed (to synchronize NSP and its consumers), and assess possible side-effects.
// (Connection recovery would need to be tracked somehow to re-open streams.)
if connection.GetState() == networkservice.State_DOWN || mccResponse.Type == networkservice.ConnectionEventType_DELETE {
msg := "Connection monitor received delete event" // connection closed (e.g. due to NSM heal with reselect)
if connection.GetState() == networkservice.State_DOWN {
msg = "Connection monitor received down event" // control plane is down
}
logger.Info(msg, "event type", mccResponse.Type, "connection state", connection.GetState())
}
c.mu.Lock()
// Check for changes involving localIPs of the connection
if c.isConnected() {
c.connection.Context = connection.GetContext()
oldLocalIPsSet := sets.New(c.localIPs...)
c.localIPs = getLocalIPs(c.connection.GetContext().GetIpContext().GetSrcIpAddrs(), c.vips, c.getGateways())
if !oldLocalIPsSet.Equal(sets.New(c.localIPs...)) {
logger.Info("Connection IPs updated, streams require update", "ID", initialConnection.GetId(), "localIPs", c.localIPs, "old localIPs", oldLocalIPsSet)
// Trigger stream update to announce new localIPs.
// Note: First let's close streams in the conduit to
// release identifiers currently in use. In order to
// avoid lingering outdated Targets, and registration
// delays due to low availability of free identifiers.
// Note: Locks are held, context cannot be cancelled.
stopCtx, stopCancel := context.WithTimeout(ctx, 2*c.nspEntryTimeout) // don't risk blocking indefinitely
if err := c.StreamManager.Stop(stopCtx); err != nil {
logger.Info("Connection update triggered stream error", "err", err, "ID", initialConnection.GetId())
}
stopCancel()
c.StreamManager.Run()
}
}
c.mu.Unlock()
break
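Reviewer note: the Canceled check above separates a deliberate client-side shutdown from an actually lost NSMgr. grpcStatus.FromError is safe on plain (non-gRPC) errors too, reporting codes.Unknown. A sketch of the classification, with a hypothetical receive error:

```go
package main

import (
	"context"
	"fmt"

	grpcCodes "google.golang.org/grpc/codes"
	grpcStatus "google.golang.org/grpc/status"
)

func classify(err error) {
	// FromError returns ok=false for non-gRPC errors, but the
	// status it synthesizes is usable and reports codes.Unknown.
	s, _ := grpcStatus.FromError(err)
	if s.Code() != grpcCodes.Canceled {
		fmt.Println("monitor lost contact, code:", s.Code())
		return
	}
	fmt.Println("monitor closed deliberately")
}

func main() {
	classify(grpcStatus.Error(grpcCodes.Canceled, context.Canceled.Error()))
	classify(fmt.Errorf("transport is closing")) // -> Unknown
}
```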
23 changes: 19 additions & 4 deletions pkg/ambassador/tap/conduit/configuration.go
@@ -1,5 +1,5 @@
/*
Copyright (c) 2021-2022 Nordix Foundation
Copyright (c) 2021-2023 Nordix Foundation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -19,13 +19,17 @@ package conduit

import (
"context"
"fmt"
"io"
"sync"
"time"

"github.com/go-logr/logr"
nspAPI "github.com/nordix/meridio/api/nsp/v1"
"github.com/nordix/meridio/pkg/log"
"github.com/nordix/meridio/pkg/retry"
grpcCodes "google.golang.org/grpc/codes"
grpcStatus "google.golang.org/grpc/status"
)

const (
@@ -44,22 +48,27 @@ type configurationImpl struct {
cancel context.CancelFunc
mu sync.Mutex
streamChan chan []*nspAPI.Stream
logger logr.Logger
}

func newConfigurationImpl(setStreams func([]*nspAPI.Stream),
conduit *nspAPI.Conduit,
configurationManagerClient nspAPI.ConfigurationManagerClient) *configurationImpl {
logger := log.Logger.WithValues("class", "conduit.configurationImpl", "conduit", conduit)
logger.V(1).Info("Create configuration implementation to set streams")
c := &configurationImpl{
SetStreams: setStreams,
Conduit: conduit,
ConfigurationManagerClient: configurationManagerClient,
logger: logger,
}
return c
}

func (c *configurationImpl) Watch() {
c.mu.Lock()
defer c.mu.Unlock()
c.logger.V(1).Info("Watch configuration")
var ctx context.Context
ctx, c.cancel = context.WithCancel(context.TODO())
c.streamChan = make(chan []*nspAPI.Stream, channelBufferSize)
@@ -70,6 +79,7 @@ func (c *configurationImpl) Watch() {
func (c *configurationImpl) Stop() {
c.mu.Lock()
defer c.mu.Unlock()
c.logger.V(1).Info("Stop configuration watcher")
if c.cancel != nil {
c.cancel()
}
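Reviewer note: Watch and Stop follow a stored-cancel pattern, so restarting the watcher is just cancel-then-recreate under the mutex. A compact, self-contained sketch with hypothetical names:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type watcher struct {
	mu     sync.Mutex
	cancel context.CancelFunc
}

func (w *watcher) Watch() {
	w.mu.Lock()
	defer w.mu.Unlock()
	var ctx context.Context
	ctx, w.cancel = context.WithCancel(context.Background())
	go func() {
		<-ctx.Done() // stand-in for the real watch loop
		fmt.Println("watcher stopped:", ctx.Err())
	}()
}

func (w *watcher) Stop() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.cancel != nil { // nil until the first Watch
		w.cancel()
	}
}

func main() {
	w := &watcher{}
	w.Watch()
	w.Stop()
	time.Sleep(10 * time.Millisecond) // give the goroutine time to print
}
```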
@@ -93,15 +103,15 @@ func (c *configurationImpl) watchStreams(ctx context.Context) {
}
watchStreamClient, err := c.ConfigurationManagerClient.WatchStream(ctx, vipsToWatch)
if err != nil {
return err
return fmt.Errorf("configuration manager stream watch create failed: %w", err)
}
for {
streamResponse, err := watchStreamClient.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
return fmt.Errorf("configuration manager stream watch receive error: %w", err)
}
fixStreamsMaxTargets(streamResponse.GetStreams())
// flush previous context in channel
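Reviewer note: the flush itself is collapsed out of this hunk, so the following is only a sketch of the usual drain-then-send pattern for a size-1 buffered channel, not the verbatim code:

```go
package main

import "fmt"

func push(ch chan []string, latest []string) {
	// Drop any stale, unread value so the consumer always
	// observes the most recent stream list.
	select {
	case <-ch:
	default:
	}
	ch <- latest
}

func main() {
	ch := make(chan []string, 1) // channelBufferSize assumed to be 1
	push(ch, []string{"stream-a"})
	push(ch, []string{"stream-a", "stream-b"}) // replaces the unread value
	fmt.Println(<-ch)                          // [stream-a stream-b]
}
```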
@@ -116,7 +126,12 @@ func (c *configurationImpl) watchStreams(ctx context.Context) {
retry.WithDelay(500*time.Millisecond),
retry.WithErrorIngnored())
if err != nil {
log.Logger.Error(err, "watchStreams") // todo
s, _ := grpcStatus.FromError(err)
if s.Code() == grpcCodes.Canceled {
c.logger.Info("watchStreams context cancelled")
} else {
log.Logger.Error(err, "watchStreams") // todo
}
}
}
