Skip to content

Commit

Permalink
send and receive are no longer threads but tasks #14
Browse files Browse the repository at this point in the history
  • Loading branch information
jimy-byerley committed Oct 18, 2023
1 parent 7bd2a2f commit 6f22412
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 178 deletions.
48 changes: 27 additions & 21 deletions examples/pdu_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,39 @@ use etherage::{Field, EthernetSocket, RawMaster, EthercatResult};
#[tokio::main]
async fn main() -> EthercatResult<()> {
let master = Arc::new(RawMaster::new(EthernetSocket::new("eno1")?));
{
let master = master.clone();
std::thread::spawn(move || loop {
master.receive().unwrap();
})};
{
let master = master.clone();
std::thread::spawn(move || loop {
master.send().unwrap();
})};

let reg = Field::<u16>::simple(0x1234);
let slave = 0;

// test read/write
let received = master.aprd(slave, reg).await.one()?;
master.apwr(slave, reg, received).await.one()?;

// test simultaneous read/write

(
async { master.receive().await.unwrap() },
async { master.send().await.unwrap() },
async {

let reg = Field::<u16>::simple(0x1234);
let slave = 0;

// test read/write
dbg!(1);
let received = master.aprd(slave, reg).await.one().unwrap();
dbg!(2);
master.apwr(slave, reg, received).await.one().unwrap();
dbg!(3);

// test simultaneous read/write
(
async {
let received = master.aprd(slave, reg).await.one().unwrap();
master.apwr(slave, reg, received).await.one().unwrap();
},
async {
let received = master.aprd(slave, reg).await.one().unwrap();
master.apwr(slave, reg, received).await.one().unwrap();
},
).join().await;
},
async {
let received = master.aprd(slave, reg).await.one().unwrap();
master.apwr(slave, reg, received).await.one().unwrap();
loop {
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
println!("running");
}
},
).join().await;

Expand Down
67 changes: 32 additions & 35 deletions examples/sdo_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,47 +12,44 @@ use bilge::prelude::u2;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let master = Arc::new(RawMaster::new(EthernetSocket::new("eno1")?));
{
let master = master.clone();
std::thread::spawn(move || loop {
master.receive().unwrap();
})};
{
let master = master.clone();
std::thread::spawn(move || loop {
master.send().unwrap();
})};

(
async { master.receive().await.unwrap() },
async { master.send().await.unwrap() },
async {

let mut slave = Slave::raw(master.clone(), SlaveAddress::AutoIncremented(0));
slave.switch(CommunicationState::Init).await?;
slave.set_address(1).await?;
slave.init_mailbox().await?;
slave.init_coe().await;
let mut slave = Slave::raw(master.clone(), SlaveAddress::AutoIncremented(0));
slave.switch(CommunicationState::Init).await.unwrap();
slave.set_address(1).await.unwrap();
slave.init_mailbox().await.unwrap();
slave.init_coe().await;

slave.switch(CommunicationState::PreOperational).await?;
slave.switch(CommunicationState::PreOperational).await.unwrap();

let sdo = Sdo::<u32>::complete(0x1c12);
let priority = u2::new(1);
let sdo = Sdo::<u32>::complete(0x1c12);
let priority = u2::new(1);

// test read/write
let received = slave.coe().await.sdo_read(&sdo, priority).await?;
slave.coe().await.sdo_write(&sdo, priority, received).await?;

// test concurrent read/write
(
async {
println!("begin");
let received = slave.coe().await.sdo_read(&sdo, priority).await.unwrap();
println!("between");
slave.coe().await.sdo_write(&sdo, priority, received).await.unwrap();
println!("end");
},
async {
println!("begin");
// test read/write
let received = slave.coe().await.sdo_read(&sdo, priority).await.unwrap();
println!("between");
slave.coe().await.sdo_write(&sdo, priority, received).await.unwrap();
println!("end");

// test concurrent read/write
(
async {
println!("begin");
let received = slave.coe().await.sdo_read(&sdo, priority).await.unwrap();
println!("between");
slave.coe().await.sdo_write(&sdo, priority, received).await.unwrap();
println!("end");
},
async {
println!("begin");
let received = slave.coe().await.sdo_read(&sdo, priority).await.unwrap();
println!("between");
slave.coe().await.sdo_write(&sdo, priority, received).await.unwrap();
println!("end");
},
).join().await;
},
).join().await;

Expand Down
5 changes: 0 additions & 5 deletions src/master.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,17 +204,12 @@ impl Master {
loop {
let status = self.raw.brd(registers::al::response).await;
if status.value().unwrap().error() {
// {return Err(EthercatError::Slave(()))}
for slave in 0 .. status.answers {
let error = self.raw.aprd(slave, registers::al::error).await.one()?;
if error != AlError::NoError
{return Err(EthercatError::Slave(error))}
}
}
// print!("slaves state {:?} waiting {:?} ",
// status.state(),
// target,
// );
if status.value().unwrap().state() == target.into()
{break}
}
Expand Down
147 changes: 93 additions & 54 deletions src/rawmaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@
*/

use std::sync::{Mutex, Condvar};
use std::os::fd::AsRawFd;
use std::time::Instant;
use tokio::io::unix::AsyncFd;
use core::{
ops::DerefMut,
time::Duration,
};
use tokio::sync::Notify;
use tokio_timerfd::Delay;
use bilge::prelude::*;
use futures_concurrency::future::{Join, Race};
use core::future::poll_fn;

use crate::{
error::{EthercatError, EthercatResult},
Expand All @@ -29,6 +35,8 @@ const MIN_PDU: usize = 60;
/// maximum number of PDU in an ethercat frame
const MAX_ETHERCAT_PDU: usize = MAX_ETHERCAT_FRAME / MIN_PDU;

// trait EthercatSocket: crate::socket::EthercatSocket + AsRawFd {}

/**
low level ethercat communication functions, with no notion of slave.
Expand Down Expand Up @@ -65,7 +73,7 @@ pub struct RawMaster {
socket: Box<dyn EthercatSocket + Send + Sync>,
// synchronization signal for multitask reception
received: Notify,
sendable: Condvar,
sendable: Notify,
sent: Notify,

// communication state
Expand Down Expand Up @@ -99,7 +107,7 @@ impl RawMaster {

socket: Box::new(socket),
received: Notify::new(),
sendable: Condvar::new(),
sendable: Notify::new(),
sent: Notify::new(),

pdu_state: Mutex::new(PduState {
Expand Down Expand Up @@ -330,6 +338,7 @@ impl RawMaster {
// clean up the receive table at function end, or in case the async runtime cancels this task
_finisher = Finisher::new(|| {
let mut state = self.pdu_state.lock().unwrap();
// free the token
state.receive[token] = None;
state.free.push(token).unwrap();

Expand All @@ -350,7 +359,6 @@ impl RawMaster {
}

{
// free the token
let state = self.pdu_state.lock().unwrap();
state.receive[token].as_ref().unwrap().answers
}
Expand Down Expand Up @@ -407,60 +415,91 @@ impl RawMaster {
In case something goes wrong during PDUs unpacking, the PDUs successfully processed will be reported to their pending futures, the futures for PDU which caused the read to fail and all following PDUs in the frame will be left pending (since the data is corrupted, there is no way to determine which future should be aborted)
*/
pub fn receive(&self) -> EthercatResult {
let mut receive = self.ethercat_receive.lock().unwrap();
let size = self.socket.receive(receive.deref_mut())?;
let mut frame = Cursor::new(&receive[.. size]);

let header = frame.unpack::<EthercatHeader>()?;
if frame.remain().len() < header.len().value() as usize
{return Err(EthercatError::Protocol("received frame header has inconsistent length"))}
let content = &frame.remain()[.. header.len().value() as usize];

assert!(header.len().value() as usize <= content.len());
// TODO check working count to detect possible refused requests
match header.ty() {
EthercatType::PDU => self.pdu_receive(
self.pdu_state.lock().unwrap().deref_mut(),
content,
)?,
// what is this ?
EthercatType::NetworkVariable => todo!(),
// no mailbox frame shall transit to this master, ignore it or raise an error ?
EthercatType::Mailbox => {},
pub async fn receive(&self) -> EthercatResult {
loop {
let mut receive = self.ethercat_receive.lock().unwrap();
let size = poll_fn(|cx|
self.socket.poll_receive(cx, receive.deref_mut())
).await?;
let mut frame = Cursor::new(&receive[.. size]);

let header = frame.unpack::<EthercatHeader>()?;
if frame.remain().len() < header.len().value() as usize
{return Err(EthercatError::Protocol("received frame header has inconsistent length"))}
let content = &frame.remain()[.. header.len().value() as usize];

assert!(header.len().value() as usize <= content.len());
match header.ty() {
EthercatType::PDU => self.pdu_receive(
self.pdu_state.lock().unwrap().deref_mut(),
content,
)?,
// what is this ?
EthercatType::NetworkVariable => todo!(),
// no mailbox frame shall transit to this master, ignore it or raise an error ?
EthercatType::Mailbox => {},
}
}
Ok(())
}
/// this is the socket sending handler
pub fn send(&self) -> EthercatResult {
let mut state = self.pdu_state.lock().unwrap();
// wait indefinitely if no data to send
while state.last_end == 0
{state = self.sendable.wait(state).unwrap();}
// wait for more data until a timeout once data is present
if ! state.ready
{state = self.sendable.wait_timeout_while(
state,
self.pdu_merge_time,
|state| ! state.ready,
).unwrap().0;}

// check header
EthercatHeader::new(
u11::new((state.last_end - EthercatHeader::packed_size()) as u16),
EthercatType::PDU,
).pack(&mut state.send).unwrap();

// send
// we are blocking the async machine until the send ends
// we assume it is not a long operation to copy those data into the system buffer
self.socket.send(&state.send[.. state.last_end])?;
// reset state
state.ready = false;
state.last_end = 0;
state.last_start = EthercatHeader::packed_size();
self.sent.notify_waiters();
Ok(())
pub async fn send(&self) -> EthercatResult {
let mut delay = Delay::new(Instant::now())?;
loop {
let mut delay = &mut delay;

// wait indefinitely if no data to send
let mut state = loop {
self.sendable.notified().await;
let state = self.pdu_state.lock().unwrap();
if state.last_end != 0 {break state}
};

if ! state.ready {
drop(state);
delay.reset(Instant::now() + self.pdu_merge_time);
// wait for more data until a timeout once data is present
state = (
// timeout for sending the batch
async {
delay.await.unwrap();
/// TODO: handle the possible ioerror in the delay
self.pdu_state.lock().unwrap()
},
// wait for more data in the batch
async { loop {
self.sendable.notified().await;
let state = self.pdu_state.lock().unwrap();
if state.ready {break state}
}},
).race().await;
}

// check header
EthercatHeader::new(
u11::new((state.last_end - EthercatHeader::packed_size()) as u16),
EthercatType::PDU,
).pack(&mut state.send).unwrap();

let send = unsafe {std::slice::from_raw_parts_mut(
state.send.as_mut_ptr(),
state.last_end,
)};
drop(state);
// send
// we are blocking the async machine until the send ends
// we assume it is not a long operation to copy those data into the system buffer
poll_fn(|cx|
self.socket.poll_send(cx, &send)
).await?;

let mut state = self.pdu_state.lock().unwrap();
// reset state
state.ready = false;
state.last_end = 0;
state.last_start = EthercatHeader::packed_size();
drop(state);
self.sent.notify_waiters();
}
}
}

Expand Down
5 changes: 0 additions & 5 deletions src/slave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,6 @@ impl<'a> Slave<'a> {
if error != registers::AlError::NoError
{return Err(EthercatError::Slave(error))}
}
// print!("slave {:?} state {:?} waiting {:?} \r",
// self.address,
// CommunicationState::try_from(status.state()).unwrap(),
// target,
// );
if status.state() == target.into() {break}
}
self.state = target;
Expand Down
Loading

0 comments on commit 6f22412

Please sign in to comment.