Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add consul.hashicorp.com/proxy-config-map annotation. #3347

Merged
merged 1 commit into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/3347.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
control-plane: Add new `consul.hashicorp.com/proxy-config-map` annotation that allows for setting values in the opaque config map for proxy service registrations.
```
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ const (
// connections to.
AnnotationPort = "consul.hashicorp.com/connect-service-port"

// AnnotationProxyConfigMap allows for default values to be set in the opaque config map
// during proxy registration. The value for this annotation is expected to be valid json.
// Other annotations / configuration may overwrite the values in the map.
AnnotationProxyConfigMap = "consul.hashicorp.com/proxy-config-map"

// AnnotationUpstreams is a list of upstreams to register with the
// proxy in the format of `<service-name>:<local-port>,...`. The
// service name should map to a Consul service name and the local port
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,18 @@ func proxyServiceID(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string {
return fmt.Sprintf("%s-%s", pod.Name, proxySvcName)
}

func annotationProxyConfigMap(pod corev1.Pod) (map[string]any, error) {
parsed := make(map[string]any)
if config, ok := pod.Annotations[constants.AnnotationProxyConfigMap]; ok && config != "" {
err := json.Unmarshal([]byte(config), &parsed)
if err != nil {
// Always return an empty map on error
return make(map[string]any), fmt.Errorf("unable to parse `%v` annotation for pod `%v`: %w", constants.AnnotationProxyConfigMap, pod.Name, err)
}
}
return parsed, nil
}

// createServiceRegistrations creates the service and proxy service instance registrations with the information from the
// Pod.
func (r *Controller) createServiceRegistrations(pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) (*api.CatalogRegistration, *api.CatalogRegistration, error) {
Expand Down Expand Up @@ -485,10 +497,16 @@ func (r *Controller) createServiceRegistrations(pod corev1.Pod, serviceEndpoints

proxySvcName := proxyServiceName(pod, serviceEndpoints)
proxySvcID := proxyServiceID(pod, serviceEndpoints)

// Set the default values from the annotation, if possible.
baseConfig, err := annotationProxyConfigMap(pod)
if err != nil {
r.Log.Error(err, "annotation unable to be applied")
}
proxyConfig := &api.AgentServiceConnectProxyConfig{
DestinationServiceName: svcName,
DestinationServiceID: svcID,
Config: make(map[string]interface{}),
Config: baseConfig,
}

// If metrics are enabled, the proxyConfig should set envoy_prometheus_bind_addr to a listener on 0.0.0.0 on
Expand Down Expand Up @@ -692,12 +710,18 @@ func (r *Controller) createGatewayRegistrations(pod corev1.Pod, serviceEndpoints
constants.MetaKeyPodUID: string(pod.UID),
}

// Set the default values from the annotation, if possible.
baseConfig, err := annotationProxyConfigMap(pod)
if err != nil {
r.Log.Error(err, "annotation unable to be applied")
}

service := &api.AgentService{
ID: pod.Name,
Address: pod.Status.PodIP,
Meta: meta,
Proxy: &api.AgentServiceConnectProxyConfig{
Config: map[string]interface{}{},
Config: baseConfig,
},
}

Expand Down Expand Up @@ -771,14 +795,10 @@ func (r *Controller) createGatewayRegistrations(pod corev1.Pod, serviceEndpoints
Port: wanPort,
},
}
service.Proxy = &api.AgentServiceConnectProxyConfig{
Config: map[string]interface{}{
"envoy_gateway_no_default_bind": true,
"envoy_gateway_bind_addresses": map[string]interface{}{
"all-interfaces": map[string]interface{}{
"address": "0.0.0.0",
},
},
service.Proxy.Config["envoy_gateway_no_default_bind"] = true
service.Proxy.Config["envoy_gateway_bind_addresses"] = map[string]interface{}{
"all-interfaces": map[string]interface{}{
"address": "0.0.0.0",
},
}

Expand All @@ -787,15 +807,7 @@ func (r *Controller) createGatewayRegistrations(pod corev1.Pod, serviceEndpoints
}

if r.MetricsConfig.DefaultEnableMetrics && r.MetricsConfig.EnableGatewayMetrics {
if pod.Annotations[constants.AnnotationGatewayKind] == ingressGateway {
service.Proxy.Config["envoy_prometheus_bind_addr"] = fmt.Sprintf("%s:20200", pod.Status.PodIP)
} else {
service.Proxy = &api.AgentServiceConnectProxyConfig{
Config: map[string]interface{}{
"envoy_prometheus_bind_addr": fmt.Sprintf("%s:20200", pod.Status.PodIP),
},
}
}
service.Proxy.Config["envoy_prometheus_bind_addr"] = fmt.Sprintf("%s:20200", pod.Status.PodIP)
}

if r.EnableTelemetryCollector && service.Proxy != nil && service.Proxy.Config != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -986,6 +987,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {
},
k8sObjects: func() []runtime.Object {
pod1 := createServicePod("pod1", "1.2.3.4", true, true)
pod1.Annotations[constants.AnnotationProxyConfigMap] = `{ "xds_fetch_timeout_ms": 9999 }`
endpoint := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "service-created",
Expand Down Expand Up @@ -1034,7 +1036,10 @@ func TestReconcileCreateEndpoint(t *testing.T) {
DestinationServiceID: "pod1-service-created",
LocalServiceAddress: "",
LocalServicePort: 0,
Config: map[string]any{"envoy_telemetry_collector_bind_socket_dir": string("/consul/connect-inject")},
Config: map[string]any{
"envoy_telemetry_collector_bind_socket_dir": string("/consul/connect-inject"),
"xds_fetch_timeout_ms": float64(9999),
},
},
ServiceMeta: map[string]string{constants.MetaKeyPodName: "pod1", metaKeyKubeServiceName: "service-created", constants.MetaKeyKubeNS: "default", metaKeyManagedBy: constants.ManagedByValue, metaKeySyntheticNode: "true", constants.MetaKeyPodUID: ""},
ServiceTags: []string{},
Expand Down Expand Up @@ -1079,6 +1084,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {
constants.AnnotationGatewayWANAddress: "2.3.4.5",
constants.AnnotationGatewayWANPort: "443",
constants.AnnotationMeshGatewayContainerPort: "8443",
constants.AnnotationProxyConfigMap: `{ "xds_fetch_timeout_ms": 9999 }`,
constants.AnnotationGatewayKind: meshGateway})
endpoint := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -1121,7 +1127,10 @@ func TestReconcileCreateEndpoint(t *testing.T) {
},
},
ServiceProxy: &api.AgentServiceConnectProxyConfig{
Config: map[string]any{"envoy_telemetry_collector_bind_socket_dir": string("/consul/service")},
Config: map[string]any{
"envoy_telemetry_collector_bind_socket_dir": string("/consul/service"),
"xds_fetch_timeout_ms": float64(9999),
},
},
NodeMeta: map[string]string{
"synthetic-node": "true",
Expand Down Expand Up @@ -1295,6 +1304,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {
gateway := createGatewayPod("terminating-gateway", "1.2.3.4", map[string]string{
constants.AnnotationGatewayKind: terminatingGateway,
constants.AnnotationGatewayConsulServiceName: "terminating-gateway",
constants.AnnotationProxyConfigMap: `{ "xds_fetch_timeout_ms": 9999 }`,
})
endpoint := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -1334,7 +1344,10 @@ func TestReconcileCreateEndpoint(t *testing.T) {
},
ServiceTags: []string{},
ServiceProxy: &api.AgentServiceConnectProxyConfig{
Config: map[string]any{"envoy_telemetry_collector_bind_socket_dir": string("/consul/service")},
Config: map[string]any{
"envoy_telemetry_collector_bind_socket_dir": string("/consul/service"),
"xds_fetch_timeout_ms": float64(9999),
},
},
},
},
Expand Down Expand Up @@ -1427,6 +1440,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {
constants.AnnotationGatewayKind: ingressGateway,
constants.AnnotationGatewayWANSource: "Service",
constants.AnnotationGatewayWANPort: "8443",
constants.AnnotationProxyConfigMap: `{ "xds_fetch_timeout_ms": 9999 }`,
})
endpoint := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -1502,6 +1516,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {
},
},
"envoy_telemetry_collector_bind_socket_dir": "/consul/service",
"xds_fetch_timeout_ms": float64(9999),
},
},
},
Expand Down Expand Up @@ -3558,8 +3573,11 @@ func TestReconcileUpdateEndpoint(t *testing.T) {

// Register service and proxy in consul.
for _, svc := range tt.initialConsulSvcs {
_, err := consulClient.Catalog().Register(svc, nil)
require.NoError(t, err)
// Retry because ACLs may not have been initialized yet.
retry.Run(t, func(r *retry.R) {
_, err := consulClient.Catalog().Register(svc, nil)
require.NoError(r, err)
})

// Create a token for this service if ACLs are enabled.
if tt.enableACLs {
Expand Down Expand Up @@ -4207,6 +4225,7 @@ func TestReconcileDeleteEndpoint(t *testing.T) {

// Register service and proxy in consul
var token *api.ACLToken
var err error
for _, svc := range tt.initialConsulSvcs {
serviceRegistration := &api.CatalogRegistration{
Node: consulNodeName,
Expand All @@ -4216,8 +4235,11 @@ func TestReconcileDeleteEndpoint(t *testing.T) {
},
Service: svc,
}
_, err := consulClient.Catalog().Register(serviceRegistration, nil)
require.NoError(t, err)
// Retry because the ACLs may not have been fully initialized yet.
retry.Run(t, func(r *retry.R) {
_, err = consulClient.Catalog().Register(serviceRegistration, nil)
require.NoError(r, err)
})

// Create a token for it if ACLs are enabled.
if tt.enableACLs {
Expand Down
Loading