Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add start/stop extension markers for fragment chain #1597

Merged
merged 11 commits into from
Dec 5, 2024
38 changes: 36 additions & 2 deletions commons/zenoh-codec/src/transport/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ where
more,
sn,
ext_qos,
ext_start,
ext_stop,
} = x;

// Header
Expand All @@ -49,7 +51,10 @@ where
if *more {
header |= flag::M;
}
if ext_qos != &ext::QoSType::DEFAULT {
let mut n_exts = (ext_qos != &ext::QoSType::DEFAULT) as u8
+ ext_start.is_some() as u8
+ ext_stop.is_some() as u8;
if n_exts != 0 {
header |= flag::Z;
}
self.write(&mut *writer, header)?;
Expand All @@ -59,7 +64,16 @@ where

// Extensions
if ext_qos != &ext::QoSType::DEFAULT {
self.write(&mut *writer, (*ext_qos, false))?;
n_exts -= 1;
self.write(&mut *writer, (*ext_qos, n_exts != 0))?;
}
if let Some(start) = ext_start {
n_exts -= 1;
self.write(&mut *writer, (start, n_exts != 0))?
}
if let Some(stop) = ext_stop {
n_exts -= 1;
self.write(&mut *writer, (stop, n_exts != 0))?
}

Ok(())
Expand Down Expand Up @@ -99,6 +113,8 @@ where

// Extensions
let mut ext_qos = ext::QoSType::DEFAULT;
let mut ext_start = None;
let mut ext_stop = None;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
Expand All @@ -110,6 +126,16 @@ where
ext_qos = q;
has_ext = ext;
}
ext::Start::ID => {
let (start, ext): (ext::Start, bool) = eodec.read(&mut *reader)?;
ext_start = Some(start);
has_ext = ext;
}
ext::Stop::ID => {
let (stop, ext): (ext::Stop, bool) = eodec.read(&mut *reader)?;
ext_stop = Some(stop);
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "Fragment", ext)?;
}
Expand All @@ -121,6 +147,8 @@ where
more,
sn,
ext_qos,
ext_start,
ext_stop,
})
}
}
Expand All @@ -139,6 +167,8 @@ where
sn,
payload,
ext_qos,
ext_start,
ext_stop,
} = x;

// Header
Expand All @@ -147,6 +177,8 @@ where
more: *more,
sn: *sn,
ext_qos: *ext_qos,
ext_start: *ext_start,
ext_stop: *ext_stop,
};
self.write(&mut *writer, &header)?;

Expand Down Expand Up @@ -185,6 +217,8 @@ where
more: header.more,
sn: header.sn,
ext_qos: header.ext_qos,
ext_start: header.ext_start,
ext_stop: header.ext_stop,
payload,
})
}
Expand Down
30 changes: 28 additions & 2 deletions commons/zenoh-codec/src/transport/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ where
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
} = x;

// Header
Expand All @@ -64,7 +65,8 @@ where
+ (ext_auth.is_some() as u8)
+ (ext_mlink.is_some() as u8)
+ (ext_lowlatency.is_some() as u8)
+ (ext_compression.is_some() as u8);
+ (ext_compression.is_some() as u8)
+ (*ext_patch != ext::PatchType::NONE) as u8;

#[cfg(feature = "shared-memory")]
{
Expand Down Expand Up @@ -125,6 +127,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (compression, n_exts != 0))?;
}
if *ext_patch != ext::PatchType::NONE {
n_exts -= 1;
self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
}

Ok(())
}
Expand Down Expand Up @@ -186,6 +192,7 @@ where
let mut ext_mlink = None;
let mut ext_lowlatency = None;
let mut ext_compression = None;
let mut ext_patch = ext::PatchType::NONE;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
Expand Down Expand Up @@ -228,6 +235,11 @@ where
ext_compression = Some(q);
has_ext = ext;
}
ext::Patch::ID => {
let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
ext_patch = p;
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "InitSyn", ext)?;
}
Expand All @@ -248,6 +260,7 @@ where
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
})
}
}
Expand Down Expand Up @@ -275,6 +288,7 @@ where
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
} = x;

// Header
Expand All @@ -287,7 +301,8 @@ where
+ (ext_auth.is_some() as u8)
+ (ext_mlink.is_some() as u8)
+ (ext_lowlatency.is_some() as u8)
+ (ext_compression.is_some() as u8);
+ (ext_compression.is_some() as u8)
+ (*ext_patch != ext::PatchType::NONE) as u8;

#[cfg(feature = "shared-memory")]
{
Expand Down Expand Up @@ -351,6 +366,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (compression, n_exts != 0))?;
}
if *ext_patch != ext::PatchType::NONE {
n_exts -= 1;
self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
}

