refactor: serializer module code improvements (#214)
* doc: reorganize serializer code documentation

* refactor: make functions private and remove unused

* doc: explain `SerializerMetrics`

* refactor: remove `MqttClient::publish_bytes`

* doc: explain `MqttClient`
de-sh authored Apr 27, 2023
1 parent 07c55b6 commit 595a6f5
Showing 2 changed files with 38 additions and 66 deletions.
13 changes: 11 additions & 2 deletions uplink/src/base/serializer/metrics.rs
@@ -2,17 +2,26 @@ use serde::Serialize;

use crate::collector::utils::{self, clock};

/// Metrics information relating to the operation of the `Serializer`; all values are reset on metrics flush
#[derive(Debug, Serialize, Clone)]
pub struct SerializerMetrics {
pub timestamp: u128,
pub sequence: u32,
timestamp: u128,
sequence: u32,
/// One of **Catchup**, **Normal**, **Slow** or **Crash**
pub mode: String,
/// Number of batches serialized
pub batches: usize,
/// Size of the write memory buffer within `Storage`
pub write_memory: usize,
/// Size of the read memory buffer within `Storage`
pub read_memory: usize,
/// Number of files that have been written to disk
pub disk_files: usize,
/// Number of persistence files that had to be deleted before being consumed
pub lost_segments: usize,
/// Number of errors faced during serializer operation
pub errors: usize,
/// Size, in bytes, of serialized data sent over the network
pub sent_size: usize,
}

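Since `SerializerMetrics` derives serde's `Serialize`, each flushed snapshot can be rendered as JSON. The following standalone sketch is not part of the commit: it mirrors the struct's fields with invented example values, assuming `serde` (with the `derive` feature) and `serde_json` are available as dependencies.

```rust
use serde::Serialize;

// Trimmed-down mirror of SerializerMetrics, only to show the serialized shape;
// the real struct lives in uplink/src/base/serializer/metrics.rs.
#[derive(Debug, Serialize, Clone)]
struct SerializerMetrics {
    timestamp: u128,
    sequence: u32,
    mode: String,
    batches: usize,
    write_memory: usize,
    read_memory: usize,
    disk_files: usize,
    lost_segments: usize,
    errors: usize,
    sent_size: usize,
}

fn main() {
    // All values below are invented for illustration; a real snapshot is reset
    // after every metrics flush.
    let snapshot = SerializerMetrics {
        timestamp: 1_682_592_000_000,
        sequence: 1,
        mode: "catchup".to_owned(),
        batches: 12,
        write_memory: 16_384,
        read_memory: 8_192,
        disk_files: 3,
        lost_segments: 0,
        errors: 0,
        sent_size: 52_430,
    };

    println!("{}", serde_json::to_string_pretty(&snapshot).unwrap());
}
```
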
91 changes: 27 additions & 64 deletions uplink/src/base/serializer/mod.rs
@@ -1,4 +1,4 @@
pub mod metrics;
mod metrics;

pub use metrics::SerializerMetrics;

@@ -59,8 +59,10 @@ enum Status {
EventLoopCrash(Publish),
}

/// Description of the interface that the [`Serializer`] expects the MQTT client to provide for publishing the serialized data.
#[async_trait::async_trait]
pub trait MqttClient: Clone {
/// Accept payload and resolve as an error only when the client has died (thread killed). Useful in Slow/Catchup mode.
async fn publish<S, V>(
&self,
topic: S,
@@ -72,6 +74,7 @@ pub trait MqttClient: Clone {
S: Into<String> + Send,
V: Into<Vec<u8>> + Send;

/// Accept payload and resolve as an error immediately if data can't be sent over the network. Useful in Normal mode.
fn try_publish<S, V>(
&self,
topic: S,
@@ -82,15 +85,6 @@
where
S: Into<String>,
V: Into<Vec<u8>>;
async fn publish_bytes<S>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: Bytes,
) -> Result<(), MqttError>
where
S: Into<String> + Send;
}

#[async_trait::async_trait]
@@ -124,23 +118,22 @@ impl MqttClient for AsyncClient {
self.try_publish(topic, qos, retain, payload)?;
Ok(())
}
async fn publish_bytes<S>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: Bytes,
) -> Result<(), MqttError>
where
S: Into<String> + Send,
{
self.publish_bytes(topic, qos, retain, payload).await?;
Ok(())
}
}
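
The `MqttClient` trait above exists so that the `Serializer` can be driven either by rumqttc's `AsyncClient` or by a test double. As a rough, hypothetical sketch of that idea (this is not the crate's `MockClient`, which lives in the test module further down and implements the full trait, `QoS` and retain flag included), a channel-backed stand-in might look like this, assuming the `flume` crate:

```rust
use flume::{bounded, Sender};

// Hypothetical, trimmed-down analogue of the MockClient used in the test module
// below: publishes go onto a flume channel instead of a network socket.
#[derive(Debug)]
enum MqttError {
    TrySend((String, Vec<u8>)), // outgoing queue full or closed (Normal mode failure)
    Send((String, Vec<u8>)),    // channel closed, i.e. the client task has died
}

#[derive(Clone)]
struct MockClient {
    net_tx: Sender<(String, Vec<u8>)>,
}

impl MockClient {
    /// Awaits space in the queue; only errors once the receiving end is gone.
    async fn publish(&self, topic: String, payload: Vec<u8>) -> Result<(), MqttError> {
        self.net_tx
            .send_async((topic, payload))
            .await
            .map_err(|e| MqttError::Send(e.into_inner()))
    }

    /// Fails immediately when the queue is full, mimicking network back-pressure.
    fn try_publish(&self, topic: String, payload: Vec<u8>) -> Result<(), MqttError> {
        self.net_tx
            .try_send((topic, payload))
            .map_err(|e| MqttError::TrySend(e.into_inner()))
    }
}

fn main() {
    let (tx, rx) = bounded(1);
    let client = MockClient { net_tx: tx };
    client.try_publish("hello/world".into(), b"one".to_vec()).unwrap();
    // A queue of size 1 is now full: the next try_publish fails right away.
    assert!(client.try_publish("hello/world".into(), b"two".to_vec()).is_err());
    drop(rx);
}
```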

/// The uplink Serializer is the component that deals with sending data to the Bytebeam platform.
/// In case of network issues, the Serializer enters various states depending on severity, managed by [`start()`].
/// The uplink Serializer is the component that deals with serializing, compressing and writing data onto disk or the network.
/// In case of network issues, the Serializer enters various states depending on the severity, managed by [`start()`].
///
/// The Serializer writes data directly to the network in **normal mode** with the [`try_publish()`] method on the MQTT client.
/// If the network is slow, this fails and we are forced into **slow mode**, wherein new data gets written into
/// [`Storage`] while we await a [`publish()`]. If the [`publish()`] succeeds, we move into **catchup mode**;
/// if it fails, we move into **crash mode**. In **catchup mode**, we continuously write to [`Storage`] while also
/// pushing data onto the network with a [`publish()`]. Whenever a [`publish()`] succeeds, we load the next [`Publish`]
/// packet from [`Storage`], until it is empty. We transition back into normal mode once [`Storage`] is emptied during
/// operation in catchup mode.
///
/// P.S.: We transition into **crash mode** when we are in catchup or slow mode and the thread running the MQTT client
/// stalls and dies. In this mode we merely write all received data directly to disk. This is a failure mode that the
/// serializer should ideally never operate in.
///
/// ```text
///
@@ -166,6 +159,8 @@ impl MqttClient for AsyncClient {
///
///```
/// [`start()`]: Serializer::start
/// [`try_publish()`]: AsyncClient::try_publish
/// [`publish()`]: AsyncClient::publish
pub struct Serializer<C: MqttClient> {
config: Arc<Config>,
collector_rx: Receiver<Box<dyn Package>>,
@@ -177,6 +172,8 @@ pub struct Serializer<C: MqttClient> {
}
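
The mode transitions described in the doc comment above amount to a small state machine. The sketch below captures only those rules; the `Mode`, `Event` and `next_mode` names are invented for illustration, while the real `Serializer` drives the transitions from `start()` with its `Status` enum and asynchronous I/O:

```rust
// Simplified stand-in for the mode handling documented above; not the crate's code.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Mode {
    Normal,  // try_publish() straight onto the network
    Slow,    // buffer into Storage while awaiting a single publish()
    Catchup, // drain Storage onto the network with publish()
    Crash,   // MQTT client task has died: only write incoming data to disk
}

#[derive(Debug, Clone, Copy)]
enum Event {
    TryPublishFailed, // network back-pressure while in normal mode
    PublishSucceeded, // the awaited publish() went through
    PublishFailed,    // the awaited publish() errored: the client task is gone
    StorageDrained,   // no persisted packets left to push
}

fn next_mode(current: Mode, event: Event) -> Mode {
    match (current, event) {
        (Mode::Normal, Event::TryPublishFailed) => Mode::Slow,
        (Mode::Slow, Event::PublishSucceeded) => Mode::Catchup,
        (Mode::Slow, Event::PublishFailed) => Mode::Crash,
        (Mode::Catchup, Event::PublishSucceeded) => Mode::Catchup, // load next packet
        (Mode::Catchup, Event::StorageDrained) => Mode::Normal,
        (Mode::Catchup, Event::PublishFailed) => Mode::Crash,
        (mode, _) => mode, // all other combinations leave the mode unchanged
    }
}

fn main() {
    assert_eq!(next_mode(Mode::Normal, Event::TryPublishFailed), Mode::Slow);
    assert_eq!(next_mode(Mode::Slow, Event::PublishSucceeded), Mode::Catchup);
    assert_eq!(next_mode(Mode::Catchup, Event::StorageDrained), Mode::Normal);
    println!("mode transitions behave as described in the doc comment");
}
```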

impl<C: MqttClient> Serializer<C> {
/// Construct the uplink Serializer with the necessary configuration details, a receiver handle to accept data payloads from,
/// the handle to an MQTT client (kept generic so a mock client can be used in tests) and a handle to update serializer metrics.
pub fn new(
config: Arc<Config>,
collector_rx: Receiver<Box<dyn Package>>,
@@ -464,15 +461,7 @@ impl<C: MqttClient> Serializer<C> {
}
}

/// The Serializer writes data directly to network in **normal mode** by [`try_publish()`] on the MQTT client. In case of the
/// network being slow, this fails and we are forced into **slow mode**, wherein new data is written into [`Storage`]
/// while consequently we await on a [`publish()`]. If the [`publish()`] succeeds, we move into **catchup mode** or otherwise,
/// if it fails we move to **crash mode**. In **catchup mode**, we continuously write to [`Storage`] while also pushing data
/// onto network by [`publish()`]. If a [`publish()`] succeeds, we load the next [`Publish`] packet from [`Storage`], whereas
/// if it fails, we transition into **crash mode** where we merely write all data received, directly into disk.
///
/// [`try_publish()`]: AsyncClient::try_publish
/// [`publish()`]: AsyncClient::publish
/// Starts operation of the uplink serializer, which can transition between the modes mentioned earlier.
pub async fn start(mut self) -> Result<(), Error> {
let mut status = Status::EventLoopReady;

@@ -495,7 +484,7 @@ async fn send_publish<C: MqttClient>(
payload: Bytes,
) -> Result<C, MqttError> {
debug!("publishing on {topic} with size = {}", payload.len());
client.publish_bytes(topic, QoS::AtLeastOnce, false, payload).await?;
client.publish(topic, QoS::AtLeastOnce, false, payload).await?;
Ok(client)
}

@@ -526,7 +515,7 @@ fn write_to_disk(mut publish: Publish, storage: &mut Storage) -> Result<Option<u
Ok(deleted)
}
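
The body of `write_to_disk()` is collapsed in this view. As a loose standalone illustration of the idea behind it (persist the packet and report any persistence segment dropped unread, so the caller can bump `lost_segments`), here is a sketch built around an invented `SegmentBuffer` type standing in for the crate's `Storage`:

```rust
use std::collections::VecDeque;

// Illustration only: SegmentBuffer and its method are invented here and are not
// the crate's real Storage API.
struct SegmentBuffer {
    max_segments: usize,
    segments: VecDeque<Vec<u8>>, // each entry stands in for one persistence file
}

impl SegmentBuffer {
    /// Append a serialized packet; when the segment limit is exceeded, drop the
    /// oldest unread segment and report it so the caller can count a lost segment.
    fn write(&mut self, packet: Vec<u8>) -> Option<usize> {
        self.segments.push_back(packet);
        if self.segments.len() > self.max_segments {
            self.segments.pop_front();
            Some(1) // one persistence segment deleted before being consumed
        } else {
            None
        }
    }
}

fn main() {
    let mut buffer = SegmentBuffer { max_segments: 2, segments: VecDeque::new() };
    assert_eq!(buffer.write(vec![1]), None);
    assert_eq!(buffer.write(vec![2]), None);
    // The third write overflows the two-segment budget and loses the oldest packet.
    assert_eq!(buffer.write(vec![3]), Some(1));
}
```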

pub fn check_metrics(metrics: &mut SerializerMetrics, storage: &Option<Storage>) {
fn check_metrics(metrics: &mut SerializerMetrics, storage: &Option<Storage>) {
use pretty_bytes::converter::convert;

if let Some(s) = storage {
@@ -547,7 +536,7 @@ pub fn check_metrics(metrics: &mut SerializerMetrics, storage: &Option<Storage>)
);
}

pub fn save_and_prepare_next_metrics(
fn save_and_prepare_next_metrics(
pending: &mut VecDeque<SerializerMetrics>,
metrics: &mut SerializerMetrics,
storage: &Option<Storage>,
@@ -564,7 +553,7 @@ pub fn save_and_prepare_next_metrics(
}

// Enable actual metrics timers when there is data. This method is called every minute by the bridge
pub fn check_and_flush_metrics(
fn check_and_flush_metrics(
pending: &mut VecDeque<SerializerMetrics>,
metrics: &mut SerializerMetrics,
metrics_tx: &Sender<SerializerMetrics>,
@@ -607,15 +596,6 @@ pub fn check_and_flush_metrics(
Ok(())
}

pub fn flush_metrics(
metrics: &mut SerializerMetrics,
metrics_tx: &Sender<SerializerMetrics>,
) -> Result<(), flume::TrySendError<SerializerMetrics>> {
metrics_tx.try_send(metrics.clone())?;
metrics.prepare_next();
Ok(())
}
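
The bodies of the metrics helpers above are collapsed, so the following is only a guess at the flush pattern suggested by their signatures and by the removed `flush_metrics()` just above: drain snapshots that could not be sent earlier, then send and reset the current one. The trimmed `Metrics` type and the `check_and_flush` helper are hypothetical; the `flume` crate is assumed for the channel:

```rust
use std::collections::VecDeque;

use flume::{Sender, TrySendError};

// Trimmed stand-in for SerializerMetrics; the real type has more counters and a
// prepare_next() that bumps `sequence` and resets them.
#[derive(Debug, Clone)]
struct Metrics {
    sequence: u32,
    batches: usize,
}

// Hypothetical flush pattern: forward any snapshots that couldn't be sent earlier,
// then send the current snapshot only if it recorded activity.
fn check_and_flush(
    pending: &mut VecDeque<Metrics>,
    current: &mut Metrics,
    tx: &Sender<Metrics>,
) -> Result<(), TrySendError<Metrics>> {
    while let Some(snapshot) = pending.pop_front() {
        if let Err(e) = tx.try_send(snapshot) {
            // Channel full or closed: keep the snapshot and retry on the next tick.
            pending.push_front(e.into_inner());
            return Ok(());
        }
    }

    if current.batches > 0 {
        tx.try_send(current.clone())?;
        // Stand-in for SerializerMetrics::prepare_next().
        current.sequence += 1;
        current.batches = 0;
    }

    Ok(())
}

fn main() {
    let (tx, rx) = flume::bounded(4);
    let mut pending = VecDeque::new();
    let mut current = Metrics { sequence: 1, batches: 3 };

    check_and_flush(&mut pending, &mut current, &tx).unwrap();
    println!("flushed: {:?}", rx.try_recv());
}
```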

// TODO(RT): Test cases
// - Restart with no internet but files on disk

@@ -673,23 +653,6 @@ mod test {
self.net_tx.try_send(publish).map_err(|e| MqttError::TrySend(e.into_inner()))?;
Ok(())
}

async fn publish_bytes<S>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: Bytes,
) -> Result<(), MqttError>
where
S: Into<String> + Send,
{
let mut publish = Publish::from_bytes(topic, qos, payload);
publish.retain = retain;
let publish = Request::Publish(publish);
self.net_tx.send_async(publish).await.map_err(|e| MqttError::Send(e.into_inner()))?;
Ok(())
}
}

fn read_from_storage(storage: &mut Storage, max_packet_size: usize) -> Publish {
