Skip to content

Commit 666e51d

Browse files
Merge pull request #8 from zhuowang10/master
Add recursive watcher
2 parents 6499752 + 59165ae commit 666e51d

File tree

2 files changed

+336
-4
lines changed

2 files changed

+336
-4
lines changed

src/asyncinotify/__init__.py

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from weakref import ReferenceType
1313
from asyncio import Future
1414
import select
15+
from collections import deque
1516

1617
# Python 3.7 suggests get_running_loop for library code
1718
try:
@@ -418,7 +419,7 @@ def __init__(self,
418419
@property
419420
def sync_timeout(self) -> Optional[float]:
420421
'''The timeout for :meth:`sync_get` and synchronous iteration.
421-
422+
422423
Set this to None to disable and -1 to wait forever. These options can
423424
be different depending on the blocking flags selected.
424425
'''
@@ -649,3 +650,69 @@ def __next__(self) -> Event:
649650
if event is None:
650651
raise StopIteration
651652
return event
653+
654+
class RecursiveWatcher:
    """Watch a directory and every directory beneath it.

    Watches are kept in step with the tree while :meth:`watch_recursive` is
    being iterated:

    * a directory that is created in, or moved into, the tree is scanned and
      a watch is added for it and for each directory below it;
    * a directory that is moved out has its watch, and the watches of the
      directories below it, removed.

    A move inside the tree produces both a ``MOVED_FROM`` and a ``MOVED_TO``
    event, so the old watches are dropped and new ones are added at the new
    location.
    """

    def __init__(self, path, mask) -> None:
        """Remember the root directory and the caller's event mask.

        :param path: root of the tree to watch; it must offer ``is_dir()``
            and ``iterdir()`` (a :class:`pathlib.Path`).
        :param mask: :class:`Mask` bits selecting which events
            :meth:`watch_recursive` yields to the caller.
        """
        self._path = path
        self._mask = mask

    def _get_directories_recursive(self, path):
        """Yield ``path`` and every directory below it, depth first.

        Nothing is yielded when ``path`` is not a directory.  Each directory
        is yielded *before* its children are listed, so a consumer that adds
        a watch on every yielded directory has that watch in place before
        the directory is scanned.

        A directory that disappears between being yielded and being listed
        is skipped instead of aborting the whole walk.

        :param path: directory to start from (a :class:`pathlib.Path`).
        """
        if not path.is_dir():
            return

        # A plain list is all a LIFO stack needs.
        pending = [path]
        while pending:
            current = pending.pop()
            yield current
            try:
                children = list(current.iterdir())
            except (FileNotFoundError, NotADirectoryError):
                # The directory was removed or replaced while we were
                # walking; there is nothing left below it to report.
                continue
            pending.extend(child for child in children if child.is_dir())

    async def watch_recursive(self, inotify=None):
        """Asynchronously yield events from the watched tree.

        :param inotify: an existing :class:`Inotify` to register the watches
            on.  When ``None`` a new instance is created here and closed
            again when this generator finishes.
        :return: an async generator of the events whose mask intersects the
            mask given to the constructor.
        """
        create_inotify = inotify is None
        if create_inotify:
            inotify = Inotify()

        try:
            # The extra bits are needed to follow the tree itself; events
            # that only match them are filtered out again before yielding.
            mask = self._mask | Mask.MOVED_FROM | Mask.MOVED_TO | Mask.CREATE | Mask.IGNORED
            for directory in self._get_directories_recursive(self._path):
                inotify.add_watch(directory, mask)

            # Things that can throw this off:
            #
            # * Doing two changes on a directory or something before the program
            #   has a time to handle it (this will also throw off a lot of inotify
            #   code, though)
            #
            # * Trying to watch a path that doesn't exist won't automatically
            #   create it or anything of the sort.

            async for event in inotify:
                if Mask.ISDIR in event.mask and event.path is not None:
                    if Mask.CREATE in event.mask or Mask.MOVED_TO in event.mask:
                        # created new folder or folder moved in, add watches
                        for directory in self._get_directories_recursive(event.path):
                            inotify.add_watch(directory, mask)
                    if Mask.MOVED_FROM in event.mask:
                        # a folder is moved to another location, remove watch
                        # for this folder and subfolders.  The test below is
                        # what ``watch.path.is_relative_to(moved)`` does, but
                        # that method only exists on Python 3.9+.
                        moved = event.path
                        # Collect first, remove afterwards, so the mapping is
                        # never iterated while watches are being removed.
                        stale = [
                            watch for watch in inotify._watches.values()
                            if watch.path == moved or moved in watch.path.parents
                        ]
                        for watch in stale:
                            inotify.rm_watch(watch)

                # DELETE event is not watched/handled here because IGNORED
                # event follows deletion, and handled in asyncinotify

                if event.mask & self._mask:
                    yield event
        finally:
            # Only close what this method opened; a caller-supplied instance
            # stays under the caller's control.
            if create_inotify:
                inotify.close()

test.py

Lines changed: 268 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@
44
# This code is released under the license described in the LICENSE file
55

66
import sys
7+
import os
8+
import shutil
79

810
import unittest
11+
from unittest import mock
912
from pathlib import Path
1013
from tempfile import TemporaryDirectory
11-
from asyncinotify import Event, Inotify, Mask
14+
from asyncinotify import Event, Inotify, Mask, RecursiveWatcher
1215

1316
if sys.version_info >= (3, 9):
1417
from collections.abc import Sequence
@@ -437,8 +440,270 @@ async def loop(n):
437440
def test_events(self):
438441
run(self._actual_test())
439442

440-
443+
class TestRecursiveWatcher(unittest.TestCase):
    """Tests for ``RecursiveWatcher``: the directory walk and the live watcher."""

    # Seconds to wait after each filesystem change so that the watcher task
    # gets a chance to process the resulting events.
    _SETTLE_SECONDS = 0.3

    @staticmethod
    def _mock_iterdir(path):
        """Stand-in for ``Path.iterdir`` serving a fixed fake tree.

        The children returned depend only on the last component of ``path``;
        any name not listed below has no children.
        """
        children = {
            "tmp": ("level1.1", "level1.2"),
            "level1.1": ("level2.1", "level2.2"),
            "level2.1": ("level3.1",),
            "level3.1": ("level4.1",),
        }
        return [path / child for child in children.get(path.name, ())]

    # ``__func__`` unwraps the staticmethod: inside the class body the name
    # is still the raw descriptor, which is not callable before Python 3.10.
    @mock.patch.object(Path, "iterdir", autospec=True, side_effect=_mock_iterdir.__func__)
    @mock.patch.object(Path, "is_dir", return_value=True)
    def test_get_directories_recursive(self, mocked_isdir, mocked_iterdir):
        """
        create folder tree as:
        level1.1
        -level2.1
        -level3.1
        -level4.1
        -level2.2
        level1.2

        The walk uses a stack, so siblings come out in reverse listing order.
        """

        tmpdirname = "/tmp"
        watcher = RecursiveWatcher(None, None)
        paths = list(watcher._get_directories_recursive(Path(tmpdirname)))
        self.assertEqual(len(paths), 7)
        self.assertEqual(str(paths[0]), tmpdirname)
        self.assertEqual(str(paths[1]), os.path.join(tmpdirname, "level1.2"))
        self.assertEqual(str(paths[2]), os.path.join(tmpdirname, "level1.1"))
        self.assertEqual(str(paths[3]), os.path.join(tmpdirname, "level1.1", "level2.2"))
        self.assertEqual(str(paths[4]), os.path.join(tmpdirname, "level1.1", "level2.1"))
        self.assertEqual(str(paths[5]), os.path.join(tmpdirname, "level1.1", "level2.1", "level3.1"))
        self.assertEqual(str(paths[6]), os.path.join(tmpdirname, "level1.1", "level2.1", "level3.1", "level4.1"))

    def _assert_paths_watched(self, watchers, path_set):
        """Assert that the paths of ``watchers``' values are exactly ``path_set``.

        :param watchers: mapping whose values expose a ``path`` attribute.
        :param path_set: set of the expected paths, as strings.
        """
        watched_path_set = {str(watch.path) for watch in watchers.values()}
        self.assertSetEqual(watched_path_set, path_set)

    class _FakeWatcher:
        """Minimal object with a ``path`` attribute, used to test the helper above."""

        def __init__(self, path) -> None:
            self.path = path

    def test_assert_paths_watched(self):
        """Check the ``_assert_paths_watched`` helper itself."""
        # both empty
        self._assert_paths_watched({}, set())

        # watchers empty
        with self.assertRaises(AssertionError):
            self._assert_paths_watched({}, {"/tmp/path1"})

        # path set empty
        with self.assertRaises(AssertionError):
            self._assert_paths_watched({
                "fd1": self._FakeWatcher(Path("/tmp/path1")),
                "fd2": self._FakeWatcher(Path("/tmp/path2")),
            }, set())

        # identical sets
        self._assert_paths_watched({
            "fd1": self._FakeWatcher(Path("/tmp/path1")),
            "fd2": self._FakeWatcher(Path("/tmp/path2")),
        }, {
            "/tmp/path2",
            "/tmp/path1",
        })

        # diff sets
        with self.assertRaises(AssertionError):
            self._assert_paths_watched({
                "fd1": self._FakeWatcher(Path("/tmp/path1")),
            }, {
                "/tmp/path2",
                "/tmp/path1",
            })

    def _create_file(self, file_path):
        """Create ``file_path`` and write its own path into it.

        Closing the file after the write is what produces the ``CLOSE_WRITE``
        event the watcher test listens for.
        """
        # str() so that a Path argument works as well as a plain string.
        with open(str(file_path), "w") as f:
            f.write(str(file_path))

    async def _read_events(self, inotify, folder, events):
        """Run a ``RecursiveWatcher`` on ``folder`` and record what it yields.

        Each yielded event is appended to ``events`` as a ``(path, mask)``
        tuple.  Runs until the surrounding task is cancelled.
        """
        watcher = RecursiveWatcher(Path(folder), Mask.CLOSE_WRITE)
        async for event in watcher.watch_recursive(inotify):
            # events/watchers are ephemeral, copy data we want
            events.append((
                event.path,
                event.mask,
            ))

    async def _watch_recursive(self):
        """
        test the cases of folder changes:
        1. create folder
        2. create cascading folders
        3. move folder in from un-monitored folder
        4. move folders out to un-monitored folder
        5. move folder within monitored folders
        6. delete folders
        """
        with TemporaryDirectory() as tmpdirbasename:
            events = []

            tmpdirname = os.path.join(tmpdirbasename, "test")
            os.makedirs(tmpdirname)
            existing_dir = os.path.join(tmpdirname, "existing_dir")
            os.makedirs(existing_dir)
            outside_dir = os.path.join(tmpdirbasename, "outside")
            os.makedirs(outside_dir)

            with Inotify() as inotify:
                watch_task = asyncio.create_task(self._read_events(inotify, tmpdirname, events))
                try:
                    await asyncio.sleep(self._SETTLE_SECONDS)

                    # existing 2 folders are watched
                    self._assert_paths_watched(inotify._watches, {
                        tmpdirname,
                        existing_dir,
                    })

                    # create file, event
                    file_path = os.path.join(tmpdirname, "f1.txt")
                    self._create_file(file_path)
                    await asyncio.sleep(self._SETTLE_SECONDS)

                    # still 2 folders watched
                    self._assert_paths_watched(inotify._watches, {
                        tmpdirname,
                        existing_dir,
                    })

                    # create folder and a file inside, no event because of racing
                    folder_path = os.path.join(tmpdirname, "d1")
                    os.makedirs(folder_path)
                    file_path = os.path.join(folder_path, "f2.txt")
                    self._create_file(file_path)
                    await asyncio.sleep(self._SETTLE_SECONDS)

                    # one more folder watched
                    self._assert_paths_watched(inotify._watches, {
                        tmpdirname,
                        existing_dir,
                        os.path.join(tmpdirname, "d1"),
                    })

                    # create cascade folders
                    folder_path = os.path.join(tmpdirname, "d2", "dd1", "ddd1")
                    os.makedirs(folder_path)
                    await asyncio.sleep(self._SETTLE_SECONDS)

                    # 3 more folders watched
                    self._assert_paths_watched(inotify._watches, {
                        tmpdirname,
                        existing_dir,
                        os.path.join(tmpdirname, "d1"),
                        os.path.join(tmpdirname, "d2"),
                        os.path.join(tmpdirname, "d2", "dd1"),
                        os.path.join(tmpdirname, "d2", "dd1", "ddd1"),
                    })

                    # move in folder from outside
                    move_folder_path = os.path.join(tmpdirname, "d1", "outside")
                    os.rename(outside_dir, move_folder_path)
                    await asyncio.sleep(self._SETTLE_SECONDS)

                    # one more folder watched
                    self._assert_paths_watched(inotify._watches, {
                        tmpdirname,
                        existing_dir,
                        os.path.join(tmpdirname, "d1"),
                        os.path.join(tmpdirname, "d2"),
                        os.path.join(tmpdirname, "d2", "dd1"),
                        os.path.join(tmpdirname, "d2", "dd1", "ddd1"),
                        os.path.join(tmpdirname, "d1", "outside"),
                    })

                    # create file in watched outside folder, event
                    file_path = os.path.join(tmpdirname, "d1", "outside", "f3.txt")
                    self._create_file(file_path)
                    await asyncio.sleep(self._SETTLE_SECONDS)

                    # move out folder
                    folder_path = os.path.join(tmpdirname, "d2", "dd1")
                    move_folder_path = os.path.join(tmpdirbasename, "dd1")
                    os.rename(folder_path, move_folder_path)
                    await asyncio.sleep(self._SETTLE_SECONDS)

                    # 2 folders not watched
                    self._assert_paths_watched(inotify._watches, {
                        tmpdirname,
                        existing_dir,
                        os.path.join(tmpdirname, "d1"),
                        os.path.join(tmpdirname, "d2"),
                        os.path.join(tmpdirname, "d1", "outside"),
                    })

                    # create file in not watched folder, no event
                    file_path = os.path.join(tmpdirbasename, "dd1", "ddd1", "f4.txt")
                    self._create_file(file_path)
                    await asyncio.sleep(self._SETTLE_SECONDS)

                    # move folder within
                    folder_path = os.path.join(tmpdirname, "existing_dir")
                    move_folder_path = os.path.join(tmpdirname, "d1", "existing_dir")
                    os.rename(folder_path, move_folder_path)
                    await asyncio.sleep(self._SETTLE_SECONDS)

                    # folders change
                    self._assert_paths_watched(inotify._watches, {
                        tmpdirname,
                        os.path.join(tmpdirname, "d1"),
                        os.path.join(tmpdirname, "d2"),
                        os.path.join(tmpdirname, "d1", "outside"),
                        os.path.join(tmpdirname, "d1", "existing_dir"),
                    })

                    # create file in moved folder, event
                    file_path = os.path.join(tmpdirname, "d1", "existing_dir", "f5.txt")
                    self._create_file(file_path)
                    await asyncio.sleep(self._SETTLE_SECONDS)

                    # delete folder
                    folder_path = os.path.join(tmpdirname, "d2")
                    os.removedirs(folder_path)
                    await asyncio.sleep(self._SETTLE_SECONDS)

                    # one less folder watched
                    self._assert_paths_watched(inotify._watches, {
                        tmpdirname,
                        os.path.join(tmpdirname, "d1"),
                        os.path.join(tmpdirname, "d1", "outside"),
                        os.path.join(tmpdirname, "d1", "existing_dir"),
                    })

                    # delete folders
                    shutil.rmtree(os.path.join(tmpdirname, "d1"))
                    await asyncio.sleep(self._SETTLE_SECONDS)

                    # less folders watched
                    self._assert_paths_watched(inotify._watches, {
                        tmpdirname,
                    })
                finally:
                    # Stop the watcher even when an assertion above failed,
                    # so no task is left pending behind the test.
                    watch_task.cancel()
                    await asyncio.gather(watch_task, return_exceptions=True)

            # verify events
            self.assertEqual(len(events), 3)
            self.assertEqual(str(events[0][0]), os.path.join(tmpdirname, "f1.txt"))
            self.assertTrue(events[0][1] & Mask.CLOSE_WRITE)

            self.assertEqual(str(events[1][0]), os.path.join(tmpdirname, "d1", "outside", "f3.txt"))
            self.assertTrue(events[1][1] & Mask.CLOSE_WRITE)

            self.assertEqual(str(events[2][0]), os.path.join(tmpdirname, "d1", "existing_dir", "f5.txt"))
            self.assertTrue(events[2][1] & Mask.CLOSE_WRITE)

    def test_watch_recursive(self):
        """Drive the asynchronous watcher scenario to completion."""
        run(self._watch_recursive())
441707

442708
if __name__ == '__main__':
443709
unittest.main()
444-

0 commit comments

Comments
 (0)