From e952a3a0de91e761b3f5b4070e61300693230cf3 Mon Sep 17 00:00:00 2001 From: Marco Primi Date: Fri, 9 Aug 2024 20:54:03 -0700 Subject: [PATCH 1/2] Consumer test with stream start sequence Test various ways of consuming a stream with a given sequence number offset. --- server/jetstream_cluster_4_test.go | 323 +++++++++++++++++++++++++++++ 1 file changed, 323 insertions(+) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 0b0125fa4fb..85e01e92352 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -3233,3 +3233,326 @@ func TestJetStreamClusterPubAckSequenceDupeAsync(t *testing.T) { require_True(t, (pubAcks[0].Duplicate || pubAcks[1].Duplicate) && (pubAcks[0].Duplicate != pubAcks[1].Duplicate)) } } + +func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) { + + const ( + NumMessages = 10 + ChosenSeq = 5 + StreamName = "TEST" + StreamSubject = "ORDERS.*" + StreamSubjectPrefix = "ORDERS." + ) + + for _, ClusterSize := range []int{ + 1, // Single server + 3, // 3-node cluster + } { + R := ClusterSize + t.Run( + fmt.Sprintf("Nodes:%d,Replicas:%d", ClusterSize, R), + func(t *testing.T) { + // This is the success condition for all sub-tests below + var ExpectedMsgId = "" + checkMessage := func(t *testing.T, msg *nats.Msg) { + msgMeta, err := msg.Metadata() + require_NoError(t, err) + + // Check sequence number + require_Equal(t, msgMeta.Sequence.Stream, ChosenSeq) + + // Check message id + require_NotEqual(t, ExpectedMsgId, "") + require_Equal(t, msg.Header.Get(nats.MsgIdHdr), ExpectedMsgId) + } + + checkRawMessage := func(t *testing.T, msg *nats.RawStreamMsg) { + // Check sequence number + require_Equal(t, msg.Sequence, ChosenSeq) + + // Check message id + require_NotEqual(t, ExpectedMsgId, "") + require_Equal(t, msg.Header.Get(nats.MsgIdHdr), ExpectedMsgId) + } + + // Setup: start server or cluster, connect client + var server *Server + if ClusterSize == 1 { + server = RunBasicJetStreamServer(t) + defer server.Shutdown() + } else { + c := createJetStreamCluster(t, jsClusterTempl, "HUB", _EMPTY_, ClusterSize, 22020, true) + defer c.shutdown() + server = c.randomServer() + } + + // Setup: connect + var nc *nats.Conn + var js nats.JetStreamContext + nc, js = jsClientConnect(t, server) + defer nc.Close() + + // Setup: create stream + _, err := js.AddStream(&nats.StreamConfig{ + Replicas: R, + Name: StreamName, + Subjects: []string{StreamSubject}, + }) + require_NoError(t, err) + + // Setup: create subscriptions before stream is populated + var preCreatedSub, preCreatedSubDurable *nats.Subscription + { + preCreatedSub, err = js.PullSubscribe( + StreamSubject, + "", + nats.StartSequence(ChosenSeq), + ) + require_NoError(t, err) + defer func() { + require_NoError(t, preCreatedSub.Unsubscribe()) + }() + + const Durable = "dlc_pre_created" + c, err := js.AddConsumer(StreamName, &nats.ConsumerConfig{ + Durable: Durable, + DeliverPolicy: nats.DeliverByStartSequencePolicy, + OptStartSeq: ChosenSeq, + Replicas: R, + }) + require_NoError(t, err) + defer func() { + require_NoError(t, js.DeleteConsumer(c.Stream, c.Name)) + }() + + preCreatedSubDurable, err = js.PullSubscribe( + "", + "", + nats.Bind(StreamName, Durable), + ) + require_NoError(t, err) + defer func() { + require_NoError(t, preCreatedSubDurable.Unsubscribe()) + }() + } + + // Setup: populate stream + buf := make([]byte, 100) + for i := uint64(1); i <= NumMessages; i++ { + msgId := nuid.Next() + pubAck, err := js.Publish(StreamSubjectPrefix+strconv.Itoa(int(i)), buf, nats.MsgId(msgId)) + require_NoError(t, err) + + // Verify assumption made in tests below + require_Equal(t, pubAck.Sequence, i) + + if i == ChosenSeq { + // Save the expected message id for the chosen message + ExpectedMsgId = msgId + } + } + + // Tests various ways to consume the stream starting at the ChosenSeq sequence + + t.Run( + "DurableConsumer", + func(t *testing.T) { + const Durable = "dlc" + c, err := js.AddConsumer(StreamName, &nats.ConsumerConfig{ + Durable: Durable, + DeliverPolicy: nats.DeliverByStartSequencePolicy, + OptStartSeq: ChosenSeq, + Replicas: R, + }) + require_NoError(t, err) + defer func() { + require_NoError(t, js.DeleteConsumer(c.Stream, c.Name)) + }() + + sub, err := js.PullSubscribe( + StreamSubject, + Durable, + ) + require_NoError(t, err) + defer func() { + require_NoError(t, sub.Unsubscribe()) + }() + + msgs, err := sub.Fetch(1) + require_NoError(t, err) + require_Equal(t, len(msgs), 1) + + checkMessage(t, msgs[0]) + }, + ) + + t.Run( + "DurableConsumerWithBind", + func(t *testing.T) { + const Durable = "dlc_bind" + c, err := js.AddConsumer(StreamName, &nats.ConsumerConfig{ + Durable: Durable, + DeliverPolicy: nats.DeliverByStartSequencePolicy, + OptStartSeq: ChosenSeq, + Replicas: R, + }) + require_NoError(t, err) + defer func() { + require_NoError(t, js.DeleteConsumer(c.Stream, c.Name)) + }() + + sub, err := js.PullSubscribe( + "", + "", + nats.Bind(StreamName, Durable), + ) + require_NoError(t, err) + defer func() { + require_NoError(t, sub.Unsubscribe()) + }() + + msgs, err := sub.Fetch(1) + require_NoError(t, err) + require_Equal(t, len(msgs), 1) + + checkMessage(t, msgs[0]) + }, + ) + + t.Run( + "PreCreatedDurableConsumerWithBind", + func(t *testing.T) { + msgs, err := preCreatedSubDurable.Fetch(1) + require_NoError(t, err) + require_Equal(t, len(msgs), 1) + + checkMessage(t, msgs[0]) + }, + ) + + t.Run( + "PullConsumer", + func(t *testing.T) { + sub, err := js.PullSubscribe( + StreamSubject, + "", + nats.StartSequence(ChosenSeq), + ) + require_NoError(t, err) + defer func() { + require_NoError(t, sub.Unsubscribe()) + }() + + msgs, err := sub.Fetch(1) + require_NoError(t, err) + require_Equal(t, len(msgs), 1) + + checkMessage(t, msgs[0]) + }, + ) + + t.Run( + "PreCreatedPullConsumer", + func(t *testing.T) { + msgs, err := preCreatedSub.Fetch(1) + require_NoError(t, err) + require_Equal(t, len(msgs), 1) + + checkMessage(t, msgs[0]) + }, + ) + + t.Run( + "SynchronousConsumer", + func(t *testing.T) { + sub, err := js.SubscribeSync( + StreamSubject, + nats.StartSequence(ChosenSeq), + ) + if err != nil { + return + } + require_NoError(t, err) + defer func() { + require_NoError(t, sub.Unsubscribe()) + }() + + msg, err := sub.NextMsg(1 * time.Second) + require_NoError(t, err) + checkMessage(t, msg) + }, + ) + + t.Run( + "CallbackSubscribe", + func(t *testing.T) { + var waitGroup sync.WaitGroup + waitGroup.Add(1) + // To be populated by callback + var receivedMsg *nats.Msg + + sub, err := js.Subscribe( + StreamSubject, + func(msg *nats.Msg) { + // Save first message received + if receivedMsg == nil { + receivedMsg = msg + waitGroup.Done() + } + }, + nats.StartSequence(ChosenSeq), + ) + require_NoError(t, err) + defer func() { + require_NoError(t, sub.Unsubscribe()) + }() + + waitGroup.Wait() + require_NotNil(t, receivedMsg) + checkMessage(t, receivedMsg) + }, + ) + + t.Run( + "ChannelSubscribe", + func(t *testing.T) { + msgChannel := make(chan *nats.Msg, 1) + sub, err := js.ChanSubscribe( + StreamSubject, + msgChannel, + nats.StartSequence(ChosenSeq), + ) + require_NoError(t, err) + defer func() { + require_NoError(t, sub.Unsubscribe()) + }() + + msg := <-msgChannel + checkMessage(t, msg) + }, + ) + + t.Run( + "GetRawStreamMessage", + func(t *testing.T) { + rawMsg, err := js.GetMsg(StreamName, ChosenSeq) + require_NoError(t, err) + checkRawMessage(t, rawMsg) + }, + ) + + t.Run( + "GetLastMessageBySubject", + func(t *testing.T) { + rawMsg, err := js.GetLastMsg( + StreamName, + fmt.Sprintf("ORDERS.%d", ChosenSeq), + ) + require_NoError(t, err) + checkRawMessage(t, rawMsg) + }, + ) + }, + ) + } +} From 11b2bd77cc64723464a320b2eb4429a2d7aa8b02 Mon Sep 17 00:00:00 2001 From: Marco Primi Date: Tue, 13 Aug 2024 13:08:09 -0700 Subject: [PATCH 2/2] Test ACKing deleted messages Test that explicitly ACKs messages that have been already removed from a stream --- server/jetstream_cluster_4_test.go | 132 +++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 85e01e92352..3f97518590a 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -3556,3 +3556,135 @@ func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) { ) } } + +func TestJetStreamClusterAckDeleted(t *testing.T) { + + const ( + NumMessages = 10 + StreamName = "TEST" + StreamSubject = "ORDERS.*" + StreamSubjectPrefix = "ORDERS." + ) + + for _, ClusterSize := range []int{ + 1, // Single server + 3, // 3-node cluster + } { + R := ClusterSize + t.Run( + fmt.Sprintf("Nodes:%d,Replicas:%d", ClusterSize, R), + func(t *testing.T) { + // Setup: start server or cluster, connect client + var server *Server + if ClusterSize == 1 { + server = RunBasicJetStreamServer(t) + defer server.Shutdown() + } else { + c := createJetStreamCluster(t, jsClusterTempl, "HUB", _EMPTY_, ClusterSize, 22020, true) + defer c.shutdown() + server = c.randomServer() + } + + // Setup: connect + var nc *nats.Conn + var js nats.JetStreamContext + nc, js = jsClientConnect(t, server) + defer nc.Close() + + // Setup: create stream + _, err := js.AddStream(&nats.StreamConfig{ + Replicas: R, + Name: StreamName, + Subjects: []string{StreamSubject}, + Retention: nats.LimitsPolicy, + Discard: nats.DiscardOld, + MaxMsgs: 1, // Only keep the latest message + }) + require_NoError(t, err) + + // Setup: create durable consumer and subscription + const Durable = "dlc" + c, err := js.AddConsumer(StreamName, &nats.ConsumerConfig{ + Durable: Durable, + Replicas: R, + AckPolicy: nats.AckExplicitPolicy, + MaxAckPending: NumMessages, + }) + require_NoError(t, err) + defer func() { + require_NoError(t, js.DeleteConsumer(c.Stream, c.Name)) + }() + + // Setup: create durable consumer subscription + sub, err := js.PullSubscribe( + "", + "", + nats.Bind(StreamName, Durable), + ) + require_NoError(t, err) + defer func() { + require_NoError(t, sub.Unsubscribe()) + }() + + // Collect received and non-ACKed messages + receivedMessages := make([]*nats.Msg, 0, NumMessages) + + buf := make([]byte, 100) + for i := uint64(1); i <= NumMessages; i++ { + // Publish one message + msgId := nuid.Next() + pubAck, err := js.Publish( + StreamSubjectPrefix+strconv.Itoa(int(i)), + buf, + nats.MsgId(msgId), + ) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, i) + + // Consume message + msgs, err := sub.Fetch(1) + require_NoError(t, err) + require_Equal(t, len(msgs), 1) + + // Validate message + msg := msgs[0] + require_Equal(t, msgs[0].Header.Get(nats.MsgIdHdr), msgId) + + // Validate message metadata + msgMeta, err := msg.Metadata() + require_NoError(t, err) + // Check sequence number + require_Equal(t, msgMeta.Sequence.Stream, i) + + // Save for ACK later + receivedMessages = append(receivedMessages, msg) + } + + // Verify stream state, expecting a single message due to limits + streamInfo, err := js.StreamInfo(StreamName) + require_NoError(t, err) + require_Equal(t, streamInfo.State.Msgs, 1) + + // Verify consumer state, expecting ack floor corresponding to messages dropped + consumerInfo, err := js.ConsumerInfo(StreamName, Durable) + require_NoError(t, err) + require_Equal(t, consumerInfo.NumAckPending, 1) + require_Equal(t, consumerInfo.AckFloor.Stream, 9) + require_Equal(t, consumerInfo.AckFloor.Consumer, 9) + + // ACK all messages (all except last have been dropped from the stream) + for _, message := range receivedMessages { + err := message.AckSync() + require_NoError(t, err) + } + + // Verify consumer state, all messages ACKed + consumerInfo, err = js.ConsumerInfo(StreamName, Durable) + require_NoError(t, err) + require_Equal(t, consumerInfo.NumAckPending, 0) + require_Equal(t, consumerInfo.AckFloor.Stream, 10) + require_Equal(t, consumerInfo.AckFloor.Consumer, 10) + }, + ) + } +}