
Bug / Feature request: Time-intensive custom_messages functions trigger heartbeat timeout #2608

Closed
samuelspagl opened this issue Feb 20, 2024 · 4 comments

@samuelspagl
Contributor

Description

Currently, the functions registered for custom_messages block the greenlet responsible for reading from the connection between the master and its workers.
When the function for a custom_message is time-intensive, the heartbeat_timeout_checker can be triggered and the worker gets killed as a result.

A solution for this issue would be to run the custom_messages functions in a greenlet, to ensure that the heartbeat and the other message types Locust depends on can always be received.
MasterRunner line of code
WorkerRunner line of code

Instead of

self.custom_messages[msg.type](environment=self.environment, msg=msg)

I would propose to do

gevent.spawn(self.custom_messages[msg.type], environment=self.environment, msg=msg)

to ensure that it can run concurrently.

Still, this solution would likely be a breaking change for some current users of Locust.

Another solution, which would probably take a bit more time to implement but would be a lot better, is to implement some kind of store for the custom_messages and their functions instead of using a plain dictionary.
When adding a new listener, a keyword like concurrent or non_blocking could tell this store whether to run the function in a greenlet or in a blocking way, keeping it fully backwards compatible for all users:

        environment.runner.register_message("acknowledge_users", on_acknowledge, concurrent=True)
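
A rough sketch of what such a store could look like (MessageStore, MessageHandler, and the concurrent flag are illustrative names, not existing Locust API):

import gevent


class MessageHandler:
    # Illustrative wrapper that remembers whether a listener should run concurrently.
    def __init__(self, listener, concurrent=False):
        self.listener = listener
        self.concurrent = concurrent


class MessageStore:
    # Hypothetical replacement for the plain custom_messages dict.
    def __init__(self):
        self._handlers = {}

    def register(self, msg_type, listener, concurrent=False):
        self._handlers[msg_type] = MessageHandler(listener, concurrent)

    def dispatch(self, environment, msg):
        handler = self._handlers[msg.type]
        if handler.concurrent:
            # Run the listener in its own greenlet so the receive loop (and heartbeats) keep going.
            gevent.spawn(handler.listener, environment=environment, msg=msg)
        else:
            # Default: the same blocking behaviour as today, fully backwards compatible.
            handler.listener(environment=environment, msg=msg)

register_message(..., concurrent=True) would then simply forward the flag to this store.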

I'm looking forward to hearing from you! :)

Command line

locust -f mylocustfile.py --headless

Locustfile contents

from locust import HttpUser, between, events, task
from locust.runners import MasterRunner, WorkerRunner
import gevent

usernames = []


def setup_test_users(environment, msg, **kwargs):
    # Fired when the worker receives a message of type 'test_users'
    usernames.extend(map(lambda u: u["name"], msg.data))
    print("Before sleep")
    gevent.sleep(80)
    print("After Sleep")
    environment.runner.send_message("acknowledge_users", f"Thanks for the {len(msg.data)} users!")


def on_acknowledge(msg, **kwargs):
    # Fired when the master receives a message of type 'acknowledge_users'
    print(msg.data)


@events.init.add_listener
def on_locust_init(environment, **_kwargs):
    if not isinstance(environment.runner, MasterRunner):
        environment.runner.register_message("test_users", setup_test_users)
    if not isinstance(environment.runner, WorkerRunner):
        environment.runner.register_message("acknowledge_users", on_acknowledge)


@events.test_start.add_listener
def on_test_start(environment, **_kwargs):
    # When the test is started, evenly divides list between
    # worker nodes to ensure unique data across threads
    if not isinstance(environment.runner, WorkerRunner):
        users = []
        for i in range(environment.runner.target_user_count):
            users.append({"name": f"User{i}"})

        worker_count = environment.runner.worker_count
        chunk_size = int(len(users) / worker_count)

        for i, worker in enumerate(environment.runner.clients):
            start_index = i * chunk_size

            if i + 1 < worker_count:
                end_index = start_index + chunk_size
            else:
                end_index = len(users)

            data = users[start_index:end_index]
            environment.runner.send_message("test_users", data, worker)


class WebsiteUser(HttpUser):
    host = "http://127.0.0.1:8089"
    wait_time = between(2, 5)

    def __init__(self, parent):
        self.username = usernames.pop()
        super().__init__(parent)

    @task
    def task(self):
        print(self.username)

Python version

3.11

Locust version

2.23.1

Operating system

MacOS

@cyberw
Collaborator

cyberw commented Feb 20, 2024

Hmm. Good issue.

Maybe having a single “custom message handler greenlet” handle events is best, but I'm unsure. With the new distributor pattern used by locust-plugins there can be a LOT of messages flying back and forth, so we probably don't want to start new greenlets every time.

Or maybe your second approach is best.

If you have time to try implementing it I’m all ears.

@samuelspagl
Contributor Author

Unfortunately I'm on vacation until mid-March, so I can't implement it until then.

If this is still open when I return, then I'll take a look at it and try to do a PR :)

@samuelspagl
Contributor Author

Maybe having a single “custom message handler greenlet” handle events is best, but I'm unsure

What about using a gevent.pool.Pool there? We could have a default size of one, but the value could be adjusted if needed.
I know this would still start n greenlets, but we would have more control over the amount of concurrency.
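
A minimal sketch of the idea, assuming the pool size is made configurable (the names here are illustrative, not existing Locust API):

import gevent.pool

# Hypothetical: pool size defaults to 1 but could be made configurable.
message_handler_pool = gevent.pool.Pool(size=1)


def dispatch_custom_message(custom_messages, environment, msg):
    # Hands the listener to a pooled greenlet. Note that Pool.spawn() itself
    # blocks once the pool is full, so the pool size caps how many slow handlers
    # can run before the receive loop has to wait again.
    message_handler_pool.spawn(custom_messages[msg.type], environment=environment, msg=msg)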

@cyberw
Collaborator

cyberw commented Feb 21, 2024

I don't know what would be the best solution. It is kind of easy to work around by just starting a greenlet inside the event handler yourself (pooled or not), so I think there's no rush to fix it. With distributors the master is often handling >1000 custom messages/s, so I want to be sure we don't introduce any extra overhead for the "normal" case when the handler is fast.
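
For reference, the workaround mentioned here could look roughly like this in a locustfile, adapting the setup_test_users handler from the reproduction above (the helper name is made up):

import gevent

usernames = []  # same shared list as in the reproduction


def _setup_test_users_work(environment, msg):
    # The time-intensive part of the original handler, now running in its own greenlet.
    usernames.extend(u["name"] for u in msg.data)
    gevent.sleep(80)  # stand-in for the slow work
    environment.runner.send_message("acknowledge_users", f"Thanks for the {len(msg.data)} users!")


def setup_test_users(environment, msg, **kwargs):
    # The registered listener returns immediately, so heartbeats and other
    # messages keep being processed while the work runs concurrently.
    gevent.spawn(_setup_test_users_work, environment, msg)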
