Skip to content

Commit

Permalink
Replace error sender with shutdown trigger.
Browse files Browse the repository at this point in the history
  • Loading branch information
aterentic-ethernal committed Jan 16, 2024
1 parent 7988669 commit a1a378a
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 62 deletions.
20 changes: 8 additions & 12 deletions src/app_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use avail_subxt::utils::H256;
use codec::Encode;
use color_eyre::{
eyre::{eyre, WrapErr},
Report, Result,
Result,
};
use dusk_plonk::commitment_scheme::kzg10::PublicParameters;
use kate_recovery::{
Expand All @@ -39,13 +39,14 @@ use std::{
ops::Range,
sync::{Arc, Mutex},
};
use tokio::sync::{broadcast, mpsc::Sender};
use tokio::sync::broadcast;
use tracing::{debug, error, info, instrument};

use crate::{
data::store_encoded_data_in_db,
network::{p2p::Client as P2pClient, rpc::Client as RpcClient},
proof,
shutdown::Controller,
types::{AppClientConfig, BlockVerified, OptionBlockRange, State},
};

Expand Down Expand Up @@ -441,7 +442,7 @@ pub async fn run(
state: Arc<Mutex<State>>,
sync_range: Range<u32>,
data_verified_sender: broadcast::Sender<(u32, AppData)>,
error_sender: Sender<Report>,
shutdown: Controller<String>,
) {
info!("Starting for app {app_id}...");

Expand All @@ -465,9 +466,7 @@ pub async fn run(
Ok(block) => block,
Err(error) => {
error!("Cannot receive message: {error}");
if let Err(error) = error_sender.send(error.into()).await {
error!("Cannot send error message: {error}");
}
let _ = shutdown.trigger_shutdown(format!("Cannot receive message: {error:#}"));
return;
},
};
Expand Down Expand Up @@ -496,18 +495,15 @@ pub async fn run(
Ok(data) => data,
Err(error) => {
error!(block_number, "Cannot process block: {error}");
if let Err(error) = error_sender.send(error).await {
error!("Cannot send error message: {error}");
}
let _ = shutdown.trigger_shutdown(format!("Cannot process block: {error:#}"));
return;
},
};
set_data_verified_state(state.clone(), &sync_range, block_number);
if let Err(error) = data_verified_sender.send((block_number, data)) {
error!("Cannot send data verified message: {error}");
if let Err(error) = error_sender.send(error.into()).await {
error!("Cannot send error message: {error}");
}
let _ =
shutdown.trigger_shutdown(format!("Cannot send data verified message: {error:#}"));
return;
}
debug!(block_number, "Block processed");
Expand Down
36 changes: 10 additions & 26 deletions src/bin/avail-light.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use avail_light::{
use clap::Parser;
use color_eyre::{
eyre::{eyre, WrapErr},
Report, Result,
Result,
};
use kate_recovery::com::AppData;
use libp2p::{multiaddr::Protocol, Multiaddr};
Expand All @@ -27,11 +27,7 @@ use std::{
path::Path,
sync::{Arc, Mutex},
};
use tokio::sync::{
broadcast,
mpsc::{channel, Sender},
RwLock,
};
use tokio::sync::{broadcast, RwLock};
use tracing::{error, info, metadata::ParseLevelError, trace, warn, Level, Subscriber};
use tracing_subscriber::{fmt::format, EnvFilter, FmtSubscriber};

Expand Down Expand Up @@ -75,7 +71,7 @@ fn parse_log_level(log_level: &str, default: Level) -> (Level, Option<ParseLevel
.unwrap_or_else(|parse_err| (default, Some(parse_err)))
}

async fn run(error_sender: Sender<Report>, shutdown: Controller<String>) -> Result<()> {
async fn run(shutdown: Controller<String>) -> Result<()> {
let opts = CliOpts::parse();

let mut cfg: RuntimeConfig = RuntimeConfig::default();
Expand Down Expand Up @@ -200,7 +196,7 @@ async fn run(error_sender: Sender<Report>, shutdown: Controller<String>) -> Resu
info!("Bootstrap done.");
},
Err(e) => {
warn!("Bootstrap process: {e:?}.");
error!("Bootstrap process: {e:?}.");
},
}
});
Expand Down Expand Up @@ -297,7 +293,7 @@ async fn run(error_sender: Sender<Report>, shutdown: Controller<String>) -> Resu
state.clone(),
sync_range.clone(),
data_tx,
error_sender.clone(),
shutdown.clone(),
)));
data_rx
});
Expand Down Expand Up @@ -358,7 +354,7 @@ async fn run(error_sender: Sender<Report>, shutdown: Controller<String>) -> Resu
let sync_finality = avail_light::sync_finality::new(db.clone(), rpc_client.clone());
tokio::task::spawn(shutdown.with_cancel(avail_light::sync_finality::run(
sync_finality,
error_sender.clone(),
shutdown.clone(),
state.clone(),
block_header.clone(),
)));
Expand All @@ -374,13 +370,12 @@ async fn run(error_sender: Sender<Report>, shutdown: Controller<String>) -> Resu
p2p_client.clone(),
ot_metrics.clone(),
block_rx,
error_sender.clone(),
shutdown.clone(),
)));

