From f38405c3041095b75a60cba630e6072da040a077 Mon Sep 17 00:00:00 2001 From: Lee Wright <258036@NTTDATA.COM> Date: Thu, 16 May 2024 13:23:59 -0700 Subject: [PATCH] Add EF and refactor names --- .../AWS/Producer/ISNSProducer.cs | 10 + .../AWS/Producer/SNSProducer.cs | 83 +++++++ .../AWS/Subscriber/ISQSSubscriber.cs | 12 + .../AWS/Subscriber/SQSSubscriber.cs | 211 ++++++++++++++++++ .../Configuration/KafkaTargetOptions.cs | 14 ++ .../Configuration/PublisherOptions.cs | 9 + .../Configuration/SubscriberOptions.cs | 8 + .../Controllers/AmazonMessagingController.cs | 90 ++++++++ .../Data/MessagingAdapterContext.cs | 21 ++ .../MessagingAdapter/MessagingAdapter.csproj | 20 ++ .../20240516180359_InitialCreate.Designer.cs | 55 +++++ .../20240516180359_InitialCreate.cs | 43 ++++ .../MessagingAdapterContextModelSnapshot.cs | 52 +++++ .../Models/DisclosureEventModel.cs | 29 +++ backend/MessagingAdapter/Models/EventModel.cs | 16 ++ .../Models/IdempotentConsumer.cs | 13 ++ backend/MessagingAdapter/Program.cs | 68 ++++++ backend/MessagingAdapter/README.md | 8 + backend/MessagingAdapter/appsettings.json | 32 +++ .../MessagingAdapter/edt.notifications.http | 6 + 20 files changed, 800 insertions(+) create mode 100644 backend/MessagingAdapter/AWS/Producer/ISNSProducer.cs create mode 100644 backend/MessagingAdapter/AWS/Producer/SNSProducer.cs create mode 100644 backend/MessagingAdapter/AWS/Subscriber/ISQSSubscriber.cs create mode 100644 backend/MessagingAdapter/AWS/Subscriber/SQSSubscriber.cs create mode 100644 backend/MessagingAdapter/Configuration/KafkaTargetOptions.cs create mode 100644 backend/MessagingAdapter/Configuration/PublisherOptions.cs create mode 100644 backend/MessagingAdapter/Configuration/SubscriberOptions.cs create mode 100644 backend/MessagingAdapter/Controllers/AmazonMessagingController.cs create mode 100644 backend/MessagingAdapter/Data/MessagingAdapterContext.cs create mode 100644 backend/MessagingAdapter/MessagingAdapter.csproj create mode 100644 backend/MessagingAdapter/Migrations/20240516180359_InitialCreate.Designer.cs create mode 100644 backend/MessagingAdapter/Migrations/20240516180359_InitialCreate.cs create mode 100644 backend/MessagingAdapter/Migrations/MessagingAdapterContextModelSnapshot.cs create mode 100644 backend/MessagingAdapter/Models/DisclosureEventModel.cs create mode 100644 backend/MessagingAdapter/Models/EventModel.cs create mode 100644 backend/MessagingAdapter/Models/IdempotentConsumer.cs create mode 100644 backend/MessagingAdapter/Program.cs create mode 100644 backend/MessagingAdapter/README.md create mode 100644 backend/MessagingAdapter/appsettings.json create mode 100644 backend/MessagingAdapter/edt.notifications.http diff --git a/backend/MessagingAdapter/AWS/Producer/ISNSProducer.cs b/backend/MessagingAdapter/AWS/Producer/ISNSProducer.cs new file mode 100644 index 00000000..38dca656 --- /dev/null +++ b/backend/MessagingAdapter/AWS/Producer/ISNSProducer.cs @@ -0,0 +1,10 @@ +namespace MessagingAdapter.AWS.Producer; + +using Amazon.SimpleNotificationService.Model; +using MessagingAdapter.Models; + +public interface ISNSProducer +{ + Task ProduceAsync(EventModel eventModel); + Task> ListAllTopicsAsync(); +} diff --git a/backend/MessagingAdapter/AWS/Producer/SNSProducer.cs b/backend/MessagingAdapter/AWS/Producer/SNSProducer.cs new file mode 100644 index 00000000..6ade6a36 --- /dev/null +++ b/backend/MessagingAdapter/AWS/Producer/SNSProducer.cs @@ -0,0 +1,83 @@ +namespace MessagingAdapter.AWS.Producer; + +using System.Collections.Generic; +using Amazon; +using Amazon.Runtime.CredentialManagement; +using Amazon.SimpleNotificationService; +using Amazon.SimpleNotificationService.Model; +using MessagingAdapter.Configuration; +using MessagingAdapter.Models; + +public class SNSProducer : ISNSProducer +{ + private ILogger logger; + private AmazonSimpleNotificationServiceClient client; + private readonly IConfiguration configuration; + + public SNSProducer() + { + } + + + + public SNSProducer(ILogger logger, IConfiguration configuration) + { + this.logger = logger; + this.configuration = configuration; + var options = this.configuration.GetAWSOptions(); + var credentialProfileStoreChain = new CredentialProfileStoreChain(); + if (credentialProfileStoreChain.TryGetAWSCredentials(options.Profile, out var credentials)) + { + options.Credentials = credentials; + } + + this.client = new AmazonSimpleNotificationServiceClient(options.Credentials, RegionEndpoint.CACentral1); + + + } + + public async Task ProduceAsync(EventModel eventModel) + { + var attributes = new Dictionary(); + + var filterType = (eventModel is DisclosureEventModel model) ? model.DisclosureEventType.ToString() : "unknown"; + var value = new MessageAttributeValue + { + DataType = "String", + StringValue = filterType + }; + + attributes.Add("EventType", value); + + var publisherOptions = new PublisherOptions(); + this.configuration.GetSection(PublisherOptions.Publisher).Bind(publisherOptions); + + var publishRequest = new PublishRequest + { + Message = eventModel.AsJSON(), + MessageAttributes = attributes, + Subject = "Disclosure Test", + TopicArn = publisherOptions.SNSTarget + }; + return await this.client.PublishAsync(publishRequest); + } + + + + + /// + /// Get all topic names + /// + /// + public async Task> ListAllTopicsAsync() + { + var topics = await this.client.ListTopicsAsync(); + topics.Topics.ForEach(topic => + { + this.logger.LogInformation($"Topic {topic.TopicArn}"); + }); + + return topics.Topics.Select(t => t.TopicArn.ToString()).ToList(); + + } +} diff --git a/backend/MessagingAdapter/AWS/Subscriber/ISQSSubscriber.cs b/backend/MessagingAdapter/AWS/Subscriber/ISQSSubscriber.cs new file mode 100644 index 00000000..fdc1daa6 --- /dev/null +++ b/backend/MessagingAdapter/AWS/Subscriber/ISQSSubscriber.cs @@ -0,0 +1,12 @@ +namespace MessagingAdapter.AWS.Subscriber; + +using System.Threading.Tasks; +using MessagingAdapter.Models; + +public interface ISQSSubscriber +{ + Task> GetMessages(); + Task> ListQueuesAsync(); + Task AcknowledgeMessagesAsync(string qUrl, Dictionary receiptHandles); + +} diff --git a/backend/MessagingAdapter/AWS/Subscriber/SQSSubscriber.cs b/backend/MessagingAdapter/AWS/Subscriber/SQSSubscriber.cs new file mode 100644 index 00000000..963e58ae --- /dev/null +++ b/backend/MessagingAdapter/AWS/Subscriber/SQSSubscriber.cs @@ -0,0 +1,211 @@ +namespace MessagingAdapter.AWS.Subscriber; + +using System.Collections.Generic; +using System.Text.Json; +using System.Text.Json.Serialization; +using Amazon; +using Amazon.Runtime.CredentialManagement; +using Amazon.SQS; +using Amazon.SQS.Model; +using MessagingAdapter.Configuration; +using MessagingAdapter.Data; +using MessagingAdapter.Models; +using Newtonsoft.Json.Linq; + +/// +/// Subscribes to SQS Message Topics and can receive and process messages +/// +public class SQSSubscriber : ISQSSubscriber, IDisposable +{ + private readonly int maxMessages = 1; + private readonly ILogger logger; + private readonly AmazonSQSClient? client; + private readonly IConfiguration configuration; + private readonly JsonSerializerOptions jsonSerializerOptions; + private MessagingAdapterContext dbContext; + + public SQSSubscriber() + { + } + + public SQSSubscriber(ILogger logger, IConfiguration configuration, MessagingAdapterContext dbContext) + { + this.logger = logger; + this.configuration = configuration; + this.dbContext = dbContext; + + var options = this.configuration.GetAWSOptions(); + var credentialProfileStoreChain = new CredentialProfileStoreChain(); + + // get AWS Credentials + if (credentialProfileStoreChain.TryGetAWSCredentials(options.Profile, out var credentials)) + { + options.Credentials = credentials; + } + + // create new AWS SQS Client + this.client = new AmazonSQSClient(options.Credentials, RegionEndpoint.CACentral1); + + + this.configuration = configuration; + + // treat enums as strings not numbers + this.jsonSerializerOptions = new JsonSerializerOptions + { + Converters = { new JsonStringEnumConverter() } + }; + } + + /// + /// Get messages from AWS SQS Topic + /// + /// + /// + /// + public async Task> GetMessages() + { + var messages = new List(); + var receiptHandles = new Dictionary(); + var waitTime = 2; + + + var subscriberOptions = new SubscriberOptions(); + this.configuration.GetSection(SubscriberOptions.Subscriber).Bind(subscriberOptions); + + var qUrl = subscriberOptions.SQSUrl; + + this.logger.LogInformation($"Getting messages from {qUrl}"); + + try + { + + var reponse = await this.client.ReceiveMessageAsync(new ReceiveMessageRequest + { + QueueUrl = qUrl, + MaxNumberOfMessages = this.maxMessages, + WaitTimeSeconds = waitTime + // (Could also request attributes, set visibility timeout, etc.) + }); + + this.logger.LogInformation($"Response message count {reponse.Messages.Count}"); + + reponse.Messages.ForEach(msg => + { + + // check we havent processed this message previously + if (dbContext.IsMessageProcessedAlready(msg.MessageId)) + { + this.logger.LogInformation($"Message {msg.MessageId} already processed"); + } + else + { + + // get message and convert to EventModel object + var msgBody = msg.Body; + var json = JObject.Parse(msgBody); + var content = json["Message"].ToString(); + var eventModel = JsonSerializer.Deserialize(content, this.jsonSerializerOptions); + if (eventModel != null) + { + messages.Add(eventModel); + } + + receiptHandles.Add(msg.MessageId, msg.ReceiptHandle); + } + }); + + // track the messages we've received + await this.TrackRecievedMessages(receiptHandles); + + // tell subscriber I'm done with these + await this.AcknowledgeMessagesAsync(subscriberOptions.SQSUrl, receiptHandles); + + return messages; + } + catch (Exception ex) + { + logger.LogError($"Failed to get messages {ex.Message}", ex); + return null; + } + } + + /// + /// Ensure we track messages so they are only ever processed once + /// + /// + /// + private async Task TrackRecievedMessages(Dictionary messageKeys) + { + var processed = 0; + + var txn = dbContext.Database.BeginTransaction(); + foreach (var messageKey in messageKeys) + { + this.dbContext.IdempotentConsumers.Add(new IdempotentConsumer + { + MessageId = messageKey.Key, + ReceiptId = messageKey.Value, + ProcessedUtc = DateTime.UtcNow, + }); + } + + var changes = await dbContext.SaveChangesAsync(); + + if (changes != messageKeys.Count) + { + this.logger.LogError($"Failed to track all changes count should be {messageKeys.Count} and we stored {changes} - rolling back"); + await txn.RollbackAsync(); + } + else + { + await txn.CommitAsync(); + } + + return processed; + } + + + /// + /// List available queues + /// + /// + public async Task> ListQueuesAsync() + { + var request = new ListQueuesRequest + { + MaxResults = maxMessages + }; + var response = await this.client.ListQueuesAsync(request); + return response.QueueUrls; + + } + + public async Task AcknowledgeMessagesAsync(string qUrl, Dictionary receiptHandles) + { + foreach (var receiptHandle in receiptHandles) + { + logger.LogInformation($"Removing message {receiptHandle.Key}, {receiptHandle.Value}"); + var response = await this.client.DeleteMessageAsync(qUrl, receiptHandle.Value); + if (response != null) + { + this.logger.LogInformation($"Delete response {response.HttpStatusCode}"); + } + } + + return true; + } + + public void Dispose() + { + this.Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + this.client?.Dispose(); + } + } +} diff --git a/backend/MessagingAdapter/Configuration/KafkaTargetOptions.cs b/backend/MessagingAdapter/Configuration/KafkaTargetOptions.cs new file mode 100644 index 00000000..19f26574 --- /dev/null +++ b/backend/MessagingAdapter/Configuration/KafkaTargetOptions.cs @@ -0,0 +1,14 @@ +namespace MessagingAdapter.Configuration; + +public class KafkaTargetOptions +{ + public const string KafkaTargets = "KafkaTargets"; + public Target[] Targets { get; set; } +} + +public class Target +{ + public string MessageType { get; set; } = string.Empty; + public string TargetTopic { get; set; } = string.Empty; + +} diff --git a/backend/MessagingAdapter/Configuration/PublisherOptions.cs b/backend/MessagingAdapter/Configuration/PublisherOptions.cs new file mode 100644 index 00000000..cc20fa28 --- /dev/null +++ b/backend/MessagingAdapter/Configuration/PublisherOptions.cs @@ -0,0 +1,9 @@ +namespace MessagingAdapter.Configuration; + +public class PublisherOptions +{ + public const string Publisher = "Publisher"; + + public string SNSTarget { get; set; } + +} diff --git a/backend/MessagingAdapter/Configuration/SubscriberOptions.cs b/backend/MessagingAdapter/Configuration/SubscriberOptions.cs new file mode 100644 index 00000000..5a4554e8 --- /dev/null +++ b/backend/MessagingAdapter/Configuration/SubscriberOptions.cs @@ -0,0 +1,8 @@ +namespace MessagingAdapter.Configuration; + +public class SubscriberOptions +{ + public const string Subscriber = "Subscriber"; + + public string SQSUrl { get; set; } +} diff --git a/backend/MessagingAdapter/Controllers/AmazonMessagingController.cs b/backend/MessagingAdapter/Controllers/AmazonMessagingController.cs new file mode 100644 index 00000000..98727ac0 --- /dev/null +++ b/backend/MessagingAdapter/Controllers/AmazonMessagingController.cs @@ -0,0 +1,90 @@ +namespace MessagingAdapter.Controllers; + +using Amazon.SimpleNotificationService; +using Amazon.SimpleNotificationService.Model; +using MessagingAdapter.AWS.Producer; +using MessagingAdapter.AWS.Subscriber; +using MessagingAdapter.Models; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Extensions.Configuration; + +[ApiController] +[Route("[controller]")] +public class AmazonMessagingController : ControllerBase +{ + + private readonly ILogger logger; + private IConfiguration configuration; + private ISQSSubscriber subscriber; + private ISNSProducer producer; + + public AmazonMessagingController(ILogger logger, IConfiguration configuration, ISQSSubscriber subscriber, ISNSProducer producer) + { + this.logger = logger; + this.configuration = configuration; + this.subscriber = subscriber; + this.producer = producer; + + } + + public static async Task PublishToTopicAsync( + IAmazonSimpleNotificationService client, + string topicArn, + string messageText) + { + var request = new PublishRequest + { + TopicArn = topicArn, + Message = messageText, + }; + + var response = await client.PublishAsync(request); + + Console.WriteLine($"Successfully published message ID: {response.MessageId}"); + } + + + /// + /// Post a message to the topic + /// + /// + /// + [ProducesResponseType(StatusCodes.Status201Created)] + [ProducesResponseType(StatusCodes.Status400BadRequest)] + [HttpPost("/submit-request", Name = "PublishToSNS")] + public async Task PublishToSNSTopic(DisclosureEventModel eventModel) + { + var response = await this.producer.ProduceAsync(eventModel); + + return response; + } + + /// + /// Get Messages from the topic - coded to pickup from settings + /// + /// + [HttpGet("/messages", Name = "GetMessages")] + public async Task> GetMessagesAsync() + { + var messages = new List(); + try + { + messages = await this.subscriber.GetMessages(); + + } + catch (Exception ex) + { + this.logger.LogError($"Error {ex.Message}"); + } + return messages; + } + + + + [HttpGet("/topics", Name = "GetTopics")] + public async Task> GetTopicsAsync() => await this.producer.ListAllTopicsAsync(); + + [HttpGet("/queues", Name = "GetQueues")] + public async Task> GetQueuesAsync() => await this.subscriber.ListQueuesAsync(); + +} diff --git a/backend/MessagingAdapter/Data/MessagingAdapterContext.cs b/backend/MessagingAdapter/Data/MessagingAdapterContext.cs new file mode 100644 index 00000000..b8788ac6 --- /dev/null +++ b/backend/MessagingAdapter/Data/MessagingAdapterContext.cs @@ -0,0 +1,21 @@ +namespace MessagingAdapter.Data; + +using MessagingAdapter.Models; +using Microsoft.EntityFrameworkCore; + + +public class MessagingAdapterContext(DbContextOptions options) : DbContext(options) +{ + + public DbSet IdempotentConsumers { get; set; } = default!; + + protected override void OnModelCreating(ModelBuilder modelBuilder) + { + modelBuilder.HasDefaultSchema("messaging-adapter"); + base.OnModelCreating(modelBuilder); + + } + + public bool IsMessageProcessedAlready(string messageId) => this.IdempotentConsumers.Where(ic => ic.MessageId == messageId).Any(); + +} diff --git a/backend/MessagingAdapter/MessagingAdapter.csproj b/backend/MessagingAdapter/MessagingAdapter.csproj new file mode 100644 index 00000000..0acfe99a --- /dev/null +++ b/backend/MessagingAdapter/MessagingAdapter.csproj @@ -0,0 +1,20 @@ + + + + net8.0 + enable + 1.1.5 + Linux + enable + + + + + + + + + + + + diff --git a/backend/MessagingAdapter/Migrations/20240516180359_InitialCreate.Designer.cs b/backend/MessagingAdapter/Migrations/20240516180359_InitialCreate.Designer.cs new file mode 100644 index 00000000..a44d099f --- /dev/null +++ b/backend/MessagingAdapter/Migrations/20240516180359_InitialCreate.Designer.cs @@ -0,0 +1,55 @@ +// +using System; +using MessagingAdapter.Data; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; + +#nullable disable + +namespace MessagingAdapter.Migrations +{ + [DbContext(typeof(MessagingAdapterContext))] + [Migration("20240516180359_InitialCreate")] + partial class InitialCreate + { + /// + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasDefaultSchema("messaging-adapter") + .HasAnnotation("ProductVersion", "8.0.5") + .HasAnnotation("Relational:MaxIdentifierLength", 63); + + NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder); + + modelBuilder.Entity("MessagingAdapter.Models.IdempotentConsumer", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("MessageId") + .IsRequired() + .HasColumnType("text"); + + b.Property("ProcessedUtc") + .HasColumnType("timestamp with time zone"); + + b.Property("ReceiptId") + .IsRequired() + .HasColumnType("text"); + + b.HasKey("Id"); + + b.ToTable("IdempotentConsumers", "messaging-adapter"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/backend/MessagingAdapter/Migrations/20240516180359_InitialCreate.cs b/backend/MessagingAdapter/Migrations/20240516180359_InitialCreate.cs new file mode 100644 index 00000000..b32a9571 --- /dev/null +++ b/backend/MessagingAdapter/Migrations/20240516180359_InitialCreate.cs @@ -0,0 +1,43 @@ +using System; +using Microsoft.EntityFrameworkCore.Migrations; +using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; + +#nullable disable + +namespace MessagingAdapter.Migrations +{ + /// + public partial class InitialCreate : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.EnsureSchema( + name: "messaging-adapter"); + + migrationBuilder.CreateTable( + name: "IdempotentConsumers", + schema: "messaging-adapter", + columns: table => new + { + Id = table.Column(type: "integer", nullable: false) + .Annotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn), + MessageId = table.Column(type: "text", nullable: false), + ReceiptId = table.Column(type: "text", nullable: false), + ProcessedUtc = table.Column(type: "timestamp with time zone", nullable: false) + }, + constraints: table => + { + table.PrimaryKey("PK_IdempotentConsumers", x => x.Id); + }); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropTable( + name: "IdempotentConsumers", + schema: "messaging-adapter"); + } + } +} diff --git a/backend/MessagingAdapter/Migrations/MessagingAdapterContextModelSnapshot.cs b/backend/MessagingAdapter/Migrations/MessagingAdapterContextModelSnapshot.cs new file mode 100644 index 00000000..799d1863 --- /dev/null +++ b/backend/MessagingAdapter/Migrations/MessagingAdapterContextModelSnapshot.cs @@ -0,0 +1,52 @@ +// +using System; +using MessagingAdapter.Data; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; + +#nullable disable + +namespace MessagingAdapter.Migrations +{ + [DbContext(typeof(MessagingAdapterContext))] + partial class MessagingAdapterContextModelSnapshot : ModelSnapshot + { + protected override void BuildModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasDefaultSchema("messaging-adapter") + .HasAnnotation("ProductVersion", "8.0.5") + .HasAnnotation("Relational:MaxIdentifierLength", 63); + + NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder); + + modelBuilder.Entity("MessagingAdapter.Models.IdempotentConsumer", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("MessageId") + .IsRequired() + .HasColumnType("text"); + + b.Property("ProcessedUtc") + .HasColumnType("timestamp with time zone"); + + b.Property("ReceiptId") + .IsRequired() + .HasColumnType("text"); + + b.HasKey("Id"); + + b.ToTable("IdempotentConsumers", "messaging-adapter"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/backend/MessagingAdapter/Models/DisclosureEventModel.cs b/backend/MessagingAdapter/Models/DisclosureEventModel.cs new file mode 100644 index 00000000..8f8b9fd7 --- /dev/null +++ b/backend/MessagingAdapter/Models/DisclosureEventModel.cs @@ -0,0 +1,29 @@ +namespace MessagingAdapter.Models; + +using Newtonsoft.Json; +using Newtonsoft.Json.Converters; + +public class DisclosureEventModel : EventModel +{ + public DisclosureEventType DisclosureEventType { get; set; } = DisclosureEventType.ExportToParticipant; + public int PersonId { get; set; } + public int FromCaseId { get; set; } + public int ToCaseId { get; set; } + public int ToInstanceId { get; set; } + public string PersonKey { get; set; } = string.Empty; + public DisclosureParticipantType DisclosureParticipantType { get; set; } = DisclosureParticipantType.Accused; +} + +[JsonConverter(typeof(StringEnumConverter))] +public enum DisclosureEventType +{ + ExportToParticipant, + ExportToParticipantDeleted +} + +[JsonConverter(typeof(StringEnumConverter))] +public enum DisclosureParticipantType +{ + Accused, + DefenceCounsel +} diff --git a/backend/MessagingAdapter/Models/EventModel.cs b/backend/MessagingAdapter/Models/EventModel.cs new file mode 100644 index 00000000..cdb683bf --- /dev/null +++ b/backend/MessagingAdapter/Models/EventModel.cs @@ -0,0 +1,16 @@ +namespace MessagingAdapter.Models; +using Newtonsoft.Json; + + +public abstract class EventModel +{ + public int Id { get; set; } + public string CreatedByUsername { get; set; } = string.Empty; + public DateTime CreatedUtc { get; set; } + + public string AsJSON() + { + var json = JsonConvert.SerializeObject(this); + return json; + } +} diff --git a/backend/MessagingAdapter/Models/IdempotentConsumer.cs b/backend/MessagingAdapter/Models/IdempotentConsumer.cs new file mode 100644 index 00000000..0d8ebb96 --- /dev/null +++ b/backend/MessagingAdapter/Models/IdempotentConsumer.cs @@ -0,0 +1,13 @@ +namespace MessagingAdapter.Models; + +using System.ComponentModel.DataAnnotations; + +public class IdempotentConsumer +{ + [Key] + public int Id { get; set; } + public string MessageId { get; set; } = string.Empty; + public string ReceiptId { get; set; } = string.Empty; + public DateTime ProcessedUtc { get; set; } +} + diff --git a/backend/MessagingAdapter/Program.cs b/backend/MessagingAdapter/Program.cs new file mode 100644 index 00000000..d6fa0361 --- /dev/null +++ b/backend/MessagingAdapter/Program.cs @@ -0,0 +1,68 @@ + +namespace MessagingAdapter; + +using System.Text.Json.Serialization; +using MessagingAdapter.AWS.Producer; +using MessagingAdapter.AWS.Subscriber; +using MessagingAdapter.Configuration; +using MessagingAdapter.Data; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Migrations; + +public class Program +{ + private const string DbConnection = "DataStoreConnection"; + + public static void Main(string[] args) + { + var builder = WebApplication.CreateBuilder(args); + + + + // Add services to the container. + builder.Services.Configure( + builder.Configuration.GetSection(PublisherOptions.Publisher)); + builder.Services.AddControllers().AddJsonOptions(opts => + { + var enumConverter = new JsonStringEnumConverter(); + opts.JsonSerializerOptions.Converters.Add(enumConverter); + }); + // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle + builder.Services.AddEndpointsApiExplorer(); + builder.Services.AddSwaggerGen(); + + + + var dbConnection = builder.Configuration.GetValue(DbConnection); + + + builder.Services.AddDbContext(options => options + .UseNpgsql(dbConnection, npg => + { + npg.MigrationsHistoryTable(HistoryRepository.DefaultTableName, "messaging-adapter"); + npg.UseNodaTime(); + }).EnableSensitiveDataLogging(sensitiveDataLoggingEnabled: false)); + + builder.Services.AddScoped(); + builder.Services.AddScoped(); + + + var app = builder.Build(); + + + + // Configure the HTTP request pipeline. + if (app.Environment.IsDevelopment()) + { + app.UseSwagger(); + app.UseSwaggerUI(); + } + + app.UseHttpsRedirection(); + app.UseAuthorization(); + + app.MapControllers(); + + app.Run(); + } +} diff --git a/backend/MessagingAdapter/README.md b/backend/MessagingAdapter/README.md new file mode 100644 index 00000000..a925d2a9 --- /dev/null +++ b/backend/MessagingAdapter/README.md @@ -0,0 +1,8 @@ +# DIAM Notification Adapter + +## Process incoming AWS messages and adapt and produce to internal message bus + +This application will receive messages from AWS SQS (and push test messages to SNS). +When a message is received the service will place a new kafka message onto a topic that can then be consumed by other services. + +This separates out the AWS messaging from the DIAM messaging and allows other Justice apps to process those messages from Kafka without having to consume from AWS. diff --git a/backend/MessagingAdapter/appsettings.json b/backend/MessagingAdapter/appsettings.json new file mode 100644 index 00000000..14ccd5f0 --- /dev/null +++ b/backend/MessagingAdapter/appsettings.json @@ -0,0 +1,32 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "DataStoreConnection": "Host=fedora;Port=5444;Database=messaging-adapter;Username=messaging-adapter;Password=", + "AWS": { + "Profile": "nimbus", + "Region": "ca-central-1" + }, + "KafkaTargets": [ + { + "MessageType": "DisclosureEvent", + "TargetTopic": "edt-incoming-disclosure-event" + } + + ], + "KafkaCluster": { + "BootstrapServers": "", + "SaslOauthbearerTokenEndpointUrl": "", + "SaslOauthbearerProducerClientId": "kafka-producer", + "SaslOauthbearerProducerClientSecret": "", + "SaslOauthbearerConsumerClientId": "kafka-consumer", + "SaslOauthbearerConsumerClientSecret": "", + "SslCaLocation": "C:\\certs\\pidp\\ca.crt", + "SslCertificateLocation": "C:\\certs\\pidp\\client\\ca.crt", + "SslKeyLocation": "C:\\certs\\pidp\\client\\ca.key" + }, + "AllowedHosts": "*" +} diff --git a/backend/MessagingAdapter/edt.notifications.http b/backend/MessagingAdapter/edt.notifications.http new file mode 100644 index 00000000..2260d696 --- /dev/null +++ b/backend/MessagingAdapter/edt.notifications.http @@ -0,0 +1,6 @@ +@edt.notifications_HostAddress = http://localhost:5261 + +GET {{edt.notifications_HostAddress}}/weatherforecast/ +Accept: application/json + +###