Skip to content

Commit

Permalink
feat: endpoint for liveliness/network connection
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh committed Jun 17, 2024
1 parent 6d7f99a commit b575e35
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 7 deletions.
7 changes: 7 additions & 0 deletions uplink/src/base/mqtt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tokio::{select, task};
use std::fs::File;
use std::io::Read;
use std::path::Path;
use std::sync::Mutex;

use crate::{Action, Config};
use rumqttc::{
Expand Down Expand Up @@ -58,13 +59,16 @@ pub struct Mqtt {
/// Control handles
ctrl_rx: Receiver<MqttShutdown>,
ctrl_tx: Sender<MqttShutdown>,
/// True when network is connected
network_up: Arc<Mutex<bool>>,
}

impl Mqtt {
pub fn new(
config: Arc<Config>,
actions_tx: Sender<Action>,
metrics_tx: Sender<MqttMetrics>,
network_up: Arc<Mutex<bool>>,
) -> Mqtt {
// create a new eventloop and reuse it during every reconnection
let options = mqttoptions(&config);
Expand All @@ -81,6 +85,7 @@ impl Mqtt {
metrics_tx,
ctrl_tx,
ctrl_rx,
network_up,
}
}

Expand Down Expand Up @@ -168,6 +173,7 @@ impl Mqtt {
event = self.eventloop.poll() => {
match event {
Ok(Event::Incoming(Incoming::ConnAck(connack))) => {
*self.network_up.lock().unwrap() = true;
info!("Connected to broker. Session present = {}", connack.session_present);
let subscription = self.config.actions_subscription.clone();
let client = self.client();
Expand Down Expand Up @@ -213,6 +219,7 @@ impl Mqtt {
}
}
Err(e) => {
*self.network_up.lock().unwrap() = false;
self.metrics.add_reconnection();
self.check_disconnection_metrics(e);
tokio::time::sleep(Duration::from_secs(3)).await;
Expand Down
2 changes: 1 addition & 1 deletion uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ impl StorageHandler {
}

/// The uplink Serializer is the component that deals with serializing, compressing and writing data onto disk or Network.
/// In case of network issues, the Serializer enters various states depending on the severeness, managed by [`start()`].
/// In case of network issues, the Serializer enters various states depending on the severeness, managed by [`start()`].
///
/// The Serializer writes data directly to network in **normal mode** with the [`try_publish()`] method on the MQTT client.
/// In case of the network being slow, this fails and we are forced into **slow mode**, where-in new data gets written into
Expand Down
15 changes: 13 additions & 2 deletions uplink/src/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use axum::{
extract::State,
http::StatusCode,
response::IntoResponse,
routing::{post, put},
routing::{get, post, put},
Router,
};
use log::info;
Expand All @@ -17,6 +17,7 @@ struct StateHandle {
reload_handle: ReloadHandle,
ctrl_tx: CtrlTx,
downloader_disable: Arc<Mutex<bool>>,
network_up: Arc<Mutex<bool>>,
}

#[tokio::main]
Expand All @@ -25,15 +26,17 @@ pub async fn start(
reload_handle: ReloadHandle,
ctrl_tx: CtrlTx,
downloader_disable: Arc<Mutex<bool>>,
network_up: Arc<Mutex<bool>>,
) {
let address = format!("0.0.0.0:{port}");
info!("Starting uplink console server: {address}");
let state = StateHandle { reload_handle, ctrl_tx, downloader_disable };
let state = StateHandle { reload_handle, ctrl_tx, downloader_disable, network_up };
let app = Router::new()
.route("/logs", post(reload_loglevel))
.route("/shutdown", post(shutdown))
.route("/disable_downloader", put(disable_downloader))
.route("/enable_downloader", put(enable_downloader))
.route("/is_online", get(is_online))
.with_state(state);

axum::Server::bind(&address.parse().unwrap()).serve(app.into_make_service()).await.unwrap();
Expand Down Expand Up @@ -78,3 +81,11 @@ async fn enable_downloader(State(state): State<StateHandle>) -> impl IntoRespons
StatusCode::ACCEPTED
}
}

async fn is_online(State(state): State<StateHandle>) -> impl IntoResponse {
if *state.network_up.lock().unwrap() {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
}
}
4 changes: 3 additions & 1 deletion uplink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,13 @@ impl Uplink {
&mut self,
mut bridge: Bridge,
downloader_disable: Arc<Mutex<bool>>,
network_up: Arc<Mutex<bool>>,
) -> Result<CtrlTx, Error> {
let (mqtt_metrics_tx, mqtt_metrics_rx) = bounded(10);
let (ctrl_actions_lane, ctrl_data_lane) = bridge.ctrl_tx();

let mut mqtt = Mqtt::new(self.config.clone(), self.action_tx.clone(), mqtt_metrics_tx);
let mut mqtt =
Mqtt::new(self.config.clone(), self.action_tx.clone(), mqtt_metrics_tx, network_up);
let mqtt_client = mqtt.client();
let ctrl_mqtt = mqtt.ctrl_tx();

Expand Down
7 changes: 4 additions & 3 deletions uplink/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub type ReloadHandle =
use uplink::config::{AppConfig, Config, StreamConfig, MAX_BATCH_SIZE};
use uplink::{simulator, spawn_named_thread, TcpJson, Uplink};

const DEFAULT_CONFIG: &str = r#"
const DEFAULT_CONFIG: &str = r#"
[mqtt]
max_packet_size = 256000
max_inflight = 100
Expand Down Expand Up @@ -329,7 +329,8 @@ fn main() -> Result<(), Error> {
};

let downloader_disable = Arc::new(Mutex::new(false));
let ctrl_tx = uplink.spawn(bridge, downloader_disable.clone())?;
let network_up = Arc::new(Mutex::new(false));
let ctrl_tx = uplink.spawn(bridge, downloader_disable.clone(), network_up.clone())?;

if let Some(config) = config.simulator.clone() {
spawn_named_thread("Simulator", || {
Expand All @@ -341,7 +342,7 @@ fn main() -> Result<(), Error> {
let port = config.console.port;
let ctrl_tx = ctrl_tx.clone();
spawn_named_thread("Uplink Console", move || {
console::start(port, reload_handle, ctrl_tx, downloader_disable)
console::start(port, reload_handle, ctrl_tx, downloader_disable, network_up)
});
}

Expand Down

0 comments on commit b575e35

Please sign in to comment.