Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Webhook refactor #454

Merged
merged 3 commits into from
Mar 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion connect-inject/consul_sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
corev1 "k8s.io/api/core/v1"
)

func (h *Handler) consulSidecar(pod *corev1.Pod) (corev1.Container, error) {
func (h *Handler) consulSidecar(pod corev1.Pod) (corev1.Container, error) {
command := []string{
"consul-k8s",
"consul-sidecar",
Expand Down
10 changes: 5 additions & 5 deletions connect-inject/consul_sidecar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestConsulSidecar_Default(t *testing.T) {
ImageConsulK8S: "hashicorp/consul-k8s:9.9.9",
ConsulSidecarResources: consulSidecarResources,
}
container, err := handler.consulSidecar(&corev1.Pod{
container, err := handler.consulSidecar(corev1.Pod{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Expand Down Expand Up @@ -84,7 +84,7 @@ func TestConsulSidecar_AuthMethod(t *testing.T) {
AuthMethod: authMethod,
ImageConsulK8S: "hashicorp/consul-k8s:9.9.9",
}
container, err := handler.consulSidecar(&corev1.Pod{
container, err := handler.consulSidecar(corev1.Pod{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Expand Down Expand Up @@ -113,7 +113,7 @@ func TestConsulSidecar_SyncPeriodAnnotation(t *testing.T) {
Log: hclog.Default().Named("handler"),
ImageConsulK8S: "hashicorp/consul-k8s:9.9.9",
}
container, err := handler.consulSidecar(&corev1.Pod{
container, err := handler.consulSidecar(corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
"consul.hashicorp.com/connect-sync-period": "55s",
Expand Down Expand Up @@ -141,7 +141,7 @@ func TestConsulSidecar_TLS(t *testing.T) {
ConsulCACert: "consul-ca-cert",
ConsulSidecarResources: consulSidecarResources,
}
container, err := handler.consulSidecar(&corev1.Pod{
container, err := handler.consulSidecar(corev1.Pod{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestConsulSidecar_MetricsFlags(t *testing.T) {
DefaultEnableMetrics: true,
DefaultEnableMetricsMerging: true,
}
container, err := handler.consulSidecar(&corev1.Pod{
container, err := handler.consulSidecar(corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
annotationMergedMetricsPort: "20100",
Expand Down
2 changes: 1 addition & 1 deletion connect-inject/container_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
corev1 "k8s.io/api/core/v1"
)

func (h *Handler) containerEnvVars(pod *corev1.Pod) []corev1.EnvVar {
func (h *Handler) containerEnvVars(pod corev1.Pod) []corev1.EnvVar {
raw, ok := pod.Annotations[annotationUpstreams]
if !ok || raw == "" {
return []corev1.EnvVar{}
Expand Down
2 changes: 1 addition & 1 deletion connect-inject/container_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestContainerEnvVars(t *testing.T) {
require := require.New(t)

var h Handler
envVars := h.containerEnvVars(&corev1.Pod{
envVars := h.containerEnvVars(corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
annotationService: "foo",
Expand Down
2 changes: 1 addition & 1 deletion connect-inject/container_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (h *Handler) containerInitCopyContainer() corev1.Container {

// containerInit returns the init container spec for registering the Consul
// service, setting up the Envoy bootstrap, etc.
func (h *Handler) containerInit(pod *corev1.Pod, k8sNamespace string) (corev1.Container, error) {
func (h *Handler) containerInit(pod corev1.Pod, k8sNamespace string) (corev1.Container, error) {
data := initContainerCommandData{
ServiceName: pod.Annotations[annotationService],
ProxyServiceName: fmt.Sprintf("%s-sidecar-proxy", pod.Annotations[annotationService]),
Expand Down
16 changes: 8 additions & 8 deletions connect-inject/container_init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ services {
h := Handler{
ConsulClient: consulClient,
}
container, err := h.containerInit(tt.Pod(minimal()), k8sNamespace)
container, err := h.containerInit(*tt.Pod(minimal()), k8sNamespace)
require.NoError(err)
actual := strings.Join(container.Command, " ")
require.Contains(actual, tt.Cmd)
Expand Down Expand Up @@ -1103,7 +1103,7 @@ EOF
require.True(written)
h.ConsulClient = consulClient

container, err := h.containerInit(tt.Pod(minimal()), k8sNamespace)
container, err := h.containerInit(*tt.Pod(minimal()), k8sNamespace)
require.NoError(err)
actual := strings.Join(container.Command, " ")
require.Contains(actual, tt.Cmd)
Expand Down Expand Up @@ -1142,7 +1142,7 @@ func TestHandlerContainerInit_authMethod(t *testing.T) {
ServiceAccountName: "foo",
},
}
container, err := h.containerInit(pod, k8sNamespace)
container, err := h.containerInit(*pod, k8sNamespace)
require.NoError(err)
actual := strings.Join(container.Command, " ")
require.Contains(actual, `
Expand Down Expand Up @@ -1183,7 +1183,7 @@ func TestHandlerContainerInit_WithTLS(t *testing.T) {
},
},
}
container, err := h.containerInit(pod, k8sNamespace)
container, err := h.containerInit(*pod, k8sNamespace)
require.NoError(err)
actual := strings.Join(container.Command, " ")
require.Contains(actual, `
Expand Down Expand Up @@ -1227,7 +1227,7 @@ func TestHandlerContainerInit_Resources(t *testing.T) {
},
},
}
container, err := h.containerInit(pod, k8sNamespace)
container, err := h.containerInit(*pod, k8sNamespace)
require.NoError(err)
require.Equal(corev1.ResourceRequirements{
Limits: corev1.ResourceList{
Expand Down Expand Up @@ -1262,7 +1262,7 @@ func TestHandlerContainerInit_MismatchedServiceNameServiceAccountNameWithACLsEna
},
}

_, err := h.containerInit(pod, k8sNamespace)
_, err := h.containerInit(*pod, k8sNamespace)
require.EqualError(err, `serviceAccountName "notServiceName" does not match service name "foo"`)
}

Expand All @@ -1285,7 +1285,7 @@ func TestHandlerContainerInit_MismatchedServiceNameServiceAccountNameWithACLsDis
},
}

_, err := h.containerInit(pod, k8sNamespace)
_, err := h.containerInit(*pod, k8sNamespace)
require.NoError(err)
}

Expand Down Expand Up @@ -1410,7 +1410,7 @@ func TestHandlerContainerInit_MeshGatewayModeErrors(t *testing.T) {
},
},
}
_, err = h.containerInit(pod, k8sNamespace)
_, err = h.containerInit(*pod, k8sNamespace)
if c.ExpError == "" {
require.NoError(err)
} else {
Expand Down
106 changes: 100 additions & 6 deletions connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,23 @@ import (
"fmt"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

mapset "github.com/deckarep/golang-set"
"github.com/go-logr/logr"
"github.com/hashicorp/consul-k8s/consul"
"github.com/hashicorp/consul/api"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

type EndpointsController struct {
Expand Down Expand Up @@ -141,7 +145,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
// and register that port for the host service.
var servicePort int
if raw, ok := pod.Annotations[annotationPort]; ok && raw != "" {
if port, err := portValue(&pod, raw); port > 0 {
if port, err := portValue(pod, raw); port > 0 {
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -206,7 +210,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
proxyConfig.LocalServicePort = servicePort
}

upstreams, err := r.processUpstreams(&pod)
upstreams, err := r.processUpstreams(pod)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -311,7 +315,7 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(k8sSvcName, k8sSvcNam

// processUpstreams reads the list of upstreams from the Pod annotation and converts them into a list of api.Upstream
// objects.
func (r *EndpointsController) processUpstreams(pod *corev1.Pod) ([]api.Upstream, error) {
func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream, error) {
var upstreams []api.Upstream
if raw, ok := pod.Annotations[annotationUpstreams]; ok && raw != "" {
for _, raw := range strings.Split(raw, ",") {
Expand Down Expand Up @@ -382,7 +386,11 @@ func (r *EndpointsController) Logger(name types.NamespacedName) logr.Logger {
func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Endpoints{}).
Complete(r)
Watches(
&source.Kind{Type: &corev1.Pod{}},
&handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.requestsForRunningAgentPods)},
builder.WithPredicates(predicate.NewPredicateFuncs(r.filterAgentPods)),
).Complete(r)
}

// getConsulClient returns an *api.Client that points at the consul agent local to the pod.
Expand Down Expand Up @@ -416,6 +424,92 @@ func shouldIgnore(namespace string, denySet, allowSet mapset.Set) bool {
return false
}

// filterAgentPods receives meta and object information for Kubernetes resources that are being watched,
// which in this case are Pods. It only returns true if the Pod is a Consul Client Agent Pod. It reads the labels
// from the meta of the resource and uses the values of the "app" and "component" label to validate that
// the Pod is a Consul Client Agent.
func (r EndpointsController) filterAgentPods(meta metav1.Object, object runtime.Object) bool {
podLabels := meta.GetLabels()
app, ok := podLabels["app"]
if !ok {
return false
}
component, ok := podLabels["component"]
if !ok {
return false
}

release, ok := podLabels["release"]
if !ok {
return false
}

if app == "consul" && component == "client" && release == r.ReleaseName {
return true
}
return false
}

// requestsForRunningAgentPods creates a slice of requests for the endpoints controller.
// It enqueues a request for each endpoint that needs to be reconciled. It iterates through
// the list of endpoints and creates a request for those endpoints that have an address that
// are on the same node as the new Consul Agent pod. It receives a Pod Object which is a
// Consul Agent that has been filtered by filterAgentPods and only enqueues endpoints
// for client agent pods where the Ready condition is true.
func (r EndpointsController) requestsForRunningAgentPods(object handler.MapObject) []ctrl.Request {
var consulClientPod corev1.Pod
r.Log.Info("received update for consulClientPod", "podName", object.Meta.GetName())
err := r.Client.Get(r.Ctx, types.NamespacedName{Name: object.Meta.GetName(), Namespace: object.Meta.GetNamespace()}, &consulClientPod)
if k8serrors.IsNotFound(err) {
// Ignore if consulClientPod is not found.
return []ctrl.Request{}
}
if err != nil {
r.Log.Error(err, "failed to get consulClientPod", "consulClientPod", consulClientPod.Name)
return []ctrl.Request{}
}
// We can ignore the agent pod if it's not running, since
// we can't reconcile and register/deregister services against that agent.
if consulClientPod.Status.Phase != corev1.PodRunning {
r.Log.Info("ignoring consulClientPod because it's not running", "consulClientPod", consulClientPod.Name)
return []ctrl.Request{}
}
// We can ignore the agent pod if it's not yet ready, since
// we can't reconcile and register/deregister services against that agent.
for _, cond := range consulClientPod.Status.Conditions {
if cond.Type == corev1.PodReady && cond.Status != corev1.ConditionTrue {
// Ignore if consulClientPod is not ready.
r.Log.Info("ignoring consulClientPod because it's not ready", "consulClientPod", consulClientPod.Name)
return []ctrl.Request{}
}
}

// Get the list of all endpoints.
var endpointsList corev1.EndpointsList
err = r.Client.List(r.Ctx, &endpointsList)
if err != nil {
r.Log.Error(err, "failed to list endpoints")
return []ctrl.Request{}
}

// Enqueue requests for endpoints that are on the same node
// as the client agent.
var requests []reconcile.Request
for _, ep := range endpointsList.Items {
for _, subset := range ep.Subsets {
allAddresses := subset.Addresses
allAddresses = append(allAddresses, subset.NotReadyAddresses...)
for _, address := range allAddresses {
// Only add requests for the address that is on the same node as the consul client pod.
if address.NodeName != nil && *address.NodeName == consulClientPod.Spec.NodeName {
requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Name: ep.Name, Namespace: ep.Namespace}})
}
}
}
}
return requests
}

// hasBeenInjected checks the value of the status annotation and returns true if the Pod has been injected.
func hasBeenInjected(pod corev1.Pod) bool {
if anno, ok := pod.Annotations[annotationStatus]; ok {
Expand Down
Loading