
Merge pull request #547 from Nukesor/internal-refactor
Internal refactor
Nukesor authored Jun 23, 2024
2 parents a97448d + 0d9d5e3 commit a9ed6f7
Showing 52 changed files with 1,381 additions and 1,496 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -3,6 +3,10 @@
target/
lib/target/

# Perf files
flamegraph.svg
perf.data

# These are backup files generated by rustfmt
*.rs.bk

20 changes: 19 additions & 1 deletion CHANGELOG.md
@@ -4,7 +4,25 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## \[4.0.0\] - unreleased

This release introduces a new major version, as it includes a large internal refactoring.

That refactoring corrects an old architectural design decision to have the subprocess state live in a dedicated thread.
This design forced client commands that directly affected processes, such as `pueue start --immediate`, to be forwarded to that thread via an `mpsc` channel.
That thread would then check for new `mpsc` messages in a loop and eventually execute that command.
This design resulted in short delays until those commands would actually take effect, which was problematic during testing or scripting.

The new design fixes this issue by moving all process state into the global shared state (behind a `Mutex`), which allows Pueue to manipulate processes directly inside the client message handlers.
Furthermore, this change makes Pueue better suited for scripting, as it effectively eliminates the need to call `pueue wait` in certain scenarios. The focus of Pueue, however, still lies on human interaction.
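As an illustration only (hypothetical types, not Pueue's actual API), the new approach boils down to message handlers mutating shared state behind a mutex, instead of forwarding a command through a channel and waiting for a worker loop to pick it up:

```rust
use std::sync::{Arc, Mutex};

// Hypothetical, heavily simplified stand-in for Pueue's shared state.
#[derive(Default)]
struct State {
    paused: bool,
}

type SharedState = Arc<Mutex<State>>;

// New design (sketch): the message handler locks the state and applies the
// command directly. There's no channel round-trip and no polling delay.
fn handle_pause(state: &SharedState) {
    let mut state = state.lock().unwrap();
    state.paused = true; // Takes effect immediately.
}

fn main() {
    let state: SharedState = Arc::new(Mutex::new(State::default()));
    handle_pause(&state);
    assert!(state.lock().unwrap().paused);
}
```

In the old design, the equivalent handler would have sent a message over an `mpsc` channel, and the effect would only show up once the process thread's loop checked for new messages.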

Even though this refactoring significantly simplified the code, it also introduced a few mean and subtle bugs. After all, large parts of the internal state handling have been changed. Hopefully, most have been caught by Pueue's extensive test suite, but there's still a chance that I overlooked something.

So even though this is technically not a breaking change, I'll treat it as one to make you aware of possible issues that may arise.

### Fixed

- Fixed the delay after sending process-related commands from the client. [#540](https://github.com/Nukesor/pueue/pull/540)

### Added

28 changes: 8 additions & 20 deletions docs/Architecture.md
@@ -30,39 +30,30 @@ This includes:
The daemon is composed of two main components.

1. Request handling in `pueue/src/daemon/network/`.
   This is the code responsible for communicating with clients.
   In `pueue/src/daemon/network/message_handler/` you can find neatly separated handlers for all of Pueue's subcommands.
2. The TaskHandler in `pueue/src/daemon/task_handler/`.
   It's responsible for everything regarding process interaction.

All information, including process-specific information, is stored in the `State` (`pueue-lib/state.rs`) struct. \
Both components share a reference to the State, an `Arc<Mutex<State>>`.
That way we can guarantee a single source of truth and a consistent state.

### Message Handlers

The `pueue/src/daemon/network/socket.rs` module contains the logic for accepting client connections and receiving payloads.
The request accept and handle logic is a single async-await loop run by the main thread.

The payload is then deserialized to `Message` (`pueue-lib/message.rs`) and handled by its respective function.
All functions used for handling these messages can be found in `pueue/src/daemon/network/message_handler`.

Many messages can be handled instantly by simply modifying or reading the state.

### TaskHandler

The TaskHandler is responsible for actually starting and managing system processes. \
It shares the async main thread with the message handlers in a `try_join!` call.

The TaskHandler runs a never-ending loop, which checks a few times each second whether

- a new task can be started.
- tasks finished and can be finalized.
- delayed tasks can be enqueued (the `-d` flag on `pueue add`).
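The loop described above can be sketched roughly as follows (hypothetical helpers, not the actual TaskHandler code): each iteration locks the state once, performs all checks, then sleeps briefly before the next pass.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical, minimal stand-in for the parts of the state the loop touches.
#[derive(Default)]
struct State {
    queued: usize,
    finished: usize,
}

/// One iteration of a simplified TaskHandler-style loop:
/// lock the state, start queued tasks, finalize finished ones, release.
fn run_one_iteration(state: &Arc<Mutex<State>>) {
    let mut state = state.lock().unwrap();
    if state.queued > 0 {
        state.queued -= 1;
        state.finished += 1;
    }
}

fn main() {
    let state = Arc::new(Mutex::new(State { queued: 1, finished: 0 }));
    // The real loop runs forever with a short sleep between iterations, e.g.:
    // loop { run_one_iteration(&state); std::thread::sleep(duration); }
    run_one_iteration(&state);
    assert_eq!(state.lock().unwrap().finished, 1);
}
```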
@@ -74,12 +65,9 @@ The TaskHandler is by far the most complex piece of code in this project, but th

Whenever you're writing some core-logic in Pueue, please make sure to understand how mutexes work.

As a general rule of thumb, the state should only ever be locked in message handler functions and at the top of the TaskHandler's main loop.

This rule allows us to be very conservative with state locking to prevent deadlocks.
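A minimal illustration of that rule of thumb (hypothetical code, not from the repository): lock once at the top of a handler, do the whole atomic operation inside one scope, and let the guard drop before any slow work.

```rust
use std::sync::{Arc, Mutex};

#[derive(Default)]
struct State {
    counter: usize,
}

fn handler(state: &Arc<Mutex<State>>) -> usize {
    // Lock once, perform the whole atomic operation, then release.
    let snapshot = {
        let mut state = state.lock().unwrap();
        state.counter += 1;
        state.counter
    }; // The MutexGuard is dropped here, before anything slow happens.

    // Slow work (I/O, spawning processes) runs without holding the lock.
    snapshot
}

fn main() {
    let state = Arc::new(Mutex::new(State::default()));
    assert_eq!(handler(&state), 1);
    assert_eq!(handler(&state), 2);
}
```

Keeping the guard's lifetime short is what makes it safe for the message handlers and the TaskHandler loop to share the same mutex without starving each other.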

## Code Style

135 changes: 135 additions & 0 deletions pueue/src/daemon/callbacks.rs
@@ -0,0 +1,135 @@
use std::collections::HashMap;

use chrono::{DateTime, Local};
use handlebars::{Handlebars, RenderError};
use log::{debug, error, info};
use pueue_lib::{
log::{get_log_path, read_last_log_file_lines},
process_helper::compile_shell_command,
settings::Settings,
task::{Task, TaskResult, TaskStatus},
};

use super::state_helper::LockedState;

/// Users can specify a callback that's fired whenever a task finishes.
/// Execute the callback by spawning a new subprocess.
pub fn spawn_callback(settings: &Settings, state: &mut LockedState, task: &Task) {
// Return early, if there's no callback specified
let Some(template_string) = &settings.daemon.callback else {
return;
};

// Build the command to be called from the template string in the configuration file.
let callback_command = match build_callback_command(settings, task, template_string) {
Ok(callback_command) => callback_command,
Err(err) => {
error!("Failed to create callback command from template with error: {err}");
return;
}
};

let mut command = compile_shell_command(settings, &callback_command);

// Spawn the callback subprocess and log if it fails.
let spawn_result = command.spawn();
let child = match spawn_result {
Err(error) => {
error!("Failed to spawn callback with error: {error}");
return;
}
Ok(child) => child,
};

debug!("Spawned callback for task {}", task.id);
state.callbacks.push(child);
}

/// Take the callback template string from the configuration and insert all parameters from the
/// finished task.
pub fn build_callback_command(
settings: &Settings,
task: &Task,
template_string: &str,
) -> Result<String, RenderError> {
// Init Handlebars. We set to strict, as we want to show an error on missing variables.
let mut handlebars = Handlebars::new();
handlebars.set_strict_mode(true);

// Add templating variables.
let mut parameters = HashMap::new();
parameters.insert("id", task.id.to_string());
parameters.insert("command", task.command.clone());
parameters.insert("path", (*task.path.to_string_lossy()).to_owned());
parameters.insert("group", task.group.clone());

// Result takes the TaskResult Enum strings, unless it didn't finish yet.
if let TaskStatus::Done(result) = &task.status {
parameters.insert("result", result.to_string());
} else {
parameters.insert("result", "None".into());
}

// Format and insert start and end times.
let print_time = |time: Option<DateTime<Local>>| {
time.map(|time| time.timestamp().to_string())
.unwrap_or_default()
};
parameters.insert("start", print_time(task.start));
parameters.insert("end", print_time(task.end));

// Read the last lines of the process' output and make it available.
if let Ok(output) = read_last_log_file_lines(
task.id,
&settings.shared.pueue_directory(),
settings.daemon.callback_log_lines,
) {
parameters.insert("output", output);
} else {
parameters.insert("output", "".to_string());
}

let out_path = get_log_path(task.id, &settings.shared.pueue_directory());
// Using Display impl of PathBuf which isn't necessarily a perfect
// representation of the path but should work for most cases here
parameters.insert("output_path", out_path.display().to_string());

// Get the exit code
if let TaskStatus::Done(result) = &task.status {
match result {
TaskResult::Success => parameters.insert("exit_code", "0".into()),
TaskResult::Failed(code) => parameters.insert("exit_code", code.to_string()),
_ => parameters.insert("exit_code", "None".into()),
};
} else {
parameters.insert("exit_code", "None".into());
}

handlebars.render_template(template_string, &parameters)
}

/// Look at all running callbacks and log any errors.
/// If everything went smoothly, simply remove them from the list.
pub fn check_callbacks(state: &mut LockedState) {
let mut finished = Vec::new();
for (id, child) in state.callbacks.iter_mut().enumerate() {
match child.try_wait() {
// Handle a child error.
Err(error) => {
error!("Callback failed with error {error:?}");
finished.push(id);
}
// Child process did not exit yet.
Ok(None) => continue,
Ok(exit_status) => {
info!("Callback finished with exit code {exit_status:?}");
finished.push(id);
}
}
}

finished.reverse();
for id in finished.iter() {
state.callbacks.remove(*id);
}
}
39 changes: 17 additions & 22 deletions pueue/src/daemon/mod.rs
@@ -4,26 +4,28 @@ use std::{fs::create_dir_all, path::PathBuf};

use anyhow::{bail, Context, Result};
use log::warn;

use process_handler::initiate_shutdown;
use pueue_lib::error::Error;
use pueue_lib::network::certificate::create_certificates;
use pueue_lib::network::message::Shutdown;
use pueue_lib::network::protocol::socket_cleanup;
use pueue_lib::network::secret::init_shared_secret;
use pueue_lib::settings::Settings;
use pueue_lib::state::{SharedState, State};
use tokio::try_join;

use self::state_helper::{restore_state, save_state};
use crate::daemon::network::socket::accept_incoming;
use crate::daemon::task_handler::{TaskHandler, TaskSender};

mod callbacks;
pub mod cli;
mod network;
mod pid;
mod process_handler;
/// Contains re-usable helper functions, that operate on the pueue-lib state.
pub mod state_helper;
mod task_handler;
pub mod task_handler;

/// The main entry point for the daemon logic.
/// It's basically the `main`, but publicly exported as a library.
@@ -77,24 +79,18 @@ pub async fn run(config_path: Option<PathBuf>, profile: Option<String>, test: bo
save_state(&state, &settings).context("Failed to save state on startup.")?;
let state = Arc::new(Mutex::new(state));


// Don't set ctrlc and panic handlers during testing.
// This is necessary for multithreaded integration testing, since multiple listener per process
// aren't allowed. On top of this, ctrlc also somehow breaks test error output.
if !test {
        setup_signal_panic_handling(&settings, state.clone())?;
}

// Run both the task handler and the message handler in the same tokio task.
// If any of them fails, return an error immediately.
let task_handler = task_handler::run(state.clone(), settings.clone());
let message_handler = accept_incoming(settings.clone(), state.clone());
try_join!(task_handler, message_handler).map(|_| ())
}

/// Initialize all directories needed for normal operation.
@@ -136,17 +132,16 @@ fn init_directories(pueue_dir: &Path) -> Result<()> {
/// TaskHandler. This is to prevent dangling processes and other weird edge-cases.
///
/// On panic, we want to cleanup existing unix sockets and the PID file.
fn setup_signal_panic_handling(settings: &Settings, state: SharedState) -> Result<()> {
let state_clone = state.clone();
let settings_clone = settings.clone();

// This section handles Shutdown via SigTerm/SigInt process signals
// Notify the TaskHandler, so it can shutdown gracefully.
// The actual program exit will be done via the TaskHandler.
ctrlc::set_handler(move || {
let mut state = state_clone.lock().unwrap();
initiate_shutdown(&settings_clone, &mut state, Shutdown::Graceful);
})?;

// Try to do some final cleanup, even if we panic.
7 changes: 6 additions & 1 deletion pueue/src/daemon/network/follow_log.rs
@@ -9,7 +9,12 @@ use pueue_lib::network::message::*;
use pueue_lib::network::protocol::{send_message, GenericStream};
use pueue_lib::state::SharedState;

/// Handle the continuous stream of some log output.
///
/// It's not actually a stream in the sense of a low-level network stream, but rather a series of
/// `Message::Stream` messages, each of which sends a portion of new log output.
///
/// It's basically our own chunked stream implementation on top of the protocol we established.
pub async fn handle_follow(
pueue_directory: &Path,
stream: &mut GenericStream,
23 changes: 7 additions & 16 deletions pueue/src/daemon/network/message_handler/add.rs
@@ -1,22 +1,19 @@
use chrono::Local;
use pueue_lib::aliasing::insert_alias;
use pueue_lib::failure_msg;
use pueue_lib::network::message::*;
use pueue_lib::state::{GroupStatus, SharedState};
use pueue_lib::task::{Task, TaskStatus};

use super::*;
use crate::daemon::process_handler;
use crate::daemon::state_helper::save_state;
use crate::ok_or_save_state_failure;

/// Invoked when calling `pueue add`.
/// Queues a new task to the state.
/// If the `start_immediately` flag is set, the task is started right away.
pub fn add_task(settings: &Settings, state: &SharedState, message: AddMessage) -> Message {
let mut state = state.lock().unwrap();
if let Err(message) = ensure_group_exists(&mut state, &message.group) {
return message;
@@ -29,9 +26,7 @@ pub fn add_task(
.filter(|id| !state.tasks.contains_key(id))
.collect();
if !not_found.is_empty() {
        return failure_msg!("Unable to setup dependencies: task(s) {not_found:?} not found");
}

// Create a new task and add it to the state.
@@ -78,15 +73,11 @@

// Add the task and persist the state.
let task_id = state.add_task(task);
    ok_or_save_state_failure!(save_state(&state, settings));

    // Start the task immediately, if the client requested it.
if message.start_immediately {
process_handler::start::start(settings, &mut state, TaskSelection::TaskIds(vec![task_id]));
}

// Create the customized response for the client.
