Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Odyssey-Definitions
6 changes: 4 additions & 2 deletions src/bin/nerimd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,8 @@ async fn write_ctrl_or_set(
.await
{
Ok(()) => {
debug!("Successfully sent message {}", index);
debug!("Successfully sent message {}, waiting", index);
tokio::time::sleep(Duration::from_secs(1)).await;
if let Some(idex) = read_back_index {
read_or_readback(socket, idex).await;
} else {
Expand Down Expand Up @@ -484,8 +485,9 @@ async fn read_or_readback(socket: &mut CanSocket, index: u8) {
if frame.data()[0] == 0xFF {
println!("Data failure: {:?}", ErrorMessage::try_from_primitive(frame.data()[1]).expect("Invalid error code!"));
} else {
println!("Data returned: {:?}", frame.data());
println!("Data returned: {:02X?}", frame.data());
}
break;
},
socketcan::Id::Extended(_) => (),
}
Expand Down
196 changes: 196 additions & 0 deletions src/imd_poll.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
use std::time::Duration;
use std::time::UNIX_EPOCH;

use socketcan::{CanDataFrame, CanFrame, EmbeddedFrame, Id};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};

use crate::proto::serverdata::ServerData;

enum ImdScaling {
Default,
TimesPoint1,
Voltage,
}
struct ImdPacket {
pub id: u8,
pub size: usize,
pub topic: &'static str,
pub unit: &'static str,
pub scaling: ImdScaling,
}

impl ImdPacket {
pub fn convert(&self, input: &mut Vec<u8>) -> (String, ServerData) {
// extend to id plus size of u32. All this fuckery needed as from_le_bytes requires exactly 4 bytes
input.truncate(self.size + 1);
input.resize(5, 0);
let out = u32::from_le_bytes(
input[1..=size_of::<u32>()]
.try_into()
.expect("Critical failure to decode IMD message"),
);
let send: f32 = match self.scaling {
ImdScaling::Default => out as f32,
ImdScaling::TimesPoint1 => out as f32 * 0.1,
ImdScaling::Voltage => (out as f32 * 0.05) - 1606.4,
};

let mut payload = ServerData::new();
payload.unit = self.unit.to_string();
payload.values = vec![send];
payload.time_us = UNIX_EPOCH.elapsed().unwrap().as_micros() as u64;

(self.topic.to_string(), payload)
}
}

// To add a supported field from the datasheet, add it here.
// Note that for now we are max 4 byte fields, a field 5 bytes or more cannot be read by the driver and will be truncated
const SENDABLES: [ImdPacket; 9] = [
ImdPacket {
id: 0x40,
size: 2,
topic: "IMD/Info/Iso_Detail/R_iso_neg",
unit: "kOhm",
scaling: ImdScaling::Default,
},
ImdPacket {
id: 0x42,
size: 2,
topic: "IMD/Info/Iso_Detail/R_iso_pos",
unit: "kOhm",
scaling: ImdScaling::Default,
},
ImdPacket {
id: 0x4E,
size: 2,
topic: "IMD/Info/Iso_Detail/R_iso_original",
unit: "kOhm",
scaling: ImdScaling::Default,
},
ImdPacket {
id: 0x3E,
size: 1,
topic: "IMD/Info/Iso_Detail/Iso_quality",
unit: "%",
scaling: ImdScaling::Default,
},
ImdPacket {
id: 0x5E,
size: 2,
topic: "IMD/Info/Voltage/HV_system",
unit: "V",
scaling: ImdScaling::Voltage,
},
ImdPacket {
id: 0x60,
size: 2,
topic: "IMD/Info/Voltage/HV_neg_to_earth",
unit: "V",
scaling: ImdScaling::Voltage,
},
ImdPacket {
id: 0x62,
size: 2,
topic: "IMD/Info/Voltage/HV_pos_to_earth",
unit: "V",
scaling: ImdScaling::Voltage,
},
ImdPacket {
id: 0x52,
size: 2,
topic: "IMD/Info/It_System/Capacity_measured_value",
unit: "uF",
scaling: ImdScaling::TimesPoint1,
},
ImdPacket {
id: 0x2A,
size: 1,
topic: "IMD/Info/It_System/Unbalance_measured_value",
unit: "%",
scaling: ImdScaling::Default,
},
];

/// This thread polls the IMD for data as our firmware version is too old to get the data automatically (RIP)
/// It 1. asks for a message, 2. waits for the message, 3. sends out said message, and 4. sends out another request
///
/// # Panics
/// Panics if it doesnt work
pub async fn imd_poll_main(
cancel_token: CancellationToken,
can_send: mpsc::Sender<CanFrame>,
mut can_recv: mpsc::Receiver<CanDataFrame>,
mqtt_send: mpsc::Sender<(String, ServerData)>,
) {
debug!("Starting IMD manager");
// send/recieve cyclically
// pub frame
// recv frame
// pub mqtt

// pre-compute starter ID
let id: Id = socketcan::StandardId::new(0x22).expect("Tf").into();

// we cannot exceed 10Hz. Some extra leeway here. This missed tick burst will guarrantee we send at the next available moment after 100ms has passed
let mut min_time: tokio::time::Interval = tokio::time::interval(Duration::from_millis(120));

let mut lock: bool = false;
let mut curr_idex: usize = 0;
loop {
tokio::select! {
() = cancel_token.cancelled() => {
debug!("Shutting down IMD manager!");
break;
},
Some(val) = can_recv.recv() => {
// check ID is Ox23
match val.id() {
Id::Standard(id) => {
if id.as_raw() != 0x23 {
continue;
}
},
Id::Extended(_) => {
// ignore message
continue;
}
}

// check message identifier equals current index, if not warn and bail
if val.data()[0] != SENDABLES[curr_idex].id {
warn!("Detected unknown request response id for IMD!");
continue;
}

let mut data = val.data().to_owned();
let to_send = SENDABLES[curr_idex].convert(&mut data);
if let Err(err) = mqtt_send.send(to_send).await {
warn!("Could not send MQTT message from IMD {}", err);
}

// we can now send the next message on the next loop
lock = false;

},
// this is fancy ass code to basically say if its been 100ms since last message, and we arent locked, go for it
_ = min_time.tick(), if !lock => {
curr_idex += 1;
if curr_idex >= SENDABLES.len() {
curr_idex = 0;
}
// send can frame. Byte 1 is the ID of the data we want
if let Err(err) = can_send.send(CanFrame::new(id, &[SENDABLES[curr_idex].id]).expect("Failed to create IMD frame")).await {
warn!("Error sending IMD frame: {}", err);
}

// lets reset our interval, (i.e the 100ms restarts now)
min_time.reset();
// lets also lock the thread so we dont try and send another
lock = true;
}
};
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ pub mod encode_data;
pub mod proto;
pub mod simulatable_message;
pub mod simulate_data;

pub mod imd_poll;
48 changes: 42 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use calypso::{
data::{DecodeData, EncodeData},
decode_data::DECODE_FUNCTION_MAP,
encode_data::{ENCODABLE_KEY_LIST, ENCODE_FUNCTION_MAP},
imd_poll::imd_poll_main,
proto::{
command_data,
serverdata::{self, ServerData},
Expand All @@ -20,7 +21,9 @@ use rumqttc::v5::{
AsyncClient, Event, EventLoop, MqttOptions,
mqttbytes::v5::{Packet, Publish},
};
use socketcan::{CanError, CanFrame, EmbeddedFrame, Frame, Id, SocketOptions, tokio::CanSocket};
use socketcan::{
CanDataFrame, CanError, CanFrame, EmbeddedFrame, Frame, Id, SocketOptions, tokio::CanSocket,
};
use tokio::{
signal,
sync::mpsc::{self, Receiver, Sender},
Expand Down Expand Up @@ -58,9 +61,13 @@ struct CalypsoArgs {
)]
socketcan_iface: String,

// Whether to enable MQTT multi-client
/// Whether to enable MQTT multi-client
#[arg(short = 'm', long, env = "CALYPSO_MQTT_MULTICLIENT")]
mqtt_multiclient: bool,

/// Whether to enable cyclic IMD polling
#[arg(long, env = "CALYPSO_CAN_ENCODE")]
imd: bool,
}

/**
Expand All @@ -75,6 +82,7 @@ async fn can_manager(
can_interface: String,
main_send_to_siren: Sender<(String, ServerData)>,
alt_send_to_siren: Option<Sender<(String, ServerData)>>,
send_raw_can: Option<Sender<CanDataFrame>>,
mut send_over_can: Receiver<CanFrame>,
) {
let mut socket = CanSocket::open(&can_interface).expect("Failed to open CAN socket!");
Expand All @@ -99,7 +107,7 @@ async fn can_manager(
},
Some(frame) = socket.next() => {
frame_cnt += 1;
pub_frame(frame, &main_send_to_siren, alt_send_to_siren.as_ref(), &mut mqtt_cnt).await;
pub_frame(frame, &main_send_to_siren, alt_send_to_siren.as_ref(), send_raw_can.as_ref(), &mut mqtt_cnt, ).await;
}
Some(frame) = send_over_can.recv() => {
match socket.write_frame(frame).await {
Expand Down Expand Up @@ -130,6 +138,7 @@ async fn pub_frame(
frame: Result<CanFrame, socketcan::Error>,
main_send: &Sender<(String, ServerData)>,
alt_send: Option<&Sender<(String, ServerData)>>,
raw_send: Option<&Sender<CanDataFrame>>,
cnt: &mut u64,
) {
let decoded_data = match frame {
Expand All @@ -140,6 +149,14 @@ async fn pub_frame(
socketcan::Id::Standard(std) => std.as_raw().into(),
socketcan::Id::Extended(ext) => ext.as_raw(),
};
if let Some(send) = raw_send {
// for now just hardcode IMD
if id == 0x23
&& let Err(err) = send.send(data_frame).await
{
warn!("Could not send IMD code the response! {}", err);
}
}
trace!("RECVED message with ID: {:#01x}", id);
match DECODE_FUNCTION_MAP.get(&id) {
Some(func) => func(data),
Expand Down Expand Up @@ -555,6 +572,14 @@ async fn main() {
// a channel to give protobuf messages to be sent out over MQTT
let (decoder_send, decoder_recv) = mpsc::channel::<(String, ServerData)>(500);

// a channel to hijack certain raw CAN messages, right now only used for IMD
let (can_decoder_send, can_decoder_recv) = if cli.imd {
let ch = mpsc::channel::<CanDataFrame>(50);
(Some(ch.0), Some(ch.1))
} else {
(None, None)
};

// a channel to give CAN messages back out (car commands)
let (can_push_send, can_push_recv) = mpsc::channel::<CanFrame>(100);

Expand All @@ -581,27 +606,38 @@ async fn main() {
task_tracker.spawn(can_manager(
token.clone(),
cli.socketcan_iface,
decoder_send,
decoder_send.clone(),
Some(decoder_send_alt),
can_decoder_send,
can_push_recv,
));
} else {
task_tracker.spawn(can_manager(
token.clone(),
cli.socketcan_iface,
decoder_send,
decoder_send.clone(),
None,
can_decoder_send,
can_push_recv,
));
}

task_tracker.spawn(bidir_manager(
token.clone(),
can_push_send,
can_push_send.clone(),
siren_recv_recv,
cli.encode,
));

if let Some(can_decoder_recv) = can_decoder_recv {
task_tracker.spawn(imd_poll_main(
token.clone(),
can_push_send,
can_decoder_recv,
decoder_send,
));
}

task_tracker.close();

info!("Initialization complete, ready...");
Expand Down