Skip to content

Commit

Permalink
FE; VIPs not advertised if LB internal connectivity is down
Browse files Browse the repository at this point in the history
FE won't advertise VIPs via BGP unless the collocated LB has
working internal connectivity. (External connectivity in FE
is also a requirement obviously.)

Implies VIPs are not advertised without configured application
targets.

FE monitors LB internal connectivity via NSP.
  • Loading branch information
zolug committed Jan 25, 2024
1 parent 07e1984 commit 3643f39
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 9 deletions.
5 changes: 3 additions & 2 deletions cmd/frontend/internal/config/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2022 Nordix Foundation
Copyright (c) 2022-2024 Nordix Foundation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -23,5 +23,6 @@ import (

type Config struct {
*env.Config
NSPConn *grpc.ClientConn
NSPConn *grpc.ClientConn
Hostname string
}
158 changes: 154 additions & 4 deletions cmd/frontend/internal/frontend/service.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2021-2023 Nordix Foundation
Copyright (c) 2021-2024 Nordix Foundation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"io"
"os"
"reflect"
"strconv"
Expand All @@ -37,9 +38,12 @@ import (
"github.com/nordix/meridio/cmd/frontend/internal/utils"
"github.com/nordix/meridio/pkg/health"
"github.com/nordix/meridio/pkg/k8s/watcher"
"github.com/nordix/meridio/pkg/loadbalancer/types"
"github.com/nordix/meridio/pkg/log"
"github.com/nordix/meridio/pkg/retry"
"github.com/vishvananda/netlink"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const rulePriorityVIP int = 100
Expand Down Expand Up @@ -88,6 +92,7 @@ func NewFrontEndService(ctx context.Context, c *feConfig.Config) *FrontEndServic
authCh: authCh,
logger: logger,
config: c,
extConnectivityCh: make(chan bool),
}

if len(frontEndService.vrrps) > 0 {
Expand Down Expand Up @@ -134,6 +139,7 @@ type FrontEndService struct {
authCh chan struct{} // used by secretDatabase to signal updates to FE Service
logger logr.Logger
config *feConfig.Config
extConnectivityCh chan bool
}

// CleanUp -
Expand All @@ -143,6 +149,7 @@ func (fes *FrontEndService) CleanUp() {

close(fes.reconfCh)
close(fes.authCh)
close(fes.extConnectivityCh)
_ = fes.RemoveVIPRules()
}

Expand All @@ -160,6 +167,7 @@ func (fes *FrontEndService) Start(ctx context.Context, errCh chan<- error) {
go fes.start(ctx, errCh)
go fes.reconfigurationAgent(ctx, fes.reconfCh, errCh)
go fes.authenticationAgent(ctx, errCh) // monitors updates for secrets of interest to trigger reconfiguration
go fes.vipAgent(ctx, fes.targetRegistryClient, errCh)
}

// Stop -
Expand Down Expand Up @@ -309,7 +317,11 @@ func (fes *FrontEndService) Monitor(ctx context.Context, errCh chan<- error) {
}
cancel()
health.SetServingStatus(ctx, health.EgressSvc, false)
fes.denounceVIP(ctx, errCh)
// announce change in connectivity
select {
case <-ctx.Done(): // allows blocked channel write to break free
case fes.extConnectivityCh <- hasConnectivity:
}
}

select {
Expand Down Expand Up @@ -391,7 +403,11 @@ func (fes *FrontEndService) Monitor(ctx context.Context, errCh chan<- error) {
retry.WithDelay(fes.nspEntryTimeout),
retry.WithErrorIngnored())
}()
fes.announceVIP(ctx, errCh)
// announce change in connectivity
select {
case <-ctx.Done(): // allows blocked channel write to break free
case fes.extConnectivityCh <- hasConnectivity:
}
}
}

