Skip to content

Commit

Permalink
Feat/session info (#60)
Browse files Browse the repository at this point in the history
* Added ZenohId Type, TS, added message inteface to Rust plugin

* Add SessionInfo Interface File, WIP add example

* Export SessnionInfo, ZenohId Types, finish implementing translation into SessionInfo List of ZenohId

* fmt
  • Loading branch information
Charles-Schleich authored Dec 27, 2024
1 parent e80302e commit 5a31b35
Show file tree
Hide file tree
Showing 10 changed files with 186 additions and 10 deletions.
30 changes: 29 additions & 1 deletion zenoh-plugin-remote-api/src/handle_control_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use zenoh::{
use crate::{
interface::{
B64String, ControlMsg, DataMsg, HandlerChannel, LivelinessMsg, QueryWS, QueryableMsg,
RemoteAPIMsg, ReplyWS, SampleWS,
RemoteAPIMsg, ReplyWS, SampleWS, SessionInfo,
},
spawn_future, RemoteState, StateMap,
};
Expand Down Expand Up @@ -65,11 +65,39 @@ pub(crate) async fn handle_control_message(
return Ok(());
}
};

// Handle Control Message
match ctrl_msg {
ControlMsg::OpenSession => {
return Ok(());
}
ControlMsg::SessionInfo => {
let session_info = state_map.session.info();

let zid = session_info.zid().await.to_string();
let z_peers: Vec<String> = session_info
.peers_zid()
.await
.map(|x| x.to_string())
.collect();
let z_routers: Vec<String> = session_info
.routers_zid()
.await
.map(|x| x.to_string())
.collect();

let session_info = SessionInfo {
zid,
z_routers,
z_peers,
};

let remote_api_message = RemoteAPIMsg::Data(DataMsg::SessionInfo(session_info));

if let Err(e) = state_map.websocket_tx.send(remote_api_message) {
error!("Forward Sample Channel error: {e}");
};
}
ControlMsg::CloseSession => {
if let Some(state_map) = state_writer.remove(&sock_addr) {
state_map.cleanup().await;
Expand Down
15 changes: 15 additions & 0 deletions zenoh-plugin-remote-api/src/interface/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,22 @@ pub enum DataMsg {
Sample(SampleWS, Uuid),
// GetReply
GetReply(ReplyWS),
//
SessionInfo(SessionInfo),

// Bidirectional
Queryable(QueryableMsg),
}

#[derive(TS)]
#[ts(export)]
#[derive(Debug, Serialize, Deserialize)]
pub struct SessionInfo {
pub zid: String,
pub z_routers: Vec<String>,
pub z_peers: Vec<String>,
}

#[derive(TS)]
#[ts(export)]
#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -114,6 +126,9 @@ pub enum ControlMsg {
CloseSession,
Session(Uuid),

//
SessionInfo,

// Session Action Messages
Get {
#[ts(as = "OwnedKeyExprWrapper")]
Expand Down
1 change: 0 additions & 1 deletion zenoh-ts/.npmrc

This file was deleted.

44 changes: 44 additions & 0 deletions zenoh-ts/examples/deno/src/z_info.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

import {
SessionInfo, Config, Session
} from "@eclipse-zenoh/zenoh-ts";

export async function main() {
console.log!("Opening session...");
const session = await Session.open(new Config("ws/127.0.0.1:10000"));

console.log!("Get Info...");
let info: SessionInfo = await session.info();

console.log!("zid: {}", info.zid());

console.log!(
"routers zid: {:?}",
info.routers_zid()
);

console.log!(
"peers zid: {:?}",
info.peers_zid()
);

}

function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

main()
5 changes: 3 additions & 2 deletions zenoh-ts/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { ZBytes, IntoZBytes, deserialize_bool, deserialize_uint, deserialize_int
import { CongestionControl, ConsolidationMode, Priority, Reliability, Sample, SampleKind } from "./sample.js";
import { Publisher, Subscriber, FifoChannel, RingChannel } from "./pubsub.js";
import { IntoSelector, Parameters, IntoParameters, Query, Queryable, Reply, ReplyError, Selector } from "./query.js";
import { Session, RecvErr, Receiver, DeleteOptions, PutOptions, GetOptions, QueryableOptions, PublisherOptions } from "./session.js";
import { Session, RecvErr, Receiver, DeleteOptions, PutOptions, GetOptions, QueryableOptions, PublisherOptions, ZenohId, SessionInfo } from "./session.js";
import { Config } from "./config.js";
import { Encoding, IntoEncoding } from "./encoding.js";
import { Liveliness, LivelinessToken } from "./liveliness.js";
Expand All @@ -27,13 +27,14 @@ import { Querier, QueryTarget, Locality, ReplyKeyExpr, QuerierOptions, QuerierGe
// Re-export duration external library
import { Duration } from 'typed-duration'


// Exports
export { KeyExpr, IntoKeyExpr };
export { ZBytes, IntoZBytes, deserialize_bool, deserialize_uint, deserialize_int, deserialize_float, deserialize_string };
export { CongestionControl, ConsolidationMode, Priority, Reliability, Sample, SampleKind };
export { Publisher, Subscriber, FifoChannel, RingChannel };
export { IntoSelector, Parameters, IntoParameters, Query, Queryable, Reply, ReplyError, Selector };
export { Session, RecvErr, Receiver, DeleteOptions as DeleteOpts, PutOptions, GetOptions, QueryableOptions, PublisherOptions };
export { Session, RecvErr, Receiver, DeleteOptions as DeleteOpts, PutOptions, GetOptions, QueryableOptions, PublisherOptions, ZenohId, SessionInfo};
export { Config };
export { Encoding, IntoEncoding };
export { Liveliness, LivelinessToken };
Expand Down
2 changes: 1 addition & 1 deletion zenoh-ts/src/remote_api/interface/ControlMsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ import type { HandlerChannel } from "./HandlerChannel.js";
import type { LivelinessMsg } from "./LivelinessMsg.js";
import type { OwnedKeyExprWrapper } from "./OwnedKeyExprWrapper.js";

export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, } } | { "UndeclareQueryable": string } | { "DeclareQuerier": { id: string, key_expr: OwnedKeyExprWrapper, target: number | undefined, timeout: number | undefined, accept_replies: number | undefined, allowed_destination: number | undefined, congestion_control: number | undefined, priority: number | undefined, consolidation: number | undefined, express: boolean | undefined, } } | { "UndeclareQuerier": string } | { "QuerierGet": { querier_id: string, get_id: string, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "Liveliness": LivelinessMsg };
export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | "SessionInfo" | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, } } | { "UndeclareQueryable": string } | { "DeclareQuerier": { id: string, key_expr: OwnedKeyExprWrapper, target: number | undefined, timeout: number | undefined, accept_replies: number | undefined, allowed_destination: number | undefined, congestion_control: number | undefined, priority: number | undefined, consolidation: number | undefined, express: boolean | undefined, } } | { "UndeclareQuerier": string } | { "QuerierGet": { querier_id: string, get_id: string, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "Liveliness": LivelinessMsg };
3 changes: 2 additions & 1 deletion zenoh-ts/src/remote_api/interface/DataMsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ import type { B64String } from "./B64String.js";
import type { QueryableMsg } from "./QueryableMsg.js";
import type { ReplyWS } from "./ReplyWS.js";
import type { SampleWS } from "./SampleWS.js";
import type { SessionInfo } from "./SessionInfo.js";

export type DataMsg = { "PublisherPut": { id: string, payload: B64String, attachment: B64String | null, encoding: string | null, } } | { "Sample": [SampleWS, string] } | { "GetReply": ReplyWS } | { "Queryable": QueryableMsg };
export type DataMsg = { "PublisherPut": { id: string, payload: B64String, attachment: B64String | null, encoding: string | null, } } | { "Sample": [SampleWS, string] } | { "GetReply": ReplyWS } | { "SessionInfo": SessionInfo } | { "Queryable": QueryableMsg };
3 changes: 3 additions & 0 deletions zenoh-ts/src/remote_api/interface/SessionInfo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.

export type SessionInfo = { zid: string, z_routers: Array<string>, z_peers: Array<string>, };
30 changes: 28 additions & 2 deletions zenoh-ts/src/remote_api/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ import { ReplyWS } from "./interface/ReplyWS.js";
import { QueryableMsg } from "./interface/QueryableMsg.js";
import { QueryReplyWS } from "./interface/QueryReplyWS.js";
import { HandlerChannel } from "./interface/HandlerChannel.js";
import { SessionInfo as SessionInfoIface } from "./interface/SessionInfo.js";
import { RemoteQuerier } from "./querier.js"


// ██████ ███████ ███ ███ ██████ ████████ ███████ ███████ ███████ ███████ ███████ ██ ██████ ███ ██
// ██ ██ ██ ████ ████ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ████ ██
// ██████ █████ ██ ████ ██ ██ ██ ██ █████ ███████ █████ ███████ ███████ ██ ██ ██ ██ ██ ██
Expand All @@ -60,6 +62,7 @@ export class RemoteSession {
get_receiver: Map<UUIDv4, SimpleChannel<ReplyWS | RemoteRecvErr>>;
liveliness_subscribers: Map<UUIDv4, SimpleChannel<SampleWS>>;
liveliness_get_receiver: Map<UUIDv4, SimpleChannel<ReplyWS>>;
session_info: SessionInfoIface | null;

private constructor(ws: WebSocket, ws_channel: SimpleChannel<JSONMessage>) {
this.ws = ws;
Expand All @@ -70,6 +73,7 @@ export class RemoteSession {
this.get_receiver = new Map<UUIDv4, SimpleChannel<ReplyWS>>();
this.liveliness_subscribers = new Map<UUIDv4, SimpleChannel<SampleWS>>();
this.liveliness_get_receiver = new Map<UUIDv4, SimpleChannel<ReplyWS>>();
this.session_info = null;
}

//
Expand Down Expand Up @@ -136,6 +140,19 @@ export class RemoteSession {
//
// Zenoh Session Functions
//
// Info
async info(): Promise<SessionInfoIface> {
let ctrl_message: ControlMsg = "SessionInfo";
this.session_info = null;
this.send_ctrl_message(ctrl_message);

while (this.session_info === null) {
await sleep(10);
}

return this.session_info;
}

// Put
put(key_expr: string,
payload: Array<number>,
Expand All @@ -152,7 +169,7 @@ export class RemoteSession {
opt_attachment = b64_str_from_bytes(new Uint8Array(attachment))
}

let data_message: ControlMsg = {
let ctrl_message: ControlMsg = {
Put: {
key_expr: owned_keyexpr,
payload: b64_str_from_bytes(new Uint8Array(payload)),
Expand All @@ -163,7 +180,7 @@ export class RemoteSession {
attachment: opt_attachment,
},
};
this.send_ctrl_message(data_message);
this.send_ctrl_message(ctrl_message);
}

// get
Expand Down Expand Up @@ -433,6 +450,7 @@ export class RemoteSession {
return channel;
}


//
// Sending Messages
//
Expand Down Expand Up @@ -543,12 +561,20 @@ export class RemoteSession {
} else {
console.warn("Queryable message Variant not recognized");
}
} else if ("SessionInfo" in data_msg) {

let session_info: SessionInfoIface = data_msg["SessionInfo"];
this.session_info = session_info;

} else {
console.warn("Data Message not recognized Expected Variant", data_msg);
}
}
}




function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
63 changes: 61 additions & 2 deletions zenoh-ts/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import { Config } from "./config.js";
import { Encoding } from "./encoding.js";
import { QueryReplyWS } from "./remote_api/interface/QueryReplyWS.js";
import { HandlerChannel } from "./remote_api/interface/HandlerChannel.js";
import { SessionInfo as SessionInfoIface } from "./remote_api/interface/SessionInfo.js";
// External deps
import { Duration, TimeDuration } from 'typed-duration'
import { SimpleChannel } from "channel-ts";
Expand Down Expand Up @@ -235,6 +236,23 @@ export class Session {
);
}

/**
* Returns the Zenoh SessionInfo Object
*
* @returns SessionInfo
*/
async info(): Promise<SessionInfo> {
let session_info_iface: SessionInfoIface = await this.remote_session.info();

let zid = new ZenohId(session_info_iface.zid);
let z_peers = session_info_iface.z_peers.map(x => new ZenohId(x));
let z_routers = session_info_iface.z_routers.map(x => new ZenohId(x));

let session_info = new SessionInfo(zid, z_peers, z_routers);

return session_info;
}

/**
* Executes a Delete on a session, for a specific key expression KeyExpr
*
Expand Down Expand Up @@ -330,7 +348,7 @@ export class Session {
let [callback, handler_type] = this.check_handler_or_callback<Reply>(handler);

// Optional Parameters

let _consolidation = consolidation_mode_to_int(get_options?.consolidation)
let _encoding = get_options?.encoding?.toString();
let _congestion_control = congestion_control_to_int(get_options?.congestion_control);
Expand Down Expand Up @@ -426,7 +444,7 @@ export class Session {
handler_type,
);
}

let subscriber = Subscriber[NewSubscriber](
remote_subscriber,
callback_subscriber,
Expand Down Expand Up @@ -708,3 +726,44 @@ export class Receiver {
export function open(config: Config): Promise<Session> {
return Session.open(config);
}

/**
* Struct to expose Info for your Zenoh Session
*/
export class SessionInfo {
private _zid: ZenohId
private _routers: ZenohId[]
private _peers: ZenohId[]

constructor(
zid: ZenohId,
peers: ZenohId[],
routers: ZenohId[],
) {
this._zid = zid;
this._routers = routers;
this._peers = peers;
}

zid(): ZenohId {
return this._zid;
}
routers_zid(): ZenohId[] {
return this._routers;
}
peers_zid(): ZenohId[] {
return this._peers;
}
}

export class ZenohId {
private zid: string

constructor(zid: string) {
this.zid = zid;
}

toString(): string {
return this.zid;
}
}

0 comments on commit 5a31b35

Please sign in to comment.