Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add queues priority to metrics only works in redis #204

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,4 @@ dmypy.json

.virtualenv
celery-exporter
.idea
15 changes: 15 additions & 0 deletions .idea/celery-exporter.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions .idea/inspectionProfiles/Project_Default.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/inspectionProfiles/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Empty file.
6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

213 changes: 213 additions & 0 deletions .idea/workspace.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ docker run -p 9808:9808 danihodovic/celery-exporter --broker-url=redis://redis.s
--broker-transport-option global_keyprefix=danihodovic \
--broker-transport-option visibility_timeout=7200
```

In case you changed celery transport options to enable [priority](https://docs.celeryq.dev/en/stable/userguide/routing.html#redis-message-priorities) or changed separator just use same config here as well.
```sh
--broker-transport-option sep=':' --broker-transport-option priority_steps=[0,1,2,3,4,5,6,7,8,9]
```
In case of extended transport options, such as `sentinel_kwargs` you can pass JSON string:,
for example:

Expand Down
8 changes: 6 additions & 2 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ def celery_config(broker):
)
if broker == "redis":
config["broker_url"] = "redis://localhost:6379/" # type: ignore
config["broker_transport_options"] = {
'priority_steps': list(range(10)),
'queue_order_strategy': 'priority',
'sep': ':'
}
elif broker == "rabbitmq":
config["broker_url"] = "amqp://guest:guest@localhost:5672" # type: ignore
elif broker == "memory":
Expand Down Expand Up @@ -70,7 +75,6 @@ def find_free_port():
"""

def _find_free_port():

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("0.0.0.0", 0))
Expand All @@ -88,7 +92,7 @@ def exporter_instance(find_free_port, celery_config, log_level):
"host": "0.0.0.0",
"port": find_free_port(),
"broker_url": celery_config["broker_url"],
"broker_transport_option": ["visibility_timeout=7200"],
"broker_transport_option": ["visibility_timeout=7200", "sep=:", "priority_steps=[0,1,2,3,4,5,6,7,8,9]"],
"broker_ssl_option": [],
"retry_interval": 5,
"log_level": log_level,
Expand Down
22 changes: 19 additions & 3 deletions src/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@
from celery.utils import nodesplit # type: ignore
from kombu.exceptions import ChannelError # type: ignore
from loguru import logger
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram
from prometheus_client import (
CollectorRegistry,
Counter,
Gauge,
Histogram,
)

from .http_server import start_http_server

Expand Down Expand Up @@ -106,7 +111,7 @@ def __init__(self, buckets=None):
def track_queue_metrics(self):
with self.app.connection() as connection: # type: ignore
transport = connection.info()["transport"]
acceptable_transports = ["redis", "rediss", "amqp", "memory", "sentinel"]
acceptable_transports = ["redis", "rediss", "amqp", "amqps", "memory", "sentinel"]
if transport not in acceptable_transports:
logger.debug(
f"Queue length tracking is only implemented for {acceptable_transports}"
Expand All @@ -117,9 +122,20 @@ def track_queue_metrics(self):
# we need to cache queue info in exporter in case all workers are offline
# so that no worker response to exporter will make active_queues return None
queues = self.app.control.inspect().active_queues() or {}
queue_cache = set()
for info_list in queues.values():
for queue_info in info_list:
self.queue_cache.add(queue_info["name"])
queue_cache.add((queue_info['name']))

# Check celery queues based on worker separator & priority steps
separator = '\x06\x16'
if 'sep' in self.app.conf["broker_transport_options"]:
separator = self.app.conf["broker_transport_options"]['sep']
if 'priority_steps' in self.app.conf["broker_transport_options"]:
for queue in queue_cache:
for step in self.app.conf['broker_transport_options']['priority_steps']:
self.queue_cache.add(f'{queue}{separator}{str(step)}')

track_length = lambda q, l: self.celery_queue_length.labels(
queue_name=q
Expand All @@ -128,7 +144,7 @@ def track_queue_metrics(self):
if transport in ["redis", "rediss", "sentinel"]:
queue_length = redis_queue_length(connection, queue)
track_length(queue, queue_length)
elif transport in ["amqp", "memory"]:
elif transport in ["amqp", "amqps", "memory"]:
queue_length = rabbitmq_queue_length(connection, queue)
track_length(queue, queue_length)

Expand Down
Loading