From 2b922992c8f4ac02958b259b119257e0211e2260 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Fri, 16 Aug 2024 01:59:41 -0400 Subject: [PATCH] Add filters to event policy (#8122) * feat: add filters to eventpolicy type Signed-off-by: Calum Murray * feat: add filters to eventpolicy crd Signed-off-by: Calum Murray * feat: updated auth package to handle filters as well Signed-off-by: Calum Murray * fix build issues, add unit tests Signed-off-by: Calum Murray * address review comments Signed-off-by: Calum Murray * test: added rekt test for eventpolicy with filters Signed-off-by: Calum Murray * fix(test): event policy resources work Signed-off-by: Calum Murray * small fixes to rekt test Signed-off-by: Calum Murray * fix: tests ran in correct order Signed-off-by: Calum Murray --------- Signed-off-by: Calum Murray --- config/core/resources/eventpolicy.yaml | 38 +++++ docs/eventing-api.md | 38 ++++- pkg/adapter/apiserver/adapter.go | 3 +- pkg/adapter/apiserver/adapter_test.go | 5 +- pkg/adapter/apiserver/delegate_test.go | 5 +- .../eventing/v1alpha1/eventpolicy_types.go | 11 ++ .../v1alpha1/eventpolicy_validation.go | 3 + .../v1alpha1/eventpolicy_validation_test.go | 40 +++++ .../v1alpha1/zz_generated.deepcopy.go | 8 + pkg/auth/event_policy.go | 28 ++-- pkg/auth/event_policy_test.go | 139 +++++++++++++----- pkg/auth/token_verifier.go | 65 +++++++- pkg/broker/filter/filter_handler.go | 69 +-------- pkg/broker/filter/filter_handler_test.go | 2 +- pkg/eventfilter/subscriptionsapi/create.go | 87 +++++++++++ .../authz/addressable_authz_conformance.go | 69 +++++++++ .../rekt/resources/eventpolicy/eventpolicy.go | 33 ++++- .../resources/eventpolicy/eventpolicy.yaml | 5 + .../resources/eventpolicy/eventpolicy_test.go | 8 + 19 files changed, 529 insertions(+), 127 deletions(-) create mode 100644 pkg/eventfilter/subscriptionsapi/create.go diff --git a/config/core/resources/eventpolicy.yaml b/config/core/resources/eventpolicy.yaml index 159d379e5bb..b68003ac1af 100644 --- a/config/core/resources/eventpolicy.yaml +++ b/config/core/resources/eventpolicy.yaml @@ -110,6 +110,44 @@ spec: description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed. type: object x-kubernetes-preserve-unknown-fields: true + filters: + description: 'Filters is an array of SubscriptionsAPIFilters that evaluate to true or false. If any filter expression in the array evaluates to false, the event will not continue pass the ingress of the target resources of the policy' + type: array + items: + type: object + properties: + all: + description: 'All evaluates to true if all the nested expressions evaluate to true. It must contain at least one filter expression' + type: array + items: + type: object + x-kubernetes-preserve-unknown-fields: true + any: + description: 'Any evaluates to true if any of the nested expressions evaluate to true. It must contain at least one filter expression' + type: array + items: + type: object + x-kubernetes-preserve-unknown-fields: true + cesql: + description: 'CESQL is a CloudEvents SQL v1 expression that will evaluate to true or false for each CloudEvent.' + type: string + exact: + description: 'Exact evaluates to true if the values of the matching CloudEvents attributes all exactly match with the associated value string specified (case sensitive)' + type: object + x-kubernetes-preserve-unknown-fields: true + not: + description: 'Not evaluates to true if the nested expression evaluates to false.' + type: object + x-kubernetes-preserve-unknown-fields: true + prefix: + description: 'Prefix evaluates to true if the values of the matching CloudEvents attributes all start with the associated value string specified (case sensitive)' + type: object + x-kubernetes-preserve-unknown-fields: true + suffix: + description: 'Exact evaluates to true if the values of the matching CloudEvents attributes all end with the associated value string specified (case sensitive)' + type: object + x-kubernetes-preserve-unknown-fields: true + status: description: Status represents the current state of the EventPolicy. This data may be out of date. type: object diff --git a/docs/eventing-api.md b/docs/eventing-api.md index 213521267f1..196ab597488 100644 --- a/docs/eventing-api.md +++ b/docs/eventing-api.md @@ -2254,7 +2254,7 @@ AppliedEventPoliciesStatus

SubscriptionsAPIFilter

-(Appears on:SubscriptionsAPIFilter, TriggerSpec, ApiServerSourceSpec) +(Appears on:SubscriptionsAPIFilter, TriggerSpec, EventPolicySpec, ApiServerSourceSpec)

SubscriptionsAPIFilter allows defining a filter expression using CloudEvents @@ -2735,6 +2735,24 @@ An empty list means it applies to all resources in the EventPolicies namespaceFrom is the list of sources or oidc identities, which are allowed to send events to the targets (.spec.to).

+ + +filters
+ + +[]SubscriptionsAPIFilter + + + + +(Optional) +

Filters is the list of SubscriptoinsApi filters which determine whether or not the event is accepted. +It is an array of filter expressions that evaluate to true or false. +If any filter expression in the array evaluates to false, the event will not +pass the target resource’s ingress. Absence of any filters implies that the filters +always evaluate to true.

+ + @@ -2898,6 +2916,24 @@ An empty list means it applies to all resources in the EventPolicies namespaceFrom is the list of sources or oidc identities, which are allowed to send events to the targets (.spec.to).

+ + +filters
+ + +[]SubscriptionsAPIFilter + + + + +(Optional) +

Filters is the list of SubscriptoinsApi filters which determine whether or not the event is accepted. +It is an array of filter expressions that evaluate to true or false. +If any filter expression in the array evaluates to false, the event will not +pass the target resource’s ingress. Absence of any filters implies that the filters +always evaluate to true.

+ +

EventPolicySpecFrom diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index aba3301306c..cda3e617e44 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -34,7 +34,6 @@ import ( "knative.dev/eventing/pkg/adapter/v2" v1 "knative.dev/eventing/pkg/apis/sources/v1" - brokerfilter "knative.dev/eventing/pkg/broker/filter" "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" ) @@ -73,7 +72,7 @@ func (a *apiServerAdapter) start(ctx context.Context, stopCh <-chan struct{}) er logger: a.logger, ref: a.config.EventMode == v1.ReferenceMode, apiServerSourceName: a.name, - filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(a.logger.Desugar(), a.config.Filters)...), + filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(a.logger.Desugar(), a.config.Filters)...), } if a.config.ResourceOwner != nil { a.logger.Infow("will be filtered", diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index 93b33df0bb0..8c931170f60 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -35,7 +35,6 @@ import ( kubetesting "k8s.io/client-go/testing" adaptertest "knative.dev/eventing/pkg/adapter/v2/test" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" - brokerfilter "knative.dev/eventing/pkg/broker/filter" "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" rectesting "knative.dev/eventing/pkg/reconciler/testing" "knative.dev/pkg/logging" @@ -299,7 +298,7 @@ func makeResourceAndTestingClient() (*resourceDelegate, *adaptertest.TestCloudEv source: "unit-test", apiServerSourceName: apiServerSourceNameTest, logger: logger, - filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...), + filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...), }, ce } @@ -313,6 +312,6 @@ func makeRefAndTestingClient() (*resourceDelegate, *adaptertest.TestCloudEventsC apiServerSourceName: apiServerSourceNameTest, logger: zap.NewExample().Sugar(), ref: true, - filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...), + filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...), }, ce } diff --git a/pkg/adapter/apiserver/delegate_test.go b/pkg/adapter/apiserver/delegate_test.go index 00fc9dfe691..d2d978b1f5a 100644 --- a/pkg/adapter/apiserver/delegate_test.go +++ b/pkg/adapter/apiserver/delegate_test.go @@ -22,7 +22,6 @@ import ( adaptertest "knative.dev/eventing/pkg/adapter/v2/test" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/apis/sources" - brokerfilter "knative.dev/eventing/pkg/broker/filter" "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" ) @@ -87,7 +86,7 @@ func TestFilterFails(t *testing.T) { source: "unit-test", apiServerSourceName: apiServerSourceNameTest, logger: logger, - filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), filters)...), + filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), filters)...), } delegate.Update(simplePod("unit", "test")) @@ -104,7 +103,7 @@ func TestEmptyFiltersList(t *testing.T) { source: "unit-test", apiServerSourceName: apiServerSourceNameTest, logger: logger, - filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), filters)...), + filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), filters)...), } delegate.Update(simplePod("unit", "test")) diff --git a/pkg/apis/eventing/v1alpha1/eventpolicy_types.go b/pkg/apis/eventing/v1alpha1/eventpolicy_types.go index 53d62653444..e3c312aae9f 100644 --- a/pkg/apis/eventing/v1alpha1/eventpolicy_types.go +++ b/pkg/apis/eventing/v1alpha1/eventpolicy_types.go @@ -20,9 +20,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kmeta" + + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" ) // +genclient @@ -71,6 +74,14 @@ type EventPolicySpec struct { // From is the list of sources or oidc identities, which are allowed to send events to the targets (.spec.to). From []EventPolicySpecFrom `json:"from,omitempty"` + + // Filters is the list of SubscriptoinsApi filters which determine whether or not the event is accepted. + // It is an array of filter expressions that evaluate to true or false. + // If any filter expression in the array evaluates to false, the event will not + // pass the target resource's ingress. Absence of any filters implies that the filters + // always evaluate to true. + // +optional + Filters []eventingv1.SubscriptionsAPIFilter `json:"filters,omitempty"` } type EventPolicySpecTo struct { diff --git a/pkg/apis/eventing/v1alpha1/eventpolicy_validation.go b/pkg/apis/eventing/v1alpha1/eventpolicy_validation.go index 5f05c240df9..4ae6ea3f7c2 100644 --- a/pkg/apis/eventing/v1alpha1/eventpolicy_validation.go +++ b/pkg/apis/eventing/v1alpha1/eventpolicy_validation.go @@ -20,6 +20,7 @@ import ( "context" "strings" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/apis" ) @@ -60,6 +61,8 @@ func (ets *EventPolicySpec) Validate(ctx context.Context) *apis.FieldError { } } + err = err.Also(eventingv1.ValidateSubscriptionAPIFiltersList(ctx, ets.Filters).ViaField("filters")) + return err } diff --git a/pkg/apis/eventing/v1alpha1/eventpolicy_validation_test.go b/pkg/apis/eventing/v1alpha1/eventpolicy_validation_test.go index 20034c8f907..14c40ddeb97 100644 --- a/pkg/apis/eventing/v1alpha1/eventpolicy_validation_test.go +++ b/pkg/apis/eventing/v1alpha1/eventpolicy_validation_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/apis" "knative.dev/pkg/ptr" @@ -294,6 +295,45 @@ func TestEventPolicySpecValidationWithOIDCAuthenticationFeatureFlagEnabled(t *te return nil }(), }, + { + name: "valid, from.sub exactly '*', valid filters", + ep: &EventPolicy{ + Spec: EventPolicySpec{ + From: []EventPolicySpecFrom{{ + Sub: ptr.String("*"), + }}, + Filters: []eventingv1.SubscriptionsAPIFilter{ + { + Prefix: map[string]string{"type": "example"}, + }, + }, + }, + }, + want: func() *apis.FieldError { + return nil + }(), + }, + { + name: "invalid, from.sub exactly '*', invalid cesql filter", + ep: &EventPolicy{ + Spec: EventPolicySpec{ + From: []EventPolicySpecFrom{{ + Sub: ptr.String("*"), + }}, + Filters: []eventingv1.SubscriptionsAPIFilter{ + { + CESQL: "type LIKE id", + }, + }, + }, + }, + want: func() *apis.FieldError { + + return apis.ErrInvalidValue("type LIKE id", "cesql", "parse error: syntax error: |failed to parse LIKE expression: the pattern was not a string literal"). + ViaFieldIndex("filters", 0). + ViaField("spec") + }(), + }, } for _, test := range tests { diff --git a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go index 998b577cad3..068369c53ba 100644 --- a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go @@ -24,6 +24,7 @@ package v1alpha1 import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. @@ -146,6 +147,13 @@ func (in *EventPolicySpec) DeepCopyInto(out *EventPolicySpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Filters != nil { + in, out := &in.Filters, &out.Filters + *out = make([]eventingv1.SubscriptionsAPIFilter, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } return } diff --git a/pkg/auth/event_policy.go b/pkg/auth/event_policy.go index d8635167439..972eb1147ef 100644 --- a/pkg/auth/event_policy.go +++ b/pkg/auth/event_policy.go @@ -17,10 +17,16 @@ limitations under the License. package auth import ( + "context" "fmt" "sort" "strings" + cloudevents "github.com/cloudevents/sdk-go/v2" + "go.uber.org/zap" + "knative.dev/eventing/pkg/eventfilter" + "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/apis/feature" @@ -210,17 +216,19 @@ func resolveSubjectsFromReference(resolver *resolver.AuthenticatableResolver, re return objFullSANames, nil } -// SubjectContained checks if the given sub is contained in the list of allowedSubs -// or if it matches a prefix pattern in subs (e.g. system:serviceaccounts:my-ns:*) -func SubjectContained(sub string, allowedSubs []string) bool { - for _, s := range allowedSubs { - if strings.EqualFold(s, sub) { - return true - } +// SubjectAndFiltersPass checks if the given sub is contained in the list of allowedSubs +// or if it matches a prefix pattern in subs (e.g. system:serviceaccounts:my-ns:*), as +// well as if the event passes any filters associated with the subjects for an event policy +func SubjectAndFiltersPass(ctx context.Context, sub string, allowedSubsWithFilters []subjectsWithFilters, event *cloudevents.Event, logger *zap.SugaredLogger) bool { + if event == nil { + return false + } - if strings.HasSuffix(s, "*") && - strings.HasPrefix(sub, strings.TrimSuffix(s, "*")) { - return true + for _, swf := range allowedSubsWithFilters { + for _, s := range swf.subjects { + if strings.EqualFold(s, sub) || (strings.HasSuffix(s, "*") && strings.HasPrefix(sub, strings.TrimSuffix(s, "*"))) { + return subscriptionsapi.CreateSubscriptionsAPIFilters(logger.Desugar(), swf.filters).Filter(ctx, *event) != eventfilter.FailFilter + } } } diff --git a/pkg/auth/event_policy_test.go b/pkg/auth/event_policy_test.go index 124f1423173..c7396869b63 100644 --- a/pkg/auth/event_policy_test.go +++ b/pkg/auth/event_policy_test.go @@ -22,7 +22,11 @@ import ( "strings" "testing" + "go.uber.org/zap" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/utils/ptr" + + cetest "github.com/cloudevents/sdk-go/v2/test" "github.com/google/go-cmp/cmp" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,7 +42,6 @@ import ( duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/client/injection/ducks/duck/v1/authstatus" fakedynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake" - "knative.dev/pkg/ptr" reconcilertesting "knative.dev/pkg/reconciler/testing" "knative.dev/pkg/resolver" "knative.dev/pkg/tracker" @@ -541,9 +544,9 @@ func TestResolveSubjects(t *testing.T) { Namespace: namespace, }, }, { - Sub: ptr.String("system:serviceaccount:my-ns:my-app"), + Sub: ptr.To("system:serviceaccount:my-ns:my-app"), }, { - Sub: ptr.String("system:serviceaccount:my-ns:my-app-2"), + Sub: ptr.To("system:serviceaccount:my-ns:my-app-2"), }, }, objects: []runtime.Object{ @@ -555,7 +558,7 @@ func TestResolveSubjects(t *testing.T) { Status: sourcesv1.ApiServerSourceStatus{ SourceStatus: duckv1.SourceStatus{ Auth: &duckv1.AuthStatus{ - ServiceAccountName: ptr.String("my-apiserversource-oidc-sa"), + ServiceAccountName: ptr.To("my-apiserversource-oidc-sa"), }, }, }, @@ -591,9 +594,9 @@ func TestResolveSubjects(t *testing.T) { Namespace: namespace, }, }, { - Sub: ptr.String("system:serviceaccount:my-ns:my-app"), + Sub: ptr.To("system:serviceaccount:my-ns:my-app"), }, { - Sub: ptr.String("system:serviceaccount:my-ns:my-app-2"), + Sub: ptr.To("system:serviceaccount:my-ns:my-app-2"), }, }, objects: []runtime.Object{ @@ -605,7 +608,7 @@ func TestResolveSubjects(t *testing.T) { Status: sourcesv1.ApiServerSourceStatus{ SourceStatus: duckv1.SourceStatus{ Auth: &duckv1.AuthStatus{ - ServiceAccountName: ptr.String("my-apiserversource-oidc-sa"), + ServiceAccountName: ptr.To("my-apiserversource-oidc-sa"), }, }, }, @@ -618,7 +621,7 @@ func TestResolveSubjects(t *testing.T) { Status: sourcesv1.PingSourceStatus{ SourceStatus: duckv1.SourceStatus{ Auth: &duckv1.AuthStatus{ - ServiceAccountName: ptr.String("my-pingsource-oidc-sa"), + ServiceAccountName: ptr.To("my-pingsource-oidc-sa"), }, }, }, @@ -692,78 +695,148 @@ func TestResolveSubjects(t *testing.T) { } } -func TestSubjectContained(t *testing.T) { +func TestSubjectAndFiltersContained(t *testing.T) { tests := []struct { - name string - sub string - allowedSubs []string - want bool + name string + sub string + allowedSubsAndFilters []subjectsWithFilters + want bool }{ { name: "simple 1:1 match", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubs: []string{ - "system:serviceaccounts:my-ns:my-sa", + allowedSubsAndFilters: []subjectsWithFilters{ + { + subjects: []string{"system:serviceaccounts:my-ns:my-sa"}, + }, }, want: true, }, { name: "simple 1:n match", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubs: []string{ - "system:serviceaccounts:my-ns:another-sa", - "system:serviceaccounts:my-ns:my-sa", - "system:serviceaccounts:my-ns:yet-another-sa", + allowedSubsAndFilters: []subjectsWithFilters{ + { + subjects: []string{ + "system:serviceaccounts:my-ns:another-sa", + "system:serviceaccounts:my-ns:my-sa", + "system:serviceaccounts:my-ns:yet-another-sa"}, + }, }, want: true, }, { name: "pattern match (all)", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubs: []string{ - "*", + allowedSubsAndFilters: []subjectsWithFilters{ + { + subjects: []string{ + "*"}, + }, }, want: true, }, { name: "pattern match (namespace)", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubs: []string{ - "system:serviceaccounts:my-ns:*", + allowedSubsAndFilters: []subjectsWithFilters{ + { + subjects: []string{ + "system:serviceaccounts:my-ns:*", + }, + }, }, want: true, }, { name: "pattern match (different namespace)", sub: "system:serviceaccounts:my-ns-2:my-sa", - allowedSubs: []string{ - "system:serviceaccounts:my-ns:*", + allowedSubsAndFilters: []subjectsWithFilters{ + { + subjects: []string{ + "system:serviceaccounts:my-ns:*", + }, + }, }, want: false, }, { name: "pattern match (namespace prefix)", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubs: []string{ - "system:serviceaccounts:my-ns*", + allowedSubsAndFilters: []subjectsWithFilters{ + { + subjects: []string{ + "system:serviceaccounts:my-ns*", + }, + }, }, want: true, }, { name: "pattern match (namespace prefix 2)", sub: "system:serviceaccounts:my-ns-2:my-sa", - allowedSubs: []string{ - "system:serviceaccounts:my-ns*", + allowedSubsAndFilters: []subjectsWithFilters{ + { + subjects: []string{ + "system:serviceaccounts:my-ns*", + }, + }, }, want: true, }, { name: "pattern match (middle)", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubs: []string{ - "system:serviceaccounts:*:my-sa", + allowedSubsAndFilters: []subjectsWithFilters{ + { + subjects: []string{ + "system:serviceaccounts:*:my-sa", + }, + }, + }, + want: false, + }, { + name: "pattern match (namespace prefix) and failing event filter", + sub: "system:serviceaccounts:my-ns:my-sa", + allowedSubsAndFilters: []subjectsWithFilters{ + { + subjects: []string{ + "system:serviceaccounts:my-ns*", + }, + filters: []eventingv1.SubscriptionsAPIFilter{ + { + CESQL: "false", + }, + }, + }, + }, + want: false, + }, { + name: "only check filter if subject matches", + sub: "system:serviceaccounts:my-ns:my-sa", + allowedSubsAndFilters: []subjectsWithFilters{ + { + subjects: []string{ + "system:serviceaccounts:not-my-ns*", + }, + filters: []eventingv1.SubscriptionsAPIFilter{ + { + CESQL: "true", + }, + }, + }, + { + subjects: []string{ + "system:serviceaccounts:my-ns*", + }, + filters: []eventingv1.SubscriptionsAPIFilter{ + { + CESQL: "false", + }, + }, + }, }, want: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := SubjectContained(tt.sub, tt.allowedSubs); got != tt.want { - t.Errorf("SubjectContained(%q, '%v') = %v, want %v", tt.sub, tt.allowedSubs, got, tt.want) + if got := SubjectAndFiltersPass(context.Background(), tt.sub, tt.allowedSubsAndFilters, ptr.To(cetest.MinEvent()), zap.NewNop().Sugar()); got != tt.want { + t.Errorf("SubjectAndFiltersPass(%q, '%v') = %v, want %v", tt.sub, tt.allowedSubsAndFilters, got, tt.want) } }) } diff --git a/pkg/auth/token_verifier.go b/pkg/auth/token_verifier.go index b3be913be89..0d2c3888b9b 100644 --- a/pkg/auth/token_verifier.go +++ b/pkg/auth/token_verifier.go @@ -17,6 +17,7 @@ limitations under the License. package auth import ( + "bytes" "context" "encoding/json" "fmt" @@ -29,9 +30,12 @@ import ( eventpolicyinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy" "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" + "github.com/cloudevents/sdk-go/v2/binding" + cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" "github.com/coreos/go-oidc/v3/oidc" "go.uber.org/zap" "k8s.io/client-go/rest" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/injection" "knative.dev/pkg/logging" @@ -92,7 +96,7 @@ func (v *OIDCTokenVerifier) VerifyRequest(ctx context.Context, features feature. return fmt.Errorf("authentication of request could not be verified: %w", err) } - err = v.verifyAuthZ(features, idToken, resourceNamespace, policyRefs, resp) + err = v.verifyAuthZ(ctx, features, idToken, resourceNamespace, policyRefs, req, resp) if err != nil { return fmt.Errorf("authorization of request could not be verified: %w", err) } @@ -146,9 +150,24 @@ func (v *OIDCTokenVerifier) verifyAuthN(ctx context.Context, audience *string, r } // verifyAuthZ verifies if the given idToken is allowed by the resources eventPolicyStatus -func (v *OIDCTokenVerifier) verifyAuthZ(features feature.Flags, idToken *IDToken, resourceNamespace string, policyRefs []duckv1.AppliedEventPolicyRef, resp http.ResponseWriter) error { +func (v *OIDCTokenVerifier) verifyAuthZ(ctx context.Context, features feature.Flags, idToken *IDToken, resourceNamespace string, policyRefs []duckv1.AppliedEventPolicyRef, req *http.Request, resp http.ResponseWriter) error { if len(policyRefs) > 0 { - subjectsFromApplyingPolicies := []string{} + req, err := copyRequest(req) + if err != nil { + resp.WriteHeader(http.StatusInternalServerError) + return fmt.Errorf("failed to copy request body: %w", err) + } + + message := cehttp.NewMessageFromHttpRequest(req) + defer message.Finish(nil) + + event, err := binding.ToEvent(ctx, message) + if err != nil { + resp.WriteHeader(http.StatusInternalServerError) + return fmt.Errorf("failed to decode event from request: %w", err) + } + + subjectsWithFiltersFromApplyingPolicies := []subjectsWithFilters{} for _, p := range policyRefs { policy, err := v.eventPolicyLister.EventPolicies(resourceNamespace).Get(p.Name) if err != nil { @@ -156,12 +175,12 @@ func (v *OIDCTokenVerifier) verifyAuthZ(features feature.Flags, idToken *IDToken return fmt.Errorf("failed to get eventPolicy: %w", err) } - subjectsFromApplyingPolicies = append(subjectsFromApplyingPolicies, policy.Status.From...) + subjectsWithFiltersFromApplyingPolicies = append(subjectsWithFiltersFromApplyingPolicies, subjectsWithFilters{subjects: policy.Status.From, filters: policy.Spec.Filters}) } - if !SubjectContained(idToken.Subject, subjectsFromApplyingPolicies) { + if !SubjectAndFiltersPass(ctx, idToken.Subject, subjectsWithFiltersFromApplyingPolicies, event, v.logger) { resp.WriteHeader(http.StatusForbidden) - return fmt.Errorf("token is from subject %q, but only %q are part of applying event policies", idToken.Subject, subjectsFromApplyingPolicies) + return fmt.Errorf("token is from subject %q, but only %#v are part of applying event policies", idToken.Subject, subjectsWithFiltersFromApplyingPolicies) } return nil @@ -271,6 +290,35 @@ func (v *OIDCTokenVerifier) getKubernetesOIDCDiscovery() (*openIDMetadata, error return openIdConfig, nil } +// copyRequest makes a copy of the http request which can be consumed as needed, leaving the original request +// able to be consumed as well. +func copyRequest(req *http.Request) (*http.Request, error) { + // check if we actually need to copy the body, otherwise we can return the original request + if req.Body == nil || req.Body == http.NoBody { + return req, nil + } + + var buf bytes.Buffer + if _, err := buf.ReadFrom(req.Body); err != nil { + return nil, fmt.Errorf("failed to read request body while copying it: %w", err) + } + + if err := req.Body.Close(); err != nil { + return nil, fmt.Errorf("failed to close original request body ready while copying request: %w", err) + } + + // set the original request body to be readable again + req.Body = io.NopCloser(&buf) + + // return a new request with a readable body and same headers as the original + // we don't need to set any other fields as cloudevents only uses the headers + // and body to construct the Message/Event. + return &http.Request{ + Header: req.Header, + Body: io.NopCloser(bytes.NewReader(buf.Bytes())), + }, nil +} + type openIDMetadata struct { Issuer string `json:"issuer"` JWKSURI string `json:"jwks_uri"` @@ -278,3 +326,8 @@ type openIDMetadata struct { SubjectTypes []string `json:"subject_types_supported"` SigningAlgs []string `json:"id_token_signing_alg_values_supported"` } + +type subjectsWithFilters struct { + filters []eventingv1.SubscriptionsAPIFilter + subjects []string +} diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index c0b7b4c4617..78064a3e417 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -114,7 +114,7 @@ func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcT return } logger.Debug("Adding filter to filtersMap") - fm.Set(trigger, createSubscriptionsAPIFilters(logger, trigger)) + fm.Set(trigger, subscriptionsapi.CreateSubscriptionsAPIFilters(logger, trigger.Spec.Filters)) kncloudevents.AddOrUpdateAddressableHandler(clientConfig, duckv1.Addressable{ URL: trigger.Status.SubscriberURI, CACerts: trigger.Status.SubscriberCACerts, @@ -126,7 +126,7 @@ func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcT return } logger.Debug("Updating filter in filtersMap") - fm.Set(trigger, createSubscriptionsAPIFilters(logger, trigger)) + fm.Set(trigger, subscriptionsapi.CreateSubscriptionsAPIFilters(logger, trigger.Spec.Filters)) kncloudevents.AddOrUpdateAddressableHandler(clientConfig, duckv1.Addressable{ URL: trigger.Status.SubscriberURI, CACerts: trigger.Status.SubscriberCACerts, @@ -608,70 +608,7 @@ func (h *Handler) filterEvent(ctx context.Context, trigger *eventingv1.Trigger, } func applySubscriptionsAPIFilters(ctx context.Context, trigger *eventingv1.Trigger, event cloudevents.Event) eventfilter.FilterResult { - return createSubscriptionsAPIFilters(logging.FromContext(ctx).Desugar(), trigger).Filter(ctx, event) -} - -func createSubscriptionsAPIFilters(logger *zap.Logger, trigger *eventingv1.Trigger) eventfilter.Filter { - if len(trigger.Spec.Filters) == 0 { - logger.Debug("Found no filters for trigger", zap.Any("trigger.Spec", trigger.Spec)) - return subscriptionsapi.NewNoFilter() - } - return subscriptionsapi.NewAllFilter(MaterializeFiltersList(logger, trigger.Spec.Filters)...) -} - -func materializeSubscriptionsAPIFilter(logger *zap.Logger, filter eventingv1.SubscriptionsAPIFilter) eventfilter.Filter { - var materializedFilter eventfilter.Filter - var err error - switch { - case len(filter.Exact) > 0: - // The webhook validates that this map has only a single key:value pair. - materializedFilter, err = subscriptionsapi.NewExactFilter(filter.Exact) - if err != nil { - logger.Debug("Invalid exact expression", zap.Any("filters", filter.Exact), zap.Error(err)) - return nil - } - case len(filter.Prefix) > 0: - // The webhook validates that this map has only a single key:value pair. - materializedFilter, err = subscriptionsapi.NewPrefixFilter(filter.Prefix) - if err != nil { - logger.Debug("Invalid prefix expression", zap.Any("filters", filter.Exact), zap.Error(err)) - return nil - } - case len(filter.Suffix) > 0: - // The webhook validates that this map has only a single key:value pair. - materializedFilter, err = subscriptionsapi.NewSuffixFilter(filter.Suffix) - if err != nil { - logger.Debug("Invalid suffix expression", zap.Any("filters", filter.Exact), zap.Error(err)) - return nil - } - case len(filter.All) > 0: - materializedFilter = subscriptionsapi.NewAllFilter(MaterializeFiltersList(logger, filter.All)...) - case len(filter.Any) > 0: - materializedFilter = subscriptionsapi.NewAnyFilter(MaterializeFiltersList(logger, filter.Any)...) - case filter.Not != nil: - materializedFilter = subscriptionsapi.NewNotFilter(materializeSubscriptionsAPIFilter(logger, *filter.Not)) - case filter.CESQL != "": - if materializedFilter, err = subscriptionsapi.NewCESQLFilter(filter.CESQL); err != nil { - // This is weird, CESQL expression should be validated when Trigger's are created. - logger.Debug("Found an Invalid CE SQL expression", zap.String("expression", filter.CESQL)) - return nil - } - } - return materializedFilter -} - -// MaterialzieFilterList allows any component that supports `SubscriptionsAPIFilter` to process them -func MaterializeFiltersList(logger *zap.Logger, filters []eventingv1.SubscriptionsAPIFilter) []eventfilter.Filter { - materializedFilters := make([]eventfilter.Filter, 0, len(filters)) - for _, f := range filters { - f := materializeSubscriptionsAPIFilter(logger, f) - if f == nil { - logger.Warn("Failed to parse filter. Skipping filter.", zap.Any("filter", f)) - continue - } - materializedFilters = append(materializedFilters, f) - } - return materializedFilters + return subscriptionsapi.CreateSubscriptionsAPIFilters(logging.FromContext(ctx).Desugar(), trigger.Spec.Filters).Filter(ctx, event) } func applyAttributesFilter(ctx context.Context, filter *eventingv1.TriggerFilter, event cloudevents.Event) eventfilter.FilterResult { diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index 5d67ddb5cdd..37aa14dacbf 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -665,7 +665,7 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) { trig.Status.SubscriberURI = url } triggerinformerfake.Get(ctx).Informer().GetStore().Add(trig) - filtersMap.Set(trig, createSubscriptionsAPIFilters(logging.FromContext(ctx).Desugar(), trig)) + filtersMap.Set(trig, subscriptionsapi.CreateSubscriptionsAPIFilters(logging.FromContext(ctx).Desugar(), trig.Spec.Filters)) // create needed triggers subscription object sub := &messagingv1.Subscription{ diff --git a/pkg/eventfilter/subscriptionsapi/create.go b/pkg/eventfilter/subscriptionsapi/create.go new file mode 100644 index 00000000000..3f56f6e7f0c --- /dev/null +++ b/pkg/eventfilter/subscriptionsapi/create.go @@ -0,0 +1,87 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package subscriptionsapi + +import ( + "go.uber.org/zap" + v1 "knative.dev/eventing/pkg/apis/eventing/v1" + "knative.dev/eventing/pkg/eventfilter" +) + +// MaterializeSubscriptionsAPIFilter materializes a SubscriptionsAPIFilter into a runnable Filter. +func MaterializeSubscriptionsAPIFilter(logger *zap.Logger, filter v1.SubscriptionsAPIFilter) eventfilter.Filter { + var materializedFilter eventfilter.Filter + var err error + switch { + case len(filter.Exact) > 0: + // The webhook validates that this map has only a single key:value pair. + materializedFilter, err = NewExactFilter(filter.Exact) + if err != nil { + logger.Debug("Invalid exact expression", zap.Any("filters", filter.Exact), zap.Error(err)) + return nil + } + case len(filter.Prefix) > 0: + // The webhook validates that this map has only a single key:value pair. + materializedFilter, err = NewPrefixFilter(filter.Prefix) + if err != nil { + logger.Debug("Invalid prefix expression", zap.Any("filters", filter.Exact), zap.Error(err)) + return nil + } + case len(filter.Suffix) > 0: + // The webhook validates that this map has only a single key:value pair. + materializedFilter, err = NewSuffixFilter(filter.Suffix) + if err != nil { + logger.Debug("Invalid suffix expression", zap.Any("filters", filter.Exact), zap.Error(err)) + return nil + } + case len(filter.All) > 0: + materializedFilter = NewAllFilter(MaterializeFiltersList(logger, filter.All)...) + case len(filter.Any) > 0: + materializedFilter = NewAnyFilter(MaterializeFiltersList(logger, filter.Any)...) + case filter.Not != nil: + materializedFilter = NewNotFilter(MaterializeSubscriptionsAPIFilter(logger, *filter.Not)) + case filter.CESQL != "": + if materializedFilter, err = NewCESQLFilter(filter.CESQL); err != nil { + // This is weird, CESQL expression should be validated when Trigger's are created. + logger.Debug("Found an Invalid CE SQL expression", zap.String("expression", filter.CESQL)) + return nil + } + } + return materializedFilter +} + +func CreateSubscriptionsAPIFilters(logger *zap.Logger, filters []v1.SubscriptionsAPIFilter) eventfilter.Filter { + if len(filters) == 0 { + logger.Debug("no filters provided") + return NewNoFilter() + } + return NewAllFilter(MaterializeFiltersList(logger, filters)...) +} + +// MaterialzieFilterList allows any component that supports `SubscriptionsAPIFilter` to process them +func MaterializeFiltersList(logger *zap.Logger, filters []v1.SubscriptionsAPIFilter) []eventfilter.Filter { + materializedFilters := make([]eventfilter.Filter, 0, len(filters)) + for _, f := range filters { + f := MaterializeSubscriptionsAPIFilter(logger, f) + if f == nil { + logger.Warn("Failed to parse filter. Skipping filter.", zap.Any("filter", f)) + continue + } + materializedFilters = append(materializedFilters, f) + } + return materializedFilters +} diff --git a/test/rekt/features/authz/addressable_authz_conformance.go b/test/rekt/features/authz/addressable_authz_conformance.go index 0d633804a23..9c9833967e7 100644 --- a/test/rekt/features/authz/addressable_authz_conformance.go +++ b/test/rekt/features/authz/addressable_authz_conformance.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/test/rekt/resources/eventpolicy" "knative.dev/eventing/test/rekt/resources/pingsource" "knative.dev/reconciler-test/pkg/environment" @@ -41,6 +42,7 @@ func AddressableAuthZConformance(gvr schema.GroupVersionResource, kind, name str Features: []*feature.Feature{ addressableAllowsAuthorizedRequest(gvr, kind, name), addressableRejectsUnauthorizedRequest(gvr, kind, name), + addressableRespectsEventPolicyFilters(gvr, kind, name), addressableBecomesUnreadyOnUnreadyEventPolicy(gvr, kind, name), }, } @@ -125,6 +127,73 @@ func addressableRejectsUnauthorizedRequest(gvr schema.GroupVersionResource, kind return f } +func addressableRespectsEventPolicyFilters(gvr schema.GroupVersionResource, kind, name string) *feature.Feature { + f := feature.NewFeatureNamed(fmt.Sprintf("%s only admits events that pass the event policy filter", kind)) + + f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled()) + f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + eventPolicy := feature.MakeRandomK8sName("eventpolicy") + source1 := feature.MakeRandomK8sName("source") + sourceSubject1 := feature.MakeRandomK8sName("source-oidc-identity") + source2 := feature.MakeRandomK8sName("source") + sourceSubject2 := feature.MakeRandomK8sName("source-oidc-identity") + + event1 := test.FullEvent() + event1.SetType("valid.event.type") + event1.SetID("1") + event2 := test.FullEvent() + event2.SetType("invalid.event.type") + event2.SetID("2") + + // Install event policy + f.Setup("Install the EventPolicy", func(ctx context.Context, t feature.T) { + namespace := environment.FromContext(ctx).Namespace() + eventpolicy.Install( + eventPolicy, + eventpolicy.WithToRef( + gvr.GroupVersion().WithKind(kind), + name), + eventpolicy.WithFromSubject(fmt.Sprintf("system:serviceaccount:%s:%s", namespace, sourceSubject1)), + eventpolicy.WithFromSubject(fmt.Sprintf("system:serviceaccount:%s:%s", namespace, sourceSubject2)), + eventpolicy.WithFilters([]eventingv1.SubscriptionsAPIFilter{ + { + Prefix: map[string]string{ + "type": "valid", + }, + }, + }), + )(ctx, t) + }) + f.Setup(fmt.Sprintf("EventPolicy for %s %s is ready", kind, name), k8s.IsReady(gvr, name)) + + // Install source + f.Requirement("install source 1", eventshub.Install( + source1, + eventshub.StartSenderToResourceTLS(gvr, name, nil), + eventshub.InputEvent(event1), + eventshub.OIDCSubject(sourceSubject1), + )) + + f.Requirement("install source 2", eventshub.Install( + source2, + eventshub.StartSenderToResourceTLS(gvr, name, nil), + eventshub.InputEvent(event2), + eventshub.OIDCSubject(sourceSubject2), + )) + + f.Alpha(kind). + Must("valid event sent", eventassert.OnStore(source1).MatchSentEvent(test.HasId(event1.ID())).Exact(1)). + Must("get 202 on response", eventassert.OnStore(source1).Match(eventassert.MatchStatusCode(202)).AtLeast(1)) + + f.Alpha(kind). + Must("invalid event sent", eventassert.OnStore(source2).MatchSentEvent(test.HasId(event2.ID())).Exact(1)). + Must("get 403 on response", eventassert.OnStore(source2).Match(eventassert.MatchStatusCode(403)).AtLeast(1)) + + return f +} + func addressableBecomesUnreadyOnUnreadyEventPolicy(gvr schema.GroupVersionResource, kind, name string) *feature.Feature { f := feature.NewFeatureNamed(fmt.Sprintf("%s becomes NotReady when EventPolicy is NotReady", kind)) diff --git a/test/rekt/resources/eventpolicy/eventpolicy.go b/test/rekt/resources/eventpolicy/eventpolicy.go index 879eabfd7e5..6d44293bd13 100644 --- a/test/rekt/resources/eventpolicy/eventpolicy.go +++ b/test/rekt/resources/eventpolicy/eventpolicy.go @@ -19,6 +19,8 @@ package eventpolicy import ( "context" "embed" + "encoding/json" + "strings" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -26,10 +28,13 @@ import ( "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/k8s" "knative.dev/reconciler-test/pkg/manifest" + "sigs.k8s.io/yaml" + + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" ) //go:embed *.yaml -var yaml embed.FS +var yamlEmbed embed.FS func GVR() schema.GroupVersionResource { return schema.GroupVersionResource{Group: "eventing.knative.dev", Version: "v1alpha1", Resource: "eventpolicies"} @@ -44,7 +49,7 @@ func Install(name string, opts ...manifest.CfgFn) feature.StepFn { fn(cfg) } return func(ctx context.Context, t feature.T) { - if _, err := manifest.InstallYamlFS(ctx, yaml, cfg); err != nil { + if _, err := manifest.InstallYamlFS(ctx, yamlEmbed, cfg); err != nil { t.Fatal(err) } } @@ -133,6 +138,30 @@ func WithFromSubject(subject string) manifest.CfgFn { } } +func WithFilters(filters []eventingv1.SubscriptionsAPIFilter) manifest.CfgFn { + jsonBytes, err := json.Marshal(filters) + if err != nil { + panic(err) + } + + yamlBytes, err := yaml.JSONToYAML(jsonBytes) + if err != nil { + panic(err) + } + + filtersYaml := string(yamlBytes) + + lines := strings.Split(filtersYaml, "\n") + out := make([]string, 0, len(lines)) + for i := range lines { + out = append(out, " "+lines[i]) + } + + return func(m map[string]interface{}) { + m["filters"] = strings.Join(out, "\n") + } +} + // IsReady tests to see if an EventPolicy becomes ready within the time given. func IsReady(name string, timing ...time.Duration) feature.StepFn { return k8s.IsReady(GVR(), name, timing...) diff --git a/test/rekt/resources/eventpolicy/eventpolicy.yaml b/test/rekt/resources/eventpolicy/eventpolicy.yaml index 02a98422e67..84925531e12 100644 --- a/test/rekt/resources/eventpolicy/eventpolicy.yaml +++ b/test/rekt/resources/eventpolicy/eventpolicy.yaml @@ -67,3 +67,8 @@ spec: - sub: {{ $from.sub }} {{ end }} {{ end }} + + {{ if .filters }} + filters: +{{ .filters }} + {{ end }} diff --git a/test/rekt/resources/eventpolicy/eventpolicy_test.go b/test/rekt/resources/eventpolicy/eventpolicy_test.go index 0832330d040..258579f5c28 100644 --- a/test/rekt/resources/eventpolicy/eventpolicy_test.go +++ b/test/rekt/resources/eventpolicy/eventpolicy_test.go @@ -20,6 +20,7 @@ import ( "embed" "os" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/test/rekt/resources/broker" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -105,6 +106,11 @@ func Example_full() { "my-ns-2", ), eventpolicy.WithFromSubject("my-sub"), + eventpolicy.WithFilters([]eventingv1.SubscriptionsAPIFilter{ + { + CESQL: "type LIKE event.%.type", + }, + }), } for _, fn := range cfgFn { @@ -147,4 +153,6 @@ func Example_full() { // name: my-broker // namespace: my-ns-2 // - sub: my-sub + // filters: + // - cesql: type LIKE event.%.type }