Skip to content

Commit

Permalink
add querier Class, RemoteQuerier, Locality, ReplyKeyExpr QuerierOptio…
Browse files Browse the repository at this point in the history
…ns, QuerierGetOptions
  • Loading branch information
Charles-Schleich committed Dec 9, 2024
1 parent c6cdc1d commit 6a3480f
Show file tree
Hide file tree
Showing 8 changed files with 447 additions and 4 deletions.
3 changes: 3 additions & 0 deletions zenoh-plugin-remote-api/src/handle_control_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ pub(crate) async fn handle_control_message(
priority,
consolidation,
allowed_destination,
express
} => {
let mut querier_builder = state_map.session.declare_querier(key_expr);
let timeout = timeout.map(|millis| Duration::from_millis(millis));
Expand All @@ -357,6 +358,8 @@ pub(crate) async fn handle_control_message(
add_if_some!(priority, querier_builder);
add_if_some!(consolidation, querier_builder);
add_if_some!(allowed_destination, querier_builder);
add_if_some!(express, querier_builder);

let querier = querier_builder.await?;
state_map.queriers.insert(id, querier);
}
Expand Down
4 changes: 3 additions & 1 deletion zenoh-plugin-remote-api/src/interface/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ pub enum ControlMsg {
)]
#[ts(type = "number | undefined")]
target: Option<QueryTarget>,
//
#[ts(type = "number | undefined")]
timeout: Option<u64>,
#[serde(
deserialize_with = "deserialize_reply_key_expr",
Expand Down Expand Up @@ -307,6 +307,8 @@ pub enum ControlMsg {
)]
#[ts(type = "number | undefined")]
consolidation: Option<ConsolidationMode>,
#[ts(type = "boolean | undefined")]
express: Option<bool>,
},
UndDeclareQuerier(Uuid),
// Querier
Expand Down
258 changes: 258 additions & 0 deletions zenoh-ts/src/querier.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

// External
import { SimpleChannel } from "channel-ts";
// Remote API
import { ReplyWS } from "./remote_api/interface/ReplyWS.js";
// API
import { IntoZBytes, ZBytes } from "./z_bytes.js";
import { CongestionControl, ConsolidationMode, Priority, } from "./sample.js";
import { TimeDuration } from "typed-duration";
import { RemoteQuerier } from "./remote_api/querier.js";
import { KeyExpr } from "./key_expr.js";
import { Encoding } from "crypto";
import { Receiver } from "./session.js";

export enum QueryTarget {
/// Let Zenoh find the BestMatching queryable capabale of serving the query.
BestMatching,
/// Deliver the query to all queryables matching the query's key expression.
All,
/// Deliver the query to all queryables matching the query's key expression that are declared as complete.
AllComplete,
}

/**
* Convenience function to convert between QueryTarget and int
* @internal
*/
export function query_target_to_int(query_target?: QueryTarget): number {
switch (query_target) {
case QueryTarget.BestMatching:
return 1;
case QueryTarget.All:
return 2;
case QueryTarget.AllComplete:
return 3;
default:
// Default is QueryTarget.BestMatching
return 1;
}
}

export enum Locality {
SessionLocal,
Remote,
Any,
}

/**
* Convenience function to convert between Locality and int
* @internal
*/
export function locality_to_int(query_target?: Locality): number {
switch (query_target) {
case Locality.SessionLocal:
return 1;
case Locality.Remote:
return 2;
case Locality.Any:
return 3;
default:
// Default is Locality.Any
return 3;
}
}

export enum ReplyKeyExpr {
/// Accept replies whose key expressions may not match the query key expression.
Any,
/// Accept replies whose key expressions match the query key expression.
MatchingQuery,
}

/**
* Convenience function to convert between QueryTarget function and int
* @internal
*/
export function reply_key_expr_to_int(query_target?: ReplyKeyExpr): number {
switch (query_target) {
case ReplyKeyExpr.Any:
return 1;
case ReplyKeyExpr.MatchingQuery:
return 2;
default:
// Default is ReplyKeyExpr.MatchingQuery
return 2;
}
}


export interface QuerierOptions {
congestion_control?: CongestionControl,
consolidation: ConsolidationMode,
priority?: Priority,
express?: boolean,
target: QueryTarget
timeout?: TimeDuration,
allowed_destination: Locality
//
accept_replies?: ReplyKeyExpr
}

export interface QuerierGetOptions {
encoding?: Encoding,
payload?: IntoZBytes,
attachment?: IntoZBytes,
parameters?: string
}

