Skip to content

Commit 5eecd89

Browse files
committed
feat(rust): rework Sessions
1 parent 003d047 commit 5eecd89

40 files changed

+3071
-1983
lines changed

implementations/rust/ockam/ockam_api/src/lib.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub mod minicbor_url;
3333
pub mod nodes;
3434
pub mod okta;
3535
pub mod port_range;
36+
pub mod session;
3637
pub mod uppercase;
3738
mod version;
3839

@@ -41,7 +42,6 @@ mod influxdb_token_lease;
4142

4243
pub mod logs;
4344
mod schema;
44-
mod session;
4545

4646
mod date;
4747
mod rendezvous_healthcheck;
@@ -54,7 +54,7 @@ pub use error::*;
5454
pub use influxdb_token_lease::*;
5555
pub use nodes::service::default_address::*;
5656
pub use rendezvous_healthcheck::*;
57-
pub use session::sessions::ConnectionStatus;
57+
pub use session::connection_status::ConnectionStatus;
5858
pub use ui::*;
5959
pub use util::*;
6060
pub use version::*;

implementations/rust/ockam/ockam_api/src/nodes/connection/mod.rs

+10-11
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ pub(crate) use plain_tcp::PlainTcpInstantiator;
1919
pub(crate) use project::ProjectInstantiator;
2020
pub(crate) use secure::SecureChannelInstantiator;
2121
use std::fmt::{Debug, Formatter};
22-
use std::sync::Arc;
2322

