SuperStream doesn't elect the single active consumer #7743

Closed
Gsantomaggio opened this issue Mar 27, 2023 · 4 comments

Comments

@Gsantomaggio
Member

Gsantomaggio commented Mar 27, 2023

Describe the bug

The super stream doesn't elect the single active consumer when the consumers are restarted.
The consumers stop consuming and remain stuck in the "waiting" status; see the screenshot:
(screenshot "Screenshot 2023-03-27 at 10 42 15": consumers stuck in the waiting status)

Reproduction steps

  1. Create a super stream with ten partitions: rabbitmq-streams add_super_stream invoices --partitions 10
  2. Pump it with a few thousand messages (70k is enough); see the producer sketch after this list.
  3. Stop the producer.
  4. Start ten instances of this Java client:
    import com.rabbitmq.stream.Address;
    import com.rabbitmq.stream.Consumer;
    import com.rabbitmq.stream.Environment;
    import com.rabbitmq.stream.OffsetSpecification;

    import java.io.IOException;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ThreadLocalRandom;

    public static void main(String[] args) throws IOException {

        System.out.println("Connecting...");
        Address entryPoint = new Address("127.0.0.1", 5552);

        // One consumer per connection, so every partition consumer gets its own connection.
        Environment environment = Environment.builder()
                // .host(entryPoint.host())
                // .port(entryPoint.port())
                // .username("test")
                // .password("test")
                .addressResolver(address -> entryPoint)
                .maxConsumersByConnection(1)
                .build();
        String appName = "reference";
        String stream = "invoices";

        Date start = new Date();
        for (int i = 0; i < 500; i++) {

            Map<String, Integer> consumedMap = new HashMap<>();
            Consumer consumer = environment.consumerBuilder()
                    .superStream(stream)
                    .name(appName)
                    .offset(OffsetSpecification.first())
                    .singleActiveConsumer()
                    .messageHandler((context, message) -> {
                        // Total consumed in this iteration, across all partitions.
                        consumedMap.merge(stream, 1, Integer::sum);

                        if (consumedMap.get(stream) % 10 == 0) {
                            Date end = new Date();
                            System.out.println("Stream: " + context.stream()
                                    + " - Consumed " + consumedMap.get(stream)
                                    + " - Time " + (end.getTime() - start.getTime()));
                        }

                        // Simulate slow processing.
                        try {
                            Thread.sleep(ThreadLocalRandom.current().nextInt(200, 1000));
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }).build();

            // Let each consumer run for a minute, then close it to force a re-election.
            try {
                Thread.sleep(60000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("Restarting");

            consumer.close();
        }

    }
  5. Wait for a couple of restarts.
  6. The issue appears: the consumers stay in the waiting status and no active consumer is elected.
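
A minimal producer sketch for step 2, assuming a local broker with default credentials and the routing-by-message-id strategy from the stream Java client documentation; the Publish class name and payloads are illustrative, not from the original report:

    import com.rabbitmq.stream.Environment;
    import com.rabbitmq.stream.Producer;

    import java.util.concurrent.CountDownLatch;

    // Hypothetical helper for step 2: publishes 70k messages to the "invoices" super stream.
    public class Publish {

        public static void main(String[] args) throws InterruptedException {
            // Connects to localhost:5552 with default credentials.
            Environment environment = Environment.builder().build();

            // Super stream producer: the routing function hashes the message ID
            // to pick the target partition.
            Producer producer = environment.producerBuilder()
                    .superStream("invoices")
                    .routing(message -> message.getProperties().getMessageIdAsString())
                    .producerBuilder()
                    .build();

            int messageCount = 70_000;
            CountDownLatch confirmed = new CountDownLatch(messageCount);
            for (int i = 0; i < messageCount; i++) {
                producer.send(
                        producer.messageBuilder()
                                .properties().messageId(i).messageBuilder()
                                .addData(("invoice " + i).getBytes())
                                .build(),
                        confirmationStatus -> confirmed.countDown());
            }
            confirmed.await();

            producer.close();
            environment.close();
        }
    }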

Expected behavior

A single active consumer should always be elected (one per partition), so consumption resumes after each restart.

Additional context

I noticed that invoices-1 is usually the first partition to have problems.
The invoices-0 partition usually works.
The other partitions will, at some point, show the same issue.

RabbitMQ 3.11.11
RabbitMQ Stream Java client 0.10.0-SNAPSHOT

acogoluegnes added a commit that referenced this issue Mar 30, 2023
A group of consumers on a super stream can end up blocked
without an active consumer. This can happen with consumer
churn: one consumer gets removed, which makes the active
consumer passive, but the former active consumer never
gets to know because it has been removed itself.

This commit changes the structure of the messages the SAC
coordinator sends to consumer connections, to embed enough
information to look up the group and to instruct it to choose
a new active consumer when the race condition mentioned above
comes up.

Because of the changes in the structure of messages, a feature
flag is required to make sure the SAC coordinator starts
sending the new messages only when all the nodes have been upgraded.

References #7743
acogoluegnes added a commit that referenced this issue Mar 30, 2023
acogoluegnes added a commit that referenced this issue Mar 31, 2023
acogoluegnes added a commit that referenced this issue Mar 31, 2023
@michaelklishin michaelklishin added this to the 3.11.14 milestone Mar 31, 2023
acogoluegnes added a commit that referenced this issue Apr 3, 2023
The stream plugin can send frames to a client connection
and expect a response from it. This is used currently
for the consumer_update frame (single active consumer feature).
There was no timeout mechanism so far, so a slow or blocked
application could prevent a group of consumers from moving on.

This commit introduces a timeout mechanism: if the expected
response takes too long to arrive, the server assumes the
connection is blocked and closes it.

The default timeout is 60 seconds but it can be changed by setting
the request_timeout parameter of the rabbitmq_stream application.

Note the mechanism does not enforce the exact duration of the timeout,
as a timer is set for the first request and re-used for other requests.
With bad timing, a request can time out after twice as long
as the set-up timeout.

References #7743
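
One way to inspect the effective value on a node reuses the rabbitmqctl eval style that appears later in this thread; this is a sketch, assuming request_timeout is a plain application environment entry of the rabbitmq_stream application as the commit message says (the unit, seconds vs. milliseconds, is not stated above):

    # hypothetical check, not from this thread: returns {ok, Value} or undefined
    rabbitmqctl eval 'application:get_env(rabbitmq_stream, request_timeout).'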
acogoluegnes added a commit that referenced this issue Apr 3, 2023
michaelklishin pushed a commit that referenced this issue Apr 4, 2023
michaelklishin pushed a commit that referenced this issue Apr 4, 2023
michaelklishin pushed a commit that referenced this issue Apr 4, 2023
michaelklishin pushed a commit that referenced this issue Apr 4, 2023
mergify bot pushed a commit that referenced this issue Apr 4, 2023
mergify bot pushed a commit that referenced this issue Apr 4, 2023
References #7743

(cherry picked from commit f20f415)
mergify bot pushed a commit that referenced this issue Apr 4, 2023
mergify bot pushed a commit that referenced this issue Apr 4, 2023
References #7743

(cherry picked from commit 4a669e1)
mergify bot pushed a commit that referenced this issue Apr 4, 2023
mergify bot pushed a commit that referenced this issue Apr 4, 2023
References #7743

(cherry picked from commit f20f415)
(cherry picked from commit 0461e0a)
mergify bot pushed a commit that referenced this issue Apr 4, 2023
mergify bot pushed a commit that referenced this issue Apr 4, 2023
References #7743

(cherry picked from commit 4a669e1)
(cherry picked from commit d9739b2)
acogoluegnes added a commit that referenced this issue Apr 5, 2023
References #7743
acogoluegnes added a commit that referenced this issue Apr 5, 2023
They can be useful and are not on hot paths, but
they are replicated on all nodes as part of the state
machine replication, so we are better off removing them
to avoid noise.

References #7743
mergify bot pushed a commit that referenced this issue Apr 5, 2023
acogoluegnes added a commit to rabbitmq/rabbitmq-stream-java-client that referenced this issue Apr 5, 2023
github-actions bot pushed a commit to rabbitmq/rabbitmq-stream-java-client that referenced this issue Apr 5, 2023
mergify bot pushed a commit that referenced this issue Apr 5, 2023
@michaelklishin michaelklishin modified the milestones: 3.11.14, 3.12.0 Apr 5, 2023
@fabiorosa-sn

We are currently facing the same problem with version 3.12.10, Erlang 25.3.2.7.

It has happened twice in the ~3 months we have been running with SuperStreams.

The only way for us to "fix it" is to scale the consumer application down to 0 and then scale it up again.

(two screenshots attached)

@Gsantomaggio
Member Author

@fabiorosa-sn Have you enabled stream_sac_coordinator_unblock_group? Which stream client are you using?
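
For anyone checking the same thing, the flag can be listed and enabled with the standard rabbitmqctl feature-flag commands (a reminder, not part of the original exchange; the flag name is the one mentioned above):

    # list feature flags and their state, then enable the SAC unblock flag
    rabbitmqctl list_feature_flags
    rabbitmqctl enable_feature_flag stream_sac_coordinator_unblock_group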

@fabiorosa-sn

fabiorosa-sn commented Jun 5, 2024

Yes, it's enabled.

We are using the Java stream client version 0.15.0.

@fabiorosa-sn

fabiorosa-sn commented Jun 5, 2024

Here is the result of the command

rabbitmqctl eval 'rabbit_stream_coordinator:sac_state(rabbit_stream_coordinator:state()).'

as requested by @Gsantomaggio

stream_coordinator_node-01.txt
