diff --git a/src/watch.rs b/src/watch.rs index c6aa8ae2..682a1e79 100644 --- a/src/watch.rs +++ b/src/watch.rs @@ -1,35 +1,54 @@ -use std::path::{Path, PathBuf}; -use std::sync::{Arc, RwLock}; +use std::path::PathBuf; +use std::sync::Arc; use std::time::Duration; use anyhow::{Context, Result}; use futures_util::stream::StreamExt; -use notify::{RecommendedWatcher, RecursiveMode, Watcher}; +use notify::{event::ModifyKind, EventKind, FsEventWatcher, RecursiveMode, Watcher}; use notify_debouncer_full::{ - new_debouncer, DebounceEventHandler, DebounceEventResult, DebouncedEvent, Debouncer, FileIdMap, + new_debouncer, DebounceEventResult, DebouncedEvent, Debouncer, FileIdMap, }; -use tokio::runtime::Handle; -use tokio::sync::{broadcast, mpsc, Mutex}; +use tokio::sync::{broadcast, mpsc}; +use tokio::time::Instant; use tokio_stream::wrappers::BroadcastStream; use crate::build::BuildSystem; use crate::config::RtcWatch; +/// The debouncer type used in this module. +type FsDebouncer = Debouncer; + /// Blacklisted path segments which are ignored by the watcher by default. const BLACKLIST: [&str; 1] = [".git"]; +/// The duration of time to debounce FS events. +const DEBOUNCE_DURATION: Duration = Duration::from_millis(25); +/// The duration of time during which watcher events will be ignored following a build. +const WATCHER_COOLDOWN: Duration = Duration::from_secs(1); /// A watch system wrapping a build system and a watcher. pub struct WatchSystem { /// The build system. - build: Arc>, + build: BuildSystem, /// The current vector of paths to be ignored. - ignored_paths: Arc>>, + ignored_paths: Vec, + /// A channel of FS watch events. + watch_rx: mpsc::Receiver, /// A channel of new paths to ignore from the build system. build_rx: mpsc::Receiver, /// The watch system used for watching the filesystem. - _debouncer: Debouncer, + _debouncer: FsDebouncer, /// The application shutdown channel. shutdown: BroadcastStream<()>, + /// Channel that is sent on whenever a build completes. + build_done_tx: Option>, + /// An instant used to track the last build time, used to implement the watcher cooldown + /// to avoid infinite build loops. + /// + /// Ok, so why is this needed? As it turns out, `std::fs::copy` will trigger + /// `EventKind::Modify(ModifyKind::Data(_))` FS events on the file which is being copied. A + /// build cooldown period ensures that no FS events are processed until at least a duration + /// of `WATCHER_COOLDOWN` has elapsed since the last build. + last_build_finished: Instant, } impl WatchSystem { @@ -39,53 +58,31 @@ impl WatchSystem { shutdown: broadcast::Sender<()>, build_done_tx: Option>, ) -> Result { - let runtime = tokio::runtime::Handle::current(); - // Create a channel for being able to listen for new paths to ignore while running. + let (watch_tx, watch_rx) = mpsc::channel(1); let (build_tx, build_rx) = mpsc::channel(1); - // Build dependencies. - let build = Arc::new(Mutex::new( - BuildSystem::new(cfg.build.clone(), Some(build_tx)).await?, - )); - - let ignored_paths = Arc::new(RwLock::new(cfg.ignored_paths.clone())); - - let mut inner = ChangeHandler { - ignored_paths: ignored_paths.clone(), - build_done_tx, - build: build.clone(), - runtime, - }; - // Build the watcher. - let _debouncer = build_watcher( - move |events: DebounceEventResult| match events { - Ok(events) => { - inner.handle_watch_events(events); - } - Err(errs) => { - for (n, err) in errs.into_iter().enumerate() { - tracing::info!("Error while watching - {n:03}: {err}"); - } - } - }, - cfg.paths.clone(), - )?; + let _debouncer = build_watcher(watch_tx, cfg.paths.clone())?; + // Build dependencies. + let build = BuildSystem::new(cfg.build.clone(), Some(build_tx)).await?; Ok(Self { build, - _debouncer, - ignored_paths, + ignored_paths: cfg.ignored_paths.clone(), + watch_rx, build_rx, + _debouncer, shutdown: BroadcastStream::new(shutdown.subscribe()), + build_done_tx, + last_build_finished: Instant::now(), }) } /// Run a build. #[tracing::instrument(level = "trace", skip(self))] pub async fn build(&mut self) -> Result<()> { - self.build.lock().await.build().await + self.build.build().await } /// Run the watch system, responding to events and triggering builds. @@ -94,6 +91,7 @@ impl WatchSystem { loop { tokio::select! { Some(ign) = self.build_rx.recv() => self.update_ignore_list(ign), + Some(ev) = self.watch_rx.recv() => self.handle_watch_event(ev).await, _ = self.shutdown.next() => break, // Any event, even a drop, will trigger shutdown. } } @@ -101,26 +99,106 @@ impl WatchSystem { tracing::debug!("watcher system has shut down"); } - fn update_ignore_list(&self, arg_path: PathBuf) { + #[tracing::instrument(level = "trace", skip(self, event))] + async fn handle_watch_event(&mut self, event: DebouncedEvent) { + // There are various OS syscalls which can trigger FS changes, even though semantically no changes were made. + // A notorious example which has plagued the trunk watcher implementation is `std::fs::copy`, which will + // trigger watcher changes indicating that file contents have been modified. + // + // Given the difficult nature of this issue, we opt for using a cooldown period. Any changes events processed + // within the cooldown period following a build will be ignored. + if Instant::now().duration_since(self.last_build_finished) <= WATCHER_COOLDOWN { + // Purge any other events in the queue. + while let Ok(_event) = self.watch_rx.try_recv() {} + return; + } + + // Check each path in the event for a match. + match event.event.kind { + EventKind::Modify(ModifyKind::Name(_) | ModifyKind::Data(_)) + | EventKind::Create(_) + | EventKind::Remove(_) => (), + _ => return, + }; + let mut found_matching_path = false; + for ev_path in &event.paths { + let ev_path = match tokio::fs::canonicalize(&ev_path).await { + Ok(ev_path) => ev_path, + // Ignore errors here, as this would only take place for a resource which has + // been removed, which will happen for each of our dist/.stage entries. + Err(_) => continue, + }; + + // Check ignored paths. + if ev_path.ancestors().any(|path| { + self.ignored_paths + .iter() + .any(|ignored_path| ignored_path == path) + }) { + continue; // Don't emit a notification if path is ignored. + } + + // Check blacklisted paths. + if ev_path + .components() + .filter_map(|segment| segment.as_os_str().to_str()) + .any(|segment| BLACKLIST.contains(&segment)) + { + continue; // Don't emit a notification as path is on the blacklist. + } + + // If all of the above checks have passed, then we need to trigger a build. + tracing::debug!("change detected in {:?} of type {:?}", ev_path, event.kind); + found_matching_path = true; + } + + // If a build is not needed, then return. + if !found_matching_path { + return; + } + + // Else, time to trigger a build. + let _res = self.build.build().await; + self.last_build_finished = tokio::time::Instant::now(); + + // TODO/NOTE: in the future, we will want to be able to pass along error info and other + // diagnostics info over the socket for use in an error overlay or console logging. + if let Some(tx) = self.build_done_tx.as_mut() { + let _ = tx.send(()); + } + } + + fn update_ignore_list(&mut self, arg_path: PathBuf) { let path = match arg_path.canonicalize() { Ok(canon_path) => canon_path, Err(_) => arg_path, }; - let mut ignored_paths = self.ignored_paths.write().expect("Failed to acquire lock"); - if !ignored_paths.contains(&path) { - ignored_paths.push(path); + if !self.ignored_paths.contains(&path) { + self.ignored_paths.push(path); } } } -/// Build a FS watcher and debouncer, when it is dropped, it will stop watching for events. -fn build_watcher( - event_handler: H, +/// Build a FS watcher, when the watcher is dropped, it will stop watching for events. +fn build_watcher( + watch_tx: mpsc::Sender, paths: Vec, -) -> Result> { - let mut debouncer = new_debouncer(Duration::from_secs(1), None, event_handler) - .context("failed to build file system watcher")?; +) -> Result { + // Build the filesystem watcher & debouncer. + let mut debouncer = new_debouncer( + DEBOUNCE_DURATION, + None, + move |result: DebounceEventResult| match result { + Ok(events) => events.into_iter().for_each(|event| { + let _ = watch_tx.blocking_send(event); + }), + Err(errors) => errors + .into_iter() + .for_each(|err| tracing::warn!(error=?err, "error from filesystem watcher")), + }, + ) + .context("failed to build file system watcher")?; // Create a recursive watcher on each of the given paths. // NOTE WELL: it is expected that all given paths are canonical. The Trunk config @@ -138,86 +216,3 @@ fn build_watcher( Ok(debouncer) } - -/// The handler for filesystem changes. -struct ChangeHandler { - /// Runtime handle, for spawning futures. - runtime: Handle, - /// The build system. - build: Arc>, - /// The current vector of paths to be ignored. - ignored_paths: Arc>>, - /// Channel that is sent on whenever a build completes. - build_done_tx: Option>, -} - -impl ChangeHandler { - /// Test if an event is relevant to our configuration. - fn is_relevant(&self, ev_path: &Path) -> bool { - let ev_path = match std::fs::canonicalize(ev_path) { - Ok(ev_path) => ev_path, - // Ignore errors here, as this would only take place for a resource which has - // been removed, which will happen for each of our dist/.stage entries. - Err(_) => return false, - }; - - // Check ignored paths. - let ignored_paths = self.ignored_paths.read().expect("Failed to acquire lock"); - if ev_path.ancestors().any(|path| { - ignored_paths - .iter() - .any(|ignored_path| ignored_path == path) - }) { - return false; // Don't emit a notification if path is ignored. - } - - // Check blacklisted paths. - if ev_path - .components() - .filter_map(|segment| segment.as_os_str().to_str()) - .any(|segment| BLACKLIST.contains(&segment)) - { - return false; // Don't emit a notification as path is on the blacklist. - } - - tracing::info!("change detected in {:?}", ev_path); - - true - } - - /// Handle an array of [`DebouncedEvent`]s. If any of them is relevant, we run a new build, - /// and wait for it finish before returning, so that the debouncer knows we are ready for the - /// next step. - #[tracing::instrument(level = "trace", skip(self), fields(events = events.len()))] - fn handle_watch_events(&mut self, events: Vec) { - // check if we have any relevant change event - let mut none = true; - for path in events.iter().flat_map(|event| &event.paths) { - if self.is_relevant(path) { - none = false; - break; - } - } - - if none { - // none of the events was relevant - return; - } - - let (once_tx, once_rx) = tokio::sync::oneshot::channel(); - let build = self.build.clone(); - self.runtime.spawn(async move { - let mut build = build.lock().await; - let _ = once_tx.send(build.build().await); - }); - - // wait until the spawned build is ready, and retrieve its result - let _ret = once_rx.blocking_recv(); - - // TODO/NOTE: in the future, we will want to be able to pass along error info and other - // diagnostics info over the socket for use in an error overlay or console logging. - if let Some(tx) = self.build_done_tx.as_mut() { - let _ = tx.send(()); - } - } -}