Skip to content

Commit

Permalink
Integration test for reconnection logic on publishers and subscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
carter committed May 21, 2024
1 parent 1cf3f69 commit 8c34ce0
Showing 1 changed file with 96 additions and 4 deletions.
100 changes: 96 additions & 4 deletions roslibrust/src/rosbridge/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,11 +381,103 @@ mod integration_tests {
/// - Confirms publisher and subscriber still work!
#[test_log::test(tokio::test)]
async fn pub_and_sub_reconnect_through_dead_bridge() {
// Child process not automatically killed on drop
// Wrapping in a guard
struct ChildGuard(std::process::Child);
impl Drop for ChildGuard {
fn drop(&mut self) {
// Roslaunch and rosbridge don't have a clean shutdown, so we're doing some shit here...
for _ in 0..5 {
// WHY DO YOU SUCK ROSBRIDGE
let mut kill = std::process::Command::new("kill")
.args(["-s", "TERM", &self.0.id().to_string()])
.spawn()
.expect("Failed to kill rosbridge");
kill.wait().expect("Failed to kill rosbridge");
}
}
}

// For now picking 9095 as a custom port for this test and hoping there are no collisions
let bridge = ChildGuard(
std::process::Command::new("roslaunch")
.args([
"rosbridge_server",
"rosbridge_websocket.launch",
"port:=9095",
])
.spawn()
.expect("Failed to start rosbridge"),
);

// Note longer timeout here to allow for bridge to come up
let client = ClientHandle::new_with_options(
ClientHandleOptions::new("ws://localhost:9095")
.timeout(tokio::time::Duration::from_secs(5)),
)
.await
.expect("Failed to construct client");

let publisher = client
.advertise("/test_reconnect")
.await
.expect("Failed to advertise");
let subscriber = client
.subscribe::<Header>("/test_reconnect")
.await
.expect("Failed to subscribe");

// Confirm we can send and receive messages
publisher
.publish(Header::default())
.await
.expect("Failed to publish");

let received = subscriber.next().await;
assert_eq!(received, Header::default());

// kill rosbridge
std::mem::drop(bridge);

std::process::Command::new("rosrun")
.args(["rosbridge_server", "rosbridge_websocket"])
.spawn()
.expect("Failed to start rosbridge");
// Wait for rosbridge to die fully
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;

// Try to publish and confirm we get an error
let res = publisher.publish(Header::default()).await;
match res {
Ok(_) => {
panic!("Should have failed to publish after rosbridge died");
}
Err(RosLibRustError::Disconnected) => {
// All good!
}
Err(e) => {
panic!("Got unexpected error: {e}");
}
}

// Start the bridge back up!
let bridge = ChildGuard(
std::process::Command::new("roslaunch")
.args([
"rosbridge_server",
"rosbridge_websocket.launch",
"port:=9095",
])
.spawn()
.expect("Failed to start rosbridge"),
);

// Wait for bridge to come up
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;

// Try to publish and confirm we reconnect automatically
publisher
.publish(Header::default())
.await
.expect("Failed to publish after rosbridge died");

let received = subscriber.next().await;
assert_eq!(received, Header::default());
}
}

0 comments on commit 8c34ce0

Please sign in to comment.