Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NGINX Plus: dynamic upstream reloads support #1469

Merged
merged 1 commit into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 109 additions & 9 deletions internal/mode/static/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/go-logr/logr"
ngxclient "github.com/nginxinc/nginx-plus-go-client/client"
apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -98,21 +99,33 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
h.handleEvent(ctx, logger, event)
}

changed, graph := h.cfg.processor.Process()
if !changed {
changeType, graph := h.cfg.processor.Process()

var err error
switch changeType {
case state.NoChange:
logger.Info("Handling events didn't result into NGINX configuration changes")
sjberman marked this conversation as resolved.
Show resolved Hide resolved
if !h.cfg.healthChecker.ready && h.cfg.healthChecker.firstBatchError == nil {
h.cfg.healthChecker.setAsReady()
}
return
case state.EndpointsOnlyChange:
h.cfg.version++
err = h.updateUpstreamServers(
ctx,
logger,
dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version),
)
case state.ClusterStateChange:
h.cfg.version++
err = h.updateNginxConf(
ctx,
dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version),
)
}

var nginxReloadRes nginxReloadResult
h.cfg.version++
if err := h.updateNginx(
ctx,
dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version),
); err != nil {
if err != nil {
logger.Error(err, "Failed to update NGINX configuration")
nginxReloadRes.error = err
if !h.cfg.healthChecker.ready {
Expand Down Expand Up @@ -174,9 +187,9 @@ func (h *eventHandlerImpl) handleEvent(ctx context.Context, logger logr.Logger,
}
}

func (h *eventHandlerImpl) updateNginx(ctx context.Context, conf dataplane.Configuration) error {
// updateNginxConf updates nginx conf files and reloads nginx
func (h *eventHandlerImpl) updateNginxConf(ctx context.Context, conf dataplane.Configuration) error {
files := h.cfg.generator.Generate(conf)

if err := h.cfg.nginxFileMgr.ReplaceFiles(files); err != nil {
return fmt.Errorf("failed to replace NGINX configuration files: %w", err)
}
Expand All @@ -188,6 +201,93 @@ func (h *eventHandlerImpl) updateNginx(ctx context.Context, conf dataplane.Confi
return nil
}

// updateUpstreamServers is called only when endpoints have changed. It updates nginx conf files and then:
// - if using NGINX Plus, determines which servers have changed and uses the N+ API to update them;
// - otherwise if not using NGINX Plus, or an error was returned from the API, reloads nginx
func (h *eventHandlerImpl) updateUpstreamServers(
ctx context.Context,
logger logr.Logger,
conf dataplane.Configuration,
) error {
isPlus := h.cfg.nginxRuntimeMgr.IsPlus()

files := h.cfg.generator.Generate(conf)
if err := h.cfg.nginxFileMgr.ReplaceFiles(files); err != nil {
return fmt.Errorf("failed to replace NGINX configuration files: %w", err)
}

reload := func() error {
if err := h.cfg.nginxRuntimeMgr.Reload(ctx, conf.Version); err != nil {
return fmt.Errorf("failed to reload NGINX: %w", err)
}

return nil
}

if isPlus {
sjberman marked this conversation as resolved.
Show resolved Hide resolved
type upstream struct {
name string
servers []ngxclient.UpstreamServer
}
var upstreams []upstream

prevUpstreams, err := h.cfg.nginxRuntimeMgr.GetUpstreams()
if err != nil {
logger.Error(err, "failed to get upstreams from API, reloading configuration instead")
return reload()
}

for _, u := range conf.Upstreams {
upstream := upstream{
name: u.Name,
servers: ngxConfig.ConvertEndpoints(u.Endpoints),
}

if u, ok := prevUpstreams[upstream.name]; ok {
if !serversEqual(upstream.servers, u.Peers) {
upstreams = append(upstreams, upstream)
}
}
}

var reloadPlus bool
for _, upstream := range upstreams {
if err := h.cfg.nginxRuntimeMgr.UpdateHTTPServers(upstream.name, upstream.servers); err != nil {
logger.Error(
err, "couldn't update upstream via the API, reloading configuration instead",
"upstreamName", upstream.name,
)
reloadPlus = true
pleshakov marked this conversation as resolved.
Show resolved Hide resolved
}
}

if !reloadPlus {
return nil
}
}

return reload()
}

func serversEqual(newServers []ngxclient.UpstreamServer, oldServers []ngxclient.Peer) bool {
kate-osborn marked this conversation as resolved.
Show resolved Hide resolved
if len(newServers) != len(oldServers) {
return false
}

diff := make(map[string]struct{}, len(newServers))
for _, s := range newServers {
diff[s.Server] = struct{}{}
}

for _, s := range oldServers {
if _, ok := diff[s.Server]; !ok {
return false
}
}

return true
}

// updateControlPlaneAndSetStatus updates the control plane configuration and then sets the status
// based on the outcome
func (h *eventHandlerImpl) updateControlPlaneAndSetStatus(
Expand Down
172 changes: 167 additions & 5 deletions internal/mode/static/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"errors"

ngxclient "github.com/nginxinc/nginx-plus-go-client/client"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
discoveryV1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
Expand All @@ -27,6 +29,7 @@ import (
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/file"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/file/filefakes"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/runtime/runtimefakes"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state"
staticConds "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/conditions"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/dataplane"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/graph"
Expand Down Expand Up @@ -64,6 +67,7 @@ var _ = Describe("eventHandler", func() {

BeforeEach(func() {
fakeProcessor = &statefakes.FakeChangeProcessor{}
fakeProcessor.ProcessReturns(state.NoChange, &graph.Graph{})
fakeGenerator = &configfakes.FakeGenerator{}
fakeNginxFileMgr = &filefakes.FakeManager{}
fakeNginxRuntimeMgr = &runtimefakes.FakeManager{}
Expand Down Expand Up @@ -112,7 +116,7 @@ var _ = Describe("eventHandler", func() {
}

BeforeEach(func() {
fakeProcessor.ProcessReturns(true /* changed */, &graph.Graph{})
fakeProcessor.ProcessReturns(state.ClusterStateChange /* changed */, &graph.Graph{})

fakeGenerator.GenerateReturns(fakeCfgFiles)
})
Expand Down Expand Up @@ -280,11 +284,129 @@ var _ = Describe("eventHandler", func() {
})
})

When("receiving an EndpointsOnlyChange update", func() {
e := &events.UpsertEvent{Resource: &discoveryV1.EndpointSlice{
sjberman marked this conversation as resolved.
Show resolved Hide resolved
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-gateway",
Namespace: "nginx-gateway",
},
}}
batch := []interface{}{e}

BeforeEach(func() {
fakeProcessor.ProcessReturns(state.EndpointsOnlyChange, &graph.Graph{})
upstreams := ngxclient.Upstreams{
"one": ngxclient.Upstream{
Peers: []ngxclient.Peer{
{Server: "server1"},
},
},
}
fakeNginxRuntimeMgr.GetUpstreamsReturns(upstreams, nil)
})

When("running NGINX Plus", func() {
It("should call the NGINX Plus API", func() {
fakeNginxRuntimeMgr.IsPlusReturns(true)

handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
Expect(fakeGenerator.GenerateCallCount()).To(Equal(1))
Expect(fakeNginxFileMgr.ReplaceFilesCallCount()).To(Equal(1))
Expect(fakeNginxRuntimeMgr.GetUpstreamsCallCount()).To(Equal(1))
})
})

When("not running NGINX Plus", func() {
It("should not call the NGINX Plus API", func() {
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
Expect(fakeGenerator.GenerateCallCount()).To(Equal(1))
Expect(fakeNginxFileMgr.ReplaceFilesCallCount()).To(Equal(1))
Expect(fakeNginxRuntimeMgr.GetUpstreamsCallCount()).To(Equal(0))
Expect(fakeNginxRuntimeMgr.ReloadCallCount()).To(Equal(1))
})
})
})

When("updating upstream servers", func() {
conf := dataplane.Configuration{
Upstreams: []dataplane.Upstream{
{
Name: "one",
},
},
}

type callCounts struct {
generate int
update int
reload int
}

assertCallCounts := func(cc callCounts) {
Expect(fakeGenerator.GenerateCallCount()).To(Equal(cc.generate))
Expect(fakeNginxFileMgr.ReplaceFilesCallCount()).To(Equal(cc.generate))
Expect(fakeNginxRuntimeMgr.UpdateHTTPServersCallCount()).To(Equal(cc.update))
Expect(fakeNginxRuntimeMgr.ReloadCallCount()).To(Equal(cc.reload))
}

BeforeEach(func() {
upstreams := ngxclient.Upstreams{
"one": ngxclient.Upstream{
Peers: []ngxclient.Peer{
{Server: "server1"},
},
},
}
fakeNginxRuntimeMgr.GetUpstreamsReturns(upstreams, nil)
})

When("running NGINX Plus", func() {
BeforeEach(func() {
fakeNginxRuntimeMgr.IsPlusReturns(true)
})

It("should update servers using the NGINX Plus API", func() {
Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed())

assertCallCounts(callCounts{generate: 1, update: 1, reload: 0})
})

It("should reload when GET API returns an error", func() {
fakeNginxRuntimeMgr.GetUpstreamsReturns(nil, errors.New("error"))
Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed())

assertCallCounts(callCounts{generate: 1, update: 0, reload: 1})
})

It("should reload when POST API returns an error", func() {
fakeNginxRuntimeMgr.UpdateHTTPServersReturns(errors.New("error"))
Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed())

assertCallCounts(callCounts{generate: 1, update: 1, reload: 1})
})
})

When("not running NGINX Plus", func() {
It("should update servers by reloading", func() {
Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed())

assertCallCounts(callCounts{generate: 1, update: 0, reload: 1})
})

It("should return an error when reloading fails", func() {
fakeNginxRuntimeMgr.ReloadReturns(errors.New("error"))
Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).ToNot(Succeed())

assertCallCounts(callCounts{generate: 1, update: 0, reload: 1})
})
})
})

It("should set the health checker status properly when there are changes", func() {
e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}}
batch := []interface{}{e}

fakeProcessor.ProcessReturns(true, &graph.Graph{})
fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{})

Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed())
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
Expand All @@ -304,22 +426,22 @@ var _ = Describe("eventHandler", func() {
e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}}
batch := []interface{}{e}

fakeProcessor.ProcessReturns(true, &graph.Graph{})
fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{})
fakeNginxRuntimeMgr.ReloadReturns(errors.New("reload error"))

handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed())

// now send an update with no changes; should still return an error
fakeProcessor.ProcessReturns(false, &graph.Graph{})
fakeProcessor.ProcessReturns(state.NoChange, &graph.Graph{})

handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed())

// error goes away
fakeProcessor.ProcessReturns(true, &graph.Graph{})
fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{})
fakeNginxRuntimeMgr.ReloadReturns(nil)

handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
Expand All @@ -339,6 +461,46 @@ var _ = Describe("eventHandler", func() {
})
})

var _ = Describe("serversEqual", func() {
DescribeTable("determines if server lists are equal",
func(newServers []ngxclient.UpstreamServer, oldServers []ngxclient.Peer, equal bool) {
Expect(serversEqual(newServers, oldServers)).To(Equal(equal))
},
Entry("different length",
[]ngxclient.UpstreamServer{
{Server: "server1"},
},
[]ngxclient.Peer{
{Server: "server1"},
{Server: "server2"},
},
false,
),
Entry("differing elements",
[]ngxclient.UpstreamServer{
{Server: "server1"},
{Server: "server2"},
},
[]ngxclient.Peer{
{Server: "server1"},
{Server: "server3"},
},
false,
),
Entry("same elements",
[]ngxclient.UpstreamServer{
{Server: "server1"},
{Server: "server2"},
},
[]ngxclient.Peer{
{Server: "server1"},
{Server: "server2"},
},
true,
),
)
})

var _ = Describe("getGatewayAddresses", func() {
It("gets gateway addresses from a Service", func() {
fakeClient := fake.NewFakeClient()
Expand Down
Loading
Loading