Skip to content

Commit

Permalink
Add EF and refactor names
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee Wright authored and Lee Wright committed May 16, 2024
1 parent e1350aa commit f38405c
Show file tree
Hide file tree
Showing 20 changed files with 800 additions and 0 deletions.
10 changes: 10 additions & 0 deletions backend/MessagingAdapter/AWS/Producer/ISNSProducer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace MessagingAdapter.AWS.Producer;

using Amazon.SimpleNotificationService.Model;
using MessagingAdapter.Models;

public interface ISNSProducer
{
Task<PublishResponse> ProduceAsync(EventModel eventModel);
Task<IEnumerable<string>> ListAllTopicsAsync();
}
83 changes: 83 additions & 0 deletions backend/MessagingAdapter/AWS/Producer/SNSProducer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
namespace MessagingAdapter.AWS.Producer;

using System.Collections.Generic;
using Amazon;
using Amazon.Runtime.CredentialManagement;
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;
using MessagingAdapter.Configuration;
using MessagingAdapter.Models;

public class SNSProducer : ISNSProducer
{
private ILogger<SNSProducer> logger;
private AmazonSimpleNotificationServiceClient client;
private readonly IConfiguration configuration;

public SNSProducer()
{
}



public SNSProducer(ILogger<SNSProducer> logger, IConfiguration configuration)
{
this.logger = logger;
this.configuration = configuration;
var options = this.configuration.GetAWSOptions();
var credentialProfileStoreChain = new CredentialProfileStoreChain();
if (credentialProfileStoreChain.TryGetAWSCredentials(options.Profile, out var credentials))
{
options.Credentials = credentials;
}

this.client = new AmazonSimpleNotificationServiceClient(options.Credentials, RegionEndpoint.CACentral1);


}

public async Task<PublishResponse> ProduceAsync(EventModel eventModel)
{
var attributes = new Dictionary<string, MessageAttributeValue>();

var filterType = (eventModel is DisclosureEventModel model) ? model.DisclosureEventType.ToString() : "unknown";
var value = new MessageAttributeValue
{
DataType = "String",
StringValue = filterType
};

attributes.Add("EventType", value);

var publisherOptions = new PublisherOptions();
this.configuration.GetSection(PublisherOptions.Publisher).Bind(publisherOptions);

var publishRequest = new PublishRequest
{
Message = eventModel.AsJSON(),
MessageAttributes = attributes,
Subject = "Disclosure Test",
TopicArn = publisherOptions.SNSTarget
};
return await this.client.PublishAsync(publishRequest);
}




/// <summary>
/// Get all topic names
/// </summary>
/// <returns></returns>
public async Task<IEnumerable<string>> ListAllTopicsAsync()
{
var topics = await this.client.ListTopicsAsync();
topics.Topics.ForEach(topic =>
{
this.logger.LogInformation($"Topic {topic.TopicArn}");
});

return topics.Topics.Select(t => t.TopicArn.ToString()).ToList();

}
}
12 changes: 12 additions & 0 deletions backend/MessagingAdapter/AWS/Subscriber/ISQSSubscriber.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace MessagingAdapter.AWS.Subscriber;

using System.Threading.Tasks;
using MessagingAdapter.Models;

public interface ISQSSubscriber
{
Task<List<DisclosureEventModel>> GetMessages();
Task<IEnumerable<string>> ListQueuesAsync();
Task<bool> AcknowledgeMessagesAsync(string qUrl, Dictionary<string, string> receiptHandles);

}
211 changes: 211 additions & 0 deletions backend/MessagingAdapter/AWS/Subscriber/SQSSubscriber.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
namespace MessagingAdapter.AWS.Subscriber;

using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;
using Amazon;
using Amazon.Runtime.CredentialManagement;
using Amazon.SQS;
using Amazon.SQS.Model;
using MessagingAdapter.Configuration;
using MessagingAdapter.Data;
using MessagingAdapter.Models;
using Newtonsoft.Json.Linq;

