Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Optimization of memory usage #118

Merged
merged 6 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
about-time==4.2.1 ; python_version >= "3.9" and python_version < "3.12"
aiostream==0.4.5 ; python_version >= "3.9" and python_version < "3.12"
alive-progress==3.1.2 ; python_version >= "3.9" and python_version < "3.12"
cachetools==5.3.0 ; python_version >= "3.9" and python_version < "3.12"
certifi==2022.12.7 ; python_version >= "3.9" and python_version < "3.12"
Expand Down
164 changes: 74 additions & 90 deletions robusta_krr/core/integrations/kubernetes.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import itertools
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, Union
from typing import AsyncGenerator, Optional, Union
import aiostream

from kubernetes import client, config # type: ignore
from kubernetes.client import ApiException
Expand All @@ -23,7 +23,7 @@
V2HorizontalPodAutoscalerList,
)

from robusta_krr.core.models.objects import HPAData, K8sObjectData, PodData
from robusta_krr.core.models.objects import HPAData, K8sObjectData
from robusta_krr.core.models.result import ResourceAllocations
from robusta_krr.utils.configurable import Configurable

Expand Down Expand Up @@ -53,7 +53,7 @@ def __init__(self, cluster: Optional[str], *args, **kwargs):
self.autoscaling_v1 = client.AutoscalingV1Api(api_client=self.api_client)
self.autoscaling_v2 = client.AutoscalingV2Api(api_client=self.api_client)

