From 8c34ce0f2d13206de63b1bf966865c9ae1dce2bd Mon Sep 17 00:00:00 2001 From: carter Date: Tue, 21 May 2024 18:40:16 +0000 Subject: [PATCH] Integration test for reconnection logic on publishers and subscribers --- roslibrust/src/rosbridge/integration_tests.rs | 100 +++++++++++++++++- 1 file changed, 96 insertions(+), 4 deletions(-) diff --git a/roslibrust/src/rosbridge/integration_tests.rs b/roslibrust/src/rosbridge/integration_tests.rs index 016f361c..23416ee2 100644 --- a/roslibrust/src/rosbridge/integration_tests.rs +++ b/roslibrust/src/rosbridge/integration_tests.rs @@ -381,11 +381,103 @@ mod integration_tests { /// - Confirms publisher and subscriber still work! #[test_log::test(tokio::test)] async fn pub_and_sub_reconnect_through_dead_bridge() { + // Child process not automatically killed on drop + // Wrapping in a guard + struct ChildGuard(std::process::Child); + impl Drop for ChildGuard { + fn drop(&mut self) { + // Roslaunch and rosbridge don't have a clean shutdown, so we're doing some shit here... + for _ in 0..5 { + // WHY DO YOU SUCK ROSBRIDGE + let mut kill = std::process::Command::new("kill") + .args(["-s", "TERM", &self.0.id().to_string()]) + .spawn() + .expect("Failed to kill rosbridge"); + kill.wait().expect("Failed to kill rosbridge"); + } + } + } + + // For now picking 9095 as a custom port for this test and hoping there are no collisions + let bridge = ChildGuard( + std::process::Command::new("roslaunch") + .args([ + "rosbridge_server", + "rosbridge_websocket.launch", + "port:=9095", + ]) + .spawn() + .expect("Failed to start rosbridge"), + ); + + // Note longer timeout here to allow for bridge to come up + let client = ClientHandle::new_with_options( + ClientHandleOptions::new("ws://localhost:9095") + .timeout(tokio::time::Duration::from_secs(5)), + ) + .await + .expect("Failed to construct client"); + + let publisher = client + .advertise("/test_reconnect") + .await + .expect("Failed to advertise"); + let subscriber = client + .subscribe::
("/test_reconnect") + .await + .expect("Failed to subscribe"); + + // Confirm we can send and receive messages + publisher + .publish(Header::default()) + .await + .expect("Failed to publish"); + + let received = subscriber.next().await; + assert_eq!(received, Header::default()); + + // kill rosbridge + std::mem::drop(bridge); - std::process::Command::new("rosrun") - .args(["rosbridge_server", "rosbridge_websocket"]) - .spawn() - .expect("Failed to start rosbridge"); + // Wait for rosbridge to die fully + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + // Try to publish and confirm we get an error + let res = publisher.publish(Header::default()).await; + match res { + Ok(_) => { + panic!("Should have failed to publish after rosbridge died"); + } + Err(RosLibRustError::Disconnected) => { + // All good! + } + Err(e) => { + panic!("Got unexpected error: {e}"); + } + } + + // Start the bridge back up! + let bridge = ChildGuard( + std::process::Command::new("roslaunch") + .args([ + "rosbridge_server", + "rosbridge_websocket.launch", + "port:=9095", + ]) + .spawn() + .expect("Failed to start rosbridge"), + ); + + // Wait for bridge to come up + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + // Try to publish and confirm we reconnect automatically + publisher + .publish(Header::default()) + .await + .expect("Failed to publish after rosbridge died"); + let received = subscriber.next().await; + assert_eq!(received, Header::default()); } }