Skip to content

Commit 28f7a7b

Browse files
authored
Merge pull request #11 from lambdaclass/blocking_news
feature: added blocking start for genservers
2 parents 0d1c4bb + 73396b5 commit 28f7a7b

File tree

9 files changed

+330
-3
lines changed

9 files changed

+330
-3
lines changed

Cargo.lock

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ members = [
88
"examples/ping_pong",
99
"examples/ping_pong_threads",
1010
"examples/updater",
11-
"examples/updater_threads",
11+
"examples/updater_threads", "examples/blocking_genserver",
1212
]
1313

1414
[workspace.dependencies]

concurrency/src/tasks/gen_server.rs

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,26 @@ impl<G: GenServer> GenServerHandle<G> {
3838
handle_clone
3939
}
4040

41+
pub(crate) fn new_blocking(mut initial_state: G::State) -> Self {
42+
let (tx, mut rx) = mpsc::channel::<GenServerInMsg<G>>();
43+
let handle = GenServerHandle { tx };
44+
let mut gen_server: G = GenServer::new();
45+
let handle_clone = handle.clone();
46+
// Ignore the JoinHandle for now. Maybe we'll use it in the future
47+
let _join_handle = rt::spawn_blocking(|| {
48+
rt::block_on(async move {
49+
if gen_server
50+
.run(&handle, &mut rx, &mut initial_state)
51+
.await
52+
.is_err()
53+
{
54+
tracing::trace!("GenServer crashed")
55+
};
56+
})
57+
});
58+
handle_clone
59+
}
60+
4161
pub fn sender(&self) -> mpsc::Sender<GenServerInMsg<G>> {
4262
self.tx.clone()
4363
}
@@ -97,6 +117,15 @@ where
97117
GenServerHandle::new(initial_state)
98118
}
99119

120+
/// Tokio tasks depend on a coolaborative multitasking model. "work stealing" can't
121+
/// happen if the task is blocking the thread. As such, for sync compute task
122+
/// or other blocking tasks need to be in their own separate thread, and the OS
123+
/// will manage them through hardware interrupts.
124+
/// Start blocking provides such thread.
125+
fn start_blocking(initial_state: Self::State) -> GenServerHandle<Self> {
126+
GenServerHandle::new_blocking(initial_state)
127+
}
128+
100129
fn run(
101130
&mut self,
102131
handle: &GenServerHandle<Self>,
@@ -199,3 +228,136 @@ where
199228
state: &mut Self::State,
200229
) -> impl std::future::Future<Output = CastResponse> + Send;
201230
}
231+
232+
#[cfg(test)]
233+
mod tests {
234+
use super::*;
235+
use crate::tasks::send_after;
236+
use std::{process::exit, thread, time::Duration};
237+
struct BadlyBehavedTask;
238+
239+
#[derive(Clone)]
240+
pub enum InMessage {
241+
GetCount,
242+
Stop,
243+
}
244+
#[derive(Clone)]
245+
pub enum OutMsg {
246+
Count(u64),
247+
}
248+
249+
impl GenServer for BadlyBehavedTask {
250+
type CallMsg = InMessage;
251+
type CastMsg = ();
252+
type OutMsg = ();
253+
type State = ();
254+
type Error = ();
255+
256+
fn new() -> Self {
257+
Self {}
258+
}
259+
260+
async fn handle_call(
261+
&mut self,
262+
_: Self::CallMsg,
263+
_: &GenServerHandle<Self>,
264+
_: &mut Self::State,
265+
) -> CallResponse<Self::OutMsg> {
266+
CallResponse::Stop(())
267+
}
268+
269+
async fn handle_cast(
270+
&mut self,
271+
_: Self::CastMsg,
272+
_: &GenServerHandle<Self>,
273+
_: &mut Self::State,
274+
) -> CastResponse {
275+
rt::sleep(Duration::from_millis(20)).await;
276+
thread::sleep(Duration::from_secs(2));
277+
CastResponse::Stop
278+
}
279+
}
280+
281+
struct WellBehavedTask;
282+
283+
#[derive(Clone)]
284+
struct CountState {
285+
pub count: u64,
286+
}
287+
288+
impl GenServer for WellBehavedTask {
289+
type CallMsg = InMessage;
290+
type CastMsg = ();
291+
type OutMsg = OutMsg;
292+
type State = CountState;
293+
type Error = ();
294+
295+
fn new() -> Self {
296+
Self {}
297+
}
298+
299+
async fn handle_call(
300+
&mut self,
301+
message: Self::CallMsg,
302+
_: &GenServerHandle<Self>,
303+
state: &mut Self::State,
304+
) -> CallResponse<Self::OutMsg> {
305+
match message {
306+
InMessage::GetCount => CallResponse::Reply(OutMsg::Count(state.count)),
307+
InMessage::Stop => CallResponse::Stop(OutMsg::Count(state.count)),
308+
}
309+
}
310+
311+
async fn handle_cast(
312+
&mut self,
313+
_: Self::CastMsg,
314+
handle: &GenServerHandle<Self>,
315+
state: &mut Self::State,
316+
) -> CastResponse {
317+
state.count += 1;
318+
println!("{:?}: good still alive", thread::current().id());
319+
send_after(Duration::from_millis(100), handle.to_owned(), ());
320+
CastResponse::NoReply
321+
}
322+
}
323+
324+
#[test]
325+
pub fn badly_behaved_thread_non_blocking() {
326+
let runtime = rt::Runtime::new().unwrap();
327+
runtime.block_on(async move {
328+
let mut badboy = BadlyBehavedTask::start(());
329+
let _ = badboy.cast(()).await;
330+
let mut goodboy = WellBehavedTask::start(CountState { count: 0 });
331+
let _ = goodboy.cast(()).await;
332+
rt::sleep(Duration::from_secs(1)).await;
333+
let count = goodboy.call(InMessage::GetCount).await.unwrap();
334+
335+
match count {
336+
OutMsg::Count(num) => {
337+
assert_ne!(num, 10);
338+
}
339+
}
340+
goodboy.call(InMessage::Stop).await.unwrap();
341+
});
342+
}
343+
344+
#[test]
345+
pub fn badly_behaved_thread() {
346+
let runtime = rt::Runtime::new().unwrap();
347+
runtime.block_on(async move {
348+
let mut badboy = BadlyBehavedTask::start_blocking(());
349+
let _ = badboy.cast(()).await;
350+
let mut goodboy = WellBehavedTask::start(CountState { count: 0 });
351+
let _ = goodboy.cast(()).await;
352+
rt::sleep(Duration::from_secs(1)).await;
353+
let count = goodboy.call(InMessage::GetCount).await.unwrap();
354+
355+
match count {
356+
OutMsg::Count(num) => {
357+
assert_eq!(num, 10);
358+
}
359+
}
360+
goodboy.call(InMessage::Stop).await.unwrap();
361+
});
362+
}
363+
}