Expand Down Expand Up @@ -509,7 +525,7 @@ func (fes *FrontEndService) promoteConfig(ctx context.Context) error {
// promoteConfigNoLock -
func (fes *FrontEndService) promoteConfigNoLock(ctx context.Context) error {
if err := fes.writeConfig(); err != nil {
return fmt.Errorf("error writing configuration: %v", err)
return fmt.Errorf("error writing configuration: %w", err)
}
// send signal to reconfiguration agent to apply the new config
fes.logger.V(1).Info("promote configuration change")
Expand Down Expand Up @@ -1252,6 +1268,140 @@ func (fes *FrontEndService) getGatewayByName(name string) (*utils.Gateway, int)

//-------------------------------------------------------------------------------------------

// Watch and aggregate information to decide whether VIP addresses
// have to be announced through the routing protocol to the connected
// gateways. (Applicable in case of BGP.)
func (fes *FrontEndService) vipAgent(ctx context.Context, targetRegistryClient nspAPI.TargetRegistryClient, errCh chan<- error) {
const (
TARGET = "loadbalancerTarget" // local LB announced itself being capable of handling incoming traffic
CONNECTIVITY = "externalConnectivity" // FE has external connectivity
)
avilabilityMap := map[string]bool{
TARGET: false,
CONNECTIVITY: false,
}
availability := false // overall availability
targetCh := make(chan bool)
defer func() {
fes.logger.Info("VIP agent stopped")
close(targetCh)
}()

checkAvailabilityAction := func(currentAvail *bool, keyValuePairs ...struct {
string
bool
}) {
// check if any availability component has changed in a way
// that it would affect the overall availability
oldSum := *currentAvail
newSum := true

for _, elem := range keyValuePairs {
oldVal, ok := avilabilityMap[elem.string]
if !ok || oldVal != elem.bool {
fes.logger.Info("VIP agent set availability information", "key", elem.string, "value", elem.bool)
}
avilabilityMap[elem.string] = elem.bool
}

for _, v := range avilabilityMap {
newSum = newSum && v
if !v {
break
}
}

if newSum == oldSum { // no change in overall state
return
}

if newSum {
fes.announceVIP(ctx, errCh)
} else {
fes.denounceVIP(ctx, errCh)
}
*currentAvail = newSum
}

getLocal := func(targets []*nspAPI.Target) *nspAPI.Target {
// find and return collocated loadbalancer if any
for _, target := range targets {
identifierStr, exists := target.GetContext()[types.IdentifierKey]
if !exists {
continue
}
if identifierStr == fes.config.Hostname {
return target
}
}
return nil
}

go func() {
// watch loadbalancer Targets to learn possible changes affecting
// the local loadbalancer's ability to handle incoming traffic
_ = retry.Do(func() error {
watchClient, err := targetRegistryClient.Watch(ctx, &nspAPI.Target{
Status: nspAPI.Target_ANY,
Type: nspAPI.Target_LOADBALANCER,
})
if err != nil {
return fmt.Errorf("failed to create loadbalancer target watcher: %w", err)
}
for {
targetResponse, err := watchClient.Recv()
if err == io.EOF {
break
}
if err != nil {
if status.Code(err) != codes.Canceled {
fes.logger.Error(err, "loadbalancer target watcher receive")
}
return fmt.Errorf("loadbalancer target watch receive error: %w", err)
}

target := getLocal(targetResponse.GetTargets())
select {
case targetCh <- func() bool { return target != nil }():
case <-ctx.Done():
return fmt.Errorf("context closed, abort target channel write")
}
}
return nil
}, retry.WithContext(ctx),
retry.WithDelay(500*time.Millisecond), // TODO
retry.WithErrorIngnored())
}()

// process external connectivity and loadbalancer target related events
for {
select {
case <-ctx.Done():
return
case hasTarget, ok := <-targetCh:
if ok {
checkAvailabilityAction(
&availability,
struct {
string
bool
}{TARGET, hasTarget},
)
}
case hasConnectivity, ok := <-fes.extConnectivityCh:
if ok {
checkAvailabilityAction(
&availability,
struct {
string
bool
}{CONNECTIVITY, hasConnectivity},
)
}
}
}
}

// TODO: what to do once Static+BFD gets introduced? Probably do nothing...
// TODO: when there's only static, no need to play with announce/denounceVIP...

Expand Down
7 changes: 4 additions & 3 deletions cmd/frontend/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2021-2023 Nordix Foundation
Copyright (c) 2021-2024 Nordix Foundation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -133,8 +133,9 @@ func main() {

// create and start frontend service
c := &feConfig.Config{
Config: config,
NSPConn: conn,
Config: config,
NSPConn: conn,
Hostname: hostname,
}
fe := frontend.NewFrontEndService(ctx, c)
defer fe.CleanUp()
Expand Down

0 comments on commit 3643f39

Please sign in to comment.