Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bcpsdems 1902 cornet services #605

Merged
merged 6 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/ApprovalFlow/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ private static void CreateLogger(
Console.WriteLine("Creating the logging directory failed: {0}", e.ToString());
}


var name = Assembly.GetExecutingAssembly().GetName();
var outputTemplate = "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}";

var loggerConfiguration = new LoggerConfiguration()
.MinimumLevel.Information()
.Filter.ByExcluding("RequestPath like '/health%'")
.Filter.ByExcluding("RequestPath like '/metrics%'")
.MinimumLevel.Override("Microsoft", LogEventLevel.Warning)
.MinimumLevel.Override("Microsoft.Hosting.Lifetime", LogEventLevel.Information)
.MinimumLevel.Override("System", LogEventLevel.Warning)
Expand Down
17 changes: 5 additions & 12 deletions backend/DIAMCornetService/Controllers/NotificationContoller.cs
Original file line number Diff line number Diff line change
@@ -1,32 +1,25 @@
namespace DIAMCornetService.Controllers;

using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;

[ApiController]
[Route("[controller]")]
public class NotificationContoller : ControllerBase
[Authorize]
public class NotificationContoller(Services.INotificationService notificationService) : ControllerBase
{
private readonly ILogger<NotificationContoller> _logger;
private readonly DIAMCornetService.Services.INotificationService _notificationService;

public NotificationContoller(ILogger<NotificationContoller> logger, DIAMCornetService.Services.INotificationService notificationService)
{
this._logger = logger;
this._notificationService = notificationService;
}

[HttpGet(Name = "GenerateTestNotification")]
public async Task<ActionResult<string>> PublishTestNotification([FromQuery] string participantId, [FromQuery] string messageText)
{
try
{
var publishedGUID = await this._notificationService.PublishTestNotificationAsync(participantId, messageText);
var publishedGUID = await notificationService.PublishTestNotificationAsync(participantId, messageText);
return publishedGUID;
}
catch (Exception ex)
{
this._logger.LogError(ex, "Failed to publish test notification");
return StatusCode(500, $"Failed to publish test notification: {ex.Message}");
return this.StatusCode(500, $"Failed to publish test notification: {ex.Message}");
}
}
}
15 changes: 15 additions & 0 deletions backend/DIAMCornetService/DIAMCornetServiceConfiguration.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
namespace DIAMCornetService;

using Common.Authorization;

