Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion applications/awkernel_services/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ extern crate alloc;

mod buffered_logger;
mod network_service;
mod storage_service;

use core::time::Duration;

const NETWORK_SERVICE_NAME: &str = "[Awkernel] network service";
const BUFFERED_LOGGER_NAME: &str = "[Awkernel] buffered logger service";
const NETWORK_SERVICE_NAME: &str = "[Awkernel] network service";
const STORAGE_SERVICE_NAME: &str = "[Awkernel] storage service";
const DISPLAY_SERVICE_NAME: &str = "[Awkernel] display service";

pub async fn run() {
Expand All @@ -26,6 +28,13 @@ pub async fn run() {
)
.await;

awkernel_async_lib::spawn(
STORAGE_SERVICE_NAME.into(),
storage_service::run(),
awkernel_async_lib::scheduler::SchedulerType::PrioritizedFIFO(0),
)
.await;

awkernel_async_lib::spawn(
DISPLAY_SERVICE_NAME.into(),
awkernel_display::run(),
Expand Down
114 changes: 114 additions & 0 deletions applications/awkernel_services/src/storage_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use core::{future::Future, task::Poll};

use alloc::{collections::BTreeMap, format};
use awkernel_async_lib::{
future::FutureExt, scheduler::SchedulerType, select_biased, session_types::*,
};

type ProtoInterruptHandler = Recv<(), Send<(), Eps>>;
type ChanProtoInterruptHandlerDual = Chan<(), <ProtoInterruptHandler as HasDual>::Dual>;

pub async fn run() {
log::info!("Starting {}.", crate::STORAGE_SERVICE_NAME);

let mut ch_irq_handlers = BTreeMap::new();

for storage_status in awkernel_lib::storage::get_all_storage_statuses() {
spawn_handlers(storage_status, &mut ch_irq_handlers).await;
}
}

async fn spawn_handlers(
storage_status: awkernel_lib::storage::StorageStatus,
ch_irq_handlers: &mut BTreeMap<u16, ChanProtoInterruptHandlerDual>,
) {
for irq in storage_status.irqs {
// Check if we already have a handler for this IRQ
if ch_irq_handlers.contains_key(&irq) {
continue;
}

let (server, client) = session_channel::<ProtoInterruptHandler>();
ch_irq_handlers.insert(irq, client);

let name = format!(
"{}:{}: IRQ = {irq}",
crate::STORAGE_SERVICE_NAME,
storage_status.device_name,
);

awkernel_async_lib::spawn(
name.into(),
interrupt_handler(storage_status.device_id, irq, server),
SchedulerType::PrioritizedFIFO(0),
)
.await;
}
}

// Interrupt handlers.

struct StorageInterrupt {
irq: u16,
wait: bool,
}

impl Future for StorageInterrupt {
type Output = ();

fn poll(
self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<Self::Output> {
let m = self.get_mut();

if !m.wait {
return Poll::Ready(());
}

m.wait = false;

if awkernel_lib::storage::register_waker_for_storage_interrupt(m.irq, cx.waker().clone()) {
Poll::Pending
} else {
Poll::Ready(())
}
}
}

async fn interrupt_handler(interface_id: u64, irq: u16, ch: Chan<(), ProtoInterruptHandler>) {
let mut ch = ch.recv().boxed().fuse();

loop {
let mut empty = async {}.boxed().fuse();

select_biased! {
(ch, _) = ch => {
let ch = ch.send(()).await;
ch.close();
return;
},
_ = empty => {},
}

if awkernel_lib::storage::handle_storage_interrupt(interface_id, irq) {
awkernel_async_lib::r#yield().await;
continue;
}

// Wait interrupts.
let mut irq_wait = StorageInterrupt { irq, wait: true }.fuse();

select_biased! {
(ch, _) = ch => {
let ch = ch.send(()).await;
ch.close();
return;
},
_ = irq_wait => {},
}

awkernel_lib::storage::handle_storage_interrupt(interface_id, irq);
awkernel_async_lib::r#yield().await;
}
}
1 change: 1 addition & 0 deletions awkernel_lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod mmio;
pub mod net;
pub mod priority_queue;
pub mod sanity;
pub mod storage;
pub mod sync;
pub mod time;
pub mod timer;
Expand Down
193 changes: 193 additions & 0 deletions awkernel_lib/src/storage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
use crate::sync::{mcs::MCSNode, mutex::Mutex, rwlock::RwLock};
use alloc::{
borrow::Cow,
collections::{btree_map::Entry, BTreeMap},
sync::Arc,
vec::Vec,
};
use storage_device::{StorageDevError, StorageDevice, StorageDeviceType};

pub mod storage_device;

#[derive(Debug)]
pub enum StorageManagerError {
InvalidDeviceID,
InvalidTransferID,
DeviceError(StorageDevError),
NotYetImplemented,
PoolNotInitialized,
}

#[derive(Debug)]
pub struct StorageStatus {
pub device_id: u64,
pub device_name: Cow<'static, str>,
pub device_type: StorageDeviceType,
pub irqs: Vec<u16>,
pub block_size: usize,
pub num_blocks: u64,
}

static STORAGE_MANAGER: RwLock<StorageManager> = RwLock::new(StorageManager {
devices: BTreeMap::new(),
device_id: 0,
});

static IRQ_WAKERS: Mutex<BTreeMap<u16, IRQWaker>> = Mutex::new(BTreeMap::new());

pub struct StorageManager {
devices: BTreeMap<u64, Arc<dyn StorageDevice>>,
device_id: u64,
}

enum IRQWaker {
Waker(core::task::Waker),
Interrupted,
}

pub fn add_storage_device(device: Arc<dyn StorageDevice + Sync + Send>) -> u64 {
let mut manager = STORAGE_MANAGER.write();

if manager.device_id == u64::MAX {
panic!("storage device id overflow");
}

let id = manager.device_id;
manager.device_id += 1;

manager.devices.insert(id, device);

id
}

pub fn get_device_block_size(device_id: u64) -> Result<usize, StorageManagerError> {
let manager = STORAGE_MANAGER.read();

let device = manager
.devices
.get(&device_id)
.ok_or(StorageManagerError::InvalidDeviceID)?;

Ok(device.block_size())
}

pub fn get_storage_device(device_id: u64) -> Result<Arc<dyn StorageDevice>, StorageManagerError> {
let manager = STORAGE_MANAGER.read();

let device = manager
.devices
.get(&device_id)
.ok_or(StorageManagerError::InvalidDeviceID)?;

Ok(device.clone())
}

pub fn get_storage_status(device_id: u64) -> Result<StorageStatus, StorageManagerError> {
let manager = STORAGE_MANAGER.read();

let device = manager
.devices
.get(&device_id)
.ok_or(StorageManagerError::InvalidDeviceID)?;

let status = StorageStatus {
device_id,
device_name: device.device_name(),
device_type: device.device_type(),
irqs: device.irqs(),
block_size: device.block_size(),
num_blocks: device.num_blocks(),
};

Ok(status)
}

pub fn get_all_storage_statuses() -> Vec<StorageStatus> {
let manager = STORAGE_MANAGER.read();

let mut result = Vec::new();

for id in manager.devices.keys() {
if let Ok(status) = get_storage_status(*id) {
result.push(status);
}
}

result
}

pub fn get_device_namespace(device_id: u64) -> Option<u32> {
let manager = STORAGE_MANAGER.read();

let device = manager.devices.get(&device_id)?;
device.get_namespace_id()
}

/// Service routine for storage device interrupt.
/// This routine should be called by interrupt handlers provided by device drivers.
pub fn storage_interrupt(irq: u16) {
let mut node = MCSNode::new();
let mut w = IRQ_WAKERS.lock(&mut node);

match w.entry(irq) {
Entry::Occupied(e) => {
if matches!(e.get(), IRQWaker::Waker(_)) {
let IRQWaker::Waker(w) = e.remove() else {
return;
};

w.wake();
}
}
Entry::Vacant(e) => {
e.insert(IRQWaker::Interrupted);
}
}
}

/// Register a waker for a storage device interrupt service.
///
/// The old waker will be replaced.
/// The waker will be called when the storage device interrupt occurs once
/// and it will be removed after it is called.
///
/// Returns true if the waker is registered successfully.
/// Returns false if the interrupt occurred before.
pub fn register_waker_for_storage_interrupt(irq: u16, waker: core::task::Waker) -> bool {
let mut node = MCSNode::new();
let mut w = IRQ_WAKERS.lock(&mut node);

let entry = w.entry(irq);

match entry {
Entry::Occupied(mut e) => {
if matches!(e.get(), IRQWaker::Interrupted) {
e.remove();
false
} else {
e.insert(IRQWaker::Waker(waker));
true
}
}
Entry::Vacant(e) => {
e.insert(IRQWaker::Waker(waker));
true
}
}
}

pub fn handle_storage_interrupt(device_id: u64, irq: u16) -> bool {
let manager = STORAGE_MANAGER.read();

let Some(device) = manager.devices.get(&device_id) else {
return false;
};

let _ = device.interrupt(irq);

drop(manager);

// TODO: Wake tasks waiting for completion

false
}
50 changes: 50 additions & 0 deletions awkernel_lib/src/storage/storage_device.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use alloc::{borrow::Cow, vec::Vec};

#[derive(Debug, Clone, Copy)]
pub enum StorageDeviceType {
NVMe,
SATA,
USB,
VirtIO,
Memory,
}

#[derive(Debug)]
pub enum StorageDevError {
IoError,
InvalidCommand,
DeviceNotReady,
InvalidBlock,
BufferTooSmall,
NotSupported,
}

pub trait StorageDevice: Send + Sync {
fn device_id(&self) -> u64;

fn device_name(&self) -> Cow<'static, str>;

fn device_short_name(&self) -> Cow<'static, str>;

fn device_type(&self) -> StorageDeviceType;

fn irqs(&self) -> Vec<u16>;

fn interrupt(&self, irq: u16) -> Result<(), StorageDevError>;

fn block_size(&self) -> usize;

fn num_blocks(&self) -> u64;

fn read_blocks(&self, buf: &mut [u8], transfer_id: u16) -> Result<(), StorageDevError>;

fn write_blocks(&self, buf: &[u8], transfer_id: u16) -> Result<(), StorageDevError>;

fn flush(&self, _transfer_id: u16) -> Result<(), StorageDevError> {
Ok(())
}

fn get_namespace_id(&self) -> Option<u32> {
None
}
}