Skip to content

Commit

Permalink
Better shovel config validation
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunyiLyu committed May 24, 2023
1 parent a1a5f32 commit 4907873
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 16 deletions.
8 changes: 5 additions & 3 deletions api/v1beta1/shovel_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ type ShovelSpec struct {
DestinationAddress string `json:"destAddress,omitempty"`
DestinationExchange string `json:"destExchange,omitempty"`
DestinationExchangeKey string `json:"destExchangeKey,omitempty"`
DestinationProtocol string `json:"destProtocol,omitempty"`
// +kubebuilder:validation:Enum=amqp091;amqp10
DestinationProtocol string `json:"destProtocol,omitempty"`
// +kubebuilder:validation:Type=object
// +kubebuilder:pruning:PreserveUnknownFields
DestinationApplicationProperties *runtime.RawExtension `json:"destApplicationProperties,omitempty"`
Expand All @@ -56,8 +57,9 @@ type ShovelSpec struct {
SourceExchange string `json:"srcExchange,omitempty"`
SourceExchangeKey string `json:"srcExchangeKey,omitempty"`
SourcePrefetchCount int `json:"srcPrefetchCount,omitempty"`
SourceProtocol string `json:"srcProtocol,omitempty"`
SourceQueue string `json:"srcQueue,omitempty"`
// +kubebuilder:validation:Enum=amqp091;amqp10
SourceProtocol string `json:"srcProtocol,omitempty"`
SourceQueue string `json:"srcQueue,omitempty"`
// +kubebuilder:validation:Type=object
// +kubebuilder:pruning:PreserveUnknownFields
SourceConsumerArgs *runtime.RawExtension `json:"srcConsumerArgs,omitempty"`
Expand Down
48 changes: 48 additions & 0 deletions api/v1beta1/shovel_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ var _ = Describe("Shovel spec", func() {
DestinationAddTimestampHeader: true,
DestinationAddress: "myQueue",
DestinationApplicationProperties: &runtime.RawExtension{Raw: []byte(`{"key": "a-property"}`)},
DestinationMessageAnnotations: &runtime.RawExtension{Raw: []byte(`{"key": "a-property"}`)},
DestinationExchange: "an-exchange",
DestinationExchangeKey: "a-key",
DestinationProperties: &runtime.RawExtension{Raw: []byte(`{"key": "a-property"}`)},
Expand All @@ -80,6 +81,7 @@ var _ = Describe("Shovel spec", func() {
SourcePrefetchCount: 10,
SourceProtocol: "amqp091",
SourceQueue: "a-queue",
SourceConsumerArgs: &runtime.RawExtension{Raw: []byte(`{"arg": "arg-value"}`)},
}}
Expect(k8sClient.Create(ctx, &shovel)).To(Succeed())
fetched := &Shovel{}
Expand All @@ -103,6 +105,7 @@ var _ = Describe("Shovel spec", func() {
Expect(fetched.Spec.DestinationExchange).To(Equal("an-exchange"))
Expect(fetched.Spec.DestinationExchangeKey).To(Equal("a-key"))
Expect(fetched.Spec.DestinationProperties.Raw).To(Equal([]byte(`{"key":"a-property"}`)))
Expect(fetched.Spec.DestinationMessageAnnotations.Raw).To(Equal([]byte(`{"key":"a-property"}`)))
Expect(fetched.Spec.DestinationQueue).To(Equal("a-queue"))
Expect(fetched.Spec.PrefetchCount).To(Equal(10))
Expect(fetched.Spec.ReconnectDelay).To(Equal(10))
Expand All @@ -114,6 +117,7 @@ var _ = Describe("Shovel spec", func() {
Expect(fetched.Spec.SourcePrefetchCount).To(Equal(10))
Expect(fetched.Spec.SourceProtocol).To(Equal("amqp091"))
Expect(fetched.Spec.SourceQueue).To(Equal("a-queue"))
Expect(fetched.Spec.SourceConsumerArgs.Raw).To(Equal([]byte(`{"arg":"arg-value"}`)))
})

When("creating a shovel with an invalid 'AckMode' value", func() {
Expand All @@ -137,4 +141,48 @@ var _ = Describe("Shovel spec", func() {
Expect(k8sClient.Create(ctx, &shovel)).To(MatchError(`Shovel.rabbitmq.com "an-invalid-ackmode" is invalid: spec.ackMode: Unsupported value: "an-invalid-ackmode": supported values: "on-confirm", "on-publish", "no-ack"`))
})
})

When("creating a shovel with unsupported protocol", func() {
It("fails with validation errors", func() {
shovel := Shovel{
ObjectMeta: metav1.ObjectMeta{
Name: "an-invalid-destprotocol",
Namespace: namespace,
},
Spec: ShovelSpec{
Name: "an-invalid-destprotocol",
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
},
UriSecret: &corev1.LocalObjectReference{
Name: "a-secret",
},
SourceProtocol: "amqp091",
DestinationProtocol: "stomp",
}}
Expect(k8sClient.Create(ctx, &shovel)).To(HaveOccurred())
Expect(k8sClient.Create(ctx, &shovel)).To(MatchError(`Shovel.rabbitmq.com "an-invalid-destprotocol" is invalid: spec.destProtocol: Unsupported value: "stomp": supported values: "amqp091", "amqp10"`))
})

It("fails with validation errors", func() {
shovel := Shovel{
ObjectMeta: metav1.ObjectMeta{
Name: "an-invalid-srcprotocol",
Namespace: namespace,
},
Spec: ShovelSpec{
Name: "an-invalid-srcprotocol",
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
},
UriSecret: &corev1.LocalObjectReference{
Name: "a-secret",
},
SourceProtocol: "mqtt",
DestinationProtocol: "amqp10",
}}
Expect(k8sClient.Create(ctx, &shovel)).To(HaveOccurred())
Expect(k8sClient.Create(ctx, &shovel)).To(MatchError(`Shovel.rabbitmq.com "an-invalid-srcprotocol" is invalid: spec.srcProtocol: Unsupported value: "mqtt": supported values: "amqp091", "amqp10"`))
})
})
})
22 changes: 22 additions & 0 deletions api/v1beta1/shovel_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ var _ webhook.Validator = &Shovel{}
// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
// either rabbitmqClusterReference.name or rabbitmqClusterReference.connectionSecret must be provided but not both
func (s *Shovel) ValidateCreate() error {
if err := s.amqp10Validate(); err != nil {
return err
}
return s.Spec.RabbitmqClusterReference.ValidateOnCreate(s.GroupResource(), s.Name)
}

Expand All @@ -32,6 +35,10 @@ func (s *Shovel) ValidateUpdate(old runtime.Object) error {
return apierrors.NewBadRequest(fmt.Sprintf("expected a shovel but got a %T", oldShovel))
}

if err := s.amqp10Validate(); err != nil {
return err
}

detailMsg := "updates on name, vhost and rabbitmqClusterReference are all forbidden"
if s.Spec.Name != oldShovel.Spec.Name {
return apierrors.NewForbidden(s.GroupResource(), s.Name,
Expand All @@ -53,3 +60,18 @@ func (s *Shovel) ValidateUpdate(old runtime.Object) error {
func (s *Shovel) ValidateDelete() error {
return nil
}

func (s *Shovel) amqp10Validate() error {
var errorList field.ErrorList
if s.Spec.SourceProtocol == "amqp10" && s.Spec.SourceAddress == "" {
errorList = append(errorList, field.Required(field.NewPath("spec", "srcAddress"),
"must specify spec.srcAddress when spec.srcProtocol is amqp10"))
return apierrors.NewInvalid(GroupVersion.WithKind("Shovel").GroupKind(), s.Name, errorList)
}
if s.Spec.DestinationProtocol == "amqp10" && s.Spec.DestinationAddress == "" {
errorList = append(errorList, field.Required(field.NewPath("spec", "destAddress"),
"must specify spec.destAddress when spec.destProtocol is amqp10"))
return apierrors.NewInvalid(GroupVersion.WithKind("Shovel").GroupKind(), s.Name, errorList)
}
return nil
}
29 changes: 29 additions & 0 deletions api/v1beta1/shovel_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,20 @@ var _ = Describe("shovel webhook", func() {
notAllowed.Spec.RabbitmqClusterReference.ConnectionSecret = nil
Expect(apierrors.IsForbidden(notAllowed.ValidateCreate())).To(BeTrue())
})

It("spec.srcAddress must be set if spec.srcProtocol is amqp10", func() {
notValid := shovel.DeepCopy()
notValid.Spec.SourceProtocol = "amqp10"
notValid.Spec.SourceAddress = ""
Expect(apierrors.IsInvalid(notValid.ValidateCreate())).To(BeTrue())
})

It("spec.destAddress must be set if spec.destProtocol is amqp10", func() {
notValid := shovel.DeepCopy()
notValid.Spec.DestinationProtocol = "amqp10"
notValid.Spec.DestinationAddress = ""
Expect(apierrors.IsInvalid(notValid.ValidateCreate())).To(BeTrue())
})
})

Context("ValidateUpdate", func() {
Expand All @@ -86,6 +100,20 @@ var _ = Describe("shovel webhook", func() {
Expect(apierrors.IsForbidden(newShovel.ValidateUpdate(&shovel))).To(BeTrue())
})

It("spec.srcAddress must be set if spec.srcProtocol is amqp10", func() {
newShovel := shovel.DeepCopy()
newShovel.Spec.SourceProtocol = "amqp10"
newShovel.Spec.SourceAddress = ""
Expect(apierrors.IsInvalid(newShovel.ValidateUpdate(&shovel))).To(BeTrue())
})

It("spec.destAddress must be set if spec.destProtocol is amqp10", func() {
newShovel := shovel.DeepCopy()
newShovel.Spec.DestinationProtocol = "amqp10"
newShovel.Spec.DestinationAddress = ""
Expect(apierrors.IsInvalid(newShovel.ValidateUpdate(&shovel))).To(BeTrue())
})

It("does not allow updates on rabbitmqClusterReference.connectionSecret", func() {
connectionScr := Shovel{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -180,6 +208,7 @@ var _ = Describe("shovel webhook", func() {
newShovel.Spec.DestinationProperties = &runtime.RawExtension{Raw: []byte(`{"key": "new"}`)}
Expect(newShovel.ValidateUpdate(&shovel)).To(Succeed())
})

It("allows updates on DestinationProtocol", func() {
newShovel := shovel.DeepCopy()
newShovel.Spec.DestinationProtocol = "new"
Expand Down
6 changes: 6 additions & 0 deletions config/crd/bases/rabbitmq.com_shovels.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ spec:
type: object
x-kubernetes-preserve-unknown-fields: true
destProtocol:
enum:
- amqp091
- amqp10
type: string
destPublishProperties:
type: object
Expand Down Expand Up @@ -120,6 +123,9 @@ spec:
srcPrefetchCount:
type: integer
srcProtocol:
enum:
- amqp091
- amqp10
type: string
srcQueue:
type: string
Expand Down
10 changes: 5 additions & 5 deletions internal/shovel_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,31 @@ func GenerateShovelDefinition(s *topology.Shovel, srcUri, destUri string) (*rabb
srcConArgs := make(map[string]interface{})
if s.Spec.SourceConsumerArgs != nil {
if err := json.Unmarshal(s.Spec.SourceConsumerArgs.Raw, &srcConArgs); err != nil {
return nil, fmt.Errorf("failed to unmarshall: %v", err)
return nil, fmt.Errorf("failed to unmarshall source consumer args: %v", err)
}
}
appProperties := make(map[string]interface{})
if s.Spec.DestinationApplicationProperties != nil {
if err := json.Unmarshal(s.Spec.DestinationApplicationProperties.Raw, &appProperties); err != nil {
return nil, fmt.Errorf("failed to unmarshall: %v", err)
return nil, fmt.Errorf("failed to unmarshall destination application properties: %v", err)
}
}
destProperties := make(map[string]interface{})
if s.Spec.DestinationProperties != nil {
if err := json.Unmarshal(s.Spec.DestinationProperties.Raw, &destProperties); err != nil {
return nil, fmt.Errorf("failed to unmarshall: %v", err)
return nil, fmt.Errorf("failed to unmarshall destination properties: %v", err)
}
}
destPubProperties := make(map[string]interface{})
if s.Spec.DestinationPublishProperties != nil {
if err := json.Unmarshal(s.Spec.DestinationPublishProperties.Raw, &destPubProperties); err != nil {
return nil, fmt.Errorf("failed to unmarshall: %v", err)
return nil, fmt.Errorf("failed to unmarshall destination publish properties: %v", err)
}
}
destMsgAnnotations := make(map[string]interface{})
if s.Spec.DestinationMessageAnnotations != nil {
if err := json.Unmarshal(s.Spec.DestinationMessageAnnotations.Raw, &destMsgAnnotations); err != nil {
return nil, fmt.Errorf("failed to unmarshall: %v", err)
return nil, fmt.Errorf("failed to unmarshall destination message annotations: %v", err)
}
}

Expand Down
1 change: 1 addition & 0 deletions internal/shovel_definition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ var _ = Describe("GenerateShovelDefinition", func() {
shovel.Spec.DestinationPublishProperties = &runtime.RawExtension{Raw: []byte(`{"delivery_mode": 1}`)}
definition, err := GenerateShovelDefinition(shovel, "", "")
Expect(err).NotTo(HaveOccurred())
// Unmarshall stores float64 for JSON numbers
Expect(definition.DestinationPublishProperties).To(HaveKeyWithValue("delivery_mode", float64(1)))
})

Expand Down
16 changes: 8 additions & 8 deletions system_tests/shovel_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ var _ = Describe("Shovel", func() {
Namespace: namespace,
},
Spec: topology.ShovelSpec{
UriSecret: &corev1.LocalObjectReference{Name: shovelSecret.Name},
UriSecret: &corev1.LocalObjectReference{Name: shovelSecret.Name},
SourceDeleteAfter: "never",
AckMode: "no-ack",
AckMode: "no-ack",
RabbitmqClusterReference: topology.RabbitmqClusterReference{
Name: rmq.Name,
},
Expand All @@ -66,7 +66,7 @@ var _ = Describe("Shovel", func() {
shovel.Spec.DestinationPublishProperties = &runtime.RawExtension{Raw: []byte(`{"delivery_mode": 2}`)}

By("declaring shovel successfully")
shovelInfo := declareShovelandAssertCommonProperties(ctx, shovel)
shovelInfo := declareAssertShovelCommonProperties(ctx, shovel)

Expect(shovelInfo.Definition.DestinationQueue).To(Equal(shovel.Spec.DestinationQueue))
Expect(shovelInfo.Definition.SourceQueue).To(Equal(shovel.Spec.SourceQueue))
Expand Down Expand Up @@ -112,7 +112,7 @@ var _ = Describe("Shovel", func() {

By("deleting shovel configuration on deletion")
Expect(k8sClient.Delete(ctx, shovel)).To(Succeed())
assertShovelConfigDeleted(shovel)
assertShovelDeleted(shovel)
})

It("works with a shovel using amqp10 protocol", func() {
Expand All @@ -129,7 +129,7 @@ var _ = Describe("Shovel", func() {
shovel.Spec.DestinationAddTimestampHeader = true

By("declaring shovel successfully")
shovelInfo := declareShovelandAssertCommonProperties(ctx, shovel)
shovelInfo := declareAssertShovelCommonProperties(ctx, shovel)

Expect(shovelInfo.Definition.SourceProtocol).To(Equal("amqp10"))
Expect(shovelInfo.Definition.DestinationProtocol).To(Equal("amqp10"))
Expand All @@ -143,11 +143,11 @@ var _ = Describe("Shovel", func() {

By("deleting shovel configuration on deletion")
Expect(k8sClient.Delete(ctx, shovel)).To(Succeed())
assertShovelConfigDeleted(shovel)
assertShovelDeleted(shovel)
})
})

func declareShovelandAssertCommonProperties(ctx context.Context, shovel *topology.Shovel) *rabbithole.ShovelInfo {
func declareAssertShovelCommonProperties(ctx context.Context, shovel *topology.Shovel) *rabbithole.ShovelInfo {
Expect(k8sClient.Create(ctx, shovel, &client.CreateOptions{})).To(Succeed())
var shovelInfo *rabbithole.ShovelInfo
Eventually(func() error {
Expand All @@ -169,7 +169,7 @@ func declareShovelandAssertCommonProperties(ctx context.Context, shovel *topolog
return shovelInfo
}

func assertShovelConfigDeleted(shovel *topology.Shovel) {
func assertShovelDeleted(shovel *topology.Shovel) {
var err error
Eventually(func() error {
_, err = rabbitClient.GetShovel("/", shovel.Spec.Name)
Expand Down

0 comments on commit 4907873

Please sign in to comment.