[FEA] Include bootstrap recommended configs in qualification output #451

Merged · 8 commits · Jul 26, 2023
49 changes: 2 additions & 47 deletions user_tools/src/spark_rapids_pytools/rapids/bootstrap.py
@@ -16,7 +16,7 @@

from dataclasses import dataclass

from spark_rapids_pytools.cloud_api.sp_types import ClusterBase, NodeHWInfo
from spark_rapids_pytools.cloud_api.sp_types import ClusterBase
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import Utils
from spark_rapids_pytools.rapids.rapids_tool import RapidsTool
@@ -36,51 +36,6 @@ def _process_custom_args(self):
def requires_cluster_connection(self) -> bool:
return True

def __calculate_spark_settings(self, worker_info: NodeHWInfo) -> dict:
"""
Calculate the cluster properties that we need to append to the /etc/defaults of the spark
if necessary.
:param worker_info: the hardware info as extracted from the worker. Note that we assume
that all the workers have the same configurations.
:return: dictionary containing 7 spark properties to be set by default on the cluster.
"""
num_gpus = worker_info.gpu_info.num_gpus
gpu_mem = worker_info.gpu_info.gpu_mem
num_cpus = worker_info.sys_info.num_cpus
cpu_mem = worker_info.sys_info.cpu_mem

constants = self.ctxt.get_value('local', 'clusterConfigs', 'constants')
executors_per_node = num_gpus
num_executor_cores = max(1, num_cpus // executors_per_node)
gpu_concurrent_tasks = min(constants.get('maxGpuConcurrent'), gpu_mem // constants.get('gpuMemPerTaskMB'))
# account for system overhead
usable_worker_mem = max(0, cpu_mem - constants.get('systemReserveMB'))
executor_container_mem = usable_worker_mem // executors_per_node
# reserve 10% of heap as memory overhead
max_executor_heap = max(0, int(executor_container_mem * (1 - constants.get('heapOverheadFraction'))))
# give up to 2GB of heap to each executor core
executor_heap = min(max_executor_heap, constants.get('heapPerCoreMB') * num_executor_cores)
executor_mem_overhead = int(executor_heap * constants.get('heapOverheadFraction'))
# use default for pageable_pool to add to memory overhead
pageable_pool = constants.get('defaultPageablePoolMB')
# pinned memory uses any unused space up to 4GB
pinned_mem = min(constants.get('maxPinnedMemoryMB'),
executor_container_mem - executor_heap - executor_mem_overhead - pageable_pool)
executor_mem_overhead += pinned_mem + pageable_pool
res = {
'spark.executor.cores': num_executor_cores,
'spark.executor.memory': f'{executor_heap}m',
'spark.executor.memoryOverhead': f'{executor_mem_overhead}m',
'spark.rapids.sql.concurrentGpuTasks': gpu_concurrent_tasks,
'spark.rapids.memory.pinnedPool.size': f'{pinned_mem}m',
'spark.sql.files.maxPartitionBytes': f'{constants.get("maxSqlFilesPartitionsMB")}m',
'spark.task.resource.gpu.amount': 1 / num_executor_cores,
'spark.rapids.shuffle.multiThreaded.reader.threads': num_executor_cores,
'spark.rapids.shuffle.multiThreaded.writer.threads': num_executor_cores,
'spark.rapids.sql.multiThreadedRead.numThreads': max(20, num_executor_cores)
}
return res

def _run_rapids_tool(self):
"""
Run the bootstrap on the driver node
@@ -91,7 +46,7 @@ def _run_rapids_tool(self):
worker_hw_info = exec_cluster.get_worker_hw_info()
self.logger.debug('Worker hardware INFO %s', worker_hw_info)
try:
spark_settings = self.__calculate_spark_settings(worker_info=worker_hw_info)
spark_settings = self._calculate_spark_settings(worker_info=worker_hw_info)
self.ctxt.set_ctxt('bootstrap_results', spark_settings)
self.logger.debug('%s Tool finished calculating recommended Apache Spark configurations for cluster %s: %s',
self.pretty_name(),
50 changes: 45 additions & 5 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -22,7 +22,7 @@
import pandas as pd
from tabulate import tabulate

from spark_rapids_pytools.cloud_api.sp_types import EnumeratedType, ClusterReshape
from spark_rapids_pytools.cloud_api.sp_types import EnumeratedType, ClusterReshape, NodeHWInfo
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator
from spark_rapids_pytools.pricing.price_provider import SavingsEstimator
@@ -175,7 +175,8 @@ def format_float(x: float) -> str:
report_content.extend(f' - {line}' for line in self.comments)
if self.sections_generators:
for section_generator in self.sections_generators:
report_content.append(Utils.gen_multiline_str(section_generator()))
if section_generator:
report_content.append(Utils.gen_multiline_str(section_generator()))
if self.has_gpu_recommendation():
csp_report = csp_report_provider()
if csp_report:
@@ -208,6 +209,16 @@ def _process_cpu_cluster_args(self, offline_cluster_opts: dict = None):
self.ctxt.set_ctxt('cpuClusterProxy', cpu_cluster_obj)

def _process_gpu_cluster_args(self, offline_cluster_opts: dict = None) -> bool:
def _process_gpu_cluster_worker_node():
try:
worker_node = gpu_cluster_obj.get_worker_node()
worker_node._pull_and_set_mc_props(cli=self.ctxt.platform.cli) # pylint: disable=protected-access
sys_info = worker_node._pull_sys_info(cli=self.ctxt.platform.cli) # pylint: disable=protected-access
gpu_info = worker_node._pull_gpu_hw_info(cli=self.ctxt.platform.cli) # pylint: disable=protected-access
worker_node.hw_info = NodeHWInfo(sys_info=sys_info, gpu_info=gpu_info)
except Exception: # pylint: disable=broad-except
return

gpu_cluster_arg = offline_cluster_opts.get('gpuCluster')
if gpu_cluster_arg:
gpu_cluster_obj = self._create_migration_cluster('GPU', gpu_cluster_arg)
@@ -219,6 +230,12 @@ def _process_gpu_cluster_args(self, offline_cluster_opts: dict = None) -> bool:
self.logger.info('Creating GPU cluster by converting the CPU cluster instances to GPU supported types')
gpu_cluster_obj = self.ctxt.platform.migrate_cluster_to_gpu(orig_cluster)
self.ctxt.set_ctxt('gpuClusterProxy', gpu_cluster_obj)

_process_gpu_cluster_worker_node()
if gpu_cluster_obj:
    worker_node_hw_info = gpu_cluster_obj.get_worker_hw_info()
    self.ctxt.set_ctxt('recommendedConfigs', self._calculate_spark_settings(worker_node_hw_info))

return gpu_cluster_obj is not None

def _process_offline_cluster_args(self):
@@ -413,6 +430,19 @@ def __generate_mc_types_conversion_report(self):
report_content.append(self.ctxt.platform.get_footer_message())
return report_content

def __generate_recommended_configs_report(self) -> list:
report_content = []
if self.ctxt.get_ctxt('recommendedConfigs'):
report_content = [
Utils.gen_report_sec_header('Recommended Spark configurations for running on GPUs', hrule=False),
]
conversion_items = []
recommended_configs = self.ctxt.get_ctxt('recommendedConfigs')
for config in recommended_configs:
conversion_items.append([config, recommended_configs[config]])
report_content.append(tabulate(conversion_items))
return report_content

def __generate_cluster_shape_report(self) -> str:
if bool(self.ctxt.platform.ctxt['notes']):
return Utils.gen_multiline_str(self.ctxt.platform.ctxt['notes'].get('clusterShape'))
@@ -558,6 +588,10 @@ def get_cost_per_row(df_row, reshape_col: str) -> pd.Series:
def __build_global_report_summary(self,
all_apps: pd.DataFrame,
csv_out: str) -> QualificationSummary:
def get_summary_log_file_path():
output_dir = csv_out.split('qualification_summary.csv')[0]
return output_dir + 'rapids_4_spark_qualification_output/rapids_4_spark_qualification_output.log'

if all_apps.empty:
# No need to run saving estimator or process the data frames.
return QualificationSummary(comments=self.__generate_mc_types_conversion_report())
@@ -595,24 +629,30 @@ def __build_global_report_summary(self,
per_row_flag)
df_final_result = apps_working_set
if not apps_working_set.empty:
self.logger.info('Generating GPU Estimated Speedup and Savings as %s', csv_out)
self.logger.info('Generating GPU Estimated Speedup and Savings as: %s', csv_out)
# we can use the general format as well but this will transform numbers to E+. So, stick with %f
apps_working_set.to_csv(csv_out, float_format='%.2f')
else:
df_final_result = apps_reshaped_df
if not apps_reshaped_df.empty:
# Do not include estimated job frequency in csv file
apps_reshaped_df = apps_reshaped_df.drop(columns=['Estimated Job Frequency (monthly)'])
self.logger.info('Generating GPU Estimated Speedup as %s', csv_out)
self.logger.info('Generating GPU Estimated Speedup as: %s', csv_out)
apps_reshaped_df.to_csv(csv_out, float_format='%.2f')

# add recommended Spark configurations to the summary log file
with open(get_summary_log_file_path(), 'a', encoding='UTF-8') as summary_log_file:
recommended_configs = Utils.gen_multiline_str(self.__generate_recommended_configs_report())
summary_log_file.write(recommended_configs)

return QualificationSummary(comments=report_comments,
all_apps=apps_pruned_df,
recommended_apps=recommended_apps,
savings_report_flag=launch_savings_calc,
df_result=df_final_result,
irrelevant_speedups=speedups_irrelevant_flag,
sections_generators=[self.__generate_mc_types_conversion_report])
sections_generators=[self.__generate_mc_types_conversion_report,
self.__generate_recommended_configs_report])

def _process_output(self):
def process_df_for_stdout(raw_df):
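For illustration, here is a minimal standalone sketch (not part of the PR) of what the new report section ends up looking like. The `recommended_configs` values below are hypothetical placeholders shaped like the output of `_calculate_spark_settings`, and `example_summary.log` is a placeholder path; the sketch mirrors the tabulate-based rendering in `__generate_recommended_configs_report` and the append-to-summary-log step in `__build_global_report_summary`.

```python
from tabulate import tabulate

# Hypothetical recommended configs, shaped like the output of _calculate_spark_settings.
recommended_configs = {
    'spark.executor.cores': 16,
    'spark.executor.memory': '32768m',
    'spark.rapids.sql.concurrentGpuTasks': 2,
    'spark.rapids.memory.pinnedPool.size': '4096m',
}

# Mirror __generate_recommended_configs_report: a section header plus a key/value table.
section_lines = ['Recommended Spark configurations for running on GPUs',
                 tabulate([[key, value] for key, value in recommended_configs.items()])]
section_text = '\n'.join(section_lines)

# Mirror the summary-log append: the section is appended to a log file.
# 'example_summary.log' is a placeholder, not the real qualification output path.
with open('example_summary.log', 'a', encoding='UTF-8') as summary_log_file:
    summary_log_file.write(section_text)

print(section_text)
```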
49 changes: 48 additions & 1 deletion user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
@@ -27,7 +27,8 @@
from typing import Any, Callable, Dict, List

from spark_rapids_pytools.cloud_api.sp_types import CloudPlatform, get_platform, \
ClusterBase, DeployMode
ClusterBase, DeployMode, NodeHWInfo
from spark_rapids_pytools.common.prop_manager import YAMLPropertiesContainer
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import ToolLogging, Utils
from spark_rapids_pytools.rapids.rapids_job import RapidsJobPropContainer
@@ -317,6 +318,52 @@ def _generate_platform_report_sections(self) -> List[str]:
return rep_lines
return None

def _calculate_spark_settings(self, worker_info: NodeHWInfo) -> dict:
"""
Calculate the cluster properties that we need to append to Spark's /etc/defaults,
if necessary.
:param worker_info: the hardware info as extracted from the worker. Note that we assume
that all the workers have the same configurations.
:return: dictionary containing 7 spark properties to be set by default on the cluster.
"""
num_gpus = worker_info.gpu_info.num_gpus
gpu_mem = worker_info.gpu_info.gpu_mem
num_cpus = worker_info.sys_info.num_cpus
cpu_mem = worker_info.sys_info.cpu_mem

config_path = Utils.resource_path('cluster-configs.yaml')
constants = YAMLPropertiesContainer(prop_arg=config_path).get_value('clusterConfigs', 'constants')
executors_per_node = num_gpus
num_executor_cores = max(1, num_cpus // executors_per_node)
gpu_concurrent_tasks = min(constants.get('maxGpuConcurrent'), gpu_mem // constants.get('gpuMemPerTaskMB'))
# account for system overhead
usable_worker_mem = max(0, cpu_mem - constants.get('systemReserveMB'))
executor_container_mem = usable_worker_mem // executors_per_node
# reserve 10% of heap as memory overhead
max_executor_heap = max(0, int(executor_container_mem * (1 - constants.get('heapOverheadFraction'))))
# give up to 2GB of heap to each executor core
executor_heap = min(max_executor_heap, constants.get('heapPerCoreMB') * num_executor_cores)
executor_mem_overhead = int(executor_heap * constants.get('heapOverheadFraction'))
# use default for pageable_pool to add to memory overhead
pageable_pool = constants.get('defaultPageablePoolMB')
# pinned memory uses any unused space up to 4GB
pinned_mem = min(constants.get('maxPinnedMemoryMB'),
executor_container_mem - executor_heap - executor_mem_overhead - pageable_pool)
executor_mem_overhead += pinned_mem + pageable_pool
res = {
'spark.executor.cores': num_executor_cores,
'spark.executor.memory': f'{executor_heap}m',
'spark.executor.memoryOverhead': f'{executor_mem_overhead}m',
'spark.rapids.sql.concurrentGpuTasks': gpu_concurrent_tasks,
'spark.rapids.memory.pinnedPool.size': f'{pinned_mem}m',
'spark.sql.files.maxPartitionBytes': f'{constants.get("maxSqlFilesPartitionsMB")}m',
'spark.task.resource.gpu.amount': 1 / num_executor_cores,
'spark.rapids.shuffle.multiThreaded.reader.threads': num_executor_cores,
'spark.rapids.shuffle.multiThreaded.writer.threads': num_executor_cores,
'spark.rapids.sql.multiThreadedRead.numThreads': max(20, num_executor_cores)
}
return res
Comment on lines +353 to +365
Collaborator:
The recommendations generated by the bootstrap tool do not match all platforms.
For example, the changes in #440 adjusted the default configurations to fit the Databricks platform.
Given that the bootstrap tool is not available for DB-Azure/DB-AWS, this section should not be valid when running against DB.
@cindyyuanjiang I see you put example CLI for DB, but have you actually verified that the bootstrap configurations are not part of the output on those platforms?

Collaborator:
For now, we cannot include bootstrap output when running the qual tool on DB. File an issue to track adding bootstrap config support for DB qualification.

Collaborator Author:
@amahussein thank you! I will remove the bootstrap recommendations for DB platforms.

Collaborator Author:
@mattahrens thank you! I filed a follow-up issue and tracked it in this PR's description.



@dataclass
class RapidsJarTool(RapidsTool):
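To make the arithmetic in `_calculate_spark_settings` concrete, here is a standalone sketch that reproduces the same derivation with the constants from cluster-configs.yaml hard-coded. The worker shape (32 vCPUs, 122880 MB of RAM, 2 GPUs with 15109 MB of memory each) is a hypothetical example for illustration only, not a value taken from the PR.

```python
# Standalone sketch of the _calculate_spark_settings derivation.
# The worker shape below (32 vCPUs, ~120 GB RAM, 2 GPUs with ~14.75 GB each) is hypothetical.
num_gpus, gpu_mem = 2, 15109      # per-worker GPU count and per-GPU memory (MB)
num_cpus, cpu_mem = 32, 122880    # per-worker vCPUs and memory (MB)

# Constants as defined in cluster-configs.yaml.
constants = {
    'maxPinnedMemoryMB': 4096, 'defaultPageablePoolMB': 1024,
    'maxGpuConcurrent': 4, 'gpuMemPerTaskMB': 7500,
    'heapPerCoreMB': 2048, 'heapOverheadFraction': 0.1,
    'systemReserveMB': 2048, 'maxSqlFilesPartitionsMB': 512,
}

executors_per_node = num_gpus                                          # 2
num_executor_cores = max(1, num_cpus // executors_per_node)            # 16
gpu_concurrent_tasks = min(constants['maxGpuConcurrent'],
                           gpu_mem // constants['gpuMemPerTaskMB'])    # min(4, 2) = 2
usable_worker_mem = max(0, cpu_mem - constants['systemReserveMB'])     # 120832
executor_container_mem = usable_worker_mem // executors_per_node       # 60416
max_executor_heap = max(0, int(executor_container_mem *
                               (1 - constants['heapOverheadFraction'])))   # 54374
executor_heap = min(max_executor_heap,
                    constants['heapPerCoreMB'] * num_executor_cores)       # 32768
executor_mem_overhead = int(executor_heap * constants['heapOverheadFraction'])  # 3276
pageable_pool = constants['defaultPageablePoolMB']                              # 1024
pinned_mem = min(constants['maxPinnedMemoryMB'],
                 executor_container_mem - executor_heap
                 - executor_mem_overhead - pageable_pool)                       # 4096
executor_mem_overhead += pinned_mem + pageable_pool                             # 8396

print({
    'spark.executor.cores': num_executor_cores,                    # 16
    'spark.executor.memory': f'{executor_heap}m',                  # 32768m
    'spark.executor.memoryOverhead': f'{executor_mem_overhead}m',  # 8396m
    'spark.rapids.sql.concurrentGpuTasks': gpu_concurrent_tasks,   # 2
    'spark.rapids.memory.pinnedPool.size': f'{pinned_mem}m',       # 4096m
    'spark.task.resource.gpu.amount': 1 / num_executor_cores,      # 0.0625
})
```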
22 changes: 0 additions & 22 deletions user_tools/src/spark_rapids_pytools/resources/bootstrap-conf.yaml
@@ -8,25 +8,3 @@ local:
cleanUp: true
# Name of the file where the final result is going to show
fileName: rapids_4_dataproc_bootstrap_output.log
clusterConfigs:
constants:
# Maximum amount of pinned memory to use per executor in megabytes
maxPinnedMemoryMB: 4096
# Default pageable pool size per executor in megabytes
defaultPageablePoolMB: 1024
# Maximum number of concurrent tasks to run on the GPU
maxGpuConcurrent: 4
# Amount of GPU memory to use per concurrent task in megabytes
# Using a bit less than 8GB here since Dataproc clusters advertise
# T4s as only having around 14.75 GB and we want to run with
# 2 concurrent by default on T4s.
gpuMemPerTaskMB: 7500
# Ideal amount of JVM heap memory to request per CPU core in megabytes
heapPerCoreMB: 2048
# Fraction of the executor JVM heap size that should be additionally reserved
# for JVM off-heap overhead (thread stacks, native libraries, etc.)
heapOverheadFraction: 0.1
# Amount of CPU memory to reserve for system overhead (kernel, buffers, etc.) in megabytes
systemReserveMB: 2048
# By default set the spark.sql.files.maxPartitionBytes to 512m
maxSqlFilesPartitionsMB: 512
22 changes: 22 additions & 0 deletions user_tools/src/spark_rapids_pytools/resources/cluster-configs.yaml
@@ -0,0 +1,22 @@
clusterConfigs:
constants:
# Maximum amount of pinned memory to use per executor in megabytes
maxPinnedMemoryMB: 4096
# Default pageable pool size per executor in megabytes
defaultPageablePoolMB: 1024
# Maximum number of concurrent tasks to run on the GPU
maxGpuConcurrent: 4
# Amount of GPU memory to use per concurrent task in megabytes
# Using a bit less than 8GB here since Dataproc clusters advertise
# T4s as only having around 14.75 GB and we want to run with
# 2 concurrent by default on T4s.
gpuMemPerTaskMB: 7500
# Ideal amount of JVM heap memory to request per CPU core in megabytes
heapPerCoreMB: 2048
# Fraction of the executor JVM heap size that should be additionally reserved
# for JVM off-heap overhead (thread stacks, native libraries, etc.)
heapOverheadFraction: 0.1
# Amount of CPU memory to reserve for system overhead (kernel, buffers, etc.) in megabytes
systemReserveMB: 2048
# By default set the spark.sql.files.maxPartitionBytes to 512m
maxSqlFilesPartitionsMB: 512
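Since the constants now live in a shared resource file rather than bootstrap-conf.yaml, here is a small sketch of how they are consumed; it simply restates the load pattern used in `_calculate_spark_settings`, and assumes it runs in an environment where the spark_rapids_pytools package and its resources are installed.

```python
# Sketch of loading the shared constants (mirrors _calculate_spark_settings).
from spark_rapids_pytools.common.prop_manager import YAMLPropertiesContainer
from spark_rapids_pytools.common.utilities import Utils

config_path = Utils.resource_path('cluster-configs.yaml')
constants = YAMLPropertiesContainer(prop_arg=config_path).get_value('clusterConfigs', 'constants')
print(constants.get('maxPinnedMemoryMB'))  # expected: 4096
```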
@@ -160,4 +160,3 @@ platform:
shortName: 'qual'
outputDir: qual-tool-output
cleanUp: true