Skip to content

Commit

Permalink
[WIP] Register mesh-gateways using the endpoints controller.
Browse files Browse the repository at this point in the history
  • Loading branch information
thisisnotashwin committed Sep 14, 2022
1 parent 8fb8b43 commit db72e56
Show file tree
Hide file tree
Showing 10 changed files with 1,515 additions and 299 deletions.
2 changes: 2 additions & 0 deletions charts/consul/templates/connect-inject-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ spec:
{{- else }}
-default-enable-metrics=false \
{{- end }}
-enable-gateway-metrics={{ .Values.global.metrics.enableGatewayMetrics }} \
-default-enable-metrics-merging={{ .Values.connectInject.metrics.defaultEnableMerging }} \
-default-merged-metrics-port={{ .Values.connectInject.metrics.defaultMergedMetricsPort }} \
-default-prometheus-scrape-port={{ .Values.connectInject.metrics.defaultPrometheusScrapePort }} \
Expand Down Expand Up @@ -399,6 +400,7 @@ spec:
{{- if and .Values.global.federation.enabled .Values.global.federation.primaryDatacenter .Values.global.enableConsulNamespaces }}
-acl-auth-method={{ template "consul.fullname" . }}-k8s-component-auth-method-{{ .Values.global.datacenter }} \
-primary-datacenter={{ .Values.global.federation.primaryDatacenter }} \
-enable-federation \
{{- else }}
-acl-auth-method={{ template "consul.fullname" . }}-k8s-component-auth-method \
{{- end }}
Expand Down
266 changes: 60 additions & 206 deletions charts/consul/templates/mesh-gateway-deployment.yaml

Large diffs are not rendered by default.

25 changes: 25 additions & 0 deletions control-plane/connect-inject/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,31 @@ const (
// be set to a truthy or falsy value, as parseable by strconv.ParseBool.
annotationInject = "consul.hashicorp.com/connect-inject"

// annotationGatewayKind is the key of the annotation that annotates pods
// that represent Consul Connect Gateways. This should be set to a
// value that is either "mesh", "ingress" or "terminating".
annotationGatewayKind = "consul.hashicorp.com/gateway-kind"

// annotationMeshGatewaySource is the key of the annotation that annotates pods
// that represent Consul Connect Gateways. This should be set to a
// truthy or falsy value, as parseable by strconv.ParseBool.
annotationMeshGatewayContainerPort = "consul.hashicorp.com/mesh-gateway-container-port"

// annotationMeshGatewaySource is the key of the annotation that annotates pods
// that represent Consul Connect Gateways. This should be set to a
// truthy or falsy value, as parseable by strconv.ParseBool.
annotationMeshGatewaySource = "consul.hashicorp.com/mesh-gateway-source"

// annotationGatewayKind is the key of the annotation that annotates pods
// that represent Consul Connect Gateways. This should be set to a
// truthy or falsy value, as parseable by strconv.ParseBool.
annotationMeshGatewayWANAddress = "consul.hashicorp.com/mesh-gateway-wan-address"

// annotationGatewayKind is the key of the annotation that annotates pods
// that represent Consul Connect Gateways. This should be set to a
// truthy or falsy value, as parseable by strconv.ParseBool.
annotationMeshGatewayWANPort = "consul.hashicorp.com/mesh-gateway-wan-port"

// annotationInjectMountVolumes is the key of the annotation that controls whether
// the data volume that connect inject uses to store data including the Consul ACL token
// is mounted to other containers in the pod. It is a comma-separated list of container names
Expand Down
169 changes: 169 additions & 0 deletions control-plane/connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
MetaKeyKubeServiceName = "k8s-service-name"
MetaKeyKubeNS = "k8s-namespace"
MetaKeyManagedBy = "managed-by"
MetaKeyConsulWANFederation = "consul-wan-federation"
TokenMetaPodNameKey = "pod"
kubernetesSuccessReasonMsg = "Kubernetes health checks passing"
envoyPrometheusBindAddr = "envoy_prometheus_bind_addr"
Expand Down Expand Up @@ -83,6 +84,9 @@ type EndpointsController struct {
// EnableConsulNamespaces indicates that a user is running Consul Enterprise
// with version 1.7+ which supports namespaces.
EnableConsulNamespaces bool
// EnableWANFederation indicates that a user is running Consul with
// WAN Federation enabled.
EnableWANFederation bool
// ConsulDestinationNamespace is the name of the Consul namespace to create
// all config entries in. If EnableNSMirroring is true this is ignored.
ConsulDestinationNamespace string
Expand Down Expand Up @@ -196,6 +200,13 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
errs = multierror.Append(errs, err)
}
}
if isGateway(pod) {
endpointPods.Add(address.TargetRef.Name)
if err = r.registerGateway(pod, serviceEndpoints, healthStatus, endpointAddressMap); err != nil {
r.Log.Error(err, "failed to register gateway or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
}
}
}
}
}
Expand Down Expand Up @@ -261,6 +272,38 @@ func (r *EndpointsController) registerServicesAndHealthCheck(pod corev1.Pod, ser
return nil
}

