Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/add liveliness #15

Merged
merged 6 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.vscode
**/build
cp_plugin.sh
TEST_CONFIG.json5

zenoh-ts/dist
zenoh-ts/node_modules
Expand Down
94 changes: 90 additions & 4 deletions zenoh-plugin-remote-api/src/handle_control_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//

use std::{error::Error, net::SocketAddr};
use std::{error::Error, net::SocketAddr, time::Duration};

use tracing::{error, warn};
use uuid::Uuid;
Expand All @@ -24,9 +24,10 @@ use zenoh::{

use crate::{
interface::{
ControlMsg, DataMsg, HandlerChannel, QueryWS, QueryableMsg, RemoteAPIMsg, ReplyWS, SampleWS,
ControlMsg, DataMsg, HandlerChannel, LivelinessMsg, QueryWS, QueryableMsg, RemoteAPIMsg,
ReplyWS, SampleWS,
},
spawn_future, StateMap,
spawn_future, RemoteState, StateMap,
};

///
Expand Down Expand Up @@ -202,7 +203,7 @@ pub(crate) async fn handle_control_message(
handler,
id: subscriber_uuid,
} => {
let key_expr = KeyExpr::new(owned_key_expr.clone()).unwrap();
let key_expr = KeyExpr::new(owned_key_expr.clone())?;
let ch_tx = state_map.websocket_tx.clone();

let join_handle = match handler {
Expand Down Expand Up @@ -326,6 +327,10 @@ pub(crate) async fn handle_control_message(
queryable.undeclare().await?;
};
}
ControlMsg::Liveliness(liveliness_msg) => {
return handle_liveliness(liveliness_msg, state_map).await;
}

msg @ (ControlMsg::GetFinished { id: _ }
| ControlMsg::Session(_)
| ControlMsg::Subscriber(_)) => {
Expand All @@ -335,3 +340,84 @@ pub(crate) async fn handle_control_message(
};
Ok(None)
}

// Handle Liveliness Messages
async fn handle_liveliness(
liveliness_msg: LivelinessMsg,
state_map: &mut RemoteState,
) -> Result<Option<ControlMsg>, Box<dyn Error + Send + Sync>> {
let liveliness = state_map.session.liveliness();
match liveliness_msg {
LivelinessMsg::DeclareToken { key_expr, id } => {
let token = liveliness.declare_token(key_expr).await?;
state_map.liveliness_tokens.insert(id, token);
}
LivelinessMsg::UndeclareToken(uuid) => {
if let Some(token) = state_map.liveliness_tokens.remove(&uuid) {
token.undeclare().await?;
}
}
LivelinessMsg::DeclareSubscriber {
key_expr: owned_key_expr,
id,
history,
} => {
let key_expr = KeyExpr::new(owned_key_expr.clone())?;
let subscriber = liveliness
.declare_subscriber(key_expr)
.history(history)
.await?;
let ch_tx = state_map.websocket_tx.clone();

let handler = spawn_future(async move {
while let Ok(sample) = subscriber.recv_async().await {
let sample_ws = SampleWS::from(sample);
let remote_api_message = RemoteAPIMsg::Data(DataMsg::Sample(sample_ws, id));
if let Err(e) = ch_tx.send(remote_api_message) {
error!("Forward Sample Channel error: {e}");
};
}
});
state_map
.liveliness_subscribers
.insert(id, (handler, owned_key_expr));
}
LivelinessMsg::UndeclareSubscriber(uuid) => {
if let Some((join_handle, _)) = state_map.liveliness_subscribers.remove(&uuid) {
join_handle.abort(); // This should drop the underlying liveliness_subscribers of the future
} else {
warn!("UndeclareSubscriber: No Subscriber with UUID {uuid}");
}
}
LivelinessMsg::Get {
key_expr,
id,
timeout,
} => {
let mut builder = liveliness.get(key_expr);
if let Some(timeout) = timeout {
builder = builder.timeout(Duration::from_millis(timeout));
}
let receiver = builder.await?;

let mut receiving: bool = true;

while receiving {
match receiver.recv_async().await {
Ok(reply) => {
let reply_ws = ReplyWS::from((reply, id));
let remote_api_msg = RemoteAPIMsg::Data(DataMsg::GetReply(reply_ws));
if let Err(err) = state_map.websocket_tx.send(remote_api_msg) {
tracing::error!("{}", err);
}
}
Err(_) => receiving = false,
}
}

let remote_api_msg = RemoteAPIMsg::Control(ControlMsg::GetFinished { id });
state_map.websocket_tx.send(remote_api_msg)?;
}
}
Ok(None)
}
29 changes: 29 additions & 0 deletions zenoh-plugin-remote-api/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,35 @@ pub enum ControlMsg {
complete: bool,
},
UndeclareQueryable(Uuid),

// Liveliness
Liveliness(LivelinessMsg),
}

#[derive(Debug, Serialize, Deserialize, TS)]
#[ts(export)]
pub enum LivelinessMsg {
DeclareToken {
#[ts(as = "OwnedKeyExprWrapper")]
key_expr: OwnedKeyExpr,
id: Uuid,
},
UndeclareToken(Uuid),
DeclareSubscriber {
#[ts(as = "OwnedKeyExprWrapper")]
key_expr: OwnedKeyExpr,
id: Uuid,
history: bool,
},
UndeclareSubscriber(Uuid),
Get {
#[ts(as = "OwnedKeyExprWrapper")]
key_expr: OwnedKeyExpr,
id: Uuid,
// timeout in Milliseconds
#[ts(type = "number | undefined")]
timeout: Option<u64>,
},
}

fn deserialize_consolidation_mode<'de, D>(d: D) -> Result<Option<ConsolidationMode>, D::Error>
Expand Down
14 changes: 8 additions & 6 deletions zenoh-plugin-remote-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use zenoh::{
format::{kedefine, keformat},
keyexpr, OwnedKeyExpr,
},
liveliness::LivelinessToken,
pubsub::Publisher,
query::{Query, Queryable},
Session,
Expand Down Expand Up @@ -482,6 +483,9 @@ struct RemoteState {
// Queryable
queryables: HashMap<Uuid, (Queryable<()>, OwnedKeyExpr)>,
unanswered_queries: Arc<std::sync::RwLock<HashMap<Uuid, Query>>>,
// Liveliness
liveliness_tokens: HashMap<Uuid, LivelinessToken>,
liveliness_subscribers: HashMap<Uuid, (JoinHandle<()>, OwnedKeyExpr)>,
}

impl RemoteState {
Expand All @@ -494,6 +498,8 @@ impl RemoteState {
publishers: HashMap::new(),
queryables: HashMap::new(),
unanswered_queries: Arc::new(std::sync::RwLock::new(HashMap::new())),
liveliness_tokens: HashMap::new(),
liveliness_subscribers: HashMap::new(),
}
}

Expand Down Expand Up @@ -545,12 +551,8 @@ async fn run_websocket_server(
opt_tls_acceptor = Some(TlsAcceptor::from(Arc::new(config)));
}

tracing::info!("Spawning Remote API Plugin on {:?}", ws_port);

let tcp = TcpListener::bind(ws_port).await;

let server: TcpListener = match tcp {
Ok(x) => x,
let server: TcpListener = match TcpListener::bind(ws_port).await {
Ok(server) => server,
Err(err) => {
tracing::error!("Unable to start TcpListener {err}");
return;
Expand Down
10 changes: 8 additions & 2 deletions zenoh-ts/examples/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,24 @@ import { sub as sub } from "./z_sub.ts";
import { pub } from "./z_pub.ts";
import { put } from "./z_put.ts";
import { queryable } from "./z_queryable.ts";
import { _delete } from "./z_delete.ts";
// import { _delete } from "./z_delete.ts";
import { liveliness } from "./z_liveliness.ts";
import { sub_liveliness } from "./z_sub_liveliness.ts";
import { get_liveliness } from "./z_get_liveliness.ts";

async function main() {
// thr();
// ping();
// pong();
sub();
// sub();
// pub();
// queryable();
// get();
// _delete();
// put();
liveliness();
// sub_liveliness();
// get_liveliness();

let count = 0;
while (true) {
Expand Down
61 changes: 61 additions & 0 deletions zenoh-ts/examples/src/z_get_liveliness.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

import "./style.css";
import "./webpage.ts";

import {
RingChannel, deserialize_string, Sample, Config, Subscriber, Session, KeyExpr,
SampleKind,
Receiver,
RecvErr,
ReplyError
} from "@eclipse-zenoh/zenoh-ts";
import { Duration } from 'typed-duration'
const { seconds } = Duration

export async function get_liveliness() {

console.log("Opening session...")
const session = await Session.open(new Config("ws/127.0.0.1:10000"));
let key_expr = new KeyExpr("group1/**");
console.log("Sending Liveliness Query '", key_expr.toString(),"'");

let receiver = session.liveliness().get(key_expr, {timeout: seconds.of(20)});

if (!(receiver instanceof Receiver)){
return // Return in case of callback get query
}

let reply = await receiver.receive();

while (reply != RecvErr.Disconnected) {
if (reply == RecvErr.MalformedReply) {
console.warn("MalformedReply");
} else {
let resp = reply.result();
if (resp instanceof Sample) {
let sample: Sample = resp;
console.warn(">> Alive token ('", sample.keyexpr() ,")");
} else {
let reply_error: ReplyError = resp;
console.warn(">> Received (ERROR: '", reply_error.payload().deserialize(deserialize_string), "')");
}
}
reply = await receiver.receive();
}

console.warn("End Liveliness query");
}

41 changes: 41 additions & 0 deletions zenoh-ts/examples/src/z_liveliness.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

import "./style.css";
import "./webpage.ts";

import {
Config, Session, KeyExpr, LivelinessToken
} from "@eclipse-zenoh/zenoh-ts";

export async function liveliness() {

console.log("Opening session...")
const session = await Session.open(new Config ("ws/127.0.0.1:10000"));
let key_expr = new KeyExpr("group1/zenoh-rs");
console.log("Declaring Liveliness token on ",key_expr.toString());

let token: LivelinessToken = session.liveliness().declare_token(key_expr);
// LivelinessTokens are NOT automatically closed when dropped
// please call token.undeclare();

while (true) {
await sleep(10000);
console.log("Tick")
}
}

function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
55 changes: 55 additions & 0 deletions zenoh-ts/examples/src/z_sub_liveliness.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

import "./style.css";
import "./webpage.ts";

import {
RingChannel, deserialize_string, Sample, Config, Subscriber, Session, KeyExpr,
SampleKind
} from "@eclipse-zenoh/zenoh-ts";

export async function sub_liveliness() {

console.log("Opening session...")
const session = await Session.open(new Config("ws/127.0.0.1:10000"));
let key_expr = new KeyExpr("group1/**");
console.log("Declaring Liveliness Subscriber on ", key_expr.toString());

let liveliness_subscriber: Subscriber = session.liveliness().declare_subscriber(key_expr, { history: true });
console.log("liveliness_subscriber : ", liveliness_subscriber);

let sample = await liveliness_subscriber.receive();
while (sample != undefined) {
switch (sample.kind()) {
case SampleKind.PUT: {
console.log!(
">> [LivelinessSubscriber] New alive token ",
sample.keyexpr().toString()
)
break;
}
case SampleKind.DELETE: {
console.log!(
">> [LivelinessSubscriber] Dropped token ",
sample.keyexpr().toString()
)
break;
}
}
sample = await liveliness_subscriber.receive();
}

liveliness_subscriber.undeclare();
}
Loading
Loading