From 012fc50fdb0533eb709220bd7939ed7c48a37020 Mon Sep 17 00:00:00 2001 From: Ashwin Venkatesh Date: Tue, 22 Aug 2023 10:55:52 -0400 Subject: [PATCH] xds controller: resolve ServiceEndpoints references in ProxyStateTemplate --- .../internal/controllers/xds/controller.go | 156 ++++ .../controllers/xds/controller_test.go | 664 ++++++++++++++++++ .../controllers/xds/endpoint_builder.go | 72 ++ .../controllers/xds/endpoint_builder_test.go | 233 ++++++ .../internal/controllers/xds/mock_updater.go | 90 +++ .../controllers/xds/reconciliation_data.go | 58 ++ .../internal/controllers/xds/status/status.go | 92 +++ 7 files changed, 1365 insertions(+) create mode 100644 internal/mesh/internal/controllers/xds/controller.go create mode 100644 internal/mesh/internal/controllers/xds/controller_test.go create mode 100644 internal/mesh/internal/controllers/xds/endpoint_builder.go create mode 100644 internal/mesh/internal/controllers/xds/endpoint_builder_test.go create mode 100644 internal/mesh/internal/controllers/xds/mock_updater.go create mode 100644 internal/mesh/internal/controllers/xds/reconciliation_data.go create mode 100644 internal/mesh/internal/controllers/xds/status/status.go diff --git a/internal/mesh/internal/controllers/xds/controller.go b/internal/mesh/internal/controllers/xds/controller.go new file mode 100644 index 000000000000..5d1e5da7cc6c --- /dev/null +++ b/internal/mesh/internal/controllers/xds/controller.go @@ -0,0 +1,156 @@ +package xds + +import ( + "context" + + "github.com/hashicorp/consul/internal/catalog" + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/mesh/internal/controllers/xds/status" + "github.com/hashicorp/consul/internal/mesh/internal/types" + "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/resource/mappers/bimapper" + pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +const ControllerName = "consul.io/xds-controller" + +func Controller(mapper *bimapper.Mapper, updater ProxyUpdater) controller.Controller { + if mapper == nil || updater == nil { + panic("mapper and updater are required") + } + + return controller.ForType(types.ProxyStateTemplateType). + WithWatch(catalog.ServiceEndpointsType, mapper.MapLink). + WithPlacement(controller.PlacementEachServer). + WithReconciler(&xdsReconciler{bimapper: mapper, updater: updater}) +} + +type xdsReconciler struct { + bimapper *bimapper.Mapper + updater ProxyUpdater +} + +// ProxyUpdater is an interface that defines the ability to push proxy updates to the updater +// and also check its connectivity to the server. +type ProxyUpdater interface { + // PushChange allows pushing a computed ProxyState to xds for xds resource generation to send to a proxy. + PushChange(id *pbresource.ID, snapshot *pbmesh.ProxyState) error + + // ProxyConnectedToServer returns whether this id is connected to this server. + ProxyConnectedToServer(id *pbresource.ID) bool +} + +func (r *xdsReconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error { + rt.Logger = rt.Logger.With("resource-id", req.ID, "controller", ControllerName) + + rt.Logger.Trace("reconciling proxy state template", "id", req.ID) + + // Get the ProxyStateTemplate. + proxyStateTemplate, err := getProxyStateTemplate(ctx, rt, req.ID) + if err != nil { + rt.Logger.Error("error reading proxy state template", "error", err) + return err + } + + if proxyStateTemplate == nil || proxyStateTemplate.Template == nil || !r.updater.ProxyConnectedToServer(req.ID) { + rt.Logger.Trace("proxy state template has been deleted or this controller is not responsible for this proxy state template", "id", req.ID) + + // If the proxy state was deleted, we should remove references to it in the mapper. + r.bimapper.UntrackItem(req.ID) + + return nil + } + + var ( + statusCondition *pbresource.Condition + pstResource *pbresource.Resource + ) + pstResource = proxyStateTemplate.Resource + + // Initialize the ProxyState endpoints map. + if proxyStateTemplate.Template.ProxyState == nil { + rt.Logger.Error("proxy state was missing from proxy state template") + // Set the status. + statusCondition = status.ConditionRejectedNilProxyState(status.KeyFromID(req.ID)) + status.WriteStatusIfChanged(ctx, rt, pstResource, statusCondition) + + return err + } + if proxyStateTemplate.Template.ProxyState.Endpoints == nil { + proxyStateTemplate.Template.ProxyState.Endpoints = make(map[string]*pbproxystate.Endpoints) + } + + // Iterate through the endpoint references. + // For endpoints, the controller should: + // 1. Resolve ServiceEndpoint references + // 2. Translate them into pbproxystate.Endpoints + // 3. Add the pbproxystate.Endpoints to the ProxyState endpoints map. + // 4. Track relationships between ProxyState and ServiceEndpoints, such that we can look up ServiceEndpoints and + // figure out which ProxyStates are associated with it (for mapping watches) and vice versa (for looking up + // references). The bimapper package is useful for tracking these relationships. + endpointReferencesMap := proxyStateTemplate.Template.RequiredEndpoints + var endpointsInProxyStateTemplate []resource.ReferenceOrID + for xdsClusterName, endpointRef := range endpointReferencesMap { + + // Step 1: Resolve the reference by looking up the ServiceEndpoints. + // serviceEndpoints will not be nil unless there is an error. + serviceEndpoints, err := getServiceEndpoints(ctx, rt, endpointRef.Id) + if err != nil { + rt.Logger.Error("error reading service endpoint", "id", endpointRef.Id, "error", err) + // Set the status. + statusCondition = status.ConditionRejectedErrorReadingEndpoints(status.KeyFromID(endpointRef.Id), err.Error()) + status.WriteStatusIfChanged(ctx, rt, pstResource, statusCondition) + + return err + } + + // Step 2: Translate it into pbproxystate.Endpoints. + psEndpoints, err := generateProxyStateEndpoints(serviceEndpoints, endpointRef.Port) + if err != nil { + rt.Logger.Error("error translating service endpoints to proxy state endpoints", "endpoint", endpointRef.Id, "error", err) + + // Set the status. + statusCondition = status.ConditionRejectedCreatingProxyStateEndpoints(status.KeyFromID(endpointRef.Id), err.Error()) + status.WriteStatusIfChanged(ctx, rt, pstResource, statusCondition) + + return err + } + + // Step 3: Add the endpoints to ProxyState. + proxyStateTemplate.Template.ProxyState.Endpoints[xdsClusterName] = psEndpoints + + // Track all the endpoints that are used by this ProxyStateTemplate, so we can use this for step 4. + endpointResourceRef := resource.Reference(endpointRef.Id, "") + endpointsInProxyStateTemplate = append(endpointsInProxyStateTemplate, endpointResourceRef) + + } + + // Step 4: Track relationships between ProxyStateTemplates and ServiceEndpoints. + r.bimapper.TrackItem(req.ID, endpointsInProxyStateTemplate) + + computedProxyState := proxyStateTemplate.Template.ProxyState + + err = r.updater.PushChange(req.ID, computedProxyState) + if err != nil { + // Set the status. + statusCondition = status.ConditionRejectedPushChangeFailed(status.KeyFromID(req.ID)) + status.WriteStatusIfChanged(ctx, rt, pstResource, statusCondition) + return err + } + + // Set the status. + statusCondition = status.ConditionAccepted() + status.WriteStatusIfChanged(ctx, rt, pstResource, statusCondition) + return nil +} + +func resourceIdToReference(id *pbresource.ID) *pbresource.Reference { + ref := &pbresource.Reference{ + Name: id.GetName(), + Type: id.GetType(), + Tenancy: id.GetTenancy(), + } + return ref +} diff --git a/internal/mesh/internal/controllers/xds/controller_test.go b/internal/mesh/internal/controllers/xds/controller_test.go new file mode 100644 index 000000000000..e6df71c70eda --- /dev/null +++ b/internal/mesh/internal/controllers/xds/controller_test.go @@ -0,0 +1,664 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package xds + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing" + "github.com/hashicorp/consul/internal/catalog" + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/mesh/internal/controllers/xds/status" + "github.com/hashicorp/consul/internal/mesh/internal/types" + "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/resource/mappers/bimapper" + "github.com/hashicorp/consul/internal/resource/resourcetest" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/proto/private/prototest" + "github.com/hashicorp/consul/sdk/testutil" + "github.com/hashicorp/consul/sdk/testutil/retry" +) + +type xdsControllerTestSuite struct { + suite.Suite + + ctx context.Context + client *resourcetest.Client + runtime controller.Runtime + + ctl *xdsReconciler + mapper *bimapper.Mapper + updater *mockUpdater + + fooProxyStateTemplate *pbresource.Resource + barProxyStateTemplate *pbresource.Resource + barEndpointRefs map[string]*pbproxystate.EndpointRef + fooEndpointRefs map[string]*pbproxystate.EndpointRef + fooEndpoints *pbresource.Resource + fooService *pbresource.Resource + fooBarEndpoints *pbresource.Resource + fooBarService *pbresource.Resource + expectedFooProxyStateEndpoints map[string]*pbproxystate.Endpoints + expectedBarProxyStateEndpoints map[string]*pbproxystate.Endpoints +} + +func (suite *xdsControllerTestSuite) SetupTest() { + suite.ctx = testutil.TestContext(suite.T()) + resourceClient := svctest.RunResourceService(suite.T(), types.Register, catalog.RegisterTypes) + suite.runtime = controller.Runtime{Client: resourceClient, Logger: testutil.Logger(suite.T())} + suite.client = resourcetest.NewClient(resourceClient) + + suite.mapper = bimapper.New(types.ProxyStateTemplateType, catalog.ServiceEndpointsType) + suite.updater = NewMockUpdater() + + suite.ctl = &xdsReconciler{ + bimapper: suite.mapper, + updater: suite.updater, + } +} + +// This test ensures when a ProxyState is deleted, it is no longer tracked in the mapper. +func (suite *xdsControllerTestSuite) TestReconcile_NoProxyStateTemplate() { + // Track the id of a non-existent ProxyStateTemplate. + proxyStateTemplateId := resourcetest.Resource(types.ProxyStateTemplateType, "not-found").ID() + suite.mapper.TrackItem(proxyStateTemplateId, []resource.ReferenceOrID{}) + + // Run the reconcile, and since no ProxyStateTemplate is stored, this simulates a deletion. + err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{ + ID: proxyStateTemplateId, + }) + require.NoError(suite.T(), err) + + // Assert that nothing is tracked in the mapper. + require.True(suite.T(), suite.mapper.IsEmpty()) +} + +// This test ensures if the controller was previously tracking a ProxyStateTemplate, and now that proxy has +// disconnected from this server, it's ignored and removed from the mapper. +func (suite *xdsControllerTestSuite) TestReconcile_RemoveTrackingProxiesNotConnectedToServer() { + // Store the initial ProxyStateTemplate and track it in the mapper. + proxyStateTemplate := resourcetest.Resource(types.ProxyStateTemplateType, "test"). + WithData(suite.T(), &pbmesh.ProxyStateTemplate{}). + Write(suite.T(), suite.client) + + suite.mapper.TrackItem(proxyStateTemplate.Id, []resource.ReferenceOrID{}) + + // Simulate the proxy disconnecting from this server. The resource still exists, but this proxy might be connected + // to a different server now, so we no longer need to track it. + suite.updater.notConnected = true + + // Run the reconcile. + err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{ + ID: proxyStateTemplate.Id, + }) + require.NoError(suite.T(), err) + + // Assert that nothing is tracked in the mapper. + require.True(suite.T(), suite.mapper.IsEmpty()) +} + +// This test sets up the updater to return an error when calling PushChange, and ensures the status is set +// correctly. +func (suite *xdsControllerTestSuite) TestReconcile_PushChangeError() { + // Have the mock simulate an error from the PushChange call. + suite.updater.pushChangeError = true + + // Setup a happy path scenario. + suite.setupFooProxyStateTemplateAndEndpoints() + + // Run the reconcile. + err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{ + ID: suite.fooProxyStateTemplate.Id, + }) + require.Error(suite.T(), err) + + // Assert on the status reflecting endpoint not found. + suite.client.RequireStatusCondition(suite.T(), suite.fooProxyStateTemplate.Id, ControllerName, status.ConditionRejectedPushChangeFailed(status.KeyFromID(suite.fooProxyStateTemplate.Id))) +} + +// This test sets up a ProxyStateTemplate that references a ServiceEndpoints that doesn't exist, and ensures the +// status is correct. +func (suite *xdsControllerTestSuite) TestReconcile_MissingEndpoint() { + // Set fooProxyStateTemplate with a reference to fooEndpoints, without storing fooEndpoints so the controller should + // notice it's missing. + fooEndpointsId := resourcetest.Resource(catalog.ServiceEndpointsType, "foo-service").ID() + fooRequiredEndpoints := make(map[string]*pbproxystate.EndpointRef) + fooRequiredEndpoints["test-cluster-1"] = &pbproxystate.EndpointRef{ + Id: fooEndpointsId, + Port: "mesh", + } + + fooProxyStateTemplate := resourcetest.Resource(types.ProxyStateTemplateType, "foo-pst"). + WithData(suite.T(), &pbmesh.ProxyStateTemplate{ + RequiredEndpoints: fooRequiredEndpoints, + ProxyState: &pbmesh.ProxyState{}, + }). + Write(suite.T(), suite.client) + + retry.Run(suite.T(), func(r *retry.R) { + suite.client.RequireResourceExists(r, fooProxyStateTemplate.Id) + }) + + // Run the reconcile. + err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{ + ID: fooProxyStateTemplate.Id, + }) + require.Error(suite.T(), err) + + // Assert on the status reflecting endpoint not found. + suite.client.RequireStatusCondition(suite.T(), fooProxyStateTemplate.Id, ControllerName, status.ConditionRejectedErrorReadingEndpoints(status.KeyFromID(fooEndpointsId), "rpc error: code = NotFound desc = resource not found")) +} + +// This test sets up a ProxyStateTemplate that references a ServiceEndpoints that can't be read correctly, and +// checks the status is correct. +func (suite *xdsControllerTestSuite) TestReconcile_ReadEndpointError() { + badID := &pbresource.ID{ + Type: &pbresource.Type{ + Group: "not", + Kind: "found", + GroupVersion: "vfake", + }, + Tenancy: &pbresource.Tenancy{Namespace: "default", Partition: "default", PeerName: "local"}, + } + fooRequiredEndpoints := make(map[string]*pbproxystate.EndpointRef) + fooRequiredEndpoints["test-cluster-1"] = &pbproxystate.EndpointRef{ + Id: badID, + Port: "mesh", + } + + fooProxyStateTemplate := resourcetest.Resource(types.ProxyStateTemplateType, "foo-pst"). + WithData(suite.T(), &pbmesh.ProxyStateTemplate{ + RequiredEndpoints: fooRequiredEndpoints, + ProxyState: &pbmesh.ProxyState{}, + }). + Write(suite.T(), suite.client) + + retry.Run(suite.T(), func(r *retry.R) { + suite.client.RequireResourceExists(r, fooProxyStateTemplate.Id) + }) + + // Run the reconcile. + err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{ + ID: fooProxyStateTemplate.Id, + }) + require.Error(suite.T(), err) + + // Assert on the status reflecting endpoint couldn't be read. + suite.client.RequireStatusCondition(suite.T(), fooProxyStateTemplate.Id, ControllerName, status.ConditionRejectedErrorReadingEndpoints(status.KeyFromID(badID), "rpc error: code = InvalidArgument desc = id.name is required")) +} + +// This test is a happy path creation test to make sure pbproxystate.Endpoints are created in the computed +// pbmesh.ProxyState from the RequiredEndpoints references. More specific translations between endpoint references +// and pbproxystate.Endpoints are unit tested in endpoint_builder.go. +func (suite *xdsControllerTestSuite) TestReconcile_ProxyStateTemplateComputesEndpoints() { + // Set up fooEndpoints and fooProxyStateTemplate with a reference to fooEndpoints and store them in the state store. + // This setup saves expected values in the suite so it can be asserted against later. + suite.setupFooProxyStateTemplateAndEndpoints() + + // Run the reconcile. + err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{ + ID: suite.fooProxyStateTemplate.Id, + }) + require.NoError(suite.T(), err) + + // Assert on the status. + suite.client.RequireStatusCondition(suite.T(), suite.fooProxyStateTemplate.Id, ControllerName, status.ConditionAccepted()) + + // Assert that the endpoints computed in the controller matches the expected endpoints. + actualEndpoints := suite.updater.GetEndpoints(suite.fooProxyStateTemplate.Id.Name) + prototest.AssertDeepEqual(suite.T(), suite.expectedFooProxyStateEndpoints, actualEndpoints) +} + +// This test is a happy path creation test that calls reconcile multiple times with a more complex setup. This +// scenario is trickier to test in the controller test because the end computation of the xds controller is not +// stored in the state store. So this test ensures that between multiple reconciles the correct ProxyStates are +// computed for each ProxyStateTemplate. +func (suite *xdsControllerTestSuite) TestReconcile_MultipleProxyStateTemplatesComputesMultipleEndpoints() { + // Set up fooProxyStateTemplate and barProxyStateTemplate and their associated resources and store them. Resources + // and expected results are stored in the suite to assert against. + suite.setupFooBarProxyStateTemplateAndEndpoints() + + // Reconcile the fooProxyStateTemplate. + err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{ + ID: suite.fooProxyStateTemplate.Id, + }) + require.NoError(suite.T(), err) + + // Assert on the status. + suite.client.RequireStatusCondition(suite.T(), suite.fooProxyStateTemplate.Id, ControllerName, status.ConditionAccepted()) + + // Assert that the endpoints computed in the controller matches the expected endpoints. + actualEndpoints := suite.updater.GetEndpoints(suite.fooProxyStateTemplate.Id.Name) + prototest.AssertDeepEqual(suite.T(), suite.expectedFooProxyStateEndpoints, actualEndpoints) + + // Reconcile the barProxyStateTemplate. + err = suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{ + ID: suite.barProxyStateTemplate.Id, + }) + require.NoError(suite.T(), err) + + // Assert on the status. + suite.client.RequireStatusCondition(suite.T(), suite.barProxyStateTemplate.Id, ControllerName, status.ConditionAccepted()) + + // Assert that the endpoints computed in the controller matches the expected endpoints. + actualBarEndpoints := suite.updater.GetEndpoints(suite.barProxyStateTemplate.Id.Name) + prototest.AssertDeepEqual(suite.T(), suite.expectedBarProxyStateEndpoints, actualBarEndpoints) +} + +// Sets up a full controller, and tests that reconciles are getting triggered for the events it should. +func (suite *xdsControllerTestSuite) TestController_ComputeAddUpdateEndpoints() { + // Run the controller manager. + mgr := controller.NewManager(suite.client, suite.runtime.Logger) + mgr.Register(Controller(suite.mapper, suite.updater)) + mgr.SetRaftLeader(true) + go mgr.Run(suite.ctx) + + // Set up fooEndpoints and fooProxyStateTemplate with a reference to fooEndpoints. These need to be stored + // because the controller reconcile looks them up. + suite.setupFooProxyStateTemplateAndEndpoints() + + // Assert that the expected ProxyState matches the actual ProxyState that PushChange was called with. This needs to + // be in a retry block unlike the Reconcile tests because the controller triggers asynchronously. + retry.Run(suite.T(), func(r *retry.R) { + actualEndpoints := suite.updater.GetEndpoints(suite.fooProxyStateTemplate.Id.Name) + // Assert on the status. + suite.client.RequireStatusCondition(r, suite.fooProxyStateTemplate.Id, ControllerName, status.ConditionAccepted()) + // Assert that the endpoints computed in the controller matches the expected endpoints. + prototest.AssertDeepEqual(r, suite.expectedFooProxyStateEndpoints, actualEndpoints) + }) + + // Now, update the endpoint to be unhealthy. This will ensure the controller is getting triggered on changes to this + // endpoint that it should be tracking, even when the ProxyStateTemplate does not change. + resourcetest.Resource(catalog.ServiceEndpointsType, "foo-service"). + WithData(suite.T(), &pbcatalog.ServiceEndpoints{Endpoints: []*pbcatalog.Endpoint{ + { + Ports: map[string]*pbcatalog.WorkloadPort{ + "mesh": { + Port: 20000, + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Addresses: []*pbcatalog.WorkloadAddress{ + { + Host: "10.1.1.1", + Ports: []string{"mesh"}, + }, + { + Host: "10.2.2.2", + Ports: []string{"mesh"}, + }, + }, + HealthStatus: pbcatalog.Health_HEALTH_CRITICAL, + }, + }}). + WithOwner(suite.fooService.Id). + Write(suite.T(), suite.client) + + // Wait for the endpoint to be written. + retry.Run(suite.T(), func(r *retry.R) { + suite.client.RequireVersionChanged(suite.T(), suite.fooEndpoints.Id, suite.fooEndpoints.Version) + }) + + // Update the expected endpoints to also have unhealthy status. + suite.expectedFooProxyStateEndpoints["test-cluster-1"].Endpoints[0].HealthStatus = pbproxystate.HealthStatus_HEALTH_STATUS_UNHEALTHY + suite.expectedFooProxyStateEndpoints["test-cluster-1"].Endpoints[1].HealthStatus = pbproxystate.HealthStatus_HEALTH_STATUS_UNHEALTHY + + retry.Run(suite.T(), func(r *retry.R) { + actualEndpoints := suite.updater.GetEndpoints(suite.fooProxyStateTemplate.Id.Name) + // Assert on the status. + suite.client.RequireStatusCondition(suite.T(), suite.fooProxyStateTemplate.Id, ControllerName, status.ConditionAccepted()) + // Assert that the endpoints computed in the controller matches the expected endpoints. + prototest.AssertDeepEqual(r, suite.expectedFooProxyStateEndpoints, actualEndpoints) + }) + + // Now add a new endpoint reference and endpoint to the fooProxyStateTemplate. This will ensure that the controller + // now tracks the newly added endpoint. + secondService := resourcetest.Resource(catalog.ServiceType, "second-service"). + WithData(suite.T(), &pbcatalog.Service{}). + Write(suite.T(), suite.client) + + secondEndpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "second-service"). + WithData(suite.T(), &pbcatalog.ServiceEndpoints{Endpoints: []*pbcatalog.Endpoint{ + { + Ports: map[string]*pbcatalog.WorkloadPort{ + "mesh": { + Port: 20000, + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Addresses: []*pbcatalog.WorkloadAddress{ + { + Host: "10.5.5.5", + Ports: []string{"mesh"}, + }, + { + Host: "10.6.6.6", + Ports: []string{"mesh"}, + }, + }, + }, + }}). + WithOwner(secondService.Id). + Write(suite.T(), suite.client) + + // Update the endpoint references on the fooProxyStateTemplate. + suite.fooEndpointRefs["test-cluster-2"] = &pbproxystate.EndpointRef{ + Id: secondEndpoints.Id, + Port: "mesh", + } + oldVersion := suite.fooProxyStateTemplate.Version + fooProxyStateTemplate := resourcetest.Resource(types.ProxyStateTemplateType, "foo-pst"). + WithData(suite.T(), &pbmesh.ProxyStateTemplate{ + RequiredEndpoints: suite.fooEndpointRefs, + ProxyState: &pbmesh.ProxyState{}, + }). + Write(suite.T(), suite.client) + + retry.Run(suite.T(), func(r *retry.R) { + suite.client.RequireVersionChanged(r, fooProxyStateTemplate.Id, oldVersion) + }) + + // Update the expected endpoints with this new endpoints. + suite.expectedFooProxyStateEndpoints["test-cluster-2"] = &pbproxystate.Endpoints{ + Endpoints: []*pbproxystate.Endpoint{ + { + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "10.5.5.5", + Port: 20000, + }, + }, + HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY, + }, + { + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "10.6.6.6", + Port: 20000, + }, + }, + HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY, + }, + }, + } + + retry.Run(suite.T(), func(r *retry.R) { + actualEndpoints := suite.updater.GetEndpoints(suite.fooProxyStateTemplate.Id.Name) + // Assert on the status. + suite.client.RequireStatusCondition(suite.T(), suite.fooProxyStateTemplate.Id, ControllerName, status.ConditionAccepted()) + // Assert that the endpoints computed in the controller matches the expected endpoints. + prototest.AssertDeepEqual(r, suite.expectedFooProxyStateEndpoints, actualEndpoints) + }) + +} + +// Setup: fooProxyStateTemplate with an EndpointsRef to fooEndpoints +// Saves all related resources to the suite so they can be modified if needed. +func (suite *xdsControllerTestSuite) setupFooProxyStateTemplateAndEndpoints() { + fooService := resourcetest.Resource(catalog.ServiceType, "foo-service"). + WithData(suite.T(), &pbcatalog.Service{}). + Write(suite.T(), suite.client) + + fooEndpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "foo-service"). + WithData(suite.T(), &pbcatalog.ServiceEndpoints{Endpoints: []*pbcatalog.Endpoint{ + { + Ports: map[string]*pbcatalog.WorkloadPort{ + "mesh": { + Port: 20000, + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Addresses: []*pbcatalog.WorkloadAddress{ + { + Host: "10.1.1.1", + Ports: []string{"mesh"}, + }, + { + Host: "10.2.2.2", + Ports: []string{"mesh"}, + }, + }, + }, + }}). + WithOwner(fooService.Id). + Write(suite.T(), suite.client) + + fooRequiredEndpoints := make(map[string]*pbproxystate.EndpointRef) + fooRequiredEndpoints["test-cluster-1"] = &pbproxystate.EndpointRef{ + Id: fooEndpoints.Id, + Port: "mesh", + } + + fooProxyStateTemplate := resourcetest.Resource(types.ProxyStateTemplateType, "foo-pst"). + WithData(suite.T(), &pbmesh.ProxyStateTemplate{ + RequiredEndpoints: fooRequiredEndpoints, + ProxyState: &pbmesh.ProxyState{}, + }). + Write(suite.T(), suite.client) + + retry.Run(suite.T(), func(r *retry.R) { + suite.client.RequireResourceExists(r, fooProxyStateTemplate.Id) + }) + + expectedFooProxyStateEndpoints := map[string]*pbproxystate.Endpoints{ + "test-cluster-1": {Endpoints: []*pbproxystate.Endpoint{ + { + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "10.1.1.1", + Port: 20000, + }, + }, + HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY, + }, + { + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "10.2.2.2", + Port: 20000, + }, + }, + HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY, + }, + }}, + } + suite.fooService = fooService + suite.fooEndpoints = fooEndpoints + suite.fooEndpointRefs = fooRequiredEndpoints + suite.fooProxyStateTemplate = fooProxyStateTemplate + suite.expectedFooProxyStateEndpoints = expectedFooProxyStateEndpoints + +} + +// Setup: +// - fooProxyStateTemplate with an EndpointsRef to fooEndpoints and fooBarEndpoints. +// - barProxyStateTemplate with an EndpointsRef to fooBarEndpoints. +// +// Saves all related resources to the suite so they can be modified if needed. +func (suite *xdsControllerTestSuite) setupFooBarProxyStateTemplateAndEndpoints() { + fooService := resourcetest.Resource(catalog.ServiceType, "foo-service"). + WithData(suite.T(), &pbcatalog.Service{}). + Write(suite.T(), suite.client) + + fooEndpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "foo-service"). + WithData(suite.T(), &pbcatalog.ServiceEndpoints{Endpoints: []*pbcatalog.Endpoint{ + { + Ports: map[string]*pbcatalog.WorkloadPort{ + "mesh": { + Port: 20000, + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Addresses: []*pbcatalog.WorkloadAddress{ + { + Host: "10.1.1.1", + Ports: []string{"mesh"}, + }, + { + Host: "10.2.2.2", + Ports: []string{"mesh"}, + }, + }, + }, + }}). + WithOwner(fooService.Id). + Write(suite.T(), suite.client) + + fooBarService := resourcetest.Resource(catalog.ServiceType, "foo-bar-service"). + WithData(suite.T(), &pbcatalog.Service{}). + Write(suite.T(), suite.client) + + fooBarEndpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "foo-bar-service"). + WithData(suite.T(), &pbcatalog.ServiceEndpoints{Endpoints: []*pbcatalog.Endpoint{ + { + Ports: map[string]*pbcatalog.WorkloadPort{ + "mesh": { + Port: 20000, + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Addresses: []*pbcatalog.WorkloadAddress{ + { + Host: "10.3.3.3", + Ports: []string{"mesh"}, + }, + { + Host: "10.4.4.4", + Ports: []string{"mesh"}, + }, + }, + }, + }}). + WithOwner(fooBarService.Id). + Write(suite.T(), suite.client) + + fooRequiredEndpoints := make(map[string]*pbproxystate.EndpointRef) + fooRequiredEndpoints["test-cluster-1"] = &pbproxystate.EndpointRef{ + Id: fooEndpoints.Id, + Port: "mesh", + } + fooRequiredEndpoints["test-cluster-2"] = &pbproxystate.EndpointRef{ + Id: fooBarEndpoints.Id, + Port: "mesh", + } + + barRequiredEndpoints := make(map[string]*pbproxystate.EndpointRef) + barRequiredEndpoints["test-cluster-1"] = &pbproxystate.EndpointRef{ + Id: fooBarEndpoints.Id, + // Sidecar proxy controller will usually set mesh port here. + Port: "mesh", + } + + fooProxyStateTemplate := resourcetest.Resource(types.ProxyStateTemplateType, "foo-pst"). + WithData(suite.T(), &pbmesh.ProxyStateTemplate{ + // Contains the foo and foobar endpoints. + RequiredEndpoints: fooRequiredEndpoints, + ProxyState: &pbmesh.ProxyState{}, + }). + Write(suite.T(), suite.client) + + retry.Run(suite.T(), func(r *retry.R) { + suite.client.RequireResourceExists(r, fooProxyStateTemplate.Id) + }) + + barProxyStateTemplate := resourcetest.Resource(types.ProxyStateTemplateType, "bar-pst"). + WithData(suite.T(), &pbmesh.ProxyStateTemplate{ + // Contains the foobar endpoint. + RequiredEndpoints: barRequiredEndpoints, + ProxyState: &pbmesh.ProxyState{}, + }). + Write(suite.T(), suite.client) + + retry.Run(suite.T(), func(r *retry.R) { + suite.client.RequireResourceExists(r, barProxyStateTemplate.Id) + }) + + expectedFooProxyStateEndpoints := map[string]*pbproxystate.Endpoints{ + "test-cluster-1": {Endpoints: []*pbproxystate.Endpoint{ + { + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "10.1.1.1", + Port: 20000, + }, + }, + HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY, + }, + { + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "10.2.2.2", + Port: 20000, + }, + }, + HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY, + }, + }}, + "test-cluster-2": {Endpoints: []*pbproxystate.Endpoint{ + { + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "10.3.3.3", + Port: 20000, + }, + }, + HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY, + }, + { + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "10.4.4.4", + Port: 20000, + }, + }, + HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY, + }, + }}, + } + + expectedBarProxyStateEndpoints := map[string]*pbproxystate.Endpoints{ + "test-cluster-1": {Endpoints: []*pbproxystate.Endpoint{ + { + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "10.3.3.3", + Port: 20000, + }, + }, + HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY, + }, + { + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "10.4.4.4", + Port: 20000, + }, + }, + HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY, + }, + }}, + } + + suite.fooProxyStateTemplate = fooProxyStateTemplate + suite.barProxyStateTemplate = barProxyStateTemplate + suite.barEndpointRefs = barRequiredEndpoints + suite.fooEndpointRefs = fooRequiredEndpoints + suite.fooEndpoints = fooEndpoints + suite.fooService = fooService + suite.fooBarEndpoints = fooBarEndpoints + suite.fooBarService = fooBarService + suite.expectedFooProxyStateEndpoints = expectedFooProxyStateEndpoints + suite.expectedBarProxyStateEndpoints = expectedBarProxyStateEndpoints +} + +func TestXdsController(t *testing.T) { + suite.Run(t, new(xdsControllerTestSuite)) +} diff --git a/internal/mesh/internal/controllers/xds/endpoint_builder.go b/internal/mesh/internal/controllers/xds/endpoint_builder.go new file mode 100644 index 000000000000..71cae4b9b4b6 --- /dev/null +++ b/internal/mesh/internal/controllers/xds/endpoint_builder.go @@ -0,0 +1,72 @@ +package xds + +import ( + "fmt" + "net" + + "golang.org/x/exp/slices" + + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate" +) + +func generateProxyStateEndpoints(serviceEndpoints *ServiceEndpointsData, portName string) (*pbproxystate.Endpoints, error) { + var psEndpoints []*pbproxystate.Endpoint + + if serviceEndpoints.Endpoints == nil || serviceEndpoints.Resource == nil { + return nil, fmt.Errorf("service endpoints requires both endpoints and resource") + } + eps := serviceEndpoints.Endpoints.GetEndpoints() + + for _, ep := range eps { + for _, addr := range ep.Addresses { + // Check if the address is using the portName name this proxy state endpoints is for. If it does, create the + // endpoint. + if slices.Contains(addr.Ports, portName) { + // Lookup the portName number from the portName name. + wlPort, ok := ep.Ports[portName] + if !ok { + // This should never happen, as it should be validated by the ServiceEndpoints controller. + return nil, fmt.Errorf("could not find portName %q in endpoint %s", portName, serviceEndpoints.Resource.Id) + } + portNum := wlPort.Port + + psEndpoint, err := createProxyStateEndpoint(addr.Host, portNum, ep.HealthStatus) + if err != nil { + return nil, err + } + psEndpoints = append(psEndpoints, psEndpoint) + } + } + } + + return &pbproxystate.Endpoints{Endpoints: psEndpoints}, nil +} + +func createProxyStateEndpoint(host string, port uint32, health pbcatalog.Health) (*pbproxystate.Endpoint, error) { + addr := net.ParseIP(host) + if addr == nil { + return nil, fmt.Errorf("host is not an ip") + } + + psEndpoint := &pbproxystate.Endpoint{ + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: host, + Port: port, + }, + }, + HealthStatus: endpointHealth(health), + // TODO(xds): Weight will be added later. More information is potentially needed in the reference. + } + return psEndpoint, nil +} + +func endpointHealth(catalogHealth pbcatalog.Health) pbproxystate.HealthStatus { + health := pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY + + if catalogHealth == pbcatalog.Health_HEALTH_CRITICAL { + health = pbproxystate.HealthStatus_HEALTH_STATUS_UNHEALTHY + } + return health +} diff --git a/internal/mesh/internal/controllers/xds/endpoint_builder_test.go b/internal/mesh/internal/controllers/xds/endpoint_builder_test.go new file mode 100644 index 000000000000..6334429abd1c --- /dev/null +++ b/internal/mesh/internal/controllers/xds/endpoint_builder_test.go @@ -0,0 +1,233 @@ +package xds + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/internal/catalog" + "github.com/hashicorp/consul/internal/resource/resourcetest" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate" + "github.com/hashicorp/consul/proto/private/prototest" +) + +func TestMakeProxyStateEndpointsFromServiceEndpoints(t *testing.T) { + type test struct { + name string + serviceEndpointsData *ServiceEndpointsData + portName string + expErr string + expectedProxyStateEndpoints *pbproxystate.Endpoints + } + cases := []test{ + { + name: "endpoints with passing health", + serviceEndpointsData: serviceEndpointsData("passing"), + portName: "mesh", + expectedProxyStateEndpoints: &pbproxystate.Endpoints{ + Endpoints: []*pbproxystate.Endpoint{ + { + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "10.1.1.1", + Port: 20000, + }, + }, + HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY, + }, + { + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "10.2.2.2", + Port: 20000, + }, + }, + HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY, + }, + { + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "10.3.3.3", + Port: 20000, + }, + }, + HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY, + }, + }, + }, + }, + { + name: "endpoints with critical health", + serviceEndpointsData: serviceEndpointsData("critical"), + portName: "mesh", + expectedProxyStateEndpoints: &pbproxystate.Endpoints{ + Endpoints: []*pbproxystate.Endpoint{ + { + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "10.1.1.1", + Port: 20000, + }, + }, + HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_UNHEALTHY, + }, + { + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "10.2.2.2", + Port: 20000, + }, + }, + HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_UNHEALTHY, + }, + { + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "10.3.3.3", + Port: 20000, + }, + }, + HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_UNHEALTHY, + }, + }, + }, + }, + { + name: "endpoints with any health are considered healthy", + serviceEndpointsData: serviceEndpointsData("any"), + portName: "mesh", + expectedProxyStateEndpoints: &pbproxystate.Endpoints{ + Endpoints: []*pbproxystate.Endpoint{ + { + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "10.1.1.1", + Port: 20000, + }, + }, + HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY, + }, + { + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "10.2.2.2", + Port: 20000, + }, + }, + HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY, + }, + { + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "10.3.3.3", + Port: 20000, + }, + }, + HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY, + }, + }, + }, + }, + { + name: "endpoints with missing ports returns an error", + serviceEndpointsData: serviceEndpointsData("missing port lookup"), + portName: "mesh", + expErr: "could not find portName", + }, + { + name: "nil endpoints returns an error", + serviceEndpointsData: serviceEndpointsData("nil endpoints"), + portName: "mesh", + expErr: "service endpoints requires both endpoints and resource", + }, + { + name: "nil resource returns an error", + serviceEndpointsData: serviceEndpointsData("nil resource"), + portName: "mesh", + expErr: "service endpoints requires both endpoints and resource", + }, + { + name: "portName doesn't exist in endpoints results in empty endpoints", + serviceEndpointsData: serviceEndpointsData("passing"), + portName: "does-not-exist", + expectedProxyStateEndpoints: &pbproxystate.Endpoints{ + Endpoints: []*pbproxystate.Endpoint{}, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + actualEndpoints, err := generateProxyStateEndpoints(tc.serviceEndpointsData, tc.portName) + if tc.expErr != "" { + require.ErrorContains(t, err, tc.expErr) + } else { + prototest.AssertDeepEqual(t, tc.expectedProxyStateEndpoints, actualEndpoints) + } + }) + } +} + +func serviceEndpointsData(variation string) *ServiceEndpointsData { + r := resourcetest.Resource(catalog.ServiceEndpointsType, "test").Build() + eps := &pbcatalog.ServiceEndpoints{ + Endpoints: []*pbcatalog.Endpoint{ + { + Ports: map[string]*pbcatalog.WorkloadPort{ + "mesh": { + Port: 20000, + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Addresses: []*pbcatalog.WorkloadAddress{ + { + Host: "10.1.1.1", + Ports: []string{"mesh"}, + }, + { + Host: "10.2.2.2", + Ports: []string{"mesh"}, + }, + }, + HealthStatus: pbcatalog.Health_HEALTH_PASSING, + }, + { + Ports: map[string]*pbcatalog.WorkloadPort{ + "mesh": { + Port: 20000, + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Addresses: []*pbcatalog.WorkloadAddress{ + { + Host: "10.3.3.3", + Ports: []string{"mesh"}, + }, + }, + HealthStatus: pbcatalog.Health_HEALTH_PASSING, + }, + }, + } + + switch variation { + case "passing": + case "critical": + eps.Endpoints[0].HealthStatus = pbcatalog.Health_HEALTH_CRITICAL + eps.Endpoints[1].HealthStatus = pbcatalog.Health_HEALTH_CRITICAL + case "any": + eps.Endpoints[0].HealthStatus = pbcatalog.Health_HEALTH_ANY + eps.Endpoints[1].HealthStatus = pbcatalog.Health_HEALTH_ANY + case "missing port lookup": + delete(eps.Endpoints[0].Ports, "mesh") + case "nil endpoints": + eps = nil + case "nil resource": + r = nil + } + + return &ServiceEndpointsData{ + Resource: r, + Endpoints: eps, + } +} diff --git a/internal/mesh/internal/controllers/xds/mock_updater.go b/internal/mesh/internal/controllers/xds/mock_updater.go new file mode 100644 index 000000000000..33564062bc07 --- /dev/null +++ b/internal/mesh/internal/controllers/xds/mock_updater.go @@ -0,0 +1,90 @@ +package xds + +import ( + "fmt" + "sync" + + pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate" + + "github.com/hashicorp/consul/proto-public/pbresource" +) + +// mockUpdater mocks the updater functions, and stores ProxyStates from calls to PushChange, so we can assert it was +// computed correctly in the controller. +type mockUpdater struct { + lock sync.Mutex + // latestPs is a map from a ProxyStateTemplate's id.Name in string form to the last computed ProxyState for that + // ProxyStateTemplate. + latestPs map[string]*pbmesh.ProxyState + notConnected bool + pushChangeError bool +} + +func NewMockUpdater() *mockUpdater { + return &mockUpdater{ + latestPs: make(map[string]*pbmesh.ProxyState), + } +} + +func (m *mockUpdater) SetPushChangeErrorTrue() { + m.lock.Lock() + defer m.lock.Unlock() + m.pushChangeError = true +} + +func (m *mockUpdater) SetProxyAsNotConnected() { + m.lock.Lock() + defer m.lock.Unlock() + m.notConnected = true +} + +func (m *mockUpdater) PushChange(id *pbresource.ID, snapshot *pbmesh.ProxyState) error { + m.lock.Lock() + defer m.lock.Unlock() + if m.pushChangeError { + return fmt.Errorf("mock push change error") + } else { + m.setUnsafe(id.Name, snapshot) + } + return nil +} + +func (m *mockUpdater) ProxyConnectedToServer(_ *pbresource.ID) bool { + m.lock.Lock() + defer m.lock.Unlock() + if m.notConnected { + return false + } + return true +} + +func (p *mockUpdater) Get(name string) *pbmesh.ProxyState { + p.lock.Lock() + defer p.lock.Unlock() + ps, ok := p.latestPs[name] + if ok { + return ps + } + return nil +} + +func (p *mockUpdater) GetEndpoints(name string) map[string]*pbproxystate.Endpoints { + p.lock.Lock() + defer p.lock.Unlock() + ps, ok := p.latestPs[name] + if ok { + return ps.Endpoints + } + return nil +} + +func (p *mockUpdater) Set(name string, ps *pbmesh.ProxyState) { + p.lock.Lock() + defer p.lock.Unlock() + p.setUnsafe(name, ps) +} + +func (p *mockUpdater) setUnsafe(name string, ps *pbmesh.ProxyState) { + p.latestPs[name] = ps +} diff --git a/internal/mesh/internal/controllers/xds/reconciliation_data.go b/internal/mesh/internal/controllers/xds/reconciliation_data.go new file mode 100644 index 000000000000..53b515a05257 --- /dev/null +++ b/internal/mesh/internal/controllers/xds/reconciliation_data.go @@ -0,0 +1,58 @@ +package xds + +import ( + "context" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/resource" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +type ServiceEndpointsData struct { + Resource *pbresource.Resource + Endpoints *pbcatalog.ServiceEndpoints +} + +type ProxyStateTemplateData struct { + Resource *pbresource.Resource + Template *pbmesh.ProxyStateTemplate +} + +// getServiceEndpoints will return a non-nil &ServiceEndpointsData unless there is an error. +func getServiceEndpoints(ctx context.Context, rt controller.Runtime, id *pbresource.ID) (*ServiceEndpointsData, error) { + rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: id}) + if err != nil { + return nil, err + } + + var se pbcatalog.ServiceEndpoints + err = rsp.Resource.Data.UnmarshalTo(&se) + if err != nil { + return nil, resource.NewErrDataParse(&se, err) + } + + return &ServiceEndpointsData{Resource: rsp.Resource, Endpoints: &se}, nil +} + +func getProxyStateTemplate(ctx context.Context, rt controller.Runtime, id *pbresource.ID) (*ProxyStateTemplateData, error) { + rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: id}) + switch { + case status.Code(err) == codes.NotFound: + return nil, nil + case err != nil: + return nil, err + } + + var pst pbmesh.ProxyStateTemplate + err = rsp.Resource.Data.UnmarshalTo(&pst) + if err != nil { + return nil, resource.NewErrDataParse(&pst, err) + } + + return &ProxyStateTemplateData{Resource: rsp.Resource, Template: &pst}, nil +} diff --git a/internal/mesh/internal/controllers/xds/status/status.go b/internal/mesh/internal/controllers/xds/status/status.go new file mode 100644 index 000000000000..706152329035 --- /dev/null +++ b/internal/mesh/internal/controllers/xds/status/status.go @@ -0,0 +1,92 @@ +package status + +import ( + "context" + "fmt" + + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +const ( + StatusConditionProxyStateAccepted = "ProxyStateAccepted" + StatusReasonNilProxyState = "ProxyStateNil" + StatusReasonProxyStateReferencesComputed = "ProxyStateReferencesComputed" + StatusReasonEndpointNotRead = "ProxyStateEndpointReferenceReadError" + StatusReasonCreatingProxyStateEndpointsFailed = "ProxyStateEndpointsNotComputed" + StatusReasonPushChangeFailed = "ProxyStatePushChangeFailed" +) + +func KeyFromID(id *pbresource.ID) string { + return fmt.Sprintf("%s/%s/%s", + resource.ToGVK(id.Type), + resource.TenancyToString(id.Tenancy), + id.Name) +} + +func ConditionAccepted() *pbresource.Condition { + return &pbresource.Condition{ + Type: StatusConditionProxyStateAccepted, + State: pbresource.Condition_STATE_TRUE, + Reason: StatusReasonProxyStateReferencesComputed, + Message: fmt.Sprintf("proxy state was computed and pushed."), + } +} +func ConditionRejectedNilProxyState(pstRef string) *pbresource.Condition { + return &pbresource.Condition{ + Type: StatusConditionProxyStateAccepted, + State: pbresource.Condition_STATE_FALSE, + Reason: StatusReasonNilProxyState, + Message: fmt.Sprintf("nil proxy state is not valid %q.", pstRef), + } +} +func ConditionRejectedErrorReadingEndpoints(endpointRef string, err string) *pbresource.Condition { + return &pbresource.Condition{ + Type: StatusConditionProxyStateAccepted, + State: pbresource.Condition_STATE_FALSE, + Reason: StatusReasonEndpointNotRead, + Message: fmt.Sprintf("error reading referenced service endpoints %q: %s", endpointRef, err), + } +} +func ConditionRejectedCreatingProxyStateEndpoints(endpointRef string, err string) *pbresource.Condition { + return &pbresource.Condition{ + Type: StatusConditionProxyStateAccepted, + State: pbresource.Condition_STATE_FALSE, + Reason: StatusReasonCreatingProxyStateEndpointsFailed, + Message: fmt.Sprintf("could not create proxy state endpoints from service endpoints %q: %s", endpointRef, err), + } +} +func ConditionRejectedPushChangeFailed(pstRef string) *pbresource.Condition { + return &pbresource.Condition{ + Type: StatusConditionProxyStateAccepted, + State: pbresource.Condition_STATE_FALSE, + Reason: StatusReasonPushChangeFailed, + Message: fmt.Sprintf("failed to push change for proxy state template %q", pstRef), + } +} + +// WriteStatusIfChanged updates the ProxyStateTemplate status if it has changed. +func WriteStatusIfChanged(ctx context.Context, rt controller.Runtime, res *pbresource.Resource, condition *pbresource.Condition) { + newStatus := &pbresource.Status{ + ObservedGeneration: res.Generation, + Conditions: []*pbresource.Condition{ + condition, + }, + } + // If the status is unchanged then we should return and avoid the unnecessary write + const controllerName = "consul.io/xds-controller" + if resource.EqualStatus(res.Status[controllerName], newStatus, false) { + return + } else { + _, err := rt.Client.WriteStatus(ctx, &pbresource.WriteStatusRequest{ + Id: res.Id, + Key: controllerName, + Status: newStatus, + }) + + if err != nil { + rt.Logger.Error("error updating the proxy state template status", "error", err, "proxyStateTeamplate", res.Id) + } + } +}