concurrency/src/threads/gen_server.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ where
9898
GenServerHandle::new(initial_state)
9999
}
100100

101+
/// We copy the same interface as tasks, but all threads can work
102+
/// while blocking by default
103+
fn start_blocking(initial_state: Self::State) -> GenServerHandle<Self> {
104+
GenServerHandle::new(initial_state)
105+
}
106+
101107
fn run(
102108
&mut self,
103109
handle: &GenServerHandle<Self>,
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[package]
2+
name = "blocking_genserver"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[dependencies]
7+
spawned-rt = { workspace = true }
8+
spawned-concurrency = { workspace = true }
9+
tracing = { workspace = true }
10+
11+
[[bin]]
12+
name = "blocking_genserver"
13+
path = "main.rs"
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
use spawned_rt::tasks as rt;
2+
use std::time::Duration;
3+
use std::{process::exit, thread};
4+
5+
use spawned_concurrency::tasks::{
6+
CallResponse, CastResponse, GenServer, GenServerHandle, send_after,
7+
};
8+
9+
// We test a scenario with a badly behaved task
10+
struct BadlyBehavedTask;
11+
12+
#[derive(Clone)]
13+
pub enum InMessage {
14+
GetCount,
15+
Stop,
16+
}
17+
#[derive(Clone)]
18+
pub enum OutMsg {
19+
Count(u64),
20+
}
21+
22+
impl GenServer for BadlyBehavedTask {
23+
type CallMsg = InMessage;
24+
type CastMsg = ();
25+
type OutMsg = ();
26+
type State = ();
27+
type Error = ();
28+
29+
fn new() -> Self {
30+
Self {}
31+
}
32+
33+
async fn handle_call(
34+
&mut self,
35+
_: Self::CallMsg,
36+
_: &GenServerHandle<Self>,
37+
_: &mut Self::State,
38+
) -> CallResponse<Self::OutMsg> {
39+
CallResponse::Stop(())
40+
}
41+
42+
async fn handle_cast(
43+
&mut self,
44+
_: Self::CastMsg,
45+
_: &GenServerHandle<Self>,
46+
_: &mut Self::State,
47+
) -> CastResponse {
48+
rt::sleep(Duration::from_millis(20)).await;
49+
loop {
50+
println!("{:?}: bad still alive", thread::current().id());
51+
thread::sleep(Duration::from_millis(50));
52+
}
53+
}
54+
}
55+
56+
struct WellBehavedTask;
57+
58+
#[derive(Clone)]
59+
struct CountState {
60+
pub count: u64,
61+
}
62+
63+
impl GenServer for WellBehavedTask {
64+
type CallMsg = InMessage;
65+
type CastMsg = ();
66+
type OutMsg = OutMsg;
67+
type State = CountState;
68+
type Error = ();
69+
70+
fn new() -> Self {
71+
Self {}
72+
}
73+
74+
async fn handle_call(
75+
&mut self,
76+
message: Self::CallMsg,
77+
_: &GenServerHandle<Self>,
78+
state: &mut Self::State,
79+
) -> CallResponse<Self::OutMsg> {
80+
match message {
81+
InMessage::GetCount => CallResponse::Reply(OutMsg::Count(state.count)),
82+
InMessage::Stop => CallResponse::Stop(OutMsg::Count(state.count)),
83+
}
84+
}
85+
86+
async fn handle_cast(
87+
&mut self,
88+
_: Self::CastMsg,
89+
handle: &GenServerHandle<Self>,
90+
state: &mut Self::State,
91+
) -> CastResponse {
92+
state.count += 1;
93+
println!("{:?}: good still alive", thread::current().id());
94+
send_after(Duration::from_millis(100), handle.to_owned(), ());
95+
CastResponse::NoReply
96+
}
97+
}
98+
99+
/// Example of start_blocking to fix issues #8 https://github.com/lambdaclass/spawned/issues/8
100+
/// Tasks that block can block the entire tokio runtime (and other cooperative multitasking models)
101+
/// To fix this we implement start_blocking, which under the hood launches a new thread to deal with the issue
102+
pub fn main() {
103+
rt::run(async move {
104+
// If we change BadlyBehavedTask to start instead, it can stop the entire program
105+
let mut badboy = BadlyBehavedTask::start_blocking(());
106+
let _ = badboy.cast(()).await;
107+
let mut goodboy = WellBehavedTask::start(CountState { count: 0 });
108+
let _ = goodboy.cast(()).await;
109+
rt::sleep(Duration::from_secs(1)).await;
110+
let count = goodboy.call(InMessage::GetCount).await.unwrap();
111+
112+
match count {
113+
OutMsg::Count(num) => {
114+
assert!(num == 10);
115+
}
116+
}
117+
118+
goodboy.call(InMessage::Stop).await.unwrap();
119+
exit(0);
120+
})
121+
}

rt/src/tasks/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@
99
1010
mod tokio;
1111

12+
use ::tokio::runtime::Handle;
13+
1214
use crate::tracing::init_tracing;
1315

1416
pub use crate::tasks::tokio::mpsc;
1517
pub use crate::tasks::tokio::oneshot;
1618
pub use crate::tasks::tokio::sleep;
17-
pub use crate::tasks::tokio::{spawn, JoinHandle, Runtime};
19+
pub use crate::tasks::tokio::{spawn, spawn_blocking, JoinHandle, Runtime};
1820
use std::future::Future;
1921

2022
pub fn run<F: Future>(future: F) -> F::Output {
@@ -23,3 +25,7 @@ pub fn run<F: Future>(future: F) -> F::Output {
2325
let rt = Runtime::new().unwrap();
2426
rt.block_on(future)
2527
}
28+
29+
pub fn block_on<F: Future>(future: F) -> F::Output {
30+
Handle::current().block_on(future)
31+
}

rt/src/tasks/tokio/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pub mod oneshot;
44

55
pub use tokio::{
66
runtime::Runtime,
7-
task::{spawn, JoinHandle},
7+
task::{spawn, spawn_blocking, JoinHandle},
88
time::sleep,
9+
test,
910
};

rt/src/threads/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,12 @@ pub fn block_on<F: Future>(future: F) -> F::Output {
2020
let rt = Runtime::new().unwrap();
2121
rt.block_on(future)
2222
}
23+
24+
/// Spawn blocking is the same as spawn for pure threaded usage.
25+
pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
26+
where
27+
F: FnOnce() -> R + Send + 'static,
28+
R: Send + 'static,
29+
{
30+
spawn(f)
31+
}

0 commit comments

Comments
 (0)