Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kafka Component #951

Merged
merged 3 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Aspire.sln
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OrleansServer", "samples\or
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ServiceDefaults", "samples\orleans\ServiceDefaults\ServiceDefaults.csproj", "{F7D9FA54-1F64-4A36-961A-0087F8E88D07}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Confluent.Kafka", "src\Components\Aspire.Confluent.Kafka\Aspire.Confluent.Kafka.csproj", "{174E0507-3BB0-4CDC-829E-9CA75DA66473}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Confluent.Kafka.Tests", "tests\Aspire.Confluent.Kafka.Tests\Aspire.Confluent.Kafka.Tests.csproj", "{A8CB331A-1247-41D9-8118-538E5A2CC9DF}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -496,6 +500,14 @@ Global
{F7D9FA54-1F64-4A36-961A-0087F8E88D07}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F7D9FA54-1F64-4A36-961A-0087F8E88D07}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F7D9FA54-1F64-4A36-961A-0087F8E88D07}.Release|Any CPU.Build.0 = Release|Any CPU
{174E0507-3BB0-4CDC-829E-9CA75DA66473}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{174E0507-3BB0-4CDC-829E-9CA75DA66473}.Debug|Any CPU.Build.0 = Debug|Any CPU
{174E0507-3BB0-4CDC-829E-9CA75DA66473}.Release|Any CPU.ActiveCfg = Release|Any CPU
{174E0507-3BB0-4CDC-829E-9CA75DA66473}.Release|Any CPU.Build.0 = Release|Any CPU
{A8CB331A-1247-41D9-8118-538E5A2CC9DF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A8CB331A-1247-41D9-8118-538E5A2CC9DF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A8CB331A-1247-41D9-8118-538E5A2CC9DF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A8CB331A-1247-41D9-8118-538E5A2CC9DF}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -582,6 +594,8 @@ Global
{04B03D1C-45C5-44D4-AEE5-BC315F3D9D26} = {8BAF2119-8370-4E9E-A887-D92506F8C727}
{20758E81-7316-49AC-8E1B-A5461397530A} = {8BAF2119-8370-4E9E-A887-D92506F8C727}
{F7D9FA54-1F64-4A36-961A-0087F8E88D07} = {8BAF2119-8370-4E9E-A887-D92506F8C727}
{174E0507-3BB0-4CDC-829E-9CA75DA66473} = {27381127-6C45-4B4C-8F18-41FF48DFE4B2}
{A8CB331A-1247-41D9-8118-538E5A2CC9DF} = {4981B3A5-4AFD-4191-BF7D-8692D9783D60}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {6DCEDFEC-988E-4CB3-B45B-191EB5086E0C}
Expand Down
4 changes: 4 additions & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
<PackageVersion Include="AspNetCore.HealthChecks.Azure.Storage.Blobs" Version="8.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.Azure.Storage.Queues" Version="8.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.AzureServiceBus" Version="8.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.Kafka" Version="8.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.MongoDb" Version="8.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.MySql" Version="8.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.NpgSql" Version="8.0.0" />
Expand Down Expand Up @@ -64,6 +65,7 @@
<PackageVersion Include="Microsoft.Extensions.Primitives" Version="$(MicrosoftExtensionsPrimitivesPackageVersion)" />
<PackageVersion Include="Microsoft.Extensions.Http.Resilience" Version="$(MicrosoftExtensionsHttpResiliencePackageVersion)" />
<!-- external dependencies -->
<PackageVersion Include="Confluent.Kafka" Version="2.3.0" />
<PackageVersion Include="Dapr.AspNetCore" Version="1.12.0" />
<PackageVersion Include="DnsClient" Version="1.7.0" />
<PackageVersion Include="Grpc.AspNetCore" Version="2.59.0" />
Expand Down Expand Up @@ -111,5 +113,7 @@
<PackageVersion Include="Microsoft.DotNet.Build.Tasks.Workloads" Version="8.0.0-beta.23564.4" />
<PackageVersion Include="Microsoft.Signed.Wix" Version="1.0.0-v3.14.0.5722" />
<PackageVersion Include="Microsoft.DotNet.Build.Tasks.Installers" Version="8.0.0-beta.23564.4" />
<!-- unit test dependencies -->
<PackageVersion Include="Microsoft.Extensions.Diagnostics.Testing" Version="8.0.0" />
</ItemGroup>
</Project>
80 changes: 80 additions & 0 deletions src/Aspire.Hosting/Kafka/KafkaBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Publishing;

namespace Aspire.Hosting;

