Skip to content

Commit 481439d

Browse files
authored
Merge pull request #13 from lambdaclass/send_interval
Added send_interval and cancellation behavior
2 parents 52963da + 928c99f commit 481439d

File tree

17 files changed

+637
-47
lines changed

17 files changed

+637
-47
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ members = [
88
"examples/ping_pong",
99
"examples/ping_pong_threads",
1010
"examples/updater",
11-
"examples/updater_threads", "examples/blocking_genserver",
11+
"examples/updater_threads",
12+
"examples/blocking_genserver",
1213
]
1314

1415
[workspace.dependencies]

concurrency/src/tasks/gen_server.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ pub trait GenServer
105105
where
106106
Self: Send + Sized,
107107
{
108-
type CallMsg: Send + Sized;
109-
type CastMsg: Send + Sized;
108+
type CallMsg: Clone + Send + Sized + Sync;
109+
type CastMsg: Clone + Send + Sized + Sync;
110110
type OutMsg: Send + Sized;
111111
type State: Clone + Send;
112112
type Error: Debug + Send;
@@ -259,6 +259,7 @@ where
259259

260260
#[cfg(test)]
261261
mod tests {
262+
262263
use super::*;
263264
use crate::tasks::send_after;
264265
use std::{thread, time::Duration};

concurrency/src/tasks/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ mod gen_server;
66
mod process;
77
mod time;
88

9+
#[cfg(test)]
10+
mod timer_tests;
11+
912
pub use error::GenServerError;
1013
pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg};
1114
pub use process::{send, Process, ProcessInfo};
12-
pub use time::send_after;
15+
pub use time::{send_after, send_interval};

concurrency/src/tasks/time.rs

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,72 @@
1+
use futures::future::select;
12
use std::time::Duration;
23

3-
use spawned_rt::tasks::{self as rt, JoinHandle};
4+
use spawned_rt::tasks::{self as rt, CancellationToken, JoinHandle};
45

56
use super::{GenServer, GenServerHandle};
67

8+
pub struct TimerHandle {
9+
pub join_handle: JoinHandle<()>,
10+
pub cancellation_token: CancellationToken,
11+
}
12+
713
// Sends a message after a given period to the specified GenServer. The task terminates
814
// once the send has completed
915
pub fn send_after<T>(
1016
period: Duration,
1117
mut handle: GenServerHandle<T>,
1218
message: T::CastMsg,
13-
) -> JoinHandle<()>
19+
) -> TimerHandle
20+
where
21+
T: GenServer + 'static,
22+
{
23+
let cancellation_token = CancellationToken::new();
24+
let cloned_token = cancellation_token.clone();
25+
let join_handle = rt::spawn(async move {
26+
let _ = select(
27+
Box::pin(cloned_token.cancelled()),
28+
Box::pin(async {
29+
rt::sleep(period).await;
30+
let _ = handle.cast(message.clone()).await;
31+
}),
32+
)
33+
.await;
34+
});
35+
TimerHandle {
36+
join_handle,
37+
cancellation_token,
38+
}
39+
}
40+
41+
// Sends a message to the specified GenServe repeatedly after `Time` milliseconds.
42+
pub fn send_interval<T>(
43+
period: Duration,
44+
mut handle: GenServerHandle<T>,
45+
message: T::CastMsg,
46+
) -> TimerHandle
1447
where
1548
T: GenServer + 'static,
1649
{
17-
rt::spawn(async move {
18-
rt::sleep(period).await;
19-
let _ = handle.cast(message).await;
20-
})
50+
let cancellation_token = CancellationToken::new();
51+
let cloned_token = cancellation_token.clone();
52+
let join_handle = rt::spawn(async move {
53+
loop {
54+
let result = select(
55+
Box::pin(cloned_token.cancelled()),
56+
Box::pin(async {
57+
rt::sleep(period).await;
58+
let _ = handle.cast(message.clone()).await;
59+
}),
60+
)
61+
.await;
62+
match result {
63+
futures::future::Either::Left(_) => break,
64+
futures::future::Either::Right(_) => (),
65+
}
66+
}
67+
});
68+
TimerHandle {
69+
join_handle,
70+
cancellation_token,
71+
}
2172
}
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
use crate::tasks::{send_interval, CallResponse, CastResponse, GenServer, GenServerHandle};
2+
use spawned_rt::tasks::{self as rt, CancellationToken};
3+
use std::time::Duration;
4+
5+
use super::send_after;
6+
7+
type RepeaterHandle = GenServerHandle<Repeater>;
8+
9+
#[derive(Clone)]
10+
struct RepeaterState {
11+
pub(crate) count: i32,
12+
pub(crate) cancellation_token: Option<CancellationToken>,
13+
}
14+
15+
#[derive(Clone)]
16+
enum RepeaterCastMessage {
17+
Inc,
18+
StopTimer,
19+
}
20+
21+
#[derive(Clone)]
22+
enum RepeaterCallMessage {
23+
GetCount,
24+
}
25+
26+
#[derive(PartialEq, Debug)]
27+
enum RepeaterOutMessage {
28+
Count(i32),
29+
}
30+
31+
struct Repeater;
32+
33+
impl Repeater {
34+
pub async fn stop_timer(server: &mut RepeaterHandle) -> Result<(), ()> {
35+
server
36+
.cast(RepeaterCastMessage::StopTimer)
37+
.await
38+
.map_err(|_| ())
39+
}
40+
41+
pub async fn get_count(server: &mut RepeaterHandle) -> Result<RepeaterOutMessage, ()> {
42+
server
43+
.call(RepeaterCallMessage::GetCount)
44+
.await
45+
.map_err(|_| ())
46+
}
47+
}
48+
49+
impl GenServer for Repeater {
50+
type CallMsg = RepeaterCallMessage;
51+
type CastMsg = RepeaterCastMessage;
52+
type OutMsg = RepeaterOutMessage;
53+
type State = RepeaterState;
54+
type Error = ();
55+
56+
fn new() -> Self {
57+
Self
58+
}
59+
60+
async fn init(
61+
&mut self,
62+
handle: &RepeaterHandle,
63+
mut state: Self::State,
64+
) -> Result<Self::State, Self::Error> {
65+
let timer = send_interval(
66+
Duration::from_millis(100),
67+
handle.clone(),
68+
RepeaterCastMessage::Inc,
69+
);
70+
state.cancellation_token = Some(timer.cancellation_token);
71+
Ok(state)
72+
}
73+
74+
async fn handle_call(
75+
&mut self,
76+
_message: Self::CallMsg,
77+
_handle: &RepeaterHandle,
78+
state: Self::State,
79+
) -> CallResponse<Self> {
80+
let count = state.count;
81+
CallResponse::Reply(state, RepeaterOutMessage::Count(count))
82+
}
83+
84+
async fn handle_cast(
85+
&mut self,
86+
message: Self::CastMsg,
87+
_handle: &GenServerHandle<Self>,
88+
mut state: Self::State,
89+
) -> CastResponse<Self> {
90+
match message {
91+
RepeaterCastMessage::Inc => {
92+
state.count += 1;
93+
}
94+
RepeaterCastMessage::StopTimer => {
95+
if let Some(ct) = state.cancellation_token.clone() {
96+
ct.cancel()
97+
};
98+
}
99+
};
100+
CastResponse::NoReply(state)
101+
}
102+
}
103+
104+
#[test]
105+
pub fn test_send_interval_and_cancellation() {
106+
let runtime = rt::Runtime::new().unwrap();
107+
runtime.block_on(async move {
108+
// Start a Repeater
109+
let mut repeater = Repeater::start(RepeaterState {
110+
count: 0,
111+
cancellation_token: None,
112+
});
113+
114+
// Wait for 1 second
115+
rt::sleep(Duration::from_secs(1)).await;
116+
117+
// Check count
118+
let count = Repeater::get_count(&mut repeater).await.unwrap();
119+
120+
// 9 messages in 1 second (after first 100 milliseconds sleep)
121+
assert_eq!(RepeaterOutMessage::Count(9), count);
122+
123+
// Pause timer
124+
Repeater::stop_timer(&mut repeater).await.unwrap();
125+
126+
// Wait another second
127+
rt::sleep(Duration::from_secs(1)).await;
128+
129+
// Check count again
130+
let count2 = Repeater::get_count(&mut repeater).await.unwrap();
131+
132+
// As timer was paused, count should remain at 9
133+
assert_eq!(RepeaterOutMessage::Count(9), count2);
134+
});
135+
}
136+
137+
type DelayedHandle = GenServerHandle<Delayed>;
138+
139+
#[derive(Clone)]
140+
struct DelayedState {
141+
pub(crate) count: i32,
142+
}
143+
144+
#[derive(Clone)]
145+
enum DelayedCastMessage {
146+
Inc,
147+
}
148+
149+
#[derive(Clone)]
150+
enum DelayedCallMessage {
151+
GetCount,
152+
}
153+
154+
#[derive(PartialEq, Debug)]
155+
enum DelayedOutMessage {
156+
Count(i32),
157+
}
158+
159+
struct Delayed;
160+
161+
impl Delayed {
162+
pub async fn get_count(server: &mut DelayedHandle) -> Result<DelayedOutMessage, ()> {
163+
server
164+
.call(DelayedCallMessage::GetCount)
165+
.await
166+
.map_err(|_| ())
167+
}
168+
}
169+
170+
impl GenServer for Delayed {
171+
type CallMsg = DelayedCallMessage;
172+
type CastMsg = DelayedCastMessage;
173+
type OutMsg = DelayedOutMessage;
174+
type State = DelayedState;
175+
type Error = ();
176+
177+
fn new() -> Self {
178+
Self
179+
}
180+
181+
async fn handle_call(
182+
&mut self,
183+
_message: Self::CallMsg,
184+
_handle: &DelayedHandle,
185+
state: Self::State,
186+
) -> CallResponse<Self> {
187+
let count = state.count;
188+
CallResponse::Reply(state, DelayedOutMessage::Count(count))
189+
}
190+
191+
async fn handle_cast(
192+
&mut self,
193+
message: Self::CastMsg,
194+
_handle: &DelayedHandle,
195+
mut state: Self::State,
196+
) -> CastResponse<Self> {
197+
match message {
198+
DelayedCastMessage::Inc => {
199+
state.count += 1;
200+
}
201+
};
202+
CastResponse::NoReply(state)
203+
}
204+
}
205+
206+
#[test]
207+
pub fn test_send_after_and_cancellation() {
208+
let runtime = rt::Runtime::new().unwrap();
209+
runtime.block_on(async move {
210+
// Start a Delayed
211+
let mut repeater = Delayed::start(DelayedState { count: 0 });
212+
213+
// Set a just once timed message
214+
let _ = send_after(
215+
Duration::from_millis(100),
216+
repeater.clone(),
217+
DelayedCastMessage::Inc,
218+
);
219+
220+
// Wait for 200 milliseconds
221+
rt::sleep(Duration::from_millis(200)).await;
222+
223+
// Check count
224+
let count = Delayed::get_count(&mut repeater).await.unwrap();
225+
226+
// Only one message (no repetition)
227+
assert_eq!(DelayedOutMessage::Count(1), count);
228+
229+
// New timer
230+
let timer = send_after(
231+
Duration::from_millis(100),
232+
repeater.clone(),
233+
DelayedCastMessage::Inc,
234+
);
235+
236+
// Cancel the new timer before timeout
237+
timer.cancellation_token.cancel();
238+
239+
// Wait another 200 milliseconds
240+
rt::sleep(Duration::from_millis(200)).await;
241+
242+
// Check count again
243+
let count2 = Delayed::get_count(&mut repeater).await.unwrap();
244+
245+
// As timer was cancelled, count should remain at 1
246+
assert_eq!(DelayedOutMessage::Count(1), count2);
247+
});
248+
}

concurrency/src/threads/gen_server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ pub trait GenServer
8383
where
8484
Self: Send + Sized,
8585
{
86-
type CallMsg: Send + Sized;
87-
type CastMsg: Send + Sized;
86+
type CallMsg: Clone + Send + Sized;
87+
type CastMsg: Clone + Send + Sized;
8888
type OutMsg: Send + Sized;
8989
type State: Clone + Send;
9090
type Error: Debug;

concurrency/src/threads/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ mod gen_server;
66
mod process;
77
mod time;
88

9+
#[cfg(test)]
10+
mod timer_tests;
11+
912
pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg};
1013
pub use process::{send, Process, ProcessInfo};
11-
pub use time::send_after;
14+
pub use time::{send_after, send_interval};

0 commit comments

Comments
 (0)