Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pull request #123

Merged
merged 6 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion drmemd/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "drmemd"
version = "0.4.0"
version = "0.4.1"
authors = ["Rich Neswold <[email protected]>"]
edition = "2021"
description = "Main process of the DrMem control system"
Expand Down
36 changes: 36 additions & 0 deletions drmemd/src/logic/compile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2934,4 +2934,40 @@ mod tests {
);
}
}

#[test]
fn test_solar_usage() {
const DATA: &[(&str, bool)] = &[
// Make sure literals, variables, and time values don't
// return a field.
("{a}", false),
("1", false),
("1.0", false),
("true", false),
("#green", false),
("\"test\"", false),
("{utc:second}", false),
// Make sure the solar values return true.
("{solar:alt}", true),
("{solar:dec}", true),
("{solar:ra}", true),
("{solar:az}", true),
// Now test more complicated expressions to make sure each
// subtree is correctly compared.
("not (2 > 3)", false),
("2 + 2", false),
("{solar:alt} + 2", true),
("2 + {solar:az}", true),
("{solar:dec} + {solar:az}", true),
];

for (expr, result) in DATA {
assert_eq!(
&to_expr(expr).uses_solar(),
result,
"error using {}",
expr
);
}
}
}
110 changes: 90 additions & 20 deletions drmemd/src/logic/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use drmem_api::{client, device, driver, Result};
use futures::future::join_all;
use futures::future::{join_all, pending};
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
Expand Down Expand Up @@ -395,7 +395,7 @@ impl Node {

let wait_for_time = async {
match self.time_ch.as_mut() {
None => None,
None => pending().await,
Some(s) => s.next().await,
}
};
Expand All @@ -407,23 +407,32 @@ impl Node {

let wait_for_solar = async {
match self.solar_ch.as_mut() {
None => None,
Some(ch) => ch.recv().await.ok(),
None => pending().await,
Some(ch) => ch.recv().await,
}
};

#[rustfmt::skip]
tokio::select! {
// Wait for the next reading to arrive. All the
// incoming streams have been combined into one and
// the returned value is a pair consisting of an index
// and the actual reading.
biased;

Some((idx, reading)) = self.in_stream.next() => {
// Save the reading in our array for future
// recalculations.
// If we need the solar channel, wait for the next
// update.

self.inputs[idx] = Some(reading.value);
v = wait_for_solar => {
match v {
Ok(v) => solar = Some(v),
Err(broadcast::error::RecvError::Lagged(_)) => {
warn!("not handling solar info fast enough");
continue
}
Err(broadcast::error::RecvError::Closed) => {
error!("solar info channel is closed");
return Err(drmem_api::Error::OperationError(
"solar channel closed".into()
));
}
}
}

// If we need the time channel, wait for the next
Expand All @@ -433,11 +442,16 @@ impl Node {
time = v;
}

// If we need the solar channel, wait for the next
// update.
// Wait for the next reading to arrive. All the
// incoming streams have been combined into one and
// the returned value is a pair consisting of an index
// and the actual reading.

Some((idx, reading)) = self.in_stream.next() => {
// Save the reading in our array for future
// recalculations.

Some(v) = wait_for_solar => {
solar = Some(v);
self.inputs[idx] = Some(reading.value);
}
}

Expand Down Expand Up @@ -557,16 +571,16 @@ mod test {
// Create the common channels used by DrMem.

let (tx_req, mut c_recv) = mpsc::channel(100);
let (tx_tod, rx_tod) = broadcast::channel(100);
let (tx_solar, rx_solar) = broadcast::channel(100);
let (tx_tod, _) = broadcast::channel(100);
let (tx_solar, _) = broadcast::channel(100);

// Start the logic block with the proper communciation
// channels and configuration.

let node = Node::start(
client::RequestChan::new(tx_req),
rx_tod,
rx_solar,
tx_tod.subscribe(),
tx_solar.subscribe(),
cfg,
);

Expand Down Expand Up @@ -954,6 +968,62 @@ mod test {
assert_eq!(emu.await.unwrap(), Ok(true));
}

// Test a basic logic block in which forwards a solar parameter to
// a memory device.

#[tokio::test]
async fn test_basic_solar_node() {
const OUT1: &str = "device:out1";
const OUT2: &str = "device:out2";
let cfg = build_config(
&[],
&[("alt", OUT1), ("dec", OUT2)],
&[],
&["{solar:alt} -> {alt}", "{solar:dec} -> {dec}"],
);
let (tx_out1, mut rx_out1) = mpsc::channel(100);
let (tx_out2, mut rx_out2) = mpsc::channel(100);

let (_, tx_solar, emu, tx_stop) = Emulator::start(
vec![],
vec![(OUT1.into(), tx_out1), (OUT2.into(), tx_out2)],
cfg,
)
.await
.unwrap();

// Send a value and see if it was forwarded.

assert!(tx_solar
.send(Arc::new(solar::SolarInfo {
elevation: 1.0,
azimuth: 2.0,
right_ascension: 3.0,
declination: 4.0
}))
.is_ok());

{
let (value, rpy) = rx_out1.recv().await.unwrap();
let _ = rpy.send(Ok(value.clone()));

assert_eq!(value, device::Value::Flt(1.0));
}

{
let (value, rpy) = rx_out2.recv().await.unwrap();
let _ = rpy.send(Ok(value.clone()));

assert_eq!(value, device::Value::Flt(4.0));
}

// Stop the emulator and see that its return status is good.

let _ = tx_stop.send(());

assert_eq!(emu.await.unwrap(), Ok(true));
}

// Test a logic block with two outputs. Make sure they are sent
// "in parallel".

Expand Down
2 changes: 1 addition & 1 deletion drmemd/src/logic/solar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ pub fn create_task(
lat: f64,
long: f64,
) -> (broadcast::Sender<Info>, broadcast::Receiver<Info>) {
let (tx, rx) = broadcast::channel(1);
let (tx, rx) = broadcast::channel(10);
let tx_copy = tx.clone();

tokio::spawn(
Expand Down
47 changes: 22 additions & 25 deletions drmemd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,37 +156,34 @@ async fn run() -> Result<()> {
}
}

// Start the time-of-day task. This needs to be done *before*
// any logic blocks are started because logic blocks *may*
// have an expression that uses the time-of-day.
// Create a nested scope so that the tod and solar handles are
// freed up.

let (tx_tod, rx_tod) = logic::tod::create_task();

// Start the solar task. This, too, needs to be done before
// any logic blocks are started.
{
// Start the time-of-day task. This needs to be done
// *before* any logic blocks are started because logic
// blocks *may* have an expression that uses the
// time-of-day.

let (tx_solar, rx_solar) =
logic::solar::create_task(cfg.latitude, cfg.longitude);
let (tx_tod, _) = logic::tod::create_task();

// Iterate through the [[logic]] sections of the config.
// Start the solar task. This, too, needs to be done
// before any logic blocks are started.

for logic in cfg.logic {
tasks.push(wrap_task(logic::Node::start(
tx_clnt_req.clone(),
tx_tod.subscribe(),
tx_solar.subscribe(),
logic,
)));
}
let (tx_solar, _) =
logic::solar::create_task(cfg.latitude, cfg.longitude);

// Now that we've given all the logic blocks receive handles
// for the time-of-day and solar tasks, we can free up our
// copy. If we freed up our copy *before* creating new
// subscriptions, the tod or solar task may have briefly seen
// no clients and would exit.
// Iterate through the [[logic]] sections of the config.

std::mem::drop(rx_tod);
std::mem::drop(rx_solar);
for logic in cfg.logic {
tasks.push(wrap_task(logic::Node::start(
tx_clnt_req.clone(),
tx_tod.subscribe(),
tx_solar.subscribe(),
logic,
)));
}
}

// Now run all the tasks.

Expand Down
Loading