Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libshpool/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::time;

pub const SOCK_STREAM_TIMEOUT: time::Duration = time::Duration::from_millis(200);
pub const JOIN_POLL_DURATION: time::Duration = time::Duration::from_millis(100);
pub const JOIN_POLL_DURATION: time::Duration = time::Duration::from_millis(50);

pub const BUF_SIZE: usize = 1024 * 16;

Expand Down
27 changes: 21 additions & 6 deletions libshpool/src/daemon/shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ const SUPERVISOR_POLL_DUR: time::Duration = time::Duration::from_millis(300);
// size.
const REATTACH_RESIZE_DELAY: time::Duration = time::Duration::from_millis(50);

// The shell->client thread should wake up relatively frequently so it can
// detect reattach, but we don't need to go crazy since reattach is not part of
// the inner loop.
const SHELL_TO_CLIENT_POLL_MS: u16 = 100;
// The shell->client thread should poll frequently so detach/reattach control
// messages are noticed quickly without spinning the CPU.
const SHELL_TO_CLIENT_POLL_MS: u16 = 50;

// How long to wait before giving up while trying to talk to the
// shell->client thread.
Expand Down Expand Up @@ -835,7 +834,11 @@ impl SessionInner {

use keybindings::Action::*;
match action {
Detach => self.action_detach()?,
Detach => {
self.action_detach()?;
debug!("exiting client->shell thread after detach");
return Ok(());
}
NoOp => {}
}
}
Expand Down Expand Up @@ -888,7 +891,19 @@ impl SessionInner {
return Ok(());
}

thread::sleep(consts::HEARTBEAT_DURATION);
let mut slept = time::Duration::ZERO;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's bundle this logic up into a helper function that takes a stop predicate function (in this case it would be parameterized with a closure that checks the stop atomic), a poll strategy enum, and a sleep duration. It seems like a generically useful pattern. We could put it in libshpool/src/common.rs

The poll strategy enum can have one option for uniform polling with a given duration (which we would use here) and another for exponential backoff with the usual exponential backoff params (initial poll dur, backoff factor, max poll dur).

We could call it sleep_unless or something like that.

