Skip to content

Commit

Permalink
Merge pull request #111 from awslabs/dev
Browse files Browse the repository at this point in the history
Merge dev to main for next beta release
  • Loading branch information
philasmar authored Mar 20, 2024
2 parents fc8e967 + 165bf38 commit 9c05f1c
Show file tree
Hide file tree
Showing 43 changed files with 1,364 additions and 88 deletions.
7 changes: 7 additions & 0 deletions AWS.Messaging.sln
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AWS.Messaging.Tests.LambdaF
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AWS.Messaging.Benchmarks", "test\AWS.Messaging.Benchmarks\AWS.Messaging.Benchmarks.csproj", "{143DC3E0-A1C6-4670-86F4-E7CD4C8F52CB}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PollyIntegration", "sampleapps\PollyIntegration\PollyIntegration.csproj", "{86896246-B032-4D34-82BE-CD5ACB6E43F9}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -87,6 +89,10 @@ Global
{143DC3E0-A1C6-4670-86F4-E7CD4C8F52CB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{143DC3E0-A1C6-4670-86F4-E7CD4C8F52CB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{143DC3E0-A1C6-4670-86F4-E7CD4C8F52CB}.Release|Any CPU.Build.0 = Release|Any CPU
{86896246-B032-4D34-82BE-CD5ACB6E43F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{86896246-B032-4D34-82BE-CD5ACB6E43F9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{86896246-B032-4D34-82BE-CD5ACB6E43F9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{86896246-B032-4D34-82BE-CD5ACB6E43F9}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -103,6 +109,7 @@ Global
{C529DC6E-72DA-49ED-908A-21DBC40F26C0} = {2D0A561B-0B97-4259-8603-3AF5437BB652}
{F7A1B9DE-86DE-4F70-A2CD-628E56FE5BAD} = {80DB2C77-6ADD-4A60-B27D-763BDF9659D3}
{143DC3E0-A1C6-4670-86F4-E7CD4C8F52CB} = {80DB2C77-6ADD-4A60-B27D-763BDF9659D3}
{86896246-B032-4D34-82BE-CD5ACB6E43F9} = {1AA8985B-897C-4BD5-9735-FD8B33FEBFFB}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {7B2B759D-6455-4089-8173-3F1619567B36}
Expand Down
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Changelog

### Release 2024-03-20
* **AWS.Messaging (0.3.0-beta)**
* Added back-off logic to the SQS Poller that can perform Exponential, Interval or disable back-offs entirely. The SQS Poller will now back-off before attempting to reach SQS in case of an exception.
* Added support for SourceLink
* **AWS.Messaging.Lambda (0.1.1-beta)**
* Added support for SourceLink
* **AWS.Messaging.Telemetry.OpenTelemetry (0.1.1-beta)**
* Added support for SourceLink

### Release 2024-03-08
* **AWS.Messaging (0.2.0-beta)**
* BREAKING CHANGE: Message content is no longer included by default in logs or exceptions. Call `EnableDataMessageLogging` during setup to re-enable.
Expand Down
60 changes: 57 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ The framework is currently under active development. It already supports:
* Handling messages from [FIFO (first-in-first-out) queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-fifo-queues.html), and respecting group ordering
* OpenTelemetry Instrumentation
* Customizing serialization
* Performance and error hardening

Features to be added:
* AWS X-Ray Instrumentation
* Performance and error hardening

# Getting started

Expand Down Expand Up @@ -287,6 +287,60 @@ In the following example the framework will check every 1 second for messages th
## Handling Messages in AWS Lambda Functions
You can use the AWS Message Processing Framework for .NET with [SQS's integration with Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html). This is provided by the `AWS.Messaging.Lambda` package. Refer to its [README](https://github.com/awslabs/aws-dotnet-messaging/blob/main/src/AWS.Messaging.Lambda/README.md) to get started.

## SQS Poller Resiliency
The SQS Poller is resilient by design and is able to handle errors thrown by the underlying .NET SDK as well as the framework itself in two ways. We have classified a number of exceptions that may occur due to invalid configuration from the user's side as fatal exceptions. These exceptions will cause the SQS Poller background service to stop running after throwing a user-friendly error message. However, any exceptions thrown, outside of the fatal ones we have defined, will not cause the SQS Poller to error out and will remain resilient in the event that an underlying service is facing degraded performance or outages. The SQS poller leverages backoffs in an effort to retry any failed SQS requests while applying a certain time-delay between retries.

The framework defined two interfaces, `IBackoffHandler` and `IBackoffPolicy`. The `IBackoffPolicy` is closely tied to the `BackoffHandler` which implements `IBackoffHandler`. The default implementation of `IBackoffHandler` checks the attached `IBackoffPolicy` to see if a backoff should be applied. If a backoff is to be applied, the `IBackoffPolicy` also returns the time delay between retries.

The framework support three backoff policies:
* `None`, which would disable the backoff handler. This will allow users to fully rely on the SDK’s retry logic.
* `Interval`, which would backoff on a given and configurable interval. Default value is 1 second.
* `CappedExponential`, which would perform an exponential backoff up until it reaches a certain configurable max backoff time, at which point it would switch to an interval backoff. Default value for the cap backoff time is 1 hour.

By default, without any user configuration, the SQS Poller will use the default implementation of the `IBackoffHandler` interface, coupled with a capped exponential backoff policy.

Users are free to change the backoff policy as follows:
```csharp
services.AddAWSMessageBus(builder =>
{
builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/536721586275/MPF");

// Optional: Configure the backoff policy used by the SQS Poller.
builder.ConfigureBackoffPolicy(options =>
{
// Use 1 of the available 3 backoff policies:
// No backoff Policy
options.UseNoBackoff();

// Interval backoff policy
options.UseIntervalBackoff(x =>
{
x.FixedInterval = 1;
});

// Capped exponential backoff policy
options.UseCappedExponentialBackoff(x =>
{
x.CapBackoffTime = 60;
});
});
});
```

Users can also implement their own backoff handler by implementing the interface `IBackoffHandler` and injecting it before the AWS Message Bus is added to the DI Container. This can be done as follows:
```csharp
services.TryAddSingleton<IBackoffHandler, CustomBackoffHandler>();

services.AddAWSMessageBus(builder =>
{
builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MPF");
builder.AddMessageHandler<ChatMessageHandler, ChatMessage>("chatMessage");
});
```

As an example, you can use [Polly](https://github.com/App-vNext/Polly) to handle the retries. A sample app that achieves this could be found [here](./sampleapps/PollyIntegration/).

# Telemetry
The AWS Message Processing Framework for .NET is instrumented for OpenTelemetry to log [traces](https://opentelemetry.io/docs/concepts/signals/traces/) for each message that is published or handled by the framework. This is provided by the `AWS.Messaging.Telemetry.OpenTelemetry` package. Refer to its [README](https://github.com/awslabs/aws-dotnet-messaging/blob/main/src/AWS.Messaging.Telemetry.OpenTelemetry/README.md) to get started.

Expand Down Expand Up @@ -327,11 +381,11 @@ We welcome community contributions and pull requests. See [CONTRIBUTING.md](./CO
# Security
The AWS Message Processing Framework for .NET relies on the [AWS SDK for .NET](https://github.com/aws/aws-sdk-net) for communicating with AWS. Refer to the [security section](https://docs.aws.amazon.com/sdk-for-net/v3/developer-guide/security.html) in the [AWS SDK for .NET Developer Guide](https://docs.aws.amazon.com/sdk-for-net/v3/developer-guide/welcome.html) for more information. You can also find more information in [AWS Message Processing Framework for .NET Developer Guide](https://docs.aws.amazon.com/sdk-for-net/v3/developer-guide/msg-proc-fw.html).

The framework does not log data messages sent by the user for security purposes. If users want to enable this functionality for debugging purposes, you need to call `EnableDataMessageLogging()` in the AWS Message Bus as follows:
The framework does not log data messages sent by the user for security purposes. If users want to enable this functionality for debugging purposes, you need to call `EnableMessageContentLogging()` in the AWS Message Bus as follows:
```csharp
builder.Services.AddAWSMessageBus(bus =>
{
builder.EnableDataMessageLogging();
builder.EnableMessageContentLogging();
});
```

Expand Down
1 change: 0 additions & 1 deletion docs/docfx.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
],
"exclude": [
"_site/**",
"docs/**",
"filter.yml"
]
}
Expand Down
29 changes: 29 additions & 0 deletions sampleapps/PollyIntegration/MessageHandlers/ChatMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.\r
// SPDX-License-Identifier: Apache-2.0

using AWS.Messaging;
using PollyIntegration.Models;

namespace PollyIntegration.MessageHandlers;

public class ChatMessageHandler : IMessageHandler<ChatMessage>
{
public Task<MessageProcessStatus> HandleAsync(MessageEnvelope<ChatMessage> messageEnvelope, CancellationToken token = default)
{
if (messageEnvelope == null)
{
return Task.FromResult(MessageProcessStatus.Failed());
}

if (messageEnvelope.Message == null)
{
return Task.FromResult(MessageProcessStatus.Failed());
}

var message = messageEnvelope.Message;

Console.WriteLine($"Message Description: {message.MessageDescription}");

return Task.FromResult(MessageProcessStatus.Success());
}
}
9 changes: 9 additions & 0 deletions sampleapps/PollyIntegration/Models/ChatMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.\r
// SPDX-License-Identifier: Apache-2.0

namespace PollyIntegration.Models;

public class ChatMessage
{
public string MessageDescription { get; set; } = string.Empty;
}
28 changes: 28 additions & 0 deletions sampleapps/PollyIntegration/PollyBackoffHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.\r
// SPDX-License-Identifier: Apache-2.0

using AWS.Messaging.Configuration;
using AWS.Messaging.Services.Backoff;
using Polly;
using Polly.Registry;

namespace PollyIntegration;

public class PollyBackoffHandler : IBackoffHandler
{
private readonly ResiliencePipelineProvider<string> _resiliencePipelineProvider;

public PollyBackoffHandler(ResiliencePipelineProvider<string> resiliencePipelineProvider)
{
_resiliencePipelineProvider = resiliencePipelineProvider;
}

public async Task<T> BackoffAsync<T>(Func<Task<T>> task, SQSMessagePollerConfiguration configuration, CancellationToken token)
{
ResiliencePipeline pipeline = _resiliencePipelineProvider.GetPipeline("my-pipeline");

// Execute the pipeline
return await pipeline.ExecuteAsync(async cancellationToken => await task.Invoke(),
token);
}
}
34 changes: 34 additions & 0 deletions sampleapps/PollyIntegration/PollyIntegration.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<None Remove="appsettings.json" />
</ItemGroup>

<ItemGroup>
<Content Include="appsettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.*" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.6.0" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.6.0" />
<PackageReference Include="Polly.Core" Version="8.1.0" />
<PackageReference Include="Polly.Extensions" Version="8.1.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\AWS.Messaging.Telemetry.OpenTelemetry\AWS.Messaging.Telemetry.OpenTelemetry.csproj" />
<ProjectReference Include="..\..\src\AWS.Messaging\AWS.Messaging.csproj" />
</ItemGroup>

</Project>
55 changes: 55 additions & 0 deletions sampleapps/PollyIntegration/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using AWS.Messaging.Services.Backoff;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using AWS.Messaging.Telemetry.OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using Polly;
using Polly.Retry;
using PollyIntegration;
using PollyIntegration.MessageHandlers;
using PollyIntegration.Models;

await Host.CreateDefaultBuilder(args)
.ConfigureLogging(logging =>
{
logging.ClearProviders();
logging.AddConsole().SetMinimumLevel(LogLevel.Debug);
})
.ConfigureAppConfiguration(configuration =>
{
configuration.AddJsonFile("appsettings.json");
})
.ConfigureServices((context, services) =>
{
services.AddResiliencePipeline("my-pipeline", builder =>
{
builder
.AddRetry(new RetryStrategyOptions())
.AddTimeout(TimeSpan.FromSeconds(10));
});
services.TryAddSingleton<IBackoffHandler, PollyBackoffHandler>();
services.AddAWSMessageBus(builder =>
{
// To load the configuration from appsettings.json instead of the code below, uncomment this and remove the following lines.
// builder.LoadConfigurationFromSettings(context.Configuration);
builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MPF");
builder.AddMessageHandler<ChatMessageHandler, ChatMessage>("chatMessage");
// Logging data messages is disabled by default to protect sensitive user data. If you want this enabled, uncomment the line below.
// builder.EnableMessageContentLogging();
})
.AddOpenTelemetry()
.ConfigureResource(resource => resource.AddService("PollyIntegration"))
.WithTracing(tracing => tracing
.AddAWSMessagingInstrumentation()
.AddConsoleExporter());
})
.Build()
.RunAsync();
27 changes: 27 additions & 0 deletions sampleapps/PollyIntegration/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*",
"AWS.Messaging": {
"MessageHandlers": [
{
"HandlerType": "PollyIntegration.MessageHandlers.ChatMessageHandler",
"MessageType": "PollyIntegration.Models.ChatMessage",
"MessageTypeIdentifier": "chatMessage"
}
],
"SQSPollers": [
{
"QueueUrl": "https://sqs.us-west-2.amazonaws.com/012345678910/MPF"
}
],
"BackoffPolicy": "CappedExponential",
"CappedExponentialBackoffOptions": {
"CapBackoffTime": 2
}
}
}
2 changes: 1 addition & 1 deletion sampleapps/PublisherAPI/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
});
// Logging data messages is disabled by default to protect sensitive user data. If you want this enabled, uncomment the line below.
// builder.EnableDataMessageLogging();
// bus.EnableMessageContentLogging();
});
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
Expand Down
3 changes: 2 additions & 1 deletion sampleapps/PublisherAPI/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
}
}
],
"LogMessageContent": false
"LogMessageContent": false,
"BackoffPolicy": "CappedExponential"
}
}
11 changes: 10 additions & 1 deletion sampleapps/SubscriberService/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,17 @@ await Host.CreateDefaultBuilder(args)
};
});
// Optional: Configure the backoff policy used by the SQS Poller.
builder.ConfigureBackoffPolicy(options =>
{
options.UseCappedExponentialBackoff(x =>
{
x.CapBackoffTime = 60;
});
});
// Logging data messages is disabled by default to protect sensitive user data. If you want this enabled, uncomment the line below.
// builder.EnableDataMessageLogging();
// builder.EnableMessageContentLogging();
})
.AddOpenTelemetry()
.ConfigureResource(resource => resource.AddService("SubscriberService"))
Expand Down
3 changes: 2 additions & 1 deletion sampleapps/SubscriberService/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"VisibilityTimeoutExtensionThreshold": 5
}
}
]
],
"BackoffPolicy": "CappedExponential"
}
}
9 changes: 7 additions & 2 deletions src/AWS.Messaging.Lambda/AWS.Messaging.Lambda.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@
<WarningsAsErrors>CA1727</WarningsAsErrors>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>..\..\public.snk</AssemblyOriginatorKeyFile>
<Version>0.1.0-beta</Version>
<Version>0.1.1-beta</Version>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
<IncludeSymbols>true</IncludeSymbols>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Amazon.Lambda.Core" Version="2.2.0" />
<PackageReference Include="Amazon.Lambda.SQSEvents" Version="2.2.0" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All"/>
</ItemGroup>

<ItemGroup>
Expand All @@ -37,6 +42,6 @@

<ItemGroup>
<None Include=".\README.md" Pack="true" PackagePath="" />
</ItemGroup>
</ItemGroup>

</Project>
Loading

0 comments on commit 9c05f1c

Please sign in to comment.