Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
549 changes: 4 additions & 545 deletions crates/admin/src/cluster_controller/service.rs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ use futures::future::OptionFuture;
use itertools::Itertools;
use tracing::info;

use crate::cluster_controller::service::Service;
use crate::cluster_controller::service::scheduler::Scheduler;
use crate::cluster_controller::service::scheduler_task::SchedulerTask;
use crate::cluster_controller::service::trim_logs_task::TrimLogsTask;
use restate_core::network::TransportConnect;
use restate_core::{TaskCenter, TaskId, TaskKind, my_node_id};
use restate_types::nodes_config::NodesConfiguration;

use crate::cluster_controller::service::Service;
use crate::cluster_controller::service::scheduler::Scheduler;
use crate::cluster_controller::service::scheduler_task::SchedulerTask;

pub enum ClusterControllerState {
Follower,
Leader(Leader),
Expand Down Expand Up @@ -82,7 +82,6 @@ impl ClusterControllerState {
}

pub struct Leader {
trim_logs_task: TaskId,
scheduler_task: TaskId,
}

Expand All @@ -94,20 +93,6 @@ impl Leader {
service.replica_set_states.clone(),
);

let trim_task = TrimLogsTask::new(
service.bifrost.clone(),
service.cluster_state_refresher.cluster_state_watcher(),
);

// We spawn the trim task as a child task to make use of the built-in error handling of
// managed tasks. Otherwise, we would have to monitor for failed tasks ourselves.
let trim_logs_task =
TaskCenter::spawn_child(TaskKind::Background, "trim-logs", async move {
trim_task.run().await;
Ok(())
})
.expect("failed to spawn trim logs task");

// We spawn the scheduler task as a child task to make use of the built-in error handling of
// managed tasks. Otherwise, we would have to monitor for failed tasks ourselves.
let scheduler_task = TaskCenter::spawn_child(
Expand All @@ -122,21 +107,14 @@ impl Leader {
)
.expect("failed to spawn scheduler task");

Self {
trim_logs_task,
scheduler_task,
}
Self { scheduler_task }
}

/// Stops the leader tasks to make sure that no other leader activity is running.
async fn stop(self) {
let trim_logs_task = TaskCenter::cancel_task(self.trim_logs_task);
let scheduler_task = TaskCenter::cancel_task(self.scheduler_task);

// ignore if the task failed during cancellation
let (_trim_logs_task, _scheduler_task) = tokio::join!(
OptionFuture::from(trim_logs_task),
OptionFuture::from(scheduler_task)
);
let _scheduler_task = OptionFuture::from(scheduler_task).await;
}
}
Loading
Loading