// registerGateway creates Consul registrations for the Connect Gateways and registers them with Consul.
// It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready.
func (r *EndpointsController) registerGateway(pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string, endpointAddressMap map[string]bool) error {
// Build the endpointAddressMap up for deregistering service instances later.
endpointAddressMap[pod.Status.PodIP] = true

var managedByEndpointsController bool
if raw, ok := pod.Labels[keyManagedBy]; ok && raw == managedByValue {
managedByEndpointsController = true
}
// For pods managed by this controller, create and register the service instance.
if managedByEndpointsController {
// Get information from the pod to create service instance registrations.
serviceRegistration, err := r.createGatewayRegistrations(pod, serviceEndpoints, healthStatus)
if err != nil {
r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
return err
}

// Register the service instance with Consul.
r.Log.Info("registering gateway with Consul", "name", serviceRegistration.Service.Service,
"id", serviceRegistration.ID)
_, err = r.ConsulClient.Catalog().Register(serviceRegistration, nil)
if err != nil {
r.Log.Error(err, "failed to register gateway", "name", serviceRegistration.Service.Service)
return err
}
}

return nil
}

// serviceName computes the service name to register with Consul from the pod and endpoints object. In a single port
// service, it defaults to the endpoints name, but can be overridden by a pod annotation. In a multi port service, the
// endpoints name is always used since the pod annotation will have multiple service names listed (one per port).
Expand Down Expand Up @@ -548,6 +591,124 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
return serviceRegistration, proxyServiceRegistration, nil
}

// createGatewayRegistrations creates the gateway service registrations with the information from the Pod.
func (r *EndpointsController) createGatewayRegistrations(pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) (*api.CatalogRegistration, error) {
wanAddr, wanPort, err := r.getWanData(pod, serviceEndpoints)
if err != nil {
return nil, err
}

meta := map[string]string{
MetaKeyPodName: pod.Name,
MetaKeyKubeServiceName: serviceEndpoints.Name,
MetaKeyKubeNS: serviceEndpoints.Namespace,
MetaKeyManagedBy: managedByValue,
}

if r.EnableWANFederation {
meta[MetaKeyConsulWANFederation] = "1"
}

port, err := strconv.Atoi(pod.Annotations[annotationMeshGatewayContainerPort])
if err != nil {
return nil, err
}

service := &api.AgentService{
Kind: api.ServiceKindMeshGateway,
ID: pod.Name,
Service: "mesh-gateway",
Port: port,
Address: pod.Status.PodIP,
Meta: meta,
TaggedAddresses: map[string]api.ServiceAddress{
"lan": {
Address: pod.Status.PodIP,
Port: port,
},
"wan": {
Address: wanAddr,
Port: wanPort,
},
},
}
if r.MetricsConfig.DefaultEnableMetrics && r.MetricsConfig.EnableGatewayMetrics {
service.Proxy = &api.AgentServiceConnectProxyConfig{
Config: map[string]interface{}{
"envoy_prometheus_bind_addr": fmt.Sprintf("%s:20200", pod.Status.PodIP),
},
}
}
serviceRegistration := &api.CatalogRegistration{
Node: ConsulNodeName,
Address: ConsulNodeAddress,
Service: service,
Check: &api.AgentCheck{
CheckID: consulHealthCheckID(pod.Namespace, pod.Name),
Name: ConsulKubernetesCheckName,
Type: ConsulKubernetesCheckType,
Status: healthStatus,
ServiceID: pod.Name,
Output: getHealthCheckStatusReason(healthStatus, pod.Name, pod.Namespace),
},
SkipNodeUpdate: true,
}

return serviceRegistration, nil
}