Ok(())
}
Expand Down Expand Up @@ -415,6 +434,7 @@ where
let mut ext_mlink = None;
let mut ext_lowlatency = None;
let mut ext_compression = None;
let mut ext_patch = ext::PatchType::NONE;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
Expand Down Expand Up @@ -457,6 +477,11 @@ where
ext_compression = Some(q);
has_ext = ext;
}
ext::Patch::ID => {
let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
ext_patch = p;
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "InitAck", ext)?;
}
Expand All @@ -478,6 +503,7 @@ where
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
})
}
}
16 changes: 15 additions & 1 deletion commons/zenoh-codec/src/transport/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ where
next_sn,
ext_qos,
ext_shm,
ext_patch,
} = x;

// Header
Expand All @@ -160,7 +161,9 @@ where
if resolution != &Resolution::default() || batch_size != &batch_size::MULTICAST {
header |= flag::S;
}
let mut n_exts = (ext_qos.is_some() as u8) + (ext_shm.is_some() as u8);
let mut n_exts = (ext_qos.is_some() as u8)
+ (ext_shm.is_some() as u8)
+ (*ext_patch != ext::PatchType::NONE) as u8;
if n_exts != 0 {
header |= flag::Z;
}
Expand Down Expand Up @@ -201,6 +204,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (shm, n_exts != 0))?;
}
if *ext_patch != ext::PatchType::NONE {
n_exts -= 1;
self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
}

Ok(())
}
Expand Down Expand Up @@ -264,6 +271,7 @@ where
// Extensions
let mut ext_qos = None;
let mut ext_shm = None;
let mut ext_patch = ext::PatchType::NONE;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
Expand All @@ -280,6 +288,11 @@ where
ext_shm = Some(s);
has_ext = ext;
}
ext::Patch::ID => {
let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
ext_patch = p;
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "Join", ext)?;
}
Expand All @@ -296,6 +309,7 @@ where
next_sn,
ext_qos,
ext_shm,
ext_patch,
})
}
}
40 changes: 40 additions & 0 deletions commons/zenoh-codec/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,43 @@ where
Ok((ext.into(), more))
}
}

// Extensions: Patch
impl<W, const ID: u8> WCodec<(ext::PatchType<{ ID }>, bool), &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: (ext::PatchType<{ ID }>, bool)) -> Self::Output {
let (x, more) = x;
let ext: ZExtZ64<{ ID }> = x.into();

self.write(&mut *writer, (&ext, more))
}
}

impl<R, const ID: u8> RCodec<(ext::PatchType<{ ID }>, bool), &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<(ext::PatchType<{ ID }>, bool), Self::Error> {
let header: u8 = self.read(&mut *reader)?;
let codec = Zenoh080Header::new(header);
codec.read(reader)
}
}

impl<R, const ID: u8> RCodec<(ext::PatchType<{ ID }>, bool), &mut R> for Zenoh080Header
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<(ext::PatchType<{ ID }>, bool), Self::Error> {
let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?;
Ok((ext.into(), more))
}
}
24 changes: 23 additions & 1 deletion commons/zenoh-protocol/src/transport/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,26 @@ pub struct Fragment {
pub sn: TransportSn,
pub payload: ZSlice,
pub ext_qos: ext::QoSType,
pub ext_start: Option<ext::Start>,
pub ext_stop: Option<ext::Stop>,
}

// Extensions
pub mod ext {
use crate::{common::ZExtZ64, zextz64};
use crate::{
common::{ZExtUnit, ZExtZ64},
zextunit, zextz64,
};

pub type QoS = zextz64!(0x1, true);
pub type QoSType = crate::transport::ext::QoSType<{ QoS::ID }>;

/// # Start extension
/// Mark the first fragment of a fragmented message
pub type Start = zextunit!(0x2, false);
/// # Stop extension
/// Indicate that the remaining fragments has been dropped
Mallets marked this conversation as resolved.
Show resolved Hide resolved
pub type Stop = zextunit!(0x3, false);
}

impl Fragment {
Expand All @@ -97,13 +109,17 @@ impl Fragment {
let sn: TransportSn = rng.gen();
let payload = ZSlice::rand(rng.gen_range(8..128));
let ext_qos = ext::QoSType::rand();
let ext_start = rng.gen_bool(0.5).then(ext::Start::rand);
let ext_stop = rng.gen_bool(0.5).then(ext::Stop::rand);

Fragment {
reliability,
sn,
more,
payload,
ext_qos,
ext_start,
ext_stop,
}
}
}
Expand All @@ -115,6 +131,8 @@ pub struct FragmentHeader {
pub more: bool,
pub sn: TransportSn,
pub ext_qos: ext::QoSType,
pub ext_start: Option<ext::Start>,
pub ext_stop: Option<ext::Stop>,
}

impl FragmentHeader {
Expand All @@ -128,12 +146,16 @@ impl FragmentHeader {
let more = rng.gen_bool(0.5);
let sn: TransportSn = rng.gen();
let ext_qos = ext::QoSType::rand();
let ext_start = rng.gen_bool(0.5).then(ext::Start::rand);
let ext_stop = rng.gen_bool(0.5).then(ext::Stop::rand);

FragmentHeader {
reliability,
more,
sn,
ext_qos,
ext_start,
ext_stop,
}
}
}
Loading