Skip to content

Commit

Permalink
LB; announce internal connectivity
Browse files Browse the repository at this point in the history
Announce NSP the status of internal connectivity. That is
announce whether a stateless-lb is capable of forwarding
incoming traffic towards application targets.

The solution considers both NSM connectivity and application
target availability: At least 1 application target is required
with an established NSM connection towards the serving proxy
to announce internal connectivity is available.

The implementation hooks into the removal and succesful
configuration of application targets. (Because configured
targets are continuously verified, thus disappearance of a
NSM interface can prompt the solution to revoke the internal
connectivity.)
  • Loading branch information
zolug committed Jan 25, 2024
1 parent af43525 commit 07e1984
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 68 deletions.
3 changes: 2 additions & 1 deletion cmd/stateless-lb/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2021-2022 Nordix Foundation
Copyright (c) 2021-2024 Nordix Foundation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,6 +45,7 @@ type Config struct {
GRPCMaxBackoff time.Duration `default:"5s" desc:"Upper bound on gRPC connection backoff delay" envconfig:"grpc_max_backoff"`
MetricsEnabled bool `default:"false" desc:"Enable the metrics collection" split_words:"true"`
MetricsPort int `default:"2223" desc:"Specify the port used to expose the metrics" split_words:"true"`
NSPEntryTimeout time.Duration `default:"30s" desc:"Timeout of the entries" envconfig:"nsp_entry_timeout"`
}

// IsValid checks if the configuration is valid
Expand Down
94 changes: 58 additions & 36 deletions cmd/stateless-lb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
linuxKernel "github.com/nordix/meridio/pkg/kernel"
"github.com/nordix/meridio/pkg/kernel/neighbor"
"github.com/nordix/meridio/pkg/loadbalancer/flow"
streamForwarding "github.com/nordix/meridio/pkg/loadbalancer/forwarding/stream"
"github.com/nordix/meridio/pkg/loadbalancer/nfqlb"
"github.com/nordix/meridio/pkg/loadbalancer/stream"
"github.com/nordix/meridio/pkg/loadbalancer/target"
Expand Down Expand Up @@ -216,6 +217,22 @@ func main() {
cancel()
return
}

// announces forwarding availability of streams (i.e. if the LB can forward traffic towards application targets)
streamFwdAvailabilityService := streamForwarding.NewForwardingAvailabilityService(
ctx,
&nspAPI.Target{
Ips: []string{hostname},
Type: nspAPI.Target_LOADBALANCER,
Context: map[string]string{
types.IdentifierKey: hostname,
},
},
targetRegistryClient,
streamForwarding.WithNSPTimeout(config.NSPEntryTimeout),
)
defer streamFwdAvailabilityService.Stop()

