diff --git a/tracing-appender/src/lib.rs b/tracing-appender/src/lib.rs index e5327da2e..717c24810 100644 --- a/tracing-appender/src/lib.rs +++ b/tracing-appender/src/lib.rs @@ -195,6 +195,14 @@ pub fn non_blocking(writer: T) -> (NonBlocking, Worke NonBlocking::new(writer) } +/// Convenience function for creating a non-blocking, off-thread writer with setup. +pub fn non_blocking_with_setup( + writer: T, + setup: impl FnOnce() + Send + 'static, +) -> (NonBlocking, WorkerGuard) { + NonBlocking::new_with_setup(writer, setup) +} + #[derive(Debug)] pub(crate) enum Msg { Line(Vec), diff --git a/tracing-appender/src/non_blocking.rs b/tracing-appender/src/non_blocking.rs index 50d204a4b..cefdbf4fa 100644 --- a/tracing-appender/src/non_blocking.rs +++ b/tracing-appender/src/non_blocking.rs @@ -146,6 +146,14 @@ impl NonBlocking { NonBlockingBuilder::default().finish(writer) } + /// Returns a new `NonBlocking` writer wrapping the provided `writer` with the provided `setup` function. + pub fn new_with_setup( + writer: T, + setup: impl FnOnce() + Send + 'static, + ) -> (NonBlocking, WorkerGuard) { + NonBlockingBuilder::default().finish_with_setup(writer, setup) + } + fn create( writer: T, buffered_lines_limit: usize, @@ -173,6 +181,34 @@ impl NonBlocking { ) } + fn create_with_setup( + writer: T, + setup: impl FnOnce() + Send + 'static, + buffered_lines_limit: usize, + is_lossy: bool, + thread_name: String, + ) -> (NonBlocking, WorkerGuard) { + let (sender, receiver) = bounded(buffered_lines_limit); + + let (shutdown_sender, shutdown_receiver) = bounded(0); + + let worker = Worker::new(receiver, writer, shutdown_receiver); + let worker_guard = WorkerGuard::new( + worker.worker_thread_with_setup(thread_name, setup), + sender.clone(), + shutdown_sender, + ); + + ( + Self { + channel: sender, + error_counter: ErrorCounter(Arc::new(AtomicUsize::new(0))), + is_lossy, + }, + worker_guard, + ) + } + /// Returns a counter for the number of times logs where dropped. This will always return zero if /// `NonBlocking` is not lossy. pub fn error_counter(&self) -> ErrorCounter { @@ -223,6 +259,20 @@ impl NonBlockingBuilder { self.thread_name, ) } + + pub fn finish_with_setup( + self, + writer: T, + setup: impl FnOnce() + Send + 'static, + ) -> (NonBlocking, WorkerGuard) { + NonBlocking::create_with_setup( + writer, + setup, + self.buffered_lines_limit, + self.is_lossy, + self.thread_name, + ) + } } impl Default for NonBlockingBuilder { diff --git a/tracing-appender/src/worker.rs b/tracing-appender/src/worker.rs index 00a7d38d3..3839f42cd 100644 --- a/tracing-appender/src/worker.rs +++ b/tracing-appender/src/worker.rs @@ -89,4 +89,33 @@ impl Worker { }) .expect("failed to spawn `tracing-appender` non-blocking worker thread") } + + /// Creates a worker thread that processes a channel until it's disconnected + pub(crate) fn worker_thread_with_setup( + mut self, + name: String, + setup: impl FnOnce() + Send + 'static, + ) -> std::thread::JoinHandle<()> { + thread::Builder::new() + .name(name) + .spawn(move || { + setup(); + loop { + match self.work() { + Ok(WorkerState::Continue) | Ok(WorkerState::Empty) => {} + Ok(WorkerState::Shutdown) | Ok(WorkerState::Disconnected) => { + let _ = self.shutdown.recv(); + break; + } + Err(_) => { + // TODO: Expose a metric for IO Errors, or print to stderr + } + } + } + if let Err(e) = self.writer.flush() { + eprintln!("Failed to flush. Error: {}", e); + } + }) + .expect("failed to spawn `tracing-appender` non-blocking worker thread") + } }