Refactoring the speedup factor generation to support WholeStageCodegen parsing and environment defaults #493

Merged · 2 commits · Aug 17, 2023
Changes from 1 commit
8 changes: 4 additions & 4 deletions user_tools/custom_speedup_factors/README.md
@@ -24,16 +24,16 @@ spark_rapids_user_tools onprem profiling --csv --eventlogs CPU-3k --local_folder
spark_rapids_user_tools onprem profiling --csv --eventlogs GPU-3k --local_folder GPU-3k-profile
```
3. Speedup factor generation
1. Run the speedup factor generation script, passing the CPU and GPU profiler output.
1. Run the speedup factor generation script, passing the CPU and GPU profiler output along with a CSV output filename.
```
python generate_speedup_factors.py --cpu CPU-3k-profile/rapids_4_spark_profile --gpu GPU-3k-profile/rapids_4_spark_profile
python generate_speedup_factors.py --cpu CPU-3k-profile/rapids_4_spark_profile --gpu GPU-3k-profile/rapids_4_spark_profile --output newScores.csv
```

The output will show which operators were detected in the benchmarks and can be used as custom speedups. You can then update values from the default [operatorsScore.csv](https://github.com/NVIDIA/spark-rapids-tools/blob/dev/core/src/main/resources/operatorsScore.csv) file to create your own version with the custom speedup factors generated by the output.
The script will generate the new scores in the output specified by the `--output` argument.
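
For illustration, the generated file follows the same two-column layout as *operatorsScore.csv*; the operator names and values below are hypothetical:
```
CPUOperator,Score
FilterExec,2.85
ProjectExec,3.0
HashAggregateExec,4.2
```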

## Running Workload Qualification with Custom Speedup Factors

Now that you have a custom *operatorsScore.csv* file, you can run the Spark RAPIDS qualification tool with it to get estimates applicable to your environment. Here is the command to run with a custom speedup factor file:
```
spark_rapids_user_tools onprem qualification --speedup-factor-file operatorsScore.csv --eventlogs <CPU-event-logs>
spark_rapids_user_tools onprem qualification --speedup-factor-file newScores.csv --eventlogs <CPU-event-logs>
```
18 changes: 18 additions & 0 deletions user_tools/custom_speedup_factors/defaultScores.csv
@@ -0,0 +1,18 @@
CPUOperator,Score
AggregateInPandasExec,1.2
ArrowEvalPythonExec,1.2
FlatMapGroupsInPandasExec,1.2
MapInPandasExec,1.2
WindowInPandasExec,1.2
KMeans-pyspark,8.86
KMeans-scala,1
PCA-pyspark,2.24
PCA-scala,2.69
LinearRegression-pyspark,2
LinearRegression-scala,1
RandomForestClassifier-pyspark,6.31
RandomForestClassifier-scala,1
RandomForestRegressor-pyspark,3.66
RandomForestRegressor-scala,1
XGBoost-pyspark,1
XGBoost-scala,3.31
97 changes: 68 additions & 29 deletions user_tools/custom_speedup_factors/generate_speedup_factors.py
@@ -27,12 +27,14 @@
parser = argparse.ArgumentParser(description="Speedup Factor Analysis")
parser.add_argument("--cpu", type=str, help="Directory of CPU profiler logs", required=True)
parser.add_argument("--gpu", type=str, help="Directory of GPU profiler logs", required=True)
parser.add_argument("--output", type=str, help="Filename for custom speedup factors", required=True)
parser.add_argument("--verbose", action="store_true", help="flag to generate full verbose output for logging raw node results")
parser.add_argument("--chdir", action="store_true", help="flag to change to work dir that's the script located")
args = parser.parse_args()

cpu_dir = args.cpu
gpu_dir = args.gpu
output = args.output
verbose = args.verbose

cpu_stage_log = {}
@@ -55,13 +57,26 @@
mapping_info = mapping_info.groupby(['SQL Node'])['Child Node'].apply(','.join).reset_index()

# - process sql_plan_metrics_for_application.csv
# - load in "duration" (CPU) or "op time" (GPU)
# - load in "duration" (CPU)
# - replace WholeStageCodegen (CPU only) with list of operators from mapping lookup file
# - mapping_info.parent = sql_times.nodeName
cpu_sql_info = pd.read_csv(cpu_dir + "/" + app + "/sql_plan_metrics_for_application.csv")
cpu_sql_times = cpu_sql_info[cpu_sql_info["name"] == "duration"]
cpu_sql_combined = cpu_sql_times.set_index('nodeName').join(mapping_info.set_index('SQL Node'), how='left')

# - parse WholeStageCodegen durations with child node mapping
cpu_sql_times_df = cpu_sql_combined[['Child Node', 'max_value']]

for index, row in cpu_sql_times_df.iterrows():
operators = str(row['Child Node']).split(',')
duration = row['max_value']/len(operators)/1000.0
for operator in operators:
if operator in cpu_stage_log[app_name]:
cpu_stage_log[app_name][operator] = cpu_stage_log[app_name][operator] + duration
else:
cpu_stage_log[app_name][operator] = duration
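
As a standalone illustration of the splitting above, the sketch below divides each WholeStageCodegen node's `max_value` evenly among its mapped child operators (the sample rows are hypothetical, not real profiler output):
```
from collections import defaultdict

stage_log = defaultdict(float)

# Hypothetical (child-node list, max_value) pairs from the joined DataFrame
rows = [("Filter,Project,HashAggregate", 9000.0), ("Filter,Project", 4000.0)]

for child_nodes, max_value in rows:
    operators = child_nodes.split(',')
    duration = max_value / len(operators) / 1000.0  # even split per operator
    for operator in operators:
        stage_log[operator] += duration

print(dict(stage_log))  # {'Filter': 5.0, 'Project': 5.0, 'HashAggregate': 3.0}
```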

# - parse top-level execs from sql_to_stage_information.csv
cpu_stage_info = pd.read_csv(cpu_dir + "/" + app + "/sql_to_stage_information.csv")
cpu_stage_times = cpu_stage_info[['Stage Duration', 'SQL Nodes(IDs)']]

@@ -92,12 +107,8 @@
app_name = app_info.loc[0]["appName"]
gpu_stage_log[app_name] = {}

# - process sql_plan_metrics_for_application.csv
# - load in "duration" (CPU) or "op time" (GPU)
# - mapping_info.parent = sql_times.nodeName
gpu_sql_info = pd.read_csv(gpu_dir + "/" + app + "/sql_plan_metrics_for_application.csv")
gpu_sql_times = gpu_sql_info[gpu_sql_info["name"] == "op time"]

# - process sql_to_stage_information.csv to get stage durations
# - split up duration by operators listed in each stage
gpu_stage_info = pd.read_csv(gpu_dir + "/" + app + "/sql_to_stage_information.csv")
gpu_stage_times = gpu_stage_info[['Stage Duration', 'SQL Nodes(IDs)']]

@@ -111,41 +122,69 @@
else:
gpu_stage_log[app_name][op_key] = duration

# Sum up SQL operators for each
stage_totals = {}
cpu_stage_totals = {}
gpu_stage_totals = {}
cpu_stage_total = 0.0
gpu_stage_total = 0.0

# Sum up SQL operators for each operator found in CPU and GPU
for app_key in cpu_stage_log:
for op_key in cpu_stage_log[app_key]:
if op_key not in stage_totals:
stage_totals[op_key] = cpu_stage_log[app_key][op_key]
if op_key not in cpu_stage_totals:
cpu_stage_totals[op_key] = cpu_stage_log[app_key][op_key]
else:
stage_totals[op_key] = stage_totals[op_key] + cpu_stage_log[app_key][op_key]
cpu_stage_totals[op_key] = cpu_stage_totals[op_key] + cpu_stage_log[app_key][op_key]
cpu_stage_total = cpu_stage_total + cpu_stage_log[app_key][op_key]


for app_key in gpu_stage_log:
for op_key in gpu_stage_log[app_key]:
if op_key not in stage_totals:
stage_totals[op_key] = gpu_stage_log[app_key][op_key]
if op_key not in gpu_stage_totals:
gpu_stage_totals[op_key] = gpu_stage_log[app_key][op_key]
else:
stage_totals[op_key] = stage_totals[op_key] + gpu_stage_log[app_key][op_key]
gpu_stage_totals[op_key] = gpu_stage_totals[op_key] + gpu_stage_log[app_key][op_key]
gpu_stage_total = gpu_stage_total + gpu_stage_log[app_key][op_key]
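
The two accumulation loops above can also be written more compactly with `dict.get`; a behavior-equivalent sketch for the CPU side (the GPU side is symmetric):
```
for app_key in cpu_stage_log:
    for op_key, value in cpu_stage_log[app_key].items():
        cpu_stage_totals[op_key] = cpu_stage_totals.get(op_key, 0.0) + value
        cpu_stage_total += value
```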

# Create dictionary of execs where speedup factors can be calculated
scores_dict = {}
scores_dict["FilterExec"] = str(round(cpu_stage_totals['Filter'] / gpu_stage_totals['GpuFilter'], 2))
scores_dict["SortExec"] = str(round(cpu_stage_totals['SortMergeJoin'] / gpu_stage_totals['GpuShuffledHashJoin'], 2))
scores_dict["BroadcastHashJoinExec"] = str(round(cpu_stage_totals['BroadcastHashJoin'] / gpu_stage_totals['GpuBroadcastHashJoin'], 2))
scores_dict["ShuffleExchangeExec"] = str(round(cpu_stage_totals['Exchange'] / gpu_stage_totals['GpuColumnarExchange'], 2))
scores_dict["HashAggregateExec"] = str(round(cpu_stage_totals['HashAggregate'] / gpu_stage_totals['GpuHashAggregate'], 2))
scores_dict["SortMergeJoinExec"] = str(round((cpu_stage_totals['SortMergeJoin']+cpu_stage_totals['Sort']) / (gpu_stage_totals['GpuShuffledHashJoin']+gpu_stage_totals['GpuSort']), 2))

overall_speedup = str(round(cpu_stage_total/gpu_stage_total, 2))

# Print out node metrics (if verbose)
if verbose:
print("# Operator metrics ")
for key in stage_totals:
print(key + "," + str(stage_totals[key]))
print("CPU Total," + str(cpu_stage_total))
print("GPU Total," + str(gpu_stage_total))

# Print out speedup factors
print("# Speedup Factors ")
print("FilterExec," + str(round(stage_totals['Filter'] / stage_totals['GpuFilter'], 2)))
print("SortExec," + str(round(stage_totals['SortMergeJoin'] / stage_totals['GpuShuffledHashJoin'], 2)))
print("BroadcastHashJoinExec," + str(round(stage_totals['BroadcastHashJoin'] / stage_totals['GpuBroadcastHashJoin'], 2)))
print("ShuffleExchangeExec," + str(round(stage_totals['Exchange'] / stage_totals['GpuColumnarExchange'], 2)))
print("HashAggregateExec," + str(round(stage_totals['HashAggregate'] / stage_totals['GpuHashAggregate'], 2)))
print("SortMergeJoinExec," + str(round((stage_totals['SortMergeJoin']+stage_totals['Sort']) / (stage_totals['GpuShuffledHashJoin']+stage_totals['GpuSort']), 2)))
print("# CPU Operator Metrics")
for key in cpu_stage_totals:
print(key + " = " + str(cpu_stage_totals[key]))
print("# GPU Operator Metrics")
for key in gpu_stage_totals:
print(key + " = " + str(gpu_stage_totals[key]))
print("# Summary Metrics")
print("CPU Total = " + str(cpu_stage_total))
print("GPU Total = " + str(gpu_stage_total))
print("Overall speedup = " + overall_speedup)

# Print out individual exec speedup factors
print("# Speedup Factors ")
for key in scores_dict:
print(f"{key} = {scores_dict[key]}")

# Load in list of operators and set initial values to default speedup
scores_df = pd.read_csv("operatorsList.csv")
scores_df["Score"] = overall_speedup

# Update operators that are found in benchmark
for key in scores_dict:
scores_df.loc[scores_df['CPUOperator'] == key, 'Score'] = scores_dict[key]

# Add in hard-coded defaults
defaults_df = pd.read_csv("defaultScores.csv")

# Generate output CSV file
final_df = pd.concat([scores_df, defaults_df])
final_df.to_csv(output, index=False)
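
Putting the final assembly together, a minimal self-contained sketch of this last step (DataFrame contents are illustrative stand-ins, not the real `operatorsList.csv`/`defaultScores.csv`):
```
import pandas as pd

# Stand-in for operatorsList.csv: every operator starts at the overall speedup
scores_df = pd.DataFrame({"CPUOperator": ["FilterExec", "ProjectExec"]})
scores_df["Score"] = "2.5"  # overall_speedup is kept as a string in the script

# Override operators actually measured in the benchmarks
scores_dict = {"FilterExec": "3.0"}
for key in scores_dict:
    scores_df.loc[scores_df["CPUOperator"] == key, "Score"] = scores_dict[key]

# Stand-in for defaultScores.csv: hard-coded ML/pandas defaults
defaults_df = pd.DataFrame({"CPUOperator": ["XGBoost-scala"], "Score": [3.31]})

pd.concat([scores_df, defaults_df]).to_csv("newScores.csv", index=False)
```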