sns := newSimpleNetworkService(
netUtils.WithInterfaceMonitor(ctx, interfaceMonitor),
targetRegistryClient,
Expand All @@ -227,6 +244,7 @@ func main() {
config.IdentifierOffsetStart,
targetHitsMetrics,
neighborMonitor,
streamFwdAvailabilityService,
)

interfaceMonitorEndpoint := interfacemonitor.NewServer(interfaceMonitor, sns, netUtils)
Expand Down Expand Up @@ -373,25 +391,26 @@ func main() {
// SimpleNetworkService -
type SimpleNetworkService struct {
*nspAPI.Conduit
targetRegistryClient nspAPI.TargetRegistryClient
ConfigurationManagerClient nspAPI.ConfigurationManagerClient
IdentifierOffsetGenerator *IdentifierOffsetGenerator
interfaces sync.Map
ctx context.Context
logger logr.Logger
streams map[string]types.Stream
netUtils networking.Utils
nfqueueIndex int
serviceCtrCh chan bool
simpleNetworkServiceBlocked bool
mu sync.Mutex
cancelStreamWatcher context.CancelFunc
streamWatcherRunning bool
lbFactory types.NFQueueLoadBalancerFactory
nfa types.NFAdaptor
natHandler *nat.NatHandler
targetHitsMetrics *target.HitsMetrics
neighborMonitor *neighbor.NeighborMonitor
targetRegistryClient nspAPI.TargetRegistryClient
ConfigurationManagerClient nspAPI.ConfigurationManagerClient
IdentifierOffsetGenerator *IdentifierOffsetGenerator
interfaces sync.Map
ctx context.Context
logger logr.Logger
streams map[string]types.Stream
netUtils networking.Utils
nfqueueIndex int
serviceCtrCh chan bool
simpleNetworkServiceBlocked bool
mu sync.Mutex
cancelStreamWatcher context.CancelFunc
streamWatcherRunning bool
lbFactory types.NFQueueLoadBalancerFactory
nfa types.NFAdaptor
natHandler *nat.NatHandler
targetHitsMetrics *target.HitsMetrics
neighborMonitor *neighbor.NeighborMonitor
streamFwdAvailabilityService *streamForwarding.ForwardingAvailabilityService
}

/* // Request checks if allowed to serve the request
Expand Down Expand Up @@ -430,6 +449,7 @@ func newSimpleNetworkService(
identifierOffsetStart int,
targetHitsMetrics *target.HitsMetrics,
neighborMonitor *neighbor.NeighborMonitor,
streamFwdAvailabilityService *streamForwarding.ForwardingAvailabilityService,
) *SimpleNetworkService {
identifierOffsetGenerator := NewIdentifierOffsetGenerator(identifierOffsetStart)
logger := log.FromContextOrGlobal(ctx).WithValues("class", "SimpleNetworkService",
Expand All @@ -440,23 +460,24 @@ func newSimpleNetworkService(
log.Fatal(logger, "Unable to init NAT", "error", err)
}
simpleNetworkService := &SimpleNetworkService{
Conduit: conduit,
targetRegistryClient: targetRegistryClient,
ConfigurationManagerClient: configurationManagerClient,
IdentifierOffsetGenerator: identifierOffsetGenerator,
ctx: ctx,
logger: logger,
netUtils: netUtils,
nfqueueIndex: 1,
streams: make(map[string]types.Stream),
serviceCtrCh: make(chan bool),
simpleNetworkServiceBlocked: true,
streamWatcherRunning: false,
lbFactory: lbFactory,
nfa: nfa,
natHandler: nh,
targetHitsMetrics: targetHitsMetrics,
neighborMonitor: neighborMonitor,
Conduit: conduit,
targetRegistryClient: targetRegistryClient,
ConfigurationManagerClient: configurationManagerClient,
IdentifierOffsetGenerator: identifierOffsetGenerator,
ctx: ctx,
logger: logger,
netUtils: netUtils,
nfqueueIndex: 1,
streams: make(map[string]types.Stream),
serviceCtrCh: make(chan bool),
simpleNetworkServiceBlocked: true,
streamWatcherRunning: false,
lbFactory: lbFactory,
nfa: nfa,
natHandler: nh,
targetHitsMetrics: targetHitsMetrics,
neighborMonitor: neighborMonitor,
streamFwdAvailabilityService: streamFwdAvailabilityService,
}
logger.Info("Created LB service", "conduit", conduit)
return simpleNetworkService
Expand Down Expand Up @@ -685,6 +706,7 @@ func (sns *SimpleNetworkService) addStream(strm *nspAPI.Stream) error {
identifierOffset,
sns.targetHitsMetrics,
neighborReachDetector,
sns.streamFwdAvailabilityService,
)
if err != nil {
return fmt.Errorf("failed to create stream (%s): %w", strm.String(), err)
Expand Down
70 changes: 39 additions & 31 deletions pkg/loadbalancer/stream/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/nordix/meridio/pkg/health"
"github.com/nordix/meridio/pkg/kernel/neighbor"
"github.com/nordix/meridio/pkg/loadbalancer/flow"
streamForwarding "github.com/nordix/meridio/pkg/loadbalancer/forwarding/stream"
targetMetrics "github.com/nordix/meridio/pkg/loadbalancer/target"
"github.com/nordix/meridio/pkg/loadbalancer/types"
"github.com/nordix/meridio/pkg/log"
Expand All @@ -44,23 +45,24 @@ var errNoTarget error = errors.New("the target is not existing")
// LoadBalancer -
type LoadBalancer struct {
*nspAPI.Stream
TargetRegistryClient nspAPI.TargetRegistryClient
ConfigurationManagerClient nspAPI.ConfigurationManagerClient
IdentifierOffset int
nfqlb types.NFQueueLoadBalancer
flows map[string]types.Flow
targets map[int]types.Target // key: Identifier
netUtils networking.Utils
nfqueue int
mu sync.Mutex
ctx context.Context
cancel context.CancelFunc
pendingTargets map[int]types.Target // key: Identifier
defrag *Defrag
pendingCh chan struct{} // trigger pending Targets processing
logger logr.Logger
targetHitsMetrics *targetMetrics.HitsMetrics
neighborReachDetector *neighbor.NeighborReachabilityDetector
TargetRegistryClient nspAPI.TargetRegistryClient
ConfigurationManagerClient nspAPI.ConfigurationManagerClient
IdentifierOffset int
nfqlb types.NFQueueLoadBalancer
flows map[string]types.Flow
targets map[int]types.Target // key: Identifier
netUtils networking.Utils
nfqueue int
mu sync.Mutex
ctx context.Context
cancel context.CancelFunc
pendingTargets map[int]types.Target // key: Identifier
defrag *Defrag
pendingCh chan struct{} // trigger pending Targets processing
logger logr.Logger
targetHitsMetrics *targetMetrics.HitsMetrics
neighborReachDetector *neighbor.NeighborReachabilityDetector
forwardingAvailabilityService *streamForwarding.ForwardingAvailabilityService
}

func New(
Expand All @@ -73,6 +75,7 @@ func New(
identifierOffset int,
targetHitsMetrics *targetMetrics.HitsMetrics,
neighborReachDetector *neighbor.NeighborReachabilityDetector,
forwardingAvailabilityService *streamForwarding.ForwardingAvailabilityService,
) (types.Stream, error) {
n := int(stream.GetMaxTargets())
m := int(stream.GetMaxTargets()) * 100
Expand All @@ -84,20 +87,21 @@ func New(
logger := log.Logger.WithValues("class", "LoadBalancer",
"instance", stream.GetName(), "identifierOffset", identifierOffset)
loadBalancer := &LoadBalancer{
Stream: stream,
TargetRegistryClient: targetRegistryClient,
ConfigurationManagerClient: configurationManagerClient,
IdentifierOffset: identifierOffset,
flows: make(map[string]types.Flow),
nfqlb: nfqlb,
targets: make(map[int]types.Target),
netUtils: netUtils,
nfqueue: nfqueue,
pendingTargets: make(map[int]types.Target),
pendingCh: make(chan struct{}, 10),
logger: logger,
targetHitsMetrics: targetHitsMetrics,
neighborReachDetector: neighborReachDetector,
Stream: stream,
TargetRegistryClient: targetRegistryClient,
ConfigurationManagerClient: configurationManagerClient,
IdentifierOffset: identifierOffset,
flows: make(map[string]types.Flow),
nfqlb: nfqlb,
targets: make(map[int]types.Target),
netUtils: netUtils,
nfqueue: nfqueue,
pendingTargets: make(map[int]types.Target),
pendingCh: make(chan struct{}, 10),
logger: logger,
targetHitsMetrics: targetHitsMetrics,
neighborReachDetector: neighborReachDetector,
forwardingAvailabilityService: forwardingAvailabilityService,
}
// first enable kernel's IP defrag except for the interfaces facing targets
// (defrag is needed by Flows to match rules with L4 information)
Expand Down Expand Up @@ -210,6 +214,7 @@ func (lb *LoadBalancer) AddTarget(target types.Target) error {
lb.targets[target.GetIdentifier()] = target
lb.neighborReachDetector.Register(target.GetIps()...)
lb.removeFromPendingTarget(target)
lb.forwardingAvailabilityService.Register(lb.Name)
logger.Info("Added target")
return nil
}
Expand All @@ -233,6 +238,9 @@ func (lb *LoadBalancer) RemoveTarget(identifier int) error {
errFinal = utils.AppendErr(errFinal, fmt.Errorf("target delete error: %w", err)) // todo
}
delete(lb.targets, target.GetIdentifier())
if len(lb.targets) == 0 {
lb.forwardingAvailabilityService.Unregister(lb.Name)
}
logger.Info("Removed target", "target", target)
return errFinal
}
Expand Down

0 comments on commit 07e1984

Please sign in to comment.