
Merge branch 'master' into refactor_cgroup
mirikl committed Jun 20, 2023
2 parents a5a5132 + 727d3ee commit 7ca59b1
Showing 2 changed files with 146 additions and 59 deletions.
14 changes: 13 additions & 1 deletion glogger/sender.py
@@ -38,6 +38,9 @@ def __init__(
server_address: str,
*,
scheme: str = "https",
x_auth_type: str = "",
x_auth_access_key_id: str = "",
x_auth_secret_access_key: str = "",
send_interval: float = 30.0,
send_threshold: float = 0.8,
send_min_interval: float = 10.0,
@@ -49,6 +52,9 @@ def __init__(
:param application_name: Unique identifier for requests coming from this handler.
:param auth_token: Token for authenticating requests to the server.
:param x_auth_type: The type of authentication to use.
:param x_auth_access_key_id: The access key id to use for authentication.
:param x_auth_secret_access_key: The secret access key to use for authentication.
:param server_address: Address of server where to send messages.
:param scheme: The scheme to use as string ('http' or 'https')
:param send_interval: Seconds between sending batches.
@@ -59,12 +65,15 @@ def __init__(

self.application_name = application_name
self.auth_token = auth_token
self.x_auth_type = x_auth_type
self.x_auth_access_key_id = x_auth_access_key_id
self.x_auth_secret_access_key = x_auth_secret_access_key
self.server_address = server_address
self.send_interval = send_interval
self.send_threshold = send_threshold
self.send_min_interval = send_min_interval
self.max_send_tries = max_send_tries

self.max_send_tries = max_send_tries
self.stdout_logger = get_stdout_logger()
self.server_uri = f"{scheme}://{server_address}/api/v1/logs"
self.jsonify = JSONEncoder(separators=(",", ":"), default=repr).encode # compact, no whitespace
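As a rough usage sketch of the new auth fields: the three new keyword-only arguments are supplied at construction time and are later sent as the X-Auth-* request headers added further down in this diff. The class name `Sender` and all example values here are assumptions for illustration, not taken from this commit:

from glogger.sender import Sender  # class name assumed; the module path is the file changed above

sender = Sender(
    application_name="my-app",
    auth_token="my-token",
    server_address="logs.example.com",
    x_auth_type="aws",                      # assumed value; the diff does not restrict it
    x_auth_access_key_id="AKIAEXAMPLE",
    x_auth_secret_access_key="my-secret",   # sent as the X-Auth-Secret-Access-Key header
)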
@@ -162,6 +171,9 @@ def _send_once_to_server(self, data: bytes) -> None:
"Content-Type": "application/json",
"X-Application-Name": self.application_name,
"X-Token": self.auth_token,
"X-Auth-Type": self.x_auth_type,
"X-Auth-Access-Key-Id": self.x_auth_access_key_id,
"X-Auth-Secret-Access-Key": self.x_auth_secret_access_key,
}

# Default compression level (9) is slowest. Level 6 trades a bit of compression for speed.
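To make that trade-off concrete, here is a minimal sketch of compressing a batch at level 6. Whether the sender actually uses gzip or zlib is not visible in this diff, so the call below is an assumption, not the sender's real code:

import gzip

payload = b'{"logs":[]}'                          # illustrative serialized batch
data = gzip.compress(payload, compresslevel=6)    # level 6: near-maximal compression, noticeably faster than the default 9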
191 changes: 133 additions & 58 deletions granulate_utils/metadata/databricks_client.py
@@ -6,35 +6,42 @@
import json
import logging
import os
import re
import time
from typing import Dict, Optional
from typing import Any, Dict, Optional

import requests

from granulate_utils.exceptions import DatabricksJobNameDiscoverException

HOST_KEY_NAME = "*.sink.ganglia.host"
DATABRICKS_METRICS_PROP_PATH = "/databricks/spark/conf/metrics.properties"
CLUSTER_TAGS_KEY = "spark.databricks.clusterUsageTags.clusterAllTags"
CLUSTER_ALL_TAGS_PROP = "spark.databricks.clusterUsageTags.clusterAllTags"
CLUSTER_NAME_PROP = "spark.databricks.clusterUsageTags.clusterName"
SPARKUI_APPS_URL = "http://{}/api/v1/applications"
REQUEST_TIMEOUT = 5
JOB_NAME_KEY = "RunName"
CLUSTER_NAME_KEY = "ClusterName"
DEFAULT_WEBUI_PORT = 40001
DATABRICKS_JOBNAME_TIMEOUT_S = 2 * 60
RETRY_INTERVAL_S = 1
RUN_ID_REGEX = "run-\\d+-"


class DatabricksClient:
def __init__(self, logger: logging.LoggerAdapter) -> None:
class DBXWebUIEnvWrapper:
def __init__(self, logger: logging.LoggerAdapter, enable_retries: bool = True) -> None:
"""
When `enable_retries` is True, the wrapper will retry the request to the webui until it succeeds or until
"""
self.logger = logger
self.logger.debug("Getting Databricks job name")
self.job_name = self.get_job_name()
if self.job_name is None:
self.enable_retries = enable_retries
self._apps_url: Optional[str] = None
self.logger.debug("Getting DBX environment properties")
self.all_props_dict: Optional[Dict[str, str]] = self.extract_relevant_metadata()
if self.all_props_dict is None:
self.logger.warning(
"Failed initializing Databricks client. Databricks job name will not be included in ephemeral clusters."
"DBXWebUIEnvWrapper failed to get relevant metadata, service name will not include metadata from DBX"
)
else:
self.logger.debug(f"Got Databricks job name: {self.job_name}")

def _request_get(self, url: str) -> requests.Response:
resp = requests.get(url, timeout=REQUEST_TIMEOUT)
@@ -56,53 +63,48 @@ def get_webui_address() -> Optional[str]:
raise DatabricksJobNameDiscoverException(f"Failed to get Databricks webui address {properties=}") from e
return f"{host}:{DEFAULT_WEBUI_PORT}"

def get_job_name(self) -> Optional[str]:
def extract_relevant_metadata(self) -> Optional[Dict[str, str]]:
# Retry in case of a connection error, as the metrics server might not be up yet.
start_time = time.monotonic()
while time.monotonic() - start_time < DATABRICKS_JOBNAME_TIMEOUT_S:
try:
if cluster_metadata := self._cluster_all_tags_metadata():
name = self._get_name_from_metadata(cluster_metadata)
if name:
self.logger.debug("Found name in metadata", job_name=name, cluster_metadata=cluster_metadata)
return name
else:
self.logger.debug("Failed to extract name from metadata", cluster_metadata=cluster_metadata)
return None
if cluster_all_props := self._cluster_all_tags_metadata():
self.logger.info(
"Successfully got relevant cluster tags metadata", cluster_all_props=cluster_all_props
)
return cluster_all_props
else:
# No job name yet, retry.
# No environment metadata yet, retry.
time.sleep(RETRY_INTERVAL_S)
except DatabricksJobNameDiscoverException:
self.logger.exception("Failed to get Databricks job name")
self.logger.exception("Failed to get DBX environment properties")
return None
except Exception:
self.logger.exception("Generic exception was raise during spark job name discovery")
self.logger.exception("Generic exception was raise during DBX environment properties discovery")
return None
self.logger.info("Databricks get job name timeout, continuing...")
return None

@staticmethod
def _get_name_from_metadata(metadata: Dict[str, str]) -> Optional[str]:
if JOB_NAME_KEY in metadata:
return str(metadata[JOB_NAME_KEY]).replace(" ", "-").lower()
if not self.enable_retries:
break
self.logger.info("Databricks get DBX environment metadata timeout, continuing...")
return None

def _cluster_all_tags_metadata(self) -> Optional[Dict[str, str]]:
def _discover_apps_url(self) -> bool:
"""
Returns `includes spark.databricks.clusterUsageTags.clusterAllTags` tags as `Dict`.
Discovers the SparkUI apps url and sets it on `self._apps_url`.
Returns `True` if the url was discovered, `False` otherwise.
"""
if not os.path.isfile(DATABRICKS_METRICS_PROP_PATH):
# We want to retry in case the cluster is still initializing, and the file is not yet deployed.
return None
webui = self.get_webui_address()
if webui is None:
# retry
return None
# The API used: https://spark.apache.org/docs/latest/monitoring.html#rest-api
apps_url = SPARKUI_APPS_URL.format(webui)
self.logger.debug("Databricks SparkUI address", apps_url=apps_url)
if self._apps_url is not None: # Checks if the url was already discovered.
return True
else:
if (web_ui_address := self.get_webui_address()) is None:
return False
self._apps_url = SPARKUI_APPS_URL.format(web_ui_address)
self.logger.debug("Databricks SparkUI address", apps_url=self._apps_url)
return True

def _spark_apps_json(self) -> Any:
assert self._apps_url, "SparkUI apps url was not discovered"
try:
response = self._request_get(apps_url)
response = self._request_get(self._apps_url)
except requests.exceptions.RequestException:
# Request might fail in cases where the cluster is still initializing, retrying.
return None
@@ -117,12 +119,11 @@ def _cluster_all_tags_metadata(self) -> Optional[Dict[str, str]]:
raise DatabricksJobNameDiscoverException(
f"Failed to parse apps url response, query {response.text=}"
) from e
if len(apps) == 0:
# apps might be empty because of initialization, retrying.
self.logger.debug("No apps yet, retrying.")
return None
return apps

env_url = f"{apps_url}/{apps[0]['id']}/environment"
def _spark_app_env_json(self, app_id: str) -> Any:
assert self._apps_url is not None, "SparkUI apps url was not discovered"
env_url = f"{self._apps_url}/{app_id}/environment"
try:
response = self._request_get(env_url)
except Exception as e:
@@ -132,15 +133,89 @@ def _cluster_all_tags_metadata(self) -> Optional[Dict[str, str]]:
env = response.json()
except Exception as e:
raise DatabricksJobNameDiscoverException(f"Environment request failed {response.text=}") from e
props = env.get("sparkProperties")
if props is None:
raise DatabricksJobNameDiscoverException(f"sparkProperties was not found in {env=}")
for prop in props:
if prop[0] == CLUSTER_TAGS_KEY:
try:
all_tags_value = json.loads(prop[1])
except Exception as e:
raise DatabricksJobNameDiscoverException(f"Failed to parse {prop=}") from e
return {cluster_all_tag["key"]: cluster_all_tag["value"] for cluster_all_tag in all_tags_value}
return env

@staticmethod
def _extract_service_name_candidates(spark_properties: Any) -> Dict[str, str]:
# Creating a dict of the relevant properties and their values.
relevant_props = [CLUSTER_ALL_TAGS_PROP, CLUSTER_NAME_PROP]
service_name_prop_candidates = {prop[0]: prop[1] for prop in spark_properties if prop[0] in relevant_props}
if len(service_name_prop_candidates) == 0:
# We expect at least one of the properties to be present.
raise DatabricksJobNameDiscoverException(
f"Failed to create dict of relevant properties {spark_properties=}"
)
return service_name_prop_candidates
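To make the expected input shape concrete, here is an invented example; the list-of-pairs layout is how the Spark REST environment endpoint reports sparkProperties:

spark_properties = [
    ["spark.databricks.clusterUsageTags.clusterName", "run-12345-nightly etl"],
    ["spark.databricks.clusterUsageTags.clusterAllTags", '[{"key": "RunName", "value": "Nightly ETL"}]'],
    ["spark.app.name", "Databricks Shell"],
]
candidates = DBXWebUIEnvWrapper._extract_service_name_candidates(spark_properties)
# candidates holds only the two clusterUsageTags properties; spark.app.name is filtered out.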

def _cluster_all_tags_metadata(self) -> Optional[Dict[str, str]]:
"""
Returns the `spark.databricks.clusterUsageTags.clusterAllTags` tags as a `Dict`.
Whenever this function returns `None`, a retry is required.
"""
if not os.path.isfile(DATABRICKS_METRICS_PROP_PATH):
# We want to retry in case the cluster is still initializing, and the file is not yet deployed.
return None
# Discovering SparkUI apps url.
if self._discover_apps_url() is False:
# SparkUI apps url was not discovered, retrying.
return None

# Getting spark apps in JSON format.
if (apps := self._spark_apps_json()) is None:
return None
if len(apps) == 0:
# apps might be empty because of initialization, retrying.
self.logger.debug("No apps yet, retrying.")
return None

# Extracting the "sparkProperties" table from the first app's application environment.
full_spark_app_env = self._spark_app_env_json(apps[0]["id"])
spark_properties = full_spark_app_env.get("sparkProperties")
if spark_properties is None:
raise DatabricksJobNameDiscoverException(f"sparkProperties was not found in {full_spark_app_env=}")
service_name_prop_candidates = self._extract_service_name_candidates(spark_properties)

# First, trying to extract the `CLUSTER_ALL_TAGS_PROP` property, in case it is not redacted.
if (
cluster_all_tags_value := service_name_prop_candidates.get(CLUSTER_ALL_TAGS_PROP)
) is not None and "redacted" not in cluster_all_tags_value:
try:
cluster_all_tags_value_json = json.loads(cluster_all_tags_value)
except Exception as e:
raise DatabricksJobNameDiscoverException(f"Failed to parse {cluster_all_tags_value}") from e
return self._apply_pattern(
{cluster_all_tag["key"]: cluster_all_tag["value"] for cluster_all_tag in cluster_all_tags_value_json}
)
# As a fallback, trying to extract `CLUSTER_NAME_PROP` property.
elif (cluster_name_value := service_name_prop_candidates.get(CLUSTER_NAME_PROP)) is not None:
return self._apply_pattern({CLUSTER_NAME_KEY: cluster_name_value})
else:
raise DatabricksJobNameDiscoverException(f"Failed to find {CLUSTER_TAGS_KEY=} in {props=}")
raise DatabricksJobNameDiscoverException(
f"Failed to extract {CLUSTER_ALL_TAGS_PROP} or {CLUSTER_NAME_PROP} from {spark_properties=}"
)

@staticmethod
def _apply_pattern(metadata: Dict[str, str]) -> Dict[str, str]:
"""
Applies certain patterns on the metadata values.
We mostly use the metadata values as service names, so we want to make sure the metadata values
match some service name requirements.
e.g.: Job Name might include spaces, we want to replace them with dashes.
"""
if JOB_NAME_KEY in metadata:
metadata[JOB_NAME_KEY] = metadata[JOB_NAME_KEY].replace(" ", "-").lower()
if CLUSTER_NAME_KEY in metadata:
# We've seen cases where the cluster name includes the run ID; we want to remove it.
metadata[CLUSTER_NAME_KEY] = re.sub(RUN_ID_REGEX, "", metadata[CLUSTER_NAME_KEY])
metadata[CLUSTER_NAME_KEY] = metadata[CLUSTER_NAME_KEY].replace(" ", "-").lower()
return metadata
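A quick illustration of the normalization (input values are made up):

metadata = {"RunName": "Nightly ETL Job", "ClusterName": "run-12345-Ad Hoc Cluster"}
normalized = DBXWebUIEnvWrapper._apply_pattern(metadata)
# normalized == {"RunName": "nightly-etl-job", "ClusterName": "ad-hoc-cluster"}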


def get_name_from_metadata(metadata: Dict[str, str]) -> Optional[str]:
assert metadata is not None, "all_props_dict is None, can't get name from metadata"
if job_name := metadata.get(JOB_NAME_KEY):
return f"job-{job_name}"
elif cluster_name := metadata.get(CLUSTER_NAME_KEY):
return cluster_name
return None
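Putting the pieces together, a minimal usage sketch. The plain LoggerAdapter here stands in for whatever structured logger the host application actually uses; on a non-Databricks host the wrapper simply ends up with no metadata:

import logging

logger = logging.LoggerAdapter(logging.getLogger("dbx"), {})
wrapper = DBXWebUIEnvWrapper(logger, enable_retries=False)
if wrapper.all_props_dict is not None:
    service_name = get_name_from_metadata(wrapper.all_props_dict)
    logger.info(f"Resolved service name: {service_name}")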
