Skip to content

Commit

Permalink
Handle errors properly when services are de-registered from the catal…
Browse files Browse the repository at this point in the history
…og (#2571)

- In the past, kubernetes nodes were used as the source of truth to
determine the list of services that should exist in Consul.
- In most cases this was ok but becomes a problem when nodes are quickly
deleted from kubernetes such as the case when using spot instances.
- Instead, use consul synthetic-nodes to get the list of services and
deregister the services that do not have endpoint addresses.

---------
Co-authored-by: mr-miles <[email protected]>
  • Loading branch information
curtbushko committed Jul 20, 2023
1 parent a4d9487 commit 414554c
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 23 deletions.
3 changes: 3 additions & 0 deletions .changelog/2571.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
control-plane: fix bug in endpoints controller when deregistering services from consul when a node is deleted.
```
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}

err = r.Client.Get(ctx, req.NamespacedName, &serviceEndpoints)

// endpointPods holds a set of all pods this endpoints object is currently pointing to.
// We use this later when we reconcile ACL tokens to decide whether an ACL token in Consul
// is for a pod that no longer exists.
Expand All @@ -183,7 +182,7 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
// It is possible that the endpoints object has never been registered, in which case deregistration is a no-op.
if isLabeledIgnore(serviceEndpoints.Labels) {
// We always deregister the service to handle the case where a user has registered the service, then added the label later.
r.Log.Info("Ignoring endpoint labeled with `consul.hashicorp.com/service-ignore: \"true\"`", "name", req.Name, "namespace", req.Namespace)
r.Log.Info("ignoring endpoint labeled with `consul.hashicorp.com/service-ignore: \"true\"`", "name", req.Name, "namespace", req.Namespace)
err = r.deregisterService(apiClient, req.Name, req.Namespace, nil)
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -915,14 +914,14 @@ func getHealthCheckStatusReason(healthCheckStatus, podName, podNamespace string)
// them only if they are not in endpointsAddressesMap. If the map is nil, it will deregister all instances. If the map
// has addresses, it will only deregister instances not in the map.
func (r *Controller) deregisterService(apiClient *api.Client, k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool) error {
// Get services matching metadata.
nodesWithSvcs, err := r.serviceInstancesForK8sNodes(apiClient, k8sSvcName, k8sSvcNamespace)
// Get services matching metadata from Consul
nodesWithSvcs, err := r.serviceInstancesForNodes(apiClient, k8sSvcName, k8sSvcNamespace)
if err != nil {
r.Log.Error(err, "failed to get service instances", "name", k8sSvcName)
return err
}

// Deregister each service instance that matches the metadata.
var errs error
for _, nodeSvcs := range nodesWithSvcs {
for _, svc := range nodeSvcs.Services {
// We need to get services matching "k8s-service-name" and "k8s-namespace" metadata.
Expand All @@ -933,42 +932,48 @@ func (r *Controller) deregisterService(apiClient *api.Client, k8sSvcName, k8sSvc
if _, ok := endpointsAddressesMap[svc.Address]; !ok {
// If the service address is not in the Endpoints addresses, deregister it.
r.Log.Info("deregistering service from consul", "svc", svc.ID)
_, err = apiClient.Catalog().Deregister(&api.CatalogDeregistration{
_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{
Node: nodeSvcs.Node.Node,
ServiceID: svc.ID,
Namespace: svc.Namespace,
}, nil)
if err != nil {
// Do not exit right away as there might be other services that need to be deregistered.
r.Log.Error(err, "failed to deregister service instance", "id", svc.ID)
return err
errs = multierror.Append(errs, err)
} else {
serviceDeregistered = true
}
serviceDeregistered = true
}
} else {
r.Log.Info("deregistering service from consul", "svc", svc.ID)
if _, err = apiClient.Catalog().Deregister(&api.CatalogDeregistration{
_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{
Node: nodeSvcs.Node.Node,
ServiceID: svc.ID,
Namespace: svc.Namespace,
}, nil); err != nil {
}, nil)
if err != nil {
// Do not exit right away as there might be other services that need to be deregistered.
r.Log.Error(err, "failed to deregister service instance", "id", svc.ID)
return err
errs = multierror.Append(errs, err)
} else {
serviceDeregistered = true
}
serviceDeregistered = true
}

if r.AuthMethod != "" && serviceDeregistered {
r.Log.Info("reconciling ACL tokens for service", "svc", svc.Service)
err = r.deleteACLTokensForServiceInstance(apiClient, svc, k8sSvcNamespace, svc.Meta[constants.MetaKeyPodName])
err := r.deleteACLTokensForServiceInstance(apiClient, svc, k8sSvcNamespace, svc.Meta[constants.MetaKeyPodName])
if err != nil {
r.Log.Error(err, "failed to reconcile ACL tokens for service", "svc", svc.Service)
return err
errs = multierror.Append(errs, err)
}
}
}
}

return nil
return errs

}

// deleteACLTokensForServiceInstance finds the ACL tokens that belongs to the service instance and deletes it from Consul.
Expand Down Expand Up @@ -1088,21 +1093,32 @@ func getTokenMetaFromDescription(description string) (map[string]string, error)
return tokenMeta, nil
}

func (r *Controller) serviceInstancesForK8sNodes(apiClient *api.Client, k8sServiceName, k8sServiceNamespace string) ([]*api.CatalogNodeServiceList, error) {
func (r *Controller) serviceInstancesForNodes(apiClient *api.Client, k8sServiceName, k8sServiceNamespace string) ([]*api.CatalogNodeServiceList, error) {
var serviceList []*api.CatalogNodeServiceList
// Get a list of k8s nodes.
var nodeList corev1.NodeList
err := r.Client.List(r.Context, &nodeList)

// The nodelist may have changed between this point and when the event was raised
// For example, if a pod is evicted because a node has been deleted, there is no guarantee that that node will show up here
// query consul catalog for a list of nodes supporting this service
// quite a lot of results as synthetic nodes are never deregistered.
var nodes []*api.Node
filter := fmt.Sprintf(`Meta[%q] == %q `, "synthetic-node", "true")
nodes, _, err := apiClient.Catalog().Nodes(&api.QueryOptions{Filter: filter, Namespace: namespaces.WildcardNamespace})
if err != nil {
return nil, err
}
for _, node := range nodeList.Items {

var errs error
for _, node := range nodes {
var nodeServices *api.CatalogNodeServiceList
nodeServices, err = r.serviceInstancesForK8SServiceNameAndNamespace(apiClient, k8sServiceName, k8sServiceNamespace, common.ConsulNodeNameFromK8sNode(node.Name))
serviceList = append(serviceList, nodeServices)
nodeServices, err := r.serviceInstancesForK8SServiceNameAndNamespace(apiClient, k8sServiceName, k8sServiceNamespace, node.Node)
if err != nil {
errs = multierror.Append(errs, err)
} else {
serviceList = append(serviceList, nodeServices)
}
}

return serviceList, err
return serviceList, errs
}

// serviceInstancesForK8SServiceNameAndNamespace calls Consul's ServicesWithFilter to get the list
Expand Down
Loading

0 comments on commit 414554c

Please sign in to comment.