let channels = avail_light::types::ClientChannels {
block_sender: block_tx,
rpc_event_receiver: client_rpc_event_receiver,
error_sender: error_sender.clone(),
};

if let Some(partition) = cfg.block_matrix_partition {
Expand All @@ -393,6 +388,7 @@ async fn run(error_sender: Sender<Report>, shutdown: Controller<String>) -> Resu
ot_metrics.clone(),
channels,
partition,
shutdown.clone(),
)));
} else {
let light_client = avail_light::light_client::new(db.clone());
Expand All @@ -406,6 +402,7 @@ async fn run(error_sender: Sender<Report>, shutdown: Controller<String>) -> Resu
ot_metrics,
state.clone(),
channels,
shutdown.clone(),
)));
}

Expand Down Expand Up @@ -527,24 +524,11 @@ pub async fn main() -> Result<()> {
// spawn a task to watch for ctrl-c signals from user to trigger the shutdown
tokio::spawn(shutdown.with_trigger("user signaled shutdown".to_string(), user_signal()));

let (error_sender, mut error_receiver) = channel::<Report>(1);

if let Err(error) = run(error_sender, shutdown.clone()).await {
if let Err(error) = run(shutdown.clone()).await {
error!("{error:#}");
return Err(error.wrap_err("Starting Light Client failed"));
};

tokio::spawn({
let shutdown = shutdown.clone();
async move {
let report = match error_receiver.recv().await {
Some(report) => report,
None => eyre!("Failed to receive error messages"),
};
_ = shutdown.trigger_shutdown(report.to_string());
}
});

let reason = shutdown.completed_shutdown().await;

// we are not logging error here since expectation is
Expand Down
7 changes: 4 additions & 3 deletions src/fat_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{
p2p::Client as P2pClient,
rpc::{Client as RpcClient, Event},
},
shutdown::Controller,
telemetry::{MetricCounter, MetricValue, Metrics},
types::{BlockVerified, ClientChannels, FatClientConfig},
utils::extract_kate,
Expand Down Expand Up @@ -202,12 +203,14 @@ pub async fn process_block(
/// * `metrics` - Metrics registry
/// * `channels` - Communitaction channels
/// * `partition` - Assigned fat client partition
/// * `shutdown` - Shutdown controller
pub async fn run(
fat_client: impl FatClient,
cfg: FatClientConfig,
metrics: Arc<impl Metrics>,
mut channels: ClientChannels,
partition: Partition,
shutdown: Controller<String>,
) {
info!("Starting fat client...");

Expand Down Expand Up @@ -240,9 +243,7 @@ pub async fn run(
process_block(&fat_client, &metrics, &cfg, &header, received_at, partition).await
{
error!("Cannot process block: {error}");
if let Err(error) = channels.error_sender.send(error).await {
error!("Cannot send error message: {error}");
}
let _ = shutdown.trigger_shutdown(format!("Cannot process block: {error:#}"));
return;
};

Expand Down
7 changes: 4 additions & 3 deletions src/light_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::{
self,
rpc::{self, Event},
},
shutdown::Controller,
telemetry::{MetricCounter, MetricValue, Metrics},
types::{self, ClientChannels, LightClientConfig, OptionBlockRange, State},
utils::{calculate_confidence, extract_kate},
Expand Down Expand Up @@ -201,13 +202,15 @@ pub async fn process_block(
/// * `metrics` - Metrics registry
/// * `state` - Processed blocks state
/// * `channels` - Communitaction channels
/// * `shutdown` - Shutdown controller
pub async fn run(
light_client: impl LightClient,
network_client: impl network::Client,
cfg: LightClientConfig,
metrics: Arc<impl Metrics>,
state: Arc<Mutex<State>>,
mut channels: ClientChannels,
shutdown: Controller<String>,
) {
info!("Starting light client...");

Expand Down Expand Up @@ -250,9 +253,7 @@ pub async fn run(
Ok(confidence) => confidence,
Err(error) => {
error!("Cannot process block: {error}");
if let Err(error) = channels.error_sender.send(error).await {
error!("Cannot send error message: {error}");
}
let _ = shutdown.trigger_shutdown(format!("Cannot process block: {error:#}"));
return;
},
};
Expand Down
16 changes: 8 additions & 8 deletions src/maintenance.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use color_eyre::{eyre::WrapErr, Report, Result};
use color_eyre::{eyre::WrapErr, Result};
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc::Sender};
use tracing::{debug, error, info};
use tokio::sync::broadcast;
use tracing::{debug, info};

use crate::{
network::p2p::Client as P2pClient,
shutdown::Controller,
telemetry::{MetricValue, Metrics},
types::BlockVerified,
};
Expand Down Expand Up @@ -38,8 +39,8 @@ pub async fn run(
p2p_client: P2pClient,
metrics: Arc<impl Metrics>,
mut block_receiver: broadcast::Receiver<BlockVerified>,
error_sender: Sender<Report>,
) -> ! {
shutdown: Controller<String>,
) {
info!("Starting maintenance...");

loop {
Expand All @@ -49,9 +50,8 @@ pub async fn run(
};

if let Err(error) = result {
if let Err(error) = error_sender.send(error).await {
error!("Cannot send error message: {error}");
}
let _ = shutdown.trigger_shutdown(format!("{error:#}"));
break;
}
}
}
14 changes: 6 additions & 8 deletions src/sync_finality.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use avail_subxt::primitives::Header;
use codec::Encode;
use color_eyre::{
eyre::{bail, eyre, Context},
Report, Result,
Result,
};
use futures::future::join_all;
use rocksdb::DB;
Expand All @@ -12,14 +12,14 @@ use sp_core::{
twox_128, Pair, H256,
};
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc::Sender;
use tracing::{error, info, trace};

use crate::{
data::{
get_finality_sync_checkpoint, store_block_header_in_db, store_finality_sync_checkpoint,
},
network::rpc::{self, WrappedProof},
shutdown::Controller,
types::{FinalitySyncCheckpoint, SignerMessage, State},
utils::filter_auth_set_changes,
};
Expand Down Expand Up @@ -108,15 +108,13 @@ async fn get_valset_at_genesis(

pub async fn run(
sync_finality_impl: impl SyncFinality,
error_sender: Sender<Report>,
shutdown: Controller<String>,
state: Arc<Mutex<State>>,
from_header: Header,
) {
if let Err(err) = sync_finality(sync_finality_impl, state, from_header).await {
error!("Cannot sync finality {err}");
if let Err(error) = error_sender.send(err).await {
error!("Cannot send error message: {error}");
}
if let Err(error) = sync_finality(sync_finality_impl, state, from_header).await {
error!("Cannot sync finality {error}");
let _ = shutdown.trigger_shutdown(format!("Cannot sync finality {error:#}"));
};
}

Expand Down
2 changes: 0 additions & 2 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use std::str::FromStr;
use std::time::{Duration, Instant};
use subxt::ext::sp_core::{sr25519::Pair, Pair as _};
use tokio::sync::broadcast;
use tokio::sync::mpsc::Sender;

const CELL_SIZE: usize = 32;
const PROOF_SIZE: usize = 48;
Expand Down Expand Up @@ -94,7 +93,6 @@ pub struct BlockVerified {
pub struct ClientChannels {
pub block_sender: broadcast::Sender<BlockVerified>,
pub rpc_event_receiver: broadcast::Receiver<Event>,
pub error_sender: Sender<Report>,
}

impl TryFrom<(DaHeader, Option<f64>)> for BlockVerified {
Expand Down

0 comments on commit a1a378a

Please sign in to comment.