Skip to content

Commit

Permalink
Fixed #998: Speed up stumpi and aampi (#1001)
Browse files Browse the repository at this point in the history
* Average 4x faster performance! 

* wrap njit-decorated function around hot spot to improve performance

* improve docstring

* Move function to stumpy.core

* Add test function

* refactored aampi._update_egress

* refactored stumpi._update

* refactored using newly-created function

* Rename function

* test top-k feature in test function

* use name of parameter when passing value to be more explicit

* add comment and new case in the test function

* revise test functions

* minor enhancement

* extra test that causes an error

* Fixed error caused by loss of precision

* revise test function and improve comments

* Revised docstring and comment

* Fixed flake8 format

* Minor changes

* Fixed black format

* replace random with np.random
  • Loading branch information
NimaSarajpoor authored Sep 13, 2024
1 parent e271570 commit 692c99c
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 78 deletions.
48 changes: 9 additions & 39 deletions stumpy/aampi.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,26 +247,9 @@ def _update_egress(self, t):
if np.any(~self._T_isfinite[-self._m :]):
D[:] = np.inf

core.apply_exclusion_zone(D, D.shape[0] - 1, self._excl_zone, np.inf)

update_idx = np.argwhere(D < self._P[:, -1]).flatten()
for i in update_idx:
idx = np.searchsorted(self._P[i], D[i], side="right")
core._shift_insert_at_index(self._P[i], idx, D[i])
core._shift_insert_at_index(
self._I[i], idx, D.shape[0] + self._n_appended - 1
)
# D.shape[0] is base-1

# Calculate the (top-k) matrix profile values/indices for the last subsequence
# by using its corresponding distance profile `D`
self._P[-1] = np.inf
self._I[-1] = -1
for i, d in enumerate(D):
if d < self._P[-1, -1]:
idx = np.searchsorted(self._P[-1], d, side="right")
core._shift_insert_at_index(self._P[-1], idx, d)
core._shift_insert_at_index(self._I[-1], idx, i + self._n_appended)
core._update_incremental_PI(
D, self._P, self._I, self._excl_zone, n_appended=self._n_appended
)

# All neighbors of the last subsequence are on its left. So, its (top-1)
# matrix profile value/index and its left matrix profile value/index must
Expand Down Expand Up @@ -322,30 +305,17 @@ def _update(self, t):
if np.any(~self._T_isfinite[-self._m :]):
D[:] = np.inf

core.apply_exclusion_zone(D, D.shape[0] - 1, self._excl_zone, np.inf)

update_idx = np.argwhere(D[:l] < self._P[:l, -1]).flatten()
for i in update_idx:
idx = np.searchsorted(self._P[i], D[i], side="right")
core._shift_insert_at_index(self._P[i], idx, D[i])
core._shift_insert_at_index(self._I[i], idx, l)

# Calculating top-k matrix profile and (top-1) left matrix profile (and their
# corresponding indices) for new subsequence whose distance profile is `D`
P_new = np.full(self._k, np.inf, dtype=np.float64)
I_new = np.full(self._k, -1, dtype=np.int64)
for i, d in enumerate(D):
if d < P_new[-1]: # maximum value in sorted array P_new
idx = np.searchsorted(P_new, d, side="right")
core._shift_insert_at_index(P_new, idx, d)
core._shift_insert_at_index(I_new, idx, i)
self._P = np.append(self._P, P_new.reshape(1, -1), axis=0)
self._I = np.append(self._I, I_new.reshape(1, -1), axis=0)

core._update_incremental_PI(D, self._P, self._I, self._excl_zone, n_appended=0)

left_I_new = I_new[0]
left_P_new = P_new[0]
left_I_new = self._I[-1, 0]
left_P_new = self._P[-1, 0]

