Skip to content

Commit

Permalink
WIP: Remove cross-locking on mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
EdoZua95LT committed Sep 18, 2024
1 parent 5c51132 commit e3ee380
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 28 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
rand = "0.8.5"
kalmanfilt = "0.2.4"
tokio = "1.40.0"
70 changes: 42 additions & 28 deletions src/bin/sailtrack-kalman.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use rand::Rng;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use std::sync::mpsc::{Receiver, Sender};

use nalgebra::{OMatrix, OVector, U3, U6};
use rumqttc::{Client, Event, Incoming, MqttOptions, QoS};
Expand All @@ -17,6 +18,12 @@ const EARTH_CIRCUMFERENCE_METERS: f32 = 40075.0 * 1000.0;
const KALMAN_SAMPLE_TIME_MS: u64 = 200;
const LAT_FACTOR: f32 = 1.0;

#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
enum SyncEvent {
GPS_Received,
IMU_Received,
GPS_Fix_Received,
}

#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -85,15 +92,12 @@ struct MeasureCollection<OVector> {
struct Measure {
meas: OVector<f32, U6>,
meas_variance: OMatrix<f32, U6, U6>,
past_measures: MeasureCollection<OVector<f32, U6>>,
new_measure: bool,
}

#[derive(Debug, Clone, Copy)]
struct Input {
acceleration: OVector<f32, U3>,
orientation: OVector<f32, U3>,
new_input: bool,
}

impl MeasureCollection<OVector<f32, U6>> {
Expand Down Expand Up @@ -181,25 +185,6 @@ fn get_measure_forom_gps(gps_data: &Gps, reference: &Gps, measure_struct: &mut M
measure_struct.new_measure = true;
}

// Function that keeps on controll if the GPS fix is obtained
fn wait_for_fix_tipe(gps_ref_arc: &Arc<Mutex<Gps>>) -> bool {
let gps_ref_lock = acquire_lock(gps_ref_arc, line!());
if gps_ref_lock.fix_type == 3 {
drop(gps_ref_lock);
return true;
}
drop(gps_ref_lock);
loop {
let gps_ref_lock = acquire_lock(gps_ref_arc, line!());
if gps_ref_lock.fix_type == 3 {
drop(gps_ref_lock);
return true;
}
drop(gps_ref_lock);
thread::sleep(Duration::from_millis(1000));
}
}

fn on_message_imu(message: Imu, input: &Arc<Mutex<Input>>) {
let accel_vec = vec![
message.linear_accel.x,
Expand Down Expand Up @@ -270,7 +255,6 @@ fn main() {
let input = Input {
acceleration: OVector::<f32, U3>::zeros(),
orientation: OVector::<f32, U3>::zeros(),
new_input: false,
};

let gps_ref = Gps {
Expand All @@ -293,8 +277,6 @@ fn main() {
let measure = Measure {
meas: OVector::<f32, U6>::zeros(),
meas_variance: OMatrix::<f32, U6, U6>::identity(),
past_measures: MeasureCollection::new(),
new_measure: false,
};

// Creating ESKF object
Expand Down Expand Up @@ -372,6 +354,8 @@ fn main() {
R: noise_meas_cov,
..Default::default()
};
// Defining Event Channels
let (tx, rx) = mpsc::channel();

// Defining Mutex for thread share
let gps_ref_mutex = Arc::new(Mutex::new(gps_ref));
Expand Down Expand Up @@ -402,6 +386,10 @@ fn main() {
serde_json::from_slice(payload.as_ref()).unwrap(),
&input_clone,
);
match tx.try_send(SyncEvent::IMU_Received){
Ok(_) => (),
Err(_) => continue
}
}
"sensor/gps0" => {
let payload = packet.payload.clone(); // Clone the payload for later use
Expand All @@ -410,21 +398,47 @@ fn main() {
&gps_ref_clone,
&measure_clone,
);
match tx.try_send(SyncEvent::IMU_Received){
Ok(_) => (),
Err(_) => continue
}
}
_ => (),
}
}
}
});

// GPS fix thread
let gps_ref_clone = Arc::clone(&gps_ref_mutex);
thread::spawn(move || {
let gps_ref_lock = acquire_lock(gps_ref_clone, line!());
if gps_ref_lock.fix_type == 3 {
drop(gps_ref_lock);
return true;
}
drop(gps_ref_lock);
loop {
let gps_ref_lock = acquire_lock(gps_ref_clone, line!());
if gps_ref_lock.fix_type == 3 {
drop(gps_ref_lock);
return true;
}
drop(gps_ref_lock);
thread::sleep(Duration::from_millis(1000));
}
});

// Kalman filter thread
let gps_ref_clone = Arc::clone(&gps_ref_mutex);
let measure_clone = Arc::clone(&measure_mutex);
let input_clone = Arc::clone(&input_mutex);
let filter_clone = Arc::clone(&filter_mutex);
thread::spawn(move || loop {
// Check if the GPS fix has been obtained
wait_for_fix_tipe(&gps_ref_clone);
loop {

}
let thread_start = Instant::now();
let mut measure_lock = acquire_lock(&measure_clone, line!());
let measure = &mut *measure_lock;
Expand Down Expand Up @@ -532,7 +546,7 @@ fn main() {
serde_json::to_vec(&message).unwrap(),
)
.unwrap();

thread::sleep(Duration::from_millis(1000 / MQTT_PUBLISH_FREQ_HZ));
}

}
}

0 comments on commit e3ee380

Please sign in to comment.