From b575e3592ef829a0ba8aa3a3aee7c8ae87ad3d07 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 17 Jun 2024 23:28:19 +0530 Subject: [PATCH] feat: endpoint for liveliness/network connection --- uplink/src/base/mqtt/mod.rs | 7 +++++++ uplink/src/base/serializer/mod.rs | 2 +- uplink/src/console.rs | 15 +++++++++++++-- uplink/src/lib.rs | 4 +++- uplink/src/main.rs | 7 ++++--- 5 files changed, 28 insertions(+), 7 deletions(-) diff --git a/uplink/src/base/mqtt/mod.rs b/uplink/src/base/mqtt/mod.rs index a529bc3c..c388757d 100644 --- a/uplink/src/base/mqtt/mod.rs +++ b/uplink/src/base/mqtt/mod.rs @@ -9,6 +9,7 @@ use tokio::{select, task}; use std::fs::File; use std::io::Read; use std::path::Path; +use std::sync::Mutex; use crate::{Action, Config}; use rumqttc::{ @@ -58,6 +59,8 @@ pub struct Mqtt { /// Control handles ctrl_rx: Receiver, ctrl_tx: Sender, + /// True when network is connected + network_up: Arc>, } impl Mqtt { @@ -65,6 +68,7 @@ impl Mqtt { config: Arc, actions_tx: Sender, metrics_tx: Sender, + network_up: Arc>, ) -> Mqtt { // create a new eventloop and reuse it during every reconnection let options = mqttoptions(&config); @@ -81,6 +85,7 @@ impl Mqtt { metrics_tx, ctrl_tx, ctrl_rx, + network_up, } } @@ -168,6 +173,7 @@ impl Mqtt { event = self.eventloop.poll() => { match event { Ok(Event::Incoming(Incoming::ConnAck(connack))) => { + *self.network_up.lock().unwrap() = true; info!("Connected to broker. Session present = {}", connack.session_present); let subscription = self.config.actions_subscription.clone(); let client = self.client(); @@ -213,6 +219,7 @@ impl Mqtt { } } Err(e) => { + *self.network_up.lock().unwrap() = false; self.metrics.add_reconnection(); self.check_disconnection_metrics(e); tokio::time::sleep(Duration::from_secs(3)).await; diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index e1edb90d..062472ab 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -224,7 +224,7 @@ impl StorageHandler { } /// The uplink Serializer is the component that deals with serializing, compressing and writing data onto disk or Network. -/// In case of network issues, the Serializer enters various states depending on the severeness, managed by [`start()`]. +/// In case of network issues, the Serializer enters various states depending on the severeness, managed by [`start()`]. /// /// The Serializer writes data directly to network in **normal mode** with the [`try_publish()`] method on the MQTT client. /// In case of the network being slow, this fails and we are forced into **slow mode**, where-in new data gets written into diff --git a/uplink/src/console.rs b/uplink/src/console.rs index ea787c39..5b1d5525 100644 --- a/uplink/src/console.rs +++ b/uplink/src/console.rs @@ -4,7 +4,7 @@ use axum::{ extract::State, http::StatusCode, response::IntoResponse, - routing::{post, put}, + routing::{get, post, put}, Router, }; use log::info; @@ -17,6 +17,7 @@ struct StateHandle { reload_handle: ReloadHandle, ctrl_tx: CtrlTx, downloader_disable: Arc>, + network_up: Arc>, } #[tokio::main] @@ -25,15 +26,17 @@ pub async fn start( reload_handle: ReloadHandle, ctrl_tx: CtrlTx, downloader_disable: Arc>, + network_up: Arc>, ) { let address = format!("0.0.0.0:{port}"); info!("Starting uplink console server: {address}"); - let state = StateHandle { reload_handle, ctrl_tx, downloader_disable }; + let state = StateHandle { reload_handle, ctrl_tx, downloader_disable, network_up }; let app = Router::new() .route("/logs", post(reload_loglevel)) .route("/shutdown", post(shutdown)) .route("/disable_downloader", put(disable_downloader)) .route("/enable_downloader", put(enable_downloader)) + .route("/is_online", get(is_online)) .with_state(state); axum::Server::bind(&address.parse().unwrap()).serve(app.into_make_service()).await.unwrap(); @@ -78,3 +81,11 @@ async fn enable_downloader(State(state): State) -> impl IntoRespons StatusCode::ACCEPTED } } + +async fn is_online(State(state): State) -> impl IntoResponse { + if *state.network_up.lock().unwrap() { + StatusCode::OK + } else { + StatusCode::SERVICE_UNAVAILABLE + } +} diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index eb7a6fa6..24394f51 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -133,11 +133,13 @@ impl Uplink { &mut self, mut bridge: Bridge, downloader_disable: Arc>, + network_up: Arc>, ) -> Result { let (mqtt_metrics_tx, mqtt_metrics_rx) = bounded(10); let (ctrl_actions_lane, ctrl_data_lane) = bridge.ctrl_tx(); - let mut mqtt = Mqtt::new(self.config.clone(), self.action_tx.clone(), mqtt_metrics_tx); + let mut mqtt = + Mqtt::new(self.config.clone(), self.action_tx.clone(), mqtt_metrics_tx, network_up); let mqtt_client = mqtt.client(); let ctrl_mqtt = mqtt.ctrl_tx(); diff --git a/uplink/src/main.rs b/uplink/src/main.rs index e4254fc2..a0c308dc 100644 --- a/uplink/src/main.rs +++ b/uplink/src/main.rs @@ -21,7 +21,7 @@ pub type ReloadHandle = use uplink::config::{AppConfig, Config, StreamConfig, MAX_BATCH_SIZE}; use uplink::{simulator, spawn_named_thread, TcpJson, Uplink}; -const DEFAULT_CONFIG: &str = r#" +const DEFAULT_CONFIG: &str = r#" [mqtt] max_packet_size = 256000 max_inflight = 100 @@ -329,7 +329,8 @@ fn main() -> Result<(), Error> { }; let downloader_disable = Arc::new(Mutex::new(false)); - let ctrl_tx = uplink.spawn(bridge, downloader_disable.clone())?; + let network_up = Arc::new(Mutex::new(false)); + let ctrl_tx = uplink.spawn(bridge, downloader_disable.clone(), network_up.clone())?; if let Some(config) = config.simulator.clone() { spawn_named_thread("Simulator", || { @@ -341,7 +342,7 @@ fn main() -> Result<(), Error> { let port = config.console.port; let ctrl_tx = ctrl_tx.clone(); spawn_named_thread("Uplink Console", move || { - console::start(port, reload_handle, ctrl_tx, downloader_disable) + console::start(port, reload_handle, ctrl_tx, downloader_disable, network_up) }); }