diff --git a/README.md b/README.md
index c06e3546..c533cbc1 100644
--- a/README.md
+++ b/README.md
@@ -87,6 +87,20 @@ Read more about [how KRR works](#how-it-works) and [KRR vs Kubernetes VPA](#difference-with-kubernetes-vpa)

## Installation

+### Requirements
+
+KRR requires a running Prometheus instance.
+
+In addition, [kube-state-metrics](https://github.com/kubernetes/kube-state-metrics) needs to be running on your cluster, as KRR depends on the following metrics:
+
+- `container_cpu_usage_seconds_total`
+- `container_memory_working_set_bytes`
+- `kube_replicaset_owner`
+- `kube_pod_owner`
+- `kube_pod_status_phase`
+
+_Note: If any of the last three metrics is absent, KRR will still work, but it will only consider currently running pods when calculating recommendations. Historic pods that no longer exist in the cluster will not be taken into account._
+
### With brew (MacOS/Linux):

1. Add our tap:
@@ -193,9 +207,10 @@ krr simple -v
```

Other helpful flags:
-* `--cpu-min` Sets the minimum recommended cpu value in millicores
-* `--mem-min` Sets the minimum recommended memory value in MB
-* `--history_duration` The duration of the prometheus history data to use (in hours)
+
+- `--cpu-min` Sets the minimum recommended cpu value in millicores
+- `--mem-min` Sets the minimum recommended memory value in MB
+- `--history_duration` The duration of the prometheus history data to use (in hours)

More specific information on Strategy Settings can be found using

@@ -209,15 +224,14 @@ krr simple --help

With the [free Robusta SaaS platform](https://home.robusta.dev/) you can:

-* See why KRR recommends what it does
-* Sort and filter recommendations by namespace, priority, and more
-* Copy a YAML snippet to fix the problems KRR finds
+- See why KRR recommends what it does
+- Sort and filter recommendations by namespace, priority, and more
+- Copy a YAML snippet to fix the problems KRR finds

![Robusta UI Screen Shot][ui-screenshot]
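To make the flags listed above concrete, a combined invocation might look like the following sketch (the flag names come from this README; the specific values are hypothetical examples, not recommended defaults):

```sh
# Hypothetical values: floor recommendations at 100 millicores CPU and 256 MB memory,
# calculated over the last two weeks (336 hours) of Prometheus history
krr simple --cpu-min 100 --mem-min 256 --history_duration 336
```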
-
## How it works

### Metrics Gathering
@@ -431,9 +445,10 @@ python krr.py simple -p "https://prom-api.coralogix..." --coralogix_token

## Grafana Cloud managed Prometheus

-For Grafana Cloud managed Prometheus you need to specify prometheus link, prometheus user, and an access token of your Grafana Cloud stack. The Prometheus link and user for the stack can be found on the Grafana Cloud Portal. An access token with a `metrics:read` scope can also be created using Access Policies on the same portal.
+For Grafana Cloud managed Prometheus you need to specify prometheus link, prometheus user, and an access token of your Grafana Cloud stack. The Prometheus link and user for the stack can be found on the Grafana Cloud Portal. An access token with a `metrics:read` scope can also be created using Access Policies on the same portal. Next, set the PROM_URL, PROM_USER, and PROM_TOKEN variables to your Grafana Cloud stack's prometheus link, prometheus user, and access token, then run the following command:
+
```sh
python krr.py simple -p $PROM_URL --prometheus-auth-header "Bearer ${PROM_USER}:${PROM_TOKEN}" --prometheus-ssl-enabled
```
diff --git a/robusta_krr/core/integrations/kubernetes/__init__.py b/robusta_krr/core/integrations/kubernetes/__init__.py
index 8bb89a19..b6c85f48 100644
--- a/robusta_krr/core/integrations/kubernetes/__init__.py
+++ b/robusta_krr/core/integrations/kubernetes/__init__.py
@@ -1,4 +1,5 @@
import asyncio
+import logging
from concurrent.futures import ThreadPoolExecutor
from typing import AsyncGenerator, AsyncIterator, Callable, Optional, Union
@@ -13,31 +14,32 @@
    V1Job,
    V1LabelSelector,
    V1Pod,
+   V1PodList,
    V1StatefulSet,
    V2HorizontalPodAutoscaler,
    V2HorizontalPodAutoscalerList,
)

-from robusta_krr.core.models.objects import HPAData, K8sObjectData, KindLiteral
+from robusta_krr.core.models.config import settings
+from robusta_krr.core.models.objects import HPAData, K8sObjectData, KindLiteral, PodData
from robusta_krr.core.models.result import ResourceAllocations
-from robusta_krr.utils.configurable import Configurable

from . import config_patch as _
from .rollout import RolloutAppsV1Api

+logger = logging.getLogger("krr")
+
AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job]
HPAKey = tuple[str, str, str]

-class ClusterLoader(Configurable):
-    def __init__(self, cluster: Optional[str], *args, **kwargs):
-        super().__init__(*args, **kwargs)
-
+class ClusterLoader:
+    def __init__(self, cluster: Optional[str]):
        self.cluster = cluster

        # This executor will be running requests to Kubernetes API
-        self.executor = ThreadPoolExecutor(self.config.max_workers)
+        self.executor = ThreadPoolExecutor(settings.max_workers)

        self.api_client = (
-            config.new_client_from_config(context=cluster, config_file=self.config.kubeconfig)
+            config.new_client_from_config(context=cluster, config_file=settings.kubeconfig)
            if cluster is not None
            else None
        )
@@ -57,9 +59,9 @@ async def list_scannable_objects(self) -> AsyncGenerator[K8sObjectData, None]:
            A list of scannable objects.
""" - self.info(f"Listing scannable objects in {self.cluster}") - self.debug(f"Namespaces: {self.config.namespaces}") - self.debug(f"Resources: {self.config.resources}") + logger.info(f"Listing scannable objects in {self.cluster}") + logger.debug(f"Namespaces: {settings.namespaces}") + logger.debug(f"Resources: {settings.resources}") self.__hpa_list = await self._try_list_hpa() @@ -76,10 +78,24 @@ async def list_scannable_objects(self) -> AsyncGenerator[K8sObjectData, None]: async with objects_combined.stream() as streamer: async for object in streamer: # NOTE: By default we will filter out kube-system namespace - if self.config.namespaces == "*" and object.namespace == "kube-system": + if settings.namespaces == "*" and object.namespace == "kube-system": continue yield object + async def list_pods(self, object: K8sObjectData) -> list[PodData]: + selector = self._build_selector_query(object._api_resource.spec.selector) + if selector is None: + return [] + + loop = asyncio.get_running_loop() + ret: V1PodList = await loop.run_in_executor( + self.executor, + lambda: self.core.list_namespaced_pod( + namespace=object._api_resource.metadata.namespace, label_selector=selector + ), + ) + return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items] + @staticmethod def _get_match_expression_filter(expression) -> str: if expression.operator.lower() == "exists": @@ -116,36 +132,37 @@ def __build_obj( container=container.name, allocations=ResourceAllocations.from_container(container), hpa=self.__hpa_list.get((namespace, kind, name)), + api_resource=item, ) def _should_list_resource(self, resource: str): - if self.config.resources == "*": + if settings.resources == "*": return True - return resource.lower() in self.config.resources + return resource.lower() in settings.resources async def _list_workflows( self, kind: KindLiteral, all_namespaces_request: Callable, namespaced_request: Callable ) -> AsyncIterator[K8sObjectData]: if not self._should_list_resource(kind): - self.debug(f"Skipping {kind}s in {self.cluster}") + logger.debug(f"Skipping {kind}s in {self.cluster}") return if kind == "Rollout" and not self.__rollouts_available: return - self.debug(f"Listing {kind}s in {self.cluster}") + logger.debug(f"Listing {kind}s in {self.cluster}") loop = asyncio.get_running_loop() try: - if self.config.namespaces == "*": + if settings.namespaces == "*": ret_multi = await loop.run_in_executor( self.executor, lambda: all_namespaces_request( watch=False, - label_selector=self.config.selector, + label_selector=settings.selector, ), ) - self.debug(f"Found {len(ret_multi.items)} {kind} in {self.cluster}") + logger.debug(f"Found {len(ret_multi.items)} {kind} in {self.cluster}") for item in ret_multi.items: for container in item.spec.template.spec.containers: yield self.__build_obj(item, container, kind) @@ -156,10 +173,10 @@ async def _list_workflows( lambda: namespaced_request( namespace=namespace, watch=False, - label_selector=self.config.selector, + label_selector=settings.selector, ), ) - for namespace in self.config.namespaces + for namespace in settings.namespaces ] total_items = 0 @@ -170,16 +187,15 @@ async def _list_workflows( for container in item.spec.template.spec.containers: yield self.__build_obj(item, container, kind) - self.debug(f"Found {total_items} {kind} in {self.cluster}") + logger.debug(f"Found {total_items} {kind} in {self.cluster}") except ApiException as e: if kind == "Rollout" and e.status in [400, 401, 403, 404]: if self.__rollouts_available: - self.debug(f"Rollout API not 
available in {self.cluster}") + logger.debug(f"Rollout API not available in {self.cluster}") self.__rollouts_available = False else: - self.error(f"Error {e.status} listing {kind} in cluster {self.cluster}: {e.reason}") - self.debug_exception() - self.error("Will skip this object type and continue.") + logger.exception(f"Error {e.status} listing {kind} in cluster {self.cluster}: {e.reason}") + logger.error("Will skip this object type and continue.") def _list_deployments(self) -> AsyncIterator[K8sObjectData]: return self._list_workflows( @@ -296,16 +312,18 @@ async def _try_list_hpa(self) -> dict[HPAKey, HPAData]: try: return await self.__list_hpa() except Exception as e: - self.error(f"Error trying to list hpa in cluster {self.cluster}: {e}") - self.debug_exception() - self.error( + logger.exception(f"Error trying to list hpa in cluster {self.cluster}: {e}") + logger.error( "Will assume that there are no HPA. " "Be careful as this may lead to inaccurate results if object actually has HPA." ) return {} -class KubernetesLoader(Configurable): +class KubernetesLoader: + def __init__(self) -> None: + self._cluster_loaders: dict[Optional[str], ClusterLoader] = {} + async def list_clusters(self) -> Optional[list[str]]: """List all clusters. @@ -313,44 +331,44 @@ async def list_clusters(self) -> Optional[list[str]]: A list of clusters. """ - if self.config.inside_cluster: - self.debug("Working inside the cluster") + if settings.inside_cluster: + logger.debug("Working inside the cluster") return None try: - contexts, current_context = config.list_kube_config_contexts(self.config.kubeconfig) + contexts, current_context = config.list_kube_config_contexts(settings.kubeconfig) except config.ConfigException: - if self.config.clusters is not None and self.config.clusters != "*": - self.warning("Could not load context from kubeconfig.") - self.warning(f"Falling back to clusters from CLI: {self.config.clusters}") - return self.config.clusters + if settings.clusters is not None and settings.clusters != "*": + logger.warning("Could not load context from kubeconfig.") + logger.warning(f"Falling back to clusters from CLI: {settings.clusters}") + return settings.clusters else: - self.error( + logger.error( "Could not load context from kubeconfig. " "Please check your kubeconfig file or pass -c flag with the context name." 
) return None - self.debug(f"Found {len(contexts)} clusters: {', '.join([context['name'] for context in contexts])}") - self.debug(f"Current cluster: {current_context['name']}") + logger.debug(f"Found {len(contexts)} clusters: {', '.join([context['name'] for context in contexts])}") + logger.debug(f"Current cluster: {current_context['name']}") - self.debug(f"Configured clusters: {self.config.clusters}") + logger.debug(f"Configured clusters: {settings.clusters}") # None, empty means current cluster - if not self.config.clusters: + if not settings.clusters: return [current_context["name"]] # * means all clusters - if self.config.clusters == "*": + if settings.clusters == "*": return [context["name"] for context in contexts] - return [context["name"] for context in contexts if context["name"] in self.config.clusters] + return [context["name"] for context in contexts if context["name"] in settings.clusters] def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[ClusterLoader]: try: - return ClusterLoader(cluster=cluster, config=self.config) + return ClusterLoader(cluster=cluster) except Exception as e: - self.error(f"Could not load cluster {cluster} and will skip it: {e}") + logger.error(f"Could not load cluster {cluster} and will skip it: {e}") return None async def list_scannable_objects(self, clusters: Optional[list[str]]) -> AsyncIterator[K8sObjectData]: @@ -364,17 +382,25 @@ async def list_scannable_objects(self, clusters: Optional[list[str]]) -> AsyncIt else: _cluster_loaders = [self._try_create_cluster_loader(cluster) for cluster in clusters] - cluster_loaders = [cl for cl in _cluster_loaders if cl is not None] - if cluster_loaders == []: - self.error("Could not load any cluster.") + self.cluster_loaders = {cl.cluster: cl for cl in _cluster_loaders if cl is not None} + if self.cluster_loaders == {}: + logger.error("Could not load any cluster.") return # https://stackoverflow.com/questions/55299564/join-multiple-async-generators-in-python # This will merge all the streams from all the cluster loaders into a single stream objects_combined = aiostream.stream.merge( - *[cluster_loader.list_scannable_objects() for cluster_loader in cluster_loaders] + *[cluster_loader.list_scannable_objects() for cluster_loader in self.cluster_loaders.values()] ) async with objects_combined.stream() as streamer: async for object in streamer: yield object + + async def load_pods(self, object: K8sObjectData) -> list[PodData]: + try: + cluster_loader = self.cluster_loaders[object.cluster] + except KeyError: + raise RuntimeError(f"Cluster loader for cluster {object.cluster} not found") from None + + return await cluster_loader.list_pods(object) diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py index 5bcec23b..c0995360 100644 --- a/robusta_krr/core/integrations/prometheus/loader.py +++ b/robusta_krr/core/integrations/prometheus/loader.py @@ -1,6 +1,7 @@ from __future__ import annotations import datetime +import logging from concurrent.futures import ThreadPoolExecutor from typing import TYPE_CHECKING, Optional @@ -8,8 +9,8 @@ from kubernetes.client.api_client import ApiClient from prometrix import MetricsNotFound, PrometheusNotFound -from robusta_krr.core.models.objects import K8sObjectData -from robusta_krr.utils.configurable import Configurable +from robusta_krr.core.models.config import settings +from robusta_krr.core.models.objects import K8sObjectData, PodData from .metrics_service.prometheus_metrics_service import 
PrometheusMetricsService from .metrics_service.thanos_metrics_service import ThanosMetricsService @@ -17,7 +18,8 @@ if TYPE_CHECKING: from robusta_krr.core.abstract.strategies import BaseStrategy, MetricsPodData - from robusta_krr.core.models.config import Config + +logger = logging.getLogger("krr") METRICS_SERVICES = { "Prometheus": PrometheusMetricsService, @@ -26,56 +28,54 @@ } -class PrometheusMetricsLoader(Configurable): - def __init__( - self, - config: Config, - *, - cluster: Optional[str] = None, - ) -> None: +class PrometheusMetricsLoader: + def __init__(self, *, cluster: Optional[str] = None) -> None: """ Initializes the Prometheus Loader. Args: - config (Config): The configuration object. cluster (Optional[str]): The name of the cluster. Defaults to None. """ - super().__init__(config=config) - - self.executor = ThreadPoolExecutor(self.config.max_workers) + self.executor = ThreadPoolExecutor(settings.max_workers) self.api_client = ( - k8s_config.new_client_from_config(config_file=self.config.kubeconfig, context=cluster) + k8s_config.new_client_from_config(config_file=settings.kubeconfig, context=cluster) if cluster is not None else None ) - loader = self.get_metrics_service(config, api_client=self.api_client, cluster=cluster) + loader = self.get_metrics_service(api_client=self.api_client, cluster=cluster) if loader is None: raise PrometheusNotFound("No Prometheus or metrics service found") self.loader = loader - self.info(f"{self.loader.name} connected successfully for {cluster or 'default'} cluster") + logger.info(f"{self.loader.name} connected successfully for {cluster or 'default'} cluster") def get_metrics_service( self, - config: Config, api_client: Optional[ApiClient] = None, cluster: Optional[str] = None, ) -> Optional[PrometheusMetricsService]: for service_name, metric_service_class in METRICS_SERVICES.items(): try: - loader = metric_service_class(config, api_client=api_client, cluster=cluster, executor=self.executor) + loader = metric_service_class(api_client=api_client, cluster=cluster, executor=self.executor) loader.check_connection() - self.echo(f"{service_name} found") + logger.info(f"{service_name} found") loader.validate_cluster_name() return loader except MetricsNotFound as e: - self.debug(f"{service_name} not found: {e}") + logger.debug(f"{service_name} not found: {e}") return None + async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> list[PodData]: + try: + return await self.loader.load_pods(object, period) + except Exception as e: + logger.exception(f"Failed to load pods for {object}: {e}") + return [] + async def gather_data( self, object: K8sObjectData, @@ -97,8 +97,6 @@ async def gather_data( ResourceHistoryData: The gathered resource history data. 
""" - await self.loader.load_pods(object, period) - return { MetricLoader.__name__: await self.loader.gather_data(object, MetricLoader, period, step) for MetricLoader in strategy.metrics diff --git a/robusta_krr/core/integrations/prometheus/metrics/base.py b/robusta_krr/core/integrations/prometheus/metrics/base.py index 81751aaf..ab095f89 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/base.py +++ b/robusta_krr/core/integrations/prometheus/metrics/base.py @@ -14,9 +14,8 @@ from robusta_krr.core.abstract.metrics import BaseMetric from robusta_krr.core.abstract.strategies import PodsTimeData -from robusta_krr.core.models.config import Config +from robusta_krr.core.models.config import settings from robusta_krr.core.models.objects import K8sObjectData -from robusta_krr.utils.configurable import Configurable class PrometheusSeries(TypedDict): @@ -37,7 +36,7 @@ class PrometheusMetricData(pd.BaseModel): type: QueryType -class PrometheusMetric(BaseMetric, Configurable): +class PrometheusMetric(BaseMetric): """ Base class for all metric loaders. @@ -63,12 +62,10 @@ class PrometheusMetric(BaseMetric, Configurable): def __init__( self, - config: Config, prometheus: CustomPrometheusConnect, service_name: str, executor: Optional[ThreadPoolExecutor] = None, ) -> None: - super().__init__(config) self.prometheus = prometheus self.service_name = service_name @@ -84,9 +81,9 @@ def get_prometheus_cluster_label(self) -> str: Returns: str: a promql safe label string for querying the cluster. """ - if self.config.prometheus_cluster_label is None: + if settings.prometheus_cluster_label is None: return "" - return f', {self.config.prometheus_label}="{self.config.prometheus_cluster_label}"' + return f', {settings.prometheus_label}="{settings.prometheus_cluster_label}"' @abc.abstractmethod def get_query(self, object: K8sObjectData, duration: str, step: str) -> str: @@ -237,7 +234,7 @@ def filter_prom_jobs_results( if len(relevant_kubelet_metric) == 1: return_list.append(relevant_kubelet_metric[0]) continue - sorted_relevant_series = sorted(relevant_series, key=lambda s: s["metric"].get("job"), reverse=False) + sorted_relevant_series = sorted(relevant_series, key=lambda s: s["metric"].get("job", ""), reverse=False) return_list.append(sorted_relevant_series[0]) return return_list diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py index 8b4beef5..9adb5b53 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py @@ -6,22 +6,19 @@ from kubernetes.client.api_client import ApiClient from robusta_krr.core.abstract.strategies import PodsTimeData -from robusta_krr.core.models.config import Config +from robusta_krr.core.models.config import settings from robusta_krr.core.models.objects import K8sObjectData -from robusta_krr.utils.configurable import Configurable from ..metrics import PrometheusMetric -class MetricsService(Configurable, abc.ABC): +class MetricsService(abc.ABC): def __init__( self, - config: Config, api_client: Optional[ApiClient] = None, cluster: Optional[str] = None, executor: Optional[ThreadPoolExecutor] = None, ) -> None: - super().__init__(config=config) self.api_client = api_client self.cluster = cluster or "default" self.executor = executor @@ -56,6 +53,6 @@ def get_prometheus_cluster_label(self) -> str: Returns: str: a promql safe label string for 
querying the cluster. """ - if self.config.prometheus_cluster_label is None: + if settings.prometheus_cluster_label is None: return "" - return f', {self.config.prometheus_label}="{self.config.prometheus_cluster_label}"' + return f', {settings.prometheus_label}="{settings.prometheus_cluster_label}"' diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py index 2ea99dc1..091041e4 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py @@ -1,5 +1,6 @@ import asyncio import datetime +import logging from concurrent.futures import ThreadPoolExecutor from typing import List, Optional @@ -8,7 +9,7 @@ from prometrix import PrometheusNotFound, get_custom_prometheus_connect from robusta_krr.core.abstract.strategies import PodsTimeData -from robusta_krr.core.models.config import Config +from robusta_krr.core.models.config import settings from robusta_krr.core.models.objects import K8sObjectData, PodData from robusta_krr.utils.batched import batched from robusta_krr.utils.service_discovery import MetricsServiceDiscovery @@ -17,6 +18,8 @@ from ..prometheus_utils import ClusterNotSpecifiedException, generate_prometheus_config from .base_metric_service import MetricsService +logger = logging.getLogger("krr") + class PrometheusDiscovery(MetricsServiceDiscovery): def find_metrics_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: @@ -50,22 +53,21 @@ class PrometheusMetricsService(MetricsService): def __init__( self, - config: Config, *, cluster: Optional[str] = None, api_client: Optional[ApiClient] = None, executor: Optional[ThreadPoolExecutor] = None, ) -> None: - super().__init__(config=config, api_client=api_client, cluster=cluster, executor=executor) + super().__init__(api_client=api_client, cluster=cluster, executor=executor) - self.info(f"Connecting to {self.name} for {self.cluster} cluster") + logger.info(f"Connecting to {self.name} for {self.cluster} cluster") - self.auth_header = self.config.prometheus_auth_header - self.ssl_enabled = self.config.prometheus_ssl_enabled + self.auth_header = settings.prometheus_auth_header + self.ssl_enabled = settings.prometheus_ssl_enabled - self.prometheus_discovery = self.service_discovery(config=self.config, api_client=self.api_client) + self.prometheus_discovery = self.service_discovery(api_client=self.api_client) - self.url = self.config.prometheus_url + self.url = settings.prometheus_url self.url = self.url or self.prometheus_discovery.find_metrics_url() if not self.url: @@ -74,15 +76,15 @@ def __init__( "\tTry using port-forwarding and/or setting the url manually (using the -p flag.)." 
) - self.info(f"Using {self.name} at {self.url} for cluster {cluster or 'default'}") + logger.info(f"Using {self.name} at {self.url} for cluster {cluster or 'default'}") - headers = self.config.prometheus_other_headers + headers = settings.prometheus_other_headers if self.auth_header: headers |= {"Authorization": self.auth_header} - elif not self.config.inside_cluster and self.api_client is not None: + elif not settings.inside_cluster and self.api_client is not None: self.api_client.update_params_for_auth(headers, {}, ["BearerToken"]) - self.prom_config = generate_prometheus_config(config, url=self.url, headers=headers, metrics_service=self) + self.prom_config = generate_prometheus_config(url=self.url, headers=headers, metrics_service=self) self.prometheus = get_custom_prometheus_connect(self.prom_config) def check_connection(self): @@ -98,10 +100,10 @@ async def query(self, query: str) -> dict: return await loop.run_in_executor(self.executor, lambda: self.prometheus.custom_query(query=query)) def validate_cluster_name(self): - if not self.config.prometheus_cluster_label and not self.config.prometheus_label: + if not settings.prometheus_cluster_label and not settings.prometheus_label: return - cluster_label = self.config.prometheus_cluster_label + cluster_label = settings.prometheus_cluster_label cluster_names = self.get_cluster_names() if cluster_names is None or len(cluster_names) <= 1: @@ -119,9 +121,9 @@ def validate_cluster_name(self): def get_cluster_names(self) -> Optional[List[str]]: try: - return self.prometheus.get_label_values(label_name=self.config.prometheus_label) + return self.prometheus.get_label_values(label_name=settings.prometheus_label) except PrometheusApiClientException: - self.error("Labels api not present on prometheus client") + logger.error("Labels api not present on prometheus client") return [] async def gather_data( @@ -134,19 +136,24 @@ async def gather_data( """ ResourceHistoryData: The gathered resource history data. """ - self.debug(f"Gathering {LoaderClass.__name__} metric for {object}") + logger.debug(f"Gathering {LoaderClass.__name__} metric for {object}") - metric_loader = LoaderClass(self.config, self.prometheus, self.name, self.executor) + metric_loader = LoaderClass(self.prometheus, self.name, self.executor) data = await metric_loader.load_data(object, period, step) if len(data) == 0: - self.warning( + if "CPU" in LoaderClass.__name__: + object.add_warning("NoPrometheusCPUMetrics") + elif "Memory" in LoaderClass.__name__: + object.add_warning("NoPrometheusMemoryMetrics") + + logger.warning( f"{metric_loader.service_name} returned no {metric_loader.__class__.__name__} metrics for {object}" ) return data - async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> None: + async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> list[PodData]: """ List pods related to the object and add them to the object's pods list. Args: @@ -154,7 +161,7 @@ async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> period (datetime.timedelta): The time period for which to gather data. 
""" - self.debug(f"Adding historic pods for {object}") + logger.debug(f"Adding historic pods for {object}") days_literal = min(int(period.total_seconds()) // 60 // 24, 32) period_literal = f"{days_literal}d" @@ -195,8 +202,7 @@ async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> ) if related_pods_result == []: - self.debug(f"No pods found for {object}") - return + return [] related_pods = [pod["metric"]["pod"] for pod in related_pods_result] current_pods_set = set() @@ -217,4 +223,4 @@ async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> current_pods_set |= {pod["metric"]["pod"] for pod in pods_status_result} del pods_status_result - object.pods = list({PodData(name=pod, deleted=pod not in current_pods_set) for pod in related_pods}) + return list({PodData(name=pod, deleted=pod not in current_pods_set) for pod in related_pods}) diff --git a/robusta_krr/core/integrations/prometheus/prometheus_utils.py b/robusta_krr/core/integrations/prometheus/prometheus_utils.py index 450c60c9..e40c2805 100644 --- a/robusta_krr/core/integrations/prometheus/prometheus_utils.py +++ b/robusta_krr/core/integrations/prometheus/prometheus_utils.py @@ -5,7 +5,7 @@ import boto3 from prometrix import AWSPrometheusConfig, CoralogixPrometheusConfig, PrometheusConfig, VictoriaMetricsPrometheusConfig -from robusta_krr.core.models.config import Config +from robusta_krr.core.models.config import settings if TYPE_CHECKING: from robusta_krr.core.integrations.prometheus.metrics_service.prometheus_metrics_service import ( @@ -22,25 +22,25 @@ class ClusterNotSpecifiedException(Exception): def generate_prometheus_config( - config: Config, url: str, headers: dict[str, str], metrics_service: PrometheusMetricsService + url: str, headers: dict[str, str], metrics_service: PrometheusMetricsService ) -> PrometheusConfig: from .metrics_service.victoria_metrics_service import VictoriaMetricsService baseconfig = { "url": url, - "disable_ssl": not config.prometheus_ssl_enabled, + "disable_ssl": not settings.prometheus_ssl_enabled, "headers": headers, } # aws config - if config.eks_managed_prom: - session = boto3.Session(profile_name=config.eks_managed_prom_profile_name) + if settings.eks_managed_prom: + session = boto3.Session(profile_name=settings.eks_managed_prom_profile_name) credentials = session.get_credentials() credentials = credentials.get_frozen_credentials() - region = config.eks_managed_prom_region if config.eks_managed_prom_region else session.region_name - access_key = config.eks_access_key if config.eks_access_key else credentials.access_key - secret_key = config.eks_secret_key if config.eks_secret_key else credentials.secret_key - service_name = config.eks_service_name if config.eks_secret_key else "aps" + region = settings.eks_managed_prom_region if settings.eks_managed_prom_region else session.region_name + access_key = settings.eks_access_key if settings.eks_access_key else credentials.access_key + secret_key = settings.eks_secret_key if settings.eks_secret_key else credentials.secret_key + service_name = settings.eks_service_name if settings.eks_secret_key else "aps" if not region: raise Exception("No eks region specified") @@ -52,8 +52,8 @@ def generate_prometheus_config( **baseconfig, ) # coralogix config - if config.coralogix_token: - return CoralogixPrometheusConfig(**baseconfig, prometheus_token=config.coralogix_token) + if settings.coralogix_token: + return CoralogixPrometheusConfig(**baseconfig, prometheus_token=settings.coralogix_token) if 
isinstance(metrics_service, VictoriaMetricsService): return VictoriaMetricsPrometheusConfig(**baseconfig) return PrometheusConfig(**baseconfig) diff --git a/robusta_krr/core/models/config.py b/robusta_krr/core/models/config.py index d170ef52..3d6aabc0 100644 --- a/robusta_krr/core/models/config.py +++ b/robusta_krr/core/models/config.py @@ -1,14 +1,21 @@ +from __future__ import annotations + +import logging +import sys from typing import Any, Literal, Optional, Union import pydantic as pd from kubernetes import config from kubernetes.config.config_exception import ConfigException from rich.console import Console +from rich.logging import RichHandler from robusta_krr.core.abstract import formatters from robusta_krr.core.abstract.strategies import AnyStrategy, BaseStrategy from robusta_krr.core.models.objects import KindLiteral +logger = logging.getLogger("krr") + class Config(pd.BaseSettings): quiet: bool = pd.Field(False) @@ -113,3 +120,34 @@ def load_kubeconfig(self) -> None: self.inside_cluster = False else: self.inside_cluster = True + + @staticmethod + def set_config(config: Config) -> None: + global _config + + _config = config + logging.basicConfig( + level="NOTSET", + format="%(message)s", + datefmt="[%X]", + handlers=[RichHandler(console=Console(file=sys.stderr if settings.log_to_stderr else sys.stdout))], + ) + logging.getLogger("").setLevel(logging.CRITICAL) + logger.setLevel(logging.DEBUG if config.verbose else logging.CRITICAL if config.quiet else logging.INFO) + + +# NOTE: This class is just a proxy for _config. +# Import settings from this module and use it like it is just a config object. +class _Settings(Config): # Config here is used for type checking + def __init__(self) -> None: + pass + + def __getattr__(self, name: str): + if _config is None: + raise AttributeError("Config is not set") + + return getattr(_config, name) + + +_config: Optional[Config] = None +settings = _Settings() diff --git a/robusta_krr/core/models/objects.py b/robusta_krr/core/models/objects.py index 7da48d72..62149c39 100644 --- a/robusta_krr/core/models/objects.py +++ b/robusta_krr/core/models/objects.py @@ -27,6 +27,13 @@ class HPAData(pd.BaseModel): target_memory_utilization_percentage: Optional[float] +PodWarning = Literal[ + "NoPrometheusPods", + "NoPrometheusCPUMetrics", + "NoPrometheusMemoryMetrics", +] + + class K8sObjectData(pd.BaseModel): # NOTE: Here None means that we are running inside the cluster cluster: Optional[str] @@ -37,6 +44,9 @@ class K8sObjectData(pd.BaseModel): namespace: str kind: KindLiteral allocations: ResourceAllocations + warnings: set[PodWarning] = set() + + _api_resource = pd.PrivateAttr(None) def __str__(self) -> str: return f"{self.kind} {self.namespace}/{self.name}/{self.container}" @@ -44,6 +54,9 @@ def __str__(self) -> str: def __hash__(self) -> int: return hash(str(self)) + def add_warning(self, warning: PodWarning) -> None: + self.warnings.add(warning) + @property def current_pods_count(self) -> int: return len([pod for pod in self.pods if not pod.deleted]) diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index bc22574c..312efdda 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -1,4 +1,5 @@ import asyncio +import logging import math import os import sys @@ -12,32 +13,33 @@ from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult from robusta_krr.core.integrations.kubernetes import KubernetesLoader from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException, 
PrometheusMetricsLoader -from robusta_krr.core.models.config import Config +from robusta_krr.core.models.config import settings from robusta_krr.core.models.objects import K8sObjectData from robusta_krr.core.models.result import ResourceAllocations, ResourceScan, ResourceType, Result, StrategyData -from robusta_krr.utils.configurable import Configurable from robusta_krr.utils.logo import ASCII_LOGO +from robusta_krr.utils.print import print from robusta_krr.utils.progress_bar import ProgressBar from robusta_krr.utils.version import get_version +logger = logging.getLogger("krr") -class Runner(Configurable): + +class Runner: EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound) - def __init__(self, config: Config) -> None: - super().__init__(config) - self._k8s_loader = KubernetesLoader(self.config) + def __init__(self) -> None: + self._k8s_loader = KubernetesLoader() self._metrics_service_loaders: dict[Optional[str], Union[PrometheusMetricsLoader, Exception]] = {} self._metrics_service_loaders_error_logged: set[Exception] = set() - self._strategy = self.config.create_strategy() + self._strategy = settings.create_strategy() # This executor will be running calculations for recommendations - self._executor = ThreadPoolExecutor(self.config.max_workers) + self._executor = ThreadPoolExecutor(settings.max_workers) def _get_prometheus_loader(self, cluster: Optional[str]) -> Optional[PrometheusMetricsLoader]: if cluster not in self._metrics_service_loaders: try: - self._metrics_service_loaders[cluster] = PrometheusMetricsLoader(self.config, cluster=cluster) + self._metrics_service_loaders[cluster] = PrometheusMetricsLoader(cluster=cluster) except Exception as e: self._metrics_service_loaders[cluster] = e @@ -45,7 +47,7 @@ def _get_prometheus_loader(self, cluster: Optional[str]) -> Optional[PrometheusM if isinstance(result, self.EXPECTED_EXCEPTIONS): if result not in self._metrics_service_loaders_error_logged: self._metrics_service_loaders_error_logged.add(result) - self.error(str(result)) + logger.error(str(result)) return None elif isinstance(result, Exception): raise result @@ -53,42 +55,46 @@ def _get_prometheus_loader(self, cluster: Optional[str]) -> Optional[PrometheusM return result def _greet(self) -> None: - self.echo(ASCII_LOGO, no_prefix=True) - self.echo(f"Running Robusta's KRR (Kubernetes Resource Recommender) {get_version()}", no_prefix=True) - self.echo(f"Using strategy: {self._strategy}", no_prefix=True) - self.echo(f"Using formatter: {self.config.format}", no_prefix=True) - self.echo(no_prefix=True) + if settings.quiet: + return + + print(ASCII_LOGO) + print(f"Running Robusta's KRR (Kubernetes Resource Recommender) {get_version()}") + print(f"Using strategy: {self._strategy}") + print(f"Using formatter: {settings.format}") + print("") def _process_result(self, result: Result) -> None: - Formatter = self.config.Formatter + Formatter = settings.Formatter formatted = result.format(Formatter) - self.echo("\n", no_prefix=True) - self.print_result(formatted, rich=getattr(Formatter, "__rich_console__", False)) - if (self.config.file_output) or (self.config.slack_output): - if self.config.file_output: - file_name = self.config.file_output - elif self.config.slack_output: - file_name = self.config.slack_output + rich = getattr(Formatter, "__rich_console__", False) + + print(formatted, rich=rich, force=True) + if settings.file_output or settings.slack_output: + if settings.file_output: + file_name = settings.file_output + elif settings.slack_output: + file_name = settings.slack_output 
                with open(file_name, "w") as target_file:
                    sys.stdout = target_file
-                    self.print_result(formatted, rich=getattr(Formatter, "__rich_console__", False))
+                    print(formatted, rich=rich, force=True)
                    sys.stdout = sys.stdout
-            if self.config.slack_output:
+            if settings.slack_output:
                client = WebClient(os.environ["SLACK_BOT_TOKEN"])
                warnings.filterwarnings("ignore", category=UserWarning)
                client.files_upload(
-                    channels=f"#{self.config.slack_output}",
+                    channels=f"#{settings.slack_output}",
                    title="KRR Report",
                    file=f"./{file_name}",
-                    initial_comment=f'Kubernetes Resource Report for {(" ".join(self.config.namespaces))}',
+                    initial_comment=f'Kubernetes Resource Report for {(" ".join(settings.namespaces))}',
                )
                os.remove(file_name)

    def __get_resource_minimal(self, resource: ResourceType) -> float:
        if resource == ResourceType.CPU:
-            return 1 / 1000 * self.config.cpu_min_value
+            return 1 / 1000 * settings.cpu_min_value
        elif resource == ResourceType.Memory:
-            return 1024**2 * self.config.memory_min_value
+            return 1024**2 * settings.memory_min_value
        else:
            return 0
@@ -128,6 +134,21 @@ async def _calculate_object_recommendations(self, object: K8sObjectData) -> RunR
        if prometheus_loader is None:
            return {resource: ResourceRecommendation.undefined("Prometheus not found") for resource in ResourceType}

+        object.pods = await prometheus_loader.load_pods(object, self._strategy.settings.history_timedelta)
+        if object.pods == []:
+            # Fallback to Kubernetes API
+            object.pods = await self._k8s_loader.load_pods(object)
+
+            # NOTE: Kubernetes API returned pods, but Prometheus did not
+            if object.pods != []:
+                object.add_warning("NoPrometheusPods")
+                logger.warning(
+                    f"Was not able to load any pods for {object} from Prometheus.\n\t"
+                    "This could mean that Prometheus is missing some required metrics.\n\t"
+                    "Loaded pods from Kubernetes API instead.\n\t"
+                    "See more info at https://github.com/robusta-dev/krr#requirements "
+                )
+
        metrics = await prometheus_loader.gather_data(
            object,
            self._strategy,
@@ -135,12 +156,12 @@ async def _calculate_object_recommendations(self, object: K8sObjectData) -> RunR
            step=self._strategy.settings.timeframe_timedelta,
        )

-        self.debug(f"Calculating recommendations for {object} with {len(metrics)} metrics")
-
        # NOTE: We run this in a threadpool as the strategy calculation might be CPU intensive
        # But keep in mind that numpy calculations will not block the GIL
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(self._executor, self._strategy.run, metrics, object)
+
+        logger.info(f"Calculated recommendations for {object} (using {len(metrics)} metrics)")
        return self._format_result(result)

    async def _gather_object_allocations(self, k8s_object: K8sObjectData) -> ResourceScan:
@@ -159,16 +180,16 @@ async def _collect_result(self) -> Result:
        clusters = await self._k8s_loader.list_clusters()

-        if clusters and len(clusters) > 1 and self.config.prometheus_url:
+        if clusters and len(clusters) > 1 and settings.prometheus_url:
            # this can only happen for multi-cluster querying a single centralized prometheus
            # In this scenario we don't yet support determining which metrics belong to which cluster so the recommendation can be incorrect
            raise ClusterNotSpecifiedException(
                f"Cannot scan multiple clusters for this prometheus, Rerun with the flag `-c