func (r *EndpointsController) getWanData(pod corev1.Pod, endpoints corev1.Endpoints) (string, int, error) {
var wanAddr string
var wanPort int
source, ok := pod.Annotations[annotationMeshGatewaySource]
if !ok {
return "", 0, fmt.Errorf("failed to read annotation %s", annotationMeshGatewaySource)
}
switch source {
case "NodeName":
wanAddr = pod.Spec.NodeName
case "NodeIP":
wanAddr = pod.Status.HostIP
case "Static":
wanAddr = pod.Annotations[annotationMeshGatewayWANAddress]
case "Service":
svc, err := r.getService(endpoints)
if err != nil {
return "", 0, fmt.Errorf("failed to read service %s in namespace %s", endpoints.Name, endpoints.Namespace)
}
switch svc.Spec.Type {
case corev1.ServiceTypeNodePort:
wanAddr = pod.Status.HostIP
case corev1.ServiceTypeClusterIP:
wanAddr = svc.Spec.ClusterIP
case corev1.ServiceTypeLoadBalancer:
for _, ingr := range svc.Status.LoadBalancer.Ingress {
if ingr.IP != "" {
wanAddr = ingr.IP
break
} else if ingr.Hostname != "" {
wanAddr = ingr.Hostname
break
}
}
}
}

wanPort, err := strconv.Atoi(pod.Annotations[annotationMeshGatewayWANPort])
if err != nil {
return "", 0, fmt.Errorf("failed to parse WAN port from value %s", pod.Annotations[annotationMeshGatewayWANPort])
}
return wanAddr, wanPort, nil
}

func (r *EndpointsController) getService(endpoints corev1.Endpoints) (*corev1.Service, error) {
var svc corev1.Service
if err := r.Client.Get(r.Context, types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, &svc); err != nil {
return nil, err
}
return &svc, nil
}

// portValueFromIntOrString returns the integer port value from the port that can be
// a named port, an integer string (e.g. "80"), or an integer. If the port is a named port,
// this function will attempt to find the value from the containers of the pod.
Expand Down Expand Up @@ -981,6 +1142,14 @@ func hasBeenInjected(pod corev1.Pod) bool {
return false
}

// isGateway checks the value of the gateway annotation and returns true if the Pod represents a Gateway.
func isGateway(pod corev1.Pod) bool {
if anno, ok := pod.Annotations[annotationGatewayKind]; ok && anno != "" {
return true
}
return false
}

