Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/session info #60

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion zenoh-plugin-remote-api/src/handle_control_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use zenoh::{
use crate::{
interface::{
ControlMsg, DataMsg, HandlerChannel, LivelinessMsg, QueryWS, QueryableMsg, RemoteAPIMsg,
ReplyWS, SampleWS,
ReplyWS, SampleWS, SessionInfo,
},
spawn_future, RemoteState, StateMap,
};
Expand Down Expand Up @@ -63,11 +63,39 @@ pub(crate) async fn handle_control_message(
return Ok(());
}
};

// Handle Control Message
match ctrl_msg {
ControlMsg::OpenSession => {
return Ok(());
}
ControlMsg::SessionInfo => {
let session_info = state_map.session.info();

let zid = session_info.zid().await.to_string();
let z_peers: Vec<String> = session_info
.peers_zid()
.await
.map(|x| x.to_string())
.collect();
let z_routers: Vec<String> = session_info
.routers_zid()
.await
.map(|x| x.to_string())
.collect();

let session_info = SessionInfo {
zid,
z_routers,
z_peers,
};

let remote_api_message = RemoteAPIMsg::Data(DataMsg::SessionInfo(session_info));

if let Err(e) = state_map.websocket_tx.send(remote_api_message) {
error!("Forward Sample Channel error: {e}");
};
}
ControlMsg::CloseSession => {
if let Some(state_map) = state_writer.remove(&sock_addr) {
state_map.cleanup().await;
Expand Down
15 changes: 15 additions & 0 deletions zenoh-plugin-remote-api/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,22 @@ pub enum DataMsg {
Sample(SampleWS, Uuid),
// GetReply
GetReply(ReplyWS),
//
SessionInfo(SessionInfo),

// Bidirectional
Queryable(QueryableMsg),
}

#[derive(TS)]
#[ts(export)]
#[derive(Debug, Serialize, Deserialize)]
pub struct SessionInfo {
pub zid: String,
pub z_routers: Vec<String>,
pub z_peers: Vec<String>,
}

#[derive(TS)]
#[ts(export)]
#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -105,6 +117,9 @@ pub enum ControlMsg {
CloseSession,
Session(Uuid),

//
SessionInfo,

// Session Action Messages
Get {
#[ts(as = "OwnedKeyExprWrapper")]
Expand Down
1 change: 0 additions & 1 deletion zenoh-ts/.npmrc

This file was deleted.

44 changes: 44 additions & 0 deletions zenoh-ts/examples/src/z_info.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

import {
SessionInfo, Config, Session
} from "@eclipse-zenoh/zenoh-ts";

export async function main() {
console.log!("Opening session...");
const session = await Session.open(new Config("ws/127.0.0.1:10000"));

console.log!("Get Info...");
let info: SessionInfo = await session.info();

console.log!("zid: {}", info.zid());

console.log!(
"routers zid: {:?}",
info.routers_zid()
);

console.log!(
"peers zid: {:?}",
info.peers_zid()
);

}

function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

main()
5 changes: 3 additions & 2 deletions zenoh-ts/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,21 @@ import { ZBytes, IntoZBytes, deserialize_bool, deserialize_uint, deserialize_int
import { CongestionControl, ConsolidationMode, Priority, Reliability, Sample, SampleKind } from "./sample.js";
import { Publisher, Subscriber, FifoChannel, RingChannel } from "./pubsub.js";
import { IntoSelector, Parameters, IntoParameters, Query, Queryable, Reply, ReplyError, Selector } from "./query.js";
import { Session, RecvErr, Receiver, DeleteOptions, PutOptions, GetOptions, QueryableOptions, PublisherOptions } from "./session.js";
import { Session, RecvErr, Receiver, DeleteOptions, PutOptions, GetOptions, QueryableOptions, PublisherOptions, ZenohId, SessionInfo } from "./session.js";
import { Config } from "./config.js";
import { Encoding, IntoEncoding } from "./encoding.js";
import { Liveliness, LivelinessToken } from "./liveliness.js";
// Re-export duration external library
import { TimeDuration as Duration } from 'typed-duration'


// Exports
export { KeyExpr, IntoKeyExpr };
export { ZBytes, IntoZBytes, deserialize_bool, deserialize_uint, deserialize_int, deserialize_float, deserialize_string };
export { CongestionControl, ConsolidationMode, Priority, Reliability, Sample, SampleKind };
export { Publisher, Subscriber, FifoChannel, RingChannel };
export { IntoSelector, Parameters, IntoParameters, Query, Queryable, Reply, ReplyError, Selector };
export { Session, RecvErr, Receiver, DeleteOptions as DeleteOpts, PutOptions, GetOptions, QueryableOptions, PublisherOptions };
export { Session, RecvErr, Receiver, DeleteOptions as DeleteOpts, PutOptions, GetOptions, QueryableOptions, PublisherOptions, ZenohId, SessionInfo};
export { Config };
export { Encoding, IntoEncoding };
export { Liveliness, LivelinessToken };
Expand Down
2 changes: 1 addition & 1 deletion zenoh-ts/src/remote_api/interface/ControlMsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ import type { HandlerChannel } from "./HandlerChannel";
import type { LivelinessMsg } from "./LivelinessMsg";
import type { OwnedKeyExprWrapper } from "./OwnedKeyExprWrapper";

export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, } } | { "UndeclareQueryable": string } | { "Liveliness": LivelinessMsg };
export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | "SessionInfo" | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, } } | { "UndeclareQueryable": string } | { "Liveliness": LivelinessMsg };
3 changes: 2 additions & 1 deletion zenoh-ts/src/remote_api/interface/DataMsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ import type { B64String } from "./B64String";
import type { QueryableMsg } from "./QueryableMsg";
import type { ReplyWS } from "./ReplyWS";
import type { SampleWS } from "./SampleWS";
import type { SessionInfo } from "./SessionInfo";

export type DataMsg = { "PublisherPut": { id: string, payload: B64String, attachment: B64String | null, encoding: string | null, } } | { "Sample": [SampleWS, string] } | { "GetReply": ReplyWS } | { "Queryable": QueryableMsg };
export type DataMsg = { "PublisherPut": { id: string, payload: B64String, attachment: B64String | null, encoding: string | null, } } | { "Sample": [SampleWS, string] } | { "GetReply": ReplyWS } | { "SessionInfo": SessionInfo } | { "Queryable": QueryableMsg };
3 changes: 3 additions & 0 deletions zenoh-ts/src/remote_api/interface/SessionInfo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.

export type SessionInfo = { zid: string, z_routers: Array<string>, z_peers: Array<string>, };
30 changes: 28 additions & 2 deletions zenoh-ts/src/remote_api/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import { ReplyWS } from "./interface/ReplyWS.js";
import { QueryableMsg } from "./interface/QueryableMsg.js";
import { QueryReplyWS } from "./interface/QueryReplyWS.js";
import { HandlerChannel } from "./interface/HandlerChannel.js";
import { SessionInfo as SessionInfoIface } from "./interface/SessionInfo.js";


// ██████ ███████ ███ ███ ██████ ████████ ███████ ███████ ███████ ███████ ███████ ██ ██████ ███ ██
// ██ ██ ██ ████ ████ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ████ ██
Expand Down Expand Up @@ -59,6 +61,7 @@ export class RemoteSession {
get_receiver: Map<UUIDv4, SimpleChannel<ReplyWS | RemoteRecvErr>>;
liveliness_subscribers: Map<UUIDv4, SimpleChannel<SampleWS>>;
liveliness_get_receiver: Map<UUIDv4, SimpleChannel<ReplyWS>>;
session_info: SessionInfoIface | null;

private constructor(ws: WebSocket, ws_channel: SimpleChannel<JSONMessage>) {
this.ws = ws;
Expand All @@ -69,6 +72,7 @@ export class RemoteSession {
this.get_receiver = new Map<UUIDv4, SimpleChannel<ReplyWS>>();
this.liveliness_subscribers = new Map<UUIDv4, SimpleChannel<SampleWS>>();
this.liveliness_get_receiver = new Map<UUIDv4, SimpleChannel<ReplyWS>>();
this.session_info = null;
}

//
Expand Down Expand Up @@ -135,6 +139,19 @@ export class RemoteSession {
//
// Zenoh Session Functions
//
// Info
async info(): Promise<SessionInfoIface> {
let ctrl_message: ControlMsg = "SessionInfo";
this.session_info = null;
this.send_ctrl_message(ctrl_message);

while (this.session_info === null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this code means that if session info is not arrive immediately, it will wait for 10 seconds until next try?
I believe there is a way to notify asynchronous function exactly at the moment when session_info is updated - by creating and awaiting for promise which callback is called at the moment when session_info is set.
Something like this - is it possible?
https://stackoverflow.com/questions/53900575/how-to-create-an-async-function-that-waits-on-an-event-in-javascript

await sleep(10);
}

return this.session_info;
}

// Put
put(key_expr: string,
payload: Array<number>,
Expand All @@ -151,7 +168,7 @@ export class RemoteSession {
opt_attachment = b64_str_from_bytes(new Uint8Array(attachment))
}

let data_message: ControlMsg = {
let ctrl_message: ControlMsg = {
Put: {
key_expr: owned_keyexpr,
payload: b64_str_from_bytes(new Uint8Array(payload)),
Expand All @@ -162,7 +179,7 @@ export class RemoteSession {
attachment: opt_attachment,
},
};
this.send_ctrl_message(data_message);
this.send_ctrl_message(ctrl_message);
}

// get
Expand Down Expand Up @@ -393,6 +410,7 @@ export class RemoteSession {
return channel;
}


//
// Sending Messages
//
Expand Down Expand Up @@ -503,12 +521,20 @@ export class RemoteSession {
} else {
console.warn("Queryable message Variant not recognized");
}
} else if ("SessionInfo" in data_msg) {

let session_info: SessionInfoIface = data_msg["SessionInfo"];
this.session_info = session_info;

} else {
console.warn("Data Message not recognized Expected Variant", data_msg);
}
}
}




function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
65 changes: 62 additions & 3 deletions zenoh-ts/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import { Config } from "./config.js";
import { Encoding } from "./encoding.js";
import { QueryReplyWS } from "./remote_api/interface/QueryReplyWS.js";
import { HandlerChannel } from "./remote_api/interface/HandlerChannel.js";
import { SessionInfo as SessionInfoIface } from "./remote_api/interface/SessionInfo.js";
// External deps
import { Duration, TimeDuration } from 'typed-duration'
import { SimpleChannel } from "channel-ts";
Expand Down Expand Up @@ -235,6 +236,23 @@ export class Session {
);
}

/**
* Returns the Zenoh SessionInfo Object
*
* @returns SessionInfo
*/
async info(): Promise<SessionInfo> {
let session_info_iface: SessionInfoIface = await this.remote_session.info();

let zid = new ZenohId(session_info_iface.zid);
let z_peers = session_info_iface.z_peers.map(x => new ZenohId(x));
let z_routers = session_info_iface.z_routers.map(x => new ZenohId(x));

let session_info = new SessionInfo(zid, z_peers, z_routers);

return session_info;
}

/**
* Executes a Delete on a session, for a specific key expression KeyExpr
*
Expand Down Expand Up @@ -330,7 +348,7 @@ export class Session {
let [callback, handler_type] = this.check_handler_or_callback<Reply>(handler);

// Optional Parameters

let _consolidation = consolidation_mode_to_int(get_options?.consolidation)
let _encoding = get_options?.encoding?.toString();
let _congestion_control = congestion_control_to_int(get_options?.congestion_control);
Expand Down Expand Up @@ -426,7 +444,7 @@ export class Session {
handler_type,
);
}

let subscriber = Subscriber[NewSubscriber](
remote_subscriber,
callback_subscriber,
Expand All @@ -435,7 +453,7 @@ export class Session {
return subscriber;
}

liveliness() : Liveliness {
liveliness(): Liveliness {
return new Liveliness(this.remote_session)
}

Expand Down Expand Up @@ -636,3 +654,44 @@ export class Receiver {
export function open(config: Config): Promise<Session> {
return Session.open(config);
}

/**
* Struct to expose Info for your Zenoh Session
*/
export class SessionInfo {
private _zid: ZenohId
private _routers: ZenohId[]
private _peers: ZenohId[]

constructor(
zid: ZenohId,
peers: ZenohId[],
routers: ZenohId[],
) {
this._zid = zid;
this._routers = routers;
this._peers = peers;
}

zid(): ZenohId {
return this._zid;
}
routers_zid(): ZenohId[] {
return this._routers;
}
peers_zid(): ZenohId[] {
return this._peers;
}
}

export class ZenohId {
private zid: string

constructor(zid: string) {
this.zid = zid;
}

toString(): string {
return this.zid;
}
}
Loading