feat(nodes): pass the library path in the context #247

Merged (2 commits) on Apr 17, 2024
18 changes: 16 additions & 2 deletions zenoh-flow-nodes/src/context.rs
@@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//

use std::sync::Arc;
use std::{path::PathBuf, sync::Arc};

use zenoh_flow_commons::{InstanceId, RuntimeId};

@@ -27,15 +27,22 @@ pub struct Context {
pub(crate) flow_name: Arc<str>,
pub(crate) instance_id: InstanceId,
pub(crate) runtime_id: RuntimeId,
pub(crate) library_path: Arc<PathBuf>,
}

impl Context {
/// Creates a new node `Context`.
pub fn new(flow_name: Arc<str>, instance_id: InstanceId, runtime_id: RuntimeId) -> Self {
pub fn new(
flow_name: Arc<str>,
instance_id: InstanceId,
runtime_id: RuntimeId,
library_path: Arc<PathBuf>,
) -> Self {
Self {
flow_name,
instance_id,
runtime_id,
library_path,
}
}

@@ -58,4 +65,11 @@ impl Context {
pub fn runtime_id(&self) -> &RuntimeId {
&self.runtime_id
}

/// Returns the path of the library loaded by the Zenoh-Flow runtime.
///
/// The path is local to the machine where the Zenoh-Flow runtime is running.
pub fn library_path(&self) -> &PathBuf {
&self.library_path
}
}
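
To illustrate the extended constructor and the new accessor, here is a minimal sketch that is not part of the diff: the helper name `describe_node` and the logging are hypothetical, and it assumes the code lives next to `Context` so the crate-local types are in scope.

```rust
use std::{path::PathBuf, sync::Arc};

use zenoh_flow_commons::{InstanceId, RuntimeId};

// Hypothetical helper, for illustration only: builds a `Context` with the new
// fourth argument and reports where the node's library was loaded from.
fn describe_node(
    flow_name: Arc<str>,
    instance_id: InstanceId,
    runtime_id: RuntimeId,
    library_path: Arc<PathBuf>,
) -> Context {
    let context = Context::new(flow_name, instance_id, runtime_id, library_path);
    // The path is only meaningful on the machine hosting the Zenoh-Flow runtime.
    println!(
        "node library loaded from: {}",
        context.library_path().display()
    );
    context
}
```

In the runtime itself, the `Arc<PathBuf>` passed here comes from the `Loader` (see `zenoh-flow-runtime/src/loader/mod.rs` and `runtime/load.rs` below).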
53 changes: 31 additions & 22 deletions zenoh-flow-runtime/src/loader/mod.rs
@@ -142,7 +142,7 @@ pub(crate) fn try_get_constructor<N>(
#[derive(Default)]
pub(crate) struct Loader {
pub(crate) extensions: Extensions,
pub(crate) libraries: HashMap<Url, Arc<Library>>,
pub(crate) libraries: HashMap<Url, (Arc<PathBuf>, Arc<Library>)>,
}

impl Deref for Loader {
@@ -191,7 +191,7 @@ impl Loader {
pub(crate) fn remove_unused_libraries(&mut self) {
let number_libraries = self.libraries.len();
self.libraries
.retain(|_, library| Arc::strong_count(library) > 1);
.retain(|_, (_, library)| Arc::strong_count(library) > 1);
tracing::trace!(
"Removed {} unused libraries.",
number_libraries - self.libraries.len()
@@ -217,12 +217,13 @@ impl Loader {
&mut self,
url: &Url,
node_symbol: &NodeSymbol,
) -> Result<(C, Arc<Library>)> {
if let Some(library) = self.libraries.get(url) {
return try_get_constructor(library.clone(), node_symbol);
) -> Result<(C, Arc<PathBuf>, Arc<Library>)> {
if let Some((path, library)) = self.libraries.get(url) {
let (constructor, library) = try_get_constructor::<C>(library.clone(), node_symbol)?;
return Ok((constructor, path.clone(), library));
}

let library = Arc::new(match url.scheme() {
let (path, library) = match url.scheme() {
"file" => self
.try_load_library_from_uri(url.path(), node_symbol)
.context(format!("Failed to load library from file:\n{}", url.path()))?,
@@ -231,12 +232,13 @@
url.scheme(),
url
),
});
};

let (constructor, library) = try_get_constructor::<C>(library, node_symbol)?;
self.libraries.insert(url.clone(), library.clone());
self.libraries
.insert(url.clone(), (path.clone(), library.clone()));

Ok((constructor, library))
Ok((constructor, path, library))
}

/// Given the string representation of a path, attempts to load a library.
@@ -253,43 +255,50 @@
&self,
path: &str,
node_symbol: &NodeSymbol,
) -> Result<Library> {
let path_buf = PathBuf::from_str(path)
) -> Result<(Arc<PathBuf>, Arc<Library>)> {
let library_path = PathBuf::from_str(path)
.context(format!("Failed to convert path to a `PathBuf`:\n{}", path))?;

let library_path = match path_buf.extension().and_then(|ext| ext.to_str()) {
// The `rust_library_path`, exposing the symbols Zenoh-Flow will look for, is not always the same as the
// `library_path`!
//
// For instance, for the Python extension:
// - the `library_path` points to the location of the Python script,
// - the `rust_library_path` points to the location of the Rust shared library that will then load the Python
// script.
let rust_library_path = match library_path.extension().and_then(|ext| ext.to_str()) {
Some(extension) => {
if extension == std::env::consts::DLL_EXTENSION {
&path_buf
&library_path
} else {
self.extensions
.get_library_path(extension, node_symbol)
.ok_or_else(|| {
anyhow!(
"Cannot load library, no extension found for files of type < {} > :\n{}",
extension,
path_buf.display()
library_path.display()
)
})?
}
}
None => bail!(
"Cannot load library, missing file extension:\n{}",
path_buf.display()
library_path.display()
),
};

let library_path = std::fs::canonicalize(library_path).context(format!(
let rust_library_path = std::fs::canonicalize(rust_library_path).context(format!(
"Failed to canonicalize path (did you put an absolute path?):\n{}",
path_buf.display()
library_path.display()
))?;

#[cfg(any(target_family = "unix", target_family = "windows"))]
Ok(unsafe {
Library::new(&library_path).context(format!(
Ok((Arc::new(library_path), unsafe {
Arc::new(Library::new(&rust_library_path).context(format!(
"libloading::Library::new failed:\n{}",
library_path.display()
))?
})
rust_library_path.display()
))?)
}))
}
}
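
The comment above distinguishes the user-facing `library_path` from the `rust_library_path` handed to `libloading`. Below is a hedged, standalone sketch of that dispatch; the helper name and the plain `Option<&Path>` standing in for `Extensions` are illustrative, not part of this PR.

```rust
use std::path::{Path, PathBuf};

// Illustrative helper only: decides which shared library `libloading` should open
// for a given user-facing `library_path`.
fn rust_library_for(library_path: &Path, extension_library: Option<&Path>) -> Option<PathBuf> {
    match library_path.extension().and_then(|ext| ext.to_str()) {
        // A native shared library (`.so` / `.dylib` / `.dll`) is loaded directly.
        Some(ext) if ext == std::env::consts::DLL_EXTENSION => Some(library_path.to_path_buf()),
        // Any other extension (e.g. ".py") goes through the library registered for it,
        // for instance the Rust shared library of the Python extension.
        Some(_) => extension_library.map(Path::to_path_buf),
        // No extension at all: the real loader bails out with an error here.
        None => None,
    }
}
```

With this PR it is the user-facing `library_path` (for a Python node, the `.py` script itself) that ends up in the `Context`, while the resolved `rust_library_path` is only passed to `libloading::Library::new`.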
65 changes: 35 additions & 30 deletions zenoh-flow-runtime/src/runtime/load.rs
@@ -33,6 +33,7 @@ use crate::InstanceState;
use crate::{instance::DataFlowInstance, runners::Runner};

use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::{bail, Context as _};
@@ -115,41 +116,24 @@ impl Runtime {
Err(e) => break 'load Err(e),
};

let context = Context::new(
data_flow.name().clone(),
data_flow.instance_id().clone(),
self.runtime_id.clone(),
);

runners.extend(
match self
.try_load_operators(data_flow, &mut channels, context.clone())
.await
{
match self.try_load_operators(data_flow, &mut channels).await {
Ok(operators) => operators,
Err(e) => break 'load Err(e),
},
);

runners.extend(
match self
.try_load_sources(data_flow, &mut channels, context.clone())
.await
{
match self.try_load_sources(data_flow, &mut channels).await {
Ok(sources) => sources,
Err(e) => break 'load Err(e),
},
);

runners.extend(
match self
.try_load_sinks(data_flow, &mut channels, context.clone())
.await
{
Ok(sinks) => sinks,
Err(e) => break 'load Err(e),
},
);
runners.extend(match self.try_load_sinks(data_flow, &mut channels).await {
Ok(sinks) => sinks,
Err(e) => break 'load Err(e),
});

#[cfg(feature = "zenoh")]
{
@@ -263,7 +247,6 @@ The problematic link is:
&self,
record: &DataFlowRecord,
channels: &mut Channels,
context: Context,
) -> Result<HashMap<NodeId, Runner>> {
let mut runners = HashMap::default();
let assigned_nodes = match record.mapping().get(&self.runtime_id) {
@@ -284,9 +267,17 @@ The channels for the Inputs and Outputs of Operator < {} > were not created.
&operator_id
))?;

let (constructor, library) = self
let (constructor, path, library) = self
.try_load_constructor::<OperatorFn>(&operator.library, &NodeSymbol::Operator)
.await?;

let context = Context::new(
record.name().clone(),
record.instance_id().clone(),
self.runtime_id.clone(),
path,
);

let operator_node = (constructor)(
context.clone(),
operator.configuration.clone(),
@@ -319,7 +310,6 @@ The channels for the Inputs and Outputs of Operator < {} > were not created.
&self,
record: &DataFlowRecord,
channels: &mut Channels,
context: Context,
) -> Result<HashMap<NodeId, Runner>> {
let mut runners = HashMap::default();
let assigned_nodes = match record.mapping().get(&self.runtime_id) {
@@ -342,9 +332,17 @@ The channels for the Outputs of Source < {} > were not created.

let runner = match &source.source {
SourceVariant::Library(uri) => {
let (constructor, library) = self
let (constructor, path, library) = self
.try_load_constructor::<SourceFn>(uri, &NodeSymbol::Source)
.await?;

let context = Context::new(
record.name().clone(),
record.instance_id().clone(),
self.runtime_id.clone(),
path,
);

let source_node =
(constructor)(context.clone(), source.configuration.clone(), outputs)
.await?;
@@ -391,7 +389,6 @@ Maybe change the features in the Cargo.toml?
&self,
record: &DataFlowRecord,
channels: &mut Channels,
context: Context,
) -> Result<HashMap<NodeId, Runner>> {
let mut runners = HashMap::default();
let assigned_nodes = match record.mapping().get(&self.runtime_id) {
@@ -414,9 +411,17 @@ The channels for the Inputs of Sink < {} > were not created.

let runner = match &sink.sink {
SinkVariant::Library(uri) => {
let (constructor, library) = self
let (constructor, library_path, library) = self
.try_load_constructor::<SinkFn>(uri, &NodeSymbol::Sink)
.await?;

let context = Context::new(
record.name().clone(),
record.instance_id().clone(),
self.runtime_id.clone(),
library_path,
);

let sink_node =
(constructor)(context.clone(), sink.configuration.clone(), inputs).await?;

@@ -569,7 +574,7 @@ The channels for the Inputs of Connector Sender < {} > were not created.
&self,
url: &Url,
node_symbol: &NodeSymbol,
) -> Result<(C, Arc<Library>)> {
) -> Result<(C, Arc<PathBuf>, Arc<Library>)> {
let mut loader_write_guard = self.loader.lock().await;
loader_write_guard.try_load_constructor::<C>(url, node_symbol)
}