Skip to content

Commit

Permalink
refacto: disable shared-memory feature by default (#182)
Browse files Browse the repository at this point in the history
The current implementation is not as optimised as it deserves to be. In short,
if the shared memory is enabled, then every time Zenoh-Flow will send data
through Zenoh (be it between runtimes on the same host or on different hosts) it
will first attempt to send it through shared memory.

As it is now, sending data through shared memory requires a copy as the data
does not reside inside a shared memory buffer (it must be copied from the
programs memory to the shared memory).

In cases where there is actually a shared memory buffer, this could still lead
to better performance. However, as the current design enables it *everywhere*,
if there is no shared memory then the data is copied twice: (i) from the
programs memory to the shared memory and (ii) from the shared memory to a Zenoh
buffer to be sent on the network.

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet authored Feb 14, 2024
1 parent e16e7f2 commit fd95d28
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 24 deletions.
4 changes: 4 additions & 0 deletions zenoh-flow-descriptors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,9 @@ uuid = { workspace = true }
zenoh-flow-commons = { workspace = true }
zenoh-keyexpr = { workspace = true }

[features]
default = []
shared-memory = []

[dev-dependencies]
uuid = { workspace = true }
22 changes: 11 additions & 11 deletions zenoh-flow-descriptors/src/flattened/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,20 +227,20 @@ fn test_flatten_descriptor() {

let expected_links = vec![
// source 1 -> operator 1 -> sink 1
LinkDescriptor::new_no_shm(
LinkDescriptor::new(
OutputDescriptor::new("source-1", "source-out"),
InputDescriptor::new("operator-1", "operator-in"),
),
LinkDescriptor::new_no_shm(
LinkDescriptor::new(
OutputDescriptor::new("operator-1", "operator-out"),
InputDescriptor::new("sink-1", "sink-in"),
),
// source 2 -> operator 2 -> sink 2
LinkDescriptor::new_no_shm(
LinkDescriptor::new(
OutputDescriptor::new("source-2", "source-out"),
InputDescriptor::new("operator-2", "operator-in"),
),
LinkDescriptor::new_no_shm(
LinkDescriptor::new(
OutputDescriptor::new("operator-2", "operator-out"),
InputDescriptor::new("sink-2", "sink-in"),
),
Expand All @@ -252,26 +252,26 @@ fn test_flatten_descriptor() {
* Hence, the name of the input ports of "sub-operator-1" (operator-composite.yml) are
* replaced by what is declared in "data-flow.yml".
*/
LinkDescriptor::new_no_shm(
LinkDescriptor::new(
OutputDescriptor::new("source-composite", "source-composite-out-1"),
InputDescriptor::new("operator-composite>sub-operator-1", "sub-operator-1-in-1"),
),
LinkDescriptor::new_no_shm(
LinkDescriptor::new(
OutputDescriptor::new("source-composite", "source-composite-out-2"),
InputDescriptor::new("operator-composite>sub-operator-1", "sub-operator-1-in-2"),
),
// operator-composite-sub-2 -> sink-composite
LinkDescriptor::new_no_shm(
LinkDescriptor::new(
OutputDescriptor::new("operator-composite>sub-operator-2", "sub-operator-2-out-1"),
InputDescriptor::new("sink-composite", "sink-composite-in-1"),
),
LinkDescriptor::new_no_shm(
LinkDescriptor::new(
OutputDescriptor::new("operator-composite>sub-operator-2", "sub-operator-2-out-2"),
InputDescriptor::new("sink-composite", "sink-composite-in-2"),
),
// operator-composite-sub-operator-1 ->
// operator-composite-sub-operator-composite-sub-sub-operator-1
LinkDescriptor::new_no_shm(
LinkDescriptor::new(
OutputDescriptor::new("operator-composite>sub-operator-1", "sub-operator-1-out"),
InputDescriptor::new(
"operator-composite>sub-operator-composite>sub-sub-operator-1",
Expand All @@ -280,7 +280,7 @@ fn test_flatten_descriptor() {
),
// operator-composite-sub-operator-composite-sub-sub-operator-2 ->
// operator-composite-sub-operator-2
LinkDescriptor::new_no_shm(
LinkDescriptor::new(
OutputDescriptor::new(
"operator-composite>sub-operator-composite>sub-sub-operator-2",
"sub-sub-operator-2-out",
Expand All @@ -289,7 +289,7 @@ fn test_flatten_descriptor() {
),
// operator-composite-sub-operator-composite-sub-sub-operator-1 ->
// operator-composite-sub-operator-composite-sub-sub-operator-2
LinkDescriptor::new_no_shm(
LinkDescriptor::new(
OutputDescriptor::new(
"operator-composite>sub-operator-composite>sub-sub-operator-1",
"sub-sub-operator-1-out",
Expand Down
24 changes: 11 additions & 13 deletions zenoh-flow-descriptors/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

use serde::{Deserialize, Serialize};
use std::fmt;
use zenoh_flow_commons::{NodeId, PortId, SharedMemoryConfiguration};
#[cfg(feature = "shared-memory")]
use zenoh_flow_commons::SharedMemoryConfiguration;
use zenoh_flow_commons::{NodeId, PortId};

/// An `InputDescriptor` describes an Input port of a Zenoh-Flow node.
///
Expand Down Expand Up @@ -86,6 +88,7 @@ impl OutputDescriptor {
pub struct LinkDescriptor {
pub from: OutputDescriptor,
pub to: InputDescriptor,
#[cfg(feature = "shared-memory")]
#[serde(default, alias = "shm", alias = "shared-memory")]
pub shared_memory: Option<SharedMemoryConfiguration>,
}
Expand All @@ -97,23 +100,18 @@ impl std::fmt::Display for LinkDescriptor {
}

impl LinkDescriptor {
pub fn new(
from: OutputDescriptor,
to: InputDescriptor,
shm: SharedMemoryConfiguration,
) -> Self {
pub fn new(from: OutputDescriptor, to: InputDescriptor) -> Self {
Self {
from,
to,
shared_memory: Some(shm),
#[cfg(feature = "shared-memory")]
shared_memory: None,
}
}

pub fn new_no_shm(from: OutputDescriptor, to: InputDescriptor) -> Self {
Self {
from,
to,
shared_memory: None,
}
#[cfg(feature = "shared-memory")]
pub fn set_shared_memory(mut self, shm: SharedMemoryConfiguration) -> Self {
self.shared_memory = Some(shm);
self
}
}
4 changes: 4 additions & 0 deletions zenoh-flow-records/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ zenoh-flow-commons = { workspace = true }
zenoh-flow-descriptors = { workspace = true }
zenoh-keyexpr = { workspace = true }

[features]
default = []
shared-memory = []

[dev-dependencies]
serde_json = { workspace = true }
serde_yaml = { workspace = true }
1 change: 1 addition & 0 deletions zenoh-flow-records/src/dataflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ Caused by:
additional_links.push(LinkDescriptor {
from: output,
to: input,
#[cfg(feature = "shared-memory")]
shared_memory: link.shared_memory,
});

Expand Down
4 changes: 4 additions & 0 deletions zenoh-flow-records/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ mapping:
node: sender_thing_edge.clone(),
input: key_expr_thing_edge.to_string().into(),
},
#[cfg(feature = "shared-memory")]
shared_memory: None,
};
assert!(record.links.contains(&link_thing));
Expand All @@ -280,6 +281,7 @@ mapping:
node: "operator-1".into(),
input: "in-1".into(),
},
#[cfg(feature = "shared-memory")]
shared_memory: None,
};
assert!(record.links.contains(&link_egde_1));
Expand All @@ -293,6 +295,7 @@ mapping:
node: sender_edge_default.clone(),
input: key_expr_edge_default.to_string().into(),
},
#[cfg(feature = "shared-memory")]
shared_memory: None,
};
assert!(record.links.contains(&link_edge_2));
Expand All @@ -306,6 +309,7 @@ mapping:
node: "sink-2".into(),
input: "in-2".into(),
},
#[cfg(feature = "shared-memory")]
shared_memory: None,
};
assert!(record.links.contains(&link_default));
Expand Down

0 comments on commit fd95d28

Please sign in to comment.