Skip to content

Commit

Permalink
[WIP] Fixed #639 Top-K Nearest Neighbors to Matrix Profile (normalize…
Browse files Browse the repository at this point in the history
…=False) (#714)

* Add support for top-k to naive aamp

* Add test function for TopK. Expect Error.

* Add TopK support to aamp. Error Resolved

* Add TopK test function for AB_join

* (Temporarily) fix scraamp to adapt the changes in _aamp

* fix calling function _aamp

* (Temporarily) fixed aamped

* Avoid unnecessary passing k and minor changes

* fix docstring in aamped

* Add support for top-k to naive prescraamp

* Add test function for prescraamp top-k. Expect Error

* Add top-k support for prescraamp

* Add top-k test function for AB_join prescraamp

* Add top-k support to scraamp and update test function

* Add new test functions and minor fixes

* update test function to adapt to new changes

* update test function to adapt to new changes

* Add Top-K support to gpu_aamp

* fix import

* fix minor bug

* minor fixes

* Add test function for top-k

* Empty Commit

* Add top-k support to naive

* fix issues to pass existing unit tests aampi

* Add top-k test. Expect Error

* Add top-k support to performant aampi

* Add top-k test function to aampi with egress

* minor changes

* Revised signature

* replace param with class attribute

* fixed style

* fixed typo

* Move device function to core to fix circular import error

* Update test functions

* fix bug

* replace wrong variable with the correct one

* fix docstring

* fix docstrings

* Empty commit
  • Loading branch information
NimaSarajpoor authored Dec 3, 2022
1 parent 9cd0984 commit 8da71ec
Show file tree
Hide file tree
Showing 18 changed files with 1,602 additions and 696 deletions.
204 changes: 147 additions & 57 deletions stumpy/aamp.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

@njit(
# "(f8[:], f8[:], i8, b1[:], b1[:], f8, i8[:], i8, i8, i8, f8[:, :, :],"
# "i8[:, :, :], b1)",
# "f8[:, :], f8[:, :], i8[:, :, :], i8[:, :], i8[:, :], b1)",
fastmath=True,
)
def _compute_diagonal(
Expand All @@ -30,12 +30,17 @@ def _compute_diagonal(
diags_stop_idx,
thread_idx,
P,
PL,
PR,
I,
IL,
IR,
ignore_trivial,
):
"""
Compute (Numba JIT-compiled) and update P, I along a single diagonal using a single
thread and avoiding race conditions
Compute (Numba JIT-compiled) and update the (top-k) matrix profile P,
PL, PR, I, IL, and IR sequentially along individual diagonals using a single
thread and avoiding race conditions.
Parameters
----------
Expand All @@ -49,12 +54,6 @@ def _compute_diagonal(
m : int
Window size
P : numpy.ndarray
Matrix profile
I : numpy.ndarray
Matrix profile indices
T_A_subseq_isfinite : numpy.ndarray
A boolean array that indicates whether a subsequence in `T_A` contains a
`np.nan`/`np.inf` value (False)
Expand All @@ -78,6 +77,24 @@ def _compute_diagonal(
thread_idx : int
The thread index
P : numpy.ndarray
The (top-k) matrix profile, sorted in ascending order per row
PL : numpy.ndarray
The top-1 left marix profile
PR : numpy.ndarray
The top-1 right marix profile
I : numpy.ndarray
The (top-k) matrix profile indices
IL : numpy.ndarray
The top-1 left matrix profile indices
IR : numpy.ndarray
The top-1 right matrix profile indices
ignore_trivial : bool
Set to `True` if this is a self-join. Otherwise, for AB-join, set this to
`False`. Default is `True`.
Expand All @@ -92,16 +109,16 @@ def _compute_diagonal(
uint64_1 = np.uint64(1)

for diag_idx in range(diags_start_idx, diags_stop_idx):
k = diags[diag_idx]
g = diags[diag_idx]

if k >= 0:
iter_range = range(0, min(n_A - m + 1, n_B - m + 1 - k))
if g >= 0:
iter_range = range(0, min(n_A - m + 1, n_B - m + 1 - g))
else:
iter_range = range(-k, min(n_A - m + 1, n_B - m + 1 - k))
iter_range = range(-g, min(n_A - m + 1, n_B - m + 1 - g))

for i in iter_range:
uint64_i = np.uint64(i)
uint64_j = np.uint64(i + k)
uint64_j = np.uint64(i + g)

if uint64_i == 0 or uint64_j == 0:
p_norm = (
Expand Down Expand Up @@ -129,36 +146,59 @@ def _compute_diagonal(

if T_A_subseq_isfinite[uint64_i] and T_B_subseq_isfinite[uint64_j]:
# Neither subsequence contains NaNs
if p_norm < P[thread_idx, uint64_i, 0]:
P[thread_idx, uint64_i, 0] = p_norm
I[thread_idx, uint64_i, 0] = uint64_j

if ignore_trivial:
if p_norm < P[thread_idx, uint64_j, 0]:
P[thread_idx, uint64_j, 0] = p_norm
I[thread_idx, uint64_j, 0] = uint64_i
# `P[thread_idx, i, :]` is sorted ascendingly and MUST be updated
# when the newly-calculated `p_norm` value becomes smaller than the
# last (i.e. greatest) element in this array. Note that the goal
# is to have top-k smallest distancs for each subsequence.
if p_norm < P[thread_idx, uint64_i, -1]:
idx = np.searchsorted(P[thread_idx, uint64_i], p_norm)
core._shift_insert_at_index(
P[thread_idx, uint64_i], idx, p_norm, shift="right"
)
core._shift_insert_at_index(
I[thread_idx, uint64_i], idx, uint64_j, shift="right"
)

if ignore_trivial: # self-joins only
if p_norm < P[thread_idx, uint64_j, -1]:
idx = np.searchsorted(P[thread_idx, uint64_j], p_norm)
core._shift_insert_at_index(
P[thread_idx, uint64_j], idx, p_norm, shift="right"
)
core._shift_insert_at_index(
I[thread_idx, uint64_j], idx, uint64_i, shift="right"
)

if uint64_i < uint64_j:
# left matrix profile and left matrix profile index
if p_norm < P[thread_idx, uint64_j, 1]:
P[thread_idx, uint64_j, 1] = p_norm
I[thread_idx, uint64_j, 1] = uint64_i
if p_norm < PL[thread_idx, uint64_j]:
PL[thread_idx, uint64_j] = p_norm
IL[thread_idx, uint64_j] = uint64_i

# right matrix profile and right matrix profile index
if p_norm < P[thread_idx, uint64_i, 2]:
P[thread_idx, uint64_i, 2] = p_norm
I[thread_idx, uint64_i, 2] = uint64_j
if p_norm < PR[thread_idx, uint64_i]:
PR[thread_idx, uint64_i] = p_norm
IR[thread_idx, uint64_i] = uint64_j

return


@njit(
# "(f8[:], f8[:], i8, b1[:], b1[:], i8[:], b1)",
# "(f8[:], f8[:], i8, b1[:], b1[:], i8[:], b1, i8)",
parallel=True,
fastmath=True,
)
def _aamp(
T_A, T_B, m, T_A_subseq_isfinite, T_B_subseq_isfinite, p, diags, ignore_trivial
T_A,
T_B,
m,
T_A_subseq_isfinite,
T_B_subseq_isfinite,
p,
diags,
ignore_trivial,
k,
):
"""
A Numba JIT-compiled version of AAMP for parallel computation of the matrix
Expand Down Expand Up @@ -194,13 +234,30 @@ def _aamp(
Set to `True` if this is a self-join. Otherwise, for AB-join, set this to
`False`. Default is `True`.
k : int
The number of top `k` smallest distances used to construct the matrix profile.
Note that this will increase the total computational time and memory usage
when k > 1.
Returns
-------
P : numpy.ndarray
Matrix profile
out1 : numpy.ndarray
The (top-k) matrix profile
I : numpy.ndarray
Matrix profile indices
out2 : numpy.ndarray
The (top-1) left matrix profile
out3 : numpy.ndarray
The (top-1) right matrix profile
out4 : numpy.ndarray
The (top-k) matrix profile indices
out5 : numpy.ndarray
The (top-1) left matrix profile indices
out6 : numpy.ndarray
The (top-1) right matrix profile indices
Notes
-----
Expand All @@ -213,8 +270,15 @@ def _aamp(
n_B = T_B.shape[0]
l = n_A - m + 1
n_threads = numba.config.NUMBA_NUM_THREADS
P = np.full((n_threads, l, 3), np.inf, dtype=np.float64)
I = np.full((n_threads, l, 3), -1, dtype=np.int64)

P = np.full((n_threads, l, k), np.inf, dtype=np.float64)
I = np.full((n_threads, l, k), -1, dtype=np.int64)

PL = np.full((n_threads, l), np.inf, dtype=np.float64)
IL = np.full((n_threads, l), -1, dtype=np.int64)

PR = np.full((n_threads, l), np.inf, dtype=np.float64)
IR = np.full((n_threads, l), -1, dtype=np.int64)

ndist_counts = core._count_diagonal_ndist(diags, m, n_A, n_B)
diags_ranges = core._get_array_ranges(ndist_counts, n_threads, False)
Expand All @@ -233,26 +297,37 @@ def _aamp(
diags_ranges[thread_idx, 1],
thread_idx,
P,
PL,
PR,
I,
IL,
IR,
ignore_trivial,
)

# Reduction of results from all threads
for thread_idx in range(1, n_threads):
for i in prange(l):
if P[0, i, 0] > P[thread_idx, i, 0]:
P[0, i, 0] = P[thread_idx, i, 0]
I[0, i, 0] = I[thread_idx, i, 0]
# left matrix profile and left matrix profile indices
if P[0, i, 1] > P[thread_idx, i, 1]:
P[0, i, 1] = P[thread_idx, i, 1]
I[0, i, 1] = I[thread_idx, i, 1]
# right matrix profile and right matrix profile indices
if P[0, i, 2] > P[thread_idx, i, 2]:
P[0, i, 2] = P[thread_idx, i, 2]
I[0, i, 2] = I[thread_idx, i, 2]

return np.power(P[0, :, :], 1.0 / p), I[0, :, :]
# update top-k arrays
core._merge_topk_PI(P[0], P[thread_idx], I[0], I[thread_idx])

# update left matrix profile and matrix profile indices
mask = PL[0] > PL[thread_idx]
PL[0][mask] = PL[thread_idx][mask]
IL[0][mask] = IL[thread_idx][mask]

# update right matrix profile and matrix profile indices
mask = PR[0] > PR[thread_idx]
PR[0][mask] = PR[thread_idx][mask]
IR[0][mask] = IR[thread_idx][mask]

return (
np.power(P[0], 1.0 / p),
np.power(PL[0], 1.0 / p),
np.power(PR[0], 1.0 / p),
I[0],
IL[0],
IR[0],
)


def aamp(T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
Expand Down Expand Up @@ -291,8 +366,16 @@ def aamp(T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
Returns
-------
out : numpy.ndarray
The first column consists of the matrix profile, the second column
consists of the matrix profile indices.
When k = 1 (default), the first column consists of the matrix profile,
the second column consists of the matrix profile indices, the third column
consists of the left matrix profile indices, and the fourth column consists
of the right matrix profile indices. However, when k > 1, the output array
will contain exactly 2 * k + 2 columns. The first k columns (i.e., out[:, :k])
consists of the top-k matrix profile, the next set of k columns
(i.e., out[:, k:2k]) consists of the corresponding top-k matrix profile
indices, and the last two columns (i.e., out[:, 2k] and out[:, 2k+1] or,
equivalently, out[:, -2] and out[:, -1]) correspond to the top-1 left
matrix profile indices and the top-1 right matrix profile indices, respectively.
Notes
-----
Expand Down Expand Up @@ -331,19 +414,26 @@ def aamp(T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
l = n_A - m + 1

excl_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))
out = np.empty((l, 4), dtype=object)

if ignore_trivial:
diags = np.arange(excl_zone + 1, n_A - m + 1, dtype=np.int64)
else:
diags = np.arange(-(n_A - m + 1) + 1, n_B - m + 1, dtype=np.int64)

P, I = _aamp(
T_A, T_B, m, T_A_subseq_isfinite, T_B_subseq_isfinite, p, diags, ignore_trivial
P, PL, PR, I, IL, IR = _aamp(
T_A,
T_B,
m,
T_A_subseq_isfinite,
T_B_subseq_isfinite,
p,
diags,
ignore_trivial,
k,
)

out[:, 0] = P[:, 0]
out[:, 1:] = I[:, :]
out = np.empty((l, 2 * k + 2), dtype=object)
out[:, :k] = P
out[:, k:] = np.column_stack((I, IL, IR))

core._check_P(out[:, 0])

Expand Down
44 changes: 30 additions & 14 deletions stumpy/aamped.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,16 @@ def aamped(dask_client, T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
Returns
-------
out : numpy.ndarray
The first column consists of the matrix profile, the second column
consists of the matrix profile indices.
When k = 1 (default), the first column consists of the matrix profile,
the second column consists of the matrix profile indices, the third column
consists of the left matrix profile indices, and the fourth column consists
of the right matrix profile indices. However, when k > 1, the output array
will contain exactly 2 * k + 2 columns. The first k columns (i.e., out[:, :k])
consists of the top-k matrix profile, the next set of k columns
(i.e., out[:, k:2k]) consists of the corresponding top-k matrix profile
indices, and the last two columns (i.e., out[:, 2k] and out[:, 2k+1] or,
equivalently, out[:, -2] and out[:, -1]) correspond to the top-1 left
matrix profile indices and the top-1 right matrix profile indices, respectively.
Notes
-----
Expand Down Expand Up @@ -94,12 +102,10 @@ def aamped(dask_client, T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
n_B = T_B.shape[0]
l = n_A - m + 1

excl_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))
out = np.empty((l, 4), dtype=object)

hosts = list(dask_client.ncores().keys())
nworkers = len(hosts)

excl_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))
if ignore_trivial:
diags = np.arange(excl_zone + 1, n_A - m + 1, dtype=np.int64)
else:
Expand Down Expand Up @@ -141,20 +147,30 @@ def aamped(dask_client, T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
p,
diags_futures[i],
ignore_trivial,
k,
)
)

results = dask_client.gather(futures)
profile, indices = results[0]
profile, profile_L, profile_R, indices, indices_L, indices_R = results[0]
for i in range(1, len(hosts)):
P, I = results[i]
for col in range(P.shape[1]): # pragma: no cover
cond = P[:, col] < profile[:, col]
profile[:, col] = np.where(cond, P[:, col], profile[:, col])
indices[:, col] = np.where(cond, I[:, col], indices[:, col])

out[:, 0] = profile[:, 0]
out[:, 1:4] = indices
P, PL, PR, I, IL, IR = results[i]
# Update top-k matrix profile and matrix profile indices
core._merge_topk_PI(profile, P, indices, I)

# Update top-1 left matrix profile and matrix profile index
mask = PL < profile_L
profile_L[mask] = PL[mask]
indices_L[mask] = IL[mask]

# Update top-1 right matrix profile and matrix profile index
mask = PR < profile_R
profile_R[mask] = PR[mask]
indices_R[mask] = IR[mask]

out = np.empty((l, 2 * k + 2), dtype=object)
out[:, :k] = profile
out[:, k : 2 * k + 2] = np.column_stack((indices, indices_L, indices_R))

core._check_P(out[:, 0])

Expand Down
Loading

0 comments on commit 8da71ec

Please sign in to comment.