Skip to content

Commit

Permalink
Revert "remove sns stuff from gateway (#766)"
Browse files Browse the repository at this point in the history
This reverts commit eb5a0d4.
  • Loading branch information
smonero authored Aug 13, 2024
1 parent eb5a0d4 commit 47e053a
Show file tree
Hide file tree
Showing 14 changed files with 514 additions and 20 deletions.
14 changes: 13 additions & 1 deletion server/legacy/lyft/gateway/events_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func NewVCSEventsController(
scope tally.Scope,
webhookSecret []byte,
allowDraftPRs bool,
snsWriter gateway_handlers.Writer,
commentParser events.CommentParsing,
repoAllowlistChecker *events.RepoAllowlistChecker,
vcsClient vcs.Client,
Expand All @@ -60,10 +61,19 @@ func NewVCSEventsController(
clientCreator githubapp.ClientCreator,
defaultTFVersion string,
) *VCSEventsController {
pullEventSNSProxy := gateway_handlers.NewSNSWorkerProxy(
snsWriter, logger,
)
legacyHandler := &gateway_handlers.LegacyPullHandler{
Logger: logger,
WorkerProxy: pullEventSNSProxy,
VCSStatusUpdater: vcsStatusUpdater,
}
prSignaler := &pr.WorkflowSignaler{TemporalClient: temporalClient, DefaultTFVersion: defaultTFVersion}
prRequirementChecker := requirement.NewPRAggregate(globalCfg)
modifiedPullHandler := gateway_handlers.NewModifiedPullHandler(logger, asyncScheduler, rootConfigBuilder, globalCfg, prRequirementChecker, prSignaler)
modifiedPullHandler := gateway_handlers.NewModifiedPullHandler(logger, asyncScheduler, rootConfigBuilder, globalCfg, prRequirementChecker, prSignaler, legacyHandler)
closedPullHandler := &gateway_handlers.ClosedPullRequestHandler{
WorkerProxy: pullEventSNSProxy,
Logger: logger,
PRCloseSignaler: prSignaler,
Scope: scope.SubScope("pull.closed"),
Expand Down Expand Up @@ -108,6 +118,7 @@ func NewVCSEventsController(
vcsClient,
gateway_handlers.NewCommentEventWorkerProxy(
logger,
snsWriter,
asyncScheduler,
prSignaler,
deploySignaler,
Expand Down Expand Up @@ -143,6 +154,7 @@ func NewVCSEventsController(

pullRequestReviewHandler := &gateway_handlers.PullRequestReviewWorkerProxy{
Scheduler: asyncScheduler,
SnsWriter: snsWriter,
Logger: logger,
CheckRunFetcher: checkRunFetcher,
WorkflowSignaler: prSignaler,
Expand Down
5 changes: 5 additions & 0 deletions server/neptune/gateway/event/closed_pull_request_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@ type prCloseSignaler interface {
}

type ClosedPullRequestHandler struct {
WorkerProxy workerProxy
Logger logging.Logger
PRCloseSignaler prCloseSignaler
Scope tally.Scope
}

func (c *ClosedPullRequestHandler) Handle(ctx context.Context, request *http.BufferedRequest, event PullRequest) error {
if err := c.WorkerProxy.Handle(ctx, request, event); err != nil {
c.Logger.ErrorContext(ctx, err.Error())
}

if err := c.handlePlatformMode(ctx, event); err != nil {
return errors.Wrap(err, "handling platform mode")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ import (
)

func TestClosedPullHandler_Handle(t *testing.T) {
workerProxy := &mockWorkerProxy{}
signaler := &testCloseSignaler{
t: t,
expectedRepoName: "repo",
expectedPullNum: 1,
}
pullHandler := event.ClosedPullRequestHandler{
Logger: logging.NewNoopCtxLogger(t),
WorkerProxy: workerProxy,
PRCloseSignaler: signaler,
}
pr := event.PullRequest{
Expand All @@ -35,10 +37,12 @@ func TestClosedPullHandler_Handle(t *testing.T) {
}
err := pullHandler.Handle(context.Background(), &http.BufferedRequest{}, pr)
assert.True(t, signaler.called)
assert.True(t, workerProxy.called)
assert.NoError(t, err)
}

func TestClosedPullHandler_Handle_SignalError(t *testing.T) {
workerProxy := &mockWorkerProxy{}
signaler := &testCloseSignaler{
t: t,
err: assert.AnError,
Expand All @@ -47,6 +51,7 @@ func TestClosedPullHandler_Handle_SignalError(t *testing.T) {
}
pullHandler := event.ClosedPullRequestHandler{
Logger: logging.NewNoopCtxLogger(t),
WorkerProxy: workerProxy,
PRCloseSignaler: signaler,
}
pr := event.PullRequest{
Expand All @@ -60,10 +65,12 @@ func TestClosedPullHandler_Handle_SignalError(t *testing.T) {
}
err := pullHandler.Handle(context.Background(), &http.BufferedRequest{}, pr)
assert.True(t, signaler.called)
assert.True(t, workerProxy.called)
assert.Error(t, err)
}

func TestClosedPullHandler_Handle_SignalNotFoundError(t *testing.T) {
workerProxy := &mockWorkerProxy{}
signaler := &testCloseSignaler{
t: t,
expectedRepoName: "repo",
Expand All @@ -72,6 +79,7 @@ func TestClosedPullHandler_Handle_SignalNotFoundError(t *testing.T) {
}
pullHandler := event.ClosedPullRequestHandler{
Logger: logging.NewNoopCtxLogger(t),
WorkerProxy: workerProxy,
PRCloseSignaler: signaler,
Scope: tally.NewTestScope("", map[string]string{}),
}
Expand All @@ -86,6 +94,7 @@ func TestClosedPullHandler_Handle_SignalNotFoundError(t *testing.T) {
}
err := pullHandler.Handle(context.Background(), &http.BufferedRequest{}, pr)
assert.True(t, signaler.called)
assert.True(t, workerProxy.called)
assert.NoError(t, err)
}

Expand Down
11 changes: 10 additions & 1 deletion server/neptune/gateway/event/comment_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,15 @@ func (c Comment) GetRepo() models.Repo {
return c.BaseRepo
}

func NewCommentEventWorkerProxy(logger logging.Logger, scheduler scheduler, prSignaler prSignaler, deploySignaler deploySignaler, commentCreator commentCreator, vcsStatusUpdater statusUpdater, globalCfg valid.GlobalCfg, rootConfigBuilder rootConfigBuilder, legacyErrorHandler errorHandler, neptuneErrorHandler errorHandler, requirementChecker requirementChecker) *CommentEventWorkerProxy {
func NewCommentEventWorkerProxy(logger logging.Logger, snsWriter Writer, scheduler scheduler, prSignaler prSignaler, deploySignaler deploySignaler, commentCreator commentCreator, vcsStatusUpdater statusUpdater, globalCfg valid.GlobalCfg, rootConfigBuilder rootConfigBuilder, legacyErrorHandler errorHandler, neptuneErrorHandler errorHandler, requirementChecker requirementChecker) *CommentEventWorkerProxy {
return &CommentEventWorkerProxy{
logger: logger,
scheduler: scheduler,
legacyHandler: &LegacyCommentHandler{
logger: logger,
snsWriter: snsWriter,
globalCfg: globalCfg,
},
neptuneWorkerProxy: &NeptuneWorkerProxy{
logger: logger,
deploySignaler: deploySignaler,
Expand Down Expand Up @@ -172,6 +177,7 @@ type CommentEventWorkerProxy struct {
scheduler scheduler
vcsStatusUpdater statusUpdater
rootConfigBuilder rootConfigBuilder
legacyHandler *LegacyCommentHandler
neptuneWorkerProxy *NeptuneWorkerProxy
neptuneErrorHandler errorHandler
legacyErrorHandler errorHandler
Expand Down Expand Up @@ -203,6 +209,9 @@ func (p *CommentEventWorkerProxy) handle(ctx context.Context, request *http.Buff
}

fxns := []sync.Executor{
p.legacyErrorHandler.WrapWithHandling(ctx, event, cmd.CommandName().String(), func(ctx context.Context) error {
return p.legacyHandler.Handle(ctx, event, cmd, roots, request)
}),
p.neptuneErrorHandler.WrapWithHandling(ctx, event, cmd.CommandName().String(), func(ctx context.Context) error {
return p.neptuneWorkerProxy.Handle(ctx, event, cmd, roots, request)
}),
Expand Down
29 changes: 22 additions & 7 deletions server/neptune/gateway/event/comment_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func TestCommentEventWorkerProxy_HandleForceApply(t *testing.T) {
},
},
}
writer := &mockSnsWriter{}
scheduler := &sync.SynchronousScheduler{Logger: logger}
commentCreator := &mockCommentCreator{
expectedT: t,
Expand All @@ -172,7 +173,7 @@ func TestCommentEventWorkerProxy_HandleForceApply(t *testing.T) {
prSignaler := &mockPRSignaler{
expectedT: t,
}
commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{})
commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, writer, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{})
bufReq := buildRequest(t)
cmd := &command.Comment{
Name: command.Apply,
Expand All @@ -182,6 +183,7 @@ func TestCommentEventWorkerProxy_HandleForceApply(t *testing.T) {
assert.NoError(t, err)
assert.True(t, commentCreator.isCalled)
assert.True(t, testSignaler.called())
assert.False(t, writer.isCalled)
assert.False(t, statusUpdater.isCalled)
}

Expand Down Expand Up @@ -220,11 +222,12 @@ func TestCommentEventWorkerProxy_HandleApplyComment_RequirementsFailed(t *testin
expectedT: t,
}

writer := &mockSnsWriter{}
scheduler := &sync.SynchronousScheduler{Logger: logger}
commentCreator := &mockCommentCreator{}
statusUpdater := &mockStatusUpdater{}
cfg := valid.NewGlobalCfg("somedir")
commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{
commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, writer, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{
err: assert.AnError,
})
bufReq := buildRequest(t)
Expand All @@ -236,6 +239,7 @@ func TestCommentEventWorkerProxy_HandleApplyComment_RequirementsFailed(t *testin
assert.False(t, statusUpdater.isCalled)
assert.False(t, commentCreator.isCalled)
assert.False(t, testSignaler.called)
assert.False(t, writer.isCalled)
}

func TestCommentEventWorkerProxy_HandleApplyComment(t *testing.T) {
Expand Down Expand Up @@ -293,14 +297,16 @@ func TestCommentEventWorkerProxy_HandleApplyComment(t *testing.T) {
},
},
}

writer := &mockSnsWriter{}
scheduler := &sync.SynchronousScheduler{Logger: logger}
commentCreator := &mockCommentCreator{}
statusUpdater := &mockStatusUpdater{}
cfg := valid.NewGlobalCfg("somedir")
prSignaler := &mockPRSignaler{
expectedT: t,
}
commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{})
commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, writer, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{})
bufReq := buildRequest(t)
cmd := &command.Comment{
Name: command.Apply,
Expand All @@ -310,6 +316,7 @@ func TestCommentEventWorkerProxy_HandleApplyComment(t *testing.T) {
assert.False(t, statusUpdater.isCalled)
assert.False(t, commentCreator.isCalled)
assert.True(t, testSignaler.called())
assert.False(t, writer.isCalled)
}

func TestCommentEventWorkerProxy_HandlePlanComment_NoCmds(t *testing.T) {
Expand All @@ -335,6 +342,7 @@ func TestCommentEventWorkerProxy_HandlePlanComment_NoCmds(t *testing.T) {
},
InstallationToken: 123,
}
writer := &mockSnsWriter{}
scheduler := &sync.SynchronousScheduler{Logger: logger}
commentCreator := &mockCommentCreator{}
statusUpdater := &multiMockStatusUpdater{
Expand Down Expand Up @@ -369,7 +377,7 @@ func TestCommentEventWorkerProxy_HandlePlanComment_NoCmds(t *testing.T) {
prSignaler := &mockPRSignaler{
expectedT: t,
}
commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{})
commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, writer, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{})
bufReq := buildRequest(t)
cmd := &command.Comment{
Name: command.Plan,
Expand All @@ -379,6 +387,7 @@ func TestCommentEventWorkerProxy_HandlePlanComment_NoCmds(t *testing.T) {
assert.False(t, statusUpdater.AllCalled())
assert.False(t, commentCreator.isCalled)
assert.False(t, testSignaler.called)
assert.False(t, writer.isCalled)
}

func TestCommentEventWorkerProxy_HandleApplyComment_NoCmds(t *testing.T) {
Expand All @@ -404,6 +413,7 @@ func TestCommentEventWorkerProxy_HandleApplyComment_NoCmds(t *testing.T) {
},
InstallationToken: 123,
}
writer := &mockSnsWriter{}
scheduler := &sync.SynchronousScheduler{Logger: logger}
commentCreator := &mockCommentCreator{}
statusUpdater := &multiMockStatusUpdater{
Expand All @@ -422,7 +432,7 @@ func TestCommentEventWorkerProxy_HandleApplyComment_NoCmds(t *testing.T) {
prSignaler := &mockPRSignaler{
expectedT: t,
}
commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{})
commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, writer, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{})
bufReq := buildRequest(t)
cmd := &command.Comment{
Name: command.Apply,
Expand All @@ -432,6 +442,7 @@ func TestCommentEventWorkerProxy_HandleApplyComment_NoCmds(t *testing.T) {
assert.False(t, statusUpdater.AllCalled())
assert.False(t, commentCreator.isCalled)
assert.False(t, testSignaler.called)
assert.False(t, writer.isCalled)
}

func TestCommentEventWorkerProxy_HandlePlanComment(t *testing.T) {
Expand Down Expand Up @@ -479,6 +490,7 @@ func TestCommentEventWorkerProxy_HandlePlanComment(t *testing.T) {
},
InstallationToken: 123,
}
writer := &mockSnsWriter{}
scheduler := &sync.SynchronousScheduler{Logger: logger}
commentCreator := &mockCommentCreator{}
statusUpdater := &mockStatusUpdater{}
Expand All @@ -488,7 +500,7 @@ func TestCommentEventWorkerProxy_HandlePlanComment(t *testing.T) {
expectedRoots: roots,
expectedPRRequest: prRequest,
}
commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, scheduler, prSignaler, deploySignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{})
commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, writer, scheduler, prSignaler, deploySignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{})
bufReq := buildRequest(t)
cmd := &command.Comment{
Name: command.Plan,
Expand All @@ -499,6 +511,7 @@ func TestCommentEventWorkerProxy_HandlePlanComment(t *testing.T) {
assert.False(t, commentCreator.isCalled)
assert.False(t, deploySignaler.called)
assert.True(t, prSignaler.called)
assert.True(t, writer.isCalled)
}

func TestCommentEventWorkerProxy_HandlePlanCommentAllocatorEnabled(t *testing.T) {
Expand Down Expand Up @@ -546,6 +559,7 @@ func TestCommentEventWorkerProxy_HandlePlanCommentAllocatorEnabled(t *testing.T)
},
InstallationToken: 123,
}
writer := &mockSnsWriter{}
scheduler := &sync.SynchronousScheduler{Logger: logger}
commentCreator := &mockCommentCreator{}
statusUpdater := &mockStatusUpdater{}
Expand All @@ -555,7 +569,7 @@ func TestCommentEventWorkerProxy_HandlePlanCommentAllocatorEnabled(t *testing.T)
expectedRoots: roots,
expectedPRRequest: prRequest,
}
commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{})
commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, writer, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{})
bufReq := buildRequest(t)
cmd := &command.Comment{
Name: command.Plan,
Expand All @@ -565,6 +579,7 @@ func TestCommentEventWorkerProxy_HandlePlanCommentAllocatorEnabled(t *testing.T)
assert.False(t, statusUpdater.isCalled)
assert.False(t, commentCreator.isCalled)
assert.False(t, testSignaler.called)
assert.True(t, writer.isCalled)
assert.True(t, prSignaler.called)
}

Expand Down
43 changes: 43 additions & 0 deletions server/neptune/gateway/event/legacy_comment_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package event

import (
"bytes"
"context"
"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/config/valid"
"github.com/runatlantis/atlantis/server/legacy/events/command"
"github.com/runatlantis/atlantis/server/legacy/http"
"github.com/runatlantis/atlantis/server/logging"
)

type LegacyCommentHandler struct {
logger logging.Logger
snsWriter Writer
globalCfg valid.GlobalCfg
}

func (p *LegacyCommentHandler) Handle(ctx context.Context, event Comment, cmd *command.Comment, roots []*valid.MergedProjectCfg, request *http.BufferedRequest) error {
// legacy mode should not be handling any type of apply command anymore
if cmd.Name == command.Apply {
return nil
}
// forward everything to sns for now since platform mode doesn't do anything w.r.t to comments atm.
if err := p.ForwardToSns(ctx, request); err != nil {
return errors.Wrap(err, "forwarding request through sns")
}
return nil
}

func (p *LegacyCommentHandler) ForwardToSns(ctx context.Context, request *http.BufferedRequest) error {
buffer := bytes.NewBuffer([]byte{})
if err := request.GetRequestWithContext(ctx).Write(buffer); err != nil {
return errors.Wrap(err, "writing request to buffer")
}

if err := p.snsWriter.WriteWithContext(ctx, buffer.Bytes()); err != nil {
return errors.Wrap(err, "writing buffer to sns")
}
p.logger.InfoContext(ctx, "proxied request to sns")

return nil
}
Loading

0 comments on commit 47e053a

Please sign in to comment.