Skip to content

Commit

Permalink
Structural refactor - events
Browse files Browse the repository at this point in the history
  • Loading branch information
whytheplatypus committed Oct 16, 2024
1 parent b6564e0 commit 689ebe4
Show file tree
Hide file tree
Showing 10 changed files with 319 additions and 296 deletions.
3 changes: 2 additions & 1 deletion upload-server/cmd/cli/reporters.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package cli

import (
"context"

"github.com/cdcgov/data-exchange-upload/upload-server/internal/appconfig"
"github.com/cdcgov/data-exchange-upload/upload-server/internal/event"
"github.com/cdcgov/data-exchange-upload/upload-server/internal/health"
"github.com/cdcgov/data-exchange-upload/upload-server/pkg/reports"
)

func InitReporters(ctx context.Context, appConfig appconfig.AppConfig) error {
reports.Register(&event.MemoryPublisher[*reports.Report]{
reports.Register(&event.FilePublisher[*reports.Report]{
Dir: appConfig.LocalReportsFolder,
})

Expand Down
3 changes: 2 additions & 1 deletion upload-server/cmd/cli/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package cli

import (
"encoding/json"
"github.com/cdcgov/data-exchange-upload/upload-server/pkg/metadata"
"io"
"net/http"

"github.com/cdcgov/data-exchange-upload/upload-server/pkg/metadata"

"github.com/cdcgov/data-exchange-upload/upload-server/internal/delivery"
"github.com/cdcgov/data-exchange-upload/upload-server/internal/event"
)
Expand Down
140 changes: 133 additions & 7 deletions upload-server/internal/event/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ package event

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
"github.com/cdcgov/data-exchange-upload/upload-server/internal/appconfig"
"net"
"github.com/cdcgov/data-exchange-upload/upload-server/internal/models"
"nhooyr.io/websocket"
)

Expand All @@ -27,7 +32,7 @@ func NewAMQPServiceBusClient(connString string) (*azservicebus.Client, error) {
func NewAzurePublisher[T Identifiable](ctx context.Context, pubConn appconfig.AzureQueueConfig) (*AzurePublisher[T], error) {
client, err := NewAMQPServiceBusClient(pubConn.ConnectionString)
if err != nil {
logger.Error("failed to connect to event service bus", "error", err)
slog.Error("failed to connect to event service bus", "error", err)
return nil, err
}
queueOrTopic := pubConn.Queue
Expand All @@ -36,12 +41,12 @@ func NewAzurePublisher[T Identifiable](ctx context.Context, pubConn appconfig.Az
}
sender, err := client.NewSender(queueOrTopic, nil)
if err != nil {
logger.Error("failed to configure event publisher", "error", err)
slog.Error("failed to configure event publisher", "error", err)
return nil, err
}
adminClient, err := admin.NewClientFromConnectionString(pubConn.ConnectionString, nil)
if err != nil {
logger.Error("failed to connect to service bus admin client", "error", err)
slog.Error("failed to connect to service bus admin client", "error", err)
return nil, err
}

Expand All @@ -53,20 +58,74 @@ func NewAzurePublisher[T Identifiable](ctx context.Context, pubConn appconfig.Az
}, nil
}

type AzurePublisher[T Identifiable] struct {
Context context.Context
Sender *azservicebus.Sender
Config appconfig.AzureQueueConfig
AdminClient *admin.Client
}

func (ap *AzurePublisher[T]) Publish(ctx context.Context, event T) error {
b, err := json.Marshal(event)
if err != nil {
return err
}

return ap.Sender.SendMessage(ctx, &azservicebus.Message{
Body: b,
}, nil)
}

func (ap *AzurePublisher[T]) Close() error {
return ap.Sender.Close(ap.Context)
}

func (ap *AzurePublisher[T]) Health(ctx context.Context) (rsp models.ServiceHealthResp) {
rsp.Status = models.STATUS_UP
rsp.HealthIssue = models.HEALTH_ISSUE_NONE

if ap.Config.Queue != "" {
rsp.Service = fmt.Sprintf("Event Publishing %s", ap.Config.Queue)
queueResp, err := ap.AdminClient.GetQueue(ctx, ap.Config.Queue, nil)
if err != nil {
return rsp.BuildErrorResponse(err)
}
if queueResp == nil {
return rsp.BuildErrorResponse(fmt.Errorf("nil queue response"))
}
if *queueResp.Status != admin.EntityStatusActive {
return rsp.BuildErrorResponse(fmt.Errorf("service bus queue %s status: %s", ap.Config.Queue, *queueResp.Status))
}
}

if ap.Config.Topic != "" {
rsp.Service = fmt.Sprintf("Event Publishing %s", ap.Config.Topic)
topicResp, err := ap.AdminClient.GetTopic(ctx, ap.Config.Topic, nil)
if err != nil {
return rsp.BuildErrorResponse(err)
}
if *topicResp.Status != admin.EntityStatusActive {
return rsp.BuildErrorResponse(fmt.Errorf("service bus topic %s status: %s", ap.Config.Topic, *topicResp.Status))
}
}

return rsp
}

func NewAzureSubscriber[T Identifiable](ctx context.Context, subConn appconfig.AzureQueueConfig) (*AzureSubscriber[T], error) {
client, err := NewAMQPServiceBusClient(subConn.ConnectionString)
if err != nil {
logger.Error("failed to connect to event service bus", "error", err)
slog.Error("failed to connect to event service bus", "error", err)
return nil, err
}
receiver, err := client.NewReceiverForSubscription(subConn.Topic, subConn.Subscription, nil)
if err != nil {
logger.Error("failed to configure event subscriber", "error", err)
slog.Error("failed to configure event subscriber", "error", err)
return nil, err
}
adminClient, err := admin.NewClientFromConnectionString(subConn.ConnectionString, nil)
if err != nil {
logger.Error("failed to connect to service bus admin client", "error", err)
slog.Error("failed to connect to service bus admin client", "error", err)
return nil, err
}
return &AzureSubscriber[T]{
Expand All @@ -76,3 +135,70 @@ func NewAzureSubscriber[T Identifiable](ctx context.Context, subConn appconfig.A
AdminClient: adminClient,
}, nil
}

type AzureSubscriber[T Identifiable] struct {
Context context.Context
Receiver *azservicebus.Receiver
Config appconfig.AzureQueueConfig
AdminClient *admin.Client
}

func (as *AzureSubscriber[T]) GetBatch(ctx context.Context, max int) ([]T, error) {
msgs, err := as.Receiver.ReceiveMessages(ctx, max, nil)
if err != nil {
return nil, err
}

var batch []T
for _, m := range msgs {
slog.Info("received event", "event", m.Body)

var e T
e, err := NewEventFromServiceBusMessage[T](m)
if err != nil {
return nil, err
}
batch = append(batch, e)
}

return batch, nil
}

func (as *AzureSubscriber[T]) HandleSuccess(ctx context.Context, e T) error {
if e.OrigMessage() == nil {
return fmt.Errorf("malformed event %+v", e)
}
err := as.Receiver.CompleteMessage(ctx, e.OrigMessage(), nil)
if err != nil {
slog.Error("failed to ack event", "error", err)
return err
}
slog.Info("successfully handled event", "event ID", e.Identifier(), "event type", e.Type())
return nil
}

func (as *AzureSubscriber[T]) HandleError(ctx context.Context, e T, handlerError error) error {
slog.Error("failed to handle event", "event ID", e.Identifier(), "event type", e.Type(), "error", handlerError.Error())
return as.Receiver.DeadLetterMessage(ctx, e.OrigMessage(), nil)
}

func (as *AzureSubscriber[T]) Close() error {
return as.Receiver.Close(as.Context)
}

func (as *AzureSubscriber[T]) Health(ctx context.Context) (rsp models.ServiceHealthResp) {
rsp.Service = fmt.Sprintf("%s Event Subscriber", as.Config.Subscription)
rsp.Status = models.STATUS_UP
rsp.HealthIssue = models.HEALTH_ISSUE_NONE

subResp, err := as.AdminClient.GetSubscription(ctx, as.Config.Topic, as.Config.Subscription, nil)
if err != nil {
return rsp.BuildErrorResponse(err)
}

if *subResp.Status != admin.EntityStatusActive {
return rsp.BuildErrorResponse(fmt.Errorf("service bus subscription %s status: %s", as.Config.Subscription, *subResp.Status))
}

return rsp
}
23 changes: 4 additions & 19 deletions upload-server/internal/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package event

import (
"encoding/json"
"fmt"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

const FileReadyEventType = "FileReady"

var MaxRetries int
var FileReadyChan chan *FileReady
var FileReadyPublisher Publishers[*FileReady]

var MaxRetries = 5

type Retryable interface {
RetryCount() int
Expand Down Expand Up @@ -74,22 +75,6 @@ func (fr *FileReady) Identifier() string {
return fr.UploadId
}

func InitFileReadyChannel() {
FileReadyChan = make(chan *FileReady)
}

func CloseFileReadyChannel() {
close(FileReadyChan)
}

func GetChannel[T Identifiable]() (chan T, error) {
if r, ok := any(FileReadyChan).(chan T); ok {
return r, nil
}

return nil, fmt.Errorf("channel not found")
}

func NewFileReadyEvent(uploadId string, metadata map[string]string, target string) *FileReady {
return &FileReady{
Event: Event{
Expand Down
37 changes: 37 additions & 0 deletions upload-server/internal/event/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package event

import (
"context"
"encoding/json"
"os"
"path/filepath"
)

const TypeSeparator = "_"

type FilePublisher[T Identifiable] struct {
Dir string
}

func (mp *FilePublisher[T]) Publish(_ context.Context, event T) error {
err := os.MkdirAll(mp.Dir, 0750)
if err != nil && !os.IsExist(err) {
return err
}

filename := filepath.Join(mp.Dir, event.Identifier()+TypeSeparator+event.Type())
f, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
defer f.Close()

// write event to file.
encoder := json.NewEncoder(f)
err = encoder.Encode(event)
if err != nil {
return err
}

return nil
}
95 changes: 95 additions & 0 deletions upload-server/internal/event/memory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package event

import (
"context"
"errors"
"fmt"
"log/slog"

"github.com/cdcgov/data-exchange-upload/upload-server/internal/models"
)

var FileReadyChan chan *FileReady

func InitFileReadyChannel() {
FileReadyChan = make(chan *FileReady)
}

func CloseFileReadyChannel() {
close(FileReadyChan)
}

func GetChannel[T Identifiable]() (chan T, error) {
if r, ok := any(FileReadyChan).(chan T); ok {
return r, nil
}

return nil, fmt.Errorf("channel not found")
}

type MemorySubscriber[T Identifiable] struct {
Chan chan T
}

func (ms *MemorySubscriber[T]) GetBatch(ctx context.Context, _ int) ([]T, error) {
select {
case <-ctx.Done():
return nil, nil
case evt := <-ms.Chan:
return []T{evt}, nil
}
}

func (ms *MemorySubscriber[T]) HandleSuccess(_ context.Context, e T) error {
slog.Info("successfully handled event", "event", e)
return nil
}

func (ms *MemorySubscriber[T]) HandleError(_ context.Context, e T, err error) error {
slog.Error("failed to handle event", "event", e, "error", err.Error())
if e.RetryCount() < MaxRetries {
e.IncrementRetryCount()
// Retrying in a separate go routine so this doesn't block on channel write.
go func() {
ms.Chan <- e
}()
}
return nil
}

func (ms *MemorySubscriber[T]) Close() error {
slog.Info("closing in-memory subscriber")
return nil
}

func (ms *MemorySubscriber[T]) Health(_ context.Context) (rsp models.ServiceHealthResp) {
rsp.Service = "Memory Subscriber"
rsp.Status = models.STATUS_UP
rsp.HealthIssue = models.HEALTH_ISSUE_NONE
return rsp
}

type MemoryPublisher[T Identifiable] struct {
Chan chan T
}

func (mp *MemoryPublisher[T]) Publish(_ context.Context, event T) error {
if mp.Chan != nil {
go func() {
mp.Chan <- event
}()
return nil
}
return errors.New("No event channel found")
}

func (mp *MemoryPublisher[T]) Close() error {
return nil
}

func (mp *MemoryPublisher[T]) Health(_ context.Context) (rsp models.ServiceHealthResp) {
rsp.Service = "Memory Publisher"
rsp.Status = models.STATUS_UP
rsp.HealthIssue = models.HEALTH_ISSUE_NONE
return rsp
}
Loading

0 comments on commit 689ebe4

Please sign in to comment.