Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions tracing-appender/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,14 @@ pub fn non_blocking<T: Write + Send + 'static>(writer: T) -> (NonBlocking, Worke
NonBlocking::new(writer)
}

/// Convenience function for creating a non-blocking, off-thread writer with setup.
pub fn non_blocking_with_setup<T: Write + Send + 'static>(
writer: T,
setup: impl FnOnce() + Send + 'static,
) -> (NonBlocking, WorkerGuard) {
NonBlocking::new_with_setup(writer, setup)
}

#[derive(Debug)]
pub(crate) enum Msg {
Line(Vec<u8>),
Expand Down
50 changes: 50 additions & 0 deletions tracing-appender/src/non_blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ impl NonBlocking {
NonBlockingBuilder::default().finish(writer)
}

/// Returns a new `NonBlocking` writer wrapping the provided `writer` with the provided `setup` function.
pub fn new_with_setup<T: Write + Send + 'static>(
writer: T,
setup: impl FnOnce() + Send + 'static,
) -> (NonBlocking, WorkerGuard) {
NonBlockingBuilder::default().finish_with_setup(writer, setup)
}

fn create<T: Write + Send + 'static>(
writer: T,
buffered_lines_limit: usize,
Expand Down Expand Up @@ -173,6 +181,34 @@ impl NonBlocking {
)
}

fn create_with_setup<T: Write + Send + 'static>(
writer: T,
setup: impl FnOnce() + Send + 'static,
buffered_lines_limit: usize,
is_lossy: bool,
thread_name: String,
) -> (NonBlocking, WorkerGuard) {
let (sender, receiver) = bounded(buffered_lines_limit);

let (shutdown_sender, shutdown_receiver) = bounded(0);

let worker = Worker::new(receiver, writer, shutdown_receiver);
let worker_guard = WorkerGuard::new(
worker.worker_thread_with_setup(thread_name, setup),
sender.clone(),
shutdown_sender,
);

(
Self {
channel: sender,
error_counter: ErrorCounter(Arc::new(AtomicUsize::new(0))),
is_lossy,
},
worker_guard,
)
}

/// Returns a counter for the number of times logs where dropped. This will always return zero if
/// `NonBlocking` is not lossy.
pub fn error_counter(&self) -> ErrorCounter {
Expand Down Expand Up @@ -223,6 +259,20 @@ impl NonBlockingBuilder {
self.thread_name,
)
}

pub fn finish_with_setup<T: Write + Send + 'static>(
self,
writer: T,
setup: impl FnOnce() + Send + 'static,
) -> (NonBlocking, WorkerGuard) {
NonBlocking::create_with_setup(
writer,
setup,
self.buffered_lines_limit,
self.is_lossy,
self.thread_name,
)
}
}

impl Default for NonBlockingBuilder {
Expand Down
29 changes: 29 additions & 0 deletions tracing-appender/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,33 @@ impl<T: Write + Send + 'static> Worker<T> {
})
.expect("failed to spawn `tracing-appender` non-blocking worker thread")
}

/// Creates a worker thread that processes a channel until it's disconnected
pub(crate) fn worker_thread_with_setup(
mut self,
name: String,
setup: impl FnOnce() + Send + 'static,
) -> std::thread::JoinHandle<()> {
thread::Builder::new()
.name(name)
.spawn(move || {
setup();
loop {
match self.work() {
Ok(WorkerState::Continue) | Ok(WorkerState::Empty) => {}
Ok(WorkerState::Shutdown) | Ok(WorkerState::Disconnected) => {
let _ = self.shutdown.recv();
break;
}
Err(_) => {
// TODO: Expose a metric for IO Errors, or print to stderr
}
}
}
if let Err(e) = self.writer.flush() {
eprintln!("Failed to flush. Error: {}", e);
}
})
.expect("failed to spawn `tracing-appender` non-blocking worker thread")
}
}