diff --git a/docs/eventing-api.md b/docs/eventing-api.md index 5fc775d5da6..ef140a972cd 100644 --- a/docs/eventing-api.md +++ b/docs/eventing-api.md @@ -7826,453 +7826,6 @@ IntegrationSourceStatus -

AWSCommon -

-

-(Appears on:AWSDDBStreams, AWSS3, AWSSQS) -

-

-

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
FieldDescription
-region
- -string - -
-

Auth is the S3 authentication (accessKey/secretKey) configuration.

-
-profileCredentialsName
- -string - -
-

UseDefaultCredentials bool json:"useDefaultCredentials" default:"false" // Use default credentials provider -UseProfileCredentials bool json:"useProfileCredentials" default:"false" // Use profile credentials provider

-
-sessionToken
- -string - -
-
UseSessionCredentials  bool   `json:"useSessionCredentials" default:"false"` // Use session credentials
-
-
-uriEndpointOverride
- -string - -
-

Session token

-
-overrideEndpoint
- -bool - -
-

Override endpoint URI

-
-

AWSDDBStreams -

-

-

- - - - - - - - - - - - - - - - - - - - - - - - - -
FieldDescription
-AWSCommon
- - -AWSCommon - - -
-

-(Members of AWSCommon are embedded into this type.) -

-
-table
- -string - -
-

Embeds AWSCommon to inherit its fields in JSON

-
-streamIteratorType
- -string - -
-

The name of the DynamoDB table

-
-delay
- -int - -
-

Defines where in the DynamoDB stream to start getting records

-
-

AWSS3 -

-

-

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
FieldDescription
-AWSCommon
- - -AWSCommon - - -
-

-(Members of AWSCommon are embedded into this type.) -

-
-bucketNameOrArn
- -string - -
-

Embeds AWSCommon to inherit its fields in JSON

-
-deleteAfterRead
- -bool - -
-

S3 Bucket name or ARN

-
-moveAfterRead
- -bool - -
-

Auto-delete objects after reading

-
-destinationBucket
- -string - -
-

Move objects after reading

-
-destinationBucketPrefix
- -string - -
-

Destination bucket for moved objects

-
-destinationBucketSuffix
- -string - -
-

Prefix for moved objects

-
-autoCreateBucket
- -bool - -
-

Suffix for moved objects

-
-prefix
- -string - -
-

Auto-create S3 bucket

-
-ignoreBody
- -bool - -
-

S3 bucket prefix for search

-
-forcePathStyle
- -bool - -
-

Ignore object body

-
-delay
- -int - -
-

Force path style for bucket access

-
-maxMessagesPerPoll
- -int - -
-

Delay between polls in milliseconds

-
-

AWSSQS -

-

-

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
FieldDescription
-AWSCommon
- - -AWSCommon - - -
-

-(Members of AWSCommon are embedded into this type.) -

-
-queueNameOrArn
- -string - -
-

Embeds AWSCommon to inherit its fields in JSON

-
-deleteAfterRead
- -bool - -
-

SQS Queue name or ARN

-
-autoCreateQueue
- -bool - -
-

Auto-delete messages after reading

-
-amazonAWSHost
- -string - -
-

Auto-create SQS queue

-
-protocol
- -string - -
-

AWS host

-
-queueURL
- -string - -
-

Communication protocol (http/https)

-
-greedy
- -bool - -
-

Full SQS queue URL

-
-delay
- -int - -
-

Greedy scheduler

-
-maxMessagesPerPoll
- -int - -
-

Delay between polls in milliseconds

-
-waitTimeSeconds
- -int - -
-

Max messages to return (1-10)

-
-visibilityTimeout
- -int - -
-

Wait time for messages

-

Aws

diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_conversion.go b/pkg/apis/sinks/v1alpha1/integration_sink_conversion.go new file mode 100644 index 00000000000..8f41d2da41c --- /dev/null +++ b/pkg/apis/sinks/v1alpha1/integration_sink_conversion.go @@ -0,0 +1,36 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "fmt" + + "knative.dev/pkg/apis" +) + +// ConvertTo implements apis.Convertible +// Converts source from v1alpha1.IntegrationSink into a higher version. +func (sink *IntegrationSink) ConvertTo(ctx context.Context, obj apis.Convertible) error { + return fmt.Errorf("v1alpha1 is the highest known version, got: %T", sink) +} + +// ConvertFrom implements apis.Convertible +// Converts source from a higher version into v1beta2.IntegrationSink +func (sink *IntegrationSink) ConvertFrom(ctx context.Context, obj apis.Convertible) error { + return fmt.Errorf("v1alpha1 is the highest known version, got: %T", sink) +} diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_conversion_test.go b/pkg/apis/sinks/v1alpha1/integration_sink_conversion_test.go new file mode 100644 index 00000000000..015e6bee3c3 --- /dev/null +++ b/pkg/apis/sinks/v1alpha1/integration_sink_conversion_test.go @@ -0,0 +1,34 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "testing" +) + +func TestIntegrationSinkConversionBadType(t *testing.T) { + good, bad := &IntegrationSink{}, &testObject{} + + if err := good.ConvertTo(context.Background(), bad); err == nil { + t.Errorf("ConvertTo() = %#v, wanted error", bad) + } + + if err := good.ConvertFrom(context.Background(), bad); err == nil { + t.Errorf("ConvertFrom() = %#v, wanted error", good) + } +} diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_defaults_test.go b/pkg/apis/sinks/v1alpha1/integration_sink_defaults_test.go new file mode 100644 index 00000000000..41bdc3137ce --- /dev/null +++ b/pkg/apis/sinks/v1alpha1/integration_sink_defaults_test.go @@ -0,0 +1,38 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "github.com/google/go-cmp/cmp" + "testing" +) + +func TestIntegrationSinkSetDefaults(t *testing.T) { + testCases := map[string]struct { + initial IntegrationSink + expected IntegrationSink + }{} + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + tc.initial.SetDefaults(context.TODO()) + if diff := cmp.Diff(tc.expected, tc.initial); diff != "" { + t.Fatal("Unexpected defaults (-want, +got):", diff) + } + }) + } +} diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle_test.go b/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle_test.go new file mode 100644 index 00000000000..4cb2902666d --- /dev/null +++ b/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle_test.go @@ -0,0 +1,115 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" + "testing" +) + +func TestIntegrationSinkGetConditionSet(t *testing.T) { + r := &IntegrationSink{} + + if got, want := r.GetConditionSet().GetTopLevelConditionType(), apis.ConditionReady; got != want { + t.Errorf("GetTopLevelCondition=%v, want=%v", got, want) + } +} + +func TestIntegrationSinkInitializeConditions(t *testing.T) { + tests := []struct { + name string + js *IntegrationSinkStatus + want *IntegrationSinkStatus + }{{ + name: "empty", + js: &IntegrationSinkStatus{}, + want: &IntegrationSinkStatus{ + Status: duckv1.Status{ + Conditions: []apis.Condition{{ + Type: IntegrationSinkConditionAddressable, + Status: corev1.ConditionUnknown, + }, { + Type: IntegrationSinkConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, + }, { + Type: IntegrationSinkConditionReady, + Status: corev1.ConditionUnknown, + }}, + }, + }, + }, { + name: "one false", + js: &IntegrationSinkStatus{ + Status: duckv1.Status{ + Conditions: []apis.Condition{{ + Type: IntegrationSinkConditionAddressable, + Status: corev1.ConditionFalse, + }}, + }, + }, + want: &IntegrationSinkStatus{ + Status: duckv1.Status{ + Conditions: []apis.Condition{{ + Type: IntegrationSinkConditionAddressable, + Status: corev1.ConditionFalse, + }, { + Type: IntegrationSinkConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, + }, { + Type: IntegrationSinkConditionReady, + Status: corev1.ConditionUnknown, + }}, + }, + }, + }, { + name: "one true", + js: &IntegrationSinkStatus{ + Status: duckv1.Status{ + Conditions: []apis.Condition{{ + Type: IntegrationSinkConditionAddressable, + Status: corev1.ConditionTrue, + }}, + }, + }, + want: &IntegrationSinkStatus{ + Status: duckv1.Status{ + Conditions: []apis.Condition{{ + Type: IntegrationSinkConditionAddressable, + Status: corev1.ConditionTrue, + }, { + Type: IntegrationSinkConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, + }, { + Type: IntegrationSinkConditionReady, + Status: corev1.ConditionUnknown, + }}, + }, + }, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + test.js.InitializeConditions() + if diff := cmp.Diff(test.want, test.js, ignoreAllButTypeAndStatus); diff != "" { + t.Error("unexpected conditions (-want, +got) =", diff) + } + }) + } +} diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_types.go b/pkg/apis/sinks/v1alpha1/integration_sink_types.go index c86acd1e7b0..95d0366cf9c 100644 --- a/pkg/apis/sinks/v1alpha1/integration_sink_types.go +++ b/pkg/apis/sinks/v1alpha1/integration_sink_types.go @@ -49,7 +49,7 @@ var ( _ apis.Defaultable = (*IntegrationSink)(nil) _ apis.HasSpec = (*IntegrationSink)(nil) _ duckv1.KRShaped = (*IntegrationSink)(nil) - // _ apis.Convertible = (*IntegrationSink)(nil) + _ apis.Convertible = (*JobSink)(nil) ) type IntegrationSinkSpec struct { diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_types_test.go b/pkg/apis/sinks/v1alpha1/integration_sink_types_test.go new file mode 100644 index 00000000000..d148aa28f90 --- /dev/null +++ b/pkg/apis/sinks/v1alpha1/integration_sink_types_test.go @@ -0,0 +1,110 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "knative.dev/eventing/pkg/apis/common" + "testing" +) + +func TestIntegrationSink_GetStatus(t *testing.T) { + r := &IntegrationSink{ + Status: IntegrationSinkStatus{}, + } + if got, want := r.GetStatus(), &r.Status.Status; got != want { + t.Errorf("GetStatus=%v, want=%v", got, want) + } +} + +func TestIntegrationSink_GetGroupVersionKind(t *testing.T) { + src := &IntegrationSink{} + gvk := src.GetGroupVersionKind() + + if gvk.Kind != "IntegrationSink" { + t.Errorf("Should be IntegrationSink.") + } +} + +func TestLog(t *testing.T) { + log := Log{ + Level: "info", + ShowHeaders: true, + } + + if log.Level != "info" { + t.Errorf("Log.Level = %v, want 'info'", log.Level) + } + if log.ShowHeaders != true { + t.Errorf("Log.ShowHeaders = %v, want 'false'", log.ShowHeaders) + } +} + +func TestAWS(t *testing.T) { + s3 := common.AWSS3{ + AWSCommon: common.AWSCommon{ + Region: "eu-north-1", + }, + BucketNameOrArn: "example-bucket", + } + + if s3.Region != "eu-north-1" { + t.Errorf("AWSS3.Region = %v, want 'eu-north-1'", s3.Region) + } + + sqs := common.AWSSQS{ + AWSCommon: common.AWSCommon{ + Region: "eu-north-1", + }, + QueueNameOrArn: "example-queue", + } + + if sqs.Region != "eu-north-1" { + t.Errorf("AWSSQS.Region = %v, want 'eu-north-1'", sqs.Region) + } + + ddbStreams := common.AWSDDBStreams{ + AWSCommon: common.AWSCommon{ + Region: "eu-north-1", + }, + Table: "example-table", + } + + if ddbStreams.Region != "eu-north-1" { + t.Errorf("AWSDDBStreams.Region = %v, want 'eu-north-1'", ddbStreams.Region) + } +} + +// TestAuthFieldAccess tests the HasAuth method and field access in Auth struct +func TestAuthFieldAccess(t *testing.T) { + auth := common.Auth{ + Secret: &common.Secret{ + Ref: &common.SecretReference{ + Name: "aws-secret", + }, + }, + AccessKey: "access-key", + SecretKey: "secret-key", + } + + if !auth.HasAuth() { + t.Error("Auth.HasAuth() = false, want true") + } + + if auth.Secret.Ref.Name != "aws-secret" { + t.Errorf("Auth.Secret.Ref.Name = %v, want 'aws-secret'", auth.Secret.Ref.Name) + } +} diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_validation.go b/pkg/apis/sinks/v1alpha1/integration_sink_validation.go index bb1ed9c9269..40d02310540 100644 --- a/pkg/apis/sinks/v1alpha1/integration_sink_validation.go +++ b/pkg/apis/sinks/v1alpha1/integration_sink_validation.go @@ -26,8 +26,59 @@ func (sink *IntegrationSink) Validate(ctx context.Context) *apis.FieldError { return sink.Spec.Validate(ctx).ViaField("spec") } -func (sink *IntegrationSinkSpec) Validate(ctx context.Context) *apis.FieldError { +func (spec *IntegrationSinkSpec) Validate(ctx context.Context) *apis.FieldError { var errs *apis.FieldError + // Count how many fields are set to ensure mutual exclusivity + sinkSetCount := 0 + if spec.Log != nil { + sinkSetCount++ + } + if spec.Aws != nil { + if spec.Aws.S3 != nil { + sinkSetCount++ + } + if spec.Aws.SQS != nil { + sinkSetCount++ + } + } + + // Validate that only one sink field is set + if sinkSetCount > 1 { + errs = errs.Also(apis.ErrGeneric("only one sink type can be set", "spec")) + } else if sinkSetCount == 0 { + errs = errs.Also(apis.ErrGeneric("at least one sink type must be specified", "spec")) + } + + // Only perform AWS-specific validation if exactly one AWS sink is configured + if sinkSetCount == 1 && spec.Aws != nil { + if spec.Aws.S3 != nil || spec.Aws.SQS != nil { + // Check that AWS Auth is properly configured + if !spec.Aws.Auth.HasAuth() { + errs = errs.Also(apis.ErrMissingField("aws.auth.secret.ref.name")) + } + } + + // Additional validation for AWS S3 required fields + if spec.Aws.S3 != nil { + if spec.Aws.S3.BucketNameOrArn == "" { + errs = errs.Also(apis.ErrMissingField("aws.s3.bucketNameOrArn")) + } + if spec.Aws.S3.Region == "" { + errs = errs.Also(apis.ErrMissingField("aws.s3.region")) + } + } + + // Additional validation for AWS SQS required fields + if spec.Aws.SQS != nil { + if spec.Aws.SQS.QueueNameOrArn == "" { + errs = errs.Also(apis.ErrMissingField("aws.sqs.queueNameOrArn")) + } + if spec.Aws.SQS.Region == "" { + errs = errs.Also(apis.ErrMissingField("aws.sqs.region")) + } + } + } + return errs } diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_validation_test.go b/pkg/apis/sinks/v1alpha1/integration_sink_validation_test.go new file mode 100644 index 00000000000..d984eadd4e6 --- /dev/null +++ b/pkg/apis/sinks/v1alpha1/integration_sink_validation_test.go @@ -0,0 +1,198 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "knative.dev/eventing/pkg/apis/common" + "testing" + + "github.com/google/go-cmp/cmp" + "knative.dev/pkg/apis" +) + +func TestIntegrationSinkSpecValidation(t *testing.T) { + tests := []struct { + name string + spec IntegrationSinkSpec + want *apis.FieldError + }{ + { + name: "valid log sink", + spec: IntegrationSinkSpec{ + Log: &Log{ + Level: "info", + ShowHeaders: true, + }, + }, + want: nil, + }, + { + name: "valid AWS S3 sink with auth and region", + spec: IntegrationSinkSpec{ + Aws: &Aws{ + S3: &common.AWSS3{ + AWSCommon: common.AWSCommon{ + Region: "us-east-1", + }, + BucketNameOrArn: "example-bucket", + }, + Auth: &common.Auth{ + Secret: &common.Secret{ + Ref: &common.SecretReference{ + Name: "aws-secret", + }, + }, + }, + }, + }, + want: nil, + }, + { + name: "valid AWS SQS sink with auth and region", + spec: IntegrationSinkSpec{ + Aws: &Aws{ + SQS: &common.AWSSQS{ + AWSCommon: common.AWSCommon{ + Region: "us-east-1", + }, + QueueNameOrArn: "example-queue", + }, + Auth: &common.Auth{ + Secret: &common.Secret{ + Ref: &common.SecretReference{ + Name: "aws-secret", + }, + }, + }, + }, + }, + want: nil, + }, + { + name: "multiple sinks set (invalid)", + spec: IntegrationSinkSpec{ + Log: &Log{ + Level: "info", + ShowHeaders: true, + }, + Aws: &Aws{ + S3: &common.AWSS3{ + AWSCommon: common.AWSCommon{ + Region: "us-east-1", + }, + BucketNameOrArn: "example-bucket", + }, + }, + }, + want: apis.ErrGeneric("only one sink type can be set", "spec"), + }, + { + name: "multiple AWS sinks set (invalid)", + spec: IntegrationSinkSpec{ + Aws: &Aws{ + S3: &common.AWSS3{ + AWSCommon: common.AWSCommon{ + Region: "us-east-1", + }, + BucketNameOrArn: "example-bucket", + }, + SQS: &common.AWSSQS{ + AWSCommon: common.AWSCommon{ + Region: "us-east-1", + }, + QueueNameOrArn: "example-queue", + }, + Auth: &common.Auth{ + Secret: &common.Secret{ + Ref: &common.SecretReference{ + Name: "aws-secret", + }, + }, + }, + }, + }, + want: apis.ErrGeneric("only one sink type can be set", "spec"), + }, + { + name: "AWS SQS sink without QueueNameOrArn (invalid)", + spec: IntegrationSinkSpec{ + Aws: &Aws{ + SQS: &common.AWSSQS{ + AWSCommon: common.AWSCommon{ + Region: "us-east-1", + }, + }, + Auth: &common.Auth{ + Secret: &common.Secret{ + Ref: &common.SecretReference{ + Name: "aws-secret", + }, + }, + }, + }, + }, + want: apis.ErrMissingField("aws.sqs.queueNameOrArn"), + }, + { + name: "no sink type specified (invalid)", + spec: IntegrationSinkSpec{}, + want: apis.ErrGeneric("at least one sink type must be specified", "spec"), + }, + { + name: "AWS sink without auth (invalid)", + spec: IntegrationSinkSpec{ + Aws: &Aws{ + S3: &common.AWSS3{ + AWSCommon: common.AWSCommon{ + Region: "us-east-1", + }, + BucketNameOrArn: "example-bucket", + }, + }, + }, + want: apis.ErrMissingField("aws.auth.secret.ref.name"), + }, + { + name: "AWS S3 sink without region (invalid)", + spec: IntegrationSinkSpec{ + Aws: &Aws{ + S3: &common.AWSS3{ + BucketNameOrArn: "example-bucket", + }, + Auth: &common.Auth{ + Secret: &common.Secret{ + Ref: &common.SecretReference{ + Name: "aws-secret", + }, + }, + }, + }, + }, + want: apis.ErrMissingField("aws.s3.region"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := test.spec.Validate(context.TODO()) + if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" { + t.Errorf("IntegrationSinkSpec.Validate (-want, +got) = %v", diff) + } + }) + } +}