From 2410cf5aa30cb4a20839beb129c61cfd8bda47d5 Mon Sep 17 00:00:00 2001 From: vuillaut Date: Thu, 6 Jan 2022 18:26:12 +0100 Subject: [PATCH] apply sourcery for streamline code --- ctaplot/ana/ana.py | 53 +++---- ctaplot/gammaboard/gammaboard.py | 259 ++++++++++++++++--------------- ctaplot/gammaboard/reader.py | 3 +- ctaplot/plots/plots.py | 2 +- 4 files changed, 158 insertions(+), 159 deletions(-) diff --git a/ctaplot/ana/ana.py b/ctaplot/ana/ana.py index a6f3c1b..b32274f 100644 --- a/ctaplot/ana/ana.py +++ b/ctaplot/ana/ana.py @@ -222,19 +222,18 @@ def get_effective_area(self, observation_time=50 * u.h): ------- `numpy.ndarray`, `numpy.ndarray` """ - if observation_time == 50 * u.h: - if self.site in _south_site_names: - energy, effective_area = np.loadtxt(ds.get('cta_requirements_South-30m-EffectiveArea.dat'), - unpack=True) - elif self.site in _north_site_names: - energy, effective_area = np.loadtxt(ds.get('cta_requirements_North-30m-EffectiveArea.dat'), - unpack=True) - else: - raise ValueError( - f'incorrect site specified, accepted values are {_north_site_names} or {_south_site_names}') - else: + if observation_time != 50 * u.h: raise ValueError(f"no effective area for an observation time of {observation_time}") + if self.site in _south_site_names: + energy, effective_area = np.loadtxt(ds.get('cta_requirements_South-30m-EffectiveArea.dat'), + unpack=True) + elif self.site in _north_site_names: + energy, effective_area = np.loadtxt(ds.get('cta_requirements_North-30m-EffectiveArea.dat'), + unpack=True) + else: + raise ValueError( + f'incorrect site specified, accepted values are {_north_site_names} or {_south_site_names}') self.energy = energy * u.TeV self.effective_area = effective_area * u.m return self.energy, self.effective_area @@ -264,16 +263,15 @@ def get_energy_resolution(self): @u.quantity_input(observation_time=u.h) def get_sensitivity(self, observation_time=50 * u.h): - if observation_time == 50 * u.h: - if self.site in _south_site_names: - energy, sensitivity = np.loadtxt(ds.get('cta_requirements_South-50h.dat'), unpack=True) - elif self.site in _north_site_names: - energy, sensitivity = np.loadtxt(ds.get('cta_requirements_North-50h.dat'), unpack=True) - else: - raise ValueError( - f'incorrect site specified, accepted values are {_north_site_names} or {_south_site_names}') - else: + if observation_time != 50 * u.h: raise ValueError(f"no sensitivity for an observation time of {observation_time}") + if self.site in _south_site_names: + energy, sensitivity = np.loadtxt(ds.get('cta_requirements_South-50h.dat'), unpack=True) + elif self.site in _north_site_names: + energy, sensitivity = np.loadtxt(ds.get('cta_requirements_North-50h.dat'), unpack=True) + else: + raise ValueError( + f'incorrect site specified, accepted values are {_north_site_names} or {_south_site_names}') self.energy = energy * u.TeV self.sensitivity = sensitivity * u.erg / (u.cm ** 2 * u.s) return self.energy, self.sensitivity @@ -933,13 +931,12 @@ def _percentile(x, percentile=68.27): ------- float """ - if len(x) == 0: - if isinstance(x, u.Quantity): - return 0 * x.unit - else: - return 0 - else: + if len(x) != 0: return np.percentile(x, percentile) + if isinstance(x, u.Quantity): + return 0 * x.unit + else: + return 0 @u.quantity_input(alt1=u.rad, az1=u.rad, alt2=u.rad, az2=u.rad) @@ -966,9 +963,7 @@ def angular_separation_altaz(alt1, az1, alt2, az2): cosdelta[cosdelta > 1] = 1. cosdelta[cosdelta < -1] = -1. - ang_sep = np.arccos(cosdelta) * u.rad - - return ang_sep + return np.arccos(cosdelta) * u.rad def logbin_mean(x_bin): diff --git a/ctaplot/gammaboard/gammaboard.py b/ctaplot/gammaboard/gammaboard.py index 8cabbc1..1ae273c 100644 --- a/ctaplot/gammaboard/gammaboard.py +++ b/ctaplot/gammaboard/gammaboard.py @@ -67,10 +67,7 @@ def load_data_from_h5(experiment, experiments_directory): from .reader import read_params from astropy.table import vstack result_files = find_data_files(experiment, experiments_directory) - result_data = [] - for r_file in tqdm(result_files): - result_data.append(read_params(r_file)) - + result_data = [read_params(r_file) for r_file in tqdm(result_files)] return vstack(result_data) @@ -353,18 +350,17 @@ def plot_angular_resolution(self, ax=None): self.set_plotted(True) def plot_angular_resolution_reco(self, ax=None): - if self.get_loaded(): - if self.reco_gamma_data is not None: - self.ax_ang_res = plots.plot_angular_resolution_per_energy(self.reco_gamma_data['true_altitude'].quantity, - self.reco_gamma_data['reco_altitude'].quantity, - self.reco_gamma_data['true_azimuth'].quantity, - self.reco_gamma_data['reco_azimuth'].quantity, - self.reco_gamma_data['true_energy'].quantity, - bias_correction=self.bias_correction, - ax=ax, - label=self.name + '_reco', - color=self.color, - ) + if self.get_loaded() and self.reco_gamma_data is not None: + self.ax_ang_res = plots.plot_angular_resolution_per_energy(self.reco_gamma_data['true_altitude'].quantity, + self.reco_gamma_data['reco_altitude'].quantity, + self.reco_gamma_data['true_azimuth'].quantity, + self.reco_gamma_data['reco_azimuth'].quantity, + self.reco_gamma_data['true_energy'].quantity, + bias_correction=self.bias_correction, + ax=ax, + label=self.name + '_reco', + color=self.color, + ) def update_angular_resolution_reco(self, ax): for c in ax.containers: @@ -384,15 +380,14 @@ def plot_energy_resolution(self, ax=None): self.set_plotted(True) def plot_energy_resolution_reco(self, ax=None): - if self.get_loaded(): - if self.reco_gamma_data is not None: - self.ax_ene_res = plots.plot_energy_resolution(self.reco_gamma_data['true_energy'].quantity, - self.reco_gamma_data['reco_energy'].quantity, - bias_correction=self.bias_correction, - ax=ax, - label=self.name + '_reco', - color=self.color, - ) + if self.get_loaded() and self.reco_gamma_data is not None: + self.ax_ene_res = plots.plot_energy_resolution(self.reco_gamma_data['true_energy'].quantity, + self.reco_gamma_data['reco_energy'].quantity, + bias_correction=self.bias_correction, + ax=ax, + label=self.name + '_reco', + color=self.color, + ) def update_energy_resolution_reco(self, ax): for c in ax.containers: @@ -419,18 +414,17 @@ def plot_impact_resolution(self, ax=None): self.set_plotted(True) def plot_impact_resolution_reco(self, ax=None): - if self.get_loaded(): - if self.reco_gamma_data is not None: - self.ax_imp_res = plots.plot_impact_resolution_per_energy(self.gamma_data['true_core_x'].quantity, - self.gamma_data['reco_core_x'].quantity, - self.gamma_data['true_core_y'].quantity, - self.gamma_data['reco_core_y'].quantity, - self.gamma_data['true_energy'].quantity, - bias_correction=self.bias_correction, - ax=ax, - label=self.name + '_reco', - color=self.color, - ) + if self.get_loaded() and self.reco_gamma_data is not None: + self.ax_imp_res = plots.plot_impact_resolution_per_energy(self.gamma_data['true_core_x'].quantity, + self.gamma_data['reco_core_x'].quantity, + self.gamma_data['true_core_y'].quantity, + self.gamma_data['reco_core_y'].quantity, + self.gamma_data['true_energy'].quantity, + bias_correction=self.bias_correction, + ax=ax, + label=self.name + '_reco', + color=self.color, + ) def update_impact_resolution_reco(self, ax): for c in ax.containers: @@ -469,31 +463,34 @@ def plot_effective_area_reco(self, ax=None): if self.get_loaded(): self.ax_eff_area = ax if ax is not None else plt.gca() - if self.run_configs is not None and GAMMA_ID in self.run_configs: - if self.reco_gamma_data is not None: - E_reco, S_reco = ana.effective_area_per_energy_power_law(self.run_configs[0]['energy_range_min'], - self.run_configs[0]['energy_range_max'], - self.run_configs[0]['num_showers'], - self.run_configs[0]['spectral_index'], - self.reco_gamma_data['reco_energy'], - self.run_configs[0]['scattering_surface'] - ) - E_reco_prot, S_reco_prot = ana.effective_area_per_energy_power_law( - self.run_configs[0]['energy_range_min'], - self.run_configs[0]['energy_range_max'], - self.run_configs[0]['num_showers'], - self.run_configs[0]['spectral_index'], - self.noise_reco_gamma['reco_energy'], - self.run_configs[0]['scattering_surface'] - ) - self.ax_eff_area.plot(E_reco[:-1], S_reco, - label=self.name + '_reco', - color=self.color, - linestyle=':') - self.ax_eff_area.plot(E_reco_prot[:-1], S_reco_prot, - label=self.name + '_reco_noise', - color=self.color, - linestyle='--') + if ( + self.run_configs is not None + and GAMMA_ID in self.run_configs + and self.reco_gamma_data is not None + ): + E_reco, S_reco = ana.effective_area_per_energy_power_law(self.run_configs[0]['energy_range_min'], + self.run_configs[0]['energy_range_max'], + self.run_configs[0]['num_showers'], + self.run_configs[0]['spectral_index'], + self.reco_gamma_data['reco_energy'], + self.run_configs[0]['scattering_surface'] + ) + E_reco_prot, S_reco_prot = ana.effective_area_per_energy_power_law( + self.run_configs[0]['energy_range_min'], + self.run_configs[0]['energy_range_max'], + self.run_configs[0]['num_showers'], + self.run_configs[0]['spectral_index'], + self.noise_reco_gamma['reco_energy'], + self.run_configs[0]['scattering_surface'] + ) + self.ax_eff_area.plot(E_reco[:-1], S_reco, + label=self.name + '_reco', + color=self.color, + linestyle=':') + self.ax_eff_area.plot(E_reco_prot[:-1], S_reco_prot, + label=self.name + '_reco_noise', + color=self.color, + linestyle='--') def update_effective_area_reco(self, ax): to_remove = [] @@ -505,43 +502,47 @@ def update_effective_area_reco(self, ax): self.plot_effective_area_reco(ax) def plot_effective_area_ratio(self, ax=None): - if self.get_loaded(): - self.ax_eff_area_ratio = ax if ax is not None else plt.gca() - - if self.run_configs is not None and GAMMA_ID in self.run_configs: - if self.mc_trig_events is not None: - E_max, S_max = ana.effective_area_per_energy_power_law(self.run_configs[0]['energy_range_min'], - self.run_configs[0]['energy_range_max'], - self.run_configs[0]['num_showers'], - self.run_configs[0]['spectral_index'], - self.mc_trig_events['mc_trig_energies'], - self.run_configs[0]['scattering_surface']) - E_reco, S_reco = ana.effective_area_per_energy_power_law(self.run_configs[0]['energy_range_min'], - self.run_configs[0]['energy_range_max'], - self.run_configs[0]['num_showers'], - self.run_configs[0]['spectral_index'], - self.reco_gamma_data['reco_energy'], - self.run_configs[0]['scattering_surface'] - ) - E_reco_prot, S_reco_prot = ana.effective_area_per_energy_power_law( - self.run_configs[0]['energy_range_min'], - self.run_configs[0]['energy_range_max'], - self.run_configs[0]['num_showers'], - self.run_configs[0]['spectral_index'], - self.noise_reco_gamma['reco_energy'], - self.run_configs[0]['scattering_surface'] - ) - assert np.all(E_reco_prot == E_max) and np.all(E_reco == E_max), \ - 'To compute effective area ratio, the true_energy bins must be the same' - - self.ax_eff_area_ratio.plot(E_reco[:-1], S_reco / S_max, - label=self.name + '_ratio_gamma', - color=self.color, - linestyle=':') - self.ax_eff_area_ratio.plot(E_reco_prot[:-1], S_reco_prot / S_max, - label=self.name + '_ratio_noise', - color=self.color, - linestyle='--') + if not self.get_loaded(): + return + self.ax_eff_area_ratio = ax if ax is not None else plt.gca() + + if ( + self.run_configs is not None + and GAMMA_ID in self.run_configs + and self.mc_trig_events is not None + ): + E_max, S_max = ana.effective_area_per_energy_power_law(self.run_configs[0]['energy_range_min'], + self.run_configs[0]['energy_range_max'], + self.run_configs[0]['num_showers'], + self.run_configs[0]['spectral_index'], + self.mc_trig_events['mc_trig_energies'], + self.run_configs[0]['scattering_surface']) + E_reco, S_reco = ana.effective_area_per_energy_power_law(self.run_configs[0]['energy_range_min'], + self.run_configs[0]['energy_range_max'], + self.run_configs[0]['num_showers'], + self.run_configs[0]['spectral_index'], + self.reco_gamma_data['reco_energy'], + self.run_configs[0]['scattering_surface'] + ) + E_reco_prot, S_reco_prot = ana.effective_area_per_energy_power_law( + self.run_configs[0]['energy_range_min'], + self.run_configs[0]['energy_range_max'], + self.run_configs[0]['num_showers'], + self.run_configs[0]['spectral_index'], + self.noise_reco_gamma['reco_energy'], + self.run_configs[0]['scattering_surface'] + ) + assert np.all(E_reco_prot == E_max) and np.all(E_reco == E_max), \ + 'To compute effective area ratio, the true_energy bins must be the same' + + self.ax_eff_area_ratio.plot(E_reco[:-1], S_reco / S_max, + label=self.name + '_ratio_gamma', + color=self.color, + linestyle=':') + self.ax_eff_area_ratio.plot(E_reco_prot[:-1], S_reco_prot / S_max, + label=self.name + '_ratio_noise', + color=self.color, + linestyle='--') def update_effective_area_ratio(self, ax): to_remove = [] @@ -565,36 +566,40 @@ def plot_roc_curve(self, ax=None): self.set_plotted(True) def plot_pr_curve(self, ax=None): - if self.get_loaded(): - self.ax_pr = plt.gca() if ax is None else ax - if 'reco_gammaness' in self.data.columns: - label_binarizer = LabelBinarizer() - binarized_classes = label_binarizer.fit_transform(self.data['true_particle']) - ii = np.where(label_binarizer.classes_ == GAMMA_ID)[0][0] - precision, recall, _ = precision_recall_curve(binarized_classes[:, ii], - self.data['reco_gammaness'], - pos_label=GAMMA_ID) - else: - raise ValueError - self.ax_pr.plot(recall, precision, label=self.name, color=self.color) - self.set_plotted(True) + if not self.get_loaded(): + return + self.ax_pr = plt.gca() if ax is None else ax + if 'reco_gammaness' not in self.data.columns: + raise ValueError + label_binarizer = LabelBinarizer() + binarized_classes = label_binarizer.fit_transform(self.data['true_particle']) + ii = np.where(label_binarizer.classes_ == GAMMA_ID)[0][0] + precision, recall, _ = precision_recall_curve(binarized_classes[:, ii], + self.data['reco_gammaness'], + pos_label=GAMMA_ID) + self.ax_pr.plot(recall, precision, label=self.name, color=self.color) + self.set_plotted(True) def plot_gammaness_cut(self): - if self.get_loaded() and self.gammaness_cut is not None and self.ax_pr is not None: - if 'reco_gammaness' in self.data.columns: - true_positive = self.gamma_data[self.gamma_data['reco_gammaness'] >= self.gammaness_cut] - noise = self.data[self.data['true_particle'] != GAMMA_ID] - false_positive = noise[noise['reco_gammaness'] >= self.gammaness_cut] - else: - raise ValueError - try: - self.precision = len(true_positive) / (len(true_positive) + len(false_positive)) - self.recall = len(true_positive) / len(self.gamma_data) - except Exception as e: - print('Plot gammaness cut ', e) - self.precision = 0 - self.recall = 0 - self.ax_pr.scatter(self.recall, self.precision, c=[self.color], label=self.name) + if ( + not self.get_loaded() + or self.gammaness_cut is None + or self.ax_pr is None + ): + return + if 'reco_gammaness' not in self.data.columns: + raise ValueError + true_positive = self.gamma_data[self.gamma_data['reco_gammaness'] >= self.gammaness_cut] + noise = self.data[self.data['true_particle'] != GAMMA_ID] + false_positive = noise[noise['reco_gammaness'] >= self.gammaness_cut] + try: + self.precision = len(true_positive) / (len(true_positive) + len(false_positive)) + self.recall = len(true_positive) / len(self.gamma_data) + except Exception as e: + print('Plot gammaness cut ', e) + self.precision = 0 + self.recall = 0 + self.ax_pr.scatter(self.recall, self.precision, c=[self.color], label=self.name) def update_pr_cut(self, ax): for c in ax.collections: diff --git a/ctaplot/gammaboard/reader.py b/ctaplot/gammaboard/reader.py index 89f1fdc..e9fc443 100644 --- a/ctaplot/gammaboard/reader.py +++ b/ctaplot/gammaboard/reader.py @@ -7,8 +7,7 @@ def read_params(filename): readers = [LstchainDL2Reader, GammaLearnv01DL2Reader] for reader in readers: try: - data = reader(filename).read() - return data + return reader(filename).read() except: continue raise ValueError(f"Can't read data from {filename}") diff --git a/ctaplot/plots/plots.py b/ctaplot/plots/plots.py index 7e0d889..3d8980f 100644 --- a/ctaplot/plots/plots.py +++ b/ctaplot/plots/plots.py @@ -2146,7 +2146,7 @@ def plot_precision_recall(y_true, proba_pred, pos_label=0, sample_weigth=None, t pred = (proba_pred > threshold).astype(int) neg_label = list(set(y_true)) neg_label.remove(pos_label) - if not len(neg_label) == 1: + if len(neg_label) != 1: raise ValueError("`y_true` should contain only two labels") neg_label = neg_label[0] pred_labels = np.where(pred == 1, np.ones_like(pred) * pos_label, np.ones_like(pred) * neg_label)