diff --git a/zenoh-flow-nodes/src/context.rs b/zenoh-flow-nodes/src/context.rs index 1c37b2fa..b8136882 100644 --- a/zenoh-flow-nodes/src/context.rs +++ b/zenoh-flow-nodes/src/context.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // -use std::sync::Arc; +use std::{path::PathBuf, sync::Arc}; use zenoh_flow_commons::{InstanceId, RuntimeId}; @@ -27,15 +27,22 @@ pub struct Context { pub(crate) flow_name: Arc, pub(crate) instance_id: InstanceId, pub(crate) runtime_id: RuntimeId, + pub(crate) library_path: Arc, } impl Context { /// Creates a new node `Context`. - pub fn new(flow_name: Arc, instance_id: InstanceId, runtime_id: RuntimeId) -> Self { + pub fn new( + flow_name: Arc, + instance_id: InstanceId, + runtime_id: RuntimeId, + library_path: Arc, + ) -> Self { Self { flow_name, instance_id, runtime_id, + library_path, } } @@ -58,4 +65,11 @@ impl Context { pub fn runtime_id(&self) -> &RuntimeId { &self.runtime_id } + + /// Returns the path of the library loaded by the Zenoh-Flow runtime. + /// + /// The path is local to the machine where the Zenoh-Flow runtime is running. + pub fn library_path(&self) -> &PathBuf { + &self.library_path + } } diff --git a/zenoh-flow-runtime/src/loader/mod.rs b/zenoh-flow-runtime/src/loader/mod.rs index 7e746b22..30618eee 100644 --- a/zenoh-flow-runtime/src/loader/mod.rs +++ b/zenoh-flow-runtime/src/loader/mod.rs @@ -142,7 +142,7 @@ pub(crate) fn try_get_constructor( #[derive(Default)] pub(crate) struct Loader { pub(crate) extensions: Extensions, - pub(crate) libraries: HashMap>, + pub(crate) libraries: HashMap, Arc)>, } impl Deref for Loader { @@ -191,7 +191,7 @@ impl Loader { pub(crate) fn remove_unused_libraries(&mut self) { let number_libraries = self.libraries.len(); self.libraries - .retain(|_, library| Arc::strong_count(library) > 1); + .retain(|_, (_, library)| Arc::strong_count(library) > 1); tracing::trace!( "Removed {} unused libraries.", number_libraries - self.libraries.len() @@ -217,12 +217,13 @@ impl Loader { &mut self, url: &Url, node_symbol: &NodeSymbol, - ) -> Result<(C, Arc)> { - if let Some(library) = self.libraries.get(url) { - return try_get_constructor(library.clone(), node_symbol); + ) -> Result<(C, Arc, Arc)> { + if let Some((path, library)) = self.libraries.get(url) { + let (constructor, library) = try_get_constructor::(library.clone(), node_symbol)?; + return Ok((constructor, path.clone(), library)); } - let library = Arc::new(match url.scheme() { + let (path, library) = match url.scheme() { "file" => self .try_load_library_from_uri(url.path(), node_symbol) .context(format!("Failed to load library from file:\n{}", url.path()))?, @@ -231,12 +232,13 @@ impl Loader { url.scheme(), url ), - }); + }; let (constructor, library) = try_get_constructor::(library, node_symbol)?; - self.libraries.insert(url.clone(), library.clone()); + self.libraries + .insert(url.clone(), (path.clone(), library.clone())); - Ok((constructor, library)) + Ok((constructor, path, library)) } /// Given the string representation of a path, attempts to load a library. @@ -253,14 +255,21 @@ impl Loader { &self, path: &str, node_symbol: &NodeSymbol, - ) -> Result { - let path_buf = PathBuf::from_str(path) + ) -> Result<(Arc, Arc)> { + let library_path = PathBuf::from_str(path) .context(format!("Failed to convert path to a `PathBuf`:\n{}", path))?; - let library_path = match path_buf.extension().and_then(|ext| ext.to_str()) { + // The `rust_library_path`, exposing the symbols Zenoh-Flow will look for, is not always the same as the + // `library_path`! + // + // For instance, for the Python extension: + // - the `library_path` points to the location of the Python script, + // - the `rust_library_path` points to the location of the Rust shared library that will then load the Python + // script. + let rust_library_path = match library_path.extension().and_then(|ext| ext.to_str()) { Some(extension) => { if extension == std::env::consts::DLL_EXTENSION { - &path_buf + &library_path } else { self.extensions .get_library_path(extension, node_symbol) @@ -268,28 +277,28 @@ impl Loader { anyhow!( "Cannot load library, no extension found for files of type < {} > :\n{}", extension, - path_buf.display() + library_path.display() ) })? } } None => bail!( "Cannot load library, missing file extension:\n{}", - path_buf.display() + library_path.display() ), }; - let library_path = std::fs::canonicalize(library_path).context(format!( + let rust_library_path = std::fs::canonicalize(rust_library_path).context(format!( "Failed to canonicalize path (did you put an absolute path?):\n{}", - path_buf.display() + library_path.display() ))?; #[cfg(any(target_family = "unix", target_family = "windows"))] - Ok(unsafe { - Library::new(&library_path).context(format!( + Ok((Arc::new(library_path), unsafe { + Arc::new(Library::new(&rust_library_path).context(format!( "libloading::Library::new failed:\n{}", - library_path.display() - ))? - }) + rust_library_path.display() + ))?) + })) } } diff --git a/zenoh-flow-runtime/src/runtime/load.rs b/zenoh-flow-runtime/src/runtime/load.rs index 9e201021..f6670c9f 100644 --- a/zenoh-flow-runtime/src/runtime/load.rs +++ b/zenoh-flow-runtime/src/runtime/load.rs @@ -33,6 +33,7 @@ use crate::InstanceState; use crate::{instance::DataFlowInstance, runners::Runner}; use std::collections::HashMap; +use std::path::PathBuf; use std::sync::Arc; use anyhow::{bail, Context as _}; @@ -115,41 +116,24 @@ impl Runtime { Err(e) => break 'load Err(e), }; - let context = Context::new( - data_flow.name().clone(), - data_flow.instance_id().clone(), - self.runtime_id.clone(), - ); - runners.extend( - match self - .try_load_operators(data_flow, &mut channels, context.clone()) - .await - { + match self.try_load_operators(data_flow, &mut channels).await { Ok(operators) => operators, Err(e) => break 'load Err(e), }, ); runners.extend( - match self - .try_load_sources(data_flow, &mut channels, context.clone()) - .await - { + match self.try_load_sources(data_flow, &mut channels).await { Ok(sources) => sources, Err(e) => break 'load Err(e), }, ); - runners.extend( - match self - .try_load_sinks(data_flow, &mut channels, context.clone()) - .await - { - Ok(sinks) => sinks, - Err(e) => break 'load Err(e), - }, - ); + runners.extend(match self.try_load_sinks(data_flow, &mut channels).await { + Ok(sinks) => sinks, + Err(e) => break 'load Err(e), + }); #[cfg(feature = "zenoh")] { @@ -263,7 +247,6 @@ The problematic link is: &self, record: &DataFlowRecord, channels: &mut Channels, - context: Context, ) -> Result> { let mut runners = HashMap::default(); let assigned_nodes = match record.mapping().get(&self.runtime_id) { @@ -284,9 +267,17 @@ The channels for the Inputs and Outputs of Operator < {} > were not created. &operator_id ))?; - let (constructor, library) = self + let (constructor, path, library) = self .try_load_constructor::(&operator.library, &NodeSymbol::Operator) .await?; + + let context = Context::new( + record.name().clone(), + record.instance_id().clone(), + self.runtime_id.clone(), + path, + ); + let operator_node = (constructor)( context.clone(), operator.configuration.clone(), @@ -319,7 +310,6 @@ The channels for the Inputs and Outputs of Operator < {} > were not created. &self, record: &DataFlowRecord, channels: &mut Channels, - context: Context, ) -> Result> { let mut runners = HashMap::default(); let assigned_nodes = match record.mapping().get(&self.runtime_id) { @@ -342,9 +332,17 @@ The channels for the Outputs of Source < {} > were not created. let runner = match &source.source { SourceVariant::Library(uri) => { - let (constructor, library) = self + let (constructor, path, library) = self .try_load_constructor::(uri, &NodeSymbol::Source) .await?; + + let context = Context::new( + record.name().clone(), + record.instance_id().clone(), + self.runtime_id.clone(), + path, + ); + let source_node = (constructor)(context.clone(), source.configuration.clone(), outputs) .await?; @@ -391,7 +389,6 @@ Maybe change the features in the Cargo.toml? &self, record: &DataFlowRecord, channels: &mut Channels, - context: Context, ) -> Result> { let mut runners = HashMap::default(); let assigned_nodes = match record.mapping().get(&self.runtime_id) { @@ -414,9 +411,17 @@ The channels for the Inputs of Sink < {} > were not created. let runner = match &sink.sink { SinkVariant::Library(uri) => { - let (constructor, library) = self + let (constructor, library_path, library) = self .try_load_constructor::(uri, &NodeSymbol::Sink) .await?; + + let context = Context::new( + record.name().clone(), + record.instance_id().clone(), + self.runtime_id.clone(), + library_path, + ); + let sink_node = (constructor)(context.clone(), sink.configuration.clone(), inputs).await?; @@ -569,7 +574,7 @@ The channels for the Inputs of Connector Sender < {} > were not created. &self, url: &Url, node_symbol: &NodeSymbol, - ) -> Result<(C, Arc)> { + ) -> Result<(C, Arc, Arc)> { let mut loader_write_guard = self.loader.lock().await; loader_write_guard.try_load_constructor::(url, node_symbol) }