Skip to content

Commit

Permalink
[WIP] Register mesh-gateways using the endpoints controller.
Browse files Browse the repository at this point in the history
  • Loading branch information
thisisnotashwin committed Sep 1, 2022
1 parent 27e0641 commit 8ad1b3d
Show file tree
Hide file tree
Showing 7 changed files with 342 additions and 224 deletions.
2 changes: 2 additions & 0 deletions charts/consul/templates/connect-inject-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ spec:
{{- else }}
-default-enable-metrics=false \
{{- end }}
-enable-gateway-metrics={{ .Values.global.metrics.enableGatewayMetrics }} \
-default-enable-metrics-merging={{ .Values.connectInject.metrics.defaultEnableMerging }} \
-default-merged-metrics-port={{ .Values.connectInject.metrics.defaultMergedMetricsPort }} \
-default-prometheus-scrape-port={{ .Values.connectInject.metrics.defaultPrometheusScrapePort }} \
Expand Down Expand Up @@ -398,6 +399,7 @@ spec:
{{- if and .Values.global.federation.enabled .Values.global.federation.primaryDatacenter .Values.global.enableConsulNamespaces }}
-acl-auth-method={{ template "consul.fullname" . }}-k8s-component-auth-method-{{ .Values.global.datacenter }} \
-primary-datacenter={{ .Values.global.federation.primaryDatacenter }} \
-enable-federation \
{{- else }}
-acl-auth-method={{ template "consul.fullname" . }}-k8s-component-auth-method \
{{- end }}
Expand Down
266 changes: 60 additions & 206 deletions charts/consul/templates/mesh-gateway-deployment.yaml

Large diffs are not rendered by default.

25 changes: 25 additions & 0 deletions control-plane/connect-inject/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,31 @@ const (
// be set to a truthy or falsy value, as parseable by strconv.ParseBool.
annotationInject = "consul.hashicorp.com/connect-inject"

// annotationGatewayKind is the key of the annotation that annotates pods
// that represent Consul Connect Gateways. This should be set to a
// value that is either "mesh", "ingress" or "terminating".
annotationGatewayKind = "consul.hashicorp.com/gateway-kind"

// annotationMeshGatewaySource is the key of the annotation that annotates pods
// that represent Consul Connect Gateways. This should be set to a
// truthy or falsy value, as parseable by strconv.ParseBool.
annotationMeshGatewayContainerPort = "consul.hashicorp.com/mesh-gateway-container-port"

// annotationMeshGatewaySource is the key of the annotation that annotates pods
// that represent Consul Connect Gateways. This should be set to a
// truthy or falsy value, as parseable by strconv.ParseBool.
annotationMeshGatewaySource = "consul.hashicorp.com/mesh-gateway-source"

// annotationGatewayKind is the key of the annotation that annotates pods
// that represent Consul Connect Gateways. This should be set to a
// truthy or falsy value, as parseable by strconv.ParseBool.
annotationMeshGatewayWANAddress = "consul.hashicorp.com/mesh-gateway-wan-address"

// annotationGatewayKind is the key of the annotation that annotates pods
// that represent Consul Connect Gateways. This should be set to a
// truthy or falsy value, as parseable by strconv.ParseBool.
annotationMeshGatewayWANPort = "consul.hashicorp.com/mesh-gateway-wan-port"

// annotationInjectMountVolumes is the key of the annotation that controls whether
// the data volume that connect inject uses to store data including the Consul ACL token
// is mounted to other containers in the pod. It is a comma-separated list of container names
Expand Down
172 changes: 172 additions & 0 deletions control-plane/connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
MetaKeyKubeServiceName = "k8s-service-name"
MetaKeyKubeNS = "k8s-namespace"
MetaKeyManagedBy = "managed-by"
MetaKeyConsulWANFederation = "consul-wan-federation"
TokenMetaPodNameKey = "pod"
kubernetesSuccessReasonMsg = "Kubernetes health checks passing"
envoyPrometheusBindAddr = "envoy_prometheus_bind_addr"
Expand Down Expand Up @@ -83,6 +84,9 @@ type EndpointsController struct {
// EnableConsulNamespaces indicates that a user is running Consul Enterprise
// with version 1.7+ which supports namespaces.
EnableConsulNamespaces bool
// EnableWANFederation indicates that a user is running Consul with
// WAN Federation enabled.
EnableWANFederation bool
// ConsulDestinationNamespace is the name of the Consul namespace to create
// all config entries in. If EnableNSMirroring is true this is ignored.
ConsulDestinationNamespace string
Expand Down Expand Up @@ -196,6 +200,13 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
errs = multierror.Append(errs, err)
}
}
if isGateway(pod) {
endpointPods.Add(address.TargetRef.Name)
if err = r.registerGateway(pod, serviceEndpoints, healthStatus, endpointAddressMap); err != nil {
r.Log.Error(err, "failed to register gateway or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
}
}
}
}
}
Expand Down Expand Up @@ -261,6 +272,38 @@ func (r *EndpointsController) registerServicesAndHealthCheck(pod corev1.Pod, ser
return nil
}

