-
Notifications
You must be signed in to change notification settings - Fork 37
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add new watermill handlers that get or refresh entities by properties…
… and call another handler These will be used in webhook handlers instead of calling the property service directly. The handlers follow the same base logic, just with "pluggable" ways of retrieving the entity, converting the entity to properties and which handler to call next. Related: #4327
- Loading branch information
Showing
17 changed files
with
1,319 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
// | ||
// Copyright 2024 Stacklok, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
// Package handlers contains the message handlers for entities. | ||
package handlers | ||
|
||
import ( | ||
watermill "github.com/ThreeDotsLabs/watermill/message" | ||
"github.com/rs/zerolog" | ||
|
||
"github.com/stacklok/minder/internal/db" | ||
"github.com/stacklok/minder/internal/entities/handlers/message" | ||
"github.com/stacklok/minder/internal/entities/handlers/strategies" | ||
entStrategies "github.com/stacklok/minder/internal/entities/handlers/strategies/entity" | ||
msgStrategies "github.com/stacklok/minder/internal/entities/handlers/strategies/message" | ||
propertyService "github.com/stacklok/minder/internal/entities/properties/service" | ||
"github.com/stacklok/minder/internal/events" | ||
"github.com/stacklok/minder/internal/providers/manager" | ||
) | ||
|
||
type handleEntityAndDoBase struct { | ||
evt events.Publisher | ||
|
||
refreshEntity strategies.GetEntityStrategy | ||
createMessage strategies.MessageCreateStrategy | ||
|
||
handlerName string | ||
forwardHandlerName string | ||
|
||
handlerMiddleware []watermill.HandlerMiddleware | ||
} | ||
|
||
// Register satisfies the events.Consumer interface. | ||
func (b *handleEntityAndDoBase) Register(r events.Registrar) { | ||
r.Register(b.handlerName, b.handleRefreshEntityAndDo, b.handlerMiddleware...) | ||
} | ||
|
||
func (b *handleEntityAndDoBase) handleRefreshEntityAndDo(msg *watermill.Message) error { | ||
ctx := msg.Context() | ||
|
||
l := zerolog.Ctx(ctx).With(). | ||
Str("messageStrategy", b.createMessage.GetName()). | ||
Str("refreshStrategy", b.refreshEntity.GetName()). | ||
Logger() | ||
|
||
// unmarshal the message | ||
entMsg, err := message.ToEntityRefreshAndDo(msg) | ||
if err != nil { | ||
l.Error().Err(err).Msg("error unpacking message") | ||
return nil | ||
} | ||
l.Debug().Msg("message unpacked") | ||
|
||
// call refreshEntity | ||
ewp, err := b.refreshEntity.GetEntity(ctx, entMsg) | ||
if err != nil { | ||
l.Error().Err(err).Msg("error refreshing entity") | ||
// do not return error in the handler, just log it | ||
// we might want to special-case retrying /some/ errors specifically those from the | ||
// provider, but for now, just log it | ||
return nil | ||
} | ||
|
||
if ewp != nil { | ||
l.Debug(). | ||
Str("entityID", ewp.Entity.ID.String()). | ||
Str("providerID", ewp.Entity.ProviderID.String()). | ||
Msg("entity refreshed") | ||
} else { | ||
l.Debug().Msg("entity not retrieved") | ||
} | ||
|
||
nextMsg, err := b.createMessage.CreateMessage(ctx, ewp) | ||
if err != nil { | ||
l.Error().Err(err).Msg("error creating message") | ||
return nil | ||
} | ||
|
||
// If nextMsg is nil, it means we don't need to publish anything (entity not found) | ||
if nextMsg != nil { | ||
l.Debug().Msg("publishing message") | ||
if err := b.evt.Publish(b.forwardHandlerName, nextMsg); err != nil { | ||
l.Error().Err(err).Msg("error publishing message") | ||
return nil | ||
} | ||
} else { | ||
l.Info().Msg("no message to publish") | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// NewRefreshEntityAndEvaluateHandler creates a new handler that refreshes an entity and evaluates it. | ||
func NewRefreshEntityAndEvaluateHandler( | ||
evt events.Publisher, | ||
store db.Store, | ||
propSvc propertyService.PropertiesService, | ||
provMgr manager.ProviderManager, | ||
handlerMiddleware ...watermill.HandlerMiddleware, | ||
) events.Consumer { | ||
return &handleEntityAndDoBase{ | ||
evt: evt, | ||
|
||
refreshEntity: entStrategies.NewRefreshEntityByUpstreamIDStrategy(propSvc, provMgr, store), | ||
createMessage: msgStrategies.NewToEntityInfoWrapper(store, propSvc, provMgr), | ||
|
||
handlerName: events.TopicQueueRefreshEntityAndEvaluate, | ||
forwardHandlerName: events.TopicQueueEntityEvaluate, | ||
|
||
handlerMiddleware: handlerMiddleware, | ||
} | ||
} | ||
|
||
// NewGetEntityAndDeleteHandler creates a new handler that gets an entity and deletes it. | ||
func NewGetEntityAndDeleteHandler( | ||
evt events.Publisher, | ||
propSvc propertyService.PropertiesService, | ||
handlerMiddleware ...watermill.HandlerMiddleware, | ||
) events.Consumer { | ||
return &handleEntityAndDoBase{ | ||
evt: evt, | ||
|
||
refreshEntity: entStrategies.NewGetEntityByUpstreamIDStrategy(propSvc), | ||
createMessage: msgStrategies.NewToMinderEntity(), | ||
|
||
handlerName: events.TopicQueueGetEntityAndDelete, | ||
forwardHandlerName: events.TopicQueueReconcileEntityDelete, | ||
|
||
handlerMiddleware: handlerMiddleware, | ||
} | ||
} | ||
|
||
// NewAddOriginatingEntityHandler creates a new handler that adds an originating entity. | ||
func NewAddOriginatingEntityHandler( | ||
evt events.Publisher, | ||
store db.Store, | ||
propSvc propertyService.PropertiesService, | ||
provMgr manager.ProviderManager, | ||
handlerMiddleware ...watermill.HandlerMiddleware, | ||
) events.Consumer { | ||
return &handleEntityAndDoBase{ | ||
evt: evt, | ||
|
||
refreshEntity: entStrategies.NewAddOriginatingEntityStrategy(propSvc, provMgr, store), | ||
createMessage: msgStrategies.NewToEntityInfoWrapper(store, propSvc, provMgr), | ||
|
||
handlerName: events.TopicQueueOriginatingEntityAdd, | ||
forwardHandlerName: events.TopicQueueEntityEvaluate, | ||
|
||
handlerMiddleware: handlerMiddleware, | ||
} | ||
} | ||
|
||
// NewRemoveOriginatingEntityHandler creates a new handler that removes an originating entity. | ||
func NewRemoveOriginatingEntityHandler( | ||
evt events.Publisher, | ||
store db.Store, | ||
propSvc propertyService.PropertiesService, | ||
provMgr manager.ProviderManager, | ||
handlerMiddleware ...watermill.HandlerMiddleware, | ||
) events.Consumer { | ||
return &handleEntityAndDoBase{ | ||
evt: evt, | ||
|
||
refreshEntity: entStrategies.NewDelOriginatingEntityStrategy(propSvc, provMgr, store), | ||
createMessage: msgStrategies.NewCreateEmpty(), | ||
|
||
handlerName: events.TopicQueueOriginatingEntityDelete, | ||
|
||
handlerMiddleware: handlerMiddleware, | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
// | ||
// Copyright 2024 Stacklok, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package handlers | ||
|
||
import ( | ||
"database/sql" | ||
"testing" | ||
|
||
"github.com/ThreeDotsLabs/watermill/message" | ||
"github.com/google/uuid" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"go.uber.org/mock/gomock" | ||
|
||
mockdb "github.com/stacklok/minder/database/mock" | ||
"github.com/stacklok/minder/internal/engine/entities" | ||
message2 "github.com/stacklok/minder/internal/entities/handlers/message" | ||
"github.com/stacklok/minder/internal/entities/models" | ||
"github.com/stacklok/minder/internal/entities/properties" | ||
"github.com/stacklok/minder/internal/entities/properties/service" | ||
mock_service "github.com/stacklok/minder/internal/entities/properties/service/mock" | ||
stubeventer "github.com/stacklok/minder/internal/events/stubs" | ||
ghprops "github.com/stacklok/minder/internal/providers/github/properties" | ||
"github.com/stacklok/minder/internal/providers/manager" | ||
mock_manager "github.com/stacklok/minder/internal/providers/manager/mock" | ||
minderv1 "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" | ||
) | ||
|
||
func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) { | ||
t.Parallel() | ||
|
||
tests := []struct { | ||
name string | ||
lookupPropMap map[string]any | ||
entPropMap map[string]any | ||
nextHandler string | ||
providerHint string | ||
ewp *models.EntityWithProperties | ||
setupMocks func(*gomock.Controller, *models.EntityWithProperties) (service.PropertiesService, manager.ProviderManager) | ||
expectedError string | ||
expectedPublish bool | ||
}{ | ||
{ | ||
name: "successful refresh and publish of a repo", | ||
lookupPropMap: map[string]any{ | ||
properties.PropertyUpstreamID: "123", | ||
}, | ||
ewp: &models.EntityWithProperties{ | ||
Entity: models.EntityInstance{ | ||
ID: uuid.New(), | ||
Type: minderv1.Entity_ENTITY_REPOSITORIES, | ||
Name: "testorg/testrepo", | ||
ProviderID: uuid.New(), | ||
ProjectID: uuid.New(), | ||
}, | ||
}, | ||
nextHandler: "call.me.next", | ||
providerHint: "github", | ||
entPropMap: map[string]any{ | ||
properties.PropertyName: "testorg/testrepo", | ||
ghprops.RepoPropertyName: "testrepo", | ||
ghprops.RepoPropertyOwner: "testorg", | ||
ghprops.RepoPropertyId: int64(123), | ||
properties.RepoPropertyIsPrivate: false, | ||
properties.RepoPropertyIsFork: false, | ||
}, | ||
setupMocks: func(ctrl *gomock.Controller, ewp *models.EntityWithProperties) (service.PropertiesService, manager.ProviderManager) { | ||
mockPropSvc := mock_service.NewMockPropertiesService(ctrl) | ||
mockProvMgr := mock_manager.NewMockProviderManager(ctrl) | ||
|
||
protoEnt, err := ghprops.RepoV1FromProperties(ewp.Properties) | ||
require.NoError(t, err) | ||
|
||
mockPropSvc.EXPECT(). | ||
EntityWithPropertiesByUpstreamHint(gomock.Any(), ewp.Entity.Type, gomock.Any(), gomock.Any(), gomock.Any()). | ||
Return(ewp, nil) | ||
mockPropSvc.EXPECT(). | ||
RetrieveAllPropertiesForEntity(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). | ||
Return(nil) | ||
mockPropSvc.EXPECT(). | ||
EntityWithPropertiesAsProto(gomock.Any(), ewp, mockProvMgr). | ||
Return(protoEnt, nil) | ||
|
||
return mockPropSvc, mockProvMgr | ||
}, | ||
expectedPublish: true, | ||
}, | ||
} | ||
|
||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
t.Parallel() | ||
|
||
ctrl := gomock.NewController(t) | ||
defer ctrl.Finish() | ||
|
||
getByProps, err := properties.NewProperties(tt.lookupPropMap) | ||
require.NoError(t, err) | ||
|
||
handlerMsg := message.NewMessage(uuid.New().String(), nil) | ||
entityMsg := message2.NewEntityRefreshAndDoMessage(). | ||
WithEntity(minderv1.Entity_ENTITY_REPOSITORIES, getByProps). | ||
WithProviderImplementsHint(tt.providerHint) | ||
|
||
err = entityMsg.ToMessage(handlerMsg) | ||
require.NoError(t, err) | ||
|
||
entProps, err := properties.NewProperties(tt.entPropMap) | ||
require.NoError(t, err) | ||
tt.ewp.Properties = entProps | ||
|
||
mockPropSvc, mockProvMgr := tt.setupMocks(ctrl, tt.ewp) | ||
|
||
mockStore := mockdb.NewMockStore(ctrl) | ||
tx := sql.Tx{} | ||
mockStore.EXPECT(). | ||
BeginTransaction(). | ||
Return(&tx, nil) | ||
mockStore.EXPECT(). | ||
Commit(&tx). | ||
Return(nil) | ||
mockStore.EXPECT(). | ||
Rollback(&tx). | ||
Return(nil) | ||
mockStore.EXPECT(). | ||
GetQuerierWithTransaction(&tx). | ||
Return(mockStore) | ||
|
||
stubEventer := &stubeventer.StubEventer{} | ||
handler := NewRefreshEntityAndEvaluateHandler(stubEventer, mockStore, mockPropSvc, mockProvMgr) | ||
|
||
refreshHandlerStruct, ok := handler.(*handleEntityAndDoBase) | ||
require.True(t, ok) | ||
err = refreshHandlerStruct.handleRefreshEntityAndDo(handlerMsg) | ||
|
||
if tt.expectedError != "" { | ||
assert.Error(t, err) | ||
assert.Contains(t, err.Error(), tt.expectedError) | ||
} else { | ||
assert.NoError(t, err) | ||
} | ||
|
||
if !tt.expectedPublish { | ||
assert.Equal(t, 0, len(stubEventer.Sent), "Expected no publish calls") | ||
return | ||
} | ||
|
||
assert.Equal(t, 1, len(stubEventer.Sent), "Expected one publish call") | ||
sentMsg := stubEventer.Sent[0] | ||
eiw, err := entities.ParseEntityEvent(sentMsg) | ||
require.NoError(t, err) | ||
require.NotNil(t, eiw) | ||
|
||
assert.Equal(t, tt.ewp.Entity.Type, eiw.Type) | ||
assert.Equal(t, tt.ewp.Entity.ProjectID, eiw.ProjectID) | ||
assert.Equal(t, tt.ewp.Entity.ProviderID, eiw.ProviderID) | ||
|
||
pbrepo, ok := eiw.Entity.(*minderv1.Repository) | ||
require.True(t, ok) | ||
assert.Equal(t, tt.entPropMap[ghprops.RepoPropertyName].(string), pbrepo.Name) | ||
}) | ||
} | ||
} |
Oops, something went wrong.