Skip to content

Commit

Permalink
Add default queue type config to vhost crd
Browse files Browse the repository at this point in the history
- can be set to quorum, classic or stream
this property can only be set for RMQ 3.11.12
or above. Version previous to 3.11.12 will fail
to reconcile if it's set
  • Loading branch information
ChunyiLyu committed May 25, 2023
1 parent 655f8c4 commit e620453
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 8 deletions.
4 changes: 4 additions & 0 deletions api/v1beta1/vhost_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ type VhostSpec struct {
Name string `json:"name"`
Tracing bool `json:"tracing,omitempty"`
Tags []string `json:"tags,omitempty"`
// Default queue type for this vhost; can be set to quorum, classic or stream.
// Supported in RabbitMQ 3.11.12 or above.
// +kubebuilder:validation:Enum=quorum;classic;stream
DefaultQueueType string `json:"defaultQueueType,omitempty"`
// Reference to the RabbitmqCluster that the vhost will be created in.
// Required property.
// +kubebuilder:validation:Required
Expand Down
35 changes: 35 additions & 0 deletions api/v1beta1/vhost_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,39 @@ var _ = Describe("Vhost", func() {
Name: "random-cluster",
}))
})

Context("Default queue types", func() {
var qTypeVhost = &Vhost{
ObjectMeta: metav1.ObjectMeta{
Name: "some-vhost",
Namespace: namespace,
},
Spec: VhostSpec{
Name: "some-vhost",
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "random-cluster",
},
},
}
It("creates a vhost with default queue type configured", func() {
qTypeVhost.Spec.DefaultQueueType = "stream"
Expect(k8sClient.Create(ctx, qTypeVhost)).To(Succeed())

fetched := &Vhost{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Name: qTypeVhost.Name,
Namespace: qTypeVhost.Namespace,
}, fetched)).To(Succeed())
Expect(fetched.Spec.DefaultQueueType).To(Equal("stream"))
Expect(fetched.Spec.RabbitmqClusterReference).To(Equal(RabbitmqClusterReference{
Name: "random-cluster",
}))
})

It("fails when default queue type is invalid", func() {
qTypeVhost.Spec.DefaultQueueType = "aqueuetype"
Expect(k8sClient.Create(ctx, qTypeVhost)).To(HaveOccurred())
Expect(k8sClient.Create(ctx, qTypeVhost)).To(MatchError(`Vhost.rabbitmq.com "some-vhost" is invalid: spec.defaultQueueType: Unsupported value: "aqueuetype": supported values: "quorum", "classic", "stream"`))
})
})
})
11 changes: 9 additions & 2 deletions api/v1beta1/vhost_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ var _ = Describe("vhost webhook", func() {
Name: "test-vhost",
},
Spec: VhostSpec{
Name: "test",
Tracing: false,
Name: "test",
Tracing: false,
DefaultQueueType: "classic",
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "a-cluster",
},
Expand Down Expand Up @@ -84,5 +85,11 @@ var _ = Describe("vhost webhook", func() {
newVhost.Spec.Tags = []string{"new-tag"}
Expect(newVhost.ValidateUpdate(&vhost)).To(Succeed())
})

It("allows updates on vhost.spec.defaultQueueType", func() {
newVhost := vhost.DeepCopy()
newVhost.Spec.DefaultQueueType = "quorum"
Expect(newVhost.ValidateUpdate(&vhost)).To(Succeed())
})
})
})
8 changes: 8 additions & 0 deletions config/crd/bases/rabbitmq.com_vhosts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ spec:
spec:
description: VhostSpec defines the desired state of Vhost
properties:
defaultQueueType:
description: Default queue type for this vhost; can be set to quorum,
classic or stream. Supported in RabbitMQ 3.11.12 or above.
enum:
- quorum
- classic
- stream
type: string
name:
description: Name of the vhost; see https://www.rabbitmq.com/vhosts.html.
type: string
Expand Down
1 change: 1 addition & 0 deletions docs/api/rabbitmq.com.ref.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1226,6 +1226,7 @@ VhostSpec defines the desired state of Vhost
| *`name`* __string__ | Name of the vhost; see https://www.rabbitmq.com/vhosts.html.
| *`tracing`* __boolean__ |
| *`tags`* __string array__ |
| *`defaultQueueType`* __string__ | Default queue type for this vhost; can be set to quorum, classic or stream. Supported in RabbitMQ 3.11.12 or above.
| *`rabbitmqClusterReference`* __xref:{anchor_prefix}-github-com-rabbitmq-messaging-topology-operator-api-v1beta1-rabbitmqclusterreference[$$RabbitmqClusterReference$$]__ | Reference to the RabbitmqCluster that the vhost will be created in. Required property.
|===

