diff --git a/applications/awkernel_services/src/lib.rs b/applications/awkernel_services/src/lib.rs index 900163ff..e9b80dbb 100644 --- a/applications/awkernel_services/src/lib.rs +++ b/applications/awkernel_services/src/lib.rs @@ -4,11 +4,13 @@ extern crate alloc; mod buffered_logger; mod network_service; +mod storage_service; use core::time::Duration; -const NETWORK_SERVICE_NAME: &str = "[Awkernel] network service"; const BUFFERED_LOGGER_NAME: &str = "[Awkernel] buffered logger service"; +const NETWORK_SERVICE_NAME: &str = "[Awkernel] network service"; +const STORAGE_SERVICE_NAME: &str = "[Awkernel] storage service"; const DISPLAY_SERVICE_NAME: &str = "[Awkernel] display service"; pub async fn run() { @@ -26,6 +28,13 @@ pub async fn run() { ) .await; + awkernel_async_lib::spawn( + STORAGE_SERVICE_NAME.into(), + storage_service::run(), + awkernel_async_lib::scheduler::SchedulerType::PrioritizedFIFO(0), + ) + .await; + awkernel_async_lib::spawn( DISPLAY_SERVICE_NAME.into(), awkernel_display::run(), diff --git a/applications/awkernel_services/src/storage_service.rs b/applications/awkernel_services/src/storage_service.rs new file mode 100644 index 00000000..6f5c2f64 --- /dev/null +++ b/applications/awkernel_services/src/storage_service.rs @@ -0,0 +1,114 @@ +use core::{future::Future, task::Poll}; + +use alloc::{collections::BTreeMap, format}; +use awkernel_async_lib::{ + future::FutureExt, scheduler::SchedulerType, select_biased, session_types::*, +}; + +type ProtoInterruptHandler = Recv<(), Send<(), Eps>>; +type ChanProtoInterruptHandlerDual = Chan<(), ::Dual>; + +pub async fn run() { + log::info!("Starting {}.", crate::STORAGE_SERVICE_NAME); + + let mut ch_irq_handlers = BTreeMap::new(); + + for storage_status in awkernel_lib::storage::get_all_storage_statuses() { + spawn_handlers(storage_status, &mut ch_irq_handlers).await; + } +} + +async fn spawn_handlers( + storage_status: awkernel_lib::storage::StorageStatus, + ch_irq_handlers: &mut BTreeMap, +) { + for irq in storage_status.irqs { + // Check if we already have a handler for this IRQ + if ch_irq_handlers.contains_key(&irq) { + continue; + } + + let (server, client) = session_channel::(); + ch_irq_handlers.insert(irq, client); + + let name = format!( + "{}:{}: IRQ = {irq}", + crate::STORAGE_SERVICE_NAME, + storage_status.device_name, + ); + + awkernel_async_lib::spawn( + name.into(), + interrupt_handler(storage_status.device_id, irq, server), + SchedulerType::PrioritizedFIFO(0), + ) + .await; + } +} + +// Interrupt handlers. + +struct StorageInterrupt { + irq: u16, + wait: bool, +} + +impl Future for StorageInterrupt { + type Output = (); + + fn poll( + self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll { + let m = self.get_mut(); + + if !m.wait { + return Poll::Ready(()); + } + + m.wait = false; + + if awkernel_lib::storage::register_waker_for_storage_interrupt(m.irq, cx.waker().clone()) { + Poll::Pending + } else { + Poll::Ready(()) + } + } +} + +async fn interrupt_handler(interface_id: u64, irq: u16, ch: Chan<(), ProtoInterruptHandler>) { + let mut ch = ch.recv().boxed().fuse(); + + loop { + let mut empty = async {}.boxed().fuse(); + + select_biased! { + (ch, _) = ch => { + let ch = ch.send(()).await; + ch.close(); + return; + }, + _ = empty => {}, + } + + if awkernel_lib::storage::handle_storage_interrupt(interface_id, irq) { + awkernel_async_lib::r#yield().await; + continue; + } + + // Wait interrupts. + let mut irq_wait = StorageInterrupt { irq, wait: true }.fuse(); + + select_biased! { + (ch, _) = ch => { + let ch = ch.send(()).await; + ch.close(); + return; + }, + _ = irq_wait => {}, + } + + awkernel_lib::storage::handle_storage_interrupt(interface_id, irq); + awkernel_async_lib::r#yield().await; + } +} diff --git a/awkernel_lib/src/lib.rs b/awkernel_lib/src/lib.rs index 37be10fb..76af7c2e 100644 --- a/awkernel_lib/src/lib.rs +++ b/awkernel_lib/src/lib.rs @@ -26,6 +26,7 @@ pub mod mmio; pub mod net; pub mod priority_queue; pub mod sanity; +pub mod storage; pub mod sync; pub mod time; pub mod timer; diff --git a/awkernel_lib/src/storage.rs b/awkernel_lib/src/storage.rs new file mode 100644 index 00000000..f222c1dc --- /dev/null +++ b/awkernel_lib/src/storage.rs @@ -0,0 +1,193 @@ +use crate::sync::{mcs::MCSNode, mutex::Mutex, rwlock::RwLock}; +use alloc::{ + borrow::Cow, + collections::{btree_map::Entry, BTreeMap}, + sync::Arc, + vec::Vec, +}; +use storage_device::{StorageDevError, StorageDevice, StorageDeviceType}; + +pub mod storage_device; + +#[derive(Debug)] +pub enum StorageManagerError { + InvalidDeviceID, + InvalidTransferID, + DeviceError(StorageDevError), + NotYetImplemented, + PoolNotInitialized, +} + +#[derive(Debug)] +pub struct StorageStatus { + pub device_id: u64, + pub device_name: Cow<'static, str>, + pub device_type: StorageDeviceType, + pub irqs: Vec, + pub block_size: usize, + pub num_blocks: u64, +} + +static STORAGE_MANAGER: RwLock = RwLock::new(StorageManager { + devices: BTreeMap::new(), + device_id: 0, +}); + +static IRQ_WAKERS: Mutex> = Mutex::new(BTreeMap::new()); + +pub struct StorageManager { + devices: BTreeMap>, + device_id: u64, +} + +enum IRQWaker { + Waker(core::task::Waker), + Interrupted, +} + +pub fn add_storage_device(device: Arc) -> u64 { + let mut manager = STORAGE_MANAGER.write(); + + if manager.device_id == u64::MAX { + panic!("storage device id overflow"); + } + + let id = manager.device_id; + manager.device_id += 1; + + manager.devices.insert(id, device); + + id +} + +pub fn get_device_block_size(device_id: u64) -> Result { + let manager = STORAGE_MANAGER.read(); + + let device = manager + .devices + .get(&device_id) + .ok_or(StorageManagerError::InvalidDeviceID)?; + + Ok(device.block_size()) +} + +pub fn get_storage_device(device_id: u64) -> Result, StorageManagerError> { + let manager = STORAGE_MANAGER.read(); + + let device = manager + .devices + .get(&device_id) + .ok_or(StorageManagerError::InvalidDeviceID)?; + + Ok(device.clone()) +} + +pub fn get_storage_status(device_id: u64) -> Result { + let manager = STORAGE_MANAGER.read(); + + let device = manager + .devices + .get(&device_id) + .ok_or(StorageManagerError::InvalidDeviceID)?; + + let status = StorageStatus { + device_id, + device_name: device.device_name(), + device_type: device.device_type(), + irqs: device.irqs(), + block_size: device.block_size(), + num_blocks: device.num_blocks(), + }; + + Ok(status) +} + +pub fn get_all_storage_statuses() -> Vec { + let manager = STORAGE_MANAGER.read(); + + let mut result = Vec::new(); + + for id in manager.devices.keys() { + if let Ok(status) = get_storage_status(*id) { + result.push(status); + } + } + + result +} + +pub fn get_device_namespace(device_id: u64) -> Option { + let manager = STORAGE_MANAGER.read(); + + let device = manager.devices.get(&device_id)?; + device.get_namespace_id() +} + +/// Service routine for storage device interrupt. +/// This routine should be called by interrupt handlers provided by device drivers. +pub fn storage_interrupt(irq: u16) { + let mut node = MCSNode::new(); + let mut w = IRQ_WAKERS.lock(&mut node); + + match w.entry(irq) { + Entry::Occupied(e) => { + if matches!(e.get(), IRQWaker::Waker(_)) { + let IRQWaker::Waker(w) = e.remove() else { + return; + }; + + w.wake(); + } + } + Entry::Vacant(e) => { + e.insert(IRQWaker::Interrupted); + } + } +} + +/// Register a waker for a storage device interrupt service. +/// +/// The old waker will be replaced. +/// The waker will be called when the storage device interrupt occurs once +/// and it will be removed after it is called. +/// +/// Returns true if the waker is registered successfully. +/// Returns false if the interrupt occurred before. +pub fn register_waker_for_storage_interrupt(irq: u16, waker: core::task::Waker) -> bool { + let mut node = MCSNode::new(); + let mut w = IRQ_WAKERS.lock(&mut node); + + let entry = w.entry(irq); + + match entry { + Entry::Occupied(mut e) => { + if matches!(e.get(), IRQWaker::Interrupted) { + e.remove(); + false + } else { + e.insert(IRQWaker::Waker(waker)); + true + } + } + Entry::Vacant(e) => { + e.insert(IRQWaker::Waker(waker)); + true + } + } +} + +pub fn handle_storage_interrupt(device_id: u64, irq: u16) -> bool { + let manager = STORAGE_MANAGER.read(); + + let Some(device) = manager.devices.get(&device_id) else { + return false; + }; + + let _ = device.interrupt(irq); + + drop(manager); + + // TODO: Wake tasks waiting for completion + + false +} diff --git a/awkernel_lib/src/storage/storage_device.rs b/awkernel_lib/src/storage/storage_device.rs new file mode 100644 index 00000000..18279416 --- /dev/null +++ b/awkernel_lib/src/storage/storage_device.rs @@ -0,0 +1,50 @@ +use alloc::{borrow::Cow, vec::Vec}; + +#[derive(Debug, Clone, Copy)] +pub enum StorageDeviceType { + NVMe, + SATA, + USB, + VirtIO, + Memory, +} + +#[derive(Debug)] +pub enum StorageDevError { + IoError, + InvalidCommand, + DeviceNotReady, + InvalidBlock, + BufferTooSmall, + NotSupported, +} + +pub trait StorageDevice: Send + Sync { + fn device_id(&self) -> u64; + + fn device_name(&self) -> Cow<'static, str>; + + fn device_short_name(&self) -> Cow<'static, str>; + + fn device_type(&self) -> StorageDeviceType; + + fn irqs(&self) -> Vec; + + fn interrupt(&self, irq: u16) -> Result<(), StorageDevError>; + + fn block_size(&self) -> usize; + + fn num_blocks(&self) -> u64; + + fn read_blocks(&self, buf: &mut [u8], transfer_id: u16) -> Result<(), StorageDevError>; + + fn write_blocks(&self, buf: &[u8], transfer_id: u16) -> Result<(), StorageDevError>; + + fn flush(&self, _transfer_id: u16) -> Result<(), StorageDevError> { + Ok(()) + } + + fn get_namespace_id(&self) -> Option { + None + } +}