Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: Xinye <[email protected]>
  • Loading branch information
Xinye committed Oct 25, 2023
1 parent e6b4645 commit 5d546cf
Showing 1 changed file with 39 additions and 16 deletions.
55 changes: 39 additions & 16 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ macro_rules! fail_point {
#[cfg(feature = "async")]
mod async_imp {
use super::*;
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a, Global>>;
type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;

#[derive(Clone)]
pub(crate) struct AsyncCallback(
Expand Down Expand Up @@ -997,9 +997,16 @@ mod async_imp {
match task {
Task::Off => {}
Task::Return(s) => return Some(s),
Task::Sleep(_) => panic!(
"fail does not support async sleep, please use a async closure to sleep."
),
Task::Sleep(t) => {
let not = Arc::new(tokio::sync::Notify::new());
let not_for_thread = not.clone();
let handle = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(t));
not_for_thread.notify_waiters();
});
not.notified().await;
handle.join().unwrap();
}
Task::Panic(msg) => match msg {
Some(ref msg) => panic!("{}", msg),
None => panic!("failpoint {} panic", name),
Expand All @@ -1010,9 +1017,11 @@ mod async_imp {
},
Task::Pause => unreachable!(),
Task::Yield => thread::yield_now(),
Task::Delay(_) => panic!(
"fail does not support async delay, please use a async closure to delay."
),
Task::Delay(t) => {
let timer = Instant::now();
let timeout = Duration::from_millis(t);
while timer.elapsed() < timeout {}
}
Task::Callback(f) => {
f.run();
}
Expand Down Expand Up @@ -1242,19 +1251,17 @@ mod tests {
#[cfg(feature = "async")]
#[cfg_attr(not(feature = "failpoints"), ignore)]
#[tokio::test]
async fn test_async_failpoint() {
use std::time::Duration;

async fn test_async_failpoints() {
let f1 = async {
async_fail_point!("cb");
async_fail_point!("async_cb");
};
let f2 = async {
async_fail_point!("cb");
async_fail_point!("async_cb");
};

let counter = Arc::new(AtomicUsize::new(0));
let counter2 = counter.clone();
cfg_async_callback("cb", move || {
cfg_async_callback("async_cb", move || {
counter2.fetch_add(1, Ordering::SeqCst);
Box::pin(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
Expand All @@ -1265,19 +1272,35 @@ mod tests {
f2.await;
assert_eq!(2, counter.load(Ordering::SeqCst));

cfg("pause", "pause").unwrap();
cfg("async_pause", "pause").unwrap();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let handle = tokio::spawn(async move {
async_fail_point!("pause");
async_fail_point!("async_pause");
tx.send(()).await.unwrap();
});
tokio::time::timeout(Duration::from_millis(500), rx.recv())
.await
.unwrap_err();
remove("pause");
remove("async_pause");
tokio::time::timeout(Duration::from_millis(500), rx.recv())
.await
.unwrap();
handle.await.unwrap();

cfg("async_sleep", "sleep(500)").unwrap();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let handle = tokio::spawn(async move {
tx.send(()).await.unwrap();
async_fail_point!("async_sleep");
tx.send(()).await.unwrap();
});
rx.recv().await.unwrap();
tokio::time::timeout(Duration::from_millis(300), rx.recv())
.await
.unwrap_err();
tokio::time::timeout(Duration::from_millis(300), rx.recv())
.await
.unwrap();
handle.await.unwrap();
}
}

0 comments on commit 5d546cf

Please sign in to comment.