Commit a46b32f

Merge pull request #118 from robusta-dev/refactoring-and-optimization

Optimization on memory usage

LeaveMyYard authored Aug 2, 2023
2 parents d202105 + 5debaa6
Showing 8 changed files with 151 additions and 132 deletions.
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,4 +1,5 @@
 about-time==4.2.1 ; python_version >= "3.9" and python_version < "3.12"
+aiostream==0.4.5 ; python_version >= "3.9" and python_version < "3.12"
 alive-progress==3.1.2 ; python_version >= "3.9" and python_version < "3.12"
 cachetools==5.3.0 ; python_version >= "3.9" and python_version < "3.12"
 certifi==2022.12.7 ; python_version >= "3.9" and python_version < "3.12"
164 changes: 74 additions & 90 deletions robusta_krr/core/integrations/kubernetes.py
@@ -1,7 +1,7 @@
 import asyncio
-import itertools
 from concurrent.futures import ThreadPoolExecutor
-from typing import Optional, Union
+from typing import AsyncGenerator, Optional, Union
+import aiostream
 
 from kubernetes import client, config # type: ignore
 from kubernetes.client import ApiException
@@ -23,7 +23,7 @@
     V2HorizontalPodAutoscalerList,
 )
 
-from robusta_krr.core.models.objects import HPAData, K8sObjectData, PodData
+from robusta_krr.core.models.objects import HPAData, K8sObjectData
 from robusta_krr.core.models.result import ResourceAllocations
 from robusta_krr.utils.configurable import Configurable

@@ -53,7 +53,7 @@ def __init__(self, cluster: Optional[str], *args, **kwargs):
         self.autoscaling_v1 = client.AutoscalingV1Api(api_client=self.api_client)
         self.autoscaling_v2 = client.AutoscalingV2Api(api_client=self.api_client)
 
-    async def list_scannable_objects(self) -> list[K8sObjectData]:
+    async def list_scannable_objects(self) -> AsyncGenerator[K8sObjectData, None]:
         """List all scannable objects.
 
         Returns:
@@ -63,32 +63,31 @@ async def list_scannable_objects(self) -> list[K8sObjectData]:
         self.info(f"Listing scannable objects in {self.cluster}")
         self.debug(f"Namespaces: {self.config.namespaces}")
 
-        try:
-            self.__hpa_list = await self.__list_hpa()
-            objects_tuple = await asyncio.gather(
-                self._list_deployments(),
-                self._list_rollouts(),
-                self._list_all_statefulsets(),
-                self._list_all_daemon_set(),
-                self._list_all_jobs(),
-            )
-
-        except Exception as e:
-            self.error(f"Error trying to list pods in cluster {self.cluster}: {e}")
-            self.debug_exception()
-            return []
-
-        objects = itertools.chain(*objects_tuple)
-        if self.config.namespaces == "*":
-            # NOTE: We are not scanning kube-system namespace by default
-            result = [obj for obj in objects if obj.namespace != "kube-system"]
-        else:
-            result = [obj for obj in objects if obj.namespace in self.config.namespaces]
-
-        namespaces = {obj.namespace for obj in result}
-        self.info(f"Found {len(result)} objects across {len(namespaces)} namespaces in {self.cluster}")
-
-        return result
+        self.__hpa_list = await self._try_list_hpa()
+
+        tasks = [
+            self._list_deployments(),
+            self._list_rollouts(),
+            self._list_all_statefulsets(),
+            self._list_all_daemon_set(),
+            self._list_all_jobs(),
+        ]
+
+        for fut in asyncio.as_completed(tasks):
+            try:
+                object_list = await fut
+            except Exception as e:
+                self.error(f"Error {e.__class__.__name__} listing objects in cluster {self.cluster}: {e}")
+                self.debug_exception()
+                self.error("Will skip this object type and continue.")
+                continue
+
+            for object in object_list:
+                if self.config.namespaces == "*" and object.namespace == "kube-system":
+                    continue
+                elif self.config.namespaces != "*" and object.namespace not in self.config.namespaces:
+                    continue
+                yield object
 
     @staticmethod
     def _get_match_expression_filter(expression) -> str:
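A note on the hunk above: the old implementation awaited a single asyncio.gather, so one failing API call aborted the whole scan and the full object list was held in memory at once. The new code awaits each listing via asyncio.as_completed and yields objects as they arrive. A minimal, self-contained sketch of that pattern (all names below are illustrative, not from krr):

import asyncio
from typing import AsyncGenerator

async def list_kind(kind: str) -> list[str]:
    # Stand-in for one of the _list_* coroutines above
    if kind == "daemonsets":
        raise RuntimeError("simulated API failure")
    return [f"{kind}/example-{i}" for i in range(2)]

async def scan() -> AsyncGenerator[str, None]:
    tasks = [list_kind(k) for k in ("deployments", "daemonsets", "jobs")]
    for fut in asyncio.as_completed(tasks):
        try:
            items = await fut
        except Exception as e:
            print(f"skipping one object type: {e}")  # failure stays isolated
            continue
        for item in items:
            yield item  # stream results instead of accumulating one big list

async def main() -> None:
    async for obj in scan():
        print("scanned:", obj)

asyncio.run(main())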
@@ -111,22 +110,12 @@ def _build_selector_query(selector: V1LabelSelector) -> Union[str, None]:
 
         return ",".join(label_filters)
 
-    async def __list_pods(self, resource: Union[V1Deployment, V1DaemonSet, V1StatefulSet]) -> list[PodData]:
-        selector = self._build_selector_query(resource.spec.selector)
-        if selector is None:
-            return []
-
-        loop = asyncio.get_running_loop()
-        ret: V1PodList = await loop.run_in_executor(
-            self.executor,
-            lambda: self.core.list_namespaced_pod(namespace=resource.metadata.namespace, label_selector=selector),
-        )
-        return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items]
-
-    async def __build_obj(self, item: AnyKubernetesAPIObject, container: V1Container) -> K8sObjectData:
+    def __build_obj(
+        self, item: AnyKubernetesAPIObject, container: V1Container, kind: Optional[str] = None
+    ) -> K8sObjectData:
         name = item.metadata.name
         namespace = item.metadata.namespace
-        kind = item.__class__.__name__[2:]
+        kind = kind or item.__class__.__name__[2:]
 
         return K8sObjectData(
             cluster=self.cluster,
@@ -135,7 +124,6 @@ async def __build_obj(self, item: AnyKubernetesAPIObject, container: V1Container
             kind=kind,
             container=container.name,
             allocations=ResourceAllocations.from_container(container),
-            pods=await self.__list_pods(item),
             hpa=self.__hpa_list.get((namespace, kind, name)),
         )
 
@@ -151,13 +139,9 @@ async def _list_deployments(self) -> list[K8sObjectData]:
         )
         self.debug(f"Found {len(ret.items)} deployments in {self.cluster}")
 
-        return await asyncio.gather(
-            *[
-                self.__build_obj(item, container)
-                for item in ret.items
-                for container in item.spec.template.spec.containers
-            ]
-        )
+        return [
+            self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
+        ]
 
     async def _list_rollouts(self) -> list[K8sObjectData]:
         self.debug(f"Listing ArgoCD rollouts in {self.cluster}")
@@ -178,13 +162,9 @@ async def _list_rollouts(self) -> list[K8sObjectData]:
 
         self.debug(f"Found {len(ret.items)} rollouts in {self.cluster}")
 
-        return await asyncio.gather(
-            *[
-                self.__build_obj(item, container)
-                for item in ret.items
-                for container in item.spec.template.spec.containers
-            ]
-        )
+        return [
+            self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
+        ]
 
     async def _list_all_statefulsets(self) -> list[K8sObjectData]:
         self.debug(f"Listing statefulsets in {self.cluster}")
@@ -198,13 +178,9 @@ async def _list_all_statefulsets(self) -> list[K8sObjectData]:
         )
         self.debug(f"Found {len(ret.items)} statefulsets in {self.cluster}")
 
-        return await asyncio.gather(
-            *[
-                self.__build_obj(item, container)
-                for item in ret.items
-                for container in item.spec.template.spec.containers
-            ]
-        )
+        return [
+            self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
+        ]
 
     async def _list_all_daemon_set(self) -> list[K8sObjectData]:
         self.debug(f"Listing daemonsets in {self.cluster}")
@@ -218,13 +194,9 @@ async def _list_all_daemon_set(self) -> list[K8sObjectData]:
         )
         self.debug(f"Found {len(ret.items)} daemonsets in {self.cluster}")
 
-        return await asyncio.gather(
-            *[
-                self.__build_obj(item, container)
-                for item in ret.items
-                for container in item.spec.template.spec.containers
-            ]
-        )
+        return [
+            self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
+        ]
 
     async def _list_all_jobs(self) -> list[K8sObjectData]:
         self.debug(f"Listing jobs in {self.cluster}")
@@ -238,13 +210,9 @@ async def _list_all_jobs(self) -> list[K8sObjectData]:
         )
         self.debug(f"Found {len(ret.items)} jobs in {self.cluster}")
 
-        return await asyncio.gather(
-            *[
-                self.__build_obj(item, container)
-                for item in ret.items
-                for container in item.spec.template.spec.containers
-            ]
-        )
+        return [
+            self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
+        ]
 
     async def _list_pods(self) -> list[K8sObjectData]:
         """For future use, not supported yet."""
@@ -260,9 +228,7 @@ async def _list_pods(self) -> list[K8sObjectData]:
         )
         self.debug(f"Found {len(ret.items)} pods in {self.cluster}")
 
-        return await asyncio.gather(
-            *[self.__build_obj(item, container) for item in ret.items for container in item.spec.containers]
-        )
+        return [self.__build_obj(item, container) for item in ret.items for container in item.spec.containers]
 
     async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]:
         loop = asyncio.get_running_loop()
@@ -340,6 +306,18 @@ async def __list_hpa(self) -> dict[HPAKey, HPAData]:
             # If V2 API does not exist, fall back to V1
             return await self.__list_hpa_v1()
 
+    async def _try_list_hpa(self) -> dict[HPAKey, HPAData]:
+        try:
+            return await self.__list_hpa()
+        except Exception as e:
+            self.error(f"Error trying to list hpa in cluster {self.cluster}: {e}")
+            self.debug_exception()
+            self.error(
+                "Will assume that there are no HPA. "
+                "Be careful as this may lead to inaccurate results if object actually has HPA."
+            )
+            return {}
+
 
 class KubernetesLoader(Configurable):
     async def list_clusters(self) -> Optional[list[str]]:
@@ -389,13 +367,12 @@ def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[ClusterLoader]:
             self.error(f"Could not load cluster {cluster} and will skip it: {e}")
             return None
 
-    async def list_scannable_objects(self, clusters: Optional[list[str]]) -> list[K8sObjectData]:
+    async def list_scannable_objects(self, clusters: Optional[list[str]]) -> AsyncGenerator[K8sObjectData, None]:
         """List all scannable objects.
 
-        Returns:
-            A list of scannable objects.
+        Yields:
+            Each scannable object as it is loaded.
         """
-
         if clusters is None:
             _cluster_loaders = [self._try_create_cluster_loader(None)]
else:
Expand All @@ -404,7 +381,14 @@ async def list_scannable_objects(self, clusters: Optional[list[str]]) -> list[K8
cluster_loaders = [cl for cl in _cluster_loaders if cl is not None]
if cluster_loaders == []:
self.error("Could not load any cluster.")
return []
return

# https://stackoverflow.com/questions/55299564/join-multiple-async-generators-in-python
# This will merge all the streams from all the cluster loaders into a single stream
objects_combined = aiostream.stream.merge(
*[cluster_loader.list_scannable_objects() for cluster_loader in cluster_loaders]
)

objects = await asyncio.gather(*[cluster_loader.list_scannable_objects() for cluster_loader in cluster_loaders])
return list(itertools.chain(*objects))
async with objects_combined.stream() as streamer:
async for object in streamer:
yield object
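The merge above comes from the aiostream library (hence the new requirements.txt pin): each ClusterLoader yields objects independently, and aiostream.stream.merge interleaves them so a slow cluster does not block the others. A runnable sketch of the same pattern, with illustrative generators standing in for the real loaders:

import asyncio

import aiostream

async def cluster_objects(cluster: str, delay: float):
    # Stand-in for ClusterLoader.list_scannable_objects()
    for i in range(3):
        await asyncio.sleep(delay)
        yield f"{cluster}/object-{i}"

async def main() -> None:
    merged = aiostream.stream.merge(
        cluster_objects("cluster-a", 0.01),
        cluster_objects("cluster-b", 0.015),
    )
    async with merged.stream() as streamer:
        async for obj in streamer:
            print(obj)  # items interleave as each cluster produces them

asyncio.run(main())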
2 changes: 1 addition & 1 deletion robusta_krr/core/integrations/prometheus/loader.py
@@ -97,7 +97,7 @@ async def gather_data(
             ResourceHistoryData: The gathered resource history data.
         """
 
-        await self.loader.add_historic_pods(object, period)
+        await self.loader.load_pods(object, period)
 
         return {
             MetricLoader.__name__: await self.loader.gather_data(object, MetricLoader, period, step)
@@ -1,5 +1,6 @@
 import asyncio
 import datetime
+import time
 from typing import List, Optional
 from concurrent.futures import ThreadPoolExecutor
 
@@ -123,7 +124,7 @@ def validate_cluster_name(self):
         cluster_label = self.config.prometheus_cluster_label
         cluster_names = self.get_cluster_names()
 
-        if len(cluster_names) <= 1:
+        if cluster_names is None or len(cluster_names) <= 1:
             # there is only one cluster of metrics in this prometheus
             return
 
@@ -153,19 +154,21 @@ async def gather_data(
         """
             ResourceHistoryData: The gathered resource history data.
         """
-        self.debug(f"Gathering data for {object} and {LoaderClass}")
+        self.debug(f"Gathering {LoaderClass.__name__} metric for {object}")
 
         metric_loader = LoaderClass(self.config, self.prometheus, self.name, self.executor)
         return await metric_loader.load_data(object, period, step)
 
-    async def add_historic_pods(self, object: K8sObjectData, period: datetime.timedelta) -> None:
+    async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> None:
         """
-        Finds pods that have been deleted but still have some metrics in Prometheus.
+        List pods related to the object and add them to the object's pods list.
         Args:
             object (K8sObjectData): The Kubernetes object.
             period (datetime.timedelta): The time period for which to gather data.
         """
 
+        self.debug(f"Adding historic pods for {object}")
+
         days_literal = min(int(period.total_seconds()) // 60 // 24, 32)
         period_literal = f"{days_literal}d"
         pod_owners: list[str]
@@ -183,25 +186,47 @@ async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> None:
             )
             pod_owners = [replicaset["metric"]["replicaset"] for replicaset in replicasets]
             pod_owner_kind = "ReplicaSet"
+
+            del replicasets
         else:
             pod_owners = [object.name]
             pod_owner_kind = object.kind
 
         owners_regex = "|".join(pod_owners)
         related_pods = await self.query(
-            "kube_pod_owner{"
-            f'owner_name=~"{owners_regex}", '
-            f'owner_kind="{pod_owner_kind}", '
-            f'namespace="{object.namespace}"'
-            f"{cluster_label}"
-            "}"
-            f"[{period_literal}]"
+            f"""
+                last_over_time(
+                    kube_pod_owner{{
+                        owner_name=~"{owners_regex}",
+                        owner_kind="{pod_owner_kind}",
+                        namespace="{object.namespace}"
+                        {cluster_label}
+                    }}[{period_literal}]
+                )
+            """
         )
 
-        current_pods = {p.name for p in object.pods}
+        if related_pods == []:
+            self.debug(f"No pods found for {object}")
+            return
+
+        current_pods = await self.query(
+            f"""
+                present_over_time(
+                    kube_pod_owner{{
+                        owner_name=~"{owners_regex}",
+                        owner_kind="{pod_owner_kind}",
+                        namespace="{object.namespace}"
+                        {cluster_label}
+                    }}[1m]
+                )
+            """
+        )
+
+        current_pods_set = {pod["metric"]["pod"] for pod in current_pods}
+        del current_pods
 
         object.pods += [
-            PodData(name=pod["metric"]["pod"], deleted=True)
+            PodData(name=pod["metric"]["pod"], deleted=pod["metric"]["pod"] not in current_pods_set)
             for pod in related_pods
-            if pod["metric"]["pod"] not in current_pods
         ]
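The rewritten method derives an object's pods purely from Prometheus: last_over_time(kube_pod_owner{...}[period]) returns every pod that belonged to the owner at any point in the period, present_over_time(kube_pod_owner{...}[1m]) returns the pods still reporting now, and the difference is flagged as deleted. The del statements release the raw query results early, in keeping with the PR's memory focus. A small sketch of the set arithmetic, using made-up results in the Prometheus API's {"metric": {...}} shape:

# Hypothetical query results: two pods seen over the full period,
# only one of them still present in the last minute.
related_pods = [
    {"metric": {"pod": "api-7d9f-abc"}},
    {"metric": {"pod": "api-7d9f-xyz"}},
]
current_pods = [{"metric": {"pod": "api-7d9f-abc"}}]

current_pods_set = {pod["metric"]["pod"] for pod in current_pods}
pods = [
    {"name": pod["metric"]["pod"], "deleted": pod["metric"]["pod"] not in current_pods_set}
    for pod in related_pods
]
print(pods)  # api-7d9f-abc stays live; api-7d9f-xyz is flagged deleted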
2 changes: 1 addition & 1 deletion robusta_krr/core/models/objects.py
@@ -27,7 +27,7 @@ class K8sObjectData(pd.BaseModel):
     cluster: Optional[str]
     name: str
     container: str
-    pods: list[PodData]
+    pods: list[PodData] = []
     hpa: Optional[HPAData]
     namespace: str
     kind: str
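The new default lets __build_obj construct K8sObjectData without pod information, which load_pods fills in later from Prometheus. Unlike a mutable default on a plain Python class, this is safe on a pydantic model, since pydantic copies the default for every instance. A quick sketch, assuming pydantic v1 (the pd.BaseModel style used in this codebase):

import pydantic as pd

class Item(pd.BaseModel):
    pods: list[str] = []  # pydantic copies this default per instance

a = Item()
b = Item()
a.pods.append("pod-1")
print(a.pods, b.pods)  # ['pod-1'] [] -- no shared list between instances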
(3 more changed files not shown.)