From 8d655f0a96f46b84c97386355490d5f7278e89cd Mon Sep 17 00:00:00 2001 From: Anthony Dodd Date: Mon, 21 Aug 2023 19:37:09 -0500 Subject: [PATCH] Extend the recent work to solidify the FS watcher Ultimately, deeper investigation has revealed that std::fs::copy will generate modification events on the source file (the file being copied) on some platforms. This has proven to be a source of difficulty for the FS watcher, and has led to infinite build loops, despite the fact that none of the files / directories being watched were actually changed. Here we implement a post-build cooldown interval of 1s. This ensures that in very fast builds, we do not end up receiving a modification event due to std::fs::copy (from the copy source). Moreover, any FS events which have accumulated during the 1s cooldown will be purged. Altogether this will hopefully fully address the recursive build issue, and will hopefully not disrupt the development workflow. --- src/watch.rs | 263 +++++++++++++++++++++++++-------------------------- 1 file changed, 129 insertions(+), 134 deletions(-) diff --git a/src/watch.rs b/src/watch.rs index c6aa8ae2..682a1e79 100644 --- a/src/watch.rs +++ b/src/watch.rs @@ -1,35 +1,54 @@ -use std::path::{Path, PathBuf}; -use std::sync::{Arc, RwLock}; +use std::path::PathBuf; +use std::sync::Arc; use std::time::Duration; use anyhow::{Context, Result}; use futures_util::stream::StreamExt; -use notify::{RecommendedWatcher, RecursiveMode, Watcher}; +use notify::{event::ModifyKind, EventKind, FsEventWatcher, RecursiveMode, Watcher}; use notify_debouncer_full::{ - new_debouncer, DebounceEventHandler, DebounceEventResult, DebouncedEvent, Debouncer, FileIdMap, + new_debouncer, DebounceEventResult, DebouncedEvent, Debouncer, FileIdMap, }; -use tokio::runtime::Handle; -use tokio::sync::{broadcast, mpsc, Mutex}; +use tokio::sync::{broadcast, mpsc}; +use tokio::time::Instant; use tokio_stream::wrappers::BroadcastStream; use crate::build::BuildSystem; use 
crate::config::RtcWatch; +/// The debouncer type used in this module. +type FsDebouncer = Debouncer; + /// Blacklisted path segments which are ignored by the watcher by default. const BLACKLIST: [&str; 1] = [".git"]; +/// The duration of time to debounce FS events. +const DEBOUNCE_DURATION: Duration = Duration::from_millis(25); +/// The duration of time during which watcher events will be ignored following a build. +const WATCHER_COOLDOWN: Duration = Duration::from_secs(1); /// A watch system wrapping a build system and a watcher. pub struct WatchSystem { /// The build system. - build: Arc>, + build: BuildSystem, /// The current vector of paths to be ignored. - ignored_paths: Arc>>, + ignored_paths: Vec, + /// A channel of FS watch events. + watch_rx: mpsc::Receiver, /// A channel of new paths to ignore from the build system. build_rx: mpsc::Receiver, /// The watch system used for watching the filesystem. - _debouncer: Debouncer, + _debouncer: FsDebouncer, /// The application shutdown channel. shutdown: BroadcastStream<()>, + /// Channel that is sent on whenever a build completes. + build_done_tx: Option>, + /// An instant used to track the last build time, used to implement the watcher cooldown + /// to avoid infinite build loops. + /// + /// Ok, so why is this needed? As it turns out, `std::fs::copy` will trigger + /// `EventKind::Modify(ModifyKind::Data(_))` FS events on the file which is being copied. A + /// build cooldown period ensures that no FS events are processed until at least a duration + /// of `WATCHER_COOLDOWN` has elapsed since the last build. + last_build_finished: Instant, } impl WatchSystem { @@ -39,53 +58,31 @@ impl WatchSystem { shutdown: broadcast::Sender<()>, build_done_tx: Option>, ) -> Result { - let runtime = tokio::runtime::Handle::current(); - // Create a channel for being able to listen for new paths to ignore while running. 
+ let (watch_tx, watch_rx) = mpsc::channel(1); let (build_tx, build_rx) = mpsc::channel(1); - // Build dependencies. - let build = Arc::new(Mutex::new( - BuildSystem::new(cfg.build.clone(), Some(build_tx)).await?, - )); - - let ignored_paths = Arc::new(RwLock::new(cfg.ignored_paths.clone())); - - let mut inner = ChangeHandler { - ignored_paths: ignored_paths.clone(), - build_done_tx, - build: build.clone(), - runtime, - }; - // Build the watcher. - let _debouncer = build_watcher( - move |events: DebounceEventResult| match events { - Ok(events) => { - inner.handle_watch_events(events); - } - Err(errs) => { - for (n, err) in errs.into_iter().enumerate() { - tracing::info!("Error while watching - {n:03}: {err}"); - } - } - }, - cfg.paths.clone(), - )?; + let _debouncer = build_watcher(watch_tx, cfg.paths.clone())?; + // Build dependencies. + let build = BuildSystem::new(cfg.build.clone(), Some(build_tx)).await?; Ok(Self { build, - _debouncer, - ignored_paths, + ignored_paths: cfg.ignored_paths.clone(), + watch_rx, build_rx, + _debouncer, shutdown: BroadcastStream::new(shutdown.subscribe()), + build_done_tx, + last_build_finished: Instant::now(), }) } /// Run a build. #[tracing::instrument(level = "trace", skip(self))] pub async fn build(&mut self) -> Result<()> { - self.build.lock().await.build().await + self.build.build().await } /// Run the watch system, responding to events and triggering builds. @@ -94,6 +91,7 @@ impl WatchSystem { loop { tokio::select! { Some(ign) = self.build_rx.recv() => self.update_ignore_list(ign), + Some(ev) = self.watch_rx.recv() => self.handle_watch_event(ev).await, _ = self.shutdown.next() => break, // Any event, even a drop, will trigger shutdown. 
} } @@ -101,26 +99,106 @@ impl WatchSystem { tracing::debug!("watcher system has shut down"); } - fn update_ignore_list(&self, arg_path: PathBuf) { + #[tracing::instrument(level = "trace", skip(self, event))] + async fn handle_watch_event(&mut self, event: DebouncedEvent) { + // There are various OS syscalls which can trigger FS changes, even though semantically no changes were made. + // A notorious example which has plagued the trunk watcher implementation is `std::fs::copy`, which will + // trigger watcher changes indicating that file contents have been modified. + // + // Given the difficult nature of this issue, we opt for using a cooldown period. Any changes events processed + // within the cooldown period following a build will be ignored. + if Instant::now().duration_since(self.last_build_finished) <= WATCHER_COOLDOWN { + // Purge any other events in the queue. + while let Ok(_event) = self.watch_rx.try_recv() {} + return; + } + + // Check each path in the event for a match. + match event.event.kind { + EventKind::Modify(ModifyKind::Name(_) | ModifyKind::Data(_)) + | EventKind::Create(_) + | EventKind::Remove(_) => (), + _ => return, + }; + let mut found_matching_path = false; + for ev_path in &event.paths { + let ev_path = match tokio::fs::canonicalize(&ev_path).await { + Ok(ev_path) => ev_path, + // Ignore errors here, as this would only take place for a resource which has + // been removed, which will happen for each of our dist/.stage entries. + Err(_) => continue, + }; + + // Check ignored paths. + if ev_path.ancestors().any(|path| { + self.ignored_paths + .iter() + .any(|ignored_path| ignored_path == path) + }) { + continue; // Don't emit a notification if path is ignored. + } + + // Check blacklisted paths. + if ev_path + .components() + .filter_map(|segment| segment.as_os_str().to_str()) + .any(|segment| BLACKLIST.contains(&segment)) + { + continue; // Don't emit a notification as path is on the blacklist. 
+ } + + // If all of the above checks have passed, then we need to trigger a build. + tracing::debug!("change detected in {:?} of type {:?}", ev_path, event.kind); + found_matching_path = true; + } + + // If a build is not needed, then return. + if !found_matching_path { + return; + } + + // Else, time to trigger a build. + let _res = self.build.build().await; + self.last_build_finished = tokio::time::Instant::now(); + + // TODO/NOTE: in the future, we will want to be able to pass along error info and other + // diagnostics info over the socket for use in an error overlay or console logging. + if let Some(tx) = self.build_done_tx.as_mut() { + let _ = tx.send(()); + } + } + + fn update_ignore_list(&mut self, arg_path: PathBuf) { let path = match arg_path.canonicalize() { Ok(canon_path) => canon_path, Err(_) => arg_path, }; - let mut ignored_paths = self.ignored_paths.write().expect("Failed to acquire lock"); - if !ignored_paths.contains(&path) { - ignored_paths.push(path); + if !self.ignored_paths.contains(&path) { + self.ignored_paths.push(path); } } } -/// Build a FS watcher and debouncer, when it is dropped, it will stop watching for events. -fn build_watcher( - event_handler: H, +/// Build a FS watcher, when the watcher is dropped, it will stop watching for events. +fn build_watcher( + watch_tx: mpsc::Sender, paths: Vec, -) -> Result> { - let mut debouncer = new_debouncer(Duration::from_secs(1), None, event_handler) - .context("failed to build file system watcher")?; +) -> Result { + // Build the filesystem watcher & debouncer. 
+ let mut debouncer = new_debouncer( + DEBOUNCE_DURATION, + None, + move |result: DebounceEventResult| match result { + Ok(events) => events.into_iter().for_each(|event| { + let _ = watch_tx.blocking_send(event); + }), + Err(errors) => errors + .into_iter() + .for_each(|err| tracing::warn!(error=?err, "error from filesystem watcher")), + }, + ) + .context("failed to build file system watcher")?; // Create a recursive watcher on each of the given paths. // NOTE WELL: it is expected that all given paths are canonical. The Trunk config @@ -138,86 +216,3 @@ fn build_watcher( Ok(debouncer) } - -/// The handler for filesystem changes. -struct ChangeHandler { - /// Runtime handle, for spawning futures. - runtime: Handle, - /// The build system. - build: Arc>, - /// The current vector of paths to be ignored. - ignored_paths: Arc>>, - /// Channel that is sent on whenever a build completes. - build_done_tx: Option>, -} - -impl ChangeHandler { - /// Test if an event is relevant to our configuration. - fn is_relevant(&self, ev_path: &Path) -> bool { - let ev_path = match std::fs::canonicalize(ev_path) { - Ok(ev_path) => ev_path, - // Ignore errors here, as this would only take place for a resource which has - // been removed, which will happen for each of our dist/.stage entries. - Err(_) => return false, - }; - - // Check ignored paths. - let ignored_paths = self.ignored_paths.read().expect("Failed to acquire lock"); - if ev_path.ancestors().any(|path| { - ignored_paths - .iter() - .any(|ignored_path| ignored_path == path) - }) { - return false; // Don't emit a notification if path is ignored. - } - - // Check blacklisted paths. - if ev_path - .components() - .filter_map(|segment| segment.as_os_str().to_str()) - .any(|segment| BLACKLIST.contains(&segment)) - { - return false; // Don't emit a notification as path is on the blacklist. - } - - tracing::info!("change detected in {:?}", ev_path); - - true - } - - /// Handle an array of [`DebouncedEvent`]s. 
If any of them is relevant, we run a new build, - /// and wait for it finish before returning, so that the debouncer knows we are ready for the - /// next step. - #[tracing::instrument(level = "trace", skip(self), fields(events = events.len()))] - fn handle_watch_events(&mut self, events: Vec) { - // check if we have any relevant change event - let mut none = true; - for path in events.iter().flat_map(|event| &event.paths) { - if self.is_relevant(path) { - none = false; - break; - } - } - - if none { - // none of the events was relevant - return; - } - - let (once_tx, once_rx) = tokio::sync::oneshot::channel(); - let build = self.build.clone(); - self.runtime.spawn(async move { - let mut build = build.lock().await; - let _ = once_tx.send(build.build().await); - }); - - // wait until the spawned build is ready, and retrieve its result - let _ret = once_rx.blocking_recv(); - - // TODO/NOTE: in the future, we will want to be able to pass along error info and other - // diagnostics info over the socket for use in an error overlay or console logging. - if let Some(tx) = self.build_done_tx.as_mut() { - let _ = tx.send(()); - } - } -}