Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds controller: resolve ServiceEndpoints references in ProxyStateTemp… #18544

Merged
merged 1 commit into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions internal/mesh/internal/controllers/xds/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package xds

import (
"context"

"github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/controllers/xds/status"
"github.com/hashicorp/consul/internal/mesh/internal/types"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/mappers/bimapper"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate"
"github.com/hashicorp/consul/proto-public/pbresource"
)

const ControllerName = "consul.io/xds-controller"

func Controller(mapper *bimapper.Mapper, updater ProxyUpdater) controller.Controller {
if mapper == nil || updater == nil {
panic("mapper and updater are required")
}

return controller.ForType(types.ProxyStateTemplateType).
WithWatch(catalog.ServiceEndpointsType, mapper.MapLink).
WithPlacement(controller.PlacementEachServer).
WithReconciler(&xdsReconciler{bimapper: mapper, updater: updater})
}

type xdsReconciler struct {
bimapper *bimapper.Mapper
updater ProxyUpdater
}

// ProxyUpdater is an interface that defines the ability to push proxy updates to the updater
// and also check its connectivity to the server.
type ProxyUpdater interface {
// PushChange allows pushing a computed ProxyState to xds for xds resource generation to send to a proxy.
PushChange(id *pbresource.ID, snapshot *pbmesh.ProxyState) error

// ProxyConnectedToServer returns whether this id is connected to this server.
ProxyConnectedToServer(id *pbresource.ID) bool
}

func (r *xdsReconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error {
rt.Logger = rt.Logger.With("resource-id", req.ID, "controller", ControllerName)

rt.Logger.Trace("reconciling proxy state template", "id", req.ID)

// Get the ProxyStateTemplate.
proxyStateTemplate, err := getProxyStateTemplate(ctx, rt, req.ID)
if err != nil {
rt.Logger.Error("error reading proxy state template", "error", err)
return err
}

if proxyStateTemplate == nil || proxyStateTemplate.Template == nil || !r.updater.ProxyConnectedToServer(req.ID) {
rt.Logger.Trace("proxy state template has been deleted or this controller is not responsible for this proxy state template", "id", req.ID)

// If the proxy state was deleted, we should remove references to it in the mapper.
r.bimapper.UntrackItem(req.ID)

return nil
}

var (
statusCondition *pbresource.Condition
pstResource *pbresource.Resource
)
pstResource = proxyStateTemplate.Resource

// Initialize the ProxyState endpoints map.
if proxyStateTemplate.Template.ProxyState == nil {
rt.Logger.Error("proxy state was missing from proxy state template")
// Set the status.
statusCondition = status.ConditionRejectedNilProxyState(status.KeyFromID(req.ID))
status.WriteStatusIfChanged(ctx, rt, pstResource, statusCondition)

return err
}
if proxyStateTemplate.Template.ProxyState.Endpoints == nil {
proxyStateTemplate.Template.ProxyState.Endpoints = make(map[string]*pbproxystate.Endpoints)
}

// Iterate through the endpoint references.
// For endpoints, the controller should:
// 1. Resolve ServiceEndpoint references
// 2. Translate them into pbproxystate.Endpoints
// 3. Add the pbproxystate.Endpoints to the ProxyState endpoints map.
// 4. Track relationships between ProxyState and ServiceEndpoints, such that we can look up ServiceEndpoints and
// figure out which ProxyStates are associated with it (for mapping watches) and vice versa (for looking up
// references). The bimapper package is useful for tracking these relationships.
endpointReferencesMap := proxyStateTemplate.Template.RequiredEndpoints
var endpointsInProxyStateTemplate []resource.ReferenceOrID
for xdsClusterName, endpointRef := range endpointReferencesMap {

// Step 1: Resolve the reference by looking up the ServiceEndpoints.
// serviceEndpoints will not be nil unless there is an error.
serviceEndpoints, err := getServiceEndpoints(ctx, rt, endpointRef.Id)
if err != nil {
rt.Logger.Error("error reading service endpoint", "id", endpointRef.Id, "error", err)
// Set the status.
statusCondition = status.ConditionRejectedErrorReadingEndpoints(status.KeyFromID(endpointRef.Id), err.Error())
status.WriteStatusIfChanged(ctx, rt, pstResource, statusCondition)

return err
}

// Step 2: Translate it into pbproxystate.Endpoints.
psEndpoints, err := generateProxyStateEndpoints(serviceEndpoints, endpointRef.Port)
if err != nil {
rt.Logger.Error("error translating service endpoints to proxy state endpoints", "endpoint", endpointRef.Id, "error", err)

// Set the status.
statusCondition = status.ConditionRejectedCreatingProxyStateEndpoints(status.KeyFromID(endpointRef.Id), err.Error())
status.WriteStatusIfChanged(ctx, rt, pstResource, statusCondition)

return err
}

// Step 3: Add the endpoints to ProxyState.
proxyStateTemplate.Template.ProxyState.Endpoints[xdsClusterName] = psEndpoints

// Track all the endpoints that are used by this ProxyStateTemplate, so we can use this for step 4.
endpointResourceRef := resource.Reference(endpointRef.Id, "")
endpointsInProxyStateTemplate = append(endpointsInProxyStateTemplate, endpointResourceRef)

}

// Step 4: Track relationships between ProxyStateTemplates and ServiceEndpoints.
r.bimapper.TrackItem(req.ID, endpointsInProxyStateTemplate)

computedProxyState := proxyStateTemplate.Template.ProxyState

err = r.updater.PushChange(req.ID, computedProxyState)
if err != nil {
// Set the status.
statusCondition = status.ConditionRejectedPushChangeFailed(status.KeyFromID(req.ID))
status.WriteStatusIfChanged(ctx, rt, pstResource, statusCondition)
return err
}

// Set the status.
statusCondition = status.ConditionAccepted()
status.WriteStatusIfChanged(ctx, rt, pstResource, statusCondition)
return nil
}

func resourceIdToReference(id *pbresource.ID) *pbresource.Reference {
ref := &pbresource.Reference{
Name: id.GetName(),
Type: id.GetType(),
Tenancy: id.GetTenancy(),
}
return ref
}
Loading