diff --git a/zenoh-plugin-remote-api/src/handle_control_message.rs b/zenoh-plugin-remote-api/src/handle_control_message.rs index e704507..2e207d5 100644 --- a/zenoh-plugin-remote-api/src/handle_control_message.rs +++ b/zenoh-plugin-remote-api/src/handle_control_message.rs @@ -345,6 +345,7 @@ pub(crate) async fn handle_control_message( priority, consolidation, allowed_destination, + express } => { let mut querier_builder = state_map.session.declare_querier(key_expr); let timeout = timeout.map(|millis| Duration::from_millis(millis)); @@ -357,6 +358,8 @@ pub(crate) async fn handle_control_message( add_if_some!(priority, querier_builder); add_if_some!(consolidation, querier_builder); add_if_some!(allowed_destination, querier_builder); + add_if_some!(express, querier_builder); + let querier = querier_builder.await?; state_map.queriers.insert(id, querier); } diff --git a/zenoh-plugin-remote-api/src/interface/mod.rs b/zenoh-plugin-remote-api/src/interface/mod.rs index 569a4b7..0eb5b6a 100644 --- a/zenoh-plugin-remote-api/src/interface/mod.rs +++ b/zenoh-plugin-remote-api/src/interface/mod.rs @@ -270,7 +270,7 @@ pub enum ControlMsg { )] #[ts(type = "number | undefined")] target: Option, - // + #[ts(type = "number | undefined")] timeout: Option, #[serde( deserialize_with = "deserialize_reply_key_expr", @@ -307,6 +307,8 @@ pub enum ControlMsg { )] #[ts(type = "number | undefined")] consolidation: Option, + #[ts(type = "boolean | undefined")] + express: Option, }, UndDeclareQuerier(Uuid), // Querier diff --git a/zenoh-ts/src/querier.ts b/zenoh-ts/src/querier.ts new file mode 100644 index 0000000..6759ee6 --- /dev/null +++ b/zenoh-ts/src/querier.ts @@ -0,0 +1,258 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +// External +import { SimpleChannel } from "channel-ts"; +// Remote API +import { ReplyWS } from "./remote_api/interface/ReplyWS.js"; +// API +import { IntoZBytes, ZBytes } from "./z_bytes.js"; +import { CongestionControl, ConsolidationMode, Priority, } from "./sample.js"; +import { TimeDuration } from "typed-duration"; +import { RemoteQuerier } from "./remote_api/querier.js"; +import { KeyExpr } from "./key_expr.js"; +import { Encoding } from "crypto"; +import { Receiver } from "./session.js"; + +export enum QueryTarget { + /// Let Zenoh find the BestMatching queryable capabale of serving the query. + BestMatching, + /// Deliver the query to all queryables matching the query's key expression. + All, + /// Deliver the query to all queryables matching the query's key expression that are declared as complete. + AllComplete, +} + +/** + * Convenience function to convert between QueryTarget and int + * @internal + */ +export function query_target_to_int(query_target?: QueryTarget): number { + switch (query_target) { + case QueryTarget.BestMatching: + return 1; + case QueryTarget.All: + return 2; + case QueryTarget.AllComplete: + return 3; + default: + // Default is QueryTarget.BestMatching + return 1; + } +} + +export enum Locality { + SessionLocal, + Remote, + Any, +} + +/** + * Convenience function to convert between Locality and int + * @internal + */ +export function locality_to_int(query_target?: Locality): number { + switch (query_target) { + case Locality.SessionLocal: + return 1; + case Locality.Remote: + return 2; + case Locality.Any: + return 3; + default: + // Default is Locality.Any + return 3; + } +} + +export enum ReplyKeyExpr { + /// Accept replies whose key expressions may not match the query key expression. + Any, + /// Accept replies whose key expressions match the query key expression. + MatchingQuery, +} + +/** + * Convenience function to convert between QueryTarget function and int + * @internal + */ +export function reply_key_expr_to_int(query_target?: ReplyKeyExpr): number { + switch (query_target) { + case ReplyKeyExpr.Any: + return 1; + case ReplyKeyExpr.MatchingQuery: + return 2; + default: + // Default is ReplyKeyExpr.MatchingQuery + return 2; + } +} + + +export interface QuerierOptions { + congestion_control?: CongestionControl, + consolidation: ConsolidationMode, + priority?: Priority, + express?: boolean, + target: QueryTarget + timeout?: TimeDuration, + allowed_destination: Locality + // + accept_replies?: ReplyKeyExpr +} + +export interface QuerierGetOptions { + encoding?: Encoding, + payload?: IntoZBytes, + attachment?: IntoZBytes, + parameters?: string +} + +/** + * Queryable class used to receive Query's from the network and handle Reply's + * created by Session.declare_queryable + */ +export class Querier { + private _remote_querier: RemoteQuerier; + private _key_expr: KeyExpr; + private _congestion_control: CongestionControl; + private _priority: Priority; + private _accept_replies: ReplyKeyExpr; + private undeclared: boolean; + /** + * @ignore + */ + dispose() { + this.undeclare(); + } + + /** + * Returns a Queryable + * Note! : user must use declare_queryable on a session + */ + constructor( + remote_querier: RemoteQuerier, + key_expr: KeyExpr, + congestion_control: CongestionControl, + priority: Priority, + accept_replies: ReplyKeyExpr, + ) { + this._remote_querier = remote_querier; + this._key_expr = key_expr; + this._congestion_control = congestion_control; + this._priority = priority; + this._accept_replies = accept_replies; + this.undeclared = false; + // TODO: Look at finalization registry + // Queryable.registry.register(this, remote_queryable, this) + } + + /** + * Undeclares Queryable + * @returns void + */ + undeclare() { + this.undeclared = true; + // Finalization registry + // Queryable.registry.unregister(this); + } + + /** + * returns key expression for this Querier + * @returns KeyExpr + */ + key_expr() { + return this._key_expr; + } + + /** + * returns Congestion Control for this Querier + * @returns CongestionControl + */ + congestion_control() { + return this._congestion_control; + } + + /** + * returns Priority for this Querier + * @returns Priority + */ + priority() { + return this._priority; + } + + /** + * returns ReplyKeyExpr for this Querier + * @returns ReplyKeyExpr + */ + accept_replies() { + return this._accept_replies; + } + + /** + * Issue a Get request on this querier + * @returns Promise + */ + get(get_options: QuerierGetOptions): Receiver | undefined { + if (this.undeclared == true) { + return undefined; + } + let _payload; + let _attachment; + let _parameters; + let _encoding = get_options?.encoding?.toString() + + if (get_options?.attachment != undefined) { + _attachment = Array.from(new ZBytes(get_options?.attachment).buffer()) + } + if (get_options?.payload != undefined) { + _payload = Array.from(new ZBytes(get_options?.payload).buffer()) + } + if (get_options?.parameters != undefined) { + _parameters = get_options?.parameters; + } + + let chan: SimpleChannel = this._remote_querier.get( + _encoding, + _parameters, + _attachment, + _payload, + ); + + let receiver = new Receiver(chan); + + return receiver; + // if (callback != undefined) { + // executeAsync(async () => { + // for await (const message of chan) { + // // This horribleness comes from SimpleChannel sending a 0 when the channel is closed + // if (message != undefined && (message as unknown as number) != 0) { + // let reply = new Reply(message); + // if (callback != undefined) { + // callback(reply); + // } + // } else { + // break + // } + // } + // }); + // return undefined; + // } else { + // return receiver; + // } + + } + + +} diff --git a/zenoh-ts/src/remote_api/interface/ControlMsg.ts b/zenoh-ts/src/remote_api/interface/ControlMsg.ts index 85e3475..23bba51 100644 --- a/zenoh-ts/src/remote_api/interface/ControlMsg.ts +++ b/zenoh-ts/src/remote_api/interface/ControlMsg.ts @@ -4,4 +4,4 @@ import type { HandlerChannel } from "./HandlerChannel"; import type { LivelinessMsg } from "./LivelinessMsg"; import type { OwnedKeyExprWrapper } from "./OwnedKeyExprWrapper"; -export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, } } | { "UndeclareQueryable": string } | { "DeclareQuerier": { id: string, key_expr: OwnedKeyExprWrapper, target: number | undefined, timeout: bigint | null, accept_replies: number | undefined, allowed_destination: number | undefined, congestion_control: number | undefined, priority: number | undefined, consolidation: number | undefined, } } | { "UndDeclareQuerier": string } | { "QuerierGet": { querier_id: string, get_id: string, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "Liveliness": LivelinessMsg }; +export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, } } | { "UndeclareQueryable": string } | { "DeclareQuerier": { id: string, key_expr: OwnedKeyExprWrapper, target: number | undefined, timeout: number | undefined, accept_replies: number | undefined, allowed_destination: number | undefined, congestion_control: number | undefined, priority: number | undefined, consolidation: number | undefined, express: boolean | undefined, } } | { "UndDeclareQuerier": string } | { "QuerierGet": { querier_id: string, get_id: string, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "Liveliness": LivelinessMsg }; diff --git a/zenoh-ts/src/remote_api/querier.ts b/zenoh-ts/src/remote_api/querier.ts new file mode 100644 index 0000000..7b985a5 --- /dev/null +++ b/zenoh-ts/src/remote_api/querier.ts @@ -0,0 +1,71 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +import { v4 as uuidv4 } from "uuid"; +import { RemoteSession } from "./session.js"; +import { ControlMsg } from "./interface/ControlMsg.js" +import { SimpleChannel } from "channel-ts"; +import { ReplyWS } from "./interface/ReplyWS.js"; +import { encode as b64_str_from_bytes } from "base64-arraybuffer"; + +type UUID = typeof uuidv4 | string; + +export class RemoteQuerier { + private querier_id: UUID; + private session_ref: RemoteSession; + + constructor( + querier_id: UUID, + session_ref: RemoteSession, + ) { + this.querier_id = querier_id; + this.session_ref = session_ref; + } + + get( + _encoding?: string, + _parameters?: string, + _attachment?: Array, + _payload?: Array, + ): SimpleChannel { + let get_id = uuidv4(); + let channel: SimpleChannel = new SimpleChannel(); + this.session_ref.get_receiver.set(get_id, channel); + + let payload = undefined; + if (_payload != undefined) { + payload = b64_str_from_bytes(new Uint8Array(_payload)) + } + let attachment = undefined; + if (_attachment != undefined) { + attachment = b64_str_from_bytes(new Uint8Array(_attachment)) + } + + let control_msg: ControlMsg = { + QuerierGet: { + querier_id: this.querier_id as string, + get_id: get_id, + encoding: _encoding, + payload: payload, + attachment: attachment, + } + }; + + this.session_ref.send_ctrl_message(control_msg); + return channel; + } + +} + + diff --git a/zenoh-ts/src/remote_api/session.ts b/zenoh-ts/src/remote_api/session.ts index 8828d54..2222ee6 100644 --- a/zenoh-ts/src/remote_api/session.ts +++ b/zenoh-ts/src/remote_api/session.ts @@ -32,6 +32,7 @@ import { ReplyWS } from "./interface/ReplyWS.js"; import { QueryableMsg } from "./interface/QueryableMsg.js"; import { QueryReplyWS } from "./interface/QueryReplyWS.js"; import { HandlerChannel } from "./interface/HandlerChannel.js"; +import { RemoteQuerier } from "./querier.js" // ██████ ███████ ███ ███ ██████ ████████ ███████ ███████ ███████ ███████ ███████ ██ ██████ ███ ██ // ██ ██ ██ ████ ████ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ████ ██ @@ -326,6 +327,46 @@ export class RemoteSession { return publisher; } + declare_remote_querier( + key_expr: string, + consolidation?: number, + congestion_control?: number, + priority?: number, + express?: boolean, + target?: number, + allowed_destination?: number, + accept_replies?: number, + timeout_milliseconds?: number, + ): RemoteQuerier { + let timeout = undefined; + if (timeout_milliseconds !== undefined) { + timeout = timeout_milliseconds; + } + + let uuid: string = uuidv4(); + let querier = new RemoteQuerier(uuid, this); + + let control_message: ControlMsg = { + DeclareQuerier: { + id: uuid, + key_expr: key_expr, + congestion_control: congestion_control, + priority: priority, + express: express, + target: target, + timeout: timeout, + accept_replies: accept_replies, + allowed_destination: allowed_destination, + consolidation: consolidation, + }, + }; + + + this.send_ctrl_message(control_message); + return querier; + } + + // Liveliness declare_liveliness_token( key_expr: string, diff --git a/zenoh-ts/src/sample.ts b/zenoh-ts/src/sample.ts index 0487d92..8cefd44 100644 --- a/zenoh-ts/src/sample.ts +++ b/zenoh-ts/src/sample.ts @@ -107,6 +107,7 @@ export function congestion_control_to_int( return 0; case CongestionControl.BLOCK: return 1; + // Default is Drop default: return 0; } diff --git a/zenoh-ts/src/session.ts b/zenoh-ts/src/session.ts index c185b60..c08668c 100644 --- a/zenoh-ts/src/session.ts +++ b/zenoh-ts/src/session.ts @@ -55,6 +55,7 @@ import { HandlerChannel } from "./remote_api/interface/HandlerChannel.js"; // External deps import { Duration, TimeDuration } from 'typed-duration' import { SimpleChannel } from "channel-ts"; +import { locality_to_int, Querier, QuerierOptions, query_target_to_int, reply_key_expr_to_int, ReplyKeyExpr } from "./querier.js"; function executeAsync(func: any) { setTimeout(func, 0); @@ -210,7 +211,6 @@ export class Session { let _priority; let _express; let _attachment; - let _encoding = put_opts?.encoding?.toString() let _congestion_control = congestion_control_to_int(put_opts?.congestion_control); @@ -558,8 +558,75 @@ export class Session { ); return publisher; } + + /** + * Declares a Querier + * + * @param {IntoKeyExpr} keyexpr - string of key_expression + * @param {QuerierOptions} publisher_opts - Optional, set of options to be used when declaring a publisher + * @returns Publisher + */ + declare_querier( + into_keyexpr: IntoKeyExpr, + querier_opts: QuerierOptions, + ): Querier { + const key_expr = new KeyExpr(into_keyexpr); + + // Optional Parameters + let _priority; + let priority = Priority.DATA; + if (querier_opts?.priority != null) { + _priority = priority_to_int(querier_opts?.priority); + priority = querier_opts?.priority; + } + + let _congestion_control; + let congestion_control = CongestionControl.DROP; + if (querier_opts?.congestion_control != null) { + _congestion_control = congestion_control_to_int(querier_opts?.congestion_control); + congestion_control = querier_opts?.congestion_control; + } + + let _accept_replies; + let accept_replies = ReplyKeyExpr.Any; + if (querier_opts?.accept_replies != null) { + _accept_replies = reply_key_expr_to_int(querier_opts?.accept_replies); + accept_replies = querier_opts?.accept_replies; + } + + let _consolidation = consolidation_mode_to_int(querier_opts?.consolidation); + let _target = query_target_to_int(querier_opts?.target); + let _allowed_destination = locality_to_int(querier_opts?.allowed_destination); + let _express = querier_opts?.express; + let _timeout_millis: number | undefined = undefined; + + if (querier_opts?.timeout !== undefined) { + _timeout_millis = Duration.milliseconds.from(querier_opts?.timeout); + } + + let remote_querier = this.remote_session.declare_remote_querier( + key_expr.toString(), + _consolidation, + _congestion_control, + _priority, + _express, + _target, + _allowed_destination, + _accept_replies, + _timeout_millis, + ); + + return new Querier( + remote_querier, + key_expr, + congestion_control, + priority, + accept_replies, + ); + } } + function isGetChannelClose(msg: any): msg is GetChannelClose { return msg === GetChannelClose.Disconnected; } @@ -590,7 +657,7 @@ export class Receiver { /** * @ignore */ - private constructor(receiver: SimpleChannel) { + constructor(receiver: SimpleChannel) { this.receiver = receiver; }