Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug: NATS subscriber retry seems not working #1888

Closed
Atanusaha143 opened this issue Nov 1, 2024 · 9 comments
Closed

Bug: NATS subscriber retry seems not working #1888

Atanusaha143 opened this issue Nov 1, 2024 · 9 comments
Labels
bug Something isn't working

Comments

@Atanusaha143
Copy link

Atanusaha143 commented Nov 1, 2024

Describe the bug
According to NATS ack it should retry, whenever an error occurs. But not getting the expected behavior.

How to reproduce

from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


@broker.subscriber(subject="test", retry=True)
async def handle_test(data: dict):
    a = 5
    b = 0
    c = a / b

And/Or steps to reproduce the behavior:

  1. similar for any error cases
  2. NatsMessage.nack() is not helping

Expected behavior

If the retry flag is set to True, the message will be nacked and placed back in the queue each time an error occurs.

Observed behavior
Even if the retry flag is set to True, the message is not retrying

@Atanusaha143 Atanusaha143 added the bug Something isn't working label Nov 1, 2024
@Lancetnik
Copy link
Member

Lancetnik commented Nov 1, 2024

@Atanusaha143 thank you for the report!
Anyway, I should warn you: retry option is deprecated and will be removed in 0.6.0 version to prior to ack_policy=AckPolicy.NackOnError

@Lancetnik
Copy link
Member

You can take a look at draft - #1869

@Atanusaha143
Copy link
Author

@Lancetnik Thank you for the prompt reply and the heads-up. Are there alternative methods to implement a retry mechanism using Faststream?

@Lancetnik
Copy link
Member

@Lancetnik Thank you for the prompt reply and the heads-up. Are there alternative methods to implement a retry mechanism using Faststream?

With Nast Core it can be implemented via re-publish only. retry should works fine with Nats JetStream now - it has acknowledgement policy and FS just calls nak to redeliver message on error if retry option setted. Anyway, retry logic strongly depends on real broker features, this reason I want to remove this option and replace it by much explicit ack_policy

@Atanusaha143
Copy link
Author

@Lancetnik The code provided in the description raises a ZeroDivisionError. Based on the logic, it should be re-queued on the subject named test, and the subscriber for the test subject should catch it and execute the handle_test function again. However, this expected behavior is not happening.

@Lancetnik
Copy link
Member

@Lancetnik The code provided in the description raises a ZeroDivisionError. Based on the logic, it should be re-queued on the subject named test, and the subscriber for the test subject should catch it and execute the handle_test function again. However, this expected behavior is not happening.

It works only for Nats JS. Nats core has no "requeue" functional. You should setup a stream

@Atanusaha143
Copy link
Author

Atanusaha143 commented Nov 3, 2024

# publisher

from nats.aio.client import Client as NATS
from nats.js import JetStreamContext
from nats.js.api import StreamConfig


class TestPublisher:
    def __init__(self):
        self._conn = None

    @staticmethod
    async def _get_jetstream_conn() -> JetStreamContext:
        nats: NATS = NATS()
        await nats.connect("nats://localhost:4222")
        return nats.jetstream()

    @staticmethod
    async def _add_stream(conn: JetStreamContext, subjects: list[str], name: str = "test_stream") -> JetStreamContext:
        try:
            await conn.add_stream(StreamConfig(name=name, subjects=subjects))
            print("Stream created successfully.")
            return conn
        except Exception as e:
            print(f"Failed to create stream: {str(e)}")

    async def _setup_publisher(self, stream_name: str, subjects: list[str]) -> None:
        self._conn = await self._get_jetstream_conn()
        self._conn = await self._add_stream(conn=self._conn, subjects=subjects, name=stream_name)

    async def publish(self, stream_name: str, subjects: list[str], data: bytes) -> None:
        try:
            if self._conn is None:
                await self._setup_publisher(stream_name=stream_name, subjects=subjects)
            for subject in subjects:
                print(f"Published into Stream: {stream_name}")
                ack = await self._conn.publish(subject=subject, payload=data)
                print(f'Ack: stream={ack.stream}, sequence={ack.seq}')
        except Exception as e:
            print(f"Failed to publish to '{subjects}': {str(e)}")
# subscriber

from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


@broker.subscriber(subject="test_subject", retry=True)
async def handle_test(data: str):
    print(data)
    a = 4
    b = 0
    c = a / b

I intentionally performed the division operation to test whether the retry mechanism is functioning properly.

await test_publisher.publish(stream_name="test_stream", subjects=["test_subject"], data=b"started testing")

@Lancetnik When calling the test_publisher's publish method, the retry mechanism should trigger since it throws a ZeroDivisionError, right?

@Lancetnik
Copy link
Member

@Atanusaha143 you should setup stream in subscriber, not in publisher to consume messages from Jetstream

@Atanusaha143
Copy link
Author

As a solution, pull consumer is required, and a pull consumer must have a stream

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants