From 4bfc0c1174782da334a1222d9a2a3325cebc28d5 Mon Sep 17 00:00:00 2001 From: Cindy Jiang Date: Tue, 18 Jul 2023 17:05:38 -0700 Subject: [PATCH 1/7] added recommended bootstrap configs in qualification output Signed-off-by: Cindy Jiang --- .../rapids/qualification.py | 79 ++++++++++++++++++- .../resources/qualification-conf.yaml | 22 ++++++ 2 files changed, 98 insertions(+), 3 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index f9a305743..fd716de35 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -22,7 +22,7 @@ import pandas as pd from tabulate import tabulate -from spark_rapids_pytools.cloud_api.sp_types import EnumeratedType, ClusterReshape +from spark_rapids_pytools.cloud_api.sp_types import EnumeratedType, ClusterReshape, NodeHWInfo from spark_rapids_pytools.common.sys_storage import FSUtil from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator from spark_rapids_pytools.pricing.price_provider import SavingsEstimator @@ -175,7 +175,8 @@ def format_float(x: float) -> str: report_content.extend(f' - {line}' for line in self.comments) if self.sections_generators: for section_generator in self.sections_generators: - report_content.append(Utils.gen_multiline_str(section_generator())) + if section_generator: + report_content.append(Utils.gen_multiline_str(section_generator())) if self.has_gpu_recommendation(): csp_report = csp_report_provider() if csp_report: @@ -192,6 +193,51 @@ class Qualification(RapidsJarTool): """ name = 'qualification' + def __calculate_spark_settings(self, worker_info: NodeHWInfo) -> dict: + """ + Calculate the cluster properties that we need to append to the /etc/defaults of the spark + if necessary. + :param worker_info: the hardware info as extracted from the worker. Note that we assume + that all the workers have the same configurations. + :return: dictionary containing 7 spark properties to be set by default on the cluster. 
+ """ + num_gpus = worker_info.gpu_info.num_gpus + gpu_mem = worker_info.gpu_info.gpu_mem + num_cpus = worker_info.sys_info.num_cpus + cpu_mem = worker_info.sys_info.cpu_mem + + constants = self.ctxt.get_value('local', 'clusterConfigs', 'constants') + executors_per_node = num_gpus + num_executor_cores = max(1, num_cpus // executors_per_node) + gpu_concurrent_tasks = min(constants.get('maxGpuConcurrent'), gpu_mem // constants.get('gpuMemPerTaskMB')) + # account for system overhead + usable_worker_mem = max(0, cpu_mem - constants.get('systemReserveMB')) + executor_container_mem = usable_worker_mem // executors_per_node + # reserve 10% of heap as memory overhead + max_executor_heap = max(0, int(executor_container_mem * (1 - constants.get('heapOverheadFraction')))) + # give up to 2GB of heap to each executor core + executor_heap = min(max_executor_heap, constants.get('heapPerCoreMB') * num_executor_cores) + executor_mem_overhead = int(executor_heap * constants.get('heapOverheadFraction')) + # use default for pageable_pool to add to memory overhead + pageable_pool = constants.get('defaultPageablePoolMB') + # pinned memory uses any unused space up to 4GB + pinned_mem = min(constants.get('maxPinnedMemoryMB'), + executor_container_mem - executor_heap - executor_mem_overhead - pageable_pool) + executor_mem_overhead += pinned_mem + pageable_pool + res = { + 'spark.executor.cores': num_executor_cores, + 'spark.executor.memory': f'{executor_heap}m', + 'spark.executor.memoryOverhead': f'{executor_mem_overhead}m', + 'spark.rapids.sql.concurrentGpuTasks': gpu_concurrent_tasks, + 'spark.rapids.memory.pinnedPool.size': f'{pinned_mem}m', + 'spark.sql.files.maxPartitionBytes': f'{constants.get("maxSqlFilesPartitionsMB")}m', + 'spark.task.resource.gpu.amount': 1 / num_executor_cores, + 'spark.rapids.shuffle.multiThreaded.reader.threads': num_executor_cores, + 'spark.rapids.shuffle.multiThreaded.writer.threads': num_executor_cores, + 'spark.rapids.sql.multiThreadedRead.numThreads': max(20, num_executor_cores) + } + return res + def _process_rapids_args(self): """ Qualification tool processes extra arguments: @@ -208,6 +254,13 @@ def _process_cpu_cluster_args(self, offline_cluster_opts: dict = None): self.ctxt.set_ctxt('cpuClusterProxy', cpu_cluster_obj) def _process_gpu_cluster_args(self, offline_cluster_opts: dict = None) -> bool: + def _process_gpu_cluster_worker_node(): + worker_node = gpu_cluster_obj.get_worker_node() + worker_node._pull_and_set_mc_props(cli=self.ctxt.platform.cli) # pylint: disable=protected-access + sys_info = worker_node._pull_sys_info(cli=self.ctxt.platform.cli) # pylint: disable=protected-access + gpu_info = worker_node._pull_gpu_hw_info(cli=self.ctxt.platform.cli) # pylint: disable=protected-access + worker_node.hw_info = NodeHWInfo(sys_info=sys_info, gpu_info=gpu_info) + gpu_cluster_arg = offline_cluster_opts.get('gpuCluster') if gpu_cluster_arg: gpu_cluster_obj = self._create_migration_cluster('GPU', gpu_cluster_arg) @@ -219,6 +272,12 @@ def _process_gpu_cluster_args(self, offline_cluster_opts: dict = None) -> bool: self.logger.info('Creating GPU cluster by converting the CPU cluster instances to GPU supported types') gpu_cluster_obj = self.ctxt.platform.migrate_cluster_to_gpu(orig_cluster) self.ctxt.set_ctxt('gpuClusterProxy', gpu_cluster_obj) + + _process_gpu_cluster_worker_node() + worker_node_hw_info = gpu_cluster_obj.get_worker_hw_info() + if gpu_cluster_obj: + self.ctxt.set_ctxt('recommendedConfigs', self.__calculate_spark_settings(worker_node_hw_info)) + return 
gpu_cluster_obj is not None def _process_offline_cluster_args(self): @@ -413,6 +472,19 @@ def __generate_mc_types_conversion_report(self): report_content.append(self.ctxt.platform.get_footer_message()) return report_content + def __generate_recommended_configs_report(self): + report_content = [] + if self.ctxt.get_ctxt('recommendedConfigs'): + report_content = [ + Utils.gen_report_sec_header('Recommended Spark configurations', hrule=False), + ] + conversion_items = [] + recommended_configs = self.ctxt.get_ctxt('recommendedConfigs') + for config in recommended_configs: + conversion_items.append([config, recommended_configs[config]]) + report_content.append(tabulate(conversion_items)) + return report_content + def __generate_cluster_shape_report(self) -> str: if bool(self.ctxt.platform.ctxt['notes']): return Utils.gen_multiline_str(self.ctxt.platform.ctxt['notes'].get('clusterShape')) @@ -612,7 +684,8 @@ def __build_global_report_summary(self, savings_report_flag=launch_savings_calc, df_result=df_final_result, irrelevant_speedups=speedups_irrelevant_flag, - sections_generators=[self.__generate_mc_types_conversion_report]) + sections_generators=[self.__generate_mc_types_conversion_report, + self.__generate_recommended_configs_report]) def _process_output(self): def process_df_for_stdout(raw_df): diff --git a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml index 811ba86eb..940ac5454 100644 --- a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml +++ b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml @@ -156,6 +156,28 @@ local: - '^(\.+).*' - '^(\$+).*' - '^.+(_\$folder\$)$' + clusterConfigs: + constants: + # Maximum amount of pinned memory to use per executor in megabytes + maxPinnedMemoryMB: 4096 + # Default pageable pool size per executor in megabytes + defaultPageablePoolMB: 1024 + # Maximum number of concurrent tasks to run on the GPU + maxGpuConcurrent: 4 + # Amount of GPU memory to use per concurrent task in megabytes + # Using a bit less than 8GB here since Dataproc clusters advertise + # T4s as only having around 14.75 GB and we want to run with + # 2 concurrent by default on T4s. + gpuMemPerTaskMB: 7500 + # Ideal amount of JVM heap memory to request per CPU core in megabytes + heapPerCoreMB: 2048 + # Fraction of the executor JVM heap size that should be additionally reserved + # for JVM off-heap overhead (thread stacks, native libraries, etc.) + heapOverheadFraction: 0.1 + # Amount of CPU memory to reserve for system overhead (kernel, buffers, etc.) 
in megabytes + systemReserveMB: 2048 + # By default set the spark.sql.files.maxPartitionBytes to 512m + maxSqlFilesPartitionsMB: 512 platform: shortName: 'qual' outputDir: qual-tool-output From ebf1e94bfcc727c2a0b592a5bf95b59a62afcd26 Mon Sep 17 00:00:00 2001 From: Cindy Jiang Date: Wed, 19 Jul 2023 15:21:27 -0700 Subject: [PATCH 2/7] added recommended spark configs in summary log file Signed-off-by: Cindy Jiang --- .../rapids/qualification.py | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index fd716de35..782e940fd 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -255,11 +255,14 @@ def _process_cpu_cluster_args(self, offline_cluster_opts: dict = None): def _process_gpu_cluster_args(self, offline_cluster_opts: dict = None) -> bool: def _process_gpu_cluster_worker_node(): - worker_node = gpu_cluster_obj.get_worker_node() - worker_node._pull_and_set_mc_props(cli=self.ctxt.platform.cli) # pylint: disable=protected-access - sys_info = worker_node._pull_sys_info(cli=self.ctxt.platform.cli) # pylint: disable=protected-access - gpu_info = worker_node._pull_gpu_hw_info(cli=self.ctxt.platform.cli) # pylint: disable=protected-access - worker_node.hw_info = NodeHWInfo(sys_info=sys_info, gpu_info=gpu_info) + try: + worker_node = gpu_cluster_obj.get_worker_node() + worker_node._pull_and_set_mc_props(cli=self.ctxt.platform.cli) # pylint: disable=protected-access + sys_info = worker_node._pull_sys_info(cli=self.ctxt.platform.cli) # pylint: disable=protected-access + gpu_info = worker_node._pull_gpu_hw_info(cli=self.ctxt.platform.cli) # pylint: disable=protected-access + worker_node.hw_info = NodeHWInfo(sys_info=sys_info, gpu_info=gpu_info) + except Exception: # pylint: disable=broad-except + return gpu_cluster_arg = offline_cluster_opts.get('gpuCluster') if gpu_cluster_arg: @@ -472,7 +475,7 @@ def __generate_mc_types_conversion_report(self): report_content.append(self.ctxt.platform.get_footer_message()) return report_content - def __generate_recommended_configs_report(self): + def __generate_recommended_configs_report(self) -> list: report_content = [] if self.ctxt.get_ctxt('recommendedConfigs'): report_content = [ @@ -630,6 +633,10 @@ def get_cost_per_row(df_row, reshape_col: str) -> pd.Series: def __build_global_report_summary(self, all_apps: pd.DataFrame, csv_out: str) -> QualificationSummary: + def get_summary_log_file_path(): + output_dir = csv_out.split('qualification_summary.csv')[0] + return output_dir + 'rapids_4_spark_qualification_output/rapids_4_spark_qualification_output.log' + if all_apps.empty: # No need to run saving estimator or process the data frames. return QualificationSummary(comments=self.__generate_mc_types_conversion_report()) @@ -667,7 +674,7 @@ def __build_global_report_summary(self, per_row_flag) df_final_result = apps_working_set if not apps_working_set.empty: - self.logger.info('Generating GPU Estimated Speedup and Savings as %s', csv_out) + self.logger.info('Generating GPU Estimated Speedup and Savings as: %s', csv_out) # we can use the general format as well but this will transform numbers to E+. 
So, stick with %f apps_working_set.to_csv(csv_out, float_format='%.2f') else: @@ -675,9 +682,14 @@ def __build_global_report_summary(self, if not apps_reshaped_df.empty: # Do not include estimated job frequency in csv file apps_reshaped_df = apps_reshaped_df.drop(columns=['Estimated Job Frequency (monthly)']) - self.logger.info('Generating GPU Estimated Speedup as %s', csv_out) + self.logger.info('Generating GPU Estimated Speedup: as %s', csv_out) apps_reshaped_df.to_csv(csv_out, float_format='%.2f') + # add recommended Spark configurations to the summary log file + with open(get_summary_log_file_path(), 'a', encoding='UTF-8') as summary_log_file: + recommended_configs = Utils.gen_multiline_str(self.__generate_recommended_configs_report()) + summary_log_file.write(recommended_configs) + return QualificationSummary(comments=report_comments, all_apps=apps_pruned_df, recommended_apps=recommended_apps, From edca055a1ba706975eb62e8e9505af1a71630827 Mon Sep 17 00:00:00 2001 From: Cindy Jiang Date: Fri, 21 Jul 2023 16:15:48 -0700 Subject: [PATCH 3/7] moved _calculate_spark_settings to RapidsTool class Signed-off-by: Cindy Jiang --- .../spark_rapids_pytools/rapids/bootstrap.py | 49 +------------------ .../rapids/qualification.py | 48 +----------------- .../rapids/rapids_tool.py | 47 +++++++++++++++++- 3 files changed, 50 insertions(+), 94 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/rapids/bootstrap.py b/user_tools/src/spark_rapids_pytools/rapids/bootstrap.py index 2dc8f22fa..5954a981f 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/bootstrap.py +++ b/user_tools/src/spark_rapids_pytools/rapids/bootstrap.py @@ -16,7 +16,7 @@ from dataclasses import dataclass -from spark_rapids_pytools.cloud_api.sp_types import ClusterBase, NodeHWInfo +from spark_rapids_pytools.cloud_api.sp_types import ClusterBase from spark_rapids_pytools.common.sys_storage import FSUtil from spark_rapids_pytools.common.utilities import Utils from spark_rapids_pytools.rapids.rapids_tool import RapidsTool @@ -36,51 +36,6 @@ def _process_custom_args(self): def requires_cluster_connection(self) -> bool: return True - def __calculate_spark_settings(self, worker_info: NodeHWInfo) -> dict: - """ - Calculate the cluster properties that we need to append to the /etc/defaults of the spark - if necessary. - :param worker_info: the hardware info as extracted from the worker. Note that we assume - that all the workers have the same configurations. - :return: dictionary containing 7 spark properties to be set by default on the cluster. 
- """ - num_gpus = worker_info.gpu_info.num_gpus - gpu_mem = worker_info.gpu_info.gpu_mem - num_cpus = worker_info.sys_info.num_cpus - cpu_mem = worker_info.sys_info.cpu_mem - - constants = self.ctxt.get_value('local', 'clusterConfigs', 'constants') - executors_per_node = num_gpus - num_executor_cores = max(1, num_cpus // executors_per_node) - gpu_concurrent_tasks = min(constants.get('maxGpuConcurrent'), gpu_mem // constants.get('gpuMemPerTaskMB')) - # account for system overhead - usable_worker_mem = max(0, cpu_mem - constants.get('systemReserveMB')) - executor_container_mem = usable_worker_mem // executors_per_node - # reserve 10% of heap as memory overhead - max_executor_heap = max(0, int(executor_container_mem * (1 - constants.get('heapOverheadFraction')))) - # give up to 2GB of heap to each executor core - executor_heap = min(max_executor_heap, constants.get('heapPerCoreMB') * num_executor_cores) - executor_mem_overhead = int(executor_heap * constants.get('heapOverheadFraction')) - # use default for pageable_pool to add to memory overhead - pageable_pool = constants.get('defaultPageablePoolMB') - # pinned memory uses any unused space up to 4GB - pinned_mem = min(constants.get('maxPinnedMemoryMB'), - executor_container_mem - executor_heap - executor_mem_overhead - pageable_pool) - executor_mem_overhead += pinned_mem + pageable_pool - res = { - 'spark.executor.cores': num_executor_cores, - 'spark.executor.memory': f'{executor_heap}m', - 'spark.executor.memoryOverhead': f'{executor_mem_overhead}m', - 'spark.rapids.sql.concurrentGpuTasks': gpu_concurrent_tasks, - 'spark.rapids.memory.pinnedPool.size': f'{pinned_mem}m', - 'spark.sql.files.maxPartitionBytes': f'{constants.get("maxSqlFilesPartitionsMB")}m', - 'spark.task.resource.gpu.amount': 1 / num_executor_cores, - 'spark.rapids.shuffle.multiThreaded.reader.threads': num_executor_cores, - 'spark.rapids.shuffle.multiThreaded.writer.threads': num_executor_cores, - 'spark.rapids.sql.multiThreadedRead.numThreads': max(20, num_executor_cores) - } - return res - def _run_rapids_tool(self): """ Run the bootstrap on the driver node @@ -91,7 +46,7 @@ def _run_rapids_tool(self): worker_hw_info = exec_cluster.get_worker_hw_info() self.logger.debug('Worker hardware INFO %s', worker_hw_info) try: - spark_settings = self.__calculate_spark_settings(worker_info=worker_hw_info) + spark_settings = super()._calculate_spark_settings(worker_info=worker_hw_info) self.ctxt.set_ctxt('bootstrap_results', spark_settings) self.logger.debug('%s Tool finished calculating recommended Apache Spark configurations for cluster %s: %s', self.pretty_name(), diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index 782e940fd..1aee3c0bb 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -193,51 +193,6 @@ class Qualification(RapidsJarTool): """ name = 'qualification' - def __calculate_spark_settings(self, worker_info: NodeHWInfo) -> dict: - """ - Calculate the cluster properties that we need to append to the /etc/defaults of the spark - if necessary. - :param worker_info: the hardware info as extracted from the worker. Note that we assume - that all the workers have the same configurations. - :return: dictionary containing 7 spark properties to be set by default on the cluster. 
- """ - num_gpus = worker_info.gpu_info.num_gpus - gpu_mem = worker_info.gpu_info.gpu_mem - num_cpus = worker_info.sys_info.num_cpus - cpu_mem = worker_info.sys_info.cpu_mem - - constants = self.ctxt.get_value('local', 'clusterConfigs', 'constants') - executors_per_node = num_gpus - num_executor_cores = max(1, num_cpus // executors_per_node) - gpu_concurrent_tasks = min(constants.get('maxGpuConcurrent'), gpu_mem // constants.get('gpuMemPerTaskMB')) - # account for system overhead - usable_worker_mem = max(0, cpu_mem - constants.get('systemReserveMB')) - executor_container_mem = usable_worker_mem // executors_per_node - # reserve 10% of heap as memory overhead - max_executor_heap = max(0, int(executor_container_mem * (1 - constants.get('heapOverheadFraction')))) - # give up to 2GB of heap to each executor core - executor_heap = min(max_executor_heap, constants.get('heapPerCoreMB') * num_executor_cores) - executor_mem_overhead = int(executor_heap * constants.get('heapOverheadFraction')) - # use default for pageable_pool to add to memory overhead - pageable_pool = constants.get('defaultPageablePoolMB') - # pinned memory uses any unused space up to 4GB - pinned_mem = min(constants.get('maxPinnedMemoryMB'), - executor_container_mem - executor_heap - executor_mem_overhead - pageable_pool) - executor_mem_overhead += pinned_mem + pageable_pool - res = { - 'spark.executor.cores': num_executor_cores, - 'spark.executor.memory': f'{executor_heap}m', - 'spark.executor.memoryOverhead': f'{executor_mem_overhead}m', - 'spark.rapids.sql.concurrentGpuTasks': gpu_concurrent_tasks, - 'spark.rapids.memory.pinnedPool.size': f'{pinned_mem}m', - 'spark.sql.files.maxPartitionBytes': f'{constants.get("maxSqlFilesPartitionsMB")}m', - 'spark.task.resource.gpu.amount': 1 / num_executor_cores, - 'spark.rapids.shuffle.multiThreaded.reader.threads': num_executor_cores, - 'spark.rapids.shuffle.multiThreaded.writer.threads': num_executor_cores, - 'spark.rapids.sql.multiThreadedRead.numThreads': max(20, num_executor_cores) - } - return res - def _process_rapids_args(self): """ Qualification tool processes extra arguments: @@ -279,7 +234,8 @@ def _process_gpu_cluster_worker_node(): _process_gpu_cluster_worker_node() worker_node_hw_info = gpu_cluster_obj.get_worker_hw_info() if gpu_cluster_obj: - self.ctxt.set_ctxt('recommendedConfigs', self.__calculate_spark_settings(worker_node_hw_info)) + self.ctxt.set_ctxt('recommendedConfigs', + super(RapidsJarTool, self)._calculate_spark_settings(worker_node_hw_info)) return gpu_cluster_obj is not None diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py index 06f370ca5..25e5b63a3 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py +++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py @@ -27,7 +27,7 @@ from typing import Any, Callable, Dict, List from spark_rapids_pytools.cloud_api.sp_types import CloudPlatform, get_platform, \ - ClusterBase, DeployMode + ClusterBase, DeployMode, NodeHWInfo from spark_rapids_pytools.common.sys_storage import FSUtil from spark_rapids_pytools.common.utilities import ToolLogging, Utils from spark_rapids_pytools.rapids.rapids_job import RapidsJobPropContainer @@ -317,6 +317,51 @@ def _generate_platform_report_sections(self) -> List[str]: return rep_lines return None + def _calculate_spark_settings(self, worker_info: NodeHWInfo) -> dict: + """ + Calculate the cluster properties that we need to append to the /etc/defaults of the spark + if necessary. 
+ :param worker_info: the hardware info as extracted from the worker. Note that we assume + that all the workers have the same configurations. + :return: dictionary containing 7 spark properties to be set by default on the cluster. + """ + num_gpus = worker_info.gpu_info.num_gpus + gpu_mem = worker_info.gpu_info.gpu_mem + num_cpus = worker_info.sys_info.num_cpus + cpu_mem = worker_info.sys_info.cpu_mem + + constants = self.ctxt.get_value('local', 'clusterConfigs', 'constants') + executors_per_node = num_gpus + num_executor_cores = max(1, num_cpus // executors_per_node) + gpu_concurrent_tasks = min(constants.get('maxGpuConcurrent'), gpu_mem // constants.get('gpuMemPerTaskMB')) + # account for system overhead + usable_worker_mem = max(0, cpu_mem - constants.get('systemReserveMB')) + executor_container_mem = usable_worker_mem // executors_per_node + # reserve 10% of heap as memory overhead + max_executor_heap = max(0, int(executor_container_mem * (1 - constants.get('heapOverheadFraction')))) + # give up to 2GB of heap to each executor core + executor_heap = min(max_executor_heap, constants.get('heapPerCoreMB') * num_executor_cores) + executor_mem_overhead = int(executor_heap * constants.get('heapOverheadFraction')) + # use default for pageable_pool to add to memory overhead + pageable_pool = constants.get('defaultPageablePoolMB') + # pinned memory uses any unused space up to 4GB + pinned_mem = min(constants.get('maxPinnedMemoryMB'), + executor_container_mem - executor_heap - executor_mem_overhead - pageable_pool) + executor_mem_overhead += pinned_mem + pageable_pool + res = { + 'spark.executor.cores': num_executor_cores, + 'spark.executor.memory': f'{executor_heap}m', + 'spark.executor.memoryOverhead': f'{executor_mem_overhead}m', + 'spark.rapids.sql.concurrentGpuTasks': gpu_concurrent_tasks, + 'spark.rapids.memory.pinnedPool.size': f'{pinned_mem}m', + 'spark.sql.files.maxPartitionBytes': f'{constants.get("maxSqlFilesPartitionsMB")}m', + 'spark.task.resource.gpu.amount': 1 / num_executor_cores, + 'spark.rapids.shuffle.multiThreaded.reader.threads': num_executor_cores, + 'spark.rapids.shuffle.multiThreaded.writer.threads': num_executor_cores, + 'spark.rapids.sql.multiThreadedRead.numThreads': max(20, num_executor_cores) + } + return res + @dataclass class RapidsJarTool(RapidsTool): From 58f79da84daa4ccfea292b2a207582a512b720d9 Mon Sep 17 00:00:00 2001 From: Cindy Jiang Date: Fri, 21 Jul 2023 16:36:09 -0700 Subject: [PATCH 4/7] minor fix for calling inherited method Signed-off-by: Cindy Jiang --- user_tools/src/spark_rapids_pytools/rapids/bootstrap.py | 2 +- user_tools/src/spark_rapids_pytools/rapids/qualification.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/rapids/bootstrap.py b/user_tools/src/spark_rapids_pytools/rapids/bootstrap.py index 5954a981f..8e2b68efb 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/bootstrap.py +++ b/user_tools/src/spark_rapids_pytools/rapids/bootstrap.py @@ -46,7 +46,7 @@ def _run_rapids_tool(self): worker_hw_info = exec_cluster.get_worker_hw_info() self.logger.debug('Worker hardware INFO %s', worker_hw_info) try: - spark_settings = super()._calculate_spark_settings(worker_info=worker_hw_info) + spark_settings = self._calculate_spark_settings(worker_info=worker_hw_info) self.ctxt.set_ctxt('bootstrap_results', spark_settings) self.logger.debug('%s Tool finished calculating recommended Apache Spark configurations for cluster %s: %s', self.pretty_name(), diff --git 
a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index 1aee3c0bb..4649d969d 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -234,8 +234,7 @@ def _process_gpu_cluster_worker_node(): _process_gpu_cluster_worker_node() worker_node_hw_info = gpu_cluster_obj.get_worker_hw_info() if gpu_cluster_obj: - self.ctxt.set_ctxt('recommendedConfigs', - super(RapidsJarTool, self)._calculate_spark_settings(worker_node_hw_info)) + self.ctxt.set_ctxt('recommendedConfigs', self._calculate_spark_settings(worker_node_hw_info)) return gpu_cluster_obj is not None From 837409c2b5b6b8a8423cc129909869ce3396f88c Mon Sep 17 00:00:00 2001 From: Cindy Jiang Date: Mon, 24 Jul 2023 16:15:34 -0700 Subject: [PATCH 5/7] moved contants in a common file for bootstrap and qualification tools Signed-off-by: Cindy Jiang --- .../rapids/qualification.py | 2 +- .../rapids/rapids_tool.py | 4 +++- .../resources/bootstrap-conf.yaml | 22 ------------------ .../resources/cluster-configs.yaml | 22 ++++++++++++++++++ .../resources/qualification-conf.yaml | 23 ------------------- 5 files changed, 26 insertions(+), 47 deletions(-) create mode 100644 user_tools/src/spark_rapids_pytools/resources/cluster-configs.yaml diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index 4649d969d..068317a15 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -434,7 +434,7 @@ def __generate_recommended_configs_report(self) -> list: report_content = [] if self.ctxt.get_ctxt('recommendedConfigs'): report_content = [ - Utils.gen_report_sec_header('Recommended Spark configurations', hrule=False), + Utils.gen_report_sec_header('Recommended Spark configurations for running on GPUs', hrule=False), ] conversion_items = [] recommended_configs = self.ctxt.get_ctxt('recommendedConfigs') diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py index 25e5b63a3..a90b51d10 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py +++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py @@ -28,6 +28,7 @@ from spark_rapids_pytools.cloud_api.sp_types import CloudPlatform, get_platform, \ ClusterBase, DeployMode, NodeHWInfo +from spark_rapids_pytools.common.prop_manager import YAMLPropertiesContainer from spark_rapids_pytools.common.sys_storage import FSUtil from spark_rapids_pytools.common.utilities import ToolLogging, Utils from spark_rapids_pytools.rapids.rapids_job import RapidsJobPropContainer @@ -330,7 +331,8 @@ def _calculate_spark_settings(self, worker_info: NodeHWInfo) -> dict: num_cpus = worker_info.sys_info.num_cpus cpu_mem = worker_info.sys_info.cpu_mem - constants = self.ctxt.get_value('local', 'clusterConfigs', 'constants') + config_path = Utils.resource_path('cluster-configs.yaml') + constants = YAMLPropertiesContainer(prop_arg=config_path).get_value('clusterConfigs', 'constants') executors_per_node = num_gpus num_executor_cores = max(1, num_cpus // executors_per_node) gpu_concurrent_tasks = min(constants.get('maxGpuConcurrent'), gpu_mem // constants.get('gpuMemPerTaskMB')) diff --git a/user_tools/src/spark_rapids_pytools/resources/bootstrap-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/bootstrap-conf.yaml index 
bebb0aa9a..ee24afdd4 100644 --- a/user_tools/src/spark_rapids_pytools/resources/bootstrap-conf.yaml +++ b/user_tools/src/spark_rapids_pytools/resources/bootstrap-conf.yaml @@ -8,25 +8,3 @@ local: cleanUp: true # Name of the file where the final result is going to show fileName: rapids_4_dataproc_bootstrap_output.log - clusterConfigs: - constants: - # Maximum amount of pinned memory to use per executor in megabytes - maxPinnedMemoryMB: 4096 - # Default pageable pool size per executor in megabytes - defaultPageablePoolMB: 1024 - # Maximum number of concurrent tasks to run on the GPU - maxGpuConcurrent: 4 - # Amount of GPU memory to use per concurrent task in megabytes - # Using a bit less than 8GB here since Dataproc clusters advertise - # T4s as only having around 14.75 GB and we want to run with - # 2 concurrent by default on T4s. - gpuMemPerTaskMB: 7500 - # Ideal amount of JVM heap memory to request per CPU core in megabytes - heapPerCoreMB: 2048 - # Fraction of the executor JVM heap size that should be additionally reserved - # for JVM off-heap overhead (thread stacks, native libraries, etc.) - heapOverheadFraction: 0.1 - # Amount of CPU memory to reserve for system overhead (kernel, buffers, etc.) in megabytes - systemReserveMB: 2048 - # By default set the spark.sql.files.maxPartitionBytes to 512m - maxSqlFilesPartitionsMB: 512 diff --git a/user_tools/src/spark_rapids_pytools/resources/cluster-configs.yaml b/user_tools/src/spark_rapids_pytools/resources/cluster-configs.yaml new file mode 100644 index 000000000..700c181a5 --- /dev/null +++ b/user_tools/src/spark_rapids_pytools/resources/cluster-configs.yaml @@ -0,0 +1,22 @@ +clusterConfigs: + constants: + # Maximum amount of pinned memory to use per executor in megabytes + maxPinnedMemoryMB: 4096 + # Default pageable pool size per executor in megabytes + defaultPageablePoolMB: 1024 + # Maximum number of concurrent tasks to run on the GPU + maxGpuConcurrent: 4 + # Amount of GPU memory to use per concurrent task in megabytes + # Using a bit less than 8GB here since Dataproc clusters advertise + # T4s as only having around 14.75 GB and we want to run with + # 2 concurrent by default on T4s. + gpuMemPerTaskMB: 7500 + # Ideal amount of JVM heap memory to request per CPU core in megabytes + heapPerCoreMB: 2048 + # Fraction of the executor JVM heap size that should be additionally reserved + # for JVM off-heap overhead (thread stacks, native libraries, etc.) + heapOverheadFraction: 0.1 + # Amount of CPU memory to reserve for system overhead (kernel, buffers, etc.) 
in megabytes + systemReserveMB: 2048 + # By default set the spark.sql.files.maxPartitionBytes to 512m + maxSqlFilesPartitionsMB: 512 diff --git a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml index 940ac5454..f30a35c80 100644 --- a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml +++ b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml @@ -156,30 +156,7 @@ local: - '^(\.+).*' - '^(\$+).*' - '^.+(_\$folder\$)$' - clusterConfigs: - constants: - # Maximum amount of pinned memory to use per executor in megabytes - maxPinnedMemoryMB: 4096 - # Default pageable pool size per executor in megabytes - defaultPageablePoolMB: 1024 - # Maximum number of concurrent tasks to run on the GPU - maxGpuConcurrent: 4 - # Amount of GPU memory to use per concurrent task in megabytes - # Using a bit less than 8GB here since Dataproc clusters advertise - # T4s as only having around 14.75 GB and we want to run with - # 2 concurrent by default on T4s. - gpuMemPerTaskMB: 7500 - # Ideal amount of JVM heap memory to request per CPU core in megabytes - heapPerCoreMB: 2048 - # Fraction of the executor JVM heap size that should be additionally reserved - # for JVM off-heap overhead (thread stacks, native libraries, etc.) - heapOverheadFraction: 0.1 - # Amount of CPU memory to reserve for system overhead (kernel, buffers, etc.) in megabytes - systemReserveMB: 2048 - # By default set the spark.sql.files.maxPartitionBytes to 512m - maxSqlFilesPartitionsMB: 512 platform: shortName: 'qual' outputDir: qual-tool-output cleanUp: true - From ea9d265abbc2710b1de2926bd13d753cb2ad0ce4 Mon Sep 17 00:00:00 2001 From: Cindy Jiang Date: Tue, 25 Jul 2023 17:57:40 -0700 Subject: [PATCH 6/7] remove bootstrap recommendation for db platforms Signed-off-by: Cindy Jiang --- user_tools/src/spark_rapids_pytools/rapids/qualification.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index 068317a15..e4f8652e6 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -432,6 +432,9 @@ def __generate_mc_types_conversion_report(self): def __generate_recommended_configs_report(self) -> list: report_content = [] + # TODO: add bootstrap configs support for databricks platforms + if 'databricks' in self.ctxt.platform.get_platform_name(): + return report_content if self.ctxt.get_ctxt('recommendedConfigs'): report_content = [ Utils.gen_report_sec_header('Recommended Spark configurations for running on GPUs', hrule=False), From 1202cf18768aed9e7cd00e3b03a390572568872d Mon Sep 17 00:00:00 2001 From: Cindy Jiang Date: Wed, 26 Jul 2023 14:39:15 -0700 Subject: [PATCH 7/7] disable bootstrap recommendations in DB runs Signed-off-by: Cindy Jiang --- .../rapids/qualification.py | 30 ++++++++----------- .../resources/dataproc-configs.json | 23 +++++++++++--- .../resources/emr-configs.json | 23 +++++++++++--- .../resources/qualification-conf.yaml | 3 ++ 4 files changed, 54 insertions(+), 25 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index e4f8652e6..7dbf206cd 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -432,18 +432,22 @@ def 
__generate_mc_types_conversion_report(self): def __generate_recommended_configs_report(self) -> list: report_content = [] - # TODO: add bootstrap configs support for databricks platforms - if 'databricks' in self.ctxt.platform.get_platform_name(): - return report_content if self.ctxt.get_ctxt('recommendedConfigs'): - report_content = [ - Utils.gen_report_sec_header('Recommended Spark configurations for running on GPUs', hrule=False), - ] conversion_items = [] recommended_configs = self.ctxt.get_ctxt('recommendedConfigs') for config in recommended_configs: conversion_items.append([config, recommended_configs[config]]) report_content.append(tabulate(conversion_items)) + # the report should be appended to the log_summary file + rapids_output_dir = self.ctxt.get_rapids_output_folder() + rapids_log_file = FSUtil.build_path(rapids_output_dir, + self.ctxt.get_value('toolOutput', 'textFormat', 'summaryLog', + 'fileName')) + with open(rapids_log_file, 'a', encoding='UTF-8') as summary_log_file: + log_report = [Utils.gen_report_sec_header('Recommended Spark configurations for running on GPUs', + hrule=False)] + log_report.extend(report_content) + summary_log_file.write(Utils.gen_multiline_str(log_report)) return report_content def __generate_cluster_shape_report(self) -> str: @@ -591,10 +595,6 @@ def get_cost_per_row(df_row, reshape_col: str) -> pd.Series: def __build_global_report_summary(self, all_apps: pd.DataFrame, csv_out: str) -> QualificationSummary: - def get_summary_log_file_path(): - output_dir = csv_out.split('qualification_summary.csv')[0] - return output_dir + 'rapids_4_spark_qualification_output/rapids_4_spark_qualification_output.log' - if all_apps.empty: # No need to run saving estimator or process the data frames. return QualificationSummary(comments=self.__generate_mc_types_conversion_report()) @@ -643,19 +643,13 @@ def get_summary_log_file_path(): self.logger.info('Generating GPU Estimated Speedup: as %s', csv_out) apps_reshaped_df.to_csv(csv_out, float_format='%.2f') - # add recommended Spark configurations to the summary log file - with open(get_summary_log_file_path(), 'a', encoding='UTF-8') as summary_log_file: - recommended_configs = Utils.gen_multiline_str(self.__generate_recommended_configs_report()) - summary_log_file.write(recommended_configs) - return QualificationSummary(comments=report_comments, all_apps=apps_pruned_df, recommended_apps=recommended_apps, savings_report_flag=launch_savings_calc, df_result=df_final_result, irrelevant_speedups=speedups_irrelevant_flag, - sections_generators=[self.__generate_mc_types_conversion_report, - self.__generate_recommended_configs_report]) + sections_generators=[self.__generate_mc_types_conversion_report]) def _process_output(self): def process_df_for_stdout(raw_df): @@ -778,6 +772,8 @@ def _generate_section_lines(self, sec_conf: dict) -> List[str]: script_content = gpu_cluster.generate_bootstrap_script(overridden_args=override_args) highlighted_code = TemplateGenerator.highlight_bash_code(script_content) return ['```bash', highlighted_code, '```', ''] + if sec_conf.get('sectionID') == 'gpuBootstrapRecommendedConfigs': + return self.__generate_recommended_configs_report() return super()._generate_section_content(sec_conf) diff --git a/user_tools/src/spark_rapids_pytools/resources/dataproc-configs.json b/user_tools/src/spark_rapids_pytools/resources/dataproc-configs.json index 9dec7d63e..3e9234f7a 100644 --- a/user_tools/src/spark_rapids_pytools/resources/dataproc-configs.json +++ 
b/user_tools/src/spark_rapids_pytools/resources/dataproc-configs.json @@ -224,14 +224,29 @@ } }, { - "sectionID": "runUserToolsBootstrap", + "sectionID": "gpuBootstrapRecommendedConfigs", "requiresBoolFlag": "enableSavingsCalculations", + "sectionName": "Recommended Spark configurations for running on GPUs", + "content": { + "header": [ + "", + "For the new GPU-accelerated cluster with RAPIDS Accelerator for Apache Spark,", + " it is recommended to set the following Spark configurations:", + "" + ] + } + }, + { + "sectionID": "runUserToolsBootstrap", + "requiresBoolFlag": "DISABLED", + "sectionName": "Regenerating recommended configurations for an existing GPU-Cluster", "content": { "header": [ "", - "Once the cluster is created, run the Bootstrap tool to provide optimized", - "RAPIDS Accelerator for Apache Spark configs based on GPU cluster shape.", - "Notes:", + "To generate the recommended configurations on an existing GPU-Cluster,", + " re-run the Bootstrap tool to provide optimized RAPIDS Accelerator", + " for Apache Spark configs based on GPU cluster shape.", + " Notes:", " - Overriding the Apache Spark default configurations on the cluster", " requires SSH access.", " - If SSH access is unavailable, you can still dump the recommended", diff --git a/user_tools/src/spark_rapids_pytools/resources/emr-configs.json b/user_tools/src/spark_rapids_pytools/resources/emr-configs.json index 648abbac9..c4bfd3d98 100644 --- a/user_tools/src/spark_rapids_pytools/resources/emr-configs.json +++ b/user_tools/src/spark_rapids_pytools/resources/emr-configs.json @@ -254,14 +254,29 @@ } }, { - "sectionID": "runUserToolsBootstrap", + "sectionID": "gpuBootstrapRecommendedConfigs", "requiresBoolFlag": "enableSavingsCalculations", + "sectionName": "Recommended Spark configurations for running on GPUs", + "content": { + "header": [ + "", + "For the new GPU-accelerated cluster with RAPIDS Accelerator for Apache Spark,", + " it is recommended to set the following Spark configurations:", + "" + ] + } + }, + { + "sectionID": "runUserToolsBootstrap", + "requiresBoolFlag": "DISABLED", + "sectionName": "Regenerating recommended configurations for an existing GPU-Cluster", "content": { "header": [ "", - "Once the cluster is created, run the Bootstrap tool to provide optimized", - "RAPIDS Accelerator for Apache Spark configs based on GPU cluster shape.", - "Notes:", + "To generate the recommended configurations on an existing GPU-Cluster,", + " re-run the Bootstrap tool to provide optimized RAPIDS Accelerator", + " for Apache Spark configs based on GPU cluster shape.", + " Notes:", " - Overriding the Apache Spark default configurations on the cluster", " requires SSH access.", " - If SSH access is unavailable, you can still dump the recommended", diff --git a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml index f30a35c80..5f7474bd0 100644 --- a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml +++ b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml @@ -1,6 +1,9 @@ toolOutput: completeOutput: true subFolder: rapids_4_spark_qualification_output + textFormat: + summaryLog: + fileName: rapids_4_spark_qualification_output.log csv: summaryReport: fileName: rapids_4_spark_qualification_output.csv
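
The sizing logic introduced by `_calculate_spark_settings` in this series can be followed end to end with a small standalone sketch. The version below is only an illustration, not the tool's code: it inlines the constants from the new `resources/cluster-configs.yaml`, and the worker shape (32 vCPUs, 120 GB of host memory, two GPUs with roughly 15 GB of memory each) is a hypothetical input, whereas the tool itself reads these values from the worker's `NodeHWInfo`.

```python
# Hypothetical worker shape used purely for illustration; the real tool pulls
# these numbers from the GPU cluster's worker NodeHWInfo.
num_gpus = 2
gpu_mem = 15000          # MB per GPU
num_cpus = 32
cpu_mem = 122880         # MB of host memory (120 GB)

# Constants as defined in resources/cluster-configs.yaml
max_pinned_mb = 4096
default_pageable_pool_mb = 1024
max_gpu_concurrent = 4
gpu_mem_per_task_mb = 7500
heap_per_core_mb = 2048
heap_overhead_fraction = 0.1
system_reserve_mb = 2048
max_sql_files_partitions_mb = 512

executors_per_node = num_gpus                                          # one executor per GPU
num_executor_cores = max(1, num_cpus // executors_per_node)            # 16
gpu_concurrent = min(max_gpu_concurrent, gpu_mem // gpu_mem_per_task_mb)  # 2
# account for system overhead
usable_worker_mem = max(0, cpu_mem - system_reserve_mb)                # 120832
executor_container_mem = usable_worker_mem // executors_per_node       # 60416
# reserve 10% of heap as memory overhead
max_executor_heap = max(0, int(executor_container_mem * (1 - heap_overhead_fraction)))  # 54374
# give up to 2GB of heap to each executor core
executor_heap = min(max_executor_heap, heap_per_core_mb * num_executor_cores)           # 32768
executor_mem_overhead = int(executor_heap * heap_overhead_fraction)    # 3276
# pinned memory uses any unused space up to 4GB
pinned_mem = min(max_pinned_mb,
                 executor_container_mem - executor_heap - executor_mem_overhead
                 - default_pageable_pool_mb)                           # 4096
executor_mem_overhead += pinned_mem + default_pageable_pool_mb         # 8396

recommended = {
    'spark.executor.cores': num_executor_cores,                        # 16
    'spark.executor.memory': f'{executor_heap}m',                      # 32768m
    'spark.executor.memoryOverhead': f'{executor_mem_overhead}m',      # 8396m
    'spark.rapids.sql.concurrentGpuTasks': gpu_concurrent,             # 2
    'spark.rapids.memory.pinnedPool.size': f'{pinned_mem}m',           # 4096m
    'spark.sql.files.maxPartitionBytes': f'{max_sql_files_partitions_mb}m',  # 512m
    'spark.task.resource.gpu.amount': 1 / num_executor_cores,          # 0.0625
    'spark.rapids.shuffle.multiThreaded.reader.threads': num_executor_cores,
    'spark.rapids.shuffle.multiThreaded.writer.threads': num_executor_cores,
    'spark.rapids.sql.multiThreadedRead.numThreads': max(20, num_executor_cores),  # 20
}
for key, value in recommended.items():
    print(f'{key}={value}')
```

Note how `spark.executor.memoryOverhead` ends up carrying the pinned pool and the pageable pool on top of the 10% heap overhead, which is why it grows to 8396m for this hypothetical worker.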
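
The new `__generate_recommended_configs_report` section generator simply turns the `recommendedConfigs` dictionary into two-column rows and renders them with `tabulate`, which `qualification.py` already imports. A minimal sketch of that rendering step, using made-up entries in place of `ctxt.get_ctxt('recommendedConfigs')`:

```python
from tabulate import tabulate

# Hypothetical values standing in for ctxt.get_ctxt('recommendedConfigs')
recommended_configs = {
    'spark.executor.cores': 16,
    'spark.executor.memory': '32768m',
}

# Build [config, value] rows and print them as a plain, header-less table,
# mirroring how the report section is composed before it is appended to the
# qualification summary log.
conversion_items = [[config, value] for config, value in recommended_configs.items()]
print('Recommended Spark configurations for running on GPUs')
print(tabulate(conversion_items))
```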