Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WebPubSubClient] Parsing message returns IList #39137

Merged
merged 3 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public WebPubSubJsonProtocol() { }
public override string Name { get { throw null; } }
public override Azure.Messaging.WebPubSub.Clients.WebPubSubProtocolMessageType WebSocketMessageType { get { throw null; } }
public override System.ReadOnlyMemory<byte> GetMessageBytes(Azure.Messaging.WebPubSub.Clients.WebPubSubMessage message) { throw null; }
public override Azure.Messaging.WebPubSub.Clients.WebPubSubMessage ParseMessage(System.Buffers.ReadOnlySequence<byte> input) { throw null; }
public override System.Collections.Generic.IList<Azure.Messaging.WebPubSub.Clients.WebPubSubMessage> ParseMessage(System.Buffers.ReadOnlySequence<byte> input) { throw null; }
public override void WriteMessage(Azure.Messaging.WebPubSub.Clients.WebPubSubMessage message, System.Buffers.IBufferWriter<byte> output) { }
}
public partial class WebPubSubJsonReliableProtocol : Azure.Messaging.WebPubSub.Clients.WebPubSubProtocol
Expand All @@ -162,7 +162,7 @@ public WebPubSubJsonReliableProtocol() { }
public override string Name { get { throw null; } }
public override Azure.Messaging.WebPubSub.Clients.WebPubSubProtocolMessageType WebSocketMessageType { get { throw null; } }
public override System.ReadOnlyMemory<byte> GetMessageBytes(Azure.Messaging.WebPubSub.Clients.WebPubSubMessage message) { throw null; }
public override Azure.Messaging.WebPubSub.Clients.WebPubSubMessage ParseMessage(System.Buffers.ReadOnlySequence<byte> input) { throw null; }
public override System.Collections.Generic.IList<Azure.Messaging.WebPubSub.Clients.WebPubSubMessage> ParseMessage(System.Buffers.ReadOnlySequence<byte> input) { throw null; }
public override void WriteMessage(Azure.Messaging.WebPubSub.Clients.WebPubSubMessage message, System.Buffers.IBufferWriter<byte> output) { }
}
public abstract partial class WebPubSubMessage
Expand All @@ -176,7 +176,7 @@ protected WebPubSubProtocol() { }
public abstract string Name { get; }
public abstract Azure.Messaging.WebPubSub.Clients.WebPubSubProtocolMessageType WebSocketMessageType { get; }
public abstract System.ReadOnlyMemory<byte> GetMessageBytes(Azure.Messaging.WebPubSub.Clients.WebPubSubMessage message);
public abstract Azure.Messaging.WebPubSub.Clients.WebPubSubMessage ParseMessage(System.Buffers.ReadOnlySequence<byte> input);
public abstract System.Collections.Generic.IList<Azure.Messaging.WebPubSub.Clients.WebPubSubMessage> ParseMessage(System.Buffers.ReadOnlySequence<byte> input);
public abstract void WriteMessage(Azure.Messaging.WebPubSub.Clients.WebPubSubMessage message, System.Buffers.IBufferWriter<byte> output);
}
public enum WebPubSubProtocolMessageType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public override ReadOnlyMemory<byte> GetMessageBytes(WebPubSubMessage message)
/// </summary>
/// <param name="input">The serialized representation of the message.</param>
/// <returns>A <see cref="WebPubSubMessage"/></returns>
public override WebPubSubMessage ParseMessage(ReadOnlySequence<byte> input)
public override IList<WebPubSubMessage> ParseMessage(ReadOnlySequence<byte> input)
zackliu marked this conversation as resolved.
Show resolved Hide resolved
{
return _processor.ParseMessage(input);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public ReadOnlyMemory<byte> GetMessageBytes(WebPubSubMessage message)
return new Memory<byte>(writer.ToArray());
}

public virtual WebPubSubMessage ParseMessage(ReadOnlySequence<byte> input)
public virtual IList<WebPubSubMessage> ParseMessage(ReadOnlySequence<byte> input)
{
try
{
Expand Down Expand Up @@ -264,7 +264,7 @@ public virtual WebPubSubMessage ParseMessage(ReadOnlySequence<byte> input)
case DownstreamEventType.Ack:
AssertNotNull(ackId, AckIdPropertyName);
AssertNotNull(success, SuccessPropertyName);
return new AckMessage(ackId.Value, success.Value, errorDetail);
return new List<WebPubSubMessage> { new AckMessage(ackId.Value, success.Value, errorDetail) };

case DownstreamEventType.Message:
AssertNotNull(from, FromPropertyName);
Expand All @@ -273,10 +273,10 @@ public virtual WebPubSubMessage ParseMessage(ReadOnlySequence<byte> input)
switch (fromType)
{
case FromType.Server:
return new ServerDataMessage(dataType, data, sequenceId);
return new List<WebPubSubMessage> { new ServerDataMessage(dataType, data, sequenceId) };
case FromType.Group:
AssertNotNull(group, GroupPropertyName);
return new GroupDataMessage(group, dataType, data, sequenceId, fromUserId);
return new List<WebPubSubMessage> { new GroupDataMessage(group, dataType, data, sequenceId, fromUserId) };
// Forward compatible
default:
return null;
Expand All @@ -288,9 +288,9 @@ public virtual WebPubSubMessage ParseMessage(ReadOnlySequence<byte> input)
switch (systemEventType)
{
case SystemEventType.Connected:
return new ConnectedMessage(userId, connectionId, reconnectionToken);
return new List<WebPubSubMessage> { new ConnectedMessage(userId, connectionId, reconnectionToken) };
case SystemEventType.Disconnected:
return new DisconnectedMessage(message);
return new List<WebPubSubMessage> { new DisconnectedMessage(message) };
// Forward compatible
default:
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public override ReadOnlyMemory<byte> GetMessageBytes(WebPubSubMessage message)
/// </summary>
/// <param name="input">The serialized representation of the message.</param>
/// <returns>A <see cref="WebPubSubMessage"/></returns>
public override WebPubSubMessage ParseMessage(ReadOnlySequence<byte> input)
public override IList<WebPubSubMessage> ParseMessage(ReadOnlySequence<byte> input)
{
return _processor.ParseMessage(input);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public abstract class WebPubSubProtocol
/// </summary>
/// <param name="input">The serialized representation of the message.</param>
/// <returns>A <see cref="WebPubSubMessage"/></returns>
public abstract WebPubSubMessage ParseMessage(ReadOnlySequence<byte> input);
public abstract IList<WebPubSubMessage> ParseMessage(ReadOnlySequence<byte> input);

/// <summary>
/// Writes the specified <see cref="WebPubSubMessage"/> to a writer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,10 @@ private async Task ListenLoop(IWebSocketClient client, CancellationToken token)
{
try
{
var message = _protocol.ParseMessage(result.Payload);
await HandleMessageAsync(message, token).ConfigureAwait(false);
foreach (var message in _protocol.ParseMessage(result.Payload))
{
await HandleMessageAsync(message, token).ConfigureAwait(false);
}
}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
Expand Down Expand Up @@ -152,7 +153,7 @@ public void ParseMessageTest(byte[] payload, Action<WebPubSubMessage> messageAss
{
var protocol = new WebPubSubJsonProtocol();
var resolvedMessage = protocol.ParseMessage(new ReadOnlySequence<byte>(payload));
messageAssert(resolvedMessage);
messageAssert(resolvedMessage[0]);
}

[TestCaseSource(nameof(GetSerializingTestData))]
Expand Down
Loading