From 692c99cdade7b785097a46d71f42a4def4008d6d Mon Sep 17 00:00:00 2001 From: Nima Sarajpoor Date: Fri, 13 Sep 2024 16:47:50 -0400 Subject: [PATCH] Fixed #998: Speed up stumpi and aampi (#1001) * Average 4x faster performance! * wrap njit-decorated function around hot spot to improve performance * improve docstring * Move function to stumpy.core * Add test function * refactored aampi._update_egress * refactored stumpi._update * refactored using newly-created function * Rename function * test top-k feature in test function * use name of parameter when passing value to be more explicit * add comment and new case in the test function * revise test functions * minor enhancement * extra test that causes an error * Fixed error caused by loss of precision * revise test function and improve comments * Revised docstring and comment * Fixed flake8 format * Minor changes * Fixed black format * replace random with np.random --- stumpy/aampi.py | 48 +++---------- stumpy/core.py | 67 ++++++++++++++++++ stumpy/stumpi.py | 49 +++---------- tests/test_core.py | 170 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 256 insertions(+), 78 deletions(-) diff --git a/stumpy/aampi.py b/stumpy/aampi.py index 1a084f802..e5c2ee8a1 100644 --- a/stumpy/aampi.py +++ b/stumpy/aampi.py @@ -247,26 +247,9 @@ def _update_egress(self, t): if np.any(~self._T_isfinite[-self._m :]): D[:] = np.inf - core.apply_exclusion_zone(D, D.shape[0] - 1, self._excl_zone, np.inf) - - update_idx = np.argwhere(D < self._P[:, -1]).flatten() - for i in update_idx: - idx = np.searchsorted(self._P[i], D[i], side="right") - core._shift_insert_at_index(self._P[i], idx, D[i]) - core._shift_insert_at_index( - self._I[i], idx, D.shape[0] + self._n_appended - 1 - ) - # D.shape[0] is base-1 - - # Calculate the (top-k) matrix profile values/indices for the last subsequence - # by using its corresponding distance profile `D` - self._P[-1] = np.inf - self._I[-1] = -1 - for i, d in enumerate(D): - if d < self._P[-1, -1]: - idx = np.searchsorted(self._P[-1], d, side="right") - core._shift_insert_at_index(self._P[-1], idx, d) - core._shift_insert_at_index(self._I[-1], idx, i + self._n_appended) + core._update_incremental_PI( + D, self._P, self._I, self._excl_zone, n_appended=self._n_appended + ) # All neighbors of the last subsequence are on its left. So, its (top-1) # matrix profile value/index and its left matrix profile value/index must @@ -322,30 +305,17 @@ def _update(self, t): if np.any(~self._T_isfinite[-self._m :]): D[:] = np.inf - core.apply_exclusion_zone(D, D.shape[0] - 1, self._excl_zone, np.inf) - - update_idx = np.argwhere(D[:l] < self._P[:l, -1]).flatten() - for i in update_idx: - idx = np.searchsorted(self._P[i], D[i], side="right") - core._shift_insert_at_index(self._P[i], idx, D[i]) - core._shift_insert_at_index(self._I[i], idx, l) - - # Calculating top-k matrix profile and (top-1) left matrix profile (and their - # corresponding indices) for new subsequence whose distance profile is `D` P_new = np.full(self._k, np.inf, dtype=np.float64) I_new = np.full(self._k, -1, dtype=np.int64) - for i, d in enumerate(D): - if d < P_new[-1]: # maximum value in sorted array P_new - idx = np.searchsorted(P_new, d, side="right") - core._shift_insert_at_index(P_new, idx, d) - core._shift_insert_at_index(I_new, idx, i) + self._P = np.append(self._P, P_new.reshape(1, -1), axis=0) + self._I = np.append(self._I, I_new.reshape(1, -1), axis=0) + + core._update_incremental_PI(D, self._P, self._I, self._excl_zone, n_appended=0) - left_I_new = I_new[0] - left_P_new = P_new[0] + left_I_new = self._I[-1, 0] + left_P_new = self._P[-1, 0] self._T = T_new - self._P = np.append(self._P, P_new.reshape(1, -1), axis=0) - self._I = np.append(self._I, I_new.reshape(1, -1), axis=0) self._left_P = np.append(self._left_P, left_P_new) self._left_I = np.append(self._left_I, left_I_new) self._p_norm = p_norm_new diff --git a/stumpy/core.py b/stumpy/core.py index a237931a7..4cdaea02a 100644 --- a/stumpy/core.py +++ b/stumpy/core.py @@ -4368,3 +4368,70 @@ def get_ray_nworkers(ray_client): Total number of Ray workers """ return int(ray_client.cluster_resources().get("CPU")) + + +@njit +def _update_incremental_PI(D, P, I, excl_zone, n_appended=0): + """ + Given the 1D array distance profile, `D`, of the last subsequence of T, + update (in-place) the (top-k) matrix profile, `P`, and the matrix profile + index, I. + + Parameters + ---------- + D : numpy.ndarray + A 1D array (with dtype float) representing the distance profile of + the last subsequence of T + + P : numpy.ndarray + A 2D array representing the matrix profile of T, + with shape (len(T) - m + 1, k), where `m` is the window size. + P[-1, :] should be set to np.inf + + I : numpy.ndarray + A 2D array representing the matrix profile index of T, + with shape (len(T) - m + 1, k), where `m` is the window size + I[-1, :] should be set to -1. + + excl_zone : int + Size of the exclusion zone. + + n_appended : int + Number of times the timeseries start point is shifted one to the right. + See note below for more details. + + Returns + ------- + None + + Note + ----- + The `n_appended` parameter is used to indicate the number of times the timeseries + start point is shifted one to the right. When `egress=False` (see stumpy.stumpi), + the matrix profile and matrix profile index are updated in an incremental fashion + while considering all historical data. `n_appended` must be set to 0 in such + cases. However, when `egress=True`, the matrix profile and matrix profile index are + updated in an incremental fashion and they represent the matrix profile and matrix + profile index for the `l` most recent subsequences (where `l = len(T) - m + 1`). + In this case, each subsequence is only compared against upto `l-1` left neighbors + and upto `l-1` right neighbors. + """ + _apply_exclusion_zone(D, D.shape[0] - 1, excl_zone, np.inf) + + update_idx = np.argwhere(D < P[:, -1]).flatten() + for i in update_idx: + idx = np.searchsorted(P[i], D[i], side="right") + _shift_insert_at_index(P[i], idx, D[i]) + _shift_insert_at_index(I[i], idx, D.shape[0] + n_appended - 1) + + # Calculate the (top-k) matrix profile values/indidces + # for the last subsequence + P[-1] = np.inf + I[-1] = -1 + for i, d in enumerate(D): + if d < P[-1, -1]: + idx = np.searchsorted(P[-1], d, side="right") + _shift_insert_at_index(P[-1], idx, d) + _shift_insert_at_index(I[-1], idx, i + n_appended) + + return diff --git a/stumpy/stumpi.py b/stumpy/stumpi.py index 1731b2808..feb8cb2af 100644 --- a/stumpy/stumpi.py +++ b/stumpy/stumpi.py @@ -354,26 +354,9 @@ def _update_egress(self, t): if np.any(~self._T_isfinite[-self._m :]): D[:] = np.inf - core.apply_exclusion_zone(D, D.shape[0] - 1, self._excl_zone, np.inf) - - update_idx = np.argwhere(D < self._P[:, -1]).flatten() - for i in update_idx: - idx = np.searchsorted(self._P[i], D[i], side="right") - core._shift_insert_at_index(self._P[i], idx, D[i]) - core._shift_insert_at_index( - self._I[i], idx, D.shape[0] + self._n_appended - 1 - ) - # D.shape[0] is base-1 - - # Calculate the (top-k) matrix profile values/indices for the last subsequence - # by using its corresponding distance profile `D` - self._P[-1] = np.inf - self._I[-1] = -1 - for i, d in enumerate(D): - if d < self._P[-1, -1]: - idx = np.searchsorted(self._P[-1], d, side="right") - core._shift_insert_at_index(self._P[-1], idx, d) - core._shift_insert_at_index(self._I[-1], idx, i + self._n_appended) + core._update_incremental_PI( + D, self._P, self._I, self._excl_zone, n_appended=self._n_appended + ) # All neighbors of the last subsequence are on its left. So, its (top-1) # matrix profile value/index and its left matrix profile value/index must @@ -440,30 +423,18 @@ def _update(self, t): if np.any(~self._T_isfinite[-self._m :]): D[:] = np.inf - core.apply_exclusion_zone(D, D.shape[0] - 1, self._excl_zone, np.inf) - - update_idx = np.argwhere(D[:l] < self._P[:l, -1]).flatten() - for i in update_idx: - idx = np.searchsorted(self._P[i], D[i], side="right") - core._shift_insert_at_index(self._P[i], idx, D[i]) - core._shift_insert_at_index(self._I[i], idx, l) - - # Calculating top-k matrix profile and (top-1) left matrix profile (and their - # corresponding indices) for new subsequence whose distance profile is `D` P_new = np.full(self._k, np.inf, dtype=np.float64) I_new = np.full(self._k, -1, dtype=np.int64) - for i, d in enumerate(D): - if d < P_new[-1]: # maximum value in sorted array P_new - idx = np.searchsorted(P_new, d, side="right") - core._shift_insert_at_index(P_new, idx, d) - core._shift_insert_at_index(I_new, idx, i) + self._P = np.append(self._P, P_new.reshape(1, -1), axis=0) + self._I = np.append(self._I, I_new.reshape(1, -1), axis=0) + + core._update_incremental_PI(D, self._P, self._I, self._excl_zone, n_appended=0) - left_I_new = I_new[0] - left_P_new = P_new[0] + left_I_new = self._I[-1, 0] + left_P_new = self._P[-1, 0] self._T = T_new - self._P = np.append(self._P, P_new.reshape(1, -1), axis=0) - self._I = np.append(self._I, I_new.reshape(1, -1), axis=0) + self._left_P = np.append(self._left_P, left_P_new) self._left_I = np.append(self._left_I, left_I_new) self._QT = QT_new diff --git a/tests/test_core.py b/tests/test_core.py index d35be0f89..8d0721979 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1778,3 +1778,173 @@ def test_process_isconstant_1d_default(): T_subseq_isconstant_comp = core.process_isconstant(T, m, T_subseq_isconstant=None) npt.assert_array_equal(T_subseq_isconstant_ref, T_subseq_isconstant_comp) + + +def test_update_incremental_PI_egressFalse(): + # This tests the function `core._update_incremental_PI` + # when `egress` is False, meaning new data point is being + # appended to the historical data. + T = np.random.rand(64) + t = np.random.rand() # new datapoint + T_new = np.append(T, t) + + m = 3 + excl_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM)) + + for k in range(1, 4): + # ref + mp_ref = naive.stump(T_new, m, row_wise=True, k=k) + P_ref = mp_ref[:, :k].astype(np.float64) + I_ref = mp_ref[:, k : 2 * k].astype(np.int64) + + # comp + mp = naive.stump(T, m, row_wise=True, k=k) + P_comp = mp[:, :k].astype(np.float64) + I_comp = mp[:, k : 2 * k].astype(np.int64) + + # Because of the new data point, the length of matrix profile + # and matrix profile indices should be increased by one. + P_comp = np.pad( + P_comp, + [(0, 1), (0, 0)], + mode="constant", + constant_values=np.inf, + ) + I_comp = np.pad( + I_comp, + [(0, 1), (0, 0)], + mode="constant", + constant_values=-1, + ) + + D = core.mass(T_new[-m:], T_new) + core._update_incremental_PI(D, P_comp, I_comp, excl_zone, n_appended=0) + + # assertion + npt.assert_almost_equal(P_ref, P_comp) + npt.assert_almost_equal(I_ref, I_comp) + + +def test_update_incremental_PI_egressTrue(): + T = np.random.rand(64) + t = np.random.rand() # new data point + m = 3 + excl_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM)) + + for k in range(1, 4): + # ref + # In egress=True mode, a new data point, t, is being appended + # to the historical data, T, while the oldest data point is + # being removed. Therefore, the first subsequence in T + # and the last subsequence does not get a chance to meet each + # other. Therefore, we need to exclude that distance. + + T_with_t = np.append(T, t) + D = naive.distance_matrix(T_with_t, T_with_t, m) + D[-1, 0] = np.inf + D[0, -1] = np.inf + + l = len(T_with_t) - m + 1 + P = np.empty((l, k), dtype=np.float64) + I = np.empty((l, k), dtype=np.int64) + for i in range(l): + core.apply_exclusion_zone(D[i], i, excl_zone, np.inf) + IDX = np.argsort(D[i], kind="mergesort")[:k] + I[i] = IDX + P[i] = D[i, IDX] + + P_ref = P[1:].copy() + I_ref = I[1:].copy() + + # comp + mp = naive.stump(T, m, row_wise=True, k=k) + P_comp = mp[:, :k].astype(np.float64) + I_comp = mp[:, k : 2 * k].astype(np.int64) + + P_comp[:-1] = P_comp[1:] + P_comp[-1] = np.inf + I_comp[:-1] = I_comp[1:] + I_comp[-1] = -1 + + T_new = np.append(T[1:], t) + D = core.mass(T_new[-m:], T_new) + core._update_incremental_PI(D, P_comp, I_comp, excl_zone, n_appended=1) + + # assertion + npt.assert_almost_equal(P_ref, P_comp) + npt.assert_almost_equal(I_ref, I_comp) + + +def test_update_incremental_PI_egressTrue_MemoryCheck(): + # This test function is to ensure that the function + # `core._update_incremental_PI` does not forget the + # nearest neighbors that were pointing to those old data + # points that are removed in the `egress=True` mode. + # This can be tested by inserting the same subsequence, s, in the beginning, + # middle, and end of the time series. This is to allow us to know which + # neighbor is the nearest neighbor to each of those three subsequences. + + # In the `egress=True` mode, the first element of the time series is removed and + # a new data point is appended. However, the updated matrix profile index for the + # middle subsequence `s` should still refer to the first subsequence in + # the historical data. + seed = 0 + np.random.seed(seed) + + T = np.random.rand(64) + m = 3 + excl_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM)) + + s = np.random.rand(m) + T[:m] = s + T[30 : 30 + m] = s + T[-m:] = s + + t = np.random.rand() # new data point + T_with_t = np.append(T, t) + + # In egress=True mode, a new data point, t, is being appended + # to the historical data, T, while the oldest data point is + # being removed. Therefore, the first subsequence in T + # and the last subsequence does not get a chance to meet each + # other. Therefore, their pairwise distances should be excluded + # from the distance matrix. + D = naive.distance_matrix(T_with_t, T_with_t, m) + D[-1, 0] = np.inf + D[0, -1] = np.inf + + l = len(T_with_t) - m + 1 + for i in range(l): + core.apply_exclusion_zone(D[i], i, excl_zone, np.inf) + + T_new = np.append(T[1:], t) + dist_profile = naive.distance_profile(T_new[-m:], T_new, m) + core.apply_exclusion_zone(dist_profile, len(dist_profile) - 1, excl_zone, np.inf) + + for k in range(1, 4): + # ref + P = np.empty((l, k), dtype=np.float64) + I = np.empty((l, k), dtype=np.int64) + for i in range(l): + IDX = np.argsort(D[i], kind="mergesort")[:k] + I[i] = IDX + P[i] = D[i, IDX] + + P_ref = P[1:].copy() + I_ref = I[1:].copy() + + # comp + mp = naive.stump(T, m, row_wise=True, k=k) + P_comp = mp[:, :k].astype(np.float64) + I_comp = mp[:, k : 2 * k].astype(np.int64) + + P_comp[:-1] = P_comp[1:] + P_comp[-1] = np.inf + I_comp[:-1] = I_comp[1:] + I_comp[-1] = -1 + core._update_incremental_PI( + dist_profile, P_comp, I_comp, excl_zone, n_appended=1 + ) + + npt.assert_almost_equal(P_ref, P_comp) + npt.assert_almost_equal(I_ref, I_comp)