diff --git a/user_tools/src/spark_rapids_pytools/rapids/bootstrap.py b/user_tools/src/spark_rapids_pytools/rapids/bootstrap.py
index 2dc8f22fa..8e2b68efb 100644
--- a/user_tools/src/spark_rapids_pytools/rapids/bootstrap.py
+++ b/user_tools/src/spark_rapids_pytools/rapids/bootstrap.py
@@ -16,7 +16,7 @@
 
 from dataclasses import dataclass
 
-from spark_rapids_pytools.cloud_api.sp_types import ClusterBase, NodeHWInfo
+from spark_rapids_pytools.cloud_api.sp_types import ClusterBase
 from spark_rapids_pytools.common.sys_storage import FSUtil
 from spark_rapids_pytools.common.utilities import Utils
 from spark_rapids_pytools.rapids.rapids_tool import RapidsTool
@@ -36,51 +36,6 @@ def _process_custom_args(self):
     def requires_cluster_connection(self) -> bool:
         return True
 
-    def __calculate_spark_settings(self, worker_info: NodeHWInfo) -> dict:
-        """
-        Calculate the cluster properties that we need to append to the /etc/defaults of the spark
-        if necessary.
-        :param worker_info: the hardware info as extracted from the worker. Note that we assume
-               that all the workers have the same configurations.
-        :return: dictionary containing 7 spark properties to be set by default on the cluster.
-        """
-        num_gpus = worker_info.gpu_info.num_gpus
-        gpu_mem = worker_info.gpu_info.gpu_mem
-        num_cpus = worker_info.sys_info.num_cpus
-        cpu_mem = worker_info.sys_info.cpu_mem
-
-        constants = self.ctxt.get_value('local', 'clusterConfigs', 'constants')
-        executors_per_node = num_gpus
-        num_executor_cores = max(1, num_cpus // executors_per_node)
-        gpu_concurrent_tasks = min(constants.get('maxGpuConcurrent'), gpu_mem // constants.get('gpuMemPerTaskMB'))
-        # account for system overhead
-        usable_worker_mem = max(0, cpu_mem - constants.get('systemReserveMB'))
-        executor_container_mem = usable_worker_mem // executors_per_node
-        # reserve 10% of heap as memory overhead
-        max_executor_heap = max(0, int(executor_container_mem * (1 - constants.get('heapOverheadFraction'))))
-        # give up to 2GB of heap to each executor core
-        executor_heap = min(max_executor_heap, constants.get('heapPerCoreMB') * num_executor_cores)
-        executor_mem_overhead = int(executor_heap * constants.get('heapOverheadFraction'))
-        # use default for pageable_pool to add to memory overhead
-        pageable_pool = constants.get('defaultPageablePoolMB')
-        # pinned memory uses any unused space up to 4GB
-        pinned_mem = min(constants.get('maxPinnedMemoryMB'),
-                         executor_container_mem - executor_heap - executor_mem_overhead - pageable_pool)
-        executor_mem_overhead += pinned_mem + pageable_pool
-        res = {
-            'spark.executor.cores': num_executor_cores,
-            'spark.executor.memory': f'{executor_heap}m',
-            'spark.executor.memoryOverhead': f'{executor_mem_overhead}m',
-            'spark.rapids.sql.concurrentGpuTasks': gpu_concurrent_tasks,
-            'spark.rapids.memory.pinnedPool.size': f'{pinned_mem}m',
-            'spark.sql.files.maxPartitionBytes': f'{constants.get("maxSqlFilesPartitionsMB")}m',
-            'spark.task.resource.gpu.amount': 1 / num_executor_cores,
-            'spark.rapids.shuffle.multiThreaded.reader.threads': num_executor_cores,
-            'spark.rapids.shuffle.multiThreaded.writer.threads': num_executor_cores,
-            'spark.rapids.sql.multiThreadedRead.numThreads': max(20, num_executor_cores)
-        }
-        return res
-
     def _run_rapids_tool(self):
         """
         Run the bootstrap on the driver node
@@ -91,7 +46,7 @@
         worker_hw_info = exec_cluster.get_worker_hw_info()
         self.logger.debug('Worker hardware INFO %s', worker_hw_info)
         try:
-            spark_settings = self.__calculate_spark_settings(worker_info=worker_hw_info)
+            spark_settings = self._calculate_spark_settings(worker_info=worker_hw_info)
             self.ctxt.set_ctxt('bootstrap_results', spark_settings)
             self.logger.debug('%s Tool finished calculating recommended Apache Spark configurations for cluster %s: %s',
                               self.pretty_name(),
diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py
index f9a305743..7dbf206cd 100644
--- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py
+++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -22,7 +22,7 @@
 import pandas as pd
 from tabulate import tabulate
 
-from spark_rapids_pytools.cloud_api.sp_types import EnumeratedType, ClusterReshape
+from spark_rapids_pytools.cloud_api.sp_types import EnumeratedType, ClusterReshape, NodeHWInfo
 from spark_rapids_pytools.common.sys_storage import FSUtil
 from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator
 from spark_rapids_pytools.pricing.price_provider import SavingsEstimator
@@ -175,7 +175,8 @@ def format_float(x: float) -> str:
             report_content.extend(f' - {line}' for line in self.comments)
         if self.sections_generators:
             for section_generator in self.sections_generators:
-                report_content.append(Utils.gen_multiline_str(section_generator()))
+                if section_generator:
+                    report_content.append(Utils.gen_multiline_str(section_generator()))
         if self.has_gpu_recommendation():
             csp_report = csp_report_provider()
             if csp_report:
@@ -208,6 +209,16 @@ def _process_cpu_cluster_args(self, offline_cluster_opts: dict = None):
         self.ctxt.set_ctxt('cpuClusterProxy', cpu_cluster_obj)
 
     def _process_gpu_cluster_args(self, offline_cluster_opts: dict = None) -> bool:
+        def _process_gpu_cluster_worker_node():
+            try:
+                worker_node = gpu_cluster_obj.get_worker_node()
+                worker_node._pull_and_set_mc_props(cli=self.ctxt.platform.cli)  # pylint: disable=protected-access
+                sys_info = worker_node._pull_sys_info(cli=self.ctxt.platform.cli)  # pylint: disable=protected-access
+                gpu_info = worker_node._pull_gpu_hw_info(cli=self.ctxt.platform.cli)  # pylint: disable=protected-access
+                worker_node.hw_info = NodeHWInfo(sys_info=sys_info, gpu_info=gpu_info)
+            except Exception:  # pylint: disable=broad-except
+                return
+
         gpu_cluster_arg = offline_cluster_opts.get('gpuCluster')
         if gpu_cluster_arg:
             gpu_cluster_obj = self._create_migration_cluster('GPU', gpu_cluster_arg)
@@ -219,6 +230,12 @@ def _process_gpu_cluster_args(self, offline_cluster_opts: dict = None) -> bool:
             self.logger.info('Creating GPU cluster by converting the CPU cluster instances to GPU supported types')
             gpu_cluster_obj = self.ctxt.platform.migrate_cluster_to_gpu(orig_cluster)
         self.ctxt.set_ctxt('gpuClusterProxy', gpu_cluster_obj)
+
+        _process_gpu_cluster_worker_node()
+        if gpu_cluster_obj:
+            worker_node_hw_info = gpu_cluster_obj.get_worker_hw_info()
+            self.ctxt.set_ctxt('recommendedConfigs', self._calculate_spark_settings(worker_node_hw_info))
+
         return gpu_cluster_obj is not None
 
     def _process_offline_cluster_args(self):
@@ -413,6 +430,26 @@ def __generate_mc_types_conversion_report(self):
             report_content.append(self.ctxt.platform.get_footer_message())
         return report_content
 
+    def __generate_recommended_configs_report(self) -> list:
+        report_content = []
+        if self.ctxt.get_ctxt('recommendedConfigs'):
+            conversion_items = []
+            recommended_configs = self.ctxt.get_ctxt('recommendedConfigs')
+            for config in recommended_configs:
+                conversion_items.append([config, recommended_configs[config]])
+            report_content.append(tabulate(conversion_items))
+        # the report should be appended to the log_summary file
+        rapids_output_dir = self.ctxt.get_rapids_output_folder()
+        rapids_log_file = FSUtil.build_path(rapids_output_dir,
+                                            self.ctxt.get_value('toolOutput', 'textFormat', 'summaryLog',
+                                                                'fileName'))
+        with open(rapids_log_file, 'a', encoding='UTF-8') as summary_log_file:
+            log_report = [Utils.gen_report_sec_header('Recommended Spark configurations for running on GPUs',
+                                                      hrule=False)]
+            log_report.extend(report_content)
+            summary_log_file.write(Utils.gen_multiline_str(log_report))
+        return report_content
+
     def __generate_cluster_shape_report(self) -> str:
         if bool(self.ctxt.platform.ctxt['notes']):
             return Utils.gen_multiline_str(self.ctxt.platform.ctxt['notes'].get('clusterShape'))
@@ -595,7 +632,7 @@ def __build_global_report_summary(self,
                                                  per_row_flag)
         df_final_result = apps_working_set
         if not apps_working_set.empty:
-            self.logger.info('Generating GPU Estimated Speedup and Savings as %s', csv_out)
+            self.logger.info('Generating GPU Estimated Speedup and Savings as: %s', csv_out)
             # we can use the general format as well but this will transform numbers to E+. So, stick with %f
             apps_working_set.to_csv(csv_out, float_format='%.2f')
         else:
@@ -603,7 +640,7 @@ def __build_global_report_summary(self,
         if not apps_reshaped_df.empty:
             # Do not include estimated job frequency in csv file
             apps_reshaped_df = apps_reshaped_df.drop(columns=['Estimated Job Frequency (monthly)'])
-            self.logger.info('Generating GPU Estimated Speedup as %s', csv_out)
+            self.logger.info('Generating GPU Estimated Speedup as: %s', csv_out)
             apps_reshaped_df.to_csv(csv_out, float_format='%.2f')
 
         return QualificationSummary(comments=report_comments,
@@ -735,6 +772,8 @@ def _generate_section_lines(self, sec_conf: dict) -> List[str]:
             script_content = gpu_cluster.generate_bootstrap_script(overridden_args=override_args)
             highlighted_code = TemplateGenerator.highlight_bash_code(script_content)
             return ['```bash', highlighted_code, '```', '']
+        if sec_conf.get('sectionID') == 'gpuBootstrapRecommendedConfigs':
+            return self.__generate_recommended_configs_report()
         return super()._generate_section_content(sec_conf)
 
 
diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
index 06f370ca5..a90b51d10 100644
--- a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
+++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
@@ -27,7 +27,8 @@
 from typing import Any, Callable, Dict, List
 
 from spark_rapids_pytools.cloud_api.sp_types import CloudPlatform, get_platform, \
-    ClusterBase, DeployMode
+    ClusterBase, DeployMode, NodeHWInfo
+from spark_rapids_pytools.common.prop_manager import YAMLPropertiesContainer
 from spark_rapids_pytools.common.sys_storage import FSUtil
 from spark_rapids_pytools.common.utilities import ToolLogging, Utils
 from spark_rapids_pytools.rapids.rapids_job import RapidsJobPropContainer
@@ -317,6 +318,52 @@ def _generate_platform_report_sections(self) -> List[str]:
             return rep_lines
         return None
 
+    def _calculate_spark_settings(self, worker_info: NodeHWInfo) -> dict:
+        """
+        Calculate the cluster properties that we need to append to the /etc/defaults of the spark
+        if necessary.
+        :param worker_info: the hardware info as extracted from the worker. Note that we assume
+               that all the workers have the same configurations.
+        :return: dictionary containing 7 spark properties to be set by default on the cluster.
+ """ + num_gpus = worker_info.gpu_info.num_gpus + gpu_mem = worker_info.gpu_info.gpu_mem + num_cpus = worker_info.sys_info.num_cpus + cpu_mem = worker_info.sys_info.cpu_mem + + config_path = Utils.resource_path('cluster-configs.yaml') + constants = YAMLPropertiesContainer(prop_arg=config_path).get_value('clusterConfigs', 'constants') + executors_per_node = num_gpus + num_executor_cores = max(1, num_cpus // executors_per_node) + gpu_concurrent_tasks = min(constants.get('maxGpuConcurrent'), gpu_mem // constants.get('gpuMemPerTaskMB')) + # account for system overhead + usable_worker_mem = max(0, cpu_mem - constants.get('systemReserveMB')) + executor_container_mem = usable_worker_mem // executors_per_node + # reserve 10% of heap as memory overhead + max_executor_heap = max(0, int(executor_container_mem * (1 - constants.get('heapOverheadFraction')))) + # give up to 2GB of heap to each executor core + executor_heap = min(max_executor_heap, constants.get('heapPerCoreMB') * num_executor_cores) + executor_mem_overhead = int(executor_heap * constants.get('heapOverheadFraction')) + # use default for pageable_pool to add to memory overhead + pageable_pool = constants.get('defaultPageablePoolMB') + # pinned memory uses any unused space up to 4GB + pinned_mem = min(constants.get('maxPinnedMemoryMB'), + executor_container_mem - executor_heap - executor_mem_overhead - pageable_pool) + executor_mem_overhead += pinned_mem + pageable_pool + res = { + 'spark.executor.cores': num_executor_cores, + 'spark.executor.memory': f'{executor_heap}m', + 'spark.executor.memoryOverhead': f'{executor_mem_overhead}m', + 'spark.rapids.sql.concurrentGpuTasks': gpu_concurrent_tasks, + 'spark.rapids.memory.pinnedPool.size': f'{pinned_mem}m', + 'spark.sql.files.maxPartitionBytes': f'{constants.get("maxSqlFilesPartitionsMB")}m', + 'spark.task.resource.gpu.amount': 1 / num_executor_cores, + 'spark.rapids.shuffle.multiThreaded.reader.threads': num_executor_cores, + 'spark.rapids.shuffle.multiThreaded.writer.threads': num_executor_cores, + 'spark.rapids.sql.multiThreadedRead.numThreads': max(20, num_executor_cores) + } + return res + @dataclass class RapidsJarTool(RapidsTool): diff --git a/user_tools/src/spark_rapids_pytools/resources/bootstrap-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/bootstrap-conf.yaml index bebb0aa9a..ee24afdd4 100644 --- a/user_tools/src/spark_rapids_pytools/resources/bootstrap-conf.yaml +++ b/user_tools/src/spark_rapids_pytools/resources/bootstrap-conf.yaml @@ -8,25 +8,3 @@ local: cleanUp: true # Name of the file where the final result is going to show fileName: rapids_4_dataproc_bootstrap_output.log - clusterConfigs: - constants: - # Maximum amount of pinned memory to use per executor in megabytes - maxPinnedMemoryMB: 4096 - # Default pageable pool size per executor in megabytes - defaultPageablePoolMB: 1024 - # Maximum number of concurrent tasks to run on the GPU - maxGpuConcurrent: 4 - # Amount of GPU memory to use per concurrent task in megabytes - # Using a bit less than 8GB here since Dataproc clusters advertise - # T4s as only having around 14.75 GB and we want to run with - # 2 concurrent by default on T4s. - gpuMemPerTaskMB: 7500 - # Ideal amount of JVM heap memory to request per CPU core in megabytes - heapPerCoreMB: 2048 - # Fraction of the executor JVM heap size that should be additionally reserved - # for JVM off-heap overhead (thread stacks, native libraries, etc.) - heapOverheadFraction: 0.1 - # Amount of CPU memory to reserve for system overhead (kernel, buffers, etc.) 
-      # Amount of CPU memory to reserve for system overhead (kernel, buffers, etc.) in megabytes
-      systemReserveMB: 2048
-      # By default set the spark.sql.files.maxPartitionBytes to 512m
-      maxSqlFilesPartitionsMB: 512
diff --git a/user_tools/src/spark_rapids_pytools/resources/cluster-configs.yaml b/user_tools/src/spark_rapids_pytools/resources/cluster-configs.yaml
new file mode 100644
index 000000000..700c181a5
--- /dev/null
+++ b/user_tools/src/spark_rapids_pytools/resources/cluster-configs.yaml
@@ -0,0 +1,22 @@
+clusterConfigs:
+  constants:
+    # Maximum amount of pinned memory to use per executor in megabytes
+    maxPinnedMemoryMB: 4096
+    # Default pageable pool size per executor in megabytes
+    defaultPageablePoolMB: 1024
+    # Maximum number of concurrent tasks to run on the GPU
+    maxGpuConcurrent: 4
+    # Amount of GPU memory to use per concurrent task in megabytes
+    # Using a bit less than 8GB here since Dataproc clusters advertise
+    # T4s as only having around 14.75 GB and we want to run with
+    # 2 concurrent by default on T4s.
+    gpuMemPerTaskMB: 7500
+    # Ideal amount of JVM heap memory to request per CPU core in megabytes
+    heapPerCoreMB: 2048
+    # Fraction of the executor JVM heap size that should be additionally reserved
+    # for JVM off-heap overhead (thread stacks, native libraries, etc.)
+    heapOverheadFraction: 0.1
+    # Amount of CPU memory to reserve for system overhead (kernel, buffers, etc.) in megabytes
+    systemReserveMB: 2048
+    # By default set the spark.sql.files.maxPartitionBytes to 512m
+    maxSqlFilesPartitionsMB: 512
diff --git a/user_tools/src/spark_rapids_pytools/resources/dataproc-configs.json b/user_tools/src/spark_rapids_pytools/resources/dataproc-configs.json
index 9dec7d63e..3e9234f7a 100644
--- a/user_tools/src/spark_rapids_pytools/resources/dataproc-configs.json
+++ b/user_tools/src/spark_rapids_pytools/resources/dataproc-configs.json
@@ -224,14 +224,29 @@
       }
     },
     {
-      "sectionID": "runUserToolsBootstrap",
+      "sectionID": "gpuBootstrapRecommendedConfigs",
       "requiresBoolFlag": "enableSavingsCalculations",
+      "sectionName": "Recommended Spark configurations for running on GPUs",
+      "content": {
+        "header": [
+          "",
+          "For the new GPU-accelerated cluster with RAPIDS Accelerator for Apache Spark,",
+          "  it is recommended to set the following Spark configurations:",
+          ""
+        ]
+      }
+    },
+    {
+      "sectionID": "runUserToolsBootstrap",
+      "requiresBoolFlag": "DISABLED",
+      "sectionName": "Regenerating recommended configurations for an existing GPU-Cluster",
       "content": {
         "header": [
           "",
-          "Once the cluster is created, run the Bootstrap tool to provide optimized",
-          "RAPIDS Accelerator for Apache Spark configs based on GPU cluster shape.",
-          "Notes:",
+          "To generate the recommended configurations on an existing GPU-Cluster,",
+          "  re-run the Bootstrap tool to provide optimized RAPIDS Accelerator",
+          "  for Apache Spark configs based on GPU cluster shape.",
+          "  Notes:",
           "    - Overriding the Apache Spark default configurations on the cluster",
           "      requires SSH access.",
           "    - If SSH access is unavailable, you can still dump the recommended",
diff --git a/user_tools/src/spark_rapids_pytools/resources/emr-configs.json b/user_tools/src/spark_rapids_pytools/resources/emr-configs.json
index 648abbac9..c4bfd3d98 100644
--- a/user_tools/src/spark_rapids_pytools/resources/emr-configs.json
+++ b/user_tools/src/spark_rapids_pytools/resources/emr-configs.json
@@ -254,14 +254,29 @@
       }
     },
     {
-      "sectionID": "runUserToolsBootstrap",
+      "sectionID": "gpuBootstrapRecommendedConfigs",
       "requiresBoolFlag": "enableSavingsCalculations",
+      "sectionName": "Recommended Spark configurations for running on GPUs",
"content": { + "header": [ + "", + "For the new GPU-accelerated cluster with RAPIDS Accelerator for Apache Spark,", + " it is recommended to set the following Spark configurations:", + "" + ] + } + }, + { + "sectionID": "runUserToolsBootstrap", + "requiresBoolFlag": "DISABLED", + "sectionName": "Regenerating recommended configurations for an existing GPU-Cluster", "content": { "header": [ "", - "Once the cluster is created, run the Bootstrap tool to provide optimized", - "RAPIDS Accelerator for Apache Spark configs based on GPU cluster shape.", - "Notes:", + "To generate the recommended configurations on an existing GPU-Cluster,", + " re-run the Bootstrap tool to provide optimized RAPIDS Accelerator", + " for Apache Spark configs based on GPU cluster shape.", + " Notes:", " - Overriding the Apache Spark default configurations on the cluster", " requires SSH access.", " - If SSH access is unavailable, you can still dump the recommended", diff --git a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml index 811ba86eb..5f7474bd0 100644 --- a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml +++ b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml @@ -1,6 +1,9 @@ toolOutput: completeOutput: true subFolder: rapids_4_spark_qualification_output + textFormat: + summaryLog: + fileName: rapids_4_spark_qualification_output.log csv: summaryReport: fileName: rapids_4_spark_qualification_output.csv @@ -160,4 +163,3 @@ platform: shortName: 'qual' outputDir: qual-tool-output cleanUp: true -