Description
When a container is created with a LogConsumer, the ContainerAsync does not wait for the log consumer to finish when it is dropped. If the process ends quickly, e.g. when running a single cargo test, some logs may not be captured by the time the process exits.

See the example failing test below. By its nature it's flaky, so you may need to increase the $(seq 20) argument to reproduce it, though on my relatively fast machine the counter never gets past 1 or 2.

#[cfg(test)]
mod tests {
    use std::{
        ops::AddAssign,
        sync::{Arc, Mutex},
    };

    use futures::{future::BoxFuture, FutureExt};
    use testcontainers::{
        core::logs::{consumer::LogConsumer, LogFrame},
        runners::AsyncRunner,
        GenericImage, ImageExt,
    };

    struct TestLogConsumer(Arc<Mutex<u64>>);

    impl LogConsumer for TestLogConsumer {
        fn accept<'a>(&'a self, _: &'a LogFrame) -> BoxFuture<'a, ()> {
            async move {
                self.0.lock().unwrap().add_assign(1);
            }
            .boxed()
        }
    }

    #[tokio::test]
    async fn bad_test() {
        let counter = Arc::new(Mutex::new(0u64));
        {
            GenericImage::new("docker.io/library/ubuntu", "latest")
                .with_cmd(["/bin/bash", "-c", "for i in $(seq 20); do echo $i; done"])
                .with_log_consumer(TestLogConsumer(Arc::clone(&counter)))
                .start()
                .await
                .unwrap();
        }
        assert_eq!(counter.lock().unwrap().to_owned(), 20);
    }
}

I'm able to work around this by wrapping the container and avoiding LogConsumer entirely:

#[cfg(test)]
mod tests {
    use std::{
        ops::AddAssign,
        pin::Pin,
        sync::{Arc, Mutex},
    };

    use testcontainers::{runners::AsyncRunner, ContainerAsync, GenericImage, ImageExt};
    use tokio::{
        io::{AsyncBufRead, AsyncBufReadExt},
        runtime::Handle,
        task::JoinHandle,
    };

    type Buffer = Pin<Box<dyn AsyncBufRead + Send>>;

    struct TestLogConsumer {
        stdout_handle: Option<JoinHandle<()>>,
        stderr_handle: Option<JoinHandle<()>>,
    }

    impl TestLogConsumer {
        fn new(counter: Arc<Mutex<u64>>, stdout: Buffer, stderr: Buffer) -> Self {
            let (stdout_counter, stderr_counter) = (Arc::clone(&counter), Arc::clone(&counter));
            let stdout_handle = Some(tokio::spawn(async move {
                let mut lines = stdout.lines();
                while let Ok(Some(_)) = lines.next_line().await {
                    stdout_counter.lock().unwrap().add_assign(1);
                }
            }));
            let stderr_handle = Some(tokio::spawn(async move {
                let mut lines = stderr.lines();
                while let Ok(Some(_)) = lines.next_line().await {
                    stderr_counter.lock().unwrap().add_assign(1);
                }
            }));
            Self {
                stdout_handle,
                stderr_handle,
            }
        }
    }

    impl Drop for TestLogConsumer {
        fn drop(&mut self) {
            let handle = Handle::current();
            let stdout_handle = self.stdout_handle.take();
            let stderr_handle = self.stderr_handle.take();
            tokio::task::block_in_place(|| {
                stdout_handle.map(|task| handle.block_on(task));
                stderr_handle.map(|task| handle.block_on(task));
            });
        }
    }

    struct TestContainer {
        container: ContainerAsync<GenericImage>,
        log_consumer: TestLogConsumer,
    }

    impl TestContainer {
        async fn new(counter: Arc<Mutex<u64>>) -> Self {
            let container = GenericImage::new("docker.io/library/ubuntu", "latest")
                .with_cmd(["/bin/bash", "-c", "for i in $(seq 20); do echo $i; done"])
                .start()
                .await
                .unwrap();
            let log_consumer =
                TestLogConsumer::new(counter, container.stdout(true), container.stderr(true));
            TestContainer {
                container,
                log_consumer,
            }
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn good_test() {
        let counter = Arc::new(Mutex::new(0u64));
        {
            TestContainer::new(Arc::clone(&counter)).await;
        }
        assert_eq!(counter.lock().unwrap().to_owned(), 20);
    }
}

But that's unfortunate. I think if someone is using a LogConsumer, they'd prefer that all logs are captured.

I think the problem is that the log consumer task is created with tokio::spawn, and the corresponding handle is never joined. https://github.com/testcontainers/testcontainers-rs/blob/main/testcontainers/src/core/containers/async_container.rs#L71.
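
For illustration, here is a rough sketch of the shape of fix I have in mind. It is not the actual testcontainers-rs internals: the log_task field, drain_logs name, and ContainerSketch type are hypothetical, and the Drop impl just mirrors the block_in_place/block_on pattern from the workaround above. The idea is simply to keep the JoinHandle returned by that tokio::spawn and await it before the container goes away.

```rust
// Sketch only: hypothetical names, not the real ContainerAsync internals.
use tokio::{runtime::Handle, task::JoinHandle};

struct ContainerSketch {
    // Handle for the spawned log-consumer task; `None` once it has been joined.
    log_task: Option<JoinHandle<()>>,
}

impl ContainerSketch {
    /// Await the log-consumer task so every frame has been delivered
    /// before the container is torn down.
    async fn drain_logs(&mut self) {
        if let Some(task) = self.log_task.take() {
            // A JoinError (cancelled or panicked task) is ignored here.
            let _ = task.await;
        }
    }
}

impl Drop for ContainerSketch {
    fn drop(&mut self) {
        // Fallback if the task was never awaited explicitly: block on it,
        // mirroring the block_in_place/block_on pattern used in the workaround.
        if let Some(task) = self.log_task.take() {
            tokio::task::block_in_place(|| {
                let _ = Handle::current().block_on(task);
            });
        }
    }
}
```
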
I'm more than happy to fix this myself, but wanted to gauge maintainer interest first, since adding more blocking calls to a Drop is generally unpleasant.