public class DIAMCornetServiceConfiguration
{

Expand All @@ -9,7 +11,14 @@ public class DIAMCornetServiceConfiguration
public KafkaClusterConfiguration KafkaCluster { get; set; } = new();
public ConnectionStringConfiguration ConnectionStrings { get; set; } = new();
public CornetConfiguration CornetService { get; set; } = new();
public KeycloakConfiguration Keycloak { get; set; } = new();
public SplunkConfiguration SplunkConfig { get; set; } = new SplunkConfiguration();

public class SplunkConfiguration
{
public string Host { get; set; } = string.Empty;
public string CollectorToken { get; set; } = string.Empty;
}
public class KafkaClusterConfiguration
{
public string Url { get; set; } = string.Empty;
Expand All @@ -30,6 +39,12 @@ public class KafkaClusterConfiguration

}

public class KeycloakConfiguration
{
public string RealmUrl { get; set; } = string.Empty;
public string WellKnownConfig => KeycloakUrls.WellKnownConfig(this.RealmUrl);
}

public class ConnectionStringConfiguration
{
public string DIAMCornetDatabase { get; set; } = string.Empty;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,18 @@ namespace DIAMCornetService.Features.MessageConsumer;
using System.Threading;
using Confluent.Kafka;

public class IncomingDisclosureNotificationConsumer(ConsumerConfig config, IServiceScopeFactory serviceScopeFactory, DIAMCornetServiceConfiguration configuration)
public class IncomingDisclosureNotificationConsumer(ILogger<IncomingDisclosureNotificationConsumer> logger, ConsumerConfig config, DIAMCornetServiceConfiguration configuration)
{
private readonly IServiceScopeFactory serviceScopeFactory = serviceScopeFactory;
private readonly ConsumerConfig config = config;
private readonly DIAMCornetServiceConfiguration configuration = configuration;


public void StartConsuming(CancellationToken cancellationToken)
{


using (var consumer = new ConsumerBuilder<Ignore, string>(this.config).Build())
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
// subscribe to this topic
consumer.Subscribe(this.configuration.KafkaCluster.ParticipantCSNumberMappingTopic);
consumer.Subscribe(configuration.KafkaCluster.ParticipantCSNumberMappingTopic);

try
{
Expand All @@ -27,7 +25,7 @@ public void StartConsuming(CancellationToken cancellationToken)
var consumeResult = consumer.Consume(cancellationToken);

// Handle the message, e.g., process consumeResult.Message
Console.WriteLine($"Received message: {consumeResult.Message.Value}");
logger.LogInformation($"Received message: {consumeResult.Message.Value}");
}
}
catch (OperationCanceledException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,11 @@ namespace DIAMCornetService.Features.MessageConsumer;
using System.Threading.Tasks;
using Common.Kafka;
using DIAMCornetService.Data;
using DIAMCornetService.Exceptions;
using DIAMCornetService.Models;
using DIAMCornetService.Services;
using global::DIAMCornetService.Models;

public class IncomingDisclosureNotificationHandler : IKafkaHandler<string, IncomingDisclosureNotificationModel>
public class IncomingDisclosureNotificationHandler(ILogger<IncomingDisclosureNotificationHandler> logger, ICornetService cornetService, DIAMCornetDbContext context) : IKafkaHandler<string, IncomingDisclosureNotificationModel>
{
private readonly ILogger<IncomingDisclosureNotificationHandler> logger;
private ICornetService cornetService;
private DIAMCornetDbContext context;

public IncomingDisclosureNotificationHandler(ILogger<IncomingDisclosureNotificationHandler> logger, ICornetService cornetService, DIAMCornetDbContext context)
{
this.logger = logger;
this.cornetService = cornetService;
this.context = context;
}

public async Task<Task> HandleAsync(string consumerName, string key, IncomingDisclosureNotificationModel value)
{
Expand All @@ -28,7 +17,7 @@ public async Task<Task> HandleAsync(string consumerName, string key, IncomingDis
try
{
// check we havent consumed this message before
var processedAlready = await this.context.HasMessageBeenProcessed(key, consumerName);
var processedAlready = await context.HasMessageBeenProcessed(key, consumerName);
if (processedAlready)
{
logger.LogWarning($"Already processed message with key {key}");
Expand All @@ -41,42 +30,45 @@ public async Task<Task> HandleAsync(string consumerName, string key, IncomingDis
incomingMessage.MessageTimestamp = DateTime.UtcNow;

// we'll track message processing time and responses here
this.context.Add(incomingMessage);
context.Add(incomingMessage);

logger.LogInformation("Message received on {0} with key {1}", consumerName, key);

// this is where we'll produce a response
var response = await this.cornetService.PublishCSNumberResponseAsync(value.ParticipantId);
incomingMessage.CSNumber = response["CSNumber"];
incomingMessage.ProcessResponseId = response["id"];
var response = await cornetService.LookupCSNumberForParticipant(value.ParticipantId);

// assuming no error on CS Number lookup
// change to if error = true
if (false)
if (response.ErrorType != null)
{
incomingMessage.ErrorMessage = response["ErrorMessage"];

}
// submit notification to users
var notificationResponse = await this.cornetService.SubmitNotificationToEServices(response["CSNumber"], value.MessageText);

// if publish returned an error
if (false)
{
// log error

return Task.FromException(new CornetException("Failed to publish notification"));
// error on getting the CS Number
response = await cornetService.PublishErrorsToDIAMAsync(response);
}
else
{

//add to tell message has been processed by consumer
await this.context.AddIdempotentConsumer(messageId: key, consumer: consumerName);
incomingMessage.CSNumber = response.CSNumber;

incomingMessage.CompletedTimestamp = DateTime.UtcNow;
// submit notification to users
response = await cornetService.SubmitNotificationToEServices(response, value.MessageText);

// if submission was good we'll notify DIAM to provision the account
if (response.ErrorType == null)
{
response = await cornetService.PublishNotificationToDIAMAsync(response);
}
// otherwise we'll notify the business of the errors
else
{
response = await cornetService.PublishErrorsToDIAMAsync(response);
}

return Task.CompletedTask;
}
//add to tell message has been processed by consumer
await context.AddIdempotentConsumer(messageId: key, consumer: consumerName);

incomingMessage.CompletedTimestamp = DateTime.UtcNow;

return Task.CompletedTask;

}
catch (Exception ex)
{
Expand All @@ -86,7 +78,7 @@ public async Task<Task> HandleAsync(string consumerName, string key, IncomingDis
}
finally
{
await this.context.SaveChangesAsync();
await context.SaveChangesAsync();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,29 @@ namespace DIAMCornetService.Features.IncomingDisclosureNotifications;
using System.Net;
using DIAMCornetService.Infrastructure;
using DIAMCornetService.Models;
public class IncomingNotificationConsumer : BackgroundService

public class IncomingNotificationConsumer(ILogger<IncomingNotificationConsumer> logger, IKafkaConsumer<string, IncomingDisclosureNotificationModel> kafkaConsumer, DIAMCornetServiceConfiguration config) : BackgroundService
{
private readonly IKafkaConsumer<string, IncomingDisclosureNotificationModel> consumer;

private readonly DIAMCornetServiceConfiguration config;
public IncomingNotificationConsumer(IKafkaConsumer<string, IncomingDisclosureNotificationModel> kafkaConsumer, DIAMCornetServiceConfiguration config)
{
this.consumer = kafkaConsumer;
this.config = config;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{

Serilog.Log.Information("Starting consumer {0}", this.config.KafkaCluster.DisclosureNotificationTopic);
logger.LogInformation("Starting consumer {0}", config.KafkaCluster.DisclosureNotificationTopic);
try
{
await this.consumer.Consume(this.config.KafkaCluster.DisclosureNotificationTopic, stoppingToken);
await kafkaConsumer.Consume(config.KafkaCluster.DisclosureNotificationTopic, stoppingToken);
}
catch (Exception ex)
{
Serilog.Log.Warning($"{(int)HttpStatusCode.InternalServerError} ConsumeFailedOnTopic - {this.config.KafkaCluster.DisclosureNotificationTopic}, {ex}");
logger.LogWarning($"{(int)HttpStatusCode.InternalServerError} ConsumeFailedOnTopic - {config.KafkaCluster.DisclosureNotificationTopic}, {ex}");
}
}

public override void Dispose()
{
this.consumer.Close();
this.consumer.Dispose();
kafkaConsumer.Close();
kafkaConsumer.Dispose();

base.Dispose();
GC.SuppressFinalize(this);
Expand Down
2 changes: 2 additions & 0 deletions backend/DIAMCornetService/Infrastructure/AccessTokenClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ public async Task<string> GetAccessTokenAsync(ClientCredentialsTokenRequest requ
var response = await this.client.RequestClientCredentialsTokenAsync(request);
return response.AccessToken;
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
namespace DIAMCornetService.Infrastructure;

using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.IdentityModel.Tokens;
using Newtonsoft.Json;

public static class AuthorizationConfiguration
{
public static IServiceCollection ConfigureAuthorization(this IServiceCollection services, DIAMCornetServiceConfiguration config)
{
services.AddAuthentication(option =>
{
option.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
option.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
})
.AddJwtBearer(options =>
{
options.Authority = config.Keycloak.RealmUrl;
options.RequireHttpsMetadata = false;
options.Audience = "DIAM-INTERNAL";
options.MetadataAddress = config.Keycloak.WellKnownConfig;
options.TokenValidationParameters = new TokenValidationParameters()
{
ValidateIssuerSigningKey = true,
ValidateIssuer = false,
ValidateAudience = false,
ValidAlgorithms = new List<string>() { "RS256" }
};
options.Events = new JwtBearerEvents
{
OnTokenValidated = context =>
{
return Task.CompletedTask;
},
OnAuthenticationFailed = context =>
{
context.Response.OnStarting(async () =>
{
context.NoResult();
context.Response.StatusCode = StatusCodes.Status401Unauthorized;
context.Response.ContentType = "application/json";
var response =
JsonConvert.SerializeObject("The access token provided is not valid.");
if (context.Exception.GetType() == typeof(SecurityTokenExpiredException))
{
context.Response.Headers.Add("Token-Expired", "true");
response =
JsonConvert.SerializeObject("The access token provided has expired.");
}
await context.Response.WriteAsync(response);
});

//context.HandleResponse();
//context.Response.WriteAsync(response).Wait();
return Task.CompletedTask;

},
OnForbidden = context =>
{
return Task.CompletedTask;
},
OnChallenge = context =>
{
context.HandleResponse();
context.Response.StatusCode = StatusCodes.Status401Unauthorized;
context.Response.ContentType = "application/json";

if (string.IsNullOrEmpty(context.Error))
context.Error = "invalid_token";
if (string.IsNullOrEmpty(context.ErrorDescription))
context.ErrorDescription = "This request requires a valid JWT access token to be provided";

return context.Response.WriteAsync(JsonConvert.SerializeObject(new
{
error = context.Error,
error_description = context.ErrorDescription
}));
}
};
});

return services;
}
}
Loading