Skip to content

Commit

Permalink
Merge pull request #150 from robusta-dev/fallback-get-pods
Browse files Browse the repository at this point in the history
Fallback to K8S API for getting pods, refactor logging for new warnings
  • Loading branch information
LeaveMyYard authored Oct 20, 2023
2 parents 1dbe77a + 0399758 commit 0f16f93
Show file tree
Hide file tree
Showing 17 changed files with 419 additions and 358 deletions.
31 changes: 23 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,20 @@ Read more about [how KRR works](#how-it-works) and [KRR vs Kubernetes VPA](#diff

## Installation

### Requirements

KRR requires you to have Prometheus.

Additionally to that, [kube-state-metrics](https://github.com/kubernetes/kube-state-metrics) needs to be running on your cluster, as KRR is dependant on those metrics:

- `container_cpu_usage_seconds_total`
- `container_memory_working_set_bytes`
- `kube_replicaset_owner`
- `kube_pod_owner`
- `kube_pod_status_phase`

_Note: If one of last three metrics is absent KRR will still work, but it will only consider currently-running pods when calculating recommendations. Historic pods that no longer exist in the cluster will not be taken into consideration._

### With brew (MacOS/Linux):

1. Add our tap:
Expand Down Expand Up @@ -193,9 +207,10 @@ krr simple -v
```

Other helpful flags:
* `--cpu-min` Sets the minimum recommended cpu value in millicores
* `--mem-min` Sets the minimum recommended memory value in MB
* `--history_duration` The duration of the prometheus history data to use (in hours)

- `--cpu-min` Sets the minimum recommended cpu value in millicores
- `--mem-min` Sets the minimum recommended memory value in MB
- `--history_duration` The duration of the prometheus history data to use (in hours)

More specific information on Strategy Settings can be found using

Expand All @@ -209,15 +224,14 @@ krr simple --help

With the [free Robusta SaaS platform](https://home.robusta.dev/) you can:

* See why KRR recommends what it does
* Sort and filter recommendations by namespace, priority, and more
* Copy a YAML snippet to fix the problems KRR finds
- See why KRR recommends what it does
- Sort and filter recommendations by namespace, priority, and more
- Copy a YAML snippet to fix the problems KRR finds

![Robusta UI Screen Shot][ui-screenshot]

<p align="right">(<a href="#readme-top">back to top</a>)</p>


## How it works

### Metrics Gathering
Expand Down Expand Up @@ -431,9 +445,10 @@ python krr.py simple -p "https://prom-api.coralogix..." --coralogix_token

## Grafana Cloud managed Prometheus

For Grafana Cloud managed Prometheus you need to specify prometheus link, prometheus user, and an access token of your Grafana Cloud stack. The Prometheus link and user for the stack can be found on the Grafana Cloud Portal. An access token with a `metrics:read` scope can also be created using Access Policies on the same portal.
For Grafana Cloud managed Prometheus you need to specify prometheus link, prometheus user, and an access token of your Grafana Cloud stack. The Prometheus link and user for the stack can be found on the Grafana Cloud Portal. An access token with a `metrics:read` scope can also be created using Access Policies on the same portal.

Next, run the following command, after setting the values of PROM_URL, PROM_USER, and PROM_TOKEN variables with your Grafana Cloud stack's prometheus link, prometheus user, and access token.

```sh
python krr.py simple -p $PROM_URL --prometheus-auth-header "Bearer ${PROM_USER}:${PROM_TOKEN}" --prometheus-ssl-enabled
```
Expand Down
126 changes: 76 additions & 50 deletions robusta_krr/core/integrations/kubernetes/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import AsyncGenerator, AsyncIterator, Callable, Optional, Union

Expand All @@ -13,31 +14,32 @@
V1Job,
V1LabelSelector,
V1Pod,
V1PodList,
V1StatefulSet,
V2HorizontalPodAutoscaler,
V2HorizontalPodAutoscalerList,
)

from robusta_krr.core.models.objects import HPAData, K8sObjectData, KindLiteral
from robusta_krr.core.models.config import settings
from robusta_krr.core.models.objects import HPAData, K8sObjectData, KindLiteral, PodData
from robusta_krr.core.models.result import ResourceAllocations
from robusta_krr.utils.configurable import Configurable

from . import config_patch as _
from .rollout import RolloutAppsV1Api

logger = logging.getLogger("krr")

AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job]
HPAKey = tuple[str, str, str]


class ClusterLoader(Configurable):
def __init__(self, cluster: Optional[str], *args, **kwargs):
super().__init__(*args, **kwargs)

class ClusterLoader:
def __init__(self, cluster: Optional[str]):
self.cluster = cluster
# This executor will be running requests to Kubernetes API
self.executor = ThreadPoolExecutor(self.config.max_workers)
self.executor = ThreadPoolExecutor(settings.max_workers)
self.api_client = (
config.new_client_from_config(context=cluster, config_file=self.config.kubeconfig)
config.new_client_from_config(context=cluster, config_file=settings.kubeconfig)
if cluster is not None
else None
)
Expand All @@ -57,9 +59,9 @@ async def list_scannable_objects(self) -> AsyncGenerator[K8sObjectData, None]:
A list of scannable objects.
"""

self.info(f"Listing scannable objects in {self.cluster}")
self.debug(f"Namespaces: {self.config.namespaces}")
self.debug(f"Resources: {self.config.resources}")
logger.info(f"Listing scannable objects in {self.cluster}")
logger.debug(f"Namespaces: {settings.namespaces}")
logger.debug(f"Resources: {settings.resources}")

self.__hpa_list = await self._try_list_hpa()

Expand All @@ -76,10 +78,24 @@ async def list_scannable_objects(self) -> AsyncGenerator[K8sObjectData, None]:
async with objects_combined.stream() as streamer:
async for object in streamer:
# NOTE: By default we will filter out kube-system namespace
if self.config.namespaces == "*" and object.namespace == "kube-system":
if settings.namespaces == "*" and object.namespace == "kube-system":
continue
yield object

async def list_pods(self, object: K8sObjectData) -> list[PodData]:
selector = self._build_selector_query(object._api_resource.spec.selector)
if selector is None:
return []

loop = asyncio.get_running_loop()
ret: V1PodList = await loop.run_in_executor(
self.executor,
lambda: self.core.list_namespaced_pod(
namespace=object._api_resource.metadata.namespace, label_selector=selector
),
)
return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items]

@staticmethod
def _get_match_expression_filter(expression) -> str:
if expression.operator.lower() == "exists":
Expand Down Expand Up @@ -116,36 +132,37 @@ def __build_obj(
container=container.name,
allocations=ResourceAllocations.from_container(container),
hpa=self.__hpa_list.get((namespace, kind, name)),
api_resource=item,
)

def _should_list_resource(self, resource: str):
if self.config.resources == "*":
if settings.resources == "*":
return True
return resource.lower() in self.config.resources
return resource.lower() in settings.resources

async def _list_workflows(
self, kind: KindLiteral, all_namespaces_request: Callable, namespaced_request: Callable
) -> AsyncIterator[K8sObjectData]:
if not self._should_list_resource(kind):
self.debug(f"Skipping {kind}s in {self.cluster}")
logger.debug(f"Skipping {kind}s in {self.cluster}")
return

if kind == "Rollout" and not self.__rollouts_available:
return

self.debug(f"Listing {kind}s in {self.cluster}")
logger.debug(f"Listing {kind}s in {self.cluster}")
loop = asyncio.get_running_loop()

try:
if self.config.namespaces == "*":
if settings.namespaces == "*":
ret_multi = await loop.run_in_executor(
self.executor,
lambda: all_namespaces_request(
watch=False,
label_selector=self.config.selector,
label_selector=settings.selector,
),
)
self.debug(f"Found {len(ret_multi.items)} {kind} in {self.cluster}")
logger.debug(f"Found {len(ret_multi.items)} {kind} in {self.cluster}")
for item in ret_multi.items:
for container in item.spec.template.spec.containers:
yield self.__build_obj(item, container, kind)
Expand All @@ -156,10 +173,10 @@ async def _list_workflows(
lambda: namespaced_request(
namespace=namespace,
watch=False,
label_selector=self.config.selector,
label_selector=settings.selector,
),
)
for namespace in self.config.namespaces
for namespace in settings.namespaces
]

total_items = 0
Expand All @@ -170,16 +187,15 @@ async def _list_workflows(
for container in item.spec.template.spec.containers:
yield self.__build_obj(item, container, kind)

self.debug(f"Found {total_items} {kind} in {self.cluster}")
logger.debug(f"Found {total_items} {kind} in {self.cluster}")
except ApiException as e:
if kind == "Rollout" and e.status in [400, 401, 403, 404]:
if self.__rollouts_available:
self.debug(f"Rollout API not available in {self.cluster}")
logger.debug(f"Rollout API not available in {self.cluster}")
self.__rollouts_available = False
else:
self.error(f"Error {e.status} listing {kind} in cluster {self.cluster}: {e.reason}")
self.debug_exception()
self.error("Will skip this object type and continue.")
logger.exception(f"Error {e.status} listing {kind} in cluster {self.cluster}: {e.reason}")
logger.error("Will skip this object type and continue.")

def _list_deployments(self) -> AsyncIterator[K8sObjectData]:
return self._list_workflows(
Expand Down Expand Up @@ -296,61 +312,63 @@ async def _try_list_hpa(self) -> dict[HPAKey, HPAData]:
try:
return await self.__list_hpa()
except Exception as e:
self.error(f"Error trying to list hpa in cluster {self.cluster}: {e}")
self.debug_exception()
self.error(
logger.exception(f"Error trying to list hpa in cluster {self.cluster}: {e}")
logger.error(
"Will assume that there are no HPA. "
"Be careful as this may lead to inaccurate results if object actually has HPA."
)
return {}


class KubernetesLoader(Configurable):
class KubernetesLoader:
def __init__(self) -> None:
self._cluster_loaders: dict[Optional[str], ClusterLoader] = {}

async def list_clusters(self) -> Optional[list[str]]:
"""List all clusters.
Returns:
A list of clusters.
"""

if self.config.inside_cluster:
self.debug("Working inside the cluster")
if settings.inside_cluster:
logger.debug("Working inside the cluster")
return None

try:
contexts, current_context = config.list_kube_config_contexts(self.config.kubeconfig)
contexts, current_context = config.list_kube_config_contexts(settings.kubeconfig)
except config.ConfigException:
if self.config.clusters is not None and self.config.clusters != "*":
self.warning("Could not load context from kubeconfig.")
self.warning(f"Falling back to clusters from CLI: {self.config.clusters}")
return self.config.clusters
if settings.clusters is not None and settings.clusters != "*":
logger.warning("Could not load context from kubeconfig.")
logger.warning(f"Falling back to clusters from CLI: {settings.clusters}")
return settings.clusters
else:
self.error(
logger.error(
"Could not load context from kubeconfig. "
"Please check your kubeconfig file or pass -c flag with the context name."
)
return None

self.debug(f"Found {len(contexts)} clusters: {', '.join([context['name'] for context in contexts])}")
self.debug(f"Current cluster: {current_context['name']}")
logger.debug(f"Found {len(contexts)} clusters: {', '.join([context['name'] for context in contexts])}")
logger.debug(f"Current cluster: {current_context['name']}")

self.debug(f"Configured clusters: {self.config.clusters}")
logger.debug(f"Configured clusters: {settings.clusters}")

# None, empty means current cluster
if not self.config.clusters:
if not settings.clusters:
return [current_context["name"]]

# * means all clusters
if self.config.clusters == "*":
if settings.clusters == "*":
return [context["name"] for context in contexts]

return [context["name"] for context in contexts if context["name"] in self.config.clusters]
return [context["name"] for context in contexts if context["name"] in settings.clusters]

def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[ClusterLoader]:
try:
return ClusterLoader(cluster=cluster, config=self.config)
return ClusterLoader(cluster=cluster)
except Exception as e:
self.error(f"Could not load cluster {cluster} and will skip it: {e}")
logger.error(f"Could not load cluster {cluster} and will skip it: {e}")
return None

async def list_scannable_objects(self, clusters: Optional[list[str]]) -> AsyncIterator[K8sObjectData]:
Expand All @@ -364,17 +382,25 @@ async def list_scannable_objects(self, clusters: Optional[list[str]]) -> AsyncIt
else:
_cluster_loaders = [self._try_create_cluster_loader(cluster) for cluster in clusters]

cluster_loaders = [cl for cl in _cluster_loaders if cl is not None]
if cluster_loaders == []:
self.error("Could not load any cluster.")
self.cluster_loaders = {cl.cluster: cl for cl in _cluster_loaders if cl is not None}
if self.cluster_loaders == {}:
logger.error("Could not load any cluster.")
return

# https://stackoverflow.com/questions/55299564/join-multiple-async-generators-in-python
# This will merge all the streams from all the cluster loaders into a single stream
objects_combined = aiostream.stream.merge(
*[cluster_loader.list_scannable_objects() for cluster_loader in cluster_loaders]
*[cluster_loader.list_scannable_objects() for cluster_loader in self.cluster_loaders.values()]
)

async with objects_combined.stream() as streamer:
async for object in streamer:
yield object

async def load_pods(self, object: K8sObjectData) -> list[PodData]:
try:
cluster_loader = self.cluster_loaders[object.cluster]
except KeyError:
raise RuntimeError(f"Cluster loader for cluster {object.cluster} not found") from None

return await cluster_loader.list_pods(object)
Loading

0 comments on commit 0f16f93

Please sign in to comment.