Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NET-10247] Add Partition and Namespace blocks to external registration acl role/policy #4153

Merged
merged 4 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/4153.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
terminating-gateway: Fix generated acl policy for external services to include the namespace and partition block if they are enabled.
```
1 change: 0 additions & 1 deletion charts/consul/templates/crd-registrations.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ spec:
- warning
type: object
required:
- address
- name
- port
type: object
Expand Down
3 changes: 3 additions & 0 deletions control-plane/api-gateway/binding/cleanup.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package binding

import (
Expand Down
3 changes: 3 additions & 0 deletions control-plane/api-gateway/binding/cleanup_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package binding

import (
Expand Down
3 changes: 3 additions & 0 deletions control-plane/api-gateway/gatekeeper/ownership.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package gatekeeper

import (
Expand Down
3 changes: 3 additions & 0 deletions control-plane/api-gateway/gatekeeper/secret.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package gatekeeper

import (
Expand Down
2 changes: 1 addition & 1 deletion control-plane/api/v1alpha1/registration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type Service struct {
Tags []string `json:"tags,omitempty"`
Meta map[string]string `json:"meta,omitempty"`
Port int `json:"port"`
Address string `json:"address"`
Address string `json:"address,omitempty"`
SocketPath string `json:"socketPath,omitempty"`
TaggedAddresses map[string]ServiceAddress `json:"taggedAddresses,omitempty"`
Weights Weights `json:"weights,omitempty"`
Expand Down
196 changes: 147 additions & 49 deletions control-plane/catalog/registration/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,94 @@
package registration

import (
"bytes"
"context"
"errors"
"fmt"
"slices"
"strings"
"sync"
"text/template"

mapset "github.com/deckarep/golang-set/v2"
"github.com/go-logr/logr"
"github.com/hashicorp/consul-k8s/control-plane/api/v1alpha1"
"github.com/hashicorp/consul-k8s/control-plane/consul"
capi "github.com/hashicorp/consul/api"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const NotInServiceMeshFilter = "ServiceMeta[\"managed-by\"] != \"consul-k8s-endpoints-controller\""

func init() {
gatewayTpl = template.Must(template.New("root").Parse(strings.TrimSpace(gatewayRulesTpl)))
}

type templateArgs struct {
EnablePartitions bool
Partition string
EnableNamespaces bool
Namespace string
ServiceName string
}

var (
gatewayTpl *template.Template
gatewayRulesTpl = `
{{ if .EnablePartitions }}
partition "{{.Partition}}" {
{{- end }}
{{- if .EnableNamespaces }}
namespace "{{.Namespace}}" {
{{- end }}
service "{{.ServiceName}}" {
policy = "write"
}
{{- if .EnableNamespaces }}
}
{{- end }}
{{- if .EnablePartitions }}
}
{{- end }}
`
)

type RegistrationCache struct {
// we include the context here so that we can use it for cancellation of `run` invocations that are scheduled after the cache is started
// this occurs when registering services in a new namespace as we have an invocation of `run` per namespace that is registered
ctx context.Context

ConsulClientConfig *consul.Config
ConsulServerConnMgr consul.ServerConnectionManager
serviceMtx *sync.Mutex
Services map[string]*v1alpha1.Registration
synced chan struct{}
UpdateChan chan string
k8sClient client.Client

serviceMtx *sync.Mutex
Services map[string]*v1alpha1.Registration

namespaces mapset.Set[string]

synced chan struct{}
UpdateChan chan string

namespacesEnabled bool
partitionsEnabled bool
}

func NewRegistrationCache(consulClientConfig *consul.Config, consulServerConnMgr consul.ServerConnectionManager) *RegistrationCache {
func NewRegistrationCache(ctx context.Context, consulClientConfig *consul.Config, consulServerConnMgr consul.ServerConnectionManager, k8sClient client.Client, namespacesEnabled, partitionsEnabled bool) *RegistrationCache {
return &RegistrationCache{
ctx: ctx,
ConsulClientConfig: consulClientConfig,
ConsulServerConnMgr: consulServerConnMgr,
k8sClient: k8sClient,
serviceMtx: &sync.Mutex{},
Services: make(map[string]*v1alpha1.Registration),
UpdateChan: make(chan string),
synced: make(chan struct{}),
namespaces: mapset.NewSet[string](),
namespacesEnabled: namespacesEnabled,
partitionsEnabled: partitionsEnabled,
}
}

Expand All @@ -50,13 +105,13 @@ func (c *RegistrationCache) waitSynced(ctx context.Context) {
}
}

func (c *RegistrationCache) run(ctx context.Context, log logr.Logger) {
func (c *RegistrationCache) run(log logr.Logger, namespace string) {
once := &sync.Once{}
opts := &capi.QueryOptions{Filter: NotInServiceMeshFilter}
opts := &capi.QueryOptions{Filter: NotInServiceMeshFilter, Namespace: namespace}

for {
select {
case <-ctx.Done():
case <-c.ctx.Done():
return
default:

Expand All @@ -65,7 +120,7 @@ func (c *RegistrationCache) run(ctx context.Context, log logr.Logger) {
log.Error(err, "error initializing consul client")
continue
}
entries, meta, err := client.Catalog().Services(opts.WithContext(ctx))
entries, meta, err := client.Catalog().Services(opts.WithContext(c.ctx))
if err != nil {
// if we timeout we don't care about the error message because it's expected to happen on long polls
// any other error we want to alert on
Expand All @@ -77,20 +132,40 @@ func (c *RegistrationCache) run(ctx context.Context, log logr.Logger) {
continue
}

diffs := mapset.NewSet[string]()
servicesToRemove := mapset.NewSet[string]()
servicesToAdd := mapset.NewSet[string]()
c.serviceMtx.Lock()
for svc := range c.Services {
if _, ok := entries[svc]; !ok {
diffs.Add(svc)
servicesToRemove.Add(svc)
}
}

for svc := range entries {
if _, ok := c.Services[svc]; !ok {
servicesToAdd.Add(svc)
}
}
c.serviceMtx.Unlock()

for _, svc := range diffs.ToSlice() {
for _, svc := range servicesToRemove.ToSlice() {
log.Info("consul deregistered service", "svcName", svc)
c.UpdateChan <- svc
}

for _, svc := range servicesToAdd.ToSlice() {
registration := &v1alpha1.Registration{}

if err := c.k8sClient.Get(c.ctx, types.NamespacedName{Name: svc, Namespace: namespace}, registration); err != nil {
if !k8serrors.IsNotFound(err) {
log.Error(err, "unable to get registration", "svcName", svc, "namespace", namespace)
}
continue
}

c.Services[svc] = registration
}

opts.WaitIndex = meta.LastIndex
once.Do(func() {
log.Info("Initial sync complete")
Expand Down Expand Up @@ -122,21 +197,26 @@ func (c *RegistrationCache) registerService(log logr.Logger, reg *v1alpha1.Regis
return err
}

_, err = client.Catalog().Register(regReq, nil)
_, err = client.Catalog().Register(regReq, &capi.WriteOptions{Namespace: reg.Spec.Service.Namespace})
if err != nil {
log.Error(err, "error registering service", "svcName", regReq.Service.Service)
return err
}

c.serviceMtx.Lock()
defer c.serviceMtx.Unlock()
c.Services[reg.Spec.Service.Name] = reg
if !c.namespaces.Contains(reg.Spec.Service.Namespace) && !emptyOrDefault(reg.Spec.Service.Namespace) {
c.namespaces.Add(reg.Spec.Service.Namespace)
go c.run(log, reg.Spec.Service.Namespace)
}

log.Info("Successfully registered service", "svcName", regReq.Service.Service)

return nil
}

func emptyOrDefault(s string) bool {
return s == "" || s == "default"
}

func (c *RegistrationCache) updateTermGWACLRole(log logr.Logger, registration *v1alpha1.Registration, termGWsToUpdate []v1alpha1.TerminatingGateway) error {
if len(termGWsToUpdate) == 0 {
log.Info("terminating gateway not found")
Expand All @@ -148,38 +228,52 @@ func (c *RegistrationCache) updateTermGWACLRole(log logr.Logger, registration *v
return err
}

roles, _, err := client.ACL().RoleList(nil)
if err != nil {
log.Error(err, "error reading role list")
return err
var data bytes.Buffer
if err := gatewayTpl.Execute(&data, templateArgs{
EnablePartitions: c.partitionsEnabled,
Partition: registration.Spec.Service.Partition,
EnableNamespaces: c.namespacesEnabled,
Namespace: registration.Spec.Service.Namespace,
ServiceName: registration.Spec.Service.Name,
}); err != nil {
// just panic if we can't compile the simple template
// as it means something else is going severly wrong.
panic(err)
}

policy := &capi.ACLPolicy{
Name: servicePolicyName(registration.Spec.Service.Name),
Description: "Write policy for terminating gateways for external service",
Rules: fmt.Sprintf(`service %q { policy = "write" }`, registration.Spec.Service.Name),
Datacenters: []string{registration.Spec.Datacenter},
Namespace: registration.Spec.Service.Namespace,
Partition: registration.Spec.Service.Partition,
}
var mErr error
for _, termGW := range termGWsToUpdate {
// the terminating gateway role is _always_ in the default namespace
roles, _, err := client.ACL().RoleList(&capi.QueryOptions{})
if err != nil {
log.Error(err, "error reading role list")
return err
}

existingPolicy, _, err := client.ACL().PolicyReadByName(policy.Name, nil)
if err != nil {
log.Error(err, "error reading policy")
return err
}
policy := &capi.ACLPolicy{
Name: servicePolicyName(registration.Spec.Service.Name),
Description: "Write policy for terminating gateways for external service",
Rules: data.String(),
Datacenters: []string{registration.Spec.Datacenter},
}

if existingPolicy == nil {
policy, _, err = client.ACL().PolicyCreate(policy, nil)
existingPolicy, _, err := client.ACL().PolicyReadByName(policy.Name, &capi.QueryOptions{})
if err != nil {
return fmt.Errorf("error creating policy: %w", err)
log.Error(err, "error reading policy")
return err
}
} else {
policy = existingPolicy
}

var mErr error
for _, termGW := range termGWsToUpdate {
// we don't need to include the namespace/partition here because all roles and policies are created in the default namespace for consul-k8s managed resources.
writeOpts := &capi.WriteOptions{}
jm96441n marked this conversation as resolved.
Show resolved Hide resolved

if existingPolicy == nil {
policy, _, err = client.ACL().PolicyCreate(policy, writeOpts)
if err != nil {
return fmt.Errorf("error creating policy: %w", err)
}
} else {
policy = existingPolicy
}
var role *capi.ACLRole
for _, r := range roles {
if strings.HasSuffix(r.Name, fmt.Sprintf("-%s-acl-role", termGW.Name)) {
Expand All @@ -196,7 +290,7 @@ func (c *RegistrationCache) updateTermGWACLRole(log logr.Logger, registration *v

role.Policies = append(role.Policies, &capi.ACLRolePolicyLink{Name: policy.Name, ID: policy.ID})

_, _, err = client.ACL().RoleUpdate(role, nil)
_, _, err = client.ACL().RoleUpdate(role, writeOpts)
if err != nil {
log.Error(err, "error updating role", "roleName", role.Name)
mErr = errors.Join(mErr, fmt.Errorf("error updating role %q", role.Name))
Expand Down Expand Up @@ -239,13 +333,17 @@ func (c *RegistrationCache) removeTermGWACLRole(log logr.Logger, registration *v
return err
}

roles, _, err := client.ACL().RoleList(nil)
if err != nil {
return err
}

var mErr error
for _, termGW := range termGWsToUpdate {

// we don't need to include the namespace/partition here because all roles and policies are created in the default namespace for consul-k8s managed resources.
queryOpts := &capi.QueryOptions{}
writeOpts := &capi.WriteOptions{}

roles, _, err := client.ACL().RoleList(queryOpts)
if err != nil {
return err
}
var role *capi.ACLRole
for _, r := range roles {
if strings.HasSuffix(r.Name, fmt.Sprintf("-%s-acl-role", termGW.Name)) {
Expand Down Expand Up @@ -276,14 +374,14 @@ func (c *RegistrationCache) removeTermGWACLRole(log logr.Logger, registration *v
continue
}

_, _, err = client.ACL().RoleUpdate(role, nil)
_, _, err = client.ACL().RoleUpdate(role, writeOpts)
if err != nil {
log.Error(err, "error updating role", "roleName", role.Name)
mErr = errors.Join(mErr, fmt.Errorf("error updating role %q", role.Name))
continue
}

_, err = client.ACL().PolicyDelete(policyID, nil)
_, err = client.ACL().PolicyDelete(policyID, writeOpts)
if err != nil {
log.Error(err, "error deleting service policy", "policyID", policyID, "policyName", expectedPolicyName)
mErr = errors.Join(mErr, fmt.Errorf("error deleting service ACL policy %q", policyID))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func (r *RegistrationsController) Logger(name types.NamespacedName) logr.Logger

func (r *RegistrationsController) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
// setup the cache
go r.Cache.run(ctx, r.Log)
go r.Cache.run(r.Log, "")
r.Cache.waitSynced(ctx)

go r.watchForDeregistrations(ctx)
Expand Down
Loading
Loading