async def list_scannable_objects(self) -> list[K8sObjectData]:
async def list_scannable_objects(self) -> AsyncGenerator[K8sObjectData, None]:
"""List all scannable objects.

Returns:
Expand All @@ -63,32 +63,31 @@ async def list_scannable_objects(self) -> list[K8sObjectData]:
self.info(f"Listing scannable objects in {self.cluster}")
self.debug(f"Namespaces: {self.config.namespaces}")

try:
self.__hpa_list = await self.__list_hpa()
objects_tuple = await asyncio.gather(
self._list_deployments(),
self._list_rollouts(),
self._list_all_statefulsets(),
self._list_all_daemon_set(),
self._list_all_jobs(),
)

except Exception as e:
self.error(f"Error trying to list pods in cluster {self.cluster}: {e}")
self.debug_exception()
return []

objects = itertools.chain(*objects_tuple)
if self.config.namespaces == "*":
# NOTE: We are not scanning kube-system namespace by default
result = [obj for obj in objects if obj.namespace != "kube-system"]
else:
result = [obj for obj in objects if obj.namespace in self.config.namespaces]

namespaces = {obj.namespace for obj in result}
self.info(f"Found {len(result)} objects across {len(namespaces)} namespaces in {self.cluster}")

return result
self.__hpa_list = await self._try_list_hpa()

tasks = [
self._list_deployments(),
self._list_rollouts(),
self._list_all_statefulsets(),
self._list_all_daemon_set(),
self._list_all_jobs(),
]

for fut in asyncio.as_completed(tasks):
try:
object_list = await fut
except Exception as e:
self.error(f"Error {e.__class__.__name__} listing objects in cluster {self.cluster}: {e}")
self.debug_exception()
self.error("Will skip this object type and continue.")
LeaveMyYard marked this conversation as resolved.
Show resolved Hide resolved
continue

for object in object_list:
if self.config.namespaces == "*" and object.namespace == "kube-system":
continue
elif self.config.namespaces != "*" and object.namespace not in self.config.namespaces:
continue
yield object

@staticmethod
def _get_match_expression_filter(expression) -> str:
Expand All @@ -111,22 +110,12 @@ def _build_selector_query(selector: V1LabelSelector) -> Union[str, None]:

return ",".join(label_filters)

async def __list_pods(self, resource: Union[V1Deployment, V1DaemonSet, V1StatefulSet]) -> list[PodData]:
selector = self._build_selector_query(resource.spec.selector)
if selector is None:
return []

loop = asyncio.get_running_loop()
ret: V1PodList = await loop.run_in_executor(
self.executor,
lambda: self.core.list_namespaced_pod(namespace=resource.metadata.namespace, label_selector=selector),
)
return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items]

async def __build_obj(self, item: AnyKubernetesAPIObject, container: V1Container) -> K8sObjectData:
def __build_obj(
self, item: AnyKubernetesAPIObject, container: V1Container, kind: Optional[str] = None
) -> K8sObjectData:
name = item.metadata.name
namespace = item.metadata.namespace
kind = item.__class__.__name__[2:]
kind = kind or item.__class__.__name__[2:]

return K8sObjectData(
cluster=self.cluster,
Expand All @@ -135,7 +124,6 @@ async def __build_obj(self, item: AnyKubernetesAPIObject, container: V1Container
kind=kind,
container=container.name,
allocations=ResourceAllocations.from_container(container),
pods=await self.__list_pods(item),
hpa=self.__hpa_list.get((namespace, kind, name)),
)

Expand All @@ -151,13 +139,9 @@ async def _list_deployments(self) -> list[K8sObjectData]:
)
self.debug(f"Found {len(ret.items)} deployments in {self.cluster}")

return await asyncio.gather(
*[
self.__build_obj(item, container)
for item in ret.items
for container in item.spec.template.spec.containers
]
)
return [
self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
]

async def _list_rollouts(self) -> list[K8sObjectData]:
self.debug(f"Listing ArgoCD rollouts in {self.cluster}")
Expand All @@ -178,13 +162,9 @@ async def _list_rollouts(self) -> list[K8sObjectData]:

self.debug(f"Found {len(ret.items)} rollouts in {self.cluster}")

return await asyncio.gather(
*[
self.__build_obj(item, container)
for item in ret.items
for container in item.spec.template.spec.containers
]
)
return [
self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
]

async def _list_all_statefulsets(self) -> list[K8sObjectData]:
self.debug(f"Listing statefulsets in {self.cluster}")
Expand All @@ -198,13 +178,9 @@ async def _list_all_statefulsets(self) -> list[K8sObjectData]:
)
self.debug(f"Found {len(ret.items)} statefulsets in {self.cluster}")

return await asyncio.gather(
*[
self.__build_obj(item, container)
for item in ret.items
for container in item.spec.template.spec.containers
]
)
return [
self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
]

async def _list_all_daemon_set(self) -> list[K8sObjectData]:
self.debug(f"Listing daemonsets in {self.cluster}")
Expand All @@ -218,13 +194,9 @@ async def _list_all_daemon_set(self) -> list[K8sObjectData]:
)
self.debug(f"Found {len(ret.items)} daemonsets in {self.cluster}")

return await asyncio.gather(
*[
self.__build_obj(item, container)
for item in ret.items
for container in item.spec.template.spec.containers
]
)
return [
self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
]

async def _list_all_jobs(self) -> list[K8sObjectData]:
self.debug(f"Listing jobs in {self.cluster}")
Expand All @@ -238,13 +210,9 @@ async def _list_all_jobs(self) -> list[K8sObjectData]:
)
self.debug(f"Found {len(ret.items)} jobs in {self.cluster}")

return await asyncio.gather(
*[
self.__build_obj(item, container)
for item in ret.items
for container in item.spec.template.spec.containers
]
)
return [
self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
]

async def _list_pods(self) -> list[K8sObjectData]:
"""For future use, not supported yet."""
Expand All @@ -260,9 +228,7 @@ async def _list_pods(self) -> list[K8sObjectData]:
)
self.debug(f"Found {len(ret.items)} pods in {self.cluster}")

return await asyncio.gather(
*[self.__build_obj(item, container) for item in ret.items for container in item.spec.containers]
)
return [self.__build_obj(item, container) for item in ret.items for container in item.spec.containers]

async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]:
loop = asyncio.get_running_loop()
Expand Down Expand Up @@ -340,6 +306,18 @@ async def __list_hpa(self) -> dict[HPAKey, HPAData]:
# If V2 API does not exist, fall back to V1
return await self.__list_hpa_v1()

async def _try_list_hpa(self) -> dict[HPAKey, HPAData]:
    """Load HPA data for the cluster, tolerating failures.

    Delegates to ``__list_hpa``; if that raises for any reason, the error
    is logged and an empty mapping is returned so that object scanning can
    continue without HPA information.

    Returns:
        Mapping of (namespace, kind, name) keys to HPAData, or an empty
        dict when listing fails.
    """
    try:
        hpa_data = await self.__list_hpa()
    except Exception as e:
        # Degrade gracefully: report the failure and proceed with no HPA data.
        self.error(f"Error trying to list hpa in cluster {self.cluster}: {e}")
        self.debug_exception()
        self.error(
            "Will assume that there are no HPA. "
            "Be careful as this may lead to inaccurate results if object actually has HPA."
        )
        hpa_data = {}
    return hpa_data


class KubernetesLoader(Configurable):
async def list_clusters(self) -> Optional[list[str]]:
Expand Down Expand Up @@ -389,13 +367,12 @@ def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[Cluster
self.error(f"Could not load cluster {cluster} and will skip it: {e}")
return None

async def list_scannable_objects(self, clusters: Optional[list[str]]) -> list[K8sObjectData]:
async def list_scannable_objects(self, clusters: Optional[list[str]]) -> AsyncGenerator[K8sObjectData, None]:
"""List all scannable objects.

