Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion fastdtw/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
try:
from ._fastdtw import fastdtw, dtw
from .fastdtw import fastdtw_subsequence
except ImportError:
from .fastdtw import fastdtw, dtw
from .fastdtw import fastdtw, dtw, fastdtw_subsequence
48 changes: 44 additions & 4 deletions fastdtw/fastdtw.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ def fastdtw(x, y, radius=1, dist=None):
return __fastdtw(x, y, radius, dist)


def fastdtw_subsequence(x, y, radius=1, dist=None):
"""
like fastdtw but assumes that x is significantly shorter than y and performs partial alignment
"""
x, y, dist = __prep_inputs(x, y, dist)
return __fastdtw(x, y, radius, dist, subsequence=True)


def __difference(a, b):
return abs(a - b)

Expand All @@ -61,17 +69,19 @@ def __norm(p):
return lambda a, b: np.linalg.norm(a - b, p)


def __fastdtw(x, y, radius, dist):
def __fastdtw(x, y, radius, dist, subsequence=False):
min_time_size = radius + 2

if len(x) < min_time_size or len(y) < min_time_size:
return dtw(x, y, dist=dist)
return dtw(x, y, dist=dist, subsequence=subsequence)

x_shrinked = __reduce_by_half(x)
y_shrinked = __reduce_by_half(y)
distance, path = \
__fastdtw(x_shrinked, y_shrinked, radius=radius, dist=dist)
__fastdtw(x_shrinked, y_shrinked, radius=radius, dist=dist, subsequence=subsequence)
window = __expand_window(path, len(x), len(y), radius)
if subsequence:
return __dtw_subsequence(x, y, window, dist=dist)
return __dtw(x, y, window, dist=dist)


Expand All @@ -95,7 +105,7 @@ def __prep_inputs(x, y, dist):
return x, y, dist


def dtw(x, y, dist=None):
def dtw(x, y, dist=None, subsequence=False):
''' return the distance between 2 time series without approximation

Parameters
Expand Down Expand Up @@ -127,6 +137,8 @@ def dtw(x, y, dist=None):
(2.0, [(0, 0), (1, 0), (2, 1), (3, 2), (4, 2)])
'''
x, y, dist = __prep_inputs(x, y, dist)
if subsequence:
return __dtw_subsequence(x, y, None, dist)
return __dtw(x, y, None, dist)


Expand All @@ -150,6 +162,34 @@ def __dtw(x, y, window, dist):
return (D[len_x, len_y][0], path)


def __dtw_subsequence(x, y, window, dist):
len_x, len_y = len(x), len(y)
if window is None:
window = [(i, j) for i in range(len_x) for j in range(len_y)]
# window = ((i + 1, j + 1) for i, j in window)
D = defaultdict(lambda: (float('inf'),))
for j in range(-1, len_y):
D[-1, j] = (0, -1, -1)
for i, j in window:
dt = dist(x[i], y[j])
D[i, j] = min((D[i-1, j][0]+dt, i-1, j), (D[i, j-1][0]+dt, i, j-1),
(D[i-1, j-1][0]+dt, i-1, j-1), key=lambda a: a[0])
path = []

j_list = [D[len_x-1, j][0] for j in range(-1, len_y)]
i, j = len_x-1, np.argmin(j_list) - 1
end_j = j
while not (i == -1):
path.append((i, j))
t = D[i, j]
if len(t) < 3:
for p in path:
print(p)
i, j = t[1], t[2]
path.reverse()
return (D[len_x-1, end_j][0], path)


def __reduce_by_half(x):
return [(x[i] + x[1+i]) / 2 for i in range(0, len(x) - len(x) % 2, 2)]

Expand Down
15 changes: 15 additions & 0 deletions tests/test_fastdtw.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from fastdtw._fastdtw import dtw as dtw_c
from fastdtw.fastdtw import fastdtw as fastdtw_p
from fastdtw.fastdtw import dtw as dtw_p
from fastdtw.fastdtw import fastdtw_subsequence as fastdtw_subsequence_p


class FastdtwTest(unittest.TestCase):
Expand All @@ -19,6 +20,7 @@ def setUp(self):
self.y_1d = [2, 3, 4]
self.x_2d = np.array([[1, 1], [2, 2], [3, 3], [4, 4], [5, 5]])
self.y_2d = np.array([[2, 2], [3, 3], [4, 4]])
self.z_2d = np.array([[1, 1], [3, 3], [2, 2]])
self.dist_2d = lambda a, b: sum((a - b) ** 2) ** 0.5

def test_1d_fastdtw(self):
Expand All @@ -27,18 +29,30 @@ def test_1d_fastdtw(self):
self.assertEqual(distance_c, 2)
self.assertEqual(distance_c, distance_p)

def test_1d_fastdtw_subsequence(self):
distance_p = fastdtw_subsequence_p(self.y_1d, self.x_1d)[0]
self.assertEqual(distance_p, 0)

def test_1d_dtw(self):
distance_c = dtw_c(self.x_1d, self.y_1d)[0]
distance_p = dtw_p(self.x_1d, self.y_1d)[0]
self.assertEqual(distance_c, 2)
self.assertEqual(distance_c, distance_p)

def test_1d_dtw_subsequence(self):
distance_p = dtw_p(self.y_1d, self.x_1d, subsequence=True)[0]
self.assertEqual(distance_p, 0)

def test_2d_fastdtw(self):
distance_c = fastdtw_c(self.x_2d, self.y_2d, dist=self.dist_2d)[0]
distance_p = fastdtw_p(self.x_2d, self.y_2d, dist=self.dist_2d)[0]
self.assertAlmostEqual(distance_c, ((1+1)**0.5)*2)
self.assertEqual(distance_c, distance_p)

def test_2d_fastdtw_subsequence(self):
distance_p = fastdtw_subsequence_p(self.z_2d, self.x_2d, dist=self.dist_2d)[0]
self.assertAlmostEqual(distance_p, (1 + 1) ** 0.5)

def test_2d_pnorm(self):
distance_c = fastdtw_c(self.x_2d, self.y_2d, dist=2)[0]
distance_p = fastdtw_p(self.x_2d, self.y_2d, dist=2)[0]
Expand All @@ -55,5 +69,6 @@ def test_default_dist(self):
self.assertEqual(d1, d3)
self.assertEqual(d1, d4)


if __name__ == '__main__':
unittest.main()