diff --git a/Odyssey-Definitions b/Odyssey-Definitions index e20db94..5805bd2 160000 --- a/Odyssey-Definitions +++ b/Odyssey-Definitions @@ -1 +1 @@ -Subproject commit e20db9419ff94d079f4d81b7c97d6b8edd4b4fb2 +Subproject commit 5805bd259c11d1913a8b1d711237d2a35cec95b1 diff --git a/src/bin/nerimd.rs b/src/bin/nerimd.rs index eb78331..191c09e 100644 --- a/src/bin/nerimd.rs +++ b/src/bin/nerimd.rs @@ -445,7 +445,8 @@ async fn write_ctrl_or_set( .await { Ok(()) => { - debug!("Successfully sent message {}", index); + debug!("Successfully sent message {}, waiting", index); + tokio::time::sleep(Duration::from_secs(1)).await; if let Some(idex) = read_back_index { read_or_readback(socket, idex).await; } else { @@ -484,8 +485,9 @@ async fn read_or_readback(socket: &mut CanSocket, index: u8) { if frame.data()[0] == 0xFF { println!("Data failure: {:?}", ErrorMessage::try_from_primitive(frame.data()[1]).expect("Invalid error code!")); } else { - println!("Data returned: {:?}", frame.data()); + println!("Data returned: {:02X?}", frame.data()); } + break; }, socketcan::Id::Extended(_) => (), } diff --git a/src/imd_poll.rs b/src/imd_poll.rs new file mode 100644 index 0000000..7959a03 --- /dev/null +++ b/src/imd_poll.rs @@ -0,0 +1,196 @@ +use std::time::Duration; +use std::time::UNIX_EPOCH; + +use socketcan::{CanDataFrame, CanFrame, EmbeddedFrame, Id}; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; +use tracing::{debug, warn}; + +use crate::proto::serverdata::ServerData; + +enum ImdScaling { + Default, + TimesPoint1, + Voltage, +} +struct ImdPacket { + pub id: u8, + pub size: usize, + pub topic: &'static str, + pub unit: &'static str, + pub scaling: ImdScaling, +} + +impl ImdPacket { + pub fn convert(&self, input: &mut Vec) -> (String, ServerData) { + // extend to id plus size of u32. All this fuckery needed as from_le_bytes requires exactly 4 bytes + input.truncate(self.size + 1); + input.resize(5, 0); + let out = u32::from_le_bytes( + input[1..=size_of::()] + .try_into() + .expect("Critical failure to decode IMD message"), + ); + let send: f32 = match self.scaling { + ImdScaling::Default => out as f32, + ImdScaling::TimesPoint1 => out as f32 * 0.1, + ImdScaling::Voltage => (out as f32 * 0.05) - 1606.4, + }; + + let mut payload = ServerData::new(); + payload.unit = self.unit.to_string(); + payload.values = vec![send]; + payload.time_us = UNIX_EPOCH.elapsed().unwrap().as_micros() as u64; + + (self.topic.to_string(), payload) + } +} + +// To add a supported field from the datasheet, add it here. +// Note that for now we are max 4 byte fields, a field 5 bytes or more cannot be read by the driver and will be truncated +const SENDABLES: [ImdPacket; 9] = [ + ImdPacket { + id: 0x40, + size: 2, + topic: "IMD/Info/Iso_Detail/R_iso_neg", + unit: "kOhm", + scaling: ImdScaling::Default, + }, + ImdPacket { + id: 0x42, + size: 2, + topic: "IMD/Info/Iso_Detail/R_iso_pos", + unit: "kOhm", + scaling: ImdScaling::Default, + }, + ImdPacket { + id: 0x4E, + size: 2, + topic: "IMD/Info/Iso_Detail/R_iso_original", + unit: "kOhm", + scaling: ImdScaling::Default, + }, + ImdPacket { + id: 0x3E, + size: 1, + topic: "IMD/Info/Iso_Detail/Iso_quality", + unit: "%", + scaling: ImdScaling::Default, + }, + ImdPacket { + id: 0x5E, + size: 2, + topic: "IMD/Info/Voltage/HV_system", + unit: "V", + scaling: ImdScaling::Voltage, + }, + ImdPacket { + id: 0x60, + size: 2, + topic: "IMD/Info/Voltage/HV_neg_to_earth", + unit: "V", + scaling: ImdScaling::Voltage, + }, + ImdPacket { + id: 0x62, + size: 2, + topic: "IMD/Info/Voltage/HV_pos_to_earth", + unit: "V", + scaling: ImdScaling::Voltage, + }, + ImdPacket { + id: 0x52, + size: 2, + topic: "IMD/Info/It_System/Capacity_measured_value", + unit: "uF", + scaling: ImdScaling::TimesPoint1, + }, + ImdPacket { + id: 0x2A, + size: 1, + topic: "IMD/Info/It_System/Unbalance_measured_value", + unit: "%", + scaling: ImdScaling::Default, + }, +]; + +/// This thread polls the IMD for data as our firmware version is too old to get the data automatically (RIP) +/// It 1. asks for a message, 2. waits for the message, 3. sends out said message, and 4. sends out another request +/// +/// # Panics +/// Panics if it doesnt work +pub async fn imd_poll_main( + cancel_token: CancellationToken, + can_send: mpsc::Sender, + mut can_recv: mpsc::Receiver, + mqtt_send: mpsc::Sender<(String, ServerData)>, +) { + debug!("Starting IMD manager"); + // send/recieve cyclically + // pub frame + // recv frame + // pub mqtt + + // pre-compute starter ID + let id: Id = socketcan::StandardId::new(0x22).expect("Tf").into(); + + // we cannot exceed 10Hz. Some extra leeway here. This missed tick burst will guarrantee we send at the next available moment after 100ms has passed + let mut min_time: tokio::time::Interval = tokio::time::interval(Duration::from_millis(120)); + + let mut lock: bool = false; + let mut curr_idex: usize = 0; + loop { + tokio::select! { + () = cancel_token.cancelled() => { + debug!("Shutting down IMD manager!"); + break; + }, + Some(val) = can_recv.recv() => { + // check ID is Ox23 + match val.id() { + Id::Standard(id) => { + if id.as_raw() != 0x23 { + continue; + } + }, + Id::Extended(_) => { + // ignore message + continue; + } + } + + // check message identifier equals current index, if not warn and bail + if val.data()[0] != SENDABLES[curr_idex].id { + warn!("Detected unknown request response id for IMD!"); + continue; + } + + let mut data = val.data().to_owned(); + let to_send = SENDABLES[curr_idex].convert(&mut data); + if let Err(err) = mqtt_send.send(to_send).await { + warn!("Could not send MQTT message from IMD {}", err); + } + + // we can now send the next message on the next loop + lock = false; + + }, + // this is fancy ass code to basically say if its been 100ms since last message, and we arent locked, go for it + _ = min_time.tick(), if !lock => { + curr_idex += 1; + if curr_idex >= SENDABLES.len() { + curr_idex = 0; + } + // send can frame. Byte 1 is the ID of the data we want + if let Err(err) = can_send.send(CanFrame::new(id, &[SENDABLES[curr_idex].id]).expect("Failed to create IMD frame")).await { + warn!("Error sending IMD frame: {}", err); + } + + // lets reset our interval, (i.e the 100ms restarts now) + min_time.reset(); + // lets also lock the thread so we dont try and send another + lock = true; + } + }; + } +} diff --git a/src/lib.rs b/src/lib.rs index 81d5a32..0ba8db4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,3 +5,5 @@ pub mod encode_data; pub mod proto; pub mod simulatable_message; pub mod simulate_data; + +pub mod imd_poll; diff --git a/src/main.rs b/src/main.rs index db5ea7a..5e887d7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,7 @@ use calypso::{ data::{DecodeData, EncodeData}, decode_data::DECODE_FUNCTION_MAP, encode_data::{ENCODABLE_KEY_LIST, ENCODE_FUNCTION_MAP}, + imd_poll::imd_poll_main, proto::{ command_data, serverdata::{self, ServerData}, @@ -20,7 +21,9 @@ use rumqttc::v5::{ AsyncClient, Event, EventLoop, MqttOptions, mqttbytes::v5::{Packet, Publish}, }; -use socketcan::{CanError, CanFrame, EmbeddedFrame, Frame, Id, SocketOptions, tokio::CanSocket}; +use socketcan::{ + CanDataFrame, CanError, CanFrame, EmbeddedFrame, Frame, Id, SocketOptions, tokio::CanSocket, +}; use tokio::{ signal, sync::mpsc::{self, Receiver, Sender}, @@ -58,9 +61,13 @@ struct CalypsoArgs { )] socketcan_iface: String, - // Whether to enable MQTT multi-client + /// Whether to enable MQTT multi-client #[arg(short = 'm', long, env = "CALYPSO_MQTT_MULTICLIENT")] mqtt_multiclient: bool, + + /// Whether to enable cyclic IMD polling + #[arg(long, env = "CALYPSO_CAN_ENCODE")] + imd: bool, } /** @@ -75,6 +82,7 @@ async fn can_manager( can_interface: String, main_send_to_siren: Sender<(String, ServerData)>, alt_send_to_siren: Option>, + send_raw_can: Option>, mut send_over_can: Receiver, ) { let mut socket = CanSocket::open(&can_interface).expect("Failed to open CAN socket!"); @@ -99,7 +107,7 @@ async fn can_manager( }, Some(frame) = socket.next() => { frame_cnt += 1; - pub_frame(frame, &main_send_to_siren, alt_send_to_siren.as_ref(), &mut mqtt_cnt).await; + pub_frame(frame, &main_send_to_siren, alt_send_to_siren.as_ref(), send_raw_can.as_ref(), &mut mqtt_cnt, ).await; } Some(frame) = send_over_can.recv() => { match socket.write_frame(frame).await { @@ -130,6 +138,7 @@ async fn pub_frame( frame: Result, main_send: &Sender<(String, ServerData)>, alt_send: Option<&Sender<(String, ServerData)>>, + raw_send: Option<&Sender>, cnt: &mut u64, ) { let decoded_data = match frame { @@ -140,6 +149,14 @@ async fn pub_frame( socketcan::Id::Standard(std) => std.as_raw().into(), socketcan::Id::Extended(ext) => ext.as_raw(), }; + if let Some(send) = raw_send { + // for now just hardcode IMD + if id == 0x23 + && let Err(err) = send.send(data_frame).await + { + warn!("Could not send IMD code the response! {}", err); + } + } trace!("RECVED message with ID: {:#01x}", id); match DECODE_FUNCTION_MAP.get(&id) { Some(func) => func(data), @@ -555,6 +572,14 @@ async fn main() { // a channel to give protobuf messages to be sent out over MQTT let (decoder_send, decoder_recv) = mpsc::channel::<(String, ServerData)>(500); + // a channel to hijack certain raw CAN messages, right now only used for IMD + let (can_decoder_send, can_decoder_recv) = if cli.imd { + let ch = mpsc::channel::(50); + (Some(ch.0), Some(ch.1)) + } else { + (None, None) + }; + // a channel to give CAN messages back out (car commands) let (can_push_send, can_push_recv) = mpsc::channel::(100); @@ -581,27 +606,38 @@ async fn main() { task_tracker.spawn(can_manager( token.clone(), cli.socketcan_iface, - decoder_send, + decoder_send.clone(), Some(decoder_send_alt), + can_decoder_send, can_push_recv, )); } else { task_tracker.spawn(can_manager( token.clone(), cli.socketcan_iface, - decoder_send, + decoder_send.clone(), None, + can_decoder_send, can_push_recv, )); } task_tracker.spawn(bidir_manager( token.clone(), - can_push_send, + can_push_send.clone(), siren_recv_recv, cli.encode, )); + if let Some(can_decoder_recv) = can_decoder_recv { + task_tracker.spawn(imd_poll_main( + token.clone(), + can_push_send, + can_decoder_recv, + decoder_send, + )); + } + task_tracker.close(); info!("Initialization complete, ready...");