-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
using Microsoft.Extensions.Logging; | ||
using Momento.Sdk; | ||
using Momento.Sdk.Auth; | ||
using Momento.Sdk.Config; | ||
using Momento.Sdk.Exceptions; | ||
using Momento.Sdk.Responses; | ||
|
||
namespace TopicExample; | ||
|
||
public class Driver | ||
{ | ||
private const string AuthTokenEnvVar = "MOMENTO_AUTH_TOKEN"; | ||
private const string CacheNameEnvVar = "MOMENTO_CACHE_NAME"; | ||
private const string TopicName = "example-topic"; | ||
private static readonly ILogger Logger; | ||
private static readonly ILoggerFactory LoggerFactory; | ||
|
||
static Driver() | ||
{ | ||
LoggerFactory = InitializeLogging(); | ||
Logger = LoggerFactory.CreateLogger<Driver>(); | ||
} | ||
|
||
public static async Task Main() | ||
{ | ||
var authToken = ReadAuthToken(); | ||
var cacheName = ReadCacheName(); | ||
|
||
// Set up the client | ||
using ICacheClient client = | ||
new CacheClient(Configurations.Laptop.V1(LoggerFactory), authToken, TimeSpan.FromSeconds(60)); | ||
await EnsureCacheExistsAsync(client, cacheName); | ||
using ITopicClient topicClient = new TopicClient(TopicConfigurations.Laptop.latest(LoggerFactory), authToken); | ||
try | ||
{ | ||
var cts = new CancellationTokenSource(); | ||
cts.CancelAfter(10_000); | ||
|
||
// Subscribe and begin receiving messages | ||
var subscriptionTask = Task.Run(async () => | ||
{ | ||
var subscribeResponse = await topicClient.SubscribeAsync(cacheName, TopicName); | ||
switch (subscribeResponse) | ||
{ | ||
case TopicSubscribeResponse.Subscription subscription: | ||
try | ||
{ | ||
var cancellableSubscription = subscription.WithCancellation(cts.Token); | ||
await foreach (var message in cancellableSubscription) | ||
{ | ||
switch (message) | ||
{ | ||
case TopicMessage.Binary: | ||
Logger.LogInformation("Received unexpected binary message from topic."); | ||
break; | ||
case TopicMessage.Text text: | ||
Logger.LogInformation("Received string message from topic: {message}", | ||
text.Value); | ||
break; | ||
case TopicMessage.Error error: | ||
Logger.LogInformation("Received error message from topic: {error}", | ||
error.Message); | ||
cts.Cancel(); | ||
break; | ||
} | ||
} | ||
} | ||
finally | ||
{ | ||
subscription.Dispose(); | ||
} | ||
break; | ||
case TopicSubscribeResponse.Error error: | ||
Logger.LogInformation("Error subscribing to a topic: {error}", error.Message); | ||
cts.Cancel(); | ||
break; | ||
} | ||
}); | ||
|
||
// Publish messages | ||
var publishTask = Task.Run(async () => | ||
{ | ||
var messageCounter = 0; | ||
while (!cts.IsCancellationRequested) | ||
{ | ||
var publishResponse = | ||
await topicClient.PublishAsync(cacheName, TopicName, $"message {messageCounter}"); | ||
switch (publishResponse) | ||
{ | ||
case TopicPublishResponse.Success: | ||
break; | ||
case TopicPublishResponse.Error error: | ||
Logger.LogInformation("Error publishing a message to the topic: {error}", error.Message); | ||
cts.Cancel(); | ||
break; | ||
} | ||
await Task.Delay(1_000); | ||
messageCounter++; | ||
} | ||
}); | ||
|
||
await Task.WhenAll(subscriptionTask, publishTask); | ||
} | ||
finally | ||
{ | ||
client.Dispose(); | ||
topicClient.Dispose(); | ||
} | ||
} | ||
|
||
private static ILoggerFactory InitializeLogging() | ||
{ | ||
return Microsoft.Extensions.Logging.LoggerFactory.Create(builder => | ||
{ | ||
builder.AddSimpleConsole(options => | ||
{ | ||
options.IncludeScopes = true; | ||
options.SingleLine = true; | ||
options.TimestampFormat = "hh:mm:ss "; | ||
}); | ||
builder.SetMinimumLevel(LogLevel.Information); | ||
}); | ||
} | ||
|
||
private static ICredentialProvider ReadAuthToken() | ||
{ | ||
try | ||
{ | ||
return new EnvMomentoTokenProvider(AuthTokenEnvVar); | ||
} | ||
catch (InvalidArgumentException) | ||
{ | ||
} | ||
|
||
Console.Write($"Auth token not detected in environment variable {AuthTokenEnvVar}. Enter auth token here: "); | ||
var authToken = Console.ReadLine()!.Trim(); | ||
|
||
StringMomentoTokenProvider? authProvider = null; | ||
try | ||
{ | ||
authProvider = new StringMomentoTokenProvider(authToken); | ||
} | ||
catch (InvalidArgumentException e) | ||
{ | ||
Logger.LogInformation("{}", e); | ||
LoggerFactory.Dispose(); | ||
Environment.Exit(1); | ||
} | ||
|
||
return authProvider; | ||
} | ||
|
||
private static string ReadCacheName() | ||
{ | ||
var cacheName = Environment.GetEnvironmentVariable(CacheNameEnvVar); | ||
return cacheName ?? "default-cache"; | ||
} | ||
|
||
private static async Task EnsureCacheExistsAsync(ICacheClient client, string cacheName) | ||
{ | ||
Logger.LogInformation("Creating cache {cacheName} if it doesn't already exist.", cacheName); | ||
var createCacheResponse = await client.CreateCacheAsync(cacheName); | ||
switch (createCacheResponse) | ||
{ | ||
case CreateCacheResponse.Success: | ||
Logger.LogInformation("Created cache {cacheName}.", cacheName); | ||
break; | ||
case CreateCacheResponse.CacheAlreadyExists: | ||
Logger.LogInformation("Cache {cacheName} already exists.", cacheName); | ||
break; | ||
case CreateCacheResponse.Error: | ||
Logger.LogInformation("Error creating cache: {error.Message}", cacheName); | ||
Environment.Exit(1); | ||
break; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
# Topic Example | ||
|
||
This example program demonstrates usage of Momento Topics. | ||
|
||
# Usage | ||
|
||
The program assumes the auth token and cache names are available in environment variables. The auth token is assumed to be in the variable `TEST_AUTH_TOKEN` and the cache name in `TEST_CACHE_NAME`. If either of these is missing, you will be prompted to enter the values on the terminal. | ||
|
||
To run the program, run either: | ||
|
||
```bash | ||
TEST_AUTH_TOKEN=<YOUR_TOKEN_HERE> TEST_CACHE_NAME=<YOUR_CACHE_NAME_HERE> dotnet run | ||
``` | ||
|
||
or | ||
|
||
```bash | ||
dotnet run | ||
``` | ||
|
||
and you will be prompted to enter the auth token and cache name. | ||
|
||
If the cache name entered does not exist, the program will create it. | ||
|
||
The example publishes one message per second to a topic for 10 seconds. It subscribes to the same topic and prints each message it receives. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<OutputType>Exe</OutputType> | ||
<TargetFramework>net6.0</TargetFramework> | ||
<ImplicitUsings>enable</ImplicitUsings> | ||
<Nullable>enable</Nullable> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="6.0.0" /> | ||
<PackageReference Include="Momento.Sdk" Version="1.19.0" /> | ||
</ItemGroup> | ||
</Project> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
namespace Momento.Sdk.Auth.AccessControl; | ||
|
||
public abstract record DisposableTokenPermission; | ||
Check warning on line 3 in src/Momento.Sdk/Auth/AccessControl/DisposableToken.cs GitHub Actions / publish
Check warning on line 3 in src/Momento.Sdk/Auth/AccessControl/DisposableToken.cs GitHub Actions / publish
|
||
|
||
public abstract record DisposableToken | ||
Check warning on line 5 in src/Momento.Sdk/Auth/AccessControl/DisposableToken.cs GitHub Actions / publish
Check warning on line 5 in src/Momento.Sdk/Auth/AccessControl/DisposableToken.cs GitHub Actions / publish
|
||
{ | ||
public record CachePermission(CacheRole Role, CacheSelector CacheSelector) : DisposableTokenPermission | ||
Check warning on line 7 in src/Momento.Sdk/Auth/AccessControl/DisposableToken.cs GitHub Actions / publish
Check warning on line 7 in src/Momento.Sdk/Auth/AccessControl/DisposableToken.cs GitHub Actions / publish
Check warning on line 7 in src/Momento.Sdk/Auth/AccessControl/DisposableToken.cs GitHub Actions / publish
Check warning on line 7 in src/Momento.Sdk/Auth/AccessControl/DisposableToken.cs GitHub Actions / publish
Check warning on line 7 in src/Momento.Sdk/Auth/AccessControl/DisposableToken.cs GitHub Actions / publish
Check warning on line 7 in src/Momento.Sdk/Auth/AccessControl/DisposableToken.cs GitHub Actions / publish
Check warning on line 7 in src/Momento.Sdk/Auth/AccessControl/DisposableToken.cs GitHub Actions / publish
Check warning on line 7 in src/Momento.Sdk/Auth/AccessControl/DisposableToken.cs GitHub Actions / publish
Check warning on line 7 in src/Momento.Sdk/Auth/AccessControl/DisposableToken.cs GitHub Actions / publish
Check warning on line 7 in src/Momento.Sdk/Auth/AccessControl/DisposableToken.cs GitHub Actions / publish
|
||
{ | ||
// public virtual bool Equals(CachePermission? other) | ||
// { | ||
// return false; | ||
// } | ||
} | ||
|
||
public record CacheItemPermission | ||
(CacheRole Role, CacheSelector CacheSelector, CacheItemSelector CacheItemSelector) : CachePermission(Role, | ||
CacheSelector); | ||
|
||
public record TopicPermission(TopicRole Role, CacheSelector CacheSelector, TopicSelector TopicSelector) : DisposableTokenPermission; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
using System.Collections.Generic; | ||
|
||
namespace Momento.Sdk.Auth.AccessControl; | ||
|
||
public class DisposableTokenScope | ||
{ | ||
public List<DisposableTokenPermission> Permissions { get; } | ||
|
||
public DisposableTokenScope(List<DisposableTokenPermission> Permissions) | ||
{ | ||
this.Permissions = Permissions; | ||
} | ||
} |