Skip to content

Commit

Permalink
Native API Gateway Config Entries (#15897)
Browse files Browse the repository at this point in the history
* Stub Config Entries for Consul Native API Gateway (#15644)
* Add empty InlineCertificate struct and protobuf
* apigateway stubs
* Stub HTTPRoute in api pkg
* Stub HTTPRoute in structs pkg
* Simplify api.APIGatewayConfigEntry to be consistent w/ other entries
* Update makeConfigEntry switch, add docstring for HTTPRouteConfigEntry
* Add TCPRoute to MakeConfigEntry, return unique Kind
* Stub BoundAPIGatewayConfigEntry in agent
* Add RaftIndex to APIGatewayConfigEntry stub
* Add new config entry kinds to validation allow-list
* Add RaftIndex to other added config entry stubs
* Update usage metrics assertions to include new cfg entries
* Add Meta and acl.EnterpriseMeta to all new ConfigEntry types
* Remove unnecessary Services field from added config entry types
* Implement GetMeta(), GetEnterpriseMeta() for added config entry types
* Add meta field to proto, name consistently w/ existing config entries
* Format config_entry.proto
* Add initial implementation of CanRead + CanWrite for new config entry types
* Add unit tests for decoding of new config entry types
* Add unit tests for parsing of new config entry types
* Add unit tests for API Gateway config entry ACLs
* Return typed PermissionDeniedError on BoundAPIGateway CanWrite
* Add unit tests for added config entry ACLs
* Add BoundAPIGateway type to AllConfigEntryKinds
* Return proper kind from BoundAPIGateway
* Add docstrings for new config entry types
* Add missing config entry kinds to proto def
* Update usagemetrics_oss_test.go
* Use utility func for returning PermissionDeniedError
* EventPublisher subscriptions for Consul Native API Gateway (#15757)
* Create new event topics in subscribe proto
* Add tests for PBSubscribe func
* Make configs singular, add all configs to PBToStreamSubscribeRequest
* Add snapshot methods
* Add config_entry_events tests
* Add config entry kind to topic for new configs
* Add unit tests for snapshot methods
* Start adding integration test
* Test using the new controller code
* Update agent/consul/state/config_entry_events.go
* Check value of error
* Add controller stubs for API Gateway (#15837)
* update initial stub implementation
* move files, clean up mutex references
* Remove embed, use idiomatic names for constructors
* Remove stray file introduced in merge
* Add APIGateway validation (#15847)
* Add APIGateway validation
* Add additional validations
* Add cert ref validation
* Add protobuf definitions
* Fix up field types
* Add API structs
* Move struct fields around a bit
* APIGateway InlineCertificate validation (#15856)
* Add APIGateway validation
* Add additional validations
* Add protobuf definitions
* Tabs to spaces
* Add API structs
* Move struct fields around a bit
* Add validation for InlineCertificate
* Fix ACL test
* APIGateway BoundAPIGateway validation (#15858)
* Add APIGateway validation
* Add additional validations
* Add cert ref validation
* Add protobuf definitions
* Fix up field types
* Add API structs
* Move struct fields around a bit
* Add validation for BoundAPIGateway
* APIGateway TCPRoute validation (#15855)
* Add APIGateway validation
* Add additional validations
* Add cert ref validation
* Add protobuf definitions
* Fix up field types
* Add API structs
* Add TCPRoute normalization and validation
* Add forgotten Status
* Add some more field docs in api package
* Fix test
* Format imports
* Rename snapshot test variable names
* Add plumbing for Native API GW Subscriptions (#16003)

Co-authored-by: Sarah Alsmiller <[email protected]>
Co-authored-by: Nathan Coleman <[email protected]>
Co-authored-by: sarahalsmiller <[email protected]>
Co-authored-by: Andrew Stucki <[email protected]>
  • Loading branch information
5 people authored Jan 18, 2023
1 parent 4e15414 commit 13da1a5
Show file tree
Hide file tree
Showing 37 changed files with 5,711 additions and 452 deletions.
2 changes: 1 addition & 1 deletion acl/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type PermissionDeniedError struct {
Accessor string
// Resource (e.g. Service)
Resource Resource
// Access leve (e.g. Read)
// Access level (e.g. Read)
AccessLevel AccessLevel
// e.g. "sidecar-proxy-1"
ResourceID ResourceDescriptor
Expand Down
146 changes: 145 additions & 1 deletion agent/consul/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ LOOP:
}
}

// since we only modified each entry once, we should have exactly 200 reconcliation calls
// since we only modified each entry once, we should have exactly 200 reconciliation calls
require.Len(t, received, 200)
for i := 0; i < 200; i++ {
require.Contains(t, received, fmt.Sprintf("foo-%d", i))
Expand Down Expand Up @@ -271,3 +271,147 @@ func TestBasicController_RunPanicAssertions(t *testing.T) {
controller.WithQueueFactory(RunWorkQueue)
})
}

func TestConfigEntrySubscriptions(t *testing.T) {
t.Parallel()

cases := map[string]struct {
configEntry func(string) structs.ConfigEntry
topic stream.Topic
kind string
}{
"Subscribe to Service Resolver Config Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: name,
}
},
topic: state.EventTopicServiceResolver,
kind: structs.ServiceResolver,
},
"Subscribe to Ingress Gateway Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: name,
}
},
topic: state.EventTopicIngressGateway,
kind: structs.IngressGateway,
},
"Subscribe to Service Intentions Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.ServiceIntentionsConfigEntry{
Kind: structs.ServiceIntentions,
Name: name,
}
},
topic: state.EventTopicServiceIntentions,
kind: structs.ServiceIntentions,
},
"Subscribe to API Gateway Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.APIGatewayConfigEntry{
Kind: structs.APIGateway,
Name: name,
}
},
topic: state.EventTopicAPIGateway,
kind: structs.APIGateway,
},
"Subscribe to Inline Certificate Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.InlineCertificateConfigEntry{
Kind: structs.InlineCertificate,
Name: name,
}
},
topic: state.EventTopicInlineCertificate,
kind: structs.InlineCertificate,
},
"Subscribe to HTTP Route Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.HTTPRouteConfigEntry{
Kind: structs.HTTPRoute,
Name: name,
}
},
topic: state.EventTopicHTTPRoute,
kind: structs.HTTPRoute,
},
"Subscribe to TCP Route Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.TCPRouteConfigEntry{
Kind: structs.TCPRoute,
Name: name,
}
},
topic: state.EventTopicTCPRoute,
kind: structs.TCPRoute,
},
"Subscribe to Bound API Gateway Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.BoundAPIGatewayConfigEntry{
Kind: structs.BoundAPIGateway,
Name: name,
}
},
topic: state.EventTopicBoundAPIGateway,
kind: structs.BoundAPIGateway,
},
}

