Skip to content

Commit

Permalink
Cancel queue
Browse files Browse the repository at this point in the history
  • Loading branch information
mzohreva committed Nov 4, 2020
1 parent 5759959 commit f940d00
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 33 deletions.
6 changes: 3 additions & 3 deletions enclave-runner/src/usercalls/abi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use futures::future::Future;

type Register = u64;

trait RegisterArgument {
pub(super) trait RegisterArgument {
fn from_register(_: Register) -> Self;
fn into_register(self) -> Register;
}
Expand All @@ -29,7 +29,7 @@ type EnclaveAbort = super::EnclaveAbort<bool>;
pub(crate) type UsercallResult<T> = ::std::result::Result<T, EnclaveAbort>;
pub(crate) type DispatchResult = UsercallResult<(Register, Register)>;

trait ReturnValue {
pub(super) trait ReturnValue {
fn into_registers(self) -> DispatchResult;
}

Expand All @@ -38,7 +38,7 @@ macro_rules! define_usercalls {
($(fn $f:ident($($n:ident: $t:ty),*) $(-> $r:tt)*; )*) => {
#[repr(C)]
#[allow(non_camel_case_types)]
enum UsercallList {
pub(super) enum UsercallList {
__enclave_usercalls_invalid,
$($f,)*
}
Expand Down
7 changes: 4 additions & 3 deletions enclave-runner/src/usercalls/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,13 @@ impl<'future, 'ioinput: 'future, 'tcs: 'ioinput> Usercalls<'future> for Handler<
self,
usercall_queue: *mut FifoDescriptor<Usercall>,
return_queue: *mut FifoDescriptor<Return>,
cancel_queue: *mut FifoDescriptor<Cancel>,
) -> std::pin::Pin<Box<dyn Future<Output = (Self, UsercallResult<Result>)> + 'future>> {
async move {
unsafe {
let ret = match (usercall_queue.as_mut(), return_queue.as_mut()) {
(Some(usercall_queue), Some(return_queue)) => {
self.0.async_queues(usercall_queue, return_queue).await.map(Ok)
self.0.async_queues(usercall_queue, return_queue, cancel_queue.as_mut()).await.map(Ok)
},
_ => {
Ok(Err(IoErrorKind::InvalidInput.into()))
Expand Down Expand Up @@ -321,13 +322,13 @@ fn result_from_io_error(err: IoError) -> Result {
ret as _
}

trait ToSgxResult {
pub(super) trait ToSgxResult {
type Return;

fn to_sgx_result(self) -> Self::Return;
}

trait SgxReturn {
pub(super) trait SgxReturn {
fn on_error() -> Self;
}

Expand Down
149 changes: 130 additions & 19 deletions enclave-runner/src/usercalls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use std::alloc::{GlobalAlloc, Layout, System};
use std::cell::RefCell;
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::io::{self, ErrorKind as IoErrorKind, Read, Result as IoResult};
use std::pin::Pin;
use std::result::Result as StdResult;
Expand Down Expand Up @@ -38,7 +38,7 @@ use sgxs::loader::Tcs as SgxsTcs;

use crate::loader::{EnclavePanic, ErasedTcs};
use crate::tcs::{self, CoResult, ThreadResult};
use self::abi::dispatch;
use self::abi::{dispatch, UsercallList};
use self::interface::{Handler, OutputBuffer};

pub(crate) mod abi;
Expand All @@ -48,10 +48,11 @@ lazy_static! {
static ref DEBUGGER_TOGGLE_SYNC: Mutex<()> = Mutex::new(());
}

const EV_ABORT: u64 = 0b0000_0000_0000_1000;
const EV_ABORT: u64 = 0b0000_0000_0001_0000;

const USERCALL_QUEUE_SIZE: usize = 16;
const RETURN_QUEUE_SIZE: usize = 1024;
const CANCEL_QUEUE_SIZE: usize = USERCALL_QUEUE_SIZE * 2;

enum UsercallSendData {
Sync(ThreadResult<ErasedTcs>, RunningTcs, RefCell<[u8; 1024]>),
Expand All @@ -61,7 +62,7 @@ enum UsercallSendData {
// This is the same as UsercallSendData except that it can't be Sync(CoResult::Return(...), ...)
enum UsercallHandleData {
Sync(tcs::Usercall<ErasedTcs>, RunningTcs, RefCell<[u8; 1024]>),
Async(Identified<Usercall>),
Async(Identified<Usercall>, Option<async_mpsc::UnboundedSender<UsercallEvent>>),
}

type EnclaveResult = StdResult<(u64, u64), EnclaveAbort<Option<EnclavePanic>>>;
Expand Down Expand Up @@ -511,7 +512,7 @@ struct PendingEvents {

impl PendingEvents {
// Will error if it doesn't fit in a `u64`
const EV_MAX_U64: u64 = (EV_USERCALLQ_NOT_FULL | EV_RETURNQ_NOT_EMPTY | EV_UNPARK) + 1;
const EV_MAX_U64: u64 = (EV_USERCALLQ_NOT_FULL | EV_RETURNQ_NOT_EMPTY | EV_UNPARK | EV_CANCELQ_NOT_FULL) + 1;
const EV_MAX: usize = Self::EV_MAX_U64 as _;
// Will error if it doesn't fit in a `usize`
const _ERROR_IF_USIZE_TOO_SMALL: u64 = u64::MAX + (Self::EV_MAX_U64 - (Self::EV_MAX as u64));
Expand Down Expand Up @@ -586,6 +587,7 @@ impl EnclaveKind {
struct FifoGuards {
usercall_queue: DescriptorGuard<Usercall>,
return_queue: DescriptorGuard<Return>,
cancel_queue: DescriptorGuard<Cancel>,
async_queues_called: bool,
}

Expand Down Expand Up @@ -631,6 +633,31 @@ impl Work {
}
}

enum UsercallEvent {
Started(u64, tokio::sync::oneshot::Sender<()>),
Finished(u64),
Cancelled(u64, Instant),
}

fn ignore_cancel_impl(usercall_nr: u64) -> bool {
usercall_nr != UsercallList::read as u64 &&
usercall_nr != UsercallList::read_alloc as u64 &&
usercall_nr != UsercallList::write as u64 &&
usercall_nr != UsercallList::accept_stream as u64 &&
usercall_nr != UsercallList::connect_stream as u64 &&
usercall_nr != UsercallList::wait as u64
}

trait IgnoreCancel {
fn ignore_cancel(&self) -> bool;
}
impl IgnoreCancel for Identified<Usercall> {
fn ignore_cancel(&self) -> bool { ignore_cancel_impl(self.data.0) }
}
impl IgnoreCancel for Identified<Cancel> {
fn ignore_cancel(&self) -> bool { ignore_cancel_impl(self.data.usercall_nr) }
}

impl EnclaveState {
fn event_queue_add_tcs(
event_queues: &mut FnvHashMap<TcsAddress, futures::channel::mpsc::UnboundedSender<u8>>,
Expand Down Expand Up @@ -703,15 +730,41 @@ impl EnclaveState {
tx_return_channel: tokio::sync::mpsc::UnboundedSender<(EnclaveResult, ReturnSource)>,
mut handle_data: UsercallHandleData,
) {
let notifier_rx = match handle_data {
UsercallHandleData::Async(ref usercall, Some(ref usercall_event_tx)) => {
let (notifier_tx, notifier_rx) = tokio::sync::oneshot::channel();
usercall_event_tx.send(UsercallEvent::Started(usercall.id, notifier_tx)).ok()
.expect("failed to send usercall event");
Some(notifier_rx)
},
_ => None,
};
let (parameters, mode, tcs) = match handle_data {
UsercallHandleData::Sync(ref usercall, ref mut tcs, _) => (usercall.parameters(), tcs.mode.into(), Some(tcs)),
UsercallHandleData::Async(ref usercall) => (usercall.data.into(), ReturnSource::AsyncUsercall, None),
UsercallHandleData::Async(ref usercall, _) => (usercall.data.into(), ReturnSource::AsyncUsercall, None),
};
let mut input = IOHandlerInput { enclave: enclave.clone(), tcs, work_sender: &work_sender };
let handler = Handler(&mut input);
let (_handler, result) = {
let result = {
use self::interface::ToSgxResult;
use self::abi::ReturnValue;

let (p1, p2, p3, p4, p5) = parameters;
dispatch(handler, p1, p2, p3, p4, p5).await
match notifier_rx {
None => dispatch(handler, p1, p2, p3, p4, p5).await.1,
Some(notifier_rx) => {
let a = dispatch(handler, p1, p2, p3, p4, p5).boxed_local();
let b = notifier_rx;
match futures::future::select(a, b).await {
Either::Left((ret, _)) => ret.1,
Either::Right((Ok(()), _)) => {
let result: IoResult<usize> = Err(IoErrorKind::Interrupted.into());
ReturnValue::into_registers(Ok(result.to_sgx_result()))
},
Either::Right((Err(_), _)) => panic!("notifier channel closed unexpectedly"),
}
},
}
};
let ret = match result {
Ok(ret) => {
Expand All @@ -722,7 +775,11 @@ impl EnclaveState {
entry: CoEntry::Resume(usercall, ret),
}).expect("Work sender couldn't send data to receiver");
}
UsercallHandleData::Async(usercall) => {
UsercallHandleData::Async(usercall, usercall_event_tx) => {
if let Some(usercall_event_tx) = usercall_event_tx {
usercall_event_tx.send(UsercallEvent::Finished(usercall.id)).ok()
.expect("failed to send usercall event");
}
let return_queue_tx = enclave.return_queue_tx.lock().await.clone().expect("return_queue_tx not initialized");
let ret = Identified {
id: usercall.id,
Expand All @@ -741,7 +798,7 @@ impl EnclaveState {
trap_attached_debugger(usercall.tcs_address() as _).await;
EnclavePanic::from(debug_buf.into_inner())
}
UsercallHandleData::Async(_) => {
UsercallHandleData::Async(_, _) => {
// TODO: https://github.com/fortanix/rust-sgx/issues/235#issuecomment-641811437
EnclavePanic::DebugStr("async exit with a panic".to_owned())
}
Expand Down Expand Up @@ -819,14 +876,16 @@ impl EnclaveState {
};
let enclave_clone = enclave.clone();
let io_future = async move {
let (usercall_queue_synchronizer, return_queue_synchronizer, sync_usercall_tx) = QueueSynchronizer::new(enclave_clone.clone());
let (uqs, rqs, cqs, sync_usercall_tx) = QueueSynchronizer::new(enclave_clone.clone());

let (usercall_queue_tx, usercall_queue_rx) = ipc_queue::bounded_async(USERCALL_QUEUE_SIZE, usercall_queue_synchronizer);
let (return_queue_tx, return_queue_rx) = ipc_queue::bounded_async(RETURN_QUEUE_SIZE, return_queue_synchronizer);
let (usercall_queue_tx, usercall_queue_rx) = ipc_queue::bounded_async(USERCALL_QUEUE_SIZE, uqs);
let (return_queue_tx, return_queue_rx) = ipc_queue::bounded_async(RETURN_QUEUE_SIZE, rqs);
let (cancel_queue_tx, cancel_queue_rx) = ipc_queue::bounded_async(CANCEL_QUEUE_SIZE, cqs);

let fifo_guards = FifoGuards {
usercall_queue: usercall_queue_tx.into_descriptor_guard(),
return_queue: return_queue_rx.into_descriptor_guard(),
cancel_queue: cancel_queue_tx.into_descriptor_guard(),
async_queues_called: false,
};

Expand All @@ -839,14 +898,51 @@ impl EnclaveState {
}
});

let (usercall_event_tx, mut usercall_event_rx) = async_mpsc::unbounded_channel();
let usercall_event_tx_clone = usercall_event_tx.clone();
tokio::task::spawn_local(async move {
while let Ok(c) = cancel_queue_rx.recv().await {
if !c.ignore_cancel() {
let _ = usercall_event_tx_clone.send(UsercallEvent::Cancelled(c.id, Instant::now()));
}
}
});

tokio::task::spawn_local(async move {
let mut notifiers = HashMap::new();
let mut cancels: HashMap<u64, Instant> = HashMap::new();
// This should be greater than the amount of time it takes for the enclave runner
// to start executing a usercall after the enclave sends it on the usercall_queue.
const CANCEL_EXPIRY: Duration = Duration::from_millis(100);
loop {
match usercall_event_rx.recv().await.expect("usercall_event channel closed unexpectedly") {
UsercallEvent::Started(id, notifier) => match cancels.remove(&id) {
Some(t) if t.elapsed() < CANCEL_EXPIRY => { let _ = notifier.send(()); },
_ => { notifiers.insert(id, notifier); },
},
UsercallEvent::Finished(id) => { notifiers.remove(&id); },
UsercallEvent::Cancelled(id, t) => if t.elapsed() < CANCEL_EXPIRY {
match notifiers.remove(&id) {
Some(notifier) => { let _ = notifier.send(()); },
None => { cancels.insert(id, t); },
}
},
}
// cleanup expired cancels
let now = Instant::now();
cancels.retain(|_id, &mut t| now - t < CANCEL_EXPIRY);
}
});

let mut recv_queue = io_queue_receive.into_future();
while let (Some(work), stream) = recv_queue.await {
recv_queue = stream.into_future();
let enclave_clone = enclave_clone.clone();
let tx_return_channel = tx_return_channel.clone();
match work {
UsercallSendData::Async(usercall) => {
let uchd = UsercallHandleData::Async(usercall);
let usercall_event_tx = if usercall.ignore_cancel() { None } else { Some(usercall_event_tx.clone()) };
let uchd = UsercallHandleData::Async(usercall, usercall_event_tx);
let fut = Self::handle_usercall(enclave_clone, work_sender.clone(), tx_return_channel, uchd);
tokio::task::spawn_local(fut);
}
Expand Down Expand Up @@ -1439,7 +1535,7 @@ impl<'tcs> IOHandlerInput<'tcs> {
}

fn check_event_set(set: u64) -> IoResult<u8> {
const EV_ALL: u64 = EV_USERCALLQ_NOT_FULL | EV_RETURNQ_NOT_EMPTY | EV_UNPARK;
const EV_ALL: u64 = EV_USERCALLQ_NOT_FULL | EV_RETURNQ_NOT_EMPTY | EV_UNPARK | EV_CANCELQ_NOT_FULL;
if (set & !EV_ALL) != 0 {
return Err(IoErrorKind::InvalidInput.into());
}
Expand Down Expand Up @@ -1584,12 +1680,16 @@ impl<'tcs> IOHandlerInput<'tcs> {
&mut self,
usercall_queue: &mut FifoDescriptor<Usercall>,
return_queue: &mut FifoDescriptor<Return>,
cancel_queue: Option<&mut FifoDescriptor<Cancel>>,
) -> StdResult<(), EnclaveAbort<bool>> {
let mut fifo_guards = self.enclave.fifo_guards.lock().await;
match &mut *fifo_guards {
Some(ref mut fifo_guards) if !fifo_guards.async_queues_called => {
*usercall_queue = fifo_guards.usercall_queue.fifo_descriptor();
*return_queue = fifo_guards.return_queue.fifo_descriptor();
if let Some(cancel_queue) = cancel_queue {
*cancel_queue = fifo_guards.cancel_queue.fifo_descriptor();
}
fifo_guards.async_queues_called = true;
Ok(())
}
Expand All @@ -1608,6 +1708,7 @@ impl<'tcs> IOHandlerInput<'tcs> {
enum Queue {
Usercall,
Return,
Cancel,
}

struct QueueSynchronizer {
Expand All @@ -1620,14 +1721,15 @@ struct QueueSynchronizer {
}

impl QueueSynchronizer {
fn new(enclave: Arc<EnclaveState>) -> (Self, Self, broadcast::Sender<()>) {
fn new(enclave: Arc<EnclaveState>) -> (Self, Self, Self, broadcast::Sender<()>) {
// This broadcast channel is used to notify enclave-runner of any
// synchronous usercalls made by the enclave for the purpose of
// synchronizing access to usercall and return queues.
// The size of this channel should not matter since recv() can
// return RecvError::Lagged.
let (tx, rx1) = broadcast::channel(1);
let rx2 = tx.subscribe();
let rx3 = tx.subscribe();
let usercall_queue_synchronizer = QueueSynchronizer {
queue: Queue::Usercall,
enclave: enclave.clone(),
Expand All @@ -1636,11 +1738,17 @@ impl QueueSynchronizer {
};
let return_queue_synchronizer = QueueSynchronizer {
queue: Queue::Return,
enclave,
enclave: enclave.clone(),
subscription: Mutex::new(rx2),
subscription_maker: tx.clone(),
};
(usercall_queue_synchronizer, return_queue_synchronizer, tx)
let cancel_queue_synchronizer = QueueSynchronizer {
queue: Queue::Cancel,
enclave,
subscription: Mutex::new(rx3),
subscription_maker: tx.clone(),
};
(usercall_queue_synchronizer, return_queue_synchronizer, cancel_queue_synchronizer, tx)
}
}

Expand All @@ -1659,6 +1767,7 @@ impl ipc_queue::AsyncSynchronizer for QueueSynchronizer {
fn wait(&self, event: QueueEvent) -> Pin<Box<dyn Future<Output = StdResult<(), ipc_queue::SynchronizationError>> + '_>> {
match (self.queue, event) {
(Queue::Usercall, QueueEvent::NotFull) => panic!("enclave runner should not send on the usercall queue"),
(Queue::Cancel, QueueEvent::NotFull) => panic!("enclave runner should not send on the cancel queue"),
(Queue::Return, QueueEvent::NotEmpty) => panic!("enclave runner should not receive on the return queue"),
_ => {}
}
Expand All @@ -1677,12 +1786,14 @@ impl ipc_queue::AsyncSynchronizer for QueueSynchronizer {
fn notify(&self, event: QueueEvent) {
let ev = match (self.queue, event) {
(Queue::Usercall, QueueEvent::NotEmpty) => panic!("enclave runner should not send on the usercall queue"),
(Queue::Cancel, QueueEvent::NotEmpty) => panic!("enclave runner should not send on the cancel queue"),
(Queue::Return, QueueEvent::NotFull) => panic!("enclave runner should not receive on the return queue"),
(Queue::Usercall, QueueEvent::NotFull) => EV_USERCALLQ_NOT_FULL,
(Queue::Cancel, QueueEvent::NotFull) => EV_CANCELQ_NOT_FULL,
(Queue::Return, QueueEvent::NotEmpty) => EV_RETURNQ_NOT_EMPTY,
};
// When the enclave needs to wait on a queue, it executes the wait() usercall synchronously,
// specifying EV_USERCALLQ_NOT_FULL, EV_RETURNQ_NOT_EMPTY, or both in the event_mask.
// specifying EV_USERCALLQ_NOT_FULL, EV_RETURNQ_NOT_EMPTY or EV_CANCELQ_NOT_FULL in the event_mask.
// Userspace will wake any or all threads waiting on the appropriate event when it is triggered.
for queue in self.enclave.event_queues.values() {
let _ = queue.unbounded_send(ev as _);
Expand Down
Loading

0 comments on commit f940d00

Please sign in to comment.