-
Notifications
You must be signed in to change notification settings - Fork 1
/
calculate_dq_label.py
260 lines (221 loc) · 15.3 KB
/
calculate_dq_label.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
import numpy as np
import pandas as pd
def calculate_dataset_nutrition_label(df, check_results, settings_dict):
    """Calculate the data readiness (DQ) label based on the check results.

    Each check result is converted into a 0-100 score plus a dbc.Progress
    colour; failed checks accumulate penalty points weighted by severity
    (critical/moderate/minor), and the total penalty is mapped to a letter
    grade 'A' (best) through 'E' (worst).

    Parameters
    ----------
    df : pandas.DataFrame
        The dataset that was checked; only its shape is used here.
    check_results : dict
        Maps check names (e.g. 'df_missing_values') to the check output:
        either a scalar (missing-value percentage) or a result DataFrame.
        A result DataFrame containing a 'Check notification' column signals
        that the check found no issues.
    settings_dict : dict
        User-configured thresholds and toggles ('advanced_settings_*') that
        decide when a detected issue counts as a failure.

    Returns
    -------
    tuple
        ``(calculated_scores, DQ_label, exec_sum_string)`` where
        ``calculated_scores`` holds per-check percentage scores plus
        ``'<check>_color'`` progress-bar colours, ``DQ_label`` is a letter
        'A'..'E', and ``exec_sum_string`` is a one-sentence executive summary.
    """
    calculated_scores = {}
    rows_df, columns_df = df.shape
    # colour scheme for dbc.Progress bars
    check_failed = 'danger'
    check_warning = 'warning'
    check_passed = 'success'
    # scoring mechanism DQ_label: issues are weighted by severity category
    penalty_points = 0
    critical = 3
    moderate = 2
    minor = 1
    exec_sum_string_issues = "but contains "
    # for each check result, determine whether the check is passed; if not, penalize
    for check_res in check_results:
        if check_res == 'df_missing_values':  # CRITICAL ISSUE
            calculated_scores['missing_values'] = round((100-check_results['df_missing_values']), 2)
            if calculated_scores['missing_values'] < (100-settings_dict['advanced_settings_missing']):
                calculated_scores['missing_values_color'] = check_failed
                penalty_points += critical
            else:
                calculated_scores['missing_values_color'] = check_passed
            if calculated_scores['missing_values'] < 100:
                exec_sum_string_issues += "{}% missing values, ".format(round(check_results['df_missing_values'], 2))
        elif check_res == 'df_duplicates':  # CRITICAL ISSUE
            if 'Check notification' in list(check_results[check_res].columns):
                # the check reported a notification, meaning no duplicates were found
                calculated_scores['duplicate_instances'] = 100
                total_duplicates = 0
            else:
                duplicates_df = check_results['df_duplicates']
                duplicates_df['amount of duplicates'] = pd.to_numeric(duplicates_df['amount of duplicates'])
                total_duplicates = int(duplicates_df['amount of duplicates'].sum())
                calculated_scores['duplicate_instances'] = round(((rows_df-total_duplicates)/rows_df)*100, 2)
                exec_sum_string_issues += "{}% duplicate instances, ".format(round((total_duplicates/rows_df)*100, 2))
            if (total_duplicates/rows_df)*100 > settings_dict['advanced_settings_duplicates']:
                calculated_scores['duplicate_instances_color'] = check_failed
                penalty_points += critical
            else:
                calculated_scores['duplicate_instances_color'] = check_passed
        elif check_res == 'df_duplicate_columns':  # CRITICAL ISSUE
            if 'Check notification' in list(check_results[check_res].columns):
                calculated_scores['duplicate_columns'] = 100
                duplicate_cols = 0
            else:
                duplicate_columns_df = check_results['df_duplicate_columns']
                duplicate_cols = len(pd.unique(duplicate_columns_df['Column']))
                calculated_scores['duplicate_columns'] = round(((columns_df-duplicate_cols)/columns_df)*100, 2)
                exec_sum_string_issues += "{} duplicate column(s), ".format(duplicate_cols)
            # there are duplicate columns and the user does not allow them
            if duplicate_cols > 0 and not settings_dict['advanced_settings_duplicate_columns']:
                calculated_scores['duplicate_columns_color'] = check_failed
                penalty_points += critical
            else:
                calculated_scores['duplicate_columns_color'] = check_passed
        elif check_res == 'df_amount_of_diff_values':  # MINOR ISSUE
            # values in the result table are strings instead of integers
            single_value_columns = int((check_results['df_amount_of_diff_values'].iloc[0] == '1').sum())
            calculated_scores['amount_of_diff_values'] = round(((columns_df - single_value_columns) / columns_df) * 100, 2)
            # there is a redundant (single-value) column and the user does not allow it
            if single_value_columns > 0 and not settings_dict['advanced_settings_single_value']:
                calculated_scores['amount_of_diff_values_color'] = check_failed
                penalty_points += minor
            else:
                calculated_scores['amount_of_diff_values_color'] = check_passed
            if single_value_columns > 0:
                exec_sum_string_issues += "{} redundant SV column(s), ".format(single_value_columns)
        elif check_res == 'df_mixed_data_types':  # MODERATE ISSUE
            mixed_data_types_df = check_results['df_mixed_data_types']
            first_row_numeric = pd.to_numeric(mixed_data_types_df.iloc[0], errors='coerce')  # convert strings to numeric
            # a fraction strictly between 0 and 1 means the column holds mixed data types
            mixed_columns = int(((first_row_numeric > 0) & (first_row_numeric < 1)).sum())
            calculated_scores['mixed_data_types'] = round(((columns_df-mixed_columns)/columns_df)*100, 2)
            # there are columns with mixed dtypes and the user does not allow them
            if mixed_columns > 0 and not settings_dict['advanced_settings_mixed_data_types']:
                calculated_scores['mixed_data_types_color'] = check_failed
                penalty_points += moderate
            else:
                calculated_scores['mixed_data_types_color'] = check_passed
            if mixed_columns > 0:
                exec_sum_string_issues += "{} mixed data type column(s), ".format(mixed_columns)
        elif check_res == 'df_special_characters':  # MINOR ISSUE
            if 'Check notification' in list(check_results[check_res].columns):
                calculated_scores['special_characters'] = 100
                special_characters_df = []
            else:
                special_characters_df = check_results['df_special_characters']
            calculated_scores['special_characters'] = round(((columns_df-len(special_characters_df))/columns_df)*100, 2)
            # NOTE(review): this reads the mixed-data-types toggle, not a special-characters
            # one -- looks like a copy-paste of the wrong settings key; confirm intended key.
            if len(special_characters_df) > 0 and not settings_dict['advanced_settings_mixed_data_types']:
                calculated_scores['special_characters_color'] = check_failed
                penalty_points += minor
            else:
                # no special characters, or they are allowed by the user
                calculated_scores['special_characters_color'] = check_passed
            if len(special_characters_df) > 0:
                exec_sum_string_issues += "{} column(s) with special character only samples, ".format(len(special_characters_df))
        elif check_res == 'df_string_mismatch':  # MINOR ISSUE
            if 'Check notification' in list(check_results[check_res].columns):
                calculated_scores['string_mismatch'] = 100
                calculated_scores['string_mismatch_color'] = check_passed
            else:
                columns_with_stringmismatch = check_results[check_res]['Column Name'].nunique()
                calculated_scores['string_mismatch'] = round(((columns_df-columns_with_stringmismatch)/columns_df)*100, 2)
                exec_sum_string_issues += "{} column(s) with string mismatches, ".format(columns_with_stringmismatch)
                if not settings_dict['advanced_settings_string_mismatch']:
                    calculated_scores['string_mismatch_color'] = check_failed
                    penalty_points += moderate
                else:  # string mismatches are allowed by the user
                    calculated_scores['string_mismatch_color'] = check_passed
        elif check_res == 'df_outliers':  # MINOR ISSUE
            if 'Check notification' in list(check_results[check_res].columns):
                calculated_scores['outliers'] = 100
                outliers_df = []
            else:
                if rows_df > 10000:
                    rows_df = 10000  # outliers are computed on at most 10,000 samples due to computational complexity
                outliers_df = check_results['df_outliers']
                calculated_scores['outliers'] = round(((rows_df-len(outliers_df))/rows_df)*100, 2)
                exec_sum_string_issues += "{}% outlier instances, ".format(round((len(outliers_df)/rows_df)*100, 2))
            if (len(outliers_df)/rows_df)*100 > settings_dict['advanced_settings_outliers']:
                calculated_scores['outliers_color'] = check_failed
                penalty_points += minor
            else:
                calculated_scores['outliers_color'] = check_passed
            rows_df = len(df)  # restore the true row count for subsequent checks
        elif check_res == 'df_feature_feature_correlation':  # MINOR ISSUE
            feature_feature_correlation_df = check_results['df_feature_feature_correlation'].copy()
            if 'Column' in feature_feature_correlation_df.columns:
                feature_feature_correlation_df.drop(columns=['Column'], inplace=True)
            # keep only the upper triangle so each pair is counted once
            upper = feature_feature_correlation_df.where(np.triu(np.ones(feature_feature_correlation_df.shape), k=1).astype(bool))
            upper = upper.apply(pd.to_numeric, errors='coerce')
            # columns correlated (positively or negatively) beyond the user threshold
            high_corr_cols = [column for column in upper.columns if (
                    any(upper[column] > settings_dict['advanced_settings_correlation']) or
                    any(upper[column] < -1*settings_dict['advanced_settings_correlation']))]
            very_high_corr_cols = [column for column in upper.columns if (
                    any(upper[column] > 0.9) or any(upper[column] < -0.9))]  # only used for executive summary report
            if very_high_corr_cols:
                exec_sum_string_issues += "{} highly correlated columns, ".format(len(very_high_corr_cols))
            calculated_scores['feature_correlations'] = round(((columns_df-(len(high_corr_cols)))/columns_df)*100, 2)
            if len(high_corr_cols) > 0:
                calculated_scores['feature_correlations_color'] = check_failed
                penalty_points += minor
            else:
                calculated_scores['feature_correlations_color'] = check_passed
        elif check_res == 'df_feature_label_correlation':  # NOT TAKEN INTO DQ LABEL
            if 'Check notification' in list(check_results[check_res].columns):
                calculated_scores['feature_label_correlation'] = 100
                calculated_scores['feature_label_correlation_color'] = check_passed
            else:
                feature_label_correlation_df = check_results['df_feature_label_correlation']
                # NOTE(review): float(Series) assumes a single-row result frame -- confirm upstream shape
                high_corr_cols = [column for column in feature_label_correlation_df.columns
                                  if float(feature_label_correlation_df[column]) > settings_dict['advanced_settings_correlation']]
                very_high_corr_cols = [column for column in feature_label_correlation_df.columns
                                       if float(feature_label_correlation_df[column]) > 0.8]  # only used for executive summary
                if very_high_corr_cols:
                    exec_sum_string_issues += "{} highly correlated column(s) to the target feature, ".format(len(very_high_corr_cols))
                calculated_scores['feature_label_correlation'] = round(((columns_df-(len(high_corr_cols)))/columns_df)*100, 2)
                if len(high_corr_cols) > 0:
                    # informative only: warn, but do not add penalty points
                    calculated_scores['feature_label_correlation_color'] = check_warning
                else:
                    calculated_scores['feature_label_correlation_color'] = check_passed
        elif check_res == 'df_class_imbalance':  # NOT TAKEN INTO DQ LABEL
            if 'Check notification' in list(check_results[check_res].columns):
                calculated_scores['class_imbalance'] = 100
                calculated_scores['class_imbalance_color'] = check_passed
            else:
                class_imbalance_df = check_results['df_class_imbalance']
                max_value = float(class_imbalance_df.max().max())
                min_value = float(class_imbalance_df.min().min())
                if max_value == 0:
                    ratio_min_max = 0  # guard against division by zero
                else:
                    ratio_min_max = round((min_value / max_value), 4)
                calculated_scores['class_imbalance'] = round((ratio_min_max * 100))
                if calculated_scores['class_imbalance'] <= 10:
                    calculated_scores['class_imbalance_color'] = check_warning
                    exec_sum_string_issues += "highly imbalanced target classes (ratio lowest/highest: {}), ".format(calculated_scores['class_imbalance'])
                else:
                    calculated_scores['class_imbalance_color'] = check_passed
        elif check_res == 'df_conflicting_labels':  # MODERATE ISSUE
            if 'Check notification' in list(check_results[check_res].columns):
                calculated_scores['conflicting_labels'] = 100
                total_conflicting_labels = 0
            else:
                conflicting_labels_df = check_results['df_conflicting_labels']
                # the 'Instances' cell is a comma-separated list; count its members
                conflicting_labels_df['Conflicting'] = conflicting_labels_df['Instances'].str.count(',') + 1
                total_conflicting_labels = int(conflicting_labels_df['Conflicting'].sum())
                calculated_scores['conflicting_labels'] = round((1 - (total_conflicting_labels/rows_df))*100, 2)
                exec_sum_string_issues += "{}% conflicting labels, ".format(round((total_conflicting_labels/rows_df)*100, 2))
            # there are conflicting labels and the user does not allow them
            if total_conflicting_labels > 0 and not settings_dict['advanced_settings_conflicting_labels']:
                calculated_scores['conflicting_labels_color'] = check_failed
                penalty_points += moderate
            else:  # there are no conflicting labels, or the user allows them
                calculated_scores['conflicting_labels_color'] = check_passed
    if exec_sum_string_issues == "but contains ":
        # no check appended anything: the dataset came out clean
        exec_sum_string_issues = "and no data quality issues were detected!"
    else:
        exec_sum_string_issues = exec_sum_string_issues[:-2] + '.'  # drop the trailing ", "
    # sum up penalty points to determine the data readiness label
    if penalty_points < 3:
        DQ_label = 'A'
        exec_sum_string_quality = "The dataset is of very high quality overall, "
    elif penalty_points < 6:
        DQ_label = 'B'
        exec_sum_string_quality = "The dataset is of high quality overall, "
    elif penalty_points < 9:
        DQ_label = 'C'
        exec_sum_string_quality = "The dataset is of decent quality overall, "
    elif penalty_points < 12:
        DQ_label = 'D'
        exec_sum_string_quality = "The dataset is of low quality overall, "
        exec_sum_string_issues = exec_sum_string_issues.replace('but', 'and')  # sounds more logical
    else:
        DQ_label = 'E'
        exec_sum_string_quality = "The dataset is of very low quality overall, "
        exec_sum_string_issues = exec_sum_string_issues.replace('but', 'and')  # sounds more logical
    exec_sum_string = exec_sum_string_quality + exec_sum_string_issues
    return calculated_scores, DQ_label, exec_sum_string