From 3ea3779b7650c64cdd8067f2323a4837f0ed0c46 Mon Sep 17 00:00:00 2001 From: Charles Schleich Date: Wed, 11 Dec 2024 14:19:34 +0100 Subject: [PATCH] Feat/querier api (#64) * Add Querier interface and handling of messages in plugin, Minor refactor of plugin structure * add querier Class, RemoteQuerier, Locality, ReplyKeyExpr QuerierOptions, QuerierGetOptions * Fix spelling error, add undeclare in typescript * fmt * Adding Querier Example * Fix incorrect Enum->int functions * Fix Incorrect ordering of deserializing enums --- .../src/handle_control_message.rs | 93 ++++++- .../src/{interface.rs => interface/mod.rs} | 226 ++++++--------- .../src/interface/ser_de.rs | 258 +++++++++++++++++ zenoh-plugin-remote-api/src/lib.rs | 5 +- zenoh-ts/examples/src/z_querier.ts | 59 ++++ zenoh-ts/src/index.ts | 5 +- zenoh-ts/src/liveliness.ts | 3 +- zenoh-ts/src/querier.ts | 259 ++++++++++++++++++ .../src/remote_api/interface/ControlMsg.ts | 2 +- zenoh-ts/src/remote_api/querier.ts | 80 ++++++ zenoh-ts/src/remote_api/session.ts | 40 +++ zenoh-ts/src/sample.ts | 1 + zenoh-ts/src/session.ts | 71 ++++- 13 files changed, 944 insertions(+), 158 deletions(-) rename zenoh-plugin-remote-api/src/{interface.rs => interface/mod.rs} (80%) create mode 100644 zenoh-plugin-remote-api/src/interface/ser_de.rs create mode 100644 zenoh-ts/examples/src/z_querier.ts create mode 100644 zenoh-ts/src/querier.ts create mode 100644 zenoh-ts/src/remote_api/querier.ts diff --git a/zenoh-plugin-remote-api/src/handle_control_message.rs b/zenoh-plugin-remote-api/src/handle_control_message.rs index bb41588..422b6e5 100644 --- a/zenoh-plugin-remote-api/src/handle_control_message.rs +++ b/zenoh-plugin-remote-api/src/handle_control_message.rs @@ -14,9 +14,11 @@ use std::{error::Error, net::SocketAddr, time::Duration}; +use base64::{prelude::BASE64_STANDARD, Engine}; use tracing::{error, warn}; use uuid::Uuid; use zenoh::{ + bytes::ZBytes, handlers::{FifoChannel, RingChannel}, key_expr::KeyExpr, query::Selector, @@ -24,8 +26,8 @@ use zenoh::{ use crate::{ interface::{ - ControlMsg, DataMsg, HandlerChannel, LivelinessMsg, QueryWS, QueryableMsg, RemoteAPIMsg, - ReplyWS, SampleWS, + B64String, ControlMsg, DataMsg, HandlerChannel, LivelinessMsg, QueryWS, QueryableMsg, + RemoteAPIMsg, ReplyWS, SampleWS, }, spawn_future, RemoteState, StateMap, }; @@ -333,7 +335,94 @@ pub(crate) async fn handle_control_message( ControlMsg::Liveliness(liveliness_msg) => { return handle_liveliness(liveliness_msg, state_map).await; } + ControlMsg::DeclareQuerier { + id, + key_expr, + target, + timeout, + accept_replies, + congestion_control, + priority, + consolidation, + allowed_destination, + express, + } => { + let mut querier_builder = state_map.session.declare_querier(key_expr); + let timeout = timeout.map(|millis| Duration::from_millis(millis)); + + add_if_some!(target, querier_builder); + add_if_some!(timeout, querier_builder); + add_if_some!(accept_replies, querier_builder); + add_if_some!(accept_replies, querier_builder); + add_if_some!(congestion_control, querier_builder); + add_if_some!(priority, querier_builder); + add_if_some!(consolidation, querier_builder); + add_if_some!(allowed_destination, querier_builder); + add_if_some!(express, querier_builder); + + let querier = querier_builder.await?; + state_map.queriers.insert(id, querier); + } + ControlMsg::UndeclareQuerier(uuid) => { + if let Some(querier) = state_map.queriers.remove(&uuid) { + querier.undeclare().await?; + } else { + warn!("No Querier Found with UUID {}", uuid); + }; + } + ControlMsg::QuerierGet { + get_id, + querier_id, + encoding, + payload, + attachment, + } => { + if let Some(querier) = state_map.queriers.get(&querier_id) { + let mut get_builder = querier.get(); + + let payload = payload + .map(|B64String(x)| BASE64_STANDARD.decode(x)) + .and_then(|res_vec_bytes| { + if let Ok(vec_bytes) = res_vec_bytes { + Some(ZBytes::from(vec_bytes)) + } else { + None + } + }); + + let attachment: Option = attachment + .map(|B64String(x)| BASE64_STANDARD.decode(x)) + .and_then(|res_vec_bytes| { + if let Ok(vec_bytes) = res_vec_bytes { + Some(ZBytes::from(vec_bytes)) + } else { + None + } + }); + add_if_some!(encoding, get_builder); + add_if_some!(payload, get_builder); + add_if_some!(attachment, get_builder); + let receiver = get_builder.await?; + let ws_tx = state_map.websocket_tx.clone(); + let finish_msg = RemoteAPIMsg::Control(ControlMsg::GetFinished { id: get_id }); + spawn_future(async move { + while let Ok(reply) = receiver.recv_async().await { + let reply_ws = ReplyWS::from((reply, get_id)); + let remote_api_msg = RemoteAPIMsg::Data(DataMsg::GetReply(reply_ws)); + if let Err(err) = ws_tx.send(remote_api_msg) { + tracing::error!("{}", err); + } + } + if let Err(err) = ws_tx.send(finish_msg) { + tracing::error!("{}", err); + } + }); + } else { + // TODO: Do we want to add an error here ? + warn!("No Querier With ID {querier_id} found") + } + } msg @ (ControlMsg::GetFinished { id: _ } | ControlMsg::Session(_) | ControlMsg::Subscriber(_)) => { diff --git a/zenoh-plugin-remote-api/src/interface.rs b/zenoh-plugin-remote-api/src/interface/mod.rs similarity index 80% rename from zenoh-plugin-remote-api/src/interface.rs rename to zenoh-plugin-remote-api/src/interface/mod.rs index deb9f3f..e205057 100644 --- a/zenoh-plugin-remote-api/src/interface.rs +++ b/zenoh-plugin-remote-api/src/interface/mod.rs @@ -14,15 +14,24 @@ use std::sync::Arc; +// mod interface::ser_de; +pub(crate) mod ser_de; use base64::{prelude::BASE64_STANDARD, Engine}; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use ser_de::{ + deserialize_congestion_control, deserialize_consolidation_mode, deserialize_locality, + deserialize_priority, deserialize_query_target, deserialize_reliability, + deserialize_reply_key_expr, serialize_congestion_control, serialize_consolidation_mode, + serialize_locality, serialize_priority, serialize_query_target, serialize_reliability, + serialize_reply_key_expr, +}; +use serde::{Deserialize, Serialize}; use ts_rs::TS; use uuid::Uuid; use zenoh::{ key_expr::OwnedKeyExpr, qos::{CongestionControl, Priority, Reliability}, - query::{ConsolidationMode, Query, Reply, ReplyError}, - sample::{Sample, SampleKind}, + query::{ConsolidationMode, Query, QueryTarget, Reply, ReplyError, ReplyKeyExpr}, + sample::{Locality, Sample, SampleKind}, }; // ██████ ███████ ███ ███ ██████ ████████ ███████ █████ ██████ ██ ███ ███ ███████ ███████ ███████ █████ ██████ ███████ @@ -34,7 +43,7 @@ use zenoh::{ #[derive(TS)] #[ts(export)] #[derive(Debug, Serialize, Deserialize)] -pub(crate) struct B64String(String); +pub(crate) struct B64String(pub String); impl From for B64String { fn from(value: String) -> Self { B64String(value) @@ -247,6 +256,70 @@ pub enum ControlMsg { complete: bool, }, UndeclareQueryable(Uuid), + // Quierer + DeclareQuerier { + id: Uuid, + #[ts(as = "OwnedKeyExprWrapper")] + key_expr: OwnedKeyExpr, + #[serde( + deserialize_with = "deserialize_query_target", + serialize_with = "serialize_query_target", + default + )] + #[ts(type = "number | undefined")] + target: Option, + #[ts(type = "number | undefined")] + timeout: Option, + #[serde( + deserialize_with = "deserialize_reply_key_expr", + serialize_with = "serialize_reply_key_expr", + default + )] + #[ts(type = "number | undefined")] + accept_replies: Option, + #[serde( + deserialize_with = "deserialize_locality", + serialize_with = "serialize_locality", + default + )] + #[ts(type = "number | undefined")] + allowed_destination: Option, + #[serde( + deserialize_with = "deserialize_congestion_control", + serialize_with = "serialize_congestion_control", + default + )] + #[ts(type = "number | undefined")] + congestion_control: Option, + #[serde( + deserialize_with = "deserialize_priority", + serialize_with = "serialize_priority", + default + )] + #[ts(type = "number | undefined")] + priority: Option, + #[serde( + deserialize_with = "deserialize_consolidation_mode", + serialize_with = "serialize_consolidation_mode", + default + )] + #[ts(type = "number | undefined")] + consolidation: Option, + #[ts(type = "boolean | undefined")] + express: Option, + }, + UndeclareQuerier(Uuid), + // Querier + QuerierGet { + querier_id: Uuid, + get_id: Uuid, + #[ts(type = "string | undefined")] + encoding: Option, + #[ts(type = "string | undefined")] + payload: Option, + #[ts(type = "string | undefined")] + attachment: Option, + }, // Liveliness Liveliness(LivelinessMsg), @@ -278,151 +351,6 @@ pub enum LivelinessMsg { }, } -fn deserialize_consolidation_mode<'de, D>(d: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - match Option::::deserialize(d) { - Ok(Some(value)) => Ok(Some(match value { - 0u8 => ConsolidationMode::Auto, - 1u8 => ConsolidationMode::None, - 2u8 => ConsolidationMode::Monotonic, - 3u8 => ConsolidationMode::Latest, - _ => { - return Err(serde::de::Error::custom(format!( - "Value not valid for ConsolidationMode Enum {:?}", - value - ))) - } - })), - Ok(None) => Ok(None), - Err(err) => Err(serde::de::Error::custom(format!( - "Value not valid for ConsolidationMode Enum {:?}", - err - ))), - } -} - -fn serialize_consolidation_mode( - consolidation_mode: &Option, - s: S, -) -> Result -where - S: Serializer, -{ - match consolidation_mode { - Some(c_mode) => s.serialize_u8(*c_mode as u8), - None => s.serialize_none(), - } -} - -fn deserialize_congestion_control<'de, D>(d: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - match Option::::deserialize(d) { - Ok(Some(value)) => Ok(Some(match value { - 0u8 => CongestionControl::Drop, - 1u8 => CongestionControl::Block, - val => { - return Err(serde::de::Error::custom(format!( - "Value not valid for CongestionControl Enum {:?}", - val - ))) - } - })), - Ok(None) => Ok(None), - val => Err(serde::de::Error::custom(format!( - "Value not valid for CongestionControl Enum {:?}", - val - ))), - } -} - -fn serialize_congestion_control( - congestion_control: &Option, - s: S, -) -> Result -where - S: Serializer, -{ - match congestion_control { - Some(c_ctrl) => s.serialize_u8(*c_ctrl as u8), - None => s.serialize_none(), - } -} - -fn deserialize_priority<'de, D>(d: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - match Option::::deserialize(d) { - Ok(Some(value)) => Ok(Some(match value { - 1u8 => Priority::RealTime, - 2u8 => Priority::InteractiveHigh, - 3u8 => Priority::InteractiveLow, - 4u8 => Priority::DataHigh, - 5u8 => Priority::Data, - 6u8 => Priority::DataLow, - 7u8 => Priority::Background, - val => { - return Err(serde::de::Error::custom(format!( - "Value not valid for Priority Enum {:?}", - val - ))) - } - })), - Ok(None) => Ok(None), - val => Err(serde::de::Error::custom(format!( - "Value not valid for Priority Enum {:?}", - val - ))), - } -} - -fn serialize_priority(priority: &Option, s: S) -> Result -where - S: Serializer, -{ - match priority { - Some(prio) => s.serialize_u8(*prio as u8), - None => s.serialize_none(), - } -} - -fn deserialize_reliability<'de, D>(d: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - match Option::::deserialize(d) { - Ok(Some(value)) => Ok(Some(match value { - 0u8 => Reliability::Reliable, - 1u8 => Reliability::BestEffort, - val => { - return Err(serde::de::Error::custom(format!( - "Value not valid for Reliability Enum {:?}", - val - ))) - } - })), - Ok(None) => Ok(None), - val => Err(serde::de::Error::custom(format!( - "Value not valid for Reliability Enum {:?}", - val - ))), - } -} - -fn serialize_reliability(reliability: &Option, s: S) -> Result -where - S: Serializer, -{ - match reliability { - Some(prio) => s.serialize_u8(*prio as u8), - None => s.serialize_none(), - } -} - #[derive(Debug, Serialize, Deserialize, TS)] pub(crate) enum HandlerChannel { Fifo(usize), diff --git a/zenoh-plugin-remote-api/src/interface/ser_de.rs b/zenoh-plugin-remote-api/src/interface/ser_de.rs new file mode 100644 index 0000000..964d2e7 --- /dev/null +++ b/zenoh-plugin-remote-api/src/interface/ser_de.rs @@ -0,0 +1,258 @@ +use serde::{Deserialize, Deserializer, Serializer}; +use zenoh::{ + qos::{CongestionControl, Priority, Reliability}, + query::{ConsolidationMode, QueryTarget, ReplyKeyExpr}, + sample::Locality, +}; + +pub fn deserialize_consolidation_mode<'de, D>(d: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + match Option::::deserialize(d) { + Ok(Some(value)) => Ok(Some(match value { + 0u8 => ConsolidationMode::Auto, + 1u8 => ConsolidationMode::None, + 2u8 => ConsolidationMode::Monotonic, + 3u8 => ConsolidationMode::Latest, + _ => { + return Err(serde::de::Error::custom(format!( + "Value not valid for ConsolidationMode Enum {:?}", + value + ))) + } + })), + Ok(None) => Ok(None), + Err(err) => Err(serde::de::Error::custom(format!( + "Value not valid for ConsolidationMode Enum {:?}", + err + ))), + } +} + +pub fn serialize_consolidation_mode( + consolidation_mode: &Option, + s: S, +) -> Result +where + S: Serializer, +{ + match consolidation_mode { + Some(c_mode) => s.serialize_u8(*c_mode as u8), + None => s.serialize_none(), + } +} + +pub fn deserialize_reply_key_expr<'de, D>(d: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + match Option::::deserialize(d) { + Ok(Some(value)) => Ok(Some(match value { + 0u8 => ReplyKeyExpr::Any, + 1u8 => ReplyKeyExpr::MatchingQuery, + _ => { + return Err(serde::de::Error::custom(format!( + "Value not valid for ReplyKeyExpr Enum {:?}", + value + ))) + } + })), + Ok(None) => Ok(None), + Err(err) => Err(serde::de::Error::custom(format!( + "Value not valid for ReplyKeyExpr Enum {:?}", + err + ))), + } +} + +pub fn serialize_reply_key_expr( + consolidation_mode: &Option, + s: S, +) -> Result +where + S: Serializer, +{ + match consolidation_mode { + Some(c_mode) => s.serialize_u8(*c_mode as u8), + None => s.serialize_none(), + } +} + +pub fn deserialize_congestion_control<'de, D>(d: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + match Option::::deserialize(d) { + Ok(Some(value)) => Ok(Some(match value { + 0u8 => CongestionControl::Drop, + 1u8 => CongestionControl::Block, + val => { + return Err(serde::de::Error::custom(format!( + "Value not valid for CongestionControl Enum {:?}", + val + ))) + } + })), + Ok(None) => Ok(None), + val => Err(serde::de::Error::custom(format!( + "Value not valid for CongestionControl Enum {:?}", + val + ))), + } +} + +pub fn serialize_congestion_control( + congestion_control: &Option, + s: S, +) -> Result +where + S: Serializer, +{ + match congestion_control { + Some(c_ctrl) => s.serialize_u8(*c_ctrl as u8), + None => s.serialize_none(), + } +} + +pub fn deserialize_priority<'de, D>(d: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + match Option::::deserialize(d) { + Ok(Some(value)) => Ok(Some(match value { + 1u8 => Priority::RealTime, + 2u8 => Priority::InteractiveHigh, + 3u8 => Priority::InteractiveLow, + 4u8 => Priority::DataHigh, + 5u8 => Priority::Data, + 6u8 => Priority::DataLow, + 7u8 => Priority::Background, + val => { + return Err(serde::de::Error::custom(format!( + "Value not valid for Priority Enum {:?}", + val + ))) + } + })), + Ok(None) => Ok(None), + val => Err(serde::de::Error::custom(format!( + "Value not valid for Priority Enum {:?}", + val + ))), + } +} + +pub fn serialize_priority(priority: &Option, s: S) -> Result +where + S: Serializer, +{ + match priority { + Some(prio) => s.serialize_u8(*prio as u8), + None => s.serialize_none(), + } +} + +pub fn deserialize_reliability<'de, D>(d: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + match Option::::deserialize(d) { + Ok(Some(value)) => Ok(Some(match value { + 0u8 => Reliability::BestEffort, + 1u8 => Reliability::Reliable, + val => { + return Err(serde::de::Error::custom(format!( + "Value not valid for Reliability Enum {:?}", + val + ))) + } + })), + Ok(None) => Ok(None), + val => Err(serde::de::Error::custom(format!( + "Value not valid for Reliability Enum {:?}", + val + ))), + } +} + +pub fn serialize_reliability(reliability: &Option, s: S) -> Result +where + S: Serializer, +{ + match reliability { + Some(rel) => s.serialize_u8(*rel as u8), + None => s.serialize_none(), + } +} + +pub fn deserialize_locality<'de, D>(d: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + match Option::::deserialize(d) { + Ok(Some(value)) => Ok(Some(match value { + 0u8 => Locality::SessionLocal, + 1u8 => Locality::Remote, + 2u8 => Locality::Any, + val => { + return Err(serde::de::Error::custom(format!( + "Value not valid for Locality Enum {:?}", + val + ))) + } + })), + Ok(None) => Ok(None), + val => Err(serde::de::Error::custom(format!( + "Value not valid for Locality Enum {:?}", + val + ))), + } +} + +pub fn serialize_locality(locality: &Option, s: S) -> Result +where + S: Serializer, +{ + match locality { + Some(rel) => s.serialize_u8(*rel as u8), + None => s.serialize_none(), + } +} + +pub fn deserialize_query_target<'de, D>(d: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + match Option::::deserialize(d) { + Ok(Some(value)) => Ok(Some(match value { + 0u8 => QueryTarget::BestMatching, + 1u8 => QueryTarget::All, + 2u8 => QueryTarget::AllComplete, + val => { + return Err(serde::de::Error::custom(format!( + "Value not valid for QueryTarget Enum {:?}", + val + ))) + } + })), + Ok(None) => Ok(None), + val => Err(serde::de::Error::custom(format!( + "Value not valid for QueryTarget Enum {:?}", + val + ))), + } +} + +pub fn serialize_query_target( + query_target: &Option, + s: S, +) -> Result +where + S: Serializer, +{ + match query_target { + Some(rel) => s.serialize_u8(*rel as u8), + None => s.serialize_none(), + } +} diff --git a/zenoh-plugin-remote-api/src/lib.rs b/zenoh-plugin-remote-api/src/lib.rs index f1e5fe3..0cb9363 100644 --- a/zenoh-plugin-remote-api/src/lib.rs +++ b/zenoh-plugin-remote-api/src/lib.rs @@ -62,7 +62,7 @@ use zenoh::{ }, liveliness::LivelinessToken, pubsub::Publisher, - query::{Query, Queryable}, + query::{Querier, Query, Queryable}, Session, }; use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl}; @@ -486,6 +486,8 @@ struct RemoteState { // Liveliness liveliness_tokens: HashMap, liveliness_subscribers: HashMap, OwnedKeyExpr)>, + // Querier + queriers: HashMap>, } impl RemoteState { @@ -500,6 +502,7 @@ impl RemoteState { unanswered_queries: Arc::new(std::sync::RwLock::new(HashMap::new())), liveliness_tokens: HashMap::new(), liveliness_subscribers: HashMap::new(), + queriers: HashMap::new(), } } diff --git a/zenoh-ts/examples/src/z_querier.ts b/zenoh-ts/examples/src/z_querier.ts new file mode 100644 index 0000000..5b9fd5e --- /dev/null +++ b/zenoh-ts/examples/src/z_querier.ts @@ -0,0 +1,59 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +import { Duration, deserialize_string, ReplyError, Config, Receiver, RecvErr, Reply, Sample, Session, QueryTarget, Locality } from "@eclipse-zenoh/zenoh-ts"; + + +export async function main() { + const session = await Session.open(new Config("ws/127.0.0.1:10000")); + + + let querier = session.declare_querier("demo/example/**", + { + target: QueryTarget.BestMatching, + timeout: Duration.milliseconds.of(10000), + } + ); + + for(let i =0; i<1000; i++) { + await sleep(1000) + let payload = "["+i+"] Querier Get from Zenoh-ts!"; + let receiver = querier.get({payload:payload}) as Receiver; + + let reply = await receiver.receive(); + + while (reply != RecvErr.Disconnected) { + if (reply == RecvErr.MalformedReply) { + console.warn("MalformedReply"); + } else { + let resp = reply.result(); + if (resp instanceof Sample) { + let sample: Sample = resp; + console.warn(">> Received ('", sample.keyexpr(), ":", sample.payload().deserialize(deserialize_string), "')"); + } else { + let reply_error: ReplyError = resp; + console.warn(">> Received (ERROR: '{", reply_error.payload().deserialize(deserialize_string), "}')"); + } + } + reply = await receiver.receive(); + } + console.warn("Get Finished"); + } +} + +function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +main() \ No newline at end of file diff --git a/zenoh-ts/src/index.ts b/zenoh-ts/src/index.ts index a2eaa42..68a20f0 100644 --- a/zenoh-ts/src/index.ts +++ b/zenoh-ts/src/index.ts @@ -22,8 +22,10 @@ import { Session, RecvErr, Receiver, DeleteOptions, PutOptions, GetOptions, Quer import { Config } from "./config.js"; import { Encoding, IntoEncoding } from "./encoding.js"; import { Liveliness, LivelinessToken } from "./liveliness.js"; +import { Querier, QueryTarget, Locality, ReplyKeyExpr, QuerierOptions, QuerierGetOptions } from './querier.js' + // Re-export duration external library -import { TimeDuration as Duration } from 'typed-duration' +import { Duration } from 'typed-duration' // Exports export { KeyExpr, IntoKeyExpr }; @@ -36,3 +38,4 @@ export { Config }; export { Encoding, IntoEncoding }; export { Liveliness, LivelinessToken }; export { Duration }; +export { Querier, QueryTarget, Locality, ReplyKeyExpr, QuerierOptions, QuerierGetOptions } \ No newline at end of file diff --git a/zenoh-ts/src/liveliness.ts b/zenoh-ts/src/liveliness.ts index a1a2e59..d415981 100644 --- a/zenoh-ts/src/liveliness.ts +++ b/zenoh-ts/src/liveliness.ts @@ -50,7 +50,6 @@ export class Liveliness { } declare_subscriber(key_expr: IntoKeyExpr, options?: LivelinessSubscriberOptions): Subscriber { - console.log(key_expr, options) let _key_expr = new KeyExpr(key_expr); @@ -86,7 +85,7 @@ export class Liveliness { } get(key_expr: IntoKeyExpr, options?: LivelinessGetOptions): Receiver | undefined { - console.log(key_expr, options) + let _key_expr = new KeyExpr(key_expr); let _timeout_millis: number | undefined = undefined; diff --git a/zenoh-ts/src/querier.ts b/zenoh-ts/src/querier.ts new file mode 100644 index 0000000..9fd019e --- /dev/null +++ b/zenoh-ts/src/querier.ts @@ -0,0 +1,259 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +// External +import { SimpleChannel } from "channel-ts"; +// Remote API +import { ReplyWS } from "./remote_api/interface/ReplyWS.js"; +// API +import { IntoZBytes, ZBytes } from "./z_bytes.js"; +import { CongestionControl, ConsolidationMode, Priority, } from "./sample.js"; +import { TimeDuration } from "typed-duration"; +import { RemoteQuerier } from "./remote_api/querier.js"; +import { KeyExpr } from "./key_expr.js"; +import { Encoding } from "crypto"; +import { Receiver } from "./session.js"; + +export enum QueryTarget { + /// Let Zenoh find the BestMatching queryable capabale of serving the query. + BestMatching, + /// Deliver the query to all queryables matching the query's key expression. + All, + /// Deliver the query to all queryables matching the query's key expression that are declared as complete. + AllComplete, +} + +/** + * Convenience function to convert between QueryTarget and int + * @internal + */ +export function query_target_to_int(query_target?: QueryTarget): number { + switch (query_target) { + case QueryTarget.BestMatching: + return 0; + case QueryTarget.All: + return 1; + case QueryTarget.AllComplete: + return 2; + default: + // Default is QueryTarget.BestMatching + return 1; + } +} + +export enum Locality { + SessionLocal, + Remote, + Any, +} + +/** + * Convenience function to convert between Locality and int + * @internal + */ +export function locality_to_int(query_target?: Locality): number { + switch (query_target) { + case Locality.SessionLocal: + return 0; + case Locality.Remote: + return 1; + case Locality.Any: + return 2; + default: + // Default is Locality.Any + return 2; + } +} + +export enum ReplyKeyExpr { + /// Accept replies whose key expressions may not match the query key expression. + Any, + /// Accept replies whose key expressions match the query key expression. + MatchingQuery, +} + +/** + * Convenience function to convert between QueryTarget function and int + * @internal + */ +export function reply_key_expr_to_int(query_target?: ReplyKeyExpr): number { + switch (query_target) { + case ReplyKeyExpr.Any: + return 0; + case ReplyKeyExpr.MatchingQuery: + return 1; + default: + // Default is ReplyKeyExpr.MatchingQuery + return 1; + } +} + + +export interface QuerierOptions { + congestion_control?: CongestionControl, + consolidation?: ConsolidationMode, + priority?: Priority, + express?: boolean, + target: QueryTarget + timeout?: TimeDuration, + allowed_destination?: Locality + // + accept_replies?: ReplyKeyExpr +} + +export interface QuerierGetOptions { + encoding?: Encoding, + payload?: IntoZBytes, + attachment?: IntoZBytes, + parameters?: string +} + +/** + * Queryable class used to receive Query's from the network and handle Reply's + * created by Session.declare_queryable + */ +export class Querier { + private _remote_querier: RemoteQuerier; + private _key_expr: KeyExpr; + private _congestion_control: CongestionControl; + private _priority: Priority; + private _accept_replies: ReplyKeyExpr; + private undeclared: boolean; + /** + * @ignore + */ + dispose() { + this.undeclare(); + } + + /** + * Returns a Queryable + * Note! : user must use declare_queryable on a session + */ + constructor( + remote_querier: RemoteQuerier, + key_expr: KeyExpr, + congestion_control: CongestionControl, + priority: Priority, + accept_replies: ReplyKeyExpr, + ) { + this._remote_querier = remote_querier; + this._key_expr = key_expr; + this._congestion_control = congestion_control; + this._priority = priority; + this._accept_replies = accept_replies; + this.undeclared = false; + // TODO: Look at finalization registry + // Queryable.registry.register(this, remote_queryable, this) + } + + /** + * Undeclares Queryable + * @returns void + */ + undeclare() { + this.undeclared = true; + // Finalization registry + // Queryable.registry.unregister(this); + this._remote_querier.undeclare() + } + + /** + * returns key expression for this Querier + * @returns KeyExpr + */ + key_expr() { + return this._key_expr; + } + + /** + * returns Congestion Control for this Querier + * @returns CongestionControl + */ + congestion_control() { + return this._congestion_control; + } + + /** + * returns Priority for this Querier + * @returns Priority + */ + priority() { + return this._priority; + } + + /** + * returns ReplyKeyExpr for this Querier + * @returns ReplyKeyExpr + */ + accept_replies() { + return this._accept_replies; + } + + /** + * Issue a Get request on this querier + * @returns Promise + */ + get(get_options?: QuerierGetOptions): Receiver | undefined { + if (this.undeclared == true) { + return undefined; + } + let _payload; + let _attachment; + let _parameters; + let _encoding = get_options?.encoding?.toString() + + if (get_options?.attachment != undefined) { + _attachment = Array.from(new ZBytes(get_options?.attachment).buffer()) + } + if (get_options?.payload != undefined) { + _payload = Array.from(new ZBytes(get_options?.payload).buffer()) + } + if (get_options?.parameters != undefined) { + _parameters = get_options?.parameters; + } + + let chan: SimpleChannel = this._remote_querier.get( + _encoding, + _parameters, + _attachment, + _payload, + ); + + let receiver = new Receiver(chan); + + return receiver; + // if (callback != undefined) { + // executeAsync(async () => { + // for await (const message of chan) { + // // This horribleness comes from SimpleChannel sending a 0 when the channel is closed + // if (message != undefined && (message as unknown as number) != 0) { + // let reply = new Reply(message); + // if (callback != undefined) { + // callback(reply); + // } + // } else { + // break + // } + // } + // }); + // return undefined; + // } else { + // return receiver; + // } + + } + + +} diff --git a/zenoh-ts/src/remote_api/interface/ControlMsg.ts b/zenoh-ts/src/remote_api/interface/ControlMsg.ts index 773fabd..9a534aa 100644 --- a/zenoh-ts/src/remote_api/interface/ControlMsg.ts +++ b/zenoh-ts/src/remote_api/interface/ControlMsg.ts @@ -4,4 +4,4 @@ import type { HandlerChannel } from "./HandlerChannel"; import type { LivelinessMsg } from "./LivelinessMsg"; import type { OwnedKeyExprWrapper } from "./OwnedKeyExprWrapper"; -export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, } } | { "UndeclareQueryable": string } | { "Liveliness": LivelinessMsg }; +export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, } } | { "UndeclareQueryable": string } | { "DeclareQuerier": { id: string, key_expr: OwnedKeyExprWrapper, target: number | undefined, timeout: number | undefined, accept_replies: number | undefined, allowed_destination: number | undefined, congestion_control: number | undefined, priority: number | undefined, consolidation: number | undefined, express: boolean | undefined, } } | { "UndeclareQuerier": string } | { "QuerierGet": { querier_id: string, get_id: string, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "Liveliness": LivelinessMsg }; diff --git a/zenoh-ts/src/remote_api/querier.ts b/zenoh-ts/src/remote_api/querier.ts new file mode 100644 index 0000000..47bec58 --- /dev/null +++ b/zenoh-ts/src/remote_api/querier.ts @@ -0,0 +1,80 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +import { v4 as uuidv4 } from "uuid"; +import { RemoteSession } from "./session.js"; +import { ControlMsg } from "./interface/ControlMsg.js" +import { SimpleChannel } from "channel-ts"; +import { ReplyWS } from "./interface/ReplyWS.js"; +import { encode as b64_str_from_bytes } from "base64-arraybuffer"; + +type UUID = typeof uuidv4 | string; + +export class RemoteQuerier { + private querier_id: UUID; + private session_ref: RemoteSession; + + constructor( + querier_id: UUID, + session_ref: RemoteSession, + ) { + this.querier_id = querier_id; + this.session_ref = session_ref; + } + + undeclare() { + + let control_msg: ControlMsg = { + UndeclareQuerier: this.querier_id as string + }; + + this.session_ref.send_ctrl_message(control_msg); + } + + get( + _encoding?: string, + _parameters?: string, + _attachment?: Array, + _payload?: Array, + ): SimpleChannel { + let get_id = uuidv4(); + let channel: SimpleChannel = new SimpleChannel(); + this.session_ref.get_receiver.set(get_id, channel); + + let payload = undefined; + if (_payload != undefined) { + payload = b64_str_from_bytes(new Uint8Array(_payload)) + } + let attachment = undefined; + if (_attachment != undefined) { + attachment = b64_str_from_bytes(new Uint8Array(_attachment)) + } + + let control_msg: ControlMsg = { + QuerierGet: { + querier_id: this.querier_id as string, + get_id: get_id, + encoding: _encoding, + payload: payload, + attachment: attachment, + } + }; + + this.session_ref.send_ctrl_message(control_msg); + return channel; + } + +} + + diff --git a/zenoh-ts/src/remote_api/session.ts b/zenoh-ts/src/remote_api/session.ts index 8828d54..7906007 100644 --- a/zenoh-ts/src/remote_api/session.ts +++ b/zenoh-ts/src/remote_api/session.ts @@ -32,6 +32,7 @@ import { ReplyWS } from "./interface/ReplyWS.js"; import { QueryableMsg } from "./interface/QueryableMsg.js"; import { QueryReplyWS } from "./interface/QueryReplyWS.js"; import { HandlerChannel } from "./interface/HandlerChannel.js"; +import { RemoteQuerier } from "./querier.js" // ██████ ███████ ███ ███ ██████ ████████ ███████ ███████ ███████ ███████ ███████ ██ ██████ ███ ██ // ██ ██ ██ ████ ████ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ████ ██ @@ -326,6 +327,45 @@ export class RemoteSession { return publisher; } + declare_remote_querier( + key_expr: string, + consolidation?: number, + congestion_control?: number, + priority?: number, + express?: boolean, + target?: number, + allowed_destination?: number, + accept_replies?: number, + timeout_milliseconds?: number, + ): RemoteQuerier { + let timeout = undefined; + if (timeout_milliseconds !== undefined) { + timeout = timeout_milliseconds; + } + + let uuid: string = uuidv4(); + let querier = new RemoteQuerier(uuid, this); + + let control_message: ControlMsg = { + DeclareQuerier: { + id: uuid, + key_expr: key_expr, + congestion_control: congestion_control, + priority: priority, + express: express, + target: target, + timeout: timeout, + accept_replies: accept_replies, + allowed_destination: allowed_destination, + consolidation: consolidation, + }, + }; + + this.send_ctrl_message(control_message); + return querier; + } + + // Liveliness declare_liveliness_token( key_expr: string, diff --git a/zenoh-ts/src/sample.ts b/zenoh-ts/src/sample.ts index 0487d92..8cefd44 100644 --- a/zenoh-ts/src/sample.ts +++ b/zenoh-ts/src/sample.ts @@ -107,6 +107,7 @@ export function congestion_control_to_int( return 0; case CongestionControl.BLOCK: return 1; + // Default is Drop default: return 0; } diff --git a/zenoh-ts/src/session.ts b/zenoh-ts/src/session.ts index c185b60..c08668c 100644 --- a/zenoh-ts/src/session.ts +++ b/zenoh-ts/src/session.ts @@ -55,6 +55,7 @@ import { HandlerChannel } from "./remote_api/interface/HandlerChannel.js"; // External deps import { Duration, TimeDuration } from 'typed-duration' import { SimpleChannel } from "channel-ts"; +import { locality_to_int, Querier, QuerierOptions, query_target_to_int, reply_key_expr_to_int, ReplyKeyExpr } from "./querier.js"; function executeAsync(func: any) { setTimeout(func, 0); @@ -210,7 +211,6 @@ export class Session { let _priority; let _express; let _attachment; - let _encoding = put_opts?.encoding?.toString() let _congestion_control = congestion_control_to_int(put_opts?.congestion_control); @@ -558,8 +558,75 @@ export class Session { ); return publisher; } + + /** + * Declares a Querier + * + * @param {IntoKeyExpr} keyexpr - string of key_expression + * @param {QuerierOptions} publisher_opts - Optional, set of options to be used when declaring a publisher + * @returns Publisher + */ + declare_querier( + into_keyexpr: IntoKeyExpr, + querier_opts: QuerierOptions, + ): Querier { + const key_expr = new KeyExpr(into_keyexpr); + + // Optional Parameters + let _priority; + let priority = Priority.DATA; + if (querier_opts?.priority != null) { + _priority = priority_to_int(querier_opts?.priority); + priority = querier_opts?.priority; + } + + let _congestion_control; + let congestion_control = CongestionControl.DROP; + if (querier_opts?.congestion_control != null) { + _congestion_control = congestion_control_to_int(querier_opts?.congestion_control); + congestion_control = querier_opts?.congestion_control; + } + + let _accept_replies; + let accept_replies = ReplyKeyExpr.Any; + if (querier_opts?.accept_replies != null) { + _accept_replies = reply_key_expr_to_int(querier_opts?.accept_replies); + accept_replies = querier_opts?.accept_replies; + } + + let _consolidation = consolidation_mode_to_int(querier_opts?.consolidation); + let _target = query_target_to_int(querier_opts?.target); + let _allowed_destination = locality_to_int(querier_opts?.allowed_destination); + let _express = querier_opts?.express; + let _timeout_millis: number | undefined = undefined; + + if (querier_opts?.timeout !== undefined) { + _timeout_millis = Duration.milliseconds.from(querier_opts?.timeout); + } + + let remote_querier = this.remote_session.declare_remote_querier( + key_expr.toString(), + _consolidation, + _congestion_control, + _priority, + _express, + _target, + _allowed_destination, + _accept_replies, + _timeout_millis, + ); + + return new Querier( + remote_querier, + key_expr, + congestion_control, + priority, + accept_replies, + ); + } } + function isGetChannelClose(msg: any): msg is GetChannelClose { return msg === GetChannelClose.Disconnected; } @@ -590,7 +657,7 @@ export class Receiver { /** * @ignore */ - private constructor(receiver: SimpleChannel) { + constructor(receiver: SimpleChannel) { this.receiver = receiver; }