Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up the support for split queues #78274

Merged
merged 7 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from sentry.conf.api_pagination_allowlist_do_not_modify import (
SENTRY_API_PAGINATION_ALLOWLIST_DO_NOT_MODIFY,
)
from sentry.conf.types.celery import SplitQueueSize
from sentry.conf.types.kafka_definition import ConsumerDefinition
from sentry.conf.types.logging_config import LoggingConfig
from sentry.conf.types.role_dict import RoleDict
Expand Down Expand Up @@ -822,6 +823,15 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"sentry.integrations.tasks",
)

# tmp(michal): Default configuration for the post_process* queues split.
# Maps a base queue name to a callable returning the queue name to route to.
# Superseded by SplitQueueRouter; kept while the legacy rollout completes.
SENTRY_POST_PROCESS_QUEUE_SPLIT_ROUTER: dict[str, Callable[[], str]] = {}

# Mapping from queue name to split queues to be used by SplitQueueRouter.
# This is meant to be used in those cases where we have to specify the
# queue name when issuing a task. Example: post process.
CELERY_SPLIT_QUEUE_ROUTES: Mapping[str, SplitQueueSize] = {}


default_exchange = Exchange("default", type="direct")
control_exchange = default_exchange

Expand Down Expand Up @@ -3517,7 +3527,3 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
SENTRY_WEB_PORT = int(bind[1])

CELERYBEAT_SCHEDULE_FILENAME = f"celerybeat-schedule-{SILO_MODE}"


# tmp(michal): Default configuration for post_process* queues split
SENTRY_POST_PROCESS_QUEUE_SPLIT_ROUTER: dict[str, Callable[[], str]] = {}
19 changes: 19 additions & 0 deletions src/sentry/conf/types/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from __future__ import annotations

from typing import TypedDict


class SplitQueueSize(TypedDict):
    """Sizing configuration for splitting one Celery queue into several."""

    # The total number of queues to create to split a single queue.
    # This number triggers the creation of the queues themselves
    # when the application starts.
    total: int
    # The number of queues to actually use. It has to be smaller than or
    # equal to `total`.
    # This is the number of queues the router uses when the split
    # is enabled on this queue.
    # This number exists in order to be able to safely increase or
    # decrease the number of queues: the queues have to be created
    # first, then we have to start consuming from them, and only then
    # we can start producing.
    in_use: int
10 changes: 5 additions & 5 deletions src/sentry/eventstream/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
from enum import Enum
from typing import TYPE_CHECKING, Any, Optional, TypedDict, cast

from django.conf import settings

from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.queue.routers import SplitQueueRouter
from sentry.tasks.post_process import post_process_group
from sentry.utils.cache import cache_key_for_event
from sentry.utils.services import Service
Expand Down Expand Up @@ -65,6 +64,9 @@ class EventStream(Service):
"_get_event_type",
)

def __init__(self, **options: Any) -> None:
    # Router that picks the (possibly split) Celery queue for
    # post-process tasks; presumably shared by all dispatch calls on
    # this backend instance.
    self.__celery_router = SplitQueueRouter()

def _dispatch_post_process_group_task(
self,
event_id: str,
Expand Down Expand Up @@ -108,9 +110,7 @@ def _get_queue_for_post_process(self, event: Event | GroupEvent) -> str:
else:
default_queue = "post_process_errors"

return settings.SENTRY_POST_PROCESS_QUEUE_SPLIT_ROUTER.get(
default_queue, lambda: default_queue
)()
return self.__celery_router.route_for_queue(default_queue)

def _get_occurrence_data(self, event: Event | GroupEvent) -> MutableMapping[str, Any]:
occurrence = cast(Optional[IssueOccurrence], getattr(event, "occurrence", None))
Expand Down
1 change: 1 addition & 0 deletions src/sentry/eventstream/kafka/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

class KafkaEventStream(SnubaProtocolEventStream):
def __init__(self, **options: Any) -> None:
super().__init__(**options)
self.topic = Topic.EVENTS
self.transactions_topic = Topic.TRANSACTIONS
self.issue_platform_topic = Topic.EVENTSTREAM_GENERIC
Expand Down
12 changes: 12 additions & 0 deletions src/sentry/options/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -2748,6 +2748,18 @@
flags=FLAG_AUTOMATOR_MODIFIABLE,
)