/// <summary>
/// Subscribes to SQS Message Topics and can receive and process messages
/// </summary>
public class SQSSubscriber : ISQSSubscriber, IDisposable
{
private readonly int maxMessages = 1;
private readonly ILogger<SQSSubscriber> logger;
private readonly AmazonSQSClient? client;
private readonly IConfiguration configuration;
private readonly JsonSerializerOptions jsonSerializerOptions;
private MessagingAdapterContext dbContext;

public SQSSubscriber()
{
}

public SQSSubscriber(ILogger<SQSSubscriber> logger, IConfiguration configuration, MessagingAdapterContext dbContext)
{
this.logger = logger;
this.configuration = configuration;
this.dbContext = dbContext;

var options = this.configuration.GetAWSOptions();
var credentialProfileStoreChain = new CredentialProfileStoreChain();

// get AWS Credentials
if (credentialProfileStoreChain.TryGetAWSCredentials(options.Profile, out var credentials))
{
options.Credentials = credentials;
}

// create new AWS SQS Client
this.client = new AmazonSQSClient(options.Credentials, RegionEndpoint.CACentral1);


this.configuration = configuration;

// treat enums as strings not numbers
this.jsonSerializerOptions = new JsonSerializerOptions
{
Converters = { new JsonStringEnumConverter() }
};
}

/// <summary>
/// Get messages from AWS SQS Topic
/// </summary>
/// <param name="qUrl"></param>
/// <param name="waitTime"></param>
/// <returns></returns>
public async Task<List<DisclosureEventModel>> GetMessages()
{
var messages = new List<DisclosureEventModel>();
var receiptHandles = new Dictionary<string, string>();
var waitTime = 2;


var subscriberOptions = new SubscriberOptions();
this.configuration.GetSection(SubscriberOptions.Subscriber).Bind(subscriberOptions);

var qUrl = subscriberOptions.SQSUrl;

this.logger.LogInformation($"Getting messages from {qUrl}");

try
{

var reponse = await this.client.ReceiveMessageAsync(new ReceiveMessageRequest
{
QueueUrl = qUrl,
MaxNumberOfMessages = this.maxMessages,
WaitTimeSeconds = waitTime
// (Could also request attributes, set visibility timeout, etc.)
});

this.logger.LogInformation($"Response message count {reponse.Messages.Count}");

reponse.Messages.ForEach(msg =>
{
// check we havent processed this message previously
if (dbContext.IsMessageProcessedAlready(msg.MessageId))
{
this.logger.LogInformation($"Message {msg.MessageId} already processed");
}
else
{
// get message and convert to EventModel object
var msgBody = msg.Body;
var json = JObject.Parse(msgBody);
var content = json["Message"].ToString();
var eventModel = JsonSerializer.Deserialize<DisclosureEventModel>(content, this.jsonSerializerOptions);
if (eventModel != null)
{
messages.Add(eventModel);
}
receiptHandles.Add(msg.MessageId, msg.ReceiptHandle);
}
});

// track the messages we've received
await this.TrackRecievedMessages(receiptHandles);

// tell subscriber I'm done with these
await this.AcknowledgeMessagesAsync(subscriberOptions.SQSUrl, receiptHandles);

return messages;
}
catch (Exception ex)
{
logger.LogError($"Failed to get messages {ex.Message}", ex);
return null;
}
}

/// <summary>
/// Ensure we track messages so they are only ever processed once
/// </summary>
/// <param name="messageKeys"></param>
/// <returns></returns>
private async Task<int> TrackRecievedMessages(Dictionary<string, string> messageKeys)
{
var processed = 0;

var txn = dbContext.Database.BeginTransaction();
foreach (var messageKey in messageKeys)
{
this.dbContext.IdempotentConsumers.Add(new IdempotentConsumer
{
MessageId = messageKey.Key,
ReceiptId = messageKey.Value,
ProcessedUtc = DateTime.UtcNow,
});
}

var changes = await dbContext.SaveChangesAsync();

if (changes != messageKeys.Count)
{
this.logger.LogError($"Failed to track all changes count should be {messageKeys.Count} and we stored {changes} - rolling back");
await txn.RollbackAsync();
}
else
{
await txn.CommitAsync();
}

return processed;
}


/// <summary>
/// List available queues
/// </summary>
/// <returns></returns>
public async Task<IEnumerable<string>> ListQueuesAsync()
{
var request = new ListQueuesRequest
{
MaxResults = maxMessages
};
var response = await this.client.ListQueuesAsync(request);
return response.QueueUrls;

}

public async Task<bool> AcknowledgeMessagesAsync(string qUrl, Dictionary<string, string> receiptHandles)
{
foreach (var receiptHandle in receiptHandles)
{
logger.LogInformation($"Removing message {receiptHandle.Key}, {receiptHandle.Value}");
var response = await this.client.DeleteMessageAsync(qUrl, receiptHandle.Value);
if (response != null)
{
this.logger.LogInformation($"Delete response {response.HttpStatusCode}");
}
}

return true;
}

public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}

protected virtual void Dispose(bool disposing)
{
if (disposing)
{
this.client?.Dispose();
}
}
}
14 changes: 14 additions & 0 deletions backend/MessagingAdapter/Configuration/KafkaTargetOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace MessagingAdapter.Configuration;

public class KafkaTargetOptions
{
public const string KafkaTargets = "KafkaTargets";
public Target[] Targets { get; set; }
}

public class Target
{
public string MessageType { get; set; } = string.Empty;
public string TargetTopic { get; set; } = string.Empty;

}
9 changes: 9 additions & 0 deletions backend/MessagingAdapter/Configuration/PublisherOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace MessagingAdapter.Configuration;

public class PublisherOptions
{
public const string Publisher = "Publisher";

public string SNSTarget { get; set; }

}
8 changes: 8 additions & 0 deletions backend/MessagingAdapter/Configuration/SubscriberOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace MessagingAdapter.Configuration;

public class SubscriberOptions
{
public const string Subscriber = "Subscriber";

public string SQSUrl { get; set; }
}
Loading

0 comments on commit f38405c

Please sign in to comment.