Skip to content

Commit

Permalink
feat: subscription create need namespace (#529)
Browse files Browse the repository at this point in the history
* feat: subscription create need namespace

Signed-off-by: xdlbdy <[email protected]>

* feat: subscription create need namespace

Signed-off-by: xdlbdy <[email protected]>

* feat: subscription create need namespace

Signed-off-by: xdlbdy <[email protected]>

---------

Signed-off-by: xdlbdy <[email protected]>
  • Loading branch information
xdlbdy authored Mar 13, 2023
1 parent 20e543c commit 539f968
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 53 deletions.
15 changes: 0 additions & 15 deletions internal/controller/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,6 @@ func (ctrl *controller) CreateSubscription(ctx context.Context,
})
return nil, err
}
// subscription name can't be repeated in an eventbus
eventbusID := vanus.NewIDFromUint64(request.Subscription.EventbusId)
_sub := ctrl.subscriptionManager.GetSubscriptionByName(ctx, eventbusID, request.Subscription.Name)
if _sub != nil {
return nil, errors.ErrInvalidRequest.WithMessage(
fmt.Sprintf("subscription name %s has exist", request.Subscription.Name))
}
sub := convert.FromPbSubscriptionRequest(request.Subscription)
sub.ID, err = vanus.NewID()
sub.CreatedAt = time.Now()
Expand Down Expand Up @@ -230,14 +223,6 @@ func (ctrl *controller) UpdateSubscription(ctx context.Context,
if request.Subscription.EventbusId != uint64(sub.EventbusID) {
return nil, errors.ErrInvalidRequest.WithMessage("can not change eventbus")
}
if request.Subscription.Name != sub.Name {
// subscription name can't be repeated in an eventbus
_sub := ctrl.subscriptionManager.GetSubscriptionByName(ctx, sub.EventbusID, request.Subscription.Name)
if _sub != nil {
return nil, errors.ErrInvalidRequest.WithMessage(
fmt.Sprintf("subscription name %s has exist", request.Subscription.Name))
}
}
update := convert.FromPbSubscriptionRequest(request.Subscription)
transChange := 0
if !sub.Transformer.Exist() && update.Transformer.Exist() {
Expand Down
84 changes: 55 additions & 29 deletions internal/controller/trigger/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ func TestController_CreateSubscription(t *testing.T) {
subManager.EXPECT().AddSubscription(gomock.Any(), gomock.Any()).AnyTimes().Return(nil)
create := &ctrlpb.CreateSubscriptionRequest{
Subscription: &ctrlpb.SubscriptionRequest{
EventbusId: vanus.NewTestID().Uint64(),
Name: "test-name",
Sink: "test-sink",
NamespaceId: vanus.NewTestID().Uint64(),
EventbusId: vanus.NewTestID().Uint64(),
Name: "test-name",
Sink: "test-sink",
},
}
request := create.Subscription
Expand All @@ -141,10 +142,12 @@ func TestController_CreateSubscription(t *testing.T) {
So(err, ShouldBeNil)
So(resp.Sink, ShouldEqual, request.Sink)
So(resp.EventbusId, ShouldEqual, request.EventbusId)
So(resp.NamespaceId, ShouldEqual, request.NamespaceId)
resp2, err := ctrl.CreateSubscription(ctx, create)
So(err, ShouldBeNil)
So(resp2.Sink, ShouldEqual, request.Sink)
So(resp2.EventbusId, ShouldEqual, request.EventbusId)
So(resp2.NamespaceId, ShouldEqual, request.NamespaceId)
So(resp.Id, ShouldNotEqual, resp2.Id)
})
})
Expand All @@ -164,6 +167,7 @@ func TestController_UpdateSubscription(t *testing.T) {

subID := vanus.NewTestID()
eventbusID := vanus.NewTestID()
namespaceID := vanus.NewTestID()
ctrl.state = primitive.ServerStateRunning
Convey("update subscription not exist", func() {
subManager.EXPECT().GetSubscription(gomock.Any(), gomock.Eq(subID)).Return(nil)
Expand All @@ -181,6 +185,7 @@ func TestController_UpdateSubscription(t *testing.T) {
Phase: metadata.SubscriptionPhaseStopped,
TriggerWorker: "test-addr",
EventbusID: eventbusID,
NamespaceID: namespaceID,
Name: "test-name",
Sink: "test-sink",
Protocol: primitive.HTTPProtocol,
Expand All @@ -194,8 +199,21 @@ func TestController_UpdateSubscription(t *testing.T) {
request := &ctrlpb.UpdateSubscriptionRequest{
Id: subID.Uint64(),
Subscription: &ctrlpb.SubscriptionRequest{
EventbusId: eventbusID.Uint64(),
Sink: "test-sink",
NamespaceId: namespaceID.Uint64(),
EventbusId: eventbusID.Uint64(),
Sink: "test-sink",
},
}
_, err := ctrl.UpdateSubscription(ctx, request)
So(err, ShouldNotBeNil)
})
Convey("tet update namespace fail", func() {
request := &ctrlpb.UpdateSubscriptionRequest{
Id: subID.Uint64(),
Subscription: &ctrlpb.SubscriptionRequest{
NamespaceId: vanus.NewTestID().Uint64(),
EventbusId: eventbusID.Uint64(),
Sink: "test-sink",
},
}
_, err := ctrl.UpdateSubscription(ctx, request)
Expand All @@ -205,8 +223,9 @@ func TestController_UpdateSubscription(t *testing.T) {
request := &ctrlpb.UpdateSubscriptionRequest{
Id: subID.Uint64(),
Subscription: &ctrlpb.SubscriptionRequest{
EventbusId: vanus.NewTestID().Uint64(),
Sink: "test-sink",
NamespaceId: namespaceID.Uint64(),
EventbusId: vanus.NewTestID().Uint64(),
Sink: "test-sink",
},
}
_, err := ctrl.UpdateSubscription(ctx, request)
Expand All @@ -217,9 +236,10 @@ func TestController_UpdateSubscription(t *testing.T) {
request := &ctrlpb.UpdateSubscriptionRequest{
Id: subID.Uint64(),
Subscription: &ctrlpb.SubscriptionRequest{
EventbusId: eventbusID.Uint64(),
Sink: "test-sink",
Protocol: metapb.Protocol_AWS_LAMBDA,
NamespaceId: namespaceID.Uint64(),
EventbusId: eventbusID.Uint64(),
Sink: "test-sink",
Protocol: metapb.Protocol_AWS_LAMBDA,
SinkCredential: &metapb.SinkCredential{
CredentialType: metapb.SinkCredential_AWS,
},
Expand All @@ -232,9 +252,10 @@ func TestController_UpdateSubscription(t *testing.T) {
request := &ctrlpb.UpdateSubscriptionRequest{
Id: subID.Uint64(),
Subscription: &ctrlpb.SubscriptionRequest{
EventbusId: eventbusID.Uint64(),
Sink: "arn:aws:lambda:us-west-2:843378899134:function:xdltest",
Protocol: metapb.Protocol_AWS_LAMBDA,
NamespaceId: namespaceID.Uint64(),
EventbusId: eventbusID.Uint64(),
Sink: "arn:aws:lambda:us-west-2:843378899134:function:xdltest",
Protocol: metapb.Protocol_AWS_LAMBDA,
SinkCredential: &metapb.SinkCredential{
CredentialType: metapb.SinkCredential_PLAIN,
},
Expand All @@ -247,9 +268,10 @@ func TestController_UpdateSubscription(t *testing.T) {
request := &ctrlpb.UpdateSubscriptionRequest{
Id: subID.Uint64(),
Subscription: &ctrlpb.SubscriptionRequest{
EventbusId: eventbusID.Uint64(),
Sink: "arn:aws:lambda:us-west-2:843378899134:function:xdltest",
Protocol: metapb.Protocol_AWS_LAMBDA,
NamespaceId: namespaceID.Uint64(),
EventbusId: eventbusID.Uint64(),
Sink: "arn:aws:lambda:us-west-2:843378899134:function:xdltest",
Protocol: metapb.Protocol_AWS_LAMBDA,
SinkCredential: &metapb.SinkCredential{
CredentialType: metapb.SinkCredential_AWS,
},
Expand All @@ -262,10 +284,11 @@ func TestController_UpdateSubscription(t *testing.T) {
request := &ctrlpb.UpdateSubscriptionRequest{
Id: subID.Uint64(),
Subscription: &ctrlpb.SubscriptionRequest{
EventbusId: eventbusID.Uint64(),
Name: "test-name",
Sink: "arn:aws:lambda:us-west-2:843378899134:function:xdltest",
Protocol: metapb.Protocol_AWS_LAMBDA,
NamespaceId: namespaceID.Uint64(),
EventbusId: eventbusID.Uint64(),
Name: "test-name",
Sink: "arn:aws:lambda:us-west-2:843378899134:function:xdltest",
Protocol: metapb.Protocol_AWS_LAMBDA,
SinkCredential: &metapb.SinkCredential{
CredentialType: metapb.SinkCredential_AWS,
Credential: &metapb.SinkCredential_Aws{
Expand All @@ -292,9 +315,10 @@ func TestController_UpdateSubscription(t *testing.T) {
request := &ctrlpb.UpdateSubscriptionRequest{
Id: subID.Uint64(),
Subscription: &ctrlpb.SubscriptionRequest{
EventbusId: eventbusID.Uint64(),
Name: "test-name",
Sink: "modify-sink",
NamespaceId: namespaceID.Uint64(),
EventbusId: eventbusID.Uint64(),
Name: "test-name",
Sink: "modify-sink",
},
}
resp, err := ctrl.UpdateSubscription(ctx, request)
Expand All @@ -308,9 +332,10 @@ func TestController_UpdateSubscription(t *testing.T) {
request := &ctrlpb.UpdateSubscriptionRequest{
Id: subID.Uint64(),
Subscription: &ctrlpb.SubscriptionRequest{
EventbusId: eventbusID.Uint64(),
Name: "test-name",
Sink: "test-sink",
NamespaceId: namespaceID.Uint64(),
EventbusId: eventbusID.Uint64(),
Name: "test-name",
Sink: "test-sink",
Filters: []*metapb.Filter{
{
Exact: map[string]string{"type": "test"},
Expand All @@ -328,9 +353,10 @@ func TestController_UpdateSubscription(t *testing.T) {
request := &ctrlpb.UpdateSubscriptionRequest{
Id: subID.Uint64(),
Subscription: &ctrlpb.SubscriptionRequest{
EventbusId: eventbusID.Uint64(),
Name: "test-name",
Sink: "test-sink",
NamespaceId: namespaceID.Uint64(),
EventbusId: eventbusID.Uint64(),
Name: "test-name",
Sink: "test-sink",
Transformer: &metapb.Transformer{
Define: map[string]string{"k": "v"},
Template: "test",
Expand Down
1 change: 1 addition & 0 deletions internal/controller/trigger/metadata/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type Subscription struct {
Protocol primitive.Protocol `json:"protocol,omitempty"`
ProtocolSetting *primitive.ProtocolSetting `json:"protocol_settings,omitempty"`
EventbusID vanus.ID `json:"eventbus_id"`
NamespaceID vanus.ID `json:"namespace_id"`
Transformer *primitive.Transformer `json:"transformer,omitempty"`
Name string `json:"name"`
Description string `json:"description"`
Expand Down
6 changes: 3 additions & 3 deletions internal/controller/trigger/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Manager interface {
ResetOffsetByTimestamp(ctx context.Context, id vanus.ID, timestamp uint64) (info.ListOffsetInfo, error)
ListSubscription(ctx context.Context) []*metadata.Subscription
GetSubscription(ctx context.Context, id vanus.ID) *metadata.Subscription
GetSubscriptionByName(ctx context.Context, eventbusID vanus.ID, name string) *metadata.Subscription
GetSubscriptionByName(ctx context.Context, namespaceID vanus.ID, name string) *metadata.Subscription
AddSubscription(ctx context.Context, subscription *metadata.Subscription) error
UpdateSubscription(ctx context.Context, subscription *metadata.Subscription) error
Heartbeat(ctx context.Context, id vanus.ID, addr string, time time.Time) error
Expand Down Expand Up @@ -104,11 +104,11 @@ func (m *manager) ListSubscription(_ context.Context) []*metadata.Subscription {
return list
}

func (m *manager) GetSubscriptionByName(ctx context.Context, eventbusID vanus.ID, name string) *metadata.Subscription {
func (m *manager) GetSubscriptionByName(ctx context.Context, namespaceID vanus.ID, name string) *metadata.Subscription {
m.lock.RLock()
defer m.lock.RUnlock()
for _, sub := range m.subscriptionMap {
if sub.EventbusID != eventbusID {
if sub.NamespaceID != namespaceID {
continue
}
if sub.Name == name {
Expand Down
11 changes: 7 additions & 4 deletions internal/controller/trigger/validation/subscripton.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"google.golang.org/api/idtoken"
"google.golang.org/api/option"

"github.com/vanus-labs/vanus/internal/primitive/vanus"
"github.com/vanus-labs/vanus/pkg/errors"
ctrlpb "github.com/vanus-labs/vanus/proto/pkg/controller"
metapb "github.com/vanus-labs/vanus/proto/pkg/meta"
Expand All @@ -32,10 +33,15 @@ import (
"github.com/vanus-labs/vanus/internal/primitive/cel"
"github.com/vanus-labs/vanus/internal/primitive/transform/arg"
"github.com/vanus-labs/vanus/internal/primitive/transform/runtime"
"github.com/vanus-labs/vanus/internal/primitive/vanus"
)

func ValidateSubscriptionRequest(ctx context.Context, request *ctrlpb.SubscriptionRequest) error {
if request.NamespaceId == vanus.EmptyID().Uint64() {
return errors.ErrInvalidRequest.WithMessage("namespace is empty")
}
if request.EventbusId == vanus.EmptyID().Uint64() {
return errors.ErrInvalidRequest.WithMessage("eventbus is empty")
}
if err := ValidateFilterList(ctx, request.Filters); err != nil {
return errors.ErrInvalidRequest.WithMessage("filters is invalid").Wrap(err)
}
Expand All @@ -48,9 +54,6 @@ func ValidateSubscriptionRequest(ctx context.Context, request *ctrlpb.Subscripti
if err := validateSinkCredential(ctx, request.Sink, request.SinkCredential); err != nil {
return err
}
if request.EventbusId == vanus.EmptyID().Uint64() {
return errors.ErrInvalidRequest.WithMessage("eventbus is empty")
}
if request.Name == "" {
return errors.ErrInvalidRequest.WithMessage("name is empty")
}
Expand Down
5 changes: 3 additions & 2 deletions internal/controller/trigger/validation/suscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func TestSubscriptionRequestValidator(t *testing.T) {
ctx := context.Background()
Convey("multiple dialect", t, func() {
request := &ctrlpb.SubscriptionRequest{
NamespaceId: vanus.NewTestID().Uint64(),
EventbusId: vanus.NewTestID().Uint64(),
Filters: []*metapb.Filter{{
Not: &metapb.Filter{
Exact: map[string]string{
Expand All @@ -43,8 +45,7 @@ func TestSubscriptionRequestValidator(t *testing.T) {
})
Convey("eventbus empty", t, func() {
request := &ctrlpb.SubscriptionRequest{
Sink: "sink",
EventbusId: vanus.EmptyID().Uint64(),
Sink: "sink",
}
So(ValidateSubscriptionRequest(ctx, request), ShouldNotBeNil)
})
Expand Down
2 changes: 2 additions & 0 deletions internal/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func FromPbSubscriptionRequest(sub *ctrl.SubscriptionRequest) *metadata.Subscrip
Filters: fromPbFilters(sub.Filters),
Transformer: fromPbTransformer(sub.Transformer),
EventbusID: vanus.NewIDFromUint64(sub.EventbusId),
NamespaceID: vanus.NewIDFromUint64(sub.NamespaceId),
Name: sub.Name,
Description: sub.Description,
}
Expand Down Expand Up @@ -295,6 +296,7 @@ func ToPbSubscription(sub *metadata.Subscription, offsets info.ListOffsetInfo) *
Protocol: toPbProtocol(sub.Protocol),
ProtocolSettings: toPbProtocolSettings(sub.ProtocolSetting),
EventbusId: sub.EventbusID.Uint64(),
NamespaceId: sub.NamespaceID.Uint64(),
Filters: toPbFilters(sub.Filters),
Transformer: ToPbTransformer(sub.Transformer),
Offsets: ToPbOffsetInfos(offsets),
Expand Down
5 changes: 5 additions & 0 deletions vsctl/command/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func createSubscriptionCommand() *cobra.Command {
Sink: sink,
SinkCredential: credential,
Protocol: p,
NamespaceId: mustGetNamespaceID(namespace).Uint64(),
EventbusId: mustGetEventbusID(namespace, eventbus).Uint64(),
Transformer: trans,
Name: subscriptionName,
Expand Down Expand Up @@ -358,6 +359,7 @@ func updateSubscriptionCommand() *cobra.Command {
Sink: sub.Sink,
SinkCredential: sub.SinkCredential,
Protocol: sub.Protocol,
NamespaceId: mustGetNamespaceID(namespace).Uint64(),
EventbusId: mustGetEventbusID(namespace, eventbus).Uint64(),
Transformer: sub.Transformer,
Name: sub.Name,
Expand Down Expand Up @@ -577,6 +579,9 @@ func listSubscriptionCommand() *cobra.Command {
request := &ctrlpb.ListSubscriptionRequest{
Name: subscriptionName,
}
if namespace != "" {
request.NamespaceId = mustGetNamespaceID(namespace).Uint64()
}
if eventbus != "" {
request.EventbusId = mustGetEventbusID(namespace, eventbus).Uint64()
}
Expand Down

0 comments on commit 539f968

Please sign in to comment.