[config-feeder] add autoscaling config collector
slicklash committed Jul 10, 2023
1 parent 0ccbec5 commit 0228300
Showing 18 changed files with 511 additions and 4 deletions.
Empty file.
33 changes: 33 additions & 0 deletions granulate_utils/config_feeder/client/autoscaling/collector.py
@@ -0,0 +1,33 @@
import logging
from typing import Optional, Union

from granulate_utils.config_feeder.client.bigdata.databricks import get_databricks_autoscaling_config
from granulate_utils.config_feeder.client.bigdata.dataproc import get_dataproc_autoscaling_config
from granulate_utils.config_feeder.client.bigdata.emr import get_emr_autoscaling_config
from granulate_utils.config_feeder.core.models.autoscaling import AutoScalingConfig
from granulate_utils.config_feeder.core.models.cluster import BigDataPlatform
from granulate_utils.config_feeder.core.models.node import NodeInfo


class AutoScalingConfigCollector:
def __init__(self, *, logger: Union[logging.Logger, logging.LoggerAdapter]) -> None:
self.logger = logger

async def collect(self, node_info: NodeInfo) -> Optional[AutoScalingConfig]:
if not node_info.is_master:
self.logger.debug("not a master node, skipping")
return None

if node_info.bigdata_platform == BigDataPlatform.EMR:
return await get_emr_autoscaling_config(node_info, logger=self.logger)

if node_info.bigdata_platform == BigDataPlatform.DATAPROC:
return await get_dataproc_autoscaling_config(node_info, logger=self.logger)

if node_info.bigdata_platform == BigDataPlatform.DATABRICKS:
return await get_databricks_autoscaling_config(node_info)

self.logger.debug(
f"{node_info.bigdata_platform} on {node_info.provider} is not yet supported, skipping" # noqa: E501
)
return None
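
For orientation, a minimal usage sketch (not part of the commit). The NodeInfo field names below appear elsewhere in this diff, but the exact constructor signature and the CloudProvider.AWS member are illustrative assumptions:

import asyncio
import logging

from granulate_utils.config_feeder.client.autoscaling.collector import AutoScalingConfigCollector
from granulate_utils.config_feeder.core.models.cluster import BigDataPlatform, CloudProvider
from granulate_utils.config_feeder.core.models.node import NodeInfo


async def main() -> None:
    # Hypothetical node: field names are taken from the diff, values are invented.
    node_info = NodeInfo(
        external_id="i-0123456789abcdef0",
        external_cluster_id="j-ABCDEF1234567",
        is_master=True,
        provider=CloudProvider.AWS,
        bigdata_platform=BigDataPlatform.EMR,
        properties={},
    )
    collector = AutoScalingConfigCollector(logger=logging.getLogger("config-feeder"))
    # Returns None on worker nodes and unsupported platforms; on a host without
    # the AWS CLI, the EMR path logs an error and also returns None.
    config = await collector.collect(node_info)
    print(config)


asyncio.run(main())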
10 changes: 10 additions & 0 deletions granulate_utils/config_feeder/client/autoscaling/models.py
@@ -0,0 +1,10 @@
from typing import Any, Dict

from pydantic import BaseModel

from granulate_utils.config_feeder.core.models.autoscaling import AutoScalingMode


class AutoScalingConfig(BaseModel):
mode: AutoScalingMode
config: Dict[str, Any]
15 changes: 15 additions & 0 deletions granulate_utils/config_feeder/client/bigdata/databricks.py
@@ -1,5 +1,6 @@
from typing import Dict, List, Optional

from granulate_utils.config_feeder.core.models.autoscaling import AutoScalingConfig, AutoScalingMode
from granulate_utils.config_feeder.core.models.cluster import BigDataPlatform, CloudProvider
from granulate_utils.config_feeder.core.models.node import NodeInfo
from granulate_utils.config_feeder.core.utils import mask_sensitive_value
@@ -34,6 +35,20 @@ def get_databricks_node_info() -> Optional[NodeInfo]:
return None


async def get_databricks_autoscaling_config(
node: NodeInfo,
) -> Optional[AutoScalingConfig]:
if node.properties.get("spark.databricks.clusterUsageTags.clusterScalingType") == "autoscaling":
return AutoScalingConfig(
mode=AutoScalingMode.MANAGED,
config={
"min_workers": int(node.properties.get("spark.databricks.clusterUsageTags.clusterMinWorkers", -1)),
"max_workers": int(node.properties.get("spark.databricks.clusterUsageTags.clusterMaxWorkers", -1)),
},
)
return None


def _get_deploy_conf() -> Optional[Dict[str, str]]:
"""
Reads dataproc properties from deploy.conf
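As a point of reference, a hedged sketch of the cluster-usage tags that drive get_databricks_autoscaling_config above; the values are invented, and missing min/max tags fall back to -1 in the parsed config:

# Illustrative (assumed) node properties that yield a MANAGED config;
# any other clusterScalingType value makes the function return None.
properties = {
    "spark.databricks.clusterUsageTags.clusterScalingType": "autoscaling",
    "spark.databricks.clusterUsageTags.clusterMinWorkers": "2",
    "spark.databricks.clusterUsageTags.clusterMaxWorkers": "8",
}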
39 changes: 39 additions & 0 deletions granulate_utils/config_feeder/client/bigdata/dataproc.py
@@ -1,9 +1,12 @@
import asyncio
import json
import logging
from typing import Any, Dict, Optional, Union

import requests
from requests.exceptions import ConnectionError, JSONDecodeError

from granulate_utils.config_feeder.core.models.autoscaling import AutoScalingConfig, AutoScalingMode
from granulate_utils.config_feeder.core.models.cluster import BigDataPlatform, CloudProvider
from granulate_utils.config_feeder.core.models.node import NodeInfo

@@ -43,9 +46,45 @@ def get_dataproc_node_info(logger: Optional[Union[logging.Logger, logging.Logger
return None


async def get_dataproc_autoscaling_config(
node: NodeInfo, *, logger: Union[logging.Logger, logging.LoggerAdapter]
) -> Optional[AutoScalingConfig]:
if (
cluster_info := await _run_gcloud_command(
node,
f"dataproc clusters describe {node.properties['cluster_name']}",
logger=logger,
)
) is None:
logger.error("failed to get cluster info")
return None
if policy_url := cluster_info.get("config", {}).get("autoscalingConfig", {}).get("policyUri"):
if (
policy := await _run_gcloud_command(
node, f"dataproc autoscaling-policies describe {policy_url}", logger=logger
)
) is not None:
return AutoScalingConfig(mode=AutoScalingMode.CUSTOM, config=policy)
else:
logger.error("failed to get autoscaling policy")
return None


def _get_metadata() -> Dict[str, Any]:
url = "http://metadata.google.internal/computeMetadata/v1/instance/?recursive=true" # noqa: E501
headers = {"Metadata-Flavor": "Google"}
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()


async def _run_gcloud_command(
node: NodeInfo, command: str, *, logger: Union[logging.Logger, logging.LoggerAdapter]
) -> Optional[Dict[str, Any]]:
cmd = f"gcloud {command} --region={node.properties['region']} --format=json"
process = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
stdout, stderr = await process.communicate()
if process.returncode != 0:
logger.error("failed to run gcloud command", extra={"command": command, "stderr": stderr.decode()})
return None
return json.loads(stdout.decode().strip())
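
To make the flow above concrete, a sketch of the two shell commands _run_gcloud_command assembles and the lookup into the describe output; the cluster name, region, and policy URI are invented:

# Assembled commands (values invented):
#   gcloud dataproc clusters describe my-cluster --region=us-central1 --format=json
#   gcloud dataproc autoscaling-policies describe <policyUri> --region=us-central1 --format=json

# Minimal shape of the describe output that get_dataproc_autoscaling_config reads:
cluster_info = {
    "config": {
        "autoscalingConfig": {
            "policyUri": "projects/my-project/regions/us-central1/autoscalingPolicies/my-policy"
        }
    }
}
policy_uri = cluster_info.get("config", {}).get("autoscalingConfig", {}).get("policyUri")
assert policy_uri is not None  # an absent policyUri means no autoscaling config, not an error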
40 changes: 39 additions & 1 deletion granulate_utils/config_feeder/client/bigdata/emr.py
@@ -1,6 +1,9 @@
import asyncio
import json
from typing import Dict, Optional
import logging
from typing import Any, Dict, Optional, Union

from granulate_utils.config_feeder.core.models.autoscaling import AutoScalingConfig, AutoScalingMode
from granulate_utils.config_feeder.core.models.cluster import BigDataPlatform, CloudProvider
from granulate_utils.config_feeder.core.models.node import NodeInfo

@@ -22,6 +25,29 @@ def get_emr_node_info() -> Optional[NodeInfo]:
return None


async def get_emr_autoscaling_config(
node: NodeInfo, *, logger: Union[logging.Logger, logging.LoggerAdapter]
) -> Optional[AutoScalingConfig]:
result = {}
if (cluster_info := await _run_emr_command(node, "describe-cluster", logger=logger)) is not None:
for group in cluster_info.get("Cluster", {}).get("InstanceGroups", []):
if "AutoScalingPolicy" in group:
policy = group["AutoScalingPolicy"]
result[group["Id"]] = {
"instance_group_type": group["InstanceGroupType"],
"constraints": policy["Constraints"],
"rules": policy["Rules"],
}
else:
logger.error("failed to get EMR cluster info")
return None
if result:
return AutoScalingConfig(mode=AutoScalingMode.CUSTOM, config=result)
if managed_policy := await _run_emr_command(node, "get-managed-scaling-policy", logger=logger):
return AutoScalingConfig(mode=AutoScalingMode.MANAGED, config=managed_policy)
return None


def _get_emr_job_info() -> Optional[Dict[str, str]]:
try:
with open("/mnt/var/lib/info/job-flow.json", "r") as f:
@@ -43,3 +69,15 @@ def _get_is_master() -> bool:
obj = json.load(f)
result: bool = obj["isMaster"]
return result


async def _run_emr_command(
node: NodeInfo, command: str, *, logger: Union[logging.Logger, logging.LoggerAdapter]
) -> Optional[Dict[str, Any]]:
cmd = f"aws emr {command} --cluster-id {node.external_cluster_id}"
process = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
stdout, stderr = await process.communicate()
if process.returncode != 0:
logger.error("failed to run EMR command", extra={"command": command, "stderr": stderr.decode()})
return None
return json.loads(stdout.decode().strip())
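
A hedged test sketch (not part of the commit): stubbing _run_emr_command with AsyncMock exercises the custom-policy branch without invoking the AWS CLI. The fake payload mirrors the describe-cluster keys the function reads; all IDs are invented, and SimpleNamespace merely stands in for NodeInfo since the stub never inspects it:

import asyncio
import logging
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch

from granulate_utils.config_feeder.client.bigdata import emr

FAKE_CLUSTER = {
    "Cluster": {
        "InstanceGroups": [
            {
                "Id": "ig-123",
                "InstanceGroupType": "CORE",
                "AutoScalingPolicy": {
                    "Constraints": {"MinCapacity": 1, "MaxCapacity": 4},
                    "Rules": [],
                },
            }
        ]
    }
}


async def main() -> None:
    node = SimpleNamespace(external_cluster_id="j-ABCDEF1234567")  # stand-in for NodeInfo
    with patch.object(emr, "_run_emr_command", AsyncMock(return_value=FAKE_CLUSTER)):
        config = await emr.get_emr_autoscaling_config(node, logger=logging.getLogger(__name__))
    assert config is not None and "ig-123" in config.config


asyncio.run(main())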
41 changes: 39 additions & 2 deletions granulate_utils/config_feeder/client/client.py
@@ -8,13 +8,15 @@
from requests import Session
from requests.exceptions import ConnectionError, JSONDecodeError

from granulate_utils.config_feeder.client.autoscaling.collector import AutoScalingConfigCollector
from granulate_utils.config_feeder.client.bigdata import get_node_info
from granulate_utils.config_feeder.client.exceptions import APIError, ClientError
from granulate_utils.config_feeder.client.models import CollectionResult, ConfigType
from granulate_utils.config_feeder.client.yarn.collector import YarnConfigCollector
from granulate_utils.config_feeder.client.yarn.models import YarnConfig
from granulate_utils.config_feeder.core.errors import raise_for_code
from granulate_utils.config_feeder.core.models.aggregation import CreateNodeConfigsRequest, CreateNodeConfigsResponse
from granulate_utils.config_feeder.core.models.autoscaling import AutoScalingConfig, ClusterAutoScalingConfigCreate
from granulate_utils.config_feeder.core.models.cluster import ClusterCreate, CreateClusterRequest, CreateClusterResponse
from granulate_utils.config_feeder.core.models.collection import CollectorType
from granulate_utils.config_feeder.core.models.node import CreateNodeRequest, CreateNodeResponse, NodeCreate, NodeInfo
@@ -33,6 +35,7 @@ def __init__(
logger: Union[logging.Logger, logging.LoggerAdapter],
server_address: Optional[str] = None,
yarn: bool = True,
autoscaling: bool = True,
collector_type=CollectorType.SAGENT,
) -> None:
if not token or not service:
@@ -45,6 +48,8 @@
self._server_address: str = server_address.rstrip("/") if server_address else DEFAULT_API_SERVER_ADDRESS
self._is_yarn_enabled = yarn
self._yarn_collector = YarnConfigCollector(logger=logger)
self._is_autoscaling_enabled = autoscaling
self._autoscaling_collector = AutoScalingConfigCollector(logger=logger)
self._last_hash: DefaultDict[ConfigType, Dict[str, str]] = defaultdict(dict)
self._init_api_session()

@@ -67,8 +72,9 @@ def collect(self) -> None:
async def _collect(self, node_info: NodeInfo) -> CollectionResult:
results = await asyncio.gather(
self._collect_yarn_config(node_info),
self._collect_autoscaling_config(node_info),
)
return CollectionResult(node=node_info, yarn_config=results[0])
return CollectionResult(node=node_info, yarn_config=results[0], autoscaling_config=results[1])

async def _collect_yarn_config(self, node_info: NodeInfo) -> Optional[YarnConfig]:
if not self._is_yarn_enabled:
@@ -78,6 +84,14 @@ async def _collect_yarn_config(self, node_info: NodeInfo) -> Optional[YarnConfig
self.logger.info("YARN config collection finished")
return yarn_config

async def _collect_autoscaling_config(self, node_info: NodeInfo) -> Optional[AutoScalingConfig]:
if not self._is_autoscaling_enabled:
return None
self.logger.info("AutoScaling config collection starting")
autoscaling_config = await self._autoscaling_collector.collect(node_info)
self.logger.info("AutoScaling config collection finished")
return autoscaling_config

def _submit_node_configs(
self,
collection_result: CollectionResult,
@@ -96,10 +110,16 @@ def _submit_node_configs(
assert request.yarn_config is not None
self._last_hash[ConfigType.YARN][external_id] = collection_result.yarn_config_hash

if response.autoscaling_config is not None:
assert request.autoscaling_config is not None
self._last_hash[ConfigType.AUTOSCALING][external_id] = collection_result.autoscaling_config_hash

def _register_node(
self,
node: NodeInfo,
) -> str:
if self._cluster_id is None:
self._register_cluster(node)
assert self._cluster_id is not None
self.logger.debug(f"registering node {node.external_id}")
request = CreateNodeRequest(
@@ -131,12 +151,14 @@ def _register_cluster(self, node_info: NodeInfo) -> None:

def _get_configs_request(self, configs: CollectionResult) -> Optional[CreateNodeConfigsRequest]:
yarn_config = self._get_yarn_config_if_changed(configs)
autoscaling_config = self._get_autoscaling_config_if_changed(configs)

if yarn_config is None:
if yarn_config is None and autoscaling_config is None:
return None

return CreateNodeConfigsRequest(
yarn_config=yarn_config,
autoscaling_config=autoscaling_config,
)

def _get_yarn_config_if_changed(self, configs: CollectionResult) -> Optional[NodeYarnConfigCreate]:
@@ -149,6 +171,21 @@ def _get_yarn_config_if_changed(self, configs: CollectionResult) -> Optional[Nod
collector_type=self._collector_type, config_json=json.dumps(configs.yarn_config.config)
)

def _get_autoscaling_config_if_changed(
self,
configs: CollectionResult,
) -> Optional[ClusterAutoScalingConfigCreate]:
if configs.autoscaling_config is None:
return None
if self._last_hash[ConfigType.AUTOSCALING].get(configs.node.external_id) == configs.autoscaling_config_hash:
self.logger.debug("AutoScaling config is up to date")
return None
return ClusterAutoScalingConfigCreate(
collector_type=self._collector_type,
mode=configs.autoscaling_config.mode,
config_json=json.dumps(configs.autoscaling_config.config),
)

def _api_request(
self,
method: str,
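A hedged sketch of the hash-gating pattern _get_autoscaling_config_if_changed applies, simplified in two ways: the real client records the hash only after a successful submit, and get_config_hash's digest format is not shown in this diff:

from typing import Any, Dict

from granulate_utils.config_feeder.core.utils import get_config_hash

_last_hash: Dict[str, str] = {}


def should_submit(node_external_id: str, config: Dict[str, Any]) -> bool:
    # Resubmit only when the config hash differs from the last one recorded for this node.
    new_hash = get_config_hash(config)
    if _last_hash.get(node_external_id) == new_hash:
        return False  # unchanged, skip
    _last_hash[node_external_id] = new_hash
    return True


assert should_submit("i-123", {"min_workers": 2, "max_workers": 8})
assert not should_submit("i-123", {"min_workers": 2, "max_workers": 8})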
10 changes: 9 additions & 1 deletion granulate_utils/config_feeder/client/models.py
@@ -4,6 +4,7 @@
from pydantic import BaseModel, root_validator

from granulate_utils.config_feeder.client.yarn.models import YarnConfig
from granulate_utils.config_feeder.core.models.autoscaling import AutoScalingConfig
from granulate_utils.config_feeder.core.models.node import NodeInfo
from granulate_utils.config_feeder.core.utils import get_config_hash

@@ -14,16 +15,23 @@ class CollectionResult(BaseModel):
yarn_config: Optional[YarnConfig]
yarn_config_hash: str = ""

autoscaling_config: Optional[AutoScalingConfig]
autoscaling_config_hash: str = ""

@root_validator(pre=False)
def _set_hashes(cls, values: Dict[str, Any]) -> Dict[str, Any]:
if yarn_config := values.get("yarn_config"):
values["yarn_config_hash"] = get_config_hash(yarn_config.config)

if autoscaling_config := values.get("autoscaling_config"):
values["autoscaling_config_hash"] = get_config_hash(autoscaling_config.config)
return values

@property
def is_empty(self) -> bool:
return self.yarn_config is None
return self.yarn_config is None and self.autoscaling_config is None


class ConfigType(Enum):
YARN = 0
AUTOSCALING = 1
8 changes: 8 additions & 0 deletions granulate_utils/config_feeder/core/models/aggregation.py
@@ -2,18 +2,26 @@

from pydantic import BaseModel, root_validator

from granulate_utils.config_feeder.core.models.autoscaling import (
ClusterAutoScalingConfig,
ClusterAutoScalingConfigCreate,
)
from granulate_utils.config_feeder.core.models.yarn import NodeYarnConfig, NodeYarnConfigCreate


class CreateNodeConfigsRequest(BaseModel):
yarn_config: Optional[NodeYarnConfigCreate] = None
autoscaling_config: Optional[ClusterAutoScalingConfigCreate] = None

@root_validator
def at_least_one_config_is_required(cls, values: Dict[str, Any]) -> Dict[str, Any]:
if values.get("yarn_config") is not None:
return values
if values.get("autoscaling_config") is not None:
return values
raise ValueError("at least one config is required")


class CreateNodeConfigsResponse(BaseModel):
yarn_config: Optional[NodeYarnConfig]
autoscaling_config: Optional[ClusterAutoScalingConfig]
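
A quick sketch of the validator's effect (assuming pydantic v1 semantics, which the @root_validator usage implies): constructing an empty request fails immediately:

from pydantic import ValidationError

from granulate_utils.config_feeder.core.models.aggregation import CreateNodeConfigsRequest

try:
    CreateNodeConfigsRequest()  # neither yarn_config nor autoscaling_config
except ValidationError as exc:
    print(exc)  # message includes "at least one config is required"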