/**
* Queryable class used to receive Query's from the network and handle Reply's
* created by Session.declare_queryable
*/
export class Querier {
private _remote_querier: RemoteQuerier;
private _key_expr: KeyExpr;
private _congestion_control: CongestionControl;
private _priority: Priority;
private _accept_replies: ReplyKeyExpr;
private undeclared: boolean;
/**
* @ignore
*/
dispose() {
this.undeclare();
}

/**
* Returns a Queryable
* Note! : user must use declare_queryable on a session
*/
constructor(
remote_querier: RemoteQuerier,
key_expr: KeyExpr,
congestion_control: CongestionControl,
priority: Priority,
accept_replies: ReplyKeyExpr,
) {
this._remote_querier = remote_querier;
this._key_expr = key_expr;
this._congestion_control = congestion_control;
this._priority = priority;
this._accept_replies = accept_replies;
this.undeclared = false;
// TODO: Look at finalization registry
// Queryable.registry.register(this, remote_queryable, this)
}

/**
* Undeclares Queryable
* @returns void
*/
undeclare() {
this.undeclared = true;
// Finalization registry
// Queryable.registry.unregister(this);
}

/**
* returns key expression for this Querier
* @returns KeyExpr
*/
key_expr() {
return this._key_expr;
}

/**
* returns Congestion Control for this Querier
* @returns CongestionControl
*/
congestion_control() {
return this._congestion_control;
}

/**
* returns Priority for this Querier
* @returns Priority
*/
priority() {
return this._priority;
}

/**
* returns ReplyKeyExpr for this Querier
* @returns ReplyKeyExpr
*/
accept_replies() {
return this._accept_replies;
}

/**
* Issue a Get request on this querier
* @returns Promise <Receiever | void>
*/
get(get_options: QuerierGetOptions): Receiver | undefined {
if (this.undeclared == true) {
return undefined;
}
let _payload;
let _attachment;
let _parameters;
let _encoding = get_options?.encoding?.toString()

if (get_options?.attachment != undefined) {
_attachment = Array.from(new ZBytes(get_options?.attachment).buffer())
}
if (get_options?.payload != undefined) {
_payload = Array.from(new ZBytes(get_options?.payload).buffer())
}
if (get_options?.parameters != undefined) {
_parameters = get_options?.parameters;
}

let chan: SimpleChannel<ReplyWS> = this._remote_querier.get(
_encoding,
_parameters,
_attachment,
_payload,
);

let receiver = new Receiver(chan);

return receiver;
// if (callback != undefined) {
// executeAsync(async () => {
// for await (const message of chan) {
// // This horribleness comes from SimpleChannel sending a 0 when the channel is closed
// if (message != undefined && (message as unknown as number) != 0) {
// let reply = new Reply(message);
// if (callback != undefined) {
// callback(reply);
// }
// } else {
// break
// }
// }
// });
// return undefined;
// } else {
// return receiver;
// }

}


}
2 changes: 1 addition & 1 deletion zenoh-ts/src/remote_api/interface/ControlMsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ import type { HandlerChannel } from "./HandlerChannel";
import type { LivelinessMsg } from "./LivelinessMsg";
import type { OwnedKeyExprWrapper } from "./OwnedKeyExprWrapper";

export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, } } | { "UndeclareQueryable": string } | { "DeclareQuerier": { id: string, key_expr: OwnedKeyExprWrapper, target: number | undefined, timeout: bigint | null, accept_replies: number | undefined, allowed_destination: number | undefined, congestion_control: number | undefined, priority: number | undefined, consolidation: number | undefined, } } | { "UndDeclareQuerier": string } | { "QuerierGet": { querier_id: string, get_id: string, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "Liveliness": LivelinessMsg };
export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, } } | { "UndeclareQueryable": string } | { "DeclareQuerier": { id: string, key_expr: OwnedKeyExprWrapper, target: number | undefined, timeout: number | undefined, accept_replies: number | undefined, allowed_destination: number | undefined, congestion_control: number | undefined, priority: number | undefined, consolidation: number | undefined, express: boolean | undefined, } } | { "UndDeclareQuerier": string } | { "QuerierGet": { querier_id: string, get_id: string, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "Liveliness": LivelinessMsg };
71 changes: 71 additions & 0 deletions zenoh-ts/src/remote_api/querier.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

import { v4 as uuidv4 } from "uuid";
import { RemoteSession } from "./session.js";
import { ControlMsg } from "./interface/ControlMsg.js"
import { SimpleChannel } from "channel-ts";
import { ReplyWS } from "./interface/ReplyWS.js";
import { encode as b64_str_from_bytes } from "base64-arraybuffer";

type UUID = typeof uuidv4 | string;

export class RemoteQuerier {
private querier_id: UUID;
private session_ref: RemoteSession;

constructor(
querier_id: UUID,
session_ref: RemoteSession,
) {
this.querier_id = querier_id;
this.session_ref = session_ref;
}

get(
_encoding?: string,
_parameters?: string,
_attachment?: Array<number>,
_payload?: Array<number>,
): SimpleChannel<ReplyWS> {
let get_id = uuidv4();
let channel: SimpleChannel<ReplyWS> = new SimpleChannel<ReplyWS>();
this.session_ref.get_receiver.set(get_id, channel);

let payload = undefined;
if (_payload != undefined) {
payload = b64_str_from_bytes(new Uint8Array(_payload))
}
let attachment = undefined;
if (_attachment != undefined) {
attachment = b64_str_from_bytes(new Uint8Array(_attachment))
}

let control_msg: ControlMsg = {
QuerierGet: {
querier_id: this.querier_id as string,
get_id: get_id,
encoding: _encoding,
payload: payload,
attachment: attachment,
}
};

this.session_ref.send_ctrl_message(control_msg);
return channel;
}

}


Loading

0 comments on commit 6a3480f

Please sign in to comment.