for name, tc := range cases {
t.Run(name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

reconciler := newTestReconciler(false)

publisher := stream.NewEventPublisher(1 * time.Millisecond)
go publisher.Run(ctx)

// get the store through the FSM since the publisher handlers get registered through it
store := fsm.NewFromDeps(fsm.Deps{
Logger: hclog.New(nil),
NewStateStore: func() *state.Store {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
Publisher: publisher,
}).State()

for i := 0; i < 200; i++ {
entryIndex := uint64(i + 1)
name := fmt.Sprintf("foo-%d", i)
require.NoError(t, store.EnsureConfigEntry(entryIndex, tc.configEntry(name)))
}

go New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
Topic: tc.topic,
Subject: stream.SubjectWildcard,
}).WithWorkers(10).Run(ctx)

received := []string{}
LOOP:
for {
select {
case request := <-reconciler.received:
require.Equal(t, tc.kind, request.Kind)
received = append(received, request.Name)
if len(received) == 200 {
break LOOP
}
case <-ctx.Done():
break LOOP
}
}

// since we only modified each entry once, we should have exactly 200 reconciliation calls
require.Len(t, received, 200)
for i := 0; i < 200; i++ {
require.Contains(t, received, fmt.Sprintf("foo-%d", i))
}
})
}
}
35 changes: 35 additions & 0 deletions agent/consul/fsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,4 +342,39 @@ func (c *FSM) registerStreamSnapshotHandlers() {
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}

err = c.deps.Publisher.RegisterHandler(state.EventTopicAPIGateway, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().APIGatewaySnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}

err = c.deps.Publisher.RegisterHandler(state.EventTopicInlineCertificate, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().InlineCertificateSnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}

err = c.deps.Publisher.RegisterHandler(state.EventTopicHTTPRoute, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().HTTPRouteSnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}

err = c.deps.Publisher.RegisterHandler(state.EventTopicTCPRoute, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().TCPRouteSnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}

err = c.deps.Publisher.RegisterHandler(state.EventTopicBoundAPIGateway, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().BoundAPIGatewaySnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}
}
34 changes: 34 additions & 0 deletions agent/consul/gateways/controller_gateways.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package gateways

import (
"context"

"github.com/hashicorp/go-hclog"

"github.com/hashicorp/consul/agent/consul/controller"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
)

type apiGatewayReconciler struct {
fsm *fsm.FSM
logger hclog.Logger
}

func (r apiGatewayReconciler) Reconcile(ctx context.Context, req controller.Request) error {
return nil
}

