From bbc5ed5346f471c238fd9d4decb3c4a4833cb7c7 Mon Sep 17 00:00:00 2001 From: Matt Funnell Date: Mon, 16 Sep 2024 10:31:03 +0100 Subject: [PATCH 01/14] Added Sources and Mirror to KV Store Creation --- .../Models/StreamSource.cs | 3 + src/NATS.Client.KeyValueStore/NatsKVConfig.cs | 17 +++-- .../NatsKVContext.cs | 65 +++++++++++++++++-- .../KeyValueStoreTest.cs | 33 ++++++++++ 4 files changed, 105 insertions(+), 13 deletions(-) diff --git a/src/NATS.Client.JetStream/Models/StreamSource.cs b/src/NATS.Client.JetStream/Models/StreamSource.cs index 684b7d667..af234b651 100644 --- a/src/NATS.Client.JetStream/Models/StreamSource.cs +++ b/src/NATS.Client.JetStream/Models/StreamSource.cs @@ -54,4 +54,7 @@ public record StreamSource [System.Text.Json.Serialization.JsonPropertyName("external")] [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] public ExternalStreamSource? External { get; set; } + + [System.Text.Json.Serialization.JsonIgnore] + public string? Domain { get; set; } } diff --git a/src/NATS.Client.KeyValueStore/NatsKVConfig.cs b/src/NATS.Client.KeyValueStore/NatsKVConfig.cs index dad8b7086..3ff86e67d 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVConfig.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVConfig.cs @@ -1,3 +1,5 @@ +using NATS.Client.JetStream.Models; + namespace NATS.Client.KeyValueStore; /// @@ -61,12 +63,15 @@ public record NatsKVConfig /// public bool Compression { get; init; } - // TODO: Bucket mirror configuration. - // pub mirror: Option, - // Bucket sources configuration. - // pub sources: Option>, - // Allow mirrors using direct API. - // pub mirror_direct: bool, + /// + /// Mirror defines the configuration for mirroring another KeyValue store + /// + public StreamSource? Mirror { get; init; } + + /// + /// Sources defines the configuration for sources of a KeyValue store. + /// + public ICollection? Sources { get; set; } } /// diff --git a/src/NATS.Client.KeyValueStore/NatsKVContext.cs b/src/NATS.Client.KeyValueStore/NatsKVContext.cs index 6c4279739..d0743380a 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVContext.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVContext.cs @@ -1,3 +1,4 @@ +using System.Diagnostics; using System.Runtime.CompilerServices; using System.Text.RegularExpressions; using NATS.Client.JetStream; @@ -173,9 +174,6 @@ private static string ExtractBucketName(string streamName) private static StreamConfig CreateStreamConfig(NatsKVConfig config) { - // TODO: KV Mirrors - var subjects = new[] { $"$KV.{config.Bucket}.>" }; - long history; if (config.History > 0) { @@ -203,6 +201,60 @@ private static StreamConfig CreateStreamConfig(NatsKVConfig config) var replicas = config.NumberOfReplicas > 0 ? config.NumberOfReplicas : 1; + string[]? subjects = default; + StreamSource? mirror = default; + ICollection? sources = default; + var mirrorDirect = false; + if (config.Mirror != null) + { + mirror = new StreamSource + { + Name = config.Mirror.Name.StartsWith(KvStreamNamePrefix) ? config.Mirror.Name : BucketToStream(config.Mirror.Name), + Domain = config.Mirror.Domain, + External = config.Mirror.External, + OptStartSeq = config.Mirror.OptStartSeq, + OptStartTime = config.Mirror.OptStartTime, + SubjectTransforms = config.Mirror.SubjectTransforms, + FilterSubject = config.Mirror.FilterSubject, + }; + mirrorDirect = true; + } + else if (config.Sources != null && config.Sources.Count > 0) + { + sources = []; + foreach (var ss in config.Sources) + { + string? sourceBucketName = default; + if (ss.Name.StartsWith(KvStreamNamePrefix)) + { + sourceBucketName = ss.Name.Substring(KvStreamNamePrefixLen); + } + else + { + sourceBucketName = ss.Name; + ss.Name = BucketToStream(ss.Name); + } + + if (ss.External == null || sourceBucketName != config.Bucket) + { + ss.SubjectTransforms = [new SubjectTransform + { + Src = $"$KV.{sourceBucketName}.>", + Dest = $"$KV.{config.Bucket}.>", + } + ]; + } + + sources.Add(ss); + } + + subjects = [$"$KV.{config.Bucket}.>"]; + } + else + { + subjects = [$"$KV.{config.Bucket}.>"]; + } + var streamConfig = new StreamConfig { Name = BucketToStream(config.Bucket), @@ -221,10 +273,9 @@ private static StreamConfig CreateStreamConfig(NatsKVConfig config) AllowDirect = true, NumReplicas = replicas, Discard = StreamConfigDiscard.New, - - // TODO: KV mirrors - // MirrorDirect = - // Mirror = + Mirror = mirror, + MirrorDirect = mirrorDirect, + Sources = sources, Retention = StreamConfigRetention.Limits, // from ADR-8 }; diff --git a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs index 01c973cf9..567ec6756 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs @@ -648,4 +648,37 @@ public async Task TestDirectMessageRepublishedSubject() Assert.Equal(publishSubject3, kve3.Key); Assert.Equal("tres", kve3.Value); } + + [Fact] + public async Task Test_CombinedSources() + { + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + + var js = new NatsJSContext(nats); + var kv = new NatsKVContext(js); + + var storeSource1 = await kv.CreateStoreAsync("source1"); + var storeSource2 = await kv.CreateStoreAsync("source2"); + + var storeCombined = await kv.CreateStoreAsync(new NatsKVConfig("combined") + { + Sources = [ + new StreamSource { Name = "source1" }, + new StreamSource { Name = "source2" } + ], + }); + + await storeSource1.PutAsync("ss1_a", "a_fromStore1"); + await storeSource2.PutAsync("ss2_b", "b_fromStore2"); + + // ensure any async replication + await Task.Delay(1000); + + var entryA = await storeCombined.GetEntryAsync("ss1_a"); + var entryB = await storeCombined.GetEntryAsync("ss2_b"); + + Assert.Equal("a_fromStore1", entryA.Value); + Assert.Equal("b_fromStore2", entryB.Value); + } } From fbe0dbeda9b724eaac0183665d227f9ea6f7b8b7 Mon Sep 17 00:00:00 2001 From: Matt Funnell Date: Wed, 18 Sep 2024 09:49:14 +0100 Subject: [PATCH 02/14] Removed the need for StreamSource.Domain --- src/NATS.Client.JetStream/Models/StreamSource.cs | 3 --- src/NATS.Client.KeyValueStore/NatsKVContext.cs | 3 +-- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/src/NATS.Client.JetStream/Models/StreamSource.cs b/src/NATS.Client.JetStream/Models/StreamSource.cs index af234b651..684b7d667 100644 --- a/src/NATS.Client.JetStream/Models/StreamSource.cs +++ b/src/NATS.Client.JetStream/Models/StreamSource.cs @@ -54,7 +54,4 @@ public record StreamSource [System.Text.Json.Serialization.JsonPropertyName("external")] [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] public ExternalStreamSource? External { get; set; } - - [System.Text.Json.Serialization.JsonIgnore] - public string? Domain { get; set; } } diff --git a/src/NATS.Client.KeyValueStore/NatsKVContext.cs b/src/NATS.Client.KeyValueStore/NatsKVContext.cs index d0743380a..48dc176c9 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVContext.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVContext.cs @@ -209,8 +209,7 @@ private static StreamConfig CreateStreamConfig(NatsKVConfig config) { mirror = new StreamSource { - Name = config.Mirror.Name.StartsWith(KvStreamNamePrefix) ? config.Mirror.Name : BucketToStream(config.Mirror.Name), - Domain = config.Mirror.Domain, + Name = config.Mirror.Name.StartsWith(KvStreamNamePrefix) ? config.Mirror.Name : BucketToStream(config.Mirror.Name), External = config.Mirror.External, OptStartSeq = config.Mirror.OptStartSeq, OptStartTime = config.Mirror.OptStartTime, From cca19326aa4fa0d947d7c9601d01fb257801fd8a Mon Sep 17 00:00:00 2001 From: Matt Funnell Date: Wed, 18 Sep 2024 10:30:26 +0100 Subject: [PATCH 03/14] Fixed trailing whitespace --- src/NATS.Client.KeyValueStore/NatsKVContext.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NATS.Client.KeyValueStore/NatsKVContext.cs b/src/NATS.Client.KeyValueStore/NatsKVContext.cs index 48dc176c9..e9f143a81 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVContext.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVContext.cs @@ -209,7 +209,7 @@ private static StreamConfig CreateStreamConfig(NatsKVConfig config) { mirror = new StreamSource { - Name = config.Mirror.Name.StartsWith(KvStreamNamePrefix) ? config.Mirror.Name : BucketToStream(config.Mirror.Name), + Name = config.Mirror.Name.StartsWith(KvStreamNamePrefix) ? config.Mirror.Name : BucketToStream(config.Mirror.Name), External = config.Mirror.External, OptStartSeq = config.Mirror.OptStartSeq, OptStartTime = config.Mirror.OptStartTime, From baadaf17b91a382235231fefe38e9cf39626f9be Mon Sep 17 00:00:00 2001 From: Matt Funnell Date: Wed, 18 Sep 2024 10:51:02 +0100 Subject: [PATCH 04/14] Whitespace fix for KeyValueStoreTest --- tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs index 567ec6756..0aa1bd368 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs @@ -664,8 +664,8 @@ public async Task Test_CombinedSources() var storeCombined = await kv.CreateStoreAsync(new NatsKVConfig("combined") { Sources = [ - new StreamSource { Name = "source1" }, - new StreamSource { Name = "source2" } + new StreamSource { Name = "source1" }, + new StreamSource { Name = "source2" } ], }); @@ -673,7 +673,7 @@ public async Task Test_CombinedSources() await storeSource2.PutAsync("ss2_b", "b_fromStore2"); // ensure any async replication - await Task.Delay(1000); + await Task.Delay(500); var entryA = await storeCombined.GetEntryAsync("ss1_a"); var entryB = await storeCombined.GetEntryAsync("ss2_b"); From 694b370658f9d3e45c1166c98acf76afe5c196b6 Mon Sep 17 00:00:00 2001 From: Matt Funnell Date: Thu, 19 Sep 2024 08:44:37 +0100 Subject: [PATCH 05/14] Added Domain support for stream sourcing. --- .../Models/StreamSource.cs | 11 +++++ .../NatsJSContext.Streams.cs | 47 +++++++++++++++++++ src/NATS.Client.JetStream/NatsJSContext.cs | 16 +++++++ .../NatsKVContext.cs | 3 ++ 4 files changed, 77 insertions(+) diff --git a/src/NATS.Client.JetStream/Models/StreamSource.cs b/src/NATS.Client.JetStream/Models/StreamSource.cs index 684b7d667..93ee8acad 100644 --- a/src/NATS.Client.JetStream/Models/StreamSource.cs +++ b/src/NATS.Client.JetStream/Models/StreamSource.cs @@ -1,3 +1,5 @@ +using System.Diagnostics.Metrics; + namespace NATS.Client.JetStream.Models; /// @@ -54,4 +56,13 @@ public record StreamSource [System.Text.Json.Serialization.JsonPropertyName("external")] [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] public ExternalStreamSource? External { get; set; } + + /// + /// This field is a convenience for setting up an ExternalStream. + /// If set, the value here is used to calculate the JetStreamAPI prefix. + /// This field is never serialized to the server.This value cannot be set + /// external is set. + /// + [System.Text.Json.Serialization.JsonIgnore] + public string? Domain { get; set; } } diff --git a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs index 53d61cbdd..767fd9637 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs @@ -20,6 +20,53 @@ public async ValueTask CreateStreamAsync( CancellationToken cancellationToken = default) { ThrowIfInvalidStreamName(config.Name, nameof(config.Name)); + + // If we have a mirror and an external domain, convert to ext.APIPrefix. + if (config.Mirror != null && !string.IsNullOrEmpty(config.Mirror.Domain)) + { + config.Mirror = new StreamSource + { + Name = config.Mirror.Name, + Domain = config.Mirror.Domain, + External = config.Mirror.External, + FilterSubject = config.Mirror.FilterSubject, + OptStartSeq = config.Mirror.OptStartSeq, + OptStartTime = config.Mirror.OptStartTime, + SubjectTransforms = config.Mirror.SubjectTransforms, + }; + ConvertDomain(config.Mirror); + } + + // Check sources for the same. + if (config.Sources != null && config.Sources.Count > 0) + { + ICollection? sources = []; + foreach (var ss in config.Sources) + { + if (!string.IsNullOrEmpty(ss.Domain)) + { + var remappedDomainSource = new StreamSource + { + Name = ss.Name, + Domain = ss.Domain, + External = ss.External, + FilterSubject = ss.FilterSubject, + OptStartSeq = ss.OptStartSeq, + OptStartTime = ss.OptStartTime, + SubjectTransforms = ss.SubjectTransforms, + }; + ConvertDomain(remappedDomainSource); + sources.Add(remappedDomainSource); + } + else + { + sources.Add(ss); + } + } + + config.Sources = sources; + } + var response = await JSRequestResponseAsync( subject: $"{Opts.Prefix}.STREAM.CREATE.{config.Name}", config, diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 78feb7fbf..252eb4044 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -1,5 +1,6 @@ using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; +using System.Xml.Schema; using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.JetStream.Internal; @@ -327,6 +328,21 @@ internal async ValueTask> JSRequestAsync throw new ArgumentException("Stream name cannot contain ' ', '.'", paramName); diff --git a/src/NATS.Client.KeyValueStore/NatsKVContext.cs b/src/NATS.Client.KeyValueStore/NatsKVContext.cs index e9f143a81..861f048f2 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVContext.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVContext.cs @@ -215,6 +215,7 @@ private static StreamConfig CreateStreamConfig(NatsKVConfig config) OptStartTime = config.Mirror.OptStartTime, SubjectTransforms = config.Mirror.SubjectTransforms, FilterSubject = config.Mirror.FilterSubject, + Domain = config.Mirror.Domain, }; mirrorDirect = true; } @@ -247,6 +248,8 @@ private static StreamConfig CreateStreamConfig(NatsKVConfig config) sources.Add(ss); } + config.Sources = sources; + subjects = [$"$KV.{config.Bucket}.>"]; } else From a411063fa11ff3e3241f28b8b45a575ab5dd1514 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 19 Sep 2024 10:56:58 +0100 Subject: [PATCH 06/14] Test fix --- src/NATS.Client.Core/Nuid.cs | 2 +- .../KeyValueStoreTest.cs | 18 ++++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/src/NATS.Client.Core/Nuid.cs b/src/NATS.Client.Core/Nuid.cs index 8994f3f28..fb57fe5db 100644 --- a/src/NATS.Client.Core/Nuid.cs +++ b/src/NATS.Client.Core/Nuid.cs @@ -18,7 +18,7 @@ namespace NATS.Client.Core; [SkipLocalsInit] public sealed class Nuid { - // NuidLength, PrefixLength, SequentialLength were nuint (System.UIntPtr) in the original code + // NuidLength, PrefixLength, SequentialLength were nuint (System.UIntPtr) in the original code, // however, they were changed to uint to fix the compilation error for IL2CPP Unity projects. // With nuint, the following error occurs in Unity Linux IL2CPP builds: // Error: IL2CPP error for method 'System.Char[] NATS.Client.Core.Internal.NuidWriter::Refresh(System.UInt64&)' diff --git a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs index 0aa1bd368..ca274d1b2 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs @@ -672,8 +672,22 @@ public async Task Test_CombinedSources() await storeSource1.PutAsync("ss1_a", "a_fromStore1"); await storeSource2.PutAsync("ss2_b", "b_fromStore2"); - // ensure any async replication - await Task.Delay(500); + await Retry.Until( + "async replication is completed", + async () => + { + try + { + await storeCombined.GetEntryAsync("ss1_a"); + await storeCombined.GetEntryAsync("ss2_b"); + } + catch (NatsKVKeyNotFoundException) + { + return false; + } + + return true; + }); var entryA = await storeCombined.GetEntryAsync("ss1_a"); var entryB = await storeCombined.GetEntryAsync("ss2_b"); From ded2bca35c0a5bad79135f9850bc21ea1031e8de Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 19 Sep 2024 12:57:50 +0100 Subject: [PATCH 07/14] Keep caller's config intact. --- .../Models/StreamConfig.cs | 8 ++++++ .../Models/StreamSource.cs | 8 ++++++ .../NatsJSContext.Streams.cs | 25 ++++--------------- 3 files changed, 21 insertions(+), 20 deletions(-) diff --git a/src/NATS.Client.JetStream/Models/StreamConfig.cs b/src/NATS.Client.JetStream/Models/StreamConfig.cs index fdacdaf30..7688533f0 100644 --- a/src/NATS.Client.JetStream/Models/StreamConfig.cs +++ b/src/NATS.Client.JetStream/Models/StreamConfig.cs @@ -245,4 +245,12 @@ internal StreamConfig() [System.Text.Json.Serialization.JsonPropertyName("metadata")] [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] public IDictionary? Metadata { get; set; } + + /// + /// Creates a shallow copy of the current StreamConfig instance using the MemberwiseClone method. + /// + /// + /// A shallow copy of the current StreamConfig. + /// + public StreamConfig ShallowCopy() => (StreamConfig)MemberwiseClone(); } diff --git a/src/NATS.Client.JetStream/Models/StreamSource.cs b/src/NATS.Client.JetStream/Models/StreamSource.cs index 93ee8acad..05fd78557 100644 --- a/src/NATS.Client.JetStream/Models/StreamSource.cs +++ b/src/NATS.Client.JetStream/Models/StreamSource.cs @@ -65,4 +65,12 @@ public record StreamSource /// [System.Text.Json.Serialization.JsonIgnore] public string? Domain { get; set; } + + /// + /// Creates a shallow copy of the current StreamSource instance using the MemberwiseClone method. + /// + /// + /// A shallow copy of the current StreamSource. + /// + public StreamSource ShallowCopy() => (StreamSource)MemberwiseClone(); } diff --git a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs index 767fd9637..b3443b0a3 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs @@ -21,19 +21,13 @@ public async ValueTask CreateStreamAsync( { ThrowIfInvalidStreamName(config.Name, nameof(config.Name)); + // keep caller's config intact. + config = config.ShallowCopy(); + // If we have a mirror and an external domain, convert to ext.APIPrefix. if (config.Mirror != null && !string.IsNullOrEmpty(config.Mirror.Domain)) { - config.Mirror = new StreamSource - { - Name = config.Mirror.Name, - Domain = config.Mirror.Domain, - External = config.Mirror.External, - FilterSubject = config.Mirror.FilterSubject, - OptStartSeq = config.Mirror.OptStartSeq, - OptStartTime = config.Mirror.OptStartTime, - SubjectTransforms = config.Mirror.SubjectTransforms, - }; + config.Mirror = config.Mirror.ShallowCopy(); ConvertDomain(config.Mirror); } @@ -45,16 +39,7 @@ public async ValueTask CreateStreamAsync( { if (!string.IsNullOrEmpty(ss.Domain)) { - var remappedDomainSource = new StreamSource - { - Name = ss.Name, - Domain = ss.Domain, - External = ss.External, - FilterSubject = ss.FilterSubject, - OptStartSeq = ss.OptStartSeq, - OptStartTime = ss.OptStartTime, - SubjectTransforms = ss.SubjectTransforms, - }; + var remappedDomainSource = ss.ShallowCopy(); ConvertDomain(remappedDomainSource); sources.Add(remappedDomainSource); } From 91b0568e9446e719d57502a7cdf5e84c804f7be9 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 19 Sep 2024 13:55:32 +0100 Subject: [PATCH 08/14] Test fix --- tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs index ca274d1b2..23f4f5b60 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs @@ -687,7 +687,9 @@ await Retry.Until( } return true; - }); + }, + retryDelay: TimeSpan.FromSeconds(3), + timeout: TimeSpan.FromSeconds(30)); var entryA = await storeCombined.GetEntryAsync("ss1_a"); var entryB = await storeCombined.GetEntryAsync("ss2_b"); From 42d8565d829e9d560339fcdf888adc30a7a6eb61 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 19 Sep 2024 14:00:21 +0100 Subject: [PATCH 09/14] Remove unused import in StreamSource.cs Deleted the 'System.Diagnostics.Metrics' import as it was not being used anywhere in the file. This change helps in maintaining a clean codebase with only necessary dependencies. --- src/NATS.Client.JetStream/Models/StreamSource.cs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/NATS.Client.JetStream/Models/StreamSource.cs b/src/NATS.Client.JetStream/Models/StreamSource.cs index 05fd78557..9b4050759 100644 --- a/src/NATS.Client.JetStream/Models/StreamSource.cs +++ b/src/NATS.Client.JetStream/Models/StreamSource.cs @@ -1,5 +1,3 @@ -using System.Diagnostics.Metrics; - namespace NATS.Client.JetStream.Models; /// @@ -60,8 +58,8 @@ public record StreamSource /// /// This field is a convenience for setting up an ExternalStream. /// If set, the value here is used to calculate the JetStreamAPI prefix. - /// This field is never serialized to the server.This value cannot be set - /// external is set. + /// This field is never serialized to the server. This value cannot be set + /// if external is set. /// [System.Text.Json.Serialization.JsonIgnore] public string? Domain { get; set; } From 45841da47255f568be8350af2a560687ee271dcc Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 19 Sep 2024 14:01:43 +0100 Subject: [PATCH 10/14] Remove unused using directive Deleted an unnecessary directive for System.Xml.Schema in NatsJSContext.cs. This cleanup aids in maintaining clean and efficient code. --- src/NATS.Client.JetStream/NatsJSContext.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 252eb4044..cd265d0e9 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -1,6 +1,5 @@ using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; -using System.Xml.Schema; using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.JetStream.Internal; From a028ac6fcf5b5f3455abdeafb5ea3e0f5f2a4af3 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 19 Sep 2024 14:12:19 +0100 Subject: [PATCH 11/14] Refactor initialization logic for stream configuration. Clean up redundant initializations and streamline the handling of `subjects`, `mirror`, and `sources` to improve code clarity. Ensure default assignments are explicitly defined within conditional branches for better code maintainability. --- .../NatsKVContext.cs | 31 ++++++++++++------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/src/NATS.Client.KeyValueStore/NatsKVContext.cs b/src/NATS.Client.KeyValueStore/NatsKVContext.cs index 861f048f2..6ffea6f4b 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVContext.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVContext.cs @@ -1,4 +1,3 @@ -using System.Diagnostics; using System.Runtime.CompilerServices; using System.Text.RegularExpressions; using NATS.Client.JetStream; @@ -201,10 +200,11 @@ private static StreamConfig CreateStreamConfig(NatsKVConfig config) var replicas = config.NumberOfReplicas > 0 ? config.NumberOfReplicas : 1; - string[]? subjects = default; - StreamSource? mirror = default; - ICollection? sources = default; - var mirrorDirect = false; + string[]? subjects; + StreamSource? mirror; + ICollection? sources; + bool mirrorDirect; + if (config.Mirror != null) { mirror = new StreamSource @@ -218,6 +218,8 @@ private static StreamConfig CreateStreamConfig(NatsKVConfig config) Domain = config.Mirror.Domain, }; mirrorDirect = true; + subjects = default; + sources = default; } else if (config.Sources != null && config.Sources.Count > 0) { @@ -237,24 +239,29 @@ private static StreamConfig CreateStreamConfig(NatsKVConfig config) if (ss.External == null || sourceBucketName != config.Bucket) { - ss.SubjectTransforms = [new SubjectTransform - { - Src = $"$KV.{sourceBucketName}.>", - Dest = $"$KV.{config.Bucket}.>", - } + ss.SubjectTransforms = + [ + new SubjectTransform + { + Src = $"$KV.{sourceBucketName}.>", + Dest = $"$KV.{config.Bucket}.>", + } ]; } sources.Add(ss); } - config.Sources = sources; - subjects = [$"$KV.{config.Bucket}.>"]; + mirror = default; + mirrorDirect = false; } else { subjects = [$"$KV.{config.Bucket}.>"]; + mirror = default; + sources = default; + mirrorDirect = false; } var streamConfig = new StreamConfig From faea245873737a35f320a410f2f548814027f006 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 19 Sep 2024 14:21:35 +0100 Subject: [PATCH 12/14] Refactor mirror assignment with ShallowCopy method Simplify the mirror object instantiation by using the ShallowCopy method, reducing code redundancy. This change improves readability and maintenance by encapsulating the cloning logic within the method. --- src/NATS.Client.KeyValueStore/NatsKVContext.cs | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/src/NATS.Client.KeyValueStore/NatsKVContext.cs b/src/NATS.Client.KeyValueStore/NatsKVContext.cs index 6ffea6f4b..f81fb7f32 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVContext.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVContext.cs @@ -207,16 +207,10 @@ private static StreamConfig CreateStreamConfig(NatsKVConfig config) if (config.Mirror != null) { - mirror = new StreamSource - { - Name = config.Mirror.Name.StartsWith(KvStreamNamePrefix) ? config.Mirror.Name : BucketToStream(config.Mirror.Name), - External = config.Mirror.External, - OptStartSeq = config.Mirror.OptStartSeq, - OptStartTime = config.Mirror.OptStartTime, - SubjectTransforms = config.Mirror.SubjectTransforms, - FilterSubject = config.Mirror.FilterSubject, - Domain = config.Mirror.Domain, - }; + mirror = config.Mirror.ShallowCopy(); + mirror.Name = config.Mirror.Name.StartsWith(KvStreamNamePrefix) + ? config.Mirror.Name + : BucketToStream(config.Mirror.Name); mirrorDirect = true; subjects = default; sources = default; @@ -226,7 +220,7 @@ private static StreamConfig CreateStreamConfig(NatsKVConfig config) sources = []; foreach (var ss in config.Sources) { - string? sourceBucketName = default; + string? sourceBucketName; if (ss.Name.StartsWith(KvStreamNamePrefix)) { sourceBucketName = ss.Name.Substring(KvStreamNamePrefixLen); From 2f9559871cf79378c590f8738d605b35ccdc8376 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 19 Sep 2024 14:21:59 +0100 Subject: [PATCH 13/14] Refactor null and count check for config.Sources Updated the conditional check for `config.Sources` to use pattern matching, improving readability and adhering to modern C# conventions. This change ensures cleaner and more maintainable code. --- src/NATS.Client.KeyValueStore/NatsKVContext.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NATS.Client.KeyValueStore/NatsKVContext.cs b/src/NATS.Client.KeyValueStore/NatsKVContext.cs index f81fb7f32..eee625c09 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVContext.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVContext.cs @@ -215,7 +215,7 @@ private static StreamConfig CreateStreamConfig(NatsKVConfig config) subjects = default; sources = default; } - else if (config.Sources != null && config.Sources.Count > 0) + else if (config.Sources is { Count: > 0 }) { sources = []; foreach (var ss in config.Sources) From 419cf3f4e7f4a05ac29d5377444141cfee022563 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 19 Sep 2024 16:01:00 +0100 Subject: [PATCH 14/14] Skip specific test for NATS servers earlier than v2.10 This commit updates the KeyValueStoreTest to skip the Test_CombinedSources test for NATS server versions earlier than 2.10 since some of the mirroring features are introduced with 2.10. It also removes unnecessary retry delay and timeout parameters from the test configuration. --- tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs index 23f4f5b60..8504f04ac 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs @@ -649,7 +649,7 @@ public async Task TestDirectMessageRepublishedSubject() Assert.Equal("tres", kve3.Value); } - [Fact] + [SkipIfNatsServer(versionEarlierThan: "2.10")] public async Task Test_CombinedSources() { await using var server = NatsServer.StartJS(); @@ -687,9 +687,7 @@ await Retry.Until( } return true; - }, - retryDelay: TimeSpan.FromSeconds(3), - timeout: TimeSpan.FromSeconds(30)); + }); var entryA = await storeCombined.GetEntryAsync("ss1_a"); var entryB = await storeCombined.GetEntryAsync("ss2_b");