-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add topics #468
feat: add topics #468
Conversation
65d1fe4
to
62e3094
Compare
Add the initial topics implementation. The entry point is TopicClient, which contains the publish and subscribe methods. Subscribe returns an IAsyncEnumerable<TopicMessage> that can be iterated over to read from the topic.
62e3094
to
e10ae02
Compare
var subscribeResponse = await topicClient.SubscribeAsync(cacheName, topicName); | ||
Assert.True(subscribeResponse is TopicSubscribeResponse.Subscription, | ||
$"Unexpected response: {subscribeResponse}"); | ||
var subscription = (TopicSubscribeResponse.Subscription)subscribeResponse; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this what we are expecting the end user code to look like? They will have to cast it?
I know that with the cache methods we were doing like instanceof Subscription newVarName
and then it would smart cast. If there's not going to be a similar way for users to consume this then we may want to tweak how we are exposing the API, I don't want users to have to cast in the happy path.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can cast in an if statement:
if (response is TopicPublishResponse.Success successResponse) { // handle success as appropriate }
That's what I would expect end user code to look like. I can make the tests look more like that if desired.
Assert.True(message is TopicMessage.Item, $"Unexpected message: {message}"); | ||
if (((TopicMessage.Item)message).ValueString!.Equals(skipMessage)) continue; | ||
|
||
Assert.Equal(valuesToSend[messageCount], ((TopicMessage.Item)message).ValueString); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here.
i don't mind casting in tests but I don't want it to be the UX.
If this pattern is just for tests then it might be worth adding an example main
that shows what you are expecting it to look for users that are consuming it.
The number 1 thing that we have to get correct before we merge this PR is the user-facing API. Everything else can be tweaked/improved/cleaned up later - so you should make sure there is something in the PR that clearly illustrates what you are expecting that to look like for a consumer.
is this missing an assembly reference?
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Took a short look and poked at it in visual studio. Nice docs, and I like how we got to await foreach (var message in subscription)
public Item(_TopicItem topicItem) | ||
{ | ||
_value = topicItem.Value; | ||
TopicSequenceNumber = topicItem.TopicSequenceNumber; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so, oneofs in protocol buffers are a little different for each language, but they always expect some kind of matching before accessing a property. As-is, a user can't tell the difference between a published empty string and a binary message.
We should provide a way for users to be able to tell the difference, possibly similar to the way protocol buffers does it (that's where we got the go matching pattern), between an Error, a Discontinuity, a Text value and a Binary value. Here is the documentation for inspiration: https://learn.microsoft.com/en-us/dotnet/architecture/grpc-for-wcf-developers/protobuf-any-oneof#oneof
public Item(_TopicItem topicItem)
{
switch (topicItem.Value.KindCase)
{
case _TopicValue.KindOneofCase.Text:
{
// The only populated field in this message's value is Text.
// Let's vend it to the user that way instead of leaving them to guess?
break;
}
case _TopicValue.KindOneofCase.Binary:
{
// The only populated field in this message's value is Binary.
break;
}
case _TopicValue.KindOneofCase.None:
{
// Skip message? It's not useful, and the server did something bad
break;
}
default:
{
// Skip message? It's not useful, as it holds something undefined
break;
}
}
_value = topicItem.Value;
TopicSequenceNumber = topicItem.TopicSequenceNumber;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with that. I was trying to follow the pattern set by the cache methods, where the value is derived from the same ByteString regardless of whether it is text or binary. Since the difference is more explicit here I'm in favor of making them separate types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kvcache so do you really think that's the right API here? To force users to always have a switch statement in order to read anything at all off of the subscription, even for the simplest use cases?
that feels overbearing and in the other SDKs I think we have been trying to avoid forcing that on them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kvcache random thought:
In the JS SDK we implement subscriptions by allowing the user to pass callbacks for OnItem and OnError. Presumably in the future there might also be OnDiscontinuity.
We did that in JS SDK because that's how the grpc streaming API was exposed.
In golang we did it iterator-style because that's how the grpc streaming API was exposed.
But would it be crazy, in .NET, to do it the callback way?
We could encapsulate the foreach loop inside the SDK in an async function, and allow the user to pass us callbacks for OnBinary, OnText, OnDiscontinuity.
Then they wouldn't have to have a switch statement.
It seems like it might be a better API IMO, with two questions in my mind:
- Is it going to be obvious to them that the overloaded signatures of
publish
will map to the specificOnBinary
/OnText
callbacks? (This issue is relevant now, even if we stick with the iterator-style API.) - If they need to do anything that has to do with ordering between OnText and OnDiscontinuity, having those separated into two user-provided lambdas might make it harder for them to manage/share state across the two callbacks? maybe not though, they could close over it or use a class that had the two callback functions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(edit: crossposted)
@cprice404 this is inside the Item class - I do think it is reasonable to expect Momento developers to switch to do the needful with each message on the subscription :-).
Separately, I do not think all users should be forced to use pattern matching if they are of the exceptional persuasion. I tried to suggest that we should have accessors available that throw if the message is not of the required disposition. I think the item type should BOTH [1] have a switch pattern available AND [2] have exceptional typed accessors available.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh dude, you sparked a memory: I think we might want to think about using delegate
s for this. You hit on something interesting here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delegate void TextHandler(string message);
delegate void BinaryHandler(byte[] message);
class SubscriptionActions
{
public SubscriptionActions(TextHandler? onText = null, BinaryHandler? onBinary = null)
{
OnTextMessage += Log;
OnBinaryMessage += Log;
if (onText != null)
{
OnTextMessage += onText;
}
if (onBinary != null)
{
OnBinaryMessage += onBinary;
}
}
public event TextHandler OnTextMessage;
public event BinaryHandler OnBinaryMessage;
private void Log<T>(T message)
{
}
}
class Lol
{
SubscriptionActions MakeSubscriptionActionsThatJustPrintTextMessagesToTheConsole()
{
return new SubscriptionActions(onText: Console.WriteLine);
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, as nice as this could be, I think await foreach
is too ergonomic to ignore.
var subscribeResponse = await topicClient.SubscribeAsync(cacheName, topicName); | ||
Assert.True(subscribeResponse is TopicSubscribeResponse.Subscription, | ||
$"Unexpected response: {subscribeResponse}"); | ||
var subscription = (TopicSubscribeResponse.Subscription)subscribeResponse; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, casting being the pattern is not great. I think you should consider:
- Make TopicSubscribeResponse the return type.
- Make TopicSubscribeResponse a simple class.
- Un-nest response types.
- Compose 2 things in the TopicSubscribeResponse:
- An enum
TopicSubscribeResponse.Kind
- A field for each response kind (like protocol buffers, sound familiar?)
- An enum
- Add a couple properties to TopicSubscribeResponse:
- Kind - documented so people can choose to switch on it if they want to avoid the throws below...
- Subscription - gives you the Subscription or throws a good error about the error.
- Error - gives you the Error or throws a good error about how this was not an error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Casting is not the normal pattern outside of testing. In use it would look more like:
if (subscribeResponse is TopicSubscribeResponse.Subscription subscription)
{
await foreach (var message in subscription)
{
switch (message)
{
case TopicMessage.Binary binary:
break;
case TopicMessage.Text text:
break;
case TopicMessage.Discontinuity discontinuity:
break;
case TopicMessage.Error:
break;
}
}
} else if (subscribeResponse is TopicSubscribeResponse.Error error)
{
// error handling
}
That's assuming we split text and binary messages and add discontinuity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cprice404 Is there a good place to add an example like this without releasing and adding it to the examples directory? I can make something separate if we want realistic looking code before the examples are ready.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yeah, that's pretty cool. It looks like a type switch + lexical cast. I couldn't tell that's how it was going :-). Flattening the value cases seems reasonable to me too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
re: how to show examples, generally I will just add a throwaway file with the example code, and remove it before the PR is merged. Or add a temporary dependency on ../blah
in the Examples project and include an example there, and stash it before the final PR merge.
or pasting an example into github like you did is fine too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The rewritten tests have a consume function that looks much more like what standard use is:
private async Task<List<TopicMessage>> ConsumeMessages(string topicName, CancellationToken token) |
case _SubscriptionItem.KindOneofCase.Discontinuity: | ||
_logger.LogTraceTopicMessageReceived("discontinuity", cacheName, topicName); | ||
break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should probably be returning a subscription item so users can be aware of missing messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was going to add Discontinuity as a separate item in a second pass, but I can add it now if if won't add too much complexity to the pr.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes we need to get this out the door, we will add those later
Split Item into separate Text and Binary types so there is no ambiguity about the type of message received. Add AsyncInterfaces to the integration test csproj.
Add cancellations to the topic tests so guard against deadlocks. Temporarily change the ubuntu test runner to macos to see if the hanging is os dependent.
ubuntu-latest and windows-latest seem to hang forever when running the integration tests, while macos-latest passes. The tests run locally on mac os and amazon linux. Continuing to troubleshoot. |
This could be a problem: |
Add a net6.0 windows target to the github tests.
src/Momento.Sdk/TopicClient.cs
Outdated
/// </summary> | ||
/// <param name="config">Configuration to use for the transport, retries, middlewares. See <see cref="Configurations"/> for out-of-the-box configuration choices, eg <see cref="Configurations.Laptop.Latest"/></param> | ||
/// <param name="authProvider">Momento auth provider.</param> | ||
public TopicClient(IConfiguration config, ICredentialProvider authProvider) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do not want to use the CacheClient Configuration object here. We need a new TopicConfiguration interface/object with the minimal surface area.
Remove top level value methods and the sequence number from TopicMessage. Add pre-built topic configurations. Remove max requests and eager connections from the topic manager. Separate enumerator and subscription cancellation.
Add the initial topics implementation. The entry point is TopicClient, which contains the publish and subscribe methods. Subscribe returns an IAsyncEnumerable that can be iterated over to read from the topic.