// mapAddresses combines all addresses to a mapping of address to its health status.
func mapAddresses(addresses corev1.EndpointSubset) map[corev1.EndpointAddress]string {
m := make(map[corev1.EndpointAddress]string)
Expand Down
82 changes: 80 additions & 2 deletions control-plane/connect-inject/endpoints_controller_ent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,13 @@ func TestReconcileCreateEndpointWithNamespaces(t *testing.T) {
k8sObjects: func() []runtime.Object {
pod1 := createPodWithNamespace("pod1", testCase.SourceKubeNS, "1.2.3.4", true, true)
pod2 := createPodWithNamespace("pod2", testCase.SourceKubeNS, "2.2.3.4", true, true)
endpointWithTwoAddresses := &corev1.Endpoints{
meshGateway := createGatewayWithNamespace("mesh-gateway", testCase.SourceKubeNS, "3.3.3.3", map[string]string{
annotationMeshGatewaySource: "Static",
annotationMeshGatewayWANAddress: "2.3.4.5",
annotationMeshGatewayWANPort: "443",
annotationMeshGatewayContainerPort: "8443",
annotationGatewayKind: "mesh"})
endpointWithAddresses := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "service-created",
Namespace: testCase.SourceKubeNS,
Expand All @@ -111,11 +117,19 @@ func TestReconcileCreateEndpointWithNamespaces(t *testing.T) {
Namespace: testCase.SourceKubeNS,
},
},
{
IP: "3.3.3.3",
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Name: "mesh-gateway",
Namespace: testCase.SourceKubeNS,
},
},
},
},
},
}
return []runtime.Object{pod1, pod2, endpointWithTwoAddresses}
return []runtime.Object{pod1, pod2, meshGateway, endpointWithAddresses}
},
expectedConsulSvcInstances: []*api.CatalogService{
{
Expand All @@ -134,6 +148,25 @@ func TestReconcileCreateEndpointWithNamespaces(t *testing.T) {
ServiceTags: []string{},
Namespace: testCase.ExpConsulNS,
},
{
ServiceID: "mesh-gateway",
ServiceName: "mesh-gateway",
ServiceAddress: "3.3.3.3",
ServiceMeta: map[string]string{MetaKeyPodName: "mesh-gateway", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: testCase.SourceKubeNS, MetaKeyManagedBy: managedByValue},
ServiceTags: []string{},
ServicePort: 8443,
ServiceTaggedAddresses: map[string]api.ServiceAddress{
"lan": {
Address: "3.3.3.3",
Port: 8443,
},
"wan": {
Address: "2.3.4.5",
Port: 443,
},
},
Namespace: "default",
},
},
expectedProxySvcInstances: []*api.CatalogService{
{
Expand Down Expand Up @@ -204,6 +237,16 @@ func TestReconcileCreateEndpointWithNamespaces(t *testing.T) {
Type: ConsulKubernetesCheckType,
Namespace: testCase.ExpConsulNS,
},
{
CheckID: fmt.Sprintf("%s/mesh-gateway", testCase.SourceKubeNS),
ServiceName: "mesh-gateway",
ServiceID: "mesh-gateway",
Name: ConsulKubernetesCheckName,
Status: api.HealthPassing,
Output: kubernetesSuccessReasonMsg,
Type: ConsulKubernetesCheckType,
Namespace: "default",
},
},
}
t.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -259,6 +302,9 @@ func TestReconcileCreateEndpointWithNamespaces(t *testing.T) {
// After reconciliation, Consul should have the service with the correct number of instances.
serviceInstances, _, err := consulClient.Catalog().Service(setup.consulSvcName, "", &api.QueryOptions{Namespace: testCase.ExpConsulNS})
require.NoError(t, err)
service, _, err := consulClient.Catalog().Service("mesh-gateway", "", &api.QueryOptions{Namespace: "default"})
require.NoError(t, err)
serviceInstances = append(serviceInstances, service...)
require.Len(t, serviceInstances, len(setup.expectedConsulSvcInstances))
for i, instance := range serviceInstances {
require.Equal(t, setup.expectedConsulSvcInstances[i].ServiceID, instance.ServiceID)
Expand All @@ -267,6 +313,7 @@ func TestReconcileCreateEndpointWithNamespaces(t *testing.T) {
require.Equal(t, setup.expectedConsulSvcInstances[i].ServicePort, instance.ServicePort)
require.Equal(t, setup.expectedConsulSvcInstances[i].ServiceMeta, instance.ServiceMeta)
require.Equal(t, setup.expectedConsulSvcInstances[i].ServiceTags, instance.ServiceTags)
require.Equal(t, setup.expectedConsulSvcInstances[i].ServiceTaggedAddresses, instance.ServiceTaggedAddresses)
}
proxyServiceInstances, _, err := consulClient.Catalog().Service(fmt.Sprintf("%s-sidecar-proxy", setup.consulSvcName), "", &api.QueryOptions{
Namespace: testCase.ExpConsulNS,
Expand All @@ -285,8 +332,13 @@ func TestReconcileCreateEndpointWithNamespaces(t *testing.T) {

// Check that the Consul health checks was created for the k8s pod.
for _, expectedCheck := range setup.expectedHealthChecks {
var checks api.HealthChecks
filter := fmt.Sprintf("CheckID == `%s`", expectedCheck.CheckID)
checks, _, err := consulClient.Health().Checks(expectedCheck.ServiceName, &api.QueryOptions{Filter: filter})
if expectedCheck.ServiceName == "mesh-gateway" {
checks, _, err = consulClient.Health().Checks("mesh-gateway", &api.QueryOptions{Namespace: "default"})
require.NoError(t, err)
}
require.NoError(t, err)
require.Equal(t, len(checks), 1)
var ignoredFields = []string{"Node", "Definition", "Partition", "CreateIndex", "ModifyIndex", "ServiceTags"}
Expand Down Expand Up @@ -1625,3 +1677,29 @@ func createPodWithNamespace(name, namespace, ip string, inject bool, managedByEn
return pod

}

func createGatewayWithNamespace(name, namespace, ip string, annotations map[string]string) *corev1.Pod {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: map[string]string{
keyManagedBy: managedByValue,
},
Annotations: annotations,
},
Status: corev1.PodStatus{
PodIP: ip,
HostIP: "127.0.0.1",
Phase: corev1.PodRunning,
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
},
},
},
}
return pod

}
Loading

0 comments on commit db72e56

Please sign in to comment.