2423
#[derive(Clone)]
2524
pub struct Connection {
@@ -43,18 +42,18 @@ pub struct Connection {
4342

4443
impl Connection {
4544
/// Shorthand to add the address as consumer to the flow control
46-
pub fn add_consumer(&self, context: Arc<Context>, address: &Address) {
45+
pub fn add_consumer(&self, context: &Context, address: &Address) {
4746
if let Some(flow_control_id) = &self.flow_control_id {
4847
context
4948
.flow_controls()
5049
.add_consumer(address.clone(), flow_control_id);
5150
}
5251
}
5352

54-
pub fn add_default_consumers(&self, ctx: Arc<Context>) {
55-
self.add_consumer(ctx.clone(), &DefaultAddress::KEY_EXCHANGER_LISTENER.into());
56-
self.add_consumer(ctx.clone(), &DefaultAddress::SECURE_CHANNEL_LISTENER.into());
57-
self.add_consumer(ctx.clone(), &DefaultAddress::UPPERCASE_SERVICE.into());
53+
pub fn add_default_consumers(&self, ctx: &Context) {
54+
self.add_consumer(ctx, &DefaultAddress::KEY_EXCHANGER_LISTENER.into());
55+
self.add_consumer(ctx, &DefaultAddress::SECURE_CHANNEL_LISTENER.into());
56+
self.add_consumer(ctx, &DefaultAddress::UPPERCASE_SERVICE.into());
5857
self.add_consumer(ctx, &DefaultAddress::ECHO_SERVICE.into());
5958
}
6059

@@ -182,7 +181,7 @@ pub trait Instantiator: Send + Sync + 'static {
182181
/// The returned [`Changes`] will be used to update the builder state.
183182
async fn instantiate(
184183
&self,
185-
ctx: Arc<Context>,
184+
ctx: &Context,
186185
node_manager: &NodeManager,
187186
transport_route: Route,
188187
extracted: (MultiAddr, MultiAddr, MultiAddr),
@@ -217,7 +216,7 @@ impl ConnectionBuilder {
217216
/// user make sure higher protocol abstraction are called before lower level ones
218217
pub async fn instantiate(
219218
mut self,
220-
ctx: Arc<Context>,
219+
ctx: &Context,
221220
node_manager: &NodeManager,
222221
instantiator: impl Instantiator,
223222
) -> Result<Self, ockam_core::Error> {
@@ -233,14 +232,14 @@ impl ConnectionBuilder {
233232
// the transport route should include only the pieces before the match
234233
self.transport_route = self
235234
.recalculate_transport_route(
236-
&ctx,
235+
ctx,
237236
self.current_multiaddr.split(start).0,
238237
false,
239238
)
240239
.await?;
241240
let mut changes = instantiator
242241
.instantiate(
243-
ctx.clone(),
242+
ctx,
244243
node_manager,
245244
self.transport_route.clone(),
246245
self.extract(start, instantiator.matches().len()),
@@ -271,7 +270,7 @@ impl ConnectionBuilder {
271270
}
272271

273272
self.transport_route = self
274-
.recalculate_transport_route(&ctx, self.current_multiaddr.clone(), true)
273+
.recalculate_transport_route(ctx, self.current_multiaddr.clone(), true)
275274
.await?;
276275

277276
Ok(Self {

implementations/rust/ockam/ockam_api/src/nodes/connection/plain_tcp.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use crate::error::ApiError;
22
use crate::nodes::connection::{Changes, ConnectionBuilder, Instantiator};
33
use crate::{multiaddr_to_route, route_to_multiaddr};
4-
use std::sync::Arc;
54

65
use crate::nodes::NodeManager;
76
use ockam_core::{async_trait, Error, Route};
@@ -30,7 +29,7 @@ impl Instantiator for PlainTcpInstantiator {
3029

3130
async fn instantiate(
3231
&self,
33-
_ctx: Arc<Context>,
32+
_ctx: &Context,
3433
node_manager: &NodeManager,
3534
_transport_route: Route,
3635
extracted: (MultiAddr, MultiAddr, MultiAddr),

implementations/rust/ockam/ockam_api/src/nodes/connection/project.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use crate::error::ApiError;
22
use crate::nodes::connection::{Changes, Instantiator};
33
use crate::nodes::NodeManager;
44
use crate::{multiaddr_to_route, try_address_to_multiaddr};
5-
use std::sync::Arc;
65

76
use ockam_core::{async_trait, Error, Route};
87
use ockam_multiaddr::proto::Project;
@@ -36,7 +35,7 @@ impl Instantiator for ProjectInstantiator {
3635

3736
async fn instantiate(
3837
&self,
39-
ctx: Arc<Context>,
38+
ctx: &Context,
4039
node_manager: &NodeManager,
4140
_transport_route: Route,
4241
extracted: (MultiAddr, MultiAddr, MultiAddr),
@@ -66,7 +65,7 @@ impl Instantiator for ProjectInstantiator {
6665
debug!("create a secure channel to the project {project_identifier}");
6766
let sc = node_manager
6867
.create_secure_channel_internal(
69-
&ctx,
68+
ctx,
7069
tcp.route,
7170
&self.identifier.clone(),
7271
Some(vec![project_identifier]),

implementations/rust/ockam/ockam_api/src/nodes/connection/secure.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use std::sync::Arc;
21
use std::time::Duration;
32

43
use crate::nodes::connection::{Changes, Instantiator};
@@ -41,7 +40,7 @@ impl Instantiator for SecureChannelInstantiator {
4140

4241
async fn instantiate(
4342
&self,
44-
ctx: Arc<Context>,
43+
ctx: &Context,
4544
node_manager: &NodeManager,
4645
transport_route: Route,
4746
extracted: (MultiAddr, MultiAddr, MultiAddr),

implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::colors::color_primary;
1616
use crate::error::ApiError;
1717

1818
use crate::output::Output;
19-
use crate::session::sessions::ConnectionStatus;
19+
use crate::session::connection_status::ConnectionStatus;
2020
use crate::{route_to_multiaddr, try_address_to_multiaddr};
2121

2222
/// Request body to create an inlet

implementations/rust/ockam/ockam_api/src/nodes/models/relay.rs

+16
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ use ockam_multiaddr::MultiAddr;
1010
use crate::colors::OckamColor;
1111
use crate::error::ApiError;
1212
use crate::output::{colorize_connection_status, Output};
13+
use crate::session::replacer::ReplacerOutputKind;
14+
use crate::session::session::Session;
1315
use crate::{route_to_multiaddr, ConnectionStatus};
1416

1517
/// Request body when instructing a node to create a relay
@@ -94,6 +96,20 @@ impl RelayInfo {
9496
}
9597
}
9698

99+
pub fn from_session(session: &Session, destination_address: MultiAddr, alias: String) -> Self {
100+
let relay_info = Self::new(destination_address, alias, session.connection_status());
101+
if let Some(outcome) = session.last_outcome() {
102+
match outcome {
103+
ReplacerOutputKind::Relay(info) => relay_info.with(info),
104+
ReplacerOutputKind::Inlet(_) => {
105+
panic!("InletInfo should not be in the registry")
106+
}
107+
}
108+
} else {
109+
relay_info
110+
}
111+
}
112+
97113
pub fn with(self, remote_relay_info: RemoteRelayInfo) -> Self {
98114
Self {
99115
forwarding_route: Some(remote_relay_info.forwarding_route().to_string()),

implementations/rust/ockam/ockam_api/src/nodes/registry.rs

+8-33
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
use crate::cli_state::random_name;
2-
use crate::nodes::models::relay::RelayInfo;
3-
use crate::session::sessions::{ReplacerOutputKind, Session};
42
use crate::DefaultAddress;
3+
54
use ockam::identity::Identifier;
65
use ockam::identity::{SecureChannel, SecureChannelListener};
76
use ockam_core::compat::collections::BTreeMap;
87
use ockam_core::{Address, Route};
98
use ockam_multiaddr::MultiAddr;
10-
use ockam_node::compat::asynchronous::RwLock;
9+
use ockam_node::compat::asynchronous::{Mutex, RwLock};
1110
use ockam_transport_core::HostnamePort;
11+
12+
use crate::session::session::Session;
1213
use std::borrow::Borrow;
1314
use std::fmt::Display;
15+
use std::sync::Arc;
1416

1517
#[derive(Default)]
1618
pub(crate) struct SecureChannelRegistry {
@@ -127,15 +129,15 @@ impl KafkaServiceInfo {
127129
pub(crate) struct InletInfo {
128130
pub(crate) bind_addr: String,
129131
pub(crate) outlet_addr: MultiAddr,
130-
pub(crate) session: Session,
132+
pub(crate) session: Arc<Mutex<Session>>,
131133
}
132134

133135
impl InletInfo {
134136
pub(crate) fn new(bind_addr: &str, outlet_addr: MultiAddr, session: Session) -> Self {
135137
Self {
136138
bind_addr: bind_addr.to_owned(),
137139
outlet_addr,
138-
session,
140+
session: Arc::new(Mutex::new(session)),
139141
}
140142
}
141143
}
@@ -160,34 +162,7 @@ impl OutletInfo {
160162
pub struct RegistryRelayInfo {
161163
pub(crate) destination_address: MultiAddr,
162164
pub(crate) alias: String,
163-
pub(crate) session: Session,
164-
}
165-
166-
impl From<RegistryRelayInfo> for RelayInfo {
167-
fn from(registry_relay_info: RegistryRelayInfo) -> Self {
168-
let relay_info = RelayInfo::new(
169-
registry_relay_info.destination_address.clone(),
170-
registry_relay_info.alias.clone(),
171-
registry_relay_info.session.connection_status(),
172-
);
173-
174-
let current_relay_status =
175-
registry_relay_info
176-
.session
177-
.status()
178-
.map(|info| match info.kind {
179-
ReplacerOutputKind::Inlet(_) => {
180-
panic!("InletInfo should not be in the registry")
181-
}
182-
ReplacerOutputKind::Relay(info) => info,
183-
});
184-
185-
if let Some(current_relay_status) = current_relay_status {
186-
relay_info.with(current_relay_status)
187-
} else {
188-
relay_info
189-
}
190-
}
165+
pub(crate) session: Arc<Mutex<Session>>,
191166
}
192167

193168
#[derive(Default)]

implementations/rust/ockam/ockam_api/src/nodes/service/in_memory_node.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,14 @@ impl InMemoryNode {
179179
}
180180

181181
pub async fn stop(&self, ctx: &Context) -> Result<()> {
182-
self.medic_handle.stop_medic(ctx).await?;
182+
for session in self.registry.inlets.values().await {
183+
session.session.lock().await.stop().await;
184+
}
185+
186+
for session in self.registry.relays.values().await {
187+
session.session.lock().await.stop().await;
188+
}
189+
183190
for addr in DefaultAddress::iter() {
184191
let result = ctx.stop_worker(addr).await;
185192
// when stopping we can safely ignore missing services
@@ -191,6 +198,7 @@ impl InMemoryNode {
191198
}
192199
}
193200
}
201+
194202
Ok(())
195203
}
196204

0 commit comments

Comments
 (0)