Skip to content

Commit

Permalink
Merge pull request #116 from robusta-dev/strategy-metric-usage-refact…
Browse files Browse the repository at this point in the history
…oring

Refactor strategy dependancy on metrics
  • Loading branch information
LeaveMyYard authored Aug 2, 2023
2 parents 7b04d23 + 81d640c commit d202105
Show file tree
Hide file tree
Showing 24 changed files with 518 additions and 597 deletions.
9 changes: 7 additions & 2 deletions examples/custom_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import pydantic as pd

import robusta_krr
from robusta_krr.api.models import HistoryData, K8sObjectData, ResourceRecommendation, ResourceType, RunResult
from robusta_krr.api.models import MetricsPodData, K8sObjectData, ResourceRecommendation, ResourceType, RunResult
from robusta_krr.api.strategies import BaseStrategy, StrategySettings
from robusta_krr.core.integrations.prometheus.metrics import MaxCPULoader, MaxMemoryLoader


# Providing description to the settings will make it available in the CLI help
Expand All @@ -19,7 +20,11 @@ class CustomStrategy(BaseStrategy[CustomStrategySettings]):
Made only in order to demonstrate how to create a custom strategy.
"""

def run(self, history_data: HistoryData, object_data: K8sObjectData) -> RunResult:
display_name = "custom" # The name of the strategy
rich_console = True # Whether to use rich console for the CLI
metrics = [MaxCPULoader, MaxMemoryLoader] # The metrics to use for the strategy

def run(self, history_data: MetricsPodData, object_data: K8sObjectData) -> RunResult:
return {
ResourceType.CPU: ResourceRecommendation(request=self.settings.param_1, limit=None),
ResourceType.Memory: ResourceRecommendation(request=self.settings.param_2, limit=self.settings.param_2),
Expand Down
11 changes: 8 additions & 3 deletions robusta_krr/api/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
from robusta_krr.core.abstract.strategies import HistoryData, ResourceHistoryData, ResourceRecommendation, RunResult
from robusta_krr.core.abstract.strategies import (
PodsTimeData,
MetricsPodData,
ResourceRecommendation,
RunResult,
)
from robusta_krr.core.models.allocations import RecommendationValue, ResourceAllocations, ResourceType
from robusta_krr.core.models.objects import K8sObjectData, PodData
from robusta_krr.core.models.result import ResourceScan, Result
Expand All @@ -15,7 +20,7 @@
"register_severity_calculator",
"ResourceScan",
"ResourceRecommendation",
"HistoryData",
"ResourceHistoryData",
"PodsTimeData",
"MetricsPodData",
"RunResult",
]
21 changes: 21 additions & 0 deletions robusta_krr/core/abstract/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from abc import ABC, abstractmethod
import datetime

from robusta_krr.core.abstract.strategies import PodsTimeData
from robusta_krr.core.models.objects import K8sObjectData


class BaseMetric(ABC):
"""
This abstraction is done for a future use.
Currently we only scrape metrics from Prometheus,
but in the future we may want to support other metric sources like Datadog, etc.
TODO: When we want to support other metric sources, we should maybe rethink an interface here.
"""

@abstractmethod
async def load_data(
self, object: K8sObjectData, period: datetime.timedelta, step: datetime.timedelta
) -> PodsTimeData:
...
50 changes: 25 additions & 25 deletions robusta_krr/core/abstract/strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
import abc
import datetime
from textwrap import dedent
from typing import Annotated, Generic, Literal, Optional, TypeVar, get_args
from typing import Annotated, Generic, Literal, Optional, TypeVar, get_args, TYPE_CHECKING, Sequence

import numpy as np
import pydantic as pd
from numpy.typing import NDArray

from robusta_krr.core.models.result import K8sObjectData, Metric, ResourceType
from robusta_krr.utils.display_name import display_name_property
from robusta_krr.core.models.result import K8sObjectData, ResourceType

if TYPE_CHECKING:
from robusta_krr.core.integrations.prometheus.metrics import PrometheusMetric
from robusta_krr.core.abstract.metrics import BaseMetric # noqa: F401

SelfRR = TypeVar("SelfRR", bound="ResourceRecommendation")

Expand Down Expand Up @@ -61,21 +64,9 @@ def timeframe_timedelta(self) -> datetime.timedelta:
ArrayNx2 = Annotated[NDArray[np.float64], Literal["N", 2]]


class ResourceHistoryData(pd.BaseModel):
"""A class to represent resource history data.
metric is the metric information used to gather the history data.
data is a mapping from pod to a numpy array of time and value.
"""

metric: Metric
data: dict[str, ArrayNx2] # Mapping: pod -> [(time, value)]

class Config:
arbitrary_types_allowed = True

PodsTimeData = dict[str, ArrayNx2] # Mapping: pod -> [(time, value)]
MetricsPodData = dict[str, PodsTimeData]

HistoryData = dict[ResourceType, ResourceHistoryData]
RunResult = dict[ResourceType, ResourceRecommendation]

SelfBS = TypeVar("SelfBS", bound="BaseStrategy")
Expand All @@ -85,7 +76,6 @@ class Config:
# An abstract base class for strategy implementation.
# This class requires implementation of a 'run' method for calculating recommendation.
# Make a subclass if you want to create a concrete strategy.
@display_name_property(suffix="Strategy")
class BaseStrategy(abc.ABC, Generic[_StrategySettings]):
"""An abstract base class for strategy implementation.
Expand All @@ -98,20 +88,29 @@ class BaseStrategy(abc.ABC, Generic[_StrategySettings]):
Description property uses the docstring of the strategy class and the settings of the strategy.
The name of the strategy is the name of the class in lowercase, without the 'Strategy' suffix, if exists.
If you want to change the name of the strategy, you can change the __display_name__ attribute.
If you want to change the name of the strategy, you can change the display_name class attribute.
The strategy will automatically be registered in the strategy registry using __subclasses__ mechanism.
"""

__display_name__: str
display_name: str
rich_console: bool = False

settings: _StrategySettings
# TODO: this should be BaseMetric, but currently we only support Prometheus
@property
@abc.abstractmethod
def metrics(self) -> Sequence[type[PrometheusMetric]]:
pass

def __init__(self, settings: _StrategySettings):
self.settings = settings

def __str__(self) -> str:
return self.__display_name__.title()
return self._display_name.title()

@property
def _display_name(self) -> str:
return getattr(self, "display_name", self.__class__.__name__.lower().removeprefix("strategy"))

@property
def description(self) -> Optional[str]:
Expand All @@ -129,7 +128,7 @@ def description(self) -> Optional[str]:
# Abstract method that needs to be implemented by subclass.
# This method is intended to calculate resource recommendation based on history data and kubernetes object data.
@abc.abstractmethod
def run(self, history_data: HistoryData, object_data: K8sObjectData) -> RunResult:
def run(self, history_data: MetricsPodData, object_data: K8sObjectData) -> RunResult:
pass

# This method is intended to return a strategy by its name.
Expand All @@ -146,7 +145,7 @@ def find(cls: type[SelfBS], name: str) -> type[SelfBS]:
def get_all(cls: type[SelfBS]) -> dict[str, type[SelfBS]]:
from robusta_krr import strategies as _ # noqa: F401

return {sub_cls.__display_name__.lower(): sub_cls for sub_cls in cls.__subclasses__()}
return {sub_cls.display_name.lower(): sub_cls for sub_cls in cls.__subclasses__()}

# This method is intended to return the type of settings used in strategy.
@classmethod
Expand All @@ -161,7 +160,8 @@ def get_settings_type(cls) -> type[StrategySettings]:
"AnyStrategy",
"BaseStrategy",
"StrategySettings",
"HistoryData",
"PodsTimeData",
"MetricsPodData",
"K8sObjectData",
"ResourceType",
]
2 changes: 1 addition & 1 deletion robusta_krr/core/integrations/prometheus/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .loader import MetricsLoader
from .loader import PrometheusMetricsLoader
from .metrics_service.prometheus_metrics_service import PrometheusDiscovery, PrometheusNotFound
from .prometheus_client import CustomPrometheusConnect, ClusterNotSpecifiedException
30 changes: 19 additions & 11 deletions robusta_krr/core/integrations/prometheus/loader.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,32 @@
from __future__ import annotations

import datetime
from concurrent.futures import ThreadPoolExecutor
from typing import Optional
from typing import Optional, TYPE_CHECKING

from kubernetes import config as k8s_config
from kubernetes.client.api_client import ApiClient

from robusta_krr.core.abstract.strategies import ResourceHistoryData
from robusta_krr.core.models.config import Config
from robusta_krr.core.models.objects import K8sObjectData
from robusta_krr.core.models.result import ResourceType
from robusta_krr.utils.configurable import Configurable

from .metrics_service.base_metric_service import MetricsNotFound, MetricsService
from .metrics_service.base_metric_service import MetricsNotFound
from .metrics_service.prometheus_metrics_service import PrometheusMetricsService, PrometheusNotFound
from .metrics_service.thanos_metrics_service import ThanosMetricsService
from .metrics_service.victoria_metrics_service import VictoriaMetricsService

if TYPE_CHECKING:
from robusta_krr.core.abstract.strategies import MetricsPodData, BaseStrategy
from robusta_krr.core.models.config import Config

METRICS_SERVICES = {
"Prometheus": PrometheusMetricsService,
"Victoria Metrics": VictoriaMetricsService,
"Thanos": ThanosMetricsService,
}


class MetricsLoader(Configurable):
class PrometheusMetricsLoader(Configurable):
def __init__(
self,
config: Config,
Expand Down Expand Up @@ -53,14 +56,14 @@ def __init__(

self.loader = loader

self.info(f"{self.loader.name()} connected successfully for {cluster or 'default'} cluster")
self.info(f"{self.loader.name} connected successfully for {cluster or 'default'} cluster")

def get_metrics_service(
self,
config: Config,
api_client: Optional[ApiClient] = None,
cluster: Optional[str] = None,
) -> Optional[MetricsService]:
) -> Optional[PrometheusMetricsService]:
for service_name, metric_service_class in METRICS_SERVICES.items():
try:
loader = metric_service_class(config, api_client=api_client, cluster=cluster, executor=self.executor)
Expand All @@ -76,11 +79,11 @@ def get_metrics_service(
async def gather_data(
self,
object: K8sObjectData,
resource: ResourceType,
strategy: BaseStrategy,
period: datetime.timedelta,
*,
step: datetime.timedelta = datetime.timedelta(minutes=30),
) -> ResourceHistoryData:
) -> MetricsPodData:
"""
Gathers data from Prometheus for a specified object and resource.
Expand All @@ -94,4 +97,9 @@ async def gather_data(
ResourceHistoryData: The gathered resource history data.
"""

return await self.loader.gather_data(object, resource, period, step)
await self.loader.add_historic_pods(object, period)

return {
MetricLoader.__name__: await self.loader.gather_data(object, MetricLoader, period, step)
for MetricLoader in strategy.metrics
}
6 changes: 3 additions & 3 deletions robusta_krr/core/integrations/prometheus/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .base_metric import BaseMetricLoader, bind_metric
from .cpu_metric import CPUMetricLoader
from .memory_metric import MemoryMetricLoader
from .cpu import CPULoader, MaxCPULoader, PercentileCPULoader
from .memory import MemoryLoader, MaxMemoryLoader, PercentileMemoryLoader
from .base import PrometheusMetric
Loading

0 comments on commit d202105

Please sign in to comment.