public static class KafkaBuilderExtensions
{
private const int KafkaBrokerPort = 9092;
/// <summary>
/// Adds a Kafka broker container to the application.
/// </summary>
/// <param name="builder">The <see cref="IDistributedApplicationBuilder"/>.</param>
/// <param name="name">The name of the resource. This name will be used as the connection string name when referenced in a dependency.</param>
/// <param name="port">The host port of Kafka broker.</param>
/// <returns>A reference to the <see cref="IResourceBuilder{KafkaContainerResource}"/></returns>
public static IResourceBuilder<KafkaContainerResource> AddKafkaContainer(this IDistributedApplicationBuilder builder, string name, int? port = null)
{
var kafka = new KafkaContainerResource(name);
return builder.AddResource(kafka)
.WithEndpoint(hostPort: port, containerPort: KafkaBrokerPort)
.WithAnnotation(new ContainerImageAnnotation { Image = "confluentinc/confluent-local", Tag = "latest" })
.WithManifestPublishingCallback(context => WriteKafkaContainerToManifest(context, kafka))
.WithEnvironment(context => ConfigureKafkaContainer(context, kafka));

static void WriteKafkaContainerToManifest(ManifestPublishingContext context, KafkaContainerResource resource)
{
context.WriteContainer(resource);
context.Writer.WriteString("connectionString", $"{{{resource.Name}.bindings.tcp.host}}:{{{resource.Name}.bindings.tcp.port}}");
}
}

/// <summary>
/// Adds a Kafka resource to the application. A container is used for local development.
/// </summary>
/// <param name="builder">The <see cref="IDistributedApplicationBuilder"/>.</param>
/// <param name="name">The name of the resource. This name will be used as the connection string name when referenced in a dependency</param>
/// <returns>A reference to the <see cref="IResourceBuilder{KafkaServerResource}"/>.</returns>
public static IResourceBuilder<KafkaServerResource> AddKafka(this IDistributedApplicationBuilder builder, string name)
{
var kafka = new KafkaServerResource(name);
return builder.AddResource(kafka)
.WithEndpoint(containerPort: KafkaBrokerPort)
.WithAnnotation(new ContainerImageAnnotation{ Image = "confluentinc/confluent-local", Tag = "latest" })
.WithManifestPublishingCallback(WriteKafkaServerToManifest)
.WithEnvironment(context => ConfigureKafkaContainer(context, kafka));

static void WriteKafkaServerToManifest(ManifestPublishingContext context)
{
context.Writer.WriteString("type", "kafka.server.v0");
}
}

private static void ConfigureKafkaContainer(EnvironmentCallbackContext context, IResource resource)
{
// confluentinc/confluent-local is a docker image that contains a Kafka broker started with KRaft to avoid pulling a separate image for ZooKeeper.
// See https://github.com/confluentinc/kafka-images/blob/master/local/README.md.
// When not explicitly set default configuration is applied.
// See https://github.com/confluentinc/kafka-images/blob/master/local/include/etc/confluent/docker/configureDefaults for more details.

var hostPort = context.PublisherName == "manifest"
? KafkaBrokerPort
: GetResourcePort(resource);
context.EnvironmentVariables.Add("KAFKA_ADVERTISED_LISTENERS",
$"PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:{hostPort}");

static int GetResourcePort(IResource resource)
{
if (!resource.TryGetAllocatedEndPoints(out var allocatedEndpoints))
{
throw new DistributedApplicationException(
$"Kafka resource \"{resource.Name}\" does not have endpoint annotation.");
}

return allocatedEndpoints.Single().Port;
}
}
}
37 changes: 37 additions & 0 deletions src/Aspire.Hosting/Kafka/KafkaContainerResource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Aspire.Hosting.ApplicationModel;

namespace Aspire.Hosting;

/// <summary>
/// A resource that represents a Kafka broker container.
/// </summary>
/// <param name="name"></param>
public class KafkaContainerResource(string name) : ContainerResource(name), IResourceWithConnectionString, IResourceWithEnvironment
{
/// <summary>
/// Gets the connection string for Kafka broker.
/// </summary>
/// <returns>A connection string for the Kafka in the form "host:port" to be passed as <see href="https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.ClientConfig.html#Confluent_Kafka_ClientConfig_BootstrapServers">BootstrapServers</see>.</returns>
public string? GetConnectionString()
{
if (!this.TryGetAllocatedEndPoints(out var allocatedEndpoints))
{
throw new DistributedApplicationException($"Kafka resource \"{Name}\" does not have endpoint annotation.");
}

return allocatedEndpoints.SingleOrDefault()?.EndPointString;
}

internal int GetPort()
{
if (!this.TryGetAllocatedEndPoints(out var allocatedEndpoints))
{
throw new DistributedApplicationException($"Kafka resource \"{Name}\" does not have endpoint annotation.");
}

return allocatedEndpoints.Single().Port;
}
}
35 changes: 35 additions & 0 deletions src/Aspire.Hosting/Kafka/KafkaServerResource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

namespace Aspire.Hosting.ApplicationModel;

/// <summary>
/// A resource that represents a Kafka broker.
/// </summary>
/// <param name="name">The name of the resource.</param>
public class KafkaServerResource(string name) : Resource(name), IResourceWithConnectionString, IResourceWithEnvironment
{
/// <summary>
/// Gets the connection string for Kafka broker.
/// </summary>
/// <returns>A connection string for the Kafka in the form "host:port" to be passed as <see href="https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.ClientConfig.html#Confluent_Kafka_ClientConfig_BootstrapServers">BootstrapServers</see>.</returns>
public string? GetConnectionString()
{
if (!this.TryGetAllocatedEndPoints(out var allocatedEndpoints))
{
throw new DistributedApplicationException($"Kafka resource \"{Name}\" does not have endpoint annotation.");
}

return allocatedEndpoints.SingleOrDefault()?.EndPointString;
}

internal int GetPort()
{
if (!this.TryGetAllocatedEndPoints(out var allocatedEndpoints))
{
throw new DistributedApplicationException($"Kafka resource \"{Name}\" does not have endpoint annotation.");
}

return allocatedEndpoints.Single().Port;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>$(NetCurrent)</TargetFramework>
<IsPackable>true</IsPackable>
<PackageTags>$(ComponentCommonPackageTags) kafka</PackageTags>
<Description>Confluent.Kafka based Kafka generic consumer and producer that integrates with Aspire, including healthchecks and metrics.</Description>
<NoWarn>$(NoWarn);SYSLIB1100</NoWarn>
</PropertyGroup>

<ItemGroup>
<Compile Include="..\Common\HealthChecksExtensions.cs" Link="HealthChecksExtensions.cs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.Kafka" />
<PackageReference Include="Confluent.Kafka" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" />
</ItemGroup>

</Project>
Loading