Expand Down
4 changes: 4 additions & 0 deletions docs/examples/vhosts/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Vhost examples

This section contains 1 example for creating a RabbitMQ vhost.
Note that setting default queue type `spec.defaultQueueType` is only supported by RabbitMQ server version `3.11.12` or above.
3 changes: 2 additions & 1 deletion docs/examples/vhosts/vhost.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ kind: Vhost
metadata:
name: test-vhost
spec:
name: test-vhost # vhost name
name: test-vhost # vhost name; required and cannot be updated
defaultVhostType: quorum # default vhost type for this vhost; require RabbitMQ version 3.11.12 or above
rabbitmqClusterReference:
name: test # rabbitmqCluster must exist in the same namespace as this resource
# status:
Expand Down
5 changes: 3 additions & 2 deletions internal/vhost_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import (

func GenerateVhostSettings(v *topology.Vhost) *rabbithole.VhostSettings {
return &rabbithole.VhostSettings{
Tracing: v.Spec.Tracing,
Tags: v.Spec.Tags,
Tracing: v.Spec.Tracing,
Tags: v.Spec.Tags,
DefaultQueueType: v.Spec.DefaultQueueType,
}
}
6 changes: 6 additions & 0 deletions internal/vhost_settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,10 @@ var _ = Describe("GenerateVhostSettings", func() {
settings := internal.GenerateVhostSettings(v)
Expect(settings.Tags).To(ConsistOf("tag1", "tag2", "multi_dc_replication"))
})

It("sets default queue type according to vhost.spec.defaultQueueType", func() {
v.Spec.DefaultQueueType = "stream"
settings := internal.GenerateVhostSettings(v)
Expect(settings.DefaultQueueType).To(Equal("stream"))
})
})
3 changes: 2 additions & 1 deletion system_tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ func basicTestRabbitmqCluster(name, namespace string) *rabbitmqv1beta1.RabbitmqC
Namespace: namespace,
},
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
Replicas: pointer.Int32Ptr(1),
Replicas: pointer.Int32(1),
Image: "rabbitmq:3-management",
Resources: &corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceMemory: resource.MustParse("100Mi"),
Expand Down
17 changes: 15 additions & 2 deletions system_tests/vhost_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ var _ = Describe("vhost", func() {
Namespace: namespace,
},
Spec: topology.VhostSpec{
Name: "test",
Tags: []string{"multi_dc_replication"},
Name: "test",
Tags: []string{"multi_dc_replication"},
DefaultQueueType: "stream",
RabbitmqClusterReference: topology.RabbitmqClusterReference{
Name: rmq.Name,
},
Expand All @@ -49,6 +50,7 @@ var _ = Describe("vhost", func() {
Expect(fetched.Tracing).To(BeFalse())
Expect(fetched.Tags).To(HaveLen(1))
Expect(fetched.Tags[0]).To(Equal("multi_dc_replication"))
Expect(fetched.DefaultQueueType).To(Equal("stream"))

By("updating status condition 'Ready'")
updatedVhost := topology.Vhost{}
Expand Down Expand Up @@ -76,6 +78,17 @@ var _ = Describe("vhost", func() {
updateTest.Spec.Name = "new-name"
Expect(k8sClient.Update(ctx, &updateTest).Error()).To(ContainSubstring("spec.name: Forbidden: updates on name and rabbitmqClusterReference are all forbidden"))

By("updating vhosts configuration")
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: vhost.Name, Namespace: vhost.Namespace}, vhost)).To(Succeed())
vhost.Spec.DefaultQueueType = "classic"
Expect(k8sClient.Update(ctx, vhost, &client.UpdateOptions{})).To(Succeed())
Eventually(func() string {
var err error
fetched, err = rabbitClient.GetVhost(vhost.Spec.Name)
Expect(err).NotTo(HaveOccurred())
return fetched.DefaultQueueType
}, 30, 2).Should(Equal("classic"))

By("deleting a vhost")
Expect(k8sClient.Delete(ctx, vhost)).To(Succeed())
var err error
Expand Down

0 comments on commit e620453

Please sign in to comment.