Skip to content

Commit

Permalink
Feat/add liveliness (#15)
Browse files Browse the repository at this point in the history
* Add Livliness to remote_api_plugin

* Add Liveliness + LivelinessToken classes to typescript, move handling of liveliness in plugin to its own function

* add liveliness subscriber example

* WIP, query liveliness

* Finish implementing liveliness query, bump rust deps
  • Loading branch information
Charles-Schleich authored Nov 7, 2024
1 parent 03e9e1e commit d5fb595
Show file tree
Hide file tree
Showing 18 changed files with 765 additions and 37 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.vscode
**/build
cp_plugin.sh
TEST_CONFIG.json5

zenoh-ts/dist
zenoh-ts/node_modules
Expand Down
94 changes: 90 additions & 4 deletions zenoh-plugin-remote-api/src/handle_control_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//

use std::{error::Error, net::SocketAddr};
use std::{error::Error, net::SocketAddr, time::Duration};

use tracing::{error, warn};
use uuid::Uuid;
Expand All @@ -24,9 +24,10 @@ use zenoh::{

use crate::{
interface::{
ControlMsg, DataMsg, HandlerChannel, QueryWS, QueryableMsg, RemoteAPIMsg, ReplyWS, SampleWS,
ControlMsg, DataMsg, HandlerChannel, LivelinessMsg, QueryWS, QueryableMsg, RemoteAPIMsg,
ReplyWS, SampleWS,
},
spawn_future, StateMap,
spawn_future, RemoteState, StateMap,
};

///
Expand Down Expand Up @@ -202,7 +203,7 @@ pub(crate) async fn handle_control_message(
handler,
id: subscriber_uuid,
} => {
let key_expr = KeyExpr::new(owned_key_expr.clone()).unwrap();
let key_expr = KeyExpr::new(owned_key_expr.clone())?;
let ch_tx = state_map.websocket_tx.clone();

let join_handle = match handler {
Expand Down Expand Up @@ -326,6 +327,10 @@ pub(crate) async fn handle_control_message(
queryable.undeclare().await?;
};
}
ControlMsg::Liveliness(liveliness_msg) => {
return handle_liveliness(liveliness_msg, state_map).await;
}

msg @ (ControlMsg::GetFinished { id: _ }
| ControlMsg::Session(_)
| ControlMsg::Subscriber(_)) => {
Expand All @@ -335,3 +340,84 @@ pub(crate) async fn handle_control_message(
};
Ok(None)
}

// Handle Liveliness Messages
async fn handle_liveliness(
liveliness_msg: LivelinessMsg,
state_map: &mut RemoteState,
) -> Result<Option<ControlMsg>, Box<dyn Error + Send + Sync>> {
let liveliness = state_map.session.liveliness();
match liveliness_msg {
LivelinessMsg::DeclareToken { key_expr, id } => {
let token = liveliness.declare_token(key_expr).await?;
state_map.liveliness_tokens.insert(id, token);
}
LivelinessMsg::UndeclareToken(uuid) => {
if let Some(token) = state_map.liveliness_tokens.remove(&uuid) {
token.undeclare().await?;
}
}
LivelinessMsg::DeclareSubscriber {
key_expr: owned_key_expr,
id,
history,
} => {
let key_expr = KeyExpr::new(owned_key_expr.clone())?;
let subscriber = liveliness
.declare_subscriber(key_expr)
.history(history)
.await?;
let ch_tx = state_map.websocket_tx.clone();

let handler = spawn_future(async move {
while let Ok(sample) = subscriber.recv_async().await {
let sample_ws = SampleWS::from(sample);
let remote_api_message = RemoteAPIMsg::Data(DataMsg::Sample(sample_ws, id));
if let Err(e) = ch_tx.send(remote_api_message) {
error!("Forward Sample Channel error: {e}");
};
}
});
state_map
.liveliness_subscribers
.insert(id, (handler, owned_key_expr));
}
LivelinessMsg::UndeclareSubscriber(uuid) => {
if let Some((join_handle, _)) = state_map.liveliness_subscribers.remove(&uuid) {
join_handle.abort(); // This should drop the underlying liveliness_subscribers of the future
} else {
warn!("UndeclareSubscriber: No Subscriber with UUID {uuid}");
}
}
LivelinessMsg::Get {
key_expr,
id,
timeout,
} => {
let mut builder = liveliness.get(key_expr);
if let Some(timeout) = timeout {
builder = builder.timeout(Duration::from_millis(timeout));
}
let receiver = builder.await?;

let mut receiving: bool = true;

while receiving {
match receiver.recv_async().await {
Ok(reply) => {
let reply_ws = ReplyWS::from((reply, id));
let remote_api_msg = RemoteAPIMsg::Data(DataMsg::GetReply(reply_ws));
if let Err(err) = state_map.websocket_tx.send(remote_api_msg) {
tracing::error!("{}", err);
}
}
Err(_) => receiving = false,
}
}

let remote_api_msg = RemoteAPIMsg::Control(ControlMsg::GetFinished { id });
state_map.websocket_tx.send(remote_api_msg)?;
}
}
Ok(None)
}
29 changes: 29 additions & 0 deletions zenoh-plugin-remote-api/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,35 @@ pub enum ControlMsg {
complete: bool,
},
UndeclareQueryable(Uuid),

// Liveliness
Liveliness(LivelinessMsg),
}

#[derive(Debug, Serialize, Deserialize, TS)]
#[ts(export)]
pub enum LivelinessMsg {
DeclareToken {
#[ts(as = "OwnedKeyExprWrapper")]
key_expr: OwnedKeyExpr,
id: Uuid,
},
UndeclareToken(Uuid),
DeclareSubscriber {
#[ts(as = "OwnedKeyExprWrapper")]
key_expr: OwnedKeyExpr,
id: Uuid,
history: bool,
},
UndeclareSubscriber(Uuid),
Get {
#[ts(as = "OwnedKeyExprWrapper")]
key_expr: OwnedKeyExpr,
id: Uuid,
// timeout in Milliseconds
#[ts(type = "number | undefined")]
timeout: Option<u64>,
},
}

fn deserialize_consolidation_mode<'de, D>(d: D) -> Result<Option<ConsolidationMode>, D::Error>
Expand Down
14 changes: 8 additions & 6 deletions zenoh-plugin-remote-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use zenoh::{
format::{kedefine, keformat},
keyexpr, OwnedKeyExpr,
},
liveliness::LivelinessToken,
pubsub::Publisher,
query::{Query, Queryable},
Session,
Expand Down Expand Up @@ -482,6 +483,9 @@ struct RemoteState {
// Queryable
queryables: HashMap<Uuid, (Queryable<()>, OwnedKeyExpr)>,
unanswered_queries: Arc<std::sync::RwLock<HashMap<Uuid, Query>>>,
// Liveliness
liveliness_tokens: HashMap<Uuid, LivelinessToken>,
liveliness_subscribers: HashMap<Uuid, (JoinHandle<()>, OwnedKeyExpr)>,
}

impl RemoteState {
Expand All @@ -494,6 +498,8 @@ impl RemoteState {
publishers: HashMap::new(),
queryables: HashMap::new(),
unanswered_queries: Arc::new(std::sync::RwLock::new(HashMap::new())),
liveliness_tokens: HashMap::new(),
liveliness_subscribers: HashMap::new(),
}
}

Expand Down Expand Up @@ -545,12 +551,8 @@ async fn run_websocket_server(
opt_tls_acceptor = Some(TlsAcceptor::from(Arc::new(config)));
}

tracing::info!("Spawning Remote API Plugin on {:?}", ws_port);

let tcp = TcpListener::bind(ws_port).await;

let server: TcpListener = match tcp {
Ok(x) => x,
let server: TcpListener = match TcpListener::bind(ws_port).await {
Ok(server) => server,
Err(err) => {
tracing::error!("Unable to start TcpListener {err}");
return;
Expand Down
10 changes: 8 additions & 2 deletions zenoh-ts/examples/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,24 @@ import { sub as sub } from "./z_sub.ts";
import { pub } from "./z_pub.ts";
import { put } from "./z_put.ts";
import { queryable } from "./z_queryable.ts";
import { _delete } from "./z_delete.ts";
// import { _delete } from "./z_delete.ts";
import { liveliness } from "./z_liveliness.ts";
import { sub_liveliness } from "./z_sub_liveliness.ts";
import { get_liveliness } from "./z_get_liveliness.ts";

async function main() {
// thr();
// ping();
// pong();
sub();
// sub();
// pub();
// queryable();
// get();
// _delete();
// put();
liveliness();
// sub_liveliness();
// get_liveliness();

let count = 0;
while (true) {
Expand Down
61 changes: 61 additions & 0 deletions zenoh-ts/examples/src/z_get_liveliness.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

import "./style.css";
import "./webpage.ts";

import {
RingChannel, deserialize_string, Sample, Config, Subscriber, Session, KeyExpr,
SampleKind,
Receiver,
RecvErr,
ReplyError
} from "@eclipse-zenoh/zenoh-ts";
import { Duration } from 'typed-duration'
const { seconds } = Duration

export async function get_liveliness() {

console.log("Opening session...")
const session = await Session.open(new Config("ws/127.0.0.1:10000"));
let key_expr = new KeyExpr("group1/**");
console.log("Sending Liveliness Query '", key_expr.toString(),"'");

let receiver = session.liveliness().get(key_expr, {timeout: seconds.of(20)});

if (!(receiver instanceof Receiver)){
return // Return in case of callback get query
}

let reply = await receiver.receive();

while (reply != RecvErr.Disconnected) {
if (reply == RecvErr.MalformedReply) {
console.warn("MalformedReply");
} else {
let resp = reply.result();
if (resp instanceof Sample) {
let sample: Sample = resp;
console.warn(">> Alive token ('", sample.keyexpr() ,")");
} else {
let reply_error: ReplyError = resp;
console.warn(">> Received (ERROR: '", reply_error.payload().deserialize(deserialize_string), "')");
}
}
reply = await receiver.receive();
}

console.warn("End Liveliness query");
}

41 changes: 41 additions & 0 deletions zenoh-ts/examples/src/z_liveliness.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

import "./style.css";
import "./webpage.ts";

import {
Config, Session, KeyExpr, LivelinessToken
} from "@eclipse-zenoh/zenoh-ts";

export async function liveliness() {

console.log("Opening session...")
const session = await Session.open(new Config ("ws/127.0.0.1:10000"));
let key_expr = new KeyExpr("group1/zenoh-rs");
console.log("Declaring Liveliness token on ",key_expr.toString());

let token: LivelinessToken = session.liveliness().declare_token(key_expr);
// LivelinessTokens are NOT automatically closed when dropped
// please call token.undeclare();

while (true) {
await sleep(10000);
console.log("Tick")
}
}

function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
55 changes: 55 additions & 0 deletions zenoh-ts/examples/src/z_sub_liveliness.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

import "./style.css";
import "./webpage.ts";

import {
RingChannel, deserialize_string, Sample, Config, Subscriber, Session, KeyExpr,
SampleKind
} from "@eclipse-zenoh/zenoh-ts";

export async function sub_liveliness() {

console.log("Opening session...")
const session = await Session.open(new Config("ws/127.0.0.1:10000"));
let key_expr = new KeyExpr("group1/**");
console.log("Declaring Liveliness Subscriber on ", key_expr.toString());

let liveliness_subscriber: Subscriber = session.liveliness().declare_subscriber(key_expr, { history: true });
console.log("liveliness_subscriber : ", liveliness_subscriber);

let sample = await liveliness_subscriber.receive();
while (sample != undefined) {
switch (sample.kind()) {
case SampleKind.PUT: {
console.log!(
">> [LivelinessSubscriber] New alive token ",
sample.keyexpr().toString()
)
break;
}
case SampleKind.DELETE: {
console.log!(
">> [LivelinessSubscriber] Dropped token ",
sample.keyexpr().toString()
)
break;
}
}
sample = await liveliness_subscriber.receive();
}

liveliness_subscriber.undeclare();
}
Loading

0 comments on commit d5fb595

Please sign in to comment.