self._T = T_new
self._P = np.append(self._P, P_new.reshape(1, -1), axis=0)
self._I = np.append(self._I, I_new.reshape(1, -1), axis=0)
self._left_P = np.append(self._left_P, left_P_new)
self._left_I = np.append(self._left_I, left_I_new)
self._p_norm = p_norm_new
Expand Down
67 changes: 67 additions & 0 deletions stumpy/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4368,3 +4368,70 @@ def get_ray_nworkers(ray_client):
Total number of Ray workers
"""
return int(ray_client.cluster_resources().get("CPU"))


@njit
def _update_incremental_PI(D, P, I, excl_zone, n_appended=0):
"""
Given the 1D array distance profile, `D`, of the last subsequence of T,
update (in-place) the (top-k) matrix profile, `P`, and the matrix profile
index, I.
Parameters
----------
D : numpy.ndarray
A 1D array (with dtype float) representing the distance profile of
the last subsequence of T
P : numpy.ndarray
A 2D array representing the matrix profile of T,
with shape (len(T) - m + 1, k), where `m` is the window size.
P[-1, :] should be set to np.inf
I : numpy.ndarray
A 2D array representing the matrix profile index of T,
with shape (len(T) - m + 1, k), where `m` is the window size
I[-1, :] should be set to -1.
excl_zone : int
Size of the exclusion zone.
n_appended : int
Number of times the timeseries start point is shifted one to the right.
See note below for more details.
Returns
-------
None
Note
-----
The `n_appended` parameter is used to indicate the number of times the timeseries
start point is shifted one to the right. When `egress=False` (see stumpy.stumpi),
the matrix profile and matrix profile index are updated in an incremental fashion
while considering all historical data. `n_appended` must be set to 0 in such
cases. However, when `egress=True`, the matrix profile and matrix profile index are
updated in an incremental fashion and they represent the matrix profile and matrix
profile index for the `l` most recent subsequences (where `l = len(T) - m + 1`).
In this case, each subsequence is only compared against upto `l-1` left neighbors
and upto `l-1` right neighbors.
"""
_apply_exclusion_zone(D, D.shape[0] - 1, excl_zone, np.inf)

update_idx = np.argwhere(D < P[:, -1]).flatten()
for i in update_idx:
idx = np.searchsorted(P[i], D[i], side="right")
_shift_insert_at_index(P[i], idx, D[i])
_shift_insert_at_index(I[i], idx, D.shape[0] + n_appended - 1)

# Calculate the (top-k) matrix profile values/indidces
# for the last subsequence
P[-1] = np.inf
I[-1] = -1
for i, d in enumerate(D):
if d < P[-1, -1]:
idx = np.searchsorted(P[-1], d, side="right")
_shift_insert_at_index(P[-1], idx, d)
_shift_insert_at_index(I[-1], idx, i + n_appended)

return
49 changes: 10 additions & 39 deletions stumpy/stumpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,26 +354,9 @@ def _update_egress(self, t):
if np.any(~self._T_isfinite[-self._m :]):
D[:] = np.inf

core.apply_exclusion_zone(D, D.shape[0] - 1, self._excl_zone, np.inf)

update_idx = np.argwhere(D < self._P[:, -1]).flatten()
for i in update_idx:
idx = np.searchsorted(self._P[i], D[i], side="right")
core._shift_insert_at_index(self._P[i], idx, D[i])
core._shift_insert_at_index(
self._I[i], idx, D.shape[0] + self._n_appended - 1
)
# D.shape[0] is base-1

# Calculate the (top-k) matrix profile values/indices for the last subsequence
# by using its corresponding distance profile `D`
self._P[-1] = np.inf
self._I[-1] = -1
for i, d in enumerate(D):
if d < self._P[-1, -1]:
idx = np.searchsorted(self._P[-1], d, side="right")
core._shift_insert_at_index(self._P[-1], idx, d)
core._shift_insert_at_index(self._I[-1], idx, i + self._n_appended)
core._update_incremental_PI(
D, self._P, self._I, self._excl_zone, n_appended=self._n_appended
)

# All neighbors of the last subsequence are on its left. So, its (top-1)
# matrix profile value/index and its left matrix profile value/index must
Expand Down Expand Up @@ -440,30 +423,18 @@ def _update(self, t):
if np.any(~self._T_isfinite[-self._m :]):
D[:] = np.inf

core.apply_exclusion_zone(D, D.shape[0] - 1, self._excl_zone, np.inf)

update_idx = np.argwhere(D[:l] < self._P[:l, -1]).flatten()
for i in update_idx:
idx = np.searchsorted(self._P[i], D[i], side="right")
core._shift_insert_at_index(self._P[i], idx, D[i])
core._shift_insert_at_index(self._I[i], idx, l)

# Calculating top-k matrix profile and (top-1) left matrix profile (and their
# corresponding indices) for new subsequence whose distance profile is `D`
P_new = np.full(self._k, np.inf, dtype=np.float64)
I_new = np.full(self._k, -1, dtype=np.int64)
for i, d in enumerate(D):
if d < P_new[-1]: # maximum value in sorted array P_new
idx = np.searchsorted(P_new, d, side="right")
core._shift_insert_at_index(P_new, idx, d)
core._shift_insert_at_index(I_new, idx, i)
self._P = np.append(self._P, P_new.reshape(1, -1), axis=0)
self._I = np.append(self._I, I_new.reshape(1, -1), axis=0)

core._update_incremental_PI(D, self._P, self._I, self._excl_zone, n_appended=0)

left_I_new = I_new[0]
left_P_new = P_new[0]
left_I_new = self._I[-1, 0]
left_P_new = self._P[-1, 0]

self._T = T_new
self._P = np.append(self._P, P_new.reshape(1, -1), axis=0)
self._I = np.append(self._I, I_new.reshape(1, -1), axis=0)

self._left_P = np.append(self._left_P, left_P_new)
self._left_I = np.append(self._left_I, left_I_new)
self._QT = QT_new
Expand Down
170 changes: 170 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1778,3 +1778,173 @@ def test_process_isconstant_1d_default():
T_subseq_isconstant_comp = core.process_isconstant(T, m, T_subseq_isconstant=None)

npt.assert_array_equal(T_subseq_isconstant_ref, T_subseq_isconstant_comp)


def test_update_incremental_PI_egressFalse():
# This tests the function `core._update_incremental_PI`
# when `egress` is False, meaning new data point is being
# appended to the historical data.
T = np.random.rand(64)
t = np.random.rand() # new datapoint
T_new = np.append(T, t)

m = 3
excl_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))

for k in range(1, 4):
# ref
mp_ref = naive.stump(T_new, m, row_wise=True, k=k)
P_ref = mp_ref[:, :k].astype(np.float64)
I_ref = mp_ref[:, k : 2 * k].astype(np.int64)

# comp
mp = naive.stump(T, m, row_wise=True, k=k)
P_comp = mp[:, :k].astype(np.float64)
I_comp = mp[:, k : 2 * k].astype(np.int64)

# Because of the new data point, the length of matrix profile
# and matrix profile indices should be increased by one.
P_comp = np.pad(
P_comp,
[(0, 1), (0, 0)],
mode="constant",
constant_values=np.inf,
)
I_comp = np.pad(
I_comp,
[(0, 1), (0, 0)],
mode="constant",
constant_values=-1,
)

D = core.mass(T_new[-m:], T_new)
core._update_incremental_PI(D, P_comp, I_comp, excl_zone, n_appended=0)

# assertion
npt.assert_almost_equal(P_ref, P_comp)
npt.assert_almost_equal(I_ref, I_comp)


def test_update_incremental_PI_egressTrue():
T = np.random.rand(64)
t = np.random.rand() # new data point
m = 3
excl_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))

for k in range(1, 4):
# ref
# In egress=True mode, a new data point, t, is being appended
# to the historical data, T, while the oldest data point is
# being removed. Therefore, the first subsequence in T
# and the last subsequence does not get a chance to meet each
# other. Therefore, we need to exclude that distance.

T_with_t = np.append(T, t)
D = naive.distance_matrix(T_with_t, T_with_t, m)
D[-1, 0] = np.inf
D[0, -1] = np.inf

l = len(T_with_t) - m + 1
P = np.empty((l, k), dtype=np.float64)
I = np.empty((l, k), dtype=np.int64)
for i in range(l):
core.apply_exclusion_zone(D[i], i, excl_zone, np.inf)
IDX = np.argsort(D[i], kind="mergesort")[:k]
I[i] = IDX
P[i] = D[i, IDX]

P_ref = P[1:].copy()
I_ref = I[1:].copy()

# comp
mp = naive.stump(T, m, row_wise=True, k=k)
P_comp = mp[:, :k].astype(np.float64)
I_comp = mp[:, k : 2 * k].astype(np.int64)

P_comp[:-1] = P_comp[1:]
P_comp[-1] = np.inf
I_comp[:-1] = I_comp[1:]
I_comp[-1] = -1

T_new = np.append(T[1:], t)
D = core.mass(T_new[-m:], T_new)
core._update_incremental_PI(D, P_comp, I_comp, excl_zone, n_appended=1)

# assertion
npt.assert_almost_equal(P_ref, P_comp)
npt.assert_almost_equal(I_ref, I_comp)


def test_update_incremental_PI_egressTrue_MemoryCheck():
# This test function is to ensure that the function
# `core._update_incremental_PI` does not forget the
# nearest neighbors that were pointing to those old data
# points that are removed in the `egress=True` mode.
# This can be tested by inserting the same subsequence, s, in the beginning,
# middle, and end of the time series. This is to allow us to know which
# neighbor is the nearest neighbor to each of those three subsequences.

# In the `egress=True` mode, the first element of the time series is removed and
# a new data point is appended. However, the updated matrix profile index for the
# middle subsequence `s` should still refer to the first subsequence in
# the historical data.
seed = 0
np.random.seed(seed)

T = np.random.rand(64)
m = 3
excl_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))

s = np.random.rand(m)
T[:m] = s
T[30 : 30 + m] = s
T[-m:] = s

t = np.random.rand() # new data point
T_with_t = np.append(T, t)

# In egress=True mode, a new data point, t, is being appended
# to the historical data, T, while the oldest data point is
# being removed. Therefore, the first subsequence in T
# and the last subsequence does not get a chance to meet each
# other. Therefore, their pairwise distances should be excluded
# from the distance matrix.
D = naive.distance_matrix(T_with_t, T_with_t, m)
D[-1, 0] = np.inf
D[0, -1] = np.inf

l = len(T_with_t) - m + 1
for i in range(l):
core.apply_exclusion_zone(D[i], i, excl_zone, np.inf)

T_new = np.append(T[1:], t)
dist_profile = naive.distance_profile(T_new[-m:], T_new, m)
core.apply_exclusion_zone(dist_profile, len(dist_profile) - 1, excl_zone, np.inf)

for k in range(1, 4):
# ref
P = np.empty((l, k), dtype=np.float64)
I = np.empty((l, k), dtype=np.int64)
for i in range(l):
IDX = np.argsort(D[i], kind="mergesort")[:k]
I[i] = IDX
P[i] = D[i, IDX]

P_ref = P[1:].copy()
I_ref = I[1:].copy()

# comp
mp = naive.stump(T, m, row_wise=True, k=k)
P_comp = mp[:, :k].astype(np.float64)
I_comp = mp[:, k : 2 * k].astype(np.int64)

P_comp[:-1] = P_comp[1:]
P_comp[-1] = np.inf
I_comp[:-1] = I_comp[1:]
I_comp[-1] = -1
core._update_incremental_PI(
dist_profile, P_comp, I_comp, excl_zone, n_appended=1
)

npt.assert_almost_equal(P_ref, P_comp)
npt.assert_almost_equal(I_ref, I_comp)

0 comments on commit 692c99c

Please sign in to comment.