// registerGateway creates Consul registrations for the Connect Gateways and registers them with Consul.
// It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready.
func (r *EndpointsController) registerGateway(pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string, endpointAddressMap map[string]bool) error {
// Build the endpointAddressMap up for deregistering service instances later.
endpointAddressMap[pod.Status.PodIP] = true

var managedByEndpointsController bool
if raw, ok := pod.Labels[keyManagedBy]; ok && raw == managedByValue {
managedByEndpointsController = true
}
// For pods managed by this controller, create and register the service instance.
if managedByEndpointsController {
// Get information from the pod to create service instance registrations.
serviceRegistration, err := r.createGatewayRegistrations(pod, serviceEndpoints, healthStatus)
if err != nil {
r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
return err
}

// Register the service instance with Consul.
r.Log.Info("registering gateway with Consul", "name", serviceRegistration.Service.Service,
"id", serviceRegistration.ID)
_, err = r.ConsulClient.Catalog().Register(serviceRegistration, nil)
if err != nil {
r.Log.Error(err, "failed to register gateway", "name", serviceRegistration.Service.Service)
return err
}
}

return nil
}

// serviceName computes the service name to register with Consul from the pod and endpoints object. In a single port
// service, it defaults to the endpoints name, but can be overridden by a pod annotation. In a multi port service, the
// endpoints name is always used since the pod annotation will have multiple service names listed (one per port).
Expand Down Expand Up @@ -548,6 +591,127 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
return serviceRegistration, proxyServiceRegistration, nil
}

// createGatewayRegistrations creates the gateway service registrations with the information from the Pod.
func (r *EndpointsController) createGatewayRegistrations(pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) (*api.CatalogRegistration, error) {
wanAddr, wanPort, err := r.getWanData(pod, serviceEndpoints)
if err != nil {
return nil, err
}

meta := map[string]string{
MetaKeyPodName: pod.Name,
MetaKeyKubeServiceName: serviceEndpoints.Name,
MetaKeyKubeNS: serviceEndpoints.Namespace,
MetaKeyManagedBy: managedByValue,
}

if r.EnableWANFederation {
meta[MetaKeyConsulWANFederation] = "1"
}

consulNS := r.consulNamespace(pod.Namespace)
port, err := strconv.Atoi(pod.Annotations[annotationMeshGatewayContainerPort])
if err != nil {
return nil, err
}

service := &api.AgentService{
Kind: api.ServiceKindMeshGateway,
ID: pod.Name,
Service: "mesh-gateway",
Port: port,
Address: pod.Status.PodIP,
Meta: meta,
Namespace: consulNS,
TaggedAddresses: map[string]api.ServiceAddress{
"lan": api.ServiceAddress{
Address: pod.Status.PodIP,
Port: port,
},
"wan": api.ServiceAddress{
Address: wanAddr,
Port: wanPort,
},
},
}
if r.MetricsConfig.DefaultEnableMetrics && r.MetricsConfig.EnableGatewayMetrics {
service.Proxy = &api.AgentServiceConnectProxyConfig{
Config: map[string]interface{}{
"envoy_prometheus_bind_addr": fmt.Sprintf("%s:20200", pod.Status.PodIP),
},
}
}
serviceRegistration := &api.CatalogRegistration{
Node: ConsulNodeName,
Address: ConsulNodeAddress,
Service: service,
Check: &api.AgentCheck{
CheckID: consulHealthCheckID(pod.Namespace, pod.Name),
Name: ConsulKubernetesCheckName,
Type: ConsulKubernetesCheckType,
Status: healthStatus,
ServiceID: pod.Name,
Output: getHealthCheckStatusReason(healthStatus, pod.Name, pod.Namespace),
Namespace: consulNS,
},
SkipNodeUpdate: true,
}

return serviceRegistration, nil
}

