Skip to content

Commit 89b9ea1

Browse files
committed
add recursive watcher
1 parent 6499752 commit 89b9ea1

File tree

2 files changed

+300
-4
lines changed

2 files changed

+300
-4
lines changed

src/asyncinotify/__init__.py

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from weakref import ReferenceType
1313
from asyncio import Future
1414
import select
15+
from collections import deque
1516

1617
# Python 3.7 suggests get_running_loop for library code
1718
try:
@@ -418,7 +419,7 @@ def __init__(self,
418419
@property
419420
def sync_timeout(self) -> Optional[float]:
420421
'''The timeout for :meth:`sync_get` and synchronous iteration.
421-
422+
422423
Set this to None to disable and -1 to wait forever. These options can
423424
be different depending on the blocking flags selected.
424425
'''
@@ -649,3 +650,69 @@ def __next__(self) -> Event:
649650
if event is None:
650651
raise StopIteration
651652
return event
653+
654+
class RecursiveWatcher:
655+
"""
656+
watch a folder recursively:
657+
add a watch when a folder is created/moved in
658+
delete a watch when a folder is deleted/moved out
659+
this also works for folders moving within the watched folders because both move_from event and move_to event will be caught
660+
"""
661+
def __init__(self, path, mask) -> None:
662+
self._path = path
663+
self._mask = mask
664+
665+
def _get_directories_recursive(self, path):
666+
"""
667+
DFS to iterate all paths within
668+
"""
669+
if not path.is_dir():
670+
return
671+
672+
stack = deque()
673+
stack.append(path)
674+
while stack:
675+
curr_path = stack.pop()
676+
yield curr_path
677+
for subpath in curr_path.iterdir():
678+
if subpath.is_dir():
679+
stack.append(subpath)
680+
681+
async def watch_recursive(self, inotify=None):
682+
create_inotify = inotify is None
683+
if create_inotify:
684+
inotify = Inotify()
685+
686+
try:
687+
mask = self._mask | Mask.MOVED_FROM | Mask.MOVED_TO | Mask.CREATE | Mask.IGNORED
688+
for directory in self._get_directories_recursive(self._path):
689+
inotify.add_watch(directory, mask)
690+
691+
# Things that can throw this off:
692+
#
693+
# * Doing two changes on a directory or something before the program
694+
# has a time to handle it (this will also throw off a lot of inotify
695+
# code, though)
696+
#
697+
# * Trying to watch a path that doesn't exist won't automatically
698+
# create it or anything of the sort.
699+
700+
async for event in inotify:
701+
if Mask.ISDIR in event.mask and event.path is not None:
702+
if Mask.CREATE in event.mask or Mask.MOVED_TO in event.mask:
703+
# created new folder or folder moved in, add watches
704+
for directory in self._get_directories_recursive(event.path):
705+
inotify.add_watch(directory, mask)
706+
if Mask.MOVED_FROM in event.mask:
707+
# a folder is moved to another location, remove watch for this folder and subfolders
708+
watches = [watch for watch in inotify._watches.values() if watch.path.is_relative_to(event.path)]
709+
for watch in watches:
710+
inotify.rm_watch(watch)
711+
712+
# DELETE event is not watched/handled here because IGNORED event follows deletion, and handled in asyncinotify
713+
714+
if event.mask & self._mask:
715+
yield event
716+
finally:
717+
if create_inotify:
718+
inotify.close()

test.py

Lines changed: 232 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@
44
# This code is released under the license described in the LICENSE file
55

66
import sys
7+
import os
8+
import shutil
79

810
import unittest
11+
from unittest import mock
912
from pathlib import Path
1013
from tempfile import TemporaryDirectory
11-
from asyncinotify import Event, Inotify, Mask
14+
from asyncinotify import Event, Inotify, Mask, RecursiveWatcher
1215

1316
if sys.version_info >= (3, 9):
1417
from collections.abc import Sequence
@@ -437,8 +440,234 @@ async def loop(n):
437440
def test_events(self):
438441
run(self._actual_test())
439442

440-
443+
class TestRecursiveWatcher(unittest.TestCase):
444+
445+
@staticmethod
446+
def _mock_iterdir(path):
447+
name = path.name
448+
if name == "tmp":
449+
return [Path.joinpath(path, "level1.1"), Path.joinpath(path, "level1.2")]
450+
if name == "level1.1":
451+
return [Path.joinpath(path, "level2.1"), Path.joinpath(path, "level2.2")]
452+
if name == "level2.1":
453+
return [Path.joinpath(path, "level3.1")]
454+
if name == "level3.1":
455+
return [Path.joinpath(path, "level4.1")]
456+
# others
457+
return []
458+
459+
@mock.patch.object(Path, "iterdir", autospec=True, side_effect=_mock_iterdir)
460+
@mock.patch.object(Path, "is_dir", return_result=True)
461+
def test_get_directories_recursive(self, mocked_isdir, mocked_iterdir):
462+
"""
463+
create folder tree as:
464+
level1.1
465+
-level2.1
466+
-level3.1
467+
-level4.1
468+
-level2.2
469+
level1.2
470+
"""
471+
472+
tmpdirname = "/tmp"
473+
watcher = RecursiveWatcher(None, None)
474+
paths = [path for path in watcher._get_directories_recursive(Path(tmpdirname))]
475+
self.assertEqual(len(paths), 7)
476+
self.assertEqual(str(paths[0]), tmpdirname)
477+
self.assertEqual(str(paths[1]), os.path.join(tmpdirname, "level1.2"))
478+
self.assertEqual(str(paths[2]), os.path.join(tmpdirname, "level1.1"))
479+
self.assertEqual(str(paths[3]), os.path.join(tmpdirname, "level1.1", "level2.2"))
480+
self.assertEqual(str(paths[4]), os.path.join(tmpdirname, "level1.1", "level2.1"))
481+
self.assertEqual(str(paths[5]), os.path.join(tmpdirname, "level1.1", "level2.1", "level3.1"))
482+
self.assertEqual(str(paths[6]), os.path.join(tmpdirname, "level1.1", "level2.1", "level3.1", "level4.1"))
483+
484+
485+
def _assert_paths_watched(self, watchers, path_set):
486+
watched_path_set = {str(watch.path) for watch in watchers.values()}
487+
self.assertSetEqual(watched_path_set, path_set)
488+
489+
def _create_file(self, file_path):
490+
with open(str(file_path), "w") as f:
491+
f.write(file_path)
492+
493+
async def _read_events(self, inotify, folder, events):
494+
watcher = RecursiveWatcher(Path(folder), Mask.CLOSE_WRITE)
495+
async for event in watcher.watch_recursive(inotify):
496+
# events/watchers are ephemeral, copy data we want
497+
events.append((
498+
event.path,
499+
event.mask,
500+
))
501+
502+
async def _watch_recursive(self):
503+
"""
504+
test the cases of folder changes:
505+
1. create folder
506+
2. create cascading folders
507+
3. move folder in from un-monitored folder
508+
4. move folders out to un-monitored folder
509+
5. move folder within monitored folders
510+
6. delete folders
511+
"""
512+
with TemporaryDirectory() as tmpdirbasename:
513+
events = []
514+
515+
tmpdirname = os.path.join(tmpdirbasename, "test")
516+
os.makedirs(tmpdirname)
517+
existing_dir = os.path.join(tmpdirname, "existing_dir")
518+
os.makedirs(existing_dir)
519+
outside_dir = os.path.join(tmpdirbasename, "outside")
520+
os.makedirs(outside_dir)
521+
522+
with Inotify() as inotify:
523+
watch_task = asyncio.create_task(self._read_events(inotify, tmpdirname, events))
524+
await asyncio.sleep(0.3)
525+
526+
# existing 2 folders are watched
527+
self._assert_paths_watched(inotify._watches, {
528+
tmpdirname,
529+
existing_dir,
530+
})
531+
532+
# create file, event
533+
file_path = os.path.join(tmpdirname, "f1.txt")
534+
self._create_file(file_path)
535+
await asyncio.sleep(0.3)
536+
537+
# still 2 folders watched
538+
self._assert_paths_watched(inotify._watches, {
539+
tmpdirname,
540+
existing_dir,
541+
})
542+
543+
# create folder and a file inside, no event because of racing
544+
folder_path = os.path.join(tmpdirname, "d1")
545+
os.makedirs(folder_path)
546+
file_path = os.path.join(folder_path, "f2.txt")
547+
self._create_file(file_path)
548+
await asyncio.sleep(0.3)
549+
550+
# one more folder watched
551+
self._assert_paths_watched(inotify._watches, {
552+
tmpdirname,
553+
existing_dir,
554+
os.path.join(tmpdirname, "d1"),
555+
})
556+
557+
# create cascade folders
558+
folder_path = os.path.join(tmpdirname, "d2", "dd1", "ddd1")
559+
os.makedirs(folder_path)
560+
await asyncio.sleep(0.3)
561+
562+
# 3 more folders watched
563+
self._assert_paths_watched(inotify._watches, {
564+
tmpdirname,
565+
existing_dir,
566+
os.path.join(tmpdirname, "d1"),
567+
os.path.join(tmpdirname, "d2"),
568+
os.path.join(tmpdirname, "d2", "dd1"),
569+
os.path.join(tmpdirname, "d2", "dd1", "ddd1"),
570+
})
571+
572+
# move in folder from outside
573+
move_folder_path = os.path.join(tmpdirname, "d1", "outside")
574+
os.rename(outside_dir, move_folder_path)
575+
await asyncio.sleep(0.3)
576+
577+
# one more folder watched
578+
self._assert_paths_watched(inotify._watches, {
579+
tmpdirname,
580+
existing_dir,
581+
os.path.join(tmpdirname, "d1"),
582+
os.path.join(tmpdirname, "d2"),
583+
os.path.join(tmpdirname, "d2", "dd1"),
584+
os.path.join(tmpdirname, "d2", "dd1", "ddd1"),
585+
os.path.join(tmpdirname, "d1", "outside"),
586+
})
587+
588+
# create file in watched outside folder, event
589+
file_path = os.path.join(tmpdirname, "d1", "outside", "f3.txt")
590+
self._create_file(file_path)
591+
await asyncio.sleep(0.3)
592+
593+
# move out folder
594+
folder_path = os.path.join(tmpdirname, "d2", "dd1")
595+
move_folder_path = os.path.join(tmpdirbasename, "dd1")
596+
os.rename(folder_path, move_folder_path)
597+
await asyncio.sleep(0.3)
598+
599+
# 2 folders not watched
600+
self._assert_paths_watched(inotify._watches, {
601+
tmpdirname,
602+
existing_dir,
603+
os.path.join(tmpdirname, "d1"),
604+
os.path.join(tmpdirname, "d2"),
605+
os.path.join(tmpdirname, "d1", "outside"),
606+
})
607+
608+
# create file in not watched folder, no event
609+
file_path = os.path.join(tmpdirbasename, "dd1", "ddd1", "f4.txt")
610+
self._create_file(file_path)
611+
await asyncio.sleep(0.3)
612+
613+
# move folder within
614+
folder_path = os.path.join(tmpdirname, "existing_dir")
615+
move_folder_path = os.path.join(tmpdirname, "d1", "existing_dir")
616+
os.rename(folder_path, move_folder_path)
617+
await asyncio.sleep(0.3)
618+
619+
# folders change
620+
self._assert_paths_watched(inotify._watches, {
621+
tmpdirname,
622+
os.path.join(tmpdirname, "d1"),
623+
os.path.join(tmpdirname, "d2"),
624+
os.path.join(tmpdirname, "d1", "outside"),
625+
os.path.join(tmpdirname, "d1", "existing_dir")
626+
})
627+
628+
# create file in moved folder, event
629+
file_path = os.path.join(tmpdirname, "d1", "existing_dir", "f5.txt")
630+
self._create_file(file_path)
631+
await asyncio.sleep(0.3)
632+
633+
# delete folder
634+
folder_path = os.path.join(tmpdirname, "d2")
635+
os.removedirs(folder_path)
636+
await asyncio.sleep(0.3)
637+
638+
# one less folder watched
639+
self._assert_paths_watched(inotify._watches, {
640+
tmpdirname,
641+
os.path.join(tmpdirname, "d1"),
642+
os.path.join(tmpdirname, "d1", "outside"),
643+
os.path.join(tmpdirname, "d1", "existing_dir")
644+
})
645+
646+
# delete folders
647+
shutil.rmtree(os.path.join(tmpdirname, "d1"))
648+
await asyncio.sleep(0.3)
649+
650+
# less folders watched
651+
self._assert_paths_watched(inotify._watches, {
652+
tmpdirname,
653+
})
654+
655+
watch_task.cancel()
656+
await asyncio.gather(watch_task, return_exceptions=True)
657+
658+
# verify events
659+
self.assertEqual(len(events), 3)
660+
self.assertEqual(str(events[0][0]), os.path.join(tmpdirname, "f1.txt"))
661+
self.assertTrue(events[0][1] & Mask.CLOSE_WRITE)
662+
663+
self.assertEqual(str(events[1][0]), os.path.join(tmpdirname, "d1", "outside", "f3.txt"))
664+
self.assertTrue(events[1][1] & Mask.CLOSE_WRITE)
665+
666+
self.assertEqual(str(events[2][0]), os.path.join(tmpdirname, "d1", "existing_dir", "f5.txt"))
667+
self.assertTrue(events[2][1] & Mask.CLOSE_WRITE)
668+
669+
def test_watch_recursive(self):
670+
run(self._watch_recursive())
441671

442672
if __name__ == '__main__':
443673
unittest.main()
444-

0 commit comments

Comments
 (0)