Returns:
A list of scannable objects.
Yields:
Each scannable object as it is loaded.
"""

if clusters is None:
_cluster_loaders = [self._try_create_cluster_loader(None)]
else:
Expand All @@ -404,7 +381,14 @@ async def list_scannable_objects(self, clusters: Optional[list[str]]) -> list[K8
cluster_loaders = [cl for cl in _cluster_loaders if cl is not None]
if cluster_loaders == []:
self.error("Could not load any cluster.")
return []
return

# https://stackoverflow.com/questions/55299564/join-multiple-async-generators-in-python
# This will merge all the streams from all the cluster loaders into a single stream
objects_combined = aiostream.stream.merge(
*[cluster_loader.list_scannable_objects() for cluster_loader in cluster_loaders]
)

objects = await asyncio.gather(*[cluster_loader.list_scannable_objects() for cluster_loader in cluster_loaders])
return list(itertools.chain(*objects))
async with objects_combined.stream() as streamer:
async for object in streamer:
yield object
2 changes: 1 addition & 1 deletion robusta_krr/core/integrations/prometheus/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async def gather_data(
ResourceHistoryData: The gathered resource history data.
"""

await self.loader.add_historic_pods(object, period)
await self.loader.load_pods(object, period)

return {
MetricLoader.__name__: await self.loader.gather_data(object, MetricLoader, period, step)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import datetime
import time
from typing import List, Optional
from concurrent.futures import ThreadPoolExecutor

Expand Down Expand Up @@ -123,7 +124,7 @@ def validate_cluster_name(self):
cluster_label = self.config.prometheus_cluster_label
cluster_names = self.get_cluster_names()

if len(cluster_names) <= 1:
if cluster_names is None or len(cluster_names) <= 1:
# there is only one cluster of metrics in this prometheus
return

Expand Down Expand Up @@ -153,19 +154,21 @@ async def gather_data(
"""
ResourceHistoryData: The gathered resource history data.
"""
self.debug(f"Gathering data for {object} and {LoaderClass}")
self.debug(f"Gathering {LoaderClass.__name__} metric for {object}")

metric_loader = LoaderClass(self.config, self.prometheus, self.name, self.executor)
return await metric_loader.load_data(object, period, step)

async def add_historic_pods(self, object: K8sObjectData, period: datetime.timedelta) -> None:
async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> None:
"""
Finds pods that have been deleted but still have some metrics in Prometheus.
List pods related to the object and add them to the object's pods list.
Args:
object (K8sObjectData): The Kubernetes object.
period (datetime.timedelta): The time period for which to gather data.
"""

self.debug(f"Adding historic pods for {object}")

days_literal = min(int(period.total_seconds()) // 60 // 24, 32)
period_literal = f"{days_literal}d"
pod_owners: list[str]
Expand All @@ -183,25 +186,47 @@ async def add_historic_pods(self, object: K8sObjectData, period: datetime.timede
)
pod_owners = [replicaset["metric"]["replicaset"] for replicaset in replicasets]
pod_owner_kind = "ReplicaSet"

del replicasets
else:
pod_owners = [object.name]
pod_owner_kind = object.kind

owners_regex = "|".join(pod_owners)
related_pods = await self.query(
"kube_pod_owner{"
f'owner_name=~"{owners_regex}", '
f'owner_kind="{pod_owner_kind}", '
f'namespace="{object.namespace}"'
f"{cluster_label}"
"}"
f"[{period_literal}]"
f"""
last_over_time(
kube_pod_owner{{
owner_name=~"{owners_regex}",
owner_kind="{pod_owner_kind}",
namespace="{object.namespace}"
{cluster_label}
}}[{period_literal}]
LeaveMyYard marked this conversation as resolved.
Show resolved Hide resolved
)
"""
)

if related_pods == []:
self.debug(f"No pods found for {object}")
return

current_pods = await self.query(
Avi-Robusta marked this conversation as resolved.
Show resolved Hide resolved
f"""
present_over_time(
kube_pod_owner{{
owner_name=~"{owners_regex}",
owner_kind="{pod_owner_kind}",
namespace="{object.namespace}"
{cluster_label}
}}[1m]
)
"""
)

current_pods = {p.name for p in object.pods}
current_pods_set = {pod["metric"]["pod"] for pod in current_pods}
del current_pods

object.pods += [
PodData(name=pod["metric"]["pod"], deleted=True)
PodData(name=pod["metric"]["pod"], deleted=pod["metric"]["pod"] not in current_pods_set)
for pod in related_pods
if pod["metric"]["pod"] not in current_pods
]
2 changes: 1 addition & 1 deletion robusta_krr/core/models/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class K8sObjectData(pd.BaseModel):
cluster: Optional[str]
name: str
container: str
pods: list[PodData]
pods: list[PodData] = []
hpa: Optional[HPAData]
namespace: str
kind: str
Expand Down
Loading
Loading