func NewAPIGatewayController(fsm *fsm.FSM, publisher state.EventPublisher, logger hclog.Logger) controller.Controller {
reconciler := apiGatewayReconciler{
fsm: fsm,
logger: logger,
}
return controller.New(publisher, reconciler).Subscribe(
&stream.SubscribeRequest{
Topic: state.EventTopicAPIGateway,
Subject: stream.SubjectWildcard,
},
)
}
56 changes: 56 additions & 0 deletions agent/consul/gateways/controller_routes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package gateways

import (
"context"

"github.com/hashicorp/go-hclog"

"github.com/hashicorp/consul/agent/consul/controller"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
)

type tcpRouteReconciler struct {
fsm *fsm.FSM
logger hclog.Logger
}

func (r tcpRouteReconciler) Reconcile(ctx context.Context, req controller.Request) error {
return nil
}

func NewTCPRouteController(fsm *fsm.FSM, publisher state.EventPublisher, logger hclog.Logger) controller.Controller {
reconciler := tcpRouteReconciler{
fsm: fsm,
logger: logger,
}
return controller.New(publisher, reconciler).Subscribe(
&stream.SubscribeRequest{
Topic: state.EventTopicTCPRoute,
Subject: stream.SubjectWildcard,
},
)
}

type httpRouteReconciler struct {
fsm *fsm.FSM
logger hclog.Logger
}

func (r httpRouteReconciler) Reconcile(ctx context.Context, req controller.Request) error {
return nil
}

func NewHTTPRouteController(fsm *fsm.FSM, publisher state.EventPublisher, logger hclog.Logger) controller.Controller {
reconciler := httpRouteReconciler{
fsm: fsm,
logger: logger,
}
return controller.New(publisher, reconciler).Subscribe(
&stream.SubscribeRequest{
Topic: state.EventTopicHTTPRoute,
Subject: stream.SubjectWildcard,
},
)
}
5 changes: 5 additions & 0 deletions agent/consul/state/config_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,11 @@ func validateProposedConfigEntryInGraph(
case structs.ServiceIntentions:
case structs.MeshConfig:
case structs.ExportedServices:
case structs.APIGateway: // TODO Consider checkGatewayClash
case structs.BoundAPIGateway:
case structs.InlineCertificate:
case structs.HTTPRoute:
case structs.TCPRoute:
default:
return fmt.Errorf("unhandled kind %q during validation of %q", kindName.Kind, kindName.Name)
}
Expand Down
35 changes: 35 additions & 0 deletions agent/consul/state/config_entry_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ var configEntryKindToTopic = map[string]stream.Topic{
structs.IngressGateway: EventTopicIngressGateway,
structs.ServiceIntentions: EventTopicServiceIntentions,
structs.ServiceDefaults: EventTopicServiceDefaults,
structs.APIGateway: EventTopicAPIGateway,
structs.TCPRoute: EventTopicTCPRoute,
structs.HTTPRoute: EventTopicHTTPRoute,
structs.InlineCertificate: EventTopicInlineCertificate,
structs.BoundAPIGateway: EventTopicBoundAPIGateway,
}

// EventSubjectConfigEntry is a stream.Subject used to route and receive events
Expand Down Expand Up @@ -117,6 +122,36 @@ func (s *Store) ServiceDefaultsSnapshot(req stream.SubscribeRequest, buf stream.
return s.configEntrySnapshot(structs.ServiceDefaults, req, buf)
}

// APIGatewaySnapshot is a stream.SnapshotFunc that returns a snapshot of
// api-gateway config entries.
func (s *Store) APIGatewaySnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return s.configEntrySnapshot(structs.APIGateway, req, buf)
}

// TCPRouteSnapshot is a stream.SnapshotFunc that returns a snapshot of
// tcp-route config entries.
func (s *Store) TCPRouteSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return s.configEntrySnapshot(structs.TCPRoute, req, buf)
}

// HTTPRouteSnapshot is a stream.SnapshotFunc that retuns a snapshot of
// http-route config entries.
func (s *Store) HTTPRouteSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return s.configEntrySnapshot(structs.HTTPRoute, req, buf)
}

// InlineCertificateSnapshot is a stream.SnapshotFunc that returns a snapshot of
// inline-certificate config entries.
func (s *Store) InlineCertificateSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return s.configEntrySnapshot(structs.InlineCertificate, req, buf)
}

// BoundAPIGatewaySnapshot is a stream.SnapshotFunc that returns a snapshot of
// bound-api-gateway config entries.
func (s *Store) BoundAPIGatewaySnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return s.configEntrySnapshot(structs.BoundAPIGateway, req, buf)
}

func (s *Store) configEntrySnapshot(kind string, req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
var (
idx uint64
Expand Down
Loading

0 comments on commit 13da1a5

Please sign in to comment.