func (r *EndpointsController) getWanData(pod corev1.Pod, endpoints corev1.Endpoints) (string, int, error) {
var wanAddr string
var wanPort int
source, ok := pod.Annotations[annotationMeshGatewaySource]
if !ok {
return "", 0, fmt.Errorf("failed to read annotation %s", annotationMeshGatewaySource)
}
switch source {
case "NodeName":
wanAddr = pod.Spec.NodeName
case "NodeIP":
wanAddr = pod.Status.HostIP
case "Static":
wanAddr = pod.Annotations[annotationMeshGatewayWANAddress]
case "Service":
svc, err := r.getService(endpoints)
if err != nil {
return "", 0, fmt.Errorf("failed to read service %s in namespace %s", endpoints.Name, endpoints.Namespace)
}
switch svc.Spec.Type {
case corev1.ServiceTypeNodePort:
wanAddr = pod.Status.HostIP
case corev1.ServiceTypeClusterIP:
wanAddr = svc.Spec.ClusterIP
case corev1.ServiceTypeLoadBalancer:
for _, ingr := range svc.Status.LoadBalancer.Ingress {
if ingr.IP != "" {
wanAddr = ingr.IP
break
} else if ingr.Hostname != "" {
wanAddr = ingr.Hostname
break
}
}
}
}

wanPort, err := strconv.Atoi(pod.Annotations[annotationMeshGatewayWANPort])
if err != nil {
return "", 0, fmt.Errorf("failed to parse WAN port from value %s", pod.Annotations[annotationMeshGatewayWANPort])
}
return wanAddr, wanPort, nil
}

func (r *EndpointsController) getService(endpoints corev1.Endpoints) (*corev1.Service, error) {
var svc corev1.Service
if err := r.Client.Get(r.Context, types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, &svc); err != nil {
return nil, err
}
return &svc, nil
}

// portValueFromIntOrString returns the integer port value from the port that can be
// a named port, an integer string (e.g. "80"), or an integer. If the port is a named port,
// this function will attempt to find the value from the containers of the pod.
Expand Down Expand Up @@ -981,6 +1145,14 @@ func hasBeenInjected(pod corev1.Pod) bool {
return false
}

// isGateway checks the value of the gateway annotation and returns true if the Pod represents a Gateway.
func isGateway(pod corev1.Pod) bool {
if anno, ok := pod.Annotations[annotationGatewayKind]; ok && anno != "" {
return true
}
return false
}

// mapAddresses combines all addresses to a mapping of address to its health status.
func mapAddresses(addresses corev1.EndpointSubset) map[corev1.EndpointAddress]string {
m := make(map[corev1.EndpointAddress]string)
Expand Down
1 change: 1 addition & 0 deletions control-plane/connect-inject/metrics_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
// MetricsConfig represents configuration common to connect-inject components related to metrics.
type MetricsConfig struct {
DefaultEnableMetrics bool
EnableGatewayMetrics bool
DefaultEnableMetricsMerging bool
DefaultMergedMetricsPort string
DefaultPrometheusScrapePort string
Expand Down
Loading

0 comments on commit 8ad1b3d

Please sign in to comment.