register(
    # Queues that are still routed through the legacy settings-based
    # router (SENTRY_POST_PROCESS_QUEUE_SPLIT_ROUTER) instead of the
    # option-driven SplitQueueRouter round-robin.
    "celery_split_queue_legacy_mode",
    default=["post_process_transactions"],
    flags=FLAG_AUTOMATOR_MODIFIABLE,
)

register(
    # Per-queue rollout rate (0.0 - 1.0) used by SplitQueueRouter to
    # decide how much traffic is sent to the split queues.
    "celery_split_queue_rollout",
    default={"post_process_transactions": 1.0},
    flags=FLAG_AUTOMATOR_MODIFIABLE,
)

# Secret Scanning. Allows to temporarily disable signature verification.
register(
"secret-scanning.github.enable-signature-verification",
Expand Down
68 changes: 68 additions & 0 deletions src/sentry/queue/routers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import logging
import random
from collections.abc import Sequence
from itertools import cycle

from django.conf import settings

from sentry import options
from sentry.celery import app
from sentry.utils.celery import build_queue_names

logger = logging.getLogger(__name__)


def _get_known_queues() -> set[str]:
    """Return the names of all queues declared in the Celery configuration."""
    return {declared.name for declared in app.conf.CELERY_QUEUES}


def _validate_destinations(destinations: Sequence[str]) -> None:
    """
    Assert that every destination queue is declared in the Celery config.

    Raises AssertionError on the first undeclared queue name.
    """
    # Fetch the known queues once instead of rebuilding the whole set on
    # every loop iteration, as the original did.
    known_queues = _get_known_queues()
    for dest in destinations:
        assert dest in known_queues, f"Queue {dest} in split queue config is not declared."


class SplitQueueRouter:
    """
    Returns the split queue to use for a Celery queue.

    Split queues allow us to spread the load of a queue to multiple ones.
    This takes in input a queue name and returns the split. It is supposed
    to be used by the code that schedules the task.
    Each split queue can be individually rolled out via options.

    WARNING: Do not forget to configure your workers to listen to the
    queues appropriately before you start routing messages.
    """

    def __init__(self) -> None:
        known_queues = _get_known_queues()
        # Maps a source queue name to a round-robin iterator over its split
        # destinations. Misconfigured queues are left out so routing falls
        # back to the source queue.
        self.__queue_routers = {}
        for source, dest_config in settings.CELERY_SPLIT_QUEUE_ROUTES.items():
            assert source in known_queues, f"Queue {source} in split queue config is not declared."
            assert dest_config["in_use"] <= dest_config["total"]

            if dest_config["in_use"] >= 2:
                destinations = build_queue_names(source, dest_config["in_use"])
                _validate_destinations(destinations)
                self.__queue_routers[source] = cycle(destinations)
            else:
                # A split of fewer than 2 queues is pointless; log and keep
                # routing to the source queue.
                logger.error(
                    "Invalid configuration for queue %s. In use is not greater than 1: %d. Fall back to source",
                    source,
                    dest_config["in_use"],
                )

    def route_for_queue(self, queue: str) -> str:
        """
        Return the queue (split or original) a task for `queue` should use.

        Rollout is probabilistic: each call rolls against the per-queue
        rollout-rate option, so traffic ramps up gradually as the rate grows.
        """
        rollout_rate = options.get("celery_split_queue_rollout").get(queue, 0.0)
        if random.random() >= rollout_rate:
            return queue

        # Membership test works directly on the option list; no need to
        # build a throwaway set per call.
        if queue in options.get("celery_split_queue_legacy_mode"):
            # Use legacy route.
            # This router requires the routing logic to be defined inside
            # the settings file.
            return settings.SENTRY_POST_PROCESS_QUEUE_SPLIT_ROUTER.get(queue, lambda: queue)()

        # Round-robin over the configured split queues; unknown queues fall
        # back to the source queue.
        router = self.__queue_routers.get(queue)
        return next(router) if router is not None else queue
53 changes: 53 additions & 0 deletions src/sentry/utils/celery.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,62 @@
from collections.abc import Mapping, MutableSequence, Sequence
from random import randint
from typing import Any

from celery.schedules import crontab
from kombu import Queue

from sentry.conf.types.celery import SplitQueueSize


def crontab_with_minute_jitter(*args: Any, **kwargs: Any) -> crontab:
    """Build a crontab schedule whose minute is randomized to spread load."""
    jittered = {**kwargs, "minute": randint(0, 59)}
    return crontab(*args, **jittered)


def build_queue_names(base_name: str, quantity: int) -> Sequence[str]:
    """
    Return `quantity` split-queue names derived from `base_name`.

    Names carry a 1-based suffix: `base_1`, `base_2`, ..., `base_quantity`.
    Returns an empty list when `quantity` is 0.
    """
    # Comprehension instead of the manual append loop (same output).
    return [f"{base_name}_{index}" for index in range(1, quantity + 1)]


def make_split_queues(config: Mapping[str, SplitQueueSize]) -> Sequence[Queue]:
    """
    Generates the split queue definitions from the mapping between
    base queue and split queue config.

    All `total` queues are declared per base queue so they are created at
    application start, even if only `in_use` of them receive traffic.
    """
    return [
        Queue(name=name, routing_key=name)
        for base_name, conf in config.items()
        for name in build_queue_names(base_name, conf["total"])
    ]


def safe_append(queues: MutableSequence[Queue], queue: Queue) -> None:
    """
    Append `queue` to `queues` unless a queue with the same name is
    already present.

    We define queues as lists in the configuration and we allow override
    of the config per environment. Unfortunately, if you add a queue with
    the same name twice to the Celery config, Celery just creates the
    queue twice. This can be an undesired behavior depending on the
    Celery backend, so this helper adds queues to a list without
    duplications.
    """
    if all(existing.name != queue.name for existing in queues):
        queues.append(queue)


def safe_extend(queues: MutableSequence[Queue], to_add: Sequence[Queue]) -> None:
    """
    Like `safe_append` but works like extend, adding multiple queues
    to the config while skipping any whose name is already present.

    Names appended during this call are tracked too, so duplicates
    *within* `to_add` are also skipped (previously only the pre-existing
    queues were checked, letting intra-batch duplicates through).
    """
    existing_queue_names = {q.name for q in queues}
    for q in to_add:
        if q.name not in existing_queue_names:
            queues.append(q)
            # Record the new name so later entries in `to_add` with the
            # same name are not appended again.
            existing_queue_names.add(q.name)
43 changes: 22 additions & 21 deletions tests/sentry/eventstream/test_eventstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from unittest.mock import Mock, patch

import pytest
from django.conf import settings
from django.test import override_settings
from django.utils import timezone
from snuba_sdk import Column, Condition, Entity, Op, Query, Request
Expand Down Expand Up @@ -341,18 +340,7 @@ def test_transaction_queue(self, mock_eventstream_insert):

@override_settings()
@patch("sentry.eventstream.backend.insert", autospec=True)
def test_queue_split_router(self, mock_eventstream_insert):
queues = [
"post_process_transactions-1",
"post_process_transactions-2",
"post_process_transactions-3",
]
queues_gen = itertools.cycle(queues)

settings.SENTRY_POST_PROCESS_QUEUE_SPLIT_ROUTER = {
"post_process_transactions": lambda: next(queues_gen)
}

def test_queue_legacy_split_router(self, mock_eventstream_insert):
event = self.__build_transaction_event()
event.group_id = None
event.groups = [self.group]
Expand All @@ -371,14 +359,27 @@ def test_queue_split_router(self, mock_eventstream_insert):
"group_states": [{"id": event.groups[0].id, **group_state}],
}

