Implement Prometheus request batching #120

Merged · 3 commits · Aug 10, 2023
78 changes: 72 additions & 6 deletions robusta_krr/core/integrations/prometheus/metrics/base.py
@@ -4,8 +4,10 @@
import asyncio
import datetime
import enum
import copy
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, Any, Optional
from typing import Any, TYPE_CHECKING, Optional
from functools import reduce

import numpy as np
import pydantic as pd
@@ -156,7 +158,6 @@ async def load_data(
)

if result == []:
self.warning(f"{self.service_name} returned no {self.__class__.__name__} metrics for {object}")
return {}

return {pod_result["metric"]["pod"]: np.array(pod_result["values"], dtype=np.float64) for pod_result in result}
@@ -177,7 +178,7 @@ class QueryMetric(PrometheusMetric):
PrometheusSeries = Any


class FilterMetric(PrometheusMetric):
class FilterJobsMixin(PrometheusMetric):
"""
This is the version of the BasicMetricLoader that filters out data
if multiple metrics with the same name were found.
@@ -206,16 +207,16 @@ def filter_prom_jobs_results(
return series_list_result

target_names = {
FilterMetric.get_target_name(series)
FilterJobsMixin.get_target_name(series)
for series in series_list_result
if FilterMetric.get_target_name(series)
if FilterJobsMixin.get_target_name(series)
}
return_list: list[PrometheusSeries] = []

# take the kubelet job if it exists; otherwise return the first job alphabetically
for target_name in target_names:
relevant_series = [
series for series in series_list_result if FilterMetric.get_target_name(series) == target_name
series for series in series_list_result if FilterJobsMixin.get_target_name(series) == target_name
]
relevant_kubelet_metric = [series for series in relevant_series if series["metric"].get("job") == "kubelet"]
if len(relevant_kubelet_metric) == 1:
@@ -228,3 +229,68 @@ def filter_prom_jobs_results(
async def query_prometheus(self, data: PrometheusMetricData) -> list[PrometheusSeries]:
result = await super().query_prometheus(data)
return self.filter_prom_jobs_results(result)


class BatchedRequestMixin(PrometheusMetric):
"""
This type of PrometheusMetric is used to split the query into multiple queries,
each querying a subset of the pods of the object.

The results of the queries are then combined into a single result.

This is useful when the number of pods is too large for a single query.
"""

pods_batch_size = 50

def combine_batches(self, results: list[PodsTimeData]) -> PodsTimeData:
"""
Combines the results of multiple queries into a single result.

Args:
results (list[PodsTimeData]): A list of query results.

Returns:
PodsTimeData: A combined result.
"""

return reduce(lambda x, y: x | y, results, {})

@staticmethod
def _slice_object(object: K8sObjectData, s: slice) -> K8sObjectData:
obj_copy = copy.deepcopy(object)
obj_copy.pods = object.pods[s]
return obj_copy

@staticmethod
def _split_objects(object: K8sObjectData, max_pods: int) -> list[K8sObjectData]:
"""
Splits the object into multiple objects, each containing at most max_pods pods.

Args:
object (K8sObjectData): The object to split.
max_pods (int): The maximum number of pods per resulting object.

Returns:
list[K8sObjectData]: A list of objects.
"""
return [
BatchedRequestMixin._slice_object(object, slice(i, i + max_pods))
for i in range(0, len(object.pods), max_pods)
]

async def load_data(
self, object: K8sObjectData, period: datetime.timedelta, step: datetime.timedelta
) -> PodsTimeData:
splitted_objects = self._split_objects(object, self.pods_batch_size)

# If we do not exceed the batch size, we can use the regular load_data method.
if len(splitted_objects) <= 1:
return await super().load_data(object, period, step)

results = await asyncio.gather(
*[
super(BatchedRequestMixin, self).load_data(splitted_object, period, step)
for splitted_object in splitted_objects
]
)
return self.combine_batches(results)
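
Note: the following is a minimal, self-contained sketch (not part of the PR) of what `_split_objects` and `combine_batches` do, using plain lists and dicts in place of `K8sObjectData` and `PodsTimeData`; the pod names and batch size are made up.

```python
from functools import reduce

pods = [f"pod-{i}" for i in range(7)]  # hypothetical pod names
batch_size = 3                         # the real mixin defaults to pods_batch_size = 50

# _split_objects: slice the pod list into chunks of at most batch_size pods each
batches = [pods[i:i + batch_size] for i in range(0, len(pods), batch_size)]
assert batches == [["pod-0", "pod-1", "pod-2"], ["pod-3", "pod-4", "pod-5"], ["pod-6"]]

# Each batch yields its own {pod_name: series} mapping; combine_batches merges them
# with the dict-union operator, i.e. reduce(lambda x, y: x | y, results, {}).
results = [{pod: f"series-for-{pod}" for pod in batch} for batch in batches]
combined = reduce(lambda x, y: x | y, results, {})
assert set(combined) == set(pods)
```

Requires Python 3.9+ for the `dict | dict` union used by `combine_batches`.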
9 changes: 5 additions & 4 deletions robusta_krr/core/integrations/prometheus/metrics/cpu.py
@@ -1,9 +1,10 @@
from robusta_krr.core.models.objects import K8sObjectData
from robusta_krr.core.abstract.strategies import PodsTimeData

from .base import FilterMetric, QueryMetric, QueryRangeMetric
from .base import QueryMetric, QueryRangeMetric, FilterJobsMixin, BatchedRequestMixin


class CPULoader(QueryRangeMetric, FilterMetric):
class CPULoader(QueryRangeMetric, FilterJobsMixin, BatchedRequestMixin):
def get_query(self, object: K8sObjectData, resolution: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
@@ -21,7 +22,7 @@ def get_query(self, object: K8sObjectData, resolution: str) -> str:
"""


class MaxCPULoader(QueryMetric, FilterMetric):
class MaxCPULoader(QueryMetric, FilterJobsMixin, BatchedRequestMixin):
def get_query(self, object: K8sObjectData, resolution: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
@@ -40,7 +41,7 @@ def get_query(self, object: K8sObjectData, resolution: str) -> str:


def PercentileCPULoader(percentile: float) -> type[QueryMetric]:
class PercentileCPULoader(QueryMetric, FilterMetric):
class PercentileCPULoader(QueryMetric, FilterJobsMixin, BatchedRequestMixin):
def get_query(self, object: K8sObjectData, resolution: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
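
For the loader classes above, both mixins take effect through Python's MRO: `load_data` resolves to `BatchedRequestMixin` and `query_prometheus` to `FilterJobsMixin`, so every batched sub-request is still job-filtered. The sketch below uses simplified stand-ins with the same names and trivial bodies (it assumes `QueryRangeMetric` overrides neither method, which matches the `base.py` changes shown above).

```python
class PrometheusMetric:
    def load_data(self):        return "plain load"
    def query_prometheus(self): return "plain query"

class QueryRangeMetric(PrometheusMetric):
    pass  # in the real code this only selects the range-query type

class FilterJobsMixin(PrometheusMetric):
    def query_prometheus(self): return "filtered query"

class BatchedRequestMixin(PrometheusMetric):
    def load_data(self):        return "batched load"

class CPULoader(QueryRangeMetric, FilterJobsMixin, BatchedRequestMixin):
    pass

loader = CPULoader()
assert loader.load_data() == "batched load"           # batching is in effect
assert loader.query_prometheus() == "filtered query"  # job filtering is in effect
```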
10 changes: 6 additions & 4 deletions robusta_krr/core/integrations/prometheus/metrics/memory.py
@@ -1,9 +1,11 @@
from robusta_krr.core.abstract.strategies import PodsTimeData

from robusta_krr.core.models.objects import K8sObjectData

from .base import FilterMetric, QueryMetric, QueryRangeMetric
from .base import QueryMetric, QueryRangeMetric, FilterJobsMixin, BatchedRequestMixin


class MemoryLoader(QueryRangeMetric, FilterMetric):
class MemoryLoader(QueryRangeMetric, FilterJobsMixin, BatchedRequestMixin):
def get_query(self, object: K8sObjectData, resolution: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
@@ -19,7 +21,7 @@ def get_query(self, object: K8sObjectData, resolution: str) -> str:
"""


class MaxMemoryLoader(QueryMetric, FilterMetric):
class MaxMemoryLoader(QueryMetric, FilterJobsMixin, BatchedRequestMixin):
def get_query(self, object: K8sObjectData, resolution: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
@@ -36,7 +38,7 @@ def get_query(self, object: K8sObjectData, resolution: str) -> str:


def PercentileMemoryLoader(percentile: float) -> type[QueryMetric]:
class PercentileMemoryLoader(QueryMetric, FilterMetric):
class PercentileMemoryLoader(QueryMetric, FilterJobsMixin, BatchedRequestMixin):
def get_query(self, object: K8sObjectData, resolution: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
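
As in `cpu.py`, the percentile loaders here are factories that build a class per percentile; after this change the generated classes also inherit both mixins, so percentile queries are batched as well. A hypothetical usage sketch (assuming krr is importable; only `PercentileCPULoader` is shown, since `simple.py` imports it from this package):

```python
from robusta_krr.core.integrations.prometheus.metrics import PercentileCPULoader

Cpu95Loader = PercentileCPULoader(95.0)  # the factory returns a class, not an instance
print([base.__name__ for base in Cpu95Loader.__mro__])
# Expected to include: PercentileCPULoader, QueryMetric, FilterJobsMixin, BatchedRequestMixin, ...
```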
@@ -140,7 +140,14 @@ async def gather_data(
self.debug(f"Gathering {LoaderClass.__name__} metric for {object}")

metric_loader = LoaderClass(self.config, self.prometheus, self.name, self.executor)
return await metric_loader.load_data(object, period, step)
data = await metric_loader.load_data(object, period, step)

if len(data) == 0:
self.warning(
f"{metric_loader.service_name} returned no {metric_loader.__class__.__name__} metrics for {object}"
)

return data

async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> None:
"""
@@ -159,13 +166,14 @@ async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) ->
cluster_label = self.get_prometheus_cluster_label()
if object.kind == "Deployment":
replicasets = await self.query(
"kube_replicaset_owner{"
f'owner_name="{object.name}", '
f'owner_kind="Deployment", '
f'namespace="{object.namespace}"'
f"{cluster_label}"
"}"
f"[{period_literal}]"
f"""
kube_replicaset_owner{{
owner_name="{object.name}",
owner_kind="Deployment",
namespace="{object.namespace}"
{cluster_label}
}}[{period_literal}]
"""
)
pod_owners = [replicaset["metric"]["replicaset"] for replicaset in replicasets]
pod_owner_kind = "ReplicaSet"
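
For illustration only, here is how the reformatted f-string above renders with hypothetical values (a Deployment named `my-app` in namespace `default`, an empty cluster label, and a two-week history window):

```python
object_name, namespace, cluster_label, period_literal = "my-app", "default", "", "14d"

query = f"""
    kube_replicaset_owner{{
        owner_name="{object_name}",
        owner_kind="Deployment",
        namespace="{namespace}"
        {cluster_label}
    }}[{period_literal}]
"""
print(query)
```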
6 changes: 2 additions & 4 deletions robusta_krr/strategies/simple.py
@@ -1,4 +1,3 @@
from typing import Sequence
import numpy as np
import pydantic as pd

@@ -12,7 +11,6 @@
RunResult,
StrategySettings,
)
from robusta_krr.core.abstract.metrics import BaseMetric
from robusta_krr.core.integrations.prometheus.metrics import PercentileCPULoader, MaxMemoryLoader, PrometheusMetric


@@ -27,7 +25,7 @@ def calculate_memory_proposal(self, data: PodsTimeData) -> float:
if len(data_) == 0:
return float("NaN")

return max(data_) * (1 + self.memory_buffer_percentage / 100)
return np.max(data_) * (1 + self.memory_buffer_percentage / 100)

def calculate_cpu_proposal(self, data: PodsTimeData) -> float:
if len(data) == 0:
@@ -38,7 +36,7 @@ def calculate_cpu_proposal(self, data: PodsTimeData) -> float:
else:
data_ = list(data.values())[0][:, 1]

return np.percentile(data_, self.cpu_percentile)
return np.max(data_)


class SimpleStrategy(BaseStrategy[SimpleStrategySettings]):
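
The strategy changes above pair with the loader changes: `calculate_cpu_proposal` now takes `np.max` of the data, presumably because the percentile is already applied inside the PromQL query built by `PercentileCPULoader`. A toy illustration of the `PodsTimeData` shape the strategy consumes (pod names and values are made up):

```python
import numpy as np

# PodsTimeData: pod name -> ndarray of (timestamp, value) rows; column 1 holds the values.
data = {
    "pod-a": np.array([[1000.0, 0.20], [1060.0, 0.35]]),
    "pod-b": np.array([[1000.0, 0.50], [1060.0, 0.41]]),
}
values = np.concatenate([arr[:, 1] for arr in data.values()])
proposal = np.max(values)  # 0.5; the aggregation now happens server-side in Prometheus
print(proposal)
```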