let sleep_step = consts::JOIN_POLL_DURATION;
while slept < consts::HEARTBEAT_DURATION {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than incrementing a counter of total duration slept, these sorts of loops should instead set a specific deadline in the future and always compare the current time against that. This avoids clock drift because thread::sleep(d) does not always take exactly d. These little mismatches can add up over time. Not really a huge deal, but it doesn't hurt to be as precise as possible.

if stop.load(Ordering::Relaxed) {
info!("recvd stop msg");
return Ok(());
}

let remaining = consts::HEARTBEAT_DURATION - slept;
let step = if remaining < sleep_step { remaining } else { sleep_step };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use min the way you do below. Also, we probably don't need an explicit named variable for remaining; we can just inline it into the min params for brevity.

thread::sleep(step);
slept += step;
}
{
let shell_to_client_ctl = self.shell_to_client_ctl.lock().unwrap();
match shell_to_client_ctl
Expand Down
67 changes: 52 additions & 15 deletions libshpool/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,19 @@ use std::{

use anyhow::{anyhow, Context};
use byteorder::{LittleEndian, ReadBytesExt as _, WriteBytesExt as _};
use nix::unistd::isatty;
use serde::{Deserialize, Serialize};
use shpool_protocol::{Chunk, ChunkKind, ConnectHeader, VersionHeader};
use tracing::{debug, error, info, instrument, span, trace, warn, Level};

use super::{consts, tty};

const JOIN_POLL_DUR: time::Duration = time::Duration::from_millis(100);
const JOIN_HANGUP_DUR: time::Duration = time::Duration::from_millis(300);
const DETACH_TTY_FAST_WAIT_DUR: time::Duration = time::Duration::from_millis(10);
const MAX_DETACH_WAIT_DUR: time::Duration = time::Duration::from_millis(300);
const DETACH_BACKOFF_INITIAL_DUR: time::Duration = time::Duration::from_millis(1);
// Cap backoff steps so slow-path stays responsive while still avoiding busy
// waits.
const DETACH_BACKOFF_MAX_STEP_DUR: time::Duration = time::Duration::from_millis(25);

/// The centralized encoding function that should be used for all protocol
/// serialization.
Expand All @@ -50,7 +55,7 @@ where
Ok(())
}

/// The centralized decoding focuntion that should be used for all protocol
/// The centralized decoding function that should be used for all protocol
/// deserialization.
pub fn decode_from<T, R>(r: R) -> anyhow::Result<T>
where
Expand Down Expand Up @@ -333,18 +338,49 @@ impl Client {
if sock_to_stdout_h.is_finished() {
nfinished_threads += 1;
}

if nfinished_threads > 0 {
if nfinished_threads < 2 {
thread::sleep(JOIN_HANGUP_DUR);
nfinished_threads = 0;
if stdin_to_sock_h.is_finished() {
info!("recheck: stdin->sock thread done");
nfinished_threads += 1;
}
if sock_to_stdout_h.is_finished() {
info!("recheck: sock->stdout thread done");
nfinished_threads += 1;
// Fast-path: when server->client already ended (detach/disconnect),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep the thread names consistent and call this sock->stdout.

// stdin->sock can stay blocked on stdin. In that case, do a very
// short grace wait and then exit quickly.
// Slow-path: for other shutdown orders, keep compatibility by
// waiting up to 300ms with backoff.
let mut total_wait = time::Duration::ZERO;
let mut next_sleep = DETACH_BACKOFF_INITIAL_DUR;
let mut stdin_done = stdin_to_sock_h.is_finished();
let mut stdout_done = sock_to_stdout_h.is_finished();

let stdin_is_tty = isatty(io::stdin()).unwrap_or(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I generally like to avoid isatty when possible since it makes it harder to predict how a tool will work when running under a script. Sometimes it is worthwhile, but in this case I don't think it is worth having divergent behavior. I don't see a reason we would need to wait around longer in a script context, so let's just always use the short timeout if the daemon hangs up on us. We should rename the constant to avoid mentioning TTY when we do this.

// Keep max_wait fixed for this detach sequence. Recomputing it inside
// the loop could accidentally switch paths mid-cleanup.
let max_wait = if stdin_is_tty && stdout_done && !stdin_done {
DETACH_TTY_FAST_WAIT_DUR
} else {
MAX_DETACH_WAIT_DUR
};

loop {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This loop can switch to using the helper I suggested above, with its stop predicate computing nfinished_threads and checking if the count is >= 2.

stdin_done = stdin_to_sock_h.is_finished();
stdout_done = sock_to_stdout_h.is_finished();
nfinished_threads = (stdin_done as usize) + (stdout_done as usize);

if nfinished_threads >= 2 {
break;
}
if total_wait >= max_wait {
break;
}

let remaining = max_wait - total_wait;
let sleep_for = cmp::min(next_sleep, remaining);
thread::sleep(sleep_for);
total_wait += sleep_for;
// Exponential backoff with a capped step to avoid busy waits.
next_sleep =
cmp::min(next_sleep + next_sleep, DETACH_BACKOFF_MAX_STEP_DUR);
}

if nfinished_threads < 2 {
// If one of the worker threads is done and the
// other is not exiting, we are likely blocked on
Expand All @@ -355,8 +391,8 @@ impl Client {
// us to use simple blocking IO.
warn!(
"exiting due to a stuck IO thread stdin_to_sock_finished={} sock_to_stdout_finished={}",
stdin_to_sock_h.is_finished(),
sock_to_stdout_h.is_finished()
stdin_done,
stdout_done
);
// make sure that we restore the tty flags on the input
// tty before exiting the process.
Expand All @@ -367,7 +403,8 @@ impl Client {
}
break;
}
thread::sleep(JOIN_POLL_DUR);

thread::sleep(consts::JOIN_POLL_DURATION);
}

match stdin_to_sock_h.join() {
Expand Down
Loading