headers, body = self.__produce_payload(*insert_args, **insert_kwargs)
assert body["queue"] == "post_process_transactions-1"
headers, body = self.__produce_payload(*insert_args, **insert_kwargs)
assert body["queue"] == "post_process_transactions-2"
headers, body = self.__produce_payload(*insert_args, **insert_kwargs)
assert body["queue"] == "post_process_transactions-3"
headers, body = self.__produce_payload(*insert_args, **insert_kwargs)
assert body["queue"] == "post_process_transactions-1"
queues_gen = itertools.cycle(
[
"post_process_transactions_1",
"post_process_transactions_2",
"post_process_transactions_3",
]
)

with override_settings(
SENTRY_POST_PROCESS_QUEUE_SPLIT_ROUTER={
"post_process_transactions": lambda: next(queues_gen)
}
):
_, body = self.__produce_payload(*insert_args, **insert_kwargs)
assert body["queue"] == "post_process_transactions_1"
_, body = self.__produce_payload(*insert_args, **insert_kwargs)
assert body["queue"] == "post_process_transactions_2"
_, body = self.__produce_payload(*insert_args, **insert_kwargs)
assert body["queue"] == "post_process_transactions_3"
_, body = self.__produce_payload(*insert_args, **insert_kwargs)
assert body["queue"] == "post_process_transactions_1"

# test default assignment
insert_kwargs = {
Expand Down
Loading
Loading