Skip to content

Commit 928c99f

Browse files
committed
Added send_after and send_interval tests
1 parent b39e8d5 commit 928c99f

File tree

6 files changed

+500
-4
lines changed

6 files changed

+500
-4
lines changed

concurrency/src/tasks/gen_server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ where
259259

260260
#[cfg(test)]
261261
mod tests {
262+
262263
use super::*;
263264
use crate::tasks::send_after;
264265
use std::{thread, time::Duration};

concurrency/src/tasks/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ mod gen_server;
66
mod process;
77
mod time;
88

9+
#[cfg(test)]
10+
mod timer_tests;
11+
912
pub use error::GenServerError;
1013
pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg};
1114
pub use process::{send, Process, ProcessInfo};
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
use crate::tasks::{send_interval, CallResponse, CastResponse, GenServer, GenServerHandle};
2+
use spawned_rt::tasks::{self as rt, CancellationToken};
3+
use std::time::Duration;
4+
5+
use super::send_after;
6+
7+
type RepeaterHandle = GenServerHandle<Repeater>;
8+
9+
#[derive(Clone)]
10+
struct RepeaterState {
11+
pub(crate) count: i32,
12+
pub(crate) cancellation_token: Option<CancellationToken>,
13+
}
14+
15+
#[derive(Clone)]
16+
enum RepeaterCastMessage {
17+
Inc,
18+
StopTimer,
19+
}
20+
21+
#[derive(Clone)]
22+
enum RepeaterCallMessage {
23+
GetCount,
24+
}
25+
26+
#[derive(PartialEq, Debug)]
27+
enum RepeaterOutMessage {
28+
Count(i32),
29+
}
30+
31+
struct Repeater;
32+
33+
impl Repeater {
34+
pub async fn stop_timer(server: &mut RepeaterHandle) -> Result<(), ()> {
35+
server
36+
.cast(RepeaterCastMessage::StopTimer)
37+
.await
38+
.map_err(|_| ())
39+
}
40+
41+
pub async fn get_count(server: &mut RepeaterHandle) -> Result<RepeaterOutMessage, ()> {
42+
server
43+
.call(RepeaterCallMessage::GetCount)
44+
.await
45+
.map_err(|_| ())
46+
}
47+
}
48+
49+
impl GenServer for Repeater {
50+
type CallMsg = RepeaterCallMessage;
51+
type CastMsg = RepeaterCastMessage;
52+
type OutMsg = RepeaterOutMessage;
53+
type State = RepeaterState;
54+
type Error = ();
55+
56+
fn new() -> Self {
57+
Self
58+
}
59+
60+
async fn init(
61+
&mut self,
62+
handle: &RepeaterHandle,
63+
mut state: Self::State,
64+
) -> Result<Self::State, Self::Error> {
65+
let timer = send_interval(
66+
Duration::from_millis(100),
67+
handle.clone(),
68+
RepeaterCastMessage::Inc,
69+
);
70+
state.cancellation_token = Some(timer.cancellation_token);
71+
Ok(state)
72+
}
73+
74+
async fn handle_call(
75+
&mut self,
76+
_message: Self::CallMsg,
77+
_handle: &RepeaterHandle,
78+
state: Self::State,
79+
) -> CallResponse<Self> {
80+
let count = state.count;
81+
CallResponse::Reply(state, RepeaterOutMessage::Count(count))
82+
}
83+
84+
async fn handle_cast(
85+
&mut self,
86+
message: Self::CastMsg,
87+
_handle: &GenServerHandle<Self>,
88+
mut state: Self::State,
89+
) -> CastResponse<Self> {
90+
match message {
91+
RepeaterCastMessage::Inc => {
92+
state.count += 1;
93+
}
94+
RepeaterCastMessage::StopTimer => {
95+
if let Some(ct) = state.cancellation_token.clone() {
96+
ct.cancel()
97+
};
98+
}
99+
};
100+
CastResponse::NoReply(state)
101+
}
102+
}
103+
104+
#[test]
105+
pub fn test_send_interval_and_cancellation() {
106+
let runtime = rt::Runtime::new().unwrap();
107+
runtime.block_on(async move {
108+
// Start a Repeater
109+
let mut repeater = Repeater::start(RepeaterState {
110+
count: 0,
111+
cancellation_token: None,
112+
});
113+
114+
// Wait for 1 second
115+
rt::sleep(Duration::from_secs(1)).await;
116+
117+
// Check count
118+
let count = Repeater::get_count(&mut repeater).await.unwrap();
119+
120+
// 9 messages in 1 second (after first 100 milliseconds sleep)
121+
assert_eq!(RepeaterOutMessage::Count(9), count);
122+
123+
// Pause timer
124+
Repeater::stop_timer(&mut repeater).await.unwrap();
125+
126+
// Wait another second
127+
rt::sleep(Duration::from_secs(1)).await;
128+
129+
// Check count again
130+
let count2 = Repeater::get_count(&mut repeater).await.unwrap();
131+
132+
// As timer was paused, count should remain at 9
133+
assert_eq!(RepeaterOutMessage::Count(9), count2);
134+
});
135+
}
136+
137+
type DelayedHandle = GenServerHandle<Delayed>;
138+
139+
#[derive(Clone)]
140+
struct DelayedState {
141+
pub(crate) count: i32,
142+
}
143+
144+
#[derive(Clone)]
145+
enum DelayedCastMessage {
146+
Inc,
147+
}
148+
149+
#[derive(Clone)]
150+
enum DelayedCallMessage {
151+
GetCount,
152+
}
153+
154+
#[derive(PartialEq, Debug)]
155+
enum DelayedOutMessage {
156+
Count(i32),
157+
}
158+
159+
struct Delayed;
160+
161+
impl Delayed {
162+
pub async fn get_count(server: &mut DelayedHandle) -> Result<DelayedOutMessage, ()> {
163+
server
164+
.call(DelayedCallMessage::GetCount)
165+
.await
166+
.map_err(|_| ())
167+
}
168+
}
169+
170+
impl GenServer for Delayed {
171+
type CallMsg = DelayedCallMessage;
172+
type CastMsg = DelayedCastMessage;
173+
type OutMsg = DelayedOutMessage;
174+
type State = DelayedState;
175+
type Error = ();
176+
177+
fn new() -> Self {
178+
Self
179+
}
180+
181+
async fn handle_call(
182+
&mut self,
183+
_message: Self::CallMsg,
184+
_handle: &DelayedHandle,
185+
state: Self::State,
186+
) -> CallResponse<Self> {
187+
let count = state.count;
188+
CallResponse::Reply(state, DelayedOutMessage::Count(count))
189+
}
190+
191+
async fn handle_cast(
192+
&mut self,
193+
message: Self::CastMsg,
194+
_handle: &DelayedHandle,
195+
mut state: Self::State,
196+
) -> CastResponse<Self> {
197+
match message {
198+
DelayedCastMessage::Inc => {
199+
state.count += 1;
200+
}
201+
};
202+
CastResponse::NoReply(state)
203+
}
204+
}
205+
206+
#[test]
207+
pub fn test_send_after_and_cancellation() {
208+
let runtime = rt::Runtime::new().unwrap();
209+
runtime.block_on(async move {
210+
// Start a Delayed
211+
let mut repeater = Delayed::start(DelayedState { count: 0 });
212+
213+
// Set a just once timed message
214+
let _ = send_after(
215+
Duration::from_millis(100),
216+
repeater.clone(),
217+
DelayedCastMessage::Inc,
218+
);
219+
220+
// Wait for 200 milliseconds
221+
rt::sleep(Duration::from_millis(200)).await;
222+
223+
// Check count
224+
let count = Delayed::get_count(&mut repeater).await.unwrap();
225+
226+
// Only one message (no repetition)
227+
assert_eq!(DelayedOutMessage::Count(1), count);
228+
229+
// New timer
230+
let timer = send_after(
231+
Duration::from_millis(100),
232+
repeater.clone(),
233+
DelayedCastMessage::Inc,
234+
);
235+
236+
// Cancel the new timer before timeout
237+
timer.cancellation_token.cancel();
238+
239+
// Wait another 200 milliseconds
240+
rt::sleep(Duration::from_millis(200)).await;
241+
242+
// Check count again
243+
let count2 = Delayed::get_count(&mut repeater).await.unwrap();
244+
245+
// As timer was cancelled, count should remain at 1
246+
assert_eq!(DelayedOutMessage::Count(1), count2);
247+
});
248+
}

concurrency/src/threads/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ mod gen_server;
66
mod process;
77
mod time;
88

9+
#[cfg(test)]
10+
mod timer_tests;
11+
912
pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg};
1013
pub use process::{send, Process, ProcessInfo};
1114
pub use time::{send_after, send_interval};

concurrency/src/threads/time.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,22 @@ pub fn send_after<T>(
1515
period: Duration,
1616
mut handle: GenServerHandle<T>,
1717
message: T::CastMsg,
18-
) -> JoinHandle<()>
18+
) -> TimerHandle
1919
where
2020
T: GenServer + 'static,
2121
{
22-
rt::spawn(move || {
22+
let cancellation_token = CancellationToken::new();
23+
let mut cloned_token = cancellation_token.clone();
24+
let join_handle = rt::spawn(move || {
2325
rt::sleep(period);
24-
let _ = handle.cast(message);
25-
})
26+
if !cloned_token.is_cancelled() {
27+
let _ = handle.cast(message);
28+
};
29+
});
30+
TimerHandle {
31+
join_handle,
32+
cancellation_token,
33+
}
2634
}
2735

2836
// Sends a message to the specified GenServe repeatedly after `Time` milliseconds.

0 commit comments

Comments
 (0)