-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #475 from momentohq/topic-example
feat: add example for topics
- Loading branch information
Showing
9 changed files
with
225 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
using Microsoft.Extensions.Logging; | ||
using Momento.Sdk; | ||
using Momento.Sdk.Auth; | ||
using Momento.Sdk.Config; | ||
using Momento.Sdk.Exceptions; | ||
using Momento.Sdk.Responses; | ||
|
||
namespace TopicExample; | ||
|
||
public class Driver | ||
{ | ||
private const string AuthTokenEnvVar = "MOMENTO_AUTH_TOKEN"; | ||
private const string CacheNameEnvVar = "MOMENTO_CACHE_NAME"; | ||
private const string TopicName = "example-topic"; | ||
private static readonly ILogger Logger; | ||
private static readonly ILoggerFactory LoggerFactory; | ||
|
||
static Driver() | ||
{ | ||
LoggerFactory = InitializeLogging(); | ||
Logger = LoggerFactory.CreateLogger<Driver>(); | ||
} | ||
|
||
public static async Task Main() | ||
{ | ||
var authToken = ReadAuthToken(); | ||
var cacheName = ReadCacheName(); | ||
|
||
// Set up the client | ||
using ICacheClient client = | ||
new CacheClient(Configurations.Laptop.V1(LoggerFactory), authToken, TimeSpan.FromSeconds(60)); | ||
await EnsureCacheExistsAsync(client, cacheName); | ||
using ITopicClient topicClient = new TopicClient(TopicConfigurations.Laptop.latest(LoggerFactory), authToken); | ||
try | ||
{ | ||
var cts = new CancellationTokenSource(); | ||
cts.CancelAfter(10_000); | ||
|
||
// Subscribe and begin receiving messages | ||
var subscriptionTask = Task.Run(async () => | ||
{ | ||
var subscribeResponse = await topicClient.SubscribeAsync(cacheName, TopicName); | ||
switch (subscribeResponse) | ||
{ | ||
case TopicSubscribeResponse.Subscription subscription: | ||
try | ||
{ | ||
var cancellableSubscription = subscription.WithCancellation(cts.Token); | ||
await foreach (var message in cancellableSubscription) | ||
{ | ||
switch (message) | ||
{ | ||
case TopicMessage.Binary: | ||
Logger.LogInformation("Received unexpected binary message from topic."); | ||
break; | ||
case TopicMessage.Text text: | ||
Logger.LogInformation("Received string message from topic: {message}", | ||
text.Value); | ||
break; | ||
case TopicMessage.Error error: | ||
Logger.LogInformation("Received error message from topic: {error}", | ||
error.Message); | ||
cts.Cancel(); | ||
break; | ||
} | ||
} | ||
} | ||
finally | ||
{ | ||
subscription.Dispose(); | ||
} | ||
break; | ||
case TopicSubscribeResponse.Error error: | ||
Logger.LogInformation("Error subscribing to a topic: {error}", error.Message); | ||
cts.Cancel(); | ||
break; | ||
} | ||
}); | ||
|
||
// Publish messages | ||
var publishTask = Task.Run(async () => | ||
{ | ||
var messageCounter = 0; | ||
while (!cts.IsCancellationRequested) | ||
{ | ||
var publishResponse = | ||
await topicClient.PublishAsync(cacheName, TopicName, $"message {messageCounter}"); | ||
switch (publishResponse) | ||
{ | ||
case TopicPublishResponse.Success: | ||
break; | ||
case TopicPublishResponse.Error error: | ||
Logger.LogInformation("Error publishing a message to the topic: {error}", error.Message); | ||
cts.Cancel(); | ||
break; | ||
} | ||
await Task.Delay(1_000); | ||
messageCounter++; | ||
} | ||
}); | ||
|
||
await Task.WhenAll(subscriptionTask, publishTask); | ||
} | ||
finally | ||
{ | ||
client.Dispose(); | ||
topicClient.Dispose(); | ||
} | ||
} | ||
|
||
private static ILoggerFactory InitializeLogging() | ||
{ | ||
return Microsoft.Extensions.Logging.LoggerFactory.Create(builder => | ||
{ | ||
builder.AddSimpleConsole(options => | ||
{ | ||
options.IncludeScopes = true; | ||
options.SingleLine = true; | ||
options.TimestampFormat = "hh:mm:ss "; | ||
}); | ||
builder.SetMinimumLevel(LogLevel.Information); | ||
}); | ||
} | ||
|
||
private static ICredentialProvider ReadAuthToken() | ||
{ | ||
try | ||
{ | ||
return new EnvMomentoTokenProvider(AuthTokenEnvVar); | ||
} | ||
catch (InvalidArgumentException) | ||
{ | ||
} | ||
|
||
Console.Write($"Auth token not detected in environment variable {AuthTokenEnvVar}. Enter auth token here: "); | ||
var authToken = Console.ReadLine()!.Trim(); | ||
|
||
StringMomentoTokenProvider? authProvider = null; | ||
try | ||
{ | ||
authProvider = new StringMomentoTokenProvider(authToken); | ||
} | ||
catch (InvalidArgumentException e) | ||
{ | ||
Logger.LogInformation("{}", e); | ||
LoggerFactory.Dispose(); | ||
Environment.Exit(1); | ||
} | ||
|
||
return authProvider; | ||
} | ||
|
||
private static string ReadCacheName() | ||
{ | ||
var cacheName = Environment.GetEnvironmentVariable(CacheNameEnvVar); | ||
return cacheName ?? "default-cache"; | ||
} | ||
|
||
private static async Task EnsureCacheExistsAsync(ICacheClient client, string cacheName) | ||
{ | ||
Logger.LogInformation("Creating cache {cacheName} if it doesn't already exist.", cacheName); | ||
var createCacheResponse = await client.CreateCacheAsync(cacheName); | ||
switch (createCacheResponse) | ||
{ | ||
case CreateCacheResponse.Success: | ||
Logger.LogInformation("Created cache {cacheName}.", cacheName); | ||
break; | ||
case CreateCacheResponse.CacheAlreadyExists: | ||
Logger.LogInformation("Cache {cacheName} already exists.", cacheName); | ||
break; | ||
case CreateCacheResponse.Error: | ||
Logger.LogInformation("Error creating cache: {error.Message}", cacheName); | ||
Environment.Exit(1); | ||
break; | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
# Topic Example | ||
|
||
This example program demonstrates usage of Momento Topics. | ||
|
||
# Usage | ||
|
||
The program assumes the auth token and cache names are available in environment variables. The auth token is assumed to be in the variable `TEST_AUTH_TOKEN` and the cache name in `TEST_CACHE_NAME`. If either of these is missing, you will be prompted to enter the values on the terminal. | ||
|
||
To run the program, run either: | ||
|
||
```bash | ||
TEST_AUTH_TOKEN=<YOUR_TOKEN_HERE> TEST_CACHE_NAME=<YOUR_CACHE_NAME_HERE> dotnet run | ||
``` | ||
|
||
or | ||
|
||
```bash | ||
dotnet run | ||
``` | ||
|
||
and you will be prompted to enter the auth token and cache name. | ||
|
||
If the cache name entered does not exist, the program will create it. | ||
|
||
The example publishes one message per second to a topic for 10 seconds. It subscribes to the same topic and prints each message it receives. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<OutputType>Exe</OutputType> | ||
<TargetFramework>net6.0</TargetFramework> | ||
<ImplicitUsings>enable</ImplicitUsings> | ||
<Nullable>enable</Nullable> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="6.0.0" /> | ||
<PackageReference Include="Momento.Sdk" Version="1.19.0" /> | ||
</ItemGroup> | ||
</Project> |