Skip to content

Commit

Permalink
History check (#212)
Browse files Browse the repository at this point in the history
* Warning on fresh prometheus

* Fix typing errors, fix running from in-cluster

* Update for new prometrix version

* Update prometheus_metrics_service to new prometrix

---------

Co-authored-by: LeaveMyYard <[email protected]>
  • Loading branch information
LeaveMyYard and LeaveMyYard authored Mar 4, 2024
1 parent 6ddae62 commit f38e33c
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 15 deletions.
5 changes: 5 additions & 0 deletions robusta_krr/core/abstract/strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ def history_timedelta(self) -> datetime.timedelta:
def timeframe_timedelta(self) -> datetime.timedelta:
return datetime.timedelta(minutes=self.timeframe_duration)

def history_range_enough(self, history_range: tuple[datetime.datetime, datetime.datetime]) -> bool:
    """Check whether the available metrics history suffices for this strategy.

    Args:
        history_range: The ``(start, end)`` datetimes of the metric history
            actually available in Prometheus (as produced by
            ``PrometheusMetricsService.get_history_range``). Note: the values
            passed by the runner are datetimes, not timedeltas.

    Returns:
        True if the history is long enough to run the strategy. The base
        implementation accepts any range; strategies override this to
        require a minimum amount of data.
    """

    return True


# A type alias for a numpy array of shape (N, 2).
ArrayNx2 = Annotated[NDArray[np.float64], Literal["N", 2]]
Expand Down
4 changes: 3 additions & 1 deletion robusta_krr/core/integrations/kubernetes/config_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

from __future__ import annotations

from typing import Optional

from kubernetes.client import configuration
from kubernetes.config import kube_config

Expand All @@ -25,7 +27,7 @@ def _set_config(self, client_configuration: Configuration):
class Configuration(configuration.Configuration):
def __init__(
self,
proxy: str | None = None,
proxy: Optional[str] = None,
**kwargs,
):
super().__init__(**kwargs)
Expand Down
7 changes: 6 additions & 1 deletion robusta_krr/core/integrations/prometheus/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,15 @@ def get_metrics_service(
loader.validate_cluster_name()
return loader
except MetricsNotFound as e:
logger.debug(f"{service_name} not found: {e}")
logger.info(f"{service_name} not found: {e}")

return None

async def get_history_range(
    self, history_duration: datetime.timedelta
) -> Optional[tuple[datetime.datetime, datetime.datetime]]:
    """Return the (start, end) datetimes of metric history available for the
    last ``history_duration``, as reported by the underlying metrics service.

    Thin async delegation; any exception raised by the underlying loader
    (presumably ValueError when no data points exist — confirm against the
    concrete service) propagates to the caller.
    """
    return await self.loader.get_history_range(history_duration)

async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> list[PodData]:
try:
return await self.loader.load_pods(object, period)
Expand Down
2 changes: 1 addition & 1 deletion robusta_krr/core/integrations/prometheus/metrics/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ async def load_data(
duration_str = self._step_to_string(period)

query = self.get_query(object, duration_str, step_str)
end_time = datetime.datetime.now().replace(second=0, microsecond=0).astimezone()
end_time = datetime.datetime.utcnow().replace(second=0, microsecond=0)
start_time = end_time - period

# Here if we split the object into multiple sub-objects, we query each sub-object recursively.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import datetime
import logging
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from typing import List, Optional

from kubernetes.client import ApiClient
Expand Down Expand Up @@ -108,7 +108,19 @@ def check_connection(self):

async def query(self, query: str) -> dict:
    """Run a PromQL instant query on the thread-pool executor and return
    the ``result`` list from the Prometheus response."""

    def _blocking_query() -> dict:
        # prometrix's safe_custom_query is synchronous, so it runs on
        # self.executor to keep the event loop responsive.
        return self.prometheus.safe_custom_query(query=query)["result"]

    event_loop = asyncio.get_running_loop()
    return await event_loop.run_in_executor(self.executor, _blocking_query)

async def query_range(self, query: str, start: datetime, end: datetime, step: timedelta) -> dict:
    """Run a PromQL range query on the thread-pool executor.

    Args:
        query: The PromQL expression.
        start: Start of the time window.
        end: End of the time window.
        step: Resolution step between returned points.

    Returns:
        The ``result`` list from the Prometheus response.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self.executor,
        lambda: self.prometheus.safe_custom_query_range(
            query=query,
            start_time=start,
            end_time=end,
            # BUGFIX: timedelta.seconds is only the seconds *component*
            # (timedelta(days=1).seconds == 0); total_seconds() is the
            # actual step length.
            step=f"{int(step.total_seconds())}s",
        )["result"],
    )

def validate_cluster_name(self):
if not settings.prometheus_cluster_label and not settings.prometheus_label:
Expand Down Expand Up @@ -137,12 +149,34 @@ def get_cluster_names(self) -> Optional[List[str]]:
logger.error("Labels api not present on prometheus client")
return []

async def get_history_range(self, history_duration: timedelta) -> tuple[datetime, datetime]:
    """Get the metric history range available in Prometheus.

    Probes ``container_memory_working_set_bytes`` over the last
    ``history_duration`` at 1-hour resolution and reads the timestamps of
    the oldest and newest returned samples.

    Args:
        history_duration: How far back to look for data.

    Returns:
        A ``(start, end)`` tuple of datetimes covering the available history.

    Raises:
        ValueError: If Prometheus returned no usable data points.
    """

    # NOTE(review): uses naive local datetimes (datetime.now() /
    # fromtimestamp()), while metrics/base.py uses utcnow() — confirm the
    # mixture is intentional.
    now = datetime.now()
    result = await self.query_range(
        "max(container_memory_working_set_bytes)",
        start=now - history_duration,
        end=now,
        step=timedelta(hours=1),
    )
    try:
        values = result[0]["values"]
        # Each value is a [timestamp, value] pair; take first and last timestamps.
        start, end = values[0][0], values[-1][0]
        return datetime.fromtimestamp(start), datetime.fromtimestamp(end)
    except (KeyError, IndexError) as e:
        logger.debug(f"Returned from get_history_range: {result}")
        raise ValueError("Error while getting history range") from e

async def gather_data(
self,
object: K8sObjectData,
LoaderClass: type[PrometheusMetric],
period: datetime.timedelta,
step: datetime.timedelta = datetime.timedelta(minutes=30),
period: timedelta,
step: timedelta = timedelta(minutes=30),
) -> PodsTimeData:
"""
ResourceHistoryData: The gathered resource history data.
Expand All @@ -164,12 +198,12 @@ async def gather_data(

return data

async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> list[PodData]:
async def load_pods(self, object: K8sObjectData, period: timedelta) -> list[PodData]:
"""
List pods related to the object and add them to the object's pods list.
Args:
object (K8sObjectData): The Kubernetes object.
period (datetime.timedelta): The time period for which to gather data.
period (timedelta): The time period for which to gather data.
"""

logger.debug(f"Adding historic pods for {object}")
Expand Down
1 change: 1 addition & 0 deletions robusta_krr/core/models/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class Result(pd.BaseModel):
resources: list[str] = ["cpu", "memory"]
description: Optional[str] = None
strategy: StrategyData
errors: list[dict[str, Any]] = pd.Field(default_factory=list)

def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
Expand Down
59 changes: 53 additions & 6 deletions robusta_krr/core/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ def __init__(self) -> None:
self._metrics_service_loaders_error_logged: set[Exception] = set()
self._strategy = settings.create_strategy()

self.errors: list[dict] = []

# This executor will be running calculations for recommendations
self._executor = ThreadPoolExecutor(settings.max_workers)

Expand Down Expand Up @@ -73,6 +75,8 @@ def _greet(self) -> None:
custom_print("")

def _process_result(self, result: Result) -> None:
result.errors = self.errors

Formatter = settings.Formatter
formatted = result.format(Formatter)
rich = getattr(Formatter, "__rich_console__", False)
Expand Down Expand Up @@ -149,13 +153,12 @@ async def _calculate_object_recommendations(self, object: K8sObjectData) -> RunR
object.pods = await self._k8s_loader.load_pods(object)

# NOTE: Kubernetes API returned pods, but Prometheus did not
# This might happen with fast executing jobs
if object.pods != []:
object.add_warning("NoPrometheusPods")
logger.warning(
f"Was not able to load any pods for {object} from Prometheus.\n\t"
"This could mean that Prometheus is missing some required metrics.\n\t"
"Loaded pods from Kubernetes API instead.\n\t"
"See more info at https://github.com/robusta-dev/krr#requirements "
f"Was not able to load any pods for {object} from Prometheus. "
"Loaded pods from Kubernetes API instead."
)

metrics = await prometheus_loader.gather_data(
Expand All @@ -173,6 +176,43 @@ async def _calculate_object_recommendations(self, object: K8sObjectData) -> RunR
logger.info(f"Calculated recommendations for {object} (using {len(metrics)} metrics)")
return self._format_result(result)

async def _check_data_availability(self, cluster: Optional[str]) -> None:
    """Verify that Prometheus holds enough metric history for the strategy.

    On failure to determine the range, or when the range is too short,
    this logs the problem and records an entry in ``self.errors`` — it
    never raises and never aborts the run.

    Args:
        cluster: The cluster name, or None for the in-cluster default.
    """
    prometheus_loader = self._get_prometheus_loader(cluster)
    if prometheus_loader is None:
        return

    try:
        history_range = await prometheus_loader.get_history_range(self._strategy.settings.history_timedelta)
    except ValueError:
        logger.exception(f"Was not able to get history range for cluster {cluster}")
        self.errors.append(
            {
                "name": "HistoryRangeError",
            }
        )
        return

    logger.debug(f"History range for {cluster}: {history_range}")
    # NOTE(review): history_range is a (datetime, datetime) tuple, although
    # the strategy signatures annotate timedeltas — only the difference is
    # used downstream, so it works, but the annotations should be aligned.
    enough_data = self._strategy.settings.history_range_enough(history_range)

    if not enough_data:
        logger.error(f"Not enough history available for cluster {cluster}.")
        # Estimate when a full history window will have accumulated.
        try_after = history_range[0] + self._strategy.settings.history_timedelta

        logger.error(
            "If the cluster is freshly installed, it might take some time for enough data to be available."
        )
        logger.error(
            f"Enough data is estimated to be available after {try_after}, "
            "but will try to calculate recommendations anyway."
        )
        self.errors.append(
            {
                "name": "NotEnoughHistoryAvailable",
                "retry_after": try_after,
            }
        )

async def _gather_object_allocations(self, k8s_object: K8sObjectData) -> ResourceScan:
recommendation = await self._calculate_object_recommendations(k8s_object)

Expand All @@ -191,13 +231,20 @@ async def _collect_result(self) -> Result:
clusters = await self._k8s_loader.list_clusters()
if clusters and len(clusters) > 1 and settings.prometheus_url:
# this can only happen for multi-cluster querying a single centralized prometheus
# In this scenario we dont yet support determining which metrics belong to which cluster so the reccomendation can be incorrect
# In this scenario we don't yet support determining
# which metrics belong to which cluster so the recommendation can be incorrect
raise ClusterNotSpecifiedException(
f"Cannot scan multiple clusters for this prometheus, Rerun with the flag `-c <cluster>` where <cluster> is one of {clusters}"
f"Cannot scan multiple clusters for this prometheus, "
f"Rerun with the flag `-c <cluster>` where <cluster> is one of {clusters}"
)

logger.info(f'Using clusters: {clusters if clusters is not None else "inner cluster"}')

if clusters is None:
await self._check_data_availability(None)
else:
await asyncio.gather(*[self._check_data_availability(cluster) for cluster in clusters])

with ProgressBar(title="Calculating Recommendation") as self.__progressbar:
scans_tasks = [
asyncio.create_task(self._gather_object_allocations(k8s_object))
Expand Down
6 changes: 6 additions & 0 deletions robusta_krr/strategies/simple.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import datetime, timedelta

import numpy as np
import pydantic as pd

Expand Down Expand Up @@ -47,6 +49,10 @@ def calculate_cpu_proposal(self, data: PodsTimeData) -> float:

return np.max(data_)

def history_range_enough(self, history_range: tuple[datetime, datetime]) -> bool:
    """Check that the available history contains enough sample points.

    The usable window is the available history, capped at the configured
    ``history_timedelta``; it is long enough when it spans at least
    ``points_required`` timeframe-sized steps.

    Args:
        history_range: The ``(start, end)`` datetimes of available history
            (the runner passes datetimes, not timedeltas).
    """
    start, end = history_range
    return min(end - start, self.history_timedelta) / self.timeframe_timedelta >= self.points_required


class SimpleStrategy(BaseStrategy[SimpleStrategySettings]):
"""
Expand Down

0 comments on commit f38e33c

Please sign in to comment.